From 00244a27d8a7ed8ba3a2224a442f61af64989d01 Mon Sep 17 00:00:00 2001 From: Kasey Date: Mon, 21 Nov 2022 18:20:42 -0500 Subject: [PATCH] fix(`iroh-p2p`): implement full local lookup We were previously only returning the peer id, external addrs, and listening addrs. We now return the same content as performing a lookup on a remote peer. --- iroh-api/src/p2p.rs | 14 +----- iroh-p2p/src/behaviour.rs | 7 ++- iroh-p2p/src/behaviour/peer_manager.rs | 26 +++++++++- iroh-p2p/src/node.rs | 66 +++++++++++++++++++++++++- iroh-p2p/src/rpc.rs | 23 ++++++++- iroh-rpc-client/src/network.rs | 17 ++++++- iroh-rpc-types/proto/p2p.proto | 5 +- iroh-rpc-types/src/p2p.rs | 1 + 8 files changed, 136 insertions(+), 23 deletions(-) diff --git a/iroh-api/src/p2p.rs b/iroh-api/src/p2p.rs index bac391abf56..50f9575bd3b 100644 --- a/iroh-api/src/p2p.rs +++ b/iroh-api/src/p2p.rs @@ -24,19 +24,7 @@ impl P2p { } pub async fn lookup_local(&self) -> Result { - let (_, listen_addrs) = self - .client - .get_listening_addrs() - .await - .map_err(|e| map_service_error("p2p", e))?; - Ok(Lookup { - peer_id: self.client.local_peer_id().await?, - listen_addrs, - observed_addrs: self.client.external_addresses().await?, - protocol_version: String::new(), - agent_version: String::new(), - protocols: Default::default(), - }) + self.client.lookup_local().await } pub async fn lookup(&self, addr: &PeerIdOrAddr) -> Result { diff --git a/iroh-p2p/src/behaviour.rs b/iroh-p2p/src/behaviour.rs index c9ce3e82d2c..8167d28ae17 100644 --- a/iroh-p2p/src/behaviour.rs +++ b/iroh-p2p/src/behaviour.rs @@ -27,6 +27,9 @@ use crate::config::Libp2pConfig; mod event; mod peer_manager; +pub const PROTOCOL_VERSION: &str = "ipfs/0.1.0"; +pub const AGENT_VERSION: &str = concat!("iroh/", env!("CARGO_PKG_VERSION")); + /// Libp2p behaviour for the node. #[derive(NetworkBehaviour)] #[behaviour(out_event = "Event")] @@ -188,8 +191,8 @@ impl NodeBehaviour { }; let identify = { - let config = identify::Config::new("ipfs/0.1.0".into(), local_key.public()) - .with_agent_version(format!("iroh/{}", env!("CARGO_PKG_VERSION"))) + let config = identify::Config::new(PROTOCOL_VERSION.into(), local_key.public()) + .with_agent_version(String::from(AGENT_VERSION)) .with_cache_size(64 * 1024); identify::Behaviour::new(config) }; diff --git a/iroh-p2p/src/behaviour/peer_manager.rs b/iroh-p2p/src/behaviour/peer_manager.rs index 3137aff547a..393811a3a5b 100644 --- a/iroh-p2p/src/behaviour/peer_manager.rs +++ b/iroh-p2p/src/behaviour/peer_manager.rs @@ -21,6 +21,7 @@ use lru::LruCache; pub struct PeerManager { info: AHashMap, bad_peers: LruCache, + supported_protocols: Vec, } #[derive(Default, Debug, Clone)] @@ -43,6 +44,7 @@ impl Default for PeerManager { PeerManager { info: Default::default(), bad_peers: LruCache::new(DEFAULT_BAD_PEER_CAP.unwrap()), + supported_protocols: Default::default(), } } } @@ -68,6 +70,10 @@ impl PeerManager { pub fn info_for_peer(&self, peer_id: &PeerId) -> Option<&Info> { self.info.get(peer_id) } + + pub fn supported_protocols(&self) -> Vec { + self.supported_protocols.clone() + } } impl NetworkBehaviour for PeerManager { @@ -186,8 +192,26 @@ impl NetworkBehaviour for PeerManager { fn poll( &mut self, _cx: &mut Context<'_>, - _params: &mut impl PollParameters, + params: &mut impl PollParameters, ) -> Poll> { + // TODO(ramfox): + // We can only get the supported protocols of the local node by examining the + // `PollParameters`, which mean you can only get the supported protocols by examining the + // `PollParameters` in this method (`poll`) of a network behaviour. + // I injected this responsibility in the `peer_manager`, because it's the only "simple" + // network behaviour we have implemented. + // There is an issue up to remove `PollParameters`, and a discussion into how to instead + // get the `supported_protocols` of the node: + // https://github.com/libp2p/rust-libp2p/issues/3124 + // When that is resolved, we can hopefully remove this responsibility from the `peer_manager`, + // where it, frankly, doesn't belong. + if self.supported_protocols.is_empty() { + self.supported_protocols = params + .supported_protocols() + .map(|p| String::from_utf8_lossy(&p).to_string()) + .collect(); + } + Poll::Pending } } diff --git a/iroh-p2p/src/node.rs b/iroh-p2p/src/node.rs index 0a554d5bf5c..dcf9f475f8f 100644 --- a/iroh-p2p/src/node.rs +++ b/iroh-p2p/src/node.rs @@ -31,6 +31,7 @@ use tokio::task::JoinHandle; use tracing::{debug, error, info, trace, warn}; use iroh_bitswap::{BitswapEvent, Block}; +use iroh_rpc_client::Lookup; use crate::keys::{Keychain, Storage}; use crate::providers::Providers; @@ -953,6 +954,29 @@ impl Node { response_channel.send(None).ok(); } } + RpcMessage::LookupLocalPeerInfo(response_channel) => { + let peer_id = self.swarm.local_peer_id(); + let listen_addrs = self.swarm.listeners().cloned().collect(); + let observed_addrs = self + .swarm + .external_addresses() + .map(|a| a.addr.clone()) + .collect(); + let protocol_version = String::from(crate::behaviour::PROTOCOL_VERSION); + let agent_version = String::from(crate::behaviour::AGENT_VERSION); + let protocols = self.swarm.behaviour().peer_manager.supported_protocols(); + + response_channel + .send(Lookup { + peer_id: *peer_id, + listen_addrs, + observed_addrs, + agent_version, + protocol_version, + protocols, + }) + .ok(); + } RpcMessage::CancelListenForIdentify(response_channel, peer_id) => { self.lookup_queries.remove(&peer_id); response_channel.send(()).ok(); @@ -1312,6 +1336,12 @@ mod tests { let peer_id_b = test_runner_b.client.local_peer_id().await?; assert_eq!(test_runner_b.peer_id, peer_id_b); + let lookup_a = test_runner_a.client.lookup_local().await?; + // since we aren't connected to any other nodes, we should not + // have any information about our observed addresses + assert!(lookup_a.observed_addrs.is_empty()); + assert_lookup(lookup_a, test_runner_a.peer_id, &test_runner_a.addr)?; + // connect test_runner_a.client.connect(peer_id_b, addrs_b).await?; // Make sure we have exchanged identity information @@ -1323,8 +1353,7 @@ mod tests { // lookup let lookup_b = test_runner_a.client.lookup(peer_id_b, None).await?; - assert_eq!(peer_id_b, lookup_b.peer_id); - + assert_lookup(lookup_b, test_runner_b.peer_id, &test_runner_b.addr)?; // now that we are connected & have exchanged identity information, // we should now be able to view the node's external addrs // these are the addresses that other nodes tell you "this is the address I see for you" @@ -1339,6 +1368,39 @@ mod tests { Ok(()) } + // assert_lookup ensures each part of the lookup is equal + fn assert_lookup( + got: Lookup, + expected_peer_id: PeerId, + expected_addr: &Multiaddr, + ) -> Result<()> { + let expected_protocols = vec![ + "/ipfs/ping/1.0.0", + "/ipfs/id/1.0.0", + "/ipfs/id/push/1.0.0", + "/ipfs/bitswap/1.2.0", + "/ipfs/bitswap/1.1.0", + "/ipfs/bitswap/1.0.0", + "/ipfs/bitswap", + "/ipfs/kad/1.0.0", + "/libp2p/autonat/1.0.0", + "/libp2p/circuit/relay/0.2.0/hop", + "/libp2p/circuit/relay/0.2.0/stop", + "/libp2p/dcutr", + "/meshsub/1.1.0", + "/meshsub/1.0.0", + ]; + let expected_protocol_version = "ipfs/0.1.0"; + let expected_agent_version = "iroh/0.1.0"; + + assert_eq!(expected_peer_id, got.peer_id); + assert!(got.listen_addrs.contains(expected_addr)); + assert_eq!(expected_protocols, got.protocols); + assert_eq!(expected_protocol_version, got.protocol_version); + assert_eq!(expected_agent_version, got.agent_version); + Ok(()) + } + #[tokio::test] async fn test_gossipsub() -> Result<()> { let mut test_runner_a = TestRunnerBuilder::new().no_bootstrap().build().await?; diff --git a/iroh-p2p/src/rpc.rs b/iroh-p2p/src/rpc.rs index a9125a56051..d1789545108 100644 --- a/iroh-p2p/src/rpc.rs +++ b/iroh-p2p/src/rpc.rs @@ -20,6 +20,7 @@ use tracing::{debug, trace}; use async_trait::async_trait; use iroh_bitswap::Block; +use iroh_rpc_client::Lookup; use iroh_rpc_types::p2p::{ BitswapRequest, BitswapResponse, ConnectByPeerIdRequest, ConnectRequest, DisconnectRequest, GetListeningAddrsResponse, GetPeersResponse, GossipsubAllPeersResponse, GossipsubPeerAndTopics, @@ -319,6 +320,14 @@ impl RpcP2p for P2p { Ok(ack) } + #[tracing::instrument(skip(self))] + async fn lookup_local(&self, _: ()) -> Result { + let (s, r) = oneshot::channel(); + self.sender.send(RpcMessage::LookupLocalPeerInfo(s)).await?; + let lookup = r.await?; + Ok(peer_info_from_lookup(lookup)) + } + #[tracing::instrument(skip(self, req))] async fn lookup(&self, req: LookupRequest) -> Result { let (s, r) = oneshot::channel(); @@ -522,7 +531,18 @@ fn peer_info_from_identify_info(i: IdentifyInfo) -> PeerInfo { .map(|addr| addr.to_vec()) .collect(), protocols: i.protocols, - observed_addr: i.observed_addr.to_vec(), + observed_addrs: vec![i.observed_addr.to_vec()], + } +} + +fn peer_info_from_lookup(l: Lookup) -> PeerInfo { + PeerInfo { + peer_id: l.peer_id.to_bytes(), + protocol_version: l.protocol_version, + agent_version: l.agent_version, + listen_addrs: l.listen_addrs.iter().map(|a| a.to_vec()).collect(), + protocols: l.protocols, + observed_addrs: l.observed_addrs.iter().map(|a| a.to_vec()).collect(), } } @@ -583,6 +603,7 @@ pub enum RpcMessage { ListenForIdentify(oneshot::Sender>, PeerId), CancelListenForIdentify(oneshot::Sender<()>, PeerId), AddressesOfPeer(oneshot::Sender>, PeerId), + LookupLocalPeerInfo(oneshot::Sender), Shutdown, } diff --git a/iroh-rpc-client/src/network.rs b/iroh-rpc-client/src/network.rs index 50913442107..6d6d5a6059f 100644 --- a/iroh-rpc-client/src/network.rs +++ b/iroh-rpc-client/src/network.rs @@ -187,6 +187,12 @@ impl P2pClient { Lookup::from_peer_info(peer_info) } + #[tracing::instrument(skip(self))] + pub async fn lookup_local(&self) -> Result { + let peer_info = self.backend.lookup_local(()).await?; + Lookup::from_peer_info(peer_info) + } + #[tracing::instrument(skip(self))] pub async fn disconnect(&self, peer_id: PeerId) -> Result<()> { warn!("NetDisconnect not yet implemented on p2p node"); @@ -296,14 +302,14 @@ impl Lookup { fn from_peer_info(p: PeerInfo) -> Result { let peer_id = peer_id_from_bytes(p.peer_id)?; let listen_addrs = addrs_from_bytes(p.listen_addrs)?; - let addr = addr_from_bytes(p.observed_addr)?; + let observed_addrs = addrs_from_bytes(p.observed_addrs)?; Ok(Self { peer_id, protocol_version: p.protocol_version, agent_version: p.agent_version, listen_addrs, protocols: p.protocols, - observed_addrs: vec![addr], + observed_addrs, }) } } @@ -523,6 +529,13 @@ mod tests { todo!() } + async fn lookup_local( + &self, + _request: Request<()>, + ) -> Result, tonic::Status> { + todo!() + } + async fn gossipsub_add_explicit_peer( &self, _request: Request, diff --git a/iroh-rpc-types/proto/p2p.proto b/iroh-rpc-types/proto/p2p.proto index 602dfcf473e..5494da8d555 100644 --- a/iroh-rpc-types/proto/p2p.proto +++ b/iroh-rpc-types/proto/p2p.proto @@ -22,6 +22,7 @@ service P2p { rpc PeerDisconnect(DisconnectRequest) returns (google.protobuf.Empty) {} rpc Shutdown(google.protobuf.Empty) returns (google.protobuf.Empty) {} rpc Lookup(LookupRequest) returns (PeerInfo) {} + rpc LookupLocal(google.protobuf.Empty) returns (PeerInfo) {} rpc GossipsubAddExplicitPeer(GossipsubPeerIdMsg) returns (google.protobuf.Empty) {} rpc GossipsubAllMeshPeers(google.protobuf.Empty) returns (GossipsubPeersResponse) {} @@ -132,8 +133,8 @@ message PeerInfo { repeated bytes listen_addrs = 4; // vec of Strings repeated string protocols = 5; - // Multiaddr - bytes observed_addr = 6; + // vec of Multiaddr + repeated bytes observed_addrs = 6; } message Multiaddrs { // Serialized list of multiaddrs diff --git a/iroh-rpc-types/src/p2p.rs b/iroh-rpc-types/src/p2p.rs index 5557e3407b9..c90cb6654e0 100644 --- a/iroh-rpc-types/src/p2p.rs +++ b/iroh-rpc-types/src/p2p.rs @@ -16,6 +16,7 @@ proxy!( peer_connect_by_peer_id: ConnectByPeerIdRequest => () => (), peer_disconnect: DisconnectRequest => () => (), lookup: LookupRequest => PeerInfo => PeerInfo, + lookup_local: () => PeerInfo => PeerInfo, gossipsub_add_explicit_peer: GossipsubPeerIdMsg => () => (), gossipsub_all_mesh_peers: () => GossipsubPeersResponse => GossipsubPeersResponse, gossipsub_all_peers: () => GossipsubAllPeersResponse => GossipsubAllPeersResponse,