Skip to content

Commit

Permalink
Backport to 0.1: Shared must relinquish control to the executor if re…
Browse files Browse the repository at this point in the history
…polled

This backports #2136 to Futures 0.1. There isn't much to add on top of
what's mentioned in #2136: the same bug exists in Futures 0.1, and it'll
fail in the same way when polled in recent versions of Tokio (#2418).

Backporting to 0.1 allows codebases that still have some Futures 0.1
code around to still use newer versions of Tokio.
  • Loading branch information
krallin committed May 13, 2020
1 parent 56f8eb9 commit 51e7231
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 47 deletions.
80 changes: 33 additions & 47 deletions src/future/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,8 @@ struct Notifier {

const IDLE: usize = 0;
const POLLING: usize = 1;
const REPOLL: usize = 2;
const COMPLETE: usize = 3;
const POISONED: usize = 4;
const COMPLETE: usize = 2;
const POISONED: usize = 3;

pub fn new<F: Future>(future: F) -> Shared<F> {
Shared {
Expand Down Expand Up @@ -133,7 +132,7 @@ impl<F> Future for Shared<F>
IDLE => {
// Lock acquired, fall through
}
POLLING | REPOLL => {
POLLING => {
// Another task is currently polling, at this point we just want
// to ensure that our task handle is currently registered

Expand All @@ -146,56 +145,45 @@ impl<F> Future for Shared<F>
_ => unreachable!(),
}

loop {
struct Reset<'a>(&'a AtomicUsize);
struct Reset<'a>(&'a AtomicUsize);

impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
use std::thread;
impl<'a> Drop for Reset<'a> {
fn drop(&mut self) {
use std::thread;

if thread::panicking() {
self.0.store(POISONED, SeqCst);
}
if thread::panicking() {
self.0.store(POISONED, SeqCst);
}
}
}

let _reset = Reset(&self.inner.notifier.state);

// Poll the future
let res = unsafe {
(*self.inner.future.get()).as_mut().unwrap()
.poll_future_notify(&self.inner.notifier, 0)
};
match res {
Ok(Async::NotReady) => {
// Not ready, try to release the handle
match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) {
POLLING => {
// Success
return Ok(Async::NotReady);
}
REPOLL => {
// Gotta poll again!
let prev = self.inner.notifier.state.swap(POLLING, SeqCst);
assert_eq!(prev, REPOLL);
}
_ => unreachable!(),
let _reset = Reset(&self.inner.notifier.state);

// Poll the future
let res = unsafe {
(*self.inner.future.get()).as_mut().unwrap()
.poll_future_notify(&self.inner.notifier, 0)
};
match res {
Ok(Async::NotReady) => {
// Not ready, try to release the handle
match self.inner.notifier.state.compare_and_swap(POLLING, IDLE, SeqCst) {
POLLING => {
// Success
return Ok(Async::NotReady);
}

_ => unreachable!(),
}
Ok(Async::Ready(i)) => {
unsafe {
(*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) }));
}

break;
}
Ok(Async::Ready(i)) => {
unsafe {
(*self.inner.result.get()) = Some(Ok(SharedItem { item: Arc::new(i) }));
}
Err(e) => {
unsafe {
(*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) }));
}

break;
}
Err(e) => {
unsafe {
(*self.inner.result.get()) = Some(Err(SharedError { error: Arc::new(e) }));
}
}
}
Expand Down Expand Up @@ -225,8 +213,6 @@ impl<F> Drop for Shared<F> where F: Future {

impl Notify for Notifier {
fn notify(&self, _id: usize) {
self.state.compare_and_swap(POLLING, REPOLL, SeqCst);

let waiters = mem::replace(&mut *self.waiters.lock().unwrap(), HashMap::new());

for (_, waiter) in waiters {
Expand Down
30 changes: 30 additions & 0 deletions tests/shared.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,33 @@ fn recursive_poll_with_unpark() {
drop(tx0);
core.run(f3).unwrap();
}

#[test]
fn shared_future_that_wakes_itself_until_pending_is_returned() {
use futures::Async;
use std::cell::Cell;

let core = ::support::local_executor::Core::new();

let proceed = Cell::new(false);
let fut = futures::future::poll_fn(|| {
Result::<_, ()>::Ok(if proceed.get() {
Async::Ready(())
} else {
futures::task::current().notify();
Async::NotReady
})
})
.shared()
.map(|_| ())
.map_err(|_| ());

// The join future can only complete if the second future gets a chance to run after the first
// has returned pending
let second = futures::future::lazy(|| {
proceed.set(true);
Result::<_, ()>::Ok(())
});

core.run(fut.join(second)).unwrap();
}

0 comments on commit 51e7231

Please sign in to comment.