Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: improve Comm api and documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Sep 29, 2020
1 parent 9b0971e commit 9ecfe8a
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 77 deletions.
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@
)]
// FIXME: it seems the code in `Comm::send_message_to_targets` is triggering type-length limit
// reached error for some reason. This is a quick workaround, but we should probably look into it
// closely and find a proper fix (or establish that this is already a proper fix).
#![type_length_limit = "2268004"]
// closely to find out whether there are any downsides doing this or whether there is another,
// more "proper" way.
#![type_length_limit = "2295944"]

#[macro_use]
extern crate log;
Expand Down
3 changes: 2 additions & 1 deletion src/node/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ fn spawn_timer_handler(
});
}

// Helpers to asynchronously watch when something gets dropped.
// A single consumer, multiple producer one-shot channel that sends when the sender gets dropped.
// Used to observe termination of some object from any number of tasks simultaneously.
//
// Note: it seems we could have used `tokio::sync::watch` for this exact purpose. The reason why we
// didn't is that `watch` interacts poorly with the `select!` macro. It requires the future
Expand Down
20 changes: 12 additions & 8 deletions src/node/stage/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ impl Approved {
}
*/

async fn handle_peer_lost(&mut self, peer_addr: SocketAddr) -> Result<()> {
let name = if let Some(node) = self.shared_state.find_p2p_node_from_addr(&peer_addr) {
async fn handle_peer_lost(&mut self, peer_addr: &SocketAddr) -> Result<()> {
let name = if let Some(node) = self.shared_state.find_p2p_node_from_addr(peer_addr) {
debug!("Lost known peer {}", node);
*node.name()
} else {
Expand Down Expand Up @@ -1920,16 +1920,20 @@ impl Approved {
delivery_group_size: usize,
msg: Bytes,
) -> Result<()> {
let status = self
match self
.comm
.send_message_to_targets(recipients, delivery_group_size, msg)
.await;
.await
{
Ok(()) => Ok(()),
Err(error) => {
for addr in &error.failed_recipients {
self.handle_peer_lost(addr).await?;
}

for addr in status.failed_recipients {
self.handle_peer_lost(addr).await?;
Err(error.into())
}
}

Ok(())
}

async fn send_message_to_target(&mut self, recipient: &SocketAddr, msg: Bytes) -> Result<()> {
Expand Down
5 changes: 3 additions & 2 deletions src/node/stage/bootstrapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,9 @@ impl Bootstrapping {
debug!("Sending BootstrapRequest to {}", dst);
self.comm
.send_message_to_target(&dst, message.to_bytes())
.await
.into()
.await?;

Ok(())
}

async fn reconnect_to_new_section(&self, new_conn_infos: Vec<SocketAddr>) -> Result<()> {
Expand Down
118 changes: 63 additions & 55 deletions src/node/stage/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

use crate::error::{Error, Result};
use bytes::Bytes;
use err_derive::Error;
use futures::{
lock::Mutex,
stream::{FuturesUnordered, StreamExt},
Expand Down Expand Up @@ -81,12 +82,21 @@ impl Comm {
})
}

/// Sends a message to multiple recipients. Attempts to send to `delivery_group_size`
/// recipients out of the `recipients` list. If a send fails, attempts to send to the next peer
/// until `delivery_goup_size` successful sends complete or there are no more recipients to
/// try. Each recipient will be attempted at most `RESEND_MAX_ATTEMPTS` times. If it fails all
/// the attempts, it is considered as lost.
///
/// Returns `Ok` if all of `delivery_group_size` sends succeeded and `Err` if less that
/// `delivery_group_size` succeeded. The returned error contains a list of all the recipients
/// that failed all their respective attempts.
pub async fn send_message_to_targets(
&self,
recipients: &[SocketAddr],
delivery_group_size: usize,
msg: Bytes,
) -> SendStatus {
) -> Result<(), SendError> {
if recipients.len() < delivery_group_size {
warn!(
"Less than delivery_group_size valid recipients - delivery_group_size: {}, recipients: {:?}",
Expand All @@ -96,11 +106,12 @@ impl Comm {
}

// Use `FuturesUnordered` to execute all the send tasks concurrently, but still on the same
// thread.
// thread. Keep track of the sending progress using the `SendState` helper.
let mut state = SendState::new(recipients, delivery_group_size);
let mut tasks = FuturesUnordered::new();

loop {
// Start a batch of sends.
while let Some(addr) = state.next() {
trace!("Sending message to {}", addr);
let msg = msg.clone();
Expand All @@ -111,7 +122,10 @@ impl Comm {
tasks.push(task);
}

// Await until one of the started sends completes.
if let Some((addr, result)) = tasks.next().await {
// Notify `SendState` about the result of the send and potentially start the next
// send, appending to the ones still in progress (if any).
match result {
Ok(_) => {
trace!("Sending message to {} succeeded", addr);
Expand All @@ -123,44 +137,47 @@ impl Comm {
}
}
} else {
// No sends in progress, we are done.
break;
}
}

let status = state.finish();
let failed_recipients = state.finish();

trace!(
"Sending message finished to {}/{} recipients (failed: {:?})",
delivery_group_size - status.remaining,
delivery_group_size - failed_recipients.len(),
delivery_group_size,
status.failed_recipients
failed_recipients
);

status
if failed_recipients.is_empty() {
Ok(())
} else {
Err(SendError { failed_recipients })
}
}

pub async fn send_message_to_target(&self, recipient: &SocketAddr, msg: Bytes) -> SendStatus {
pub async fn send_message_to_target(
&self,
recipient: &SocketAddr,
msg: Bytes,
) -> Result<(), SendError> {
self.send_message_to_targets(slice::from_ref(recipient), 1, msg)
.await
}
}

#[derive(Debug)]
pub struct SendStatus {
// The number of recipients out of the requested delivery group that we haven't successfully
// sent the message to.
pub remaining: usize,
#[derive(Debug, Error)]
#[error(display = "Send failed to: {:?}", failed_recipients)]
pub struct SendError {
// Recipients that failed all the send attempts.
pub failed_recipients: Vec<SocketAddr>,
}

impl From<SendStatus> for Result<(), Error> {
fn from(status: SendStatus) -> Self {
if status.remaining == 0 {
Ok(())
} else {
Err(Error::FailedSend)
}
impl From<SendError> for Error {
fn from(_: SendError) -> Self {
Error::FailedSend
}
}

Expand Down Expand Up @@ -222,7 +239,7 @@ impl SendState {
}
}

// Returns the next recipient to sent to.
// Returns the next recipient to send to.
fn next(&mut self) -> Option<SocketAddr> {
let active = self
.recipients
Expand Down Expand Up @@ -269,16 +286,13 @@ impl SendState {
}
}

fn finish(self) -> SendStatus {
SendStatus {
remaining: self.remaining,
failed_recipients: self
.recipients
.into_iter()
.filter(|recipient| !recipient.sending && recipient.attempt >= RESEND_MAX_ATTEMPTS)
.map(|recipient| recipient.addr)
.collect(),
}
// Consumes the state and returns the list of recipients that failed all attempts (if any).
fn finish(self) -> Vec<SocketAddr> {
self.recipients
.into_iter()
.filter(|recipient| !recipient.sending && recipient.attempt >= RESEND_MAX_ATTEMPTS)
.map(|recipient| recipient.addr)
.collect()
}
}

Expand All @@ -301,12 +315,8 @@ mod tests {
let mut peer1 = Peer::new()?;

let message = Bytes::from_static(b"hello world");
let status = comm
.send_message_to_targets(&[peer0.addr, peer1.addr], 2, message.clone())
.await;

assert_eq!(status.remaining, 0);
assert!(status.failed_recipients.is_empty());
comm.send_message_to_targets(&[peer0.addr, peer1.addr], 2, message.clone())
.await?;

assert_eq!(peer0.rx.recv().await, Some(message.clone()));
assert_eq!(peer1.rx.recv().await, Some(message));
Expand All @@ -322,12 +332,8 @@ mod tests {
let mut peer1 = Peer::new()?;

let message = Bytes::from_static(b"hello world");
let status = comm
.send_message_to_targets(&[peer0.addr, peer1.addr], 1, message.clone())
.await;

assert_eq!(status.remaining, 0);
assert!(status.failed_recipients.is_empty());
comm.send_message_to_targets(&[peer0.addr, peer1.addr], 1, message.clone())
.await?;

assert_eq!(peer0.rx.recv().await, Some(message));

Expand All @@ -345,12 +351,13 @@ mod tests {
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
let status = comm
match comm
.send_message_to_targets(&[invalid_addr], 1, message.clone())
.await;

assert_eq!(status.remaining, 1);
assert_eq!(status.failed_recipients, [invalid_addr]);
.await
{
Err(error) => assert_eq!(error.failed_recipients, [invalid_addr]),
Ok(_) => panic!("unexpected success"),
}

Ok(())
}
Expand All @@ -362,11 +369,9 @@ mod tests {
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
let status = comm
.send_message_to_targets(&[invalid_addr, peer.addr], 1, message.clone())
.await;
comm.send_message_to_targets(&[invalid_addr, peer.addr], 1, message.clone())
.await?;

assert_eq!(status.remaining, 0);
assert_eq!(peer.rx.recv().await, Some(message));

Ok(())
Expand All @@ -379,12 +384,15 @@ mod tests {
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
let status = comm

match comm
.send_message_to_targets(&[invalid_addr, peer.addr], 2, message.clone())
.await;
.await
{
Ok(_) => panic!("unexpected success"),
Err(error) => assert_eq!(error.failed_recipients, [invalid_addr]),
}

assert_eq!(status.remaining, 1);
assert_eq!(status.failed_recipients, [invalid_addr]);
assert_eq!(peer.rx.recv().await, Some(message));

Ok(())
Expand Down
6 changes: 3 additions & 3 deletions src/node/stage/joining.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,15 +250,15 @@ impl Joining {
None,
)?;

let status = self
let result = self
.comm
.send_message_to_targets(&recipients, recipients.len(), message.to_bytes())
.await;

if !status.failed_recipients.is_empty() {
if let Err(error) = result {
error!(
"Failed to send JoinRequest to {:?}",
status.failed_recipients
error.failed_recipients
)
}

Expand Down
10 changes: 4 additions & 6 deletions src/node/stage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl Stage {
mpsc::UnboundedReceiver<u64>,
mpsc::UnboundedReceiver<Event>,
)> {
let comm = Comm::new(transport_config.into())?;
let comm = Comm::new(transport_config)?;
let connection_info = comm.our_connection_info()?;
let p2p_node = P2pNode::new(*full_id.public_id(), connection_info);

Expand Down Expand Up @@ -154,7 +154,7 @@ impl Stage {
mpsc::UnboundedReceiver<u64>,
mpsc::UnboundedReceiver<Event>,
)> {
let (comm, addr) = Comm::from_bootstrapping(transport_config.into()).await?;
let (comm, addr) = Comm::from_bootstrapping(transport_config).await?;

let (events_tx, events_rx) = mpsc::unbounded_channel();
let node_info = NodeInfo {
Expand Down Expand Up @@ -227,10 +227,8 @@ impl Stage {
recipient: &SocketAddr,
msg: Bytes,
) -> Result<()> {
self.comm
.send_message_to_target(recipient, msg)
.await
.into()
self.comm.send_message_to_target(recipient, msg).await?;
Ok(())
}

/// Process a message accordng to current stage.
Expand Down

0 comments on commit 9ecfe8a

Please sign in to comment.