From 5b00d260c319ae7415cefd5d7174bc0129891e08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Wed, 11 Apr 2018 10:49:20 +0200 Subject: [PATCH] Make CurrentThread::turn() more fair by always parking with 0 timeout first This ensures that all fd-based futures are put into the queue for the current tick, if the CurrentThread is parking via the Reactor. Otherwise, if there are queued up futures already, only those would be polled in the turn. These futures could then notify others/themselves to have the queue still non-empty on the next turn. Which then potentially allows the reactor to never be polled, and thus fd-based futures are never queued up and polled. Also return in the Turn return value whether any futures were polled at all, which allows the caller to know if any work was done at all in this turn and based on that adjust behaviour. https://github.com/tokio-rs/tokio/issues/310 --- src/executor/current_thread/mod.rs | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/src/executor/current_thread/mod.rs b/src/executor/current_thread/mod.rs index d4d2ed2951c..9e739b3365b 100644 --- a/src/executor/current_thread/mod.rs +++ b/src/executor/current_thread/mod.rs @@ -147,9 +147,10 @@ pub struct TaskExecutor { _p: ::std::marker::PhantomData>, } -/// Returned by the `turn` function +/// Returned by the `turn` function. Contains `true` if any futures were +/// polled at all and `false` otherwise. #[derive(Debug)] -pub struct Turn(()); +pub struct Turn(pub bool); /// A `CurrentThread` instance bound to a supplied execution conext. pub struct Entered<'a, P: Park + 'a> { @@ -480,7 +481,14 @@ impl<'a, P: Park> Entered<'a, P> { pub fn turn(&mut self, duration: Option) -> Result { - if !self.tick() { + let res = self.executor.park.park_timeout(Duration::from_millis(0)); + if res.is_err() { + return Err(TurnError { _p: () }); + } + + let mut polled = self.tick(); + + if !polled { let res = match duration { Some(duration) => self.executor.park.park_timeout(duration), None => self.executor.park.park(), @@ -490,10 +498,10 @@ impl<'a, P: Park> Entered<'a, P> { return Err(TurnError { _p: () }); } - self.tick(); + polled = self.tick(); } - Ok(Turn(())) + Ok(Turn(polled)) } fn run_timeout2(&mut self, dur: Option)