Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(console): add task scheduled times histogram #409

Merged
merged 7 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions console-api/proto/common.proto
Expand Up @@ -161,6 +161,10 @@ message PollStats {
// Subtracting this timestamp from `created_at` can be used to calculate the
// time to first poll for this object, a measurement of executor latency.
optional google.protobuf.Timestamp first_poll = 3;
// The timestamp of the most recent time this object was woken.
//
// If this is `None`, the object has not yet been woken.
optional google.protobuf.Timestamp last_wake = 7;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like this needs to be removed, since we changed #406 to add this to TaskStats instead?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I seem to have ended up with last_wake and scheduled_time twice somehow. Sorry.

// The timestamp of the most recent time this object's poll method was invoked.
//
// If this is `None`, the object has not yet been polled.
Expand All @@ -185,6 +189,13 @@ message PollStats {
// has spent *waiting* to be polled (including the `scheduled_time` value
// from `TaskStats`, if this is a task).
google.protobuf.Duration busy_time = 6;
// The total duration this object was scheduled prior to being polled, summed
// across all poll cycles. Note that this includes only polls that have
// started, and does not reflect any scheduled state where the polling hasn't
// yet finished. Subtracting both `busy_time` and `scheduled_time` from the
// total lifetime of the polled object results in the amount of time it spent
// unable to progress because it was waiting on some resource.
google.protobuf.Duration scheduled_time = 8;
hawkw marked this conversation as resolved.
Show resolved Hide resolved
}

// State attributes of an entity. These are dependent on the type of the entity.
Expand Down
6 changes: 6 additions & 0 deletions console-api/proto/tasks.proto
Expand Up @@ -59,6 +59,12 @@ message TaskDetails {
// A histogram plus additional data.
DurationHistogram histogram = 4;
}

// A histogram of task scheduled durations.
//
// The scheduled duration is the time a task spends between being
// woken and when it is next polled.
DurationHistogram scheduled_times_histogram = 5;
}

// Data recorded when a new task is spawned.
Expand Down
13 changes: 13 additions & 0 deletions console-api/src/generated/rs.tokio.console.common.rs
Expand Up @@ -235,6 +235,11 @@ pub struct PollStats {
/// time to first poll for this object, a measurement of executor latency.
#[prost(message, optional, tag="3")]
pub first_poll: ::core::option::Option<::prost_types::Timestamp>,
/// The timestamp of the most recent time this object was woken.
///
/// If this is `None`, the object has not yet been woken.
#[prost(message, optional, tag="7")]
pub last_wake: ::core::option::Option<::prost_types::Timestamp>,
/// The timestamp of the most recent time this object's poll method was invoked.
///
/// If this is `None`, the object has not yet been polled.
Expand Down Expand Up @@ -262,6 +267,14 @@ pub struct PollStats {
/// from `TaskStats`, if this is a task).
#[prost(message, optional, tag="6")]
pub busy_time: ::core::option::Option<::prost_types::Duration>,
/// The total duration this object was scheduled prior to being polled, summed
/// across all poll cycles. Note that this includes only polls that have
/// started, and does not reflect any scheduled state where the polling hasn't
/// yet finished. Subtracting both `busy_time` and `scheduled_time` from the
/// total lifetime of the polled object results in the amount of time it spent
/// unable to progress because it was waiting on some resource.
#[prost(message, optional, tag="8")]
pub scheduled_time: ::core::option::Option<::prost_types::Duration>,
}
/// State attributes of an entity. These are dependent on the type of the entity.
///
Expand Down
6 changes: 6 additions & 0 deletions console-api/src/generated/rs.tokio.console.tasks.rs
Expand Up @@ -43,6 +43,12 @@ pub struct TaskDetails {
/// The timestamp for when the update to the task took place.
#[prost(message, optional, tag="2")]
pub now: ::core::option::Option<::prost_types::Timestamp>,
/// A histogram of task scheduled durations.
///
/// The scheduled duration is the time a task spends between being
/// woken and when it is next polled.
#[prost(message, optional, tag="5")]
pub scheduled_times_histogram: ::core::option::Option<DurationHistogram>,
/// A histogram of task poll durations.
///
/// This is either:
Expand Down
2 changes: 2 additions & 0 deletions console-subscriber/src/aggregator/mod.rs
Expand Up @@ -327,6 +327,7 @@ impl Aggregator {
task_id: Some(id.clone().into()),
now,
poll_times_histogram: Some(stats.poll_duration_histogram()),
scheduled_times_histogram: Some(stats.scheduled_duration_histogram()),
})
{
self.details_watchers
Expand Down Expand Up @@ -374,6 +375,7 @@ impl Aggregator {
task_id: Some(id.clone().into()),
now: Some(self.base_time.to_timestamp(Instant::now())),
poll_times_histogram: Some(task_stats.poll_duration_histogram()),
scheduled_times_histogram: Some(task_stats.scheduled_duration_histogram()),
};
watchers.retain(|watch| watch.update(&details));
!watchers.is_empty()
Expand Down
24 changes: 24 additions & 0 deletions console-subscriber/src/builder.rs
Expand Up @@ -50,6 +50,12 @@ pub struct Builder {
/// Any polls exceeding this duration will be clamped to this value. Higher
/// values will result in more memory usage.
pub(super) poll_duration_max: Duration,

/// The maximum value for the task scheduled duration histogram.
///
/// Any scheduled times exceeding this duration will be clamped to this
/// value. Higher values will result in more memory usage.
pub(super) scheduled_duration_max: Duration,
}

impl Default for Builder {
Expand All @@ -60,6 +66,7 @@ impl Default for Builder {
publish_interval: ConsoleLayer::DEFAULT_PUBLISH_INTERVAL,
retention: ConsoleLayer::DEFAULT_RETENTION,
poll_duration_max: ConsoleLayer::DEFAULT_POLL_DURATION_MAX,
scheduled_duration_max: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX,
server_addr: ServerAddr::Tcp(SocketAddr::new(Server::DEFAULT_IP, Server::DEFAULT_PORT)),
recording_path: None,
filter_env_var: "RUST_LOG".to_string(),
Expand Down Expand Up @@ -235,6 +242,23 @@ impl Builder {
}
}

/// Configures the upper bound of each task's scheduled duration histogram.
///
/// A scheduled duration is the time from a task being woken until it is next
/// polled. Durations longer than `max` are clamped down to `max` and
/// recorded as outliers.
///
/// The default is [one second]. Raising this limit increases per-task
/// memory usage.
///
/// [one second]: ConsoleLayer::DEFAULT_SCHEDULED_DURATION_MAX
pub fn scheduled_duration_histogram_max(self, max: Duration) -> Self {
    // Take ownership of the builder, overwrite the one field, and hand it back.
    let mut builder = self;
    builder.scheduled_duration_max = max;
    builder
}

/// Sets whether tasks, resources, and async ops from the console
/// subscriber thread are recorded.
///
Expand Down
22 changes: 21 additions & 1 deletion console-subscriber/src/lib.rs
Expand Up @@ -123,6 +123,11 @@ pub struct ConsoleLayer {
///
/// By default, this is one second.
max_poll_duration_nanos: u64,

/// Maximum value for the scheduled time histogram.
///
/// By default, this is one second.
max_scheduled_duration_nanos: u64,
}

/// A gRPC [`Server`] that implements the [`tokio-console` wire format][wire].
Expand Down Expand Up @@ -273,6 +278,7 @@ impl ConsoleLayer {
?config.recording_path,
?config.filter_env_var,
?config.poll_duration_max,
?config.scheduled_duration_max,
?base_time,
"configured console subscriber"
);
Expand Down Expand Up @@ -310,6 +316,7 @@ impl ConsoleLayer {
recorder,
base_time,
max_poll_duration_nanos: config.poll_duration_max.as_nanos() as u64,
max_scheduled_duration_nanos: config.scheduled_duration_max.as_nanos() as u64,
};
(layer, server)
}
Expand Down Expand Up @@ -365,6 +372,15 @@ impl ConsoleLayer {
/// See also [`Builder::poll_duration_histogram_max`].
pub const DEFAULT_POLL_DURATION_MAX: Duration = Duration::from_secs(1);

/// The default maximum value for the task scheduled duration histogram.
///
/// Any scheduled duration (the time from a task being woken until it is next
/// polled) exceeding this will be clamped to this value. By default, the
/// maximum scheduled duration is one second.
///
/// See also [`Builder::scheduled_duration_histogram_max`].
pub const DEFAULT_SCHEDULED_DURATION_MAX: Duration = Duration::from_secs(1);

/// Returns `true` if `meta` is contained in this layer's `spawn_callsites`.
fn is_spawn(&self, meta: &'static Metadata<'static>) -> bool {
    let spawn_callsites = &self.spawn_callsites;
    spawn_callsites.contains(meta)
}
Expand Down Expand Up @@ -567,7 +583,11 @@ where
fields: record::SerializeFields(fields.clone()),
});
if let Some(stats) = self.send_stats(&self.shared.dropped_tasks, move || {
let stats = Arc::new(stats::TaskStats::new(self.max_poll_duration_nanos, at));
let stats = Arc::new(stats::TaskStats::new(
self.max_poll_duration_nanos,
self.max_scheduled_duration_nanos,
at,
));
let event = Event::Spawn {
id: id.clone(),
stats: stats.clone(),
Expand Down
55 changes: 43 additions & 12 deletions console-subscriber/src/stats.rs
Expand Up @@ -117,7 +117,8 @@ struct PollTimestamps<H> {
last_poll_ended: Option<Instant>,
busy_time: Duration,
scheduled_time: Duration,
histogram: H,
poll_histogram: H,
scheduled_histogram: H,
}

#[derive(Debug)]
Expand All @@ -128,8 +129,8 @@ struct Histogram {
max_outlier: Option<u64>,
}

trait RecordPoll {
fn record_poll_duration(&mut self, duration: Duration);
/// A sink for measured durations.
///
/// Implemented by `Histogram`, which records the duration, and by `()`,
/// which discards it.
trait RecordDuration {
/// Records a single measured `duration`.
fn record_duration(&mut self, duration: Duration);
}

impl TimeAnchor {
Expand All @@ -153,15 +154,20 @@ impl TimeAnchor {
}

impl TaskStats {
pub(crate) fn new(poll_duration_max: u64, created_at: Instant) -> Self {
pub(crate) fn new(
poll_duration_max: u64,
scheduled_duration_max: u64,
created_at: Instant,
) -> Self {
Self {
is_dirty: AtomicBool::new(true),
is_dropped: AtomicBool::new(false),
created_at,
dropped_at: Mutex::new(None),
poll_stats: PollStats {
timestamps: Mutex::new(PollTimestamps {
histogram: Histogram::new(poll_duration_max),
poll_histogram: Histogram::new(poll_duration_max),
scheduled_histogram: Histogram::new(scheduled_duration_max),
first_poll: None,
last_wake: None,
last_poll_started: None,
Expand Down Expand Up @@ -240,10 +246,18 @@ impl TaskStats {
}

pub(crate) fn poll_duration_histogram(&self) -> proto::tasks::task_details::PollTimesHistogram {
let hist = self.poll_stats.timestamps.lock().histogram.to_proto();
let hist = self.poll_stats.timestamps.lock().poll_histogram.to_proto();
proto::tasks::task_details::PollTimesHistogram::Histogram(hist)
}

/// Returns this task's scheduled duration histogram in its protobuf form.
///
/// The poll timestamps lock is held while the histogram is converted.
pub(crate) fn scheduled_duration_histogram(&self) -> proto::tasks::DurationHistogram {
    let timestamps = self.poll_stats.timestamps.lock();
    timestamps.scheduled_histogram.to_proto()
}

#[inline]
fn make_dirty(&self) {
self.is_dirty.swap(true, AcqRel);
Expand Down Expand Up @@ -475,7 +489,7 @@ impl ToProto for ResourceStats {

// === impl PollStats ===

impl<H: RecordPoll> PollStats<H> {
impl<H: RecordDuration> PollStats<H> {
fn wake(&self, at: Instant) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
Expand Down Expand Up @@ -515,6 +529,10 @@ impl<H: RecordPoll> PollStats<H> {
return;
}
};

// if we have a scheduled time histogram, record the elapsed duration
timestamps.scheduled_histogram.record_duration(elapsed);

timestamps.scheduled_time += elapsed;
}

Expand Down Expand Up @@ -550,7 +568,7 @@ impl<H: RecordPoll> PollStats<H> {
};

// if we have a poll time histogram, record the elapsed duration
timestamps.histogram.record_poll_duration(elapsed);
timestamps.poll_histogram.record_duration(elapsed);

timestamps.busy_time += elapsed;
}
Expand All @@ -564,6 +582,7 @@ impl<H> ToProto for PollStats<H> {
proto::PollStats {
polls: self.polls.load(Acquire) as u64,
first_poll: timestamps.first_poll.map(|at| base_time.to_timestamp(at)),
last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)),
last_poll_started: timestamps
.last_poll_started
.map(|at| base_time.to_timestamp(at)),
Expand All @@ -577,6 +596,18 @@ impl<H> ToProto for PollStats<H> {
);
Default::default()
})),
scheduled_time: Some(
timestamps
.scheduled_time
.try_into()
.unwrap_or_else(|error| {
eprintln!(
"failed to convert `scheduled_time` to protobuf duration: {}",
error
);
Default::default()
}),
),
}
}
}
Expand Down Expand Up @@ -636,8 +667,8 @@ impl Histogram {
}
}

impl RecordPoll for Histogram {
fn record_poll_duration(&mut self, duration: Duration) {
impl RecordDuration for Histogram {
fn record_duration(&mut self, duration: Duration) {
let mut duration_ns = duration.as_nanos() as u64;

// clamp the duration to the histogram's max value
Expand All @@ -653,8 +684,8 @@ impl RecordPoll for Histogram {
}
}

impl RecordPoll for () {
fn record_poll_duration(&mut self, _: Duration) {
/// No-op recorder: the unit type discards every duration it is given.
impl RecordDuration for () {
fn record_duration(&mut self, _: Duration) {
// intentionally does nothing: the duration is discarded
}
}
2 changes: 1 addition & 1 deletion tokio-console/src/state/histogram.rs
Expand Up @@ -30,7 +30,7 @@ impl DurationHistogram {
})
}

fn from_proto(proto: &proto::DurationHistogram) -> Option<Self> {
pub(crate) fn from_proto(proto: &proto::DurationHistogram) -> Option<Self> {
let histogram = deserialize_histogram(&proto.raw_histogram[..])?;
Some(Self {
histogram,
Expand Down
4 changes: 4 additions & 0 deletions tokio-console/src/state/mod.rs
Expand Up @@ -221,6 +221,10 @@ impl State {
.poll_times_histogram
.as_ref()
.and_then(histogram::DurationHistogram::from_poll_durations),
scheduled_times_histogram: update
.scheduled_times_histogram
.as_ref()
.and_then(histogram::DurationHistogram::from_proto),
};

*self.current_task_details.borrow_mut() = Some(details);
Expand Down
6 changes: 6 additions & 0 deletions tokio-console/src/state/tasks.rs
Expand Up @@ -32,6 +32,7 @@ pub(crate) struct TasksState {
pub(crate) struct Details {
pub(crate) span_id: SpanId,
pub(crate) poll_times_histogram: Option<DurationHistogram>,
pub(crate) scheduled_times_histogram: Option<DurationHistogram>,
}

#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -264,6 +265,10 @@ impl Details {
/// Borrows the poll times histogram, or returns `None` if it is not set.
pub(crate) fn poll_times_histogram(&self) -> Option<&DurationHistogram> {
    Option::as_ref(&self.poll_times_histogram)
}

/// Borrows the scheduled times histogram, or returns `None` if it is not set.
pub(crate) fn scheduled_times_histogram(&self) -> Option<&DurationHistogram> {
    Option::as_ref(&self.scheduled_times_histogram)
}
}

impl Task {
Expand Down Expand Up @@ -454,6 +459,7 @@ impl From<proto::tasks::Stats> for TaskStats {

let poll_stats = pb.poll_stats.expect("task should have poll stats");
let busy = poll_stats.busy_time.map(pb_duration).unwrap_or_default();

let scheduled = pb.scheduled_time.map(pb_duration).unwrap_or_default();
let idle = total.map(|total| total.checked_sub(busy + scheduled).unwrap_or_default());
Self {
Expand Down