Skip to content

Commit

Permalink
test what happens when poll gets invoked many times
Browse files Browse the repository at this point in the history
  • Loading branch information
nikomatsakis committed Jan 11, 2017
1 parent d99dd89 commit 07cb64d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 4 deletions.
7 changes: 5 additions & 2 deletions src/scope/future/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,11 @@ impl<F: Future + Send> ScopeFutureContents<F> {

if let Some(waiting_task) = self.waiting_task.take() {
log!(FutureUnparkWaitingTask);
waiting_task.unpark();
match unwind::halt_unwinding(|| waiting_task.unpark()) {
Ok(()) => { }
Err(err) => {
}
}
}

// Allow the enclosing scope to end. Asserts that
Expand Down Expand Up @@ -411,7 +415,6 @@ impl<F> ScopeFutureTrait<<CU<F> as Future>::Item, <CU<F> as Future>::Error> for
let r = mem::replace(&mut contents.result, Ok(Async::NotReady));
return r;
} else {
assert!(contents.waiting_task.is_none());
log!(FutureInstallWaitingTask { state: state });
contents.waiting_task = Some(task::park());
Ok(Async::NotReady)
Expand Down
52 changes: 50 additions & 2 deletions src/scope/future/test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use futures::{self, Future};
use futures::{self, Async, Future};
use futures::future::lazy;
use futures::sync::oneshot;
use futures::task::{self, Unpark};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use ::{scope, ThreadPool, Configuration};

/// Basic test of using futures to data on the stack frame.
Expand Down Expand Up @@ -109,7 +113,6 @@ fn future_wait_panics_inside_rayon_thread() {
});
}


/// Test that invoking `wait` on a `RayonFuture` will not panic if we
/// are outside a Rayon worker thread.
#[test]
Expand All @@ -121,3 +124,48 @@ fn future_wait_works_outside_rayon_threads() {
assert_eq!(Ok(()), future.unwrap().wait());
}

struct TrackUnpark {
value: AtomicUsize
}

impl Unpark for TrackUnpark {
fn unpark(&self) {
self.value.fetch_add(1, Ordering::SeqCst);
}
}

#[test]
fn double_unpark() {
let unpark0 = Arc::new(TrackUnpark { value: AtomicUsize::new(0) });
let unpark1 = Arc::new(TrackUnpark { value: AtomicUsize::new(0) });
let mut _tag = None;
scope(|s| {
let (a_tx, a_rx) = oneshot::channel::<u32>();
let rf = s.spawn_future(a_rx);

let mut spawn = task::spawn(rf);

// test that we don't panic if people try to install a task many times;
// even if they are different tasks
for i in 0 .. 22 {
let u = if i % 2 == 0 { unpark0.clone() } else { unpark1.clone() };
match spawn.poll_future(u) {
Ok(Async::NotReady) => { /* good, we expect not to be ready yet */ }
r => panic!("spawn poll returned: {:?}", r),
}
}

a_tx.complete(22);

// just hold onto `rf` to ensure that nothing is cancelled
_tag = Some(spawn.into_inner());
});

// Since scope is done, our spawned future must have completed. It
// should have signalled the unpark value we gave it -- but
// exactly once, even though we called `poll` many times.
assert_eq!(unpark1.value.load(Ordering::SeqCst), 1);

// unpark0 was not the last unpark supplied, so it will never be signalled
assert_eq!(unpark0.value.load(Ordering::SeqCst), 0);
}

0 comments on commit 07cb64d

Please sign in to comment.