Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: close outdated connections to non-RT peers #1057

Merged
merged 2 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 10 additions & 4 deletions sn_networking/src/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use libp2p::{
swarm::{
behaviour::toggle::Toggle,
dial_opts::{DialOpts, PeerCondition},
DialError, NetworkBehaviour, StreamProtocol, Swarm,
ConnectionId, DialError, NetworkBehaviour, StreamProtocol, Swarm,
},
Multiaddr, PeerId, Transport,
};
Expand All @@ -58,7 +58,7 @@ use std::{
net::SocketAddr,
num::NonZeroUsize,
path::PathBuf,
time::Duration,
time::{Duration, Instant},
};
use tiny_keccak::{Hasher, Sha3};
use tokio::sync::{mpsc, oneshot};
Expand Down Expand Up @@ -453,8 +453,6 @@ impl NetworkBuilder {
let gossipsub = if self.enable_gossip {
// Gossipsub behaviour
let gossipsub_config = libp2p::gossipsub::ConfigBuilder::default()
// disable sending to ALL_PEERS subscribed to a topic, which is the default behaviour
.flood_publish(false)
// we don't currently require source peer id and/or signing
.validation_mode(libp2p::gossipsub::ValidationMode::Permissive)
// we use the hash of the msg content as the msg id to deduplicate them
Expand All @@ -467,6 +465,10 @@ impl NetworkBuilder {
})
// set the heartbeat interval to be higher than default 1sec
.heartbeat_interval(Duration::from_secs(5))
// default is 3sec, increase to 10sec to avoid false alert
.iwant_followup_time(Duration::from_secs(10))
// default is 10sec, increase to 60sec to reduce the risk of looping
.published_message_ids_cache_time(Duration::from_secs(60))
.build()
.map_err(|err| Error::GossipsubConfigError(err.to_string()))?;

Expand Down Expand Up @@ -547,6 +549,7 @@ impl NetworkBuilder {
is_gossip_handler: false,
network_discovery: NetworkDiscovery::new(&peer_id),
bootstrap_peers: Default::default(),
live_connected_peers: Default::default(),
};

Ok((
Expand Down Expand Up @@ -593,6 +596,9 @@ pub struct SwarmDriver {
// This is to ensure a more accurate network discovery.
pub(crate) network_discovery: NetworkDiscovery,
pub(crate) bootstrap_peers: BTreeMap<Option<u32>, HashSet<PeerId>>,
// Peers that we currently have a live connection to. Any peer contacted during a kad network
// query will have a live connection established, and such peers may not appear in the RT.
pub(crate) live_connected_peers: BTreeMap<ConnectionId, (PeerId, Instant)>,
}

impl SwarmDriver {
Expand Down
44 changes: 42 additions & 2 deletions sn_networking/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use sn_protocol::{
use std::{
collections::HashSet,
fmt::{Debug, Formatter},
time::{Duration, Instant},
};
use tokio::sync::oneshot;
use tracing::{info, warn};
Expand Down Expand Up @@ -459,7 +460,12 @@ impl SwarmDriver {
} => {
event_string = "ConnectionEstablished";
trace!(%peer_id, num_established, "ConnectionEstablished ({connection_id:?}): {}", endpoint_str(&endpoint));
info!("{:?}", self.swarm.network_info());
info!(%peer_id, ?connection_id, "ConnectionEstablished {:?}", self.swarm.network_info());

let _ = self.live_connected_peers.insert(
connection_id,
(peer_id, Instant::now() + Duration::from_secs(60)),
);

if endpoint.is_dialer() {
self.dialed_peers
Expand All @@ -475,8 +481,9 @@ impl SwarmDriver {
connection_id,
} => {
event_string = "ConnectionClosed";
info!("{:?}", self.swarm.network_info());
info!(%peer_id, ?connection_id, "ConnectionClosed: {:?}", self.swarm.network_info());
trace!(%peer_id, ?connection_id, ?cause, num_established, "ConnectionClosed: {}", endpoint_str(&endpoint));
let _ = self.live_connected_peers.remove(&connection_id);
}
SwarmEvent::OutgoingConnectionError {
peer_id: Some(failed_peer_id),
Expand Down Expand Up @@ -582,6 +589,9 @@ impl SwarmDriver {
trace!("SwarmEvent has been ignored: {other:?}")
}
}

self.remove_outdated_connections();

trace!(
"SwarmEvent handled in {:?}: {event_string:?}",
start.elapsed()
Expand Down Expand Up @@ -1025,6 +1035,36 @@ impl SwarmDriver {
.remove_peer(&to_be_removed_bootstrap);
}
}

/// Closes expired connections to peers that are not in the routing table (RT).
///
/// Every entry of `live_connected_peers` whose deadline has passed is dropped from the
/// tracking map. For each dropped entry, the peer's kbucket is inspected: when the peer is
/// found in the RT the connection is left open, otherwise the swarm is asked to close it.
fn remove_outdated_connections(&mut self) {
    // Take one timestamp up front so every entry is judged against the same cut-off,
    // rather than querying the clock once per tracked connection.
    let now = Instant::now();
    let mut expired = vec![];

    self.live_connected_peers
        .retain(|connection_id, (peer_id, timeout)| {
            let still_live = *timeout > now;
            if !still_live {
                expired.push((*connection_id, *peer_id));
            }
            still_live
        });

    // Only close expired connections whose peer is not in the RT.
    for (connection_id, peer_id) in expired {
        let in_routing_table = match self.swarm.behaviour_mut().kademlia.kbucket(peer_id) {
            Some(kbucket) => kbucket
                .iter()
                .any(|peer_entry| peer_id == *peer_entry.node.key.preimage()),
            None => false,
        };

        if in_routing_table {
            // Skip the connection as the peer is present in the RT.
            continue;
        }

        info!("Removing outdated connection {connection_id:?} to {peer_id:?}");
        // Best-effort close: the result is deliberately ignored.
        let _result = self.swarm.close_connection(connection_id);
    }
}
}

/// Helper function to print formatted connection role info.
Expand Down
5 changes: 3 additions & 2 deletions sn_node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ use tokio::{
/// serialised transfer info encrypted against the referenced public key.
pub const ROYALTY_TRANSFER_NOTIF_TOPIC: &str = "ROYALTY_TRANSFER_NOTIFICATION";

/// Defines the percentage (ie 1/FORWARDER_CHOOSING_FACTOR th of all nodes) of nodes which will act as royalty_transfer_notify forwarder.
const FORWARDER_CHOOSING_FACTOR: usize = 50;
/// Defines the fraction of nodes (i.e. one in every FORWARDER_CHOOSING_FACTOR nodes)
/// which will act as royalty_transfer_notify forwarders.
const FORWARDER_CHOOSING_FACTOR: usize = 10;

/// Interval to trigger replication of all records to all peers.
/// This is the max time it should take. Minimum interval at any node will be half this
Expand Down