From 15386b62f80fcaeeff730447b26d098ed86bd96d Mon Sep 17 00:00:00 2001 From: Gus Wynn Date: Fri, 20 Aug 2021 12:40:05 -0700 Subject: [PATCH 1/3] make lifo_slot configurable --- tokio/src/runtime/builder.rs | 46 ++++++++++++++++++++++++- tokio/src/runtime/thread_pool/mod.rs | 8 +++-- tokio/src/runtime/thread_pool/worker.rs | 38 ++++++++++++++------ tokio/tests/rt_threaded.rs | 35 ++++++++++++++++++- 4 files changed, 113 insertions(+), 14 deletions(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 51bf8c843fa..b7f5cfd3a51 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -55,6 +55,9 @@ pub struct Builder { /// Only used when not using the current-thread executor. worker_threads: Option, + /// Whether or not to enable to the lifo_slot optimization + lifo_slot_optimization: bool, + /// Cap on thread usage. max_blocking_threads: usize, @@ -126,6 +129,9 @@ impl Builder { max_blocking_threads: 512, + // Default to using the optimization + lifo_slot_optimization: true, + // Default thread name thread_name: std::sync::Arc::new(|| "tokio-runtime-worker".into()), @@ -220,6 +226,44 @@ impl Builder { self } + /// Configures whether or not the `Runtime` will use an optimization that + /// causes recently spawned tasks to be scheduled first. This defaults to `true`, + /// as this optimization improves latencies in many patterns, but may be undesired + /// behavior + /// + /// # Default + /// + /// The default value is `true` + /// + /// # Panic + /// + /// When using the `current_thread` runtime this method will panic, since + /// those variants do not allow setting worker thread counts. + /// + /// + /// # Examples + /// + /// ## Multi threaded runtime with 4 threads + /// + /// ``` + /// use tokio::runtime; + /// + /// // This will spawn a work-stealing runtime with the optimization turned off + /// let rt = runtime::Builder::new_multi_thread() + /// .lifo_slot_optimization(true) + /// .build() + /// .unwrap(); + /// + /// rt.spawn(async move {}); + /// ``` + pub fn lifo_slot_optimization(&mut self, yes: bool) -> &mut Self { + // Doesn't actually panic as described in the docs, to match the current + // behavior of `worker_threads` + self.lifo_slot_optimization = yes; + self + } + + /// Specifies the limit for additional threads spawned by the Runtime. /// /// These threads are used for blocking operations like tasks spawned @@ -546,7 +590,7 @@ cfg_rt_multi_thread! { let (driver, resources) = driver::Driver::new(self.get_cfg())?; - let (scheduler, launch) = ThreadPool::new(core_threads, Parker::new(driver)); + let (scheduler, launch) = ThreadPool::new(core_threads, self.lifo_slot_optimization, Parker::new(driver)); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); // Create the blocking pool diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 3808aa26465..d894634294a 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -43,8 +43,12 @@ pub(crate) struct Spawner { // ===== impl ThreadPool ===== impl ThreadPool { - pub(crate) fn new(size: usize, parker: Parker) -> (ThreadPool, Launch) { - let (shared, launch) = worker::create(size, parker); + pub(crate) fn new( + size: usize, + lifo_slot_optimization: bool, + parker: Parker, + ) -> (ThreadPool, Launch) { + let (shared, launch) = worker::create(size, lifo_slot_optimization, parker); let spawner = Spawner { shared }; let thread_pool = ThreadPool { spawner }; diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index f5004c0e406..a4128ff4523 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -94,7 +94,9 @@ struct Core { /// queue. This effectively results in the **last** scheduled task to be run /// next (LIFO). This is an optimization for message passing patterns and /// helps to reduce latency. - lifo_slot: Option, + /// + /// The first layer of `Option` is whether or not the slot is configured for use or not + lifo_slot: Option>, /// The worker-local run queue. run_queue: queue::Local>, @@ -174,7 +176,11 @@ type Notified = task::Notified>; // Tracks thread-local state scoped_thread_local!(static CURRENT: Context); -pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { +pub(super) fn create( + size: usize, + lifo_slot_optimization: bool, + park: Parker, +) -> (Arc, Launch) { let mut cores = vec![]; let mut remotes = vec![]; @@ -187,7 +193,11 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { cores.push(Box::new(Core { tick: 0, - lifo_slot: None, + lifo_slot: if lifo_slot_optimization { + Some(None) + } else { + None + }, run_queue, is_searching: false, is_shutdown: false, @@ -408,8 +418,11 @@ impl Context { }; // Check for a task in the LIFO slot - let task = match core.lifo_slot.take() { - Some(task) => task, + let task = match &mut core.lifo_slot { + Some(lifo_slot) => match lifo_slot.take() { + Some(task) => task, + None => return Ok(core), + }, None => return Ok(core), }; @@ -503,7 +516,10 @@ impl Core { } fn next_local_task(&mut self) -> Option { - self.lifo_slot.take().or_else(|| self.run_queue.pop()) + self.lifo_slot + .as_mut() + .and_then(|lifo_slot| lifo_slot.take()) + .or_else(|| self.run_queue.pop()) } fn steal_work(&mut self, worker: &Worker) -> Option { @@ -573,7 +589,7 @@ impl Core { fn transition_from_parked(&mut self, worker: &Worker) -> bool { // If a task is in the lifo slot, then we must unpark regardless of // being notified - if self.lifo_slot.is_some() { + if matches!(self.lifo_slot, Some(Some(_))) { worker.shared.idle.unpark_worker_by_id(worker.index); self.is_searching = true; return true; @@ -675,19 +691,21 @@ impl Shared { // task must always be pushed to the back of the queue, enabling other // tasks to be executed. If **not** a yield, then there is more // flexibility and the task may go to the front of the queue. - let should_notify = if is_yield { + let should_notify = if is_yield || core.lifo_slot.is_none() { core.run_queue.push_back(task, &self.inject); true } else { // Push to the LIFO slot - let prev = core.lifo_slot.take(); + + // We checked `is_none` above + let prev = core.lifo_slot.as_mut().unwrap().take(); let ret = prev.is_some(); if let Some(prev) = prev { core.run_queue.push_back(prev, &self.inject); } - core.lifo_slot = Some(task); + core.lifo_slot = Some(Some(task)); ret }; diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 9e76c4ed0b7..1ae4d2bf107 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -3,7 +3,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::runtime::{self, Runtime}; +use tokio::runtime::{self, Runtime, Builder}; use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; @@ -54,6 +54,39 @@ fn many_oneshot_futures() { drop(rt); } } + +#[test] +fn no_lifo_slot_complex() { + // used for notifying the main thread + const NUM: usize = 1_000; + + + for _ in 0..5 { + let (tx, rx) = mpsc::channel(); + + let rt = Builder::new_multi_thread().lifo_slot_optimization(false).build().unwrap(); + let cnt = Arc::new(AtomicUsize::new(0)); + + for _ in 0..NUM { + let cnt = cnt.clone(); + let tx = tx.clone(); + + rt.spawn(async move { + let num = cnt.fetch_add(1, Relaxed) + 1; + + if num == NUM { + tx.send(()).unwrap(); + } + }); + } + + rx.recv().unwrap(); + + // Wait for the pool to shutdown + drop(rt); + } +} + #[test] fn many_multishot_futures() { const CHAIN: usize = 200; From f6e1a0ea34331633cbe8f354f08f0c7204eccf0f Mon Sep 17 00:00:00 2001 From: Gus Wynn Date: Fri, 20 Aug 2021 14:56:51 -0700 Subject: [PATCH 2/3] fmt --- tokio/tests/rt_threaded.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/tokio/tests/rt_threaded.rs b/tokio/tests/rt_threaded.rs index 1ae4d2bf107..7c87beccfbb 100644 --- a/tokio/tests/rt_threaded.rs +++ b/tokio/tests/rt_threaded.rs @@ -3,7 +3,7 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; -use tokio::runtime::{self, Runtime, Builder}; +use tokio::runtime::{self, Builder, Runtime}; use tokio::sync::oneshot; use tokio_test::{assert_err, assert_ok}; @@ -60,11 +60,13 @@ fn no_lifo_slot_complex() { // used for notifying the main thread const NUM: usize = 1_000; - for _ in 0..5 { let (tx, rx) = mpsc::channel(); - let rt = Builder::new_multi_thread().lifo_slot_optimization(false).build().unwrap(); + let rt = Builder::new_multi_thread() + .lifo_slot_optimization(false) + .build() + .unwrap(); let cnt = Arc::new(AtomicUsize::new(0)); for _ in 0..NUM { From 3cb6cd21688c1c05db9188be2a83dac40187ac72 Mon Sep 17 00:00:00 2001 From: Gus Wynn Date: Fri, 20 Aug 2021 15:24:23 -0700 Subject: [PATCH 3/3] more fixes --- tokio/src/runtime/builder.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index b7f5cfd3a51..dd252407fcf 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -263,7 +263,6 @@ impl Builder { self } - /// Specifies the limit for additional threads spawned by the Runtime. /// /// These threads are used for blocking operations like tasks spawned @@ -624,6 +623,7 @@ impl fmt::Debug for Builder { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("Builder") .field("worker_threads", &self.worker_threads) + .field("lifo_slot_optimization", &self.lifo_slot_optimization) .field("max_blocking_threads", &self.max_blocking_threads) .field( "thread_name",