Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(console): add scheduled time per task #406

Merged
merged 11 commits into from
Apr 19, 2023
9 changes: 6 additions & 3 deletions console-api/proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,13 @@ message PollStats {
// its poll method has completed.
optional google.protobuf.Timestamp last_poll_ended = 5;
// The total duration this object was being *actively polled*, summed across
// all polls. Note that this includes only polls that have completed and is
// not reflecting any inprogress polls. Subtracting `busy_time` from the
// all polls.
//
// Note that this includes only polls that have completed, and does not
// reflect any in-progress polls. Subtracting `busy_time` from the
// total lifetime of the polled object results in the amount of time it
// has spent *waiting* to be polled.
// has spent *waiting* to be polled (including the `scheduled_time` value
// from `TaskStats`, if this is a task).
google.protobuf.Duration busy_time = 6;
}

Expand Down
10 changes: 10 additions & 0 deletions console-api/proto/tasks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,16 @@ message Stats {
common.PollStats poll_stats = 7;
// The total number of times this task has woken itself.
uint64 self_wakes = 8;
// The total duration this task was scheduled prior to being polled, summed
// across all poll cycles.
//
// Note that this includes only polls that have started, and does not
// reflect any scheduled state where the task hasn't yet been polled.
// Subtracting both `busy_time` (from the task's `PollStats`) and
// `scheduled_time` from the total lifetime of the task results in the
// amount of time it spent unable to progress because it was waiting on
// some resource.
google.protobuf.Duration scheduled_time = 9;
}


Expand Down
9 changes: 6 additions & 3 deletions console-api/src/generated/rs.tokio.console.common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,13 @@ pub struct PollStats {
#[prost(message, optional, tag="5")]
pub last_poll_ended: ::core::option::Option<::prost_types::Timestamp>,
/// The total duration this object was being *actively polled*, summed across
/// all polls. Note that this includes only polls that have completed and is
/// not reflecting any inprogress polls. Subtracting `busy_time` from the
/// all polls.
///
/// Note that this includes only polls that have completed, and does not
/// reflect any in-progress polls. Subtracting `busy_time` from the
/// total lifetime of the polled object results in the amount of time it
/// has spent *waiting* to be polled.
/// has spent *waiting* to be polled (including the `scheduled_time` value
/// from `TaskStats`, if this is a task).
#[prost(message, optional, tag="6")]
pub busy_time: ::core::option::Option<::prost_types::Duration>,
}
Expand Down
11 changes: 11 additions & 0 deletions console-api/src/generated/rs.tokio.console.tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ pub struct Stats {
/// The total number of times this task has woken itself.
#[prost(uint64, tag="8")]
pub self_wakes: u64,
/// The total duration this task was scheduled prior to being polled, summed
/// across all poll cycles.
///
/// Note that this includes only polls that have started, and does not
/// reflect any scheduled state where the task hasn't yet been polled.
/// Subtracting both `busy_time` (from the task's `PollStats`) and
/// `scheduled_time` from the total lifetime of the task results in the
/// amount of time it spent unable to progress because it was waiting on
/// some resource.
#[prost(message, optional, tag="9")]
pub scheduled_time: ::core::option::Option<::prost_types::Duration>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DurationHistogram {
Expand Down
78 changes: 78 additions & 0 deletions console-subscriber/examples/long_scheduled.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
//! Long scheduled time
//!
//! This example shows an application with a task that has an excessive
//! time between being woken and being polled.
//!
//! It consists of a channel where a sender task sends a message
//! through the channel and then immediately does a lot of work
//! (simulated in this case by a call to `std::thread::sleep`).
//!
//! As soon as the sender task calls `send()` the receiver task gets
//! woken, but because there's only a single worker thread, it doesn't
//! get polled until after the sender task has finished "working" and
//! yields (via `tokio::time::sleep`).
hawkw marked this conversation as resolved.
Show resolved Hide resolved
//!
//! In the console, this is visible in the `rx` task, which has very
//! high scheduled times - in the detail screen you will see that around
//! it is scheduled around 98% of the time. The `tx` task, on the other
//! hand, is busy most of the time.
use std::time::Duration;

use console_subscriber::ConsoleLayer;
use tokio::{sync::mpsc, task};
use tracing::info;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
ConsoleLayer::builder()
.with_default_env()
.publish_interval(Duration::from_millis(100))
.init();

let (tx, rx) = mpsc::channel::<u32>(1);
let count = 10000;

let jh_rx = task::Builder::new()
.name("rx")
.spawn(receiver(rx, count))
.unwrap();
let jh_tx = task::Builder::new()
.name("tx")
.spawn(sender(tx, count))
.unwrap();

let res_tx = jh_tx.await;
let res_rx = jh_rx.await;
info!(
"main: Joined sender: {:?} and receiver: {:?}",
res_tx, res_rx,
);

tokio::time::sleep(Duration::from_millis(200)).await;

Ok(())
}

async fn sender(tx: mpsc::Sender<u32>, count: u32) {
info!("tx: started");

for idx in 0..count {
let msg: u32 = idx;
let res = tx.send(msg).await;
info!("tx: sent msg '{}' result: {:?}", msg, res);

std::thread::sleep(Duration::from_millis(5000));
info!("tx: work done");

tokio::time::sleep(Duration::from_millis(100)).await;
}
}

async fn receiver(mut rx: mpsc::Receiver<u32>, count: u32) {
info!("rx: started");

for _ in 0..count {
let msg = rx.recv().await;
info!("rx: Received message: '{:?}'", msg);
}
}
90 changes: 64 additions & 26 deletions console-subscriber/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ pub(crate) struct TaskStats {
is_dropped: AtomicBool,
// task stats
pub(crate) created_at: Instant,
timestamps: Mutex<TaskTimestamps>,
dropped_at: Mutex<Option<Instant>>,

// waker stats
wakes: AtomicUsize,
Expand Down Expand Up @@ -100,12 +100,6 @@ pub(crate) struct ResourceStats {
pub(crate) parent_id: Option<Id>,
}

#[derive(Debug, Default)]
struct TaskTimestamps {
dropped_at: Option<Instant>,
last_wake: Option<Instant>,
}

#[derive(Debug, Default)]
struct PollStats<H> {
/// The number of polls in progress
Expand All @@ -118,9 +112,11 @@ struct PollStats<H> {
#[derive(Debug, Default)]
struct PollTimestamps<H> {
first_poll: Option<Instant>,
last_wake: Option<Instant>,
last_poll_started: Option<Instant>,
last_poll_ended: Option<Instant>,
busy_time: Duration,
scheduled_time: Duration,
histogram: H,
}

Expand Down Expand Up @@ -162,14 +158,16 @@ impl TaskStats {
is_dirty: AtomicBool::new(true),
is_dropped: AtomicBool::new(false),
created_at,
timestamps: Mutex::new(TaskTimestamps::default()),
dropped_at: Mutex::new(None),
poll_stats: PollStats {
timestamps: Mutex::new(PollTimestamps {
histogram: Histogram::new(poll_duration_max),
first_poll: None,
last_wake: None,
last_poll_started: None,
last_poll_ended: None,
busy_time: Duration::new(0, 0),
scheduled_time: Duration::new(0, 0),
}),
current_polls: AtomicUsize::new(0),
polls: AtomicUsize::new(0),
Expand Down Expand Up @@ -209,13 +207,14 @@ impl TaskStats {
}

fn wake(&self, at: Instant, self_wake: bool) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
self.wakes.fetch_add(1, Release);
self.poll_stats.wake(at);

self.wakes.fetch_add(1, Release);
if self_wake {
self.wakes.fetch_add(1, Release);
}

self.make_dirty();
}

pub(crate) fn start_poll(&self, at: Instant) {
Expand All @@ -235,8 +234,7 @@ impl TaskStats {
return;
}

let mut timestamps = self.timestamps.lock();
let _prev = timestamps.dropped_at.replace(dropped_at);
let _prev = self.dropped_at.lock().replace(dropped_at);
debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!");
self.make_dirty();
}
Expand All @@ -257,16 +255,28 @@ impl ToProto for TaskStats {

fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
let poll_stats = Some(self.poll_stats.to_proto(base_time));
let timestamps = self.timestamps.lock();
let timestamps = self.poll_stats.timestamps.lock();
proto::tasks::Stats {
poll_stats,
created_at: Some(base_time.to_timestamp(self.created_at)),
dropped_at: timestamps.dropped_at.map(|at| base_time.to_timestamp(at)),
dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)),
wakes: self.wakes.load(Acquire) as u64,
waker_clones: self.waker_clones.load(Acquire) as u64,
self_wakes: self.self_wakes.load(Acquire) as u64,
waker_drops: self.waker_drops.load(Acquire) as u64,
last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)),
scheduled_time: Some(
timestamps
.scheduled_time
.try_into()
.unwrap_or_else(|error| {
eprintln!(
"failed to convert `scheduled_time` to protobuf duration: {}",
error
);
Default::default()
}),
),
}
}
}
Expand All @@ -287,7 +297,7 @@ impl DroppedAt for TaskStats {
// avoid acquiring the lock if we know we haven't tried to drop this
// thing yet
if self.is_dropped.load(Acquire) {
return self.timestamps.lock().dropped_at;
return *self.dropped_at.lock();
}

None
Expand Down Expand Up @@ -466,18 +476,46 @@ impl ToProto for ResourceStats {
// === impl PollStats ===

impl<H: RecordPoll> PollStats<H> {
fn start_poll(&self, at: Instant) {
if self.current_polls.fetch_add(1, AcqRel) == 0 {
// We are starting the first poll
let mut timestamps = self.timestamps.lock();
if timestamps.first_poll.is_none() {
timestamps.first_poll = Some(at);
}
fn wake(&self, at: Instant) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
}

timestamps.last_poll_started = Some(at);
fn start_poll(&self, at: Instant) {
if self.current_polls.fetch_add(1, AcqRel) > 0 {
return;
}

self.polls.fetch_add(1, Release);
// We are starting the first poll
let mut timestamps = self.timestamps.lock();
if timestamps.first_poll.is_none() {
timestamps.first_poll = Some(at);
}

timestamps.last_poll_started = Some(at);

self.polls.fetch_add(1, Release);

// If the last poll ended after the last wake then it was likely
// a self-wake, so we measure from the end of the last poll instead.
// This also ensures that `busy_time` and `scheduled_time` don't overlap.
let scheduled = match std::cmp::max(timestamps.last_wake, timestamps.last_poll_ended) {
Some(scheduled) => scheduled,
None => return, // Async operations record polls, but not wakes
};

let elapsed = match at.checked_duration_since(scheduled) {
Some(elapsed) => elapsed,
None => {
eprintln!(
"possible Instant clock skew detected: a poll's start timestamp \
was before the wake time/last poll end timestamp\nwake = {:?}\n start = {:?}",
scheduled, at
);
return;
}
};
timestamps.scheduled_time += elapsed;
}

fn end_poll(&self, at: Instant) {
Expand Down Expand Up @@ -534,7 +572,7 @@ impl<H> ToProto for PollStats<H> {
.map(|at| base_time.to_timestamp(at)),
busy_time: Some(timestamps.busy_time.try_into().unwrap_or_else(|error| {
eprintln!(
"failed to convert busy time to protobuf duration: {}",
"failed to convert `busy_time` to protobuf duration: {}",
error
);
Default::default()
Expand Down