Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose locally discovered node id and addresses #2612

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions iroh-net/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ use crate::{AddrInfo, Endpoint, NodeId};

pub mod dns;

#[cfg(feature = "local_swarm_discovery")]
pub mod local_swarm_discovery;
pub mod pkarr;

/// Name used for logging when new node addresses are added from discovery.
Expand Down Expand Up @@ -95,6 +93,15 @@ pub trait Discovery: std::fmt::Debug + Send + Sync {
) -> Option<BoxStream<Result<DiscoveryItem>>> {
None
}

/// Returns a list of nodes that have been discovered on the
/// local network.
///
/// If the discovery service does not discover local nodes,
/// this returns `None`.
fn locally_discovered_nodes(&self) -> Option<BoxStream<NodeAddr>> {
None
}
}

/// The results returned from [`Discovery::resolve`].
Expand Down Expand Up @@ -136,6 +143,11 @@ impl ConcurrentDiscovery {
pub fn add(&mut self, service: impl Discovery + 'static) {
self.services.push(Box::new(service));
}

/// Return a slice of the [`Discovery`] services we have available
pub fn services(&self) -> &[Box<dyn Discovery>] {
&self.services
}
}

impl<T> From<T> for ConcurrentDiscovery
Expand Down Expand Up @@ -168,6 +180,15 @@ impl Discovery for ConcurrentDiscovery {
let streams = futures_buffered::Merge::from_iter(streams);
Some(Box::pin(streams))
}

fn locally_discovered_nodes(&self) -> Option<BoxStream<NodeAddr>> {
let streams = self
.services
.iter()
.filter_map(|service| service.locally_discovered_nodes());
let streams = futures_buffered::Merge::from_iter(streams);
Some(Box::pin(streams))
}
}

/// Maximum duration since the last control or data message received from an endpoint to make us
Expand Down
39 changes: 38 additions & 1 deletion iroh-net/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub use quinn::{

pub use super::magicsock::{
ConnectionInfo, ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddr, DirectAddrInfo,
DirectAddrType, DirectAddrsStream,
DirectAddrType, DirectAddrsStream, LocalSwarmDiscovery, NodeMap,
};

pub use iroh_base::node_addr::{AddrInfo, NodeAddr};
Expand Down Expand Up @@ -83,6 +83,7 @@ pub struct Builder {
dns_resolver: Option<DnsResolver>,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: bool,
mdns: bool,
}

impl Default for Builder {
Expand All @@ -100,6 +101,10 @@ impl Default for Builder {
dns_resolver: None,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify: false,
#[cfg(test)]
mdns: false,
#[cfg(not(test))]
mdns: true,
}
}
}
Expand Down Expand Up @@ -136,6 +141,7 @@ impl Builder {
relay_map,
node_map: self.node_map,
discovery: self.discovery,
mdns: self.mdns,
proxy_url: self.proxy_url,
dns_resolver,
#[cfg(any(test, feature = "test-utils"))]
Expand Down Expand Up @@ -203,6 +209,22 @@ impl Builder {
self
}

/// When `true`, the endpoint will launch a [`LocalSwarmDiscovery`]
/// service that listens over mDNS and adds any nodes discovered
/// on the local network to the [`Endpoint`]'s internal address
/// book.
///
/// Also, this will add [`LocalSwarmDiscovery`] as an additional
/// discovery service, so that you may wait to dial a specific
/// node (that you know exists on the local network and also has
/// mDNS enabled) until you have discovered it via mDNS.
///
/// Default is `true` in production and `false` in tests.
pub fn mdns(mut self, enable: bool) -> Self {
self.mdns = enable;
self
}

/// Optionally set a list of known nodes.
pub fn known_nodes(mut self, nodes: Vec<NodeAddr>) -> Self {
self.node_map = Some(nodes);
Expand Down Expand Up @@ -376,6 +398,7 @@ impl Endpoint {
initial_alpns: Vec<Vec<u8>>,
) -> Result<Self> {
let span = info_span!("magic_ep", me = %static_config.secret_key.public().fmt_short());

let _guard = span.enter();
let msock = magicsock::MagicSock::spawn(msock_opts).await?;
trace!("created magicsock");
Expand Down Expand Up @@ -730,6 +753,20 @@ impl Endpoint {
self.msock.connection_infos()
}

/// Return any [`NodeAddr`]s for nodes we have discovered in the local network.
///
/// If no nodes have been discovered, or [`iroh-net::discovery::LocalSwarmDiscovery`]
/// has not been configured as a [`Discovery`] service, it will return an
/// empty list.
pub async fn locally_discovered_nodes(&self) -> Option<Vec<NodeAddr>> {
if let Some(discovery) = self.discovery() {
if let Some(addrs) = discovery.locally_discovered_nodes() {
return Some(addrs.collect().await);
}
}
None
}

// # Methods for less common getters.
//
// Partially they return things passed into the builder.
Expand Down
51 changes: 43 additions & 8 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ use crate::{

use self::{
metrics::Metrics as MagicsockMetrics,
node_map::{NodeMap, PingAction, PingRole, SendPing},
node_map::{PingAction, PingRole, SendPing},
relay_actor::{RelayActor, RelayActorMessage, RelayReadResult},
udp_conn::UdpConn,
};
Expand All @@ -75,9 +75,16 @@ mod relay_actor;
mod timer;
mod udp_conn;

#[cfg(feature = "local_swarm_discovery")]
mod local_swarm_discovery;

#[cfg(feature = "local_swarm_discovery")]
pub use self::local_swarm_discovery::LocalSwarmDiscovery;

pub use self::metrics::Metrics;
pub use self::node_map::{
ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, NodeInfo as ConnectionInfo,
NodeMap,
};
pub(super) use self::timer::Timer;
pub(crate) use node_map::Source;
Expand Down Expand Up @@ -110,6 +117,12 @@ pub(crate) struct Options {
/// Optional node discovery mechanism.
pub(crate) discovery: Option<Box<dyn Discovery>>,

/// Optional mDNS, when `true`, it will also add mDNS as a
/// [`Discovery`] service as well.
// TODO(ramfox): can be an enum so we leave room for allowing
// folks to supply their own mdns service
pub(crate) mdns: bool,

/// A DNS resolver to use for resolving relay URLs.
///
/// You can use [`crate::dns::default_resolver`] for a resolver that uses the system's DNS
Expand All @@ -134,6 +147,10 @@ impl Default for Options {
relay_map: RelayMap::empty(),
node_map: None,
discovery: None,
#[cfg(test)]
mdns: false,
#[cfg(not(test))]
mdns: true,
proxy_url: None,
dns_resolver: crate::dns::default_resolver().clone(),
#[cfg(any(test, feature = "test-utils"))]
Expand Down Expand Up @@ -225,6 +242,10 @@ pub(crate) struct MagicSock {
/// Optional discovery service
discovery: Option<Box<dyn Discovery>>,

/// Optional mdns service. When not `None`, this implies that
/// mdns is also being used as part of discovery
mdns: Option<LocalSwarmDiscovery>,

/// Our discovered direct addresses.
direct_addrs: Watchable<DiscoveredDirectAddrs>,

Expand Down Expand Up @@ -1242,16 +1263,20 @@ impl MagicSock {
///
/// Called whenever our addresses or home relay node changes.
fn publish_my_addr(&self) {
let addrs = self.direct_addrs.read();
let relay_url = self.my_relay();
let direct_addresses = addrs.iter().map(|da| da.addr).collect();
let info = AddrInfo {
relay_url,
direct_addresses,
};
if let Some(ref discovery) = self.discovery {
let addrs = self.direct_addrs.read();
let relay_url = self.my_relay();
let direct_addresses = addrs.iter().map(|da| da.addr).collect();
let info = AddrInfo {
relay_url,
direct_addresses,
};
discovery.publish(&info);
}
if let Some(ref mdns) = self.mdns {
// TODO(ramfox): fix this when the API has settled
mdns.client().publish(&info);
}
}
}

Expand Down Expand Up @@ -1374,6 +1399,7 @@ impl Handle {
discovery,
dns_resolver,
proxy_url,
mdns,
#[cfg(any(test, feature = "test-utils"))]
insecure_skip_relay_cert_verify,
} = opts;
Expand Down Expand Up @@ -1403,6 +1429,14 @@ impl Handle {
let node_map = node_map.unwrap_or_default();
let node_map = NodeMap::load_from_vec(node_map);

let mdns = match mdns {
true => Some(LocalSwarmDiscovery::new(
secret_key.public(),
Some(node_map.clone()),
)?),
false => None,
};

let udp_state = quinn_udp::UdpState::default();
let inner = Arc::new(MagicSock {
me,
Expand All @@ -1429,6 +1463,7 @@ impl Handle {
send_buffer: Default::default(),
udp_disco_sender,
discovery,
mdns,
direct_addrs: Watchable::new(Default::default()),
pending_call_me_maybes: Default::default(),
direct_addr_update_state: DirectAddrUpdateState::new(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,19 @@ use futures_lite::{stream::Boxed as BoxStream, StreamExt};
use tracing::{debug, error, trace, warn};

use async_channel::Sender;
use iroh_base::key::PublicKey;
use iroh_base::{key::PublicKey, node_addr::NodeAddr};
use swarm_discovery::{Discoverer, DropGuard, IpClass, Peer};
use tokio::task::JoinSet;

use crate::{
discovery::{Discovery, DiscoveryItem},
magicsock::Source,
util::AbortingJoinHandle,
AddrInfo, Endpoint, NodeId,
};

use super::node_map::NodeMap;

/// The n0 local swarm node discovery name
const N0_LOCAL_SWARM: &str = "iroh.local.swarm";

Expand Down Expand Up @@ -60,7 +63,7 @@ impl LocalSwarmDiscovery {
///
/// # Panics
/// This relies on [`tokio::runtime::Handle::current`] and will panic if called outside of the context of a tokio runtime.
pub fn new(node_id: NodeId) -> Result<Self> {
pub fn new(node_id: NodeId, node_map: Option<NodeMap>) -> Result<Self> {
debug!("Creating new LocalSwarmDiscovery service");
let (send, recv) = async_channel::bounded(64);
let task_sender = send.clone();
Expand Down Expand Up @@ -132,7 +135,16 @@ impl LocalSwarmDiscovery {
?peer_info,
"adding node to LocalSwarmDiscovery address book"
);
node_addrs.insert(discovered_node_id, peer_info);
node_addrs.insert(discovered_node_id, peer_info.clone());
if let Some(ref nm) = node_map {
let addrs = peer_info
.addrs()
.iter()
.map(|(ipaddr, port)| SocketAddr::new(*ipaddr, *port))
.collect();
let node_addr = NodeAddr::from_parts(discovered_node_id, None, addrs);
nm.add_node_addr(node_addr, Source::Mdns);
}
}
Message::SendAddrs(node_id, sender) => {
let id = last_id + 1;
Expand Down Expand Up @@ -187,6 +199,13 @@ impl LocalSwarmDiscovery {
})
}

/// Create a Client to interact with the running [`LocalSwarmDiscovery`].
///
/// The [`Client`] satisfies the [`Discovery`] trait.
pub fn client(&self) -> Client {
Client(self.sender.clone())
}

fn spawn_discoverer(
node_id: PublicKey,
sender: Sender<Message>,
Expand Down Expand Up @@ -245,10 +264,17 @@ impl From<&Peer> for DiscoveryItem {
}
}

impl Discovery for LocalSwarmDiscovery {
/// A local swarm discovery client. Allows you to interact with the running
/// `LocalSwarmDiscovery`.
///
/// Satisfies the [`Discovery`] trait.
#[derive(Clone, Debug)]
pub struct Client(Sender<Message>);

impl Discovery for Client {
fn resolve(&self, _ep: Endpoint, node_id: NodeId) -> Option<BoxStream<Result<DiscoveryItem>>> {
let (send, recv) = async_channel::bounded(20);
let discovery_sender = self.sender.clone();
let discovery_sender = self.0.clone();
tokio::spawn(async move {
discovery_sender
.send(Message::SendAddrs(node_id, send))
Expand All @@ -259,7 +285,7 @@ impl Discovery for LocalSwarmDiscovery {
}

fn publish(&self, info: &AddrInfo) {
let discovery_sender = self.sender.clone();
let discovery_sender = self.0.clone();
let info = info.clone();
tokio::spawn(async move {
discovery_sender
Expand All @@ -276,7 +302,7 @@ mod tests {
use testresult::TestResult;

#[tokio::test]
async fn test_local_swarm_discovery() -> TestResult {
async fn test_local_swarm_discovery_client() -> TestResult {
let _guard = iroh_test::logging::setup();
let (_, discovery_a) = make_discoverer()?;
let (node_id_b, discovery_b) = make_discoverer()?;
Expand Down Expand Up @@ -306,8 +332,8 @@ mod tests {
Ok(())
}

fn make_discoverer() -> Result<(PublicKey, LocalSwarmDiscovery)> {
fn make_discoverer() -> Result<(PublicKey, Client)> {
let node_id = crate::key::SecretKey::generate().public();
Ok((node_id, LocalSwarmDiscovery::new(node_id)?))
Ok((node_id, LocalSwarmDiscovery::new(node_id, None)?.client()))
}
}
Loading