
Introduce the Tokio runtime: Reactor + Threadpool #141

Merged · 28 commits · Feb 21, 2018

Conversation

@carllerche (Member) commented Feb 15, 2018

This patch is an initial implementation of the Tokio runtime. The Tokio runtime provides an out-of-the-box configuration for running I/O-heavy asynchronous applications.

tl;dr

This patch includes a work-stealing thread pool and an easy way to get a reactor + thread pool:

extern crate tokio;
extern crate futures;

use futures::{Future, Stream};
use tokio::net::TcpListener;

fn main() {
    let addr = "127.0.0.1:8080".parse().unwrap();
    let listener = TcpListener::bind(&addr).unwrap();

    let server = listener.incoming()
        .map_err(|e| println!("error = {:?}", e))
        .for_each(|socket| {
            // `process` is the application-defined per-connection future.
            tokio::spawn(process(socket));
            Ok(())
        });

    tokio::run(server);
}

This will result in sockets being processed across all available cores.

Background

Most I/O-heavy applications require the same general setup: a reactor that drives I/O resources and an executor to run application logic. Historically, tokio-core provided both of these in one structure: a reactor and a single-threaded executor for application logic that ran on the same thread as the reactor.

The Tokio reform RFC proposed splitting those two constructs. The Tokio reactor can now be run standalone. To replace the single-threaded executor, current_thread was introduced. However, this leaves figuring out concurrency up to the user.

Idea

Modern computers provide many cores. Applications need to be able to take advantage of that available concurrency. It would be nice if Tokio provided an out-of-the-box concurrency solution that is solid for most use cases.

Thread Pool

First, to support scheduling futures efficiently across all cores, a thread pool is needed. Currently, the consensus regarding the best strategy is to use a work-stealing thread pool. So, Tokio now has a new crate: tokio-threadpool.

Now, there are many ways to implement a work-stealing thread pool. If you are familiar with Rayon, you might already know that Rayon also uses a work-stealing thread pool. However, Rayon's pool is optimized for a different use case than tokio-threadpool. Rayon's goal is to provide concurrency when computing a single end result. Tokio's goal is to multiplex many independent tasks with as much fairness as possible while still having good performance. So, the implementations of tokio-threadpool and Rayon's pool end up fairly different.
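
To make that usage pattern concrete, here is a minimal sketch of spawning many independent tasks onto the pool. It assumes tokio-threadpool exposes a ThreadPool with spawn and shutdown_on_idle methods as in this patch; the exact names may still change before 0.2.

extern crate futures;
extern crate tokio_threadpool;

use futures::Future;
use futures::future::lazy;
use tokio_threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new();

    // Many small, independent tasks, multiplexed fairly across the workers.
    for i in 0..100 {
        pool.spawn(lazy(move || {
            println!("task {} running on a pool worker", i);
            Ok(())
        }));
    }

    // Block until every spawned task has completed, then shut down.
    pool.shutdown_on_idle().wait().unwrap();
}

The contrast with Rayon shows up in this shape: the tasks here are unrelated, long-lived units of work rather than forks of a single computation.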

tokio-executor

Another new crate is included: tokio-executor. This mostly pulls in the design work happening in the futures executor RFC. Since the goal between now and Tokio 0.2 is to get as much experimentation done as possible, I wanted to give the traits / types discussed in that RFC a spin. Ideally, the tokio-executor crate will be short-lived and gone by 0.2.
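
For context, the executor abstraction from that RFC boils down to something like the following. This is a paraphrase for illustration only; the exact trait and error type shipped in tokio-executor may differ.

extern crate futures;

use futures::Future;

// Placeholder error type; the real crate has a richer error.
pub struct SpawnError;

pub trait Executor {
    /// Spawn a boxed future onto this executor. The executor is expected to
    /// drive the future to completion, and may refuse the spawn (for example
    /// while shutting down or when at capacity).
    fn spawn(
        &mut self,
        future: Box<Future<Item = (), Error = ()> + Send>,
    ) -> Result<(), SpawnError>;
}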

Putting it together

The tokio crate includes a new Runtime type, which is a handle to both the reactor and the thread pool. This allows starting and stopping both in a coordinated way and will provide configuration options and more over time.
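
As a rough usage sketch (the API is still subject to the audit listed below, so method names such as spawn and shutdown_on_idle are taken from this patch and may change):

extern crate futures;
extern crate tokio;

use futures::Future;
use futures::future::lazy;
use tokio::runtime::Runtime;

fn main() {
    // One reactor + one work-stealing thread pool, started together.
    let mut rt = Runtime::new().unwrap();

    // Tasks spawned here run on the pool; I/O resources they create are
    // driven by the background reactor.
    rt.spawn(lazy(|| {
        println!("hello from the runtime");
        Ok(())
    }));

    // Shut both down once all spawned futures have completed.
    rt.shutdown_on_idle().wait().unwrap();
}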

Path to 0.2

Again, the goal for the time period between now and 0.2 is to get as much experimentation done as possible. The Runtime construct is an initial proposal with an implementation, giving users a chance to try it out and provide feedback.

Before 0.2 is released, an RFC will be written up detailing what was learned, and what is being proposed for inclusion in the 0.2 release. This will be a forum for all to provide feedback based on usage.

Remaining for this PR

  • Update documentation
  • Audit APIs being introduced
  • Check for any changes in the futures executor RFC.
  • Should park return a Result?
  • Actually add tokio::spawn fn.
  • Rename run_seeded.

This patch is an initial implementation of the Tokio runtime. The Tokio
runtime provides an out-of-the-box configuration for running I/O-heavy
asynchronous applications.

As of now, the Tokio runtime is a combination of a work-stealing thread
pool as well as a background reactor to drive I/O resources.

This patch also includes tokio-executor, a hopefully short lived crate
that is based on the futures 0.2 executor RFC.
@carllerche (Member Author)

The Runtime type is here. This file introduces the bulk of the new public API. The rest is the implementation.

]

[badges]
travis-ci = { repository = "tokio-rs/tokio" }
appveyor = { repository = "carllerche/tokio" }

[dependencies]
tokio-io = "0.1"
Contributor

Why do you set path in tokio-executor like path = "tokio-executor" but you get tokio-io from crates.io?

Member Author

The tokio-io dependency probably should be updated as well 👍 That can be done on master.

This enables the reactor to be used as the thread parker for executors.
This also adds an `Error` component to `Park`.
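
For readers following along, the `Park` abstraction referred to here has roughly the following shape (a paraphrase; the exact signatures in the crate may differ slightly):

use std::time::Duration;

/// Something a worker thread can block on until it is unparked, e.g. a
/// condvar-based parker or the reactor itself polling for I/O events.
pub trait Park {
    /// Handle used to wake the parked thread from elsewhere.
    type Unpark: Unpark;

    /// The error component mentioned in the commit message above.
    type Error;

    /// Get a handle that can unpark this parker from another thread.
    fn unpark(&self) -> Self::Unpark;

    /// Block the current thread until unparked.
    fn park(&mut self) -> Result<(), Self::Error>;

    /// Block the current thread until unparked or `duration` elapses.
    fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
}

pub trait Unpark: Send + 'static {
    fn unpark(&self);
}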

#[inline]
pub fn poll(&self) -> Poll<T> {
unsafe {
Member

Is this whole function actually unsafe? ISTM that the scope could be narrowed.

Member Author

Ah, thanks. This entire file is actually dead code right now :) The deque is provided by a lib.

// Get a handle to the reactor.
let handle = reactor.handle().clone();

let pool = threadpool::Builder::new()
Member

we probably want a way to configure an upper bound on the number of threads this thread pool may run?
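
For illustration, such a cap could be expressed as a builder knob along these lines; pool_size is an assumed name here, and the actual configuration surface of tokio-threadpool may differ.

extern crate tokio_threadpool;

use tokio_threadpool::Builder;

fn main() {
    // Hypothetical: cap the pool at 8 worker threads via an assumed
    // `pool_size` builder option.
    let pool = Builder::new()
        .pool_size(8)
        .build();

    // ... spawn work onto `pool` as usual ...
    drop(pool);
}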

///
/// A `Handle` is used for associating I/O objects with an event loop
/// explicitly. Typically though you won't end up using a `Handle` that often
/// and will instead use and implicitly configured handle for your thread.

s/and/an/


// If the fallback hasn't been previously initialized then let's spin
// up a helper thread and try to initialize with that. If we can't
// actually create a helper thread then we'll just return a "defunkt"

s/defunkt/defunct/


## License

`futures-pool` is primarily distributed under the terms of both the MIT license

Should this be tokio-threadpool instead of futures-pool?

src/runtime.rs Outdated
}

/// Start the Tokio runtime and spawn the provided future.
pub fn run_seeded<F>(f: F)


this is trivial but the 'seeded' terminology seems very unfamiliar for what's probably a common entrypoint to the API. The only 'seed' that comes to mind is that of random number generation. What about just 'run_future' as the name here?

Member Author

I agree that the name isn't great.

The issue with run_future is that the function does more than this. It spawns the future onto the runtime, runs it AND any other futures that get spawned.

Any other thoughts for names?


go ? :trollface:
How about run_async ?


run for closure, poll for future?

Member Author

I'm thinking of just renaming run_seeded -> run and getting rid of the other fn (in favor of using the Runtime instance directly).

This patch updates `CurrentThread` to take a `&mut Enter` reference as
part of all functions that require an executor context. This allows the
callers to configure the executor context beforehand.

For example, the default Tokio reactor can be set for the
`CurrentThread` executor.

let mut found_work = false;

worker_loop!(idx = len; len; {
@alkis commented Feb 17, 2018

Shouldn't this be stealing from a random, active worker? This seems to be going through the workers in order.

Also this is the only use of worker_loop macro. Perhaps inline it?

Member Author

Hmm... you are correct. It was supposed to start at a random index but that seems to have gotten lost / forgotten! Thanks.


On mobile ATM and didn't inspect the loop mentioned, but are we confident that starting at a random index is sufficiently random? Something tells me we actually need to randomly index into a proper set of available workers, otherwise there may be statistical bias towards picking higher numbered workers

Member Author

It works as a ring, so there isn't really a "higher" index worker.

IIRC, this bit was inspired by Java's ForkJoin pool.
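
In other words, every worker still probes each peer exactly once per pass; only the starting offset is randomized and the scan wraps around. A toy illustration of the probe order:

// Illustration only: the probe order visits every worker once, starting at a
// random index and wrapping around like a ring.
fn steal_order(num_workers: usize, start: usize) -> Vec<usize> {
    (0..num_workers).map(|i| (start + i) % num_workers).collect()
}

fn main() {
    // With 4 workers and a random start of 2, the probe order is 2, 3, 0, 1.
    assert_eq!(steal_order(4, 2), vec![2, 3, 0, 1]);
}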

mem::transmute(unpark_id)
}

/// Execute the task returning `true` if the task needs to be scheduled again.

s/true/Run::Schedule/

@alkis commented Feb 17, 2018

@carllerche is it possible to add more top-level docs explaining the role of each abstraction? It would help a lot when doing a deeper review.

/// In more detail, this function will block until:
/// - All executing futures are complete, or
/// - `cancel_all_spawned` is invoked.
pub fn run<F, R>(f: F) -> R

Instead of run and run_seeded, how about: run_with_context (closure) and poll_with_context (future)?

self.inner.clone().into()
}

pub fn schedule(&mut self, item: Box<Future<Item = (), Error = ()>>) {

Any way to avoid one of the two allocations here? One from Arc and one from the Box'ed future.

Member Author

There might be a way, but it is actually pretty tricky due to the need for trait objects. I opted to not worry about it for now until it is proven to be an issue.


/// Generates a random number
///
/// Uses a thread-local seeded XorShift.

Can't this be part of WorkerEntry instead of thread-local?
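
For reference, the generator in question is only a few shift/xor operations, so it could live in either place; a minimal 32-bit xorshift sketch (illustrative constants, not necessarily the ones used in the crate):

struct XorShift32 {
    state: u32,
}

impl XorShift32 {
    fn new(seed: u32) -> XorShift32 {
        // A zero seed would make the sequence collapse to all zeros.
        XorShift32 { state: if seed == 0 { 1 } else { seed } }
    }

    fn next(&mut self) -> u32 {
        // Marsaglia's xorshift32: three shift/xor steps per output.
        let mut x = self.state;
        x ^= x << 13;
        x ^= x >> 17;
        x ^= x << 5;
        self.state = x;
        x
    }
}

fn main() {
    let mut rng = XorShift32::new(0x9E37_79B9);
    println!("{} {}", rng.next(), rng.next());
}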

@carllerche (Member Author)

@alkis re: randomizing work stealing, the logic was taken from Java's implementation. The pool is intended to work well with a partial worker set, i.e. with any number of worker threads shut down.

There probably are ways to improve the heuristics, but I will punt any such tuning until later and will want a solid set of benchmarks before making such changes.

@alkis commented Feb 19, 2018

@carllerche Have you looked into the design of the Go scheduler? It was designed by Vyukov.

Notable differences from the current implementation:

  • when stealing, the thief steals half of the victim's queue
  • workers only look at the global queue once per 61 turns
  • stealing goes through the active workers in a random permutation
  • thieves spin 4 times trying to find victims before giving up and parking
  • workers do not steal at all if there are enough thieves already

There is a comment in the source code, starting with "Worker thread parking/unparking", which explains the current design and why other approaches work badly.

@carllerche (Member Author)

@alkis All good thoughts, especially stealing more than one task at a time. I was going to include this initially, but the deque impl lacked the ability. The impl has since moved to crossbeam, so the threadpool should be updated.

Re: all the other items. It's definitely worth experimenting with, but backed with numbers :) This PR represents an initial commit bringing in the feature, enabling iteration over time. The Go scheduler is intended for a slightly different use case than Tokio's. Go has one scheduler for the entire process and everything uses it. All threads are constantly active in some capacity (threads exist, but might be sleeping). Tokio's thread pool is (currently) intended to handle:

  • More than one instance of a thread pool in a process
  • Threads can spin up and shut down.
  • There might only be a single thread running in the pool.
  • TBD additional features that don't make sense in Go's runtime.

If you are interested, iterating the scheduler is something that you could champion, starting with setting up a suite of benchmarks. Then, each change can be made and we can check the impact in various scenarios.

Not to mention, there currently exist some bigger known issues (there is a case of false sharing... a TODO in the code mentions this). Also, the code base is generally immature and needs people to use it, so getting it out sooner than later is good!


/// Run the executor to completion, blocking the thread until all
/// spawned futures have completed **or** `duration` time has elapsed.
pub fn run_timeout(&mut self, duration: Duration)

General question: why does Rust prefer timeouts vs deadlines? Timeouts don't compose well:

// I want to finish something in 10 secs, with timeouts.
let start = Instant::now();
let elapsed = || Instant::now() - start;
// Not even entirely correct, because elapsed() can be greater than 10 secs and the subtraction will panic.
let c = socket.open_with_timeout(Duration::from_secs(10))?;
let data = socket.read_with_timeout(Duration::from_secs(10) - elapsed())?;
let processed = process(data)?;
socket.write_with_timeout(processed, Duration::from_secs(10) - elapsed())?;

// I want to finish something in 10 secs, with a deadline.
let deadline = Instant::now() + Duration::from_secs(10);
let c = socket.open_with_deadline(deadline)?;
let data = socket.read_with_deadline(deadline)?;
let processed = process(data)?;
socket.write_with_deadline(processed, deadline)?;

Member Author

IIRC, Rust itself picked timeouts as that is what gets passed to the syscalls.

One can add a deadline shim on top of a timeout based API, but if the APIs themselves are deadline based, you would have to make additional syscalls within each one to get Instant::now().

I personally have never hit the composability issue as working w/ futures it is a non-issue (you define the computation w/o any timeout or deadline and set a timeout on the entire computation). However, if it did become an issue, I'd probably add a deadline shim on top :)
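
Such a shim can be a small helper that converts the fixed deadline back into the remaining timeout at each call site; the socket methods from the earlier example are hypothetical, only the conversion is shown here.

use std::time::{Duration, Instant};

// Convert a fixed deadline into the timeout remaining right now, clamping at
// zero so a late caller does not panic on Instant subtraction.
fn remaining(deadline: Instant) -> Duration {
    let now = Instant::now();
    if now >= deadline {
        Duration::from_secs(0)
    } else {
        deadline - now
    }
}

fn main() {
    let deadline = Instant::now() + Duration::from_secs(10);
    // e.g. socket.read_with_timeout(remaining(deadline))? with the
    // hypothetical API from the comment above.
    println!("timeout to pass down: {:?}", remaining(deadline));
}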


I think adding a timeout shim on top of deadline API is easier (no need to check for negative timeouts). Also deadline is the right thing for the implementation anyway.

I don't think there is an additional Instant::now() call unless the timeout-based code has bugs (see below).

The composability issue is a real problem. Also timeouts usually lead to bugs whenever there are retries or partial reads. Go switched from SetTimeout to SetDeadline before 1.0 mainly for the latter reason.

FWIW I just checked the stdlib and it suffers from this as well. If we set a timeout on a socket and then use read_to_end or read_exact, the timeout won't be obeyed if the loops inside those functions execute more than once. Even if we try to describe the current semantics, they are super weird: the effective timeout is K * the set timeout, where K is the number of successful partial reads of read_exact or read_to_end respectively.
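
To make the K * timeout point concrete, here is a sketch of a deadline-bounded read built on std's per-call read timeout; it re-derives each call's timeout from one fixed deadline rather than restarting the clock on every partial read. The function name is made up for illustration.

use std::io::{self, Read};
use std::net::TcpStream;
use std::time::Instant;

// Illustrative only: bound the *total* time spent reading `buf.len()` bytes
// by `deadline`, even across many partial reads.
fn read_exact_by(stream: &mut TcpStream, buf: &mut [u8], deadline: Instant) -> io::Result<()> {
    let mut filled = 0;
    while filled < buf.len() {
        let now = Instant::now();
        if now >= deadline {
            return Err(io::Error::new(io::ErrorKind::TimedOut, "deadline elapsed"));
        }
        // Each syscall gets only the time that is left, not a fresh timeout.
        stream.set_read_timeout(Some(deadline - now))?;
        match stream.read(&mut buf[filled..]) {
            Ok(0) => return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "early eof")),
            Ok(n) => filled += n,
            Err(e) => return Err(e),
        }
    }
    Ok(())
}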

Member Author

@alkis For me, it comes down to this. How do you change this function to take a deadline instead of a timeout without increasing the number of syscalls?


@carllerche the function you linked will gain one Instant::now() call, and this function will lose that syscall.

@frehberg commented Mar 26, 2018

@alkis For example, automotive systems have multiple internal clocks, such as the dashboard clock, the power-control-unit clock, the system clock, and the steady clock. The latter is a monotonic clock; its time points cannot decrease as physical time moves forward. In contrast, a system clock may be adjusted by the user or via GPS. If you define a deadline timer relative to the system clock and the clock is meanwhile adjusted to a point in the past, the timer will fire after a much longer interval than intended; the corresponding functionality of the app may appear frozen and malfunctioning. A timeout is not tied to a specific clock and is much more robust against clock adjustments.


That sounds like a bug: there is access to a monotonic clock, yet the timer is defined on a non-monotonic clock.

I am looking for an example where the system does not have a monotonic clock, the program specifies a timeout, and that is handled correctly.


FYI I wrote this on the topic.

Member Author

@alkis you probably saw, but tokio-timer is now using deadlines.


No, I didn't. Thanks for the pointer @carllerche!

@carllerche (Member Author)

Ok, I think that this is ready to merge (assuming CI passes).

src/runtime.rs Outdated
//! .shutdown_on_idle()
//! .wait().unwrap();
//!
//! tokio::run(server);
Contributor

Do we have to call tokio::run after we have created a Runtime?

@Bathtor commented Mar 1, 2018

I'm slightly wondering why we are duplicating work on executors. I did exactly the same thing (i.e. writing a work-stealing thread pool scheduler on crossbeam) months ago, because I noticed it was missing, and I published it to crates.io so people could use it.
Wouldn't it be better to work towards one really good implementation of this than a bunch of mediocre ones? (Or at least one per load type. As pointed out before, there are differences between computing a single result as fast as possible and scheduling reactive tasks well.)

@jeehoonkang (Contributor)

@carllerche you mentioned in #141 (comment) that stealing multiple elements would be helpful. I have an implementation of a many-stealing deque, and I wonder if you are interested in it.

While I "guess" this algorithm is correct, without proofs I cannot be confident. I'm currently proving it, and I hope it'll be finished within a month. In the meantime, I'd like to see if stealing multiple elements will actually improve the performance of Tokio. I'll be back within days with benchmark results.

FYI, Rayon developers are also interested in stealing multiple elements (relevant issue here), but they are not sure whether it's a performance win. I'll bench Rayon with the new deque, too.

@jeehoonkang (Contributor) commented Mar 6, 2018

I changed tokio-threadpool to use the many-stealing deque, and got the following results on a benchmark. Here, control is PR #185 (replacing coco with crossbeam), and variable is this branch. In variable, Worker::try_steal_task() steals multiple elements, and if there is more than one, it executes one and pushes the others into its own deque. I believe there's a better strategy, but this was the easiest to implement. The benchmark was performed on a Xeon E5-2620 v3 @ 2.40GHz with 12 cores (24 hw threads).

 name                                control ns/iter  variable ns/iter  diff ns/iter   diff %  speedup
 connect_churn::multi_threads        7,376,326        7,385,806                9,480    0.13%   x 1.00
 connect_churn::one_thread           9,325,834        7,109,259           -2,216,575  -23.77%   x 1.31
 connect_churn::two_threads          15,843,802       15,970,003             126,201    0.80%   x 0.99
 futures_channel_latency             8,223            5,680                   -2,543  -30.93%   x 1.45
 mio_poll                            125              168                         43   34.40%   x 0.74
 mio_register_deregister             439              588                        149   33.94%   x 0.75
 mio_reregister                      127              129                          2    1.57%   x 0.98
 transfer::big_chunks::one_thread    3,343,033        3,330,092              -12,941   -0.39%   x 1.00
 transfer::small_chunks::one_thread  60,798,726       60,740,570             -58,156   -0.10%   x 1.00
 udp_echo_latency                    19,158           17,840                  -1,318   -6.88%   x 1.07

Overall, it shows that many-stealing is very helpful for some use cases, but maybe not for all. In particular, mio benchmarks are significantly worsened. I'll investigate why.

@carllerche (Member Author)

The Mio benchmarks are unrelated to anything in Tokio. They are setting the baseline. Any change to tokio-threadpool will not impact those. So, it is odd that there is such a huge diff.

@alkis commented Mar 6, 2018

How many does steal_many() steal? IIRC Go steals half the queue.

@jeehoonkang (Contributor)

@alkis currently it steals half the elements. I named it steal_many() because it could be made configurable in the future, but maybe it is better to name it steal_half()?
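
As a toy illustration of the policy being discussed (the real implementation is a lock-free deque in crossbeam, not a VecDeque):

use std::collections::VecDeque;

// Illustration of the steal-half policy: move half of the victim's pending
// tasks into the thief's queue in one steal operation.
fn steal_half<T>(victim: &mut VecDeque<T>, thief: &mut VecDeque<T>) {
    let n = victim.len() / 2;
    for _ in 0..n {
        if let Some(task) = victim.pop_front() {
            thief.push_back(task);
        }
    }
}

fn main() {
    let mut victim: VecDeque<u32> = (0..8).collect();
    let mut thief = VecDeque::new();
    steal_half(&mut victim, &mut thief);
    assert_eq!((victim.len(), thief.len()), (4, 4));
}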

// In our example, we have not defined a shutdown strategy, so
// this will block until `ctrl-c` is pressed at the terminal.
});
// current thread. This means that spawned tasks must not implement `Send`.

Wasn't the old wording correct - "spawned tasks are not required to implement Send"? :)

Member Author

You are correct!

Member Author

Well, the good news is that it looks like it has been fixed on master.
