Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add enumerate combinator to Stream #832

Merged
merged 3 commits into from
Jan 24, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ macro_rules! if_runtime {
)*)
}

#[cfg_attr(feature = "rt-full", macro_use)]
#[macro_use]
extern crate futures;

#[cfg(feature = "io")]
Expand Down
80 changes: 80 additions & 0 deletions src/util/enumerate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
use futures::{Async, Poll, Stream, Sink, StartSend};

/// A stream combinator which combines the yields the current item
/// plus its count starting from 0.
///
/// This structure is produced by the `Stream::enumerate` method.
#[derive(Debug)]
#[must_use = "Does nothing unless polled"]
pub struct Enumerate<T> {
inner: T,
count: usize,
}

impl<T> Enumerate<T> {
pub(crate) fn new(stream: T) -> Self {
Self { inner: stream, count: 0 }
}

/// Acquires a reference to the underlying stream that this combinator is
/// pulling from.
pub fn get_ref(&self) -> &T {
&self.inner
}

/// Acquires a mutable reference to the underlying stream that this
/// combinator is pulling from.
///
/// Note that care must be taken to avoid tampering with the state of the
/// stream which may otherwise confuse this combinator.
pub fn get_mut(&mut self) -> &mut T {
&mut self.inner
}

/// Consumes this combinator, returning the underlying stream.
///
/// Note that this may discard intermediate state of this combinator, so
/// care should be taken to avoid losing resources when this is called.
pub fn into_inner(self) -> T {
self.inner
}
}

impl<T> Stream for Enumerate<T>
where
T: Stream,
{
type Item = (usize, T::Item);
type Error = T::Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, T::Error> {
match try_ready!(self.inner.poll()) {
Some(item) => {
let ret = Some((self.count, item));
self.count += 1;
LucioFranco marked this conversation as resolved.
Show resolved Hide resolved
Ok(Async::Ready(ret))
}
None => return Ok(Async::Ready(None)),
}
}
}

// Forwarding impl of Sink from the underlying stream
impl<T> Sink for Enumerate<T>
where T: Sink
{
type SinkItem = T::SinkItem;
type SinkError = T::SinkError;

fn start_send(&mut self, item: T::SinkItem) -> StartSend<T::SinkItem, T::SinkError> {
self.inner.start_send(item)
}

fn poll_complete(&mut self) -> Poll<(), T::SinkError> {
self.inner.poll_complete()
}

fn close(&mut self) -> Poll<(), T::SinkError> {
self.inner.close()
}
}
1 change: 1 addition & 0 deletions src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

mod future;
mod stream;
mod enumerate;

pub use self::future::FutureExt;
pub use self::stream::StreamExt;
20 changes: 19 additions & 1 deletion src/util/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use futures::Stream;

#[cfg(feature = "timer")]
use std::time::Duration;

pub use util::enumerate::Enumerate;

/// An extension trait for `Stream` that provides a variety of convenient
/// combinator functions.
Expand Down Expand Up @@ -92,6 +92,24 @@ pub trait StreamExt: Stream {
Throttle::new(self, duration)
}

/// Creates a new stream which gives the current iteration count as well
/// as the next value.
///
/// The stream returned yields pairs `(i, val)`, where `i` is the
/// current index of iteration and `val` is the value returned by the
/// iterator.
///
/// # Overflow Behavior
///
/// The method does no guarding against overflows, so counting elements of
/// an iterator with more than [`std::usize::MAX`] elements either produces the
/// wrong result or panics.
fn enumerate(self) -> Enumerate<Self>
where Self: Sized,
{
Enumerate::new(self)
}

/// Creates a new stream which allows `self` until `timeout`.
///
/// This combinator creates a new stream which wraps the receiving stream
Expand Down
31 changes: 31 additions & 0 deletions tokio-timer/tests/enumerate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
extern crate futures;
extern crate tokio;
extern crate tokio_executor;
extern crate tokio_timer;

#[macro_use]
mod support;

use futures::prelude::*;
use futures::sync::mpsc;
use tokio::util::StreamExt;

#[test]
fn enumerate() {
use futures::*;

let (mut tx, rx) = mpsc::channel(1);

std::thread::spawn(|| {
for i in 0..5 {
tx = tx.send(i * 2).wait().unwrap();
}
});

let mut result = rx.enumerate().collect();
assert_eq!(
result.wait(),
Ok(vec![(0, 0), (1, 2), (2, 4), (3, 6), (4, 8)])
);

}