Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: ping peers on connection loss to detect if they went offline
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Nov 23, 2020
1 parent de39c57 commit d6be64f
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 9 deletions.
4 changes: 4 additions & 0 deletions src/messages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ use serde::{Deserialize, Serialize};
use std::fmt::{self, Debug, Formatter};
use xor_name::Prefix;

/// Payload of the message sent to a peer to probe whether the connection to it still works.
///
/// NOTE: ideally this would be empty, but an empty message is currently treated as an error by
/// qp2p, so a single zero byte is used instead.
pub(crate) const PING: &[u8] = &[0];

/// Message sent over the network.
#[derive(Clone, Eq, Serialize, Deserialize)]
pub(crate) struct Message {
Expand Down
24 changes: 22 additions & 2 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
message_filter::MessageFilter,
messages::{
BootstrapResponse, JoinRequest, Message, MessageHash, MessageStatus, PlainMessage, Variant,
VerifyStatus,
VerifyStatus, PING,
},
network::Network,
node::Node,
Expand Down Expand Up @@ -199,8 +199,28 @@ impl Approved {
}
}

/// Reacts to a lost connection to `addr` by probing the peer with a "ping" message.
///
/// Returns `None` when we are not an elder, or when `addr` does not belong to a joined member
/// of our section. Otherwise returns the command that sends the `PING` payload to `addr`.
pub fn handle_connection_lost(&self, addr: &SocketAddr) -> Option<Command> {
    // Only elders react to lost connections.
    if !self.is_elder() {
        return None;
    }

    // Connections to addresses that are not joined members of our section are ignored.
    let peer = self.section.find_joined_member_by_addr(addr)?;
    trace!("Lost connection to {}", peer);

    // Try to send a "ping" message to probe the peer connection. If it succeeds, the
    // connection loss was just temporary. Otherwise the peer is assumed lost and we will vote
    // it offline.
    let ping = Bytes::from_static(PING);
    Some(Command::send_message_to_target(addr, ping))
}

pub fn handle_peer_lost(&self, addr: &SocketAddr) -> Result<Vec<Command>> {
let name = if let Some(peer) = self.section.find_member_from_addr(addr) {
let name = if let Some(peer) = self.section.find_joined_member_by_addr(addr) {
debug!("Lost known peer {}", peer);
*peer.name()
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/routing/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use tokio::{
const CONNECTIONS_CACHE_SIZE: usize = 1024;

/// Maximal number of resend attempts to the same target.
pub const RESEND_MAX_ATTEMPTS: u8 = 3;
pub(crate) const RESEND_MAX_ATTEMPTS: u8 = 3;

// Communication component of the node to interact with other nodes.
pub(crate) struct Comm {
Expand Down
5 changes: 5 additions & 0 deletions src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ pub(crate) enum Command {
},
/// Handle a timeout previously scheduled with `ScheduleTimeout`.
HandleTimeout(u64),
/// Handle lost connection to a peer.
HandleConnectionLost(SocketAddr),
/// Handle peer that's been detected as lost.
HandlePeerLost(SocketAddr),
/// Handle vote cast either by us or some other peer.
Expand Down Expand Up @@ -110,6 +112,9 @@ impl Debug for Command {
.field("message", message)
.finish(),
Self::HandleTimeout(token) => f.debug_tuple("HandleTimeout").field(token).finish(),
Self::HandleConnectionLost(addr) => {
f.debug_tuple("HandleConnectionLost").field(addr).finish()
}
Self::HandlePeerLost(addr) => f.debug_tuple("HandlePeerLost").field(addr).finish(),
Self::HandleVote { vote, proof_share } => f
.debug_struct("HandleVote")
Expand Down
22 changes: 18 additions & 4 deletions src/routing/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use super::{
comm::{ConnectionEvent, IncomingConnections},
Command, Stage,
};
use crate::{event::Event, messages::Message};
use crate::{
event::Event,
messages::{Message, PING},
};
use bytes::Bytes;
use std::{net::SocketAddr, sync::Arc};
use tokio::{sync::oneshot, task};
Expand All @@ -33,7 +36,7 @@ impl Executor {

let _ = task::spawn(async move {
tokio::select! {
_ = handle_incoming_messages(stage, incoming_conns) => (),
_ = handle_events(stage, incoming_conns) => (),
_ = cancel_rx => (),
}
});
Expand All @@ -44,7 +47,7 @@ impl Executor {
}
}

async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_conns: IncomingConnections) {
async fn handle_events(stage: Arc<Stage>, mut incoming_conns: IncomingConnections) {
while let Some(event) = incoming_conns.next().await {
match event {
ConnectionEvent::Received(qp2p::Message::UniStream { bytes, src, .. }) => {
Expand All @@ -56,6 +59,12 @@ async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_conns: Incomin
// Since it's arriving on a uni-stream we treat it as a Node
// message which needs to be processed by us, as well as
// potentially reported to the event stream consumer.

// Ignore pings.
if bytes == PING {
continue;
}

spawn_node_message_handler(stage.clone(), bytes, src);
}
ConnectionEvent::Received(qp2p::Message::BiStream {
Expand All @@ -82,7 +91,12 @@ async fn handle_incoming_messages(stage: Arc<Stage>, mut incoming_conns: Incomin

stage.send_event(event).await;
}
ConnectionEvent::Disconnected(addr) => trace!("Connection lost: {}", addr),
ConnectionEvent::Disconnected(addr) => {
let _ = stage
.clone()
.handle_commands(Command::HandleConnectionLost(addr))
.await;
}
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/routing/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ impl Stage {
Command::HandleConsensus { vote, proof } => {
self.state.lock().await.handle_consensus(vote, proof)
}
Command::HandleConnectionLost(addr) => Ok(self
.state
.lock()
.await
.handle_connection_lost(&addr)
.into_iter()
.collect()),
Command::HandlePeerLost(addr) => self.state.lock().await.handle_peer_lost(&addr),
Command::HandleDkgParticipationResult {
dkg_key,
Expand Down
4 changes: 2 additions & 2 deletions src/section/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,9 @@ impl Section {
.filter(move |peer| !self.is_elder(peer.name()))
}

pub fn find_member_from_addr(&self, addr: &SocketAddr) -> Option<&Peer> {
pub fn find_joined_member_by_addr(&self, addr: &SocketAddr) -> Option<&Peer> {
self.members
.all()
.joined()
.find(|info| info.peer.addr() == addr)
.map(|info| &info.peer)
}
Expand Down

0 comments on commit d6be64f

Please sign in to comment.