From cd021a20f3fb4f038f93581ca15157dde29ed5fb Mon Sep 17 00:00:00 2001 From: Jack Wrenn Date: Tue, 2 May 2023 16:36:33 +0000 Subject: [PATCH] taskdump: implement task dumps for multi-thread runtime This PR implements task dumps on the multi-thread runtime. It complements #5608, which implemented task dumps on the current-thread runtime. --- examples/dump.rs | 32 ++-- tokio/src/runtime/handle.rs | 31 +++- .../runtime/scheduler/multi_thread/handle.rs | 25 +++ .../runtime/scheduler/multi_thread/worker.rs | 175 +++++++++++++++++- tokio/src/runtime/task/trace/mod.rs | 38 ++++ tokio/tests/dump.rs | 98 ++++++++++ tokio/tests/dump_current_thread.rs | 55 ------ 7 files changed, 379 insertions(+), 75 deletions(-) create mode 100644 tokio/tests/dump.rs delete mode 100644 tokio/tests/dump_current_thread.rs diff --git a/examples/dump.rs b/examples/dump.rs index 159cc603ba0..c7358489933 100644 --- a/examples/dump.rs +++ b/examples/dump.rs @@ -6,7 +6,7 @@ target_os = "linux", any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") ))] -#[tokio::main(flavor = "current_thread")] +#[tokio::main] async fn main() { use std::hint::black_box; @@ -22,21 +22,29 @@ async fn main() { #[inline(never)] async fn c() { - black_box(tokio::task::yield_now()).await + loop { + tokio::task::yield_now().await; + } } - tokio::spawn(a()); - tokio::spawn(b()); - tokio::spawn(c()); + async fn dump() { + let handle = tokio::runtime::Handle::current(); + let dump = handle.dump().await; - let handle = tokio::runtime::Handle::current(); - let dump = handle.dump(); - - for (i, task) in dump.tasks().iter().enumerate() { - let trace = task.trace(); - println!("task {i} trace:"); - println!("{trace}"); + for (i, task) in dump.tasks().iter().enumerate() { + let trace = task.trace(); + println!("task {i} trace:"); + println!("{trace}\n"); + } } + + tokio::select!( + biased; + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(b()) => {}, + _ = tokio::spawn(c()) => {}, + _ = dump() => {}, + ); } #[cfg(not(all( diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index e44711f875c..cbf20f71982 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -341,15 +341,40 @@ cfg_metrics! { cfg_taskdump! { impl Handle { /// Capture a snapshot of this runtime's state. - pub fn dump(&self) -> crate::runtime::Dump { + pub async fn dump(&self) -> crate::runtime::Dump { match &self.inner { scheduler::Handle::CurrentThread(handle) => handle.dump(), #[cfg(all(feature = "rt-multi-thread", not(tokio_wasi)))] - scheduler::Handle::MultiThread(_) => - unimplemented!("taskdumps are unsupported on the multi-thread runtime"), + scheduler::Handle::MultiThread(handle) => { + // perform the trace in a separate thread so that the + // trace itself does not appear in the taskdump. + let handle = handle.clone(); + spawn_thread(async { + let handle = handle; + handle.dump().await + }).await + }, } } } + + cfg_rt_multi_thread! { + /// Spawn a new thread and asynchronously await on its result. + async fn spawn_thread(f: F) -> ::Output + where + F: Future + Send + 'static, + ::Output: Send + 'static + { + let (tx, rx) = crate::sync::oneshot::channel(); + crate::loom::thread::spawn(|| { + let rt = crate::runtime::Builder::new_current_thread().build().unwrap(); + rt.block_on(async { + let _ = tx.send(f.await); + }); + }); + rx.await.unwrap() + } + } } /// Error returned by `try_current` when no Runtime has been started diff --git a/tokio/src/runtime/scheduler/multi_thread/handle.rs b/tokio/src/runtime/scheduler/multi_thread/handle.rs index 77baaeb06d4..cf3cf52af34 100644 --- a/tokio/src/runtime/scheduler/multi_thread/handle.rs +++ b/tokio/src/runtime/scheduler/multi_thread/handle.rs @@ -95,6 +95,31 @@ cfg_metrics! { } } +cfg_taskdump! { + impl Handle { + pub(crate) async fn dump(&self) -> crate::runtime::Dump { + let trace_status = &self.shared.trace_status; + + // If a dump is in progress, block. + trace_status.start_trace_request(&self).await; + + let result = loop { + if let Some(result) = trace_status.take_result() { + break result; + } else { + self.notify_all(); + trace_status.result_ready.notified().await; + } + }; + + // Allow other queued dumps to proceed. + trace_status.end_trace_request(&self).await; + + result + } + } +} + impl fmt::Debug for Handle { fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { fmt.debug_struct("multi_thread::Handle { ... }").finish() diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index e2bbb643db7..6e4a0da45e7 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -70,6 +70,10 @@ use crate::util::rand::{FastRand, RngSeedGenerator}; use std::cell::RefCell; use std::time::Duration; +cfg_taskdump! { + use std::sync::Barrier; +} + /// A scheduler worker pub(super) struct Worker { /// Reference to scheduler's handle @@ -108,6 +112,9 @@ struct Core { /// True if the scheduler is being shutdown is_shutdown: bool, + /// True if the scheduler is being traced + is_traced: bool, + /// Parker /// /// Stored in an `Option` as the parker is added / removed to make the @@ -130,7 +137,7 @@ pub(super) struct Shared { /// Global task queue used for: /// 1. Submit work to the scheduler while **not** currently on a worker thread. /// 2. Submit work to the scheduler when a worker run queue is saturated - inject: Inject>, + pub(super) inject: Inject>, /// Coordinates idle workers idle: Idle, @@ -145,6 +152,9 @@ pub(super) struct Shared { #[allow(clippy::vec_box)] // we're moving an already-boxed value shutdown_cores: Mutex>>, + /// The number of cores that have observed the trace signal. + pub(super) trace_status: TraceStatus, + /// Scheduler configuration options config: Config, @@ -163,7 +173,7 @@ pub(super) struct Shared { /// Used to communicate with a worker from other threads. struct Remote { /// Steals tasks from this worker. - steal: queue::Steal>, + pub(super) steal: queue::Steal>, /// Unparks the associated worker thread unpark: Unparker, @@ -181,6 +191,81 @@ struct Context { /// Starts the workers pub(crate) struct Launch(Vec>); +cfg_not_taskdump! { + pub(super) struct TraceStatus {} + + impl TraceStatus { + fn new(_: usize) -> Self { + Self {} + } + + fn trace_requested(&self) -> bool { + false + } + } +} + +cfg_taskdump! { + use crate::runtime::dump::Dump; + use crate::loom::sync::atomic::{AtomicBool, Ordering}; + + /// Tracing status of the worker. + pub(super) struct TraceStatus { + pub(super) trace_requested: AtomicBool, + trace_start: Barrier, + trace_end: Barrier, + pub(super) result_ready: crate::sync::Notify, + pub(super) trace_result: Mutex>, + } + + impl TraceStatus { + fn new(remotes_len: usize) -> Self { + Self { + trace_requested: AtomicBool::new(false), + trace_start: Barrier::new(remotes_len), + trace_end: Barrier::new(remotes_len), + result_ready: crate::sync::Notify::new(), + trace_result: Mutex::new(None), + } + } + + fn trace_requested(&self) -> bool { + self.trace_requested.load(Ordering::Relaxed) + } + + pub(super) async fn start_trace_request(&self, handle: &Handle) { + while self.trace_requested.compare_exchange(false, + true, + Ordering::Acquire, + Ordering::Relaxed).is_err() + { + handle.notify_all(); + crate::task::yield_now().await; + } + } + + fn stash_result(&self, dump: Dump) { + let _ = self.trace_result.lock().insert(dump); + self.result_ready.notify_one(); + } + + pub(super) fn take_result(&self) -> Option { + self.trace_result.lock().take() + } + + pub(super) async fn end_trace_request(&self, handle: &Handle) { + while self.trace_requested.compare_exchange(true, + false, + Ordering::Acquire, + Ordering::Relaxed).is_err() + { + handle.notify_all(); + crate::task::yield_now().await; + } + } + } +} + /// Running a task may consume the core. If the core is still available when /// running the task completes, it is returned. Otherwise, the worker will need /// to stop processing. @@ -228,6 +313,7 @@ pub(super) fn create( run_queue, is_searching: false, is_shutdown: false, + is_traced: false, park: Some(park), metrics: MetricsBatch::new(&metrics), rand: FastRand::new(config.seed_generator.next_seed()), @@ -237,6 +323,7 @@ pub(super) fn create( worker_metrics.push(metrics); } + let remotes_len = remotes.len(); let handle = Arc::new(Handle { shared: Shared { remotes: remotes.into_boxed_slice(), @@ -244,6 +331,7 @@ pub(super) fn create( idle: Idle::new(size), owned: OwnedTasks::new(), shutdown_cores: Mutex::new(vec![]), + trace_status: TraceStatus::new(remotes_len), config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: worker_metrics.into_boxed_slice(), @@ -440,6 +528,10 @@ impl Context { while !core.is_shutdown { self.assert_lifo_enabled_is_correct(&core); + if core.is_traced { + core = self.worker.handle.trace_core(core); + } + // Increment the tick core.tick(); @@ -604,7 +696,7 @@ impl Context { } if core.transition_to_parked(&self.worker) { - while !core.is_shutdown { + while !core.is_shutdown && !core.is_traced { core.metrics.about_to_park(); core = self.park_timeout(core, None); core.metrics.returned_from_park(); @@ -765,7 +857,7 @@ impl Core { /// Returns true if the transition happened, false if there is work to do first. fn transition_to_parked(&mut self, worker: &Worker) -> bool { // Workers should not park if they have work to do - if self.lifo_slot.is_some() || self.run_queue.has_tasks() { + if self.lifo_slot.is_some() || self.run_queue.has_tasks() || self.is_traced { return false; } @@ -820,6 +912,11 @@ impl Core { // Check if the scheduler has been shutdown self.is_shutdown = worker.inject().is_closed(); } + + if !self.is_traced { + // Check if the worker should be tracing. + self.is_traced = worker.handle.shared.trace_status.trace_requested(); + } } /// Signals all tasks to shut down, and waits for them to complete. Must run @@ -942,7 +1039,7 @@ impl Handle { } } - fn notify_all(&self) { + pub(super) fn notify_all(&self) { for remote in &self.shared.remotes[..] { remote.unpark.unpark(&self.driver); } @@ -995,6 +1092,51 @@ impl Handle { } } + cfg_not_taskdump! { + fn trace_core(&self, core: Box) -> Box { + core + } + } + + cfg_taskdump! { + fn trace_core(&self, mut core: Box) -> Box { + use crate::runtime::dump; + use task::trace::trace_multi_thread; + + core.is_traced = false; + + // wait for other workers + let barrier = self.shared.trace_status.trace_start.wait(); + + if !barrier.is_leader() { + // wait for leader to finish tracing + self.shared.trace_status.trace_end.wait(); + return core; + } + + // trace + + let owned = &self.shared.owned; + let mut local = self.shared.steal_all(); + let injection = &self.shared.inject; + + let traces = trace_multi_thread(owned, &mut local, injection) + .into_iter() + .map(dump::Task::new) + .collect(); + + let result = dump::Dump::new(traces); + + // stash the result + self.shared.trace_status.stash_result(result); + + // allow other workers to proceed + self.shared.trace_status.trace_end.wait(); + + core + } + } + fn ptr_eq(&self, other: &Handle) -> bool { std::ptr::eq(self, other) } @@ -1019,3 +1161,26 @@ cfg_metrics! { } } } + +cfg_taskdump! { + impl Shared { + /// Steal all tasks from remotes into a single local queue. + pub(super) fn steal_all(&self) -> super::queue::Local> { + let (_steal, mut local) = super::queue::local(); + + let worker_metrics = WorkerMetrics::new(); + let mut metrics = MetricsBatch::new(&worker_metrics); + + for remote in self.remotes.iter() { + let steal = &remote.steal; + while !steal.is_empty() { + if let Some(task) = steal.steal_into(&mut local, &mut metrics) { + local.push_back([task].into_iter()); + } + } + } + + local + } + } +} diff --git a/tokio/src/runtime/task/trace/mod.rs b/tokio/src/runtime/task/trace/mod.rs index fb1909c3574..75d0e9511d2 100644 --- a/tokio/src/runtime/task/trace/mod.rs +++ b/tokio/src/runtime/task/trace/mod.rs @@ -263,3 +263,41 @@ pub(in crate::runtime) fn trace_current_thread( }) .collect() } + +cfg_rt_multi_thread! { + use crate::runtime::scheduler::multi_thread; + + /// Trace and poll all tasks of the current_thread runtime. + pub(in crate::runtime) fn trace_multi_thread( + owned: &OwnedTasks>, + local: &mut multi_thread::queue::Local>, + injection: &super::Inject>, + ) -> Vec { + // clear the local queue + while let Some(notified) = local.pop() { + drop(notified); + } + + // clear the injection queue + while let Some(notified) = injection.pop() { + drop(notified); + } + + // notify each task + let mut traces = vec![]; + owned.for_each(|task| { + // set the notified bit + task.as_raw().state().transition_to_notified_for_tracing(); + + // trace the task + let ((), trace) = Trace::capture(|| task.as_raw().poll()); + traces.push(trace); + + // reschedule the task + let _ = task.as_raw().state().transition_to_notified_by_ref(); + task.as_raw().schedule(); + }); + + traces + } +} diff --git a/tokio/tests/dump.rs b/tokio/tests/dump.rs new file mode 100644 index 00000000000..79051d335bb --- /dev/null +++ b/tokio/tests/dump.rs @@ -0,0 +1,98 @@ +#![cfg(all( + tokio_unstable, + tokio_taskdump, + target_os = "linux", + any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") +))] + +use std::hint::black_box; +use tokio::runtime::{self, Handle}; + +#[inline(never)] +async fn a() { + black_box(b()).await +} + +#[inline(never)] +async fn b() { + black_box(c()).await +} + +#[inline(never)] +async fn c() { + loop { + black_box(tokio::task::yield_now()).await + } +} + +#[test] +fn current_thread() { + let rt = runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + async fn dump() { + let handle = Handle::current(); + let dump = handle.dump().await; + + let tasks: Vec<_> = dump.tasks().iter().collect(); + + assert_eq!(tasks.len(), 3); + + for task in tasks { + let trace = task.trace().to_string(); + eprintln!("\n\n{trace}\n\n"); + assert!(trace.contains("dump::a")); + assert!(trace.contains("dump::b")); + assert!(trace.contains("dump::c")); + assert!(trace.contains("tokio::task::yield_now")); + } + } + + rt.block_on(async { + tokio::select!( + biased; + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(a()) => {}, + _ = dump() => {}, + ); + }); +} + +#[test] +fn multi_thread() { + let rt = runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + async fn dump() { + let handle = Handle::current(); + let dump = handle.dump().await; + + let tasks: Vec<_> = dump.tasks().iter().collect(); + + assert_eq!(tasks.len(), 3); + + for task in tasks { + let trace = task.trace().to_string(); + eprintln!("\n\n{trace}\n\n"); + assert!(trace.contains("dump::a")); + assert!(trace.contains("dump::b")); + assert!(trace.contains("dump::c")); + assert!(trace.contains("tokio::task::yield_now")); + } + } + + rt.block_on(async { + tokio::select!( + biased; + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(a()) => {}, + _ = tokio::spawn(a()) => {}, + _ = dump() => {}, + ); + }); +} diff --git a/tokio/tests/dump_current_thread.rs b/tokio/tests/dump_current_thread.rs deleted file mode 100644 index 29661f98fb7..00000000000 --- a/tokio/tests/dump_current_thread.rs +++ /dev/null @@ -1,55 +0,0 @@ -#![cfg(all( - tokio_unstable, - tokio_taskdump, - target_os = "linux", - any(target_arch = "aarch64", target_arch = "x86", target_arch = "x86_64") -))] - -use std::hint::black_box; -use tokio::runtime; - -#[inline(never)] -async fn a() { - black_box(b()).await -} - -#[inline(never)] -async fn b() { - black_box(c()).await -} - -#[inline(never)] -async fn c() { - black_box(tokio::task::yield_now()).await -} - -#[test] -fn test() { - let rt = runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - rt.spawn(a()); - - let handle = rt.handle(); - - assert_eq!(handle.dump().tasks().iter().count(), 0); - - let dump = rt.block_on(async { - handle.spawn(a()); - handle.dump() - }); - - let tasks: Vec<_> = dump.tasks().iter().collect(); - - assert_eq!(tasks.len(), 2); - - for task in tasks { - let trace = task.trace().to_string(); - assert!(trace.contains("dump_current_thread::a")); - assert!(trace.contains("dump_current_thread::b")); - assert!(trace.contains("dump_current_thread::c")); - assert!(trace.contains("tokio::task::yield_now")); - } -}