Initial work on runtime stats #4043

Merged: 19 commits, Aug 27, 2021
1 change: 1 addition & 0 deletions tokio/Cargo.toml
@@ -47,6 +47,7 @@ io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
macros = ["tokio-macros"]
metrics = []
Member

I wonder if it is worth making this a feature flag at all (vs. always on).

Contributor Author

It's going to be pretty slow on platforms that don't have an AtomicU64 as we would then go through this mock.

Member

I don't think it is a big deal. Alternatively, we could use AtomicUsize, let it wrap, and leave it up to the receiver of the data to handle that.

Contributor

I don't think there are any non-64-bit platforms anymore that are important for production.

Member

We can always add a feature flag later as well if someone would like it disabled.

Member

Adding it later is tricky, as it is technically a breaking change.

Member

Right, not sure what I was thinking about 😅
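
The AtomicUsize alternative raised in this thread could look roughly like the sketch below. It is an illustration only, not code from this PR: `WrappingCounter` and `delta_since` are hypothetical names. The producer lets the counter wrap, and the receiver recovers the number of events between two samples with `wrapping_sub`, which stays correct as long as the true difference fits in a `usize`.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical counter that is allowed to wrap (not part of the PR).
pub struct WrappingCounter {
    value: AtomicUsize,
}

impl WrappingCounter {
    pub fn new(start: usize) -> Self {
        Self {
            value: AtomicUsize::new(start),
        }
    }

    /// Record one event. `fetch_add` wraps around on overflow.
    pub fn incr(&self) {
        self.value.fetch_add(1, Ordering::Relaxed);
    }

    pub fn load(&self) -> usize {
        self.value.load(Ordering::Relaxed)
    }
}

/// Receiver side: number of events between two samples, tolerating a wrap.
pub fn delta_since(previous: usize, current: usize) -> usize {
    current.wrapping_sub(previous)
}

fn main() {
    // Start close to the maximum so the counter wraps during the loop.
    let counter = WrappingCounter::new(usize::MAX - 1);
    let before = counter.load();
    for _ in 0..5 {
        counter.incr();
    }
    let after = counter.load();

    // The raw value went "backwards", but the delta is still correct.
    assert!(after < before);
    assert_eq!(delta_since(before, after), 5);
}
```

The cost of this approach is that every consumer has to sample often enough and compute deltas itself, which is the trade-off the comment above alludes to.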

net = [
"libc",
"mio/os-poll",
19 changes: 19 additions & 0 deletions tokio/src/macros/cfg.rs
@@ -162,6 +162,25 @@ macro_rules! cfg_macros {
}
}

macro_rules! cfg_metrics {
($($item:item)*) => {
$(
#[cfg(all(tokio_unstable, feature = "metrics"))]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
$item
)*
}
}

macro_rules! cfg_not_metrics {
($($item:item)*) => {
$(
#[cfg(not(all(tokio_unstable, feature = "metrics")))]
$item
)*
}
}

macro_rules! cfg_net {
($($item:item)*) => {
$(
19 changes: 19 additions & 0 deletions tokio/src/runtime/basic_scheduler.rs
@@ -2,6 +2,7 @@ use crate::future::poll_fn;
use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::metrics::{RuntimeMetrics, WorkerMetricsBatcher};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::sync::notify::Notify;
use crate::util::{waker_ref, Wake, WakerRef};
@@ -47,6 +48,9 @@ struct Inner<P: Park> {

/// Thread park handle
park: P,

/// Metrics batcher
metrics: WorkerMetricsBatcher,
}

#[derive(Clone)]
@@ -87,6 +91,9 @@ struct Shared {

/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Keeps track of various runtime metrics.
metrics: RuntimeMetrics,
}

/// Thread-local context.
@@ -123,6 +130,7 @@ impl<P: Park> BasicScheduler<P> {
owned: OwnedTasks::new(),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
metrics: RuntimeMetrics::new(1),
}),
};

@@ -133,6 +141,7 @@ impl<P: Park> BasicScheduler<P> {
spawner: spawner.clone(),
tick: 0,
park,
metrics: WorkerMetricsBatcher::new(0),
}));

BasicScheduler {
@@ -205,6 +214,7 @@ impl<P: Park> Inner<P> {
'outer: loop {
if scheduler.spawner.was_woken() || !polled {
polled = true;
scheduler.metrics.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}
@@ -238,7 +248,10 @@ impl<P: Park> Inner<P> {
Some(entry) => entry,
None => {
// Park until the thread is signaled
scheduler.metrics.about_to_park();
scheduler.metrics.submit(&scheduler.spawner.shared.metrics);
scheduler.park.park().expect("failed to park");
scheduler.metrics.returned_from_park();

// Try polling the `block_on` future next
continue 'outer;
@@ -247,6 +260,7 @@ impl<P: Park> Inner<P> {

match entry {
RemoteMsg::Schedule(task) => {
scheduler.metrics.incr_poll_count();
let task = context.shared.owned.assert_owner(task);
crate::coop::budget(|| task.run())
}
@@ -255,6 +269,7 @@ impl<P: Park> Inner<P> {

// Yield to the park, this drives the timer and pulls any pending
// I/O events.
scheduler.metrics.submit(&scheduler.spawner.shared.metrics);
scheduler
.park
.park_timeout(Duration::from_millis(0))
@@ -369,6 +384,10 @@ impl Spawner {
handle
}

pub(crate) fn metrics(&self) -> &RuntimeMetrics {
&self.shared.metrics
}

fn pop(&self) -> Option<RemoteMsg> {
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
8 changes: 8 additions & 0 deletions tokio/src/runtime/handle.rs
@@ -111,6 +111,14 @@ impl Handle {
context::current().ok_or(TryCurrentError(()))
}

cfg_metrics! {
/// Returns a view that lets you get information about how the runtime
/// is performing.
pub fn metrics(&self) -> &crate::runtime::metrics::RuntimeMetrics {
self.spawner.metrics()
}
}

/// Spawn a future onto the Tokio runtime.
///
/// This spawns the given future onto the runtime's executor, usually a
108 changes: 108 additions & 0 deletions tokio/src/runtime/metrics/counter_duration.rs
@@ -0,0 +1,108 @@
//! This module contains a type that is essentially a (u16, Duration) pair,
//! including an atomic container for this pair type. Supports durations up to
//! 78 hours.
//!
//! The u16 counter can be used to determine whether the duration has changed
//! since the last time you looked at it.
use crate::loom::sync::atomic::{AtomicU64, Ordering};
use std::fmt;
use std::time::Duration;

const MAX_NANOS: u64 = (1u64 << 48) - 1;
const NANOS_MASK: u64 = MAX_NANOS;
const COUNTER_MASK: u64 = !NANOS_MASK;
const COUNTER_ONE: u64 = 1u64 << 48;

#[derive(Copy, Clone, Default)]
pub(crate) struct CounterDuration {
value: u64,
}

impl CounterDuration {
#[cfg(test)]
pub(crate) fn new(counter: u16, duration: Duration) -> Self {
let nanos = std::cmp::min(duration.as_nanos(), u128::from(MAX_NANOS)) as u64;
Self {
value: (u64::from(counter) << 48) | nanos,
}
}

pub(crate) fn counter(self) -> u16 {
(self.value >> 48) as u16
}

pub(crate) fn duration(self) -> Duration {
Duration::from_nanos(self.value & MAX_NANOS)
}

/// Increment the counter by one and replace the duration with the supplied
/// duration.
pub(crate) fn set_next_duration(&mut self, dur: Duration) {
let nanos = std::cmp::min(dur.as_nanos(), u128::from(MAX_NANOS)) as u64;
let counter_bits = (self.value & COUNTER_MASK).wrapping_add(COUNTER_ONE);
self.value = counter_bits | nanos;
}

pub(crate) fn into_pair(self) -> (u16, Duration) {
(self.counter(), self.duration())
}
}

#[derive(Default)]
pub(crate) struct AtomicCounterDuration {
value: AtomicU64,
}

impl AtomicCounterDuration {
pub(crate) fn store(&self, new_value: CounterDuration, ordering: Ordering) {
self.value.store(new_value.value, ordering);
}

pub(crate) fn load(&self, ordering: Ordering) -> CounterDuration {
CounterDuration {
value: self.value.load(ordering),
}
}
}

impl fmt::Debug for CounterDuration {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("CounterDuration")
.field("counter", &self.counter())
.field("duration", &self.duration())
.finish()
}
}

impl fmt::Debug for AtomicCounterDuration {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
let value = self.load(Ordering::Relaxed);
fmt.debug_struct("AtomicCounterDuration")
.field("counter", &value.counter())
.field("duration", &value.duration())
.finish()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn counter_increment() {
let mut dur = 1u64;
let mut cd = CounterDuration::new(u16::MAX, Duration::from_nanos(dur));

for counter in 0..(1u32 << 18) {
// Multiply by a prime number to get a sequence of mostly unrelated
// durations.
dur = (dur * 32717) % (1 + MAX_NANOS);
cd.set_next_duration(Duration::from_nanos(dur));

// Note that `counter as u16` will truncate extra bits. This is
// intended.
assert_eq!(cd.counter(), counter as u16);
assert_eq!(cd.duration().as_nanos(), u128::from(dur));
}
}
}
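
The packing scheme used by `CounterDuration` can be tried out in isolation. The sketch below is a standalone restatement of the layout described in the module docs (counter in the high 16 bits, nanoseconds in the low 48 bits); the free functions `pack` and `unpack` are hypothetical helpers and do not appear in the PR.

```rust
use std::time::Duration;

// Layout assumed from the module above: high 16 bits hold the counter,
// low 48 bits hold the duration in nanoseconds.
const NANOS_BITS: u32 = 48;
const MAX_NANOS: u64 = (1u64 << NANOS_BITS) - 1;

/// Hypothetical helper: pack a counter and a duration into one u64.
/// Durations longer than `MAX_NANOS` nanoseconds saturate.
fn pack(counter: u16, duration: Duration) -> u64 {
    let nanos = std::cmp::min(duration.as_nanos(), u128::from(MAX_NANOS)) as u64;
    (u64::from(counter) << NANOS_BITS) | nanos
}

/// Hypothetical helper: split a packed value back into its two parts.
fn unpack(value: u64) -> (u16, Duration) {
    (
        (value >> NANOS_BITS) as u16,
        Duration::from_nanos(value & MAX_NANOS),
    )
}

fn main() {
    // A value that fits round-trips unchanged.
    let packed = pack(7, Duration::from_millis(250));
    assert_eq!(unpack(packed), (7, Duration::from_millis(250)));

    // 400_000 seconds exceeds the 48-bit range, so the duration saturates.
    let (_, saturated) = unpack(pack(0, Duration::from_secs(400_000)));
    assert_eq!(saturated, Duration::from_nanos(MAX_NANOS));

    // 3.6e12 nanoseconds per hour; this is where the "78 hours" limit comes from.
    println!("max representable: {:.2} hours", MAX_NANOS as f64 / 3.6e12);
}
```

Because both parts live in one `u64`, a single atomic load or store always observes a matching counter and duration, which two separate atomics would not guarantee.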
124 changes: 124 additions & 0 deletions tokio/src/runtime/metrics/metrics.rs
@@ -0,0 +1,124 @@
//! This file contains the types necessary to collect various types of metrics.
use crate::loom::sync::atomic::{AtomicU64, Ordering::Relaxed};
use crate::runtime::metrics::counter_duration::{AtomicCounterDuration, CounterDuration};

use std::time::{Duration, Instant};

/// This type contains methods to retrieve metrics from a Tokio runtime.
#[derive(Debug)]
pub struct RuntimeMetrics {
Member

I would consider naming this just Metrics (or something like that). It already is in a runtime module. runtime::RuntimeMetrics is a bit of a stutter.

Member

I would call this RuntimeStats or just Stats. Stats::poll_count() seems pretty natural. I feel like Metrics is decently overloaded (at least in applications it is).

workers: Box<[WorkerMetrics]>,
}

/// This type contains methods to retrieve metrics from a worker thread on a Tokio runtime.
#[derive(Debug)]
pub struct WorkerMetrics {
park_count: AtomicU64,
steal_count: AtomicU64,
poll_count: AtomicU64,
park_to_park: AtomicCounterDuration,
}

impl RuntimeMetrics {
pub(crate) fn new(worker_threads: usize) -> Self {
let mut workers = Vec::with_capacity(worker_threads);
for _ in 0..worker_threads {
workers.push(WorkerMetrics {
park_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
park_to_park: AtomicCounterDuration::default(),
});
}

Self {
workers: workers.into_boxed_slice(),
}
}

/// Returns an iterator over the worker metrics for each worker thread.
pub fn workers(&self) -> impl Iterator<Item = &WorkerMetrics> {
Member

I think we could store WorkerMetrics in a slice. We just have to make sure the struct itself is cache padded. That would be a bit of a simpler API.
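
One way the cache-padding suggestion could be realized is sketched below. This is not code from the PR: `PaddedWorkerMetrics` and `Metrics` are hypothetical names, and the 64-byte alignment is an assumption about the cache line size, which differs between platforms.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical cache-padded per-worker counters. The alignment of 64 is an
/// assumed cache line size; real targets may need a different value.
#[repr(align(64))]
#[derive(Default)]
pub struct PaddedWorkerMetrics {
    park_count: AtomicU64,
    poll_count: AtomicU64,
}

impl PaddedWorkerMetrics {
    pub fn park_count(&self) -> u64 {
        self.park_count.load(Ordering::Relaxed)
    }

    pub fn poll_count(&self) -> u64 {
        self.poll_count.load(Ordering::Relaxed)
    }
}

/// Hypothetical container that hands out the workers as a plain slice.
pub struct Metrics {
    workers: Box<[PaddedWorkerMetrics]>,
}

impl Metrics {
    pub fn new(worker_threads: usize) -> Self {
        let workers: Vec<_> = (0..worker_threads)
            .map(|_| PaddedWorkerMetrics::default())
            .collect();
        Self {
            workers: workers.into_boxed_slice(),
        }
    }

    /// Because each element is padded, neighbouring workers do not share a
    /// cache line, so exposing the slice directly does not cause false sharing.
    pub fn workers(&self) -> &[PaddedWorkerMetrics] {
        &self.workers
    }
}

fn main() {
    let metrics = Metrics::new(4);
    assert_eq!(metrics.workers().len(), 4);
    assert_eq!(metrics.workers()[0].park_count(), 0);
    assert_eq!(std::mem::align_of::<PaddedWorkerMetrics>(), 64);
    assert_eq!(std::mem::size_of::<PaddedWorkerMetrics>() % 64, 0);
}
```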

self.workers.iter()
}
}

impl WorkerMetrics {
/// Returns the total number of times this worker thread has parked.
pub fn park_count(&self) -> u64 {
self.park_count.load(Relaxed)
}

/// Returns the number of tasks this worker has stolen from other worker
/// threads.
pub fn steal_count(&self) -> u64 {
self.steal_count.load(Relaxed)
}

/// Returns the number of times this worker has polled a task.
pub fn poll_count(&self) -> u64 {
self.poll_count.load(Relaxed)
}

/// Returns the amount of time the runtime spent working between the last
/// two times it parked.
///
/// The `u16` is a counter that is incremented by one each time the duration
/// is changed. The counter will wrap around when it reaches `u16::MAX`.
pub fn park_to_park(&self) -> (u16, Duration) {
Member

This name feels odd, how about park_duration?

self.park_to_park.load(Relaxed).into_pair()
}
}

pub(crate) struct WorkerMetricsBatcher {
my_index: usize,
park_count: u64,
steal_count: u64,
poll_count: u64,
last_park: Instant,
park_to_park: CounterDuration,
}

impl WorkerMetricsBatcher {
pub(crate) fn new(my_index: usize) -> Self {
Self {
my_index,
park_count: 0,
steal_count: 0,
poll_count: 0,
last_park: Instant::now(),
park_to_park: CounterDuration::default(),
}
}
pub(crate) fn submit(&mut self, to: &RuntimeMetrics) {
let worker = &to.workers[self.my_index];

worker.park_count.store(self.park_count, Relaxed);
worker.steal_count.store(self.steal_count, Relaxed);
worker.poll_count.store(self.poll_count, Relaxed);
worker.park_to_park.store(self.park_to_park, Relaxed);
}

pub(crate) fn about_to_park(&mut self) {
self.park_count += 1;
self.update_park_to_park();
}

pub(crate) fn returned_from_park(&mut self) {
self.last_park = Instant::now();
Member

My design originally tried to avoid any Instant::now calls. Should we still consider that? Do we know what the perf impact might be this deep in the runtime call stack?

Contributor Author

I didn't notice this from your RFC, but I've punted this part of the feature for now.

}

#[cfg(feature = "rt-multi-thread")]
pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.steal_count += u64::from(by);
}

pub(crate) fn incr_poll_count(&mut self) {
self.poll_count += 1;
}

pub(crate) fn update_park_to_park(&mut self) {
let now = Instant::now();
let diff = now - self.last_park;
self.park_to_park.set_next_duration(diff);
}
}
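
The batching pattern in this file (plain local counters on the worker, published to shared atomics with relaxed stores) can be sketched in a self-contained form. The types below are simplified stand-ins with hypothetical names, not the PR's `WorkerMetrics` and `WorkerMetricsBatcher`, and `run_worker` exists only to drive the example.

```rust
use std::sync::atomic::{AtomicU64, Ordering::Relaxed};
use std::sync::Arc;
use std::thread;

/// Shared view that readers poll (simplified stand-in for `WorkerMetrics`).
#[derive(Default)]
struct SharedCounters {
    poll_count: AtomicU64,
}

/// Worker-local batch (simplified stand-in for `WorkerMetricsBatcher`).
struct Batcher {
    poll_count: u64,
}

impl Batcher {
    /// Hot path: a plain integer increment, no atomic operation.
    fn incr_poll_count(&mut self) {
        self.poll_count += 1;
    }

    /// Publish the running total. A plain store is enough because only the
    /// owning worker ever writes to its slot.
    fn submit(&self, to: &SharedCounters) {
        to.poll_count.store(self.poll_count, Relaxed);
    }
}

/// Hypothetical driver: run one worker thread and return the published total.
fn run_worker(polls: u64) -> u64 {
    let shared = Arc::new(SharedCounters::default());
    let worker_view = Arc::clone(&shared);

    let handle = thread::spawn(move || {
        let mut batcher = Batcher { poll_count: 0 };
        for _ in 0..polls {
            batcher.incr_poll_count();
        }
        // In the PR, submission happens right before the worker parks.
        batcher.submit(&worker_view);
    });
    handle.join().expect("worker panicked");

    shared.poll_count.load(Relaxed)
}

fn main() {
    assert_eq!(run_worker(1_000), 1_000);
}
```

Readers that load the shared counters while the worker is still running only see the totals as of the last `submit`, so the values can lag behind the worker's local state.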