Skip to content

Commit

Permalink
Update docs
Browse files Browse the repository at this point in the history
  • Loading branch information
rmanoka committed Nov 16, 2020
1 parent e01d72a commit de0808b
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 161 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
[package]
name = "async-scoped"
version = "0.4.1"
version = "0.5.0"
authors = ["Rajsekar Manokaran <rajsekar@gmail.com>"]
edition = "2018"

documentation = "https://docs.rs/async-scoped"
description = "Spawn scoped (non 'static) asynchronous futures using async_std runtime"
description = "Spawn scoped (non 'static) asynchronous futures for async_std and tokio runtimes"
homepage = "https://github.com/rmanoka/async-scoped"
repository = "https://github.com/rmanoka/async-scoped"
readme = "README.md"

categories = [ "asynchronous", "concurrency" ]
keywords = [ "async", "async-std", "scoped", "spawn" ]
keywords = [ "async", "async-std", "tokio", "scoped", "spawn" ]
license = "Apache-2.0/MIT"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
148 changes: 8 additions & 140 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Async-scoped

Enables controlled spawning of non-`'static` futures when
using the [async-std](//github.com/async-rs/async-std) executor.
using the [async-std](//github.com/async-rs/async-std) or
[tokio](//github.com/tokio-rs/tokio) executors.

## Motivation

Expand All @@ -19,145 +20,12 @@ number (> ~1K) of I/O futures, or a few CPU heavy futures.
## Usage

The API is meant to be a minimal wrapper around efficient
executors. Currently, we only support `async_std`, but the
API easily accomodates any spawn function that just accepts
a `'static` future.

``` rust
#[async_std::test]
async fn test_scope_and_block() {
let not_copy = String::from("hello world!");
let not_copy_ref = &not_copy;

let (_, vals) = async_scoped::scope_and_block(|s| {
for _ in 0..10 {
let proc = || async {
assert_eq!(not_copy_ref, "hello world!");
};
s.spawn(proc());
}
});

assert_eq!(vals.len(), 10);
}
```

## Scope API

We propose an API similar to `crossbeam::scope` to allow
controlled spawning of futures that are not `'static`. The
key function is:

``` rust
pub unsafe fn scope<'a, T: Send + 'static,
F: FnOnce(&mut Scope<'a, T>)>(f: F)
-> impl Stream {
// ...
}
```

This function is used as follows:

``` rust
#[async_std::test]
async fn scoped_futures() {
let not_copy = String::from("hello world!");
let not_copy_ref = &not_copy;

let (mut stream, _) = unsafe {
async_scoped::scope(|s| {
for _ in 0..10 {
let proc = || async {
assert_eq!(not_copy_ref, "hello world!");
};
s.spawn(proc());
}
})
};

// Uncomment this for compile error
// std::mem::drop(not_copy);

use futures::StreamExt;
let mut count = 0;
while let Some(_) = stream.next().await {
count += 1;
}
assert_eq!(count, 10);
}
```

## Cancellation

To support cancellation, `Scope` provides a
`spawn_cancellable` which wraps a future to make it
cancellable. When a `Scope` is dropped, (or if `cancel`
method is invoked), all the cancellable futures are
scheduled for cancellation. In the next poll of the
futures, they are dropped and a default value (provided
by a closure during spawn) is returned as the output of
the future.

Note that cancellation requires some reasonable
behaviour from the future and futures that do not return
control to the executor cannot be cancelled until their
next poll.

## Safety Considerations

The `scope` API provided in this crate is
unsafe as it is possible to `forget` the stream received
from the API without driving it to completion. The only
completely (without any additional assumptions) safe API
is the `scope_and_block` function,
which _blocks the current thread_ until all spawned
futures complete.

The `scope_and_block` may not be
convenient in an asynchronous setting. In this case, the
`scope_and_collect` API may be
used. Care must be taken to ensure the returned future
is not forgotten before being driven to completion. Note
that dropping this future will lead to it being driven
to completion, while blocking the current thread to
ensure safety. However, it is unsafe to forget this
future before it is fully driven.

## Implementation

Our current implementation simply uses _unsafe_ glue to
`transmute` the lifetime, to actually spawn the futures
in the executor. The original lifetime is recorded in
the `Scope`. This allows the compiler to enforce the
necessary lifetime requirements as long as this returned
stream is not forgotten.

For soundness, we drive the stream to completion in the
`Drop` impl. The current thread is blocked until the stream
is fully driven.

Unfortunately, since the `std::mem::forget` method is
allowed in safe Rust, the purely asynchronous API here is
_inherently unsafe_.

### Efficiency

Our current implementation is focussed on safety, and leaves
room for optimization. Below we list a few venues that we
hope could be further optimized.

1. The `spawn` involves an allocation (not including any
allocation done by the executor itself). This occurs while
transmuting the lifetime of the future, which to the best of
our knowledge is not possible without erasing the concrete
type of the future itself. Please see the implementation of
`Scope::spawn` in `src/lib.rs` for more details of the
transmute, and allocation.

1. The `CancellableFuture` wrapper also uses a synchronous
`Mutex` and hence is not lock-free. However, the lock is
only used to make one insertion into a `HashMap` while in
contention.
executors. Users **must use** either "use-async-std", or the
"use-tokio" feature gates, to obtain a usable scope type.
These gates provide `TokioScope` and `AsyncScope` that
support spawning, and blocking. See
[docs.rs](https://docs.rs/async-scoped) for detailed
documentation.

## License

Expand Down
32 changes: 23 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
//!
//! See [`scope`][Scope::scope] for the exact definition, and
//! safety guidelines. The simplest and safest API is
//! [`scope_and_block`][scope_and_block], used as follows:
//! [`scope_and_block`][Scope::scope_and_block], used as follows:
//!
//! ``` rust, ignore
//! async fn scoped_futures() {
Expand Down Expand Up @@ -62,10 +62,27 @@
//! However, the user should ensure that the returned future
//! _is not forgetten_ before being driven to completion.
//!
//! ## Executor Selection
//!
//! Users **must use** either "use-async-std", or the
//! "use-tokio" feature gates, to obtain a usable scope
//! type. These gates provide [`TokioScope`] and
//! [`AsyncScope`] that support spawning, and blocking. Here
//! is a run-down of key differences between the two
//! runtimes.
//!
//! 1. [`AsyncScope`] may run into a dead-lock if used in
//! deep recursions (depth > #num-cores / 2).
//!
//! 2. [`TokioScope::scope_and_block`][Scope::scope_and_block] may only be used
//! within a multi-threaded. An incompletely driven
//! `TokioScope` also needs a multi-threaded context to be
//! dropped.
//!
//! ## Cancellation
//!
//! To support cancellation, `Scope` provides a
//! [`spawn_cancellable`][spawn_cancellable] which wraps a
//! [`spawn_cancellable`][Scope::spawn_cancellable] which wraps a
//! future to make it cancellable. When a `Scope` is
//! dropped, (or if `cancel` method is invoked), all the
//! cancellable futures are scheduled for cancellation. In
Expand All @@ -78,22 +95,19 @@
//! return control to the executor cannot be cancelled once
//! it has started.
//!
//! **Note.** This feature is currently available only with
//! "use-async-std" feature.
//!
//! ## Safety Considerations
//!
//! The [`scope`][scope] API provided in this crate is
//! The [`scope`][Scope::scope] API provided in this crate is
//! unsafe as it is possible to `forget` the stream received
//! from the API without driving it to completion. The only
//! completely (without any additional assumptions) safe API
//! is the [`scope_and_block`][scope_and_block] function,
//! is the [`scope_and_block`][Scope::scope_and_block] function,
//! which _blocks the current thread_ until all spawned
//! futures complete.
//!
//! The [`scope_and_block`][scope_and_block] may not be
//! The [`scope_and_block`][Scope::scope_and_block] may not be
//! convenient in an asynchronous setting. In this case, the
//! [`scope_and_collect`][scope_and_collect] API may be
//! [`scope_and_collect`][Scope::scope_and_collect] API may be
//! used. Care must be taken to ensure the returned future
//! is not forgotten before being driven to completion.
//!
Expand Down
32 changes: 23 additions & 9 deletions src/scoped.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,29 @@ impl<'a, T, Sp: Spawner<T> + Blocker> Stream for Scope<'a, T, Sp> {
}
}

#[pinned_drop]
impl<'a, T, Sp: Spawner<T> + Blocker> PinnedDrop for Scope<'a, T, Sp> {
fn drop(mut self: Pin<&mut Self>) {
if !self.done {
<Sp as Blocker>::block_on(async {
#[cfg(feature = "use-async-std")]
self.cancel().await;
self.collect().await;
});
cfg_any_spawner! {
#[pinned_drop]
impl<'a, T, Sp: Spawner<T> + Blocker> PinnedDrop for Scope<'a, T, Sp> {
fn drop(mut self: Pin<&mut Self>) {
if !self.done {
<Sp as Blocker>::block_on(async {
self.cancel().await;
self.collect().await;
});
}
}
}
}

cfg_no_spawner! {
#[pinned_drop]
impl<'a, T, Sp: Spawner<T> + Blocker> PinnedDrop for Scope<'a, T, Sp> {
fn drop(mut self: Pin<&mut Self>) {
if !self.done {
<Sp as Blocker>::block_on(async {
self.collect().await;
});
}
}
}
}
10 changes: 10 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,13 @@ macro_rules! cfg_any_spawner {
)*
}
}

#[macro_use]
macro_rules! cfg_no_spawner {
($($item:item)*) => {
$(
#[cfg(not( any(feature = "use-async-std", feature = "use-tokio") ))]
$item
)*
}
}

0 comments on commit de0808b

Please sign in to comment.