Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial work on runtime stats #4043

Merged
merged 19 commits into from
Aug 27, 2021
Merged
1 change: 1 addition & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
macros = ["tokio-macros"]
stats = []
net = [
"libc",
"mio/os-poll",
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@ macro_rules! cfg_macros {
}
}

macro_rules! cfg_stats {
($($item:item)*) => {
$(
#[cfg(all(tokio_unstable, feature = "stats"))]
#[cfg_attr(docsrs, doc(cfg(feature = "stats")))]
$item
)*
}
}

macro_rules! cfg_not_stats {
($($item:item)*) => {
$(
#[cfg(not(all(tokio_unstable, feature = "stats")))]
$item
)*
}
}

macro_rules! cfg_net {
($($item:item)*) => {
$(
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::sync::notify::Notify;
use crate::util::{waker_ref, Wake, WakerRef};
Expand Down Expand Up @@ -47,6 +48,9 @@ struct Inner<P: Park> {

/// Thread park handle
park: P,

/// Stats batcher
stats: WorkerStatsBatcher,
}

#[derive(Clone)]
Expand Down Expand Up @@ -87,6 +91,9 @@ struct Shared {

/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Keeps track of various runtime stats.
stats: RuntimeStats,
}

/// Thread-local context.
Expand Down Expand Up @@ -123,6 +130,7 @@ impl<P: Park> BasicScheduler<P> {
owned: OwnedTasks::new(),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
stats: RuntimeStats::new(1),
}),
};

Expand All @@ -133,6 +141,7 @@ impl<P: Park> BasicScheduler<P> {
spawner: spawner.clone(),
tick: 0,
park,
stats: WorkerStatsBatcher::new(0),
}));

BasicScheduler {
Expand Down Expand Up @@ -205,6 +214,7 @@ impl<P: Park> Inner<P> {
'outer: loop {
if scheduler.spawner.was_woken() || !polled {
polled = true;
scheduler.stats.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}
Expand Down Expand Up @@ -238,7 +248,10 @@ impl<P: Park> Inner<P> {
Some(entry) => entry,
None => {
// Park until the thread is signaled
scheduler.stats.about_to_park();
scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler.park.park().expect("failed to park");
scheduler.stats.returned_from_park();

// Try polling the `block_on` future next
continue 'outer;
Expand All @@ -247,6 +260,7 @@ impl<P: Park> Inner<P> {

match entry {
RemoteMsg::Schedule(task) => {
scheduler.stats.incr_poll_count();
let task = context.shared.owned.assert_owner(task);
crate::coop::budget(|| task.run())
}
Expand All @@ -255,6 +269,7 @@ impl<P: Park> Inner<P> {

// Yield to the park, this drives the timer and pulls any pending
// I/O events.
scheduler.stats.submit(&scheduler.spawner.shared.stats);
scheduler
.park
.park_timeout(Duration::from_millis(0))
Expand Down Expand Up @@ -369,6 +384,10 @@ impl Spawner {
handle
}

pub(crate) fn stats(&self) -> &RuntimeStats {
&self.shared.stats
}

fn pop(&self) -> Option<RemoteMsg> {
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
Expand Down
8 changes: 8 additions & 0 deletions tokio/src/runtime/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ impl Handle {
context::current().ok_or(TryCurrentError(()))
}

cfg_stats! {
/// Returns a view that lets you get information about how the runtime
/// is performing.
pub fn stats(&self) -> &crate::runtime::stats::RuntimeStats {
self.spawner.stats()
}
}

/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
Expand Down
7 changes: 7 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,13 @@ pub(crate) mod enter;

pub(crate) mod task;

cfg_stats! {
pub mod stats;
}
cfg_not_stats! {
pub(crate) mod stats;
}

cfg_rt! {
mod basic_scheduler;
use basic_scheduler::BasicScheduler;
Expand Down
8 changes: 7 additions & 1 deletion tokio/src/runtime/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
use crate::loom::sync::Arc;
use crate::runtime::stats::WorkerStatsBatcher;
use crate::runtime::task::{self, Inject};

use std::mem::MaybeUninit;
Expand Down Expand Up @@ -288,7 +289,11 @@ impl<T> Steal<T> {
}

/// Steals half the tasks from self and place them into `dst`.
pub(super) fn steal_into(&self, dst: &mut Local<T>) -> Option<task::Notified<T>> {
pub(super) fn steal_into(
&self,
dst: &mut Local<T>,
stats: &mut WorkerStatsBatcher,
) -> Option<task::Notified<T>> {
// Safety: the caller is the only thread that mutates `dst.tail` and
// holds a mutable reference.
let dst_tail = unsafe { dst.inner.tail.unsync_load() };
Expand All @@ -307,6 +312,7 @@ impl<T> Steal<T> {
// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
stats.incr_steal_count(n);

if n == 0 {
// No tasks were stolen
Expand Down
42 changes: 22 additions & 20 deletions tokio/src/runtime/spawner.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
cfg_rt! {
use crate::future::Future;
use crate::runtime::basic_scheduler;
use crate::task::JoinHandle;
}
use crate::future::Future;
use crate::runtime::basic_scheduler;
use crate::runtime::stats::RuntimeStats;
use crate::task::JoinHandle;

cfg_rt_multi_thread! {
use crate::runtime::thread_pool;
}

#[derive(Debug, Clone)]
pub(crate) enum Spawner {
#[cfg(feature = "rt")]
Basic(basic_scheduler::Spawner),
#[cfg(feature = "rt-multi-thread")]
ThreadPool(thread_pool::Spawner),
Expand All @@ -25,21 +23,25 @@ impl Spawner {
}
}
}
}

cfg_rt! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
#[cfg(feature = "rt")]
Spawner::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
}
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Spawner::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
}
}

#[cfg_attr(not(all(tokio_unstable, feature = "stats")), allow(dead_code))]
pub(crate) fn stats(&self) -> &RuntimeStats {
match self {
Spawner::Basic(spawner) => spawner.stats(),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.stats(),
}
}
}
27 changes: 27 additions & 0 deletions tokio/src/runtime/stats/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
//! This file contains mocks of the types in src/runtime/stats/stats.rs

pub(crate) struct RuntimeStats {}

impl RuntimeStats {
pub(crate) fn new(_worker_threads: usize) -> Self {
Self {}
}
}

pub(crate) struct WorkerStatsBatcher {}

impl WorkerStatsBatcher {
pub(crate) fn new(_my_index: usize) -> Self {
Self {}
}

pub(crate) fn submit(&mut self, _to: &RuntimeStats) {}

pub(crate) fn about_to_park(&mut self) {}
pub(crate) fn returned_from_park(&mut self) {}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn incr_steal_count(&mut self, _by: u16) {}

pub(crate) fn incr_poll_count(&mut self) {}
}
17 changes: 17 additions & 0 deletions tokio/src/runtime/stats/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
//! This module contains information need to view information about how the
//! runtime is performing.
#![allow(clippy::module_inception)]

cfg_stats! {
mod stats;

pub use self::stats::{RuntimeStats, WorkerStats};
pub(crate) use self::stats::WorkerStatsBatcher;
}

cfg_not_stats! {
#[path = "mock.rs"]
mod stats;

pub(crate) use self::stats::{RuntimeStats, WorkerStatsBatcher};
}
97 changes: 97 additions & 0 deletions tokio/src/runtime/stats/stats.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
//! This file contains the types necessary to collect various types of stats.
use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};

/// This type contains methods to retrieve stats from a Tokio runtime.
#[derive(Debug)]
pub struct RuntimeStats {
workers: Box<[WorkerStats]>,
}

/// This type contains methods to retrieve stats from a worker thread on a Tokio runtime.
#[derive(Debug)]
#[repr(align(128))]
pub struct WorkerStats {
park_count: AtomicU64,
steal_count: AtomicU64,
poll_count: AtomicU64,
}

impl RuntimeStats {
pub(crate) fn new(worker_threads: usize) -> Self {
let mut workers = Vec::with_capacity(worker_threads);
for _ in 0..worker_threads {
workers.push(WorkerStats {
park_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
});
}

Self {
workers: workers.into_boxed_slice(),
}
}

/// Returns a slice containing the worker stats for each worker thread.
pub fn workers(&self) -> impl Iterator<Item = &WorkerStats> {
self.workers.iter()
}
}

impl WorkerStats {
/// Returns the total number of times this worker thread has parked.
pub fn park_count(&self) -> u64 {
self.park_count.load(Relaxed)
}

/// Returns the number of tasks this worker has stolen from other worker
/// threads.
pub fn steal_count(&self) -> u64 {
self.steal_count.load(Relaxed)
}

/// Returns the number of times this worker has polled a task.
pub fn poll_count(&self) -> u64 {
self.poll_count.load(Relaxed)
}
}

pub(crate) struct WorkerStatsBatcher {
my_index: usize,
park_count: u64,
steal_count: u64,
poll_count: u64,
}

impl WorkerStatsBatcher {
pub(crate) fn new(my_index: usize) -> Self {
Self {
my_index,
park_count: 0,
steal_count: 0,
poll_count: 0,
}
}
pub(crate) fn submit(&mut self, to: &RuntimeStats) {
let worker = &to.workers[self.my_index];

worker.park_count.store(self.park_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);
}

pub(crate) fn about_to_park(&mut self) {
self.park_count += 1;
}

pub(crate) fn returned_from_park(&mut self) {}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.steal_count += u64::from(by);
}

pub(crate) fn incr_poll_count(&mut self) {
self.poll_count += 1;
}
}
Loading