Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: end stale outbound queue immediately on disconnect, auto retry outbound messages #3664

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions comms/src/connection_manager/peer_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

use std::{
fmt,
future::Future,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
Expand Down Expand Up @@ -185,6 +186,12 @@ impl PeerConnection {
!self.request_tx.is_closed()
}

/// Returns a owned future that resolves on disconnection
pub fn on_disconnect(&self) -> impl Future<Output = ()> + 'static {
let request_tx = self.request_tx.clone();
async move { request_tx.closed().await }
}

pub fn age(&self) -> Duration {
self.started_at.elapsed()
}
Expand Down Expand Up @@ -354,7 +361,7 @@ impl PeerConnectionActor {
match maybe_request {
Some(request) => self.handle_request(request).await,
None => {
debug!(target: LOG_TARGET, "[{}] All peer connection handled dropped closing the connection", self);
debug!(target: LOG_TARGET, "[{}] All peer connection handles dropped closing the connection", self);
break;
}
}
Expand Down Expand Up @@ -468,11 +475,7 @@ impl PeerConnectionActor {
}

async fn notify_event(&mut self, event: ConnectionManagerEvent) {
log_if_error!(
target: LOG_TARGET,
self.event_notifier.send(event).await,
"Failed to send connection manager notification because '{}'",
);
let _ = self.event_notifier.send(event).await;
}

/// Disconnect this peer connection.
Expand Down
136 changes: 84 additions & 52 deletions comms/src/protocol/messaging/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
use std::time::{Duration, Instant};

use futures::{future::Either, SinkExt, StreamExt, TryStreamExt};
use tokio::sync::mpsc as tokiompsc;
use tokio::sync::mpsc;
use tracing::{debug, error, event, span, Instrument, Level};

use super::{error::MessagingProtocolError, metrics, MessagingEvent, MessagingProtocol, SendFailReason};
Expand All @@ -44,24 +44,27 @@ const MAX_SEND_RETRIES: usize = 1;

pub struct OutboundMessaging {
connectivity: ConnectivityRequester,
request_rx: tokiompsc::UnboundedReceiver<OutboundMessage>,
messaging_events_tx: tokiompsc::Sender<MessagingEvent>,
messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
messaging_events_tx: mpsc::Sender<MessagingEvent>,
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
peer_node_id: NodeId,
inactivity_timeout: Option<Duration>,
}

impl OutboundMessaging {
pub fn new(
connectivity: ConnectivityRequester,
messaging_events_tx: tokiompsc::Sender<MessagingEvent>,
request_rx: tokiompsc::UnboundedReceiver<OutboundMessage>,
messaging_events_tx: mpsc::Sender<MessagingEvent>,
messages_rx: mpsc::UnboundedReceiver<OutboundMessage>,
retry_queue_tx: mpsc::UnboundedSender<OutboundMessage>,
peer_node_id: NodeId,
inactivity_timeout: Option<Duration>,
) -> Self {
Self {
connectivity,
request_rx,
messages_rx,
messaging_events_tx,
retry_queue_tx,
peer_node_id,
inactivity_timeout,
}
Expand All @@ -77,8 +80,7 @@ impl OutboundMessaging {
async move {
debug!(
target: LOG_TARGET,
"Attempting to dial peer '{}' if required",
self.peer_node_id.short_str()
"Attempting to dial peer '{}' if required", self.peer_node_id
);
let peer_node_id = self.peer_node_id.clone();
let messaging_events_tx = self.messaging_events_tx.clone();
Expand All @@ -87,25 +89,23 @@ impl OutboundMessaging {
event!(
Level::DEBUG,
"Outbound messaging for peer '{}' has stopped because the stream was closed",
peer_node_id.short_str()
peer_node_id
);

debug!(
target: LOG_TARGET,
"Outbound messaging for peer '{}' has stopped because the stream was closed",
peer_node_id.short_str()
"Outbound messaging for peer '{}' has stopped because the stream was closed", peer_node_id
);
},
Err(MessagingProtocolError::Inactivity) => {
event!(
Level::DEBUG,
"Outbound messaging for peer '{}' has stopped because it was inactive",
peer_node_id.short_str()
peer_node_id
);
debug!(
target: LOG_TARGET,
"Outbound messaging for peer '{}' has stopped because it was inactive",
peer_node_id.short_str()
"Outbound messaging for peer '{}' has stopped because it was inactive", peer_node_id
);
},
Err(MessagingProtocolError::PeerDialFailed(err)) => {
Expand Down Expand Up @@ -135,11 +135,11 @@ impl OutboundMessaging {
async fn run_inner(mut self) -> Result<(), MessagingProtocolError> {
let mut attempts = 0;

let substream = loop {
let (conn, substream) = loop {
match self.try_establish().await {
Ok(substream) => {
Ok(conn_and_substream) => {
event!(Level::DEBUG, "Substream established");
break substream;
break conn_and_substream;
},
Err(err) => {
if attempts >= MAX_SEND_RETRIES {
Expand All @@ -158,7 +158,7 @@ impl OutboundMessaging {
},
}
};
self.start_forwarding_messages(substream).await?;
self.start_forwarding_messages(conn, substream).await?;

Ok(())
}
Expand All @@ -178,16 +178,14 @@ impl OutboundMessaging {
target: LOG_TARGET,
"Dial was cancelled for peer '{}'. This is probably because of connection tie-breaking. \
Retrying...",
self.peer_node_id.short_str(),
self.peer_node_id,
);
continue;
},
Err(err) => {
debug!(
target: LOG_TARGET,
"MessagingProtocol failed to dial peer '{}' because '{:?}'",
self.peer_node_id.short_str(),
err
"MessagingProtocol failed to dial peer '{}' because '{:?}'", self.peer_node_id, err
);

break Err(MessagingProtocolError::PeerDialFailed(err));
Expand All @@ -199,7 +197,9 @@ impl OutboundMessaging {
.await
}

async fn try_establish(&mut self) -> Result<NegotiatedSubstream<Substream>, MessagingProtocolError> {
async fn try_establish(
&mut self,
) -> Result<(PeerConnection, NegotiatedSubstream<Substream>), MessagingProtocolError> {
let span = span!(
Level::DEBUG,
"establish_connection",
Expand All @@ -208,32 +208,30 @@ impl OutboundMessaging {
async move {
debug!(
target: LOG_TARGET,
"Attempting to establish messaging protocol connection to peer `{}`",
self.peer_node_id.short_str()
"Attempting to establish messaging protocol connection to peer `{}`", self.peer_node_id
);
let start = Instant::now();
let conn = self.try_dial_peer().await?;
let mut conn = self.try_dial_peer().await?;
debug!(
target: LOG_TARGET,
"Connection succeeded for peer `{}` in {:.0?}",
self.peer_node_id.short_str(),
self.peer_node_id,
start.elapsed()
);
let substream = self.try_open_substream(conn).await?;
let substream = self.try_open_substream(&mut conn).await?;
debug!(
target: LOG_TARGET,
"Substream established for peer `{}`",
self.peer_node_id.short_str(),
"Substream established for peer `{}`", self.peer_node_id,
);
Ok(substream)
Ok((conn, substream))
}
.instrument(span)
.await
}

async fn try_open_substream(
&mut self,
mut conn: PeerConnection,
conn: &mut PeerConnection,
) -> Result<NegotiatedSubstream<Substream>, MessagingProtocolError> {
let span = span!(
Level::DEBUG,
Expand All @@ -247,7 +245,7 @@ impl OutboundMessaging {
debug!(
target: LOG_TARGET,
"MessagingProtocol failed to open a substream to peer '{}' because '{}'",
self.peer_node_id.short_str(),
self.peer_node_id,
err
);
Err(err.into())
Expand All @@ -260,44 +258,43 @@ impl OutboundMessaging {

async fn start_forwarding_messages(
self,
conn: PeerConnection,
substream: NegotiatedSubstream<Substream>,
) -> Result<(), MessagingProtocolError> {
let Self {
mut messages_rx,
inactivity_timeout,
peer_node_id,
..
} = self;
let span = span!(
Level::DEBUG,
"start_forwarding_messages",
node_id = self.peer_node_id.to_string().as_str()
node_id = peer_node_id.to_string().as_str()
);
let _enter = span.enter();
debug!(
target: LOG_TARGET,
"Starting direct message forwarding for peer `{}`",
self.peer_node_id.short_str()
"Starting direct message forwarding for peer `{}`", peer_node_id
);
let substream = substream.stream;

let framed = MessagingProtocol::framed(substream);

let Self {
request_rx,
inactivity_timeout,
..
} = self;
let framed = MessagingProtocol::framed(substream.stream);

// Convert unbounded channel to a stream
let stream = futures::stream::unfold(request_rx, |mut rx| async move {
let stream = futures::stream::unfold(&mut messages_rx, |rx| async move {
let v = rx.recv().await;
v.map(|v| (v, rx))
});

let stream = match inactivity_timeout {
let outbound_stream = match inactivity_timeout {
Some(timeout) => Either::Left(
tokio_stream::StreamExt::timeout(stream, timeout).map_err(|_| MessagingProtocolError::Inactivity),
),
None => Either::Right(stream.map(Ok)),
};

let outbound_count = metrics::outbound_message_count(&self.peer_node_id);
let stream = stream.map(|msg| {
let outbound_count = metrics::outbound_message_count(&peer_node_id);
let stream = outbound_stream.map(|msg| {
outbound_count.inc();
msg.map(|mut out_msg| {
event!(Level::DEBUG, "Message buffered for sending {}", out_msg);
Expand All @@ -306,21 +303,56 @@ impl OutboundMessaging {
})
});

// Stop the stream as soon as the disconnection occurs, this allows the outbound stream to terminate as soon as
// the connection terminates rather than detecting the disconnect on the next message send.
let stream = stream.take_until(async move {
let on_disconnect = conn.on_disconnect();
let peer_node_id = conn.peer_node_id().clone();
// We drop the conn handle here BEFORE awaiting a disconnect to ensure that the outbound messaging isn't
// holding onto the handle keeping the connection alive
drop(conn);
on_disconnect.await;
debug!(
target: LOG_TARGET,
"Peer connection closed. Ending outbound messaging stream for peer {}.", peer_node_id
)
});

super::forward::Forward::new(stream, framed.sink_map_err(Into::into)).await?;

// Close so that the protocol handler does not resend to this session
messages_rx.close();
// The stream ended, perhaps due to a disconnect, but there could be more messages left on the queue. Collect
// any messages and queue them up for retry. If we cannot reconnect to the peer, the queued messages will be
// dropped.
let mut retried_messages_count = 0;
while let Some(msg) = messages_rx.recv().await {
if self.retry_queue_tx.send(msg).is_err() {
// The messaging protocol has shut down, so let's exit too
break;
}
retried_messages_count += 1;
}

if retried_messages_count > 0 {
debug!(
target: LOG_TARGET,
"{} pending message(s) were still queued after disconnect. Retrying them.", retried_messages_count
);
}

debug!(
target: LOG_TARGET,
"Direct message forwarding successfully completed for peer `{}`.",
self.peer_node_id.short_str()
"Direct message forwarding successfully completed for peer `{}`.", peer_node_id
);
Ok(())
}

async fn fail_all_pending_messages(&mut self, reason: SendFailReason) {
// Close the request channel so that we can read all the remaining messages and flush them
// to a failed event
self.request_rx.close();
while let Some(mut out_msg) = self.request_rx.recv().await {
self.messages_rx.close();
while let Some(mut out_msg) = self.messages_rx.recv().await {
out_msg.reply_fail(reason);
let _ = self
.messaging_events_tx
Expand Down
Loading