Skip to content

Conversation

@adrian-kong
Copy link
Contributor

@adrian-kong adrian-kong commented Sep 5, 2022

Description

Raw decoder to return iterator of bytes mut instead of consuming and skipping errors

JIRA Reference

https://swift-nav.atlassian.net/browse/DEVINFRA-866

@adrian-kong adrian-kong marked this pull request as ready for review September 7, 2022 01:42
@notoriaga
Copy link
Contributor

@adrian-kong I'm almost done with a pr to add on to what you have here which might help clean things up, so for right now I'd focus on the ubx stuff

@adrian-kong adrian-kong requested review from a team and notoriaga September 15, 2022 02:03
@notoriaga
Copy link
Contributor

notoriaga commented Sep 20, 2022

normally you want to avoid bounds like R: io::Read on definitions if possible and opt for what's called "late binding" of the generic params. that way you can have new take a Read or an AsyncRead

diff --git a/rust/sbp/src/de.rs b/rust/sbp/src/de.rs
index a781bd57..2414bb81 100644
--- a/rust/sbp/src/de.rs
+++ b/rust/sbp/src/de.rs
@@ -128,7 +128,7 @@ impl std::error::Error for CrcError {}
 
 pub struct Framer<R>(FramedRead<R, FramerImpl>);
 
-impl<R: io::Read> Framer<R> {
+impl<R> Framer<R> {
     pub fn new(reader: R) -> Self {
         Self(FramedRead::new(reader, FramerImpl))
     }
@@ -142,6 +142,18 @@ impl<R: io::Read> Iterator for Framer<R> {
     }
 }
 
+#[cfg(feature = "async")]
+impl<R: futures::AsyncRead + Unpin> futures::Stream for Framer<R> {
+    type Item = Result<Frame, Error>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        std::pin::Pin::new(&mut self.0).poll_next(cx)
+    }
+}
+
 pub struct Decoder<R>(FramedRead<R, FramerImpl>);
 
 impl<R> Decoder<R> {

@notoriaga
Copy link
Contributor

notoriaga commented Sep 20, 2022

how about

diff --git a/rust/sbp/src/de.rs b/rust/sbp/src/de.rs
index a781bd57..3da68015 100644
--- a/rust/sbp/src/de.rs
+++ b/rust/sbp/src/de.rs
@@ -48,9 +48,9 @@ pub fn iter_messages<R: io::Read>(input: R) -> impl Iterator<Item = Result<Sbp,
 /// for the maximum allowed duration without a successful message.
 pub fn iter_messages_with_timeout<R: io::Read>(
     input: R,
-    timeout_duration: Duration,
+    timeout: Duration,
 ) -> impl Iterator<Item = Result<Sbp, Error>> {
-    TimeoutSbpDecoder::framed_with_timeout(input, timeout_duration)
+    TimeoutDecoder::new(input, timeout)
 }
 
 /// Deserialize the async IO stream into an stream of messages.
@@ -64,9 +64,9 @@ pub fn stream_messages<R: futures::AsyncRead + Unpin>(
 #[cfg(feature = "async")]
 pub fn stream_messages_with_timeout<R: futures::AsyncRead + Unpin>(
     input: R,
-    timeout_duration: Duration,
+    timeout: Duration,
 ) -> impl futures::Stream<Item = Result<Sbp, Error>> {
-    TimeoutSbpDecoder::framed_with_timeout(input, timeout_duration)
+    TimeoutDecoder::new(input, timeout)
 }
 
 /// All errors that can occur while reading messages.
@@ -179,6 +179,46 @@ impl<R: futures::AsyncRead + Unpin> futures::Stream for Decoder<R> {
     }
 }
 
+pub struct TimeoutDecoder<R>(FramedRead<R, TimeoutDecoderImpl<FramerImpl>>);
+
+impl<R> TimeoutDecoder<R> {
+    pub fn new(reader: R, timeout: Duration) -> Self {
+        TimeoutDecoder(FramedRead::new(
+            reader,
+            TimeoutDecoderImpl::new(timeout, FramerImpl),
+        ))
+    }
+}
+
+impl<R: io::Read> Iterator for TimeoutDecoder<R> {
+    type Item = Result<Sbp, Error>;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        let sbp = match self.0.next()? {
+            Ok(frame) => frame.to_sbp(),
+            Err(err) => return Some(Err(err)),
+        };
+        Some(sbp)
+    }
+}
+
+#[cfg(feature = "async")]
+impl<R: futures::AsyncRead + Unpin> futures::Stream for TimeoutDecoder<R> {
+    type Item = Result<Sbp, Error>;
+
+    fn poll_next(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Option<Self::Item>> {
+        let frame = match futures::ready!(std::pin::Pin::new(&mut self.0).poll_next(cx)) {
+            Some(Ok(frame)) => frame,
+            Some(Err(e)) => return std::task::Poll::Ready(Some(Err(e.into()))),
+            None => return std::task::Poll::Ready(None),
+        };
+        std::task::Poll::Ready(Some(frame.to_sbp()))
+    }
+}
+
 #[derive(Debug, Clone, Default)]
 struct FramerImpl;
 
@@ -272,42 +312,35 @@ impl Frame {
     }
 }
 
-struct TimeoutSbpDecoder {
-    timeout_duration: Duration,
+struct TimeoutDecoderImpl<D> {
+    timeout: Duration,
     last_msg_received: Instant,
+    inner: D,
 }
 
-impl TimeoutSbpDecoder {
-    fn new(timeout_duration: Duration) -> Self {
+impl<D> TimeoutDecoderImpl<D> {
+    fn new(timeout: Duration, inner: D) -> Self {
         Self {
-            timeout_duration,
+            timeout,
             last_msg_received: Instant::now(),
+            inner,
         }
     }
-    fn framed_with_timeout<W>(
-        writer: W,
-        timeout_duration: Duration,
-    ) -> FramedRead<W, TimeoutSbpDecoder> {
-        FramedRead::new(writer, Self::new(timeout_duration))
-    }
 }
 
-impl dencode::Decoder for TimeoutSbpDecoder {
-    type Item = Sbp;
-    type Error = Error;
+impl<D: dencode::Decoder> dencode::Decoder for TimeoutDecoderImpl<D> {
+    type Item = D::Item;
+    type Error = D::Error;
 
     fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
-        if self.last_msg_received.elapsed() > self.timeout_duration {
-            return Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "timeout").into());
+        if self.last_msg_received.elapsed() > self.timeout {
+            return Err(io::Error::new(io::ErrorKind::TimedOut, "timeout").into());
         }
-        let next = match FramerImpl.decode(src)? {
-            Some(frame) => frame.to_sbp().map(Some),
-            None => return Ok(None),
-        };
-        if let Ok(Some(_)) = next {
+        let next = self.inner.decode(src)?;
+        if next.is_some() {
             self.last_msg_received = Instant::now();
         }
-        next
+        Ok(next)
     }
 }
 

It makes the timeout functionality generic (so you could have a TimeoutFramer if you wanted). Also introduces a named type for the timeout decoder

Comment on lines 192 to 199
#[doc(inline)]
pub use crate::de::Decoder;

#[doc(inline)]
pub use crate::de::Framer;

#[doc(inline)]
pub use crate::de::Frame;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can combine all these with the other pub use de::* below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that include

#[cfg(feature = "async")]
#[doc(inline)]
pub use de::{stream_messages, stream_messages_with_timeout};

so replacing all the pub use de::... to pub use de::*
or just pub use crate::de::*

i.e.

#[doc(inline)]
pub use de::{Error as DeserializeError, *};

#[cfg(feature = "async")]
#[doc(inline)]
pub use de::{stream_messages, stream_messages_with_timeout};

Copy link
Contributor

@notoriaga notoriaga left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! I might try using this in one of the projects that would benefit from this (like sbp-filter) in order to make sure the api is right/you have access to everything you need in a downstream crate. But you could also address anything in a separate pr

}

pub fn to_sbp(&self) -> Result<Sbp, Error> {
if !self.check_frame() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing an invalid frame as dummy for raw decoder, not sure if ideal

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm... seems like i broke something

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reverted and just merging previous decoder

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants