feat!: The @wasmer/sdk Runtime has removed the limit on the maximum number of worker threads it is allowed to spawn. The corresponding `poolSize` option has been removed from `RuntimeOptions`.
Michael-F-Bryan committed Dec 19, 2023
1 parent c13435f commit d5da4ea
Showing 5 changed files with 29 additions and 87 deletions.
18 changes: 1 addition & 17 deletions src/js_runtime.rs
@@ -1,5 +1,4 @@
  use std::{
-     num::NonZeroUsize,
      ops::{Deref, DerefMut},
      sync::Arc,
  };
@@ -33,15 +32,7 @@ impl JsRuntime {
  impl JsRuntime {
      #[wasm_bindgen(constructor)]
      pub fn js_new(options: Option<RuntimeOptions>) -> Result<JsRuntime, Error> {
-         let pool_size = options.as_ref().and_then(|options| options.pool_size());
-
-         let pool = match pool_size {
-             Some(size) => {
-                 let size = NonZeroUsize::new(size).unwrap_or(NonZeroUsize::MIN);
-                 ThreadPool::new(size)
-             }
-             None => ThreadPool::new_with_max_threads()?,
-         };
+         let pool = ThreadPool::new();

          let registry = match options.as_ref().and_then(|opts| opts.registry()) {
              Some(registry_url) => registry_url.resolve(),
@@ -108,10 +99,6 @@ const RUNTIME_OPTIONS_TYPE_DECLARATION: &str = r#"
   * Options used when constructing a {@link Runtime}.
   */
  export type RuntimeOptions = {
-     /**
-      * The number of worker threads to use.
-      */
-     poolSize?: number;
      /**
       * The GraphQL endpoint for the Wasmer registry used when looking up
       * packages.
@@ -137,9 +124,6 @@ extern "C" {
      #[wasm_bindgen(typescript_type = "RuntimeOptions")]
      pub type RuntimeOptions;

-     #[wasm_bindgen(method, getter, js_name = "poolSize")]
-     fn pool_size(this: &RuntimeOptions) -> Option<usize>;
-
      #[wasm_bindgen(method, getter)]
      fn registry(this: &RuntimeOptions) -> Option<MaybeRegistryUrl>;
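For context, the removed `poolSize` getter used `wasm_bindgen`'s duck-typed options pattern: the TypeScript shape is declared in a custom section, and each optional field surfaces in Rust as a getter returning `Option`. A minimal sketch of that pattern, with a hypothetical `ExampleOptions` type standing in for the SDK's `RuntimeOptions`:

    use wasm_bindgen::prelude::*;

    // Publish a TypeScript definition alongside the generated bindings.
    #[wasm_bindgen(typescript_custom_section)]
    const EXAMPLE_OPTIONS: &str = r#"
    export type ExampleOptions = {
        registry?: string;
    };
    "#;

    #[wasm_bindgen]
    extern "C" {
        // Duck-typed import: any JS object with this shape qualifies.
        #[wasm_bindgen(typescript_type = "ExampleOptions")]
        pub type ExampleOptions;

        // An optional field becomes a getter that returns `None` when the
        // property is absent -- the removed `poolSize()` worked this way.
        #[wasm_bindgen(method, getter)]
        fn registry(this: &ExampleOptions) -> Option<String>;
    }

Deleting the getter and the matching line in the custom section is all it takes to retire an option, which is exactly what this hunk does.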
2 changes: 1 addition & 1 deletion src/runtime.rs
@@ -70,7 +70,7 @@ impl Runtime {
      }

      pub(crate) fn with_defaults() -> Result<Self, Error> {
-         let pool = ThreadPool::new_with_max_threads()?;
+         let pool = ThreadPool::new();
          let mut rt = Runtime::new(pool);

          rt.set_registry(crate::DEFAULT_REGISTRY, None)?;
2 changes: 1 addition & 1 deletion src/tasks/post_message_payload.rs
@@ -300,7 +300,7 @@ mod tests {
          let engine = wasmer::Engine::default();
          let module = wasmer::Module::new(&engine, wasm).unwrap();
          let flag = Arc::new(AtomicBool::new(false));
-         let pool = ThreadPool::new(NonZeroUsize::MAX);
+         let pool = ThreadPool::new();
          let runtime = Runtime::new(pool);
          let env = WasiEnvBuilder::new("program")
              .runtime(Arc::new(runtime))
64 changes: 16 additions & 48 deletions src/tasks/scheduler.rs
@@ -1,7 +1,6 @@
  use std::{
      collections::{BTreeMap, VecDeque},
      fmt::Debug,
-     num::NonZeroUsize,
      sync::atomic::{AtomicU32, Ordering},
  };

@@ -22,21 +21,20 @@ use crate::tasks::{
  #[derive(Debug, Clone)]
  pub(crate) struct Scheduler {
      scheduler_thread_id: u32,
-     capacity: NonZeroUsize,
      channel: UnboundedSender<SchedulerMessage>,
  }

  impl Scheduler {
      /// Spin up a scheduler on the current thread and get a channel that can be
      /// used to communicate with it.
-     pub(crate) fn spawn(capacity: NonZeroUsize) -> Scheduler {
+     pub(crate) fn spawn() -> Scheduler {
          let (sender, mut receiver) = mpsc::unbounded_channel();

          let thread_id = wasmer::current_thread_id();
          // Safety: we just got the thread ID.
-         let sender = unsafe { Scheduler::new(sender, thread_id, capacity) };
+         let sender = unsafe { Scheduler::new(sender, thread_id) };

-         let mut scheduler = SchedulerState::new(capacity, sender.clone());
+         let mut scheduler = SchedulerState::new(sender.clone());

          tracing::debug!(thread_id, "Spinning up the scheduler");
          wasm_bindgen_futures::spawn_local(
@@ -67,16 +65,11 @@ impl Scheduler {
      ///
      /// The `scheduler_thread_id` must match the [`wasmer::current_thread_id()`]
      /// otherwise these `!Send` values will be sent between threads.
-     unsafe fn new(
-         channel: UnboundedSender<SchedulerMessage>,
-         scheduler_thread_id: u32,
-         capacity: NonZeroUsize,
-     ) -> Self {
+     unsafe fn new(channel: UnboundedSender<SchedulerMessage>, scheduler_thread_id: u32) -> Self {
          debug_assert_eq!(scheduler_thread_id, wasmer::current_thread_id());
          Scheduler {
              channel,
              scheduler_thread_id,
-             capacity,
          }
      }

@@ -102,10 +95,6 @@ impl Scheduler {
              Ok(())
          }
      }
-
-     pub(crate) fn capacity(&self) -> NonZeroUsize {
-         self.capacity
-     }
  }

// Safety: The only way our !Send messages will be sent to the scheduler is if
@@ -117,8 +106,6 @@ unsafe impl Sync for Scheduler {}
  /// The state for the actor in charge of the threadpool.
  #[derive(Debug)]
  struct SchedulerState {
-     /// The maximum number of workers we will start.
-     capacity: NonZeroUsize,
      /// Workers that are able to receive work.
      idle: VecDeque<WorkerHandle>,
      /// Workers that are currently blocked on synchronous operations and can't
@@ -130,9 +117,8 @@ struct SchedulerState {
  }

  impl SchedulerState {
-     fn new(capacity: NonZeroUsize, mailbox: Scheduler) -> Self {
+     fn new(mailbox: Scheduler) -> Self {
          SchedulerState {
-             capacity,
              idle: VecDeque::new(),
              busy: VecDeque::new(),
              mailbox,
@@ -213,14 +199,14 @@ impl SchedulerState {
      /// Send a task to one of the worker threads, preferring workers that aren't
      /// running synchronous work.
      fn post_message(&mut self, msg: PostMessagePayload) -> Result<(), Error> {
-         let (worker, already_blocked) = self.next_available_worker()?;
+         let worker = self.next_available_worker()?;

          let would_block = msg.would_block();
          worker
              .send(msg)
              .with_context(|| format!("Unable to send a message to worker {}", worker.id()))?;

-         if would_block || already_blocked {
+         if would_block {
              self.busy.push_back(worker);
          } else {
              self.idle.push_back(worker);
@@ -229,43 +215,25 @@ impl SchedulerState {
          Ok(())
      }

-     fn next_available_worker(&mut self) -> Result<(WorkerHandle, bool), Error> {
+     fn next_available_worker(&mut self) -> Result<WorkerHandle, Error> {
          // First, try to send the message to an idle worker
          if let Some(worker) = self.idle.pop_front() {
              tracing::trace!(
                  worker.id = worker.id(),
                  "Sending the message to an idle worker"
              );
-             return Ok((worker, false));
+             return Ok(worker);
          }

-         if self.busy.len() + self.idle.len() < self.capacity.get() {
-             // Rather than sending the task to one of the blocking workers,
-             // let's spawn a new worker
-
-             let worker = self.start_worker()?;
-             tracing::trace!(
-                 worker.id = worker.id(),
-                 "Sending the message to a new worker"
-             );
-             return Ok((worker, false));
-         }
-
-         // Oh well, looks like there aren't any more idle workers and we can't
-         // spin up any new workers, so we'll need to add load to a worker that
-         // is already blocking.
-         //
-         // Note: This shouldn't panic because if there were no idle workers and
-         // we didn't start a new worker, there should always be at least one
-         // busy worker because our capacity is non-zero.
-         let worker = self.busy.pop_front().unwrap();
+         // Rather than sending the task to one of the blocking workers,
+         // let's spawn a new worker
+
+         let worker = self.start_worker()?;
          tracing::trace!(
              worker.id = worker.id(),
-             "Sending the message to a busy worker"
+             "Sending the message to a new worker"
          );

-         Ok((worker, true))
+         Ok(worker)
      }

fn start_worker(&mut self) -> Result<WorkerHandle, Error> {
@@ -309,8 +277,8 @@ mod tests {
      async fn spawn_an_async_function() {
          let (sender, receiver) = oneshot::channel();
          let (tx, _) = mpsc::unbounded_channel();
-         let tx = unsafe { Scheduler::new(tx, wasmer::current_thread_id(), NonZeroUsize::MAX) };
-         let mut scheduler = SchedulerState::new(NonZeroUsize::MAX, tx);
+         let tx = unsafe { Scheduler::new(tx, wasmer::current_thread_id()) };
+         let mut scheduler = SchedulerState::new(tx);
          let message = SchedulerMessage::SpawnAsync(Box::new(move || {
              Box::pin(async move {
                  let _ = sender.send(42);
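The behavioural core of the change is `next_available_worker`: with `capacity` gone, the scheduler reuses an idle worker when one exists and otherwise always spawns a fresh one, so a task is never queued behind an already-blocked worker. A standalone sketch of the new policy, using hypothetical `Worker`/`Pool` types in place of the real `WorkerHandle` and `SchedulerState`:

    use std::collections::VecDeque;

    struct Worker {
        id: u32,
    }

    struct Pool {
        idle: VecDeque<Worker>,
        busy: VecDeque<Worker>,
        next_id: u32,
    }

    impl Pool {
        // Prefer an idle worker; otherwise spawn. No capacity check,
        // no fallback onto a worker that is already blocked.
        fn next_available_worker(&mut self) -> Worker {
            if let Some(worker) = self.idle.pop_front() {
                return worker;
            }
            let worker = Worker { id: self.next_id };
            self.next_id += 1;
            worker
        }

        // Tasks that may block park their worker on the busy queue;
        // everything else returns to idle for reuse.
        fn dispatch(&mut self, would_block: bool) {
            let worker = self.next_available_worker();
            if would_block {
                self.busy.push_back(worker);
            } else {
                self.idle.push_back(worker);
            }
        }
    }

    fn main() {
        let mut pool = Pool {
            idle: VecDeque::new(),
            busy: VecDeque::new(),
            next_id: 0,
        };
        pool.dispatch(true); // spawns worker 0 and parks it as busy
        pool.dispatch(false); // spawns worker 1, returns it to idle
        pool.dispatch(false); // reuses worker 1 from the idle queue
        assert_eq!(pool.busy[0].id, 0);
        assert_eq!(pool.idle.len(), 1);
    }

The trade-off is that a burst of blocking tasks now spawns as many workers as there are tasks, rather than piling extra load onto busy workers once a cap is hit.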
30 changes: 10 additions & 20 deletions src/tasks/thread_pool.rs
@@ -1,6 +1,5 @@
- use std::{fmt::Debug, future::Future, num::NonZeroUsize, pin::Pin};
+ use std::{fmt::Debug, future::Future, pin::Pin};

- use anyhow::Context;
  use futures::future::LocalBoxFuture;
  use instant::Duration;
  use wasm_bindgen_futures::JsFuture;
@@ -18,23 +17,11 @@ pub struct ThreadPool {
  }

  impl ThreadPool {
-     pub fn new(capacity: NonZeroUsize) -> Self {
-         let sender = Scheduler::spawn(capacity);
+     pub fn new() -> Self {
+         let sender = Scheduler::spawn();
          ThreadPool { scheduler: sender }
      }

-     pub fn new_with_max_threads() -> Result<ThreadPool, anyhow::Error> {
-         let concurrency = crate::utils::GlobalScope::current()
-             .hardware_concurrency()
-             .context("Unable to determine the hardware concurrency")?;
-         // Note: We want to deliberately over-commit to avoid accidental
-         // deadlocks.
-         let concurrency = concurrency
-             .checked_mul(NonZeroUsize::new(16).unwrap())
-             .unwrap();
-         Ok(ThreadPool::new(concurrency))
-     }
-
      /// Run an `async` function to completion on the threadpool.
      pub fn spawn(
          &self,
@@ -113,7 +100,7 @@ impl VirtualTaskManager for ThreadPool {

      /// Returns the amount of parallelism that is possible on this platform
      fn thread_parallelism(&self) -> Result<usize, WasiThreadError> {
-         Ok(self.scheduler.capacity().get())
+         match crate::utils::GlobalScope::current().hardware_concurrency() {
+             Some(n) => Ok(n.get()),
+             None => Err(WasiThreadError::Unsupported),
+         }
      }

fn spawn_with_module(
@@ -148,7 +138,7 @@ mod tests {
              .dyn_into()
              .unwrap();
          let module = wasmer::Module::from(module);
-         let pool = ThreadPool::new_with_max_threads().unwrap();
+         let pool = ThreadPool::new();

          let (sender, receiver) = oneshot::channel();
          pool.spawn_with_module(
@@ -166,7 +156,7 @@

      #[wasm_bindgen_test]
      async fn spawned_tasks_can_communicate_with_the_main_thread() {
-         let pool = ThreadPool::new(2.try_into().unwrap());
+         let pool = ThreadPool::new();
          let (sender, receiver) = oneshot::channel();

          pool.task_shared(Box::new(move || {
Expand Down Expand Up @@ -203,7 +193,7 @@ mod tests {
let (sender_1, receiver_1) = oneshot::channel();
let (sender_2, mut receiver_2) = oneshot::channel();
// Set things up so we can run 2 blocking tasks at the same time.
let pool = ThreadPool::new(NonZeroUsize::new(2).unwrap());
let pool = ThreadPool::new();

// Note: The second task depends on the first one completing
let first_task = Box::new(move || sender_1.send(()).unwrap());
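With the capacity field gone, `thread_parallelism` now reports the platform's real concurrency instead of the pool's configured size. The value ultimately comes from `navigator.hardwareConcurrency`; a sketch of reading it directly via `web_sys` (assumes a main-thread window context and web-sys's `Window`/`Navigator` cargo features — the crate's internal `GlobalScope` helper additionally handles worker contexts):

    use std::num::NonZeroUsize;

    // Best-effort lookup of `navigator.hardwareConcurrency`, roughly
    // what `GlobalScope::hardware_concurrency()` wraps internally.
    fn hardware_concurrency() -> Option<NonZeroUsize> {
        let window = web_sys::window()?;
        // The browser reports an f64; treat zero as "unavailable".
        let n = window.navigator().hardware_concurrency() as usize;
        NonZeroUsize::new(n)
    }

A `None` here maps naturally onto `WasiThreadError::Unsupported`, mirroring the new `thread_parallelism` body above.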
