From c5bcada2c2a1e34b8e39b885f10ed7c2d00e8373 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Fri, 14 May 2021 17:16:50 +1000 Subject: [PATCH] protocols/gossipsub: Rework connection keep-alive (#2043) Keep connections to peers in a mesh alive. Allow closing idle connections to peers not in a mesh. Co-authored-by: Max Inden --- Cargo.toml | 2 +- examples/gossipsub-chat.rs | 8 +- protocols/gossipsub/CHANGELOG.md | 7 + protocols/gossipsub/Cargo.toml | 2 +- protocols/gossipsub/src/behaviour.rs | 462 ++++++++++++++------- protocols/gossipsub/src/behaviour/tests.rs | 267 +++++++----- protocols/gossipsub/src/config.rs | 18 + protocols/gossipsub/src/handler.rs | 56 ++- protocols/gossipsub/src/types.rs | 10 +- 9 files changed, 584 insertions(+), 248 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e69f95e1978..487249ff400 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,7 +66,7 @@ futures = "0.3.1" lazy_static = "1.2" libp2p-core = { version = "0.28.3", path = "core", default-features = false } libp2p-floodsub = { version = "0.29.0", path = "protocols/floodsub", optional = true } -libp2p-gossipsub = { version = "0.30.1", path = "./protocols/gossipsub", optional = true } +libp2p-gossipsub = { version = "0.31.0", path = "./protocols/gossipsub", optional = true } libp2p-identify = { version = "0.29.0", path = "protocols/identify", optional = true } libp2p-kad = { version = "0.30.0", path = "protocols/kad", optional = true } libp2p-mplex = { version = "0.28.0", path = "muxers/mplex", optional = true } diff --git a/examples/gossipsub-chat.rs b/examples/gossipsub-chat.rs index 35bc7276df4..b309c79e6a9 100644 --- a/examples/gossipsub-chat.rs +++ b/examples/gossipsub-chat.rs @@ -116,7 +116,9 @@ async fn main() -> Result<(), Box> { }; // Listen on all interfaces and whatever port the OS assigns - swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + swarm + .listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()) + .unwrap(); // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { @@ -138,7 +140,9 @@ async fn main() -> Result<(), Box> { task::block_on(future::poll_fn(move |cx: &mut Context<'_>| { loop { if let Err(e) = match stdin.try_poll_next_unpin(cx)? { - Poll::Ready(Some(line)) => swarm.behaviour_mut().publish(topic.clone(), line.as_bytes()), + Poll::Ready(Some(line)) => swarm + .behaviour_mut() + .publish(topic.clone(), line.as_bytes()), Poll::Ready(None) => panic!("Stdin closed"), Poll::Pending => break, } { diff --git a/protocols/gossipsub/CHANGELOG.md b/protocols/gossipsub/CHANGELOG.md index 7bc1f9aa036..f215203ff96 100644 --- a/protocols/gossipsub/CHANGELOG.md +++ b/protocols/gossipsub/CHANGELOG.md @@ -1,3 +1,10 @@ +# 0.31.0 [unreleased] + +- Keep connections to peers in a mesh alive. Allow closing idle connections to peers not in a mesh + [PR-2043]. + +[PR-2043]: https://github.com/libp2p/rust-libp2p/pull/2043https://github.com/libp2p/rust-libp2p/pull/2043 + # 0.30.1 [2021-04-27] - Remove `regex-filter` feature flag thus always enabling `regex::RegexSubscriptionFilter` [PR diff --git a/protocols/gossipsub/Cargo.toml b/protocols/gossipsub/Cargo.toml index a0ecc7e3cbf..2d6c7c6a748 100644 --- a/protocols/gossipsub/Cargo.toml +++ b/protocols/gossipsub/Cargo.toml @@ -2,7 +2,7 @@ name = "libp2p-gossipsub" edition = "2018" description = "Gossipsub protocol for libp2p" -version = "0.30.1" +version = "0.31.0" authors = ["Age Manning "] license = "MIT" repository = "https://github.com/libp2p/rust-libp2p" diff --git a/protocols/gossipsub/src/behaviour.rs b/protocols/gossipsub/src/behaviour.rs index c7b6a0f1d5e..facc3e2cb28 100644 --- a/protocols/gossipsub/src/behaviour.rs +++ b/protocols/gossipsub/src/behaviour.rs @@ -50,7 +50,7 @@ use crate::backoff::BackoffStorage; use crate::config::{GossipsubConfig, ValidationMode}; use crate::error::{PublishError, SubscriptionError, ValidationError}; use crate::gossip_promises::GossipPromises; -use crate::handler::{GossipsubHandler, HandlerEvent}; +use crate::handler::{GossipsubHandler, GossipsubHandlerIn, HandlerEvent}; use crate::mcache::MessageCache; use crate::peer_score::{PeerScore, PeerScoreParams, PeerScoreThresholds, RejectReason}; use crate::protocol::SIGNING_PREFIX; @@ -62,7 +62,7 @@ use crate::types::{ FastMessageId, GossipsubControlAction, GossipsubMessage, GossipsubSubscription, GossipsubSubscriptionAction, MessageAcceptance, MessageId, PeerInfo, RawGossipsubMessage, }; -use crate::types::{GossipsubRpc, PeerKind}; +use crate::types::{GossipsubRpc, PeerConnections, PeerKind}; use crate::{rpc_proto, TopicScoreParams}; use std::{cmp::Ordering::Equal, fmt::Debug}; @@ -193,7 +193,8 @@ impl From for PublishConfig { } } -type GossipsubNetworkBehaviourAction = NetworkBehaviourAction, GossipsubEvent>; +type GossipsubNetworkBehaviourAction = + NetworkBehaviourAction, GossipsubEvent>; /// Network behaviour that handles the gossipsub protocol. /// @@ -226,9 +227,9 @@ pub struct Gossipsub< /// duplicates from being propagated to the application and on the network. duplicate_cache: DuplicateCache, - /// A map of peers to their protocol kind. This is to identify different kinds of gossipsub - /// peers. - peer_protocols: HashMap, + /// A set of connected peers, indexed by their [`PeerId`]. tracking both the [`PeerKind`] and + /// the set of [`ConnectionId`]s. + connected_peers: HashMap, /// A map of all connected peers - A map of topic hash to a list of gossipsub peer Ids. topic_peers: HashMap>, @@ -414,7 +415,7 @@ where peer_score: None, count_received_ihave: HashMap::new(), count_sent_iwant: HashMap::new(), - peer_protocols: HashMap::new(), + connected_peers: HashMap::new(), published_message_ids: DuplicateCache::new(config.published_message_ids_cache_time()), config, subscription_filter, @@ -460,7 +461,7 @@ where /// Lists all known peers and their associated protocol. pub fn peer_protocol(&self) -> impl Iterator { - self.peer_protocols.iter() + self.connected_peers.iter().map(|(k, v)| (k, &v.kind)) } /// Returns the gossipsub score for a given peer, if one exists. @@ -489,17 +490,15 @@ where // send subscription request to all peers let peer_list = self.peer_topics.keys().cloned().collect::>(); if !peer_list.is_empty() { - let event = Arc::new( - GossipsubRpc { - messages: Vec::new(), - subscriptions: vec![GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Subscribe, - }], - control_msgs: Vec::new(), - } - .into_protobuf(), - ); + let event = GossipsubRpc { + messages: Vec::new(), + subscriptions: vec![GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Subscribe, + }], + control_msgs: Vec::new(), + } + .into_protobuf(); for peer in peer_list { debug!("Sending SUBSCRIBE to peer: {:?}", peer); @@ -531,17 +530,15 @@ where // announce to all peers let peer_list = self.peer_topics.keys().cloned().collect::>(); if !peer_list.is_empty() { - let event = Arc::new( - GossipsubRpc { - messages: Vec::new(), - subscriptions: vec![GossipsubSubscription { - topic_hash: topic_hash.clone(), - action: GossipsubSubscriptionAction::Unsubscribe, - }], - control_msgs: Vec::new(), - } - .into_protobuf(), - ); + let event = GossipsubRpc { + messages: Vec::new(), + subscriptions: vec![GossipsubSubscription { + topic_hash: topic_hash.clone(), + action: GossipsubSubscriptionAction::Unsubscribe, + }], + control_msgs: Vec::new(), + } + .into_protobuf(); for peer in peer_list { debug!("Sending UNSUBSCRIBE to peer: {}", peer.to_string()); @@ -580,14 +577,12 @@ where topic: raw_message.topic.clone(), }); - let event = Arc::new( - GossipsubRpc { - subscriptions: Vec::new(), - messages: vec![raw_message.clone()], - control_msgs: Vec::new(), - } - .into_protobuf(), - ); + let event = GossipsubRpc { + subscriptions: Vec::new(), + messages: vec![raw_message.clone()], + control_msgs: Vec::new(), + } + .into_protobuf(); // check that the size doesn't exceed the max transmission size if event.encoded_len() > self.config.max_transmit_size() { @@ -634,8 +629,8 @@ where } // Floodsub peers - for (peer, kind) in &self.peer_protocols { - if kind == &PeerKind::Floodsub + for (peer, connections) in &self.connected_peers { + if connections.kind == PeerKind::Floodsub && !self .score_below_threshold(peer, |ts| ts.publish_threshold) .0 @@ -657,7 +652,7 @@ where let mesh_n = self.config.mesh_n(); let new_peers = get_random_peers( &self.topic_peers, - &self.peer_protocols, + &self.connected_peers, &topic_hash, mesh_n, { @@ -898,6 +893,7 @@ where add_peers, topic_hash ); added_peers.extend(peers.iter().cloned().take(add_peers)); + self.mesh.insert( topic_hash.clone(), peers.into_iter().take(add_peers).collect(), @@ -911,7 +907,7 @@ where // get the peers let new_peers = get_random_peers( &self.topic_peers, - &self.peer_protocols, + &self.connected_peers, topic_hash, self.config.mesh_n() - added_peers.len(), |peer| { @@ -947,6 +943,16 @@ where topic_hash: topic_hash.clone(), }, ); + + // If the peer did not previously exist in any mesh, inform the handler + peer_added_to_mesh( + peer_id, + vec![topic_hash], + &self.mesh, + self.peer_topics.get(&peer_id), + &mut self.events, + &self.connected_peers, + ); } debug!("Completed JOIN for topic: {:?}", topic_hash); } @@ -962,7 +968,7 @@ where peer_score.prune(peer, topic_hash.clone()); } - match self.peer_protocols.get(peer) { + match self.connected_peers.get(peer).map(|v| &v.kind) { Some(PeerKind::Floodsub) => { error!("Attempted to prune a Floodsub peer"); } @@ -984,7 +990,7 @@ where let peers = if do_px { get_random_peers( &self.topic_peers, - &self.peer_protocols, + &self.connected_peers, &topic_hash, self.config.prune_peers(), |p| p != peer && !self.score_below_threshold(p, |_| 0.0).0, @@ -1018,6 +1024,16 @@ where info!("LEAVE: Sending PRUNE to peer: {:?}", peer); let control = self.make_prune(topic_hash, &peer, self.config.do_px()); Self::control_pool_add(&mut self.control_pool, peer, control); + + // If the peer did not previously exist in any mesh, inform the handler + peer_removed_from_mesh( + peer, + topic_hash, + &self.mesh, + self.peer_topics.get(&peer), + &mut self.events, + &self.connected_peers, + ); } } debug!("Completed LEAVE for topic: {:?}", topic_hash); @@ -1074,10 +1090,7 @@ where } // IHAVE flood protection - let peer_have = self - .count_received_ihave - .entry(*peer_id) - .or_insert(0); + let peer_have = self.count_received_ihave.entry(*peer_id).or_insert(0); *peer_have += 1; if *peer_have > self.config.max_ihave_messages() { debug!( @@ -1201,10 +1214,7 @@ where if !cached_messages.is_empty() { debug!("IWANT: Sending cached messages to peer: {:?}", peer_id); // Send the messages to the peer - let message_list = cached_messages - .into_iter() - .map(|entry| entry.1) - .collect(); + let message_list = cached_messages.into_iter().map(|entry| entry.1).collect(); if self .send_message( *peer_id, @@ -1282,7 +1292,7 @@ where } } - //check the score + // check the score if below_zero { // we don't GRAFT peers with negative score debug!( @@ -1312,6 +1322,15 @@ where peer_id, &topic_hash ); peers.insert(*peer_id); + // If the peer did not previously exist in any mesh, inform the handler + peer_added_to_mesh( + *peer_id, + vec![&topic_hash], + &self.mesh, + self.peer_topics.get(&peer_id), + &mut self.events, + &self.connected_peers, + ); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.graft(peer_id, topic_hash); @@ -1381,6 +1400,16 @@ where } update_backoff = true; + + // inform the handler + peer_removed_from_mesh( + *peer_id, + topic_hash, + &self.mesh, + self.peer_topics.get(&peer_id), + &mut self.events, + &self.connected_peers, + ); } } if update_backoff { @@ -1605,10 +1634,7 @@ where .or_insert_with(|| msg_id.clone()); } if !self.duplicate_cache.insert(msg_id.clone()) { - debug!( - "Message already received, ignoring. Message: {}", - msg_id - ); + debug!("Message already received, ignoring. Message: {}", msg_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.duplicated_message(propagation_source, &msg_id, &message.topic); } @@ -1710,8 +1736,8 @@ where } }; - // Collect potential graft messages for the peer. - let mut grafts = Vec::new(); + // Collect potential graft topics for the peer. + let mut topics_to_graft = Vec::new(); // Notify the application about the subscription, after the grafts are sent. let mut application_event = Vec::new(); @@ -1753,7 +1779,11 @@ where // if the mesh needs peers add the peer to the mesh if !self.explicit_peers.contains(propagation_source) - && match self.peer_protocols.get(propagation_source) { + && match self + .connected_peers + .get(propagation_source) + .map(|v| &v.kind) + { Some(PeerKind::Gossipsubv1_1) => true, Some(PeerKind::Gossipsub) => true, _ => false, @@ -1787,9 +1817,7 @@ where peer_score .graft(propagation_source, subscription.topic_hash.clone()); } - grafts.push(GossipsubControlAction::Graft { - topic_hash: subscription.topic_hash.clone(), - }); + topics_to_graft.push(subscription.topic_hash.clone()); } } } @@ -1811,8 +1839,7 @@ where } // remove topic from the peer_topics mapping subscribed_topics.remove(&subscription.topic_hash); - unsubscribed_peers - .push((*propagation_source, subscription.topic_hash.clone())); + unsubscribed_peers.push((*propagation_source, subscription.topic_hash.clone())); // generate an unsubscribe event to be polled application_event.push(NetworkBehaviourAction::GenerateEvent( GossipsubEvent::Unsubscribed { @@ -1829,16 +1856,32 @@ where self.remove_peer_from_mesh(&peer_id, &topic_hash, None, false); } + // Potentially inform the handler if we have added this peer to a mesh for the first time. + let topics_joined = topics_to_graft.iter().collect::>(); + if !topics_joined.is_empty() { + peer_added_to_mesh( + propagation_source.clone(), + topics_joined, + &self.mesh, + self.peer_topics.get(propagation_source), + &mut self.events, + &self.connected_peers, + ); + } + // If we need to send grafts to peer, do so immediately, rather than waiting for the // heartbeat. - if !grafts.is_empty() + if !topics_to_graft.is_empty() && self .send_message( *propagation_source, GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), - control_msgs: grafts, + control_msgs: topics_to_graft + .into_iter() + .map(|topic_hash| GossipsubControlAction::Graft { topic_hash }) + .collect(), } .into_protobuf(), ) @@ -1898,9 +1941,7 @@ where let mut scores = HashMap::new(); let peer_score = &self.peer_score; let mut score = |p: &PeerId| match peer_score { - Some((peer_score, ..)) => *scores - .entry(*p) - .or_insert_with(|| peer_score.score(p)), + Some((peer_score, ..)) => *scores.entry(*p).or_insert_with(|| peer_score.score(p)), _ => 0.0, }; @@ -1952,7 +1993,7 @@ where let desired_peers = self.config.mesh_n() - peers.len(); let peer_list = get_random_peers( topic_peers, - &self.peer_protocols, + &self.connected_peers, topic_hash, desired_peers, |peer| { @@ -2008,15 +2049,15 @@ where } if self.outbound_peers.contains(&peer) { if outbound <= self.config.mesh_outbound_min() { - //do not remove anymore outbound peers + // do not remove anymore outbound peers continue; } else { - //an outbound peer gets removed + // an outbound peer gets removed outbound -= 1; } } - //remove the peer + // remove the peer peers.remove(&peer); let current_topic = to_prune.entry(peer).or_insert_with(Vec::new); current_topic.push(topic_hash.clone()); @@ -2034,7 +2075,7 @@ where let needed = self.config.mesh_outbound_min() - outbound; let peer_list = get_random_peers( topic_peers, - &self.peer_protocols, + &self.connected_peers, topic_hash, needed, |peer| { @@ -2091,7 +2132,7 @@ where if median < thresholds.opportunistic_graft_threshold { let peer_list = get_random_peers( topic_peers, - &self.peer_protocols, + &self.connected_peers, topic_hash, self.config.opportunistic_graft_peers(), |peer| { @@ -2174,7 +2215,7 @@ where let explicit_peers = &self.explicit_peers; let new_peers = get_random_peers( &self.topic_peers, - &self.peer_protocols, + &self.connected_peers, topic_hash, needed_peers, |peer| { @@ -2268,7 +2309,7 @@ where // get gossip_lazy random peers let to_msg_peers = get_random_peers_dynamic( &self.topic_peers, - &self.peer_protocols, + &self.connected_peers, &topic_hash, n_map, |peer| { @@ -2313,12 +2354,23 @@ where no_px: HashSet, ) { // handle the grafts and overlapping prunes per peer - for (peer, topics) in to_graft.iter() { - for topic in topics { - //inform scoring of graft + for (peer, topics) in to_graft.into_iter() { + for topic in &topics { + // inform scoring of graft if let Some((peer_score, ..)) = &mut self.peer_score { - peer_score.graft(peer, topic.clone()); + peer_score.graft(&peer, topic.clone()); } + + // inform the handler of the peer being added to the mesh + // If the peer did not previously exist in any mesh, inform the handler + peer_added_to_mesh( + peer, + vec![topic], + &self.mesh, + self.peer_topics.get(&peer), + &mut self.events, + &self.connected_peers, + ); } let mut control_msgs: Vec = topics .iter() @@ -2328,14 +2380,17 @@ where .collect(); // If there are prunes associated with the same peer add them. - if let Some(topics) = to_prune.remove(peer) { + // NOTE: In this case a peer has been added to a topic mesh, and removed from another. + // It therefore must be in at least one mesh and we do not need to inform the handler + // of its removal from another. + if let Some(topics) = to_prune.remove(&peer) { let mut prunes = topics .iter() .map(|topic_hash| { self.make_prune( topic_hash, - peer, - self.config.do_px() && !no_px.contains(peer), + &peer, + self.config.do_px() && !no_px.contains(&peer), ) }) .collect::>(); @@ -2345,7 +2400,7 @@ where // send the control messages if self .send_message( - *peer, + peer, GossipsubRpc { subscriptions: Vec::new(), messages: Vec::new(), @@ -2361,16 +2416,25 @@ where // handle the remaining prunes for (peer, topics) in to_prune.iter() { - let remaining_prunes = topics - .iter() - .map(|topic_hash| { - self.make_prune( - topic_hash, - peer, - self.config.do_px() && !no_px.contains(peer), - ) - }) - .collect(); + let mut remaining_prunes = Vec::new(); + for topic_hash in topics { + let prune = self.make_prune( + topic_hash, + peer, + self.config.do_px() && !no_px.contains(peer), + ); + remaining_prunes.push(prune); + // inform the handler + peer_removed_from_mesh( + peer.clone(), + topic_hash, + &self.mesh, + self.peer_topics.get(&peer), + &mut self.events, + &self.connected_peers, + ); + } + if self .send_message( *peer, @@ -2432,14 +2496,12 @@ where // forward the message to peers if !recipient_peers.is_empty() { - let event = Arc::new( - GossipsubRpc { - subscriptions: Vec::new(), - messages: vec![message.clone()], - control_msgs: Vec::new(), - } - .into_protobuf(), - ); + let event = GossipsubRpc { + subscriptions: Vec::new(), + messages: vec![message.clone()], + control_msgs: Vec::new(), + } + .into_protobuf(); for peer in recipient_peers.iter() { debug!("Sending message: {:?} to peer {:?}", msg_id, peer); @@ -2579,7 +2641,7 @@ where fn send_message( &mut self, peer_id: PeerId, - message: impl Into>, + message: rpc_proto::Rpc, ) -> Result<(), PublishError> { // If the message is oversized, try and fragment it. If it cannot be fragmented, log an // error and drop the message (all individual messages should be small enough to fit in the @@ -2591,7 +2653,7 @@ where self.events .push_back(NetworkBehaviourAction::NotifyHandler { peer_id, - event: message, + event: Arc::new(GossipsubHandlerIn::Message(message)), handler: NotifyHandler::Any, }) } @@ -2600,10 +2662,7 @@ where // If a message is too large to be sent as-is, this attempts to fragment it into smaller RPC // messages to be sent. - fn fragment_message( - &self, - rpc: Arc, - ) -> Result>, PublishError> { + fn fragment_message(&self, rpc: rpc_proto::Rpc) -> Result, PublishError> { if rpc.encoded_len() < self.config.max_transmit_size() { return Ok(vec![rpc]); } @@ -2719,7 +2778,7 @@ where } } - Ok(rpc_list.into_iter().map(Arc::new).collect()) + Ok(rpc_list) } } @@ -2744,6 +2803,7 @@ where self.config.protocol_id_prefix().clone(), self.config.max_transmit_size(), self.config.validation_mode().clone(), + self.config.idle_timeout().clone(), self.config.support_floodsub(), ) } @@ -2790,15 +2850,6 @@ where // Insert an empty set of the topics of this peer until known. self.peer_topics.insert(*peer_id, Default::default()); - // By default we assume a peer is only a floodsub peer. - // - // The protocol negotiation occurs once a message is sent/received. Once this happens we - // update the type of peer that this is in order to determine which kind of routing should - // occur. - self.peer_protocols - .entry(*peer_id) - .or_insert(PeerKind::Floodsub); - if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.add_peer(*peer_id); } @@ -2853,11 +2904,11 @@ where self.outbound_peers.remove(peer_id); } - // Remove peer from peer_topics and peer_protocols + // Remove peer from peer_topics and connected_peers // NOTE: It is possible the peer has already been removed from all mappings if it does not // support the protocol. self.peer_topics.remove(peer_id); - self.peer_protocols.remove(peer_id); + self.connected_peers.remove(peer_id); if let Some((peer_score, ..)) = &mut self.peer_score { peer_score.remove_peer(peer_id); @@ -2867,7 +2918,7 @@ where fn inject_connection_established( &mut self, peer_id: &PeerId, - _: &ConnectionId, + connection_id: &ConnectionId, endpoint: &ConnectedPoint, ) { // Ignore connections from blacklisted peers. @@ -2901,26 +2952,73 @@ where ) } } + + // By default we assume a peer is only a floodsub peer. + // + // The protocol negotiation occurs once a message is sent/received. Once this happens we + // update the type of peer that this is in order to determine which kind of routing should + // occur. + self.connected_peers + .entry(*peer_id) + .or_insert(PeerConnections { + kind: PeerKind::Floodsub, + connections: vec![*connection_id], + }) + .connections + .push(*connection_id); } fn inject_connection_closed( &mut self, - peer: &PeerId, - _: &ConnectionId, + peer_id: &PeerId, + connection_id: &ConnectionId, endpoint: &ConnectedPoint, ) { // Remove IP from peer scoring system if let Some((peer_score, ..)) = &mut self.peer_score { if let Some(ip) = get_ip_addr(endpoint.get_remote_address()) { - peer_score.remove_ip(peer, &ip); + peer_score.remove_ip(peer_id, &ip); } else { trace!( "Couldn't extract ip from endpoint of peer {} with endpoint {:?}", - peer, + peer_id, endpoint ) } } + + // Remove the connection from the list + // If there are no connections left, inject_disconnected will remove the mapping entirely. + if let Some(connections) = self.connected_peers.get_mut(peer_id) { + let index = connections + .connections + .iter() + .position(|v| v == connection_id) + .expect( + "Previously established connection to a non-black-listed peer to be present", + ); + connections.connections.remove(index); + + // If there are more connections and this peer is in a mesh, inform the first connection + // handler. + if !connections.connections.is_empty() { + if let Some(topics) = self.peer_topics.get(peer_id) { + for topic in topics { + if let Some(mesh_peers) = self.mesh.get(topic) { + if mesh_peers.contains(peer_id) { + self.events + .push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: peer_id.clone(), + event: Arc::new(GossipsubHandlerIn::JoinedMesh), + handler: NotifyHandler::One(connections.connections[0]), + }); + break; + } + } + } + } + } + } } fn inject_address_change( @@ -2969,15 +3067,15 @@ where ); // We treat this peer as disconnected self.inject_disconnected(&propagation_source); - } else if let Some(old_kind) = self.peer_protocols.get_mut(&propagation_source) { + } else if let Some(conn) = self.connected_peers.get_mut(&propagation_source) { // Only change the value if the old value is Floodsub (the default set in // inject_connected). All other PeerKind changes are ignored. debug!( "New peer type found: {} for peer: {}", kind, propagation_source ); - if let PeerKind::Floodsub = *old_kind { - *old_kind = kind; + if let PeerKind::Floodsub = conn.kind { + conn.kind = kind; } } } @@ -3126,12 +3224,92 @@ where } } +/// This is called when peers are added to any mesh. It checks if the peer existed +/// in any other mesh. If this is the first mesh they have joined, it queues a message to notify +/// the appropriate connection handler to maintain a connection. +fn peer_added_to_mesh( + peer_id: PeerId, + new_topics: Vec<&TopicHash>, + mesh: &HashMap>, + known_topics: Option<&BTreeSet>, + events: &mut VecDeque, + connections: &HashMap, +) { + // Ensure there is an active connection + let connection_id = { + let conn = connections.get(&peer_id).expect("To be connected to peer."); + assert!( + !conn.connections.is_empty(), + "Must have at least one connection" + ); + conn.connections[0] + }; + + if let Some(topics) = known_topics { + for topic in topics { + if !new_topics.contains(&topic) { + if let Some(mesh_peers) = mesh.get(topic) { + if mesh_peers.contains(&peer_id) { + // the peer is already in a mesh for another topic + return; + } + } + } + } + } + // This is the first mesh the peer has joined, inform the handler + events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id, + event: Arc::new(GossipsubHandlerIn::JoinedMesh), + handler: NotifyHandler::One(connection_id), + }); +} + +/// This is called when peers are removed from a mesh. It checks if the peer exists +/// in any other mesh. If this is the last mesh they have joined, we return true, in order to +/// notify the handler to no longer maintain a connection. +fn peer_removed_from_mesh( + peer_id: PeerId, + old_topic: &TopicHash, + mesh: &HashMap>, + known_topics: Option<&BTreeSet>, + events: &mut VecDeque, + connections: &HashMap, +) { + // Ensure there is an active connection + let connection_id = connections + .get(&peer_id) + .expect("To be connected to peer.") + .connections + .get(0) + .expect("There should be at least one connection to a peer."); + + if let Some(topics) = known_topics { + for topic in topics { + if topic != old_topic { + if let Some(mesh_peers) = mesh.get(topic) { + if mesh_peers.contains(&peer_id) { + // the peer exists in another mesh still + return; + } + } + } + } + } + // The peer is not in any other mesh, inform the handler + events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id, + event: Arc::new(GossipsubHandlerIn::LeftMesh), + handler: NotifyHandler::One(*connection_id), + }); +} + /// Helper function to get a subset of random gossipsub peers for a `topic_hash` /// filtered by the function `f`. The number of peers to get equals the output of `n_map` /// that gets as input the number of filtered peers. fn get_random_peers_dynamic( topic_peers: &HashMap>, - peer_protocols: &HashMap, + connected_peers: &HashMap, topic_hash: &TopicHash, // maps the number of total peers to the number of selected peers n_map: impl Fn(usize) -> usize, @@ -3143,9 +3321,9 @@ fn get_random_peers_dynamic( .iter() .cloned() .filter(|p| { - f(p) && match peer_protocols.get(p) { - Some(PeerKind::Gossipsub) => true, - Some(PeerKind::Gossipsubv1_1) => true, + f(p) && match connected_peers.get(p) { + Some(connections) if connections.kind == PeerKind::Gossipsub => true, + Some(connections) if connections.kind == PeerKind::Gossipsubv1_1 => true, _ => false, } }) @@ -3173,12 +3351,12 @@ fn get_random_peers_dynamic( /// filtered by the function `f`. fn get_random_peers( topic_peers: &HashMap>, - peer_protocols: &HashMap, + connected_peers: &HashMap, topic_hash: &TopicHash, n: usize, f: impl FnMut(&PeerId) -> bool, ) -> BTreeSet { - get_random_peers_dynamic(topic_peers, peer_protocols, topic_hash, |_| n, f) + get_random_peers_dynamic(topic_peers, connected_peers, topic_hash, |_| n, f) } /// Validates the combination of signing, privacy and message validation to ensure the @@ -3319,10 +3497,10 @@ mod local_test { rpc.messages.push(test_message()); let mut rpc_proto = rpc.clone().into_protobuf(); - let fragmented_messages = gs.fragment_message(Arc::new(rpc_proto.clone())).unwrap(); + let fragmented_messages = gs.fragment_message(rpc_proto.clone()).unwrap(); assert_eq!( fragmented_messages, - vec![Arc::new(rpc_proto.clone())], + vec![rpc_proto.clone()], "Messages under the limit shouldn't be fragmented" ); @@ -3334,7 +3512,7 @@ mod local_test { } let fragmented_messages = gs - .fragment_message(Arc::new(rpc_proto)) + .fragment_message(rpc_proto) .expect("Should be able to fragment the messages"); assert!( @@ -3369,7 +3547,7 @@ mod local_test { let rpc_proto = rpc.into_protobuf(); let fragmented_messages = gs - .fragment_message(Arc::new(rpc_proto.clone())) + .fragment_message(rpc_proto.clone()) .expect("Messages must be valid"); if rpc_proto.encoded_len() < max_transmit_size { @@ -3394,9 +3572,7 @@ mod local_test { // ensure they can all be encoded let mut buf = bytes::BytesMut::with_capacity(message.encoded_len()); - codec - .encode(Arc::try_unwrap(message).unwrap(), &mut buf) - .unwrap() + codec.encode(message, &mut buf).unwrap() } } QuickCheck::new() diff --git a/protocols/gossipsub/src/behaviour/tests.rs b/protocols/gossipsub/src/behaviour/tests.rs index cca72125fbf..d28ec937827 100644 --- a/protocols/gossipsub/src/behaviour/tests.rs +++ b/protocols/gossipsub/src/behaviour/tests.rs @@ -350,15 +350,18 @@ mod tests { gs.events .iter() .fold(vec![], |mut collected_subscriptions, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - for s in &event.subscriptions { - match s.subscribe { - Some(true) => collected_subscriptions.push(s.clone()), - _ => {} - }; + NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { + GossipsubHandlerIn::Message(ref message) => { + for s in &message.subscriptions { + match s.subscribe { + Some(true) => collected_subscriptions.push(s.clone()), + _ => {} + }; + } + collected_subscriptions } - collected_subscriptions - } + _ => collected_subscriptions, + }, _ => collected_subscriptions, }); @@ -411,19 +414,23 @@ mod tests { "should be able to unsubscribe successfully from each topic", ); + // collect all the subscriptions let subscriptions = gs.events .iter() .fold(vec![], |mut collected_subscriptions, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - for s in &event.subscriptions { - match s.subscribe { - Some(false) => collected_subscriptions.push(s.clone()), - _ => {} - }; + NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { + GossipsubHandlerIn::Message(ref message) => { + for s in &message.subscriptions { + match s.subscribe { + Some(true) => collected_subscriptions.push(s.clone()), + _ => {} + }; + } + collected_subscriptions } - collected_subscriptions - } + _ => collected_subscriptions, + }, _ => collected_subscriptions, }); @@ -518,8 +525,20 @@ mod tests { .insert(topic_hashes[1].clone(), Default::default()); let new_peers: Vec = vec![]; for _ in 0..3 { + let random_peer = PeerId::random(); + // inform the behaviour of a new peer + gs.inject_connection_established( + &random_peer, + &ConnectionId::new(1), + &ConnectedPoint::Dialer { + address: "/ip4/127.0.0.1".parse::().unwrap(), + }, + ); + gs.inject_connected(&random_peer); + + // add the new peer to the fanout let fanout_peers = gs.fanout.get_mut(&topic_hashes[1]).unwrap(); - fanout_peers.insert(PeerId::random()); + fanout_peers.insert(random_peer); } // subscribe to topic1 @@ -589,13 +608,16 @@ mod tests { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - let event = proto_to_message(event); - for s in &event.messages { - collected_publish.push(s.clone()); + NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { + GossipsubHandlerIn::Message(ref message) => { + let event = proto_to_message(&message); + for s in &event.messages { + collected_publish.push(s.clone()); + } + collected_publish } - collected_publish - } + _ => collected_publish, + }, _ => collected_publish, }); @@ -676,13 +698,16 @@ mod tests { .events .iter() .fold(vec![], |mut collected_publish, e| match e { - NetworkBehaviourAction::NotifyHandler { event, .. } => { - let event = proto_to_message(event); - for s in &event.messages { - collected_publish.push(s.clone()); + NetworkBehaviourAction::NotifyHandler { event, .. } => match **event { + GossipsubHandlerIn::Message(ref message) => { + let event = proto_to_message(&message); + for s in &event.messages { + collected_publish.push(s.clone()); + } + collected_publish } - collected_publish - } + _ => collected_publish, + }, _ => collected_publish, }); @@ -722,12 +747,16 @@ mod tests { // check that our subscriptions are sent to each of the peers // collect all the SendEvents - let send_events: Vec<&NetworkBehaviourAction, GossipsubEvent>> = gs + let send_events: Vec<&NetworkBehaviourAction, GossipsubEvent>> = gs .events .iter() .filter(|e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - !event.subscriptions.is_empty() + if let GossipsubHandlerIn::Message(ref m) = **event { + !m.subscriptions.is_empty() + } else { + false + } } _ => false, }) @@ -737,10 +766,12 @@ mod tests { for sevent in send_events.clone() { match sevent { NetworkBehaviourAction::NotifyHandler { event, .. } => { - assert!( - event.subscriptions.len() == 2, - "There should be two subscriptions sent to each peer (1 for each topic)." - ); + if let GossipsubHandlerIn::Message(ref m) = **event { + assert!( + m.subscriptions.len() == 2, + "There should be two subscriptions sent to each peer (1 for each topic)." + ); + } } _ => {} }; @@ -873,47 +904,61 @@ mod tests { gs.topic_peers .insert(topic_hash.clone(), peers.iter().cloned().collect()); - gs.peer_protocols = peers + gs.connected_peers = peers .iter() - .map(|p| (p.clone(), PeerKind::Gossipsubv1_1)) + .map(|p| { + ( + p.clone(), + PeerConnections { + kind: PeerKind::Gossipsubv1_1, + connections: vec![ConnectionId::new(1)], + }, + ) + }) .collect(); let random_peers = - get_random_peers(&gs.topic_peers, &gs.peer_protocols, &topic_hash, 5, |_| { + get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| { true }); assert_eq!(random_peers.len(), 5, "Expected 5 peers to be returned"); - let random_peers = - get_random_peers(&gs.topic_peers, &gs.peer_protocols, &topic_hash, 30, |_| { - true - }); + let random_peers = get_random_peers( + &gs.topic_peers, + &gs.connected_peers, + &topic_hash, + 30, + |_| true, + ); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); - let random_peers = - get_random_peers(&gs.topic_peers, &gs.peer_protocols, &topic_hash, 20, |_| { - true - }); + let random_peers = get_random_peers( + &gs.topic_peers, + &gs.connected_peers, + &topic_hash, + 20, + |_| true, + ); assert!(random_peers.len() == 20, "Expected 20 peers to be returned"); assert!( random_peers == peers.iter().cloned().collect(), "Expected no shuffling" ); let random_peers = - get_random_peers(&gs.topic_peers, &gs.peer_protocols, &topic_hash, 0, |_| { + get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 0, |_| { true }); assert!(random_peers.len() == 0, "Expected 0 peers to be returned"); // test the filter let random_peers = - get_random_peers(&gs.topic_peers, &gs.peer_protocols, &topic_hash, 5, |_| { + get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 5, |_| { false }); assert!(random_peers.len() == 0, "Expected 0 peers to be returned"); let random_peers = - get_random_peers(&gs.topic_peers, &gs.peer_protocols, &topic_hash, 10, { + get_random_peers(&gs.topic_peers, &gs.connected_peers, &topic_hash, 10, { |peer| peers.contains(peer) }); assert!(random_peers.len() == 10, "Expected 10 peers to be returned"); @@ -955,9 +1000,11 @@ mod tests { .iter() .fold(vec![], |mut collected_messages, e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - let event = proto_to_message(event); - for c in &event.messages { - collected_messages.push(c.clone()) + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(&m); + for c in &event.messages { + collected_messages.push(c.clone()) + } } collected_messages } @@ -1011,12 +1058,16 @@ mod tests { // is the message is being sent? let message_exists = gs.events.iter().any(|e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - let event = proto_to_message(event); - event - .messages - .iter() - .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap()) - .any(|msg| gs.config.message_id(&msg) == msg_id) + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + event + .messages + .iter() + .map(|msg| gs.data_transform.inbound_transform(msg.clone()).unwrap()) + .any(|msg| gs.config.message_id(&msg) == msg_id) + } else { + false + } } _ => false, }); @@ -1249,12 +1300,16 @@ mod tests { .iter() .map(|e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { - let event = proto_to_message(event); - event - .control_msgs - .iter() - .filter(|m| filter(peer_id, m)) - .count() + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + event + .control_msgs + .iter() + .filter(|m| filter(peer_id, m)) + .count() + } else { + 0 + } } _ => 0, }) @@ -1281,7 +1336,7 @@ mod tests { //add peer as explicit peer gs.add_explicit_peer(&peer); - let dial_events: Vec<&NetworkBehaviourAction, GossipsubEvent>> = gs + let dial_events: Vec<&NetworkBehaviourAction, GossipsubEvent>> = gs .events .iter() .filter(|e| match e { @@ -1485,14 +1540,18 @@ mod tests { .iter() .filter(|e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { - let event = proto_to_message(event); - peer_id == &peers[0] - && event - .messages - .iter() - .filter(|m| m.data == message.data) - .count() - > 0 + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + peer_id == &peers[0] + && event + .messages + .iter() + .filter(|m| m.data == message.data) + .count() + > 0 + } else { + false + } } _ => false, }) @@ -1998,9 +2057,11 @@ mod tests { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - let event = proto_to_message(event); - for s in &event.messages { - collected_publish.push(s.clone()); + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + for s in &event.messages { + collected_publish.push(s.clone()); + } } collected_publish } @@ -2575,9 +2636,11 @@ mod tests { .iter() .fold(vec![], |mut collected_messages, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - let event = proto_to_message(event); - for c in &event.messages { - collected_messages.push((peer_id.clone(), c.clone())) + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + for c in &event.messages { + collected_messages.push((peer_id.clone(), c.clone())) + } } collected_messages } @@ -2721,9 +2784,11 @@ mod tests { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - let event = proto_to_message(event); - for s in &event.messages { - collected_publish.push((peer_id.clone(), s.clone())); + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + for s in &event.messages { + collected_publish.push((peer_id.clone(), s.clone())); + } } collected_publish } @@ -2775,9 +2840,11 @@ mod tests { .iter() .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { event, peer_id, .. } => { - let event = proto_to_message(event); - for s in &event.messages { - collected_publish.push((peer_id.clone(), s.clone())); + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + for s in &event.messages { + collected_publish.push((peer_id.clone(), s.clone())); + } } collected_publish } @@ -4300,8 +4367,12 @@ mod tests { .iter() .map(|e| match e { NetworkBehaviourAction::NotifyHandler { event, .. } => { - let event = proto_to_message(event); - event.messages.len() + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + event.messages.len() + } else { + 0 + } } _ => 0, }) @@ -4742,9 +4813,11 @@ mod tests { .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - let event = proto_to_message(event); - for s in &event.messages { - collected_publish.push(s.clone()); + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + for s in &event.messages { + collected_publish.push(s.clone()); + } } } collected_publish @@ -4798,9 +4871,11 @@ mod tests { .fold(vec![], |mut collected_publish, e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if peer_id == &p1 || peer_id == &p2 { - let event = proto_to_message(event); - for s in &event.messages { - collected_publish.push(s.clone()); + if let GossipsubHandlerIn::Message(ref m) = **event { + let event = proto_to_message(m); + for s in &event.messages { + collected_publish.push(s.clone()); + } } } collected_publish @@ -5115,7 +5190,11 @@ mod tests { let messages_to_p1 = gs2.events.drain(..).filter_map(|e| match e { NetworkBehaviourAction::NotifyHandler { peer_id, event, .. } => { if &peer_id == &p1 { - Some(event) + if let GossipsubHandlerIn::Message(m) = Arc::try_unwrap(event).unwrap() { + Some(m) + } else { + None + } } else { None } diff --git a/protocols/gossipsub/src/config.rs b/protocols/gossipsub/src/config.rs index 3d9eccf6a12..eb8eac15212 100644 --- a/protocols/gossipsub/src/config.rs +++ b/protocols/gossipsub/src/config.rs @@ -65,6 +65,7 @@ pub struct GossipsubConfig { fanout_ttl: Duration, check_explicit_peers_ticks: u64, max_transmit_size: usize, + idle_timeout: Duration, duplicate_cache_time: Duration, validate_messages: bool, validation_mode: ValidationMode, @@ -181,6 +182,13 @@ impl GossipsubConfig { self.max_transmit_size } + /// The time a connection is maintained to a peer without being in the mesh and without + /// send/receiving a message from. Connections that idle beyond this timeout are disconnected. + /// Default is 120 seconds. + pub fn idle_timeout(&self) -> Duration { + self.idle_timeout + } + /// Duplicates are prevented by storing message id's of known messages in an LRU time cache. /// This settings sets the time period that messages are stored in the cache. Duplicates can be /// received if duplicate messages are sent at a time greater than this setting apart. The @@ -384,6 +392,7 @@ impl Default for GossipsubConfigBuilder { fanout_ttl: Duration::from_secs(60), check_explicit_peers_ticks: 300, max_transmit_size: 65536, + idle_timeout: Duration::from_secs(120), duplicate_cache_time: Duration::from_secs(60), validate_messages: false, validation_mode: ValidationMode::Strict, @@ -524,6 +533,14 @@ impl GossipsubConfigBuilder { self } + /// The time a connection is maintained to a peer without being in the mesh and without + /// send/receiving a message from. Connections that idle beyond this timeout are disconnected. + /// Default is 120 seconds. + pub fn idle_timeout(&mut self, idle_timeout: Duration) -> &mut Self { + self.config.idle_timeout = idle_timeout; + self + } + /// Duplicates are prevented by storing message id's of known messages in an LRU time cache. /// This settings sets the time period that messages are stored in the cache. Duplicates can be /// received if duplicate messages are sent at a time greater than this setting apart. The @@ -768,6 +785,7 @@ impl std::fmt::Debug for GossipsubConfig { let _ = builder.field("heartbeat_interval", &self.heartbeat_interval); let _ = builder.field("fanout_ttl", &self.fanout_ttl); let _ = builder.field("max_transmit_size", &self.max_transmit_size); + let _ = builder.field("idle_timeout", &self.idle_timeout); let _ = builder.field("duplicate_cache_time", &self.duplicate_cache_time); let _ = builder.field("validate_messages", &self.validate_messages); let _ = builder.field("validation_mode", &self.validation_mode); diff --git a/protocols/gossipsub/src/handler.rs b/protocols/gossipsub/src/handler.rs index f5eb278945f..4f0810d2934 100644 --- a/protocols/gossipsub/src/handler.rs +++ b/protocols/gossipsub/src/handler.rs @@ -22,9 +22,9 @@ use crate::config::ValidationMode; use crate::error::{GossipsubHandlerError, ValidationError}; use crate::protocol::{GossipsubCodec, ProtocolConfig}; use crate::types::{GossipsubRpc, PeerKind, RawGossipsubMessage}; +use asynchronous_codec::Framed; use futures::prelude::*; use futures::StreamExt; -use asynchronous_codec::Framed; use libp2p_core::upgrade::{InboundUpgrade, NegotiationError, OutboundUpgrade, UpgradeError}; use libp2p_swarm::protocols_handler::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, @@ -62,6 +62,17 @@ pub enum HandlerEvent { PeerKind(PeerKind), } +/// A message sent from the behaviour to the handler. +#[derive(Debug, Clone)] +pub enum GossipsubHandlerIn { + /// A gossipsub message to send. + Message(crate::rpc_proto::Rpc), + /// The peer has joined the mesh. + JoinedMesh, + /// The peer has left the mesh. + LeftMesh, +} + /// The maximum number of substreams we accept or create before disconnecting from the peer. /// /// Gossipsub is supposed to have a single long-lived inbound and outbound substream. On failure we @@ -108,11 +119,18 @@ pub struct GossipsubHandler { /// This value is set to true to indicate the peer doesn't support gossipsub. protocol_unsupported: bool, + /// The amount of time we allow idle connections before disconnecting. + idle_timeout: Duration, + /// Collection of errors from attempting an upgrade. upgrade_errors: VecDeque>, /// Flag determining whether to maintain the connection to the peer. keep_alive: KeepAlive, + + /// Keeps track of whether this connection is for a peer in the mesh. This is used to make + /// decisions about the keep alive state for this connection. + in_mesh: bool, } /// State of the inbound substream, opened either by us or by the remote. @@ -148,6 +166,7 @@ impl GossipsubHandler { protocol_id_prefix: std::borrow::Cow<'static, str>, max_transmit_size: usize, validation_mode: ValidationMode, + idle_timeout: Duration, support_floodsub: bool, ) -> Self { GossipsubHandler { @@ -169,19 +188,21 @@ impl GossipsubHandler { peer_kind: None, peer_kind_sent: false, protocol_unsupported: false, + idle_timeout, upgrade_errors: VecDeque::new(), keep_alive: KeepAlive::Until(Instant::now() + Duration::from_secs(INITIAL_KEEP_ALIVE)), + in_mesh: false, } } } impl ProtocolsHandler for GossipsubHandler { - type InEvent = crate::rpc_proto::Rpc; + type InEvent = GossipsubHandlerIn; type OutEvent = HandlerEvent; type Error = GossipsubHandlerError; type InboundOpenInfo = (); type InboundProtocol = ProtocolConfig; - type OutboundOpenInfo = Self::InEvent; + type OutboundOpenInfo = crate::rpc_proto::Rpc; type OutboundProtocol = ProtocolConfig; fn listen_protocol(&self) -> SubstreamProtocol { @@ -239,9 +260,21 @@ impl ProtocolsHandler for GossipsubHandler { } } - fn inject_event(&mut self, message: crate::rpc_proto::Rpc) { + fn inject_event(&mut self, message: GossipsubHandlerIn) { if !self.protocol_unsupported { - self.send_queue.push(message); + match message { + GossipsubHandlerIn::Message(m) => self.send_queue.push(m), + // If we have joined the mesh, keep the connection alive. + GossipsubHandlerIn::JoinedMesh => { + self.in_mesh = true; + self.keep_alive = KeepAlive::Yes; + } + // If we have left the mesh, start the idle timer. + GossipsubHandlerIn::LeftMesh => { + self.in_mesh = false; + self.keep_alive = KeepAlive::Until(Instant::now() + self.idle_timeout); + } + } } } @@ -355,6 +388,10 @@ impl ProtocolsHandler for GossipsubHandler { Some(InboundSubstreamState::WaitingInput(mut substream)) => { match substream.poll_next_unpin(cx) { Poll::Ready(Some(Ok(message))) => { + if !self.in_mesh { + self.keep_alive = + KeepAlive::Until(Instant::now() + self.idle_timeout); + } self.inbound_substream = Some(InboundSubstreamState::WaitingInput(substream)); return Poll::Ready(ProtocolsHandlerEvent::Custom(message)); @@ -398,7 +435,6 @@ impl ProtocolsHandler for GossipsubHandler { // substream. warn!("Inbound substream error while closing: {:?}", e); } - self.inbound_substream = None; if self.outbound_substream.is_none() { self.keep_alive = KeepAlive::No; @@ -465,6 +501,7 @@ impl ProtocolsHandler for GossipsubHandler { return Poll::Ready(ProtocolsHandlerEvent::Close(e)); } Poll::Pending => { + self.keep_alive = KeepAlive::Yes; self.outbound_substream = Some(OutboundSubstreamState::PendingSend(substream, message)); break; @@ -474,11 +511,17 @@ impl ProtocolsHandler for GossipsubHandler { Some(OutboundSubstreamState::PendingFlush(mut substream)) => { match Sink::poll_flush(Pin::new(&mut substream), cx) { Poll::Ready(Ok(())) => { + if !self.in_mesh { + // if not in the mesh, reset the idle timeout + self.keep_alive = + KeepAlive::Until(Instant::now() + self.idle_timeout); + } self.outbound_substream = Some(OutboundSubstreamState::WaitingOutput(substream)) } Poll::Ready(Err(e)) => return Poll::Ready(ProtocolsHandlerEvent::Close(e)), Poll::Pending => { + self.keep_alive = KeepAlive::Yes; self.outbound_substream = Some(OutboundSubstreamState::PendingFlush(substream)); break; @@ -506,6 +549,7 @@ impl ProtocolsHandler for GossipsubHandler { )); } Poll::Pending => { + self.keep_alive = KeepAlive::No; self.outbound_substream = Some(OutboundSubstreamState::_Closing(substream)); break; diff --git a/protocols/gossipsub/src/types.rs b/protocols/gossipsub/src/types.rs index 69879087941..97a24f3d670 100644 --- a/protocols/gossipsub/src/types.rs +++ b/protocols/gossipsub/src/types.rs @@ -21,7 +21,7 @@ //! A collection of types using the Gossipsub system. use crate::rpc_proto; use crate::TopicHash; -use libp2p_core::PeerId; +use libp2p_core::{connection::ConnectionId, PeerId}; use std::fmt; use std::fmt::Debug; @@ -80,6 +80,14 @@ declare_message_id_type!(MessageId, "MessageId"); // filter duplicates quickly without performing the overhead of decompression. declare_message_id_type!(FastMessageId, "FastMessageId"); +#[derive(Debug, Clone, PartialEq)] +pub struct PeerConnections { + /// The kind of protocol the peer supports. + pub kind: PeerKind, + /// Its current connections. + pub connections: Vec, +} + /// Describes the types of peers that can exist in the gossipsub context. #[derive(Debug, Clone, PartialEq)] pub enum PeerKind {