Skip to content

Commit

Permalink
feat: add scheduled time per task (#406)
Browse files Browse the repository at this point in the history
Each task displays the sum of the time it has been idle and busy, as
well as the total. The idle time includes the time between when a task
is woken, and when the runtime actually polls that task.

There are cases where a task may be scheduled for a long time after
being woken, before it is polled. Especially if many tasks are woken at
the same time and don't yield back to the runtime quickly.

To add visibility to this, the total time that a task is scheduled
before being polled has been added. Additionally, a new task state
`Scheduled` has been added. This is displayed in both the tasks table
and in the task detail view.

In the `console-api`, the total `scheduled_time` for the task has been added to
the `TaskStats` message in `tasks.proto`.

This is the first of two parts. In the second part (#409), a histogram for
scheduled time will be added, the equivalent of the poll time histogram
which is already available on the task detail screen.

To show a pathological case which may lead to needing to see the
scheduled time, a new example has been added to the `console-subscriber`

## PR Notes

This PR does something adjacent to what is described in #50, but not quite.

The unicode character used for a `SCHED` task is ⏫.

The second PR (#409) will record a scheduled time histogram the same as it
recorded for poll times. These two changes should go in together (and certainly
shouldn't be released separately). However, this PR is already quite big, so they'll
be separated out.

The idea is that this PR isn't merged until the next one has also been reviewed
and approved. It would be good to get some feedback at this stage though.

The task list view with the new column for `Sched` time:

<img width="1032" alt="a tasks table view for the long-scheduled example" src="https://user-images.githubusercontent.com/89589/232456977-2921f884-4673-420f-ba4f-3646627d44db.png">

The `Task` block in the task detail view showing the new `Scheduled` time entry.

<img width="510" alt="The task block on the task detail view for the rx task in the long-scheduled example" src="https://user-images.githubusercontent.com/89589/232457332-e455e086-9468-42c9-8fda-7965d8d1e6f8.png">
  • Loading branch information
hds authored and hawkw committed Sep 29, 2023
1 parent 4409443 commit f280df9
Show file tree
Hide file tree
Showing 10 changed files with 230 additions and 50 deletions.
9 changes: 6 additions & 3 deletions console-api/proto/common.proto
Expand Up @@ -177,10 +177,13 @@ message PollStats {
// its poll method has completed.
optional google.protobuf.Timestamp last_poll_ended = 5;
// The total duration this object was being *actively polled*, summed across
// all polls. Note that this includes only polls that have completed and is
// not reflecting any inprogress polls. Subtracting `busy_time` from the
// all polls.
//
// Note that this includes only polls that have completed, and does not
// reflect any in-progress polls. Subtracting `busy_time` from the
// total lifetime of the polled object results in the amount of time it
// has spent *waiting* to be polled.
// has spent *waiting* to be polled (including the `scheduled_time` value
// from `TaskStats`, if this is a task).
google.protobuf.Duration busy_time = 6;
}

Expand Down
10 changes: 10 additions & 0 deletions console-api/proto/tasks.proto
Expand Up @@ -130,6 +130,16 @@ message Stats {
common.PollStats poll_stats = 7;
// The total number of times this task has woken itself.
uint64 self_wakes = 8;
// The total duration this task was scheduled prior to being polled, summed
// across all poll cycles.
//
// Note that this includes only polls that have started, and does not
// reflect any scheduled state where the task hasn't yet been polled.
// Subtracting both `busy_time` (from the task's `PollStats`) and
// `scheduled_time` from the total lifetime of the task results in the
// amount of time it spent unable to progress because it was waiting on
// some resource.
google.protobuf.Duration scheduled_time = 9;
}


Expand Down
9 changes: 6 additions & 3 deletions console-api/src/generated/rs.tokio.console.common.rs
Expand Up @@ -253,10 +253,13 @@ pub struct PollStats {
#[prost(message, optional, tag="5")]
pub last_poll_ended: ::core::option::Option<::prost_types::Timestamp>,
/// The total duration this object was being *actively polled*, summed across
/// all polls. Note that this includes only polls that have completed and is
/// not reflecting any inprogress polls. Subtracting `busy_time` from the
/// all polls.
///
/// Note that this includes only polls that have completed, and does not
/// reflect any in-progress polls. Subtracting `busy_time` from the
/// total lifetime of the polled object results in the amount of time it
/// has spent *waiting* to be polled.
/// has spent *waiting* to be polled (including the `scheduled_time` value
/// from `TaskStats`, if this is a task).
#[prost(message, optional, tag="6")]
pub busy_time: ::core::option::Option<::prost_types::Duration>,
}
Expand Down
11 changes: 11 additions & 0 deletions console-api/src/generated/rs.tokio.console.tasks.rs
Expand Up @@ -167,6 +167,17 @@ pub struct Stats {
/// The total number of times this task has woken itself.
#[prost(uint64, tag="8")]
pub self_wakes: u64,
/// The total duration this task was scheduled prior to being polled, summed
/// across all poll cycles.
///
/// Note that this includes only polls that have started, and does not
/// reflect any scheduled state where the task hasn't yet been polled.
/// Subtracting both `busy_time` (from the task's `PollStats`) and
/// `scheduled_time` from the total lifetime of the task results in the
/// amount of time it spent unable to progress because it was waiting on
/// some resource.
#[prost(message, optional, tag="9")]
pub scheduled_time: ::core::option::Option<::prost_types::Duration>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct DurationHistogram {
Expand Down
78 changes: 78 additions & 0 deletions console-subscriber/examples/long_scheduled.rs
@@ -0,0 +1,78 @@
//! Long scheduled time
//!
//! This example shows an application with a task that has an excessive
//! time between being woken and being polled.
//!
//! It consists of a channel where a sender task sends a message
//! through the channel and then immediately does a lot of work
//! (simulated in this case by a call to `std::thread::sleep`).
//!
//! As soon as the sender task calls `send()` the receiver task gets
//! woken, but because there's only a single worker thread, it doesn't
//! get polled until after the sender task has finished "working" and
//! yields (via `tokio::time::sleep`).
//!
//! In the console, this is visible in the `rx` task, which has very
//! high scheduled times - in the detail screen you will see that around
//! it is scheduled around 98% of the time. The `tx` task, on the other
//! hand, is busy most of the time.
use std::time::Duration;

use console_subscriber::ConsoleLayer;
use tokio::{sync::mpsc, task};
use tracing::info;

#[tokio::main(flavor = "multi_thread", worker_threads = 1)]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
ConsoleLayer::builder()
.with_default_env()
.publish_interval(Duration::from_millis(100))
.init();

let (tx, rx) = mpsc::channel::<u32>(1);
let count = 10000;

let jh_rx = task::Builder::new()
.name("rx")
.spawn(receiver(rx, count))
.unwrap();
let jh_tx = task::Builder::new()
.name("tx")
.spawn(sender(tx, count))
.unwrap();

let res_tx = jh_tx.await;
let res_rx = jh_rx.await;
info!(
"main: Joined sender: {:?} and receiver: {:?}",
res_tx, res_rx,
);

tokio::time::sleep(Duration::from_millis(200)).await;

Ok(())
}

async fn sender(tx: mpsc::Sender<u32>, count: u32) {
info!("tx: started");

for idx in 0..count {
let msg: u32 = idx;
let res = tx.send(msg).await;
info!("tx: sent msg '{}' result: {:?}", msg, res);

std::thread::sleep(Duration::from_millis(5000));
info!("tx: work done");

tokio::time::sleep(Duration::from_millis(100)).await;
}
}

async fn receiver(mut rx: mpsc::Receiver<u32>, count: u32) {
info!("rx: started");

for _ in 0..count {
let msg = rx.recv().await;
info!("rx: Received message: '{:?}'", msg);
}
}
90 changes: 64 additions & 26 deletions console-subscriber/src/stats.rs
Expand Up @@ -56,7 +56,7 @@ pub(crate) struct TaskStats {
is_dropped: AtomicBool,
// task stats
pub(crate) created_at: Instant,
timestamps: Mutex<TaskTimestamps>,
dropped_at: Mutex<Option<Instant>>,

// waker stats
wakes: AtomicUsize,
Expand Down Expand Up @@ -100,12 +100,6 @@ pub(crate) struct ResourceStats {
pub(crate) parent_id: Option<Id>,
}

#[derive(Debug, Default)]
struct TaskTimestamps {
dropped_at: Option<Instant>,
last_wake: Option<Instant>,
}

#[derive(Debug, Default)]
struct PollStats<H> {
/// The number of polls in progress
Expand All @@ -118,9 +112,11 @@ struct PollStats<H> {
#[derive(Debug, Default)]
struct PollTimestamps<H> {
first_poll: Option<Instant>,
last_wake: Option<Instant>,
last_poll_started: Option<Instant>,
last_poll_ended: Option<Instant>,
busy_time: Duration,
scheduled_time: Duration,
histogram: H,
}

Expand Down Expand Up @@ -162,14 +158,16 @@ impl TaskStats {
is_dirty: AtomicBool::new(true),
is_dropped: AtomicBool::new(false),
created_at,
timestamps: Mutex::new(TaskTimestamps::default()),
dropped_at: Mutex::new(None),
poll_stats: PollStats {
timestamps: Mutex::new(PollTimestamps {
histogram: Histogram::new(poll_duration_max),
first_poll: None,
last_wake: None,
last_poll_started: None,
last_poll_ended: None,
busy_time: Duration::new(0, 0),
scheduled_time: Duration::new(0, 0),
}),
current_polls: AtomicUsize::new(0),
polls: AtomicUsize::new(0),
Expand Down Expand Up @@ -209,13 +207,14 @@ impl TaskStats {
}

fn wake(&self, at: Instant, self_wake: bool) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
self.wakes.fetch_add(1, Release);
self.poll_stats.wake(at);

self.wakes.fetch_add(1, Release);
if self_wake {
self.wakes.fetch_add(1, Release);
}

self.make_dirty();
}

pub(crate) fn start_poll(&self, at: Instant) {
Expand All @@ -235,8 +234,7 @@ impl TaskStats {
return;
}

let mut timestamps = self.timestamps.lock();
let _prev = timestamps.dropped_at.replace(dropped_at);
let _prev = self.dropped_at.lock().replace(dropped_at);
debug_assert_eq!(_prev, None, "tried to drop a task twice; this is a bug!");
self.make_dirty();
}
Expand All @@ -257,16 +255,28 @@ impl ToProto for TaskStats {

fn to_proto(&self, base_time: &TimeAnchor) -> Self::Output {
let poll_stats = Some(self.poll_stats.to_proto(base_time));
let timestamps = self.timestamps.lock();
let timestamps = self.poll_stats.timestamps.lock();
proto::tasks::Stats {
poll_stats,
created_at: Some(base_time.to_timestamp(self.created_at)),
dropped_at: timestamps.dropped_at.map(|at| base_time.to_timestamp(at)),
dropped_at: self.dropped_at.lock().map(|at| base_time.to_timestamp(at)),
wakes: self.wakes.load(Acquire) as u64,
waker_clones: self.waker_clones.load(Acquire) as u64,
self_wakes: self.self_wakes.load(Acquire) as u64,
waker_drops: self.waker_drops.load(Acquire) as u64,
last_wake: timestamps.last_wake.map(|at| base_time.to_timestamp(at)),
scheduled_time: Some(
timestamps
.scheduled_time
.try_into()
.unwrap_or_else(|error| {
eprintln!(
"failed to convert `scheduled_time` to protobuf duration: {}",
error
);
Default::default()
}),
),
}
}
}
Expand All @@ -287,7 +297,7 @@ impl DroppedAt for TaskStats {
// avoid acquiring the lock if we know we haven't tried to drop this
// thing yet
if self.is_dropped.load(Acquire) {
return self.timestamps.lock().dropped_at;
return *self.dropped_at.lock();
}

None
Expand Down Expand Up @@ -466,18 +476,46 @@ impl ToProto for ResourceStats {
// === impl PollStats ===

impl<H: RecordPoll> PollStats<H> {
fn start_poll(&self, at: Instant) {
if self.current_polls.fetch_add(1, AcqRel) == 0 {
// We are starting the first poll
let mut timestamps = self.timestamps.lock();
if timestamps.first_poll.is_none() {
timestamps.first_poll = Some(at);
}
fn wake(&self, at: Instant) {
let mut timestamps = self.timestamps.lock();
timestamps.last_wake = cmp::max(timestamps.last_wake, Some(at));
}

timestamps.last_poll_started = Some(at);
fn start_poll(&self, at: Instant) {
if self.current_polls.fetch_add(1, AcqRel) > 0 {
return;
}

self.polls.fetch_add(1, Release);
// We are starting the first poll
let mut timestamps = self.timestamps.lock();
if timestamps.first_poll.is_none() {
timestamps.first_poll = Some(at);
}

timestamps.last_poll_started = Some(at);

self.polls.fetch_add(1, Release);

// If the last poll ended after the last wake then it was likely
// a self-wake, so we measure from the end of the last poll instead.
// This also ensures that `busy_time` and `scheduled_time` don't overlap.
let scheduled = match std::cmp::max(timestamps.last_wake, timestamps.last_poll_ended) {
Some(scheduled) => scheduled,
None => return, // Async operations record polls, but not wakes
};

let elapsed = match at.checked_duration_since(scheduled) {
Some(elapsed) => elapsed,
None => {
eprintln!(
"possible Instant clock skew detected: a poll's start timestamp \
was before the wake time/last poll end timestamp\nwake = {:?}\n start = {:?}",
scheduled, at
);
return;
}
};
timestamps.scheduled_time += elapsed;
}

fn end_poll(&self, at: Instant) {
Expand Down Expand Up @@ -534,7 +572,7 @@ impl<H> ToProto for PollStats<H> {
.map(|at| base_time.to_timestamp(at)),
busy_time: Some(timestamps.busy_time.try_into().unwrap_or_else(|error| {
eprintln!(
"failed to convert busy time to protobuf duration: {}",
"failed to convert `busy_time` to protobuf duration: {}",
error
);
Default::default()
Expand Down

0 comments on commit f280df9

Please sign in to comment.