Skip to content

Commit

Permalink
admin: Include tokio runtime metrics in the default metrics export (#215
Browse files Browse the repository at this point in the history
)

This change adds a new independent crate, kubert-prometheus-tokio, that
exports Tokio runtime metrics when the `tokio_unstable` compiler feature
is enabled. When these unstable features are enabled, a set of tokio
runtime metrics is included in the default metrics export. For example:

    # HELP tokio_rt_workers The number of worker threads used by the runtime.
    # TYPE tokio_rt_workers gauge
    tokio_rt_workers 16
    # HELP tokio_rt_park Total number of times worker threads parked.
    # TYPE tokio_rt_park counter
    tokio_rt_park_total 1237
    # HELP tokio_rt_noop Number of times workers unparked but found no new work.
    # TYPE tokio_rt_noop counter
    tokio_rt_noop_total 776
    # HELP tokio_rt_steal Number of tasks stolen by workers from others.
    # TYPE tokio_rt_steal counter
    tokio_rt_steal_total 19
    # HELP tokio_rt_steal_operations Number of times workers stole tasks from other.
    # TYPE tokio_rt_steal_operations counter
    tokio_rt_steal_operations_total 19
    # HELP tokio_rt_remote_schedule Total number of remote schedule operations.
    # TYPE tokio_rt_remote_schedule counter
    tokio_rt_remote_schedule_total 27
    # HELP tokio_rt_local_schedule Total number of local schedule operations.
    # TYPE tokio_rt_local_schedule counter
    tokio_rt_local_schedule_total 546
    # HELP tokio_rt_overflow Total number of overflow operations.
    # TYPE tokio_rt_overflow counter
    tokio_rt_overflow_total 0
    # HELP tokio_rt_polls The number of tasks that have been polled across all worker threads.
    # TYPE tokio_rt_polls counter
    tokio_rt_polls_total 468
    # HELP tokio_rt_busy_seconds Total duration of time when worker threads were busy processing tasks.
    # TYPE tokio_rt_busy_seconds counter
    # UNIT tokio_rt_busy_seconds seconds
    tokio_rt_busy_seconds_total 0.08628424000000004
    # HELP tokio_rt_injection_queue_depth The number of tasks currently scheduled in the runtime's injection queue.
    # TYPE tokio_rt_injection_queue_depth gauge
    tokio_rt_injection_queue_depth 0
    # HELP tokio_rt_local_queue_depth The total number of tasks currently scheduled in workers' local queues.
    # TYPE tokio_rt_local_queue_depth gauge
    tokio_rt_local_queue_depth 0
    # HELP tokio_rt_budget_forced_yield Number of times a worker thread was forced to yield due to budget exhaustion.
    # TYPE tokio_rt_budget_forced_yield counter
    tokio_rt_budget_forced_yield_total 0
    # HELP tokio_rt_io_driver_ready Number of times the IO driver was woken up.
    # TYPE tokio_rt_io_driver_ready counter
    tokio_rt_io_driver_ready_total 80

The kubert-prometheus-tokio crate provides standalone prometheus-client
integration using the tokio-metrics crate. This is achieved by spawning
a background task that updates these metrics once per second. The
polling interval for the default metrics endpoint cannot currently be
configured.
  • Loading branch information
olix0r committed Dec 11, 2023
1 parent f741924 commit a4efe89
Show file tree
Hide file tree
Showing 11 changed files with 349 additions and 14 deletions.
3 changes: 3 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[build]
rustflags = ["--cfg", "tokio_unstable"]
rustdocflags = ["--cfg", "tokio_unstable"]
28 changes: 16 additions & 12 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
@@ -1,14 +1,6 @@
{
"name": "kubert",
"image": "ghcr.io/linkerd/dev:v42",
"extensions": [
"kokakiwi.vscode-just",
"NathanRidley.autotrim",
"rust-lang.rust-analyzer",
"ms-kubernetes-tools.vscode-kubernetes-tools",
"samverschueren.final-newline",
"tamasfe.even-better-toml"
],
"runArgs": [
"--init",
// Use the host network so we can access k3d, etc.
Expand All @@ -19,9 +11,6 @@
],
"overrideCommand": false,
"remoteUser": "code",
"containerEnv": {
"CXX": "clang++-14",
},
"mounts": [
{
"source": "/var/run/docker.sock",
Expand All @@ -33,5 +22,20 @@
"target": "/home/code/.docker",
"type": "bind"
}
]
],
"containerEnv": {
"CXX": "clang++-14",
},
"customizations": {
"vscode": {
"extensions": [
"kokakiwi.vscode-just",
"NathanRidley.autotrim",
"rust-lang.rust-analyzer",
"ms-kubernetes-tools.vscode-kubernetes-tools",
"samverschueren.final-newline",
"tamasfe.even-better-toml"
]
}
}
}
86 changes: 86 additions & 0 deletions .github/workflows/release-prometheus-tokio.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
name: Release kubert-prometheus-tokio

on:
pull_request:
paths:
- .github/workflows/release-prometheus-tokio.yml
push:
tags:
- 'kubert-prometheus-tokio/*'

env:
CARGO_INCREMENTAL: 0
CARGO_NET_RETRY: 10
RUSTUP_MAX_RETRIES: 10

permissions:
contents: read

jobs:
cleanup:
runs-on: ubuntu-latest
permissions:
actions: write
steps:
- uses: styfle/cancel-workflow-action@01ce38bf961b4e243a6342cbade0dbc8ba3f0432
with:
all_but_latest: true
access_token: ${{ github.token }}

meta:
timeout-minutes: 5
runs-on: ubuntu-latest
container: ghcr.io/linkerd/dev:v42-rust
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
- id: meta
shell: bash
run: |
ref="${{ github.ref }}"
if [[ "$ref" == refs/tags/kubert-prometheus-tokio/* ]]; then
version="${ref##refs/tags/kubert-prometheus-tokio/}"
crate=$(just-cargo crate-version kubert-prometheus-tokio)
if [[ "$crate" != "$version" ]]; then
echo "::error ::Crate version $crate does not match tag $version" >&2
exit 1
fi
( echo version="$version"
echo mode=release
) >> "$GITHUB_OUTPUT"
else
sha="${{ github.sha }}"
( echo version="$(just-cargo crate-version kubert-prometheus-tokio)-git-${sha:0:7}"
echo mode=test
) >> "$GITHUB_OUTPUT"
fi
outputs:
mode: ${{ steps.meta.outputs.mode }}
version: ${{ steps.meta.outputs.version }}

release:
needs: [meta]
permissions:
contents: write
timeout-minutes: 5
runs-on: ubuntu-latest
steps:
- if: needs.meta.outputs.mode == 'release'
uses: softprops/action-gh-release@de2c0eb89ae2a093876385947365aca7b0e5f844
with:
name: kubert-prometheus-tokio ${{ needs.meta.outputs.version }}
generate_release_notes: true

crate:
# Only publish the crate after the rest of the release succeeds.
needs: [meta, release]
timeout-minutes: 10
runs-on: ubuntu-latest
container: ghcr.io/linkerd/dev:v42-rust
env:
RUSTFLAGS: '--cfg tokio_unstable'
RUSTDOCFLAGS: '--cfg tokio_unstable'
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
- run: cargo publish --package=kubert-prometheus-tokio --dry-run
- if: needs.meta.outputs.mode == 'release'
run: cargo publish --package=kubert-prometheus-tokio --token=${{ secrets.CRATESIO_TOKEN }}
3 changes: 3 additions & 0 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ jobs:
timeout-minutes: 10
runs-on: ubuntu-latest
container: ghcr.io/linkerd/dev:v42-rust
env:
RUSTFLAGS: '--cfg tokio_unstable'
RUSTDOCFLAGS: '--cfg tokio_unstable'
steps:
- uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
- run: cargo publish --package=kubert --dry-run
Expand Down
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
[workspace]
resolver = "2"
default-members = ["kubert", "kubert-prometheus-process"]
default-members = [
"kubert",
"kubert-prometheus-process",
"kubert-prometheus-tokio",
]
members = [
"kubert",
"kubert-prometheus-process",
"kubert-prometheus-tokio",
"examples",
]
1 change: 1 addition & 0 deletions examples/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ COPY . .
RUN --mount=type=cache,target=/usr/local/cargo/registry \
CARGO_NET_RETRY=10 just-cargo fetch
ARG FEATURES="rustls-tls"
ENV RUSTFLAGS="--cfg tokio_unstable"
RUN --mount=type=cache,target=/usr/local/cargo/registry \
CARGO_INCREMENTAL=0 just-cargo build \
--frozen --package=kubert-examples --examples \
Expand Down
4 changes: 4 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ _features := if features == "all" {
# Required to build openssl
export CXX := 'clang++-14'

# Enable tokio-metrics
export RUSTFLAGS := env_var_or_default('RUSTFLAGS', '--cfg tokio_unstable')
export RUSTDOCFLAGS := env_var_or_default('RUSTDOCFLAGS', '--cfg tokio_unstable')

#
# Recipes
#
Expand Down
19 changes: 19 additions & 0 deletions kubert-prometheus-tokio/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "kubert-prometheus-tokio"
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
description = "A prometheus-client tokio runtime metrics collector"
readme = "../README.md"
repository = "https://github.com/olix0r/kubert"
rust-version = "1.65"
keywords = ["prometheus-client", "tokio", "metrics", "monitoring"]

[features]
rt = ["tokio/rt", "tokio/time", "tokio-metrics/rt"]

[dependencies]
prometheus-client = "0.22"
tokio = "1"
tokio-metrics = "0.3"
tracing = "0.1"
186 changes: 186 additions & 0 deletions kubert-prometheus-tokio/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
//! A `tokio-metrics` exporter for `prometheus-client`.

#![deny(rust_2018_idioms, missing_docs, warnings)]
#![forbid(unsafe_code)]
#![cfg_attr(docsrs, feature(doc_cfg))]

#[cfg(all(feature = "rt", not(tokio_unstable)))]
compile_error!("RUSTFLAGS='--cfg tokio_unstable' must be set to use `tokio-metrics/rt`");

#[cfg(all(feature = "rt", tokio_unstable))]
pub use self::rt::Runtime;

#[cfg(all(feature = "rt", tokio_unstable))]
mod rt {
use prometheus_client::{
metrics::{counter::Counter, gauge::Gauge},
registry::{Registry, Unit},
};
use tokio::time;
use tokio_metrics::{RuntimeIntervals, RuntimeMonitor};

/// Tokio runtime metrics.
///
/// Pairs a handle to the Tokio runtime being observed with the set of
/// `prometheus-client` metrics that are updated from it.
///
/// NOTE that this module requires unstable tokio functionality that must be
/// enabled via the `tokio_unstable` cfg flag. This type is only compiled when
/// both the `rt` crate feature and `tokio_unstable` are enabled.
///
/// `RUSTFLAGS="--cfg tokio_unstable"` must be set at build-time to use this
/// feature.
#[derive(Debug)]
pub struct Runtime {
// Handle to the runtime whose metrics are sampled.
runtime: tokio::runtime::Handle,
// Metric handles that were registered with the registry.
metrics: Metrics,
}

// The set of metric handles exported for a runtime. Clones of these handles
// are registered with the registry; the values are updated by `probe`.
#[derive(Debug, Default)]
struct Metrics {
// The number of worker threads used by the runtime.
workers: Gauge,
// Total number of times worker threads parked.
park: Counter,
// Number of times workers unparked but found no new work.
noop: Counter,
// Number of tasks stolen by workers from others.
steal: Counter,
// Number of steal operations performed by workers.
steal_operations: Counter,
// Total number of remote schedule operations.
remote_schedule: Counter,
// Total number of local schedule operations.
local_schedule: Counter,
// Total number of overflow operations.
overflow: Counter,
// The number of tasks that have been polled across all worker threads.
polls: Counter,
// Total time worker threads were busy processing tasks, in seconds.
busy: Counter<f64>,
// The number of tasks currently scheduled in the runtime's injection queue.
injection_queue_depth: Gauge,
// The total number of tasks currently scheduled in workers' local queues.
local_queue_depth: Gauge,
// Number of times a worker thread was forced to yield due to budget
// exhaustion.
budget_forced_yield: Counter,
// Number of times the IO driver was woken up.
io_driver_ready: Counter,
// TODO poll_count_histogram requires configuration
}

impl Runtime {
/// Registers Tokio runtime metrics with the given registry. Note that
/// metrics are NOT prefixed.
pub fn register(reg: &mut Registry, runtime: tokio::runtime::Handle) -> Self {
let metrics = Metrics::default();

reg.register(
"workers",
"The number of worker threads used by the runtime",
metrics.workers.clone(),
);

reg.register(
"park",
"Total number of times worker threads parked",
metrics.park.clone(),
);
reg.register(
"noop",
"Number of times workers unparked but found no new work",
metrics.noop.clone(),
);
reg.register(
"steal",
"Number of tasks stolen by workers from others",
metrics.steal.clone(),
);
reg.register(
"steal_operations",
"Number of times workers stole tasks from other",
metrics.steal_operations.clone(),
);

reg.register(
"remote_schedule",
"Total number of remote schedule operations",
metrics.remote_schedule.clone(),
);
reg.register(
"local_schedule",
"Total number of local schedule operations",
metrics.local_schedule.clone(),
);

reg.register(
"overflow",
"Total number of overflow operations",
metrics.overflow.clone(),
);
reg.register(
"polls",
"The number of tasks that have been polled across all worker threads",
metrics.polls.clone(),
);
reg.register_with_unit(
"busy",
"Total duration of time when worker threads were busy processing tasks",
Unit::Seconds,
metrics.busy.clone(),
);

reg.register(
"injection_queue_depth",
"The number of tasks currently scheduled in the runtime's injection queue",
metrics.injection_queue_depth.clone(),
);
reg.register(
"local_queue_depth",
"The total number of tasks currently scheduled in workers' local queues",
metrics.local_queue_depth.clone(),
);

reg.register(
"budget_forced_yield",
"Number of times a worker thread was forced to yield due to budget exhaustion",
metrics.budget_forced_yield.clone(),
);
reg.register(
"io_driver_ready",
"Number of times the IO driver was woken up",
metrics.io_driver_ready.clone(),
);

Self { runtime, metrics }
}

/// Drives metrics updates for a runtime according to a fixed interval.
pub async fn updated(&self, interval: &mut time::Interval) -> ! {
let mut probes = RuntimeMonitor::new(&self.runtime).intervals();
loop {
interval.tick().await;
self.metrics.probe(&mut probes);
}
}
}

impl Metrics {
    /// Takes the next sample from `probes` and applies it to the exported
    /// metrics.
    ///
    /// # Panics
    ///
    /// Panics if `probes` yields `None`; the sample stream is expected to be
    /// endless.
    #[tracing::instrument(skip_all, ret, level = tracing::Level::TRACE)]
    fn probe(&self, probes: &mut RuntimeIntervals) {
        let probe = probes.next().expect("runtime metrics stream must not end");

        // Tokio-metrics tracks all of these values as rates (per sampling
        // interval) so we have to turn them back into absolute counters by
        // accumulating each sample:
        self.park.inc_by(probe.total_park_count);
        self.noop.inc_by(probe.total_noop_count);
        self.steal.inc_by(probe.total_steal_count);
        self.steal_operations.inc_by(probe.total_steal_operations);
        self.remote_schedule.inc_by(probe.num_remote_schedules);
        self.local_schedule.inc_by(probe.total_local_schedule_count);
        self.overflow.inc_by(probe.total_overflow_count);
        self.polls.inc_by(probe.total_polls_count);
        self.busy.inc_by(probe.total_busy_duration.as_secs_f64());
        self.io_driver_ready.inc_by(probe.io_driver_ready_count);

        // Instantaneous gauges:
        self.workers.set(probe.workers_count as i64);
        // The injection queue has its own field in the sample; it must not be
        // populated from the local queue depth.
        self.injection_queue_depth
            .set(probe.injection_queue_depth as i64);
        self.local_queue_depth
            .set(probe.total_local_queue_depth as i64);

        // Absolute counters need to be incremented by the delta between the
        // sampled value and what has already been exported. `checked_sub`
        // yields `None` when the sampled value is smaller than the exported
        // one, in which case the counter is left untouched.
        if let Some(delta) = probe
            .budget_forced_yield_count
            .checked_sub(self.budget_forced_yield.get())
        {
            self.budget_forced_yield.inc_by(delta);
        } else {
            tracing::trace!("budget_forced_yield_count overflow");
        }
    }
}
}
Loading

0 comments on commit a4efe89

Please sign in to comment.