Skip to content

Commit

Permalink
feat: get list of ConnectionInfos or an individual node's `Connecti…
Browse files Browse the repository at this point in the history
…onInfo` (#1435)

## Description

Display connection information for all nodes we have connected with or
have supplied our node with addresses for.

Display connection information on a particular node, using its node id.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
  • Loading branch information
ramfox committed Sep 1, 2023
1 parent f8bd328 commit bdf966e
Show file tree
Hide file tree
Showing 9 changed files with 838 additions and 192 deletions.
315 changes: 209 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::{
tls,
};

pub use super::magicsock::EndpointInfo as ConnectionInfo;

/// Builder for [MagicEndpoint]
#[derive(Debug)]
pub struct MagicEndpointBuilder {
Expand Down Expand Up @@ -287,6 +289,31 @@ impl MagicEndpoint {
self.msock.my_derp().await
}

/// Get information on all the nodes we have connection information about.
///
/// Includes the node's [`PublicKey`], potential DERP region, its addresses with any known
/// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are
/// currently communicating with that node over a `Direct` (UDP) or `Relay` (DERP) connection.
///
/// Connections are currently only pruned on user action (when we explicitly add a new address
/// to the internal [`crate::netmap::NetworkMap`] in [`MagicEndpoint::add_known_addrs`]), so
/// these connections are not necessarily active connections.
pub async fn connection_infos(&self) -> anyhow::Result<Vec<ConnectionInfo>> {
self.msock.tracked_endpoints().await
}

/// Get connection information about a specific node.
///
/// Includes the node's [`PublicKey`], potential DERP region, its addresses with any known
/// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are
/// currently communicating with that node over a `Direct` (UDP) or `Relay` (DERP) connection.
pub async fn connection_info(
&self,
node_id: PublicKey,
) -> anyhow::Result<Option<ConnectionInfo>> {
self.msock.tracked_endpoint(node_id).await
}

/// Connect to a remote endpoint.
///
/// The PublicKey and the ALPN protocol are required. If you happen to know dialable addresses of
Expand Down Expand Up @@ -346,6 +373,9 @@ impl MagicEndpoint {
/// This updates the magic socket's *netmap* with these addresses, which are used as candidates
/// when connecting to this peer (in addition to addresses obtained from a derp server).
///
/// Note: updating the magic socket's *netmap* will also prune any connections that are *not*
/// present in the netmap.
///
/// If no UDP addresses are added, and `derp_region` is `None`, it will error.
/// If no UDP addresses are added, and the given `derp_region` cannot be dialed, it will error.
pub async fn add_known_addrs(
Expand Down
53 changes: 34 additions & 19 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ mod rebinding_conn;
mod timer;
mod udp_actor;

pub use self::endpoint::ConnectionType;
pub use self::endpoint::EndpointInfo;
pub use self::metrics::Metrics;
pub use self::timer::Timer;
Expand Down Expand Up @@ -445,7 +446,7 @@ impl MagicSock {
Ok(c)
}

/// Retrieve information about known peers' endpoints in the network.
/// Retrieve connection information about nodes in the network.
pub async fn tracked_endpoints(&self) -> Result<Vec<EndpointInfo>> {
let (s, r) = sync::oneshot::channel();
self.inner
Expand All @@ -456,6 +457,16 @@ impl MagicSock {
Ok(res)
}

/// Retrieve connection information about a node in the network.
pub async fn tracked_endpoint(&self, node_key: PublicKey) -> Result<Option<EndpointInfo>> {
let (s, r) = sync::oneshot::channel();
self.inner
.actor_sender
.send(ActorMessage::TrackedEndpoint(node_key, s))
.await?;
let res = r.await?;
Ok(res)
}
/// Query for the local endpoints discovered during the last endpoint discovery.
pub async fn local_endpoints(&self) -> Result<Vec<config::Endpoint>> {
let (s, r) = sync::oneshot::channel();
Expand Down Expand Up @@ -831,6 +842,7 @@ impl Drop for WgGuard {
#[allow(clippy::large_enum_variant)]
enum ActorMessage {
TrackedEndpoints(sync::oneshot::Sender<Vec<EndpointInfo>>),
TrackedEndpoint(PublicKey, sync::oneshot::Sender<Option<EndpointInfo>>),
LocalEndpoints(sync::oneshot::Sender<Vec<config::Endpoint>>),
GetMappingAddr(PublicKey, sync::oneshot::Sender<Option<QuicMappedAddr>>),
SetPreferredPort(u16, sync::oneshot::Sender<()>),
Expand All @@ -839,7 +851,7 @@ enum ActorMessage {
CloseOrReconnect(u16, &'static str),
ReStun(&'static str),
EnqueueCallMeMaybe {
derp_addr: u16,
derp_region: u16,
endpoint_id: usize,
},
SendDiscoMessage {
Expand Down Expand Up @@ -981,9 +993,12 @@ impl Actor {
async fn handle_actor_message(&mut self, msg: ActorMessage) -> bool {
match msg {
ActorMessage::TrackedEndpoints(s) => {
let eps: Vec<_> = self.peer_map.endpoints().map(|(_, ep)| ep.info()).collect();
let eps: Vec<_> = self.peer_map.endpoint_infos();
let _ = s.send(eps);
}
ActorMessage::TrackedEndpoint(node_key, s) => {
let _ = s.send(self.peer_map.endpoint_info(&node_key));
}
ActorMessage::LocalEndpoints(s) => {
let eps: Vec<_> = self.last_endpoints.clone();
let _ = s.send(eps);
Expand Down Expand Up @@ -1028,10 +1043,10 @@ impl Actor {
self.re_stun(reason).await;
}
ActorMessage::EnqueueCallMeMaybe {
derp_addr,
derp_region,
endpoint_id,
} => {
self.enqueue_call_me_maybe(derp_addr, endpoint_id).await;
self.enqueue_call_me_maybe(derp_region, endpoint_id).await;
}
ActorMessage::RebindAll(s) => {
self.rebind_all().await;
Expand Down Expand Up @@ -1135,8 +1150,8 @@ impl Actor {

let ep_quic_mapped_addr = match self.peer_map.endpoint_for_node_key_mut(&dm.src) {
Some(ep) => {
if ep.derp_addr().is_none() {
ep.add_derp_addr(region_id);
if ep.derp_region().is_none() {
ep.add_derp_region(region_id);
}
ep.quic_mapped_addr
}
Expand All @@ -1146,7 +1161,7 @@ impl Actor {
msock_sender: self.inner.actor_sender.clone(),
msock_public_key: self.inner.public_key(),
public_key: dm.src,
derp_addr: Some(region_id),
derp_region: Some(region_id),
});
self.peer_map.set_endpoint_for_ip_port(&ipp, id);
let ep = self.peer_map.by_id_mut(&id).expect("inserted");
Expand Down Expand Up @@ -1252,16 +1267,16 @@ impl Actor {
);

match ep.get_send_addrs().await {
Ok((Some(udp_addr), Some(derp_addr))) => {
Ok((Some(udp_addr), Some(derp_region))) => {
let res = self.send_raw(udp_addr, transmits.clone()).await;
self.send_derp(derp_addr, public_key, Self::split_packets(transmits));
self.send_derp(derp_region, public_key, Self::split_packets(transmits));

if let Err(err) = res {
warn!("failed to send UDP: {:?}", err);
}
}
Ok((None, Some(derp_addr))) => {
self.send_derp(derp_addr, public_key, Self::split_packets(transmits));
Ok((None, Some(derp_region))) => {
self.send_derp(derp_region, public_key, Self::split_packets(transmits));
}
Ok((Some(udp_addr), None)) => {
if let Err(err) = self.send_raw(udp_addr, transmits).await {
Expand Down Expand Up @@ -1710,12 +1725,12 @@ impl Actor {
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
async fn enqueue_call_me_maybe(&mut self, derp_addr: u16, endpoint_id: usize) {
async fn enqueue_call_me_maybe(&mut self, derp_region: u16, endpoint_id: usize) {
let endpoint = self.peer_map.by_id(&endpoint_id);
if endpoint.is_none() {
warn!(
"enqueue_call_me_maybe with invalid endpoint_id called: {} - {}",
derp_addr, endpoint_id
derp_region, endpoint_id
);
return;
}
Expand All @@ -1738,7 +1753,7 @@ impl Actor {
info!("STUN done; sending call-me-maybe",);
msg_sender
.send(ActorMessage::EnqueueCallMeMaybe {
derp_addr,
derp_region,
endpoint_id,
})
.await
Expand All @@ -1761,13 +1776,13 @@ impl Actor {
warn!("sending call me maybe to {public_key:?}");
if let Err(err) = msg_sender
.send(ActorMessage::SendDiscoMessage {
dst: SendAddr::Derp(derp_addr),
dst: SendAddr::Derp(derp_region),
dst_key: public_key,
msg,
})
.await
{
warn!("failed to send disco message to {}: {:?}", derp_addr, err);
warn!("failed to send disco message to {}: {:?}", derp_region, err);
}
});
}
Expand Down Expand Up @@ -2049,7 +2064,7 @@ impl Actor {
msock_sender: self.inner.actor_sender.clone(),
msock_public_key: self.inner.public_key(),
public_key: sender,
derp_addr: src.derp_region(),
derp_region: src.derp_region(),
});
}
self.handle_ping(ping, &sender, src, derp_node_src).await;
Expand Down Expand Up @@ -2215,7 +2230,7 @@ impl Actor {
msock_sender: self.inner.actor_sender.clone(),
msock_public_key: self.inner.public_key(),
public_key: n.key,
derp_addr: n.derp,
derp_region: n.derp,
});
}

Expand Down
Loading

0 comments on commit bdf966e

Please sign in to comment.