Skip to content

Commit

Permalink
feat(console): add task scheduled times histogram (#409)
Browse files Browse the repository at this point in the history
Display the scheduled time percentiles and sparklines for the histogram
of scheduled times. The scheduled time is the time between when a task is
woken and when it is next polled.

The scheduled time, which was already calculated, is now stored in a
histogram and sent over the wire together with the task details.

This is used to draw percentiles and sparklines on the task details
view, in the same way that is done for the poll times histogram.

The refactoring done in #408 has been used to more easily display two
sets of durations (percentiles and histogram where possible).

## PR Notes

The PR depends on both #406, which adds initial support for recording the
scheduled (wake-to-poll) time, and #408, which refactors the percentile and
histogram widgets to make them easier to reuse.

It shouldn't really be reviewed in depth until those two have been merged as
it contains a lot of duplication and will need to be rebased.

Here are some examples of the scheduled times durations on the task detail view:

<img width="1037" alt="task detail view for the sender task in the long-scheduled example" src="https://user-images.githubusercontent.com/89589/232608774-d8ac48a7-3fe7-4742-a75b-e11bdb23abaa.png">

<img width="1043" alt="task detail view for the burn task in the app example" src="https://user-images.githubusercontent.com/89589/232608864-637f4f52-d4a6-468d-88fc-8fe1d53fdff9.png">
  • Loading branch information
hds authored and hawkw committed Sep 29, 2023
1 parent a5454a5 commit d92a399
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 58 deletions.
6 changes: 6 additions & 0 deletions console-api/proto/tasks.proto
Expand Up @@ -59,6 +59,12 @@ message TaskDetails {
// A histogram plus additional data.
DurationHistogram histogram = 4;
}

// A histogram of task scheduled durations.
//
// The scheduled duration is the time a task spends between being
// woken and when it is next polled.
DurationHistogram scheduled_times_histogram = 5;
}

// Data recorded when a new task is spawned.
Expand Down
6 changes: 6 additions & 0 deletions console-api/src/generated/rs.tokio.console.tasks.rs
Expand Up @@ -43,6 +43,12 @@ pub struct TaskDetails {
/// The timestamp for when the update to the task took place.
#[prost(message, optional, tag="2")]
pub now: ::core::option::Option<::prost_types::Timestamp>,
/// A histogram of task scheduled durations.
///
/// The scheduled duration is the time a task spends between being
/// woken and when it is next polled.
#[prost(message, optional, tag="5")]
pub scheduled_times_histogram: ::core::option::Option<DurationHistogram>,
/// A histogram of task poll durations.
///
/// This is either:
Expand Down
48 changes: 48 additions & 0 deletions console-subscriber/examples/long_sleep.rs
@@ -0,0 +1,48 @@
use std::time::Duration;

use console_subscriber::ConsoleLayer;
use tokio::task::{self, yield_now};
use tracing::info;

#[tokio::main(flavor = "multi_thread", worker_threads = 2)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
ConsoleLayer::builder()
.with_default_env()
.publish_interval(Duration::from_millis(100))
.init();

let long_sleeps = task::Builder::new()
.name("long-sleeps")
.spawn(long_sleeps(5000))
.unwrap();

let sleep_forever = task::Builder::new()
.name("sleep-forever")
.spawn(sleep_forever(5000))
.unwrap();

match (long_sleeps.await, sleep_forever.await) {
(Ok(_), Ok(_)) => info!("Success"),
(_, _) => info!("Error awaiting tasks."),
}

tokio::time::sleep(Duration::from_millis(200)).await;

Ok(())
}

/// Loops forever, blocking the current thread for `inc` milliseconds and then
/// yielding back to the runtime.
///
/// The sleep is `std::thread::sleep`, i.e. a blocking sleep rather than an
/// async one, so the thread polling this task is occupied for the whole
/// interval.
// NOTE(review): the blocking sleep looks deliberate for this example — confirm.
async fn long_sleeps(inc: u64) {
    let pause = Duration::from_millis(inc);
    loop {
        std::thread::sleep(pause);

        yield_now().await;
    }
}

/// Loops forever, blocking the current thread for `inc` milliseconds per
/// iteration.
///
/// The body contains no `.await`, so once this task is polled it never
/// returns control to the runtime.
async fn sleep_forever(inc: u64) {
    let pause = Duration::from_millis(inc);
    loop {
        std::thread::sleep(pause);
    }
}
2 changes: 2 additions & 0 deletions console-subscriber/src/aggregator/mod.rs
Expand Up @@ -327,6 +327,7 @@ impl Aggregator {
task_id: Some(id.clone().into()),
now,
poll_times_histogram: Some(stats.poll_duration_histogram()),
scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
})
{
self.details_watchers
Expand Down Expand Up @@ -374,6 +375,7 @@ impl Aggregator {
task_id: Some(id.clone().into()),
now: Some(self.base_time.to_timestamp(Instant::now())),
poll_times_histogram: Some(task_stats.poll_duration_histogram()),
scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
};
watchers.retain(|watch| watch.update(&details));
!watchers.is_empty()
Expand Down
24 changes: 24 additions & 0 deletions console-subscriber/src/builder.rs
Expand Up @@ -50,6 +50,12 @@ pub struct Builder {
/// Any polls exceeding this duration will be clamped to this value. Higher
/// values will result in more memory usage.
pub(super) poll_duration_max: Duration,

/// The maximum value for the task scheduled duration histogram.
///
/// Any scheduled times exceeding this duration will be clamped to this
/// value. Higher values will result in more memory usage.
pub(super) scheduled_duration_max: Duration,
}

impl Default for Builder {
Expand All @@ -60,6 +66,7 @@ impl Default for Builder {
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
retention: ConsoleLayer::DEFAULT_RETENTION,
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
scheduled_duration_max: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX,
server_addr: ServerAddr::Tcp(SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT)),
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
Expand Down Expand Up @@ -235,6 +242,23 @@ impl Builder {
}
}

/// Sets the upper bound for task scheduled duration histograms.
///
/// The scheduled duration is the time from a task being woken until it is
/// next polled. Any scheduled duration exceeding `max` will be clamped down
/// to `max` and recorded as an outlier.
///
/// By default, this is [one second]. Higher values will increase per-task
/// memory usage.
///
/// [one second]: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX
pub fn scheduled_duration_histogram_max(mut self, max: Duration) -> Self {
    self.scheduled_duration_max = max;
    self
}

/// Sets whether tasks, resources, and async ops from the console
/// subscriber thread are recorded.
///
Expand Down
22 changes: 21 additions & 1 deletion console-subscriber/src/lib.rs
Expand Up @@ -123,6 +123,11 @@ pub struct ConsoleLayer {
///
/// By default, this is one second.
max_poll_duration_nanos: u64,

/// Maximum value for the scheduled time histogram.
///
/// By default, this is one second.
max_scheduled_duration_nanos: u64,
}

/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
Expand Down Expand Up @@ -273,6 +278,7 @@ impl ConsoleLayer {
?config.recording_path,
?config.filter_env_var,
?config.poll_duration_max,
?config.scheduled_duration_max,
?base_time,
"configured console subscriber"
);
Expand Down Expand Up @@ -310,6 +316,7 @@ impl ConsoleLayer {
recorder,
base_time,
max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
};
(layer, server)
}
Expand Down Expand Up @@ -365,6 +372,15 @@ impl ConsoleLayer {
/// See also [`Builder::poll_duration_histogram_max`].
pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);

/// The default maximum value for the task scheduled duration histogram.
///
/// Any scheduled duration (the time from a task being woken until it is next
/// polled) exceeding this will be clamped to this value. By default, the
/// maximum scheduled duration is one second.
///
/// See also [`Builder::scheduled_duration_histogram_max`].
pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);

fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
self.spawn_callsites.contains(meta)
}
Expand Down Expand Up @@ -567,7 +583,11 @@ where
fields: record::SerializeFields(fields.clone()),
});
if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
let stats = Arc::new(stats::TaskStats::new(self.max_poll_duration_nanos, at));
let stats = Arc::new(stats::TaskStats::new(
self.max_poll_duration_nanos,
self.max_scheduled_duration_nanos,
at,
));
let event = Event::Spawn {
id: id.clone(),
stats: stats.clone(),
Expand Down
42 changes: 30 additions & 12 deletions console-subscriber/src/stats.rs
Expand Up @@ -117,7 +117,8 @@ struct PollTimestamps<H> {
last_poll_ended: Option<Instant>,
busy_time: Duration,
scheduled_time: Duration,
histogram: H,
poll_histogram: H,
scheduled_histogram: H,
}

#[derive(Debug)]
Expand All @@ -128,8 +129,8 @@ struct Histogram {
max_outlier: Option<u64>,
}

trait RecordPoll {
fn record_poll_duration(&mut self, duration: Duration);
trait RecordDuration {
fn record_duration(&mut self, duration: Duration);
}

impl TimeAnchor {
Expand All @@ -153,15 +154,20 @@ impl TimeAnchor {
}

impl TaskStats {
pub(crate) fn new(poll_duration_max: u64, created_at: Instant) -> Self {
pub(crate) fn new(
poll_duration_max: u64,
scheduled_duration_max: u64,
created_at: Instant,
) -> Self {
Self {
is_dirty: AtomicBool::new(true),
is_dropped: AtomicBool::new(false),
created_at,
dropped_at: Mutex::new(None),
poll_stats: PollStats {
timestamps: Mutex::new(PollTimestamps {
histogram: Histogram::new(poll_duration_max),
poll_histogram: Histogram::new(poll_duration_max),
scheduled_histogram: Histogram::new(scheduled_duration_max),
first_poll: None,
last_wake: None,
last_poll_started: None,
Expand Down Expand Up @@ -240,10 +246,18 @@ impl TaskStats {
}

pub(crate) fn poll_duration_histogram(&self) -> proto::tasks::task_details::PollTimesHistogram {
let hist = self.poll_stats.timestamps.lock().histogram.to_proto();
let hist = self.poll_stats.timestamps.lock().poll_histogram.to_proto();
proto::tasks::task_details::PollTimesHistogram::Histogram(hist)
}

/// Returns this task's scheduled duration histogram converted to its
/// protobuf representation.
///
/// The poll timestamps lock is held while the histogram is converted.
pub(crate) fn scheduled_duration_histogram(&self) -> proto::tasks::DurationHistogram {
    let timestamps = self.poll_stats.timestamps.lock();
    timestamps.scheduled_histogram.to_proto()
}

#[inline]
fn make_dirty(&self) {
self.is_dirty.swap(true, AcqRel);
Expand Down Expand Up @@ -475,7 +489,7 @@ impl ToProto for ResourceStats {

// === impl PollStats ===

impl<H: RecordPoll> PollStats<H> {
impl<H: RecordDuration> PollStats<H> {
fn wake(&self, at: Instant) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
Expand Down Expand Up @@ -515,6 +529,10 @@ impl<H: RecordPoll> PollStats<H> {
return;
}
};

// if we have a scheduled time histogram, add the timestamp
timestamps.scheduled_histogram.record_duration(elapsed);

timestamps.scheduled_time += elapsed;
}

Expand Down Expand Up @@ -550,7 +568,7 @@ impl<H: RecordPoll> PollStats<H> {
};

// if we have a poll time histogram, add the timestamp
timestamps.histogram.record_poll_duration(elapsed);
timestamps.poll_histogram.record_duration(elapsed);

timestamps.busy_time += elapsed;
}
Expand Down Expand Up @@ -636,8 +654,8 @@ impl Histogram {
}
}

impl RecordPoll for Histogram {
fn record_poll_duration(&mut self, duration: Duration) {
impl RecordDuration for Histogram {
fn record_duration(&mut self, duration: Duration) {
let mut duration_ns = duration.as_nanos() as u64;

// clamp the duration to the histogram's max value
Expand All @@ -653,8 +671,8 @@ impl RecordPoll for Histogram {
}
}

impl RecordPoll for () {
fn record_poll_duration(&mut self, _: Duration) {
impl RecordDuration for () {
fn record_duration(&mut self, _: Duration) {
// do nothing
}
}
2 changes: 1 addition & 1 deletion tokio-console/src/state/histogram.rs
Expand Up @@ -30,7 +30,7 @@ impl DurationHistogram {
})
}

fn from_proto(proto: &proto::DurationHistogram) -> Option<Self> {
pub(crate) fn from_proto(proto: &proto::DurationHistogram) -> Option<Self> {
let histogram = deserialize_histogram(&proto.raw_histogram[..])?;
Some(Self {
histogram,
Expand Down
4 changes: 4 additions & 0 deletions tokio-console/src/state/mod.rs
Expand Up @@ -221,6 +221,10 @@ impl State {
.poll_times_histogram
.as_ref()
.and_then(histogram::DurationHistogram::from_poll_durations),
scheduled_times_histogram: update
.scheduled_times_histogram
.as_ref()
.and_then(histogram::DurationHistogram::from_proto),
};

*self.current_task_details.borrow_mut() = Some(details);
Expand Down
5 changes: 5 additions & 0 deletions tokio-console/src/state/tasks.rs
Expand Up @@ -32,6 +32,7 @@ pub(crate) struct TasksState {
pub(crate) struct Details {
pub(crate) span_id: SpanId,
pub(crate) poll_times_histogram: Option<DurationHistogram>,
pub(crate) scheduled_times_histogram: Option<DurationHistogram>,
}

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -264,6 +265,10 @@ impl Details {
pub(crate) fn poll_times_histogram(&self) -> Option<&DurationHistogram> {
self.poll_times_histogram.as_ref()
}

/// Returns a reference to the scheduled times histogram, or `None` if these
/// details do not hold one.
pub(crate) fn scheduled_times_histogram(&self) -> Option<&DurationHistogram> {
self.scheduled_times_histogram.as_ref()
}
}

impl Task {
Expand Down
16 changes: 15 additions & 1 deletion tokio-console/src/view/durations.rs
Expand Up @@ -36,14 +36,22 @@ pub(crate) struct Durations<'a> {
percentiles_title: &'a str,
/// Title for histogram sparkline block
histogram_title: &'a str,
/// Fixed width for percentiles block
percentiles_width: u16,
}

impl<'a> Widget for Durations<'a> {
fn render(self, area: tui::layout::Rect, buf: &mut tui::buffer::Buffer) {
// Only split the durations area in half if we're also drawing a
// sparkline. We require UTF-8 to draw the sparkline and also enough width.
let (percentiles_area, histogram_area) = if self.styles.utf8 {
let percentiles_width = cmp::max(self.percentiles_title.len() as u16, 13_u16) + 2;
let percentiles_width = match self.percentiles_width {
// Fixed width
width if width > 0 => width,
// Long enough for the title or for a single line
// like "p99: 544.77µs" (13) (and borders on the sides).
_ => cmp::max(self.percentiles_title.len() as u16, 13_u16) + 2,
};

// If there isn't enough width left after drawing the percentiles
// then we won't draw the sparkline at all.
Expand Down Expand Up @@ -88,6 +96,7 @@ impl<'a> Durations<'a> {
histogram: None,
percentiles_title: "Percentiles",
histogram_title: "Histogram",
percentiles_width: 0,
}
}

Expand All @@ -105,4 +114,9 @@ impl<'a> Durations<'a> {
self.histogram_title = title;
self
}

pub(crate) fn percentiles_width(mut self, width: u16) -> Self {
self.percentiles_width = width;
self
}
}

0 comments on commit d92a399

Please sign in to comment.