From 84cbb5322774bf50669ca30128120276c3423bde Mon Sep 17 00:00:00 2001 From: Duarte Nunes Date: Wed, 14 Dec 2022 07:29:10 -0300 Subject: [PATCH] task: also instrument streams (#31) --- Cargo.toml | 1 + examples/stream.rs | 37 +++++++++ src/task.rs | 189 +++++++++++++++++++++++---------------------- 3 files changed, 135 insertions(+), 92 deletions(-) create mode 100644 examples/stream.rs diff --git a/Cargo.toml b/Cargo.toml index 6c62bef..b1c2c42 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ default = ["rt"] rt = ["tokio"] [dependencies] +tokio-stream = "0.1.11" futures-util = "0.3.19" pin-project-lite = "0.2.7" tokio = { version = "1.15.0", features = ["rt", "stats", "time"], optional = true } diff --git a/examples/stream.rs b/examples/stream.rs new file mode 100644 index 0000000..d2928bd --- /dev/null +++ b/examples/stream.rs @@ -0,0 +1,37 @@ +use std::time::Duration; + +use futures::{stream::FuturesUnordered, StreamExt}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let metrics_monitor = tokio_metrics::TaskMonitor::new(); + + // print task metrics every 500ms + { + let metrics_monitor = metrics_monitor.clone(); + tokio::spawn(async move { + for deltas in metrics_monitor.intervals() { + // pretty-print the metric deltas + println!("{:?}", deltas); + // wait 500ms + tokio::time::sleep(Duration::from_millis(500)).await; + } + }) + }; + + // instrument a stream and await it + let mut stream = + metrics_monitor.instrument((0..3).map(|_| do_work()).collect::>()); + while stream.next().await.is_some() {} + + println!("{:?}", metrics_monitor.cumulative()); + + Ok(()) +} + +async fn do_work() { + for _ in 0..25 { + tokio::task::yield_now().await; + tokio::time::sleep(Duration::from_millis(100)).await; + } +} diff --git a/src/task.rs b/src/task.rs index 02412b1..0e64266 100644 --- a/src/task.rs +++ b/src/task.rs @@ -5,6 +5,7 @@ use std::pin::Pin; use std::sync::atomic::{AtomicU64, Ordering::SeqCst}; use std::sync::Arc; use std::task::{Context, Poll}; +use tokio_stream::Stream; #[cfg(any(feature = "rt"))] use tokio::time::{Duration, Instant}; @@ -1545,7 +1546,7 @@ impl TaskMonitor { /// assert_eq!(monitor.cumulative().first_poll_count, 2); /// } /// ``` - pub fn instrument(&self, task: F) -> Instrumented { + pub fn instrument(&self, task: F) -> Instrumented { self.metrics.instrumented_count.fetch_add(1, SeqCst); Instrumented { task, @@ -2287,113 +2288,117 @@ impl Future for Instrumented { type Output = T::Output; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let poll_start = Instant::now(); - let this = self.project(); - let idled_at = this.idled_at; - let state = this.state; - let instrumented_at = state.instrumented_at; - let metrics = &state.metrics; - - /* accounting for time-to-first-poll and tasks-count */ - // is this the first time this task has been polled? - if !*this.did_poll_once { - // if so, we need to do three things: - /* 1. note that this task *has* been polled */ - *this.did_poll_once = true; - - /* 2. account for the time-to-first-poll of this task */ - // if the time-to-first-poll of this task exceeds `u64::MAX` ns, - // round down to `u64::MAX` nanoseconds - let elapsed = (poll_start - instrumented_at) - .as_nanos() - .try_into() - .unwrap_or(u64::MAX); - // add this duration to `time_to_first_poll_ns_total` - metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst); - - /* 3. increment the count of tasks that have been polled at least once */ - state.metrics.first_poll_count.fetch_add(1, SeqCst); - } - - /* accounting for time-idled and time-scheduled */ - // 1. note (and reset) the instant this task was last awoke - let woke_at = state.woke_at.swap(0, SeqCst); - - // The state of a future is *idling* in the interim between the instant - // it completes a `poll`, and the instant it is next awoken. - if *idled_at < woke_at { - // increment the counter of how many idles occured - metrics.total_idled_count.fetch_add(1, SeqCst); - - // compute the duration of the idle - let idle_ns = woke_at - *idled_at; - - // adjust the total elasped time monitored tasks spent idling - metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst); - } - - // if this task spent any time in the scheduled state after instrumentation, - // and after first poll, `woke_at` will be greater than 0. - if woke_at > 0 { - // increment the counter of how many schedules occured - metrics.total_scheduled_count.fetch_add(1, SeqCst); - - // recall that the `woke_at` field is internally represented as - // nanoseconds-since-instrumentation. here, for accounting purposes, - // we need to instead represent it as a proper `Instant`. - let woke_instant = instrumented_at + Duration::from_nanos(woke_at); - - // the duration this task spent scheduled is time time elapsed between - // when this task was awoke, and when it was polled. - let scheduled_ns = (poll_start - woke_instant) - .as_nanos() - .try_into() - .unwrap_or(u64::MAX); - - // add `scheduled_ns` to the Monitor's total - metrics - .total_scheduled_duration_ns - .fetch_add(scheduled_ns, SeqCst); - } - - // Register the waker - state.waker.register(cx.waker()); + instrument_poll(cx, self, Future::poll) + } +} - // Get the instrumented waker - let waker_ref = futures_util::task::waker_ref(state); - let mut cx = Context::from_waker(&waker_ref); +impl Stream for Instrumented { + type Item = T::Item; - // Poll the task - let inner_poll_start = Instant::now(); - let ret = Future::poll(this.task, &mut cx); - let inner_poll_end = Instant::now(); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + instrument_poll(cx, self, Stream::poll_next) + } +} - /* idle time starts now */ - *idled_at = (inner_poll_end - instrumented_at) +fn instrument_poll( + cx: &mut Context, + instrumented: Pin<&mut Instrumented>, + poll_fn: impl FnOnce(Pin<&mut T>, &mut Context) -> Poll, +) -> Poll { + let poll_start = Instant::now(); + let this = instrumented.project(); + let idled_at = this.idled_at; + let state = this.state; + let instrumented_at = state.instrumented_at; + let metrics = &state.metrics; + /* accounting for time-to-first-poll and tasks-count */ + // is this the first time this task has been polled? + if !*this.did_poll_once { + // if so, we need to do three things: + /* 1. note that this task *has* been polled */ + *this.did_poll_once = true; + + /* 2. account for the time-to-first-poll of this task */ + // if the time-to-first-poll of this task exceeds `u64::MAX` ns, + // round down to `u64::MAX` nanoseconds + let elapsed = (poll_start - instrumented_at) .as_nanos() .try_into() .unwrap_or(u64::MAX); + // add this duration to `time_to_first_poll_ns_total` + metrics.total_first_poll_delay_ns.fetch_add(elapsed, SeqCst); - /* accounting for poll time */ - let inner_poll_duration = inner_poll_end - inner_poll_start; - let inner_poll_ns: u64 = inner_poll_duration + /* 3. increment the count of tasks that have been polled at least once */ + state.metrics.first_poll_count.fetch_add(1, SeqCst); + } + /* accounting for time-idled and time-scheduled */ + // 1. note (and reset) the instant this task was last awoke + let woke_at = state.woke_at.swap(0, SeqCst); + // The state of a future is *idling* in the interim between the instant + // it completes a `poll`, and the instant it is next awoken. + if *idled_at < woke_at { + // increment the counter of how many idles occurred + metrics.total_idled_count.fetch_add(1, SeqCst); + + // compute the duration of the idle + let idle_ns = woke_at - *idled_at; + + // adjust the total elapsed time monitored tasks spent idling + metrics.total_idle_duration_ns.fetch_add(idle_ns, SeqCst); + } + // if this task spent any time in the scheduled state after instrumentation, + // and after first poll, `woke_at` will be greater than 0. + if woke_at > 0 { + // increment the counter of how many schedules occurred + metrics.total_scheduled_count.fetch_add(1, SeqCst); + + // recall that the `woke_at` field is internally represented as + // nanoseconds-since-instrumentation. here, for accounting purposes, + // we need to instead represent it as a proper `Instant`. + let woke_instant = instrumented_at + Duration::from_nanos(woke_at); + + // the duration this task spent scheduled is time time elapsed between + // when this task was awoke, and when it was polled. + let scheduled_ns = (poll_start - woke_instant) .as_nanos() .try_into() .unwrap_or(u64::MAX); - let (count_bucket, duration_bucket) = // was this a slow or fast poll? + // add `scheduled_ns` to the Monitor's total + metrics + .total_scheduled_duration_ns + .fetch_add(scheduled_ns, SeqCst); + } + // Register the waker + state.waker.register(cx.waker()); + // Get the instrumented waker + let waker_ref = futures_util::task::waker_ref(state); + let mut cx = Context::from_waker(&waker_ref); + // Poll the task + let inner_poll_start = Instant::now(); + let ret = poll_fn(this.task, &mut cx); + let inner_poll_end = Instant::now(); + /* idle time starts now */ + *idled_at = (inner_poll_end - instrumented_at) + .as_nanos() + .try_into() + .unwrap_or(u64::MAX); + /* accounting for poll time */ + let inner_poll_duration = inner_poll_end - inner_poll_start; + let inner_poll_ns: u64 = inner_poll_duration + .as_nanos() + .try_into() + .unwrap_or(u64::MAX); + let (count_bucket, duration_bucket) = // was this a slow or fast poll? if inner_poll_duration >= metrics.slow_poll_threshold { (&metrics.total_slow_poll_count, &metrics.total_slow_poll_duration) } else { (&metrics.total_fast_poll_count, &metrics.total_fast_poll_duration_ns) }; - - // update the appropriate bucket - count_bucket.fetch_add(1, SeqCst); - duration_bucket.fetch_add(inner_poll_ns, SeqCst); - - ret - } + // update the appropriate bucket + count_bucket.fetch_add(1, SeqCst); + duration_bucket.fetch_add(inner_poll_ns, SeqCst); + ret } impl State {