Skip to content

Commit

Permalink
Some early prototyping to get wasm support for Ractor
Browse files Browse the repository at this point in the history
Failing to execute however with timer problems in Node.js

```log
$ wasm-pack test --node
[INFO]: 🎯  Checking for the Wasm target...
    Finished dev [unoptimized + debuginfo] target(s) in 0.24s
[INFO]: ⬇️  Installing wasm-bindgen...
    Finished test [unoptimized + debuginfo] target(s) in 0.22s
     Running unittests src/lib.rs (/Users/seanlawlor/src/slawlor_ractor/target/wasm32-unknown-unknown/debug/deps/ractor-fd4d272b3d05cda5.wasm)
Set timeout to 20 seconds...
running 54 tests

panicked at 'time not implemented on this platform', library/std/src/sys/wasm/../unsupported/time.rs:31:9
```

Related to Issue #124 - Adding WASM support
  • Loading branch information
slawlor committed Oct 13, 2023
1 parent c593b4c commit 45ce5b8
Show file tree
Hide file tree
Showing 15 changed files with 219 additions and 23 deletions.
26 changes: 22 additions & 4 deletions ractor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ rust-version = "1.64"
### Other features
cluster = []

# default = ["async-std"]
default = []

[dependencies]
Expand All @@ -26,23 +25,42 @@ async-trait = "0.1"
dashmap = "5"
futures = "0.3"
once_cell = "1"
rand = "0.8"

# Tracing feature requires --cfg=tokio_unstable
tokio = { version = "1", features = ["sync", "time", "rt", "macros", "tracing"] }
async-std = { version = "1", features = ["attributes"], optional = true}
tracing = { version = "0.1", features = ["attributes"] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
# TODO #124 - this is due to the temporary disabling of the "factory" module
rand = "0.8"

[target.'cfg(target_arch = "wasm32")'.dependencies]
console_error_panic_hook = "0.1"
instant = "0.1"
getrandom = { version = "0.2", features = ["js"] }
parking_lot = { version = "0.11", features = ["wasm-bindgen"] }
tracing-wasm = "0.2"
wasmtimer = { version = "0.2" }
wasm-bindgen = "0.2"
wasm-bindgen-futures = { version = "0.4"}
wasm-bindgen-test = "0.3"

[dev-dependencies]
criterion = "0.5"
function_name = "0.3"
paste = "1"
rand = "0.8"
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread", "tracing"] }
tracing-glog = "0.3"
tracing-subscriber = { version = "0.3", features = ["env-filter"]}
tracing-test = "0.2"

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
tokio = { version = "1", features = ["rt", "time", "sync", "macros"] }

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
tokio = { version = "1", features = ["rt", "time", "sync", "macros", "rt-multi-thread", "tracing"] }
criterion = "0.5"

[[bench]]
name = "actor"
harness = false
Expand Down
2 changes: 2 additions & 0 deletions ractor/benches/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

#![cfg(not(target_arch = "wasm32"))]

#[macro_use]
extern crate criterion;

Expand Down
3 changes: 2 additions & 1 deletion ractor/examples/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ fn init_logging() {
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber");
}

#[tokio::main]
#[cfg_attr(not(target_arch = "wasm32"), tokio::main)]
#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))]
async fn main() {
init_logging();

Expand Down
3 changes: 2 additions & 1 deletion ractor/examples/monte_carlo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ fn init_logging() {
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber");
}

#[tokio::main]
#[cfg_attr(not(target_arch = "wasm32"), tokio::main)]
#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))]
async fn main() {
init_logging();

Expand Down
3 changes: 2 additions & 1 deletion ractor/examples/output_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ fn init_logging() {
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber");
}

#[tokio::main]
#[cfg_attr(not(target_arch = "wasm32"), tokio::main)]
#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))]
async fn main() {
init_logging();

Expand Down
3 changes: 2 additions & 1 deletion ractor/examples/philosophers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ fn init_logging() {
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber");
}

#[tokio::main]
#[cfg_attr(not(target_arch = "wasm32"), tokio::main)]
#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))]
async fn main() {
init_logging();

Expand Down
3 changes: 2 additions & 1 deletion ractor/examples/ping_pong.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,8 @@ fn init_logging() {
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber");
}

#[tokio::main]
#[cfg_attr(not(target_arch = "wasm32"), tokio::main)]
#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))]
async fn main() {
init_logging();

Expand Down
3 changes: 2 additions & 1 deletion ractor/examples/supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ fn init_logging() {
tracing::subscriber::set_global_default(subscriber).expect("to set global subscriber");
}

#[tokio::main]
#[cfg_attr(not(target_arch = "wasm32"), tokio::main)]
#[cfg_attr(target_arch = "wasm32", tokio::main(flavor = "current_thread"))]
async fn main() {
init_logging();

Expand Down
10 changes: 5 additions & 5 deletions ractor/src/actor/actor_cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use std::any::TypeId;
use std::sync::Arc;

#[cfg(feature = "async-std")]
#[cfg(any(feature = "async-std", target_arch = "wasm32"))]
use futures::FutureExt;

use super::messages::{Signal, StopMessage};
Expand Down Expand Up @@ -110,7 +110,7 @@ impl ActorPortSet {
where
TState: crate::State,
{
#[cfg(feature = "async-std")]
#[cfg(any(feature = "async-std", target_arch = "wasm32"))]
{
crate::concurrency::select! {
// supervision or message processing work
Expand All @@ -124,7 +124,7 @@ impl ActorPortSet {
}
}
}
#[cfg(not(feature = "async-std"))]
#[cfg(not(any(feature = "async-std", target_arch = "wasm32")))]
{
crate::concurrency::select! {
// supervision or message processing work
Expand All @@ -149,7 +149,7 @@ impl ActorPortSet {
/// Returns [Ok(ActorPortMessage)] on a successful message reception, [MessagingErr]
/// in the event any of the channels is closed.
pub async fn listen_in_priority(&mut self) -> Result<ActorPortMessage, MessagingErr<()>> {
#[cfg(feature = "async-std")]
#[cfg(any(feature = "async-std", target_arch = "wasm32"))]
{
crate::concurrency::select! {
signal = self.signal_rx.recv().fuse() => {
Expand All @@ -166,7 +166,7 @@ impl ActorPortSet {
}
}
}
#[cfg(not(feature = "async-std"))]
#[cfg(not(any(feature = "async-std", target_arch = "wasm32")))]
{
crate::concurrency::select! {
signal = self.signal_rx.recv() => {
Expand Down
2 changes: 1 addition & 1 deletion ractor/src/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{
Actor, ActorCell, ActorProcessingErr, ActorRef, ActorStatus, SpawnErr, SupervisionEvent,
};

mod supervisor;
pub mod supervisor;

struct EmptyMessage;
#[cfg(feature = "cluster")]
Expand Down
6 changes: 6 additions & 0 deletions ractor/src/common_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

// TODO #124 (slawlor): Redesign this without usage of core time primatives (i.e.
// use concurrency instants)
#[cfg(not(target_arch = "wasm32"))]
use std::future::Future;

use crate::concurrency::sleep;
Expand All @@ -23,6 +26,9 @@ where
assert!(check());
}

// TODO #124 (slawlor): Redesign this without usage of core time primatives (i.e.
// use concurrency instants)
#[cfg(not(target_arch = "wasm32"))]
pub async fn periodic_async_check<F, Fut>(check: F, timeout: Duration)
where
F: Fn() -> Fut,
Expand Down
13 changes: 9 additions & 4 deletions ractor/src/concurrency/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,17 @@ pub fn broadcast<T: Clone>(buffer: usize) -> (BroadcastSender<T>, BroadcastRecei
tokio::sync::broadcast::channel(buffer)
}

#[cfg(not(feature = "async-std"))]
#[cfg(not(any(feature = "async-std", target_arch = "wasm32")))]
pub mod tokio_primatives;
#[cfg(not(feature = "async-std"))]
#[cfg(not(any(feature = "async-std", target_arch = "wasm32")))]
pub use self::tokio_primatives::*;

#[cfg(feature = "async-std")]
#[cfg(all(feature = "async-std", not(target_arch = "wasm32")))]
pub mod async_std_primatives;
#[cfg(feature = "async-std")]
#[cfg(all(feature = "async-std", not(target_arch = "wasm32")))]
pub use self::async_std_primatives::*;

#[cfg(target_arch = "wasm32")]
pub mod wasm_primatives;
#[cfg(target_arch = "wasm32")]
pub use self::wasm_primatives::*;
130 changes: 130 additions & 0 deletions ractor/src/concurrency/wasm_primatives.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Copyright (c) Sean Lawlor
//
// This source code is licensed under both the MIT license found in the
// LICENSE-MIT file in the root directory of this source tree.

//! Concurrency primitaves based on a WASM runtime

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use futures::FutureExt;

/// Represents a task JoinHandle
pub struct JoinHandle<T> {
handle: Option<futures::future::RemoteHandle<T>>,
}

impl<T> Drop for JoinHandle<T> {
fn drop(&mut self) {
if let Some(h) = self.handle.take() {
h.forget();
}
}
}

impl<T> JoinHandle<T> {
/// Determine if the handle is currently finished
pub fn is_finished(&self) -> bool {
self.handle.is_none()
}

/// Abort the handle
pub fn abort(&mut self) {
// For a remote handle, being dropped will wake the remote future
// to be dropped by the executor
// See: https://docs.rs/futures/latest/futures/prelude/future/struct.RemoteHandle.html
drop(self.handle.take());
}
}

impl<T: 'static> Future for JoinHandle<T> {
type Output = Result<T, ()>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// a little black-magic to poll the inner future, but return a Result instead of a unit
let mutself = self.get_mut();
let inner_polled_value = if let Some(inner) = mutself.handle.as_mut() {
inner.poll_unpin(cx)
} else {
return Poll::Ready(Err(()));
};

match inner_polled_value {
Poll::Pending => Poll::Pending,
Poll::Ready(v) => {
mutself.abort();
Poll::Ready(Ok(v))
}
}
}
}

/// Spawn a task on the executor runtime
pub fn spawn<F>(future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: Send + 'static,
{
spawn_named(None, future)
}

/// Spawn a (possibly) named task on the executor runtime
pub fn spawn_named<F>(name: Option<&str>, future: F) -> JoinHandle<F::Output>
where
F: Future + 'static,
F::Output: Send + 'static,
{

let _ = name;
let (remote, remote_handle) = future.remote_handle();
wasm_bindgen_futures::spawn_local(remote);
JoinHandle { handle: Some(remote_handle) }
}

/// A duration of time
pub type Duration = tokio::time::Duration;

/// An instant measured on system time
pub type Instant = instant::Instant;

/// Sleep the task for a duration of time
pub async fn sleep(dur: super::Duration) {
wasmtimer::tokio::sleep(dur).await;
}

/// An asynchronous interval calculation which waits until
/// a checkpoint time to tick
pub type Interval = wasmtimer::tokio::Interval;

/// Build a new interval at the given duration starting at now
///
/// Ticks 1 time immediately
pub fn interval(dur: Duration) -> Interval {
wasmtimer::tokio::interval(dur)
}

/// A set of futures to join on, in an unordered fashion
/// (first-completed, first-served)
pub type JoinSet<T> = tokio::task::JoinSet<T>;

/// Execute the future up to a timeout
///
/// * `dur`: The duration of time to allow the future to execute for
/// * `future`: The future to execute
///
/// Returns [Ok(_)] if the future succeeded before the timeout, [Err(super::Timeout)] otherwise
pub async fn timeout<F, T>(dur: super::Duration, future: F) -> Result<T, super::Timeout>
where
F: Future<Output = T>,
{
wasmtimer::tokio::timeout(dur, future)
.await
.map_err(|_| super::Timeout)
}

pub use futures::select_biased as select;

// test macro
pub use wasm_bindgen_test::wasm_bindgen_test as test;
Loading

0 comments on commit 45ce5b8

Please sign in to comment.