Skip to content

Commit

Permalink
feat: introduce try_executor to detect whether in scope of smol
Browse files Browse the repository at this point in the history
This way third parties can decide how to spawn in runtime time but not
coding time.

It is similar to `tokio::runtime::Handle::try_current`.
  • Loading branch information
kezhuw committed May 6, 2024
1 parent 641326b commit c600f61
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 40 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ jobs:
- name: Run cargo check (without dev-dependencies to catch missing feature flags)
if: startsWith(matrix.rust, 'nightly')
run: cargo check -Z features=dev_dep
- run: cargo test
- name: Run tests with single smol thread
run: cargo test
- name: Run tests with multiple smol threads
run: SMOL_THREADS=2 cargo test

msrv:
runs-on: ubuntu-latest
Expand Down
123 changes: 123 additions & 0 deletions src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::cell::Cell;
use std::future::Future;
use std::panic::catch_unwind;
use std::thread;

use async_executor::{Executor, Task};
use async_lock::OnceCell;
use futures_lite::future;

thread_local! {
static SMOL: Cell<bool> = const { Cell::new(false) };
}

pub(crate) struct EnterScope {
first: bool,
}

impl Drop for EnterScope {
fn drop(&mut self) {
if self.first {
SMOL.set(false);
}
}
}

pub(crate) fn enter() -> EnterScope {
let smol = SMOL.get();
SMOL.set(true);
EnterScope { first: !smol }
}

/// Gets executor if runs in control of smol, that is [block_on], [unblock] and tasks spawnned from
/// [crate::spawn()].
pub fn try_executor() -> Option<&'static Executor<'static>> {
if SMOL.get() {
Some(global())
} else {
None
}
}

/// Same as [async_io::block_on] expect it setup thread context executor for [try_executor].
pub fn block_on<T>(future: impl Future<Output = T>) -> T {
let _scope = enter();
async_io::block_on(future)
}

/// Same as [blocking::unblock] expect it setup thread context executor for [try_executor].
pub fn unblock<T, F>(f: F) -> Task<T>
where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
blocking::unblock(move || {
let _scope = enter();
f()
})
}

pub(crate) fn global() -> &'static Executor<'static> {
static GLOBAL: OnceCell<Executor<'_>> = OnceCell::new();
GLOBAL.get_or_init_blocking(|| {
let num_threads = {
// Parse SMOL_THREADS or default to 1.
std::env::var("SMOL_THREADS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1)
};

for n in 1..=num_threads {
thread::Builder::new()
.name(format!("smol-{}", n))
.spawn(|| loop {
catch_unwind(|| block_on(global().run(future::pending::<()>()))).ok();
})
.expect("cannot spawn executor thread");
}

// Prevent spawning another thread by running the process driver on this thread.
let ex = Executor::new();
#[cfg(not(target_os = "espidf"))]
ex.spawn(async_process::driver()).detach();
ex
})
}

#[cfg(test)]
mod tests {
use super::*;
use futures_lite::future;

#[test]
fn try_executor_no() {
assert!(try_executor().is_none());
}

#[test]
fn try_executor_block_on() {
let executor = block_on(async { try_executor().unwrap() });
assert!(std::ptr::eq(executor, global()));
}

#[test]
fn try_executor_block_on_recursively() {
let executor = block_on(async { block_on(async { try_executor().unwrap() }) });
assert!(std::ptr::eq(executor, global()));
}

#[test]
fn try_executor_unblock() {
let executor = future::block_on(unblock(|| try_executor().unwrap()));
assert!(std::ptr::eq(executor, global()));
}

#[test]
fn try_executor_spawn() {
for _ in 0..100 {
let executor = future::block_on(crate::spawn(async { try_executor().unwrap() }));
assert!(std::ptr::eq(executor, global()));
}
}
}
6 changes: 4 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ doc_comment::doctest!("../README.md");
#[doc(inline)]
pub use {
async_executor::{Executor, LocalExecutor, Task},
async_io::{block_on, Async, Timer},
blocking::{unblock, Unblock},
async_io::{Async, Timer},
blocking::Unblock,
futures_lite::{future, io, pin, prelude, ready, stream},
};

Expand All @@ -58,5 +58,7 @@ pub use {async_channel as channel, async_fs as fs, async_lock as lock, async_net
#[doc(inline)]
pub use async_process as process;

mod executor;
mod spawn;
pub use executor::{block_on, try_executor, unblock};
pub use spawn::spawn;
40 changes: 3 additions & 37 deletions src/spawn.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
use std::future::Future;
use std::panic::catch_unwind;
use std::thread;

use async_executor::{Executor, Task};
use async_io::block_on;
use async_lock::OnceCell;
use futures_lite::future;
use async_executor::Task;

/// Spawns a task onto the global executor (single-threaded by default).
///
/// There is a global executor that gets lazily initialized on first use. It is included in this
/// library for convenience when writing unit tests and small programs, but it is otherwise
/// more advisable to create your own [`Executor`].
/// more advisable to create your own [`crate::Executor`].
///
/// By default, the global executor is run by a single background thread, but you can also
/// configure the number of threads by setting the `SMOL_THREADS` environment variable.
Expand All @@ -28,34 +23,5 @@ use futures_lite::future;
/// });
/// ```
pub fn spawn<T: Send + 'static>(future: impl Future<Output = T> + Send + 'static) -> Task<T> {
static GLOBAL: OnceCell<Executor<'_>> = OnceCell::new();

fn global() -> &'static Executor<'static> {
GLOBAL.get_or_init_blocking(|| {
let num_threads = {
// Parse SMOL_THREADS or default to 1.
std::env::var("SMOL_THREADS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1)
};

for n in 1..=num_threads {
thread::Builder::new()
.name(format!("smol-{}", n))
.spawn(|| loop {
catch_unwind(|| block_on(global().run(future::pending::<()>()))).ok();
})
.expect("cannot spawn executor thread");
}

// Prevent spawning another thread by running the process driver on this thread.
let ex = Executor::new();
#[cfg(not(target_os = "espidf"))]
ex.spawn(async_process::driver()).detach();
ex
})
}

global().spawn(future)
crate::executor::global().spawn(future)
}

0 comments on commit c600f61

Please sign in to comment.