Skip to content
1 change: 1 addition & 0 deletions iroh/examples/transfer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,7 @@ async fn fetch(endpoint: Endpoint, remote_addr: EndpointAddr) -> Result<()> {

// Attempt to connect, over the given ALPN.
// Returns a Quinn connection.
info!(?remote_addr, "start to connect");
let conn = endpoint.connect(remote_addr, TRANSFER_ALPN).await?;
println!("Connected to {}", remote_id);
// Spawn a background task that prints connection type changes. Will be aborted on drop.
Expand Down
49 changes: 21 additions & 28 deletions iroh/src/magicsock/remote_map/remote_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ impl RemoteStateActor {
}
}

pub(super) fn start(mut self) -> RemoteStateHandle {
pub(super) fn start(self) -> RemoteStateHandle {
let (tx, rx) = guarded_channel(16);
let me = self.local_endpoint_id;
let endpoint_id = self.endpoint_id;
Expand All @@ -193,19 +193,12 @@ impl RemoteStateActor {
// we don't explicitly set a span we get the spans from whatever call happens to
// first create the actor, which is often very confusing as it then keeps those
// spans for all logging of the actor.
let task = task::spawn(
async move {
if let Err(err) = self.run(rx).await {
error!("actor failed: {err:#}");
}
}
.instrument(info_span!(
parent: None,
"RemoteStateActor",
me = %me.fmt_short(),
remote = %endpoint_id.fmt_short(),
)),
);
let task = task::spawn(self.run(rx).instrument(info_span!(
parent: None,
"RemoteStateActor",
me = %me.fmt_short(),
remote = %endpoint_id.fmt_short(),
)));
RemoteStateHandle {
sender: tx,
_task: AbortOnDropHandle::new(task),
Expand All @@ -217,10 +210,7 @@ impl RemoteStateActor {
/// Note that the actor uses async handlers for tasks from the main loop. The actor is
/// not processing items from the inbox while waiting on any async calls. So some
/// discipline is needed to not turn pending for a long time.
async fn run(
&mut self,
mut inbox: GuardedReceiver<RemoteStateMessage>,
) -> n0_error::Result<()> {
async fn run(mut self, mut inbox: GuardedReceiver<RemoteStateMessage>) {
trace!("actor started");
let idle_timeout = MaybeFuture::None;
tokio::pin!(idle_timeout);
Expand All @@ -239,7 +229,7 @@ impl RemoteStateActor {
biased;
msg = inbox.recv() => {
match msg {
Some(msg) => self.handle_message(msg).await?,
Some(msg) => self.handle_message(msg).await,
None => break,
}
}
Expand All @@ -253,7 +243,7 @@ impl RemoteStateActor {
self.selected_path.set(None).ok();
}
}
_ = self.local_addrs.updated() => {
_ = self.local_addrs.updated(), if self.local_addrs.is_connected() => {
trace!("local addrs updated, triggering holepunching");
self.trigger_holepunching().await;
}
Expand All @@ -278,29 +268,31 @@ impl RemoteStateActor {
}
}

if self.connections.is_empty() && inbox.is_idle() && idle_timeout.is_none() {
let is_idle = self.connections.is_empty() && inbox.is_idle();
if idle_timeout.is_none() && is_idle {
trace!("start idle timeout");
idle_timeout
.as_mut()
.set_future(time::sleep(ACTOR_MAX_IDLE_TIMEOUT));
} else if idle_timeout.is_some() {
} else if idle_timeout.is_some() && !is_idle {
trace!("abort idle timeout");
idle_timeout.as_mut().set_none()
}
}
trace!("actor terminating");
Ok(())
}

/// Handles an actor message.
///
/// Error returns are fatal and kill the actor.
#[instrument(skip(self))]
async fn handle_message(&mut self, msg: RemoteStateMessage) -> n0_error::Result<()> {
async fn handle_message(&mut self, msg: RemoteStateMessage) {
// trace!("handling message");
match msg {
RemoteStateMessage::SendDatagram(transmit) => {
self.handle_msg_send_datagram(transmit).await?;
if let Err(err) = self.handle_msg_send_datagram(transmit).await {
warn!("failed to send datagram: {err:#}");
}
}
RemoteStateMessage::AddConnection(handle, tx) => {
self.handle_msg_add_connection(handle, tx).await;
Expand All @@ -324,7 +316,6 @@ impl RemoteStateActor {
self.handle_msg_latency(tx);
}
}
Ok(())
}

async fn send_datagram(
Expand All @@ -346,10 +337,10 @@ impl RemoteStateActor {
/// Error returns are fatal and kill the actor.
async fn handle_msg_send_datagram(&mut self, transmit: OwnedTransmit) -> n0_error::Result<()> {
if let Some(addr) = self.selected_path.get() {
trace!(?addr, "sending datagram to selected path");
debug!(?addr, "sending datagram to selected path");
self.send_datagram(addr, transmit).await?;
} else {
trace!(
debug!(
paths = ?self.paths.keys().collect::<Vec<_>>(),
"sending datagram to all known paths",
);
Expand Down Expand Up @@ -441,6 +432,8 @@ impl RemoteStateActor {

/// Handles [`RemoteStateMessage::AddEndpointAddr`].
fn handle_msg_add_endpoint_addr(&mut self, addr: EndpointAddr, source: Source) {
// TODO(Frando): Remove or demote to trace.
debug!(?addr, ?source, "add endpoint addr");
for sockaddr in addr.ip_addrs() {
let addr = transports::Addr::from(sockaddr);
self.paths
Expand Down
8 changes: 6 additions & 2 deletions iroh/src/magicsock/transports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,9 +539,13 @@ impl TransportsSender {
}
}
if any_match {
Err(io::Error::other("all available transports failed"))
Err(io::Error::other(format!(
"all available transports failed for destination {dst:?}"
)))
} else {
Err(io::Error::other("no transport available"))
Err(io::Error::other(format!(
"no transport available for destination {dst:?}"
)))
}
}

Expand Down
Loading