Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support tokio #4

Merged
merged 4 commits into from
Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,32 +1,38 @@
[package]
name = "async-scoped"
version = "0.4.1"
version = "0.5.1"
authors = ["Rajsekar Manokaran <rajsekar@gmail.com>"]
edition = "2018"

documentation = "https://docs.rs/async-scoped"
description = "Spawn scoped (non 'static) asynchronous futures using async_std runtime"
description = "Spawn scoped (non 'static) asynchronous futures for async_std and tokio runtimes"
homepage = "https://github.com/rmanoka/async-scoped"
repository = "https://github.com/rmanoka/async-scoped"
readme = "README.md"

categories = [ "asynchronous", "concurrency" ]
keywords = [ "async", "async-std", "scoped", "spawn" ]
keywords = [ "async", "async-std", "tokio", "scoped", "spawn" ]
license = "Apache-2.0/MIT"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures-core = "0.3.1"
futures = "0.3.1"
pin-project = "0.4.6"
slab = "0.4.2"
async-std = { version = "1.6.3", optional = true }
futures = "0.3.1"
tokio = {version = "0.3.3", features = ["rt-multi-thread", "macros", "sync"], optional = true}

[features]
# Verify package.metadata.docs.rs when updating
use-async-std = ["async-std"]
use-tokio = ["tokio"]

[dependencies.async-std]
features = ["attributes"]
version = "1.4.0"
# path = "../async-std"

[dev-dependencies]
femme = "1.3.0"
log = { version = "0.4.8", features = ["kv_unstable"] }
async-std = { version = "1.6.3", features = ["attributes"] }

[package.metadata.docs.rs]
features = ["use-async-std", "use-tokio"]
148 changes: 8 additions & 140 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Async-scoped

Enables controlled spawning of non-`'static` futures when
using the [async-std](//github.com/async-rs/async-std) executor.
using the [async-std](//github.com/async-rs/async-std) or
[tokio](//github.com/tokio-rs/tokio) executors.

## Motivation

Expand All @@ -19,145 +20,12 @@ number (> ~1K) of I/O futures, or a few CPU heavy futures.
## Usage

The API is meant to be a minimal wrapper around efficient
executors. Currently, we only support `async_std`, but the
API easily accomodates any spawn function that just accepts
a `'static` future.

``` rust
#[async_std::test]
async fn test_scope_and_block() {
let not_copy = String::from("hello world!");
let not_copy_ref = &not_copy;

let (_, vals) = async_scoped::scope_and_block(|s| {
for _ in 0..10 {
let proc = || async {
assert_eq!(not_copy_ref, "hello world!");
};
s.spawn(proc());
}
});

assert_eq!(vals.len(), 10);
}
```

## Scope API

We propose an API similar to `crossbeam::scope` to allow
controlled spawning of futures that are not `'static`. The
key function is:

``` rust
pub unsafe fn scope<'a, T: Send + 'static,
F: FnOnce(&mut Scope<'a, T>)>(f: F)
-> impl Stream {
// ...
}
```

This function is used as follows:

``` rust
#[async_std::test]
async fn scoped_futures() {
let not_copy = String::from("hello world!");
let not_copy_ref = &not_copy;

let (mut stream, _) = unsafe {
async_scoped::scope(|s| {
for _ in 0..10 {
let proc = || async {
assert_eq!(not_copy_ref, "hello world!");
};
s.spawn(proc());
}
})
};

// Uncomment this for compile error
// std::mem::drop(not_copy);

use futures::StreamExt;
let mut count = 0;
while let Some(_) = stream.next().await {
count += 1;
}
assert_eq!(count, 10);
}
```

## Cancellation

To support cancellation, `Scope` provides a
`spawn_cancellable` which wraps a future to make it
cancellable. When a `Scope` is dropped, (or if `cancel`
method is invoked), all the cancellable futures are
scheduled for cancellation. In the next poll of the
futures, they are dropped and a default value (provided
by a closure during spawn) is returned as the output of
the future.

Note that cancellation requires some reasonable
behaviour from the future and futures that do not return
control to the executor cannot be cancelled until their
next poll.

## Safety Considerations

The `scope` API provided in this crate is
unsafe as it is possible to `forget` the stream received
from the API without driving it to completion. The only
completely (without any additional assumptions) safe API
is the `scope_and_block` function,
which _blocks the current thread_ until all spawned
futures complete.

The `scope_and_block` may not be
convenient in an asynchronous setting. In this case, the
`scope_and_collect` API may be
used. Care must be taken to ensure the returned future
is not forgotten before being driven to completion. Note
that dropping this future will lead to it being driven
to completion, while blocking the current thread to
ensure safety. However, it is unsafe to forget this
future before it is fully driven.

## Implementation

Our current implementation simply uses _unsafe_ glue to
`transmute` the lifetime, to actually spawn the futures
in the executor. The original lifetime is recorded in
the `Scope`. This allows the compiler to enforce the
necessary lifetime requirements as long as this returned
stream is not forgotten.

For soundness, we drive the stream to completion in the
`Drop` impl. The current thread is blocked until the stream
is fully driven.

Unfortunately, since the `std::mem::forget` method is
allowed in safe Rust, the purely asynchronous API here is
_inherently unsafe_.

### Efficiency

Our current implementation is focussed on safety, and leaves
room for optimization. Below we list a few venues that we
hope could be further optimized.

1. The `spawn` involves an allocation (not including any
allocation done by the executor itself). This occurs while
transmuting the lifetime of the future, which to the best of
our knowledge is not possible without erasing the concrete
type of the future itself. Please see the implementation of
`Scope::spawn` in `src/lib.rs` for more details of the
transmute, and allocation.

1. The `CancellableFuture` wrapper also uses a synchronous
`Mutex` and hence is not lock-free. However, the lock is
only used to make one insertion into a `HashMap` while in
contention.
executors. Users **must use** either "use-async-std", or the
"use-tokio" feature gates, to obtain a usable scope type.
These gates provide `TokioScope` and `AsyncScope` that
support spawning, and blocking. See
[docs.rs](https://docs.rs/async-scoped) for detailed
documentation.

## License

Expand Down
1 change: 0 additions & 1 deletion src/cancellable_future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,4 @@ impl<I, F: Future<Output=I>, Fu: FnOnce() -> I> Future
Poll::Ready(this.default.take().unwrap()())
}
}

}
37 changes: 35 additions & 2 deletions src/cancellation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,16 @@ use std::sync::Mutex;
use std::task::{Poll, Waker, Context};
use std::pin::Pin;
use std::future::Future;
use async_std::sync::RwLock;

cfg_async_std! {
use async_std::sync::RwLock;
}

cfg_async_std_or_else! {
use tokio::sync::RwLock;
}


use slab::Slab;

pub struct Cancellation {
Expand Down Expand Up @@ -44,7 +53,31 @@ impl Cancellation {
fut: Pin<&mut F>, cx: &mut Context,
) -> Option<(Poll<I>, Option<usize>)> {

if let Some(guard) = self.flag.try_read() {
cfg_async_std_or_else! {
macro_rules! with_guard {
() => {{
use futures::Future;
use std::task::Poll;
use tokio::pin;
let read_fut = self.flag.read();
pin!(read_fut);
let result = read_fut.poll(cx);
match result {
Poll::Pending => None,
Poll::Ready(guard) => Some(guard)
}
}}
}
}
cfg_async_std! {
macro_rules! with_guard {
() => {{
self.flag.try_read()
}}
}
}

if let Some(guard) = with_guard!() {
if *guard {
let poll_result = fut.poll(cx);

Expand Down
Loading