Skip to content
42 changes: 35 additions & 7 deletions rust/sbp/src/de.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::{

use bytes::{Buf, BytesMut};
use dencode::FramedRead;
use futures::StreamExt;

use crate::{wire_format, Sbp, CRC_LEN, HEADER_LEN, MAX_FRAME_LEN, PAYLOAD_INDEX, PREAMBLE};

Expand Down Expand Up @@ -44,12 +45,26 @@ pub fn iter_messages<R: io::Read>(input: R) -> impl Iterator<Item = Result<Sbp,
Decoder::new(input)
}

/// Deserialize IO stream into iterator of raw frames
pub fn iter_frames<R: io::Read>(input: R) -> impl Iterator<Item = Result<Frame, Error>> {
Framer::new(input)
}

/// Deserialize the IO stream into an iterator of messages. Provide a timeout
/// for the maximum allowed duration without a successful message.
pub fn iter_messages_with_timeout<R: io::Read>(
input: R,
timeout_duration: Duration,
) -> impl Iterator<Item = Result<Sbp, Error>> {
TimeoutDecoder::new(input, timeout_duration).map(|res| res.and_then(|frame| frame.to_sbp()))
}

/// Deserialize the IO stream into an iterator of frames. Provide a timeout
/// for the maximum allowed duration without a successful message.
pub fn iter_frames_with_timeout<R: io::Read>(
input: R,
timeout_duration: Duration,
) -> impl Iterator<Item = Result<Frame, Error>> {
TimeoutDecoder::new(input, timeout_duration)
}

Expand All @@ -66,6 +81,22 @@ pub fn stream_messages_with_timeout<R: futures::AsyncRead + Unpin>(
input: R,
timeout_duration: Duration,
) -> impl futures::Stream<Item = Result<Sbp, Error>> {
TimeoutDecoder::new(input, timeout_duration).map(|res| res.and_then(|frame| frame.to_sbp()))
}

/// Deserialize the async IO stream into stream of frames
#[cfg(feature = "async")]
pub fn stream_frames<R: futures::AsyncRead + Unpin>(
input: R,
) -> impl futures::Stream<Item = Result<Frame, Error>> {
Framer::new(input)
}

#[cfg(feature = "async")]
pub fn stream_frames_with_timeout<R: futures::AsyncRead + Unpin>(
input: R,
timeout_duration: Duration,
) -> impl futures::Stream<Item = Result<Frame, Error>> {
TimeoutDecoder::new(input, timeout_duration)
}

Expand Down Expand Up @@ -326,26 +357,23 @@ impl<R> TimeoutDecoder<R> {
}

impl<R: io::Read> Iterator for TimeoutDecoder<R> {
type Item = Result<Sbp, Error>;
type Item = Result<Frame, Error>;

fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|f| match f {
Ok(frame) => frame.to_sbp(),
Err(err) => Err(err),
})
self.0.next()
}
}

#[cfg(feature = "async")]
impl<R: futures::AsyncRead + Unpin> futures::Stream for TimeoutDecoder<R> {
type Item = Result<Sbp, Error>;
type Item = Result<Frame, Error>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match futures::ready!(std::pin::Pin::new(&mut self.0).poll_next(cx)) {
Some(Ok(frame)) => std::task::Poll::Ready(Some(frame.to_sbp())),
Some(Ok(frame)) => std::task::Poll::Ready(Some(Ok(frame))),
Some(Err(e)) => std::task::Poll::Ready(Some(Err(e))),
None => std::task::Poll::Ready(None),
}
Expand Down