From 075b2d78e10ed0fdadd4df30fac4f8fcad1fe4b9 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Wed, 29 Jul 2020 14:44:15 +0200 Subject: [PATCH] client/network: Add peers to DHT only if protocols match (#6549) * client/network/src/discovery: Adjust to Kademlia API changes * client/network: Add peers to DHT only if protocols match With https://github.com/libp2p/rust-libp2p/pull/1628 rust-libp2p allows manually controlling which peers are inserted into the routing table. Instead of adding each peer to the routing table automatically, insert them only if they support the local nodes protocol id (e.g. `dot`) retrieved via the `identify` behaviour. For now this works around https://github.com/libp2p/rust-libp2p/issues/1611. In the future one might add more requirements. For example one might try to exclude light-clients. * Cargo.toml: Remove crates.io patch for libp2p * client/network/src/behaviour: Adjust to PeerInfo name change * client/network/src/discovery: Rework Kademlia event matching * client/network/discovery: Add trace on adding peer to DHT * client/network/discovery: Retrieve protocol name from kad behaviour * client/network/discovery: Fix formatting * client/network: Change DiscoveryBehaviour::add_self_reported signature * client/network: Document manual insertion strategy * client/network/discovery: Remove TODO for ignoring DHT address Co-authored-by: Pierre Krieger --- client/network/src/behaviour.rs | 29 ++-- client/network/src/discovery.rs | 237 +++++++++++++++++++++++++------- 2 files changed, 212 insertions(+), 54 deletions(-) diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 9a466388f4fcc..2afcd2741381f 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -25,6 +25,7 @@ use bytes::Bytes; use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; +use libp2p::identify::IdentifyInfo; use libp2p::kad::record; use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use log::debug; @@ -413,16 +414,28 @@ impl NetworkBehaviourEventProcess NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: peer_info::PeerInfoEvent) { - let peer_info::PeerInfoEvent::Identified { peer_id, mut info } = event; - if info.listen_addrs.len() > 30 { - debug!(target: "sub-libp2p", "Node {:?} has reported more than 30 addresses; \ - it is identified by {:?} and {:?}", peer_id, info.protocol_version, - info.agent_version + let peer_info::PeerInfoEvent::Identified { + peer_id, + info: IdentifyInfo { + protocol_version, + agent_version, + mut listen_addrs, + protocols, + .. + }, + } = event; + + if listen_addrs.len() > 30 { + debug!( + target: "sub-libp2p", + "Node {:?} has reported more than 30 addresses; it is identified by {:?} and {:?}", + peer_id, protocol_version, agent_version ); - info.listen_addrs.truncate(30); + listen_addrs.truncate(30); } - for addr in &info.listen_addrs { - self.discovery.add_self_reported_address(&peer_id, addr.clone()); + + for addr in listen_addrs { + self.discovery.add_self_reported_address(&peer_id, protocols.iter(), addr); } self.substrate.add_discovered_nodes(iter::once(peer_id)); } diff --git a/client/network/src/discovery.rs b/client/network/src/discovery.rs index d08f9d44a15b9..8216d6b2cbe73 100644 --- a/client/network/src/discovery.rs +++ b/client/network/src/discovery.rs @@ -52,7 +52,7 @@ use ip_network::IpNetwork; use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint, Multiaddr, PeerId, PublicKey}; use libp2p::swarm::{NetworkBehaviour, NetworkBehaviourAction, PollParameters, ProtocolsHandler}; use libp2p::swarm::protocols_handler::multi::MultiHandler; -use libp2p::kad::{Kademlia, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record}; +use libp2p::kad::{Kademlia, KademliaBucketInserts, KademliaConfig, KademliaEvent, QueryResult, Quorum, Record}; use libp2p::kad::GetClosestPeersError; use libp2p::kad::handler::KademliaHandler; use libp2p::kad::QueryId; @@ -137,17 +137,9 @@ impl DiscoveryConfig { } /// Add discovery via Kademlia for the given protocol. - pub fn add_protocol(&mut self, p: ProtocolId) -> &mut Self { - // NB: If this protocol name derivation is changed, check if - // `DiscoveryBehaviour::new_handler` is still correct. - let proto_name = { - let mut v = vec![b'/']; - v.extend_from_slice(p.as_bytes()); - v.extend_from_slice(b"/kad"); - v - }; - - self.add_kademlia(p, proto_name); + pub fn add_protocol(&mut self, id: ProtocolId) -> &mut Self { + let name = protocol_name_from_protocol_id(&id); + self.add_kademlia(id, name); self } @@ -159,6 +151,10 @@ impl DiscoveryConfig { let mut config = KademliaConfig::default(); config.set_protocol_name(proto_name); + // By default Kademlia attempts to insert all peers into its routing table once a dialing + // attempt succeeds. In order to control which peer is added, disable the auto-insertion and + // instead add peers manually. + config.set_kbucket_inserts(KademliaBucketInserts::Manual); let store = MemoryStore::new(self.local_peer_id.clone()); let mut kad = Kademlia::with_config(self.local_peer_id.clone(), store, config); @@ -259,17 +255,43 @@ impl DiscoveryBehaviour { } } - /// Call this method when a node reports an address for itself. + /// Add a self-reported address of a remote peer to the k-buckets of the supported + /// DHTs (`supported_protocols`). /// - /// **Note**: It is important that you call this method, otherwise the discovery mechanism will - /// not properly work. - pub fn add_self_reported_address(&mut self, peer_id: &PeerId, addr: Multiaddr) { - if self.allow_non_globals_in_dht || self.can_add_to_dht(&addr) { - for k in self.kademlias.values_mut() { - k.add_address(peer_id, addr.clone()); + /// **Note**: It is important that you call this method. The discovery mechanism will not + /// automatically add connecting peers to the Kademlia k-buckets. + pub fn add_self_reported_address( + &mut self, + peer_id: &PeerId, + supported_protocols: impl Iterator>, + addr: Multiaddr + ) { + if !self.allow_non_globals_in_dht && !self.can_add_to_dht(&addr) { + log::trace!(target: "sub-libp2p", "Ignoring self-reported non-global address {} from {}.", addr, peer_id); + return + } + + let mut added = false; + for protocol in supported_protocols { + for kademlia in self.kademlias.values_mut() { + if protocol.as_ref() == kademlia.protocol_name() { + log::trace!( + target: "sub-libp2p", + "Adding self-reported address {} from {} to Kademlia DHT {}.", + addr, peer_id, String::from_utf8_lossy(kademlia.protocol_name()), + ); + kademlia.add_address(peer_id, addr.clone()); + added = true; + } } - } else { - log::trace!(target: "sub-libp2p", "Ignoring self-reported address {} from {}", addr, peer_id); + } + + if !added { + log::trace!( + target: "sub-libp2p", + "Ignoring self-reported address {} from {} as remote node is not part of any \ + Kademlia DHTs supported by the local node.", addr, peer_id, + ); } } @@ -340,17 +362,21 @@ impl DiscoveryBehaviour { } /// Event generated by the `DiscoveryBehaviour`. +#[derive(Debug)] pub enum DiscoveryOut { - /// The address of a peer has been added to the Kademlia routing table. - /// - /// Can be called multiple times with the same identity. + /// A connection to a peer has been established but the peer has not been + /// added to the routing table because [`KademliaBucketInserts::Manual`] is + /// configured. If the peer is to be included in the routing table, it must + /// be explicitly added via + /// [`DiscoveryBehaviour::add_self_reported_address`]. Discovered(PeerId), /// A peer connected to this node for whom no listen address is known. /// /// In order for the peer to be added to the Kademlia routing table, a known - /// listen address must be added via [`DiscoveryBehaviour::add_self_reported_address`], - /// e.g. obtained through the `identify` protocol. + /// listen address must be added via + /// [`DiscoveryBehaviour::add_self_reported_address`], e.g. obtained through + /// the `identify` protocol. UnroutablePeer(PeerId), /// The DHT yielded results for the record request, grouped in (key, value) pairs. @@ -569,8 +595,12 @@ impl NetworkBehaviour for DiscoveryBehaviour { let ev = DiscoveryOut::UnroutablePeer(peer); return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); } - KademliaEvent::RoutablePeer { .. } | KademliaEvent::PendingRoutablePeer { .. } => { - // We are not interested in these events at the moment. + KademliaEvent::RoutablePeer { peer, .. } => { + let ev = DiscoveryOut::Discovered(peer); + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + KademliaEvent::PendingRoutablePeer { .. } => { + // We are not interested in this event at the moment. } KademliaEvent::QueryResult { result: QueryResult::GetClosestPeers(res), .. } => { match res { @@ -689,25 +719,36 @@ impl NetworkBehaviour for DiscoveryBehaviour { } } +// NB: If this protocol name derivation is changed, check if +// `DiscoveryBehaviour::new_handler` is still correct. +fn protocol_name_from_protocol_id(id: &ProtocolId) -> Vec { + let mut v = vec![b'/']; + v.extend_from_slice(id.as_bytes()); + v.extend_from_slice(b"/kad"); + v +} + #[cfg(test)] mod tests { use crate::config::ProtocolId; use futures::prelude::*; use libp2p::identity::Keypair; - use libp2p::Multiaddr; + use libp2p::{Multiaddr, PeerId}; use libp2p::core::upgrade; use libp2p::core::transport::{Transport, MemoryTransport}; use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt}; use libp2p::swarm::Swarm; use std::{collections::HashSet, task::Poll}; - use super::{DiscoveryConfig, DiscoveryOut}; + use super::{DiscoveryConfig, DiscoveryOut, protocol_name_from_protocol_id}; #[test] fn discovery_working() { - let mut user_defined = Vec::new(); + let mut first_swarm_peer_id_and_addr = None; + let protocol_id = ProtocolId::from(b"dot".as_ref()); - // Build swarms whose behaviour is `DiscoveryBehaviour`. - let mut swarms = (0..25).map(|_| { + // Build swarms whose behaviour is `DiscoveryBehaviour`, each aware of + // the first swarm via `with_user_defined`. + let mut swarms = (0..25).map(|i| { let keypair = Keypair::generate_ed25519(); let keypair2 = keypair.clone(); @@ -730,14 +771,12 @@ mod tests { }); let behaviour = { - let protocol_id: &[u8] = b"/test/kad/1.0.0"; - let mut config = DiscoveryConfig::new(keypair.public()); - config.with_user_defined(user_defined.clone()) + config.with_user_defined(first_swarm_peer_id_and_addr.clone()) .allow_private_ipv4(true) .allow_non_globals_in_dht(true) .discovery_limit(50) - .add_protocol(ProtocolId::from(protocol_id)); + .add_protocol(protocol_id.clone()); config.finish() }; @@ -745,8 +784,8 @@ mod tests { let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); - if user_defined.is_empty() { - user_defined.push((keypair.public().into_peer_id(), listen_addr.clone())); + if i == 0 { + first_swarm_peer_id_and_addr = Some((keypair.public().into_peer_id(), listen_addr.clone())) } Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); @@ -755,7 +794,10 @@ mod tests { // Build a `Vec>` with the list of nodes remaining to be discovered. let mut to_discover = (0..swarms.len()).map(|n| { - (0..swarms.len()).filter(|p| *p != n) + (0..swarms.len()) + // Skip the first swarm as all other swarms already know it. + .skip(1) + .filter(|p| *p != n) .map(|p| Swarm::local_peer_id(&swarms[p].0).clone()) .collect::>() }).collect::>(); @@ -766,7 +808,7 @@ mod tests { match swarms[swarm_n].0.poll_next_unpin(cx) { Poll::Ready(Some(e)) => { match e { - DiscoveryOut::UnroutablePeer(other) => { + DiscoveryOut::UnroutablePeer(other) | DiscoveryOut::Discovered(other) => { // Call `add_self_reported_address` to simulate identify happening. let addr = swarms.iter().find_map(|(s, a)| if s.local_peer_id == other { @@ -775,12 +817,16 @@ mod tests { None }) .unwrap(); - swarms[swarm_n].0.add_self_reported_address(&other, addr); - }, - DiscoveryOut::Discovered(other) => { + swarms[swarm_n].0.add_self_reported_address( + &other, + [protocol_name_from_protocol_id(&protocol_id)].iter(), + addr, + ); + to_discover[swarm_n].remove(&other); - } - _ => {} + }, + DiscoveryOut::RandomKademliaStarted(_) => {}, + e => {panic!("Unexpected event: {:?}", e)}, } continue 'polling } @@ -799,4 +845,103 @@ mod tests { futures::executor::block_on(fut); } + + #[test] + fn discovery_ignores_peers_with_unknown_protocols() { + let supported_protocol_id = ProtocolId::from(b"a".as_ref()); + let unsupported_protocol_id = ProtocolId::from(b"b".as_ref()); + + let mut discovery = { + let keypair = Keypair::generate_ed25519(); + let mut config = DiscoveryConfig::new(keypair.public()); + config.allow_private_ipv4(true) + .allow_non_globals_in_dht(true) + .discovery_limit(50) + .add_protocol(supported_protocol_id.clone()); + config.finish() + }; + + let remote_peer_id = PeerId::random(); + let remote_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + // Add remote peer with unsupported protocol. + discovery.add_self_reported_address( + &remote_peer_id, + [protocol_name_from_protocol_id(&unsupported_protocol_id)].iter(), + remote_addr.clone(), + ); + + for kademlia in discovery.kademlias.values_mut() { + assert!( + kademlia.kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .is_empty(), + "Expect peer with unsupported protocol not to be added." + ); + } + + // Add remote peer with supported protocol. + discovery.add_self_reported_address( + &remote_peer_id, + [protocol_name_from_protocol_id(&supported_protocol_id)].iter(), + remote_addr.clone(), + ); + + for kademlia in discovery.kademlias.values_mut() { + assert_eq!( + 1, + kademlia.kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .num_entries(), + "Expect peer with supported protocol to be added." + ); + } + } + + #[test] + fn discovery_adds_peer_to_kademlia_of_same_protocol_only() { + let protocol_a = ProtocolId::from(b"a".as_ref()); + let protocol_b = ProtocolId::from(b"b".as_ref()); + + let mut discovery = { + let keypair = Keypair::generate_ed25519(); + let mut config = DiscoveryConfig::new(keypair.public()); + config.allow_private_ipv4(true) + .allow_non_globals_in_dht(true) + .discovery_limit(50) + .add_protocol(protocol_a.clone()) + .add_protocol(protocol_b.clone()); + config.finish() + }; + + let remote_peer_id = PeerId::random(); + let remote_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + // Add remote peer with `protocol_a` only. + discovery.add_self_reported_address( + &remote_peer_id, + [protocol_name_from_protocol_id(&protocol_a)].iter(), + remote_addr.clone(), + ); + + assert_eq!( + 1, + discovery.kademlias.get_mut(&protocol_a) + .expect("Kademlia instance to exist.") + .kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .num_entries(), + "Expected remote peer to be added to `protocol_a` Kademlia instance.", + + ); + + assert!( + discovery.kademlias.get_mut(&protocol_b) + .expect("Kademlia instance to exist.") + .kbucket(remote_peer_id.clone()) + .expect("Remote peer id not to be equal to local peer id.") + .is_empty(), + "Expected remote peer not to be added to `protocol_b` Kademlia instance.", + ); + } }