Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat(comm): detect lost connections
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Nov 23, 2020
1 parent 36a469a commit f4e9e3a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 65 deletions.
30 changes: 18 additions & 12 deletions src/routing/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::{comm::IncomingMessages, Comm};
use super::{
comm::{ConnectionEvent, IncomingConnections},
Comm,
};
use crate::{
consensus::Proven,
crypto,
Expand Down Expand Up @@ -35,7 +38,7 @@ const BACKLOG_CAPACITY: usize = 100;
pub(crate) async fn infant(
node: Node,
comm: &Comm,
incoming_messages: &mut IncomingMessages,
incoming_conns: &mut IncomingConnections,
bootstrap_addr: SocketAddr,
) -> Result<(Node, Section, Vec<(Message, SocketAddr)>)> {
let (send_tx, send_rx) = mpsc::channel(1);
Expand All @@ -47,7 +50,7 @@ pub(crate) async fn infant(
state.run(vec![bootstrap_addr], None),
future::join(
send_messages(send_rx, comm),
receive_messages(incoming_messages, recv_tx),
receive_messages(incoming_conns, recv_tx),
),
)
.await
Expand Down Expand Up @@ -415,20 +418,23 @@ enum JoinResponse {

// Keep receiving messages from `incoming_messages` and send them to `message_tx`.
async fn receive_messages(
incoming_messages: &mut IncomingMessages,
incoming_conns: &mut IncomingConnections,
mut message_tx: mpsc::Sender<(Message, SocketAddr)>,
) {
while let Some(message) = incoming_messages.next().await {
match message {
qp2p::Message::UniStream { bytes, src, .. } => match Message::from_bytes(&bytes) {
Ok(message) => {
let _ = message_tx.send((message, src)).await;
while let Some(event) = incoming_conns.next().await {
match event {
ConnectionEvent::Received(qp2p::Message::UniStream { bytes, src, .. }) => {
match Message::from_bytes(&bytes) {
Ok(message) => {
let _ = message_tx.send((message, src)).await;
}
Err(error) => debug!("Failed to deserialize message: {}", error),
}
Err(error) => debug!("Failed to deserialize message: {}", error),
},
qp2p::Message::BiStream { .. } => {
}
ConnectionEvent::Received(qp2p::Message::BiStream { .. }) => {
trace!("Ignore bi-stream messages during bootstrap");
}
ConnectionEvent::Disconnected(_) => {}
}
}
}
Expand Down
149 changes: 105 additions & 44 deletions src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,12 @@ use futures::{
};
use lru_time_cache::LruCache;
use qp2p::{Connection, Endpoint, QuicP2p};
use std::{future::Future, net::SocketAddr, sync::Arc};
use std::{
fmt::{self, Debug, Formatter},
future::Future,
net::SocketAddr,
sync::Arc,
};
use tokio::{
sync::{mpsc, watch},
task,
Expand Down Expand Up @@ -81,8 +86,8 @@ impl Comm {
/// event for the peers whom we received any messages from on the first stream. This means the
/// next message they send to us will fail the first send attempt (but will likely succeed on
/// the subsequent one).
pub fn listen(&self) -> Result<IncomingMessages> {
Ok(IncomingMessages::new(self.endpoint.listen()?))
pub fn listen(&self) -> Result<IncomingConnections> {
Ok(IncomingConnections::new(self.endpoint.listen()?))
}

pub async fn our_connection_info(&self) -> Result<SocketAddr> {
Expand Down Expand Up @@ -305,26 +310,44 @@ impl SendState {
}
}

/// Stream of incoming messages. Listens for incoming connections and multiplex all messages from
/// those connection into a single stream.
pub(crate) struct IncomingMessages {
message_rx: mpsc::Receiver<qp2p::Message>,
pub(crate) enum ConnectionEvent {
Received(qp2p::Message),
Disconnected(SocketAddr),
}

impl Debug for ConnectionEvent {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
match self {
Self::Received(qp2p::Message::UniStream { src, .. }) => {
write!(f, "Received(UniStream {{ src: {}, .. }})", src)
}
Self::Received(qp2p::Message::BiStream { src, .. }) => {
write!(f, "Received(BiStream {{ src: {}, .. }})", src)
}
Self::Disconnected(addr) => write!(f, "Disconnected({})", addr),
}
}
}

/// Stream of incoming connection events.
pub(crate) struct IncomingConnections {
event_rx: mpsc::Receiver<ConnectionEvent>,

// TODO: use `mpsc::Sender::closed` instead of this when we switch to the version of tokio that
// supports it (>= 0.3.0).
cancel_tx: watch::Sender<bool>,
}

impl Drop for IncomingMessages {
impl Drop for IncomingConnections {
fn drop(&mut self) {
let _ = self.cancel_tx.broadcast(true);
}
}

impl IncomingMessages {
impl IncomingConnections {
pub fn new(incoming_conns: qp2p::IncomingConnections) -> Self {
let (cancel_tx, mut cancel_rx) = watch::channel(false);
let (message_tx, message_rx) = mpsc::channel(1);
let (event_tx, event_rx) = mpsc::channel(1);

// Need to `recv` once, otherwise we would cancel all the tasks immediatelly
// NOTE: using block_on to avoid making this function `async`. It won't actually block,
Expand All @@ -334,23 +357,23 @@ impl IncomingMessages {

let _ = task::spawn(cancellable(
cancel_rx.clone(),
handle_incoming_connections(incoming_conns, message_tx, cancel_rx),
handle_incoming_connections(incoming_conns, event_tx, cancel_rx),
));

Self {
message_rx,
event_rx,
cancel_tx,
}
}

pub async fn next(&mut self) -> Option<qp2p::Message> {
self.message_rx.recv().await
pub async fn next(&mut self) -> Option<ConnectionEvent> {
self.event_rx.recv().await
}
}

async fn handle_incoming_connections(
mut incoming_conns: qp2p::IncomingConnections,
message_tx: mpsc::Sender<qp2p::Message>,
event_tx: mpsc::Sender<ConnectionEvent>,
cancel_rx: watch::Receiver<bool>,
) {
while let Some(incoming_msgs) = incoming_conns.next().await {
Expand All @@ -361,18 +384,22 @@ async fn handle_incoming_connections(

let _ = task::spawn(cancellable(
cancel_rx.clone(),
handle_incoming_messages(incoming_msgs, message_tx.clone()),
handle_incoming_messages(incoming_msgs, event_tx.clone()),
));
}
}

async fn handle_incoming_messages(
mut incoming_msgs: qp2p::IncomingMessages,
mut message_tx: mpsc::Sender<qp2p::Message>,
mut event_tx: mpsc::Sender<ConnectionEvent>,
) {
while let Some(msg) = incoming_msgs.next().await {
let _ = message_tx.send(msg).await;
let _ = event_tx.send(ConnectionEvent::Received(msg)).await;
}

let _ = event_tx
.send(ConnectionEvent::Disconnected(incoming_msgs.remote_addr()))
.await;
}

async fn cancellable<F: Future>(
Expand All @@ -391,12 +418,10 @@ struct Cancelled;
mod tests {
use super::*;
use anyhow::Result;
use assert_matches::assert_matches;
use futures::future;
use std::{
net::{IpAddr, Ipv4Addr},
slice,
time::Duration,
};
use qp2p::Config;
use std::{net::Ipv4Addr, slice, time::Duration};
use tokio::{net::UdpSocket, sync::mpsc, time};

const TIMEOUT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -441,24 +466,30 @@ mod tests {

#[tokio::test]
async fn failed_send() -> Result<()> {
let comm = Comm::new(transport_config())?;
let comm = Comm::new(Config {
// This makes this test faster.
idle_timeout_msec: Some(1),
..transport_config()
})?;
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");
match comm
.send_message_to_targets(&[invalid_addr], 1, message.clone())
.await
{
Err(error) => assert_eq!(error.failed_recipients, [invalid_addr]),
Ok(_) => panic!("unexpected success"),
}
assert_matches!(
comm
.send_message_to_targets(&[invalid_addr], 1, message.clone())
.await,
Err(error) => assert_eq!(error.failed_recipients, [invalid_addr])
);

Ok(())
}

#[tokio::test]
async fn successful_send_after_failed_attempts() -> Result<()> {
let comm = Comm::new(transport_config())?;
let comm = Comm::new(Config {
idle_timeout_msec: Some(1),
..transport_config()
})?;
let mut peer = Peer::new().await?;
let invalid_addr = get_invalid_addr().await?;

Expand All @@ -473,19 +504,21 @@ mod tests {

#[tokio::test]
async fn partially_successful_send() -> Result<()> {
let comm = Comm::new(transport_config())?;
let comm = Comm::new(Config {
idle_timeout_msec: Some(1),
..transport_config()
})?;
let mut peer = Peer::new().await?;
let invalid_addr = get_invalid_addr().await?;

let message = Bytes::from_static(b"hello world");

match comm
.send_message_to_targets(&[invalid_addr, peer.addr], 2, message.clone())
.await
{
Ok(_) => panic!("unexpected success"),
Err(error) => assert_eq!(error.failed_recipients, [invalid_addr]),
}
assert_matches!(
comm
.send_message_to_targets(&[invalid_addr, peer.addr], 2, message.clone())
.await,
Err(error) => assert_eq!(error.failed_recipients, [invalid_addr])
);

assert_eq!(peer.rx.recv().await, Some(message));

Expand Down Expand Up @@ -546,10 +579,38 @@ mod tests {
Ok(())
}

fn transport_config() -> qp2p::Config {
qp2p::Config {
ip: Some(IpAddr::V4(Ipv4Addr::LOCALHOST)),
idle_timeout_msec: Some(1),
#[tokio::test]
async fn incoming_connection_lost() -> Result<()> {
let comm0 = Comm::new(transport_config())?;
let addr0 = comm0.our_connection_info().await?;
let mut incoming_conns0 = comm0.listen()?;

let comm1 = Comm::new(transport_config())?;
let addr1 = comm1.our_connection_info().await?;

// Send a message to establish the connection
comm1
.send_message_to_targets(slice::from_ref(&addr0), 1, Bytes::from_static(b"hello"))
.await?;
assert_matches!(
incoming_conns0.next().await,
Some(ConnectionEvent::Received(_))
);

// Drop `comm1` to cause connection lost.
drop(comm1);

assert_matches!(
time::timeout(TIMEOUT, incoming_conns0.next()).await?,
Some(ConnectionEvent::Disconnected(addr)) => assert_eq!(addr, addr1)
);

Ok(())
}

fn transport_config() -> Config {
Config {
ip: Some(Ipv4Addr::LOCALHOST.into()),
..Default::default()
}
}
Expand Down
22 changes: 13 additions & 9 deletions src/routing/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use super::{comm::IncomingMessages, Command, Stage};
use super::{
comm::{ConnectionEvent, IncomingConnections},
Command, Stage,
};
use crate::{event::Event, messages::Message};
use bytes::Bytes;
use std::{net::SocketAddr, sync::Arc};
Expand All @@ -25,12 +28,12 @@ impl Drop for Executor {
}

impl Executor {
pub(crate) async fn new(stage: Arc<Stage>, incoming_msgs: IncomingMessages) -> Self {
pub(crate) async fn new(stage: Arc<Stage>, incoming_conns: IncomingConnections) -> Self {
let (cancel_tx, cancel_rx) = oneshot::channel();

let _ = task::spawn(async move {
tokio::select! {
_ = handle_incoming_messages(stage, incoming_msgs) => (),
_ = handle_incoming_messages(stage, incoming_conns) => (),
_ = cancel_rx => (),
}
});
Expand All @@ -41,10 +44,10 @@ impl Executor {
}
}

async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_msgs: IncomingMessages) {
while let Some(msg) = incoming_msgs.next().await {
match msg {
qp2p::Message::UniStream { bytes, src, .. } => {
async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_conns: IncomingConnections) {
while let Some(event) = incoming_conns.next().await {
match event {
ConnectionEvent::Received(qp2p::Message::UniStream { bytes, src, .. }) => {
trace!(
"New message ({} bytes) received on a uni-stream from: {}",
bytes.len(),
Expand All @@ -55,12 +58,12 @@ async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_msgs: Incoming
// potentially reported to the event stream consumer.
spawn_node_message_handler(stage.clone(), bytes, src);
}
qp2p::Message::BiStream {
ConnectionEvent::Received(qp2p::Message::BiStream {
bytes,
src,
send,
recv,
} => {
}) => {
trace!(
"New message ({} bytes) received on a bi-stream from: {}",
bytes.len(),
Expand All @@ -79,6 +82,7 @@ async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_msgs: Incoming

stage.send_event(event).await;
}
ConnectionEvent::Disconnected(addr) => trace!("Connection lost: {}", addr),
}
}
}
Expand Down

0 comments on commit f4e9e3a

Please sign in to comment.