diff --git a/console-subscriber/examples/app.rs b/console-subscriber/examples/app.rs
index f1247ed62..7a1b50c11 100644
--- a/console-subscriber/examples/app.rs
+++ b/console-subscriber/examples/app.rs
@@ -11,6 +11,7 @@ OPTIONS:
     blocks      Includes a (misbehaving) blocking task
     burn        Includes a (misbehaving) task that spins CPU with self-wakes
     coma        Includes a (misbehaving) task that forgets to register a waker
+    noyield     Includes a (misbehaving) task that spawns tasks that never yield
 "#;
 
 #[tokio::main]
@@ -38,6 +39,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
                     .spawn(burn(1, 10))
                     .unwrap();
             }
+            "noyield" => {
+                tokio::task::Builder::new()
+                    .name("noyield")
+                    .spawn(no_yield(20))
+                    .unwrap();
+            }
             "help" | "-h" => {
                 eprintln!("{}", HELP);
                 return Ok(());
@@ -114,3 +121,17 @@ async fn burn(min: u64, max: u64) {
         }
     }
 }
+
+#[tracing::instrument]
+async fn no_yield(seconds: u64) {
+    loop {
+        let handle = tokio::task::Builder::new()
+            .name("greedy")
+            .spawn(async move {
+                std::thread::sleep(Duration::from_secs(seconds));
+            })
+            .expect("Couldn't spawn greedy task");
+
+        _ = handle.await;
+    }
+}
diff --git a/tokio-console/src/main.rs b/tokio-console/src/main.rs
index 4fc6630bb..8545e30b9 100644
--- a/tokio-console/src/main.rs
+++ b/tokio-console/src/main.rs
@@ -65,6 +65,7 @@ async fn main() -> color_eyre::Result<()> {
         .with_task_linters(vec![
             warnings::Linter::new(warnings::SelfWakePercent::default()),
             warnings::Linter::new(warnings::LostWaker),
+            warnings::Linter::new(warnings::NeverYielded::default()),
         ])
         .with_retain_for(retain_for);
     let mut input = input::EventStream::new();
diff --git a/tokio-console/src/state/tasks.rs b/tokio-console/src/state/tasks.rs
index bf9c8a2c0..97aa4f35b 100644
--- a/tokio-console/src/state/tasks.rs
+++ b/tokio-console/src/state/tasks.rs
@@ -9,13 +9,13 @@ use crate::{
     },
     util::Percentage,
     view,
-    warnings::Linter,
+    warnings::{Lint, Linter},
 };
 use console_api as proto;
 use ratatui::{style::Color, text::Span};
 use std::{
     cell::RefCell,
-    collections::HashMap,
+    collections::{HashMap, HashSet},
     convert::{TryFrom, TryInto},
     rc::{Rc, Weak},
     time::{Duration, SystemTime},
@@ -24,6 +24,7 @@ use std::{
 #[derive(Default, Debug)]
 pub(crate) struct TasksState {
     tasks: Store<Task>,
+    pending_lint: HashSet<Id<Task>>,
     pub(crate) linters: Vec<Linter<Task>>,
     dropped_events: u64,
 }
@@ -145,6 +146,9 @@ impl TasksState {
         let mut stats_update = update.stats_update;
         let linters = &self.linters;
 
+        // Gathers the tasks that need to be linted again on the next update cycle
+        let mut next_pending_lint = HashSet::new();
+
         self.tasks
             .insert_with(visibility, update.new_tasks, |ids, mut task| {
                 if task.id.is_none() {
@@ -217,15 +221,30 @@ impl TasksState {
                     warnings: Vec::new(),
                     location,
                 };
-                task.lint(linters);
+                if let TaskLintResult::RequiresRecheck = task.lint(linters) {
+                    next_pending_lint.insert(task.id);
+                }
                 Some((id, task))
             });
 
         for (stats, mut task) in self.tasks.updated(stats_update) {
             tracing::trace!(?task, ?stats, "processing stats update for");
             task.stats = stats.into();
-            task.lint(linters);
+            match task.lint(linters) {
+                TaskLintResult::RequiresRecheck => next_pending_lint.insert(task.id),
+                // Already linted with fresh stats; don't lint it again this cycle
+                TaskLintResult::Linted => self.pending_lint.remove(&task.id),
+            };
+        }
+
+        for id in &self.pending_lint {
+            if let Some(task) = self.tasks.get(*id) {
+                if let TaskLintResult::RequiresRecheck = task.borrow_mut().lint(linters) {
+                    next_pending_lint.insert(*id);
+                }
+            }
         }
+        self.pending_lint = next_pending_lint;
 
         self.dropped_events += update.dropped_events;
     }
@@ -430,15 +449,25 @@ impl Task {
         &self.warnings[..]
     }
 
-    fn lint(&mut self, linters: &[Linter<Task>]) {
+    fn lint(&mut self, linters: &[Linter<Task>]) -> TaskLintResult {
         self.warnings.clear();
+        let mut recheck = false;
         for lint in linters {
             tracing::debug!(?lint, task = ?self, "checking...");
-            if let Some(warning) = lint.check(self) {
-                tracing::info!(?warning, task = ?self, "found a warning!");
-                self.warnings.push(warning)
+            match lint.check(self) {
+                Lint::Warning(warning) => {
+                    tracing::info!(?warning, task = ?self, "found a warning!");
+                    self.warnings.push(warning);
+                }
+                Lint::Ok => {}
+                Lint::Recheck => recheck = true,
             }
         }
+        if recheck {
+            TaskLintResult::RequiresRecheck
+        } else {
+            TaskLintResult::Linted
+        }
     }
 
     pub(crate) fn location(&self) -> &str {
@@ -446,6 +475,11 @@ impl Task {
     }
 }
 
+enum TaskLintResult {
+    Linted,
+    RequiresRecheck,
+}
+
 impl From<proto::tasks::Stats> for TaskStats {
     fn from(pb: proto::tasks::Stats) -> Self {
         let created_at = pb
diff --git a/tokio-console/src/warnings.rs b/tokio-console/src/warnings.rs
index 818b140bc..59d43f99f 100644
--- a/tokio-console/src/warnings.rs
+++ b/tokio-console/src/warnings.rs
@@ -1,5 +1,9 @@
-use crate::state::tasks::Task;
-use std::{fmt::Debug, rc::Rc};
+use crate::state::tasks::{Task, TaskState};
+use std::{
+    fmt::Debug,
+    rc::Rc,
+    time::{Duration, SystemTime},
+};
 
 /// A warning for a particular type of monitored entity (e.g. task or resource).
 ///
@@ -7,8 +11,8 @@ use std::{fmt::Debug, rc::Rc};
 /// generating a warning message describing it. The [`Linter`] type wraps an
 /// instance of this trait to track active instances of the warning.
 pub trait Warn<T>: Debug {
-    /// Returns `true` if the warning applies to `val`.
-    fn check(&self, val: &T) -> bool;
+    /// Returns whether the warning applies to `val`, or must be rechecked later.
+    fn check(&self, val: &T) -> Warning;
 
     /// Formats a description of the warning detected for a *specific* `val`.
     ///
@@ -46,6 +50,19 @@ pub trait Warn<T>: Debug {
     fn summary(&self) -> &str;
 }
 
+/// The result of a [`Warn::check`].
+pub enum Warning {
+    /// No warning for this entity.
+    Ok,
+
+    /// A warning has been detected for this entity.
+    Warn,
+
+    /// The warning should be rechecked, as the conditions that allow for
+    /// checking are not satisfied yet.
+    Recheck,
+}
+
 #[derive(Debug)]
 pub(crate) struct Linter<T>(Rc<dyn Warn<T>>);
 
@@ -57,17 +74,12 @@ impl<T> Linter<T> {
         Self(Rc::new(warning))
     }
 
-    /// Checks if the warning applies to a particular entity, returning a clone
-    /// of `Self` if it does.
-    ///
-    /// The cloned instance of `Self` should be held by the entity that
-    /// generated the warning, so that it can be formatted. Holding the clone of
-    /// `Self` will increment the warning count for that entity.
-    pub(crate) fn check(&self, val: &T) -> Option<Self> {
-        if self.0.check(val) {
-            Some(Self(self.0.clone()))
-        } else {
-            None
+    /// Checks whether the warning applies to a particular entity.
+    pub(crate) fn check(&self, val: &T) -> Lint<T> {
+        match self.0.check(val) {
+            Warning::Ok => Lint::Ok,
+            Warning::Warn => Lint::Warning(Self(self.0.clone())),
+            Warning::Recheck => Lint::Recheck,
         }
     }
 
@@ -78,7 +90,7 @@ impl<T> Linter<T> {
 
     pub(crate) fn format(&self, val: &T) -> String {
         debug_assert!(
-            self.0.check(val),
+            matches!(self.0.check(val), Warning::Warn),
             "tried to format a warning for a {} that did not have that warning!",
             std::any::type_name::<T>()
         );
@@ -90,6 +102,21 @@ impl<T> Linter<T> {
     }
 }
 
+/// The result of a [`Linter::check`].
+pub(crate) enum Lint<T> {
+    /// No warning applies to the entity.
+    Ok,
+
+    /// The warning applies. The contained clone of the linter should be held by
+    /// the entity that generated the warning, so that it can be formatted.
+    /// Holding the clone will increment the warning count for that entity.
+    Warning(Linter<T>),
+
+    /// The lint should be rechecked, as the conditions that allow for checking
+    /// are not satisfied yet.
+    Recheck,
+}
+
 #[derive(Clone, Debug)]
 pub(crate) struct SelfWakePercent {
     min_percent: u64,
@@ -120,9 +147,13 @@ impl Warn<Task> for SelfWakePercent {
         self.description.as_str()
     }
 
-    fn check(&self, task: &Task) -> bool {
+    fn check(&self, task: &Task) -> Warning {
         let self_wakes = task.self_wake_percent();
-        self_wakes > self.min_percent
+        if self_wakes > self.min_percent {
+            Warning::Warn
+        } else {
+            Warning::Ok
+        }
     }
 
     fn format(&self, task: &Task) -> String {
@@ -142,11 +173,76 @@ impl Warn<Task> for LostWaker {
         "tasks have lost their waker"
     }
 
-    fn check(&self, task: &Task) -> bool {
-        !task.is_completed() && task.waker_count() == 0 && !task.is_running() && !task.is_awakened()
+    fn check(&self, task: &Task) -> Warning {
+        if !task.is_completed()
+            && task.waker_count() == 0
+            && !task.is_running()
+            && !task.is_awakened()
+        {
+            Warning::Warn
+        } else {
+            Warning::Ok
+        }
     }
 
     fn format(&self, _: &Task) -> String {
         "This task has lost its waker, and will never be woken again.".into()
     }
 }
+
+/// Warning for tasks that have never yielded.
+#[derive(Clone, Debug)]
+pub(crate) struct NeverYielded {
+    min_duration: Duration,
+    description: String,
+}
+
+impl NeverYielded {
+    pub(crate) const DEFAULT_DURATION: Duration = Duration::from_secs(1);
+    pub(crate) fn new(min_duration: Duration) -> Self {
+        Self {
+            min_duration,
+            description: format!(
+                "tasks have never yielded (threshold {}ms)",
+                min_duration.as_millis()
+            ),
+        }
+    }
+}
+
+impl Default for NeverYielded {
+    fn default() -> Self {
+        Self::new(Self::DEFAULT_DURATION)
+    }
+}
+
+impl Warn<Task> for NeverYielded {
+    fn summary(&self) -> &str {
+        self.description.as_str()
+    }
+
+    fn check(&self, task: &Task) -> Warning {
+        // Don't fire warning for tasks that are waiting to run
+        if task.state() != TaskState::Running {
+            return Warning::Ok;
+        }
+
+        if task.total_polls() > 1 {
+            return Warning::Ok;
+        }
+
+        // Avoid short-lived task false positives
+        if task.busy(SystemTime::now()) >= self.min_duration {
+            Warning::Warn
+        } else {
+            Warning::Recheck
+        }
+    }
+
+    fn format(&self, task: &Task) -> String {
+        format!(
+            "This task has never yielded ({:?})",
+            task.busy(SystemTime::now()),
+        )
+    }
+}