Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a Send'able Handle for the single-threaded Runtime to spawn… #340

Merged
merged 5 commits into from
Jun 5, 2018

Conversation

sdroege
Copy link
Contributor

@sdroege sdroege commented May 3, 2018

… new tasks

// Create an mpsc channel for scheduling new tasks on this runtime
// from a different thread
// XXX: Should this use a bounded channel with an arbitrary limit instead, and then
// panic spawning if the channel is full?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any opinions on this?

Also some further work is needed so that run() ignores our channel instead of just running forever

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how to make run() not wait. The way how I solved that in my code is that I'm not using a channel but just a Mutex<VecDequeue> and then use turn() instead of run() and unpark whenever a future is scheduled.

Any better ideas?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

... let's just move it into the executor directly. There there's control over all that :)

// XXX: Should this use a bounded channel with an arbitrary limit instead, and then
// panic spawning if the channel is full?
let (spawn_sender, spawn_receiver) = mpsc::unbounded();
executor.spawn(spawn_receiver.for_each(|future| {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to do this only the first time that handle() is called, but it complicates things slightly. Opinions?

@carllerche
Copy link
Member

Ping me when you think this is reviewable 👍

@sdroege
Copy link
Contributor Author

sdroege commented May 11, 2018

Thanks, will do! Unfortunately got side-tracked with other things, but hopefully will have some time this weekend

@sdroege
Copy link
Contributor Author

sdroege commented May 14, 2018

Sorry for the delay, this is now updated to have the handle implemented directly on the executor (and re-exported from the runtime). And it has a test

@carllerche this should be good for reviewing now. Only open question from my side is whether this should actually be an unbounded channel, and if there should be a standalone function current_thread::handle that gives the handle of the current thread's executor

@tikue
Copy link

tikue commented May 15, 2018

Spawning a future is already an unbounded action, so making the channel unbounded isn't changing anything.

@sdroege
Copy link
Contributor Author

sdroege commented May 18, 2018

@tikue That was also my thought, and if you spawn too many futures you'll probably have another problem already before that channel is becoming one

@sdroege
Copy link
Contributor Author

sdroege commented May 29, 2018

Any other comments, how should we move forward with this? :)

Copy link
Member

@carllerche carllerche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for the delay (stuff came up)! Looks like a great start. I posted some feedback inline.

@@ -304,10 +311,16 @@ impl<P: Park> CurrentThread<P> {
pub fn new_with_park(park: P) -> Self {
let unpark = park.unpark();

// XXX: Should this use a bounded channel with an arbitrary limit instead, and then
// panic spawning if the channel is full?
let (spawn_sender, spawn_receiver) = mpsc::unbounded();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unbounded is fine here. Generally, spawn fns do not have back pressure. Instead, they either succeed or fail. One accepted pattern is for the executor to have a max # of tasks and spawn to fail if that is reached. This feature can be punted to another PR though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the comment

spawn_handle: Handle,

/// Receiver for futures spawned from other threads
spawn_receiver: executor::Spawn<mpsc::UnboundedReceiver<Box<Future<Item = (), Error = ()> + Send + 'static>>>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can probably be a plain old queue. There is no need to pull in all the future enabled stuff since this is for an executor. The easiest thing here is probably to use std::sync::mpsc and try_recv.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, it indeed is much simpler with a normal mpsc queue. Changed it accordingly

let notify = borrow.scheduler.notify();

loop {
let res = borrow.enter(self.enter, || {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A lot of this can probably be simplified by not using the future enabled channel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

///
/// This function panics if the spawn fails. Failure occurs if the `CurrentThread`
/// instance of the `Handle` does not exist anymore.
pub fn spawn<F>(&self, future: F)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For forwards compat, we will probably want to return Result here (matching https://docs.rs/tokio/0.1.6/tokio/executor/trait.Executor.html#tymethod.spawn).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@@ -8,6 +8,9 @@ use futures::Future;

use std::io;

// Re-export Handle from here
pub use executor::current_thread::Handle;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For forwards compat, I would wrap this in a new type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

…bility

... and consistency with the other spawn() functions.
…rentThread executor

This simplifies the code quite a bit and is doing all we need anyway.
@sdroege
Copy link
Contributor Author

sdroege commented May 30, 2018

Applied all the review comments in separate commits and rebased everything on top of latest master. Thanks for the review!

@sdroege
Copy link
Contributor Author

sdroege commented Jun 4, 2018

@carllerche is there anything else left to be done here?

Copy link
Member

@carllerche carllerche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good to me 👍 Thanks for sticking with it.

@carllerche carllerche merged commit 0d41ba7 into tokio-rs:master Jun 5, 2018
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants