From 0ef86461f38f9c18eff103937bf1353bfbbd79ee Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Tue, 2 May 2023 16:36:33 +0000 Subject: [PATCH 1/3] taskdump: implement task dumps for multi-thread runtime This PR implements task dumps on the multi-thread runtime. It complements #5608, which implemented task dumps on the current-thread runtime. --- examples/dump.rs | 32 +-- tokio/src/runtime/handle.rs | 31 ++- .../runtime/scheduler/multi_thread/handle.rs | 25 +++ .../src/runtime/scheduler/multi_thread/mod.rs | 4 + .../runtime/scheduler/multi_thread/worker.rs | 183 +++++++++++++++++- tokio/src/runtime/task/trace/mod.rs | 49 +++++ tokio/tests/dump.rs | 98 ++++++++++ tokio/tests/dump_current_thread.rs | 55 ------ 8 files changed, 400 insertions(+), 77 deletions(-) create mode 100644 tokio/tests/dump.rs delete mode 100644 tokio/tests/dump_current_thread.rs diff --git a/examples/dump.rs b/examples/dump.rs index 159cc603ba0..c7358489933 100644 --- a/examples/dump.rs +++ b/examples/dump.rs @@ -6,7 +6,7 @@ target_os = "linux", any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") ))] -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() { use std::hint::black_box; @@ -22,21 +22,29 @@ async fn main() { #[inline(never)] async fn c() { - black_box(tokio::task::yield_now()).await + loop { + tokio::task::yield_now().await; + } } - tokio::spawn(a()); - tokio::spawn(b()); - tokio::spawn(c()); + async fn dump() { + let handle = tokio::runtime::Handle::current(); + let dump = handle.dump().await; - let handle = tokio::runtime::Handle::current(); - let dump = handle.dump(); - - for (i, task) in dump.tasks().iter().enumerate() { - let trace = task.trace(); - println!("task {i} trace:"); - println!("{trace}"); + for (i, task) in dump.tasks().iter().enumerate() { + let trace = task.trace(); + println!("task {i} trace:"); + println!("{trace}\n"); + } } + + tokio::select!( + biased; + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(b()) => {}, + _ = 
tokio::spawn(c()) => {}, + _ = dump() => {}, + ); } #[cfg(not(all( diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index a4dc437dbb6..43a7a8a63bd 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -343,15 +343,40 @@ cfg_metrics! { cfg_taskdump! { impl Handle { /// Capture a snapshot of this runtime's state. - pub fn dump(&self) -> crate::runtime::Dump { + pub async fn dump(&self) -> crate::runtime::Dump { match &self.inner { scheduler::Handle::CurrentThread(handle) => handle.dump(), #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - scheduler::Handle::MultiThread(_) => - unimplemented!("taskdumps are unsupported on the multi-thread runtime"), + scheduler::Handle::MultiThread(handle) => { + // perform the trace in a separate thread so that the + // trace itself does not appear in the taskdump. + let handle = handle.clone(); + spawn_thread(async { + let handle = handle; + handle.dump().await + }).await + }, } } } + + cfg_rt_multi_thread! { + /// Spawn a new thread and asynchronously await on its result. + async fn spawn_thread(f: F) -> ::Output + where + F: Future + Send + 'static, + ::Output: Send + 'static + { + let (tx, rx) = crate::sync::oneshot::channel(); + crate::loom::thread::spawn(|| { + let rt = crate::runtime::Builder::new_current_thread().build().unwrap(); + rt.block_on(async { + let _ = tx.send(f.await); + }); + }); + rx.await.unwrap() + } + } } /// Error returned by `try_current` when no Runtime has been started diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 77baaeb06d4..cf3cf52af34 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -95,6 +95,31 @@ cfg_metrics! { } } +cfg_taskdump! { + impl Handle { + pub(crate) async fn dump(&self) -> crate::runtime::Dump { + let trace_status = &self.shared.trace_status; + + // If a dump is in progress, block. 
+ trace_status.start_trace_request(&self).await; + + let result = loop { + if let Some(result) = trace_status.take_result() { + break result; + } else { + self.notify_all(); + trace_status.result_ready.notified().await; + } + }; + + // Allow other queued dumps to proceed. + trace_status.end_trace_request(&self).await; + + result + } + } +} + impl fmt::Debug for Handle { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("multi_thread::Handle { ... }").finish() diff --git a/tokio/src/runtime/scheduler/multi_thread/mod.rs b/tokio/src/runtime/scheduler/multi_thread/mod.rs index 306a622b3ed..3db15fa8430 100644 --- a/tokio/src/runtime/scheduler/multi_thread/mod.rs +++ b/tokio/src/runtime/scheduler/multi_thread/mod.rs @@ -23,6 +23,10 @@ pub(crate) mod queue; mod worker; pub(crate) use worker::{Context, Launch, Shared}; +cfg_taskdump! { + pub(crate) use worker::Synced; +} + pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 947b6fb7f8c..150cf64e246 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -74,6 +74,10 @@ use std::cell::RefCell; use std::task::Waker; use std::time::Duration; +cfg_taskdump! { + use std::sync::Barrier; +} + /// A scheduler worker pub(super) struct Worker { /// Reference to scheduler's handle @@ -112,6 +116,9 @@ struct Core { /// True if the scheduler is being shutdown is_shutdown: bool, + /// True if the scheduler is being traced + is_traced: bool, + /// Parker /// /// Stored in an `Option` as the parker is added / removed to make the @@ -137,7 +144,7 @@ pub(crate) struct Shared { /// Global task queue used for: /// 1. Submit work to the scheduler while **not** currently on a worker thread. /// 2. 
Submit work to the scheduler when a worker run queue is saturated - inject: inject::Shared>, + pub(super) inject: inject::Shared>, /// Coordinates idle workers idle: Idle, @@ -155,6 +162,9 @@ pub(crate) struct Shared { #[allow(clippy::vec_box)] // we're moving an already-boxed value shutdown_cores: Mutex>>, + /// Coordinates task dump (trace) requests across the workers. + pub(super) trace_status: TraceStatus, + /// Scheduler configuration options config: Config, @@ -171,18 +181,18 @@ pub(crate) struct Shared { } /// Data synchronized by the scheduler mutex -pub(super) struct Synced { +pub(crate) struct Synced { /// Synchronized state for `Idle`. pub(super) idle: idle::Synced, /// Synchronized state for `Inject`. - inject: inject::Synced, + pub(crate) inject: inject::Synced, } /// Used to communicate with a worker from other threads. struct Remote { /// Steals tasks from this worker. - steal: queue::Steal>, + pub(super) steal: queue::Steal>, /// Unparks the associated worker thread unpark: Unparker, @@ -204,6 +214,82 @@ pub(crate) struct Context { /// Starts the workers pub(crate) struct Launch(Vec>); +cfg_not_taskdump! { + pub(super) struct TraceStatus {} + + impl TraceStatus { + fn new(_: usize) -> Self { + Self {} + } + + fn trace_requested(&self) -> bool { + false + } + } +} + +cfg_taskdump! { + use crate::sync::notify::Notify; + use crate::runtime::dump::Dump; + use crate::loom::sync::atomic::{AtomicBool, Ordering}; + + /// Tracing status of the worker.
+ pub(super) struct TraceStatus { + pub(super) trace_requested: AtomicBool, + trace_start: Barrier, + trace_end: Barrier, + pub(super) result_ready: Notify, + pub(super) trace_result: Mutex>, + } + + impl TraceStatus { + fn new(remotes_len: usize) -> Self { + Self { + trace_requested: AtomicBool::new(false), + trace_start: Barrier::new(remotes_len), + trace_end: Barrier::new(remotes_len), + result_ready: Notify::new(), + trace_result: Mutex::new(None), + } + } + + fn trace_requested(&self) -> bool { + self.trace_requested.load(Ordering::Relaxed) + } + + pub(super) async fn start_trace_request(&self, handle: &Handle) { + while self.trace_requested.compare_exchange(false, + true, + Ordering::Acquire, + Ordering::Relaxed).is_err() + { + handle.notify_all(); + crate::task::yield_now().await; + } + } + + fn stash_result(&self, dump: Dump) { + let _ = self.trace_result.lock().insert(dump); + self.result_ready.notify_one(); + } + + pub(super) fn take_result(&self) -> Option { + self.trace_result.lock().take() + } + + pub(super) async fn end_trace_request(&self, handle: &Handle) { + while self.trace_requested.compare_exchange(true, + false, + Ordering::Acquire, + Ordering::Relaxed).is_err() + { + handle.notify_all(); + crate::task::yield_now().await; + } + } + } +} + /// Running a task may consume the core. If the core is still available when /// running the task completes, it is returned. Otherwise, the worker will need /// to stop processing. 
@@ -249,6 +335,7 @@ pub(super) fn create( run_queue, is_searching: false, is_shutdown: false, + is_traced: false, park: Some(park), global_queue_interval: stats.tuned_global_queue_interval(&config), stats, @@ -262,6 +349,7 @@ pub(super) fn create( let (idle, idle_synced) = Idle::new(size); let (inject, inject_synced) = inject::Shared::new(); + let remotes_len = remotes.len(); let handle = Arc::new(Handle { shared: Shared { remotes: remotes.into_boxed_slice(), @@ -273,6 +361,7 @@ pub(super) fn create( inject: inject_synced, }), shutdown_cores: Mutex::new(vec![]), + trace_status: TraceStatus::new(remotes_len), config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: worker_metrics.into_boxed_slice(), @@ -476,6 +565,10 @@ impl Context { while !core.is_shutdown { self.assert_lifo_enabled_is_correct(&core); + if core.is_traced { + core = self.worker.handle.trace_core(core); + } + // Increment the tick core.tick(); @@ -649,7 +742,7 @@ impl Context { } if core.transition_to_parked(&self.worker) { - while !core.is_shutdown { + while !core.is_shutdown && !core.is_traced { core.stats.about_to_park(); core = self.park_timeout(core, None); @@ -825,7 +918,7 @@ impl Core { /// Returns true if the transition happened, false if there is work to do first. fn transition_to_parked(&mut self, worker: &Worker) -> bool { // Workers should not park if they have work to do - if self.lifo_slot.is_some() || self.run_queue.has_tasks() { + if self.lifo_slot.is_some() || self.run_queue.has_tasks() || self.is_traced { return false; } @@ -890,6 +983,11 @@ impl Core { let synced = worker.handle.shared.synced.lock(); self.is_shutdown = worker.inject().is_closed(&synced.inject); } + + if !self.is_traced { + // Check if the worker should be tracing. + self.is_traced = worker.handle.shared.trace_status.trace_requested(); + } } /// Signals all tasks to shut down, and waits for them to complete. 
Must run @@ -1048,7 +1146,7 @@ impl Handle { } } - fn notify_all(&self) { + pub(super) fn notify_all(&self) { for remote in &self.shared.remotes[..] { remote.unpark.unpark(&self.driver); } @@ -1101,6 +1199,54 @@ impl Handle { } } + cfg_not_taskdump! { + fn trace_core(&self, core: Box) -> Box { + core + } + } + + cfg_taskdump! { + fn trace_core(&self, mut core: Box) -> Box { + use crate::runtime::dump; + use task::trace::trace_multi_thread; + + core.is_traced = false; + + // wait for other workers + let barrier = self.shared.trace_status.trace_start.wait(); + + if !barrier.is_leader() { + // wait for leader to finish tracing + self.shared.trace_status.trace_end.wait(); + return core; + } + + // trace + + let owned = &self.shared.owned; + let mut local = self.shared.steal_all(); + let synced = &self.shared.synced; + let injection = &self.shared.inject; + + // safety: `trace_multi_thread` is invoked with the same `synced` that `injection` + // was created with. + let traces = unsafe { trace_multi_thread(owned, &mut local, synced, injection) } + .into_iter() + .map(dump::Task::new) + .collect(); + + let result = dump::Dump::new(traces); + + // stash the result + self.shared.trace_status.stash_result(result); + + // allow other workers to proceed + self.shared.trace_status.trace_end.wait(); + + core + } + } + fn ptr_eq(&self, other: &Handle) -> bool { std::ptr::eq(self, other) } @@ -1163,6 +1309,29 @@ cfg_metrics! { } } +cfg_taskdump! { + impl Shared { + /// Steal all tasks from remotes into a single local queue. 
+ pub(super) fn steal_all(&self) -> super::queue::Local> { + let (_steal, mut local) = super::queue::local(); + + let worker_metrics = WorkerMetrics::new(); + let mut stats = Stats::new(&worker_metrics); + + for remote in self.remotes.iter() { + let steal = &remote.steal; + while !steal.is_empty() { + if let Some(task) = steal.steal_into(&mut local, &mut stats) { + local.push_back([task].into_iter()); + } + } + } + + local + } + } +} + // `u32::abs_diff` is not available on Tokio's MSRV. fn abs_diff(a: u32, b: u32) -> u32 { if a > b { diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index 6299d75d581..543b7eee98e 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -279,3 +279,52 @@ pub(in crate::runtime) fn trace_current_thread( }) .collect() } + +cfg_rt_multi_thread! { + use crate::loom::sync::Mutex; + use crate::runtime::scheduler::multi_thread; + use crate::runtime::scheduler::multi_thread::Synced; + use crate::runtime::scheduler::inject::Shared; + + /// Trace and poll all tasks of the multi_thread runtime. + /// + /// ## Safety + /// + /// Must be called with the same `synced` that `injection` was created with.
+ pub(in crate::runtime) unsafe fn trace_multi_thread( + owned: &OwnedTasks>, + local: &mut multi_thread::queue::Local>, + synced: &Mutex, + injection: &Shared>, + ) -> Vec { + // clear the local queue + while let Some(notified) = local.pop() { + drop(notified); + } + + // clear the injection queue + let mut synced = synced.lock(); + while let Some(notified) = injection.pop(&mut synced.inject) { + drop(notified); + } + + drop(synced); + + // notify each task + let mut traces = vec![]; + owned.for_each(|task| { + // set the notified bit + task.as_raw().state().transition_to_notified_for_tracing(); + + // trace the task + let ((), trace) = Trace::capture(|| task.as_raw().poll()); + traces.push(trace); + + // reschedule the task + let _ = task.as_raw().state().transition_to_notified_by_ref(); + task.as_raw().schedule(); + }); + + traces + } +} diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs new file mode 100644 index 00000000000..79051d335bb --- /dev/null +++ b/tokio/tests/dump.rs @@ -0,0 +1,98 @@ +#![cfg(all( + tokio_unstable, + tokio_taskdump, + target_os = "linux", + any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") +))] + +use std::hint::black_box; +use tokio::runtime::{self, Handle}; + +#[inline(never)] +async fn a() { + black_box(b()).await +} + +#[inline(never)] +async fn b() { + black_box(c()).await +} + +#[inline(never)] +async fn c() { + loop { + black_box(tokio::task::yield_now()).await + } +} + +#[test] +fn current_thread() { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + async fn dump() { + let handle = Handle::current(); + let dump = handle.dump().await; + + let tasks: Vec<_> = dump.tasks().iter().collect(); + + assert_eq!(tasks.len(), 3); + + for task in tasks { + let trace = task.trace().to_string(); + eprintln!("\n\n{trace}\n\n"); + assert!(trace.contains("dump::a")); + assert!(trace.contains("dump::b")); + assert!(trace.contains("dump::c")); + 
assert!(trace.contains("tokio::task::yield_now")); + } + } + + rt.block_on(async { + tokio::select!( + biased; + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(a()) => {}, + _ = dump() => {}, + ); + }); +} + +#[test] +fn multi_thread() { + let rt = runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + async fn dump() { + let handle = Handle::current(); + let dump = handle.dump().await; + + let tasks: Vec<_> = dump.tasks().iter().collect(); + + assert_eq!(tasks.len(), 3); + + for task in tasks { + let trace = task.trace().to_string(); + eprintln!("\n\n{trace}\n\n"); + assert!(trace.contains("dump::a")); + assert!(trace.contains("dump::b")); + assert!(trace.contains("dump::c")); + assert!(trace.contains("tokio::task::yield_now")); + } + } + + rt.block_on(async { + tokio::select!( + biased; + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(a()) => {}, + _ = dump() => {}, + ); + }); +} diff --git a/tokio/tests/dump_current_thread.rs b/tokio/tests/dump_current_thread.rs deleted file mode 100644 index 29661f98fb7..00000000000 --- a/tokio/tests/dump_current_thread.rs +++ /dev/null @@ -1,55 +0,0 @@ -#![cfg(all( - tokio_unstable, - tokio_taskdump, - target_os = "linux", - any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") -))] - -use std::hint::black_box; -use tokio::runtime; - -#[inline(never)] -async fn a() { - black_box(b()).await -} - -#[inline(never)] -async fn b() { - black_box(c()).await -} - -#[inline(never)] -async fn c() { - black_box(tokio::task::yield_now()).await -} - -#[test] -fn test() { - let rt = runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - rt.spawn(a()); - - let handle = rt.handle(); - - assert_eq!(handle.dump().tasks().iter().count(), 0); - - let dump = rt.block_on(async { - handle.spawn(a()); - handle.dump() - }); - - let tasks: Vec<_> = dump.tasks().iter().collect(); - - assert_eq!(tasks.len(), 2); - - 
for task in tasks { - let trace = task.trace().to_string(); - assert!(trace.contains("dump_current_thread::a")); - assert!(trace.contains("dump_current_thread::b")); - assert!(trace.contains("dump_current_thread::c")); - assert!(trace.contains("tokio::task::yield_now")); - } -} From 2ad3939ecdf0381871d8879414ffcc6c03ab6788 Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Tue, 6 Jun 2023 19:16:36 +0000 Subject: [PATCH 2/3] tokio: Use `Barrier::wait_timeout` to avoid taskdump deadlocks. Previously, a deadlock could occur if a trace was initiated in the midst of shutdown: The tracing worker would wait endlessly for the other worker to reach the trace start barrier; the thread shutting down would wait endlessly for the tracing thread to finish tracing. Now, the tracing thread will wait a generous 250ms for workers to synchronize at the trace start barrier, or else abort the trace. --- tokio/src/loom/std/barrier.rs | 217 ++++++++++++++++++ tokio/src/loom/std/mod.rs | 3 + .../runtime/scheduler/multi_thread/worker.rs | 16 +- tokio/tests/dump.rs | 1 + 4 files changed, 234 insertions(+), 3 deletions(-) create mode 100644 tokio/src/loom/std/barrier.rs diff --git a/tokio/src/loom/std/barrier.rs b/tokio/src/loom/std/barrier.rs new file mode 100644 index 00000000000..6481efc161e --- /dev/null +++ b/tokio/src/loom/std/barrier.rs @@ -0,0 +1,217 @@ +//! A `Barrier` that provides `wait_timeout`. +//! +//! This implementation mirrors that of the Rust standard library. + +use crate::loom::sync::{Condvar, Mutex}; +use std::fmt; +use std::time::{Duration, Instant}; + +/// A barrier enables multiple threads to synchronize the beginning +/// of some computation. +/// +/// # Examples +/// +/// ``` +/// use std::sync::{Arc, Barrier}; +/// use std::thread; +/// +/// let mut handles = Vec::with_capacity(10); +/// let barrier = Arc::new(Barrier::new(10)); +/// for _ in 0..10 { +/// let c = Arc::clone(&barrier); +/// // The same messages will be printed together. 
+/// // You will NOT see any interleaving. +/// handles.push(thread::spawn(move|| { +/// println!("before wait"); +/// c.wait(); +/// println!("after wait"); +/// })); +/// } +/// // Wait for other threads to finish. +/// for handle in handles { +/// handle.join().unwrap(); +/// } +/// ``` +pub(crate) struct Barrier { + lock: Mutex, + cvar: Condvar, + num_threads: usize, +} + +// The inner state of a double barrier +struct BarrierState { + count: usize, + generation_id: usize, +} + +/// A `BarrierWaitResult` is returned by [`Barrier::wait()`] when all threads +/// in the [`Barrier`] have rendezvoused. +/// +/// # Examples +/// +/// ``` +/// use std::sync::Barrier; +/// +/// let barrier = Barrier::new(1); +/// let barrier_wait_result = barrier.wait(); +/// ``` +pub(crate) struct BarrierWaitResult(bool); + +impl fmt::Debug for Barrier { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Barrier").finish_non_exhaustive() + } +} + +impl Barrier { + /// Creates a new barrier that can block a given number of threads. + /// + /// A barrier will block `n`-1 threads which call [`wait()`] and then wake + /// up all threads at once when the `n`th thread calls [`wait()`]. + /// + /// [`wait()`]: Barrier::wait + /// + /// # Examples + /// + /// ``` + /// use std::sync::Barrier; + /// + /// let barrier = Barrier::new(10); + /// ``` + #[must_use] + pub(crate) fn new(n: usize) -> Barrier { + Barrier { + lock: Mutex::new(BarrierState { + count: 0, + generation_id: 0, + }), + cvar: Condvar::new(), + num_threads: n, + } + } + + /// Blocks the current thread until all threads have rendezvoused here. + /// + /// Barriers are re-usable after all threads have rendezvoused once, and can + /// be used continuously. 
+ /// + /// A single (arbitrary) thread will receive a [`BarrierWaitResult`] that + /// returns `true` from [`BarrierWaitResult::is_leader()`] when returning + /// from this function, and all other threads will receive a result that + /// will return `false` from [`BarrierWaitResult::is_leader()`]. + /// + /// # Examples + /// + /// ``` + /// use std::sync::{Arc, Barrier}; + /// use std::thread; + /// + /// let mut handles = Vec::with_capacity(10); + /// let barrier = Arc::new(Barrier::new(10)); + /// for _ in 0..10 { + /// let c = Arc::clone(&barrier); + /// // The same messages will be printed together. + /// // You will NOT see any interleaving. + /// handles.push(thread::spawn(move|| { + /// println!("before wait"); + /// c.wait(); + /// println!("after wait"); + /// })); + /// } + /// // Wait for other threads to finish. + /// for handle in handles { + /// handle.join().unwrap(); + /// } + /// ``` + pub(crate) fn wait(&self) -> BarrierWaitResult { + let mut lock = self.lock.lock(); + let local_gen = lock.generation_id; + lock.count += 1; + if lock.count < self.num_threads { + // We need a while loop to guard against spurious wakeups. + // https://en.wikipedia.org/wiki/Spurious_wakeup + while local_gen == lock.generation_id { + lock = self.cvar.wait(lock).unwrap(); + } + BarrierWaitResult(false) + } else { + lock.count = 0; + lock.generation_id = lock.generation_id.wrapping_add(1); + self.cvar.notify_all(); + BarrierWaitResult(true) + } + } + + /// Blocks the current thread until all threads have rendezvoused here for + /// at most `timeout` duration. + pub(crate) fn wait_timeout(&self, timeout: Duration) -> Option { + // This implementation mirrors `wait`, but with each blocking operation + // replaced by a timeout-amenable alternative. + + let deadline = Instant::now() + timeout; + + // Acquire `self.lock` with at most `timeout` duration. 
+ let mut lock = loop { + if let Some(guard) = self.lock.try_lock() { + break guard; + } else if Instant::now() > deadline { + return None; + } else { + std::thread::yield_now(); + } + }; + + // Shrink the `timeout` to account for the time taken to acquire `lock`. + let timeout = deadline.saturating_duration_since(Instant::now()); + + let local_gen = lock.generation_id; + lock.count += 1; + if lock.count < self.num_threads { + // We need a while loop to guard against spurious wakeups. + // https://en.wikipedia.org/wiki/Spurious_wakeup + while local_gen == lock.generation_id { + let timeout_result; + (lock, timeout_result) = self.cvar.wait_timeout(lock, timeout).unwrap(); + if timeout_result.timed_out() { + return None; + } + } + Some(BarrierWaitResult(false)) + } else { + lock.count = 0; + lock.generation_id = lock.generation_id.wrapping_add(1); + self.cvar.notify_all(); + Some(BarrierWaitResult(true)) + } + } +} + +impl fmt::Debug for BarrierWaitResult { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("BarrierWaitResult") + .field("is_leader", &self.is_leader()) + .finish() + } +} + +impl BarrierWaitResult { + /// Returns `true` if this thread is the "leader thread" for the call to + /// [`Barrier::wait()`]. + /// + /// Only one thread will have `true` returned from their result, all other + /// threads will have `false` returned. 
+ /// + /// # Examples + /// + /// ``` + /// use std::sync::Barrier; + /// + /// let barrier = Barrier::new(1); + /// let barrier_wait_result = barrier.wait(); + /// println!("{:?}", barrier_wait_result.is_leader()); + /// ``` + #[must_use] + pub(crate) fn is_leader(&self) -> bool { + self.0 + } +} diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 6bd1ad93dcf..0a732791f76 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -4,6 +4,7 @@ mod atomic_u16; mod atomic_u32; mod atomic_u64; mod atomic_usize; +mod barrier; mod mutex; #[cfg(feature = "parking_lot")] mod parking_lot; @@ -76,6 +77,8 @@ pub(crate) mod sync { pub(crate) use std::sync::atomic::{fence, AtomicBool, AtomicPtr, AtomicU8, Ordering}; } + + pub(crate) use super::barrier::Barrier; } pub(crate) mod sys { diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 150cf64e246..cd33af2c81f 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -75,7 +75,7 @@ use std::task::Waker; use std::time::Duration; cfg_taskdump! 
{ - use std::sync::Barrier; + use crate::loom::sync::Barrier; } /// A scheduler worker @@ -1212,8 +1212,18 @@ impl Handle { core.is_traced = false; - // wait for other workers - let barrier = self.shared.trace_status.trace_start.wait(); + if core.is_shutdown { + return core; + } + + // wait for other workers, or timeout without tracing + let timeout = Duration::from_millis(250); // a _very_ generous timeout + let barrier = if let Some(barrier) = self.shared.trace_status.trace_start.wait_timeout(timeout) { + barrier + } else { + // don't attempt to trace + return core; + }; if !barrier.is_leader() { // wait for leader to finish tracing diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs index 79051d335bb..658ee4b9bfc 100644 --- a/tokio/tests/dump.rs +++ b/tokio/tests/dump.rs @@ -65,6 +65,7 @@ fn current_thread() { fn multi_thread() { let rt = runtime::Builder::new_multi_thread() .enable_all() + .worker_threads(3) .build() .unwrap(); From ddfc38cd5cb7971f521e48e9d60f572150c04edf Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Tue, 6 Jun 2023 19:30:47 +0000 Subject: [PATCH 3/3] tokio: remove destructuring assignment for MSRV --- tokio/src/loom/std/barrier.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tokio/src/loom/std/barrier.rs b/tokio/src/loom/std/barrier.rs index 6481efc161e..a3f0ca0ab6d 100644 --- a/tokio/src/loom/std/barrier.rs +++ b/tokio/src/loom/std/barrier.rs @@ -1,5 +1,5 @@ //! A `Barrier` that provides `wait_timeout`. -//! +//! //! This implementation mirrors that of the Rust standard library. use crate::loom::sync::{Condvar, Mutex}; @@ -170,8 +170,8 @@ impl Barrier { // We need a while loop to guard against spurious wakeups. 
// https://en.wikipedia.org/wiki/Spurious_wakeup while local_gen == lock.generation_id { - let timeout_result; - (lock, timeout_result) = self.cvar.wait_timeout(lock, timeout).unwrap(); + let (guard, timeout_result) = self.cvar.wait_timeout(lock, timeout).unwrap(); + lock = guard; if timeout_result.timed_out() { return None; }