diff --git a/hyperactor/src/channel/net/client.rs b/hyperactor/src/channel/net/client.rs index 83ee28f46..37bf3cc6b 100644 --- a/hyperactor/src/channel/net/client.rs +++ b/hyperactor/src/channel/net/client.rs @@ -33,6 +33,7 @@ use super::framed::FrameReader; use super::framed::FrameWrite; use super::framed::WriteState; use crate::RemoteMessage; +use crate::channel::ChannelAddr; use crate::channel::ChannelError; use crate::channel::SendError; use crate::channel::TxStatus; @@ -203,14 +204,18 @@ struct Outbox<'a, M: RemoteMessage> { next_seq: u64, deque: MessageDeque, log_id: &'a str, + dest_addr: &'a ChannelAddr, + session_id: u64, } impl<'a, M: RemoteMessage> Outbox<'a, M> { - fn new(log_id: &'a str) -> Self { + fn new(log_id: &'a str, dest_addr: &'a ChannelAddr, session_id: u64) -> Self { Self { next_seq: 0, deque: MessageDeque(VecDeque::new()), log_id, + dest_addr, + session_id, } } @@ -255,7 +260,24 @@ impl<'a, M: RemoteMessage> Outbox<'a, M> { let frame = Frame::Message(self.next_seq, message); let message = serialize_bincode(&frame).map_err(|e| format!("serialization error: {e}"))?; - metrics::REMOTE_MESSAGE_SEND_SIZE.record(message.frame_len() as f64, &[]); + let message_size = message.frame_len(); + metrics::REMOTE_MESSAGE_SEND_SIZE.record(message_size as f64, &[]); + + // Track throughput for this channel pair + metrics::CHANNEL_THROUGHPUT_BYTES.add( + message_size as u64, + hyperactor_telemetry::kv_pairs!( + "dest" => self.dest_addr.to_string(), + "session_id" => self.session_id.to_string(), + ), + ); + metrics::CHANNEL_THROUGHPUT_MESSAGES.add( + 1, + hyperactor_telemetry::kv_pairs!( + "dest" => self.dest_addr.to_string(), + "session_id" => self.session_id.to_string(), + ), + ); self.deque.push_back(QueuedMessage { seq: self.next_seq, @@ -371,7 +393,7 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> { } /// Remove acked messages from the deque. - fn prune(&mut self, acked: u64, acked_at: Instant) { + fn prune(&mut self, acked: u64, acked_at: Instant, dest_addr: &ChannelAddr, session_id: u64) { assert!( self.largest_acked.as_ref().map_or(0, |i| i.0) <= acked, "{}: received out-of-order ack; received: {}; stored largest: {}", @@ -386,7 +408,16 @@ impl<'a, M: RemoteMessage> Unacked<'a, M> { let deque = &mut self.deque; while let Some(msg) = deque.front() { if msg.seq <= acked { - deque.pop_front(); + let msg: QueuedMessage = deque.pop_front().unwrap(); + // Track latency: time from when message was first received to when it was acked + let latency_micros = msg.received_at.elapsed().as_micros() as i64; + metrics::CHANNEL_LATENCY_MICROS.record( + latency_micros as f64, + hyperactor_telemetry::kv_pairs!( + "dest" => dest_addr.to_string(), + "session_id" => session_id.to_string(), + ), + ); } else { // Messages in the deque are orderd by seq in ascending // order. So we could return early once we encounter @@ -461,9 +492,9 @@ enum State<'a, M: RemoteMessage> { } impl<'a, M: RemoteMessage> State<'a, M> { - fn init(log_id: &'a str) -> Self { + fn init(log_id: &'a str, dest_addr: &'a ChannelAddr, session_id: u64) -> Self { Self::Running(Deliveries { - outbox: Outbox::new(log_id), + outbox: Outbox::new(log_id, dest_addr, session_id), unacked: Unacked::new(None, log_id), }) } @@ -543,7 +574,8 @@ async fn run( let session_id = rand::random(); let log_id = format!("session {}.{}", link.dest(), session_id); - let mut state = State::init(&log_id); + let dest = link.dest(); + let mut state = State::init(&log_id, &dest, session_id); let mut conn = Conn::reconnect_with_default(); let (state, conn) = loop { @@ -859,7 +891,7 @@ where Ok(response) => { match response { NetRxResponse::Ack(ack) => { - unacked.prune(ack, RealClock.now()); + unacked.prune(ack, RealClock.now(), &link.dest(), session_id); (State::Running(Deliveries { outbox, unacked }), Conn::Connected { reader, write_state }) } NetRxResponse::Reject => { @@ -934,6 +966,15 @@ where "{log_id}: outbox send error: {err}; message size: {}", outbox.front_size().expect("outbox should not be empty"), ); + // Track error for this channel pair + metrics::CHANNEL_ERRORS.add( + 1, + hyperactor_telemetry::kv_pairs!( + "dest" => link.dest().to_string(), + "session_id" => session_id.to_string(), + "error_type" => metrics::ChannelErrorType::SendError.as_str(), + ), + ); (State::Running(Deliveries { outbox, unacked }), Conn::reconnect_with_default()) } } @@ -1030,6 +1071,18 @@ where // Need to resend unacked after reconnecting. let largest_acked = unacked.largest_acked; + let num_retries = unacked.deque.len(); + if num_retries > 0 { + // Track reconnection for this channel pair + metrics::CHANNEL_RECONNECTIONS.add( + 1, + hyperactor_telemetry::kv_pairs!( + "dest" => link.dest().to_string(), + "transport" => link.dest().transport().to_string(), + "reason" => "reconnect_with_unacked", + ), + ); + } outbox.requeue_unacked(unacked.deque); ( State::Running(Deliveries { @@ -1061,6 +1114,15 @@ where session_id, err ); + // Track connection error for this channel pair + metrics::CHANNEL_ERRORS.add( + 1, + hyperactor_telemetry::kv_pairs!( + "dest" => link.dest().to_string(), + "session_id" => session_id.to_string(), + "error_type" => metrics::ChannelErrorType::ConnectionError.as_str(), + ), + ); ( State::Running(Deliveries { outbox, unacked }), Conn::reconnect(backoff), diff --git a/hyperactor/src/channel/net/server.rs b/hyperactor/src/channel/net/server.rs index 6a642365f..cdb65f818 100644 --- a/hyperactor/src/channel/net/server.rs +++ b/hyperactor/src/channel/net/server.rs @@ -181,9 +181,21 @@ impl ServerConn { }; // De-frame the multi-part message. + let bytes_len = bytes.len(); let message = match serde_multipart::Message::from_framed(bytes) { Ok(message) => message, - Err(err) => break ( + Err(err) => { + // Track deframing error for this channel pair + metrics::CHANNEL_ERRORS.add( + 1, + hyperactor_telemetry::kv_pairs!( + "source" => self.source.to_string(), + "dest" => self.dest.to_string(), + "session_id" => session_id.to_string(), + "error_type" => metrics::ChannelErrorType::DeframeError.as_str(), + ), + ); + break ( next, Err::<(), anyhow::Error>(err.into()).context( format!( @@ -192,7 +204,8 @@ impl ServerConn { ) ), false - ), + ) + }, }; // Finally decode the message. This assembles the M-typed message @@ -220,6 +233,23 @@ impl ServerConn { } match self.send_with_buffer_metric(&log_id, &tx, message).await { Ok(()) => { + // Track throughput for this channel pair + metrics::CHANNEL_THROUGHPUT_BYTES.add( + bytes_len as u64, + hyperactor_telemetry::kv_pairs!( + "source" => self.source.to_string(), + "dest" => self.dest.to_string(), + "session_id" => session_id.to_string(), + ), + ); + metrics::CHANNEL_THROUGHPUT_MESSAGES.add( + 1, + hyperactor_telemetry::kv_pairs!( + "source" => self.source.to_string(), + "dest" => self.dest.to_string(), + "session_id" => session_id.to_string(), + ), + ); // In channel's contract, "delivered" means the message // is sent to the NetRx object. Therefore, we could bump // `next_seq` as far as the message is put on the mpsc @@ -236,16 +266,28 @@ impl ServerConn { } } }, - Err(err) => break ( - next, - Err::<(), anyhow::Error>(err.into()).context( - format!( - "{log_id}: deserialize message with M = {}", - type_name::(), - ) - ), - false - ), + Err(err) => { + // Track deserialization error for this channel pair + metrics::CHANNEL_ERRORS.add( + 1, + hyperactor_telemetry::kv_pairs!( + "source" => self.source.to_string(), + "dest" => self.dest.to_string(), + "session_id" => session_id.to_string(), + "error_type" => metrics::ChannelErrorType::DeserializeError.as_str(), + ), + ); + break ( + next, + Err::<(), anyhow::Error>(err.into()).context( + format!( + "{log_id}: deserialize message with M = {}", + type_name::(), + ) + ), + false + ) + }, } }, } @@ -490,11 +532,13 @@ where }; if let Err(ref err) = res { - metrics::CHANNEL_CONNECTION_ERRORS.add( + metrics::CHANNEL_ERRORS.add( 1, hyperactor_telemetry::kv_pairs!( "transport" => dest.transport().to_string(), "error" => err.to_string(), + "error_type" => metrics::ChannelErrorType::ConnectionError.as_str(), + "dest" => dest.to_string(), ), ); @@ -513,12 +557,13 @@ where }); } Err(err) => { - metrics::CHANNEL_CONNECTION_ERRORS.add( + metrics::CHANNEL_ERRORS.add( 1, hyperactor_telemetry::kv_pairs!( "transport" => listener_channel_addr.transport().to_string(), "operation" => "accept", "error" => err.to_string(), + "error_type" => metrics::ChannelErrorType::ConnectionError.as_str(), ), ); diff --git a/hyperactor/src/metrics.rs b/hyperactor/src/metrics.rs index d875625fd..2dacc4727 100644 --- a/hyperactor/src/metrics.rs +++ b/hyperactor/src/metrics.rs @@ -15,6 +15,31 @@ use hyperactor_telemetry::declare_static_histogram; use hyperactor_telemetry::declare_static_timer; use hyperactor_telemetry::declare_static_up_down_counter; +/// Error types for channel-related errors. Only used for telemetry. +#[derive(Debug, Clone, Copy)] +pub enum ChannelErrorType { + /// Error occurred while sending a message. + SendError, + /// Error occurred while connecting to a channel. + ConnectionError, + /// Error occurred while deframing a message. + DeframeError, + /// Error occurred while deserializing a message. + DeserializeError, +} + +impl ChannelErrorType { + /// Returns the string representation of the error type. + pub fn as_str(&self) -> &'static str { + match self { + ChannelErrorType::SendError => "send_error", + ChannelErrorType::ConnectionError => "connection_error", + ChannelErrorType::DeframeError => "deframe_error", + ChannelErrorType::DeserializeError => "deserialize_error", + } + } +} + // MAILBOX // Tracks messages that couldn't be delivered to their destination and were returned as undeliverable declare_static_counter!( @@ -44,16 +69,23 @@ declare_static_timer!( declare_static_histogram!(REMOTE_MESSAGE_SEND_SIZE, "channel.remote_message_send_size"); // Tracks the number of new channel connections established (client and server) declare_static_counter!(CHANNEL_CONNECTIONS, "channel.connections"); -// Tracks errors that occur when establishing channel connections -declare_static_counter!(CHANNEL_CONNECTION_ERRORS, "channel.connection_errors"); // Tracks the number of channel reconnection attempts declare_static_counter!(CHANNEL_RECONNECTIONS, "channel.reconnections"); +// Tracks errors for each channel pair +declare_static_counter!(CHANNEL_ERRORS, "channel.errors"); // Tracks the number of NetRx encountering full buffer, i.e. its mpsc channel. // This metric counts how often the NetRx→client mpsc channel remains full, // incrementing once per CHANNEL_NET_RX_BUFFER_FULL_CHECK_INTERVAL while blocked. declare_static_counter!(CHANNEL_NET_RX_BUFFER_FULL, "channel.net_rx_buffer_full"); +// Tracks throughput (bytes sent) +declare_static_counter!(CHANNEL_THROUGHPUT_BYTES, "channel.throughput.bytes"); +// Tracks throughput (message count) +declare_static_counter!(CHANNEL_THROUGHPUT_MESSAGES, "channel.throughput.messages"); +// Tracks message latency for each channel pair in microseconds +declare_static_histogram!(CHANNEL_LATENCY_MICROS, "channel.latency.us"); + // PROC MESH // Tracks the number of active processes in the process mesh declare_static_counter!(PROC_MESH_ALLOCATION, "proc_mesh.active_procs");