Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: get list of ConnectionInfos or an individual node's ConnectionInfo #1435

Merged
merged 13 commits into from
Sep 1, 2023
315 changes: 209 additions & 106 deletions Cargo.lock

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use crate::{
tls,
};

pub use super::magicsock::EndpointInfo as ConnectionInfo;

/// Builder for [MagicEndpoint]
#[derive(Debug)]
pub struct MagicEndpointBuilder {
Expand Down Expand Up @@ -287,6 +289,31 @@ impl MagicEndpoint {
self.msock.my_derp().await
}

/// Get information on all the nodes we have connection information about.
///
/// Includes the node's [`PublicKey`], potential DERP region, its addresses with any known
/// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are
/// currently communicating with that node over a `Direct` (UDP) or `Relay` (DERP) connection.
///
/// Connections are currently only pruned on user action (when we explicitly add a new address
/// to the internal [`crate::netmap::NetworkMap`] in [`MagicEndpoint::add_known_addrs`]), so
/// these connections are not necessarily active connections.
pub async fn connection_infos(&self) -> anyhow::Result<Vec<ConnectionInfo>> {
self.msock.tracked_endpoints().await
}

/// Get connection information about a specific node.
///
/// Includes the node's [`PublicKey`], potential DERP region, its addresses with any known
/// latency, and its [`crate::magicsock::ConnectionType`], which let's us know if we are
/// currently communicating with that node over a `Direct` (UDP) or `Relay` (DERP) connection.
pub async fn connection_info(
&self,
node_id: PublicKey,
) -> anyhow::Result<Option<ConnectionInfo>> {
self.msock.tracked_endpoint(node_id).await
}

/// Connect to a remote endpoint.
///
/// The PublicKey and the ALPN protocol are required. If you happen to know dialable addresses of
Expand Down Expand Up @@ -346,6 +373,9 @@ impl MagicEndpoint {
/// This updates the magic socket's *netmap* with these addresses, which are used as candidates
/// when connecting to this peer (in addition to addresses obtained from a derp server).
///
/// Note: updating the magic socket's *netmap* will also prune any connections that are *not*
/// present in the netmap.
///
/// If no UDP addresses are added, and `derp_region` is `None`, it will error.
/// If no UDP addresses are added, and the given `derp_region` cannot be dialed, it will error.
pub async fn add_known_addrs(
Expand Down
53 changes: 34 additions & 19 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ mod rebinding_conn;
mod timer;
mod udp_actor;

pub use self::endpoint::ConnectionType;
pub use self::endpoint::EndpointInfo;
pub use self::metrics::Metrics;
pub use self::timer::Timer;
Expand Down Expand Up @@ -445,7 +446,7 @@ impl MagicSock {
Ok(c)
}

/// Retrieve information about known peers' endpoints in the network.
/// Retrieve connection information about nodes in the network.
pub async fn tracked_endpoints(&self) -> Result<Vec<EndpointInfo>> {
let (s, r) = sync::oneshot::channel();
self.inner
Expand All @@ -456,6 +457,16 @@ impl MagicSock {
Ok(res)
}

/// Retrieve connection information about a node in the network.
pub async fn tracked_endpoint(&self, node_key: PublicKey) -> Result<Option<EndpointInfo>> {
let (s, r) = sync::oneshot::channel();
self.inner
.actor_sender
.send(ActorMessage::TrackedEndpoint(node_key, s))
.await?;
let res = r.await?;
Ok(res)
}
/// Query for the local endpoints discovered during the last endpoint discovery.
pub async fn local_endpoints(&self) -> Result<Vec<config::Endpoint>> {
let (s, r) = sync::oneshot::channel();
Expand Down Expand Up @@ -831,6 +842,7 @@ impl Drop for WgGuard {
#[allow(clippy::large_enum_variant)]
enum ActorMessage {
TrackedEndpoints(sync::oneshot::Sender<Vec<EndpointInfo>>),
TrackedEndpoint(PublicKey, sync::oneshot::Sender<Option<EndpointInfo>>),
LocalEndpoints(sync::oneshot::Sender<Vec<config::Endpoint>>),
GetMappingAddr(PublicKey, sync::oneshot::Sender<Option<QuicMappedAddr>>),
SetPreferredPort(u16, sync::oneshot::Sender<()>),
Expand All @@ -839,7 +851,7 @@ enum ActorMessage {
CloseOrReconnect(u16, &'static str),
ReStun(&'static str),
EnqueueCallMeMaybe {
derp_addr: u16,
derp_region: u16,
endpoint_id: usize,
},
SendDiscoMessage {
Expand Down Expand Up @@ -981,9 +993,12 @@ impl Actor {
async fn handle_actor_message(&mut self, msg: ActorMessage) -> bool {
match msg {
ActorMessage::TrackedEndpoints(s) => {
let eps: Vec<_> = self.peer_map.endpoints().map(|(_, ep)| ep.info()).collect();
let eps: Vec<_> = self.peer_map.endpoint_infos();
let _ = s.send(eps);
}
ActorMessage::TrackedEndpoint(node_key, s) => {
let _ = s.send(self.peer_map.endpoint_info(&node_key));
}
ActorMessage::LocalEndpoints(s) => {
let eps: Vec<_> = self.last_endpoints.clone();
let _ = s.send(eps);
Expand Down Expand Up @@ -1028,10 +1043,10 @@ impl Actor {
self.re_stun(reason).await;
}
ActorMessage::EnqueueCallMeMaybe {
derp_addr,
derp_region,
endpoint_id,
} => {
self.enqueue_call_me_maybe(derp_addr, endpoint_id).await;
self.enqueue_call_me_maybe(derp_region, endpoint_id).await;
}
ActorMessage::RebindAll(s) => {
self.rebind_all().await;
Expand Down Expand Up @@ -1135,8 +1150,8 @@ impl Actor {

let ep_quic_mapped_addr = match self.peer_map.endpoint_for_node_key_mut(&dm.src) {
Some(ep) => {
if ep.derp_addr().is_none() {
ep.add_derp_addr(region_id);
if ep.derp_region().is_none() {
ep.add_derp_region(region_id);
}
ep.quic_mapped_addr
}
Expand All @@ -1146,7 +1161,7 @@ impl Actor {
msock_sender: self.inner.actor_sender.clone(),
msock_public_key: self.inner.public_key(),
public_key: dm.src,
derp_addr: Some(region_id),
derp_region: Some(region_id),
});
self.peer_map.set_endpoint_for_ip_port(&ipp, id);
let ep = self.peer_map.by_id_mut(&id).expect("inserted");
Expand Down Expand Up @@ -1252,16 +1267,16 @@ impl Actor {
);

match ep.get_send_addrs().await {
Ok((Some(udp_addr), Some(derp_addr))) => {
Ok((Some(udp_addr), Some(derp_region))) => {
let res = self.send_raw(udp_addr, transmits.clone()).await;
self.send_derp(derp_addr, public_key, Self::split_packets(transmits));
self.send_derp(derp_region, public_key, Self::split_packets(transmits));

if let Err(err) = res {
warn!("failed to send UDP: {:?}", err);
}
}
Ok((None, Some(derp_addr))) => {
self.send_derp(derp_addr, public_key, Self::split_packets(transmits));
Ok((None, Some(derp_region))) => {
self.send_derp(derp_region, public_key, Self::split_packets(transmits));
}
Ok((Some(udp_addr), None)) => {
if let Err(err) = self.send_raw(udp_addr, transmits).await {
Expand Down Expand Up @@ -1710,12 +1725,12 @@ impl Actor {
}

#[instrument(skip_all, fields(self.name = %self.inner.name))]
async fn enqueue_call_me_maybe(&mut self, derp_addr: u16, endpoint_id: usize) {
async fn enqueue_call_me_maybe(&mut self, derp_region: u16, endpoint_id: usize) {
let endpoint = self.peer_map.by_id(&endpoint_id);
if endpoint.is_none() {
warn!(
"enqueue_call_me_maybe with invalid endpoint_id called: {} - {}",
derp_addr, endpoint_id
derp_region, endpoint_id
);
return;
}
Expand All @@ -1738,7 +1753,7 @@ impl Actor {
info!("STUN done; sending call-me-maybe",);
msg_sender
.send(ActorMessage::EnqueueCallMeMaybe {
derp_addr,
derp_region,
endpoint_id,
})
.await
Expand All @@ -1761,13 +1776,13 @@ impl Actor {
warn!("sending call me maybe to {public_key:?}");
if let Err(err) = msg_sender
.send(ActorMessage::SendDiscoMessage {
dst: SendAddr::Derp(derp_addr),
dst: SendAddr::Derp(derp_region),
dst_key: public_key,
msg,
})
.await
{
warn!("failed to send disco message to {}: {:?}", derp_addr, err);
warn!("failed to send disco message to {}: {:?}", derp_region, err);
}
});
}
Expand Down Expand Up @@ -2049,7 +2064,7 @@ impl Actor {
msock_sender: self.inner.actor_sender.clone(),
msock_public_key: self.inner.public_key(),
public_key: sender,
derp_addr: src.derp_region(),
derp_region: src.derp_region(),
});
}
self.handle_ping(ping, &sender, src, derp_node_src).await;
Expand Down Expand Up @@ -2215,7 +2230,7 @@ impl Actor {
msock_sender: self.inner.actor_sender.clone(),
msock_public_key: self.inner.public_key(),
public_key: n.key,
derp_addr: n.derp,
derp_region: n.derp,
});
}

Expand Down
Loading
Loading