diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 90455fb9e24..988315f1989 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -47,6 +47,7 @@ io-util = ["memchr", "bytes"] # stdin, stdout, stderr io-std = [] macros = ["tokio-macros"] +stats = [] net = [ "libc", "mio/os-poll", diff --git a/tokio/src/macros/cfg.rs b/tokio/src/macros/cfg.rs index 8318fd12aba..510d8a6292e 100644 --- a/tokio/src/macros/cfg.rs +++ b/tokio/src/macros/cfg.rs @@ -162,6 +162,25 @@ macro_rules! cfg_macros { } } +macro_rules! cfg_stats { + ($($item:item)*) => { + $( + #[cfg(all(tokio_unstable, feature = "stats"))] + #[cfg_attr(docsrs, doc(cfg(feature = "stats")))] + $item + )* + } +} + +macro_rules! cfg_not_stats { + ($($item:item)*) => { + $( + #[cfg(not(all(tokio_unstable, feature = "stats")))] + $item + )* + } +} + macro_rules! cfg_net { ($($item:item)*) => { $( diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index fe2e4a8c691..117c7f747ae 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -2,6 +2,7 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::Mutex; use crate::park::{Park, Unpark}; +use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::sync::notify::Notify; use crate::util::{waker_ref, Wake, WakerRef}; @@ -47,6 +48,9 @@ struct Inner { /// Thread park handle park: P, + + /// Stats batcher + stats: WorkerStatsBatcher, } #[derive(Clone)] @@ -87,6 +91,9 @@ struct Shared { /// Indicates whether the blocked on thread was woken. woken: AtomicBool, + + /// Keeps track of various runtime stats. + stats: RuntimeStats, } /// Thread-local context. @@ -123,6 +130,7 @@ impl BasicScheduler

{ owned: OwnedTasks::new(), unpark: unpark as Box, woken: AtomicBool::new(false), + stats: RuntimeStats::new(1), }), }; @@ -133,6 +141,7 @@ impl BasicScheduler

{ spawner: spawner.clone(), tick: 0, park, + stats: WorkerStatsBatcher::new(0), })); BasicScheduler { @@ -205,6 +214,7 @@ impl Inner

{ 'outer: loop { if scheduler.spawner.was_woken() || !polled { polled = true; + scheduler.stats.incr_poll_count(); if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) { return v; } @@ -238,7 +248,10 @@ impl Inner

{ Some(entry) => entry, None => { // Park until the thread is signaled + scheduler.stats.about_to_park(); + scheduler.stats.submit(&scheduler.spawner.shared.stats); scheduler.park.park().expect("failed to park"); + scheduler.stats.returned_from_park(); // Try polling the `block_on` future next continue 'outer; @@ -247,6 +260,7 @@ impl Inner

{ match entry { RemoteMsg::Schedule(task) => { + scheduler.stats.incr_poll_count(); let task = context.shared.owned.assert_owner(task); crate::coop::budget(|| task.run()) } @@ -255,6 +269,7 @@ impl Inner

{ // Yield to the park, this drives the timer and pulls any pending // I/O events. + scheduler.stats.submit(&scheduler.spawner.shared.stats); scheduler .park .park_timeout(Duration::from_millis(0)) @@ -369,6 +384,10 @@ impl Spawner { handle } + pub(crate) fn stats(&self) -> &RuntimeStats { + &self.shared.stats + } + fn pop(&self) -> Option { match self.shared.queue.lock().as_mut() { Some(queue) => queue.pop_front(), diff --git a/tokio/src/runtime/handle.rs b/tokio/src/runtime/handle.rs index 46072f381d1..1ae83cb2560 100644 --- a/tokio/src/runtime/handle.rs +++ b/tokio/src/runtime/handle.rs @@ -111,6 +111,14 @@ impl Handle { context::current().ok_or(TryCurrentError(())) } + cfg_stats! { + /// Returns a view that lets you get information about how the runtime + /// is performing. + pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats { + self.spawner.stats() + } + } + /// Spawn a future onto the Tokio runtime. /// /// This spawns the given future onto the runtime's executor, usually a diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index 52532ec6f12..c01c6c8d886 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -181,6 +181,13 @@ pub(crate) mod enter; pub(crate) mod task; +cfg_stats! { + pub mod stats; +} +cfg_not_stats! { + pub(crate) mod stats; +} + cfg_rt! { mod basic_scheduler; use basic_scheduler::BasicScheduler; diff --git a/tokio/src/runtime/queue.rs b/tokio/src/runtime/queue.rs index c45cb6a5a3d..c0e327c302e 100644 --- a/tokio/src/runtime/queue.rs +++ b/tokio/src/runtime/queue.rs @@ -3,6 +3,7 @@ use crate::loom::cell::UnsafeCell; use crate::loom::sync::atomic::{AtomicU16, AtomicU32}; use crate::loom::sync::Arc; +use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::{self, Inject}; use std::mem::MaybeUninit; @@ -288,7 +289,11 @@ impl Steal { } /// Steals half the tasks from self and place them into `dst`. - pub(super) fn steal_into(&self, dst: &mut Local) -> Option> { + pub(super) fn steal_into( + &self, + dst: &mut Local, + stats: &mut WorkerStatsBatcher, + ) -> Option> { // Safety: the caller is the only thread that mutates `dst.tail` and // holds a mutable reference. let dst_tail = unsafe { dst.inner.tail.unsync_load() }; @@ -307,6 +312,7 @@ impl Steal { // Steal the tasks into `dst`'s buffer. This does not yet expose the // tasks in `dst`. let mut n = self.steal_into2(dst, dst_tail); + stats.incr_steal_count(n); if n == 0 { // No tasks were stolen diff --git a/tokio/src/runtime/spawner.rs b/tokio/src/runtime/spawner.rs index fbcde2cfaf5..9a3d465aef9 100644 --- a/tokio/src/runtime/spawner.rs +++ b/tokio/src/runtime/spawner.rs @@ -1,8 +1,7 @@ -cfg_rt! { - use crate::future::Future; - use crate::runtime::basic_scheduler; - use crate::task::JoinHandle; -} +use crate::future::Future; +use crate::runtime::basic_scheduler; +use crate::runtime::stats::RuntimeStats; +use crate::task::JoinHandle; cfg_rt_multi_thread! { use crate::runtime::thread_pool; @@ -10,7 +9,6 @@ cfg_rt_multi_thread! { #[derive(Debug, Clone)] pub(crate) enum Spawner { - #[cfg(feature = "rt")] Basic(basic_scheduler::Spawner), #[cfg(feature = "rt-multi-thread")] ThreadPool(thread_pool::Spawner), @@ -25,21 +23,25 @@ impl Spawner { } } } -} -cfg_rt! { - impl Spawner { - pub(crate) fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - F::Output: Send + 'static, - { - match self { - #[cfg(feature = "rt")] - Spawner::Basic(spawner) => spawner.spawn(future), - #[cfg(feature = "rt-multi-thread")] - Spawner::ThreadPool(spawner) => spawner.spawn(future), - } + pub(crate) fn spawn(&self, future: F) -> JoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + match self { + Spawner::Basic(spawner) => spawner.spawn(future), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.spawn(future), + } + } + + #[cfg_attr(not(all(tokio_unstable, feature = "stats")), allow(dead_code))] + pub(crate) fn stats(&self) -> &RuntimeStats { + match self { + Spawner::Basic(spawner) => spawner.stats(), + #[cfg(feature = "rt-multi-thread")] + Spawner::ThreadPool(spawner) => spawner.stats(), } } } diff --git a/tokio/src/runtime/stats/mock.rs b/tokio/src/runtime/stats/mock.rs new file mode 100644 index 00000000000..3bda8bffde6 --- /dev/null +++ b/tokio/src/runtime/stats/mock.rs @@ -0,0 +1,27 @@ +//! This file contains mocks of the types in src/runtime/stats/stats.rs + +pub(crate) struct RuntimeStats {} + +impl RuntimeStats { + pub(crate) fn new(_worker_threads: usize) -> Self { + Self {} + } +} + +pub(crate) struct WorkerStatsBatcher {} + +impl WorkerStatsBatcher { + pub(crate) fn new(_my_index: usize) -> Self { + Self {} + } + + pub(crate) fn submit(&mut self, _to: &RuntimeStats) {} + + pub(crate) fn about_to_park(&mut self) {} + pub(crate) fn returned_from_park(&mut self) {} + + #[cfg(feature = "rt-multi-thread")] + pub(crate) fn incr_steal_count(&mut self, _by: u16) {} + + pub(crate) fn incr_poll_count(&mut self) {} +} diff --git a/tokio/src/runtime/stats/mod.rs b/tokio/src/runtime/stats/mod.rs new file mode 100644 index 00000000000..5e08e8ec4d9 --- /dev/null +++ b/tokio/src/runtime/stats/mod.rs @@ -0,0 +1,17 @@ +//! This module contains information need to view information about how the +//! runtime is performing. +#![allow(clippy::module_inception)] + +cfg_stats! { + mod stats; + + pub use self::stats::{RuntimeStats, WorkerStats}; + pub(crate) use self::stats::WorkerStatsBatcher; +} + +cfg_not_stats! { + #[path = "mock.rs"] + mod stats; + + pub(crate) use self::stats::{RuntimeStats, WorkerStatsBatcher}; +} diff --git a/tokio/src/runtime/stats/stats.rs b/tokio/src/runtime/stats/stats.rs new file mode 100644 index 00000000000..39a48aee6a8 --- /dev/null +++ b/tokio/src/runtime/stats/stats.rs @@ -0,0 +1,97 @@ +//! This file contains the types necessary to collect various types of stats. +use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed}; + +/// This type contains methods to retrieve stats from a Tokio runtime. +#[derive(Debug)] +pub struct RuntimeStats { + workers: Box<[WorkerStats]>, +} + +/// This type contains methods to retrieve stats from a worker thread on a Tokio runtime. +#[derive(Debug)] +#[repr(align(128))] +pub struct WorkerStats { + park_count: AtomicU64, + steal_count: AtomicU64, + poll_count: AtomicU64, +} + +impl RuntimeStats { + pub(crate) fn new(worker_threads: usize) -> Self { + let mut workers = Vec::with_capacity(worker_threads); + for _ in 0..worker_threads { + workers.push(WorkerStats { + park_count: AtomicU64::new(0), + steal_count: AtomicU64::new(0), + poll_count: AtomicU64::new(0), + }); + } + + Self { + workers: workers.into_boxed_slice(), + } + } + + /// Returns a slice containing the worker stats for each worker thread. + pub fn workers(&self) -> impl Iterator { + self.workers.iter() + } +} + +impl WorkerStats { + /// Returns the total number of times this worker thread has parked. + pub fn park_count(&self) -> u64 { + self.park_count.load(Relaxed) + } + + /// Returns the number of tasks this worker has stolen from other worker + /// threads. + pub fn steal_count(&self) -> u64 { + self.steal_count.load(Relaxed) + } + + /// Returns the number of times this worker has polled a task. + pub fn poll_count(&self) -> u64 { + self.poll_count.load(Relaxed) + } +} + +pub(crate) struct WorkerStatsBatcher { + my_index: usize, + park_count: u64, + steal_count: u64, + poll_count: u64, +} + +impl WorkerStatsBatcher { + pub(crate) fn new(my_index: usize) -> Self { + Self { + my_index, + park_count: 0, + steal_count: 0, + poll_count: 0, + } + } + pub(crate) fn submit(&mut self, to: &RuntimeStats) { + let worker = &to.workers[self.my_index]; + + worker.park_count.store(self.park_count, Relaxed); + worker.steal_count.store(self.steal_count, Relaxed); + worker.poll_count.store(self.poll_count, Relaxed); + } + + pub(crate) fn about_to_park(&mut self) { + self.park_count += 1; + } + + pub(crate) fn returned_from_park(&mut self) {} + + #[cfg(feature = "rt-multi-thread")] + pub(crate) fn incr_steal_count(&mut self, by: u16) { + self.steal_count += u64::from(by); + } + + pub(crate) fn incr_poll_count(&mut self) { + self.poll_count += 1; + } +} diff --git a/tokio/src/runtime/tests/loom_queue.rs b/tokio/src/runtime/tests/loom_queue.rs index a1ed1717b90..2cbb0a18b53 100644 --- a/tokio/src/runtime/tests/loom_queue.rs +++ b/tokio/src/runtime/tests/loom_queue.rs @@ -1,5 +1,6 @@ use crate::runtime::blocking::NoopSchedule; use crate::runtime::queue; +use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::Inject; use loom::thread; @@ -11,11 +12,12 @@ fn basic() { let inject = Inject::new(); let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..3 { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -65,10 +67,11 @@ fn steal_overflow() { let inject = Inject::new(); let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); let mut n = 0; - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -113,9 +116,10 @@ fn multi_stealer() { const NUM_TASKS: usize = 5; fn steal_tasks(steal: queue::Steal) -> usize { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); - if steal.steal_into(&mut local).is_none() { + if steal.steal_into(&mut local, &mut stats).is_none() { return 0; } @@ -165,6 +169,7 @@ fn multi_stealer() { #[test] fn chained_steal() { loom::model(|| { + let mut stats = WorkerStatsBatcher::new(0); let (s1, mut l1) = queue::local(); let (s2, mut l2) = queue::local(); let inject = Inject::new(); @@ -180,8 +185,9 @@ fn chained_steal() { // Spawn a task to steal from **our** queue let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); - s1.steal_into(&mut local); + s1.steal_into(&mut local, &mut stats); while local.pop().is_some() {} }); @@ -189,7 +195,7 @@ fn chained_steal() { // Drain our tasks, then attempt to steal while l1.pop().is_some() {} - s2.steal_into(&mut l1); + s2.steal_into(&mut l1, &mut stats); th.join().unwrap(); diff --git a/tokio/src/runtime/tests/queue.rs b/tokio/src/runtime/tests/queue.rs index 428b002071a..47f1b01d6a6 100644 --- a/tokio/src/runtime/tests/queue.rs +++ b/tokio/src/runtime/tests/queue.rs @@ -1,4 +1,5 @@ use crate::runtime::queue; +use crate::runtime::stats::WorkerStatsBatcher; use crate::runtime::task::{self, Inject, Schedule, Task}; use std::thread; @@ -44,6 +45,8 @@ fn overflow() { #[test] fn steal_batch() { + let mut stats = WorkerStatsBatcher::new(0); + let (steal1, mut local1) = queue::local(); let (_, mut local2) = queue::local(); let inject = Inject::new(); @@ -53,7 +56,7 @@ fn steal_batch() { local1.push_back(task, &inject); } - assert!(steal1.steal_into(&mut local2).is_some()); + assert!(steal1.steal_into(&mut local2, &mut stats).is_some()); for _ in 0..1 { assert!(local2.pop().is_some()); @@ -81,11 +84,12 @@ fn stress1() { let inject = Inject::new(); let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } @@ -137,11 +141,12 @@ fn stress2() { let inject = Inject::new(); let th = thread::spawn(move || { + let mut stats = WorkerStatsBatcher::new(0); let (_, mut local) = queue::local(); let mut n = 0; for _ in 0..NUM_STEAL { - if steal.steal_into(&mut local).is_some() { + if steal.steal_into(&mut local, &mut stats).is_some() { n += 1; } diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 3808aa26465..fdc9ab82bc7 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -12,6 +12,7 @@ pub(crate) use worker::Launch; pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; +use crate::runtime::stats::RuntimeStats; use crate::runtime::task::JoinHandle; use crate::runtime::Parker; @@ -99,6 +100,10 @@ impl Spawner { pub(crate) fn shutdown(&mut self) { self.shared.close(); } + + pub(crate) fn stats(&self) -> &RuntimeStats { + self.shared.stats() + } } impl fmt::Debug for Spawner { diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index f5004c0e406..4ea54f64de3 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -64,6 +64,7 @@ use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::park::{Parker, Unparker}; +use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher}; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; use crate::runtime::thread_pool::{AtomicCell, Idle}; use crate::runtime::{queue, task}; @@ -112,6 +113,9 @@ struct Core { /// borrow checker happy. park: Option, + /// Batching stats so they can be submitted to RuntimeStats. + stats: WorkerStatsBatcher, + /// Fast random number generator. rand: FastRand, } @@ -137,6 +141,9 @@ pub(super) struct Shared { /// stolen by a thread that was spawned as part of `block_in_place`. #[allow(clippy::vec_box)] // we're moving an already-boxed value shutdown_cores: Mutex>>, + + /// Collect stats from the runtime. + stats: RuntimeStats, } /// Used to communicate with a worker from other threads. @@ -179,7 +186,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { let mut remotes = vec![]; // Create the local queues - for _ in 0..size { + for i in 0..size { let (steal, run_queue) = queue::local(); let park = park.clone(); @@ -192,6 +199,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { is_searching: false, is_shutdown: false, park: Some(park), + stats: WorkerStatsBatcher::new(i), rand: FastRand::new(seed()), })); @@ -204,6 +212,7 @@ pub(super) fn create(size: usize, park: Parker) -> (Arc, Launch) { idle: Idle::new(size), owned: OwnedTasks::new(), shutdown_cores: Mutex::new(vec![]), + stats: RuntimeStats::new(size), }); let mut launch = Launch(vec![]); @@ -391,6 +400,7 @@ impl Context { core.transition_from_searching(&self.worker); // Make the core available to the runtime context + core.stats.incr_poll_count(); *self.core.borrow_mut() = Some(core); // Run the task @@ -415,6 +425,7 @@ impl Context { if coop::has_budget_remaining() { // Run the LIFO task, then loop + core.stats.incr_poll_count(); *self.core.borrow_mut() = Some(core); let task = self.worker.shared.owned.assert_owner(task); task.run(); @@ -462,6 +473,8 @@ impl Context { // Take the parker out of core let mut park = core.park.take().expect("park missing"); + core.stats.about_to_park(); + // Store `core` in context *self.core.borrow_mut() = Some(core); @@ -483,6 +496,8 @@ impl Context { self.worker.shared.notify_parked(); } + core.stats.returned_from_park(); + core } } @@ -524,7 +539,10 @@ impl Core { } let target = &worker.shared.remotes[i]; - if let Some(task) = target.steal.steal_into(&mut self.run_queue) { + if let Some(task) = target + .steal + .steal_into(&mut self.run_queue, &mut self.stats) + { return Some(task); } } @@ -590,6 +608,8 @@ impl Core { /// Runs maintenance work such as checking the pool's state. fn maintenance(&mut self, worker: &Worker) { + self.stats.submit(&worker.shared.stats); + if !self.is_shutdown { // Check if the scheduler has been shutdown self.is_shutdown = worker.inject().is_closed(); @@ -601,6 +621,8 @@ impl Core { fn pre_shutdown(&mut self, worker: &Worker) { // Signal to all tasks to shut down. worker.shared.owned.close_and_shutdown_all(); + + self.stats.submit(&worker.shared.stats); } /// Shutdown the core @@ -651,6 +673,10 @@ impl Shared { handle } + pub(crate) fn stats(&self) -> &RuntimeStats { + &self.stats + } + pub(super) fn schedule(&self, task: Notified, is_yield: bool) { CURRENT.with(|maybe_cx| { if let Some(cx) = maybe_cx {