diff --git a/hyperactor/src/channel/net.rs b/hyperactor/src/channel/net.rs index d2a2b81c9..cfab5bc70 100644 --- a/hyperactor/src/channel/net.rs +++ b/hyperactor/src/channel/net.rs @@ -151,7 +151,11 @@ impl Tx for NetTx { } fn do_post(&self, message: M, return_channel: Option>>) { - tracing::trace!(name = "post", "sending message to {}", self.dest); + tracing::trace!( + name = "post", + dest = %self.dest, + "sending message" + ); let return_channel = return_channel.unwrap_or_else(|| oneshot::channel().0); if let Err(mpsc::error::SendError((message, return_channel, _))) = @@ -168,7 +172,11 @@ pub struct NetRx(mpsc::Receiver, ChannelAddr, ServerHandle) #[async_trait] impl Rx for NetRx { async fn recv(&mut self) -> Result { - tracing::trace!(name = "recv", "receiving message from {}", self.1); + tracing::trace!( + name = "recv", + source = %self.1, + "receiving message" + ); self.0.recv().await.ok_or(ChannelError::Closed) } diff --git a/hyperactor/src/channel/net/client.rs b/hyperactor/src/channel/net/client.rs index f162714cb..52f80ef93 100644 --- a/hyperactor/src/channel/net/client.rs +++ b/hyperactor/src/channel/net/client.rs @@ -188,10 +188,16 @@ impl QueuedMessage { .send(SendError(ChannelError::Closed, msg)); } Ok(_) => { - tracing::debug!("queued frame was not a Frame::Message; dropping without return"); + tracing::debug!( + seq = self.seq, + "queued frame was not a Frame::Message; dropping without return" + ); } - Err(e) => { - tracing::warn!("failed to deserialize queued frame for return: {e}"); + Err(_e) => { + tracing::warn!( + seq = self.seq, + "failed to deserialize queued frame for return" + ); } } } @@ -596,7 +602,12 @@ async fn run( let span = state_span(&state, &conn, session_id, &link); - tracing::debug!(parent: &span, "{log_id}: NetTx exited its loop with state: {state}"); + tracing::info!( + parent: &span, + dest = %dest, + session_id = session_id, + "NetTx exited its loop with state: {}", state + ); match state { State::Closing { @@ -626,7 +637,12 @@ async fn run( // Notify senders that this link is no longer usable if let Err(err) = notify.send(TxStatus::Closed) { - tracing::debug!(parent: &span, "{log_id}: tx status update error: {err}"); + tracing::debug!( + dest = %dest, + error = %err, + session_id = session_id, + "tx status update error" + ); } if let Conn::Connected { @@ -635,13 +651,30 @@ async fn run( } = conn { if let Err(err) = frame_writer.send().await { - tracing::info!(parent: &span, "{log_id}: write error: {err}",); + tracing::info!( + parent: &span, + dest = %dest, + error = %err, + session_id = session_id, + "write error during cleanup" + ); } else if let Err(err) = frame_writer.complete().flush().await { - tracing::info!(parent: &span, "{log_id}: flush error: {err}",); + tracing::info!( + parent: &span, + dest = %dest, + error = %err, + session_id = session_id, + "flush error during cleanup" + ); } } - tracing::debug!(parent: &span, "{log_id}: NetTx::run exits"); + tracing::info!( + parent: &span, + dest = %dest, + session_id = session_id, + "NetTx::run exits" + ); } fn state_span<'a, L, S, M>(state: &State<'a, M>, conn: &Conn, session_id: u64, link: &L) -> Span @@ -801,12 +834,17 @@ where (running, conn) } Err(err) => { - let error_msg = format!("{log_id}: failed to push message to outbox: {err}"); - tracing::error!(error_msg); + let error_msg = "failed to push message to outbox"; + tracing::error!( + dest = %link.dest(), + session_id = session_id, + error = %err, + "{}", error_msg + ); ( State::Closing { deliveries: Deliveries { outbox, unacked }, - reason: error_msg, + reason: format!("{log_id}: {error_msg}: {err}"), }, conn, ) @@ -846,6 +884,8 @@ where Err((writer, e)) => { debug_assert_eq!(e.kind(), io::ErrorKind::InvalidData); tracing::error!( + dest = %link.dest(), + session_id = session_id, "rejecting oversize frame: len={} > max={}. \ ack will not arrive before timeout; increase CODEC_MAX_FRAME_LENGTH to allow.", len, @@ -853,15 +893,18 @@ where ); // Reject and return. outbox.pop_front().expect("not empty").try_return(); - let error_msg = - format!("{log_id}: oversized frame was rejected. closing channel"); - tracing::error!(error_msg); + let error_msg = "oversized frame was rejected. closing channel"; + tracing::error!( + dest = %link.dest(), + session_id = session_id, + "{}", error_msg, + ); // Close the channel (avoid sequence // violations). ( State::Closing { deliveries: Deliveries { outbox, unacked }, - reason: error_msg, + reason: format!("{log_id}: {error_msg}"), }, Conn::Connected { reader, @@ -895,27 +938,34 @@ where (State::Running(Deliveries { outbox, unacked }), Conn::Connected { reader, write_state }) } NetRxResponse::Reject => { - let error_msg = format!( - "{log_id}: server rejected connection.", - ); - tracing::error!(error_msg); + let error_msg = "server rejected connection"; + tracing::error!( + dest = %link.dest(), + session_id = session_id, + "{}", error_msg + ); (State::Closing { deliveries: Deliveries{outbox, unacked}, - reason: error_msg, + reason: format!("{log_id}: {error_msg}"), }, Conn::reconnect_with_default()) } } } Err(err) => { - let error_msg = format!( - "{log_id}: failed deserializing response: {err}", - ); - tracing::error!(error_msg); + let error_msg = "failed deserializing response"; + tracing::error!( + dest = %link.dest(), + session_id = session_id, + error = %err, + "{}", error_msg + ); // Similar to the message flow, we always close the // channel when encountering ser/deser errors. (State::Closing { deliveries: Deliveries{outbox, unacked}, - reason: error_msg, + reason: format!( + "{log_id}: {error_msg}: {err}", + ), }, Conn::Connected { reader, write_state }) } } @@ -926,8 +976,11 @@ where } Err(err) => { tracing::error!( - "{log_id}: failed while receiving ack: {err}", - ); + dest = %link.dest(), + session_id = session_id, + error = %err, + "failed while receiving ack" + ); // Reconnect and wish the error will go away. (State::Running(Deliveries { outbox, unacked }), Conn::reconnect_with_default()) } @@ -937,13 +990,17 @@ where // If acking message takes too long, consider the link broken. _ = unacked.wait_for_timeout(), if !unacked.is_empty() => { let error_msg = format!( - "{log_id}: failed to receive ack within timeout {:?}; link is currently connected", + "failed to receive ack within timeout {:?}; link is currently connected", config::global::get(config::MESSAGE_DELIVERY_TIMEOUT), ); - tracing::error!(error_msg); + tracing::error!( + dest = %link.dest(), + session_id = session_id, + "{}", error_msg, + ); (State::Closing { deliveries: Deliveries{outbox, unacked}, - reason: error_msg, + reason: format!("{log_id}: {error_msg}"), }, Conn::Connected { reader, write_state }) } @@ -963,18 +1020,21 @@ where } Err(err) => { tracing::info!( - "{log_id}: outbox send error: {err}; message size: {}", + dest = %link.dest(), + session_id, + error = %err, + "outbox send error; message size: {}", outbox.front_size().expect("outbox should not be empty"), ); - // Track error for this channel pair - metrics::CHANNEL_ERRORS.add( - 1, - hyperactor_telemetry::kv_pairs!( - "dest" => link.dest().to_string(), - "session_id" => session_id.to_string(), - "error_type" => metrics::ChannelErrorType::SendError.as_str(), - ), - ); + // Track error for this channel pair + metrics::CHANNEL_ERRORS.add( + 1, + hyperactor_telemetry::kv_pairs!( + "dest" => link.dest().to_string(), + "session_id" => session_id.to_string(), + "error_type" => metrics::ChannelErrorType::SendError.as_str(), + ), + ); (State::Running(Deliveries { outbox, unacked }), Conn::reconnect_with_default()) } } @@ -994,13 +1054,16 @@ where (running, Conn::Connected { reader, write_state }) } Err(err) => { - let error_msg = format!( - "{log_id}: failed to push message to outbox: {err}", + let error_msg = "failed to push message to outbox"; + tracing::error!( + dest = %link.dest(), + session_id, + error = %err, + "{}", error_msg, ); - tracing::error!(error_msg); (State::Closing { deliveries: Deliveries {outbox, unacked}, - reason: error_msg, + reason: format!("{log_id}: {error_msg}: {err}"), }, Conn::Connected { reader, write_state }) } } @@ -1026,27 +1089,35 @@ where // consider the link broken. if outbox.is_expired() { let error_msg = format!( - "{log_id}: failed to deliver message within timeout {:?}", + "failed to deliver message within timeout {:?}", config::global::get(config::MESSAGE_DELIVERY_TIMEOUT) ); - tracing::error!(error_msg); + tracing::error!( + dest = %link.dest(), + session_id, + "{}", error_msg + ); ( State::Closing { deliveries: Deliveries { outbox, unacked }, - reason: error_msg, + reason: format!("{log_id}: {error_msg}"), }, Conn::reconnect_with_default(), ) } else if unacked.is_expired() { let error_msg = format!( - "{log_id}: failed to receive ack within timeout {:?}; link is currently broken", + "failed to receive ack within timeout {:?}; link is currently broken", config::global::get(config::MESSAGE_DELIVERY_TIMEOUT), ); - tracing::error!(error_msg); + tracing::error!( + dest = %link.dest(), + session_id = session_id, + "{}", error_msg + ); ( State::Closing { deliveries: Deliveries { outbox, unacked }, - reason: error_msg, + reason: format!("{log_id}: {error_msg}"), }, Conn::reconnect_with_default(), ) @@ -1112,10 +1183,10 @@ where } Err(err) => { tracing::debug!( - "session {}.{}: failed to connect: {}", - link.dest(), - session_id, - err + dest = %link.dest(), + error = %err, + session_id = session_id, + "failed to connect" ); // Track connection error for this channel pair metrics::CHANNEL_ERRORS.add( diff --git a/hyperactor/src/channel/net/server.rs b/hyperactor/src/channel/net/server.rs index cdb65f818..b5a811e0d 100644 --- a/hyperactor/src/channel/net/server.rs +++ b/hyperactor/src/channel/net/server.rs @@ -124,7 +124,12 @@ impl ServerConn { } Err((writer, e)) => { debug_assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - tracing::error!("failed to create ack frame (should be tiny): {e}"); + tracing::error!( + source = %self.source, + dest = %self.dest, + error = %e, + "failed to create ack frame" + ); self.write_state = WriteState::Idle(writer); } } @@ -165,7 +170,11 @@ impl ServerConn { let bytes = match bytes_result { Ok(Some(bytes)) => bytes, Ok(None) => { - tracing::debug!("{log_id}: EOF"); + tracing::debug!( + source = %self.source, + dest = %self.dest, + "received EOF from client" + ); break (next, Ok(()), false); } Err(err) => break ( @@ -217,9 +226,12 @@ impl ServerConn { // Ignore retransmits. Ok(Frame::Message(seq, _)) if seq < next.seq => { tracing::debug!( - "{log_id}: ignoring retransmit; retransmit seq: {}; expected next seq: {}", + source = %self.source, + dest = %self.dest, + seq = seq, + "ignoring retransmit; retransmit seq: {}; expected next seq: {}", seq, - next.seq, + next.seq ); }, // The following segment ensures exactly-once semantics. @@ -227,11 +239,16 @@ impl ServerConn { Ok(Frame::Message(seq, message)) => { // received seq should be equal to next seq. Else error out! if seq > next.seq { - let msg = format!("{log_id}: out-of-sequence message, expected seq {}, got {}", next.seq, seq); - tracing::error!(msg); - break (next, Err(anyhow::anyhow!(msg)), true) + let error_msg = format!("out-of-sequence message, expected seq {}, got {}", next.seq, seq); + tracing::error!( + source = %self.source, + dest = %self.dest, + seq = seq, + "{}", error_msg + ); + break (next, Err(anyhow::anyhow!(format!("{log_id}: {error_msg}"))), true) } - match self.send_with_buffer_metric(&log_id, &tx, message).await { + match self.send_with_buffer_metric(session_id, &tx, message).await { Ok(()) => { // Track throughput for this channel pair metrics::CHANNEL_THROUGHPUT_BYTES.add( @@ -297,13 +314,19 @@ impl ServerConn { // 1. processed seq/ack is Next-1; // 2. rcv_raw_frame_count contains the last frame which might not be // desrializable, e.g. EOF, error, etc. - tracing::debug!( - "{log_id}: NetRx::process exited its loop with states: initial Next \ + let debug_msg = format!( + "NetRx::process exited its loop with states: initial Next \ was {initial_next}; final Next is {final_next}; since acked: {}sec; \ rcv raw frame count is {rcv_raw_frame_count}; final result: {:?}", last_ack_time.elapsed().as_secs(), final_result, ); + tracing::info!( + source = %self.source, + dest = %self.dest, + session_id, + "{}", debug_msg + ); let mut final_ack = final_next.ack; // Flush any ongoing write. @@ -336,11 +359,11 @@ impl ServerConn { } Err(e) => { tracing::debug!( - "{log_id}: failed to flush acks due to error : {e}. \ - Normally, this is okay because Tx will reconnect, and \ - acks will be resent in the next connection. However, if \ - either Tx or Rx is dropped, the reconnection will not \ - happen, and subsequently the pending ack will never be sent out.", + session_id, + source = %self.source, + dest = %self.dest, + error = %e, + "failed to flush acks during cleanup" ); } } @@ -362,7 +385,13 @@ impl ServerConn { } Err((w, e)) => { debug_assert_eq!(e.kind(), std::io::ErrorKind::InvalidData); - tracing::debug!("failed to create reject frame (should be tiny): {e}"); + tracing::debug!( + source = %self.source, + dest = %self.dest, + session_id = session_id, + error = %e, + "failed to create reject frame" + ); self.write_state = WriteState::Idle(w); // drop the reject; we're closing anyway } @@ -382,7 +411,7 @@ impl ServerConn { // occurences. async fn send_with_buffer_metric( &mut self, - log_id: &str, + session_id: u64, tx: &mpsc::Sender, message: M, ) -> anyhow::Result<()> { @@ -401,12 +430,16 @@ impl ServerConn { hyperactor_telemetry::kv_pairs!( "dest" => self.dest.to_string(), "source" => self.source.to_string(), + "session_id" => session_id.to_string(), ), ); // Full buffer should happen rarely. So we also add a log // here to make debugging easy. - tracing::debug!( - "{log_id}: encountered full mpsc channel for {} secs", + tracing::debug!( + source = %self.source, + dest = %self.dest, + session_id = session_id, + "encountered full mpsc channel for {} secs", start.elapsed().as_secs(), ); } @@ -474,7 +507,13 @@ impl SessionManager { session_var.put(next).await; if let Err(ref err) = res { - tracing::info!("process encountered an error: {:#}", err); + tracing::info!( + source = %conn.source, + dest = %conn.dest, + error = ?err, + session_id = session_id, + "process encountered an error" + ); } res @@ -507,7 +546,11 @@ where match result { Ok((stream, addr)) => { let source : ChannelAddr = addr.into(); - tracing::debug!("listen {}: new connection from {}", listener_channel_addr, source); + tracing::debug!( + source = %source, + addr = %listener_channel_addr, + "new connection accepted" + ); metrics::CHANNEL_CONNECTIONS.add( 1, hyperactor_telemetry::kv_pairs!( @@ -547,9 +590,12 @@ where ChannelAddr::Tcp(source_addr) if source_addr.ip().is_loopback() => {}, _ => { tracing::info!( - "serve: error processing peer connection {} <- {}: {:?}", - dest, source, err - ); + source = %source, + dest = %dest, + error = ?err, + "error processing peer connection" + ); + } } } @@ -567,19 +613,30 @@ where ), ); - tracing::info!("serve {}: accept error: {}", listener_channel_addr, err) + tracing::info!( + addr = %listener_channel_addr, + error = %err, + "accept error" + ); } } } _ = parent_cancel_token.cancelled() => { - tracing::info!("serve {}: received parent token cancellation", listener_channel_addr); + tracing::info!( + addr = %listener_channel_addr, + "received parent token cancellation" + ); break Ok(()); } result = join_nonempty(&mut connections) => { if let Err(err) = result { - tracing::info!("connection error: {}: {}", listener_channel_addr, err); + tracing::info!( + addr = %listener_channel_addr, + error = %err, + "connection task join error" + ); } } } @@ -624,7 +681,13 @@ impl ServerHandle { /// on active connections. After draining is completed, the /// connections are closed. pub(crate) fn stop(&self, reason: &str) { - tracing::info!("stopping server: {}; reason: {}", self, reason); + tracing::info!( + name = "ChannelServerStatus", + addr = %self.channel_addr, + status = "Stop::Sent", + reason, + "sent Stop signal; check server logs for the stop progress" + ); self.cancel_token.cancel(); }