diff --git a/iroh/src/magicsock/remote_map.rs b/iroh/src/magicsock/remote_map.rs index 0861a039b0..76d8fdf0a4 100644 --- a/iroh/src/magicsock/remote_map.rs +++ b/iroh/src/magicsock/remote_map.rs @@ -57,7 +57,8 @@ pub(crate) struct RemoteMap { /// The endpoint ID of the local endpoint. local_endpoint_id: EndpointId, metrics: Arc, - local_addrs: n0_watcher::Direct>, + /// The "direct" addresses known for our local endpoint. + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, sender: TransportsSender, discovery: ConcurrentDiscovery, @@ -69,7 +70,7 @@ impl RemoteMap { local_endpoint_id: EndpointId, metrics: Arc, - local_addrs: n0_watcher::Direct>, + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, sender: TransportsSender, discovery: ConcurrentDiscovery, @@ -80,7 +81,7 @@ impl RemoteMap { relay_mapped_addrs: Default::default(), local_endpoint_id, metrics, - local_addrs, + local_direct_addrs, disco, sender, discovery, @@ -138,7 +139,7 @@ impl RemoteMap { let handle = RemoteStateActor::new( eid, self.local_endpoint_id, - self.local_addrs.clone(), + self.local_direct_addrs.clone(), self.disco.clone(), self.relay_mapped_addrs.clone(), self.metrics.clone(), diff --git a/iroh/src/magicsock/remote_map/remote_state.rs b/iroh/src/magicsock/remote_map/remote_state.rs index f7f13f9557..6fda36e3c9 100644 --- a/iroh/src/magicsock/remote_map/remote_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state.rs @@ -7,6 +7,7 @@ use std::{ }; use iroh_base::{EndpointId, RelayUrl, TransportAddr}; +use n0_error::StackResultExt; use n0_future::{ Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt, boxed::BoxStream, @@ -125,7 +126,8 @@ pub(super) struct RemoteStateActor { /// Our local addresses. /// /// These are our local addresses and any reflexive transport addresses. - local_addrs: n0_watcher::Direct>, + /// They are called "direct addresses" in the magic socket actor.
+ local_direct_addrs: n0_watcher::Direct>, /// Shared state to allow to encrypt DISCO messages to peers. disco: DiscoState, /// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s. @@ -180,7 +182,7 @@ impl RemoteStateActor { pub(super) fn new( endpoint_id: EndpointId, local_endpoint_id: EndpointId, - local_addrs: n0_watcher::Direct>, + local_direct_addrs: n0_watcher::Direct>, disco: DiscoState, relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>, metrics: Arc, @@ -191,7 +193,7 @@ impl RemoteStateActor { endpoint_id, local_endpoint_id, metrics, - local_addrs, + local_direct_addrs, relay_mapped_addrs, discovery, disco, @@ -219,19 +221,12 @@ impl RemoteStateActor { // we don't explicitly set a span we get the spans from whatever call happens to // first create the actor, which is often very confusing as it then keeps those // spans for all logging of the actor. - let task = task::spawn( - async move { - if let Err(err) = self.run(rx).await { - error!("actor failed: {err:#}"); - } - } - .instrument(info_span!( - parent: None, - "RemoteStateActor", - me = %me.fmt_short(), - remote = %endpoint_id.fmt_short(), - )), - ); + let task = task::spawn(self.run(rx).instrument(info_span!( + parent: None, + "RemoteStateActor", + me = %me.fmt_short(), + remote = %endpoint_id.fmt_short(), + ))); RemoteStateHandle { sender: tx, _task: AbortOnDropHandle::new(task), @@ -243,10 +238,10 @@ impl RemoteStateActor { /// Note that the actor uses async handlers for tasks from the main loop. The actor is /// not processing items from the inbox while waiting on any async calls. So some /// discipline is needed to not turn pending for a long time. 
- async fn run(mut self, mut inbox: GuardedReceiver) -> n0_error::Result<()> { + async fn run(mut self, mut inbox: GuardedReceiver) { trace!("actor started"); - let idle_timeout = MaybeFuture::None; - tokio::pin!(idle_timeout); + let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT); + n0_future::pin!(idle_timeout); loop { let scheduled_path_open = match self.scheduled_open_path { Some(when) => MaybeFuture::Some(time::sleep_until(when)), @@ -258,11 +253,16 @@ impl RemoteStateActor { None => MaybeFuture::None, }; n0_future::pin!(scheduled_hp); + if !inbox.is_idle() || !self.connections.is_empty() { + idle_timeout + .as_mut() + .reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); + } tokio::select! { biased; msg = inbox.recv() => { match msg { - Some(msg) => self.handle_message(msg).await?, + Some(msg) => self.handle_message(msg).await, None => break, } } @@ -276,7 +276,11 @@ impl RemoteStateActor { self.selected_path.set(None).ok(); } } - _ = self.local_addrs.updated() => { + res = self.local_direct_addrs.updated() => { + if let Err(n0_watcher::Disconnected) = res { + trace!("direct address watcher disconnected, shutting down"); + break; + } trace!("local addrs updated, triggering holepunching"); self.trigger_holepunching().await; } @@ -300,33 +304,25 @@ impl RemoteStateActor { if self.connections.is_empty() && inbox.close_if_idle() { trace!("idle timeout expired and still idle: terminate actor"); break; + } else { + // Seems like we weren't really idle, so we reset + idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT); } } } - - if self.connections.is_empty() && inbox.is_idle() && idle_timeout.is_none() { - trace!("start idle timeout"); - idle_timeout - .as_mut() - .set_future(time::sleep(ACTOR_MAX_IDLE_TIMEOUT)); - } else if idle_timeout.is_some() { - trace!("abort idle timeout"); - idle_timeout.as_mut().set_none() - } } trace!("actor terminating"); - Ok(()) } /// Handles an actor message. /// /// Error returns are fatal and kill the actor. 
#[instrument(skip(self))] - async fn handle_message(&mut self, msg: RemoteStateMessage) -> n0_error::Result<()> { + async fn handle_message(&mut self, msg: RemoteStateMessage) { // trace!("handling message"); match msg { RemoteStateMessage::SendDatagram(transmit) => { - self.handle_msg_send_datagram(transmit).await?; + self.handle_msg_send_datagram(transmit).await; } RemoteStateMessage::AddConnection(handle, tx) => { self.handle_msg_add_connection(handle, tx).await; @@ -347,7 +343,6 @@ impl RemoteStateActor { self.handle_msg_latency(tx); } } - Ok(()) } async fn send_datagram( @@ -360,38 +355,54 @@ impl RemoteStateActor { contents: owned_transmit.contents.as_ref(), segment_size: owned_transmit.segment_size, }; - self.sender.send(&dst, None, &transmit).await?; + self.sender + .send(&dst, None, &transmit) + .await + .with_context(|_| format!("failed to send datagram to {dst:?}"))?; Ok(()) } /// Handles [`RemoteStateMessage::SendDatagram`]. - /// - /// Error returns are fatal and kill the actor. - async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) -> n0_error::Result<()> { + async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) { + // Sending datagrams might fail, e.g. because we don't have the right transports set + // up to send this owned transmit to its destination. + // After all, we try every single path that we know (relay URL, IP address), even + // though we might not have a relay transport or IP-capable transport set up. + // So these errors must not be fatal for this actor (or even this operation).
+ if let Some(addr) = self.selected_path.get() { trace!(?addr, "sending datagram to selected path"); - self.send_datagram(addr, transmit).await?; + + if let Err(err) = self.send_datagram(addr.clone(), transmit).await { + debug!(?addr, "failed to send datagram on selected_path: {err:#}"); + } } else { trace!( paths = ?self.paths.addrs().collect::>(), "sending datagram to all known paths", ); + if self.paths.is_empty() { + warn!("Cannot send datagrams: No paths to remote endpoint known"); + } for addr in self.paths.addrs() { // We never want to send to our local addresses. // The local address set is updated in the main loop so we can use `peek` here. if let transports::Addr::Ip(sockaddr) = addr - && self.local_addrs.peek().iter().any(|a| a.addr == *sockaddr) + && self + .local_direct_addrs + .peek() + .iter() + .any(|a| a.addr == *sockaddr) { trace!(%sockaddr, "not sending datagram to our own address"); - } else { - self.send_datagram(addr.clone(), transmit.clone()).await?; + } else if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await { + debug!(?addr, "failed to send datagram: {err:#}"); } } // This message is received *before* a connection is added. So we do // not yet have a connection to holepunch. Instead we trigger // holepunching when AddConnection is received. } - Ok(()) } /// Handles [`RemoteStateMessage::AddConnection`]. @@ -656,7 +667,7 @@ impl RemoteStateActor { let remote_addrs: BTreeSet = self.remote_hp_addrs(); let local_addrs: BTreeSet = self - .local_addrs + .local_direct_addrs .get() .iter() .map(|daddr| daddr.addr) @@ -748,7 +759,7 @@ impl RemoteStateActor { // Send the DISCO CallMeMaybe message over the relay. 
let my_numbers: Vec = self - .local_addrs + .local_direct_addrs .get() .iter() .map(|daddr| daddr.addr) diff --git a/iroh/src/magicsock/remote_map/remote_state/path_state.rs b/iroh/src/magicsock/remote_map/remote_state/path_state.rs index aeea1b2a5f..a35c947650 100644 --- a/iroh/src/magicsock/remote_map/remote_state/path_state.rs +++ b/iroh/src/magicsock/remote_map/remote_state/path_state.rs @@ -117,6 +117,11 @@ impl RemotePathState { self.paths.keys() } + /// Returns `true` if no addresses are stored. + pub(super) fn is_empty(&self) -> bool { + self.paths.is_empty() + } + /// Replies to all pending resolve requests. /// /// This is a no-op if no requests are queued. Replies `Ok` if we have any known paths, diff --git a/iroh/src/magicsock/transports.rs b/iroh/src/magicsock/transports.rs index d6a31f7256..85efb5cf2e 100644 --- a/iroh/src/magicsock/transports.rs +++ b/iroh/src/magicsock/transports.rs @@ -541,7 +541,9 @@ impl TransportsSender { if any_match { Err(io::Error::other("all available transports failed")) } else { - Err(io::Error::other("no transport available")) + Err(io::Error::other( + "no transport available for this destination", + )) } }