Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,3 @@

**notes:**
- `Tx::send` completes after local enqueue (oneshot dropped).
- if the receiver is dropped, `try_post` fails immediately with `Err(SendError(ChannelError::Closed, message))`.
9 changes: 4 additions & 5 deletions docs/source/books/hyperactor-book/src/channels/tx_rx.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ Under the hood, network transports use a length-prefixed, multipart frame with c
```rust
#[async_trait]
pub trait Tx<M: RemoteMessage>: std::fmt::Debug {
fn try_post(&self, message: M, return_channel: oneshot::Sender<M>) -> Result<(), SendError<M>>;
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>);
fn post(&self, message: M);
async fn send(&self, message: M) -> Result<(), SendError<M>>;
fn addr(&self) -> ChannelAddr;
Expand All @@ -32,8 +32,7 @@ pub trait Tx<M: RemoteMessage>: std::fmt::Debug {

- **`try_post(message, return_channel)`**
Enqueues locally.
- Immediate failure → `Err(SendError(ChannelError::Closed, message))`.
- `Ok(())` means queued; if delivery later fails, the original message is sent back on `return_channel`.
- If enqueueing or later delivery fails, the original message is sent back on `return_channel`, wrapped in a `SendError`.

- **`post(message)`**
Fire-and-forget wrapper around `try_post`. The caller should monitor `status()` for health instead of relying on return values.
Expand Down Expand Up @@ -91,7 +90,7 @@ pub trait Rx<M: RemoteMessage>: std::fmt::Debug {
### Failure semantics
- **Closed receiver:** `recv()` returns `Err(ChannelError::Closed)`.
- **Network transports:** disconnects trigger exponential backoff reconnects; unacked messages are retried. If recovery ultimately fails (e.g., connection cannot be re-established within the delivery timeout window), the client closes and returns all undelivered/unacked messages via their `return_channel`. `status()` flips to `Closed`.
- **Local transport:** no delayed return path; if the receiver is gone, `try_post` fails immediately with `Err(SendError(ChannelError::Closed, message))`.
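- **Local transport:** no delayed return path; if the receiver is gone, the failure is reported immediately on `return_channel` as `SendError(ChannelError::Closed, message)`.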
- **Network disconnects (EOF/I/O error/temporary break):** the client reconnects with exponential backoff and resends any unacked messages; the server deduplicates by `seq`.
- **Delivery timeout:** see [Size & time limits](#size--time-limits).

Expand All @@ -104,7 +103,7 @@ pub trait Rx<M: RemoteMessage>: std::fmt::Debug {

Concrete channel implementations that satisfy `Tx<M>` / `Rx<M>`:

- **Local** — in-process only; uses `tokio::sync::mpsc`. No network framing/acks. `try_post` fails immediately if the receiver is gone.
- **Local** — in-process only; uses `tokio::sync::mpsc`. No network framing/acks.
_Dial/serve:_ `serve_local::<M>()`, `ChannelAddr::Local(_)`.

- **TCP** — `tokio::net::TcpStream` with 8-byte BE length-prefixed frames; `seq`/`ack` for exactly-once into the server queue; reconnects with backoff.
Expand Down
23 changes: 9 additions & 14 deletions docs/source/books/hyperactor-book/src/mailboxes/mailbox_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,23 +124,18 @@ impl MailboxClient {
let return_handle_0 = return_handle.clone();
tokio::spawn(async move {
let result = return_receiver.await;
if let Ok(message) = result {
let _ = return_handle_0.send(Undeliverable(message));
} else {
// Sender dropped, this task can end.
if let Ok(SendError(e, message)) = result {
message.undeliverable(
DeliveryError::BrokenLink(format!(
"failed to enqueue in MailboxClient when processing buffer: {e}"
)),
return_handle_0,
);
}
});
// Send the message for transmission.
let return_handle_1 = return_handle.clone();
async move {
if let Err(SendError(_, envelope)) = tx.try_post(envelope, return_channel) {
// Failed to enqueue.
envelope.undeliverable(
DeliveryError::BrokenLink("failed to enqueue in MailboxClient".to_string()),
return_handle_1.clone(),
);
}
}
tx.try_post(envelope, return_channel);
future::ready(())
});
let this = Self {
buffer,
Expand Down
4 changes: 1 addition & 3 deletions hyperactor/benches/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ fn bench_message_rates(c: &mut Criterion) {
Vec::with_capacity(rate as usize);
for _ in 0..rate {
let (return_sender, return_receiver) = oneshot::channel();
if let Err(e) = tx.try_post(message.clone(), return_sender) {
panic!("Failed to send message: {:?}", e);
}
tx.try_post(message.clone(), return_sender);

let handle = tokio::spawn(async move {
_ = tokio::time::timeout(
Expand Down
49 changes: 23 additions & 26 deletions hyperactor/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,28 +109,24 @@ pub enum TxStatus {
pub trait Tx<M: RemoteMessage>: std::fmt::Debug {
/// Enqueue a `message` on the local end of the channel. The
/// message is either delivered, or we eventually discover that
/// the channel has failed and it will be sent back on `return_handle`.
// TODO: the return channel should be SendError<M> directly, and we should drop
// the returned result.
/// the channel has failed and it will be sent back on `return_channel`.
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SendError`.
fn try_post(&self, message: M, return_channel: oneshot::Sender<M>) -> Result<(), SendError<M>>;
// TODO: Consider making return channel optional to indicate that the log can be dropped.
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>);

/// Enqueue a message to be sent on the channel. The caller is expected to monitor
/// the channel status for failures.
/// Enqueue a message to be sent on the channel.
fn post(&self, message: M) {
// We ignore errors here because the caller is meant to monitor the channel's
// status, rather than rely on this function to report errors.
let _ignore = self.try_post(message, oneshot::channel().0);
self.try_post(message, oneshot::channel().0);
}

/// Send a message synchronously, returning when the message has
/// been delivered to the remote end of the channel.
async fn send(&self, message: M) -> Result<(), SendError<M>> {
let (tx, rx) = oneshot::channel();
self.try_post(message, tx)?;
self.try_post(message, tx);
match rx.await {
// Channel was closed; the message was not delivered.
Ok(m) => Err(SendError(ChannelError::Closed, m)),
Ok(err) => Err(err),

// Channel was dropped; the message was successfully enqueued
// on the remote end of the channel.
Expand Down Expand Up @@ -179,14 +175,12 @@ impl<M: RemoteMessage> MpscTx<M> {

#[async_trait]
impl<M: RemoteMessage> Tx<M> for MpscTx<M> {
fn try_post(
&self,
message: M,
_return_channel: oneshot::Sender<M>,
) -> Result<(), SendError<M>> {
self.tx
.send(message)
.map_err(|mpsc::error::SendError(message)| SendError(ChannelError::Closed, message))
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
if let Err(mpsc::error::SendError(message)) = self.tx.send(message) {
if let Err(m) = return_channel.send(SendError(ChannelError::Closed, message)) {
tracing::warn!("failed to deliver SendError: {}", m);
}
}
}

fn addr(&self) -> ChannelAddr {
Expand Down Expand Up @@ -749,7 +743,7 @@ enum ChannelTxKind<M: RemoteMessage> {

#[async_trait]
impl<M: RemoteMessage> Tx<M> for ChannelTx<M> {
fn try_post(&self, message: M, return_channel: oneshot::Sender<M>) -> Result<(), SendError<M>> {
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
match &self.inner {
ChannelTxKind::Local(tx) => tx.try_post(message, return_channel),
ChannelTxKind::Tcp(tx) => tx.try_post(message, return_channel),
Expand Down Expand Up @@ -1054,7 +1048,7 @@ mod tests {
let addr = listen_addr.clone();
sends.spawn(async move {
let tx = dial::<u64>(addr).unwrap();
tx.try_post(message, oneshot::channel().0).unwrap();
tx.post(message);
});
}

Expand Down Expand Up @@ -1089,7 +1083,7 @@ mod tests {
let (listen_addr, rx) = crate::channel::serve::<u64>(addr).unwrap();

let tx = dial::<u64>(listen_addr).unwrap();
tx.try_post(123, oneshot::channel().0).unwrap();
tx.post(123);
drop(rx);

// New transmits should fail... but there is buffering, etc.,
Expand All @@ -1099,12 +1093,15 @@ mod tests {
let start = RealClock.now();

let result = loop {
let result = tx.try_post(123, oneshot::channel().0);
if result.is_err() || start.elapsed() > Duration::from_secs(10) {
let (return_tx, return_rx) = oneshot::channel();
tx.try_post(123, return_tx);
let result = return_rx.await;

if result.is_ok() || start.elapsed() > Duration::from_secs(10) {
break result;
}
};
assert_matches!(result, Err(SendError(ChannelError::Closed, 123)));
assert_matches!(result, Ok(SendError(ChannelError::Closed, 123)));
}
}

Expand Down Expand Up @@ -1137,7 +1134,7 @@ mod tests {
for addr in addrs() {
let (listen_addr, mut rx) = crate::channel::serve::<i32>(addr).unwrap();
let tx = crate::channel::dial(listen_addr).unwrap();
tx.try_post(123, oneshot::channel().0).unwrap();
tx.post(123);
assert_eq!(rx.recv().await.unwrap(), 123);
}
}
Expand Down
34 changes: 18 additions & 16 deletions hyperactor/src/channel/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,21 @@ pub struct LocalTx<M: RemoteMessage> {

#[async_trait]
impl<M: RemoteMessage> Tx<M> for LocalTx<M> {
fn try_post(
&self,
message: M,
_return_channel: oneshot::Sender<M>,
) -> Result<(), SendError<M>> {
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
let data: Data = match bincode::serialize(&message) {
Ok(data) => data,
Err(err) => return Err(SendError(err.into(), message)),
Err(err) => {
if let Err(m) = return_channel.send(SendError(err.into(), message)) {
tracing::warn!("failed to deliver SendError: {}", m);
}
return;
}
};
self.tx
.send(data)
.map_err(|_| SendError(ChannelError::Closed, message))
if self.tx.send(data).is_err() {
if let Err(m) = return_channel.send(SendError(ChannelError::Closed, message)) {
tracing::warn!("failed to deliver SendError: {}", m);
}
}
}

fn addr(&self) -> ChannelAddr {
Expand Down Expand Up @@ -167,7 +170,7 @@ mod tests {
async fn test_local_basic() {
let (tx, mut rx) = local::new::<u64>();

tx.try_post(123, unused_return_channel()).unwrap();
tx.try_post(123, unused_return_channel());
assert_eq!(rx.recv().await.unwrap(), 123);
}

Expand All @@ -178,23 +181,22 @@ mod tests {

let tx = local::dial::<u64>(port).unwrap();

tx.try_post(123, unused_return_channel()).unwrap();
tx.try_post(123, unused_return_channel());
assert_eq!(rx.recv().await.unwrap(), 123);

drop(rx);

assert_matches!(
tx.try_post(123, unused_return_channel()),
Err(SendError(ChannelError::Closed, 123))
);
let (return_tx, return_rx) = oneshot::channel();
tx.try_post(123, return_tx);
assert_matches!(return_rx.await, Ok(SendError(ChannelError::Closed, 123)));
}

#[tokio::test]
async fn test_local_drop() {
let (port, mut rx) = local::serve::<u64>();
let tx = local::dial::<u64>(port).unwrap();

tx.try_post(123, unused_return_channel()).unwrap();
tx.try_post(123, unused_return_channel());
assert_eq!(rx.recv().await.unwrap(), 123);

drop(rx);
Expand Down
Loading
Loading