Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions hyperactor/src/channel/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,11 @@ impl<M: RemoteMessage> Tx<M> for NetTx<M> {
}

fn do_post(&self, message: M, return_channel: Option<oneshot::Sender<SendError<M>>>) {
tracing::trace!(name = "post", "sending message to {}", self.dest);
tracing::trace!(
name = "post",
dest = %self.dest,
"sending message"
);

let return_channel = return_channel.unwrap_or_else(|| oneshot::channel().0);
if let Err(mpsc::error::SendError((message, return_channel, _))) =
Expand All @@ -168,7 +172,11 @@ pub struct NetRx<M: RemoteMessage>(mpsc::Receiver<M>, ChannelAddr, ServerHandle)
#[async_trait]
impl<M: RemoteMessage> Rx<M> for NetRx<M> {
/// Waits for the next message on the inner receiver (`self.0`).
///
/// Returns `Err(ChannelError::Closed)` when the receiver yields `None`.
async fn recv(&mut self) -> Result<M, ChannelError> {
// NOTE(review): two trace events appear back to back here. This text looks like a
// rendered diff with its +/- markers stripped, so the one-line form is presumably
// the removed line and the structured form its replacement — confirm against the file.
tracing::trace!(name = "recv", "receiving message from {}", self.1);
// Structured form: `self.1` is recorded as the `source` field (the `%` sigil
// formats it with `Display`) rather than being interpolated into the message text.
tracing::trace!(
name = "recv",
source = %self.1,
"receiving message"
);
// `ok_or` turns a `None` from the receiver into `ChannelError::Closed`.
self.0.recv().await.ok_or(ChannelError::Closed)
}

Expand Down
179 changes: 125 additions & 54 deletions hyperactor/src/channel/net/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,16 @@ impl<M: RemoteMessage> QueuedMessage<M> {
.send(SendError(ChannelError::Closed, msg));
}
Ok(_) => {
tracing::debug!("queued frame was not a Frame::Message; dropping without return");
tracing::debug!(
seq = self.seq,
"queued frame was not a Frame::Message; dropping without return"
);
}
Err(e) => {
tracing::warn!("failed to deserialize queued frame for return: {e}");
Err(_e) => {
tracing::warn!(
seq = self.seq,
"failed to deserialize queued frame for return"
);
}
}
}
Expand Down Expand Up @@ -596,7 +602,12 @@ async fn run<M: RemoteMessage>(

let span = state_span(&state, &conn, session_id, &link);

tracing::debug!(parent: &span, "{log_id}: NetTx exited its loop with state: {state}");
tracing::info!(
parent: &span,
dest = %dest,
session_id = session_id,
"NetTx exited its loop with state: {}", state
);

match state {
State::Closing {
Expand Down Expand Up @@ -626,7 +637,12 @@ async fn run<M: RemoteMessage>(

// Notify senders that this link is no longer usable
if let Err(err) = notify.send(TxStatus::Closed) {
tracing::debug!(parent: &span, "{log_id}: tx status update error: {err}");
tracing::debug!(
dest = %dest,
error = %err,
session_id = session_id,
"tx status update error"
);
}

if let Conn::Connected {
Expand All @@ -635,13 +651,30 @@ async fn run<M: RemoteMessage>(
} = conn
{
if let Err(err) = frame_writer.send().await {
tracing::info!(parent: &span, "{log_id}: write error: {err}",);
tracing::info!(
parent: &span,
dest = %dest,
error = %err,
session_id = session_id,
"write error during cleanup"
);
} else if let Err(err) = frame_writer.complete().flush().await {
tracing::info!(parent: &span, "{log_id}: flush error: {err}",);
tracing::info!(
parent: &span,
dest = %dest,
error = %err,
session_id = session_id,
"flush error during cleanup"
);
}
}

tracing::debug!(parent: &span, "{log_id}: NetTx::run exits");
tracing::info!(
parent: &span,
dest = %dest,
session_id = session_id,
"NetTx::run exits"
);
}

fn state_span<'a, L, S, M>(state: &State<'a, M>, conn: &Conn<S>, session_id: u64, link: &L) -> Span
Expand Down Expand Up @@ -801,12 +834,17 @@ where
(running, conn)
}
Err(err) => {
let error_msg = format!("{log_id}: failed to push message to outbox: {err}");
tracing::error!(error_msg);
let error_msg = "failed to push message to outbox";
tracing::error!(
dest = %link.dest(),
session_id = session_id,
error = %err,
"{}", error_msg
);
(
State::Closing {
deliveries: Deliveries { outbox, unacked },
reason: error_msg,
reason: format!("{log_id}: {error_msg}: {err}"),
},
conn,
)
Expand Down Expand Up @@ -846,22 +884,27 @@ where
Err((writer, e)) => {
debug_assert_eq!(e.kind(), io::ErrorKind::InvalidData);
tracing::error!(
dest = %link.dest(),
session_id = session_id,
"rejecting oversize frame: len={} > max={}. \
ack will not arrive before timeout; increase CODEC_MAX_FRAME_LENGTH to allow.",
len,
max
);
// Reject and return.
outbox.pop_front().expect("not empty").try_return();
let error_msg =
format!("{log_id}: oversized frame was rejected. closing channel");
tracing::error!(error_msg);
let error_msg = "oversized frame was rejected. closing channel";
tracing::error!(
dest = %link.dest(),
session_id = session_id,
"{}", error_msg,
);
// Close the channel (avoid sequence
// violations).
(
State::Closing {
deliveries: Deliveries { outbox, unacked },
reason: error_msg,
reason: format!("{log_id}: {error_msg}"),
},
Conn::Connected {
reader,
Expand Down Expand Up @@ -895,27 +938,34 @@ where
(State::Running(Deliveries { outbox, unacked }), Conn::Connected { reader, write_state })
}
NetRxResponse::Reject => {
let error_msg = format!(
"{log_id}: server rejected connection.",
);
tracing::error!(error_msg);
let error_msg = "server rejected connection";
tracing::error!(
dest = %link.dest(),
session_id = session_id,
"{}", error_msg
);
(State::Closing {
deliveries: Deliveries{outbox, unacked},
reason: error_msg,
reason: format!("{log_id}: {error_msg}"),
}, Conn::reconnect_with_default())
}
}
}
Err(err) => {
let error_msg = format!(
"{log_id}: failed deserializing response: {err}",
);
tracing::error!(error_msg);
let error_msg = "failed deserializing response";
tracing::error!(
dest = %link.dest(),
session_id = session_id,
error = %err,
"{}", error_msg
);
// Similar to the message flow, we always close the
// channel when encountering ser/deser errors.
(State::Closing {
deliveries: Deliveries{outbox, unacked},
reason: error_msg,
reason: format!(
"{log_id}: {error_msg}: {err}",
),
}, Conn::Connected { reader, write_state })
}
}
Expand All @@ -926,8 +976,11 @@ where
}
Err(err) => {
tracing::error!(
"{log_id}: failed while receiving ack: {err}",
);
dest = %link.dest(),
session_id = session_id,
error = %err,
"failed while receiving ack"
);
// Reconnect and wish the error will go away.
(State::Running(Deliveries { outbox, unacked }), Conn::reconnect_with_default())
}
Expand All @@ -937,13 +990,17 @@ where
// If acking message takes too long, consider the link broken.
_ = unacked.wait_for_timeout(), if !unacked.is_empty() => {
let error_msg = format!(
"{log_id}: failed to receive ack within timeout {:?}; link is currently connected",
"failed to receive ack within timeout {:?}; link is currently connected",
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
);
tracing::error!(error_msg);
tracing::error!(
dest = %link.dest(),
session_id = session_id,
"{}", error_msg,
);
(State::Closing {
deliveries: Deliveries{outbox, unacked},
reason: error_msg,
reason: format!("{log_id}: {error_msg}"),
}, Conn::Connected { reader, write_state })
}

Expand All @@ -963,18 +1020,21 @@ where
}
Err(err) => {
tracing::info!(
"{log_id}: outbox send error: {err}; message size: {}",
dest = %link.dest(),
session_id,
error = %err,
"outbox send error; message size: {}",
outbox.front_size().expect("outbox should not be empty"),
);
// Track error for this channel pair
metrics::CHANNEL_ERRORS.add(
1,
hyperactor_telemetry::kv_pairs!(
"dest" => link.dest().to_string(),
"session_id" => session_id.to_string(),
"error_type" => metrics::ChannelErrorType::SendError.as_str(),
),
);
// Track error for this channel pair
metrics::CHANNEL_ERRORS.add(
1,
hyperactor_telemetry::kv_pairs!(
"dest" => link.dest().to_string(),
"session_id" => session_id.to_string(),
"error_type" => metrics::ChannelErrorType::SendError.as_str(),
),
);
(State::Running(Deliveries { outbox, unacked }), Conn::reconnect_with_default())
}
}
Expand All @@ -994,13 +1054,16 @@ where
(running, Conn::Connected { reader, write_state })
}
Err(err) => {
let error_msg = format!(
"{log_id}: failed to push message to outbox: {err}",
let error_msg = "failed to push message to outbox";
tracing::error!(
dest = %link.dest(),
session_id,
error = %err,
"{}", error_msg,
);
tracing::error!(error_msg);
(State::Closing {
deliveries: Deliveries {outbox, unacked},
reason: error_msg,
reason: format!("{log_id}: {error_msg}: {err}"),
}, Conn::Connected { reader, write_state })
}
}
Expand All @@ -1026,27 +1089,35 @@ where
// consider the link broken.
if outbox.is_expired() {
let error_msg = format!(
"{log_id}: failed to deliver message within timeout {:?}",
"failed to deliver message within timeout {:?}",
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT)
);
tracing::error!(error_msg);
tracing::error!(
dest = %link.dest(),
session_id,
"{}", error_msg
);
(
State::Closing {
deliveries: Deliveries { outbox, unacked },
reason: error_msg,
reason: format!("{log_id}: {error_msg}"),
},
Conn::reconnect_with_default(),
)
} else if unacked.is_expired() {
let error_msg = format!(
"{log_id}: failed to receive ack within timeout {:?}; link is currently broken",
"failed to receive ack within timeout {:?}; link is currently broken",
config::global::get(config::MESSAGE_DELIVERY_TIMEOUT),
);
tracing::error!(error_msg);
tracing::error!(
dest = %link.dest(),
session_id = session_id,
"{}", error_msg
);
(
State::Closing {
deliveries: Deliveries { outbox, unacked },
reason: error_msg,
reason: format!("{log_id}: {error_msg}"),
},
Conn::reconnect_with_default(),
)
Expand Down Expand Up @@ -1112,10 +1183,10 @@ where
}
Err(err) => {
tracing::debug!(
"session {}.{}: failed to connect: {}",
link.dest(),
session_id,
err
dest = %link.dest(),
error = %err,
session_id = session_id,
"failed to connect"
);
// Track connection error for this channel pair
metrics::CHANNEL_ERRORS.add(
Expand Down
Loading