From fbad6297c5dc22a716a13c29433d89e2c75d7695 Mon Sep 17 00:00:00 2001 From: Zahari Dichev Date: Thu, 24 Jan 2019 21:50:34 +0200 Subject: [PATCH] Add enumerate combinator to Stream (#832) --- src/lib.rs | 2 +- src/util/enumerate.rs | 80 ++++++++++++++++++++++++++++++++++ src/util/mod.rs | 1 + src/util/stream.rs | 20 ++++++++- tokio-timer/tests/enumerate.rs | 31 +++++++++++++ 5 files changed, 132 insertions(+), 2 deletions(-) create mode 100644 src/util/enumerate.rs create mode 100644 tokio-timer/tests/enumerate.rs diff --git a/src/lib.rs b/src/lib.rs index 1828675f834..36071f9e9f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -79,7 +79,7 @@ macro_rules! if_runtime { )*) } -#[cfg_attr(feature = "rt-full", macro_use)] +#[macro_use] extern crate futures; #[cfg(feature = "io")] diff --git a/src/util/enumerate.rs b/src/util/enumerate.rs new file mode 100644 index 00000000000..dccbb71610a --- /dev/null +++ b/src/util/enumerate.rs @@ -0,0 +1,80 @@ +use futures::{Async, Poll, Stream, Sink, StartSend}; + +/// A stream combinator which combines the yields the current item +/// plus its count starting from 0. +/// +/// This structure is produced by the `Stream::enumerate` method. +#[derive(Debug)] +#[must_use = "Does nothing unless polled"] +pub struct Enumerate { + inner: T, + count: usize, +} + +impl Enumerate { + pub(crate) fn new(stream: T) -> Self { + Self { inner: stream, count: 0 } + } + + /// Acquires a reference to the underlying stream that this combinator is + /// pulling from. + pub fn get_ref(&self) -> &T { + &self.inner + } + + /// Acquires a mutable reference to the underlying stream that this + /// combinator is pulling from. + /// + /// Note that care must be taken to avoid tampering with the state of the + /// stream which may otherwise confuse this combinator. + pub fn get_mut(&mut self) -> &mut T { + &mut self.inner + } + + /// Consumes this combinator, returning the underlying stream. + /// + /// Note that this may discard intermediate state of this combinator, so + /// care should be taken to avoid losing resources when this is called. + pub fn into_inner(self) -> T { + self.inner + } +} + +impl Stream for Enumerate +where + T: Stream, +{ + type Item = (usize, T::Item); + type Error = T::Error; + + fn poll(&mut self) -> Poll, T::Error> { + match try_ready!(self.inner.poll()) { + Some(item) => { + let ret = Some((self.count, item)); + self.count += 1; + Ok(Async::Ready(ret)) + } + None => return Ok(Async::Ready(None)), + } + } +} + +// Forwarding impl of Sink from the underlying stream +impl Sink for Enumerate + where T: Sink +{ + type SinkItem = T::SinkItem; + type SinkError = T::SinkError; + + fn start_send(&mut self, item: T::SinkItem) -> StartSend { + self.inner.start_send(item) + } + + fn poll_complete(&mut self) -> Poll<(), T::SinkError> { + self.inner.poll_complete() + } + + fn close(&mut self) -> Poll<(), T::SinkError> { + self.inner.close() + } +} diff --git a/src/util/mod.rs b/src/util/mod.rs index 3ebd1fc7083..e26f44ca923 100644 --- a/src/util/mod.rs +++ b/src/util/mod.rs @@ -9,6 +9,7 @@ mod future; mod stream; +mod enumerate; pub use self::future::FutureExt; pub use self::stream::StreamExt; diff --git a/src/util/stream.rs b/src/util/stream.rs index 76e6c27e05e..d4e57c376a3 100644 --- a/src/util/stream.rs +++ b/src/util/stream.rs @@ -8,7 +8,7 @@ use futures::Stream; #[cfg(feature = "timer")] use std::time::Duration; - +pub use util::enumerate::Enumerate; /// An extension trait for `Stream` that provides a variety of convenient /// combinator functions. @@ -34,6 +34,24 @@ pub trait StreamExt: Stream { Throttle::new(self, duration) } + /// Creates a new stream which gives the current iteration count as well + /// as the next value. + /// + /// The stream returned yields pairs `(i, val)`, where `i` is the + /// current index of iteration and `val` is the value returned by the + /// iterator. + /// + /// # Overflow Behavior + /// + /// The method does no guarding against overflows, so counting elements of + /// an iterator with more than [`std::usize::MAX`] elements either produces the + /// wrong result or panics. + fn enumerate(self) -> Enumerate + where Self: Sized, + { + Enumerate::new(self) + } + /// Creates a new stream which allows `self` until `timeout`. /// /// This combinator creates a new stream which wraps the receiving stream diff --git a/tokio-timer/tests/enumerate.rs b/tokio-timer/tests/enumerate.rs new file mode 100644 index 00000000000..3e7ec1256f1 --- /dev/null +++ b/tokio-timer/tests/enumerate.rs @@ -0,0 +1,31 @@ +extern crate futures; +extern crate tokio; +extern crate tokio_executor; +extern crate tokio_timer; + +#[macro_use] +mod support; + +use futures::prelude::*; +use futures::sync::mpsc; +use tokio::util::StreamExt; + +#[test] +fn enumerate() { + use futures::*; + + let (mut tx, rx) = mpsc::channel(1); + + std::thread::spawn(|| { + for i in 0..5 { + tx = tx.send(i * 2).wait().unwrap(); + } + }); + + let mut result = rx.enumerate().collect(); + assert_eq!( + result.wait(), + Ok(vec![(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)]) + ); + +}