Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
Merge pull request #1108 from madadam/handle-action-fix
Browse files Browse the repository at this point in the history
fix/states: do not terminate when sending action result fails
  • Loading branch information
afck committed Jul 27, 2016
2 parents 31e8c6a + fb22729 commit dc0a018
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 74 deletions.
31 changes: 16 additions & 15 deletions src/states/bootstrapping.rs
Expand Up @@ -79,32 +79,33 @@ impl Bootstrapping {
}

pub fn handle_action(&mut self, action: Action) -> Transition {
let result = match action {
match action {
Action::ClientSendRequest { ref result_tx, .. } |
Action::NodeSendMessage { ref result_tx, .. } => {
warn!("{:?} - Cannot handle {:?} - not bootstrapped", self, action);
// TODO: return Err here eventually. Returning Ok for now to
// preserve the pre-refactor behaviour.
result_tx.send(Ok(())).is_ok()
let _ = result_tx.send(Ok(()));
}
Action::Name { result_tx } => result_tx.send(*self.name()).is_ok(),
Action::Timeout(token) => {
self.handle_timeout(token);
true
Action::Name { result_tx } => {
let _ = result_tx.send(*self.name());
}
Action::Timeout(token) => self.handle_timeout(token),
Action::Terminate => {
return Transition::Terminate;
}
Action::Terminate => false,

// TODO: these actions make no sense in this state, but we handle
// them for now, to preserve the pre-refactor behaviour.
Action::CloseGroup { result_tx, .. } => result_tx.send(None).is_ok(),
Action::QuorumSize { result_tx } => result_tx.send(0).is_ok(),
};

if result {
Transition::Stay
} else {
Transition::Terminate
Action::CloseGroup { result_tx, .. } => {
let _ = result_tx.send(None);
}
Action::QuorumSize { result_tx } => {
let _ = result_tx.send(0);
}
}

Transition::Stay
}

pub fn handle_crust_event(&mut self, crust_event: CrustEvent) -> Transition {
Expand Down
31 changes: 15 additions & 16 deletions src/states/client.rs
Expand Up @@ -90,7 +90,7 @@ impl Client {
}

pub fn handle_action(&mut self, action: Action) -> Transition {
let result = match action {
match action {
Action::ClientSendRequest { content, dst, priority, result_tx } => {
let src = Authority::Client {
client_key: *self.full_id.public_id().signing_public_key(),
Expand All @@ -104,30 +104,29 @@ impl Client {
Err(_) | Ok(_) => Ok(()),
};

result_tx.send(result).is_ok()
let _ = result_tx.send(result);
}
Action::NodeSendMessage { result_tx, .. } => {
result_tx.send(Err(InterfaceError::InvalidState)).is_ok()
let _ = result_tx.send(Err(InterfaceError::InvalidState));
}
Action::CloseGroup { result_tx, .. } => {
let _ = result_tx.send(None);
}
Action::Name { result_tx } => {
let _ = result_tx.send(*self.name());
}
Action::CloseGroup { result_tx, .. } => result_tx.send(None).is_ok(),
Action::Name { result_tx } => result_tx.send(*self.name()).is_ok(),
Action::QuorumSize { result_tx } => {
// TODO: return the actual quorum size. To do that, we need to
// extend the MessageAccumulator's API with a method to retrieve it.
result_tx.send(0).is_ok()
let _ = result_tx.send(0);
}
Action::Timeout(token) => {
self.handle_timeout(token);
true
Action::Timeout(token) => self.handle_timeout(token),
Action::Terminate => {
return Transition::Terminate;
}
Action::Terminate => false,
};

if result {
Transition::Stay
} else {
Transition::Terminate
}

Transition::Stay
}

pub fn handle_crust_event(&mut self, crust_event: CrustEvent) -> Transition {
Expand Down
35 changes: 20 additions & 15 deletions src/states/joining_node.rs
Expand Up @@ -117,34 +117,39 @@ impl JoiningNode {
}

pub fn handle_action(&mut self, action: Action) -> Transition {
let result = match action {
Action::Name { result_tx } => result_tx.send(*self.name()).is_ok(),
match action {
Action::Name { result_tx } => {
let _ = result_tx.send(*self.name());
}
Action::ClientSendRequest { ref result_tx, .. } => {
result_tx.send(Err(InterfaceError::InvalidState)).is_ok()
let _ = result_tx.send(Err(InterfaceError::InvalidState));
}
Action::NodeSendMessage { ref result_tx, .. } => {
warn!("{:?} - Cannot handle {:?} - not bootstrapped", self, action);
// TODO: return Err here eventually. Returning Ok for now to
// preserve the pre-refactor behaviour.
result_tx.send(Ok(())).is_ok()
let _ = result_tx.send(Ok(()));
}
Action::Timeout(token) => {
self.handle_timeout(token);
true
if !self.handle_timeout(token) {
return Transition::Terminate;
}
}
Action::Terminate => {
return Transition::Terminate;
}
Action::Terminate => false,

// TODO: these actions make no sense in this state, but we handle
// them for now, to preserve the pre-refactor behaviour.
Action::CloseGroup { result_tx, .. } => result_tx.send(None).is_ok(),
Action::QuorumSize { result_tx } => result_tx.send(0).is_ok(),
};

if result {
Transition::Stay
} else {
Transition::Terminate
Action::CloseGroup { result_tx, .. } => {
let _ = result_tx.send(None);
}
Action::QuorumSize { result_tx } => {
let _ = result_tx.send(0);
}
}

Transition::Stay
}

pub fn handle_crust_event(&mut self, crust_event: CrustEvent) -> Transition {
Expand Down
54 changes: 26 additions & 28 deletions src/states/node.rs
Expand Up @@ -108,7 +108,7 @@ impl Node {
debug!("{:?} - State changed to node.", node);
Some(node)
} else {
let _ = node.event_sender.send(Event::Terminate);
node.send_event(Event::Terminate);
None
}
}
Expand Down Expand Up @@ -224,36 +224,36 @@ impl Node {
}

pub fn handle_action(&mut self, action: Action) -> Transition {
let result = match action {
match action {
Action::ClientSendRequest { result_tx, .. } => {
result_tx.send(Err(InterfaceError::InvalidState)).is_ok()
let _ = result_tx.send(Err(InterfaceError::InvalidState));
}
Action::NodeSendMessage { src, dst, content, priority, result_tx } => {
result_tx.send(match self.send_user_message(src, dst, content, priority) {
Err(RoutingError::Interface(err)) => Err(err),
Err(_) | Ok(_) => Ok(()),
})
.is_ok()
let result = match self.send_user_message(src, dst, content, priority) {
Err(RoutingError::Interface(err)) => Err(err),
Err(_) | Ok(_) => Ok(()),
};

let _ = result_tx.send(result);
}
Action::CloseGroup { name, result_tx } => {
result_tx.send(self.peer_mgr.routing_table().close_nodes(&name, GROUP_SIZE)).is_ok()
let _ =
result_tx.send(self.peer_mgr.routing_table().close_nodes(&name, GROUP_SIZE));
}
Action::Name { result_tx } => result_tx.send(*self.name()).is_ok(),
Action::QuorumSize { result_tx } => result_tx.send(self.dynamic_quorum_size()).is_ok(),
Action::Timeout(token) => {
self.handle_timeout(token);
true
Action::Name { result_tx } => {
let _ = result_tx.send(*self.name());
}
Action::Terminate => false,
};
Action::QuorumSize { result_tx } => {
let _ = result_tx.send(self.dynamic_quorum_size());
}
Action::Timeout(token) => self.handle_timeout(token),
Action::Terminate => {
return Transition::Terminate;
}
}

self.update_stats();

if result {
Transition::Stay
} else {
Transition::Terminate
}
Transition::Stay
}

pub fn handle_crust_event(&mut self, crust_event: CrustEvent) -> Transition {
Expand All @@ -280,7 +280,7 @@ impl Node {
}
CrustEvent::ListenerFailed => {
error!("{:?} Failed to start listening.", self);
let _ = self.event_sender.send(Event::Terminate);
self.send_event(Event::Terminate);
}
CrustEvent::WriteMsgSizeProhibitive(peer_id, msg) => {
error!("{:?} Failed to send {}-byte message to {:?}. Message too large.",
Expand Down Expand Up @@ -1324,11 +1324,9 @@ impl Node {
debug!("{:?} Lost connection, less than {} remaining.",
self,
GROUP_SIZE - 1);
let _ = self.event_sender.send(if self.is_first_node {
Event::Terminate
} else {
Event::RestartRequired
});
if !self.is_first_node {
self.send_event(Event::RestartRequired);
}
}
self.reset_bucket_refresh_timer();
};
Expand Down

0 comments on commit dc0a018

Please sign in to comment.