feat(rsync_io): ChannelReader/Writer AsyncRead/AsyncWrite adapters (#1797)#4271
Merged
Conversation
…1797) Bridge `tokio::sync::mpsc` channel pairs to `tokio::io::AsyncRead` and `AsyncWrite` so async transports (for example, the in-flight async SSH transport in #1796) can be composed with traits that expect raw async streams. The new `channel_adapter` module is gated behind a new `async-ssh` feature so default builds are unaffected. `pair(capacity)` builds two cross-connected halves for a fully in-memory duplex stream. Unit tests cover round-trip, oversized-chunk re-draining, backpressure, EOF on writer drop, EOF on explicit shutdown, broken-pipe surfacing on both writer-side and reader-side closes, and bidirectional `pair()` exchange.
8 tasks
oferchen
added a commit
that referenced
this pull request
May 18, 2026
…1797) (#4271) Bridge `tokio::sync::mpsc` channel pairs to `tokio::io::AsyncRead` and `AsyncWrite` so async transports (for example, the in-flight async SSH transport in #1796) can be composed with traits that expect raw async streams. The new `channel_adapter` module is gated behind a new `async-ssh` feature so default builds are unaffected. `pair(capacity)` builds two cross-connected halves for a fully in-memory duplex stream. Unit tests cover round-trip, oversized-chunk re-draining, backpressure, EOF on writer drop, EOF on explicit shutdown, broken-pipe surfacing on both writer-side and reader-side closes, and bidirectional `pair()` exchange.
oferchen
added a commit
that referenced
this pull request
May 18, 2026
…1797) (#4271) Bridge `tokio::sync::mpsc` channel pairs to `tokio::io::AsyncRead` and `AsyncWrite` so async transports (for example, the in-flight async SSH transport in #1796) can be composed with traits that expect raw async streams. The new `channel_adapter` module is gated behind a new `async-ssh` feature so default builds are unaffected. `pair(capacity)` builds two cross-connected halves for a fully in-memory duplex stream. Unit tests cover round-trip, oversized-chunk re-draining, backpressure, EOF on writer drop, EOF on explicit shutdown, broken-pipe surfacing on both writer-side and reader-side closes, and bidirectional `pair()` exchange.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
ChannelReaderandChannelWriterin a newcrates/rsync_io/src/channel_adapter.rsmodule that bridgetokio::sync::mpsc::{Receiver, Sender}<Vec<u8>>totokio::io::AsyncRead/AsyncWrite.async-sshfeature onrsync_io. The module is fully gated behind this feature, so default builds are unchanged.pair(capacity)returning two cross-connected(ChannelReader, ChannelWriter)halves for in-memory duplex streams.Why
Companion to the in-flight async SSH transport (task #1796). The adapters give the async transport a way to expose its stdio over an in-process duplex channel while still satisfying APIs that require raw async stream traits.
Implementation notes
ChannelReaderdrains a one-timebuffered: Vec<u8>slice before pulling the next chunk from the channel, so oversized chunks are served across multiplepoll_readcalls without dropping bytes.ChannelWriterfirst triestry_send, then falls back toreserve_owned(). The reservation future is stored on the writer so the channel's wait-queue registration survives acrosspoll_writecalls and the task is woken when capacity frees up.poll_flushis a no-op (the channel itself provides ordering).poll_shutdowndrops the sender, surfacing EOF on the read side.io::ErrorKind::BrokenPipe.Vec<u8>rather thanbytes::Bytessincebytesis not a workspace dependency.Test plan
cargo fmt --allcleanasync-sshfeature is enabledUnit tests (gated
cfg(test)under theasync-sshfeature)round_trip_bytes- write then read N bytes.oversized_chunk_is_drained_across_reads- reader serves an over-large chunk across multiplepoll_readcalls.backpressure_blocks_until_reader_drains- bounded capacity 1 forces the second write to suspend until the reader pulls the first message.dropping_writer_yields_eof- dropping the writer gives readerOk(0).shutdown_yields_eof- explicitshutdown()after a write still delivers buffered data, then EOF.write_after_shutdown_is_broken_pipe- subsequent writes after shutdown returnBrokenPipe.write_to_closed_reader_is_broken_pipe- dropping the receiver surfacesBrokenPipeto the writer.duplex_pair_exchanges_data_both_ways-pair()gives a working bidirectional in-memory stream.