Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
refactor(comm): change return of send to reflect the logic of deliver…
Browse files Browse the repository at this point in the history
…y_group_size
  • Loading branch information
Dylan-DPC committed May 3, 2021
1 parent 3b7640b commit 27a48a5
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 62 deletions.
15 changes: 13 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::{
messages::{CreateError, ExtendProofChainError},
section::SectionChainError,
};
use std::net::SocketAddr;
use thiserror::Error;

/// The type returned by the sn_routing message handling methods.
Expand All @@ -24,6 +25,14 @@ pub enum Error {
FailedSignature,
#[error("Cannot route.")]
CannotRoute,
#[error("Empty recipient list")]
EmptyRecipientList,
#[error("The config is invalid")]
InvalidConfig,
#[error("Cannot connect to the endpoint")]
CannotConnectEndpoint,
#[error("Address not reachable")]
AddressNotReachable,
#[error("Network layer error: {0}")]
Network(#[from] qp2p::Error),
#[error("The node is not in a state to handle the action.")]
Expand All @@ -38,8 +47,10 @@ pub enum Error {
InvalidSignatureShare,
#[error("The secret key share is missing.")]
MissingSecretKeyShare,
#[error("Failed to send a message.")]
FailedSend,
#[error("Failed to send a message to {0}")]
FailedSend(SocketAddr),
#[error("Connection closed locally")]
ConnectionClosed,
#[error("Invalid section chain: {0}")]
InvalidSectionChain(#[from] SectionChainError),
#[error("Messaging protocol error: {0}")]
Expand Down
107 changes: 57 additions & 50 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use std::{
net::SocketAddr,
sync::RwLock,
};
use thiserror::Error;
use tokio::{sync::mpsc, task};

// Communication component of the node to interact with other nodes.
Expand Down Expand Up @@ -111,13 +110,13 @@ impl Comm {
&self,
recipient: &SocketAddr,
msg: Bytes,
) -> Result<(), SendError> {
) -> Result<()> {
self.endpoint
.send_message(msg, recipient)
.await
.map_err(|err| {
error!("Sending to {:?} failed with {}", recipient, err);
SendError
Error::FailedSend(*recipient)
})
}

Expand Down Expand Up @@ -147,18 +146,19 @@ impl Comm {

/// Sends a message to multiple recipients. Attempts to send to `delivery_group_size`
/// recipients out of the `recipients` list. If a send fails, attempts to send to the next peer
/// until `delivery_goup_size` successful sends complete or there are no more recipients to
/// until `delivery_group_size` successful sends complete or there are no more recipients to
/// try.
///
/// Returns `Ok` if all of `delivery_group_size` sends succeeded and `Err` if less that
/// `delivery_group_size` succeeded. Also returns all the failed recipients which can be used
/// by the caller to identify lost peers.
/// Returns an `Error::ConnectionClosed` if the connection is closed locally. Else it returns a
/// `SendStatus::MinDeliveryGroupSizeReached` or `SendStatus::MinDeliveryGroupSizeFailed` depending
/// on if the minimum delivery group size is met or not. The failed recipients are sent along
/// with the status. It returns a `SendStatus::AllRecipients` if message is sent to all the recipients.
pub async fn send(
&self,
recipients: &[SocketAddr],
delivery_group_size: usize,
msg: Bytes,
) -> (Result<(), SendError>, Vec<SocketAddr>) {
) -> Result<SendStatus> {
trace!(
"Sending message ({} bytes) to {} of {:?}",
msg.len(),
Expand Down Expand Up @@ -196,7 +196,7 @@ impl Comm {
Err(qp2p::Error::Connection(qp2p::ConnectionError::LocallyClosed)) => {
// The connection was closed by us which means we are terminating so let's cut
// this short.
return (Err(SendError), vec![]);
return Err(Error::ConnectionClosed);
}
Err(_) => {
failed_recipients.push(*addr);
Expand All @@ -217,13 +217,15 @@ impl Comm {
failed_recipients
);

let result = if successes == delivery_group_size {
Ok(())
if successes == delivery_group_size {
if failed_recipients.is_empty() {
Ok(SendStatus::AllRecipients)
} else {
Ok(SendStatus::MinDeliveryGroupSizeReached(failed_recipients))
}
} else {
Err(SendError)
};

(result, failed_recipients)
Ok(SendStatus::MinDeliveryGroupSizeFailed(failed_recipients))
}
}

// Low-level send
Expand Down Expand Up @@ -251,16 +253,6 @@ impl Drop for Comm {
}
}

#[derive(Debug, Error)]
#[error("Send failed")]
pub struct SendError;

impl From<SendError> for Error {
fn from(_: SendError) -> Self {
Error::FailedSend
}
}

pub(crate) enum ConnectionEvent {
Received((SocketAddr, Bytes)),
Disconnected(SocketAddr),
Expand Down Expand Up @@ -295,6 +287,14 @@ async fn handle_incoming_messages(
}
}

/// Returns the status of the send operation.
#[derive(Debug, Clone)]
pub enum SendStatus {
AllRecipients,
MinDeliveryGroupSizeReached(Vec<SocketAddr>),
MinDeliveryGroupSizeFailed(Vec<SocketAddr>),
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -316,9 +316,11 @@ mod tests {
let mut peer1 = Peer::new().await?;

let message = Bytes::from_static(b"hello world");
comm.send(&[peer0.addr, peer1.addr], 2, message.clone())
.await
.0?;
let status = comm
.send(&[peer0.addr, peer1.addr], 2, message.clone())
.await?;

assert_matches!(status, SendStatus::AllRecipients);

assert_eq!(peer0.rx.recv().await, Some(message.clone()));
assert_eq!(peer1.rx.recv().await, Some(message));
Expand All @@ -335,9 +337,11 @@ mod tests {
let mut peer1 = Peer::new().await?;

let message = Bytes::from_static(b"hello world");
comm.send(&[peer0.addr, peer1.addr], 1, message.clone())
.await
.0?;
let status = comm
.send(&[peer0.addr, peer1.addr], 1, message.clone())
.await?;

assert_matches!(status, SendStatus::AllRecipients);

assert_eq!(peer0.rx.recv().await, Some(message));

Expand All @@ -364,9 +368,12 @@ mod tests {
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
let (result, failed_recipients) = comm.send(&[invalid_addr], 1, message.clone()).await;
assert!(result.is_err());
assert_eq!(failed_recipients, [invalid_addr]);
let status = comm.send(&[invalid_addr], 1, message.clone()).await?;

assert_matches!(
&status,
&SendStatus::MinDeliveryGroupSizeFailed(_) => vec![invalid_addr]
);

Ok(())
}
Expand All @@ -386,9 +393,9 @@ mod tests {
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
comm.send(&[invalid_addr, peer.addr], 1, message.clone())
.await
.0?;
let _ = comm
.send(&[invalid_addr, peer.addr], 1, message.clone())
.await?;

assert_eq!(peer.rx.recv().await, Some(message));

Expand All @@ -410,12 +417,14 @@ mod tests {
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
let (result, failed_recipients) = comm
let status = comm
.send(&[invalid_addr, peer.addr], 2, message.clone())
.await;
.await?;

assert!(result.is_err());
assert_eq!(failed_recipients, [invalid_addr]);
assert_matches!(
status,
SendStatus::MinDeliveryGroupSizeFailed(_) => vec![invalid_addr]
);
assert_eq!(peer.rx.recv().await, Some(message));

Ok(())
Expand All @@ -432,10 +441,9 @@ mod tests {

// Send the first message.
let msg0 = Bytes::from_static(b"zero");
send_comm
let _ = send_comm
.send(slice::from_ref(&recv_addr), 1, msg0.clone())
.await
.0?;
.await?;

let mut msg0_received = false;

Expand All @@ -451,10 +459,9 @@ mod tests {

// Send the second message.
let msg1 = Bytes::from_static(b"one");
send_comm
let _ = send_comm
.send(slice::from_ref(&recv_addr), 1, msg1.clone())
.await
.0?;
.await?;

let mut msg1_received = false;

Expand All @@ -479,10 +486,10 @@ mod tests {
let addr1 = comm1.our_connection_info();

// Send a message to establish the connection
comm1
let _ = comm1
.send(slice::from_ref(&addr0), 1, Bytes::from_static(b"hello"))
.await
.0?;
.await?;

assert_matches!(rx0.recv().await, Some(ConnectionEvent::Received(_)));
// Drop `comm1` to cause connection lost.
drop(comm1);
Expand Down
2 changes: 1 addition & 1 deletion src/routing/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1108,7 +1108,7 @@ impl Core {
};
if recipients.is_empty() {
trace!("Cannot route user message, recipient list empty: {:?}", msg);
return Err(Error::CannotRoute);
return Err(Error::EmptyRecipientList);
};
trace!("sending user message {:?} to client {:?}", msg, recipients);
return Ok(vec![Command::SendMessage {
Expand Down
28 changes: 19 additions & 9 deletions src/routing/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::{bootstrap, Comm, Command, Core};
use crate::{error::Result, event::Event, relocation::SignedRelocateDetails};
use crate::routing::comm::SendStatus;
use crate::{error::Result, event::Event, relocation::SignedRelocateDetails, Error};
use sn_messaging::MessageType;
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::{
Expand Down Expand Up @@ -191,14 +192,23 @@ impl Dispatcher {
let msg_bytes = message.serialize()?;

let cmds = match message {
MessageType::Ping | MessageType::NodeMessage(_) => self
.comm
.send(recipients, delivery_group_size, msg_bytes)
.await
.1
.into_iter()
.map(Command::HandlePeerLost)
.collect(),
MessageType::Ping | MessageType::NodeMessage(_) => {
let status = self
.comm
.send(recipients, delivery_group_size, msg_bytes)
.await?;
match status {
SendStatus::MinDeliveryGroupSizeReached(failed_recipients)
| SendStatus::MinDeliveryGroupSizeFailed(failed_recipients) => {
Ok(failed_recipients
.into_iter()
.map(Command::HandlePeerLost)
.collect())
}
_ => Ok(vec![]),
}
.map_err(|e: Error| e)?
}
MessageType::ClientMessage(_) => {
for recipient in recipients {
if self
Expand Down

0 comments on commit 27a48a5

Please sign in to comment.