diff --git a/iroh/examples/locally-discovered-nodes.rs b/iroh/examples/locally-discovered-nodes.rs index 5257f6b128..ab76cc9f72 100644 --- a/iroh/examples/locally-discovered-nodes.rs +++ b/iroh/examples/locally-discovered-nodes.rs @@ -5,7 +5,11 @@ //! This is an async, non-determinate process, so the number of NodeIDs discovered each time may be different. If you have other iroh endpoints or iroh nodes with [`MdnsDiscovery`] enabled, it may discover those nodes as well. use std::time::Duration; -use iroh::{Endpoint, NodeId, discovery::DiscoveryEvent, node_info::UserData}; +use iroh::{ + Endpoint, NodeId, + discovery::mdns::{DiscoveryEvent, MdnsDiscovery}, + node_info::UserData, +}; use n0_future::StreamExt; use n0_snafu::Result; use tokio::task::JoinSet; @@ -15,28 +19,27 @@ async fn main() -> Result<()> { tracing_subscriber::fmt::init(); println!("Discovering Local Nodes Example!"); - let ep = Endpoint::builder().discovery_local_network().bind().await?; + let ep = Endpoint::builder().bind().await?; let node_id = ep.node_id(); + + let mdns = MdnsDiscovery::builder().build(node_id)?; + ep.discovery().add(mdns.clone()); + println!("Created endpoint {}", node_id.fmt_short()); let user_data = UserData::try_from(String::from("local-nodes-example"))?; - let mut discovery_stream = ep.discovery_stream(); - let ud = user_data.clone(); let discovery_stream_task = tokio::spawn(async move { + let mut discovery_stream = mdns.subscribe().await; let mut discovered_nodes: Vec = vec![]; - while let Some(item) = discovery_stream.next().await { - match item { - Err(e) => { - tracing::error!("{e}"); - return; - } - Ok(DiscoveryEvent::Discovered(item)) => { + while let Some(event) = discovery_stream.next().await { + match event { + DiscoveryEvent::Discovered { node_info, .. } => { // if there is no user data, or the user data // does not indicate that the discovered node // is a part of the example, ignore it - match item.node_info().data.user_data() { + match node_info.data.user_data() { Some(user_data) if &ud == user_data => {} _ => { tracing::error!("found node with unexpected user data, ignoring it"); @@ -46,14 +49,14 @@ async fn main() -> Result<()> { // if we've already found this node, ignore it // otherwise announce that we have found a new node - if discovered_nodes.contains(&item.node_id()) { + if discovered_nodes.contains(&node_info.node_id) { continue; } else { - discovered_nodes.push(item.node_id()); - println!("Found node {}!", item.node_id().fmt_short()); + discovered_nodes.push(node_info.node_id); + println!("Found node {}!", node_info.node_id.fmt_short()); } } - Ok(DiscoveryEvent::Expired(_)) => {} + DiscoveryEvent::Expired { .. } => {} }; } }); diff --git a/iroh/src/discovery.rs b/iroh/src/discovery.rs index 2948e340f7..35a0b0b67c 100644 --- a/iroh/src/discovery.rs +++ b/iroh/src/discovery.rs @@ -7,8 +7,7 @@ //! connect directly with a [`NodeId`]. //! //! For this to work however, the endpoint has to get the addressing information by -//! other means. This can be done by manually calling [`Endpoint::add_node_addr`], -//! but that still requires knowing the other addressing information. +//! other means. //! //! Node discovery is an automated system for an [`Endpoint`] to retrieve this addressing //! information. Each iroh node will automatically publish their own addressing @@ -109,11 +108,10 @@ //! [`MdnsDiscovery`]: mdns::MdnsDiscovery //! 
[`StaticProvider`]: static_provider::StaticProvider -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use iroh_base::{NodeAddr, NodeId}; use n0_future::{ - Stream, TryStreamExt, boxed::BoxStream, stream::StreamExt, task::{self, AbortOnDropHandle}, @@ -329,50 +327,14 @@ pub trait Discovery: std::fmt::Debug + Send + Sync + 'static { ) -> Option>> { None } - - /// Subscribe to all addresses that get *passively* discovered. - /// - /// An implementation may choose to defer emitting passively discovered nodes - /// until the stream is actually polled. To avoid missing discovered nodes, - /// poll the stream as soon as possible. - /// - /// If you do not regularly poll the stream, you may miss discovered nodes. - /// - /// Any discovery systems that only discover when explicitly resolving a - /// specific [`NodeId`] do not need to implement this method. Any nodes or - /// addresses that are discovered by calling `resolve` should NOT be added - /// to the `subscribe` stream. - /// - /// Discovery systems that are capable of receiving information about [`NodeId`]s - /// and their addressing information without explicitly calling `resolve`, i.e., - /// systems that do "passive" discovery, should implement this method. If - /// `subscribe` is called multiple times, the passively discovered addresses - /// should be sent on all streams. - /// - /// The [`crate::endpoint::Endpoint`] will `subscribe` to the discovery system - /// and add the discovered addresses to the internal address book as they arrive - /// on this stream. - fn subscribe(&self) -> Option> { - None - } } impl Discovery for Arc {} -/// An event emitted from [`Discovery`] services. -#[derive(Debug, Clone, Eq, PartialEq)] -pub enum DiscoveryEvent { - /// A peer was discovered or it's information was updated. - Discovered(DiscoveryItem), - /// A peer was expired due to being inactive, unreachable, or otherwise - /// unavailable. - Expired(NodeId), -} - /// Node discovery results from [`Discovery`] services. /// -/// This is the item in the streams returned from [`Discovery::resolve`] and -/// [`Discovery::subscribe`]. It contains the [`NodeData`] about the discovered node, +/// This is the item in the streams returned from [`Discovery::resolve`]. +/// It contains the [`NodeData`] about the discovered node, /// and some additional metadata about the discovery. /// /// This struct derefs to [`NodeData`], so you can access the methods from [`NodeData`] @@ -459,9 +421,9 @@ impl From for NodeInfo { /// A discovery service that combines multiple discovery sources. /// /// The discovery services will resolve concurrently. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub struct ConcurrentDiscovery { - services: Vec>, + services: Arc>>>, } impl ConcurrentDiscovery { @@ -472,12 +434,27 @@ impl ConcurrentDiscovery { /// Creates a new [`ConcurrentDiscovery`]. pub fn from_services(services: Vec>) -> Self { - Self { services } + Self { + services: Arc::new(RwLock::new(services)), + } } /// Adds a [`Discovery`] service. - pub fn add(&mut self, service: impl Discovery + 'static) { - self.services.push(Box::new(service)); + pub fn add(&self, service: impl Discovery + 'static) { + self.services + .write() + .expect("poisoned") + .push(Box::new(service)); + } + + /// Is there any services configured? 
+ pub fn is_empty(&self) -> bool { + self.services.read().expect("poisoned").is_empty() + } + + /// How many services are configured + pub fn len(&self) -> usize { + self.services.read().expect("poisoned").len() } } @@ -487,38 +464,29 @@ where { fn from(iter: T) -> Self { let services = iter.into_iter().collect::>(); - Self { services } + Self { + services: Arc::new(RwLock::new(services)), + } } } impl Discovery for ConcurrentDiscovery { fn publish(&self, data: &NodeData) { - for service in &self.services { + let services = self.services.read().expect("poisoned"); + for service in &*services { service.publish(data); } } fn resolve(&self, node_id: NodeId) -> Option>> { - let streams = self - .services + let services = self.services.read().expect("poisoned"); + let streams = services .iter() .filter_map(|service| service.resolve(node_id)); let streams = n0_future::MergeBounded::from_iter(streams); Some(Box::pin(streams)) } - - fn subscribe(&self) -> Option> { - let mut streams = vec![]; - for service in self.services.iter() { - if let Some(stream) = service.subscribe() { - streams.push(stream) - } - } - - let streams = n0_future::MergeBounded::from_iter(streams); - Some(Box::pin(streams)) - } } /// Maximum duration since the last control or data message received from an endpoint to make us @@ -534,7 +502,7 @@ pub(super) struct DiscoveryTask { impl DiscoveryTask { /// Starts a discovery task. pub(super) fn start(ep: Endpoint, node_id: NodeId) -> Result { - ensure!(ep.discovery().is_some(), NoServiceConfiguredSnafu); + ensure!(!ep.discovery().is_empty(), NoServiceConfiguredSnafu); let (on_first_tx, on_first_rx) = oneshot::channel(); let me = ep.node_id(); let task = task::spawn( @@ -562,10 +530,10 @@ impl DiscoveryTask { delay: Option, ) -> Result, DiscoveryError> { // If discovery is not needed, don't even spawn a task. - if !Self::needs_discovery(ep, node_id) { + if !ep.needs_discovery(node_id, MAX_AGE) { return Ok(None); } - ensure!(ep.discovery().is_some(), NoServiceConfiguredSnafu); + ensure!(!ep.discovery().is_empty(), NoServiceConfiguredSnafu); let (on_first_tx, on_first_rx) = oneshot::channel(); let ep = ep.clone(); let me = ep.node_id(); @@ -574,7 +542,7 @@ impl DiscoveryTask { // If delay is set, wait and recheck if discovery is needed. If not, early-exit. if let Some(delay) = delay { time::sleep(delay).await; - if !Self::needs_discovery(&ep, node_id) { + if !ep.needs_discovery(node_id, MAX_AGE) { debug!("no discovery needed, abort"); on_first_tx.send(Ok(())).ok(); return; @@ -603,37 +571,14 @@ impl DiscoveryTask { ep: &Endpoint, node_id: NodeId, ) -> Result>, DiscoveryError> { - let discovery = ep.discovery().ok_or(NoServiceConfiguredSnafu.build())?; - let stream = discovery + ensure!(!ep.discovery().is_empty(), NoServiceConfiguredSnafu); + let stream = ep + .discovery() .resolve(node_id) .ok_or(NoResultsSnafu { node_id }.build())?; Ok(stream) } - /// We need discovery if we have no paths to the node, or if the paths we do have - /// have timed out. - fn needs_discovery(ep: &Endpoint, node_id: NodeId) -> bool { - match ep.remote_info(node_id) { - // No info means no path to node -> start discovery. - None => true, - Some(info) => { - match ( - info.last_received(), - info.relay_url.as_ref().and_then(|r| r.last_alive), - ) { - // No path to node -> start discovery. - (None, None) => true, - // If we haven't received on direct addresses or the relay for MAX_AGE, - // start discovery. 
- (Some(elapsed), Some(elapsed_relay)) => { - elapsed > MAX_AGE && elapsed_relay > MAX_AGE - } - (Some(elapsed), _) | (_, Some(elapsed)) => elapsed > MAX_AGE, - } - } - } - } - async fn run( ep: Endpoint, node_id: NodeId, @@ -663,9 +608,6 @@ impl DiscoveryTask { if let Some(tx) = on_first_tx.take() { tx.send(Ok(())).ok(); } - // Send the discovery item to the subscribers of the discovery broadcast stream. - ep.discovery_subscribers() - .send(DiscoveryEvent::Discovered(r)); } Some(Err(err)) => { warn!(?err, "discovery service produced error"); @@ -680,50 +622,6 @@ impl DiscoveryTask { } } -/// Error returned when a discovery watch stream lagged too far behind. -/// -/// The stream returned from [`Endpoint::discovery_stream`] yields this error -/// if the loop in which the stream is processed cannot keep up with the emitted -/// discovery events. Attempting to read the next item from the channel afterwards -/// will return the oldest [`DiscoveryItem`] that is still retained. -/// -/// Includes the number of skipped messages. -#[derive(Debug, Snafu)] -#[snafu(display("channel lagged by {val}"))] -pub struct Lagged { - /// The number of skipped messages - pub val: u64, -} - -#[derive(Clone, Debug)] -pub(super) struct DiscoverySubscribers { - inner: tokio::sync::broadcast::Sender, -} - -impl DiscoverySubscribers { - pub(crate) fn new() -> Self { - // TODO: Make capacity configurable from the endpoint builder? - // This is the maximum number of [`DiscoveryItem`]s held by the channel if - // subscribers are stalled. - const CAPACITY: usize = 128; - Self { - inner: tokio::sync::broadcast::Sender::new(CAPACITY), - } - } - - pub(crate) fn subscribe(&self) -> impl Stream> + use<> { - use tokio_stream::wrappers::{BroadcastStream, errors::BroadcastStreamRecvError}; - let recv = self.inner.subscribe(); - BroadcastStream::new(recv).map_err(|BroadcastStreamRecvError::Lagged(n)| Lagged { val: n }) - } - - pub(crate) fn send(&self, item: DiscoveryEvent) { - // `broadcast::Sender::send` returns an error if the channel has no subscribers, - // which we don't care about. 
- self.inner.send(item).ok(); - } -} - #[cfg(test)] mod tests { use std::{ @@ -746,19 +644,9 @@ mod tests { type InfoStore = HashMap; - #[derive(Debug, Clone)] + #[derive(Debug, Clone, Default)] struct TestDiscoveryShared { nodes: Arc>, - watchers: tokio::sync::broadcast::Sender, - } - - impl Default for TestDiscoveryShared { - fn default() -> Self { - Self { - nodes: Default::default(), - watchers: tokio::sync::broadcast::Sender::new(1024), - } - } } impl TestDiscoveryShared { @@ -781,10 +669,6 @@ mod tests { delay: Duration::from_millis(100), } } - - pub fn send_passive(&self, item: DiscoveryEvent) { - self.watchers.send(item).ok(); - } } #[derive(Debug)] @@ -842,13 +726,6 @@ mod tests { }; Some(stream) } - - fn subscribe(&self) -> Option> { - let recv = self.shared.watchers.subscribe(); - let stream = - tokio_stream::wrappers::BroadcastStream::new(recv).filter_map(|item| item.ok()); - Some(Box::pin(stream)) - } } #[derive(Debug, Clone)] @@ -903,7 +780,7 @@ mod tests { let secret = SecretKey::generate(rand::thread_rng()); let disco1 = EmptyDiscovery; let disco2 = disco_shared.create_discovery(secret.public()); - let mut disco = ConcurrentDiscovery::empty(); + let disco = ConcurrentDiscovery::empty(); disco.add(disco1); disco.add(disco2); new_endpoint(secret, disco).await @@ -935,7 +812,7 @@ mod tests { let disco1 = EmptyDiscovery; let disco2 = disco_shared.create_lying_discovery(secret.public()); let disco3 = disco_shared.create_discovery(secret.public()); - let mut disco = ConcurrentDiscovery::empty(); + let disco = ConcurrentDiscovery::empty(); disco.add(disco1); disco.add(disco2); disco.add(disco3); @@ -1007,61 +884,6 @@ mod tests { Ok(()) } - #[tokio::test] - #[traced_test] - async fn endpoint_discovery_watch() -> Result { - let disco_shared = TestDiscoveryShared::default(); - let (ep1, _guard1) = { - let secret = SecretKey::generate(rand::thread_rng()); - let disco = disco_shared.create_discovery(secret.public()); - new_endpoint(secret, disco).await - }; - let (ep2, _guard2) = { - let secret = SecretKey::generate(rand::thread_rng()); - let disco = disco_shared.create_discovery(secret.public()); - new_endpoint(secret, disco).await - }; - - let mut stream = ep1.discovery_stream(); - - // wait for ep2 node addr to be updated and connect from ep1 -> discovery via resolve - ep2.node_addr().initialized().await; - let _ = ep1.connect(ep2.node_id(), TEST_ALPN).await?; - - let DiscoveryEvent::Discovered(item) = - tokio::time::timeout(Duration::from_secs(1), stream.next()) - .await - .expect("timeout") - .expect("stream closed") - .expect("stream lagged") - else { - panic!("Returned unexpected discovery event!"); - }; - - assert_eq!(item.node_id(), ep2.node_id()); - assert_eq!(item.provenance(), "test-disco"); - - // inject item into discovery passively - let passive_node_id = SecretKey::generate(rand::thread_rng()).public(); - let node_info = NodeInfo::new(passive_node_id); - let passive_item = DiscoveryItem::new(node_info, "test-disco-passive", None); - disco_shared.send_passive(DiscoveryEvent::Discovered(passive_item.clone())); - - let DiscoveryEvent::Discovered(item) = - tokio::time::timeout(Duration::from_secs(1), stream.next()) - .await - .expect("timeout") - .expect("stream closed") - .expect("stream lagged") - else { - panic!("Returned unexpected discovery event!"); - }; - assert_eq!(item.node_id(), passive_node_id); - assert_eq!(item.provenance(), "test-disco-passive"); - - Ok(()) - } - async fn new_endpoint( secret: SecretKey, disco: impl IntoDiscovery + 'static, diff --git 
a/iroh/src/discovery/mdns.rs b/iroh/src/discovery/mdns.rs index 7be948cf5a..eb1de2f986 100644 --- a/iroh/src/discovery/mdns.rs +++ b/iroh/src/discovery/mdns.rs @@ -5,39 +5,49 @@ //! //! When [`MdnsDiscovery`] is enabled, it's possible to get a list of the locally discovered nodes by filtering a list of `RemoteInfo`s. //! -//! ``` +//! ```no_run //! use std::time::Duration; //! -//! use iroh::endpoint::{Endpoint, Source}; +//! use iroh::{ +//! SecretKey, +//! discovery::mdns::{DiscoveryEvent, MdnsDiscovery}, +//! endpoint::{Endpoint, Source}, +//! }; +//! use n0_future::StreamExt; //! //! #[tokio::main] //! async fn main() { //! let recent = Duration::from_secs(600); // 10 minutes in seconds -//! //! let endpoint = Endpoint::builder().bind().await.unwrap(); -//! let remotes = endpoint.remote_info_iter(); -//! let locally_discovered: Vec<_> = remotes -//! .filter(|remote| { -//! remote.sources().iter().any(|(source, duration)| { -//! if let Source::Discovery { name } = source { -//! name == iroh::discovery::mdns::NAME && *duration <= recent -//! } else { -//! false -//! } -//! }) -//! }) -//! .collect(); -//! println!("locally discovered nodes: {locally_discovered:?}"); +//! +//! // Register the discovery services with the endpoint +//! let mdns = MdnsDiscovery::builder().build(endpoint.node_id()).unwrap(); +//! endpoint.discovery().add(mdns.clone()); +//! +//! // Subscribe to the discovery events +//! let mut events = mdns.subscribe().await; +//! while let Some(event) = events.next().await { +//! match event { +//! DiscoveryEvent::Discovered { node_info, .. } => { +//! println!("MDNS discovered: {:?}", node_info); +//! } +//! DiscoveryEvent::Expired { node_id } => { +//! println!("MDNS expired: {node_id}"); +//! } +//! } +//! } //! } //! ``` use std::{ collections::{BTreeSet, HashMap}, net::{IpAddr, SocketAddr}, str::FromStr, + sync::Arc, }; use iroh_base::{NodeId, PublicKey}; use n0_future::{ + Stream, boxed::BoxStream, task::{self, AbortOnDropHandle, JoinSet}, time::{self, Duration}, @@ -48,7 +58,7 @@ use tokio::sync::mpsc::{self, error::TrySendError}; use tracing::{Instrument, debug, error, info_span, trace, warn}; use super::{DiscoveryContext, DiscoveryError, IntoDiscovery, IntoDiscoveryError}; -use crate::discovery::{Discovery, DiscoveryEvent, DiscoveryItem, NodeData, NodeInfo}; +use crate::discovery::{Discovery, DiscoveryItem, NodeData, NodeInfo}; /// The n0 local service name const N0_SERVICE_NAME: &str = "irohv1"; @@ -68,10 +78,10 @@ const USER_DATA_ATTRIBUTE: &str = "user-data"; const DISCOVERY_DURATION: Duration = Duration::from_secs(10); /// Discovery using `swarm-discovery`, a variation on mdns -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct MdnsDiscovery { #[allow(dead_code)] - handle: AbortOnDropHandle<()>, + handle: Arc>, sender: mpsc::Sender, advertise: bool, /// When `local_addrs` changes, we re-publish our info. @@ -190,6 +200,24 @@ impl IntoDiscovery for MdnsDiscoveryBuilder { } } +/// An event emitted from the [`MdnsDiscovery`] service. +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum DiscoveryEvent { + /// A peer was discovered or it's information was updated. + Discovered { + /// The node info for the node, as discovered. + node_info: NodeInfo, + /// Optional timestamp when this node address info was last updated. + last_updated: Option, + }, + /// A peer was expired due to being inactive, unreachable, or otherwise + /// unavailable. + Expired { + /// The id of the node that expired. 
+ node_id: NodeId, + }, +} + impl MdnsDiscovery { /// Returns a [`MdnsDiscoveryBuilder`] that implements [`IntoDiscovery`]. pub fn builder() -> MdnsDiscoveryBuilder { @@ -295,7 +323,9 @@ impl MdnsDiscovery { "removing node from MdnsDiscovery address book" ); node_addrs.remove(&discovered_node_id); - subscribers.send(DiscoveryEvent::Expired(discovered_node_id)); + subscribers.send(DiscoveryEvent::Expired { + node_id: discovered_node_id, + }); continue; } @@ -328,7 +358,10 @@ impl MdnsDiscovery { // in other words, nodes sent to the `subscribers` should only be the ones that // have been "passively" discovered if !resolved { - subscribers.send(DiscoveryEvent::Discovered(item)); + subscribers.send(DiscoveryEvent::Discovered { + node_info: item.node_info, + last_updated: item.last_updated, + }); } } Message::Resolve(node_id, sender) => { @@ -375,13 +408,21 @@ impl MdnsDiscovery { }; let handle = task::spawn(discovery_fut.instrument(info_span!("swarm-discovery.actor"))); Ok(Self { - handle: AbortOnDropHandle::new(handle), + handle: Arc::new(AbortOnDropHandle::new(handle)), sender: send, advertise, local_addrs, }) } + /// Subscribe to discovered nodes + pub async fn subscribe(&self) -> impl Stream + Unpin + use<> { + let (sender, recv) = mpsc::channel(20); + let discovery_sender = self.sender.clone(); + discovery_sender.send(Message::Subscribe(sender)).await.ok(); + tokio_stream::wrappers::ReceiverStream::new(recv) + } + fn spawn_discoverer( node_id: PublicKey, advertise: bool, @@ -480,18 +521,6 @@ impl Discovery for MdnsDiscovery { self.local_addrs.set(Some(data.clone())).ok(); } } - - fn subscribe(&self) -> Option> { - use futures_util::FutureExt; - - let (sender, recv) = mpsc::channel(20); - let discovery_sender = self.sender.clone(); - let stream = async move { - discovery_sender.send(Message::Subscribe(sender)).await.ok(); - tokio_stream::wrappers::ReceiverStream::new(recv) - }; - Some(Box::pin(stream.flatten_stream())) - } } #[cfg(test)] @@ -523,42 +552,40 @@ mod tests { .with_user_data(Some(user_data.clone())); // resolve twice to ensure we can create separate streams for the same node_id - let mut s1 = discovery_a - .subscribe() - .unwrap() - .filter(|event| match event { - DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b, - _ => false, - }); - let mut s2 = discovery_a - .subscribe() - .unwrap() - .filter(|event| match event { - DiscoveryEvent::Discovered(event) => event.node_id() == node_id_b, - _ => false, - }); + let mut s1 = discovery_a.subscribe().await.filter(|event| match event { + DiscoveryEvent::Discovered { node_info, .. } => node_info.node_id == node_id_b, + _ => false, + }); + let mut s2 = discovery_a.subscribe().await.filter(|event| match event { + DiscoveryEvent::Discovered { node_info, .. } => node_info.node_id == node_id_b, + _ => false, + }); tracing::debug!(?node_id_b, "Discovering node id b"); // publish discovery_b's address discovery_b.publish(&node_data); - let DiscoveryEvent::Discovered(s1_res) = - tokio::time::timeout(Duration::from_secs(5), s1.next()) - .await - .context("timeout")? - .unwrap() + let DiscoveryEvent::Discovered { + node_info: s1_node_info, + .. + } = tokio::time::timeout(Duration::from_secs(5), s1.next()) + .await + .context("timeout")? + .unwrap() else { panic!("Received unexpected discovery event"); }; - let DiscoveryEvent::Discovered(s2_res) = - tokio::time::timeout(Duration::from_secs(5), s2.next()) - .await - .context("timeout")? - .unwrap() + let DiscoveryEvent::Discovered { + node_info: s2_node_info, + .. 
+ } = tokio::time::timeout(Duration::from_secs(5), s2.next()) + .await + .context("timeout")? + .unwrap() else { panic!("Received unexpected discovery event"); }; - assert_eq!(s1_res.node_info().data, node_data); - assert_eq!(s2_res.node_info().data, node_data); + assert_eq!(s1_node_info.data, node_data); + assert_eq!(s2_node_info.data, node_data); Ok(()) } @@ -574,7 +601,7 @@ mod tests { .with_user_data(Some("".parse()?)); discovery_b.publish(&node_data); - let mut s1 = discovery_a.subscribe().unwrap(); + let mut s1 = discovery_a.subscribe().await; tracing::debug!(?node_id_b, "Discovering node id b"); // Wait for the specific node to be discovered @@ -585,7 +612,9 @@ mod tests { .expect("Stream should not be closed"); match event { - DiscoveryEvent::Discovered(item) if item.node_info().node_id == node_id_b => { + DiscoveryEvent::Discovered { node_info, .. } + if node_info.node_id == node_id_b => + { break; } _ => continue, // Ignore other discovery events @@ -604,7 +633,9 @@ mod tests { .expect("Stream should not be closed"); match event { - DiscoveryEvent::Expired(expired_node_id) if expired_node_id == node_id_b => { + DiscoveryEvent::Expired { + node_id: expired_node_id, + } if expired_node_id == node_id_b => { break; } _ => continue, // Ignore other events @@ -633,14 +664,16 @@ mod tests { discoverers.push(discovery); } - let mut events = discovery.subscribe().unwrap(); + let mut events = discovery.subscribe().await; let test = async move { let mut got_ids = BTreeSet::new(); while got_ids.len() != num_nodes { - if let Some(DiscoveryEvent::Discovered(item)) = events.next().await { - if node_ids.contains(&(item.node_id(), item.user_data())) { - got_ids.insert((item.node_id(), item.user_data())); + if let Some(DiscoveryEvent::Discovered { node_info, .. }) = events.next().await + { + let data = node_info.data.user_data().cloned(); + if node_ids.contains(&(node_info.node_id, data.clone())) { + got_ids.insert((node_info.node_id, data)); } } else { whatever!( diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 9695cfdfc4..4f66590b39 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -24,7 +24,7 @@ use std::{ use ed25519_dalek::{VerifyingKey, pkcs8::DecodePublicKey}; use iroh_base::{NodeAddr, NodeId, RelayUrl, SecretKey}; use iroh_relay::RelayMap; -use n0_future::{Stream, time::Duration}; +use n0_future::time::Duration; use n0_watcher::Watcher; use nested_enum_utils::common_fields; use pin_project::pin_project; @@ -38,9 +38,8 @@ use crate::discovery::pkarr::PkarrResolver; use crate::{discovery::dns::DnsDiscovery, dns::DnsResolver}; use crate::{ discovery::{ - ConcurrentDiscovery, Discovery, DiscoveryContext, DiscoveryError, DiscoveryEvent, - DiscoverySubscribers, DiscoveryTask, DynIntoDiscovery, IntoDiscovery, IntoDiscoveryError, - Lagged, UserData, pkarr::PkarrPublisher, + ConcurrentDiscovery, DiscoveryError, DiscoveryTask, DynIntoDiscovery, IntoDiscovery, + UserData, pkarr::PkarrPublisher, }, magicsock::{self, Handle, NodeIdMappedAddr, OwnAddressSnafu}, metrics::EndpointMetrics, @@ -70,14 +69,14 @@ pub use quinn_proto::{ use self::rtt_actor::RttMessage; pub use super::magicsock::{ AddNodeAddrError, ConnectionType, ControlMsg, DirectAddr, DirectAddrInfo, DirectAddrType, - RemoteInfo, Source, + Source, }; /// The delay to fall back to discovery when direct addresses fail. /// /// When a connection is attempted with a [`NodeAddr`] containing direct addresses the /// [`Endpoint`] assumes one of those addresses probably works. 
If after this delay there -/// is still no connection the configured [`Discovery`] will be used however. +/// is still no connection the configured [`crate::discovery::Discovery`] will be used however. const DISCOVERY_WAIT_PERIOD: Duration = Duration::from_millis(500); /// Defines the mode of path selection for all traffic flowing through @@ -108,8 +107,6 @@ pub struct Builder { discovery: Vec>, discovery_user_data: Option, proxy_url: Option, - /// List of known nodes. See [`Builder::known_nodes`]. - node_map: Option>, #[cfg(not(wasm_browser))] dns_resolver: Option, #[cfg(any(test, feature = "test-utils"))] @@ -134,7 +131,6 @@ impl Default for Builder { discovery: Default::default(), discovery_user_data: Default::default(), proxy_url: None, - node_map: None, #[cfg(not(wasm_browser))] dns_resolver: None, #[cfg(any(test, feature = "test-utils"))] @@ -170,24 +166,6 @@ impl Builder { #[cfg(not(wasm_browser))] let dns_resolver = self.dns_resolver.unwrap_or_default(); - let discovery: Option> = { - let context = DiscoveryContext { - secret_key: &secret_key, - #[cfg(not(wasm_browser))] - dns_resolver: &dns_resolver, - }; - let discovery = self - .discovery - .into_iter() - .map(|builder| builder.into_discovery(&context)) - .collect::, IntoDiscoveryError>>()?; - match discovery.len() { - 0 => None, - 1 => Some(discovery.into_iter().next().expect("checked length")), - _ => Some(Box::new(ConcurrentDiscovery::from_services(discovery))), - } - }; - let metrics = EndpointMetrics::default(); let msock_opts = magicsock::Options { @@ -195,8 +173,7 @@ impl Builder { addr_v6: self.addr_v6, secret_key, relay_map, - node_map: self.node_map, - discovery, + discovery: self.discovery, discovery_user_data: self.discovery_user_data, proxy_url: self.proxy_url, #[cfg(not(wasm_browser))] @@ -297,7 +274,7 @@ impl Builder { /// If no discovery service is set, connecting to a node without providing its /// direct addresses or relay URLs will fail. /// - /// See the documentation of the [`Discovery`] trait for details. + /// See the documentation of the [`crate::discovery::Discovery`] trait for details. pub fn discovery(mut self, discovery: impl IntoDiscovery) -> Self { self.discovery.clear(); self.discovery.push(Box::new(discovery)); @@ -319,7 +296,7 @@ impl Builder { /// /// To clear all discovery services, use [`Builder::clear_discovery`]. /// - /// See the documentation of the [`Discovery`] trait for details. + /// See the documentation of the [`crate::discovery::Discovery`] trait for details. pub fn add_discovery(mut self, discovery: impl IntoDiscovery) -> Self { self.discovery.push(Box::new(discovery)); self @@ -392,12 +369,6 @@ impl Builder { self } - /// Optionally set a list of known nodes. - pub fn known_nodes(mut self, nodes: Vec) -> Self { - self.node_map = Some(nodes); - self - } - // # Methods for more specialist customisation. /// Sets a custom [`quinn::TransportConfig`] for this endpoint. @@ -598,10 +569,6 @@ pub enum BindError { MagicSpawn { source: magicsock::CreateHandleError, }, - #[snafu(transparent)] - Discovery { - source: crate::discovery::IntoDiscoveryError, - }, } #[allow(missing_docs)] @@ -677,9 +644,8 @@ impl Endpoint { /// establish a direct connection without involving a relay server. /// /// If neither a [`RelayUrl`] or direct addresses are configured in the [`NodeAddr`] it - /// may still be possible a connection can be established. 
This depends on other calls - /// to [`Endpoint::add_node_addr`] which may provide contact information, or via the - /// [`Discovery`] service configured using [`Builder::discovery`]. The discovery + /// may still be possible a connection can be established. This depends on which, if any, + /// [`crate::discovery::Discovery`] services were configured using [`Builder::discovery`]. The discovery /// service will also be used if the remote node is not reachable on the provided direct /// addresses and there is no [`RelayUrl`]. /// @@ -842,7 +808,7 @@ impl Endpoint { /// if the direct addresses are a subset of ours. /// /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider - pub fn add_node_addr(&self, node_addr: NodeAddr) -> Result<(), AddNodeAddrError> { + fn add_node_addr(&self, node_addr: NodeAddr) -> Result<(), AddNodeAddrError> { self.add_node_addr_inner(node_addr, magicsock::Source::App) } @@ -865,7 +831,7 @@ impl Endpoint { /// if the direct addresses are a subset of ours. /// /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider - pub fn add_node_addr_with_source( + pub(crate) fn add_node_addr_with_source( &self, node_addr: NodeAddr, source: &'static str, @@ -1075,65 +1041,6 @@ impl Endpoint { .collect() } - // # Getter methods for information about other nodes. - - /// Returns information about the remote node identified by a [`NodeId`]. - /// - /// The [`Endpoint`] keeps some information about remote iroh nodes, which it uses to find - /// the best path to a node. Having information on a remote node, however, does not mean we have - /// ever connected to it to or even whether a connection is even possible. The information about a - /// remote node will change over time, as the [`Endpoint`] learns more about the node. Future - /// calls may return different information. Furthermore, node information may even be - /// completely evicted as it becomes stale. - /// - /// See also [`Endpoint::remote_info_iter`] which returns information on all nodes known - /// by this [`Endpoint`]. - pub fn remote_info(&self, node_id: NodeId) -> Option { - self.msock.remote_info(node_id) - } - - /// Returns information about all the remote nodes this [`Endpoint`] knows about. - /// - /// This returns the same information as [`Endpoint::remote_info`] for each node known to this - /// [`Endpoint`]. - /// - /// The [`Endpoint`] keeps some information about remote iroh nodes, which it uses to find - /// the best path to a node. This returns all the nodes it knows about, regardless of whether a - /// connection was ever made or is even possible. - /// - /// See also [`Endpoint::remote_info`] to only retrieve information about a single node. - pub fn remote_info_iter(&self) -> impl Iterator + use<> { - self.msock.list_remote_infos().into_iter() - } - - /// Returns a stream of all remote nodes discovered through the endpoint's discovery services. - /// - /// Whenever a node is discovered via the endpoint's discovery service, the corresponding - /// [`DiscoveryEvent`] is yielded from this stream. This includes nodes discovered actively - /// through [`Discovery::resolve`], which is invoked automatically when calling - /// [`Endpoint::connect`] for a [`NodeId`] unknown to the endpoint. It also includes - /// nodes that the endpoint discovers passively from discovery services that implement - /// [`Discovery::subscribe`], which e.g. [`MdnsDiscovery`] does. 
- /// - /// The stream does not yield information about nodes that are added manually to the endpoint's - /// addressbook by calling [`Endpoint::add_node_addr`] or by supplying a full [`NodeAddr`] to - /// [`Endpoint::connect`]. It also does not yield information about nodes that we only - /// know about because they connected to us. When using the [`StaticProvider`] discovery, - /// discovery info is only emitted once connecting to a node added to the static provider, not - /// at the time of adding it to the static provider. - /// - /// The stream should be processed in a loop. If the stream is not processed fast enough, - /// [`Lagged`] may be yielded, indicating that items were missed. - /// - /// See also [`Endpoint::remote_info_iter`], which returns an iterator over all remotes - /// the endpoint knows about at a specific point in time. - /// - /// [`MdnsDiscovery`]: crate::discovery::mdns::MdnsDiscovery - /// [`StaticProvider`]: crate::discovery::static_provider::StaticProvider - pub fn discovery_stream(&self) -> impl Stream> + use<> { - self.msock.discovery_subscribers().subscribe() - } - // # Methods for less common getters. // // Partially they return things passed into the builder. @@ -1165,6 +1072,13 @@ impl Endpoint { self.msock.conn_type(node_id) } + /// Returns the currently lowest latency for this node. + /// + /// Will return `None` if we do not have any address information for the given `node_id`. + pub fn latency(&self, node_id: NodeId) -> Option { + self.msock.latency(node_id) + } + /// Returns the DNS resolver used in this [`Endpoint`]. /// /// See [`Builder::dns_resolver`]. @@ -1176,7 +1090,7 @@ impl Endpoint { /// Returns the discovery mechanism, if configured. /// /// See [`Builder::discovery`]. - pub fn discovery(&self) -> Option<&dyn Discovery> { + pub fn discovery(&self) -> &ConcurrentDiscovery { self.msock.discovery() } @@ -1378,6 +1292,29 @@ impl Endpoint { // # Remaining private methods + /// Checks if the given `NodeId` needs discovery. + pub(crate) fn needs_discovery(&self, node_id: NodeId, max_age: Duration) -> bool { + match self.msock.remote_info(node_id) { + // No info means no path to node -> start discovery. + None => true, + Some(info) => { + match ( + info.last_received(), + info.relay_url.as_ref().and_then(|r| r.last_alive), + ) { + // No path to node -> start discovery. + (None, None) => true, + // If we haven't received on direct addresses or the relay for MAX_AGE, + // start discovery. + (Some(elapsed), Some(elapsed_relay)) => { + elapsed > max_age && elapsed_relay > max_age + } + (Some(elapsed), _) | (_, Some(elapsed)) => elapsed > max_age, + } + } + } + } + /// Return the quic mapped address for this `node_id` and possibly start discovery /// services if discovery is enabled on this magic endpoint. /// @@ -1440,11 +1377,6 @@ impl Endpoint { } } - /// Returns a reference to the subscribers channel for discovery events. 
- pub(crate) fn discovery_subscribers(&self) -> &DiscoverySubscribers { - self.msock.discovery_subscribers() - } - #[cfg(test)] pub(crate) fn magic_sock(&self) -> Handle { self.msock.clone() @@ -2260,10 +2192,7 @@ fn is_cgi() -> bool { // https://github.com/n0-computer/iroh/issues/1183 #[cfg(test)] mod tests { - use std::{ - net::SocketAddr, - time::{Duration, Instant}, - }; + use std::time::{Duration, Instant}; use iroh_base::{NodeAddr, NodeId, SecretKey}; use n0_future::{BufferedStreamExt, StreamExt, stream, task::AbortOnDropHandle}; @@ -2278,7 +2207,7 @@ mod tests { use crate::{ RelayMode, discovery::static_provider::StaticProvider, - endpoint::{ConnectOptions, Connection, ConnectionType, RemoteInfo}, + endpoint::{ConnectOptions, Connection, ConnectionType}, protocol::{AcceptError, ProtocolHandler, Router}, test_utils::{run_relay_server, run_relay_server_with}, }; @@ -2401,60 +2330,6 @@ mod tests { Ok(()) } - /// Test that peers are properly restored - #[tokio::test] - #[traced_test] - async fn restore_peers() -> Result { - let secret_key = SecretKey::generate(rand::thread_rng()); - - /// Create an endpoint for the test. - async fn new_endpoint( - secret_key: SecretKey, - nodes: Option>, - ) -> Result { - let mut transport_config = quinn::TransportConfig::default(); - transport_config.max_idle_timeout(Some(Duration::from_secs(10).try_into().unwrap())); - - let mut builder = Endpoint::builder() - .secret_key(secret_key.clone()) - .transport_config(transport_config); - if let Some(nodes) = nodes { - builder = builder.known_nodes(nodes); - } - Ok(builder.alpns(vec![TEST_ALPN.to_vec()]).bind().await?) - } - - // create the peer that will be added to the peer map - let peer_id = SecretKey::generate(rand::thread_rng()).public(); - let direct_addr: SocketAddr = - (std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST), 8758u16).into(); - let node_addr = NodeAddr::new(peer_id).with_direct_addresses([direct_addr]); - - info!("setting up first endpoint"); - // first time, create a magic endpoint without peers but a peers file and add addressing - // information for a peer - let endpoint = new_endpoint(secret_key.clone(), None).await?; - assert_eq!(endpoint.remote_info_iter().count(), 0); - endpoint.add_node_addr(node_addr.clone())?; - - // Grab the current addrs - let node_addrs: Vec = endpoint.remote_info_iter().map(Into::into).collect(); - assert_eq!(node_addrs.len(), 1); - assert_eq!(node_addrs[0], node_addr); - - info!("closing endpoint"); - // close the endpoint and restart it - endpoint.close().await; - - info!("restarting endpoint"); - // now restart it and check the addressing info of the peer - let endpoint = new_endpoint(secret_key, Some(node_addrs)).await?; - let RemoteInfo { mut addrs, .. 
} = endpoint.remote_info(peer_id).e()?; - let conn_addr = addrs.pop().unwrap().addr; - assert_eq!(conn_addr, direct_addr); - Ok(()) - } - #[tokio::test] #[traced_test] async fn endpoint_relay_connect_loop() -> Result { diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 1d501baa99..1366185e1d 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -33,8 +33,6 @@ use data_encoding::HEXLOWER; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; use iroh_relay::RelayMap; use n0_future::{ - StreamExt, - boxed::BoxStream, task::{self, AbortOnDropHandle}, time::{self, Duration, Instant}, }; @@ -71,8 +69,12 @@ use crate::net_report::{IpMappedAddr, QuicConfig}; use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, SendAddr}, - discovery::{Discovery, DiscoveryEvent, DiscoverySubscribers, NodeData, UserData}, + discovery::{ + ConcurrentDiscovery, Discovery, DiscoveryContext, DynIntoDiscovery, IntoDiscoveryError, + NodeData, UserData, + }, key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box}, + magicsock::node_map::RemoteInfo, metrics::EndpointMetrics, net_report::{self, IfStateDetails, IpMappedAddresses, Report}, }; @@ -86,7 +88,7 @@ pub use node_map::Source; pub use self::{ metrics::Metrics, - node_map::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo}, + node_map::{ConnectionType, ControlMsg, DirectAddrInfo}, }; /// How long we consider a QAD-derived endpoint valid for. UDP NAT mappings typically @@ -113,11 +115,8 @@ pub(crate) struct Options { /// The [`RelayMap`] to use, leave empty to not use a relay server. pub(crate) relay_map: RelayMap, - /// An optional [`NodeMap`], to restore information about nodes. - pub(crate) node_map: Option>, - - /// Optional node discovery mechanism. - pub(crate) discovery: Option>, + /// Optional node discovery mechanisms. + pub(crate) discovery: Vec>, /// Optional user-defined discovery data. pub(crate) discovery_user_data: Option, @@ -211,11 +210,9 @@ pub(crate) struct MagicSock { // - Discovery /// Optional discovery service - discovery: Option>, + discovery: ConcurrentDiscovery, /// Optional user-defined discover data. discovery_user_data: RwLock>, - /// Broadcast channel for listening to discovery updates. - discovery_subscribers: DiscoverySubscribers, /// Metrics pub(crate) metrics: EndpointMetrics, @@ -290,6 +287,7 @@ impl MagicSock { } /// Return the [`RemoteInfo`]s of all nodes in the node map. + #[cfg(test)] pub(crate) fn list_remote_infos(&self) -> Vec { self.node_map.list_remote_infos(Instant::now()) } @@ -373,6 +371,10 @@ impl MagicSock { self.node_map.conn_type(node_id) } + pub(crate) fn latency(&self, node_id: NodeId) -> Option { + self.node_map.latency(node_id) + } + /// Returns the socket address which can be used by the QUIC layer to dial this node. pub(crate) fn get_mapping_addr(&self, node_id: NodeId) -> Option { self.node_map.get_quic_mapped_addr_for_node_key(node_id) @@ -380,7 +382,7 @@ impl MagicSock { /// Add addresses for a node to the magic socket's addresbook. 
#[instrument(skip_all)] - pub fn add_node_addr( + pub(crate) fn add_node_addr( &self, mut addr: NodeAddr, source: node_map::Source, @@ -423,9 +425,9 @@ impl MagicSock { &self.dns_resolver } - /// Reference to optional discovery service - pub(crate) fn discovery(&self) -> Option<&dyn Discovery> { - self.discovery.as_ref().map(Box::as_ref) + /// Reference to the internal discovery service + pub(crate) fn discovery(&self) -> &ConcurrentDiscovery { + &self.discovery } /// Updates the user-defined discovery data for this node. @@ -446,11 +448,6 @@ impl MagicSock { .ok(); } - /// Returns a reference to the subscribers channel for discovery events. - pub(crate) fn discovery_subscribers(&self) -> &DiscoverySubscribers { - &self.discovery_subscribers - } - #[cfg(test)] async fn force_network_change(&self, is_major: bool) { self.actor_sender @@ -1134,23 +1131,21 @@ impl MagicSock { /// /// Called whenever our addresses or home relay node changes. fn publish_my_addr(&self) { - if let Some(ref discovery) = self.discovery { - let relay_url = self.my_relay(); - let direct_addrs = self.direct_addrs.sockaddrs(); - - let user_data = self - .discovery_user_data - .read() - .expect("lock poisened") - .clone(); - if relay_url.is_none() && direct_addrs.is_empty() && user_data.is_none() { - // do not bother publishing if we don't have any information - return; - } - - let data = NodeData::new(relay_url, direct_addrs).with_user_data(user_data); - discovery.publish(&data); + let relay_url = self.my_relay(); + let direct_addrs = self.direct_addrs.sockaddrs(); + + let user_data = self + .discovery_user_data + .read() + .expect("lock poisened") + .clone(); + if relay_url.is_none() && direct_addrs.is_empty() && user_data.is_none() { + // do not bother publishing if we don't have any information + return; } + + let data = NodeData::new(relay_url, direct_addrs).with_user_data(user_data); + self.discovery.publish(&data); } } @@ -1332,6 +1327,10 @@ pub enum CreateHandleError { CreateNetmonMonitor { source: netmon::Error }, #[snafu(display("Failed to subscribe netmon monitor"))] SubscribeNetmonMonitor { source: netmon::Error }, + #[snafu(transparent)] + Discovery { + source: crate::discovery::IntoDiscoveryError, + }, } impl Handle { @@ -1342,7 +1341,6 @@ impl Handle { addr_v6, secret_key, relay_map, - node_map, discovery, discovery_user_data, #[cfg(not(wasm_browser))] @@ -1356,6 +1354,22 @@ impl Handle { metrics, } = opts; + let discovery = { + let context = DiscoveryContext { + secret_key: &secret_key, + #[cfg(not(wasm_browser))] + dns_resolver: &dns_resolver, + }; + let discovery = discovery + .into_iter() + .map(|builder| builder.into_discovery(&context)) + .collect::, IntoDiscoveryError>>()?; + match discovery.len() { + 0 => ConcurrentDiscovery::default(), + _ => ConcurrentDiscovery::from_services(discovery), + } + }; + let addr_v4 = addr_v4.unwrap_or_else(|| SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); #[cfg(not(wasm_browser))] @@ -1369,9 +1383,8 @@ impl Handle { let ipv6_reported = false; // load the node data - let node_map = node_map.unwrap_or_default(); let node_map = NodeMap::load_from_vec( - node_map, + Vec::new(), #[cfg(any(test, feature = "test-utils"))] path_selection, ipv6_reported, @@ -1420,7 +1433,6 @@ impl Handle { net_report: Watchable::new((None, UpdateReason::None)), #[cfg(not(wasm_browser))] dns_resolver: dns_resolver.clone(), - discovery_subscribers: DiscoverySubscribers::new(), metrics: metrics.clone(), local_addrs_watch: transports.local_addrs_watch(), #[cfg(not(wasm_browser))] @@ -1857,13 
+1869,6 @@ impl Actor { .port_mapper .watch_external_address(); - let mut discovery_events: BoxStream = Box::pin(n0_future::stream::empty()); - if let Some(d) = self.msock.discovery() { - if let Some(events) = d.subscribe() { - discovery_events = events; - } - } - let mut receiver_closed = false; #[cfg_attr(wasm_browser, allow(unused_mut))] let mut portmap_watcher_closed = false; @@ -1997,27 +2002,6 @@ impl Actor { self.msock.metrics.magicsock.actor_link_change.inc(); self.handle_network_change(is_major).await; } - // Even if `discovery_events` yields `None`, it could begin to yield - // `Some` again in the future, so we don't want to disable this branch - // forever like we do with the other branches that yield `Option`s - Some(discovery_item) = discovery_events.next() => { - trace!("tick: discovery event, address discovered: {discovery_item:?}"); - if let DiscoveryEvent::Discovered(discovery_item) = &discovery_item { - let provenance = discovery_item.provenance(); - let node_addr = discovery_item.to_node_addr(); - if let Err(e) = self.msock.add_node_addr( - node_addr, - Source::Discovery { - name: provenance.to_string() - }) { - let node_addr = discovery_item.to_node_addr(); - warn!(?node_addr, "unable to add discovered node address to the node map: {e:?}"); - } - } - - // Send the discovery item to the subscribers of the discovery broadcast stream. - self.msock.discovery_subscribers.send(discovery_item); - } Some((dst, dst_key, msg)) = self.disco_receiver.recv() => { if let Err(err) = self.msock.send_disco_message(&sender, dst.clone(), dst_key, msg).await { warn!(%dst, node = %dst_key.fmt_short(), ?err, "failed to send disco message (UDP)"); @@ -2562,8 +2546,7 @@ mod tests { addr_v6: None, secret_key, relay_map: RelayMap::empty(), - node_map: None, - discovery: None, + discovery: Default::default(), proxy_url: None, dns_resolver: DnsResolver::new(), server_config, @@ -3095,8 +3078,7 @@ mod tests { addr_v6: None, secret_key: secret_key.clone(), relay_map: RelayMap::empty(), - node_map: None, - discovery: None, + discovery: Default::default(), discovery_user_data: None, dns_resolver, proxy_url: None, diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 3b61744be2..362e0af5b6 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -3,6 +3,7 @@ use std::{ hash::Hash, net::{IpAddr, SocketAddr}, sync::Mutex, + time::Duration, }; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; @@ -22,8 +23,8 @@ mod path_state; mod path_validity; mod udp_paths; -pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo}; -pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing}; +pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo}; +pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, RemoteInfo, SendPing}; /// Number of nodes that are inactive for which we keep info about. This limit is enforced /// periodically via [`NodeMap::prune_inactive`]. @@ -84,8 +85,7 @@ enum NodeStateKey { /// sources can be associated with a single address, if we have discovered this /// address through multiple means. /// -/// Each time a [`NodeAddr`] is added to the node map, usually through -/// [`crate::endpoint::Endpoint::add_node_addr_with_source`], a [`Source`] must be supplied to indicate +/// Each time a [`NodeAddr`] is added to the node map a [`Source`] must be supplied to indicate /// how the address was obtained. 
/// /// A [`Source`] can describe a variety of places that an address or node was @@ -285,6 +285,7 @@ impl NodeMap { } /// Returns the [`RemoteInfo`]s for each node in the node map. + #[cfg(test)] pub(super) fn list_remote_infos(&self, now: Instant) -> Vec { // NOTE: calls to this method will often call `into_iter` (or similar methods). Note that // we can't avoid `collect` here since it would hold a lock for an indefinite time. Even if @@ -307,6 +308,10 @@ impl NodeMap { self.inner.lock().expect("poisoned").conn_type(node_id) } + pub(super) fn latency(&self, node_id: NodeId) -> Option { + self.inner.lock().expect("poisoned").latency(node_id) + } + /// Get the [`RemoteInfo`]s for the node identified by [`NodeId`]. pub(super) fn remote_info(&self, node_id: NodeId) -> Option { self.inner.lock().expect("poisoned").remote_info(node_id) @@ -473,6 +478,7 @@ impl NodeMapInner { *node_state.quic_mapped_addr() } + #[cfg(test)] fn node_states(&self) -> impl Iterator { self.by_id.iter() } @@ -482,6 +488,7 @@ impl NodeMapInner { } /// Get the [`RemoteInfo`]s for all nodes. + #[cfg(test)] fn remote_infos_iter(&self, now: Instant) -> impl Iterator + '_ { self.node_states().map(move |(_, ep)| ep.info(now)) } @@ -506,6 +513,11 @@ impl NodeMapInner { .map(|ep| ep.conn_type()) } + fn latency(&self, node_id: NodeId) -> Option { + self.get(NodeStateKey::NodeId(node_id)) + .and_then(|ep| ep.latency()) + } + fn handle_pong(&mut self, sender: NodeId, src: &transports::Addr, pong: Pong) { if let Some(ns) = self.get_mut(NodeStateKey::NodeId(sender)).as_mut() { let insert = ns.handle_pong(&pong, src.clone().into()); diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index aaa2977f3b..7cf6511b97 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -208,10 +208,8 @@ impl NodeState { self.conn_type.watch() } - /// Returns info about this node. - pub(super) fn info(&self, now: Instant) -> RemoteInfo { - let conn_type = self.conn_type.get(); - let latency = match conn_type { + pub(super) fn latency(&self) -> Option { + match self.conn_type.get() { ConnectionType::Direct(addr) => self .udp_paths .paths() @@ -236,7 +234,13 @@ impl NodeState { addr_latency.min(relay_latency) } ConnectionType::None => None, - }; + } + } + + /// Returns info about this node. + pub(super) fn info(&self, now: Instant) -> RemoteInfo { + let conn_type = self.conn_type.get(); + let latency = self.latency(); let addrs = self .udp_paths @@ -1298,10 +1302,7 @@ pub struct DirectAddrInfo { /// /// The elapsed time since *any* confirmation of the path's existence was received is /// returned. If the remote node moved networks and no longer has this path, this could - /// be a long duration. If the path was added via [`Endpoint::add_node_addr`] or some - /// node discovery the path may never have been known to exist. - /// - /// [`Endpoint::add_node_addr`]: crate::endpoint::Endpoint::add_node_addr + /// be a long duration. pub last_alive: Option, /// A [`HashMap`] of [`Source`]s to [`Duration`]s. /// @@ -1349,7 +1350,7 @@ impl From for RelayUrl { /// /// [`Endpoint::add_node_addr`]: crate::endpoint::Endpoint::add_node_addr #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -pub struct RemoteInfo { +pub(crate) struct RemoteInfo { /// The globally unique identifier for this node. pub node_id: NodeId, /// Relay server information, if available. 
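With `RemoteInfo` and `Endpoint::remote_info` becoming crate-private in this patch, the round-trip estimate that callers used to read off `RemoteInfo` is instead exposed through the new `Endpoint::latency` accessor added above. A small usage sketch under that assumption; the helper name and threshold are illustrative and not part of this change:

```rust
use std::time::Duration;

use iroh::{Endpoint, NodeId};

/// Hypothetical helper: prefer a remote only if its current best-path
/// latency is below a caller-chosen threshold.
fn is_low_latency(ep: &Endpoint, node: NodeId, threshold: Duration) -> bool {
    match ep.latency(node) {
        Some(latency) => latency <= threshold,
        // `None` means the endpoint has no address information for `node` yet.
        None => false,
    }
}
```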
@@ -1374,7 +1375,7 @@ pub struct RemoteInfo { impl RemoteInfo { /// Get the duration since the last activity we received from this endpoint /// on any of its direct addresses. - pub fn last_received(&self) -> Option { + pub(crate) fn last_received(&self) -> Option { self.addrs .iter() .filter_map(|addr| addr.last_control.map(|x| x.0).min(addr.last_payload)) @@ -1385,30 +1386,9 @@ impl RemoteInfo { /// /// Note that this does not provide any guarantees of whether any network path is /// usable. - pub fn has_send_address(&self) -> bool { + pub(crate) fn has_send_address(&self) -> bool { self.relay_url.is_some() || !self.addrs.is_empty() } - - /// Returns a deduplicated list of [`Source`]s merged from all address in the [`RemoteInfo`]. - /// - /// Deduplication is on the (`Source`, `Duration`) tuple, so you will get multiple [`Source`]s - /// for each `Source` variant, if different addresses were discovered from the same [`Source`] - /// at different times. - /// - /// The list is sorted from least to most recent [`Source`]. - pub fn sources(&self) -> Vec<(Source, Duration)> { - let mut sources = vec![]; - for addr in &self.addrs { - for source in &addr.sources { - let source = (source.0.clone(), *source.1); - if !sources.contains(&source) { - sources.push(source) - } - } - } - sources.sort_by(|a, b| b.1.cmp(&a.1)); - sources - } } /// The type of connection we have to the endpoint.
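Taken together, these changes move discovery event subscription from the endpoint onto the individual service: services are now registered at runtime on the endpoint's `ConcurrentDiscovery`, and mDNS events are consumed via `MdnsDiscovery::subscribe` with the new struct-style `DiscoveryEvent` variants. A minimal sketch of the new flow, condensed from the updated `locally-discovered-nodes` example and the mdns module docs in this diff (user-data filtering and deduplication omitted):

```rust
use iroh::{
    Endpoint,
    discovery::mdns::{DiscoveryEvent, MdnsDiscovery},
};
use n0_future::StreamExt;
use n0_snafu::Result;

#[tokio::main]
async fn main() -> Result<()> {
    // Bind an endpoint, then register mDNS discovery at runtime.
    let ep = Endpoint::builder().bind().await?;
    let mdns = MdnsDiscovery::builder().build(ep.node_id())?;
    ep.discovery().add(mdns.clone());

    // Subscribe on the service itself; this replaces the removed
    // `Endpoint::discovery_stream` / `Discovery::subscribe` APIs.
    let mut events = mdns.subscribe().await;
    while let Some(event) = events.next().await {
        match event {
            DiscoveryEvent::Discovered { node_info, .. } => {
                println!("discovered {}", node_info.node_id.fmt_short());
            }
            DiscoveryEvent::Expired { node_id } => {
                println!("expired {}", node_id.fmt_short());
            }
        }
    }
    Ok(())
}
```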