Skip to content

Commit

Permalink
feat(console): add warning for tasks that never yield (#439)
Browse files Browse the repository at this point in the history
feat(console): add warning for tasks that never yield (#439)

Adds a new warning for tasks that have never yielded.

This is more complicated than it sounds, because under the previous
behavior, tasks are only linted when they are created or updated. A task
that never yields doesn't get updated, so if it gets linted before the
"never yielded" threshold is reached (defaults to 1 second), the warning
returns false and the task won't get linted again.

To solve this problem, warnings can now return a response that requests
to be rechecked later. In practice, this later is on the next update
cycle, at which point the task may again request to be rechecked later.

For this warning, the overhead will likely be only a single recheck
after task creation (and the fact that we need to store those task IDs
between updates). We will have to consider the performance impact of any
new warnings which may request rechecks as they are added.

Co-authored-by: Hayden Stainsby <hds@caffeineconcepts.com>
  • Loading branch information
2 people authored and hawkw committed Sep 29, 2023
1 parent 362bdbe commit d05fa9e
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 28 deletions.
21 changes: 21 additions & 0 deletions console-subscriber/examples/app.rs
Expand Up @@ -11,6 +11,7 @@ OPTIONS:
blocks Includes a (misbehaving) blocking task
burn Includes a (misbehaving) task that spins CPU with self-wakes
coma Includes a (misbehaving) task that forgets to register a waker
noyield Includes a (misbehaving) task that spawns tasks that never yield
"#;

#[tokio::main]
Expand Down Expand Up @@ -38,6 +39,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.spawn(burn(1, 10))
.unwrap();
}
"noyield" => {
tokio::task::Builder::new()
.name("noyield")
.spawn(no_yield(20))
.unwrap();
}
"help" | "-h" => {
eprintln!("{}", HELP);
return Ok(());
Expand Down Expand Up @@ -114,3 +121,17 @@ async fn burn(min: u64, max: u64) {
}
}
}

#[tracing::instrument]
async fn no_yield(seconds: u64) {
loop {
let handle = tokio::task::Builder::new()
.name("greedy")
.spawn(async move {
std::thread::sleep(Duration::from_secs(seconds));
})
.expect("Couldn't spawn greedy task");

_ = handle.await;
}
}
1 change: 1 addition & 0 deletions tokio-console/src/main.rs
Expand Up @@ -65,6 +65,7 @@ async fn main() -> color_eyre::Result<()> {
.with_task_linters(vec![
warnings::Linter::new(warnings::SelfWakePercent::default()),
warnings::Linter::new(warnings::LostWaker),
warnings::Linter::new(warnings::NeverYielded::default()),
])
.with_retain_for(retain_for);
let mut input = input::EventStream::new();
Expand Down
50 changes: 42 additions & 8 deletions tokio-console/src/state/tasks.rs
Expand Up @@ -9,13 +9,13 @@ use crate::{
},
util::Percentage,
view,
warnings::Linter,
warnings::{Lint, Linter},
};
use console_api as proto;
use ratatui::{style::Color, text::Span};
use std::{
cell::RefCell,
collections::HashMap,
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
rc::{Rc, Weak},
time::{Duration, SystemTime},
Expand All @@ -24,6 +24,7 @@ use std::{
#[derive(Default, Debug)]
pub(crate) struct TasksState {
tasks: Store<Task>,
pending_lint: HashSet<Id<Task>>,
pub(crate) linters: Vec<Linter<Task>>,
dropped_events: u64,
}
Expand Down Expand Up @@ -145,6 +146,9 @@ impl TasksState {
let mut stats_update = update.stats_update;
let linters = &self.linters;

// Gathers the tasks that need to be linted again on the next update cycle
let mut next_pending_lint = HashSet::new();

self.tasks
.insert_with(visibility, update.new_tasks, |ids, mut task| {
if task.id.is_none() {
Expand Down Expand Up @@ -217,15 +221,30 @@ impl TasksState {
warnings: Vec::new(),
location,
};
task.lint(linters);
if let TaskLintResult::RequiresRecheck = task.lint(linters) {
next_pending_lint.insert(task.id);
}
Some((id, task))
});

for (stats, mut task) in self.tasks.updated(stats_update) {
tracing::trace!(?task, ?stats, "processing stats update for");
task.stats = stats.into();
task.lint(linters);
match task.lint(linters) {
TaskLintResult::RequiresRecheck => next_pending_lint.insert(task.id),
// Avoid linting this task again this cycle
_ => self.pending_lint.remove(&task.id),
};
}

for id in &self.pending_lint {
if let Some(task) = self.tasks.get(*id) {
if let TaskLintResult::RequiresRecheck = task.borrow_mut().lint(linters) {
next_pending_lint.insert(*id);
}
}
}
self.pending_lint = next_pending_lint;

self.dropped_events += update.dropped_events;
}
Expand Down Expand Up @@ -430,22 +449,37 @@ impl Task {
&self.warnings[..]
}

fn lint(&mut self, linters: &[Linter<Task>]) {
fn lint(&mut self, linters: &[Linter<Task>]) -> TaskLintResult {
self.warnings.clear();
let mut recheck = false;
for lint in linters {
tracing::debug!(?lint, task = ?self, "checking...");
if let Some(warning) = lint.check(self) {
tracing::info!(?warning, task = ?self, "found a warning!");
self.warnings.push(warning)
match lint.check(self) {
Lint::Warning(warning) => {
tracing::info!(?warning, task = ?self, "found a warning!");
self.warnings.push(warning);
}
Lint::Ok => {}
Lint::Recheck => recheck = true,
}
}
if recheck {
TaskLintResult::RequiresRecheck
} else {
TaskLintResult::Linted
}
}

pub(crate) fn location(&self) -> &str {
&self.location
}
}

enum TaskLintResult {
Linted,
RequiresRecheck,
}

impl From<proto::tasks::Stats> for TaskStats {
fn from(pb: proto::tasks::Stats) -> Self {
let created_at = pb
Expand Down
136 changes: 116 additions & 20 deletions tokio-console/src/warnings.rs
@@ -1,14 +1,18 @@
use crate::state::tasks::Task;
use std::{fmt::Debug, rc::Rc};
use crate::state::tasks::{Task, TaskState};
use std::{
fmt::Debug,
rc::Rc,
time::{Duration, SystemTime},
};

/// A warning for a particular type of monitored entity (e.g. task or resource).
///
/// This trait implements the logic for detecting a particular warning, and
/// generating a warning message describing it. The [`Linter`] type wraps an
/// instance of this trait to track active instances of the warning.
pub trait Warn<T>: Debug {
/// Returns `true` if the warning applies to `val`.
fn check(&self, val: &T) -> bool;
/// Returns if the warning applies to `val`.
fn check(&self, val: &T) -> Warning;

/// Formats a description of the warning detected for a *specific* `val`.
///
Expand Down Expand Up @@ -46,6 +50,19 @@ pub trait Warn<T>: Debug {
fn summary(&self) -> &str;
}

/// A result for a warning check
pub enum Warning {
/// No warning for this entity.
Ok,

/// A warning has been detected for this entity.
Warn,

/// The warning should be rechecked as the conditions to allow for checking
/// are not satisfied yet
Recheck,
}

#[derive(Debug)]
pub(crate) struct Linter<T>(Rc<dyn Warn<T>>);

Expand All @@ -57,17 +74,12 @@ impl<T> Linter<T> {
Self(Rc::new(warning))
}

/// Checks if the warning applies to a particular entity, returning a clone
/// of `Self` if it does.
///
/// The cloned instance of `Self` should be held by the entity that
/// generated the warning, so that it can be formatted. Holding the clone of
/// `Self` will increment the warning count for that entity.
pub(crate) fn check(&self, val: &T) -> Option<Self> {
if self.0.check(val) {
Some(Self(self.0.clone()))
} else {
None
/// Checks if the warning applies to a particular entity
pub(crate) fn check(&self, val: &T) -> Lint<T> {
match self.0.check(val) {
Warning::Ok => Lint::Ok,
Warning::Warn => Lint::Warning(Self(self.0.clone())),
Warning::Recheck => Lint::Recheck,
}
}

Expand All @@ -78,7 +90,7 @@ impl<T> Linter<T> {

pub(crate) fn format(&self, val: &T) -> String {
debug_assert!(
self.0.check(val),
matches!(self.0.check(val), Warning::Warn),
"tried to format a warning for a {} that did not have that warning!",
std::any::type_name::<T>()
);
Expand All @@ -90,6 +102,21 @@ impl<T> Linter<T> {
}
}

/// A result for a linter check
pub(crate) enum Lint<T> {
/// No warning applies to the entity
Ok,

/// The cloned instance of `Self` should be held by the entity that
/// generated the warning, so that it can be formatted. Holding the clone of
/// `Self` will increment the warning count for that entity.
Warning(Linter<T>),

/// The lint should be rechecked as the conditions to allow for checking are
/// not satisfied yet
Recheck,
}

#[derive(Clone, Debug)]
pub(crate) struct SelfWakePercent {
min_percent: u64,
Expand Down Expand Up @@ -120,9 +147,13 @@ impl Warn<Task> for SelfWakePercent {
self.description.as_str()
}

fn check(&self, task: &Task) -> bool {
fn check(&self, task: &Task) -> Warning {
let self_wakes = task.self_wake_percent();
self_wakes > self.min_percent
if self_wakes > self.min_percent {
Warning::Warn
} else {
Warning::Ok
}
}

fn format(&self, task: &Task) -> String {
Expand All @@ -142,11 +173,76 @@ impl Warn<Task> for LostWaker {
"tasks have lost their waker"
}

fn check(&self, task: &Task) -> bool {
!task.is_completed() && task.waker_count() == 0 && !task.is_running() && !task.is_awakened()
fn check(&self, task: &Task) -> Warning {
if !task.is_completed()
&& task.waker_count() == 0
&& !task.is_running()
&& !task.is_awakened()
{
Warning::Warn
} else {
Warning::Ok
}
}

fn format(&self, _: &Task) -> String {
"This task has lost its waker, and will never be woken again.".into()
}
}

/// Warning for if a task has never yielded
#[derive(Clone, Debug)]
pub(crate) struct NeverYielded {
min_duration: Duration,
description: String,
}

impl NeverYielded {
pub(crate) const DEFAULT_DURATION: Duration = Duration::from_secs(1);
pub(crate) fn new(min_duration: Duration) -> Self {
Self {
min_duration,
description: format!(
"tasks have never yielded (threshold {}ms)",
min_duration.as_millis()
),
}
}
}

impl Default for NeverYielded {
fn default() -> Self {
Self::new(Self::DEFAULT_DURATION)
}
}

impl Warn<Task> for NeverYielded {
fn summary(&self) -> &str {
self.description.as_str()
}

fn check(&self, task: &Task) -> Warning {
// Don't fire warning for tasks that are waiting to run
if task.state() != TaskState::Running {
return Warning::Ok;
}

if task.total_polls() > 1 {
return Warning::Ok;
}

// Avoid short-lived task false positives
if task.busy(SystemTime::now()) >= self.min_duration {
Warning::Warn
} else {
Warning::Recheck
}
}

fn format(&self, task: &Task) -> String {
format!(
"This task has never yielded ({:?})",
task.busy(SystemTime::now()),
)
}
}

0 comments on commit d05fa9e

Please sign in to comment.