Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Problem: hard to test cooperative execution #14

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,10 @@ pub fn start() {
});
dbg!("starting executor, sending to task1");
let _ = sender1.send(());
executor::run_cooperatively(Some(task2));
executor::run_cooperatively(
Some(task2),
Some(|| {
dbg!("Done");
}),
);
}
55 changes: 53 additions & 2 deletions src/single_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,42 @@ extern "C" {
/// but it will schedule its own execution if the desired outcome wasn't reached, allowing
/// JavaScript event loop to proceed.
///
/// If it is desirable to perform a computation after the task queue has been exhausted without
/// resorting to waiting on completion of all relevant tasks, `and_then` parameter is used
/// to pass a callback with a function to be called once the task queue has been exhausted.
///
/// This function is available under `cooperative` feature gate.
///
/// ## Note
///
/// On target architectures other than `wasm32` this function will call [`run`] and then
/// invoke `and_then` callback if provided.
#[cfg(feature = "cooperative")]
pub fn run_cooperatively(until: Option<Task>) {
#[cfg(target_arch = "wasm32")]
pub fn run_cooperatively<F>(until: Option<Task>, mut and_then: Option<F>)
where
F: 'static + FnOnce(),
{
if !run_max(&until, Some(1)) {
set_timeout(Closure::once_into_js(|| run_cooperatively(until)));
set_timeout(Closure::once_into_js(|| {
run_cooperatively(until, and_then);
}));
} else {
if let Some(f) = and_then.take() {
f();
}
}
}

#[cfg(feature = "cooperative")]
#[cfg(not(target_arch = "wasm32"))]
pub fn run_cooperatively<F>(until: Option<Task>, mut and_then: Option<F>)
where
F: 'static + FnOnce(),
{
run(until);
if let Some(f) = and_then.take() {
f();
}
}

Expand Down Expand Up @@ -412,4 +443,24 @@ mod tests {
evict_all();
assert_eq!(tokens().len(), 0);
}

#[cfg(feature = "cooperative")]
#[cfg_attr(not(target_arch = "wasm32"), test)]
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test)]
fn cooperative_execution() {
use tokio::sync::*;
let (sender, receiver) = oneshot::channel();
spawn(async move {
let _ = sender.send(());
});
run_cooperatively(
None,
Some(|| {
spawn(async move {
assert_eq!(receiver.await.unwrap(), ());
});
run(None);
}),
);
}
}