Skip to content

Commit

Permalink
Add enumerate combinator to Stream (#832)
Browse files Browse the repository at this point in the history
  • Loading branch information
zaharidichev authored and carllerche committed Jan 24, 2019
1 parent 0ec8986 commit fbad629
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/lib.rs
Expand Up @@ -79,7 +79,7 @@ macro_rules! if_runtime {
)*)
}

#[cfg_attr(feature = "rt-full", macro_use)]
#[macro_use]
extern crate futures;

#[cfg(feature = "io")]
Expand Down
80 changes: 80 additions & 0 deletions src/util/enumerate.rs
@@ -0,0 +1,80 @@
use futures::{Async, Poll, Stream, Sink, StartSend};

/// A stream combinator which combines the yields the current item
/// plus its count starting from 0.
///
/// This structure is produced by the `Stream::enumerate` method.
#[derive(Debug)]
#[must_use = "Does nothing unless polled"]
pub struct Enumerate<T> {
inner: T,
count: usize,
}

impl<T> Enumerate<T> {
pub(crate) fn new(stream: T) -> Self {
Self { inner: stream, count: 0 }
}

/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &T {
&self.inner
}

/// Acquires a mutable reference to the underlying stream that this
/// combinator is pulling from.
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}

/// Consumes this combinator, returning the underlying stream.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> T {
self.inner
}
}

impl<T> Stream for Enumerate<T>
where
T: Stream,
{
type Item = (usize, T::Item);
type Error = T::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, T::Error> {
match try_ready!(self.inner.poll()) {
Some(item) => {
let ret = Some((self.count, item));
self.count += 1;
Ok(Async::Ready(ret))
}
None => return Ok(Async::Ready(None)),
}
}
}

// Forwarding impl of Sink from the underlying stream
impl<T> Sink for Enumerate<T>
where T: Sink
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;

fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.inner.start_send(item)
}

fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}

fn close(&mut self) -> Poll<(), T::SinkError> {
self.inner.close()
}
}
1 change: 1 addition & 0 deletions src/util/mod.rs
Expand Up @@ -9,6 +9,7 @@

mod future;
mod stream;
mod enumerate;

pub use self::future::FutureExt;
pub use self::stream::StreamExt;
20 changes: 19 additions & 1 deletion src/util/stream.rs
Expand Up @@ -8,7 +8,7 @@ use futures::Stream;

#[cfg(feature = "timer")]
use std::time::Duration;

pub use util::enumerate::Enumerate;

/// An extension trait for `Stream` that provides a variety of convenient
/// combinator functions.
Expand All @@ -34,6 +34,24 @@ pub trait StreamExt: Stream {
Throttle::new(self, duration)
}

/// Creates a new stream which gives the current iteration count as well
/// as the next value.
///
/// The stream returned yields pairs `(i, val)`, where `i` is the
/// current index of iteration and `val` is the value returned by the
/// iterator.
///
/// # Overflow Behavior
///
/// The method does no guarding against overflows, so counting elements of
/// an iterator with more than [`std::usize::MAX`] elements either produces the
/// wrong result or panics.
fn enumerate(self) -> Enumerate<Self>
where Self: Sized,
{
Enumerate::new(self)
}

/// Creates a new stream which allows `self` until `timeout`.
///
/// This combinator creates a new stream which wraps the receiving stream
Expand Down
31 changes: 31 additions & 0 deletions tokio-timer/tests/enumerate.rs
@@ -0,0 +1,31 @@
extern crate futures;
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_timer;

#[macro_use]
mod support;

use futures::prelude::*;
use futures::sync::mpsc;
use tokio::util::StreamExt;

#[test]
fn enumerate() {
use futures::*;

let (mut tx, rx) = mpsc::channel(1);

std::thread::spawn(|| {
for i in 0..5 {
tx = tx.send(i * 2).wait().unwrap();
}
});

let mut result = rx.enumerate().collect();
assert_eq!(
result.wait(),
Ok(vec![(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)])
);

}

0 comments on commit fbad629

Please sign in to comment.