From 7dc69eca3fa2b1858c24577f5ee9dc2547885cb2 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 1 Dec 2025 09:23:28 -0800 Subject: [PATCH] Close NetRx write stream on exit (#2003) Summary: Currently, when `NetRx` exits, it does not close the write stream gracefully. Since `NetTx` is listening to this write stream for the ack messages, NetTx will see a `close_notify` error, and generate a log like this: > [-]E1125 06:50:46.476994 518989 fbcode/monarch/hyperactor/src/channel/net/client.rs:988] [net i/o loop{session:metatls:2401:db00:eef0:1120:3520:0:6c09:3cba:44243.17588939158573269181, connected:true, next_seq:2, largest_acked:AckedSeq { seq: 0, timestamp: "2s 805us 589ns" }, outbox:QueueValue::NonEmpty { len: 1, num_bytes_queued: 20, front: QueueEntryValue { seq: 1, since_received: ("817us 855ns",), since_sent: () }, back: QueueEntryValue { seq: 1, since_received: ("821us 911ns",), since_sent: () } }, unacked:QueueValue::Empty}] failed while receiving ack, dest:metatls:2401:db00:eef0:1120:3520:0:6c09:3cba:44243, session_id:17588939158573269181, error:peer closed connection without sending TLS close_notify: https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof This diff fixes that. Reviewed By: shayne-fletcher Differential Revision: D87863524 --- hyperactor/src/channel/net/client.rs | 32 ++++++++++------------------ hyperactor/src/channel/net/framed.rs | 15 +++++++++++++ hyperactor/src/channel/net/server.rs | 16 +++++++++++--- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/hyperactor/src/channel/net/client.rs b/hyperactor/src/channel/net/client.rs index f609cd1b5..afc963e1e 100644 --- a/hyperactor/src/channel/net/client.rs +++ b/hyperactor/src/channel/net/client.rs @@ -640,34 +640,24 @@ async fn run( let _ = notify.send(TxStatus::Closed); match conn { - Conn::Connected { write_state, .. 
} => { - let write_half = match write_state { - WriteState::Writing(mut frame_writer, ()) => { - if let Err(err) = frame_writer.send().await { - tracing::info!( - parent: &span, - dest = %dest, - error = %err, - session_id = session_id, - "write error during cleanup" - ); - } - Some(frame_writer.complete()) - } - WriteState::Idle(writer) => Some(writer), - WriteState::Broken => None, - }; - - if let Some(mut w) = write_half { - if let Err(err) = w.shutdown().await { + Conn::Connected { + mut write_state, .. + } => { + if let WriteState::Writing(frame_writer, ()) = &mut write_state { + if let Err(err) = frame_writer.send().await { tracing::info!( parent: &span, dest = %dest, error = %err, session_id = session_id, - "failed to shutdown NetTx write stream during cleanup" + "write error during cleanup" ); } + }; + if let Some(mut w) = write_state.into_writer() { + // Try to shutdown the connection gracefully. This is a best effort + // operation, and we don't care if it fails. + let _ = w.shutdown().await; } } Conn::Disconnected(_) => (), diff --git a/hyperactor/src/channel/net/framed.rs b/hyperactor/src/channel/net/framed.rs index e6cd0c3db..04d212133 100644 --- a/hyperactor/src/channel/net/framed.rs +++ b/hyperactor/src/channel/net/framed.rs @@ -373,6 +373,21 @@ impl WriteState { Self::Broken => panic!("illegal state"), } } + + /// Consume the state and return the underlying writer, if the + /// stream is not broken. + /// + /// For `Idle`, this returns the stored writer. For `Writing`, + /// this assumes no more frames will be sent and calls + /// `complete()` to recover the writer. For `Broken`, this returns + /// `None`. 
+ pub fn into_writer(self) -> Option { + match self { + Self::Idle(w) => Some(w), + Self::Writing(w, _) => Some(w.complete()), + Self::Broken => None, + } + } } #[cfg(test)] diff --git a/hyperactor/src/channel/net/server.rs b/hyperactor/src/channel/net/server.rs index 343308392..47be5f736 100644 --- a/hyperactor/src/channel/net/server.rs +++ b/hyperactor/src/channel/net/server.rs @@ -21,6 +21,7 @@ use dashmap::DashMap; use dashmap::mapref::entry::Entry; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt as _; use tokio::io::ReadHalf; use tokio::io::WriteHalf; use tokio::sync::mpsc; @@ -80,7 +81,7 @@ impl ServerConn { /// Handles a server side stream created during the `listen` loop. async fn process( - &mut self, + mut self, session_id: u64, tx: mpsc::Sender, cancel_token: CancellationToken, @@ -421,6 +422,12 @@ impl ServerConn { }; } + if let Some(mut w) = self.write_state.into_writer() { + // Try to shutdown the connection gracefully. This is a best effort + // operation, and we don't care if it fails. + let _ = w.shutdown().await; + } + (final_next, final_result) } @@ -524,14 +531,17 @@ impl SessionManager { } }; + let source = conn.source.clone(); + let dest = conn.dest.clone(); + let next = session_var.take().await; let (next, res) = conn.process(session_id, tx, cancel_token, next).await; session_var.put(next).await; if let Err(ref err) = res { tracing::info!( - source = %conn.source, - dest = %conn.dest, + source = %source, + dest = %dest, error = ?err, session_id = session_id, "process encountered an error"