From 75afa82aa6d1992a10d5b9a0818ddb1fd8db5b4d Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 1 Dec 2025 09:23:25 -0800 Subject: [PATCH 1/2] Close NetRx write stream on exit (#2003) Summary: Currently, when `NetRx` exits, it does not close the write stream gracefully. Since `NetTx` is listening to this write stream for the ack messages, NetTx will see an `close_notify` error, and generate a log like this: > [-]E1125 06:50:46.476994 518989 fbcode/monarch/hyperactor/src/channel/net/client.rs:988] [net i/o loop{session:metatls:2401:db00:eef0:1120:3520:0:6c09:3cba:44243.17588939158573269181, connected:true, next_seq:2, largest_acked:AckedSeq { seq: 0, timestamp: "2s 805us 589ns" }, outbox:QueueValue::NonEmpty { len: 1, num_bytes_queued: 20, front: QueueEntryValue { seq: 1, since_received: ("817us 855ns",), since_sent: () }, back: QueueEntryValue { seq: 1, since_received: ("821us 911ns",), since_sent: () } }, unacked:QueueValue::Empty}] failed while receiving ack, dest:metatls:2401:db00:eef0:1120:3520:0:6c09:3cba:44243, session_id:17588939158573269181, error:peer closed connection without sending TLS close_notify: https://docs.rs/rustls/latest/rustls/manual/_03_howto/index.html#unexpected-eof This diff fixes that. Reviewed By: shayne-fletcher Differential Revision: D87863524 --- hyperactor/src/channel/net/client.rs | 32 ++++++++++------------------ hyperactor/src/channel/net/framed.rs | 15 +++++++++++++ hyperactor/src/channel/net/server.rs | 16 +++++++++++--- 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/hyperactor/src/channel/net/client.rs b/hyperactor/src/channel/net/client.rs index f609cd1b5..afc963e1e 100644 --- a/hyperactor/src/channel/net/client.rs +++ b/hyperactor/src/channel/net/client.rs @@ -640,34 +640,24 @@ async fn run( let _ = notify.send(TxStatus::Closed); match conn { - Conn::Connected { write_state, .. } => { - let write_half = match write_state { - WriteState::Writing(mut frame_writer, ()) => { - if let Err(err) = frame_writer.send().await { - tracing::info!( - parent: &span, - dest = %dest, - error = %err, - session_id = session_id, - "write error during cleanup" - ); - } - Some(frame_writer.complete()) - } - WriteState::Idle(writer) => Some(writer), - WriteState::Broken => None, - }; - - if let Some(mut w) = write_half { - if let Err(err) = w.shutdown().await { + Conn::Connected { + mut write_state, .. + } => { + if let WriteState::Writing(frame_writer, ()) = &mut write_state { + if let Err(err) = frame_writer.send().await { tracing::info!( parent: &span, dest = %dest, error = %err, session_id = session_id, - "failed to shutdown NetTx write stream during cleanup" + "write error during cleanup" ); } + }; + if let Some(mut w) = write_state.into_writer() { + // Try to shutdown the connection gracefully. This is a best effort + // operation, and we don't care if it fails. + let _ = w.shutdown().await; } } Conn::Disconnected(_) => (), diff --git a/hyperactor/src/channel/net/framed.rs b/hyperactor/src/channel/net/framed.rs index e6cd0c3db..04d212133 100644 --- a/hyperactor/src/channel/net/framed.rs +++ b/hyperactor/src/channel/net/framed.rs @@ -373,6 +373,21 @@ impl WriteState { Self::Broken => panic!("illegal state"), } } + + /// Consume the state and return the underlying writer, if the + /// stream is not broken. + /// + /// For `Idle`, this returns the stored writer. For `Writing`, + /// this assumes no more frames will be sent and calls + /// `complete()` to recover the writer. For `Broken`, this returns + /// `None`. + pub fn into_writer(self) -> Option { + match self { + Self::Idle(w) => Some(w), + Self::Writing(w, _) => Some(w.complete()), + Self::Broken => None, + } + } } #[cfg(test)] diff --git a/hyperactor/src/channel/net/server.rs b/hyperactor/src/channel/net/server.rs index 343308392..47be5f736 100644 --- a/hyperactor/src/channel/net/server.rs +++ b/hyperactor/src/channel/net/server.rs @@ -21,6 +21,7 @@ use dashmap::DashMap; use dashmap::mapref::entry::Entry; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; +use tokio::io::AsyncWriteExt as _; use tokio::io::ReadHalf; use tokio::io::WriteHalf; use tokio::sync::mpsc; @@ -80,7 +81,7 @@ impl ServerConn { /// Handles a server side stream created during the `listen` loop. async fn process( - &mut self, + mut self, session_id: u64, tx: mpsc::Sender, cancel_token: CancellationToken, @@ -421,6 +422,12 @@ impl ServerConn { }; } + if let Some(mut w) = self.write_state.into_writer() { + // Try to shutdown the connection gracefully. This is a best effort + // operation, and we don't care if it fails. + let _ = w.shutdown().await; + } + (final_next, final_result) } @@ -524,14 +531,17 @@ impl SessionManager { } }; + let source = conn.source.clone(); + let dest = conn.dest.clone(); + let next = session_var.take().await; let (next, res) = conn.process(session_id, tx, cancel_token, next).await; session_var.put(next).await; if let Err(ref err) = res { tracing::info!( - source = %conn.source, - dest = %conn.dest, + source = %source, + dest = %dest, error = ?err, session_id = session_id, "process encountered an error" From c3226acd8334dc836b78a921a87872fa907fc979 Mon Sep 17 00:00:00 2001 From: Peng Zhang Date: Mon, 1 Dec 2025 09:23:25 -0800 Subject: [PATCH 2/2] Close channel when NetRx is being stopped (#2010) Summary: When `NetRx` is stopped, we should let its `NetTx` half know as well, so it will exit too and stop reconnecting. Otherwise, those reconnections would all fail, and lead to a log spew. Reviewed By: shayne-fletcher Differential Revision: D87926632 --- hyperactor/src/channel/net.rs | 47 +++++++++++++++++++++++++++- hyperactor/src/channel/net/client.rs | 14 ++++++++- hyperactor/src/channel/net/server.rs | 24 ++++++++++---- 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/hyperactor/src/channel/net.rs b/hyperactor/src/channel/net.rs index 756f14ba5..aa89b48e0 100644 --- a/hyperactor/src/channel/net.rs +++ b/hyperactor/src/channel/net.rs @@ -106,8 +106,10 @@ enum Frame { #[derive(Debug, Serialize, Deserialize, EnumAsInner)] enum NetRxResponse { Ack(u64), - /// This channel is closed with the given reason. NetTx should stop reconnecting. + /// This session is rejected with the given reason. NetTx should stop reconnecting. Reject(String), + /// This channel is closed. + Closed, } fn serialize_response(response: NetRxResponse) -> Result { @@ -1612,6 +1614,9 @@ mod tests { handle.await.unwrap().unwrap(); // mpsc is closed too and there should be no unread message left. assert!(rx.recv().await.is_none()); + // should send NetRxResponse::Closed before stopping server. + let bytes = reader.next().await.unwrap().unwrap(); + assert!(deserialize_response(bytes).unwrap().is_closed()); // No more acks from server. assert!(reader.next().await.unwrap().is_none()); }; @@ -1646,6 +1651,9 @@ mod tests { handle.await.unwrap().unwrap(); // mpsc is closed too and there should be no unread message left. assert!(rx.recv().await.is_none()); + // should send NetRxResponse::Closed before stopping server. + let bytes = reader.next().await.unwrap().unwrap(); + assert!(deserialize_response(bytes).unwrap().is_closed()); // No more acks from server. assert!(reader.next().await.unwrap().is_none()); } @@ -2385,4 +2393,41 @@ mod tests { let bytes = reader.next().await.unwrap().unwrap(); assert!(deserialize_response(bytes).unwrap().is_reject()); } + + #[async_timed_test(timeout_secs = 60)] + // TODO: OSS: called `Result::unwrap()` on an `Err` value: Listen(Tcp([::1]:0), Os { code: 99, kind: AddrNotAvailable, message: "Cannot assign requested address" }) + #[cfg_attr(not(fbcode_build), ignore)] + async fn test_stop_net_tx_after_stopping_net_rx() { + hyperactor_telemetry::initialize_logging_for_test(); + + let config = config::global::lock(); + let _guard = + config.override_key(config::MESSAGE_DELIVERY_TIMEOUT, Duration::from_secs(300)); + let (addr, mut rx) = tcp::serve::("[::1]:0".parse().unwrap()).unwrap(); + let socket_addr = match addr { + ChannelAddr::Tcp(a) => a, + _ => panic!("unexpected channel type"), + }; + let tx = tcp::dial::(socket_addr); + // NetTx will not establish a connection until it sends the 1st message. + // Without a live connection, NetTx cannot received the Closed message + // from NetRx. Therefore, we need to send a message to establish the + //connection. + tx.send(100).await.unwrap(); + assert_eq!(rx.recv().await.unwrap(), 100); + // Drop rx will close the NetRx server. + rx.2.stop("testing"); + assert!(rx.recv().await.is_err()); + + // NetTx will only read from the stream when it needs to send a message + // or wait for an ack. Therefore we need to send a message to trigger that. + tx.post(101); + let mut watcher = tx.status().clone(); + // When NetRx exits, it should notify NetTx to exit as well. + let _ = watcher.wait_for(|val| *val == TxStatus::Closed).await; + // wait_for could return Err due to race between when watch's sender was + // dropped and when wait_for was called. So we still need to do an + // equality check. + assert_eq!(*watcher.borrow(), TxStatus::Closed); + } } diff --git a/hyperactor/src/channel/net/client.rs b/hyperactor/src/channel/net/client.rs index afc963e1e..cfd9af0ef 100644 --- a/hyperactor/src/channel/net/client.rs +++ b/hyperactor/src/channel/net/client.rs @@ -943,7 +943,19 @@ where ); (State::Closing { deliveries: Deliveries{outbox, unacked}, - reason: format!("{log_id}: {error_msg}"), + reason: error_msg, + }, Conn::reconnect_with_default()) + } + NetRxResponse::Closed => { + let msg = "server closed the channel".to_string(); + tracing::info!( + dest = %link.dest(), + session_id = session_id, + "{}", msg + ); + (State::Closing { + deliveries: Deliveries{outbox, unacked}, + reason: msg, }, Conn::reconnect_with_default()) } } diff --git a/hyperactor/src/channel/net/server.rs b/hyperactor/src/channel/net/server.rs index 47be5f736..0a8ac8bb6 100644 --- a/hyperactor/src/channel/net/server.rs +++ b/hyperactor/src/channel/net/server.rs @@ -89,7 +89,11 @@ impl ServerConn { ) -> (Next, Result<(), anyhow::Error>) { #[derive(Debug)] enum RejectConn { - Yes(String), + /// Reject the connection due to the given error. + EncounterError(String), + /// The server is being closed. + ServerClosing, + /// Do not reject the connection. No, } @@ -170,7 +174,7 @@ impl ServerConn { // Have a tick to abort select! call to make sure the ack for the last message can get the chance // to be sent as a result of time interval being reached. _ = RealClock.sleep_until(last_ack_time + ack_time_interval), if next.ack < next.seq => {}, - _ = cancel_token.cancelled() => break (next, Ok(()), RejectConn::No), + _ = cancel_token.cancelled() => break (next, Ok(()), RejectConn::ServerClosing), bytes_result = self.reader.next() => { rcv_raw_frame_count += 1; // First handle transport-level I/O errors, and EOFs. @@ -231,7 +235,7 @@ impl ServerConn { break ( next, Err(anyhow::anyhow!("{log_id}: unexpected init frame")), - RejectConn::Yes("expect Frame::Message; got Frame::Int".to_string()), + RejectConn::EncounterError("expect Frame::Message; got Frame::Int".to_string()), ) }, // Ignore retransmits. @@ -260,7 +264,7 @@ impl ServerConn { break ( next, Err(anyhow::anyhow!(format!("{log_id}: {error_msg}"))), - RejectConn::Yes(error_msg), + RejectConn::EncounterError(error_msg), ) } match self.send_with_buffer_metric(session_id, &tx, message).await { @@ -391,12 +395,20 @@ impl ServerConn { } if self.write_state.is_idle() - && let RejectConn::Yes(reason) = reject_conn + && matches!( + reject_conn, + RejectConn::EncounterError(_) | RejectConn::ServerClosing + ) { let Ok(writer) = replace(&mut self.write_state, WriteState::Broken).into_idle() else { panic!("illegal state"); }; - if let Ok(data) = serialize_response(NetRxResponse::Reject(reason)) { + let rsp = match reject_conn { + RejectConn::EncounterError(reason) => NetRxResponse::Reject(reason), + RejectConn::ServerClosing => NetRxResponse::Closed, + RejectConn::No => panic!("illegal state"), + }; + if let Ok(data) = serialize_response(rsp) { match FrameWrite::new( writer, data,