diff --git a/Cargo.toml b/Cargo.toml index 50d7a1c..4055b9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ rt = ["tokio"] tokio-stream = "0.1.11" futures-util = "0.3.19" pin-project-lite = "0.2.7" -tokio = { version = "1.26.0", features = ["rt", "stats", "time", "net"], optional = true } +tokio = { version = "1.31.0", features = ["rt", "stats", "time", "net"], optional = true } [dev-dependencies] axum = "0.6" @@ -47,4 +47,4 @@ all-features = true rustdoc-args = ["--cfg", "docsrs", "--cfg", "tokio_unstable"] # it's necessary to _also_ pass `--cfg tokio_unstable` to rustc, or else # dependencies will not be enabled, and the docs build will fail. -rustc-args = ["--cfg", "tokio_unstable"] +rustc-args = ["--cfg", "tokio_unstable"] \ No newline at end of file diff --git a/src/runtime.rs b/src/runtime.rs index 094a053..321eaaf 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -178,6 +178,68 @@ pub struct RuntimeMetrics { /// - [`RuntimeMetrics::max_park_count`] pub min_park_count: u64, + /// The average duration of a single invocation of poll on a task. + /// + /// This average is an exponentially-weighted moving average of the duration + /// of task polls on all runtime workers. + /// + /// ##### Examples + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); + /// println!("mean task poll duration is {:?}", interval.mean_poll_duration); + /// } + /// ``` + pub mean_poll_duration: Duration, + + /// The average duration of a single invocation of poll on a task on the + /// worker with the lowest value. + /// + /// This average is an exponentially-weighted moving average of the duration + /// of task polls on the runtime worker with the lowest value. + /// + /// ##### Examples + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); + /// println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min); + /// } + /// ``` + pub mean_poll_duration_worker_min: Duration, + + /// The average duration of a single invocation of poll on a task on the + /// worker with the highest value. + /// + /// This average is an exponentially-weighted moving average of the duration + /// of task polls on the runtime worker with the highest value. + /// + /// ##### Examples + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); + /// println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max); + /// } + /// ``` + pub mean_poll_duration_worker_max: Duration, + /// The number of times worker threads unparked but performed no work before parking again. /// /// The worker no-op count increases by one each time the worker unparks the thread but finds @@ -306,14 +368,10 @@ pub struct RuntimeMetrics { /// }.await; /// /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 - /// assert_eq!(interval.total_steal_count, 1); - /// assert_eq!(interval.min_steal_count, 0); - /// assert_eq!(interval.max_steal_count, 1); + /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count); /// /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3 - /// assert_eq!(interval.total_steal_count, 0); - /// assert_eq!(interval.min_steal_count, 0); - /// assert_eq!(interval.max_steal_count, 0); + /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count); /// } /// /// async fn flush_metrics() { @@ -395,14 +453,10 @@ pub struct RuntimeMetrics { /// }.await; /// /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 - /// assert_eq!(interval.total_steal_operations, 1); - /// assert_eq!(interval.min_steal_operations, 0); - /// assert_eq!(interval.max_steal_operations, 1); + /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations); /// /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3 - /// assert_eq!(interval.total_steal_operations, 0); - /// assert_eq!(interval.min_steal_operations, 0); - /// assert_eq!(interval.max_steal_operations, 0); + /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations); /// } /// /// async fn flush_metrics() { @@ -1091,6 +1145,7 @@ impl RuntimeIntervals { min_polls_count: u64::MAX, min_busy_duration: Duration::from_secs(1000000000), min_local_queue_depth: usize::MAX, + mean_poll_duration_worker_min: Duration::MAX, budget_forced_yield_count: budget_forced_yields - self.budget_forced_yield_count, io_driver_ready_count: io_driver_ready_events - self.io_driver_ready_count, ..Default::default() @@ -1105,6 +1160,13 @@ impl RuntimeIntervals { worker.probe(&self.runtime, &mut metrics); } + if metrics.total_polls_count == 0 { + debug_assert_eq!(metrics.mean_poll_duration, Duration::default()); + + metrics.mean_poll_duration_worker_max = Duration::default(); + metrics.mean_poll_duration_worker_min = Duration::default(); + } + metrics } } @@ -1223,6 +1285,9 @@ impl Worker { }}; } + let mut worker_polls_count = self.total_polls_count; + let total_polls_count = metrics.total_polls_count; + metric!( total_park_count, max_park_count, @@ -1272,6 +1337,33 @@ impl Worker { worker_total_busy_duration ); + // Get the number of polls since last probe + worker_polls_count = self.total_polls_count - worker_polls_count; + + // Update the mean task poll duration if there were polls + if worker_polls_count > 0 { + let val = rt.worker_mean_poll_time(self.worker); + + if val > metrics.mean_poll_duration_worker_max { + metrics.mean_poll_duration_worker_max = val; + } + + if val < metrics.mean_poll_duration_worker_min { + metrics.mean_poll_duration_worker_min = val; + } + + // First, scale the current value down + let ratio = total_polls_count as f64 / metrics.total_polls_count as f64; + let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio; + + // Add the scaled current worker's mean poll duration + let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64; + mean += val.as_nanos() as f64 * ratio; + + metrics.mean_poll_duration = Duration::from_nanos(mean as u64); + } + + // Local scheduled tasks is an absolute value let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker);