Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(console): add warning for tasks that never yield #439

Merged
merged 17 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions console-subscriber/examples/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ OPTIONS:
blocks Includes a (misbehaving) blocking task
burn Includes a (misbehaving) task that spins CPU with self-wakes
coma Includes a (misbehaving) task that forgets to register a waker
noyield Includes a (misbehaving) task that spawns tasks that never yield
"#;

#[tokio::main]
Expand Down Expand Up @@ -38,6 +39,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
.spawn(burn(1, 10))
.unwrap();
}
"noyield" => {
tokio::task::Builder::new()
.name("noyield")
.spawn(no_yield(20))
.unwrap();
}
"help" | "-h" => {
eprintln!("{}", HELP);
return Ok(());
Expand Down Expand Up @@ -114,3 +121,17 @@ async fn burn(min: u64, max: u64) {
}
}
}

#[tracing::instrument]
async fn no_yield(seconds: u64) {
loop {
let handle = tokio::task::Builder::new()
.name("greedy")
.spawn(async move {
std::thread::sleep(Duration::from_secs(seconds));
})
.expect("Couldn't spawn greedy task");

_ = handle.await;
}
}
1 change: 1 addition & 0 deletions tokio-console/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ async fn main() -> color_eyre::Result<()> {
.with_task_linters(vec![
warnings::Linter::new(warnings::SelfWakePercent::default()),
warnings::Linter::new(warnings::LostWaker),
warnings::Linter::new(warnings::NeverYielded::default()),
])
.with_retain_for(retain_for);
let mut input = input::EventStream::new();
Expand Down
54 changes: 46 additions & 8 deletions tokio-console/src/state/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ use crate::{
},
util::Percentage,
view,
warnings::Linter,
warnings::{Lint, Linter},
};
use console_api as proto;
use ratatui::{style::Color, text::Span};
use std::{
cell::RefCell,
collections::HashMap,
collections::{HashMap, HashSet},
convert::{TryFrom, TryInto},
rc::{Rc, Weak},
time::{Duration, SystemTime},
Expand All @@ -24,6 +24,7 @@ use std::{
#[derive(Default, Debug)]
pub(crate) struct TasksState {
tasks: Store<Task>,
pending_lint: HashSet<Id<Task>>,
pub(crate) linters: Vec<Linter<Task>>,
dropped_events: u64,
}
Expand Down Expand Up @@ -145,6 +146,9 @@ impl TasksState {
let mut stats_update = update.stats_update;
let linters = &self.linters;

// Gathers the tasks that need to be linted again on the next update cycle
let mut next_pending_lint = HashSet::new();

self.tasks
.insert_with(visibility, update.new_tasks, |ids, mut task| {
if task.id.is_none() {
Expand Down Expand Up @@ -217,16 +221,35 @@ impl TasksState {
warnings: Vec::new(),
location,
};
task.lint(linters);
if matches!(task.lint(linters), TaskLintResult::RequiresRecheck) {
next_pending_lint.insert(task.id);
}
jefftt marked this conversation as resolved.
Show resolved Hide resolved
Some((id, task))
});

for (stats, mut task) in self.tasks.updated(stats_update) {
tracing::trace!(?task, ?stats, "processing stats update for");
task.stats = stats.into();
task.lint(linters);
if matches!(task.lint(linters), TaskLintResult::RequiresRecheck) {
next_pending_lint.insert(task.id);
} else {
// Avoid linting this task again this cycle
self.pending_lint.remove(&task.id);
}
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}

for id in &self.pending_lint {
if let Some(task) = self.tasks.get(*id) {
if matches!(
task.borrow_mut().lint(linters),
TaskLintResult::RequiresRecheck
) {
next_pending_lint.insert(*id);
}
jefftt marked this conversation as resolved.
Show resolved Hide resolved
}
}
self.pending_lint = next_pending_lint;

self.dropped_events += update.dropped_events;
}

Expand Down Expand Up @@ -430,22 +453,37 @@ impl Task {
&self.warnings[..]
}

fn lint(&mut self, linters: &[Linter<Task>]) {
fn lint(&mut self, linters: &[Linter<Task>]) -> TaskLintResult {
self.warnings.clear();
let mut recheck = false;
for lint in linters {
tracing::debug!(?lint, task = ?self, "checking...");
if let Some(warning) = lint.check(self) {
tracing::info!(?warning, task = ?self, "found a warning!");
self.warnings.push(warning)
match lint.check(self) {
Lint::Warning(warning) => {
tracing::info!(?warning, task = ?self, "found a warning!");
self.warnings.push(warning);
}
Lint::Ok => {}
Lint::Recheck => recheck = true,
}
}
if recheck {
TaskLintResult::RequiresRecheck
} else {
TaskLintResult::Linted
}
}

pub(crate) fn location(&self) -> &str {
&self.location
}
}

enum TaskLintResult {
Linted,
RequiresRecheck,
}

impl From<proto::tasks::Stats> for TaskStats {
fn from(pb: proto::tasks::Stats) -> Self {
let created_at = pb
Expand Down
136 changes: 116 additions & 20 deletions tokio-console/src/warnings.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use crate::state::tasks::Task;
use std::{fmt::Debug, rc::Rc};
use crate::state::tasks::{Task, TaskState};
use std::{
fmt::Debug,
rc::Rc,
time::{Duration, SystemTime},
};

/// A warning for a particular type of monitored entity (e.g. task or resource).
///
/// This trait implements the logic for detecting a particular warning, and
/// generating a warning message describing it. The [`Linter`] type wraps an
/// instance of this trait to track active instances of the warning.
pub trait Warn<T>: Debug {
/// Returns `true` if the warning applies to `val`.
fn check(&self, val: &T) -> bool;
/// Returns if the warning applies to `val`.
fn check(&self, val: &T) -> Warning;

/// Formats a description of the warning detected for a *specific* `val`.
///
Expand Down Expand Up @@ -57,17 +61,12 @@ impl<T> Linter<T> {
Self(Rc::new(warning))
}

/// Checks if the warning applies to a particular entity, returning a clone
/// of `Self` if it does.
///
/// The cloned instance of `Self` should be held by the entity that
/// generated the warning, so that it can be formatted. Holding the clone of
/// `Self` will increment the warning count for that entity.
pub(crate) fn check(&self, val: &T) -> Option<Self> {
if self.0.check(val) {
Some(Self(self.0.clone()))
} else {
None
/// Checks if the warning applies to a particular entity
pub(crate) fn check(&self, val: &T) -> Lint<T> {
match self.0.check(val) {
Warning::Ok => Lint::Ok,
Warning::Warn => Lint::Warning(Self(self.0.clone())),
Warning::Recheck => Lint::Recheck,
}
}

Expand All @@ -78,7 +77,7 @@ impl<T> Linter<T> {

pub(crate) fn format(&self, val: &T) -> String {
debug_assert!(
self.0.check(val),
matches!(self.0.check(val), Warning::Ok),
jefftt marked this conversation as resolved.
Show resolved Hide resolved
"tried to format a warning for a {} that did not have that warning!",
std::any::type_name::<T>()
);
Expand All @@ -90,6 +89,34 @@ impl<T> Linter<T> {
}
}

/// A result for a linter check
pub(crate) enum Lint<T> {
/// No warning applies to the entity
Ok,

/// The cloned instance of `Self` should be held by the entity that
/// generated the warning, so that it can be formatted. Holding the clone of
/// `Self` will increment the warning count for that entity.
Warning(Linter<T>),

/// The lint should be rechecked as the conditions to allow for checking are
/// not satisfied yet
Recheck,
}

/// A result for a warning check
pub enum Warning {
/// No warning for this entity.
Ok,

/// A warning has been detected for this entity.
Warn,

/// The warning should be rechecked as the conditions to allow for checking
/// are not satisfied yet
Recheck,
}
jefftt marked this conversation as resolved.
Show resolved Hide resolved

#[derive(Clone, Debug)]
pub(crate) struct SelfWakePercent {
min_percent: u64,
Expand Down Expand Up @@ -120,9 +147,13 @@ impl Warn<Task> for SelfWakePercent {
self.description.as_str()
}

fn check(&self, task: &Task) -> bool {
fn check(&self, task: &Task) -> Warning {
let self_wakes = task.self_wake_percent();
self_wakes > self.min_percent
if self_wakes > self.min_percent {
Warning::Warn
} else {
Warning::Ok
}
}

fn format(&self, task: &Task) -> String {
Expand All @@ -142,11 +173,76 @@ impl Warn<Task> for LostWaker {
"tasks have lost their waker"
}

fn check(&self, task: &Task) -> bool {
!task.is_completed() && task.waker_count() == 0 && !task.is_running() && !task.is_awakened()
fn check(&self, task: &Task) -> Warning {
if !task.is_completed()
&& task.waker_count() == 0
&& !task.is_running()
&& !task.is_awakened()
{
Warning::Warn
} else {
Warning::Ok
}
}

fn format(&self, _: &Task) -> String {
"This task has lost its waker, and will never be woken again.".into()
}
}

/// Warning for if a task has never yielded
#[derive(Clone, Debug)]
pub(crate) struct NeverYielded {
min_duration: Duration,
description: String,
}

impl NeverYielded {
pub(crate) const DEFAULT_DURATION: Duration = Duration::from_secs(1);
pub(crate) fn new(min_duration: Duration) -> Self {
Self {
min_duration,
description: format!(
"tasks have never yielded (threshold {}ms)",
min_duration.as_millis()
),
}
}
}

impl Default for NeverYielded {
fn default() -> Self {
Self::new(Self::DEFAULT_DURATION)
}
}

impl Warn<Task> for NeverYielded {
fn summary(&self) -> &str {
self.description.as_str()
}

fn check(&self, task: &Task) -> Warning {
// Don't fire warning for tasks that are waiting to run
if task.state() != TaskState::Running {
return Warning::Ok;
}

if task.total_polls() > 1 {
return Warning::Ok;
}

// Avoid short-lived task false positives
if task.busy(SystemTime::now()) >= self.min_duration {
Warning::Warn
} else {
Warning::Recheck
}
}

fn format(&self, task: &Task) -> String {
format!(
"This task has never yielded ({:?})",
task.busy(SystemTime::now()),
)
}
}