From 3a9b5073cf515be5bf0eb52905ee79fab5c3f6ee Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Thu, 14 Sep 2017 10:02:09 -0700 Subject: [PATCH 01/13] RFC: Tokio reform --- tokio-reform.md | 931 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 931 insertions(+) create mode 100644 tokio-reform.md diff --git a/tokio-reform.md b/tokio-reform.md new file mode 100644 index 0000000..4c62b6f --- /dev/null +++ b/tokio-reform.md @@ -0,0 +1,931 @@ +# Summary +[summary]: #summary + +This RFC proposes to simplify and focus the Tokio project, in an attempt to make +it easier to learn and more productive to use. Specifically: + +* Add a global event loop in `tokio-core` that is managed automatically by + default. This change eliminates the need for setting up and managing your own + event loop in the vast majority of cases. + + * Moreover, remove the distinction between `Handle` and `Remote` in + `tokio-core` by making `Handle` both `Send` and `Sync` and deprecating + `Remote`. Thus, even working with custom event loops becomes simpler. + +* Decouple all task execution functionality from Tokio, instead providing it + through a standard futures component. As with event loops, provide a default + global thread pool that suffices for the majority of use-cases, removing the + need for any manual setup. + + * Moreover, when running tasks thread-locally (for non-`Send` futures), + provide more fool-proof APIs that help avoid lost wakeups. + +* Provide the above changes in a new `tokio` crate, which is a slimmed down + version of today's `tokio-core`, and may *eventually* re-export the contents + of `tokio-io`. The `tokio-core` crate is deprecated, but will remain available + for backward compatibility. In the long run, most users should only need to + depend on `tokio` to use the Tokio stack. + +* Focus documentation primarily on `tokio`, rather than on + `tokio-proto`. 
Provide a much more extensive set of cookbook-style examples + and general guidelines, as well as a more in-depth guide to working with + futures. + +Altogether, these changes, together with [async/await], should go a long +distance toward making Tokio a newcomer-friendly library. + +[async/await]: https://internals.rust-lang.org/t/help-test-async-await-generators-coroutines/5835 + +# Motivation +[motivation]: #motivation + +While Tokio has been an important step forward in Rust's async I/O story, many +have found it difficult to learn initially, and have wanted more guidance in +using it even after mounting the initial learning curve. While documentation is +a major culprit, there are also several aspects of the core APIs that can be +made simpler and less error-prone. The core motivation of this RFC is to tackle +this problem head-on, ensuring that the technical foundation itself of Tokio is +simplified to enable a much smoother introductory experience into the "world of +async" in Rust. + +On the documentation side, one mistake we made early on in the Tokio project was +to so prominently discuss the `tokio-proto` crate in the documentation. While +the crate was intended to make it very easy to get basic protocol +implementations up and running, it did not provide a very useful *entry* point +for learning Tokio as a whole. Moreover, it turns out that there's a much wider +audience for Tokio than there is for `tokio-proto` in particular. It's not +entirely clear what the long-term story for `tokio-proto` should be (given that +[`h2`], one of its intended uses, rolled its own implementation), but for the +time being it will be de-emphasized in the documentation. + +[`h2`]: https://github.com/carllerche/h2 + +On the API side, we've had more success with the `tokio-core` crate, but it's +not without its problems. The distinction between `Core`, `Handle`, and `Remote` +is subtle and can be difficult to grasp, for example. 
And in general, Tokio +requires some amount of setup to use properly, which has turned out to be a +struggle for some. Simplifying and decoupling these APIs should make the library +easier to learn and to use. + +It is our intention that after this reorganization happens the introduction to +the Tokio project is a much more direct and smoother path than it is today. +There will be fewer crates to consider (mostly just `tokio`) which have a +much smaller API to work with (detailed below) and should allow us to tackle +the heart of async programming, futures, much more quickly in the +documentation. + +# Guide-level explanation +[guide-level-explanation]: #guide-level-explanation + +## API changes + +*Note: this is a guide-level explanation to the *changes*, not a full-blown +attempt to teach Tokio from scratch with the new APIs*. + +### A quick overview of the changes + +From a high level, the changes are: + +- Make it possible to use Tokio without ever explicitly touching an event loop. + +- Simplify the APIs around event loops. + +- Decouple all task execution functionality from Tokio, moving it to the futures + level. + +- Publish this all as a new `tokio` crate, deprecating `tokio-core` (which just + wraps `tokio` and can interoperate). + +### Examples for common-case usage + +To see the API changes at a high level, let's start with a very simple example: +an echo server written directly against Tokio. We'll look, in fact, at *four* +variants of the echo server: + +- A blocking variant using `std`. +- A variant using today's `tokio-core`. +- A variant using the proposed `tokio` crate. +- A variant using `tokio` and [`async/await`] notation. + +#### Variant 1: a blocking echo server + +Here's one way you might write a blocking echo server against `std`: + +```rust +//! 
A blocking echo server + +use std::io; +use std::env; +use std::net::{SocketAddr, TcpListener}; +use std::thread; + +fn serve(addr: SocketAddr) -> io::Result<()> { + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + for conn in socket.incoming() { + if let Ok(stream) = conn { + thread::spawn(move || { + // the strange double-borrow here is needed because + // `copy` takes `&mut` references + match io::copy(&mut &stream, &mut &stream) { + Ok(amt) => println!("wrote {} bytes to {}", amt, addr), + Err(e) => println!("error on {}: {}", addr, e), + } + }); + } + } + + Ok(()) +} + +fn main() { + let addr = env::args() + .nth(1) + .unwrap_or("127.0.0.1:8080".to_string()) + .parse() + .unwrap(); + + serve(addr).unwrap(); +} +``` + +This code is almost entirely straightforward; the only hiccup is around the +`copy` combinator, which wants a `&mut` reference for both a reader and a +writer. Fortunately, a `&TcpStream` can serve as *both* a reader and writer +(thread safety is provided by the operating system). 
+ +#### Variant 2: a `tokio-core` echo server + +Now we'll translate the blocking code into async code using today's `tokio-core` +crate, trying to keep it as similar as possible: + +```rust +#![feature(conservative_impl_trait)] + +extern crate futures; +extern crate tokio_core; +extern crate tokio_io; + +use std::io; +use std::env; +use std::net::SocketAddr; + +use futures::{Future, Stream, IntoFuture}; +use tokio_core::net::TcpListener; +use tokio_core::reactor::{Core, Handle}; +use tokio_io::AsyncRead; +use tokio_io::io::copy; + +fn serve(addr: SocketAddr, handle: Handle) -> impl Future { + TcpListener::bind(&addr, &handle) + .into_future() + .and_then(move |socket| { + println!("Listening on: {}", addr); + + socket + .incoming() + .for_each(move |(conn, _)| { + let (reader, writer) = conn.split(); + handle.spawn(copy(reader, writer).then(move |result| { + match result { + Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), + Err(e) => println!("error on {}: {}", addr, e), + }; + Ok(()) + })); + Ok(()) + }) + }) +} + +fn main() { + let addr = env::args() + .nth(1) + .unwrap_or("127.0.0.1:8080".to_string()) + .parse() + .unwrap(); + + let mut core = Core::new().unwrap(); + let handle = core.handle(); + core.run(serve(addr, handle)).unwrap(); +} +``` + +Several aspects needed to change to get this variant to work: + +- Instead of writing code that primarily works with `io::Result`, we're now + working with `impl Future` and combinators thereof. + +- We need to set up an event loop (`Core`), generate handles to it, and pass + those handles around. + - The handles are used both for setting up I/O objects, and for spawning tasks. + +- The `Incoming` stream and result of `copy` are both more complex than those in + `std`. + - For the `Incoming` stream, that's a minor detail having to do with how the + socket address is exposed. 
+ - For `copy`, that's a deeper shift: the function takes full *ownership* of + the reader and writer, because in general futures avoid borrowing. Instead, + ownership of the reader and writer is returned upon completion (and here, + thrown away). + +All of these changes take some work to get your head around. Some of them are +due to Tokio's API design; others are due to futures. The next two variants will +show how we can improve each of those two sides. + +#### Variant 3: a `tokio` echo server + +First, we'll convert the code to use the newly-proposed `tokio` crate: + +```rust +#![feature(conservative_impl_trait)] + +extern crate futures; +extern crate tokio_core; +extern crate tokio_io; + +use std::io; +use std::env; +use std::net::SocketAddr; + +use futures::{Future, Stream, IntoFuture, CurrentThread}; +use tokio_core::net::TcpListener; +use tokio_io::AsyncRead; +use tokio_io::io::copy; + +fn serve(addr: SocketAddr) -> impl Future { + TcpListener::bind(&addr) + .into_future() + .and_then(move |socket| { + println!("Listening on: {}", addr); + + socket + .incoming() + .for_each(move |(conn, _)| { + let (reader, writer) = conn.split(); + CurrentThread.spawn(copy(reader, writer).then(move |result| { + match result { + Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), + Err(e) => println!("error on {}: {}", addr, e), + }; + Ok(()) + })); + Ok(()) + }) + }) +} + +fn main() { + let addr = env::args() + .nth(1) + .unwrap_or("127.0.0.1:8080".to_string()) + .parse() + .unwrap(); + + CurrentThread::run(serve(addr)).unwrap(); +} +``` + +This version of the code retains the move to using futures explicitly, but +eliminates the explicit event loop setup and handle passing. To execute and +spawn tasks, it uses the `CurrentThread` executor from futures, which +multiplexes tasks onto the current OS thread. A similar API is available for +working with the default thread pool instead, which is more appropriate for +tasks performing blocking or CPU-heavy work. 
+ +#### Variant 4: a `tokio` + `async/await` echo server -- the long-term vision + +Finally, if we incorporate [`async/await`] notation, we get code that looks +quite similar to the initial blocking variant: + +```rust +#![feature(proc_macro, conservative_impl_trait, generators)] + +extern crate futures_await as futures; +extern crate tokio; +extern crate tokio_io; + +use std::io; +use std::env; +use std::net::SocketAddr; + +use futures::prelude::*; +use tokio::net::TcpListener; +use tokio_io::AsyncRead; +use tokio_io::io::copy; + +#[async] +fn serve(addr: SocketAddr) -> io::Result<()> { + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + #[async] + for (conn, _) in socket.incoming() { + // with Tokio, read and write components are distinct: + let (reader, writer) = conn.split(); + + CurrentThread.spawn(async_block! { + match await!(copy(reader, writer)) { + Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), + Err(e) => println!("error on {}: {}", addr, e), + }; + + Ok(()) + }); + } + + Ok(()) +} + +fn main() { + let addr = env::args() + .nth(1) + .unwrap_or("127.0.0.1:8080".to_string()) + .parse() + .unwrap(); + + CurrentThread::run(serve(addr)).unwrap(); +} +``` + +The most important difference, of course, is the sprinkling of `async` and +`await` annotations. You can think of them as follows: + +- Code that is `async`-annotated looks like `impl Future` + on the outside, and `Result` on the inside. That is, it lets you write + code that *looks* like synchronous I/O code, but in fact produces a future. +- Code that is `await`-annotated is the reverse: it looks like + `impl Future` on the *inside*, and `Result` on the + outside. You use `await` to consume futures within `async` code. + +More detail is available [here](https://github.com/alexcrichton/futures-await); +the rest of this RFC is focused purely on the Tokio side of things. 
+ +The *long-term* goal is to provide a complete async story that feels very close to +synchronous programming, i.e. something like: + +```rust +async fn serve(addr: SocketAddr) -> io::Result<()> { + let socket = TcpListener::bind(&addr)?; + println!("Listening on: {}", addr); + + async for (conn, _) in socket.incoming() { + // with Tokio, read and write components are distinct: + let (reader, writer) = conn.split(); + + CurrentThread.spawn(async { + match await copy(reader, writer) { + Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), + Err(e) => println!("error on {}: {}", addr, e), + }; + + Ok(()) + }); + } + + Ok(()) +} +``` + +However, to fully get there, we'll need *borrowing* to work seamlessly with +`async`/`await`, and for the feature to be provided in a direct way by the +compiler; this is still a ways off. In the meantime, though, we can improve the +Tokio side of the experience, which is what this RFC aims to do. + +### Key insights for this RFC + +The design in this RFC stems from several key insights, which we'll work through +next. + +#### Insight 1: we can decouple task execution from the event loop + +In today's `tokio-core` design, the event loop (`Core`) provides two important +pieces of functionality: + +- It manages asynchronous I/O events using OS APIs, recording those events and + waking up tasks as appropriate. + +- It acts as an *executor*, i.e. you can spawn futures onto it. + +On each turn of the event loop, the `Core` processes all I/O events *and* polls +any ready-to-run futures it's responsible for. + +Running tasks directly on the event loop thread is a double-edged sword. On the +one hand, you avoid cross-thread communication when such tasks are woken from +I/O events, which can be advantageous when the task is just going to immediately +set some new I/O in motion. 
On the other hand, if the woken task does +non-trivial work (even just to determine what I/O to do), *it is effectively +blocking the further processing and dispatch of I/O events*, which can limit the +potential for parallelism. + +An insight behind this RFC is that there's no need to *couple* executor +functionality with the event loop; we can provide each independently, but still +allow you to combine them onto a single thread if needed. Our hypothesis is +that, for most users, keeping event processing and task execution on strictly +separate threads is a net win: it eliminates the risk of blocking the event loop +and hence increases parallelism, at a negligible synchronization cost. + +Thus, this RFC proposes that the new `tokio` crate focus *solely* on I/O, +leaving executors to the `futures` crate. + +#### Insight 2: we can provide default event loops and executors for the common case + +Pushing further along the above lines, we can smooth the path toward a +particular setup that we believe will work well for the majority of +applications: + +- One dedicated event loop thread, which does not perform any task execution. +- A global thread pool for executing compute-intensive or blocking tasks; tasks must be `Send`. +- Optionally, a dedicated thread for running thread-local (non-`Send`) tasks in + a *cooperative* fashion. (Requires that the tasks do not perform too much work + in a given step). + +The idea is to provide APIs for (1) working with I/O objects without an explicit +event loop and (2) spawning `Send` tasks without an explicit thread pool, and in +both cases spin up the corresponding default global resource. We then layer APIs +for customizing or configuring the global resource, as well as APIs for +e.g. targeting specific event loops. But for most users, it's just not necessary +to think about the event loop or thread pool. 
+ +These default global resources greatly aid the experience for two additional cases: + +- Asynchronous client-side code, where it's far less natural to spin up and + manage an event loop yourself. + +- *Synchronous* wrappers around async libraries. These wrappers generally want + to avoid exposing details about the event loop, which is an implementation + detail. By providing a global event loop resource, these wrapper libraries can + both avoid this exposure *and* successfully *share* a common event loop, + without any action on the user's part. + +For thread-local tasks, which are more specialized *and local*, the user should +retain full control over the thread of execution, and specifically request +spinning up an executor. An API for doing so, with some additional fool-proofing +compared to today's APIs, is part of the RFC. + +#### Insight 3: we can remove the `Handle`/`Remote` distinction + +Today, `tokio-core` provides *three* types for working with event loops: + +- `Core`: the event loop itself; non-`Clone`, non-`Send`. +- `Handle`: an *on-thread* pointer to the event loop; `Clone`, non-`Send`. +- `Remote`: a general-purpose pointer to the event loop; `Clone` and `Send`. + +The distinction between `Handle` and `Remote` today stems from the fact that the +event loop serves as an executor; to spawn non-`Send` tasks onto it, you need +"proof" that you're on the same thread, which is what `Handle` provides. By +splitting off the executor functionality, we can do away with this distinction. + +At the same time, we can incorporate recent changes to `mio` that make it +possible to provide highly-concurrent updates to the event loop itself, so that +registering events cross-thread requires only lightweight synchronization. + +#### Insight 4: the `tokio` crate + +As we'll see, some of the changes proposed in this RFC would involve breakage +for `tokio-core`. 
Rather than produce a new major release, however, the RFC +proposes to introduce a `tokio` crate providing the new APIs, which can be +wrapped by the (now deprecated) `tokio-core` crate. This also allows us to +provide interoperation across libraries. + +The Tokio team initially resisted creating a `tokio` crate because it was +unclear at first what the most common use-cases of the library would be, +e.g. how important `tokio-proto` would be. It's now clear that today's +`tokio-core` is really what *Tokio* is, with `tokio-proto` being a convenient +helper that sits on top. + +Moreover, with the other slimming down proposed in this RFC, the remaining APIs +seem to constitute a relatively minimal commitment. + +Thus, the time seems ripe to publish a `tokio` crate! + +## Documentation changes + +This RFC won't go into extensive detail on the new structure of the +documentation, but rather just enumerate some basic constraints: + +- It should put much more focus on core futures and Tokio concepts, rather than + `tokio-proto`, at the outset. This should include talking much more about the + *ecosystem*, showing how to use existing libraries to build async services. + +- It should give far more guidance on how to effectively *use* Tokio and + futures, e.g. by talking about how to deal with common ownership issues. + +- It should provide an order of magnitude more examples, with a mix of + "cookbook"-style snippets and larger case studies. + +- There should be a lot more of it. + +# Reference-level explanation +[reference-level-explanation]: #reference-level-explanation + +## The new `tokio` crate + +To begin with, the new `tokio` crate will look like a streamlined version of +`tokio-core`. Eventually it will also re-export APIs from `tokio-io`, once those +APIs have been fully vetted. + +At the top level, the crate will provide four submodules: + +- `reactor`: for manual configuration and control over event loops. +- `net`: I/O objects for async networking. 
+- `timer`: APIs for async timers. +- `io`: general utilities for async I/O (much like `std::io`) + +Let's look at each in turn. + +### The `reactor` module + +The reactor module provides just a few types for working with reactors (aka +event loops): + +- `Reactor`, an owned reactor. (Used to be `Core`) +- `ReactorId`, a unique identifier per reactor instance. +- `Handle`, a shared, cloneable handle to a reactor. +- `PollEvented`, a bridge between `mio` event sources and a reactor + +Compared to `Core` today, the `Reactor` API is much the same, but drops the +`run` and `remote` methods and the `Executor` implementation: + +```rust +impl Reactor { + fn new() -> Result; + fn id(&self) -> ReactorId; + fn handle(&self) -> Handle; + + // blocks until an event (or timeout), then dispatches all pending events + fn turn(&mut self, max_wait: Option); +} +``` + +The `Handle` API is even more slimmed down: + +```rust +impl Handle { + // get a handle to the default global event loop + fn global() -> Handle; + + fn id(&self) -> ReactorId; +} +``` + +Unlike today, though, `Handle` is `Send` and `Sync` (as well as `Clone`). +Handles are used solely to tie particular I/O objects to particular event loops. +In particular, the `PollEvented` API stays exactly as-is, and is the primitive +way that I/O objects are registered onto a reactor. + +In the future, we expect to provide ways to configure reactors, and to override +the default reactor within a scope. These APIs will be modeled after +Rayon's +[configuration APIs](https://docs.rs/rayon/0.8.2/rayon/struct.Configuration.html), +which play a very similar role. + +### The `net` module + +The `net` module remains almost exactly as it is in `tokio-core` today, with one +key difference: APIs for constructing I/O objects come in two flavors, one that +uses the default global reactor, and one that takes an explicit handle. For the +latter, when there are already multiple configuration options, this RFC proposes +switching to a builder. 
+ +#### Example: `TcpListener` + +```rust +impl TcpListener { + // set up a listener with the global event loop + fn bind(addr: &SocketAddr) -> io::Result; + + fn builder(addr: &SocketAddr) -> TcpListenerBuilder; +} + +struct TcpListenerBuilder { .. } + +impl TcpListenerBuilder { + fn handle(&mut self, handle: Handle) -> &mut Self; + fn bind(&self) -> io::Result; + fn from_listener(&self, listener: std::net::TcpListener) -> io::Result; +} +``` + +#### Example: `TcpStream` + +```rust +impl TcpStream { + // set up a stream with the global event loop + fn connect(addr: &SocketAddr) -> TcpStreamNew; + + fn builder() -> TcpStreamBuilder; +} + +struct TcpStreamBuilder { .. } + +impl TcpStreamBuilder { + fn handle(&mut self, handle: Handle) -> &mut Self; + fn stream(&mut self, stream: std::net::TcpStream) -> &mut Self; + + fn connect(&self, addr: &SocketAddr) -> TcpStreamNew; + fn from_stream(&self) -> TcpStreamNew; +} +``` + +### The `timer` module + +The `timer` module will contain the `Timeout` and `Interval` types currently in +`tokio_core`'s `reactor` module. Their APIs will be adjusted along similar lines +as the `net` APIs, except that they will *only* support a builder-style API: + +```rust +impl Timeout { + fn build() -> TimeoutBuilder; + fn reset(&mut self, at: Instant); +} + +impl TimeoutBuilder { + fn handle(&mut self, handle: Handle) -> &mut Self; + + fn at(&mut self, time: Instant) -> Timeout; + fn after(&mut self, dur: Duration) -> Timeout; +} +``` + +### The `io` module + +Finally, there may *eventually* be an `io` module with the full contents of the +`tokio-io` crate. However, those APIs need a round of scrutiny first (the +subject of a future RFC), and they may ultimately be moved into the `futures` +crate instead. + +An eventual goal, in any case, is that you can put Tokio to good use by bringing +in only the `tokio` and `futures` crates. 
+ +## Changes to the `futures` crate + +In general, futures are spawned onto *executors*, which are responsible for +polling them to completion. There are basically two ways this can work: either +you run a future on your thread, or someone else's (i.e. a thread pool). + +The threadpool case is provided by crates like [futures_cpupool], but the intent +of this RFC is that the `futures` crate itself should provide a configurable, +default global thread pool, which provides similar benefits to providing a +global event loop. Exact details are out of scope for this RFC. + +[futures_cpupool]: https://docs.rs/futures-cpupool/0.1.5/futures_cpupool/ + +The "your thread" case allows for non-`Send` futures to be scheduled +cooperatively onto their thread of origin, and is *currently* provided by the +`Core` in `tokio_core`. As explained in the Guide-level introduction, though, +this RFC completely decouples this functionality, moving it instead into the +`futures` crate. + +### The `current_thread` module + +The `futures` crate will add a module, `current_thread`, with the following +contents: + +```rust +pub mod current_thread { + // Execute the given future *synchronously* on the current thread, blocking until + // it completes and returning its result. + // + // NB: no 'static requirement on the future + pub fn block_until(f: F) -> Result; + + // An object for cooperatively executing multiple tasks on a single thread. + // Useful for working with non-`Send` futures. + // + // NB: this is not `Send` + pub struct TaskRunner { .. } + + impl TaskRunner { + pub fn new() -> Self; + + // To spawn tasks onto this runner, you need a separate, cloneable + // `Spawner` object. + pub fn spawner(&self) -> Spawner; + + // Blocks, running all *non-daemon* tasks to completion. + pub fn block_on_all(&mut self); + + // Blocks, pushing all tasks forward but returning when the given future + // completes. 
+ // + // NB: no 'static requirement on the future + pub fn block_until(&mut self, f: F) -> Result; + + // Kills off *all* remaining tasks (daemon or not) + pub fn force_shutdown(self); + } + + impl Drop for TaskRunner { + // panics if: + // (1) not already panicking and + // (2) there are non-daemon tasks remaining + } + + // NB: this is not `Send` + #[derive(Clone)] + pub struct Spawner { .. } + + impl Spawner { + // Spawns a "standard" task, i.e. one that must be explicitly either + // blocked on or killed off before the associated `TaskRunner` is destroyed. + pub fn spawn(&self, task: F) + where F: Future + 'static; + + // Spawns a "daemon" task, which is not blocked on when waiting + // for completion. + pub fn spawn_daemon(&self, task: F) + where F: Future + 'static; + } + + impl Executor for Spawner + where F: Future + 'static + { + // ... + } +} +``` + +This suite of functionality replaces two APIs provided in the Tokio stack today: + +- The free `block_until` function replaces the `wait` method on `Future`, which + will be deprecated. In our experience with Tokio, as well as the experiences + of other ecosystems like Finagle in Scala, having a blocking method so easily + within reach on futures leads people down the wrong path. While it's vitally + important to have this "bridge" between the async and sync worlds, providing + it as `current_thread::block_until` makes it much more clear what is involved. + +- The `TaskRunner` and `Spawner` types replace the use of `Handle` today for + cooperative, non-`Send` task execution. Unlike `Handle`, though, this API is + carefully crafted to help ensure that spawned tasks are actually run to + completion, unless explicitly requested otherwise. + +Thus, in addition to providing a cleaner factoring, these two APIs also mitigate +two major footguns with today's Tokio stack. + +### Executors in general + +Another footgun we want to watch out for is accidentally trying to spin up +multiple executors on a single thread. 
The most common case is using +`current_thread::block_until` on, say, a thread pool's worker thread. + +We can mitigate this easily by providing an API for "executor binding" that +amounts to a thread-local boolean: + +```rust +pub mod executor { + pub struct Enter { .. } + + // Marks the current thread as being within the dynamic extent of + // an executor. Panics if the current thread is *already* marked. + pub fn enter() -> Enter; + + impl Drop for Enter { + // Exits the dynamic extent of an executor, unbinding the thread + } +} +``` + +This API should be used in functions like `block_until` that enter +executors. Consequently, nested uses of `TaskRunner`, or usage of +`current_thread::block_until` within a `TaskRunner` or thread pool, will panic, +alerting the user to a bug. + +## Migration Guide + +While `tokio` and `tokio-core` will provide some level of interoperation, the +expectation is that over time the ecosystem will converge on just using `tokio`. +Transitioning APIs over is largely straightforward, but will require a major +version bump: + +- Uses of `Remote`/`Handle` that don't involve spawning threads can use the new + `Handle`. +- Local task spawning should migrate to use the `current_thread` module. +- APIs that need to construct I/O objects, or otherwise require a `Handle`, + should follow the pattern laid out in the `net` module: + - There should be a simple standard API that does not take an explicit + `Handle` argument, instead using the global one. + - Alongside, there should be some means of customizing the handle, either + through a secondary API (`connect` and `connect_handle`), or else as part of + a builder-style API (if there are other customizations that you want to + allow). + +## The story for other Tokio crates + +### `tokio-core` + +This crate is deprecated in favor of the new `tokio` crate. 
To provide for +interoperation, however, it is also *reimplemented* as a thin layer on top of +the new `tokio` crate, with the ability to dig into this layering. This +reimplementation will be provided as a new, semver-compatible release. + +The key idea is that a `tokio_core` `Core` is a *combination* of a `Reactor` and +`TaskRunner` in this RFC. Likewise, a `tokio_core` `Handle` is a combo of a +`tokio` `Handle` and a `Spawner`. In particular: + +```rust +// in the new tokio_core release: + +use tokio::reactor; +use futures::current_thread; + +impl Core { + fn from_tokio( + reactor: reactor::Reactor, + runner: current_thread::TaskRunner + ) -> Self; + + fn tokio_reactor(&mut self) -> &mut reactor::Reactor; + fn task_runner(&mut self) -> &mut current_thread::TaskRunner; + fn force_shutdown(self); +} + +impl Handle { + fn tokio_handle(&self) -> &reactor::Handle; + fn spawner(&self) -> &current_thread::Spawner; +} +``` + +This should allow at least some level of interoperation between libraries using +the current `tokio_core` libraries and those that have migrated to `tokio`. + +### `tokio-io` + +The `tokio-io` crate needs an overall API audit. Its APIs may eventually be +*re*exported within the `tokio` crate, but it also has value as a standalone +crate that provides general I/O utilities for futures. (It may move into +`futures` instead). + +### `tokio-service` + +The `tokio-service` crate is not, in fact, Tokio-specific; it provides a general +way of talking about services and middleware in the futures world. Thus the +likely path is for `tokio-service` to be split out from the Tokio project and +instead provide foundational traits under a new moniker. Regardless, though, the +crate will always remain available in some form. + +### `tokio-proto` + +Finally, there's `tokio-proto`, which provides a very general framework for +Tokio-based servers and clients. 
+ +Part of the original goal was for `tokio-proto` to be expressive enough that you +could build a solid http2 implementation with it; this has not panned out so +far, though it's possible that the crate could be improved to do so. On the +other hand, it's not clear that it's useful to provide that level of +expressiveness in a general-purpose framework. + +So, in general: the future direction for `tokio-proto` is unclear. We need to be +driven by strong, concrete use-cases. If you have one of these, we'd love to +hear about it! + +# Drawbacks +[drawbacks]: #drawbacks + +The clear drawback here is ecosystem churn. While the proposal provides +interoperation with the existing Tokio ecosystem, there's still a push toward +major version bumps across the ecosystem to adapt to the new idioms. The interop +means, however, that it's far less problematic for an app to "mix" the two +worlds, which means that major version bumps don't need to be as coordinated as +they might otherwise be. + +# Rationale and Alternatives +[alternatives]: #alternatives + +The rationale has been discussed pretty extensively throughout. However, a few +aspects of the design have quite plausible alternatives: + +- The `current_thread` module goes to some length to avoid footguns, at the + costs of API complexity and ergonomics. We could instead forgo the use of an + RAII-style API, instead using thread-local storage to allow tasks to be + spawned at any time, without any assurance that they will actually be run. + - Ultimately, the use of `current_thread` for task spawning is expected to be + limited to the "outer layer" of most apps, i.e. to spawning tasks for + handling incoming connections. Therefore, the ergonomic concerns do not seem + *so* concerning, relative to the benefits of footgun protection. + +- The approach to `Handle`s means that libraries need to provide *two* API + surfaces: one using the default global reactor, and one allowing for + customization. 
In practice, it seems likely that many libraries will skip out + on the second API. Clients that require it will then need to file issues, and + adding support for handle specification after the fact can involve some tricky + refactoring -- especially because there's no simple way to ensure that you + aren't calling some *other* API that's using the default reactor, and should + *now* be passing a customized one. + - An alternative would be to use thread-local storage to set the "current + reactor" in a scoped way. By default, this would be the global reactor, but + you could override it in cases where you need to customize the reactor + that's used. + - Unfortunately, though, this alternative API is subject to some very subtle + bugs when working with futures, because you need to ensure that this + thread-local value is set *at the right time*, which means you need to know + whether the handle is used when *constructing* the future, or when + *executing* it. It was unclear how to design an API that would make it easy + to get this right. + +# Unresolved questions +[unresolved]: #unresolved-questions + +- What should happen with `tokio-io`? +- What are the right use-cases to focus on for `tokio-proto`? From 469d62fc007c85c8ac40756cc71427d3741175dd Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Tue, 19 Sep 2017 14:44:40 -0700 Subject: [PATCH 02/13] Initial batch of revisions --- tokio-reform.md | 131 ++++++++++++++++++++++++++++-------------------- 1 file changed, 78 insertions(+), 53 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index 4c62b6f..384a386 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -50,7 +50,7 @@ async" in Rust. On the documentation side, one mistake we made early on in the Tokio project was to so prominently discuss the `tokio-proto` crate in the documentation. 
While -the crate was intended to make it very easy to get basic protocol +the crate was intended to make it very easy to get basic request/response protocol implementations up and running, it did not provide a very useful *entry* point for learning Tokio as a whole. Moreover, it turns out that there's a much wider audience for Tokio than there is for `tokio-proto` in particular. It's not @@ -249,12 +249,13 @@ use std::io; use std::env; use std::net::SocketAddr; -use futures::{Future, Stream, IntoFuture, CurrentThread}; +use futures::{Future, Stream, IntoFuture}; +use futures::current_thread::{self, Spawner}; use tokio_core::net::TcpListener; use tokio_io::AsyncRead; use tokio_io::io::copy; -fn serve(addr: SocketAddr) -> impl Future { +fn serve(spawner: &Spawner, addr: SocketAddr) -> impl Future { TcpListener::bind(&addr) .into_future() .and_then(move |socket| { @@ -264,7 +265,7 @@ fn serve(addr: SocketAddr) -> impl Future { .incoming() .for_each(move |(conn, _)| { let (reader, writer) = conn.split(); - CurrentThread.spawn(copy(reader, writer).then(move |result| { + spawner.spawn(copy(reader, writer).then(move |result| { match result { Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), @@ -283,13 +284,15 @@ fn main() { .parse() .unwrap(); - CurrentThread::run(serve(addr)).unwrap(); + current_thread::block_on_all(|spawner| { + spawner.spawn(serve(spawner, addr)); + }); } ``` This version of the code retains the move to using futures explicitly, but eliminates the explicit event loop setup and handle passing. To execute and -spawn tasks, it uses the `CurrentThread` executor from futures, which +spawn tasks, it uses the `current_thread` executor from futures, which multiplexes tasks onto the current OS thread. A similar API is available for working with the default thread pool instead, which is more appropriate for tasks performing blocking or CPU-heavy work. 
@@ -311,12 +314,13 @@ use std::env; use std::net::SocketAddr; use futures::prelude::*; +use futures::current_thread::{self, Spawner}; use tokio::net::TcpListener; use tokio_io::AsyncRead; use tokio_io::io::copy; #[async] -fn serve(addr: SocketAddr) -> io::Result<()> { +fn serve(spawner: &Spawner, addr: SocketAddr) -> io::Result<()> { let socket = TcpListener::bind(&addr)?; println!("Listening on: {}", addr); @@ -325,7 +329,7 @@ fn serve(addr: SocketAddr) -> io::Result<()> { // with Tokio, read and write components are distinct: let (reader, writer) = conn.split(); - CurrentThread.spawn(async_block! { + spawner.spawn(async_block! { match await!(copy(reader, writer)) { Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), @@ -345,7 +349,9 @@ fn main() { .parse() .unwrap(); - CurrentThread::run(serve(addr)).unwrap(); + current_thread::block_on_all(|spawner| { + spawner.spawn(serve(spawner, addr)); + }); } ``` @@ -541,7 +547,6 @@ The reactor module provides just a few types for working with reactors (aka event loops): - `Reactor`, an owned reactor. (Used to be `Core`) -- `ReactorId`, a unique identifier per reactor instance. - `Handle`, a shared, cloneable handle to a reactor. - `PollEvented`, a bridge between `mio` event sources and a reactor @@ -551,25 +556,26 @@ Compared to `Core` today, the `Reactor` API is much the same, but drops the ```rust impl Reactor { fn new() -> Result; - fn id(&self) -> ReactorId; fn handle(&self) -> Handle; - // blocks until an event (or timeout), then dispatches all pending events - fn turn(&mut self, max_wait: Option); + // blocks, turning the event loop and polling the given future until + // either that future completes, or the given timeout expires. 
+ fn turn_until(&mut self, max_wait: Option, f: &mut F) + -> Option> } ``` The `Handle` API is even more slimmed down: ```rust -impl Handle { - // get a handle to the default global event loop - fn global() -> Handle; - - fn id(&self) -> ReactorId; +impl Default for Handle { + // get a handle to the default global event loop ... } ``` +Most usage of the `Handle` type comes through *parameters* to various I/O object +construction functions, as we'll see in a moment. + Unlike today, though, `Handle` is `Send` and `Sync` (as well as `Clone`). Handles are used solely to tie particular I/O objects to particular event loops. In particular, the `PollEvented` API stays exactly as-is, and is the primitive @@ -585,9 +591,9 @@ which play a very similar role. The `net` module remains almost exactly as it is in `tokio-core` today, with one key difference: APIs for constructing I/O objects come in two flavors, one that -uses the default global reactor, and one that takes an explicit handle. For the -latter, when there are already multiple configuration options, this RFC proposes -switching to a builder. +uses the default global reactor, and one that takes an explicit +handle. Generally the convenient, "default" mode of construction doesn't take a +handle, and those involving customizations do. #### Example: `TcpListener` @@ -596,15 +602,12 @@ impl TcpListener { // set up a listener with the global event loop fn bind(addr: &SocketAddr) -> io::Result; - fn builder(addr: &SocketAddr) -> TcpListenerBuilder; -} - -struct TcpListenerBuilder { .. 
} + // set up a fully-customized listener + fn from_listener(listener: std::net::TcpListener, handle: &Handle) -> io::Result; -impl TcpListenerBuilder { - fn handle(&mut self, handle: Handle) -> &mut Self; - fn bind(&self) -> io::Result; - fn from_listener(&self, listener: std::net::TcpListener) -> io::Result; + // this now yields a *std* TcpStream, so that you can use `TcpStream::from_stream` to + // associate it with a handle of your choice. + fn accept(&mut self) -> Result<(std::net::TcpStream, SocketAddr)> } ``` @@ -615,17 +618,9 @@ impl TcpStream { // set up a stream with the global event loop fn connect(addr: &SocketAddr) -> TcpStreamNew; - fn builder() -> TcpStreamBuilder; -} - -struct TcpStreamBuilder { .. } - -impl TcpStreamBuilder { - fn handle(&mut self, handle: Handle) -> &mut Self; - fn stream(&mut self, stream: std::net::TcpStream) -> &mut Self; - - fn connect(&self, addr: &SocketAddr) -> TcpStreamNew; - fn from_stream(&self) -> TcpStreamNew; + // these are as today + fn from_stream(stream: TcpStream, handle: &Handle) -> Result; + fn connect_stream(stream: TcpStream, addr: &SocketAddr, handle: &Handle) -> TcpStreamNew } ``` @@ -633,28 +628,34 @@ impl TcpStreamBuilder { The `timer` module will contain the `Timeout` and `Interval` types currently in `tokio_core`'s `reactor` module. Their APIs will be adjusted along similar lines -as the `net` APIs, except that they will *only* support a builder-style API: +as the `net` APIs, except that they will *only* support a handle-free mode, +which *always* uses the default global reactor. (In general, we need control +over the way the event loop is run in order to support timers.) ```rust impl Timeout { - fn build() -> TimeoutBuilder; + fn new(dur: Duration) -> Result + fn new_at(at: Instant) -> Result fn reset(&mut self, at: Instant); } -impl TimeoutBuilder { - fn handle(&mut self, handle: Handle) -> &mut Self; +impl Future for Timeout { .. 
} - fn at(&mut self, time: Instant) -> Timeout; - fn after(&mut self, dur: Duration) -> Timeout; +impl Interval { + fn new(dur: Duration) -> Result + fn new_at(at: Instant, dur: Duration) -> Result + fn reset(&mut self, at: Instant); } + +impl Stream for Interval { .. } ``` ### The `io` module -Finally, there may *eventually* be an `io` modulewith the full contents of the -`tokio-io` crate. However, those APIs need a round of scrutiny first (the -subject of a future RFC), and they may ultimately be moved into the `futures` -crate instead. +Finally, there may *eventually* be an `io` module with the some portion of +contents of the `tokio-io` crate. However, those APIs need a round of scrutiny +first (the subject of a future RFC), and they may ultimately be moved into the +`futures` crate instead. An eventual goal, in any case, is that you can put Tokio to good use by bringing in only the `tokio` and `futures` crates. @@ -691,6 +692,10 @@ pub mod current_thread { // NB: no 'static requirement on the future pub fn block_until(f: F) -> Result; + // A convenience function that creates a new task runner, invokes the given + // closure with a handle to it, then executes all spawned tasks to completion. + pub fn block_on_all(f: F) where F: FnOnce(&Spawner); + // An object for cooperatively executing multiple tasks on a single thread. // Useful for working with non-`Send` futures. // @@ -705,7 +710,7 @@ pub mod current_thread { pub fn spawner(&self) -> Spawner; // Blocks, running all *non-daemon* tasks to completion. - pub fn block_on_all(&mut self); + pub fn block_on_all(&self); // Blocks, pushing all tasks forward but returning when the given future // completes. @@ -723,6 +728,13 @@ pub mod current_thread { // (2) there are non-daemon tasks remaining } + impl Futue for TaskRunner { + type Item = (); + type Error = (); + + // polls *all* current tasks + } + // NB: this is not `Send` #[derive(Clone)] pub struct Spawner { .. 
} @@ -787,8 +799,9 @@ pub mod executor { } ``` -This API should be used in functions like `block_until` that enter -executors. Consequently, nested uses of `TaskRunner`, or usage of +This API should be used in functions like `block_until` that enter executors, +i.e. that block until some future completes (or other event +occurs). Consequently, nested uses of `TaskRunner`, or usage of `current_thread::block_until` within a `TaskRunner` or thread pool, will panic, alerting the user to a bug. @@ -876,6 +889,18 @@ far, though it's possible that the crate could be improved to do so. On the other hand, it's not clear that it's useful to provide that level of expressiveness in a general-purpose framework. +It seems like there are three basic paths forward: + +- Improve `tokio-proto` such that it can support the full `h2` + implementation. It's not clear to what extent doing so would benefit *other* + protocols, however. +- Refocus `tokio-proto` on simpler use-cases and ergonomics, e.g. by removing + streaming bodies entirely. +- Deprecate it, in favor of direct, custom server implementations. + +It's also not clear what the long-term story for Hyper is; it currently uses +`tokio-proto`, but may end up using `h2` instead. + So, in general: the future direction for `tokio-proto` is unclear. We need to be driven by strong, concrete use-cases. If you have one of these, we'd love to hear about it! 
From 18d8dfe8cbe7e8ab0c4f823c65f370380f47107f Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 27 Sep 2017 09:54:37 -0700 Subject: [PATCH 03/13] Revisions to clarify event loop customization and improve the single-threaded executor APIs --- tokio-reform.md | 208 ++++++++++++++++++++++++++---------------------- 1 file changed, 112 insertions(+), 96 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index 384a386..ce9672c 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -4,22 +4,27 @@ This RFC proposes to simplify and focus the Tokio project, in an attempt to make it easier to learn and more productive to use. Specifically: -* Add a global event loop in `tokio-core` that is managed automatically by - default. This change eliminates the need for setting up and managing your own - event loop in the vast majority of cases. +* Add a *default* global event loop, eliminating the need for setting up and + managing your own event loop in the vast majority of cases. - * Moreover, remove the distinction between `Handle` and `Remote` in - `tokio-core` by making `Handle` both `Send` and `Sync` and deprecating - `Remote`. Thus, even working with custom event loops becomes simpler. + * Moreover, remove the distinction between `Handle` and `Remote` by making + `Handle` both `Send` and `Sync` and deprecating `Remote`. Thus, even working + with custom event loops becomes simpler. + + * Allow swapping out this default event loop for those who want to exercise + full control. * Decouple all task execution functionality from Tokio, instead providing it - through a standard futures component. As with event loops, provide a default - global thread pool that suffices for the majority of use-cases, removing the - need for any manual setup. + through a standard futures component. - * Moreover, when running tasks thread-locally (for non-`Send` futures), + * When running tasks thread-locally (for non-`Send` futures), provide more fool-proof APIs that help avoid lost wakeups. 
+ * Eventually provide some default thread pools as well (out of scope for this RFC). + +* Similarly, decouple timer futures from Tokio, providing functionality instead + through a new `futures-timer` crate. + * Provide the above changes in a new `tokio` crate, which is a slimmed down version of today's `tokio-core`, and may *eventually* re-export the contents of `tokio-io`. The `tokio-core` crate is deprecated, but will remain available @@ -532,11 +537,10 @@ To begin with, the new `tokio` crate will look like a streamlined version of `tokio-core`. Eventually it will also re-export APIs from `tokio-io`, once those APIs have been fully vetted. -At the top level, the crate will provide four submodules: +At the top level, the crate will provide three submodules: - `reactor`: for manual configuration and control over event loops. - `net`: I/O objects for async networking. -- `timer`: APIs for async timers. - `io`: general utilities for async I/O (much like `std::io`) Let's look at each in turn. @@ -549,6 +553,7 @@ event loops): - `Reactor`, an owned reactor. (Used to be `Core`) - `Handle`, a shared, cloneable handle to a reactor. - `PollEvented`, a bridge between `mio` event sources and a reactor +- `SetDefault`, an RAII object for overriding the default reactor. Compared to `Core` today, the `Reactor` API is much the same, but drops the `run` and `remote` methods and the `Executor` implementation: @@ -581,11 +586,55 @@ Handles are used solely to tie particular I/O objects to particular event loops. In particular, the `PollEvented` API stays exactly as-is, and is the primitive way that I/O objects are registered onto a reactor. -In the future, we expect to provide ways to configure reactors, and to override -the default reactor within a scope. These APIs will be modeled after -Rayon's -[configuration APIs](https://docs.rs/rayon/0.8.2/rayon/struct.Configuration.html), -which play a very similar role. 
+Finally, we have an API for customizing the default reactor handle: + +```rust +struct SetDefault<'a> { + handle: &'a Handle, + _marker: marker::PhantomData>, // no sharing to other threads +} + +impl<'a> SetDefault<'a> { + // Changes the return value of `Handle::default()` to return a + // clone of the `handle` provided. + // + // This function is used to change the default global event loop + // as identified through `Handle::default()`. The new default applies to + // the *current OS thread* only; as long as the returned object is live, + // the thread will return a clone of the given `Handle` on calls to + // `Handle::default()`. This, in particular, prevents the `Handle::default()` + // from spinning up a lazily-initialized thread. + // + // When the returned object goes out of scope then the return value of + // `Handle::default()` will revert to the (lazily-initialized) default + // event loop. + // + // This function is intended to be used within *applications* at executor + // granularity; it is not intended as a general-purpose mechanism to eschew + // passing a `Handle` for library APIs. This function can only be called + // *once* recursively on a thread: reentrant uses will panic. + // + // # Panics + // + // This function will panic if there's another `SetDefault` object already + // active for this thread. In other words, calling this function a second + // time on one thread where the first return value is alive will cause a panic. + pub fn new(handle: &'a Handle) -> SetDefault<'a> { + // panic if tls is `Some` + // set scoped tls to `Some(handle)` + SetDefault { + handle + _marker: marker::PhantomData, + } + } +} + +impl<'a> Drop for SetDefault<'a> { + fn drop(&mut self) { + // set scoped tls to `None` + } +} +``` ### The `net` module @@ -624,32 +673,6 @@ impl TcpStream { } ``` -### The `timer` module - -The `timer` module will contain the `Timeout` and `Interval` types currently in -`tokio_core`'s `reactor` module. 
Their APIs will be adjusted along similar lines -as the `net` APIs, except that they will *only* support a handle-free mode, -which *always* uses the default global reactor. (In general, we need control -over the way the event loop is run in order to support timers.) - -```rust -impl Timeout { - fn new(dur: Duration) -> Result - fn new_at(at: Instant) -> Result - fn reset(&mut self, at: Instant); -} - -impl Future for Timeout { .. } - -impl Interval { - fn new(dur: Duration) -> Result - fn new_at(at: Instant, dur: Duration) -> Result - fn reset(&mut self, at: Instant); -} - -impl Stream for Interval { .. } -``` - ### The `io` module Finally, there may *eventually* be an `io` module with the some portion of @@ -666,10 +689,11 @@ In general, futures are spawned onto *executors*, which are responsible for polling them to completion. There are basically two ways this can work: either you run a future on your thread, or someone else's (i.e. a thread pool). -The threadpool case is provided by crates like [futures_cpupool], but the intent -of this RFC is that the `futures` crate itself should provide a configurable, -default global thread pool, which provides similar benefits to providing a -global event loop. Exact details are out of scope for this RFC. +The threadpool case is provided by crates like [futures_cpupool]; eventually, a +crate along these lines should provide a configurable, default global thread +pool, which provides similar benefits to providing a global event loop. This +functionality may eventually want to live in the `futures` crate. Exact details +are out of scope for this RFC. [futures_cpupool]: https://docs.rs/futures-cpupool/0.1.5/futures_cpupool/ @@ -685,54 +709,24 @@ The `futures` crate will add a module, `current_thread`, with the following contents: ```rust -pub mod current_thread { +pub mod thread { // Execute the given future *synchronously* on the current thread, blocking until // it completes and returning its result. 
// // NB: no 'static requirement on the future pub fn block_until(f: F) -> Result; - // A convenience function that creates a new task runner, invokes the given - // closure with a handle to it, then executes all spawned tasks to completion. - pub fn block_on_all(f: F) where F: FnOnce(&Spawner); + // Blocks until either all non-daemon tasks complete, or `force_shutdown` is invoked + pub fn block_on_all(f: F) where F: FnOnce(KillSwitch, &Spawner); - // An object for cooperatively executing multiple tasks on a single thread. - // Useful for working with non-`Send` futures. + // A handle for forcibly shutting down all running tasks. // // NB: this is not `Send` - pub struct TaskRunner { .. } - - impl TaskRunner { - pub fn new() -> Self; - - // To spawn tasks onto this runner, you need a separate, cloneable - // `Spawner` object. - pub fn spawner(&self) -> Spawner; - - // Blocks, running all *non-daemon* tasks to completion. - pub fn block_on_all(&self); - - // Blocks, pushing all tasks forward but returning when the given future - // completes. - // - // NB: no 'static requirement on the future - pub fn block_until(&mut self, f: F) -> Result; - - // Kills off *all* remaining tasks (daemon or not) - pub fn force_shutdown(self); - } + pub struct KillSwitch { .. } - impl Drop for TaskRunner { - // panics if: - // (1) not already panicking and - // (2) there are non-daemon tasks remaining - } - - impl Futue for TaskRunner { - type Item = (); - type Error = (); - - // polls *all* current tasks + impl KillSwitch { + // Cancels off *all* remaining tasks (daemon or not) + pub fn cancel_all(self); } // NB: this is not `Send` @@ -766,9 +760,9 @@ This suite of functionality replaces two APIs provided in the Tokio stack today: of other ecosystems like Finagle in Scala, having a blocking method so easily within reach on futures leads people down the wrong path. 
While it's vitally important to have this "bridge" between the async and sync worlds, providing - it as `current_thread::block_until` makes it much more clear what is involved. + it as `thread::block_until` makes it much more clear what is involved. -- The `TaskRunner` and `Spawner` types replace the use of `Handle` today for +- The `KillSwitch` and `Spawner` types replace the use of `Handle` today for cooperative, non-`Send` task execution. Unlike `Handle`, though, this API is carefully crafted to help ensure that spawned tasks are actually run to completion, unless explicitly requested otherwise. @@ -776,6 +770,10 @@ This suite of functionality replaces two APIs provided in the Tokio stack today: Thus, in addition to providing a cleaner factoring, these two APIs also mitigate two major footguns with today's Tokio stack. +Those wishing to couple a reactor and a single-threaded executor, as today's +`tokio-core` does, should use `FuturesUnordered` together with a custom reactor +to do so. + ### Executors in general Another footgun we want to watch out for is accidentally trying to spin up @@ -805,6 +803,33 @@ occurs). Consequently, nested uses of `TaskRunner`, or usage of `current_thread::block_until` within a `TaskRunner` or thread pool, will panic, alerting the user to a bug. +## The `futures-timer` crate + +The `futures-timer` crate will contain the `Timeout` and `Interval` types +currently in `tokio_core`'s `reactor` module: + +```rust +impl Timeout { + fn new(dur: Duration) -> Result + fn new_at(at: Instant) -> Result + fn reset(&mut self, at: Instant); +} + +impl Future for Timeout { .. } + +impl Interval { + fn new(dur: Duration) -> Result + fn new_at(at: Instant, dur: Duration) -> Result + fn reset(&mut self, at: Instant); +} + +impl Stream for Interval { .. } +``` + +These functions will lazily initialize a dedicated timer thread. 
We will eventually +provide a means of customization, similar to `SetDefault` for reactor, but that's +out of scope for this RFC. + ## Migration Guide While `tokio` and `tokio-core` will provide some level of interoperation, the @@ -921,15 +946,6 @@ they might otherwise be. The rationale has been discussed pretty extensively throughout. However, a few aspects of the design have quite plausible alternatives: -- The `current_thread` module goes to some length to avoid footguns, at the - costs of API complexity and ergonomics. We could instead forgo the use of an - RAII-style API, instead using thread-local storage to allow tasks to be - spawned at any time, without any assurance that they will actually be run. - - Ultimately, the use of `current_thread` for task spawning is expected to be - limited to the "outer layer" of most apps, i.e. to spawning tasks for - handling incoming connections. Therefore, the ergonomic concerns do not seem - *so* concerning, relative to the benefits of footgun protection. - - The approach to `Handle`s means that libraries need to provide *two* API surfaces: one using the default global reactor, and one allowing for customization. In practice, it seems likely that many libraries will skip out From c50f6d988dd9ddbc03ce26ace3635e5172e3f0ee Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 27 Sep 2017 10:01:32 -0700 Subject: [PATCH 04/13] Trivial change to bump github --- tokio-reform.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tokio-reform.md b/tokio-reform.md index ce9672c..9844c04 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -20,7 +20,8 @@ it easier to learn and more productive to use. Specifically: * When running tasks thread-locally (for non-`Send` futures), provide more fool-proof APIs that help avoid lost wakeups. - * Eventually provide some default thread pools as well (out of scope for this RFC). + * Eventually provide some default thread pools as well (out of scope for this + RFC). 
* Similarly, decouple timer futures from Tokio, providing functionality instead through a new `futures-timer` crate. From c2eaaf3d62cd29474d67207434606c7c6a9021e0 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 27 Sep 2017 10:44:27 -0700 Subject: [PATCH 05/13] Fix up `turn` method --- tokio-reform.md | 21 +++++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index 9844c04..7690a4b 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -560,17 +560,30 @@ Compared to `Core` today, the `Reactor` API is much the same, but drops the `run` and `remote` methods and the `Executor` implementation: ```rust +impl<'a> From<&'a Reactor> for NotifyHandle { /* ... */ } + impl Reactor { fn new() -> Result; fn handle(&self) -> Handle; - // blocks, turning the event loop and polling the given future until - // either that future completes, or the given timeout expires. - fn turn_until(&mut self, max_wait: Option, f: &mut F) - -> Option> + // blocks, turning the event loop until either notified by a future, + // or the given timeout expires. + fn turn(&mut self, max_wait: Option) -> Turn; +} + +struct Turn { /* ... */ } + +impl Turn { + // says whether the wakeup was due to a future (and gives the + // notification ID if so), or due to a timeout. + fn last_notify_id(&self) -> Option; } ``` +The `turn` API is intended to be used in conjunction with the futures executors +API if you wish to have futures notify the event loop (because you are polling +them on the same thread). 
+ The `Handle` API is even more slimmed down: ```rust From 8c56891074616621f233ad1fa85e53774dafb30d Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 27 Sep 2017 10:47:51 -0700 Subject: [PATCH 06/13] Minor fixes to `block_on_all` --- tokio-reform.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index 7690a4b..7dee070 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -290,7 +290,7 @@ fn main() { .parse() .unwrap(); - current_thread::block_on_all(|spawner| { + current_thread::block_on_all(|_, spawner| { spawner.spawn(serve(spawner, addr)); }); } @@ -355,7 +355,7 @@ fn main() { .parse() .unwrap(); - current_thread::block_on_all(|spawner| { + current_thread::block_on_all(|_, spawner| { spawner.spawn(serve(spawner, addr)); }); } @@ -731,7 +731,7 @@ pub mod thread { pub fn block_until(f: F) -> Result; // Blocks until either all non-daemon tasks complete, or `force_shutdown` is invoked - pub fn block_on_all(f: F) where F: FnOnce(KillSwitch, &Spawner); + pub fn block_on_all(f: F) where F: FnOnce(KillSwitch, Spawner); // A handle for forcibly shutting down all running tasks. // From af0cbf18a390b873e4cebde3ec013becf14f0ace Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 27 Sep 2017 14:56:41 -0700 Subject: [PATCH 07/13] Minor API tweaks --- tokio-reform.md | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index 7dee070..e8bdf5d 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -378,7 +378,7 @@ The *long-term* goal is to provide a complete async story that feels very close synchronous programming, i.e. 
something like: ```rust -async fn serve(addr: SocketAddr) -> io::Result<()> { +async fn serve(spawner: &Spawner,addr: SocketAddr) -> io::Result<()> { let socket = TcpListener::bind(&addr)?; println!("Listening on: {}", addr); @@ -386,7 +386,7 @@ async fn serve(addr: SocketAddr) -> io::Result<()> { // with Tokio, read and write components are distinct: let (reader, writer) = conn.split(); - CurrentThread.spawn(async { + spawner.spawn(async { match await copy(reader, writer) { Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), @@ -566,30 +566,26 @@ impl Reactor { fn new() -> Result; fn handle(&self) -> Handle; - // blocks, turning the event loop until either notified by a future, + // blocks, turning the event loop until either woken through a handle, // or the given timeout expires. fn turn(&mut self, max_wait: Option) -> Turn; } +// No contents/API initially, but we may want to expand this in the future struct Turn { /* ... */ } - -impl Turn { - // says whether the wakeup was due to a future (and gives the - // notification ID if so), or due to a timeout. - fn last_notify_id(&self) -> Option; -} ``` -The `turn` API is intended to be used in conjunction with the futures executors -API if you wish to have futures notify the event loop (because you are polling -them on the same thread). - -The `Handle` API is even more slimmed down: +The `Handle` API is also slimmed down: ```rust impl Default for Handle { // get a handle to the default global event loop ... } + +impl Handle { + // wakes up the corresponding reactor if it is blocking + fn wakeup(&self); +} ``` Most usage of the `Handle` type comes through *parameters* to various I/O object @@ -641,6 +637,10 @@ impl<'a> SetDefault<'a> { _marker: marker::PhantomData, } } + + // Keeps this new default handle installed for the duration of the thread's + // execution. 
+ fn make_permanent(self); } impl<'a> Drop for SetDefault<'a> { @@ -717,9 +717,9 @@ cooperatively onto their thread of origin, and is *currently* provided by the this RFC completely decouples this functionality, moving it instead into the `futures` crate. -### The `current_thread` module +### The `thread` module -The `futures` crate will add a module, `current_thread`, with the following +The `futures` crate will add a module, `thread`, with the following contents: ```rust From 609b3169cdb4819276626cf1d7e37a46ad29b4a7 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Thu, 28 Sep 2017 12:48:26 -0700 Subject: [PATCH 08/13] Remove references to TaskRunner; remove SetDefault and tie reactor defaults to executors --- tokio-reform.md | 174 +++++++++++++++++++----------------------------- 1 file changed, 67 insertions(+), 107 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index e8bdf5d..51d8fb3 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -256,12 +256,12 @@ use std::env; use std::net::SocketAddr; use futures::{Future, Stream, IntoFuture}; -use futures::current_thread::{self, Spawner}; +use futures::thread::{self, Controller}; use tokio_core::net::TcpListener; use tokio_io::AsyncRead; use tokio_io::io::copy; -fn serve(spawner: &Spawner, addr: SocketAddr) -> impl Future { +fn serve(cont: &Controller, addr: SocketAddr) -> impl Future { TcpListener::bind(&addr) .into_future() .and_then(move |socket| { @@ -271,7 +271,7 @@ fn serve(spawner: &Spawner, addr: SocketAddr) -> impl Future println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), @@ -290,15 +290,15 @@ fn main() { .parse() .unwrap(); - current_thread::block_on_all(|_, spawner| { - spawner.spawn(serve(spawner, addr)); + thread::block_on_all(|cont| { + cont.spawn(serve(&cont, addr)); }); } ``` This version of the code retains the move to using futures explicitly, but eliminates the explicit event loop setup and handle passing. 
To execute and -spawn tasks, it uses the `current_thread` executor from futures, which +spawn tasks, it uses the `thread` executor from futures, which multiplexes tasks onto the current OS thread. A similar API is available for working with the default thread pool instead, which is more appropriate for tasks performing blocking or CPU-heavy work. @@ -320,13 +320,13 @@ use std::env; use std::net::SocketAddr; use futures::prelude::*; -use futures::current_thread::{self, Spawner}; +use futures::thread::{self, Controller}; use tokio::net::TcpListener; use tokio_io::AsyncRead; use tokio_io::io::copy; #[async] -fn serve(spawner: &Spawner, addr: SocketAddr) -> io::Result<()> { +fn serve(cont: &Controller, addr: SocketAddr) -> io::Result<()> { let socket = TcpListener::bind(&addr)?; println!("Listening on: {}", addr); @@ -335,7 +335,7 @@ fn serve(spawner: &Spawner, addr: SocketAddr) -> io::Result<()> { // with Tokio, read and write components are distinct: let (reader, writer) = conn.split(); - spawner.spawn(async_block! { + cont.spawn(async_block! { match await!(copy(reader, writer)) { Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), @@ -355,8 +355,8 @@ fn main() { .parse() .unwrap(); - current_thread::block_on_all(|_, spawner| { - spawner.spawn(serve(spawner, addr)); + thread::block_on_all(|cont| { + cont.spawn(serve(&cont, addr)); }); } ``` @@ -378,7 +378,7 @@ The *long-term* goal is to provide a complete async story that feels very close synchronous programming, i.e. 
something like: ```rust -async fn serve(spawner: &Spawner,addr: SocketAddr) -> io::Result<()> { +async fn serve(cont: &Controller, addr: SocketAddr) -> io::Result<()> { let socket = TcpListener::bind(&addr)?; println!("Listening on: {}", addr); @@ -386,7 +386,7 @@ async fn serve(spawner: &Spawner,addr: SocketAddr) -> io::Result<()> { // with Tokio, read and write components are distinct: let (reader, writer) = conn.split(); - spawner.spawn(async { + cont.spawn(async { match await copy(reader, writer) { Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), @@ -554,25 +554,22 @@ event loops): - `Reactor`, an owned reactor. (Used to be `Core`) - `Handle`, a shared, cloneable handle to a reactor. - `PollEvented`, a bridge between `mio` event sources and a reactor -- `SetDefault`, an RAII object for overriding the default reactor. Compared to `Core` today, the `Reactor` API is much the same, but drops the `run` and `remote` methods and the `Executor` implementation: ```rust -impl<'a> From<&'a Reactor> for NotifyHandle { /* ... */ } - impl Reactor { - fn new() -> Result; - fn handle(&self) -> Handle; + pub fn new() -> Result; + pub fn handle(&self) -> Handle; // blocks, turning the event loop until either woken through a handle, // or the given timeout expires. - fn turn(&mut self, max_wait: Option) -> Turn; + pub fn turn(&mut self, max_wait: Option) -> Turn; } // No contents/API initially, but we may want to expand this in the future -struct Turn { /* ... */ } +pub struct Turn { /* ... */ } ``` The `Handle` API is also slimmed down: @@ -583,8 +580,16 @@ impl Default for Handle { } impl Handle { - // wakes up the corresponding reactor if it is blocking - fn wakeup(&self); + // Wakes up the corresponding reactor if it is blocking + pub fn wakeup(&self); + + // Sets this handle as the default (returned by `Handle::default`) + // within the current thread, for the duration of `Enter`'s lifetime. 
+ // + // The `Enter` type is explained in a later section on executors, + // but the point here is that defaults are tied to executor + // granularity. + pub fn make_default_for(&self, &mut Enter); } ``` @@ -596,60 +601,6 @@ Handles are used solely to tie particular I/O objects to particular event loops. In particular, the `PollEvented` API stays exactly as-is, and is the primitive way that I/O objects are registered onto a reactor. -Finally, we have an API for customizing the default reactor handle: - -```rust -struct SetDefault<'a> { - handle: &'a Handle, - _marker: marker::PhantomData>, // no sharing to other threads -} - -impl<'a> SetDefault<'a> { - // Changes the return value of `Handle::default()` to return a - // clone of the `handle` provided. - // - // This function is used to change the default global event loop - // as identified through `Handle::default()`. The new default applies to - // the *current OS thread* only; as long as the returned object is live, - // the thread will return a clone of the given `Handle` on calls to - // `Handle::default()`. This, in particular, prevents the `Handle::default()` - // from spinning up a lazily-initialized thread. - // - // When the returned object goes out of scope then the return value of - // `Handle::default()` will revert to the (lazily-initialized) default - // event loop. - // - // This function is intended to be used within *applications* at executor - // granularity; it is not intended as a general-purpose mechanism to eschew - // passing a `Handle` for library APIs. This function can only be called - // *once* recursively on a thread: reentrant uses will panic. - // - // # Panics - // - // This function will panic if there's another `SetDefault` object already - // active for this thread. In other words, calling this function a second - // time on one thread where the first return value is alive will cause a panic. 
- pub fn new(handle: &'a Handle) -> SetDefault<'a> { - // panic if tls is `Some` - // set scoped tls to `Some(handle)` - SetDefault { - handle - _marker: marker::PhantomData, - } - } - - // Keeps this new default handle installed for the duration of the thread's - // execution. - fn make_permanent(self); -} - -impl<'a> Drop for SetDefault<'a> { - fn drop(&mut self) { - // set scoped tls to `None` - } -} -``` - ### The `net` module The `net` module remains almost exactly as it is in `tokio-core` today, with one @@ -731,25 +682,17 @@ pub mod thread { pub fn block_until(f: F) -> Result; // Blocks until either all non-daemon tasks complete, or `force_shutdown` is invoked - pub fn block_on_all(f: F) where F: FnOnce(KillSwitch, Spawner); + pub fn block_on_all(f: F) where F: FnOnce(Controller); - // A handle for forcibly shutting down all running tasks. + // A handle used for controlling task spawning and execution within `block_on_all` // - // NB: this is not `Send` - pub struct KillSwitch { .. } - - impl KillSwitch { - // Cancels off *all* remaining tasks (daemon or not) - pub fn cancel_all(self); - } - // NB: this is not `Send` #[derive(Clone)] - pub struct Spawner { .. } + pub struct Controller { .. } - impl Spawner { + impl Controller { // Spawns a "standard" task, i.e. one that must be explicitly either - // blocked on or killed off before the associated `TaskRunner` is destroyed. + // blocked on or killed off before `block_on_all` will return. pub fn spawn(&self, task: F) where F: Future + 'static; @@ -757,9 +700,15 @@ pub mod thread { // for completion. pub fn spawn_daemon(&self, task: F) where F: Future + 'static; + + // Cancels off *all* remaining tasks (daemon or not) + pub fn cancel_all(&self); + + // Extract proof that we're in an executor context (see below) + pub fn enter(&self) -> &Enter; } - impl Executor for Spawner + impl Executor for Controller where F: Future + 'static { // ... @@ -792,7 +741,7 @@ to do so. 
Another footgun we want to watch out for is accidentally trying to spin up multiple executors on a single thread. The most common case is using -`current_thread::block_until` on, say, a thread pool's worker thread. +`thread::block_until` on, say, a thread pool's worker thread. We can mitigate this easily by providing an API for "executor binding" that amounts to a thread-local boolean: @@ -805,6 +754,19 @@ pub mod executor { // an executor. Panics if the current thread is *already* marked. pub fn enter() -> Enter; + impl Enter { + // Register a callback to be invoked if and when the thread + // ceased to act as an executor. + pub fn on_exit(&mut self, f: F) where F: FnOnce(); + + // Treat the remainder of execution on this thread as part of an + // executor; used mostly for thread pool worker threads. + // + // All registered `on_exit` callbacks are *dropped* without being + // invoked. + pub fn make_permanent(self); + } + impl Drop for Enter { // Exits the dynamic extent of an executor, unbinding the thread } @@ -813,9 +775,15 @@ pub mod executor { This API should be used in functions like `block_until` that enter executors, i.e. that block until some future completes (or other event -occurs). Consequently, nested uses of `TaskRunner`, or usage of -`current_thread::block_until` within a `TaskRunner` or thread pool, will panic, -alerting the user to a bug. +occurs). Consequently, nested uses of `block_until`, or uses within a thread +pool, will panic, alerting the user to a bug. + +Executors should publicly expose access to an `Enter` only during +initialization, e.g. many executors provide for "initialization hooks" for +setting up worker threads, and these hooks provide temporary access to an +`Enter`. That restriction is leveraged by methods like +`Handle::make_default_for`, which are intended to only be used during executor +thread initialization.
## The `futures-timer` crate @@ -853,7 +821,7 @@ version bump: - Uses of `Remote`/`Handle` that don't involve spawning threads can use the new `Handle`. -- Local task spawning should migrate to use the `current_thread` module. +- Local task spawning should migrate to use the `thread` module. - APIs that need to construct I/O objects, or otherwise require a `Handle`, should follow the pattern laid out in the `net` module: - There should be a simple standard API that does not take an explicit @@ -873,29 +841,21 @@ the new `tokio` crate, with the ability to dig into this layering. This reimplementation will be provided as a new, semver-compatible release. The key idea is that a `tokio_core` `Core` is a *combination* of a `Reactor` and -`TaskRunner` in this RFC. Likewise, a `tokio_core` `Handle` is a combo of a -`tokio` `Handle` and a `Spawner`. In particular: +a custom thread-local executor, and you can thus extract out the underlying pieces: ```rust // in the new tokio_core release: use tokio::reactor; -use futures::current_thread; +use futures::thread; impl Core { - fn from_tokio( - reactor: reactor::Reactor, - runner: current_thread::TaskRunner - ) -> Self; - + fn from_tokio_reactor(reactor: reactor::Reactor) -> Self; fn tokio_reactor(&mut self) -> &mut reactor::Reactor; - fn task_runner(&mut self) -> &mut current_thread::TaskRunner; - fn force_shutdown(self); } impl Handle { fn tokio_handle(&self) -> &reactor::Handle; - fn spawner(&self) -> &current_thread::Spawner; } ``` @@ -905,7 +865,7 @@ the current `tokio_core` libraries and those that have migrated to `tokio`. ### `tokio-io` The `tokio-io` crate needs an overall API audit. Its APIs may eventually be -*re*exported within the `tokio` crate, but it also has value as a standalone +reexported within the `tokio` crate, but it also has value as a standalone crate that provides general I/O utilities for futures. (It may move into `futures` instead).
From ac34f6eb06f21937a5841cfd67cf44efca231920 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Thu, 28 Sep 2017 12:50:25 -0700 Subject: [PATCH 09/13] rename Timeout --- tokio-reform.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index 51d8fb3..f9e244f 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -787,17 +787,17 @@ thread initialization. ## The `futures-timer` crate -The `futures-timer` crate will contain the `Timeout` and `Interval` types +The `futures-timer` crate will contain the `Delay` (used to be `Timeout`) and `Interval` types currently in `tokio_core`'s `reactor` module: ```rust -impl Timeout { - fn new(dur: Duration) -> Result - fn new_at(at: Instant) -> Result +impl Delay { + fn new(dur: Duration) -> Result + fn new_at(at: Instant) -> Result fn reset(&mut self, at: Instant); } -impl Future for Timeout { .. } +impl Future for Delay { .. } impl Interval { fn new(dur: Duration) -> Result From 853421b76dfc6f83155fabc89adcbea1859db195 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Thu, 28 Sep 2017 15:06:33 -0700 Subject: [PATCH 10/13] Update mention of KillSwitch --- tokio-reform.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index f9e244f..af507aa 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -725,10 +725,10 @@ This suite of functionality replaces two APIs provided in the Tokio stack today: important to have this "bridge" between the async and sync worlds, providing it as `thread::block_until` makes it much more clear what is involved. -- The `KillSwitch` and `Spawner` types replace the use of `Handle` today for - cooperative, non-`Send` task execution. Unlike `Handle`, though, this API is - carefully crafted to help ensure that spawned tasks are actually run to - completion, unless explicitly requested otherwise. 
+- The `Controller` type replaces the use of `Handle` today for cooperative, + non-`Send` task execution. Unlike `Handle`, though, this API is carefully + crafted to help ensure that spawned tasks are actually run to completion, + unless explicitly requested otherwise. Thus, in addition to providing a cleaner factoring, these two APIs also mitigate two major footguns with today's Tokio stack. From ff9e944f55bd062d12bd3ae352627012ff470069 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 18 Oct 2017 11:45:23 -0700 Subject: [PATCH 11/13] Remove `Controller` --- tokio-reform.md | 166 ++++++++++++++++++++++++++++-------------------- 1 file changed, 96 insertions(+), 70 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index af507aa..bca4a16 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -256,12 +256,12 @@ use std::env; use std::net::SocketAddr; use futures::{Future, Stream, IntoFuture}; -use futures::thread::{self, Controller}; +use futures::thread; use tokio_core::net::TcpListener; use tokio_io::AsyncRead; use tokio_io::io::copy; -fn serve(cont: &Controller, addr: SocketAddr) -> impl Future { +fn serve(addr: SocketAddr) -> impl Future { TcpListener::bind(&addr) .into_future() .and_then(move |socket| { @@ -271,7 +271,7 @@ fn serve(cont: &Controller, addr: SocketAddr) -> impl Future println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), @@ -290,9 +290,7 @@ fn main() { .parse() .unwrap(); - thread::block_on_all(|cont| { - cont.spawn(serve(&cont, addr)); - }); + thread::block_on_all(serve(addr)); } ``` @@ -320,13 +318,13 @@ use std::env; use std::net::SocketAddr; use futures::prelude::*; -use futures::thread::{self, Controller}; +use futures::thread; use tokio::net::TcpListener; use tokio_io::AsyncRead; use tokio_io::io::copy; #[async] -fn serve(cont: &Controller, addr: SocketAddr) -> io::Result<()> { +fn serve(addr: SocketAddr) -> io::Result<()> { let socket = TcpListener::bind(&addr)?; println!("Listening on: 
{}", addr); @@ -335,16 +333,14 @@ fn serve(cont: &Controller, addr: SocketAddr) -> io::Result<()> { // with Tokio, read and write components are distinct: let (reader, writer) = conn.split(); - cont.spawn(async_block! { + thread::spawn_task(async_block! { match await!(copy(reader, writer)) { Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), }; - Ok(()) }); } - Ok(()) } @@ -355,9 +351,7 @@ fn main() { .parse() .unwrap(); - thread::block_on_all(|cont| { - cont.spawn(serve(&cont, addr)); - }); + thread::block_on_all(serve(addr)); } ``` @@ -378,7 +372,7 @@ The *long-term* goal is to provide a complete async story that feels very close synchronous programming, i.e. something like: ```rust -async fn serve(cont: &Controller, addr: SocketAddr) -> io::Result<()> { +async fn serve(addr: SocketAddr) -> io::Result<()> { let socket = TcpListener::bind(&addr)?; println!("Listening on: {}", addr); @@ -386,12 +380,11 @@ async fn serve(cont: &Controller, addr: SocketAddr) -> io::Result<()> { // with Tokio, read and write components are distinct: let (reader, writer) = conn.split(); - cont.spawn(async { + thread::spawn_task(async { match await copy(reader, writer) { Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr), Err(e) => println!("error on {}: {}", addr, e), }; - Ok(()) }); } @@ -675,73 +668,106 @@ contents: ```rust pub mod thread { - // Execute the given future *synchronously* on the current thread, blocking until - // it completes and returning its result. - // - // NB: no 'static requirement on the future - pub fn block_until(f: F) -> Result; - - // Blocks until either all non-daemon tasks complete, or `force_shutdown` is invoked - pub fn block_on_all(f: F) where F: FnOnce(Controller); - - // A handle used for controlling task spawning and execution within `block_on_all` - // - // NB: this is not `Send` - #[derive(Clone)] - pub struct Controller { ..
} - - impl Controller { - // Spawns a "standard" task, i.e. one that must be explicitly either - // blocked on or killed off before `block_on_all` will return. - pub fn spawn(&self, task: F) - where F: Future + 'static; - - // Spawns a "daemon" task, which is not blocked on when waiting - // for completion. - pub fn spawn_daemon(&self, task: F) - where F: Future + 'static; - - // Cancels off *all* remaining tasks (daemon or not) - pub fn cancel_all(&self); - - // Extract proof that we're in an executor context (see below) - pub fn enter(&self) -> &Enter; - } - - impl Executor for Controller - where F: Future + 'static - { - // ... - } + /// Execute the given future *synchronously* on the current thread, blocking until + /// it (and all spawned tasks) completes and returning its result. + /// + /// In more detail, this function blocks until: + /// - the given future completes, *and* + /// - all spawned tasks complete, or `cancel_all_spawned` is invoked + /// + /// Note that there is no `'static` or `Send` requirement on the future. + pub fn block_on_all(f: F) -> Result; + + /// Execute the given closure, then block until all spawned tasks complete. + /// + /// In more detail, this function will block until: + /// - All spawned tasks are complete, or + /// - `cancel_all_spawned` is invoked. + pub fn block_with_init(f: F) where F: FnOnce(&Enter); + + /// Spawns a task, i.e. one that must be explicitly either + /// blocked on or killed off before `block_*` will return. + /// + /// # Panics + /// + /// This function can only be invoked within a future given to a `block_*` + /// invocation; any other use will result in a panic. + pub fn spawn_task(task: F) where F: Future + 'static; + + /// Spawns a daemon, which does *not* block the pending `block_on_all` call. + /// + /// # Panics + /// + /// This function can only be invoked within a future given to a `block_*` + /// invocation; any other use will result in a panic. 
+ pub fn spawn_daemon(task: F) where F: Future + 'static; + + /// Cancels *all* spawned tasks and daemons. + /// + /// # Panics + /// + /// This function can only be invoked within a future given to a `block_*` + /// invocation; any other use will result in a panic. + pub fn cancel_all_spawned(); + + struct TaskExecutor { .. } + impl Executor for TaskExecutor + where F: Future + 'static { .. } + + /// Provides an executor handle for spawning tasks onto the current thread. + /// + /// # Panics + /// + /// As with the `spawn_*` functions, this function can only be invoked within + /// a future given to a `block_*` invocation; any other use will result in + // a panic. + pub fn task_executor() -> TaskExecutor; + + struct DaemonExecutor { .. } + impl Executor for DaemonExecutor + where F: Future + 'static { .. } + + /// Provides an executor handle for spawning daemons onto the current thread. + /// + /// # Panics + /// + /// As with the `spawn_*` functions, this function can only be invoked within + /// a future given to a `block_*` invocation; any other use will result in + // a panic. + pub fn daemon_executor() -> TaskExecutor; } ``` This suite of functionality replaces two APIs provided in the Tokio stack today: -- The free `block_until` function replaces the `wait` method on `Future`, which +- The free `block_on_all` function replaces the `wait` method on `Future`, which will be deprecated. In our experience with Tokio, as well as the experiences of other ecosystems like Finagle in Scala, having a blocking method so easily within reach on futures leads people down the wrong path. While it's vitally important to have this "bridge" between the async and sync worlds, providing - it as `thread::block_until` makes it much more clear what is involved. - -- The `Controller` type replaces the use of `Handle` today for cooperative, - non-`Send` task execution.
Unlike `Handle`, though, this API is carefully - crafted to help ensure that spawned tasks are actually run to completion, - unless explicitly requested otherwise. - -Thus, in addition to providing a cleaner factoring, these two APIs also mitigate -two major footguns with today's Tokio stack. + it as `thread::block_on_all` highlights the synchronous nature. In addition, + the fact that the function automatically blocks on any spawned tasks helps + avoid footguns as well. + +- Today, Tokio's `Handle` can be used for cooperative, non-`Send` task + execution. Here, that functionality is replaced (and enhanced) by a suite of + free functions, `spawn_task`, `spawn_daemon`, and `cancel_all_spawned`. These + functions, which can only be called in the context of a `block_*` function, + give you ways both to spawn cooperative tasks, and to fully manage + shutdown. In particular, the fact that `block_on_all` waits for spawned tasks + to complete (or for explicit cancellation) helps mitigate another footgun with + today's setup: tasks that are dropped on the floor. Those wishing to couple a reactor and a single-threaded executor, as today's -`tokio-core` does, should use `FuturesUnordered` together with a custom reactor -to do so. +`tokio-core` does, should use something like `FuturesUnordered` together with a +custom reactor to do so. We expect there will eventually be a separate crate +providing this functionality in a high-performance way. ### Executors in general Another footgun we want to watch out for is accidentally trying to spin up multiple executors on a single thread. The most common case is using -`thread::block_until` on, say, a thread pool's worker thread. +`thread::block_on_all` on, say, a thread pool's worker thread. 
We can mitigate this easily by providing an API for "executor binding" that amounts to a thread-local boolean: @@ -773,9 +799,9 @@ pub mod executor { } ``` -This API should be used in functions like `block_until` that enter executors, +This API should be used in functions like `block_on_all` that enter executors, i.e. that block until some future completes (or other event -occurs). Consequently, nested uses of `block_until`, or uses within a thread +occurs). Consequently, nested uses of `block_on_all`, or uses within a thread pool, will panic, alerting the user to a bug. Executors should publicly expose access to an `Enter` only during From 7ba9b4613c93ff0998796b8e435e153c5b1ad181 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 18 Oct 2017 11:54:22 -0700 Subject: [PATCH 12/13] Address nits --- tokio-reform.md | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/tokio-reform.md b/tokio-reform.md index bca4a16..8b0cc93 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -610,11 +610,14 @@ impl TcpListener { fn bind(addr: &SocketAddr) -> io::Result; // set up a fully-customized listener - fn from_listener(listener: std::net::TcpListener, handle: &Handle) -> io::Result; + fn from_std(listener: std::net::TcpListener, handle: &Handle) -> io::Result; - // this now yields a *std* TcpStream, so that you can use `TcpStream::from_stream` to + // this yields a *std* TcpStream, so that you can use `TcpStream::from_std` to // associate it with an handle of your choice. - fn accept(&mut self) -> Result<(std::net::TcpStream, SocketAddr)> + fn accept_std(&mut self) -> Result<(std::net::TcpStream, SocketAddr)> + + // by contrast, this method returns a stream bound to the default handle. 
+ fn accept(&mut self) -> Result<(TcpStream, SocketAddr)> } ``` @@ -626,8 +629,8 @@ impl TcpStream { fn connect(addr: &SocketAddr) -> TcpStreamNew; // these are as today - fn from_stream(stream: TcpStream, handle: &Handle) -> Result; - fn connect_stream(stream: TcpStream, addr: &SocketAddr, handle: &Handle) -> TcpStreamNew + fn from_std(stream: std::net::TcpStream, handle: &Handle) -> Result; + fn connect_std(stream: std::net::TcpStream, addr: &SocketAddr, handle: &Handle) -> TcpStreamNew } ``` @@ -783,7 +786,7 @@ pub mod executor { impl Enter { // Register a callback to be invoked if and when the thread // ceased to act as an executor. - pub fn on_exit(&mut self, f: F) where F: FnOnce(); + pub fn on_exit(&self, f: F) where F: FnOnce() + 'static; // Treat the remainder of execution on this thread as part of an // executor; used mostly for thread pool worker threads. @@ -845,7 +848,7 @@ expectation is that over time the ecosystem will converge on just using `tokio`. Transitioning APIs over is largely straightforward, but will require a major version bump: -- Uses of `Remote`/`Handle` that don't involve spawning threads can use the new +- Uses of `Remote`/`Handle` that don't involve spawning tasks can use the new `Handle`. - Local task spawning should migrate to use the `thread` module. - APIs that need to construct I/O objects, or otherwise require a `Handle`, From 443924bbe04ee702ba55ae42a7f3ae09b677bd38 Mon Sep 17 00:00:00 2001 From: Aaron Turon Date: Wed, 18 Oct 2017 12:15:14 -0700 Subject: [PATCH 13/13] Fix copy-pasta --- tokio-reform.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio-reform.md b/tokio-reform.md index 8b0cc93..37ecc20 100644 --- a/tokio-reform.md +++ b/tokio-reform.md @@ -737,7 +737,7 @@ pub mod thread { /// As with the `spawn_*` functions, this function can only be invoked within /// a future given to a `block_*` invocation; any other use will result in // a panic. 
- pub fn daemon_executor() -> TaskExecutor; + pub fn daemon_executor() -> DaemonExecutor; } ```