Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support tokio for platforms without multi-threading support #12

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 119 additions & 29 deletions src/rt_tokio/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::future::Future;
use std::marker::PhantomData;
use std::sync::Arc;
use std::{fmt, io};

Expand All @@ -8,20 +9,20 @@ pub(crate) mod time;

mod local_worker;

pub(crate) use local_worker::LocalHandle;
use local_worker::LocalWorker;

/// We test whether thread is supported for current target.
static THREAD_SUPPORTED: Lazy<bool> =
Lazy::new(|| std::thread::Builder::new().spawn(|| {}).is_ok());

pub(crate) fn get_default_runtime_size() -> usize {
// We use num_cpus as std::thread::available_parallelism() does not take
// system resource constraint (e.g.: cgroups) into consideration.
#[cfg(not(target_os = "wasi"))]
{
if *THREAD_SUPPORTED {
// We use num_cpus as std::thread::available_parallelism() does not take
// system resource constraint (e.g.: cgroups) into consideration.
num_cpus::get()
}
// WASI does not support multi-threading at this moment.
#[cfg(target_os = "wasi")]
{
1
} else {
// For platforms without thread support, we report available workers as 0.
0
}
}

Expand All @@ -42,18 +43,30 @@ where
}

#[derive(Clone)]
pub(crate) struct Runtime {
workers: Arc<Vec<LocalWorker>>,
enum RuntimeInner {
/// Target has multi-threading support.
Threaded { workers: Arc<Vec<LocalWorker>> },
/// Target does not have multi-threading support.
Main,
}

impl fmt::Debug for Runtime {
impl fmt::Debug for RuntimeInner {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Runtime")
.field("workers", &"Vec<LocalWorker>")
.finish()
match self {
Self::Threaded { .. } => f
.debug_struct("RuntimeInner::Threaded")
.field("workers", &"Vec<LocalWorker>")
.finish(),
Self::Main => f.debug_struct("RuntimeInner::Main").finish(),
}
}
}

#[derive(Clone, Debug)]
pub(crate) struct Runtime {
inner: RuntimeInner,
}

impl Default for Runtime {
fn default() -> Self {
static DEFAULT_RT: Lazy<Runtime> = Lazy::new(|| {
Expand All @@ -66,22 +79,30 @@ impl Default for Runtime {

impl Runtime {
pub fn new(size: usize) -> io::Result<Self> {
assert!(size > 0, "must have more than 1 worker.");
if *THREAD_SUPPORTED {
assert!(size > 0, "must have more than 1 worker.");

let mut workers = Vec::with_capacity(size);
let mut workers = Vec::with_capacity(size);

for _ in 0..size {
let worker = LocalWorker::new()?;
workers.push(worker);
}
for _ in 0..size {
let worker = LocalWorker::new()?;
workers.push(worker);
}

Ok(Self {
workers: workers.into(),
})
Ok(Self {
inner: RuntimeInner::Threaded {
workers: workers.into(),
},
})
} else {
Ok(Self {
inner: RuntimeInner::Main,
})
}
}

fn find_least_busy_local_worker(&self) -> &LocalWorker {
let mut workers = self.workers.iter();
fn find_least_busy_local_worker(workers: &[LocalWorker]) -> &LocalWorker {
let mut workers = workers.iter();

let mut worker = workers.next().expect("must have more than 1 worker.");
let mut task_count = worker.task_count();
Expand Down Expand Up @@ -109,8 +130,77 @@ impl Runtime {
F: Send + 'static,
Fut: Future<Output = ()> + 'static,
{
let worker = self.find_least_busy_local_worker();
worker.spawn_pinned(create_task);
match self.inner {
RuntimeInner::Threaded { ref workers } => {
let worker = Self::find_least_busy_local_worker(workers);

worker.spawn_pinned(create_task);
}

RuntimeInner::Main => {
tokio::task::spawn_local(create_task());
}
}
}
}

#[derive(Debug, Clone)]
enum LocalHandleInner {
Threaded(local_worker::LocalHandle),
Main,
}

#[derive(Debug, Clone)]
pub(crate) struct LocalHandle {
inner: LocalHandleInner,
// This type is not send or sync.
_marker: PhantomData<*const ()>,
}

impl LocalHandle {
pub fn try_current() -> Option<Self> {
if *THREAD_SUPPORTED {
Some(Self {
inner: LocalHandleInner::Threaded(local_worker::LocalHandle::try_current()?),
_marker: PhantomData,
})
} else {
Some(Self {
inner: LocalHandleInner::Main,
_marker: PhantomData,
})
}
}

pub fn current() -> Self {
if *THREAD_SUPPORTED {
Self {
inner: LocalHandleInner::Threaded(local_worker::LocalHandle::current()),
_marker: PhantomData,
}
} else {
Self {
inner: LocalHandleInner::Main,
_marker: PhantomData,
}
}
}

pub fn spawn_local<F>(&self, f: F)
where
F: Future<Output = ()> + 'static,
{
match self.inner {
LocalHandleInner::Threaded(ref m) => {
m.spawn_local(f);
}
LocalHandleInner::Main => {
// For platforms without multi threading support, we assume a behaviour similar to
// wasm-bindgen-futures, where the main function is running under a tokio local set
// and tokio::task::spawn_local is available for the entire program.
tokio::task::spawn_local(f);
}
}
}
}

Expand Down
Loading