From e951d55720689ae5058d8af97a71a66d063f87cd Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 11 Jan 2022 18:39:56 -0800 Subject: [PATCH] rt: refactor current-thread scheduler (take 2) (#4395) Re-applies #4377 and fixes the bug resulting in Hyper's double panic. Revert: #4394 Original PR: This PR does some refactoring to the current-thread scheduler bringing it closer to the structure of the multi-threaded scheduler. More specifically, the core scheduler data is stored in a Core struct and that struct is passed around as a "token" indicating permission to do work. The Core structure is also stored in the thread-local context. This refactor is intended to support #4373, making it easier to track counters in more locations in the current-thread scheduler. I tried to keep commits small, but the "set Core in thread-local context" is both the biggest commit and the key one. --- tokio/src/runtime/basic_scheduler.rs | 420 ++++++++++-------- tokio/src/runtime/mod.rs | 2 +- .../src/runtime/tests/loom_basic_scheduler.rs | 20 +- tokio/src/runtime/thread_pool/mod.rs | 3 - tokio/src/runtime/thread_pool/worker.rs | 3 +- .../thread_pool => util}/atomic_cell.rs | 10 +- tokio/src/util/mod.rs | 3 + tokio/tests/rt_basic.rs | 29 ++ 8 files changed, 275 insertions(+), 215 deletions(-) rename tokio/src/{runtime/thread_pool => util}/atomic_cell.rs (77%) diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 872d0d5b897..f70fa656925 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -3,10 +3,12 @@ use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; use crate::runtime::context::EnterGuard; +use crate::runtime::driver::Driver; use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::runtime::Callback; use crate::sync::notify::Notify; +use crate::util::atomic_cell::AtomicCell; use crate::util::{waker_ref, Wake, WakerRef}; use std::cell::RefCell; @@ -19,13 +21,12 @@ use std::task::Poll::{Pending, Ready}; use std::time::Duration; /// Executes tasks on the current thread -pub(crate) struct BasicScheduler { - /// Inner state guarded by a mutex that is shared - /// between all `block_on` calls. - inner: Mutex>>, +pub(crate) struct BasicScheduler { + /// Core scheduler data is acquired by a thread entering `block_on`. + core: AtomicCell, /// Notifier for waking up other threads to steal the - /// parker. + /// driver. notify: Notify, /// Sendable task spawner @@ -38,15 +39,11 @@ pub(crate) struct BasicScheduler { context_guard: Option, } -/// The inner scheduler that owns the task queue and the main parker P. -struct Inner { +/// Data required for executing the scheduler. The struct is passed around to +/// a function that will perform the scheduling work and acts as a capability token. +struct Core { /// Scheduler run queue - /// - /// When the scheduler is executed, the queue is removed from `self` and - /// moved into `Context`. - /// - /// This indirection is to allow `BasicScheduler` to be `Send`. 
- tasks: Option, + tasks: VecDeque>>, /// Sendable task spawner spawner: Spawner, @@ -54,13 +51,10 @@ struct Inner { /// Current tick tick: u8, - /// Thread park handle - park: P, - - /// Callback for a worker parking itself - before_park: Option, - /// Callback for a worker unparking itself - after_unpark: Option, + /// Runtime driver + /// + /// The driver is removed before starting to park the thread + driver: Option, /// Stats batcher stats: WorkerStatsBatcher, @@ -71,13 +65,6 @@ pub(crate) struct Spawner { shared: Arc, } -struct Tasks { - /// Local run queue. - /// - /// Tasks notified from the current thread are pushed into this queue. - queue: VecDeque>>, -} - /// A remote scheduler entry. /// /// These are filled in by remote threads sending instructions to the scheduler. @@ -100,22 +87,29 @@ struct Shared { owned: OwnedTasks>, /// Unpark the blocked thread. - unpark: Box, + unpark: ::Unpark, /// Indicates whether the blocked on thread was woken. woken: AtomicBool, + /// Callback for a worker parking itself + before_park: Option, + + /// Callback for a worker unparking itself + after_unpark: Option, + /// Keeps track of various runtime stats. stats: RuntimeStats, } /// Thread-local context. struct Context { - /// Shared scheduler state - shared: Arc, + /// Handle to the spawner + spawner: Spawner, - /// Local queue - tasks: RefCell, + /// Scheduler core, enabling the holder of `Context` to execute the + /// scheduler. + core: RefCell>>, } /// Initial queue capacity. @@ -133,38 +127,36 @@ const REMOTE_FIRST_INTERVAL: u8 = 31; // Tracks the current BasicScheduler. scoped_thread_local!(static CURRENT: Context); -impl BasicScheduler
<P: Park> {
+impl BasicScheduler {
     pub(crate) fn new(
-        park: P,
+        driver: Driver,
         before_park: Option<Callback>,
         after_unpark: Option<Callback>,
-    ) -> BasicScheduler<P>
{ - let unpark = Box::new(park.unpark()); + ) -> BasicScheduler { + let unpark = driver.unpark(); let spawner = Spawner { shared: Arc::new(Shared { queue: Mutex::new(Some(VecDeque::with_capacity(INITIAL_CAPACITY))), owned: OwnedTasks::new(), - unpark: unpark as Box, + unpark, woken: AtomicBool::new(false), + before_park, + after_unpark, stats: RuntimeStats::new(1), }), }; - let inner = Mutex::new(Some(Inner { - tasks: Some(Tasks { - queue: VecDeque::with_capacity(INITIAL_CAPACITY), - }), + let core = AtomicCell::new(Some(Box::new(Core { + tasks: VecDeque::with_capacity(INITIAL_CAPACITY), spawner: spawner.clone(), tick: 0, - park, - before_park, - after_unpark, + driver: Some(driver), stats: WorkerStatsBatcher::new(0), - })); + }))); BasicScheduler { - inner, + core, notify: Notify::new(), spawner, context_guard: None, @@ -178,12 +170,12 @@ impl BasicScheduler
<P>
{ pub(crate) fn block_on(&self, future: F) -> F::Output { pin!(future); - // Attempt to steal the dedicated parker and block_on the future if we can there, - // otherwise, lets select on a notification that the parker is available - // or the future is complete. + // Attempt to steal the scheduler core and block_on the future if we can + // there, otherwise, lets select on a notification that the core is + // available or the future is complete. loop { - if let Some(inner) = &mut self.take_inner() { - return inner.block_on(future); + if let Some(core) = self.take_core() { + return core.block_on(future); } else { let mut enter = crate::runtime::enter(false); @@ -210,11 +202,14 @@ impl BasicScheduler
<P>
{ } } - fn take_inner(&self) -> Option> { - let inner = self.inner.lock().take()?; + fn take_core(&self) -> Option> { + let core = self.core.take()?; - Some(InnerGuard { - inner: Some(inner), + Some(CoreGuard { + context: Context { + spawner: self.spawner.clone(), + core: RefCell::new(Some(core)), + }, basic_scheduler: self, }) } @@ -224,156 +219,109 @@ impl BasicScheduler
<P: Park> {
     }
 }
 
-impl<P: Park> Inner<P>
{ - /// Blocks on the provided future and drives the runtime's driver. - fn block_on(&mut self, future: F) -> F::Output { - enter(self, |scheduler, context| { - let _enter = crate::runtime::enter(false); - let waker = scheduler.spawner.waker_ref(); - let mut cx = std::task::Context::from_waker(&waker); - - pin!(future); - - 'outer: loop { - if scheduler.spawner.reset_woken() { - scheduler.stats.incr_poll_count(); - if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { - return v; - } - } +impl Context { + /// Execute the closure with the given scheduler core stored in the + /// thread-local context. + fn run_task(&self, mut core: Box, f: impl FnOnce() -> R) -> (Box, R) { + core.stats.incr_poll_count(); + self.enter(core, || crate::coop::budget(f)) + } - for _ in 0..MAX_TASKS_PER_TICK { - // Get and increment the current tick - let tick = scheduler.tick; - scheduler.tick = scheduler.tick.wrapping_add(1); + /// Blocks the current thread until an event is received by the driver, + /// including I/O events, timer events, ... + fn park(&self, mut core: Box) -> Box { + let mut driver = core.driver.take().expect("driver missing"); + + if let Some(f) = &self.spawner.shared.before_park { + // Incorrect lint, the closures are actually different types so `f` + // cannot be passed as an argument to `enter`. + #[allow(clippy::redundant_closure)] + let (c, _) = self.enter(core, || f()); + core = c; + } - let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { - scheduler.spawner.pop().or_else(|| { - context - .tasks - .borrow_mut() - .queue - .pop_front() - .map(RemoteMsg::Schedule) - }) - } else { - context - .tasks - .borrow_mut() - .queue - .pop_front() - .map(RemoteMsg::Schedule) - .or_else(|| scheduler.spawner.pop()) - }; + // This check will fail if `before_park` spawns a task for us to run + // instead of parking the thread + if core.tasks.is_empty() { + // Park until the thread is signaled + core.stats.about_to_park(); + core.stats.submit(&core.spawner.shared.stats); - let entry = match entry { - Some(entry) => entry, - None => { - if let Some(f) = &scheduler.before_park { - f(); - } - // This check will fail if `before_park` spawns a task for us to run - // instead of parking the thread - if context.tasks.borrow_mut().queue.is_empty() { - // Park until the thread is signaled - scheduler.stats.about_to_park(); - scheduler.stats.submit(&scheduler.spawner.shared.stats); - scheduler.park.park().expect("failed to park"); - scheduler.stats.returned_from_park(); - } - if let Some(f) = &scheduler.after_unpark { - f(); - } + let (c, _) = self.enter(core, || { + driver.park().expect("failed to park"); + }); - // Try polling the `block_on` future next - continue 'outer; - } - }; + core = c; + core.stats.returned_from_park(); + } - match entry { - RemoteMsg::Schedule(task) => { - scheduler.stats.incr_poll_count(); - let task = context.shared.owned.assert_owner(task); - crate::coop::budget(|| task.run()) - } - } - } + if let Some(f) = &self.spawner.shared.after_unpark { + // Incorrect lint, the closures are actually different types so `f` + // cannot be passed as an argument to `enter`. + #[allow(clippy::redundant_closure)] + let (c, _) = self.enter(core, || f()); + core = c; + } - // Yield to the park, this drives the timer and pulls any pending - // I/O events. 
- scheduler.stats.submit(&scheduler.spawner.shared.stats); - scheduler - .park - .park_timeout(Duration::from_millis(0)) - .expect("failed to park"); - } - }) + core.driver = Some(driver); + core } -} -/// Enters the scheduler context. This sets the queue and other necessary -/// scheduler state in the thread-local. -fn enter(scheduler: &mut Inner
<P>, f: F) -> R
-where
-    F: FnOnce(&mut Inner<P>
, &Context) -> R, - P: Park, -{ - // Ensures the run queue is placed back in the `BasicScheduler` instance - // once `block_on` returns.` - struct Guard<'a, P: Park> { - context: Option, - scheduler: &'a mut Inner
<P>
, - } + /// Checks the driver for new events without blocking the thread. + fn park_yield(&self, mut core: Box) -> Box { + let mut driver = core.driver.take().expect("driver missing"); - impl Drop for Guard<'_, P> { - fn drop(&mut self) { - let Context { tasks, .. } = self.context.take().expect("context missing"); - self.scheduler.tasks = Some(tasks.into_inner()); - } - } + core.stats.submit(&core.spawner.shared.stats); + let (mut core, _) = self.enter(core, || { + driver + .park_timeout(Duration::from_millis(0)) + .expect("failed to park"); + }); - // Remove `tasks` from `self` and place it in a `Context`. - let tasks = scheduler.tasks.take().expect("invalid state"); + core.driver = Some(driver); + core + } - let guard = Guard { - context: Some(Context { - shared: scheduler.spawner.shared.clone(), - tasks: RefCell::new(tasks), - }), - scheduler, - }; + fn enter(&self, core: Box, f: impl FnOnce() -> R) -> (Box, R) { + // Store the scheduler core in the thread-local context + // + // A drop-guard is employed at a higher level. + *self.core.borrow_mut() = Some(core); - let context = guard.context.as_ref().unwrap(); - let scheduler = &mut *guard.scheduler; + // Execute the closure while tracking the execution budget + let ret = f(); - CURRENT.set(context, || f(scheduler, context)) + // Take the scheduler core back + let core = self.core.borrow_mut().take().expect("core missing"); + (core, ret) + } } -impl Drop for BasicScheduler
<P>
{ +impl Drop for BasicScheduler { fn drop(&mut self) { // Avoid a double panic if we are currently panicking and // the lock may be poisoned. - let mut inner = match self.inner.lock().take() { - Some(inner) => inner, + let core = match self.take_core() { + Some(core) => core, None if std::thread::panicking() => return, - None => panic!("Oh no! We never placed the Inner state back, this is a bug!"), + None => panic!("Oh no! We never placed the Core back, this is a bug!"), }; - enter(&mut inner, |scheduler, context| { + core.enter(|mut core, context| { // Drain the OwnedTasks collection. This call also closes the // collection, ensuring that no tasks are ever pushed after this // call returns. - context.shared.owned.close_and_shutdown_all(); + context.spawner.shared.owned.close_and_shutdown_all(); // Drain local queue // We already shut down every task, so we just need to drop the task. - for task in context.tasks.borrow_mut().queue.drain(..) { + while let Some(task) = core.tasks.pop_front() { drop(task); } // Drain remote queue and set it to None - let remote_queue = scheduler.spawner.shared.queue.lock().take(); + let remote_queue = core.spawner.shared.queue.lock().take(); // Using `Option::take` to replace the shared queue with `None`. // We already shut down every task, so we just need to drop the task. @@ -387,12 +335,14 @@ impl Drop for BasicScheduler
<P>
{ } } - assert!(context.shared.owned.is_empty()); + assert!(context.spawner.shared.owned.is_empty()); + + (core, ()) }); } } -impl fmt::Debug for BasicScheduler
<P>
{ +impl fmt::Debug for BasicScheduler { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("BasicScheduler").finish() } @@ -455,8 +405,14 @@ impl Schedule for Arc { fn schedule(&self, task: task::Notified) { CURRENT.with(|maybe_cx| match maybe_cx { - Some(cx) if Arc::ptr_eq(self, &cx.shared) => { - cx.tasks.borrow_mut().queue.push_back(task); + Some(cx) if Arc::ptr_eq(self, &cx.spawner.shared) => { + let mut core = cx.core.borrow_mut(); + + // If `None`, the runtime is shutting down, so there is no need + // to schedule the task. + if let Some(core) = core.as_mut() { + core.tasks.push_back(task); + } } _ => { // If the queue is None, then the runtime has shut down. We @@ -484,35 +440,107 @@ impl Wake for Shared { } } -// ===== InnerGuard ===== +// ===== CoreGuard ===== -/// Used to ensure we always place the Inner value -/// back into its slot in `BasicScheduler`, even if the -/// future panics. -struct InnerGuard<'a, P: Park> { - inner: Option>, - basic_scheduler: &'a BasicScheduler
<P>
, +/// Used to ensure we always place the `Core` value back into its slot in +/// `BasicScheduler`, even if the future panics. +struct CoreGuard<'a> { + context: Context, + basic_scheduler: &'a BasicScheduler, } -impl InnerGuard<'_, P> { - fn block_on(&mut self, future: F) -> F::Output { - // The only time inner gets set to `None` is if we have dropped - // already so this unwrap is safe. - self.inner.as_mut().unwrap().block_on(future) +impl CoreGuard<'_> { + fn block_on(self, future: F) -> F::Output { + self.enter(|mut core, context| { + let _enter = crate::runtime::enter(false); + let waker = context.spawner.waker_ref(); + let mut cx = std::task::Context::from_waker(&waker); + + pin!(future); + + 'outer: loop { + if core.spawner.reset_woken() { + let (c, res) = context.run_task(core, || future.as_mut().poll(&mut cx)); + + core = c; + + if let Ready(v) = res { + return (core, v); + } + } + + for _ in 0..MAX_TASKS_PER_TICK { + // Get and increment the current tick + let tick = core.tick; + core.tick = core.tick.wrapping_add(1); + + let entry = if tick % REMOTE_FIRST_INTERVAL == 0 { + core.spawner + .pop() + .or_else(|| core.tasks.pop_front().map(RemoteMsg::Schedule)) + } else { + core.tasks + .pop_front() + .map(RemoteMsg::Schedule) + .or_else(|| core.spawner.pop()) + }; + + let entry = match entry { + Some(entry) => entry, + None => { + core = context.park(core); + + // Try polling the `block_on` future next + continue 'outer; + } + }; + + match entry { + RemoteMsg::Schedule(task) => { + let task = context.spawner.shared.owned.assert_owner(task); + + let (c, _) = context.run_task(core, || { + task.run(); + }); + + core = c; + } + } + } + + // Yield to the driver, this drives the timer and pulls any + // pending I/O events. + core = context.park_yield(core); + } + }) + } + + /// Enters the scheduler context. This sets the queue and other necessary + /// scheduler state in the thread-local. + fn enter(self, f: F) -> R + where + F: FnOnce(Box, &Context) -> (Box, R), + { + // Remove `core` from `context` to pass into the closure. + let core = self.context.core.borrow_mut().take().expect("core missing"); + + // Call the closure and place `core` back + let (core, ret) = CURRENT.set(&self.context, || f(core, &self.context)); + + *self.context.core.borrow_mut() = Some(core); + + ret } } -impl Drop for InnerGuard<'_, P> { +impl Drop for CoreGuard<'_> { fn drop(&mut self) { - if let Some(scheduler) = self.inner.take() { - let mut lock = self.basic_scheduler.inner.lock(); - + if let Some(core) = self.context.core.borrow_mut().take() { // Replace old scheduler back into the state to allow // other threads to pick it up and drive it. - lock.replace(scheduler); + self.basic_scheduler.core.set(core); - // Wake up other possible threads that could steal - // the dedicated parker P. + // Wake up other possible threads that could steal the driver. self.basic_scheduler.notify.notify_one() } } diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 847dd5972e1..e77c5e3a0f8 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -283,7 +283,7 @@ cfg_rt! { #[derive(Debug)] enum Kind { /// Execute all tasks on the current-thread. - CurrentThread(BasicScheduler), + CurrentThread(BasicScheduler), /// Execute tasks across multiple threads. 
#[cfg(feature = "rt-multi-thread")] diff --git a/tokio/src/runtime/tests/loom_basic_scheduler.rs b/tokio/src/runtime/tests/loom_basic_scheduler.rs index d2894b9b27e..a772603f711 100644 --- a/tokio/src/runtime/tests/loom_basic_scheduler.rs +++ b/tokio/src/runtime/tests/loom_basic_scheduler.rs @@ -34,20 +34,22 @@ fn assert_at_most_num_polls(rt: Arc, at_most_polls: usize) { #[test] fn block_on_num_polls() { loom::model(|| { - // we expect at most 3 number of polls because there are - // three points at which we poll the future. At any of these - // points it can be ready: + // we expect at most 4 number of polls because there are three points at + // which we poll the future and an opportunity for a false-positive.. At + // any of these points it can be ready: // - // - when we fail to steal the parker and we block on a - // notification that it is available. + // - when we fail to steal the parker and we block on a notification + // that it is available. // // - when we steal the parker and we schedule the future // - // - when the future is woken up and we have ran the max - // number of tasks for the current tick or there are no - // more tasks to run. + // - when the future is woken up and we have ran the max number of tasks + // for the current tick or there are no more tasks to run. // - let at_most = 3; + // - a thread is notified that the parker is available but a third + // thread acquires it before the notified thread can. + // + let at_most = 4; let rt1 = Arc::new(Builder::new_current_thread().build().unwrap()); let rt2 = rt1.clone(); diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 82e34c78d28..3e1ce448215 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -1,8 +1,5 @@ //! 
Threadpool -mod atomic_cell; -use atomic_cell::AtomicCell; - mod idle; use self::idle::Idle; diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index ae8efe6724f..27d0d5e7d32 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -66,8 +66,9 @@ use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; -use crate::runtime::thread_pool::{AtomicCell, Idle}; +use crate::runtime::thread_pool::Idle; use crate::runtime::{queue, task, Callback}; +use crate::util::atomic_cell::AtomicCell; use crate::util::FastRand; use std::cell::RefCell; diff --git a/tokio/src/runtime/thread_pool/atomic_cell.rs b/tokio/src/util/atomic_cell.rs similarity index 77% rename from tokio/src/runtime/thread_pool/atomic_cell.rs rename to tokio/src/util/atomic_cell.rs index 98847e6ffa1..07e37303a7b 100644 --- a/tokio/src/runtime/thread_pool/atomic_cell.rs +++ b/tokio/src/util/atomic_cell.rs @@ -3,7 +3,7 @@ use crate::loom::sync::atomic::AtomicPtr; use std::ptr; use std::sync::atomic::Ordering::AcqRel; -pub(super) struct AtomicCell { +pub(crate) struct AtomicCell { data: AtomicPtr, } @@ -11,22 +11,22 @@ unsafe impl Send for AtomicCell {} unsafe impl Sync for AtomicCell {} impl AtomicCell { - pub(super) fn new(data: Option>) -> AtomicCell { + pub(crate) fn new(data: Option>) -> AtomicCell { AtomicCell { data: AtomicPtr::new(to_raw(data)), } } - pub(super) fn swap(&self, val: Option>) -> Option> { + pub(crate) fn swap(&self, val: Option>) -> Option> { let old = self.data.swap(to_raw(val), AcqRel); from_raw(old) } - pub(super) fn set(&self, val: Box) { + pub(crate) fn set(&self, val: Box) { let _ = self.swap(Some(val)); } - pub(super) fn take(&self) -> Option> { + pub(crate) fn take(&self) -> Option> { self.swap(None) } } diff --git a/tokio/src/util/mod.rs b/tokio/src/util/mod.rs index df30f2b86a9..f0a79a7cca9 100644 --- a/tokio/src/util/mod.rs +++ b/tokio/src/util/mod.rs @@ -3,6 +3,9 @@ cfg_io_driver! { pub(crate) mod slab; } +#[cfg(feature = "rt")] +pub(crate) mod atomic_cell; + #[cfg(any( // io driver uses `WakeList` directly feature = "net", diff --git a/tokio/tests/rt_basic.rs b/tokio/tests/rt_basic.rs index 70056b16f01..149b3bfaad7 100644 --- a/tokio/tests/rt_basic.rs +++ b/tokio/tests/rt_basic.rs @@ -168,6 +168,35 @@ fn drop_tasks_in_context() { assert!(SUCCESS.load(Ordering::SeqCst)); } +#[test] +#[should_panic(expected = "boom")] +fn wake_in_drop_after_panic() { + let (tx, rx) = oneshot::channel::<()>(); + + struct WakeOnDrop(Option>); + + impl Drop for WakeOnDrop { + fn drop(&mut self) { + self.0.take().unwrap().send(()).unwrap(); + } + } + + let rt = rt(); + + rt.spawn(async move { + let _wake_on_drop = WakeOnDrop(Some(tx)); + // wait forever + futures::future::pending::<()>().await; + }); + + let _join = rt.spawn(async move { rx.await }); + + rt.block_on(async { + tokio::task::yield_now().await; + panic!("boom"); + }); +} + #[test] #[should_panic( expected = "A Tokio 1.x context was found, but timers are disabled. Call `enable_time` on the runtime builder to enable timers."
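
The commit message above describes moving the scheduler state into a Core struct that is handed around as a capability token and parked in the thread-local context while a thread drives block_on, with a guard returning it on exit. Below is a minimal, self-contained sketch of that hand-off pattern, not tokio's actual code: the names (Core, CoreCell, CoreGuard, block_on) are hypothetical stand-ins, and a Mutex-backed slot replaces the AtomicPtr-based AtomicCell the patch moves into util, to keep the sketch in safe code.

use std::cell::RefCell;
use std::collections::VecDeque;
use std::sync::Mutex;

/// Scheduler state; owning it is the permission to run tasks.
struct Core {
    tasks: VecDeque<&'static str>,
    tick: u8,
}

/// Shared slot the core sits in between `block_on` calls.
struct CoreCell {
    slot: Mutex<Option<Box<Core>>>,
}

impl CoreCell {
    fn take(&self) -> Option<Box<Core>> {
        self.slot.lock().unwrap().take()
    }

    fn set(&self, core: Box<Core>) {
        *self.slot.lock().unwrap() = Some(core);
    }
}

thread_local! {
    // Thread-local context holding the core while this thread is scheduling.
    static CURRENT: RefCell<Option<Box<Core>>> = RefCell::new(None);
}

/// Returns the core to the shared cell when the thread stops scheduling,
/// including when the code run under the guard panics.
struct CoreGuard<'a> {
    cell: &'a CoreCell,
}

impl Drop for CoreGuard<'_> {
    fn drop(&mut self) {
        if let Some(core) = CURRENT.with(|c| c.borrow_mut().take()) {
            self.cell.set(core);
        }
    }
}

fn block_on(cell: &CoreCell) {
    // Acquire the capability token; a real scheduler would otherwise wait
    // for a notification that it has become available.
    let core = cell.take().expect("another thread holds the core");
    CURRENT.with(|c| *c.borrow_mut() = Some(core));
    let _guard = CoreGuard { cell };

    // "Run" one task while holding the core through the thread-local context.
    CURRENT.with(|c| {
        if let Some(core) = c.borrow_mut().as_mut() {
            core.tick = core.tick.wrapping_add(1);
            if let Some(task) = core.tasks.pop_front() {
                println!("running task {task} on tick {}", core.tick);
            }
        }
    });
    // `_guard` drops here and places the core back into `cell`.
}

fn main() {
    let cell = CoreCell {
        slot: Mutex::new(Some(Box::new(Core {
            tasks: VecDeque::from(["a", "b"]),
            tick: 0,
        }))),
    };

    block_on(&cell);
    block_on(&cell); // the core was returned, so a later call can take it again
}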
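The new Drop for BasicScheduler above returns early through `None if std::thread::panicking() => return` instead of panicking a second time while the thread is already unwinding; a second panic during unwinding aborts the process, which is the double-panic scenario the added wake_in_drop_after_panic test exercises. A minimal sketch of that guard pattern follows, using a hypothetical Cleanup type unrelated to tokio.

/// Drop guard that would normally insist its resource is still present.
struct Cleanup {
    resource: Option<String>,
}

impl Drop for Cleanup {
    fn drop(&mut self) {
        match self.resource.take() {
            Some(r) => println!("releasing {r}"),
            // Already unwinding from an earlier panic: panicking again here
            // would abort the process, so skip the consistency check.
            None if std::thread::panicking() => {}
            None => panic!("resource missing outside of a panic: this is a bug"),
        }
    }
}

fn main() {
    let _guard = Cleanup { resource: None };

    // This panic starts unwinding; `Cleanup::drop` then runs while
    // `thread::panicking()` is true and must not panic again.
    panic!("boom");
}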