Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions iroh-api/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,7 @@ impl P2p {
}

pub async fn lookup_local(&self) -> Result<Lookup> {
let (_, listen_addrs) = self
.client
.get_listening_addrs()
.await
.map_err(|e| map_service_error("p2p", e))?;
Ok(Lookup {
peer_id: self.client.local_peer_id().await?,
listen_addrs,
observed_addrs: self.client.external_addresses().await?,
protocol_version: String::new(),
agent_version: String::new(),
protocols: Default::default(),
})
self.client.lookup_local().await
}

pub async fn lookup(&self, addr: &PeerIdOrAddr) -> Result<Lookup> {
Expand Down
7 changes: 5 additions & 2 deletions iroh-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ use crate::config::Libp2pConfig;
mod event;
mod peer_manager;

pub const PROTOCOL_VERSION: &str = "ipfs/0.1.0";
pub const AGENT_VERSION: &str = concat!("iroh/", env!("CARGO_PKG_VERSION"));

/// Libp2p behaviour for the node.
#[derive(NetworkBehaviour)]
#[behaviour(out_event = "Event")]
Expand Down Expand Up @@ -188,8 +191,8 @@ impl NodeBehaviour {
};

let identify = {
let config = identify::Config::new("ipfs/0.1.0".into(), local_key.public())
.with_agent_version(format!("iroh/{}", env!("CARGO_PKG_VERSION")))
let config = identify::Config::new(PROTOCOL_VERSION.into(), local_key.public())
.with_agent_version(String::from(AGENT_VERSION))
.with_cache_size(64 * 1024);
identify::Behaviour::new(config)
};
Expand Down
26 changes: 25 additions & 1 deletion iroh-p2p/src/behaviour/peer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use lru::LruCache;
pub struct PeerManager {
info: AHashMap<PeerId, Info>,
bad_peers: LruCache<PeerId, ()>,
supported_protocols: Vec<String>,
}

#[derive(Default, Debug, Clone)]
Expand All @@ -43,6 +44,7 @@ impl Default for PeerManager {
PeerManager {
info: Default::default(),
bad_peers: LruCache::new(DEFAULT_BAD_PEER_CAP.unwrap()),
supported_protocols: Default::default(),
}
}
}
Expand All @@ -68,6 +70,10 @@ impl PeerManager {
pub fn info_for_peer(&self, peer_id: &PeerId) -> Option<&Info> {
self.info.get(peer_id)
}

pub fn supported_protocols(&self) -> Vec<String> {
self.supported_protocols.clone()
}
}

impl NetworkBehaviour for PeerManager {
Expand Down Expand Up @@ -186,8 +192,26 @@ impl NetworkBehaviour for PeerManager {
fn poll(
&mut self,
_cx: &mut Context<'_>,
_params: &mut impl PollParameters,
params: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
// TODO(ramfox):
// We can only get the supported protocols of the local node by examining the
// `PollParameters`, which mean you can only get the supported protocols by examining the
// `PollParameters` in this method (`poll`) of a network behaviour.
// I injected this responsibility in the `peer_manager`, because it's the only "simple"
// network behaviour we have implemented.
// There is an issue up to remove `PollParameters`, and a discussion into how to instead
// get the `supported_protocols` of the node:
// https://github.com/libp2p/rust-libp2p/issues/3124
// When that is resolved, we can hopefully remove this responsibility from the `peer_manager`,
// where it, frankly, doesn't belong.
if self.supported_protocols.is_empty() {
self.supported_protocols = params
.supported_protocols()
.map(|p| String::from_utf8_lossy(&p).to_string())
.collect();
}

Comment on lines 192 to +214
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dignifiedquire would specifically like your eyes on this.

I gave some rationale as to why this is how I decided to proceed, but I'm aware this is an inelegant solution to an inelegant problem.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems fine to me, we can just update this code once libp2p provides a better method

Poll::Pending
}
}
66 changes: 64 additions & 2 deletions iroh-p2p/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use tokio::task::JoinHandle;
use tracing::{debug, error, info, trace, warn};

use iroh_bitswap::{BitswapEvent, Block};
use iroh_rpc_client::Lookup;

use crate::keys::{Keychain, Storage};
use crate::providers::Providers;
Expand Down Expand Up @@ -953,6 +954,29 @@ impl<KeyStorage: Storage> Node<KeyStorage> {
response_channel.send(None).ok();
}
}
RpcMessage::LookupLocalPeerInfo(response_channel) => {
let peer_id = self.swarm.local_peer_id();
let listen_addrs = self.swarm.listeners().cloned().collect();
let observed_addrs = self
.swarm
.external_addresses()
.map(|a| a.addr.clone())
.collect();
let protocol_version = String::from(crate::behaviour::PROTOCOL_VERSION);
let agent_version = String::from(crate::behaviour::AGENT_VERSION);
let protocols = self.swarm.behaviour().peer_manager.supported_protocols();

response_channel
.send(Lookup {
peer_id: *peer_id,
listen_addrs,
observed_addrs,
agent_version,
protocol_version,
protocols,
})
.ok();
}
RpcMessage::CancelListenForIdentify(response_channel, peer_id) => {
self.lookup_queries.remove(&peer_id);
response_channel.send(()).ok();
Expand Down Expand Up @@ -1312,6 +1336,12 @@ mod tests {
let peer_id_b = test_runner_b.client.local_peer_id().await?;
assert_eq!(test_runner_b.peer_id, peer_id_b);

let lookup_a = test_runner_a.client.lookup_local().await?;
// since we aren't connected to any other nodes, we should not
// have any information about our observed addresses
assert!(lookup_a.observed_addrs.is_empty());
assert_lookup(lookup_a, test_runner_a.peer_id, &test_runner_a.addr)?;

// connect
test_runner_a.client.connect(peer_id_b, addrs_b).await?;
// Make sure we have exchanged identity information
Expand All @@ -1323,8 +1353,7 @@ mod tests {

// lookup
let lookup_b = test_runner_a.client.lookup(peer_id_b, None).await?;
assert_eq!(peer_id_b, lookup_b.peer_id);

assert_lookup(lookup_b, test_runner_b.peer_id, &test_runner_b.addr)?;
// now that we are connected & have exchanged identity information,
// we should now be able to view the node's external addrs
// these are the addresses that other nodes tell you "this is the address I see for you"
Expand All @@ -1339,6 +1368,39 @@ mod tests {
Ok(())
}

// assert_lookup ensures each part of the lookup is equal
fn assert_lookup(
got: Lookup,
expected_peer_id: PeerId,
expected_addr: &Multiaddr,
) -> Result<()> {
let expected_protocols = vec![
"/ipfs/ping/1.0.0",
"/ipfs/id/1.0.0",
"/ipfs/id/push/1.0.0",
"/ipfs/bitswap/1.2.0",
"/ipfs/bitswap/1.1.0",
"/ipfs/bitswap/1.0.0",
"/ipfs/bitswap",
"/ipfs/kad/1.0.0",
"/libp2p/autonat/1.0.0",
"/libp2p/circuit/relay/0.2.0/hop",
"/libp2p/circuit/relay/0.2.0/stop",
"/libp2p/dcutr",
"/meshsub/1.1.0",
"/meshsub/1.0.0",
];
let expected_protocol_version = "ipfs/0.1.0";
let expected_agent_version = "iroh/0.1.0";

assert_eq!(expected_peer_id, got.peer_id);
assert!(got.listen_addrs.contains(expected_addr));
assert_eq!(expected_protocols, got.protocols);
assert_eq!(expected_protocol_version, got.protocol_version);
assert_eq!(expected_agent_version, got.agent_version);
Ok(())
}

#[tokio::test]
async fn test_gossipsub() -> Result<()> {
let mut test_runner_a = TestRunnerBuilder::new().no_bootstrap().build().await?;
Expand Down
23 changes: 22 additions & 1 deletion iroh-p2p/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use tracing::{debug, trace};

use async_trait::async_trait;
use iroh_bitswap::Block;
use iroh_rpc_client::Lookup;
use iroh_rpc_types::p2p::{
BitswapRequest, BitswapResponse, ConnectByPeerIdRequest, ConnectRequest, DisconnectRequest,
GetListeningAddrsResponse, GetPeersResponse, GossipsubAllPeersResponse, GossipsubPeerAndTopics,
Expand Down Expand Up @@ -319,6 +320,14 @@ impl RpcP2p for P2p {
Ok(ack)
}

#[tracing::instrument(skip(self))]
async fn lookup_local(&self, _: ()) -> Result<PeerInfo> {
let (s, r) = oneshot::channel();
self.sender.send(RpcMessage::LookupLocalPeerInfo(s)).await?;
let lookup = r.await?;
Ok(peer_info_from_lookup(lookup))
}

#[tracing::instrument(skip(self, req))]
async fn lookup(&self, req: LookupRequest) -> Result<PeerInfo> {
let (s, r) = oneshot::channel();
Expand Down Expand Up @@ -522,7 +531,18 @@ fn peer_info_from_identify_info(i: IdentifyInfo) -> PeerInfo {
.map(|addr| addr.to_vec())
.collect(),
protocols: i.protocols,
observed_addr: i.observed_addr.to_vec(),
observed_addrs: vec![i.observed_addr.to_vec()],
}
}

fn peer_info_from_lookup(l: Lookup) -> PeerInfo {
PeerInfo {
peer_id: l.peer_id.to_bytes(),
protocol_version: l.protocol_version,
agent_version: l.agent_version,
listen_addrs: l.listen_addrs.iter().map(|a| a.to_vec()).collect(),
protocols: l.protocols,
observed_addrs: l.observed_addrs.iter().map(|a| a.to_vec()).collect(),
}
}

Expand Down Expand Up @@ -583,6 +603,7 @@ pub enum RpcMessage {
ListenForIdentify(oneshot::Sender<Result<IdentifyInfo>>, PeerId),
CancelListenForIdentify(oneshot::Sender<()>, PeerId),
AddressesOfPeer(oneshot::Sender<Vec<Multiaddr>>, PeerId),
LookupLocalPeerInfo(oneshot::Sender<Lookup>),
Shutdown,
}

Expand Down
17 changes: 15 additions & 2 deletions iroh-rpc-client/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,12 @@ impl P2pClient {
Lookup::from_peer_info(peer_info)
}

#[tracing::instrument(skip(self))]
pub async fn lookup_local(&self) -> Result<Lookup> {
let peer_info = self.backend.lookup_local(()).await?;
Lookup::from_peer_info(peer_info)
}

#[tracing::instrument(skip(self))]
pub async fn disconnect(&self, peer_id: PeerId) -> Result<()> {
warn!("NetDisconnect not yet implemented on p2p node");
Expand Down Expand Up @@ -296,14 +302,14 @@ impl Lookup {
fn from_peer_info(p: PeerInfo) -> Result<Self> {
let peer_id = peer_id_from_bytes(p.peer_id)?;
let listen_addrs = addrs_from_bytes(p.listen_addrs)?;
let addr = addr_from_bytes(p.observed_addr)?;
let observed_addrs = addrs_from_bytes(p.observed_addrs)?;
Ok(Self {
peer_id,
protocol_version: p.protocol_version,
agent_version: p.agent_version,
listen_addrs,
protocols: p.protocols,
observed_addrs: vec![addr],
observed_addrs,
})
}
}
Expand Down Expand Up @@ -523,6 +529,13 @@ mod tests {
todo!()
}

async fn lookup_local(
&self,
_request: Request<()>,
) -> Result<tonic::Response<PeerInfo>, tonic::Status> {
todo!()
}

async fn gossipsub_add_explicit_peer(
&self,
_request: Request<GossipsubPeerIdMsg>,
Expand Down
5 changes: 3 additions & 2 deletions iroh-rpc-types/proto/p2p.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ service P2p {
rpc PeerDisconnect(DisconnectRequest) returns (google.protobuf.Empty) {}
rpc Shutdown(google.protobuf.Empty) returns (google.protobuf.Empty) {}
rpc Lookup(LookupRequest) returns (PeerInfo) {}
rpc LookupLocal(google.protobuf.Empty) returns (PeerInfo) {}

rpc GossipsubAddExplicitPeer(GossipsubPeerIdMsg) returns (google.protobuf.Empty) {}
rpc GossipsubAllMeshPeers(google.protobuf.Empty) returns (GossipsubPeersResponse) {}
Expand Down Expand Up @@ -132,8 +133,8 @@ message PeerInfo {
repeated bytes listen_addrs = 4;
// vec of Strings
repeated string protocols = 5;
// Multiaddr
bytes observed_addr = 6;
// vec of Multiaddr
repeated bytes observed_addrs = 6;
}
message Multiaddrs {
// Serialized list of multiaddrs
Expand Down
1 change: 1 addition & 0 deletions iroh-rpc-types/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ proxy!(
peer_connect_by_peer_id: ConnectByPeerIdRequest => () => (),
peer_disconnect: DisconnectRequest => () => (),
lookup: LookupRequest => PeerInfo => PeerInfo,
lookup_local: () => PeerInfo => PeerInfo,
gossipsub_add_explicit_peer: GossipsubPeerIdMsg => () => (),
gossipsub_all_mesh_peers: () => GossipsubPeersResponse => GossipsubPeersResponse,
gossipsub_all_peers: () => GossipsubAllPeersResponse => GossipsubAllPeersResponse,
Expand Down