Skip to content

Commit

Permalink
fix(iroh-net)!: improve magicsock's shutdown story (#2227)
Browse files Browse the repository at this point in the history
## Description

- Waits for connections to actually be closed by calling
`quinn::Endpoint::wait_idle` after close.
- Consumes the `MagicEndpoint` in an effort to express the fact that
closing an endpoint is a terminal operation.
- Adds logging to closing the connections in case it's deemed necessary
- Changes `MagicSock::poll_recv` to return `Poll::Pending` on close
instead of an error. More info on the logic behind this change below.
- Changes `MagicSock::poll_send` to not error if nothing is being sent.

## Breaking Changes

`MagicEndpoint::close` now consumes the endpoint.

## Notes & open questions

Shutdown `MagicSock` when quinn is done with it has proven something we
can't do reliably.
- Drop based solutions (close when the `MagicSock` is dropped) are
unreliable because the socket is held by multiple components on `quinn`,
including spawned detached tasks. There is no really any way to trigger
the dropping of these tasks/components from outside `quinn`, and would
require quite en effort to change it to happen inside `quinn`.
- Close based solutions were rejected. The objective here was to stop
polling the socket when the endpoint was both closed (close called) and
all connections gracefully finalized. The reasoning here is that `quinn`
both receives and sends frames after close to read new connection
attempts and gracefully reject them. This is a fair argument on their
side, despite it being clearly a limitation for a reliable freeing of
resources from `quinn`.
- Taking into account the fact that the socket _will_ be polled after
closed, both to send and receive, returning an error from these will
always produce the logging error `quinn::endpoint: I/O error: connection
closed` we have been chasing, _even_ after the `quinn::Endpoint` has
been dropped. Therefore changing `poll_recv` to return `Poll::Pending`
addresses this annoyance.
- Note that the part about _gracefully_ shutting down is actually done
by calling `quinn::Endpoing::wait_idle` and that the (now averted) log
error in `quinn` doesn't really tells us whether shutdown was or not
gracefull.
- Note that the above point creates an API disparity between `poll_send`
and `poll_recv`: one will error on close, the other will return
`Pending`. I find this OK if `MagicSock` is simply a part of our
implementation, but right now it's also part of our public API. I wonder
if the `MagicSock` makes sense to users on its own or if we should
remove it from the public API.
- NOTE that conversation with `quinn`'s maintainers to get a better api
that allows to understand when all resources as freed has started. But
this is playing the long game and we need to solve this on our end for
now.

## Change checklist

- [x] Self-review.
- [x] Documentation updates if relevant.
- [ ] ~Tests if relevant.~
- [x] All breaking changes documented.
  • Loading branch information
divagant-martian committed Apr 24, 2024
1 parent 26e4564 commit 265e284
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 16 deletions.
25 changes: 19 additions & 6 deletions iroh-net/src/magic_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,10 +560,23 @@ impl MagicEndpoint {
///
/// Returns an error if closing the magic socket failed.
/// TODO: Document error cases.
pub async fn close(&self, error_code: VarInt, reason: &[u8]) -> Result<()> {
self.cancel_token.cancel();
self.endpoint.close(error_code, reason);
self.msock.close().await?;
pub async fn close(self, error_code: VarInt, reason: &[u8]) -> Result<()> {
let MagicEndpoint {
msock,
endpoint,
cancel_token,
..
} = self;
cancel_token.cancel();
tracing::debug!("Closing connections");
endpoint.close(error_code, reason);
endpoint.wait_idle().await;
// In case this is the last clone of `MagicEndpoint`, dropping the `quinn::Endpoint` will
// make it more likely that the underlying socket is not polled by quinn anymore after this
drop(endpoint);
tracing::debug!("Connections closed");

msock.close().await?;
Ok(())
}

Expand All @@ -582,8 +595,8 @@ impl MagicEndpoint {
}

#[cfg(test)]
pub(crate) fn magic_sock(&self) -> &MagicSock {
&self.msock
pub(crate) fn magic_sock(&self) -> MagicSock {
self.msock.clone()
}
#[cfg(test)]
pub(crate) fn endpoint(&self) -> &quinn::Endpoint {
Expand Down
25 changes: 15 additions & 10 deletions iroh-net/src/magicsock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,12 @@ impl Inner {
let bytes_total: usize = transmits.iter().map(|t| t.contents.len()).sum();
inc_by!(MagicsockMetrics, send_data, bytes_total as _);

let mut n = 0;
if transmits.is_empty() {
tracing::trace!(is_closed=?self.is_closed(), "poll_send without any quinn_udp::Transmit");
return Poll::Ready(Ok(n));
}

if self.is_closed() {
inc_by!(MagicsockMetrics, send_data_network_down, bytes_total as _);
return Poll::Ready(Err(io::Error::new(
Expand All @@ -295,10 +301,6 @@ impl Inner {
)));
}

let mut n = 0;
if transmits.is_empty() {
return Poll::Ready(Ok(n));
}
trace!(
"sending:\n{}",
transmits.iter().fold(
Expand Down Expand Up @@ -484,6 +486,7 @@ impl Inner {
Ok(sock)
}

/// NOTE: Receiving on a [`Self::closed`] socket will return [`Poll::Pending`] indefinitely.
#[instrument(skip_all, fields(me = %self.me))]
fn poll_recv(
&self,
Expand All @@ -494,10 +497,7 @@ impl Inner {
// FIXME: currently ipv4 load results in ipv6 traffic being ignored
debug_assert_eq!(bufs.len(), metas.len(), "non matching bufs & metas");
if self.is_closed() {
return Poll::Ready(Err(io::Error::new(
io::ErrorKind::NotConnected,
"connection closed",
)));
return Poll::Pending;
}

// order of polling is: UDPv4, UDPv6, relay
Expand Down Expand Up @@ -1411,6 +1411,8 @@ impl MagicSock {
/// Closes the connection.
///
/// Only the first close does anything. Any later closes return nil.
/// Polling the socket ([`AsyncUdpSocket::poll_recv`]) will return [`Poll::Pending`]
/// indefinitely after this call.
#[instrument(skip_all, fields(me = %self.inner.me))]
pub async fn close(&self) -> Result<()> {
if self.inner.is_closed() {
Expand Down Expand Up @@ -1596,6 +1598,7 @@ impl AsyncUdpSocket for MagicSock {
self.inner.poll_send(cx, transmits)
}

/// NOTE: Receiving on a [`Self::close`]d socket will return [`Poll::Pending`] indefinitely.
fn poll_recv(
&self,
cx: &mut Context,
Expand Down Expand Up @@ -2982,11 +2985,13 @@ pub(crate) mod tests {
let _guard = mesh_stacks(vec![m1.clone(), m2.clone()], url.clone()).await?;

println!("closing endpoints");
let msock1 = m1.endpoint.magic_sock();
let msock2 = m2.endpoint.magic_sock();
m1.endpoint.close(0u32.into(), b"done").await?;
m2.endpoint.close(0u32.into(), b"done").await?;

assert!(m1.endpoint.magic_sock().inner.is_closed());
assert!(m2.endpoint.magic_sock().inner.is_closed());
assert!(msock1.inner.is_closed());
assert!(msock2.inner.is_closed());
}
Ok(())
}
Expand Down

0 comments on commit 265e284

Please sign in to comment.