Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: do not create connection when failed to send to client
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Jan 20, 2021
1 parent a65030d commit d5eadd8
Show file tree
Hide file tree
Showing 8 changed files with 66 additions and 27 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ futures = "~0.3.6"
hex_fmt = "~0.3.0"
itertools = "~0.9.0"
lru_time_cache = "~0.11.0"
qp2p = "~0.9.8"
qp2p = "~0.9.10"
rand = "~0.7.3"
rand_chacha = "~0.2.2"
thiserror = "1.0.23"
Expand Down
1 change: 1 addition & 0 deletions examples/minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ async fn handle_event(index: usize, node: &mut Routing, event: Event) -> bool {
src,
HexFmt(&content)
),
Event::ClientLost(addr) => info!("Node #{} received ClientLost({:?})", index, addr),
}

true
Expand Down
3 changes: 3 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ pub enum Event {
/// Stream to receive more messages from the client on the same channel
recv: RecvStream,
},
/// Failed in sending a message to client, or connection to client is lost
ClientLost(SocketAddr),
}

impl Debug for Event {
Expand Down Expand Up @@ -159,6 +161,7 @@ impl Debug for Event {
HexFmt(content),
src,
),
Self::ClientLost(addr) => write!(formatter, "ClientLost({:?})", addr),
}
}
}
14 changes: 14 additions & 0 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,20 @@ impl Comm {
})
}

/// Sends a message to client
pub async fn send_message_to_client(
&self,
client: &SocketAddr,
msg: Bytes,
) -> Result<(), SendError> {
if let Some(conn) = self.endpoint.get_connection(client) {
if conn.send_uni(msg.clone()).await.is_ok() {
return Ok(());
}
}
Err(SendError)
}

/// Sends a message to multiple recipients. Attempts to send to `delivery_group_size`
/// recipients out of the `recipients` list. If a send fails, attempts to send to the next peer
/// until `delivery_goup_size` successful sends complete or there are no more recipients to
Expand Down
22 changes: 16 additions & 6 deletions src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,16 @@ pub(crate) enum Command {
proofs: DkgFailureProofSet,
},
/// Send a message to `delivery_group_size` peers out of the given `recipients`.
SendMessage {
SendMessageToNodes {
recipients: Vec<SocketAddr>,
delivery_group_size: usize,
message: Bytes,
},
/// Send a message to a client.
SendMessageToClient {
recipient: SocketAddr,
message: Bytes,
},
/// Send `UserMessage` with the given source and destination.
SendUserMessage {
src: SrcLocation,
Expand All @@ -83,18 +88,18 @@ pub(crate) enum Command {
}

impl Command {
/// Convenience method to create `Command::SendMessage` with a single recipient.
/// Convenience method to create `Command::SendMessageToNodes` with a single recipient.
pub fn send_message_to_target(recipient: &SocketAddr, message: Bytes) -> Self {
Self::send_message_to_targets(slice::from_ref(recipient), 1, message)
}

/// Convenience method to create `Command::SendMessage` with multiple recipients.
/// Convenience method to create `Command::SendMessageToNodes` with multiple recipients.
pub fn send_message_to_targets(
recipients: &[SocketAddr],
delivery_group_size: usize,
message: Bytes,
) -> Self {
Self::SendMessage {
Self::SendMessageToNodes {
recipients: recipients.to_vec(),
delivery_group_size,
message,
Expand Down Expand Up @@ -141,16 +146,21 @@ impl Debug for Command {
.field("elders_info", elders_info)
.field("proofs", proofs)
.finish(),
Self::SendMessage {
Self::SendMessageToNodes {
recipients,
delivery_group_size,
message,
} => f
.debug_struct("SendMessage")
.debug_struct("SendMessageToNodes")
.field("recipients", recipients)
.field("delivery_group_size", delivery_group_size)
.field("message", &format_args!("{:10}", hex_fmt::HexFmt(message)))
.finish(),
Self::SendMessageToClient { recipient, message } => f
.debug_struct("SendMessageToClient")
.field("recipient", recipient)
.field("message", &format_args!("{:10}", hex_fmt::HexFmt(message)))
.finish(),
Self::SendUserMessage { src, dst, content } => f
.debug_struct("SendUserMessage")
.field("src", src)
Expand Down
6 changes: 1 addition & 5 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,11 +298,7 @@ impl Routing {
recipient: SocketAddr,
message: Bytes,
) -> Result<()> {
let command = Command::SendMessage {
recipients: vec![recipient],
delivery_group_size: 1,
message,
};
let command = Command::SendMessageToClient { recipient, message };
self.stage.clone().handle_commands(command).await
}

Expand Down
21 changes: 18 additions & 3 deletions src/routing/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,13 +133,16 @@ impl Stage {
.await
.handle_dkg_failure(elders_info, proofs)
.map(|command| vec![command]),
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
delivery_group_size,
message,
} => Ok(self
.send_message(&recipients, delivery_group_size, message)
.send_message_to_nodes(&recipients, delivery_group_size, message)
.await),
Command::SendMessageToClient { recipient, message } => {
Ok(self.send_message_to_client(recipient, message).await)
}
Command::SendUserMessage { src, dst, content } => {
self.state.lock().await.send_user_message(src, dst, content)
}
Expand Down Expand Up @@ -168,7 +171,7 @@ impl Stage {
let _ = tokio::spawn(self.handle_commands(command));
}

async fn send_message(
async fn send_message_to_nodes(
&self,
recipients: &[SocketAddr],
delivery_group_size: usize,
Expand All @@ -183,6 +186,18 @@ impl Stage {
.collect()
}

async fn send_message_to_client(&self, recipient: SocketAddr, message: Bytes) -> Vec<Command> {
if self
.comm
.send_message_to_client(&recipient, message)
.await
.is_err()
{
self.send_event(Event::ClientLost(recipient)).await;
}
vec![]
}

async fn handle_schedule_timeout(&self, duration: Duration, token: u64) -> Option<Command> {
let mut cancel_rx = self.cancel_timer_rx.clone();

Expand Down
24 changes: 12 additions & 12 deletions src/routing/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ async fn receive_bootstrap_request() -> Result<()> {

let (recipients, message) = assert_matches!(
commands.next(),
Some(Command::SendMessage {
Some(Command::SendMessageToNodes {
recipients,
message, ..
}) => (recipients, message)
Expand Down Expand Up @@ -117,7 +117,7 @@ async fn receive_join_request_without_resource_proof_response() -> Result<()> {

let response_message = assert_matches!(
commands.next(),
Some(Command::SendMessage { message, .. }) => message
Some(Command::SendMessageToNodes { message, .. }) => message
);
let response_message = Message::from_bytes(&response_message)?;

Expand Down Expand Up @@ -430,7 +430,7 @@ async fn handle_consensus_on_online_of_elder_candidate() -> Result<()> {

for command in commands {
let (recipients, message) = match command {
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -503,7 +503,7 @@ async fn handle_online_command(

for command in commands {
let (message, recipients) = match command {
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -703,7 +703,7 @@ async fn handle_consensus_on_offline_of_elder() -> Result<()> {

for command in commands {
let (recipients, message) = match command {
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -837,7 +837,7 @@ async fn handle_unknown_message(source: UnknownMessageSource) -> Result<()> {
// TODO: test also that the message got relayed to the elders.

for command in commands {
let (recipients, message) = if let Command::SendMessage {
let (recipients, message) = if let Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -948,7 +948,7 @@ async fn handle_untrusted_message(source: UntrustedMessageSource) -> Result<()>
let mut bounce_sent = false;

for command in commands {
let (recipients, message) = if let Command::SendMessage {
let (recipients, message) = if let Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -1039,7 +1039,7 @@ async fn handle_bounced_unknown_message() -> Result<()> {

for command in commands {
let (recipients, message) = match command {
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -1137,7 +1137,7 @@ async fn handle_bounced_untrusted_message() -> Result<()> {

for command in commands {
let (recipients, message) = match command {
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -1345,7 +1345,7 @@ async fn relocation(relocated_peer_role: RelocatedPeerRole) -> Result<()> {

for command in commands {
let (recipients, message) = match command {
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -1507,7 +1507,7 @@ async fn handle_elders_update() -> Result<()> {

for command in commands {
let (recipients, message) = match command {
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
message,
..
Expand Down Expand Up @@ -1655,7 +1655,7 @@ async fn handle_demote_during_split() -> Result<()> {

for command in commands {
let (recipients, message) = match command {
Command::SendMessage {
Command::SendMessageToNodes {
recipients,
message,
..
Expand Down

0 comments on commit d5eadd8

Please sign in to comment.