
runtime: create reactor per worker #660

Merged: 5 commits into tokio-rs:master on Oct 3, 2018

Conversation


@ghost (Author) commented Sep 25, 2018

Motivation

Sharing one reactor among all worker threads causes a lot of contention.

Solution

This PR creates a reactor per worker thread. Each worker thread drives its own reactor when it goes to sleep.
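
Conceptually, each worker's park hook owns a reactor and polls it while the worker would otherwise sleep. A minimal, self-contained sketch of that idea follows; the Park trait and Reactor here are simplified stand-ins for illustration, not tokio's actual types:

```rust
use std::io;
use std::time::Duration;

/// Simplified stand-in for a reactor: polling it dispatches ready I/O events.
struct Reactor;

impl Reactor {
    fn new() -> io::Result<Reactor> {
        Ok(Reactor)
    }

    /// Wait up to `timeout` for I/O events and dispatch them.
    fn poll(&mut self, _timeout: Option<Duration>) -> io::Result<()> {
        Ok(())
    }
}

/// Simplified stand-in for the thread pool's per-worker park hook.
trait Park {
    fn park(&mut self);
}

/// Each worker owns one of these, and therefore its own reactor.
struct WorkerPark {
    reactor: Reactor,
}

impl Park for WorkerPark {
    fn park(&mut self) {
        // Instead of blocking on a condvar, an idle worker blocks on its own
        // reactor, so I/O is driven without a shared background reactor thread.
        let _ = self.reactor.poll(None);
    }
}

fn main() -> io::Result<()> {
    // One reactor per worker, created when the worker's park object is built.
    let mut parks = Vec::new();
    for _ in 0..4 {
        parks.push(WorkerPark { reactor: Reactor::new()? });
    }
    parks[0].park();
    Ok(())
}
```

With this shape, an idle worker doubles as the event loop for the I/O resources registered with its own reactor, which is what removes the contention on a single shared reactor.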

@ghost requested a review from @carllerche on September 25, 2018.
/// ```
pub fn reactor(&self) -> &Handle {
self.inner().reactor.handle()
}

@ghost (Author) commented:

Not sure what to do about reactor() and handle() other than to remove them. Perhaps we could still keep the background reactor, but would it be worth it?

@carllerche (Member) replied:

We need to keep this to maintain backwards compatibility. Could this just return a handle to the first reactor? A deprecation warning should provide enough of a hint for users to stop using it.
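
For illustration, keeping the accessor might look roughly like the sketch below; the Handle stand-in and the reactor_handles field are assumptions, not the actual layout of the runtime:

```rust
/// Stand-in for tokio_reactor::Handle, for illustration only.
#[derive(Clone)]
pub struct Handle;

pub struct Runtime {
    /// Assumed field: one reactor handle per worker (illustrative, not the
    /// actual layout of the runtime's internals).
    reactor_handles: Vec<Handle>,
}

impl Runtime {
    /// Kept only for backwards compatibility; returns a handle to the first
    /// worker's reactor and steers users away with a deprecation warning.
    #[deprecated(note = "the runtime no longer has a single shared reactor")]
    pub fn reactor(&self) -> &Handle {
        &self.reactor_handles[0]
    }
}
```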

@@ -152,9 +150,6 @@ pub struct Runtime {
 
 #[derive(Debug)]
 struct Inner {
-    /// Reactor running on a background thread.
-    reactor: Background,
-
     /// Task execution pool.
     pool: threadpool::ThreadPool,
 }

@ghost (Author) commented:

Should I remove Inner now that it only contains the ThreadPool? It doesn't make much sense to go through .inner.pool rather than just .pool.

@carllerche (Member) replied:

Yeah, it is private, so we can change it to whatever is best 👍

@@ -241,8 +241,14 @@ impl Builder {
});
})
.custom_park(move |worker_id| {
// Create a new reactor
let reactor = Reactor::new().unwrap(); // TODO(stjepang): remove unwrap

@ghost (Author) commented:

This is a tricky unwrap(). I think we have two options here:

  1. Change the signature of custom_park so that the closure returns an io::Result<P>.
  2. Construct reactors and timers outside the threadpool builder. For that, we should probably make the inner field of WorkerId public and guarantee that IDs are always numbers in 0..core_threads.

Which one do you prefer?

@carllerche (Member) replied:

I don't have much of an opinion here. My initial observations:

  • tokio-threadpool should be as independent as possible; changing the signature of custom_park to return io::Result would bake knowledge of the runtime's needs into it.
  • Making WorkerId implement PartialEq<usize> or something similar will probably work. The IDs will be 0..self.pool_size() as set by the builder. This is also easier to do in a backwards-compatible way (a rough sketch of this approach follows).
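
A rough sketch of the second option under those constraints: construct the reactors up front, outside the builder, and let the park hook pick one by the worker's numeric id. The builder hook is modeled here as a plain closure; this is not tokio-threadpool's actual custom_park signature:

```rust
use std::io;
use std::sync::{Arc, Mutex};

/// Stand-in reactor type for illustration.
struct Reactor;

impl Reactor {
    fn new() -> io::Result<Reactor> {
        Ok(Reactor)
    }
}

fn main() -> io::Result<()> {
    let core_threads = 4;

    // Build every reactor before the pool starts, so any I/O error surfaces
    // here instead of inside an infallible park callback.
    let mut slots = Vec::with_capacity(core_threads);
    for _ in 0..core_threads {
        slots.push(Mutex::new(Some(Reactor::new()?)));
    }
    let slots = Arc::new(slots);

    // Inside a custom_park-style hook, the worker id (assumed to be an index
    // in 0..core_threads) claims that worker's pre-built reactor.
    let per_worker_park = move |worker_id: usize| {
        slots[worker_id]
            .lock()
            .unwrap()
            .take()
            .expect("reactor already claimed for this worker")
    };

    let _reactor_for_worker_0 = per_worker_park(0);
    Ok(())
}
```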

@carllerche (Member) left a review:

Looks good 👍 Thanks for breaking this up, it makes it much easier to review.

I provided thoughts inline to your questions.

})
});

let inner = Box::new(inner.pool.shutdown_now());

@carllerche (Member) commented:

Odds are you can remove the box, but it isn't critical.

@carllerche (Member) left a review:

Looks great. The only change I would ask for is to keep the fields of WorkerId private and instead add an as_usize() method. This should help with forwards compatibility.

I'm a 👍 with that change. Anyone else may feel free to merge this at that point.

/// Worker identifiers in a single thread pool are guaranteed to be integers in
/// the range `0..pool_size`.
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub struct WorkerId(pub usize);

@carllerche (Member) commented:

Instead of making the field pub, could you keep the field private and add a new fn, WorkerId::as_usize()? This should help with forwards compatibility.

@ghost (Author) replied:

I decided to name it WorkerId::into_usize. The conventional signatures would be:

  • fn as_usize(&self) -> &usize
  • fn to_usize(&self) -> usize
  • fn into_usize(self) -> usize

I made WorkerId a Copy type (just like e.g. ThreadId is), so taking self by value is cheap and into_usize seems the most fitting.

let timer_handle = t1.lock().unwrap()
.get(w.id()).unwrap()
.clone();
let index = w.id().0;

@carllerche (Member) commented:

Related to the comment on WorkerId, this would become w.id().as_usize().

.insert(worker_id.clone(), timer.handle());

timer
timers[worker_id.0]

@carllerche (Member) commented:

This would become worker_id.as_usize().

@jonhoo (Contributor) commented Oct 2, 2018

I'm working on tidying up the benchmarks for https://github.com/mit-pdos/noria. In theory that should serve as a good benchmark of the impact of this change.

@carllerche (Member) commented:

@jonhoo Thanks for letting us know.

That said, I don't think this change will make much of a difference. This is the first step. The next step will be assigning tasks to "home" workers.

@jonhoo (Contributor) commented Oct 2, 2018

@carllerche Hmm, if I understand the change right, this should at least be able to replace tokio-io-pool?

@@ -79,7 +79,7 @@ struct CurrentTask {
 ///
 /// This identifier is unique scoped by the thread pool. It is possible that
 /// different thread pool instances share worker identifier values.
-#[derive(Debug, Clone, Hash, Eq, PartialEq)]
+#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]

@carllerche (Member) commented:

Given that the type is already Clone, I would avoid Copy for now (forwards compat hazard).

///
/// Worker identifiers in a single thread pool are guaranteed to correspond to integers in the
/// range `0..pool_size`.
pub fn into_usize(&self) -> usize {

@carllerche (Member) commented:

This is still taking &self. That said, I would opt for to_usize for now (but it isn't critical either way).
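
Putting the review feedback together (private field, Clone but not Copy, a to_usize method taking &self), the suggested shape is roughly the following sketch, not the merged code:

```rust
/// Identifies a worker within a thread pool.
///
/// Worker identifiers in a single thread pool are guaranteed to correspond
/// to integers in the range `0..pool_size`.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct WorkerId(usize);

impl WorkerId {
    /// Returns this identifier as an index in `0..pool_size`.
    ///
    /// Keeping the field private and exposing it through a method leaves room
    /// to change the representation later.
    pub fn to_usize(&self) -> usize {
        self.0
    }
}
```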

@carllerche (Member) commented:

@jonhoo According to @stjepang, I am wrong and this PR should take us there.

Also, @seanmonstar described how to run the hyper benchmarks here. We should check the impact of this PR.

@carllerche (Member) left a review:

❤️

@carllerche merged commit d35d051 into tokio-rs:master on Oct 3, 2018.
@ghost deleted the reactor-per-worker branch on October 3, 2018.

@jonhoo (Contributor) commented Nov 13, 2018

@stjepang Just to check my own understanding: with work stealing, we could end up with lots of futures being driven by reactors on different threads, right? Because there's no migration between reactors?

@ghost (Author) commented Nov 13, 2018

@jonhoo Yes... in theory.

However, if a worker is running a stolen task, the situation is no worse than what we previously had, where all futures were driven by the single reactor on a dedicated background thread.

Furthermore, to minimize the amount of stealing going on, I submitted #683, which attempts to distribute futures across workers and reactors as evenly as possible.

@jonhoo (Contributor) commented Nov 13, 2018

Okay, yeah, that's what I figured; just wanted to double-check. I wonder if we could keep track of the "original" worker for a future, and try to bias the stealing so that things tend to end up back where they were spawned when possible.

@ghost (Author) commented Nov 13, 2018

> I wonder if we could keep track of the "original" worker for a future, and try to bias the stealing so that things tend to end up back where they were spawned when possible.

That is already happening. When a reactor decides to notify a task, it inserts it into the current worker's queue.

Say task T is polled by worker W1, its IO resource gets registered in reactor R1, and T then goes back into W1's task queue. Later, another worker W2 steals T and polls it, but T has to block on its IO resource. T gets inserted into R1 (because that is where the IO resource was registered) and waits there until the resource becomes ready. Eventually, R1 will decide to notify T and insert it into W1's queue. This way T ends up back in W1.

@jonhoo (Contributor) commented Nov 13, 2018

Neat! I didn't realize reactors also had their own queues? I'm doing some reconnaissance ahead of https://twitter.com/Jonhoo/status/1062392012878606339 :)

@ghost (Author) commented Nov 13, 2018

Oh nice! Looking forward to it :)

So the way a worker finds and runs a task is:

  1. Grab a task from my own queue.
  2. If not found, then steal it from someone else.
  3. If I've got a task, then run it.
    a. If the task has completed, drop it.
    b. If the task has to block on an IO resource, give a task reference to the reactor the IO resource is registered in.
    • If the IO resource hasn't been registered anywhere yet, register it in my reactor.
  4. If I don't have a task, block on my own reactor.
    a. If the reactor sees IO activity, it will notify appropriate tasks and wake me up.
    b. It's also possible for another thread to spawn a new future and put it into my queue. That action will trigger a dummy IO resource in my reactor in order to wake me up.

Besides waiting on the reactor when no tasks are found, the worker also calls a function named sleep_light every X task runs, which asks the reactor for new events without blocking. We do this so that IO notifications keep chugging along even when the task queues are not empty.

The reactor keeps a list of IO resources together with the tasks they belong to. When a worker blocks on the reactor, the reactor waits until at least one IO resource becomes ready. It then notifies the tasks associated with all readied IO resources, and the act of notification simply puts task references into the current worker's queue.
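
A highly simplified, std-only sketch of the worker loop described above; Task, Reactor, and Worker are toy stand-ins, and the real scheduler's queues, stealing, and notification paths are far more involved:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};

/// Toy stand-ins for illustration; these are not tokio's actual types.
struct Task;

enum Poll {
    Complete,
    Blocked,
}

struct Reactor {
    /// Tasks whose (imaginary) IO resources are registered here.
    blocked: Mutex<Vec<Task>>,
    /// Signalled when an IO resource becomes ready (or on a wake-up).
    ready: Condvar,
}

struct Worker {
    queue: Mutex<VecDeque<Task>>,
    reactor: Reactor,
}

impl Worker {
    fn run(&self, others: &[Arc<Worker>]) {
        loop {
            // 1. Grab a task from my own queue...
            let task = self.queue.lock().unwrap().pop_front().or_else(|| {
                // 2. ...or steal one from another worker.
                others.iter().find_map(|w| w.queue.lock().unwrap().pop_back())
            });

            match task {
                Some(task) => match poll(&task) {
                    // 3a. Completed: drop it.
                    Poll::Complete => drop(task),
                    // 3b. Blocked on IO: hand it to the reactor its resource
                    //     is registered with (here, always my own reactor).
                    Poll::Blocked => self.reactor.blocked.lock().unwrap().push(task),
                },
                None => {
                    // 4. No runnable task anywhere: block on my own reactor until
                    //    it sees IO activity (or another thread wakes me up),
                    //    after which notified tasks land back in my queue.
                    let guard = self.reactor.blocked.lock().unwrap();
                    let _guard = self.reactor.ready.wait(guard).unwrap();
                }
            }
        }
    }
}

/// Stand-in for polling a task once.
fn poll(_task: &Task) -> Poll {
    Poll::Complete
}
```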
