From bdcf27c7cb0f8d01f59b770e75daf256c72e960d Mon Sep 17 00:00:00 2001 From: Pauan Date: Thu, 26 Sep 2019 20:33:12 +0200 Subject: [PATCH] Major improvements to wasm-bindgen-futures (#1760) This PR contains a few major improvements: * Code duplication has been removed. * Everything has been refactored so that the implementation is much easier to understand. * `future_to_promise` is now implemented with `spawn_local` rather than the other way around (this means `spawn_local` is faster since it doesn't need to create an unneeded `Promise`). * Both the single threaded and multi threaded executors have been rewritten from scratch: * They only create 1-2 allocations in Rust per Task, and all of the allocations happen when the Task is created. * The singlethreaded executor creates 1 Promise per tick, rather than 1 Promise per tick per Task. * Both executors do *not* create `Closure`s during polling, instead all needed `Closure`s are created ahead of time. * Both executors now have correct behavior with regard to spurious wakeups and waking up during the call to `poll`. * Both executors cache the `Waker` so it doesn't need to be recreated all the time. I believe both executors are now optimal in terms of both Rust and JS performance. --- crates/futures/src/lib.rs | 223 ++++++++++++++++-- crates/futures/src/multithread.rs | 184 --------------- crates/futures/src/queue.rs | 89 +++++++ crates/futures/src/shared.rs | 118 --------- crates/futures/src/singlethread.rs | 142 ----------- crates/futures/src/task/multithread.rs | 182 ++++++++++++++ crates/futures/src/task/singlethread.rs | 113 +++++++++ .../src/{ => task}/wait_async_polyfill.rs | 0 crates/futures/tests/tests.rs | 2 +- crates/macro/ui-tests/async-errors.stderr | 3 +- 10 files changed, 585 insertions(+), 471 deletions(-) delete mode 100644 crates/futures/src/multithread.rs create mode 100644 crates/futures/src/queue.rs delete mode 100644 crates/futures/src/shared.rs delete mode 100644 crates/futures/src/singlethread.rs create mode 100644 crates/futures/src/task/multithread.rs create mode 100644 crates/futures/src/task/singlethread.rs rename crates/futures/src/{ => task}/wait_async_polyfill.rs (100%) diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index bcb15c60072..c7a39bcdd66 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -7,46 +7,221 @@ //! ability to interoperate with JavaScript events and JavaScript I/O //! primitives. //! -//! There are two main interfaces in this crate currently: +//! There are three main interfaces in this crate currently: //! //! 1. [**`JsFuture`**](./struct.JsFuture.html) //! //! A type that is constructed with a `Promise` and can then be used as a -//! `Future`. This Rust future will resolve +//! `Future>`. This Rust future will resolve //! or reject with the value coming out of the `Promise`. //! //! 2. [**`future_to_promise`**](./fn.future_to_promise.html) //! -//! Converts a Rust `Future` into a +//! Converts a Rust `Future>` into a //! JavaScript `Promise`. The future's result will translate to either a -//! rejected or resolved `Promise` in JavaScript. +//! resolved or rejected `Promise` in JavaScript. //! -//! These two items should provide enough of a bridge to interoperate the two -//! systems and make sure that Rust/JavaScript can work together with -//! asynchronous and I/O work. +//! 3. [**`spawn_local`**](./fn.spawn_local.html) //! -//! # Example Usage +//! Spawns a `Future` on the current thread. This is the +//! best way to run a `Future` in Rust without sending it to JavaScript. //! -//! This example wraps JavaScript's `Promise.resolve()` into a Rust `Future` for -//! running tasks on the next tick of the micro task queue. The futures built on -//! top of it can be scheduled for execution by conversion into a JavaScript -//! `Promise`. +//! These three items should provide enough of a bridge to interoperate the two +//! systems and make sure that Rust/JavaScript can work together with +//! asynchronous and I/O work. #![cfg_attr(target_feature = "atomics", feature(stdsimd))] #![deny(missing_docs)] -use cfg_if::cfg_if; +use js_sys::Promise; +use std::cell::RefCell; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll, Waker}; +use wasm_bindgen::prelude::*; + +mod queue; + +mod task { + use cfg_if::cfg_if; + + cfg_if! { + if #[cfg(target_feature = "atomics")] { + mod wait_async_polyfill; + mod multithread; + pub(crate) use multithread::*; + + } else { + mod singlethread; + pub(crate) use singlethread::*; + } + } +} + +/// Runs a Rust `Future` on the current thread. +/// +/// The `future` must be `'static` because it will be scheduled +/// to run in the background and cannot contain any stack references. +/// +/// The `future` will always be run on the next microtask tick even if it +/// immediately returns `Poll::Ready`. +/// +/// # Panics +/// +/// This function has the same panic behavior as `future_to_promise`. +#[inline] +pub fn spawn_local(future: F) +where + F: Future + 'static, +{ + task::Task::spawn(Box::pin(future)); +} + +struct Inner { + result: Option>, + task: Option, + callbacks: Option<(Closure, Closure)>, +} + +/// A Rust `Future` backed by a JavaScript `Promise`. +/// +/// This type is constructed with a JavaScript `Promise` object and translates +/// it to a Rust `Future`. This type implements the `Future` trait from the +/// `futures` crate and will either succeed or fail depending on what happens +/// with the JavaScript `Promise`. +/// +/// Currently this type is constructed with `JsFuture::from`. +pub struct JsFuture { + inner: Rc>, +} + +impl fmt::Debug for JsFuture { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "JsFuture {{ ... }}") + } +} + +impl From for JsFuture { + fn from(js: Promise) -> JsFuture { + // Use the `then` method to schedule two callbacks, one for the + // resolved value and one for the rejected value. We're currently + // assuming that JS engines will unconditionally invoke precisely one of + // these callbacks, no matter what. + // + // Ideally we'd have a way to cancel the callbacks getting invoked and + // free up state ourselves when this `JsFuture` is dropped. We don't + // have that, though, and one of the callbacks is likely always going to + // be invoked. + // + // As a result we need to make sure that no matter when the callbacks + // are invoked they are valid to be called at any time, which means they + // have to be self-contained. Through the `Closure::once` and some + // `Rc`-trickery we can arrange for both instances of `Closure`, and the + // `Rc`, to all be destroyed once the first one is called. + let state = Rc::new(RefCell::new(Inner { + result: None, + task: None, + callbacks: None, + })); + + fn finish(state: &RefCell, val: Result) { + let task = { + let mut state = state.borrow_mut(); + debug_assert!(state.callbacks.is_some()); + debug_assert!(state.result.is_none()); + + // First up drop our closures as they'll never be invoked again and + // this is our chance to clean up their state. + drop(state.callbacks.take()); + + // Next, store the value into the internal state. + state.result = Some(val); + state.task.take() + }; + + // And then finally if any task was waiting on the value wake it up and + // let them know it's there. + if let Some(task) = task { + task.wake() + } + } + + let resolve = { + let state = state.clone(); + Closure::once(move |val| finish(&state, Ok(val))) + }; + + let reject = { + let state = state.clone(); + Closure::once(move |val| finish(&state, Err(val))) + }; + + js.then2(&resolve, &reject); + + state.borrow_mut().callbacks = Some((resolve, reject)); + + JsFuture { inner: state } + } +} + +impl Future for JsFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut inner = self.inner.borrow_mut(); + + // If our value has come in then we return it... + if let Some(val) = inner.result.take() { + return Poll::Ready(val); + } + + // ... otherwise we arrange ourselves to get woken up once the value + // does come in + inner.task = Some(cx.waker().clone()); + Poll::Pending + } +} + +/// Converts a Rust `Future` into a JavaScript `Promise`. +/// +/// This function will take any future in Rust and schedule it to be executed, +/// returning a JavaScript `Promise` which can then be passed to JavaScript. +/// +/// The `future` must be `'static` because it will be scheduled to run in the +/// background and cannot contain any stack references. +/// +/// The returned `Promise` will be resolved or rejected when the future completes, +/// depending on whether it finishes with `Ok` or `Err`. +/// +/// # Panics +/// +/// Note that in wasm panics are currently translated to aborts, but "abort" in +/// this case means that a JavaScript exception is thrown. The wasm module is +/// still usable (likely erroneously) after Rust panics. +/// +/// If the `future` provided panics then the returned `Promise` **will not +/// resolve**. Instead it will be a leaked promise. This is an unfortunate +/// limitation of wasm currently that's hoped to be fixed one day! +pub fn future_to_promise(future: F) -> Promise +where + F: Future> + 'static, +{ + let mut future = Some(future); -mod shared; -pub use shared::*; + Promise::new(&mut |resolve, reject| { + let future = future.take().unwrap_throw(); -cfg_if! { - if #[cfg(target_feature = "atomics")] { - mod wait_async_polyfill; - mod multithread; - pub use multithread::*; - } else { - mod singlethread; - pub use singlethread::*; - } + spawn_local(async move { + match future.await { + Ok(val) => { + resolve.call1(&JsValue::undefined(), &val).unwrap_throw(); + } + Err(val) => { + reject.call1(&JsValue::undefined(), &val).unwrap_throw(); + } + } + }); + }) } diff --git a/crates/futures/src/multithread.rs b/crates/futures/src/multithread.rs deleted file mode 100644 index 79afcc71d23..00000000000 --- a/crates/futures/src/multithread.rs +++ /dev/null @@ -1,184 +0,0 @@ -use js_sys::Promise; -use std::future::Future; -use std::mem::ManuallyDrop; -use std::pin::Pin; -use std::sync::atomic::{AtomicI32, Ordering::SeqCst}; -use std::sync::Arc; -use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -use wasm_bindgen::prelude::*; -use wasm_bindgen::JsCast; - -// Duplicate a bit here because `then` takes a `JsValue` instead of a `Closure`. -#[wasm_bindgen] -extern "C" { - type MyPromise; - #[wasm_bindgen(method)] - fn then(this: &MyPromise, cb: &JsValue); - - type Atomics; - #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)] - fn wait_async(buf: &JsValue, index: i32, value: i32) -> js_sys::Promise; - #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)] - fn get_wait_async() -> JsValue; -} - -pub use crate::shared::{spawn_local, JsFuture}; - -/// Converts a Rust `Future` into a JavaScript `Promise`. -/// -/// This function will take any future in Rust and schedule it to be executed, -/// returning a JavaScript `Promise` which can then be passed back to JavaScript -/// to get plumbed into the rest of a system. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. The -/// returned `Promise` will be resolved or rejected when the future completes, -/// depending on whether it finishes with `Ok` or `Err`. -/// -/// # Panics -/// -/// Note that in wasm panics are currently translated to aborts, but "abort" in -/// this case means that a JavaScript exception is thrown. The wasm module is -/// still usable (likely erroneously) after Rust panics. -/// -/// If the `future` provided panics then the returned `Promise` **will not -/// resolve**. Instead it will be a leaked promise. This is an unfortunate -/// limitation of wasm currently that's hoped to be fixed one day! -pub fn future_to_promise(future: F) -> Promise -where - F: Future> + 'static, -{ - _future_to_promise(Box::new(future)) -} - -fn _future_to_promise(future: Box>>) -> Promise { - let mut future = Some(future); - js_sys::Promise::new(&mut move |resolve, reject| { - let future = match future.take() { - Some(f) => f, - None => wasm_bindgen::throw_str("cannot invoke twice"), - }; - let state = Arc::new(State { - value: AtomicI32::new(1), - }); - Package { - // Note that the unsafety should be ok here since we're always - // passing in valid pointers and we're handling cleanup with - // `Waker`. - waker: unsafe { Waker::from_raw(State::into_raw_waker(state.clone())) }, - state, - future: Pin::from(future), - resolve, - reject, - } - .poll(); - }) -} - -struct Package { - state: Arc, - future: Pin>>>, - resolve: js_sys::Function, - reject: js_sys::Function, - waker: Waker, -} - -struct State { - value: AtomicI32, -} - -impl Package { - fn poll(mut self) { - // Flag ourselves as ready to receive another notification. We should - // never enter this method unless our `value` is set to `1`, so assert - // that as well. - let prev = self.state.value.swap(0, SeqCst); - debug_assert_eq!(prev, 1); - - // Afterwards start making progress on the future by calling the `poll` - // function. If we get `Ready` then we simply invoke the appropriate JS - // function to resolve the JS `Promise` we're connected to. - // - // If `Pending` is received then we're guaranteed to eventually receive - // an `atomic.notify` as well as a store to `1` in `self.state.value`. - // In this case we create a new promise (using `Atomics.waitAsync`) and - // then we register that promise to continue polling when it's resolved. - // Note that if a `wake` happened while we were polling or after we see - // `Pending` then the promise should end up resolving immediately due to - // the atomicity of `Atomics.waitAsync` with `Atomics.notify`. - let mut cx = Context::from_waker(&self.waker); - let (f, val) = match self.future.as_mut().poll(&mut cx) { - Poll::Ready(Ok(val)) => (&self.resolve, val), - Poll::Ready(Err(val)) => (&self.reject, val), - - // Create a `js_sys::Promise` using `Atomics.waitAsync` (or our - // polyfill) and then register its completion callback as simply - // calling this function again. - Poll::Pending => { - let promise = wait_async(&self.state.value, 0).unchecked_into::(); - let closure = Closure::once_into_js(move || { - self.poll(); - }); - promise.then(&closure); - return; - } - }; - f.call1(&JsValue::undefined(), &val).unwrap_throw(); - } -} - -impl State { - fn wake(&self) { - // Attempt to notify us by storing 1. If we're already 1 then we were - // previously notified and there's nothing to do. Otherwise we execute - // the native `notify` instruction to wake up the corresponding - // `waitAsync` that was waiting for the transition from 0 to 1. - let prev = self.value.swap(1, SeqCst); - if prev == 1 { - return; - } - debug_assert_eq!(prev, 0); - unsafe { - core::arch::wasm32::atomic_notify( - &self.value as *const AtomicI32 as *mut i32, - 1, // number of threads to notify - ); - } - } - - unsafe fn into_raw_waker(me: Arc) -> RawWaker { - const VTABLE: RawWakerVTable = - RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop); - RawWaker::new(Arc::into_raw(me) as *const (), &VTABLE) - } -} - -unsafe fn raw_clone(ptr: *const ()) -> RawWaker { - let ptr = ManuallyDrop::new(Arc::from_raw(ptr as *const State)); - State::into_raw_waker((*ptr).clone()) -} - -unsafe fn raw_wake(ptr: *const ()) { - let ptr = Arc::from_raw(ptr as *const State); - ptr.wake(); -} - -unsafe fn raw_wake_by_ref(ptr: *const ()) { - let ptr = ManuallyDrop::new(Arc::from_raw(ptr as *const State)); - ptr.wake(); -} - -unsafe fn raw_drop(ptr: *const ()) { - drop(Arc::from_raw(ptr as *const State)); -} - -fn wait_async(ptr: &AtomicI32, val: i32) -> js_sys::Promise { - // If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today) - // then we use our fallback, otherwise we use the native function. - if Atomics::get_wait_async().is_undefined() { - crate::wait_async_polyfill::wait_async(ptr, val) - } else { - let mem = wasm_bindgen::memory().unchecked_into::(); - Atomics::wait_async(&mem.buffer(), ptr as *const AtomicI32 as i32 / 4, val) - } -} diff --git a/crates/futures/src/queue.rs b/crates/futures/src/queue.rs new file mode 100644 index 00000000000..718c8709a7e --- /dev/null +++ b/crates/futures/src/queue.rs @@ -0,0 +1,89 @@ +use js_sys::Promise; +use std::cell::{Cell, RefCell}; +use std::collections::VecDeque; +use std::rc::Rc; +use wasm_bindgen::prelude::*; + +struct QueueState { + // The queue of Tasks which will be run in order. In practice this is all the + // synchronous work of futures, and each `Task` represents calling `poll` on + // a future "at the right time" + tasks: RefCell>>, + + // This flag indicates whether we're currently executing inside of + // `run_all` or have scheduled `run_all` to run in the future. This is + // used to ensure that it's only scheduled once. + is_spinning: Cell, +} + +impl QueueState { + fn run_all(&self) { + debug_assert!(self.is_spinning.get()); + + // Runs all Tasks until empty. This blocks the event loop if a Future is + // stuck in an infinite loop, so we may want to yield back to the main + // event loop occasionally. For now though greedy execution should get + // the job done. + loop { + let task = match self.tasks.borrow_mut().pop_front() { + Some(task) => task, + None => break, + }; + task.run(); + } + + // All of the Tasks have been run, so it's now possible to schedule the + // next tick again + self.is_spinning.set(false); + } +} + +pub(crate) struct Queue { + state: Rc, + promise: Promise, + closure: Closure, +} + +impl Queue { + pub(crate) fn push_task(&self, task: Rc) { + self.state.tasks.borrow_mut().push_back(task); + + // If we're already inside the `run_all` loop then that'll pick up the + // task we just enqueued. If we're not in `run_all`, though, then we need + // to schedule a microtask. + // + // Note that we currently use a promise and a closure to do this, but + // eventually we should probably use something like `queueMicrotask`: + // https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope/queueMicrotask + if !self.state.is_spinning.replace(true) { + self.promise.then(&self.closure); + } + } +} + +impl Queue { + fn new() -> Self { + let state = Rc::new(QueueState { + is_spinning: Cell::new(false), + tasks: RefCell::new(VecDeque::new()), + }); + + Self { + promise: Promise::resolve(&JsValue::undefined()), + + closure: { + let state = Rc::clone(&state); + + // This closure will only be called on the next microtask event + // tick + Closure::wrap(Box::new(move |_| state.run_all())) + }, + + state, + } + } +} + +thread_local! { + pub(crate) static QUEUE: Queue = Queue::new(); +} diff --git a/crates/futures/src/shared.rs b/crates/futures/src/shared.rs deleted file mode 100644 index 71fab9d3e65..00000000000 --- a/crates/futures/src/shared.rs +++ /dev/null @@ -1,118 +0,0 @@ -use js_sys::Promise; -use std::cell::RefCell; -use std::fmt; -use std::future::Future; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll, Waker}; -use wasm_bindgen::prelude::*; - -/// A Rust `Future` backed by a JavaScript `Promise`. -/// -/// This type is constructed with a JavaScript `Promise` object and translates -/// it to a Rust `Future`. This type implements the `Future` trait from the -/// `futures` crate and will either succeed or fail depending on what happens -/// with the JavaScript `Promise`. -/// -/// Currently this type is constructed with `JsFuture::from`. -pub struct JsFuture { - inner: Rc>, -} - -struct Inner { - result: Option>, - task: Option, - callbacks: Option<(Closure, Closure)>, -} - -impl fmt::Debug for JsFuture { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "JsFuture {{ ... }}") - } -} - -impl From for JsFuture { - fn from(js: Promise) -> JsFuture { - // Use the `then` method to schedule two callbacks, one for the - // resolved value and one for the rejected value. We're currently - // assuming that JS engines will unconditionally invoke precisely one of - // these callbacks, no matter what. - // - // Ideally we'd have a way to cancel the callbacks getting invoked and - // free up state ourselves when this `JsFuture` is dropped. We don't - // have that, though, and one of the callbacks is likely always going to - // be invoked. - // - // As a result we need to make sure that no matter when the callbacks - // are invoked they are valid to be called at any time, which means they - // have to be self-contained. Through the `Closure::once` and some - // `Rc`-trickery we can arrange for both instances of `Closure`, and the - // `Rc`, to all be destroyed once the first one is called. - let state = Rc::new(RefCell::new(Inner { - result: None, - task: None, - callbacks: None, - })); - let state2 = state.clone(); - let resolve = Closure::once(move |val| finish(&state2, Ok(val))); - let state2 = state.clone(); - let reject = Closure::once(move |val| finish(&state2, Err(val))); - - js.then2(&resolve, &reject); - state.borrow_mut().callbacks = Some((resolve, reject)); - - return JsFuture { inner: state }; - - fn finish(state: &RefCell, val: Result) { - // First up drop our closures as they'll never be invoked again and - // this is our chance to clean up their state. Next store the value - // into the internal state, and then finally if any task was waiting - // on the value wake it up and let them know it's there. - let task = { - let mut state = state.borrow_mut(); - debug_assert!(state.callbacks.is_some()); - debug_assert!(state.result.is_none()); - drop(state.callbacks.take()); - state.result = Some(val); - state.task.take() - }; - if let Some(task) = task { - task.wake() - } - } - } -} - -impl Future for JsFuture { - type Output = Result; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut inner = self.inner.borrow_mut(); - // If our value has come in then we return it... - if let Some(val) = inner.result.take() { - return Poll::Ready(val) - } - // ... otherwise we arrange ourselves to get woken up once the value - // does come in - inner.task = Some(cx.waker().clone()); - Poll::Pending - } -} - -/// Converts a Rust `Future` on a local task queue. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. -/// -/// # Panics -/// -/// This function has the same panic behavior as `future_to_promise`. -pub fn spawn_local(future: F) -where - F: Future + 'static, -{ - crate::future_to_promise(async { - future.await; - Ok(JsValue::undefined()) - }); -} diff --git a/crates/futures/src/singlethread.rs b/crates/futures/src/singlethread.rs deleted file mode 100644 index dae2cfa438a..00000000000 --- a/crates/futures/src/singlethread.rs +++ /dev/null @@ -1,142 +0,0 @@ -pub use crate::shared::{spawn_local, JsFuture}; -use js_sys::Promise; -use std::cell::{Cell, RefCell}; -use std::future::Future; -use std::mem::ManuallyDrop; -use std::pin::Pin; -use std::rc::Rc; -use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; -use wasm_bindgen::prelude::*; -use wasm_bindgen::JsCast; - -/// Converts a Rust `Future` into a JavaScript `Promise`. -/// -/// This function will take any future in Rust and schedule it to be executed, -/// returning a JavaScript `Promise` which can then be passed back to JavaScript -/// to get plumbed into the rest of a system. -/// -/// The `future` provided must adhere to `'static` because it'll be scheduled -/// to run in the background and cannot contain any stack references. The -/// returned `Promise` will be resolved or rejected when the future completes, -/// depending on whether it finishes with `Ok` or `Err`. -/// -/// # Panics -/// -/// Note that in wasm panics are currently translated to aborts, but "abort" in -/// this case means that a JavaScript exception is thrown. The wasm module is -/// still usable (likely erroneously) after Rust panics. -/// -/// If the `future` provided panics then the returned `Promise` **will not -/// resolve**. Instead it will be a leaked promise. This is an unfortunate -/// limitation of wasm currently that's hoped to be fixed one day! -pub fn future_to_promise(future: F) -> Promise -where - F: Future> + 'static, -{ - _future_to_promise(Box::new(future)) -} - -fn _future_to_promise(future: Box>>) -> Promise { - let mut future = Some(future); - js_sys::Promise::new(&mut move |resolve, reject| { - let future = match future.take() { - Some(f) => f, - None => wasm_bindgen::throw_str("cannot invoke twice"), - }; - let state = Rc::new(State { - queued: Cell::new(true), - future: RefCell::new(Some(Pin::from(future))), - resolve, - reject, - }); - State::poll(&state); - }) -} - -struct State { - queued: Cell, - future: RefCell>>>>>, - resolve: js_sys::Function, - reject: js_sys::Function, -} - -impl State { - fn poll(me: &Rc) { - debug_assert!(me.queued.get()); - me.queued.set(false); - let waker = unsafe { Waker::from_raw(State::into_raw_waker(me.clone())) }; - let mut future = me.future.borrow_mut(); - - let mut done = false; - if let Some(future) = &mut *future { - match Future::poll(future.as_mut(), &mut Context::from_waker(&waker)) { - Poll::Ready(Ok(val)) => { - me.resolve.call1(&JsValue::undefined(), &val).unwrap_throw(); - done = true; - } - Poll::Ready(Err(val)) => { - me.reject.call1(&JsValue::undefined(), &val).unwrap_throw(); - done = true; - } - Poll::Pending => {} - } - } - if done { - debug_assert!(future.is_some()); - drop(future.take()); - return; - } - } - - fn wake(me: &Rc) { - // If we're already enqueued on the microtask queue there's nothing else - // to do, this is a duplicate notification. - if me.queued.replace(true) { - return; - } - - // Use `Promise.then` on a resolved promise to place our execution - // onto the next turn of the microtask queue, enqueueing our poll - // operation. If we were to poll immediately we run the risk of blowing - // the stack. - let promise = Promise::resolve(&JsValue::undefined()); - let promise = promise.unchecked_ref::(); - let me = me.clone(); - let closure = Closure::once_into_js(move || { - State::poll(&me); - }); - promise.then(&closure); - - #[wasm_bindgen] - extern "C" { - type MyPromise; - #[wasm_bindgen(js_class = Promise, method)] - fn then(this: &MyPromise, closure: &JsValue); - } - } - - unsafe fn into_raw_waker(me: Rc) -> RawWaker { - const VTABLE: RawWakerVTable = - RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop); - RawWaker::new(Rc::into_raw(me) as *const (), &VTABLE) - } -} - -unsafe fn raw_clone(ptr: *const ()) -> RawWaker { - let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const State)); - State::into_raw_waker((*ptr).clone()) -} - -unsafe fn raw_wake(ptr: *const ()) { - let ptr = Rc::from_raw(ptr as *const State); - State::wake(&ptr); -} - -unsafe fn raw_wake_by_ref(ptr: *const ()) { - let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const State)); - State::wake(&ptr); -} - -unsafe fn raw_drop(ptr: *const ()) { - drop(Rc::from_raw(ptr as *const State)); -} diff --git a/crates/futures/src/task/multithread.rs b/crates/futures/src/task/multithread.rs new file mode 100644 index 00000000000..70f28cf4b66 --- /dev/null +++ b/crates/futures/src/task/multithread.rs @@ -0,0 +1,182 @@ +use std::cell::RefCell; +use std::future::Future; +use std::mem::ManuallyDrop; +use std::pin::Pin; +use std::rc::Rc; +use std::sync::atomic::AtomicI32; +use std::sync::atomic::Ordering::SeqCst; +use std::sync::Arc; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; +use wasm_bindgen::prelude::*; +use wasm_bindgen::JsCast; + +const SLEEPING: i32 = 0; +const AWAKE: i32 = 1; + +struct AtomicWaker { + state: AtomicI32, +} + +impl AtomicWaker { + fn new() -> Arc { + Arc::new(Self { + state: AtomicI32::new(AWAKE), + }) + } + + fn wake_by_ref(&self) { + // If we're already AWAKE then we previously notified and there's + // nothing to do... + match self.state.swap(AWAKE, SeqCst) { + AWAKE => return, + other => debug_assert_eq!(other, SLEEPING), + } + + // ... otherwise we execute the native `notify` instruction to wake up + // the corresponding `waitAsync` that was waiting for the transition + // from SLEEPING to AWAKE. + unsafe { + core::arch::wasm32::atomic_notify( + &self.state as *const AtomicI32 as *mut i32, + 1, // Number of threads to notify + ); + } + } + + /// Same as the singlethread module, this creates a standard library + /// `RawWaker`. We could use `futures_util::task::ArcWake` but it's small + /// enough that we just inline it for now. + unsafe fn into_raw_waker(this: Arc) -> RawWaker { + unsafe fn raw_clone(ptr: *const ()) -> RawWaker { + let ptr = ManuallyDrop::new(Arc::from_raw(ptr as *const AtomicWaker)); + AtomicWaker::into_raw_waker((*ptr).clone()) + } + + unsafe fn raw_wake(ptr: *const ()) { + let ptr = Arc::from_raw(ptr as *const AtomicWaker); + AtomicWaker::wake_by_ref(&ptr); + } + + unsafe fn raw_wake_by_ref(ptr: *const ()) { + let ptr = ManuallyDrop::new(Arc::from_raw(ptr as *const AtomicWaker)); + AtomicWaker::wake_by_ref(&ptr); + } + + unsafe fn raw_drop(ptr: *const ()) { + drop(Arc::from_raw(ptr as *const AtomicWaker)); + } + + const VTABLE: RawWakerVTable = + RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop); + + RawWaker::new(Arc::into_raw(this) as *const (), &VTABLE) + } +} + +struct Inner { + future: Pin + 'static>>, + closure: Closure, +} + +pub(crate) struct Task { + atomic: Arc, + waker: Waker, + // See `singlethread.rs` for why this is an internal `Option`. + inner: RefCell>, +} + +impl Task { + pub(crate) fn spawn(future: Pin + 'static>>) { + let atomic = AtomicWaker::new(); + let waker = unsafe { Waker::from_raw(AtomicWaker::into_raw_waker(atomic.clone())) }; + let this = Rc::new(Task { + atomic, + waker, + inner: RefCell::new(None), + }); + + let closure = { + let this = Rc::clone(&this); + Closure::wrap(Box::new(move |_| this.run()) as Box) + }; + *this.inner.borrow_mut() = Some(Inner { future, closure }); + + // Queue up the Future's work to happen on the next microtask tick. + crate::queue::QUEUE.with(move |queue| queue.push_task(this)); + } + + pub(crate) fn run(&self) { + let mut borrow = self.inner.borrow_mut(); + + // Same as `singlethread.rs`, handle spurious wakeups happening after we + // finished. + let inner = match borrow.as_mut() { + Some(inner) => inner, + None => return, + }; + + // Also the same as `singlethread.rs`, flag ourselves as ready to + // receive a notification. + let prev = self.atomic.state.swap(SLEEPING, SeqCst); + debug_assert_eq!(prev, AWAKE); + + let poll = { + let mut cx = Context::from_waker(&self.waker); + inner.future.as_mut().poll(&mut cx) + }; + + match poll { + // Same as `singlethread.rs` (noticing a pattern?) clean up + // resources associated with the future ASAP. + Poll::Ready(()) => { + *borrow = None; + }, + + // Unlike `singlethread.rs` we are responsible for ensuring there's + // a closure to handle the notification that a Future is ready. In + // the single-threaded case the notification itself enqueues work, + // but in the multithreaded case we don't know what thread a + // notification comes from so we need to ensure the current running + // thread is the one that enqueues the work. To do that we execute + // `Atomics.waitAsync`, creating a local Promise on our own thread + // which will resolve once `Atomics.notify` is called. + // + // We could be in one of two states as we execute this: + // + // * `SLEEPING` - we'll get notified via `Atomics.notify` + // and then this Promise will resolve. + // + // * `AWAKE` - the Promise will immediately be resolved and + // we'll execute the work on the next microtask queue. + Poll::Pending => { + wait_async(&self.atomic.state, SLEEPING).then(&inner.closure); + } + } + } +} + +fn wait_async(ptr: &AtomicI32, current_value: i32) -> js_sys::Promise { + // If `Atomics.waitAsync` isn't defined (as it isn't defined anywhere today) + // then we use our fallback, otherwise we use the native function. + return if Atomics::get_wait_async().is_undefined() { + crate::task::wait_async_polyfill::wait_async(ptr, current_value) + } else { + let mem = wasm_bindgen::memory().unchecked_into::(); + Atomics::wait_async( + &mem.buffer(), + ptr as *const AtomicI32 as i32 / 4, + current_value, + ) + }; + + #[wasm_bindgen] + extern "C" { + type Atomics; + + #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync)] + fn wait_async(buf: &JsValue, index: i32, value: i32) -> js_sys::Promise; + + #[wasm_bindgen(static_method_of = Atomics, js_name = waitAsync, getter)] + fn get_wait_async() -> JsValue; + } +} diff --git a/crates/futures/src/task/singlethread.rs b/crates/futures/src/task/singlethread.rs new file mode 100644 index 00000000000..38d3bfe7af6 --- /dev/null +++ b/crates/futures/src/task/singlethread.rs @@ -0,0 +1,113 @@ +use std::cell::{Cell, RefCell}; +use std::future::Future; +use std::mem::ManuallyDrop; +use std::pin::Pin; +use std::rc::Rc; +use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; + +struct Inner { + future: Pin + 'static>>, + waker: Waker, +} + +pub(crate) struct Task { + // The actual Future that we're executing as part of this task. + // + // This is an Option so that the Future can be immediately dropped when it's + // finished + inner: RefCell>, + + // This is used to ensure that the Task will only be queued once + is_queued: Cell, +} + +impl Task { + pub(crate) fn spawn(future: Pin + 'static>>) { + let this = Rc::new(Self { + inner: RefCell::new(None), + is_queued: Cell::new(false), + }); + + let waker = unsafe { Waker::from_raw(Task::into_raw_waker(Rc::clone(&this))) }; + + *this.inner.borrow_mut() = Some(Inner { future, waker }); + + Task::wake_by_ref(&this); + } + + fn wake_by_ref(this: &Rc) { + // If we've already been placed on the run queue then there's no need to + // requeue ourselves since we're going to run at some point in the + // future anyway. + if this.is_queued.replace(true) { + return; + } + + crate::queue::QUEUE.with(|queue| { + queue.push_task(Rc::clone(this)); + }); + } + + /// Creates a standard library `RawWaker` from an `Rc` of ourselves. + /// + /// Note that in general this is wildly unsafe because everything with + /// Futures requires `Sync` + `Send` with regard to Wakers. For wasm, + /// however, everything is guaranteed to be singlethreaded (since we're + /// compiled without the `atomics` feature) so we "safely lie" and say our + /// `Rc` pointer is good enough. + unsafe fn into_raw_waker(this: Rc) -> RawWaker { + unsafe fn raw_clone(ptr: *const ()) -> RawWaker { + let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); + Task::into_raw_waker((*ptr).clone()) + } + + unsafe fn raw_wake(ptr: *const ()) { + let ptr = Rc::from_raw(ptr as *const Task); + Task::wake_by_ref(&ptr); + } + + unsafe fn raw_wake_by_ref(ptr: *const ()) { + let ptr = ManuallyDrop::new(Rc::from_raw(ptr as *const Task)); + Task::wake_by_ref(&ptr); + } + + unsafe fn raw_drop(ptr: *const ()) { + drop(Rc::from_raw(ptr as *const Task)); + } + + const VTABLE: RawWakerVTable = + RawWakerVTable::new(raw_clone, raw_wake, raw_wake_by_ref, raw_drop); + + RawWaker::new(Rc::into_raw(this) as *const (), &VTABLE) + } + + pub(crate) fn run(&self) { + let mut borrow = self.inner.borrow_mut(); + + // Wakeups can come in after a Future has finished and been destroyed, + // so handle this gracefully by just ignoring the request to run. + let inner = match borrow.as_mut() { + Some(inner) => inner, + None => return, + }; + + // Ensure that if poll calls `waker.wake()` we can get enqueued back on + // the run queue. + self.is_queued.set(false); + + let poll = { + let mut cx = Context::from_waker(&inner.waker); + inner.future.as_mut().poll(&mut cx) + }; + + // If a future has finished (`Ready`) then clean up resources associated + // with the future ASAP. This ensures that we don't keep anything extra + // alive in-memory by accident. Our own struct, `Rc` won't + // actually go away until all wakers referencing us go away, which may + // take quite some time, so ensure that the heaviest of resources are + // released early. + if let Poll::Ready(_) = poll { + *borrow = None; + } + } +} diff --git a/crates/futures/src/wait_async_polyfill.rs b/crates/futures/src/task/wait_async_polyfill.rs similarity index 100% rename from crates/futures/src/wait_async_polyfill.rs rename to crates/futures/src/task/wait_async_polyfill.rs diff --git a/crates/futures/tests/tests.rs b/crates/futures/tests/tests.rs index 2d14a63ccae..f53866fb154 100644 --- a/crates/futures/tests/tests.rs +++ b/crates/futures/tests/tests.rs @@ -71,7 +71,7 @@ async fn spawn_local_runs() { #[wasm_bindgen_test] async fn spawn_local_err_no_exception() { let (tx, rx) = oneshot::channel::(); - spawn_local(async { }); + spawn_local(async {}); spawn_local(async { tx.send(42).unwrap(); }); diff --git a/crates/macro/ui-tests/async-errors.stderr b/crates/macro/ui-tests/async-errors.stderr index 035eab09fc1..19f97ffac3d 100644 --- a/crates/macro/ui-tests/async-errors.stderr +++ b/crates/macro/ui-tests/async-errors.stderr @@ -31,7 +31,7 @@ error[E0277]: the trait bound `wasm_bindgen::JsValue: std::convert::From> > > - and 62 others + and 61 others = note: required because of the requirements on the impl of `std::convert::Into` for `BadType` = note: required because of the requirements on the impl of `wasm_bindgen::__rt::IntoJsResult` for `BadType` = note: required by `wasm_bindgen::__rt::IntoJsResult::into_js_result` @@ -48,4 +48,3 @@ error[E0277]: the trait bound `std::result::Result