Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: makes Framed and FramedStream resumable after eof #3272

Merged
merged 19 commits into from
Mar 22, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1bd6881
util: let FramedRead recover from temporary EOF
somethingelseentirely Dec 11, 2020
30ef443
util: adds documentation for Framed eof behaviour
somethingelseentirely Dec 14, 2020
0c208f0
util: makes framed_impl respect clippy styleguide
somethingelseentirely Dec 14, 2020
52fe7a3
Merge branch 'master' into framedread_resumable
somethingelseentirely Dec 16, 2020
23b2795
util: fix bad framed test due to bad trait import
somethingelseentirely Dec 17, 2020
a827e90
util: adds read state machine docs to framed_impl
somethingelseentirely Dec 27, 2020
e87fc43
util: improve documentation to frame decoding
somethingelseentirely Dec 27, 2020
996c8d8
util: fix trailing whitespace in framed_impl
somethingelseentirely Dec 28, 2020
0fd9c55
util: improve documentation for framed poll_next
somethingelseentirely Dec 29, 2020
1175a4d
util: improved documentation on eof framed behaviour
somethingelseentirely Dec 30, 2020
6e5bce6
util: improve framed decoder documentation
somethingelseentirely Dec 30, 2020
ca26406
util: make framed documentation more terse
somethingelseentirely Jan 3, 2021
aa23631
util: address review issues
somethingelseentirely Jan 3, 2021
7946e13
util: makes EOF consistently uppercase
somethingelseentirely Jan 3, 2021
2ffca2a
Merge branch 'master' into framedread_resumable
somethingelseentirely Jan 10, 2021
fa9b983
improve docs for framed
somethingelseentirely Jan 13, 2021
be70485
fix typo and improve grammar
somethingelseentirely Jan 13, 2021
416f34e
Add test
Darksonn Jan 20, 2021
6e8f8ab
Merge branch 'master' into framedread_resumable
Darksonn Mar 21, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion tokio-util/src/codec/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ use std::io;
/// implementing stateful streaming parsers. In many cases, though, this type
/// will simply be a unit struct (e.g. `struct HttpDecoder`).
///
/// For some underlying data-sources, namely files and FIFOs,
/// it's possible to temporarily read 0 bytes by reaching EOF.
///
/// In these cases `decode_eof` will be called until it signals
/// fullfillment of all closing frames by returning `Ok(None)`.
/// After that, repeated attempts to read from the [`Framed`] or [`FramedRead`]
/// will not invoke `decode` or `decode_eof` again, until data can be read
/// during a retry.
///
/// It is up to the Decoder to keep track of a restart after an EOF,
/// and to decide how to handle such an event by, for example,
/// allowing frames to cross EOF boundaries, re-emitting opening frames, or
/// reseting the entire internal state.
///
/// [`Framed`]: crate::codec::Framed
/// [`FramedRead`]: crate::codec::FramedRead
pub trait Decoder {
Expand Down Expand Up @@ -115,13 +129,18 @@ pub trait Decoder {
/// This method defaults to calling `decode` and returns an error if
/// `Ok(None)` is returned while there is unconsumed data in `buf`.
/// Typically this doesn't need to be implemented unless the framing
/// protocol differs near the end of the stream.
/// protocol differs near the end of the stream, or if you need to construct
/// frames _across_ eof boundaries on sources that can be resumed.
///
/// Note that the `buf` argument may be empty. If a previous call to
/// `decode_eof` consumed all the bytes in the buffer, `decode_eof` will be
/// called again until it returns `None`, indicating that there are no more
/// frames to yield. This behavior enables returning finalization frames
/// that may not be based on inbound data.
///
/// Once `None` has been returned, `decode_eof` won't be called again until
/// an attempt to resume the stream has been made, where the underlying stream
/// actually returned more data.
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.decode(buf)? {
Some(frame) => Ok(Some(frame)),
Expand Down
5 changes: 5 additions & 0 deletions tokio-util/src/codec/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ where
/// calling [`split`] on the `Framed` returned by this method, which will
/// break them into separate objects, allowing them to interact more easily.
///
/// Note that, for some byte sources, the stream can be resumed after an EOF
/// by reading from it, even after it has returned `None`. Repeated attempts
/// to do so, without new data available, continue to return `None` without
/// creating more (closing) frames.
///
/// [`Stream`]: futures_core::Stream
/// [`Sink`]: futures_sink::Sink
/// [`Decode`]: crate::codec::Decoder
Expand Down
77 changes: 66 additions & 11 deletions tokio-util/src/codec/framed_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,42 +120,97 @@ where

let mut pinned = self.project();
let state: &mut ReadFrame = pinned.state.borrow_mut();
// The following loops implements a state machine with each state corresponding
// to a combination of the `is_readable` and `eof` flags. States persist across
// loop entries and most state transitions occur with a return.
//
// The intitial state is `reading`.
//
// | state | eof | is_readable |
// |---------|-------|-------------|
// | reading | false | false |
// | framing | false | true |
// | pausing | true | true |
// | paused | true | false |
//
// `decode_eof`
// returns `Some` read 0 bytes
// │ │ │ │
// │ ▼ │ ▼
// ┌───────┐ `decode_eof` ┌──────┐
// ┌──read 0 bytes──▶│pausing│─returns `None`─▶│paused│──┐
// │ └───────┘ └──────┘ │
// pending read┐ │ ┌──────┐ │ ▲ │
// │ │ │ │ │ │ │ │
// │ ▼ │ │ `decode` returns `Some`│ pending read
// │ ╔═══════╗ ┌───────┐◀─┘ │
// └──║reading║─read n>0 bytes─▶│framing│ │
// ╚═══════╝ └───────┘◀──────read n>0 bytes┘
// ▲ │
// │ │
// └─`decode` returns `None`─┘
loop {
// Repeatedly call `decode` or `decode_eof` as long as it is
// "readable". Readable is defined as not having returned `None`. If
// the upstream has returned EOF, and the decoder is no longer
// readable, it can be assumed that the decoder will never become
// readable again, at which point the stream is terminated.
// Repeatedly call `decode` or `decode_eof` while the buffer is "readable",
// i.e. it _might_ contain data consumable as a frame or closing frame.
// Both signal that there is no such data by returning `None`.
//
// If `decode` couldn't read a frame and the upstream source has returned eof,
// `decode_eof` will attemp to decode the remaining bytes as closing frames.
//
// If the underlying AsyncRead is resumable, we may continue after an EOF,
// but must finish emmiting all of it's associated `decode_eof` frames.
// Furthermore, we don't want to emit any `decode_eof` frames on retried
// reads after an EOF unless we've actually read more data.
if state.is_readable {
// pausing or framing
if state.eof {
// pausing
let frame = pinned.codec.decode_eof(&mut state.buffer)?;
if frame.is_none() {
state.is_readable = false; // prepare pausing -> paused
}
// implicit pausing -> pausing or pausing -> paused
return Poll::Ready(frame.map(Ok));
}

// framing
trace!("attempting to decode a frame");

if let Some(frame) = pinned.codec.decode(&mut state.buffer)? {
trace!("frame decoded from buffer");
// implicit framing -> framing
return Poll::Ready(Some(Ok(frame)));
}

// framing -> reading
state.is_readable = false;
}

assert!(!state.eof);

// Otherwise, try to read more data and try again. Make sure we've
// got room for at least one byte to read to ensure that we don't
// get a spurious 0 that looks like EOF
// reading or paused
// If we can't build a frame yet, try to read more data and try again.
// Make sure we've got room for at least one byte to read to ensure
// that we don't get a spurious 0 that looks like EOF.
state.buffer.reserve(1);
let bytect = match poll_read_buf(pinned.inner.as_mut(), cx, &mut state.buffer)? {
Poll::Ready(ct) => ct,
// implicit reading -> reading or implicit paused -> paused
Poll::Pending => return Poll::Pending,
};
if bytect == 0 {
if state.eof {
// We're already at an EOF, and since we've reached this path
// we're also not readable. This implies that we've already finished
// our `decode_eof` handling, so we can simply return `None`.
// implicit paused -> paused
return Poll::Ready(None);
}
// prepare reading -> paused
state.eof = true;
} else {
// prepare paused -> framing or noop reading -> framing
state.eof = false;
}

// paused -> framing or reading -> framing or reading -> pausing
state.is_readable = true;
}
}
Expand Down
23 changes: 23 additions & 0 deletions tokio-util/tests/framed_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,29 @@ fn multi_frames_on_eof() {
});
}

#[test]
fn read_eof_then_resume() {
let mut task = task::spawn(());
let mock = mock! {
Ok(b"\x00\x00\x00\x01".to_vec()),
Ok(b"".to_vec()),
Ok(b"\x00\x00\x00\x02".to_vec()),
Ok(b"".to_vec()),
Ok(b"\x00\x00\x00\x03".to_vec()),
};
let mut framed = FramedRead::new(mock, U32Decoder);

task.enter(|cx, _| {
assert_read!(pin!(framed).poll_next(cx), 1);
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
assert_read!(pin!(framed).poll_next(cx), 2);
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
assert_read!(pin!(framed).poll_next(cx), 3);
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
assert!(assert_ready!(pin!(framed).poll_next(cx)).is_none());
});
}

// ===== Mock ======

struct Mock {
Expand Down