Skip to content

Commit

Permalink
Remove Future::schedule
Browse files Browse the repository at this point in the history
A more appealing model is actually just automatically inferring what needs to be
scheduled based on what actions are done during poll. For example if during a
poll you check a oneshot channel, then the current task is registered for being
woken up if it's not ready. Similarly this will apply to I/O where if I/O is
attempted but we see EAGAIN then we'll schedule the task to get notified when
it's ready.

This may also have performance benefits in some niche situations because you
don't need to recompute where you are in the state machine both during poll and
during schedule. Instead, it now happens all at once.
  • Loading branch information
alexcrichton committed Aug 18, 2016
1 parent f672963 commit d07c397
Show file tree
Hide file tree
Showing 74 changed files with 126 additions and 665 deletions.
4 changes: 0 additions & 4 deletions futures-cpupool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,8 +197,4 @@ impl<R: Send + 'static> Future for CpuFuture<R> {
Poll::NotReady => Poll::NotReady,
}
}

fn schedule(&mut self, task: &mut Task) {
self.inner.schedule(task)
}
}
8 changes: 0 additions & 8 deletions futures-io/src/buf_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,6 @@ impl<R> Stream for BufReader<R>
self.inner.poll(task)
}
}

fn schedule(&mut self, task: &mut Task) {
if self.pos < self.cap {
task.notify()
} else {
self.inner.schedule(task)
}
}
}

impl<R: ReadTask> ReadTask for BufReader<R> {
Expand Down
10 changes: 0 additions & 10 deletions futures-io/src/buf_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,6 @@ impl<A> Stream for BufWriter<A>
self.inner.poll(task)
}
}

fn schedule(&mut self, task: &mut Task) {
// Notify immediately if we have some capacity, but also ask our
// underlying stream for readiness so it'll be ready by the time that we
// need to flush.
if !self.flushing && self.buf.len() < self.buf.capacity() {
task.notify()
}
self.inner.schedule(task)
}
}

impl<W: WriteTask> WriteTask for BufWriter<W> {
Expand Down
8 changes: 0 additions & 8 deletions futures-io/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@ impl<A, B> Stream for Chain<A, B>
self.b.poll(task)
}
}

fn schedule(&mut self, task: &mut Task) {
if self.first {
self.a.schedule(task)
} else {
self.b.schedule(task)
}
}
}

impl<A, B> ReadTask for Chain<A, B>
Expand Down
12 changes: 0 additions & 12 deletions futures-io/src/copy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,16 +133,4 @@ impl<R, W> Future for Copy<R, W>
}
}
}

fn schedule(&mut self, task: &mut Task) {
if self.read_ready && self.write_ready {
task.notify();
}
if !self.read_ready && !self.read_done {
self.reader.schedule(task);
}
if !self.write_ready && !self.flush_done {
self.writer.schedule(task);
}
}
}
4 changes: 0 additions & 4 deletions futures-io/src/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ impl Stream for Empty {
fn poll(&mut self, _task: &mut Task) -> Poll<Option<Ready>, io::Error> {
Poll::Ok(Some(Ready::Read))
}

fn schedule(&mut self, task: &mut Task) {
task.notify();
}
}

impl Read for Empty {
Expand Down
6 changes: 0 additions & 6 deletions futures-io/src/flush.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,5 @@ impl<A> Future for Flush<A>
Err(e) => Poll::Err(e),
}
}

fn schedule(&mut self, task: &mut Task) {
if let Some(ref mut a) = self.a {
a.schedule(task);
}
}
}

7 changes: 0 additions & 7 deletions futures-io/src/read_exact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,5 @@ impl<A, T> Future for ReadExact<A, T>
State::Empty => panic!(),
}
}

fn schedule(&mut self, task: &mut Task) {
match self.state {
State::Reading { ref mut a, .. } => a.schedule(task),
State::Empty => task.notify(),
}
}
}

4 changes: 0 additions & 4 deletions futures-io/src/read_to_end.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,4 @@ impl<A> Future for ReadToEnd<A>
Err(e) => Poll::Err(e)
}
}

fn schedule(&mut self, task: &mut Task) {
self.a.schedule(task)
}
}
4 changes: 0 additions & 4 deletions futures-io/src/ready_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,6 @@ impl<S> Stream for ReadyTracker<S>
other => other,
}
}

fn schedule(&mut self, task: &mut Task) {
self.inner.schedule(task)
}
}

fn is_wouldblock<T>(res: &io::Result<T>) -> bool {
Expand Down
4 changes: 0 additions & 4 deletions futures-io/src/repeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ impl Stream for Repeat {
fn poll(&mut self, _task: &mut Task) -> Poll<Option<Ready>, io::Error> {
Poll::Ok(Some(Ready::Read))
}

fn schedule(&mut self, task: &mut Task) {
task.notify();
}
}

impl Read for Repeat {
Expand Down
4 changes: 0 additions & 4 deletions futures-io/src/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ impl Stream for Sink {
fn poll(&mut self, _task: &mut Task) -> Poll<Option<Ready>, io::Error> {
Poll::Ok(Some(Ready::Write))
}

fn schedule(&mut self, task: &mut Task) {
task.notify()
}
}

impl Write for Sink {
Expand Down
8 changes: 0 additions & 8 deletions futures-io/src/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,6 @@ impl<A> Stream for Take<A>
self.a.poll(task)
}
}

fn schedule(&mut self, task: &mut Task) {
if self.left == 0 {
task.notify()
} else {
self.a.schedule(task)
}
}
}

impl<A> Read for Take<A>
Expand Down
33 changes: 0 additions & 33 deletions futures-io/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ impl<T: 'static> Future for TaskIoNew<T> {
fn poll(&mut self, task: &mut Task) -> Poll<TaskIo<T>, io::Error> {
self.inner.poll(task).map(|data| TaskIo { handle: data })
}

fn schedule(&mut self, task: &mut Task) {
self.inner.schedule(task)
}
}

impl<T> TaskIo<T>
Expand Down Expand Up @@ -159,23 +155,6 @@ impl<'a, 'b, T> TaskIoTake<'a, 'b, T>
}
}
}

fn schedule(&mut self, ready: Ready) {
let state = self.t.as_mut().unwrap();
if let Some(cur) = state.ready {
match (ready, cur) {
(_, Ready::ReadWrite) |
(Ready::Read, Ready::Read) |
(Ready::Write, Ready::Write) => return self.task.notify(),

(Ready::ReadWrite, Ready::Read) |
(Ready::ReadWrite, Ready::Write) |
(Ready::Read, Ready::Write) |
(Ready::Write, Ready::Read) => {}
}
}
state.object.schedule(self.task)
}
}

impl<'a, 'b, T: 'static> Drop for TaskIoTake<'a, 'b, T> {
Expand All @@ -194,10 +173,6 @@ impl<T> Stream for TaskIo<T>
fn poll(&mut self, task: &mut Task) -> Poll<Option<Ready>, io::Error> {
TaskIoTake::new(task, &self.handle).poll(Ready::ReadWrite)
}

fn schedule(&mut self, task: &mut Task) {
TaskIoTake::new(task, &self.handle).schedule(Ready::ReadWrite)
}
}

impl<T> ReadTask for TaskIo<T>
Expand Down Expand Up @@ -239,10 +214,6 @@ impl<T> Stream for TaskIoRead<T>
fn poll(&mut self, task: &mut Task) -> Poll<Option<Ready>, io::Error> {
TaskIoTake::new(task, &self.handle).poll(Ready::Read)
}

fn schedule(&mut self, task: &mut Task) {
TaskIoTake::new(task, &self.handle).schedule(Ready::Read)
}
}

impl<T> ReadTask for TaskIoRead<T>
Expand Down Expand Up @@ -270,10 +241,6 @@ impl<T> Stream for TaskIoWrite<T>
fn poll(&mut self, task: &mut Task) -> Poll<Option<Ready>, io::Error> {
TaskIoTake::new(task, &self.handle).poll(Ready::Write)
}

fn schedule(&mut self, task: &mut Task) {
TaskIoTake::new(task, &self.handle).schedule(Ready::Write)
}
}

impl<T> WriteTask for TaskIoWrite<T>
Expand Down
7 changes: 0 additions & 7 deletions futures-io/src/write_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,4 @@ impl<A, T> Future for WriteAll<A, T>
State::Empty => panic!(),
}
}

fn schedule(&mut self, task: &mut Task) {
match self.state {
State::Writing { ref mut a, .. } => a.schedule(task),
State::Empty => task.notify(),
}
}
}
99 changes: 36 additions & 63 deletions futures-mio/src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -652,12 +652,8 @@ impl Future for AddSource {
type Item = usize;
type Error = io::Error;

fn poll(&mut self, _task: &mut Task) -> Poll<usize, io::Error> {
self.inner.poll(Loop::add_source)
}

fn schedule(&mut self, task: &mut Task) {
self.inner.schedule(task, Message::AddSource)
fn poll(&mut self, task: &mut Task) -> Poll<usize, io::Error> {
self.inner.poll(task, Loop::add_source, Message::AddSource)
}
}

Expand All @@ -676,12 +672,8 @@ impl Future for AddTimeout {
type Item = TimeoutToken;
type Error = io::Error;

fn poll(&mut self, _task: &mut Task) -> Poll<TimeoutToken, io::Error> {
self.inner.poll(Loop::add_timeout)
}

fn schedule(&mut self, task: &mut Task) {
self.inner.schedule(task, Message::AddTimeout)
fn poll(&mut self, task: &mut Task) -> Poll<TimeoutToken, io::Error> {
self.inner.poll(task, Loop::add_timeout, Message::AddTimeout)
}
}

Expand Down Expand Up @@ -723,9 +715,14 @@ impl<F, A> Future for AddLoopData<F, A>
type Item = LoopData<A>;
type Error = io::Error;

fn poll(&mut self, _task: &mut Task) -> Poll<LoopData<A>, io::Error> {
let ret = self.inner.poll(|_lp, f| {
fn poll(&mut self, task: &mut Task) -> Poll<LoopData<A>, io::Error> {
let ret = self.inner.poll(task, |_lp, f| {
Ok(DropBox::new(f()))
}, |f, slot| {
Message::Run(Box::new(move || {
slot.try_produce(Ok(DropBox::new(f()))).ok()
.expect("add loop data try_produce intereference");
}))
});

ret.map(|data| {
Expand All @@ -735,15 +732,6 @@ impl<F, A> Future for AddLoopData<F, A>
}
})
}

fn schedule(&mut self, task: &mut Task) {
self.inner.schedule(task, |f, slot| {
Message::Run(Box::new(move || {
slot.try_produce(Ok(DropBox::new(f()))).ok()
.expect("add loop data try_produce intereference");
}))
})
}
}

impl<A: 'static> LoopData<A> {
Expand Down Expand Up @@ -798,15 +786,6 @@ impl<A: Future> Future for LoopData<A> {
task.poll_on(self.executor());
Poll::NotReady
}

fn schedule(&mut self, task: &mut Task) {
// If we're on the right thread, then we're good to go, otherwise we
// need to get poll'd to tell the task to move somewhere else.
match self.get_mut() {
Some(inner) => inner.schedule(task),
None => task.notify(),
}
}
}

impl<A: 'static> Drop for LoopData<A> {
Expand Down Expand Up @@ -975,48 +954,42 @@ struct LoopFuture<T, U> {
impl<T, U> LoopFuture<T, U>
where T: 'static,
{
fn poll<F>(&mut self, f: F) -> Poll<T, io::Error>
fn poll<F, G>(&mut self, task: &mut Task, f: F, g: G) -> Poll<T, io::Error>
where F: FnOnce(&Loop, U) -> io::Result<T>,
G: FnOnce(U, Arc<Slot<io::Result<T>>>) -> Message,
{
match self.result {
Some((ref result, ref token)) => {
Some((ref result, ref mut token)) => {
result.cancel(*token);
match result.try_consume() {
Ok(t) => t.into(),
Err(_) => Poll::NotReady,
Ok(t) => return t.into(),
Err(_) => {}
}
let handle = task.handle().clone();
*token = result.on_full(move |_| {
handle.notify();
});
return Poll::NotReady
}
None => {
let data = &mut self.data;
self.loop_handle.with_loop(|lp| {
match lp {
Some(lp) => f(lp, data.take().unwrap()).into(),
None => Poll::NotReady,
}
})
}
}
}
let ret = self.loop_handle.with_loop(|lp| {
lp.map(|lp| f(lp, data.take().unwrap()))
});
if let Some(ret) = ret {
return ret.into()
}

fn schedule<F>(&mut self, task: &mut Task, f: F)
where F: FnOnce(U, Arc<Slot<io::Result<T>>>) -> Message,
{
if let Some((ref result, ref mut token)) = self.result {
result.cancel(*token);
let handle = task.handle().clone();
*token = result.on_full(move |_| {
handle.notify();
});
return
let handle = task.handle().clone();
let result = Arc::new(Slot::new(None));
let token = result.on_full(move |_| {
handle.notify();
});
self.result = Some((result.clone(), token));
self.loop_handle.send(g(data.take().unwrap(), result));
Poll::NotReady
}
}

let handle = task.handle().clone();
let result = Arc::new(Slot::new(None));
let token = result.on_full(move |_| {
handle.notify();
});
self.result = Some((result.clone(), token));
self.loop_handle.send(f(self.data.take().unwrap(), result))
}
}

Expand Down

0 comments on commit d07c397

Please sign in to comment.