Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

task: Introduce a new pattern for task-local storage #2126

Merged
merged 1 commit into from
Jan 17, 2020

Conversation

LucioFranco
Copy link
Member

@LucioFranco LucioFranco commented Jan 16, 2020

Introduce a new pattern for task-local storage

This PR introduces a new pattern for task-local storage. It allows for storage
and retrieval of data in an asynchronous context. It does so using a new pattern
based on past experience. This API is similar to the one from std except for
changes that were needed to be made due to the differences required by async
code.

A quick example:

tokio::task_local! {
  static FOO: u32;
}

FOO.scope(1, async move {
    some_async_fn().await;
    assert_eq!(FOO.get(), 1);
}).await;

Background of task-local storage

The goal for task-local storage is to be able to provide some ambiant context in
an asynchronous context. One primary use case is for distributed tracing style
systems where a request identifier is made available during the context of a
request / response exchange. In a synchronous context, thread-local storage
would be used for this. However, with asynchronous Rust, logic is run in a
"task", which is decoupled from an underlying thread. A task may run on many
threads and many tasks may be multiplexed on a single thread. This hints at the
need for task-local storage.

Early attempt

Futures 0.1 included a task-local storage strategy. This was based around
using the "runtime task" (more on this later) as the scope. When a task was
spawned with tokio::spawn, a task-local map would be created and assigned
with that task. Any task-local value that was stored would be stored in this
map. Whenever the runtime polled the task, it would set the task context
enabling access to find the value.

There are two main problems with this strategy which ultimately lead to the
removal of runtime task-local storage:

  1. In asynchronous Rust, a "task" is not a clear-cut thing.
  2. The implementation did not leverage the significant optimizations that the
    compiler provides for thread-local storage.

What is a "task"?

With synchronous Rust, a "thread" is a clear concept: the construct you get with
thread::spawn. With asynchronous Rust, there is no strict definition of a
"task". A task is most commonly the construct you get when calling
tokio::spawn. The construct obtained with tokio::spawn will be referred to
as the "runtime task". However, it is also possible to multiplex asynchronous
logic within the context of a runtime task. APIs such as
task::LocalSet , FuturesUnordered,
select!, and join! provide the ability to embed a mini
scheduler within a single runtime task.

Revisiting the primary use case, setting a request identifier for the duration
of a request response exchange, here is a scenario in which using the "runtime
task" as the scope for task-local storage would fail:

task_local!(static REQUEST_ID: Cell<u64> = Cell::new(0));

let request1 = get_request().await;
let request2 = get_request().await;

let (response1, response2) = join!{
    async {
        REQUEST_ID.with(|cell| cell.set(request1.identifier()));
        process(request1)
    },
    async {
        REQUEST_ID.with(|cell| cell.set(request2.identifier()));
        process(request2)
    },
 };

join! multiplexes the execution of both branches on the same runtime task.
Given this, if REQUEST_ID is scoped by the runtime task, the request ID would
leak across the request / response exchange processing.

This is not a theoretical problem, but was hit repeatedly in practice. For
example, Hyper's HTTP/2.0 implementation multiplexes many request / response
exchanges on the same runtime task.

Compiler thread-local optimizations

A second smaller problem with the original task-local storage strategy is that
it required re-implementing "thread-local storage" like constructs but without
being able to get the compiler to help optimize. A discussion of how the
compiler optimizes thread-local storage is out of scope for this PR description,
but suffice to say a task-local storage implementation should be able to
leverage thread-locals as much as possible.

A new task-local strategy

Introduced in this PR is a new strategy for dealing with task-local storage.
Instead of using the runtime task as the thread-local scope, the proposed
task-local API allows the user to define any arbitrary scope. This solves the
problem of binding task-locals to the runtime task:

tokio::task_local!(static FOO: u32);

FOO.scope(1, async move {

    some_async_fn().await;
    assert_eq!(FOO.get(), 1);

}).await;

The scope function establishes a task-local scope for the FOO variable. It
takes a value to initialize FOO with and an async block. The FOO task-local
is then available for the duration of the provided block. scope returns a new
future that must then be awaited on.

tokio::task_local will define a new thread-local. The future returned from
scope will set this thread-local at the start of poll and unset it at the
end of poll. FOO.get is a simple thread-local access with no special logic.

This strategy solves both problems. Task-locals can be scoped at any level and
can leverage thread-local compiler optimizations.

Going back to the previous example:

task_local! {
  static REQUEST_ID: u64;
}

let request1 = get_request().await;
let request2 = get_request().await;

let (response1, response2) = join!{
    async {
        let identifier = request1.identifier();

        REQUEST_ID.scope(identifier, async {
            process(request1).await
        }).await
    },
    async {
        let identifier = request2.identifier();

        REQUEST_ID.scope(identifier, async {
            process(request2).await
        }).await
    },
 };

There is no longer a problem with request identifiers leaking.

Disadvantages

The primary disadvantage of this strategy is that the "set and forget" pattern
with thread-locals is not possible.

thread_local! {
  static FOO: Cell<usize> = Cell::new(0);
}

thread::spawn(|| {
    FOO.with(|cell| cell.set(123));

    do_work();
});

In this example, FOO is set at the start of the thread and automatically
cleared when the thread terminates. While this is nice in some cases, it only
really logically makes sense because the scope of a "thread" is clear (the
thread).

A similar pattern can be done with the proposed stratgy but would require an
explicit setting of the scope at the root of tokio::spawn. Additionally, one
should only do this if the runtime task is the appropriate scope for the
specific task-local variable.

Another disadvantage is that this new method does not support lazy initialization
but requires an explicit LocalKey::scope call to set the task-local value. In
this case since task-local's are different from thread-locals it is fine.

Overall, I think this is a much better improvement over what we originally had
in futures 0.1. This version is also much easier to work with and reason about!

This PR introduces a new pattern for task-local storage. It allows for storage
and retrieval of data in an asynchronous context. It does so using a new pattern
based on past experience.

A quick example:

```rust
tokio::task_local! {
  static FOO: u32;
}

FOO.scope(1, async move {
    some_async_fn().await;
    assert_eq!(FOO.get(), 1);
}).await;
```

## Background of task-local storage

The goal for task-local storage is to be able to provide some ambiant context in
an asynchronous context. One primary use case is for distributed tracing style
systems where a request identifier is made available during the context of a
request / response exchange. In a synchronous context, thread-local storage
would be used for this. However, with asynchronous Rust, logic is run in a
"task", which is decoupled from an underlying thread. A task may run on many
threads and many tasks may be multiplexed on a single thread. This hints at the
need for task-local storage.

### Early attempt

Futures 0.1 included a [task-local storage][01] strategy. This was based around
using the "runtime task" (more on this later) as the scope. When a task was
spawned with `tokio::spawn`, a task-local map would be created and assigned
with that task. Any task-local value that was stored would be stored in this
map. Whenever the runtime polled the task, it would set the task context
enabling access to find the value.

There are two main problems with this strategy which ultimetly lead to the
removal of runtime task-local storage:

1) In asynchronous Rust, a "task" is not a clear-cut thing.
2) The implementation did not leverage the significant optimizations that the
compiler provides for thread-local storage.

### What is a "task"?

With synchronous Rust, a "thread" is a clear concept: the construct you get with
`thread::spawn`. With asynchronous Rust, there is no strict definition of a
"task". A task is most commonly the construct you get when calling
`tokio::spawn`. The construct obtained with `tokio::spawn` will be referred to
as the "runtime task". However, it is also possible to multiplex asynchronous
logic within the context of a runtime task. APIs such as
[`task::LocalSet`][local-set] , [`FuturesUnordered`][futures-unordered],
[`select!`][select], and [`join!`][join] provide the ability to embed a mini
scheduler within a single runtime task.

Revisiting the primary use case, setting a request identifier for the duration
of a request response exchange, here is a scenario in which using the "runtime
task" as the scope for task-local storage would fail:

```rust
task_local!(static REQUEST_ID: Cell<u64> = Cell::new(0));

let request1 = get_request().await;
let request2 = get_request().await;

let (response1, response2) = join!{
    async {
        REQUEST_ID.with(|cell| cell.set(request1.identifier()));
        process(request1)
    },
    async {
        REQUEST_ID.with(|cell| cell.set(request2.identifier()));
        process(request2)
    },
 };
```

`join!` multiplexes the execution of both branches on the same runtime task.
Given this, if `REQUEST_ID` is scoped by the runtime task, the request ID would
leak across the request / response exchange processing.

This is not a theoretical problem, but was hit repeatedly in practice. For
example, Hyper's HTTP/2.0 implementation multiplexes many request / response
exchanges on the same runtime task.

### Compiler thread-local optimizations

A second smaller problem with the original task-local storage strategy is that
it required re-implementing "thread-local storage" like constructs but without
being able to get the compiler to help optimize. A discussion of how the
compiler optimizes thread-local storage is out of scope for this PR description,
but suffice to say a task-local storage implementation should be able to
leverage thread-locals as much as possible.

## A new task-local strategy

Introduced in this PR is a new strategy for dealing with task-local storage.
Instead of using the runtime task as the thread-local scope, the proposed
task-local API allows the user to define any arbitrary scope. This solves the
problem of binding task-locals to the runtime task:

```rust
tokio::task_local!(static FOO: u32);

FOO.scope(1, async move {

    some_async_fn().await;
    assert_eq!(FOO.get(), 1);

}).await;
```

The `scope` function establishes a task-local scope for the `FOO` variable. It
takes a value to initialize `FOO` with and an async block. The `FOO` task-local
is then available for the duration of the provided block. `scope` returns a new
future that must then be awaited on.

`tokio::task_local` will define a new thread-local. The future returned from
`scope` will set this thread-local at the start of `poll` and unset it at the
end of `poll`. `FOO.get` is a simple thread-local access with no special logic.

This strategy solves both problems. Task-locals can be scoped at any level and
can leverage thread-local compiler optimizations.

Going back to the previous example:

```rust
task_local! {
  static REQUEST_ID: u64;
}

let request1 = get_request().await;
let request2 = get_request().await;

let (response1, response2) = join!{
    async {
        let identifier = request1.identifier();

        REQUEST_ID.scope(identifier, async {
            process(request1).await
        }).await
    },
    async {
        let identifier = request2.identifier();

        REQUEST_ID.scope(identifier, async {
            process(request2).await
        }).await
    },
 };
```

There is no longer a problem with request identifiers leaking.

## Disadvantages

The primary disadvantage of this strategy is that the "set and forget" pattern
with thread-locals is not possible.

```rust
thread_local! {
  static FOO: Cell<usize> = Cell::new(0);
}

thread::spawn(|| {
    FOO.with(|cell| cell.set(123));

    do_work();
});
```

In this example, `FOO` is set at the start of the thread and automatically
cleared when the thread terminates. While this is nice in some cases, it only
really logically  makes sense because the scope of a "thread" is clear (the
thread).

A similar pattern can be done with the proposed stratgy but would require an
explicit setting of the scope at the root of `tokio::spawn`. Additionally, one
should only do this if the runtime task is the appropriate scope for the
specific task-local variable.

Another disadvantage is that this new method does not support lazy initialization
but requires an explicit `LocalKey::scope` call to set the task-local value. In
this case since task-local's are different from thread-locals it is fine.

[01]: https://docs.rs/futures/0.1.29/futures/task/struct.LocalKey.html
[local-set]: #
[futures-unordered]: https://docs.rs/futures/0.3.1/futures/stream/struct.FuturesUnordered.html
[select]: https://docs.rs/futures/0.3.1/futures/macro.select.html
[join]: https://docs.rs/futures/0.3.1/futures/macro.join.html
@LucioFranco LucioFranco requested a review from a team January 16, 2020 19:20
@seanmonstar
Copy link
Member

Quick question: while in a "scope", if I spawn a new task, is there a way to make that task "inherit" all existing scopes? For instance, if at a higher level, a user sets REQUEST_ID, and deeper in that scope, they want to spawn a task to do some work and report back, but don't have access to all the current scopes...

@LucioFranco
Copy link
Member Author

LucioFranco commented Jan 16, 2020

@seanmonstar That is not directly possible to do since that would require us to know dynamically all the scopes set on the stack. The real solution is as a user to know what task locals you might want to propagate and set them explicitly on spawn.

tokio::task_local! {
  static FOO: u32;
}

FOO.scope(1, async move {
    tokio::spawn(FOO.scope(FOO.get(), async move {
		assert_eq!(FOO.get(), 1);
	});
}).await;

Maybe, we can add an API that can do the scope and get as one function that would panic if the current scope is not set?

Also, this is non-trivial when it comes to types that don't implement Copy since you would then require a closure to have the user figure out how clone the type since you'd need to send it to another task.

I think this is ok to keep it this way since this is task locals and this behavior is the same with thread locals.

@carllerche
Copy link
Member

It might make more sense to factor in "inheritable" locals w/ "structured concurrency" (#1879). Thoughts @Matthias247?

@pandaman64
Copy link

task_scope stores a task-local cancellation token inside Waker by terrible unsafe pointer casting (the internal of Wakers are not public yet).

I think the thread-local approach is definitely better for Tokio considering safety now, but hopefully std's Context can offer such a mechanism.

($(#[$attr:meta])* $vis:vis $name:ident, $t:ty) => {
static $name: $crate::task::LocalKey<$t> = {
std::thread_local! {
static __KEY: std::cell::RefCell<Option<$t>> = std::cell::RefCell::new(None);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not super familiar with how thread-locals are implemented. Do they when defined use memory on every thread - even if that one doesn't make use of them? Or is it lazily allocated? If it requires memory everywhere then adding lots of task-locals will obviously make it worse. But on the other hand I don't expect people to use too many task-locals with this kind declaration mechanism.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, thread-local pages are copy-on-write.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Matthias247
Copy link
Contributor

However, it is also possible to multiplex asynchronous logic within the context of a runtime task. APIs such as task::LocalSet , FuturesUnordered, select!, and join! provide the ability to embed a mini
scheduler within a single runtime task.

These challenges seem to come up in multiple places: tasks locals, tracing (which is related), running concurrently with a Runtime::block_in_place, etc. It's still a bit unclear to me whether we should treat multiple branches in a select! as seperate tasks, and invest as much as possible to uphold that assumption.

Or whether we rather try to keep the task concept clean as "an individual work strand directly scheduled on the executor". Which could e.g. be done by discouraging to use select! outside of timeout/cancellation checks and focus the use of join! on JoinHandles.

So that was just a random thought - since it actually does not matter a lot for the sake of this PR :-)

It might make more sense to factor in "inheritable" locals w/ "structured concurrency" (#1879). Thoughts @Matthias247?

I think scopes inheriting properties from their parent scopes makes a lot of sense. That can contain task locals, scheduler settings, etc. Kotlin does the same for CoroutineContext by default. If we would want to do that, we would however likely need a different design. We would somehow need to know all task-local variables at the time a spawn inside a scope happens, and then need to copy them over. We would also need to define whether a spawn copies a snapshot, or whether it actually has direct read access to parent the parent tasks variables (which might change over time). I guess it would be the snapshot.

Oh, and on the question whether task-locals should be supported at all: I'm rather a fan of explicit parameter passing than implicit. It's easier to understand and easier to test. And actually for each parameter someone could argue that it's more convenient not to pass it - and we end up with all global variables in the end 😊.
However I understand they can remove some noise if used carefully, and can bridge over some missing APIs in the middle - e.g. the missing ability to forward arbitrary parameters via core::task::Context.

Copy link
Member

@carllerche carllerche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great to me 👍 Thanks for making this happen.

@LucioFranco LucioFranco merged commit 619d730 into master Jan 17, 2020
@LucioFranco LucioFranco deleted the lucio/task-local branch January 17, 2020 19:42
@Darksonn Darksonn added A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-task Module: tokio/task labels Jul 25, 2020
@rlamarche
Copy link

rlamarche commented Feb 9, 2024

Hi,
I'm trying to use this pattern to share a database connection through DAO trait objets, but because I'm using diesel-async, I have lifetime problems.

Indeed, I can't use the get because the type I share is not Copy, so I use the function with or try_with, but with an async move closure.

If there were something like a ScopedFuture version of the with (and try_with) functions, I think that I would be able to extend the lifetime of the reference I get in the closure, like it is done in the connection.transaction method provided in the TransactionManager from diesel-async crate.

I'm not sure if my problem is clear, but I've found a workaround by wrapping my connection in a Arc<Mutex<>> (mutex from Tokio) to be able to clone the reference and then obtain a mutable reference to the connection asynchronously.

Here is the code : (it might not compile because you must provide diesel model & tables)

use anyhow::{anyhow, Result};
use async_trait::async_trait;
use diesel_async::{
    pooled_connection::bb8::{Pool, PooledConnection},
    AnsiTransactionManager, AsyncPgConnection, TransactionManager,
};
use std::sync::Arc;
use tokio::sync::Mutex;

pub type DBPool = Pool<AsyncPgConnection>;
pub type DBConnection = AsyncPgConnection;

// this is the db pool connections which as a static lifetime, so it live for the entire program lifetime
pub type PooledDBConnection = PooledConnection<'static, DBConnection>;

// the task local for sharing a connection
tokio::task_local!(static DB_CONNECTION: Arc<Mutex<PooledDBConnection>>);

// the macro to call from a dao to get a connection either from the pool of from the current transaction
macro_rules! run_with_connection {
    ($pool: expr, $body:expr) => {{
        let result = DB_CONNECTION.try_with(|conn| {
            let conn = conn.clone();
            println!("INSIDE TRANSACTION");
            $body(conn)
        });

        match result {
            Ok(future) => future.await,
            Err(_) => {
                println!("OUTSIDE TRANSACTION");
                let conn = std::sync::Arc::new(tokio::sync::Mutex::new($pool.get_owned().await?));
                $body(conn).await
            }
        }
    }};
}

pub(crate) use run_with_connection;

// the wrapper to run in transaction (replaces connection.transaction())
pub async fn run_in_transaction<'a, R, F, Fut>(pool: Arc<DBPool>, callback: F) -> Result<R>
where
    F: FnOnce() -> Fut,
    Fut: Future<Output = Result<R>>,
    R: Send + 'a,
{
    let mut conn = pool.get_owned().await?;

    AnsiTransactionManager::begin_transaction(&mut *conn).await?;

    DB_CONNECTION
        .scope(Arc::new(Mutex::new(conn)), async move {
            match callback().await {
                Ok(value) => {
                    DB_CONNECTION
                        .with(|conn| {
                            let conn = conn.clone();
                            async move {
                                let mut mut_conn = conn.lock().await;
                                AnsiTransactionManager::commit_transaction(&mut **mut_conn).await
                            }
                        })
                        .await?;

                    Ok(value)
                }
                Err(user_error) => {
                    DB_CONNECTION
                        .with(|conn| {
                            let conn = conn.clone();
                            async move {
                                let mut mut_conn = conn.lock().await;
                                match AnsiTransactionManager::rollback_transaction(&mut **mut_conn)
                                    .await
                                {
                                    Ok(()) => Err(user_error),
                                    Err(diesel::result::Error::BrokenTransactionManager) => {
                                        // In this case we are probably more interested by the
                                        // original error, which likely caused this
                                        Err(user_error)
                                    }
                                    Err(rollback_error) => Err(rollback_error.into()),
                                }
                            }
                        })
                        .await?
                }
            }
        })
        .await
}


#[async_trait]
pub trait MyDAO: Sync + Send {
    async fn insert_data(self: &Self, input: &NewData) -> Result<Data>;
}

pub struct MyDAOImpl {
    pool: Arc<DBPool>,
}

impl MyDAOImpl {
    pub fn new(pool: Arc<DBPool>) -> Self {
        Self { pool }
    }
}

#[async_trait]
impl MyDAO for MyDAOImpl {
    async fn insert_data(self: &Self, input: &NewData) -> Result<Data> {
        run_with_connection!(
            self.pool,
            |conn: Arc<Mutex<PooledDBConnection>>| async move {
                let mut conn = conn.lock().await;
                let data: Data = diesel::insert_into(schema::data::table)
                    .values(input)
                    .get_result::<Data>(&mut **conn)
                    .await?;

                Ok(data)
            }
        )
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    dotenvy::dotenv().ok();
    env_logger::init_from_env(env_logger::Env::new().default_filter_or("info"));

    let pool = Arc::new(my_database::pool::create_async_db_pool().await?);

    run_in_transaction(pool.clone(), || async move {

        let dao = MyDAOImpl::new(pool.clone());
        dao.insert_data(&NewData::default()).await?;
        dao.insert_data(&NewData::default()).await?;


        Err(anyhow!("force rollback"))
    }).await?;

    Ok(())
}

@Darksonn
Copy link
Contributor

Darksonn commented Feb 9, 2024

If you have a question, then please open a new issue or discussion for it.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
A-tokio Area: The main tokio crate C-enhancement Category: A PR with an enhancement or bugfix. M-task Module: tokio/task
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants