Skip to content

Commit

Permalink
Initial work on metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Darksonn committed Aug 25, 2021
1 parent 897fed1 commit bf905c3
Show file tree
Hide file tree
Showing 11 changed files with 205 additions and 23 deletions.
1 change: 1 addition & 0 deletions tokio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ io-util = ["memchr", "bytes"]
# stdin, stdout, stderr
io-std = []
macros = ["tokio-macros"]
metrics = []
net = [
"libc",
"mio/os-poll",
Expand Down
19 changes: 19 additions & 0 deletions tokio/src/macros/cfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,25 @@ macro_rules! cfg_macros {
}
}

macro_rules! cfg_metrics {
($($item:item)*) => {
$(
#[cfg(all(tokio_unstable, feature = "metrics"))]
#[cfg_attr(docsrs, doc(cfg(feature = "metrics")))]
$item
)*
}
}

macro_rules! cfg_not_metrics {
($($item:item)*) => {
$(
#[cfg(not(all(tokio_unstable, feature = "metrics")))]
$item
)*
}
}

macro_rules! cfg_net {
($($item:item)*) => {
$(
Expand Down
18 changes: 18 additions & 0 deletions tokio/src/runtime/basic_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::loom::sync::atomic::AtomicBool;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task};
use crate::runtime::metrics::{RuntimeMetrics, WorkerMetricsBatcher};
use crate::sync::notify::Notify;
use crate::util::{waker_ref, Wake, WakerRef};

Expand Down Expand Up @@ -47,6 +48,9 @@ struct Inner<P: Park> {

/// Thread park handle
park: P,

/// Metrics batcher
metrics: WorkerMetricsBatcher,
}

#[derive(Clone)]
Expand Down Expand Up @@ -87,6 +91,9 @@ struct Shared {

/// Indicates whether the blocked on thread was woken.
woken: AtomicBool,

/// Keeps track of various runtime metrics.
metrics: RuntimeMetrics,
}

/// Thread-local context.
Expand Down Expand Up @@ -123,6 +130,7 @@ impl<P: Park> BasicScheduler<P> {
owned: OwnedTasks::new(),
unpark: unpark as Box<dyn Unpark>,
woken: AtomicBool::new(false),
metrics: RuntimeMetrics::new(1),
}),
};

Expand All @@ -133,6 +141,7 @@ impl<P: Park> BasicScheduler<P> {
spawner: spawner.clone(),
tick: 0,
park,
metrics: WorkerMetricsBatcher::new(0),
}));

BasicScheduler {
Expand Down Expand Up @@ -205,6 +214,7 @@ impl<P: Park> Inner<P> {
'outer: loop {
if scheduler.spawner.was_woken() || !polled {
polled = true;
scheduler.metrics.incr_poll_count();
if let Ready(v) = crate::coop::budget(|| future.as_mut().poll(&mut cx)) {
return v;
}
Expand Down Expand Up @@ -238,6 +248,8 @@ impl<P: Park> Inner<P> {
Some(entry) => entry,
None => {
// Park until the thread is signaled
scheduler.metrics.incr_park_count();
scheduler.metrics.submit(&scheduler.spawner.shared.metrics);
scheduler.park.park().expect("failed to park");

// Try polling the `block_on` future next
Expand All @@ -247,6 +259,7 @@ impl<P: Park> Inner<P> {

match entry {
RemoteMsg::Schedule(task) => {
scheduler.metrics.incr_poll_count();
let task = context.shared.owned.assert_owner(task);
crate::coop::budget(|| task.run())
}
Expand All @@ -255,6 +268,7 @@ impl<P: Park> Inner<P> {

// Yield to the park, this drives the timer and pulls any pending
// I/O events.
scheduler.metrics.submit(&scheduler.spawner.shared.metrics);
scheduler
.park
.park_timeout(Duration::from_millis(0))
Expand Down Expand Up @@ -369,6 +383,10 @@ impl Spawner {
handle
}

pub(crate) fn metrics(&self) -> &RuntimeMetrics {
&self.shared.metrics
}

fn pop(&self) -> Option<RemoteMsg> {
match self.shared.queue.lock().as_mut() {
Some(queue) => queue.pop_front(),
Expand Down
68 changes: 68 additions & 0 deletions tokio/src/runtime/metrics/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
//! This file contains the types necessary to collect various types of metrics.
use crate::loom::sync::atomic::{AtomicU64, Ordering};

pub(crate) struct RuntimeMetrics {
workers: Box<[WorkerMetrics]>,
}

pub(crate) struct WorkerMetrics {
park_count: AtomicU64,
steal_count: AtomicU64,
poll_count: AtomicU64,
}


impl RuntimeMetrics {
pub(crate) fn new(worker_threads: usize) -> Self {
let mut workers = Vec::with_capacity(worker_threads);
for _ in 0..worker_threads {
workers.push(WorkerMetrics {
park_count: AtomicU64::new(0),
steal_count: AtomicU64::new(0),
poll_count: AtomicU64::new(0),
});
}

Self {
workers: workers.into_boxed_slice(),
}
}
}


pub(crate) struct WorkerMetricsBatcher {
my_index: usize,
park_count: u64,
steal_count: u64,
poll_count: u64,
}

impl WorkerMetricsBatcher {
pub(crate) fn new(my_index: usize) -> Self {
Self {
my_index,
park_count: 0,
steal_count: 0,
poll_count: 0,
}
}
pub(crate) fn submit(&mut self, to: &RuntimeMetrics) {
let worker = &to.workers[self.my_index];

worker.park_count.store(self.park_count, Ordering::Relaxed);
worker.steal_count.store(self.steal_count, Ordering::Relaxed);
worker.poll_count.store(self.poll_count, Ordering::Relaxed);
}

pub(crate) fn incr_park_count(&mut self) {
self.park_count += 1;
}

pub(crate) fn incr_steal_count(&mut self, by: u16) {
self.steal_count += u64::from(by);
}

pub(crate) fn incr_poll_count(&mut self) {
self.poll_count += 1;
}
}
35 changes: 35 additions & 0 deletions tokio/src/runtime/metrics/mock.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//! This file contains mocks of the types in src/runtime/metrics/metrics.rs

pub(crate) struct RuntimeMetrics {
}

impl RuntimeMetrics {
pub(crate) fn new(_worker_threads: usize) -> Self {
Self {
}
}
}



pub(crate) struct WorkerMetricsBatcher {
}

impl WorkerMetricsBatcher {
pub(crate) fn new(_my_index: usize) -> Self {
Self {
}
}

pub(crate) fn submit(&mut self, _to: &RuntimeMetrics) {
}

pub(crate) fn incr_park_count(&mut self) {
}

pub(crate) fn incr_steal_count(&mut self, _by: u16) {
}

pub(crate) fn incr_poll_count(&mut self) {
}
}
10 changes: 10 additions & 0 deletions tokio/src/runtime/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
cfg_metrics! {
mod metrics;
}

cfg_not_metrics! {
#[path = "mock.rs"]
mod metrics;
}

pub(crate) use self::metrics::{RuntimeMetrics, WorkerMetricsBatcher};
2 changes: 2 additions & 0 deletions tokio/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ pub(crate) mod enter;

pub(crate) mod task;

pub(crate) mod metrics;

cfg_rt! {
mod basic_scheduler;
use basic_scheduler::BasicScheduler;
Expand Down
4 changes: 3 additions & 1 deletion tokio/src/runtime/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::loom::cell::UnsafeCell;
use crate::loom::sync::atomic::{AtomicU16, AtomicU32};
use crate::loom::sync::Arc;
use crate::runtime::metrics::WorkerMetricsBatcher;
use crate::runtime::task::{self, Inject};

use std::mem::MaybeUninit;
Expand Down Expand Up @@ -288,7 +289,7 @@ impl<T> Steal<T> {
}

/// Steals half the tasks from self and place them into `dst`.
pub(super) fn steal_into(&self, dst: &mut Local<T>) -> Option<task::Notified<T>> {
pub(super) fn steal_into(&self, dst: &mut Local<T>, metrics: &mut WorkerMetricsBatcher) -> Option<task::Notified<T>> {
// Safety: the caller is the only thread that mutates `dst.tail` and
// holds a mutable reference.
let dst_tail = unsafe { dst.inner.tail.unsync_load() };
Expand All @@ -307,6 +308,7 @@ impl<T> Steal<T> {
// Steal the tasks into `dst`'s buffer. This does not yet expose the
// tasks in `dst`.
let mut n = self.steal_into2(dst, dst_tail);
metrics.incr_steal_count(n);

if n == 0 {
// No tasks were stolen
Expand Down
41 changes: 21 additions & 20 deletions tokio/src/runtime/spawner.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
cfg_rt! {
use crate::future::Future;
use crate::runtime::basic_scheduler;
use crate::task::JoinHandle;
}
use crate::future::Future;
use crate::runtime::basic_scheduler;
use crate::runtime::metrics::RuntimeMetrics;
use crate::task::JoinHandle;

cfg_rt_multi_thread! {
use crate::runtime::thread_pool;
}

#[derive(Debug, Clone)]
pub(crate) enum Spawner {
#[cfg(feature = "rt")]
Basic(basic_scheduler::Spawner),
#[cfg(feature = "rt-multi-thread")]
ThreadPool(thread_pool::Spawner),
Expand All @@ -25,21 +23,24 @@ impl Spawner {
}
}
}
}

cfg_rt! {
impl Spawner {
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
#[cfg(feature = "rt")]
Spawner::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
}
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
match self {
Spawner::Basic(spawner) => spawner.spawn(future),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.spawn(future),
}
}

pub(crate) fn metrics(&self) -> &RuntimeMetrics {
match self {
Spawner::Basic(spawner) => spawner.metrics(),
#[cfg(feature = "rt-multi-thread")]
Spawner::ThreadPool(spawner) => spawner.metrics(),
}
}
}
5 changes: 5 additions & 0 deletions tokio/src/runtime/thread_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub(crate) use worker::block_in_place;
use crate::loom::sync::Arc;
use crate::runtime::task::JoinHandle;
use crate::runtime::Parker;
use crate::runtime::metrics::RuntimeMetrics;

use std::fmt;
use std::future::Future;
Expand Down Expand Up @@ -99,6 +100,10 @@ impl Spawner {
pub(crate) fn shutdown(&mut self) {
self.shared.close();
}

pub(crate) fn metrics(&self) -> &RuntimeMetrics {
&self.shared.metrics()
}
}

impl fmt::Debug for Spawner {
Expand Down
Loading

0 comments on commit bf905c3

Please sign in to comment.