Skip to content

Commit

Permalink
stats: initial work on runtime stats (#4043)
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Aug 27, 2021
1 parent 8a097d2 commit 98578a6
Show file tree
Hide file tree
Showing 14 changed files with 276 additions and 31 deletions.
1 change: 1 addition & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
macros = ["tokio-macros"]
stats = []
net = [
"libc",
"mio/os-poll",
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@ macro_rules! cfg_macros {
}
}

macro_rules! cfg_stats {
($($item:item)*) => {
$(
#[cfg(all(tokio_unstable, feature = "stats"))]
#[cfg_attr(docsrs, doc(cfg(feature = "stats")))]
$item
)*
}
}

macro_rules! cfg_not_stats {
($($item:item)*) => {
$(
#[cfg(not(all(tokio_unstable, feature = "stats")))]
$item
)*
}
}

macro_rules! cfg_net {
($($item:item)*) => {
$(
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::sync::notify::Notify;
use crate::util::{waker_ref, Wake, WakerRef};
Expand Down Expand Up @@ -47,6 +48,9 @@ struct Inner<P: Park> {

/// Thread park handle
park: P,

/// Stats batcher
stats: WorkerStatsBatcher,
}

#[derive(Clone)]
Expand Down Expand Up @@ -87,6 +91,9 @@ struct Shared {

/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Keeps track of various runtime stats.
stats: RuntimeStats,
}

/// Thread-local context.
Expand Down Expand Up @@ -123,6 +130,7 @@ impl<P: Park> BasicScheduler<P> {
owned: OwnedTasks::new(),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
stats: RuntimeStats::new(1),
}),
};

Expand All @@ -133,6 +141,7 @@ impl<P: Park> BasicScheduler<P> {
spawner: spawner.clone(),
tick: 0,
park,
stats: WorkerStatsBatcher::new(0),
}));

BasicScheduler {
Expand Down Expand Up @@ -205,6 +214,7 @@ impl<P: Park> Inner<P> {
'outer: loop {
if scheduler.spawner.was_woken() || !polled {
polled = true;
scheduler.stats.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}
Expand Down Expand Up @@ -238,7 +248,10 @@ impl<P: Park> Inner<P> {
Some(entry) => entry,
None => {
// Park until the thread is signaled
scheduler.stats.about_to_park();
scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler.park.park().expect("failed to park");
scheduler.stats.returned_from_park();

// Try polling the `block_on` future next
continue 'outer;
Expand All @@ -247,6 +260,7 @@ impl<P: Park> Inner<P> {

match entry {
RemoteMsg::Schedule(task) => {
scheduler.stats.incr_poll_count();
let task = context.shared.owned.assert_owner(task);
crate::coop::budget(|| task.run())
}
Expand All @@ -255,6 +269,7 @@ impl<P: Park> Inner<P> {

// Yield to the park, this drives the timer and pulls any pending
// I/O events.
scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler
.park
.park_timeout(Duration::from_millis(0))
Expand Down Expand Up @@ -369,6 +384,10 @@ impl Spawner {
handle
}

pub(crate) fn stats(&self) -> &RuntimeStats {
&self.shared.stats
}

fn pop(&self) -> Option<RemoteMsg> {
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ impl Handle {
context::current().ok_or(TryCurrentError(()))
}

cfg_stats! {
/// Returns a view that lets you get information about how the runtime
/// is performing.
pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats {
self.spawner.stats()
}
}

/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ pub(crate) mod enter;

pub(crate) mod task;

cfg_stats! {
pub mod stats;
}
cfg_not_stats! {
pub(crate) mod stats;
}

cfg_rt! {
mod basic_scheduler;
use basic_scheduler::BasicScheduler;
Expand Down
8 changes: 7 additions & 1 deletion tokio/src/runtime/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
use crate::loom::sync::Arc;
use crate::runtime::stats::WorkerStatsBatcher;
use crate::runtime::task::{self, Inject};

use std::mem::MaybeUninit;
Expand Down Expand Up @@ -288,7 +289,11 @@ impl<T> Steal<T> {
}

/// Steals half the tasks from self and place them into `dst`.
pub(super) fn steal_into(&self, dst: &mut Local<T>) -> Option<task::Notified<T>> {
pub(super) fn steal_into(
&self,
dst: &mut Local<T>,
stats: &mut WorkerStatsBatcher,
) -> Option<task::Notified<T>> {
// Safety: the caller is the only thread that mutates `dst.tail` and
// holds a mutable reference.
let dst_tail = unsafe { dst.inner.tail.unsync_load() };
Expand All @@ -307,6 +312,7 @@ impl<T> Steal<T> {
// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
stats.incr_steal_count(n);

if n == 0 {
// No tasks were stolen
Expand Down
42 changes: 22 additions & 20 deletions tokio/src/runtime/spawner.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
cfg_rt! {
use crate::future::Future;
use crate::runtime::basic_scheduler;
use crate::task::JoinHandle;
}
use crate::future::Future;
use crate::runtime::basic_scheduler;
use crate::runtime::stats::RuntimeStats;
use crate::task::JoinHandle;

cfg_rt_multi_thread! {
use crate::runtime::thread_pool;
}

#[derive(Debug, Clone)]
pub(crate) enum Spawner {
#[cfg(feature = "rt")]
Basic(basic_scheduler::Spawner),
#[cfg(feature = "rt-multi-thread")]
ThreadPool(thread_pool::Spawner),
Expand All @@ -25,21 +23,25 @@ impl Spawner {
}
}
}
}

cfg_rt! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
#[cfg(feature = "rt")]
Spawner::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
}
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Spawner::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
}
}

#[cfg_attr(not(all(tokio_unstable, feature = "stats")), allow(dead_code))]
pub(crate) fn stats(&self) -> &RuntimeStats {
match self {
Spawner::Basic(spawner) => spawner.stats(),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.stats(),
}
}
}
27 changes: 27 additions & 0 deletions tokio/src/runtime/stats/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//! This file contains mocks of the types in src/runtime/stats/stats.rs

pub(crate) struct RuntimeStats {}

impl RuntimeStats {
pub(crate) fn new(_worker_threads: usize) -> Self {
Self {}
}
}

pub(crate) struct WorkerStatsBatcher {}

impl WorkerStatsBatcher {
pub(crate) fn new(_my_index: usize) -> Self {
Self {}
}

pub(crate) fn submit(&mut self, _to: &RuntimeStats) {}

pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn returned_from_park(&mut self) {}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}

pub(crate) fn incr_poll_count(&mut self) {}
}
17 changes: 17 additions & 0 deletions tokio/src/runtime/stats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//! This module contains information need to view information about how the
//! runtime is performing.
#![allow(clippy::module_inception)]

cfg_stats! {
mod stats;

pub use self::stats::{RuntimeStats, WorkerStats};
pub(crate) use self::stats::WorkerStatsBatcher;
}

cfg_not_stats! {
#[path = "mock.rs"]
mod stats;

pub(crate) use self::stats::{RuntimeStats, WorkerStatsBatcher};
}
97 changes: 97 additions & 0 deletions tokio/src/runtime/stats/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//! This file contains the types necessary to collect various types of stats.
use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};

/// This type contains methods to retrieve stats from a Tokio runtime.
#[derive(Debug)]
pub struct RuntimeStats {
workers: Box<[WorkerStats]>,
}

/// This type contains methods to retrieve stats from a worker thread on a Tokio runtime.
#[derive(Debug)]
#[repr(align(128))]
pub struct WorkerStats {
park_count: AtomicU64,
steal_count: AtomicU64,
poll_count: AtomicU64,
}

impl RuntimeStats {
pub(crate) fn new(worker_threads: usize) -> Self {
let mut workers = Vec::with_capacity(worker_threads);
for _ in 0..worker_threads {
workers.push(WorkerStats {
park_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
});
}

Self {
workers: workers.into_boxed_slice(),
}
}

/// Returns a slice containing the worker stats for each worker thread.
pub fn workers(&self) -> impl Iterator<Item = &WorkerStats> {
self.workers.iter()
}
}

impl WorkerStats {
/// Returns the total number of times this worker thread has parked.
pub fn park_count(&self) -> u64 {
self.park_count.load(Relaxed)
}

/// Returns the number of tasks this worker has stolen from other worker
/// threads.
pub fn steal_count(&self) -> u64 {
self.steal_count.load(Relaxed)
}

/// Returns the number of times this worker has polled a task.
pub fn poll_count(&self) -> u64 {
self.poll_count.load(Relaxed)
}
}

pub(crate) struct WorkerStatsBatcher {
my_index: usize,
park_count: u64,
steal_count: u64,
poll_count: u64,
}

impl WorkerStatsBatcher {
pub(crate) fn new(my_index: usize) -> Self {
Self {
my_index,
park_count: 0,
steal_count: 0,
poll_count: 0,
}
}
pub(crate) fn submit(&mut self, to: &RuntimeStats) {
let worker = &to.workers[self.my_index];

worker.park_count.store(self.park_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);
}

pub(crate) fn about_to_park(&mut self) {
self.park_count += 1;
}

pub(crate) fn returned_from_park(&mut self) {}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.steal_count += u64::from(by);
}

pub(crate) fn incr_poll_count(&mut self) {
self.poll_count += 1;
}
}
Loading

0 comments on commit 98578a6

Please sign in to comment.