Skip to content

Commit

Permalink
Fix handling of IO WouldBlock error in Stratum server (#2528)
Browse files Browse the repository at this point in the history
If WouldBlock error happens Stratum server drops part of a message to
read or write. This PR inroduces a worker's buffer to store partially
read message which will be completed next time. For write the existing
util function is used.
Fixes #2524
  • Loading branch information
hashmap committed Feb 12, 2019
1 parent aa4f44b commit 65c0b1e
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions servers/src/mining/stratumserver.rs
Expand Up @@ -152,6 +152,7 @@ pub struct Worker {
stream: BufStream<TcpStream>,
error: bool,
authenticated: bool,
buffer: String,
}

impl Worker {
Expand All @@ -164,15 +165,18 @@ impl Worker {
stream: stream,
error: false,
authenticated: false,
buffer: String::with_capacity(4096),
}
}

// Get Message from the worker
fn read_message(&mut self, line: &mut String) -> Option<usize> {
fn read_message(&mut self) -> Option<String> {
// Read and return a single message or None
match self.stream.read_line(line) {
Ok(n) => {
return Some(n);
match self.stream.read_line(&mut self.buffer) {
Ok(_) => {
let res = self.buffer.clone();
self.buffer.clear();
return Some(res);
}
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
// Not an error, just no messages ready
Expand All @@ -183,6 +187,7 @@ impl Worker {
"(Server ID: {}) Error in connection with stratum client: {}",
self.id, e
);
self.buffer.clear();
self.error = true;
return None;
}
Expand All @@ -195,7 +200,11 @@ impl Worker {
if !message.ends_with("\n") {
message += "\n";
}
match self.stream.write(message.as_bytes()) {
match util::read_write::write_all(
&mut self.stream,
message.as_bytes(),
Duration::from_secs(1),
) {
Ok(_) => match self.stream.flush() {
Ok(_) => {}
Err(e) => {
Expand Down Expand Up @@ -281,10 +290,9 @@ impl StratumServer {
// Handle an RPC request message from the worker(s)
fn handle_rpc_requests(&mut self, stratum_stats: &mut Arc<RwLock<StratumStats>>) {
let mut workers_l = self.workers.lock();
let mut the_message = String::with_capacity(4096);
for num in 0..workers_l.len() {
match workers_l[num].read_message(&mut the_message) {
Some(_) => {
match workers_l[num].read_message() {
Some(the_message) => {
// Decompose the request from the JSONRpc wrapper
let request: RpcRequest = match serde_json::from_str(&the_message) {
Ok(request) => request,
Expand All @@ -297,13 +305,10 @@ impl StratumServer {
the_message.as_bytes(),
);
workers_l[num].error = true;
the_message.clear();
continue;
}
};

the_message.clear();

let mut stratum_stats = stratum_stats.write();
let worker_stats_id = match stratum_stats
.worker_stats
Expand Down

0 comments on commit 65c0b1e

Please sign in to comment.