Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions iroh/src/magicsock/remote_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ pub(crate) struct RemoteMap {
/// The endpoint ID of the local endpoint.
local_endpoint_id: EndpointId,
metrics: Arc<MagicsockMetrics>,
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
/// The "direct" addresses known for our local endpoint
local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
disco: DiscoState,
sender: TransportsSender,
discovery: ConcurrentDiscovery,
Expand All @@ -69,7 +70,7 @@ impl RemoteMap {
local_endpoint_id: EndpointId,
metrics: Arc<MagicsockMetrics>,

local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
disco: DiscoState,
sender: TransportsSender,
discovery: ConcurrentDiscovery,
Expand All @@ -80,7 +81,7 @@ impl RemoteMap {
relay_mapped_addrs: Default::default(),
local_endpoint_id,
metrics,
local_addrs,
local_direct_addrs,
disco,
sender,
discovery,
Expand Down Expand Up @@ -138,7 +139,7 @@ impl RemoteMap {
let handle = RemoteStateActor::new(
eid,
self.local_endpoint_id,
self.local_addrs.clone(),
self.local_direct_addrs.clone(),
self.disco.clone(),
self.relay_mapped_addrs.clone(),
self.metrics.clone(),
Expand Down
103 changes: 57 additions & 46 deletions iroh/src/magicsock/remote_map/remote_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use std::{
};

use iroh_base::{EndpointId, RelayUrl, TransportAddr};
use n0_error::StackResultExt;
use n0_future::{
Either, FuturesUnordered, MergeUnbounded, Stream, StreamExt,
boxed::BoxStream,
Expand Down Expand Up @@ -125,7 +126,8 @@ pub(super) struct RemoteStateActor {
/// Our local addresses.
///
/// These are our local addresses and any reflexive transport addresses.
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
/// They are called "direct addresses" in the magic socket actor.
local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
/// Shared state to allow to encrypt DISCO messages to peers.
disco: DiscoState,
/// The mapping between endpoints via a relay and their [`RelayMappedAddr`]s.
Expand Down Expand Up @@ -180,7 +182,7 @@ impl RemoteStateActor {
pub(super) fn new(
endpoint_id: EndpointId,
local_endpoint_id: EndpointId,
local_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
local_direct_addrs: n0_watcher::Direct<BTreeSet<DirectAddr>>,
disco: DiscoState,
relay_mapped_addrs: AddrMap<(RelayUrl, EndpointId), RelayMappedAddr>,
metrics: Arc<MagicsockMetrics>,
Expand All @@ -191,7 +193,7 @@ impl RemoteStateActor {
endpoint_id,
local_endpoint_id,
metrics,
local_addrs,
local_direct_addrs,
relay_mapped_addrs,
discovery,
disco,
Expand Down Expand Up @@ -219,19 +221,12 @@ impl RemoteStateActor {
// we don't explicitly set a span we get the spans from whatever call happens to
// first create the actor, which is often very confusing as it then keeps those
// spans for all logging of the actor.
let task = task::spawn(
async move {
if let Err(err) = self.run(rx).await {
error!("actor failed: {err:#}");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this was a poor-man's attempt at being able to have "fatal" errors. The right thing of course is to panic and spawn the tasks in a JoinSet and making sure the panic bubbles up to the endpoint. But that's for another day I guess. For now making this infallible is the safer approach I think. While if forces you to handle the fatal error everywhere it will probably result in better code.

}
}
.instrument(info_span!(
parent: None,
"RemoteStateActor",
me = %me.fmt_short(),
remote = %endpoint_id.fmt_short(),
)),
);
let task = task::spawn(self.run(rx).instrument(info_span!(
parent: None,
"RemoteStateActor",
me = %me.fmt_short(),
remote = %endpoint_id.fmt_short(),
)));
RemoteStateHandle {
sender: tx,
_task: AbortOnDropHandle::new(task),
Expand All @@ -243,10 +238,10 @@ impl RemoteStateActor {
/// Note that the actor uses async handlers for tasks from the main loop. The actor is
/// not processing items from the inbox while waiting on any async calls. So some
/// discipline is needed to not turn pending for a long time.
async fn run(mut self, mut inbox: GuardedReceiver<RemoteStateMessage>) -> n0_error::Result<()> {
async fn run(mut self, mut inbox: GuardedReceiver<RemoteStateMessage>) {
trace!("actor started");
let idle_timeout = MaybeFuture::None;
tokio::pin!(idle_timeout);
let idle_timeout = time::sleep(ACTOR_MAX_IDLE_TIMEOUT);
n0_future::pin!(idle_timeout);
loop {
let scheduled_path_open = match self.scheduled_open_path {
Some(when) => MaybeFuture::Some(time::sleep_until(when)),
Expand All @@ -258,11 +253,16 @@ impl RemoteStateActor {
None => MaybeFuture::None,
};
n0_future::pin!(scheduled_hp);
if !inbox.is_idle() || !self.connections.is_empty() {
idle_timeout
.as_mut()
.reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
}
tokio::select! {
biased;
msg = inbox.recv() => {
match msg {
Some(msg) => self.handle_message(msg).await?,
Some(msg) => self.handle_message(msg).await,
None => break,
}
}
Expand All @@ -276,7 +276,11 @@ impl RemoteStateActor {
self.selected_path.set(None).ok();
}
}
_ = self.local_addrs.updated() => {
res = self.local_direct_addrs.updated() => {
if let Err(n0_watcher::Disconnected) = res {
trace!("direct address watcher disconnected, shutting down");
break;
}
trace!("local addrs updated, triggering holepunching");
self.trigger_holepunching().await;
}
Expand All @@ -300,33 +304,25 @@ impl RemoteStateActor {
if self.connections.is_empty() && inbox.close_if_idle() {
trace!("idle timeout expired and still idle: terminate actor");
break;
} else {
// Seems like we weren't really idle, so we reset
idle_timeout.as_mut().reset(Instant::now() + ACTOR_MAX_IDLE_TIMEOUT);
}
}
}

if self.connections.is_empty() && inbox.is_idle() && idle_timeout.is_none() {
trace!("start idle timeout");
idle_timeout
.as_mut()
.set_future(time::sleep(ACTOR_MAX_IDLE_TIMEOUT));
} else if idle_timeout.is_some() {
trace!("abort idle timeout");
idle_timeout.as_mut().set_none()
}
}
trace!("actor terminating");
Ok(())
}

/// Handles an actor message.
///
/// Error returns are fatal and kill the actor.
#[instrument(skip(self))]
async fn handle_message(&mut self, msg: RemoteStateMessage) -> n0_error::Result<()> {
async fn handle_message(&mut self, msg: RemoteStateMessage) {
// trace!("handling message");
match msg {
RemoteStateMessage::SendDatagram(transmit) => {
self.handle_msg_send_datagram(transmit).await?;
self.handle_msg_send_datagram(transmit).await;
}
RemoteStateMessage::AddConnection(handle, tx) => {
self.handle_msg_add_connection(handle, tx).await;
Expand All @@ -347,7 +343,6 @@ impl RemoteStateActor {
self.handle_msg_latency(tx);
}
}
Ok(())
}

async fn send_datagram(
Expand All @@ -360,38 +355,54 @@ impl RemoteStateActor {
contents: owned_transmit.contents.as_ref(),
segment_size: owned_transmit.segment_size,
};
self.sender.send(&dst, None, &transmit).await?;
self.sender
.send(&dst, None, &transmit)
.await
.with_context(|_| format!("failed to send datagram to {dst:?}"))?;
Ok(())
}

/// Handles [`RemoteStateMessage::SendDatagram`].
///
/// Error returns are fatal and kill the actor.
async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) -> n0_error::Result<()> {
async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) {
// Sending datagrams might fail, e.g. because we don't have the right transports set
// up to handle sending this owned transmit to.
// After all, we try every single path that we know (relay URL, IP address), even
// though we might not have a relay transport or ip-capable transport set up.
// So these errors must not be fatal for this actor (or even this operation).

if let Some(addr) = self.selected_path.get() {
trace!(?addr, "sending datagram to selected path");
self.send_datagram(addr, transmit).await?;

if let Err(err) = self.send_datagram(addr.clone(), transmit).await {
debug!(?addr, "failed to send datagram on selected_path: {err:#}");
}
} else {
trace!(
paths = ?self.paths.addrs().collect::<Vec<_>>(),
"sending datagram to all known paths",
);
if self.paths.is_empty() {
warn!("Cannot send datagrams: No paths to remote endpoint known");
}
for addr in self.paths.addrs() {
// We never want to send to our local addresses.
// The local address set is updated in the main loop so we can use `peek` here.
if let transports::Addr::Ip(sockaddr) = addr
&& self.local_addrs.peek().iter().any(|a| a.addr == *sockaddr)
&& self
.local_direct_addrs
.peek()
.iter()
.any(|a| a.addr == *sockaddr)
{
trace!(%sockaddr, "not sending datagram to our own address");
} else {
self.send_datagram(addr.clone(), transmit.clone()).await?;
} else if let Err(err) = self.send_datagram(addr.clone(), transmit.clone()).await {
debug!(?addr, "failed to send datagram: {err:#}");
}
}
// This message is received *before* a connection is added. So we do
// not yet have a connection to holepunch. Instead we trigger
// holepunching when AddConnection is received.
}
Ok(())
}

/// Handles [`RemoteStateMessage::AddConnection`].
Expand Down Expand Up @@ -656,7 +667,7 @@ impl RemoteStateActor {

let remote_addrs: BTreeSet<SocketAddr> = self.remote_hp_addrs();
let local_addrs: BTreeSet<SocketAddr> = self
.local_addrs
.local_direct_addrs
.get()
.iter()
.map(|daddr| daddr.addr)
Expand Down Expand Up @@ -748,7 +759,7 @@ impl RemoteStateActor {

// Send the DISCO CallMeMaybe message over the relay.
let my_numbers: Vec<SocketAddr> = self
.local_addrs
.local_direct_addrs
.get()
.iter()
.map(|daddr| daddr.addr)
Expand Down
5 changes: 5 additions & 0 deletions iroh/src/magicsock/remote_map/remote_state/path_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ impl RemotePathState {
self.paths.keys()
}

/// Returns whether this stores any addresses.
pub(super) fn is_empty(&self) -> bool {
self.paths.is_empty()
}

/// Replies to all pending resolve requests.
///
/// This is a no-op if no requests are queued. Replies `Ok` if we have any known paths,
Expand Down
4 changes: 3 additions & 1 deletion iroh/src/magicsock/transports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,9 @@ impl TransportsSender {
if any_match {
Err(io::Error::other("all available transports failed"))
} else {
Err(io::Error::other("no transport available"))
Err(io::Error::other(
"no transport available for this destination",
))
}
}

Expand Down
Loading