Skip to content

Commit

Permalink
Problem: hard to test cooperative execution
Browse files Browse the repository at this point in the history
This is because it relies on a scheduler outside of our domain
(JavaScript)

Solution: allow to run a callback after the queue is exhausted

This changes `single_threaded::run_cooperatively` type signature a bit,
adding an extra parameter.

The testing is still not perfect as we can't really ensure the callback
was called, but if it *was*, then we can run the assertions.

This can be alternatively done by something like `join_all` for all
futures representing tasks scheduled and spawning another task to do
something after they've been joined, but it seems at the time that
adding this callback has lower impact / easier ergonomics (but, of
course, it's less "pure" if you may)
  • Loading branch information
yrashk committed Feb 12, 2021
1 parent df48775 commit 089ed60
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 3 deletions.
7 changes: 6 additions & 1 deletion example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,10 @@ pub fn start() {
});
dbg!("starting executor, sending to task1");
let _ = sender1.send(());
executor::run_cooperatively(Some(task2));
executor::run_cooperatively(
Some(task2),
Some(|| {
dbg!("Done");
}),
);
}
55 changes: 53 additions & 2 deletions src/single_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,42 @@ extern "C" {
/// but it will schedule its own execution if the desired outcome wasn't reached, allowing
/// JavaScript event loop to proceed.
///
/// If it is desirable to perform a computation after the task queue has been exhausted without
/// resorting to waiting on completion of all relevant tasks, `and_then` parameter is used
/// to pass a callback with a function to be called once the task queue has been exhausted.
///
/// This function is available under `cooperative` feature gate.
///
/// ## Note
///
/// On target architectures other than `wasm32` this function will call [`run`] and then
/// invoke `and_then` callback if provided.
#[cfg(feature = "cooperative")]
pub fn run_cooperatively(until: Option<Task>) {
#[cfg(target_arch = "wasm32")]
pub fn run_cooperatively<F>(until: Option<Task>, mut and_then: Option<F>)
where
F: 'static + FnOnce(),
{
if !run_max(&until, Some(1)) {
set_timeout(Closure::once_into_js(|| run_cooperatively(until)));
set_timeout(Closure::once_into_js(|| {
run_cooperatively(until, and_then);
}));
} else {
if let Some(f) = and_then.take() {
f();
}
}
}

#[cfg(feature = "cooperative")]
#[cfg(not(target_arch = "wasm32"))]
pub fn run_cooperatively<F>(until: Option<Task>, mut and_then: Option<F>)
where
F: 'static + FnOnce(),
{
run(until);
if let Some(f) = and_then.take() {
f();
}
}

Expand Down Expand Up @@ -412,4 +443,24 @@ mod tests {
evict_all();
assert_eq!(tokens().len(), 0);
}

#[cfg(feature = "cooperative")]
#[cfg_attr(not(target_arch = "wasm32"), test)]
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
fn cooperative_execution() {
use tokio::sync::*;
let (sender, receiver) = oneshot::channel();
spawn(async move {
let _ = sender.send(());
});
run_cooperatively(
None,
Some(|| {
spawn(async move {
assert_eq!(receiver.await.unwrap(), ());
});
run(None);
}),
);
}
}

0 comments on commit 089ed60

Please sign in to comment.