Skip to content

Commit

Permalink
Change .buffered() to be in-order
Browse files Browse the repository at this point in the history
  • Loading branch information
alexcrichton committed Aug 11, 2016
1 parent 93ce3f0 commit 9804cec
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 49 deletions.
118 changes: 72 additions & 46 deletions src/stream/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
use {Task, IntoFuture, Poll};
use std::mem;

use {Task, IntoFuture, Poll, Future};
use stream::{Stream, Fuse};
use util::Collapsed;

/// An adaptor for a stream of futures to execute the futures concurrently, if
/// possible.
///
/// This adaptor will buffer up a list of pending futures, and then return their
/// results in the order that they're finished. This is created by the
/// `Stream::buffered` method.
/// results in the order that they were pulled out of the original stream. This
/// is created by the `Stream::buffered` method.
pub struct Buffered<S>
where S: Stream,
S::Item: IntoFuture,
{
stream: Fuse<S>,
futures: Vec<Option<Collapsed<<S::Item as IntoFuture>::Future>>>,
futures: Vec<State<<S::Item as IntoFuture>::Future>>,
cur: usize,
}

enum State<S: Future> {
Empty,
Running(Collapsed<S>),
Finished(Result<S::Item, S::Error>),
}

pub fn new<S>(s: S, amt: usize) -> Buffered<S>
Expand All @@ -22,7 +31,8 @@ pub fn new<S>(s: S, amt: usize) -> Buffered<S>
{
Buffered {
stream: super::fuse::new(s),
futures: (0..amt).map(|_| None).collect(),
futures: (0..amt).map(|_| State::Empty).collect(),
cur: 0,
}
}

Expand All @@ -34,66 +44,82 @@ impl<S> Stream for Buffered<S>
type Error = <S as Stream>::Error;

fn poll(&mut self, task: &mut Task) -> Poll<Option<Self::Item>, Self::Error> {
let mut any_some = false;
for f in self.futures.iter_mut() {
// First, if this slot is empty, try to fill it in. If we fill it in
// we're careful to use TOKENS_ALL for the next poll() below.
if f.is_none() {
// First, try to fill in all the futures
for i in 0..self.futures.len() {
let mut idx = self.cur + i;
if idx >= self.futures.len() {
idx -= self.futures.len();
}

if let State::Empty = self.futures[idx] {
match self.stream.poll(task) {
Poll::Ok(Some(e)) => {
*f = Some(Collapsed::Start(e.into_future()));
Poll::Ok(Some(future)) => {
let future = Collapsed::Start(future.into_future());
self.futures[idx] = State::Running(future);
}
Poll::Ok(None) => break,
Poll::Err(e) => return Poll::Err(e),
Poll::Ok(None) |
Poll::NotReady => continue,
Poll::NotReady => break,
}
}
}

// If we're here then our slot is full, so we unwrap it and poll it.
let ret = {
let future = f.as_mut().unwrap();
match future.poll(task) {
Poll::Ok(e) => Poll::Ok(Some(e)),
Poll::Err(e) => Poll::Err(e),

// TODO: should this happen here or elsewhere?
Poll::NotReady => {
future.collapse();
any_some = true;
continue
// Next, try and step all the futures forward
for future in self.futures.iter_mut() {
let result = match *future {
State::Running(ref mut s) => {
match s.poll(task) {
Poll::Ok(e) => Ok(e),
Poll::Err(e) => Err(e),
Poll::NotReady => {
s.collapse();
return Poll::NotReady
}
}
}
_ => continue,
};
*future = State::Finished(result);
}

// Ok, that future is done, so we chuck it out and return its value.
// Next time we're poll()'d it'll get filled in again.
*f = None;
return ret
// Check to see if our current future is done.
if let State::Finished(_) = self.futures[self.cur] {
let r = match mem::replace(&mut self.futures[self.cur], State::Empty) {
State::Finished(r) => r,
_ => panic!(),
};
self.cur += 1;
if self.cur >= self.futures.len() {
self.cur = 0;
}
return r.map(Some).into()
}

if any_some || !self.stream.is_done() {
Poll::NotReady
} else {
Poll::Ok(None)
if self.stream.is_done() {
if let State::Empty = self.futures[self.cur] {
return Poll::Ok(None)
}
}
Poll::NotReady
}

fn schedule(&mut self, task: &mut Task) {
let mut any_none = false;
// Primarily we're interested in all our pending futures, so schedule a
// callback on all of them.
for f in self.futures.iter_mut() {
match *f {
Some(ref mut f) => f.schedule(task),
None => any_none = true,
// If we've got an empty slot, then we're immediately ready to go.
for slot in self.futures.iter() {
if let State::Empty = *slot {
return task.notify()
}
}

// If any slot was None, then we're also interested in the stream, but
// if all slots were taken we're not actually interested in the stream.
if any_none {
self.stream.schedule(task);
// If the current slot is ready, we're ready to go
if let State::Finished(_) = self.futures[self.cur] {
return task.notify()
}

for slot in self.futures.iter_mut() {
if let State::Running(ref mut s) = *slot {
s.schedule(task);
}
}
}
}

4 changes: 2 additions & 2 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,8 +626,8 @@ pub trait Stream: Send + 'static {
/// An adaptor for creating a buffered list of pending futures.
///
/// If this stream's item can be converted into a future, then this adaptor
/// will buffer up to `amt` futures and then return results in the order
/// that the futures are completed. No more than `amt` futures will be
/// will buffer up to `amt` futures and then return results in the same
/// order as the underlying stream. No more than `amt` futures will be
/// buffered at any point in time, and less than `amt` may also be buffered
/// depending on the state of each future.
///
Expand Down
2 changes: 1 addition & 1 deletion tests/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,10 @@ fn buffered() {
let mut rx = rx.buffered(2);
sassert_empty(&mut rx);
c.complete(3);
sassert_next(&mut rx, 3);
sassert_empty(&mut rx);
a.complete(5);
sassert_next(&mut rx, 5);
sassert_next(&mut rx, 3);
sassert_done(&mut rx);

let (tx, rx) = channel::<_, u32>();
Expand Down

0 comments on commit 9804cec

Please sign in to comment.