Skip to content

Commit

Permalink
Merge pull request rust-lang#310 from troplin/take_while
Browse files Browse the repository at this point in the history
Stream::take_while Implementation
  • Loading branch information
alexcrichton committed Dec 28, 2016
2 parents 3fa57f9 + bb01d81 commit 7f853ff
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 0 deletions.
16 changes: 16 additions & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ mod select;
mod skip;
mod skip_while;
mod take;
mod take_while;
mod then;
mod unfold;
mod zip;
Expand All @@ -58,6 +59,7 @@ pub use self::select::Select;
pub use self::skip::Skip;
pub use self::skip_while::SkipWhile;
pub use self::take::Take;
pub use self::take_while::TakeWhile;
pub use self::then::Then;
pub use self::unfold::{Unfold, unfold};
pub use self::zip::Zip;
Expand Down Expand Up @@ -582,6 +584,20 @@ pub trait Stream {
skip_while::new(self, pred)
}

/// Take elements from this stream while the predicate provided resolves to
/// `true`.
///
/// This function, like `Iterator::take_while`, will take elements from the
/// stream until the `predicate` resolves to `false`. Once one element
/// returns false it will always return that the stream is done.
fn take_while<P, R>(self, pred: P) -> TakeWhile<Self, P, R>
where P: FnMut(&Self::Item) -> R,
R: IntoFuture<Item=bool, Error=Self::Error>,
Self: Sized
{
take_while::new(self, pred)
}

/// Runs this stream to completion, executing the provided closure for each
/// element on the stream.
///
Expand Down
83 changes: 83 additions & 0 deletions src/stream/take_while.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
use {Async, Poll, IntoFuture, Future};
use stream::Stream;

/// A stream combinator which takes elements from a stream while a predicate
/// holds.
///
/// This structure is produced by the `Stream::take_while` method.
#[must_use = "streams do nothing unless polled"]
pub struct TakeWhile<S, P, R> where S: Stream, R: IntoFuture {
stream: S,
pred: P,
pending: Option<(R::Future, S::Item)>,
done_taking: bool,
}

pub fn new<S, P, R>(s: S, p: P) -> TakeWhile<S, P, R>
where S: Stream,
P: FnMut(&S::Item) -> R,
R: IntoFuture<Item=bool, Error=S::Error>,
{
TakeWhile {
stream: s,
pred: p,
pending: None,
done_taking: false,
}
}

// Forwarding impl of Sink from the underlying stream
impl<S, P, R> ::sink::Sink for TakeWhile<S, P, R>
where S: ::sink::Sink + Stream, R: IntoFuture
{
type SinkItem = S::SinkItem;
type SinkError = S::SinkError;

fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
self.stream.start_send(item)
}

fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
self.stream.poll_complete()
}
}

impl<S, P, R> Stream for TakeWhile<S, P, R>
where S: Stream,
P: FnMut(&S::Item) -> R,
R: IntoFuture<Item=bool, Error=S::Error>,
{
type Item = S::Item;
type Error = S::Error;

fn poll(&mut self) -> Poll<Option<S::Item>, S::Error> {
if self.done_taking {
return Ok(Async::Ready(None));
}

if self.pending.is_none() {
let item = match try_ready!(self.stream.poll()) {
Some(e) => e,
None => return Ok(Async::Ready(None)),
};
self.pending = Some(((self.pred)(&item).into_future(), item));
}

assert!(self.pending.is_some());
match self.pending.as_mut().unwrap().0.poll() {
Ok(Async::Ready(true)) => {
let (_, item) = self.pending.take().unwrap();
Ok(Async::Ready(Some(item)))
},
Ok(Async::Ready(false)) => {
self.done_taking = true;
Ok(Async::Ready(None))
}
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(e) => {
self.pending = None;
Err(e)
}
}
}
}
6 changes: 6 additions & 0 deletions tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,12 @@ fn take() {
assert_done(|| list().take(2).collect(), Ok(vec![1, 2]));
}

#[test]
fn take_while() {
assert_done(|| list().take_while(|e| Ok(*e < 3)).collect(),
Ok(vec![1, 2]));
}

#[test]
fn take_passes_errors_through() {
let mut s = iter(vec![Err(1), Err(2), Ok(3), Ok(4), Err(4)])
Expand Down

0 comments on commit 7f853ff

Please sign in to comment.