
The documentation for tokio_threadpool::blocking assumes more understanding of Tokio than we can rely on #618

Closed
farnz opened this issue Sep 4, 2018 · 7 comments

Comments

@farnz
Contributor

farnz commented Sep 4, 2018

This is connected to @jsgf's issue #617, which demonstrates this confusion nicely.

The documentation for tokio_threadpool::blocking in version 0.1.6 states:

When the blocking function enters, it hands off the responsibility of processing the current work queue to another thread. Then, it calls the supplied closure. The closure is permitted to block indefinitely.

A naïve reading suggests that if you have a blocking future selecting with another future, both futures will run concurrently. This is not true; the threadpool works at the level of tasks, not futures, and all that blocking actually does is tell the scheduler two things:

  1. That this thread cannot drive the event loop any more, as it can be expected to block in the blocking code indefinitely.
  2. That if you have a task waiting for a thread, you can use part of the blocking thread pool to keep more tasks running.

However, both Jeremy and I, plus a third co-worker, ended up confused by this and expected that just the blocking section would run on a new thread, while other futures would continue to run on the current thread.

In code terms, we have the following in a utility library:

/// Take a future, and run it on its own task, returning the result to the caller. This permits
/// Rust to run the spawned future on a different thread to the task that spawned it, thus adding
/// parallelism if used sensibly.
/// Note that the spawning here is lazy - the new task will not be spawned if the returned future
/// is dropped before it's polled.
pub fn spawn_future<T, E, Fut>(f: Fut) -> impl Future<Item = T, Error = E>
where
    Fut: Future<Item = T, Error = E> + Send + 'static,
    T: Send + 'static,
    E: From<futures::Canceled> + Send + 'static,
{
    let (tx, rx) = oneshot::channel();

    let fut = f.then(|res| {
        let _ = tx.send(res);
        Ok(())
    });

    future::lazy(move || {
        let _ = tokio::spawn(fut);
        rx.from_err().and_then(|v| v)
    })
}

/// Given an `FnMut` closure, create a `Future` that will eventually execute the closure using
/// Tokio's `blocking` mechanism, so that it is safe to call blocking code inside the closure
/// without preventing other tasks from making progress.
/// This returns a lazy future - it will not even attempt to run the blocking code until you poll
/// the future.
/// Note that this does not spawn the future onto its own task - use `asynchronize` below if you
/// need to run the blocking code on its own thread, rather than letting it block this task.
pub fn closure_to_blocking_future<T, E, Func>(f: Func) -> impl Future<Item = T, Error = E>
where
    Func: FnMut() -> Result<T, E>,
    E: From<tokio_threadpool::BlockingError>,
{
    let mut func = f;
    future::lazy(|| future::poll_fn(move || tokio_threadpool::blocking(&mut func)))
        .map_err(E::from)
        .and_then(|res| res) // flatten Ok(res) => res
}

/// This method allows us to take synchronous code, schedule it on the default Tokio thread pool,
/// and convert it to a future. It's the combination of `spawn_future` (which runs a future on
/// another thread) and `closure_to_blocking_future` (which turns a closure into a future).
pub fn asynchronize<Func, T, E>(f: Func) -> impl Future<Item = T, Error = E>
where
    Func: FnMut() -> Result<T, E> + Send + 'static,
    T: Send + 'static,
    E: From<tokio_threadpool::BlockingError> + From<futures::Canceled> + Send + 'static,
{
    let fut = closure_to_blocking_future(f);

    spawn_future(fut)
}

Our naïve expectation is that:

closure_to_blocking_future(
    || { thread::sleep(Duration::from_secs(1500)); Ok("timeout") }
).select(other_future)

would be a future that would return the result of other_future or return after 1,500 seconds, whichever came first.

The reality is that the thread you run this future on will be blocked for 1,500 seconds while the blocking closure runs, and only then will other_future be allowed to run. We have to use the full machinery in asynchronize to move the blocking closure onto its own task, at which point the two futures can run concurrently.
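
For contrast, here is a minimal sketch of the version that does behave as we expected, using the asynchronize helper above (assuming other_future has matching item and error types, and that the error type satisfies the required From bounds):

// `asynchronize` spawns the blocking closure onto its own task, so
// `other_future` can keep making progress while the closure sleeps.
let timeout = asynchronize(|| {
    thread::sleep(Duration::from_secs(1500));
    Ok("timeout")
});

// This select now behaves as naively expected: whichever future finishes
// first wins.
let first = timeout.select(other_future);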

Can we improve the documentation to make it clear that blocking does not create a fresh task, and thus that the worker that runs the blocking code section will not execute other futures even if they're ready to work? The example includes spawn, but it's not very obvious that not spawning onto a new task may have unexpected results.

@tobz
Member

tobz commented Sep 5, 2018

Do you have any rough thoughts on how this could be worded?

I'm thinking along the lines of like... explaining a task vs a future in the sense of a task being a future or future chain, and how a blocking operation within the chain/task stalls the whole thing.

It feels pretty self-explanatory -- if you have something in a future that blocks, everything that depends on that future also blocks -- but I also see what you're saying about it being non-obvious. I'm not sure that the people who think of these things as "obvious" are equipped to come up with the best explanation for it, if that makes sense.

Having that outside perspective, even if it's just a seed of a sentence, would help a lot.

@carllerche
Member

This relates to #589 and #588.

I agree that there is a problem here. The blocking API from tokio-threadpool is not general purpose. It is much faster than a general purpose strategy, but has the problem that you described. As such, it makes more sense to use blocking for cases where the closure may or may not block for shortish periods of time, such as file reading / writing.
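
To make that concrete, a minimal sketch (not from tokio's docs; the file path is just an example) of the kind of short blocking call this is suited to:

use std::fs;
use futures::future;

// A quick, possibly-blocking file read done in place via `blocking`, without
// spawning the work onto a separate task. While the read runs, the pool is
// told to shift this worker's other queued work to another thread.
let read_config = future::poll_fn(|| {
    tokio_threadpool::blocking(|| fs::read_to_string("config.toml"))
});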

There should be a general purpose blocking strategy that has the behavior you described. This would have to work by moving the blocking operation to a separate thread. Issue #588 represents this work.

@farnz
Contributor Author

farnz commented Sep 5, 2018

As background, the naïve view is that a single future is the scheduling atom. Tasks will "steal" notified futures from other tasks if the other task is busy, just as threads "steal" waiting tasks from other threads if other threads are busy.

Thus, something along the lines of "The resulting future will block the entire task it is running on until the blocking activity is complete - other futures on the same task will not execute until the blocking activity returns to the caller. If this is not desired, you will need to spawn a new task to run the blocking activity".

Basically, make it clear that you will not see parallel execution if you have a blocking future.

@blakesmith
Contributor

@farnz I was looking to lift a blocking IO library into a Tokio server, and ended up following a similar pattern to your utility functions. Thank you for sharing! One question for you: because all the closures end up being FnMut, do you end up wrapping all your captured variables in Arcs? I'm new to Rust, so I might be missing something, but it seems a little heavyweight - especially since, in my head, the blocking operations I'm performing feel more like an FnOnce, where I could move variables into the closure (store this captured data, then chain it with some other future operations).

@farnz
Contributor Author

farnz commented Dec 4, 2018

@blakesmith Worth noting that spawn_future is very similar to oneshot::spawn, except that oneshot::spawn is eager to spawn the future, while spawn_future is lazy about spawning - it will not spawn until the result is demanded by another future.

I don't end up wrapping all my captured variables in Arcs - most of them support Clone, and thus I end up with closures that look like:

{
    let thing = thing.clone(); // Repeated as needed
    move || {
        let data = thing.as_data();
        blocking_operation(data)
    }
}

The reason closure_to_blocking_future takes an FnMut rather than an FnOnce is that poll_fn's argument has to be an FnMut, and I use poll_fn to turn blocking's result into a future that will eventually block. I am not expert enough in Rust to find a way to carry blocking's argument through as an FnOnce.
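
One possible way to carry an FnOnce through - a hedged, untested sketch with a made-up function name, not something from this thread - is to stash the closure in an Option and have blocking's closure take() it only when it actually runs:

// Hypothetical variant of `closure_to_blocking_future` that accepts an
// `FnOnce`. The closure is kept in an `Option`; `blocking` only invokes the
// inner closure once pool capacity is available, so `func` stays `Some`
// across any NotReady polls and is consumed exactly once.
pub fn closure_once_to_blocking_future<T, E, Func>(f: Func) -> impl Future<Item = T, Error = E>
where
    Func: FnOnce() -> Result<T, E>,
    E: From<tokio_threadpool::BlockingError>,
{
    let mut func = Some(f);
    future::lazy(move || {
        future::poll_fn(move || {
            tokio_threadpool::blocking(|| (func.take().expect("polled after completion"))())
        })
    })
    .map_err(E::from)
    .and_then(|res| res) // flatten Ok(res) => res
}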

@blakesmith
Contributor

@farnz Ah, that makes sense, thanks for explaining, especially the semantics of oneshot::spawn. In my case, I'm moving a Bytes type into the closure for persistent storage. Before reading your response, I assumed that the Clone implementation for Bytes does a deep copy, but it looks like it's smart and does some sort of internal ref counting as well. Looks like I can avoid Arc usage in this case!
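
For instance, a hedged sketch of that pattern (store_blocking is a hypothetical blocking storage call):

// `Bytes::clone` is a cheap reference-count bump rather than a deep copy, so
// cloning it into the closure avoids both `Arc` and copying the payload.
let payload = bytes::Bytes::from(&b"some payload"[..]);
let store = asynchronize({
    let payload = payload.clone(); // shallow, ref-counted clone
    move || store_blocking(&payload) // hypothetical blocking storage call
});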

Cheers, and thanks again!

@farnz
Contributor Author

farnz commented Dec 6, 2018

@tobz has merged my documentation change that resolves this issue - #629 covers fixing the issue properly, so this issue is now dealt with.

@farnz farnz closed this as completed Dec 6, 2018