From 01bf082f851245f7fa3d0ca2312690844a6fcd85 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Tue, 2 Jan 2024 12:33:33 -0500 Subject: [PATCH 01/15] Moved LRTD from https://github.com/tokio-rs/tokio/pull/6256 --- Cargo.toml | 7 ++ src/lib.rs | 8 ++ src/lrtd.rs | 256 ++++++++++++++++++++++++++++++++++++++++++++++++++ tests/lrtd.rs | 185 ++++++++++++++++++++++++++++++++++++ 4 files changed, 456 insertions(+) create mode 100644 src/lrtd.rs create mode 100644 tests/lrtd.rs diff --git a/Cargo.toml b/Cargo.toml index b368b68..ba5c721 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,10 @@ tokio-stream = "0.1.11" futures-util = "0.3.19" pin-project-lite = "0.2.7" tokio = { version = "1.31.0", features = ["rt", "time", "net"], optional = true } +rand = "0.8.5" + +[target.'cfg(unix)'.dependencies] +libc = { version = "0.2.149" } [dev-dependencies] axum = "0.6" @@ -33,6 +37,9 @@ serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" tokio = { version = "1.26.0", features = ["full", "rt", "time", "macros", "test-util"] } +[target.'cfg(unix)'.dev-dependencies] +libc = { version = "0.2.149"} + [[example]] name = "runtime" required-features = ["rt"] diff --git a/src/lib.rs b/src/lib.rs index 23d02cd..d7e2542 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,3 +120,11 @@ cfg_rt! { mod task; pub use task::{Instrumented, TaskMetrics, TaskMonitor}; + +#[cfg(unix)] +pub mod lrtd; +#[cfg(unix)] +pub use lrtd::{ + LongRunningTaskDetector, + BlockingActionHandler +}; \ No newline at end of file diff --git a/src/lrtd.rs b/src/lrtd.rs new file mode 100644 index 0000000..85b3c3d --- /dev/null +++ b/src/lrtd.rs @@ -0,0 +1,256 @@ +//! Utility to help with "really nice to add a warning for tasks that might be blocking" +use libc; +use rand::thread_rng; +use rand::Rng; +use std::collections::HashSet; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::time::Duration; +use std::{env, thread}; +use tokio::runtime::{Builder, Runtime}; + +const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); + +fn get_panic_worker_block_duration() -> Duration { + let duration_str = env::var("MY_DURATION_ENV").unwrap_or_else(|_| "60".to_string()); + duration_str + .parse::() + .map(Duration::from_secs) + .unwrap_or(PANIC_WORKER_BLOCK_DURATION_DEFAULT) +} + +fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } +} + +/// A trait for handling actions when blocking is detected. +/// +/// This trait provides a method for handling the detection of a blocking action. +pub trait BlockingActionHandler: Send + Sync { + /// Called when a blocking action is detected and prior to thread signaling. + /// + /// # Arguments + /// + /// * `workers` - The list of thread IDs of the tokio runtime worker threads. /// # Returns + /// + fn blocking_detected(&self, workers: &[libc::pthread_t]); +} + +struct StdErrBlockingActionHandler; + +/// BlockingActionHandler implementation that writes blocker details to standard error. +impl BlockingActionHandler for StdErrBlockingActionHandler { + fn blocking_detected(&self, workers: &[libc::pthread_t]) { + eprintln!("Detected blocking in worker threads: {:?}", workers); + } +} + +#[derive(Debug)] +struct WorkerSet { + inner: Mutex>, +} + +impl WorkerSet { + fn new() -> Self { + WorkerSet { + inner: Mutex::new(HashSet::new()), + } + } + + fn add(&self, pid: libc::pthread_t) { + let mut set = self.inner.lock().unwrap(); + set.insert(pid); + } + + fn remove(&self, pid: libc::pthread_t) { + let mut set = self.inner.lock().unwrap(); + set.remove(&pid); + } + + fn get_all(&self) -> Vec { + let set = self.inner.lock().unwrap(); + set.iter().cloned().collect() + } +} + +/// Utility to help with "really nice to add a warning for tasks that might be blocking" +#[derive(Debug)] +pub struct LongRunningTaskDetector { + interval: Duration, + detection_time: Duration, + stop_flag: Arc>, + workers: Arc, +} + +async fn do_nothing(tx: mpsc::Sender<()>) { + // signal I am done + tx.send(()).unwrap(); +} + +fn probe( + tokio_runtime: &Arc, + detection_time: Duration, + workers: &Arc, + action: &Arc, +) { + let (tx, rx) = mpsc::channel(); + let _nothing_handle = tokio_runtime.spawn(do_nothing(tx)); + let is_probe_success = match rx.recv_timeout(detection_time) { + Ok(_result) => true, + Err(_) => false, + }; + if !is_probe_success { + let targets = workers.get_all(); + action.blocking_detected(&targets); + rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); + } +} + +/// Utility to help with "really nice to add a warning for tasks that might be blocking" +/// Example use: +/// ``` +/// use std::sync::Arc; +/// use tokio::runtime::lrtd::LongRunningTaskDetector; +/// +/// let mut builder = tokio::runtime::Builder::new_multi_thread(); +/// let mutable_builder = builder.worker_threads(2); +/// let lrtd = LongRunningTaskDetector::new( +/// std::time::Duration::from_millis(10), +/// std::time::Duration::from_millis(100), +/// mutable_builder, +/// ); +/// let runtime = builder.enable_all().build().unwrap(); +/// let arc_runtime = Arc::new(runtime); +/// let arc_runtime2 = arc_runtime.clone(); +/// lrtd.start(arc_runtime); +/// arc_runtime2.block_on(async { +/// print!("my async code") +/// }); +/// +/// ``` +/// +/// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. +/// The detail will look like: +/// +/// ```text +/// Detected blocking in worker threads: [123145318232064, 123145320341504] +/// ``` +/// +/// To get more details(like stack traces) start LongRunningTaskDetector with start_with_custom_action and provide a custom handler. +/// +impl LongRunningTaskDetector { + /// Creates a new `LongRunningTaskDetector` instance. + /// + /// # Arguments + /// + /// * `interval` - The interval between probes. This interval is randomized. + /// * `detection_time` - The maximum time allowed for a probe to succeed. + /// A probe running for longer indicates something is blocking the worker threads. + /// * `runtime_builder` - A mutable reference to a `tokio::runtime::Builder`. + /// + /// # Returns + /// + /// Returns a new `LongRunningTaskDetector` instance. + pub fn new( + interval: Duration, + detection_time: Duration, + current_threaded: bool, + ) -> (Self, Builder) { + let workers = Arc::new(WorkerSet::new()); + if current_threaded { + workers.add(get_thread_id()); + let runtime_builder = tokio::runtime::Builder::new_current_thread(); + ( + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(true)), + workers, + }, + runtime_builder, + ) + } else { + let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); + let workers_clone = Arc::clone(&workers); + let workers_clone2 = Arc::clone(&workers); + runtime_builder + .on_thread_start(move || { + let pid = get_thread_id(); + workers_clone.add(pid); + }) + .on_thread_stop(move || { + let pid = get_thread_id(); + workers_clone2.remove(pid); + }); + ( + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(true)), + workers, + }, + runtime_builder, + ) + } + } + + pub fn new_single_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { + LongRunningTaskDetector::new(interval, detection_time, true) + } + + pub fn new_multi_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { + LongRunningTaskDetector::new(interval, detection_time, false) + } + + /// Starts the monitoring thread with default action handlers (that write details to std err). + /// + /// # Arguments + /// + /// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. + pub fn start(&self, runtime: Arc) { + self.start_with_custom_action(runtime, Arc::new(StdErrBlockingActionHandler)) + } + + /// Starts the monitoring process with custom action handlers that + /// allow you to customize what happens when blocking is detected. + /// + /// # Arguments + /// + /// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. + /// * `action` - An `Arc` reference to a custom `BlockingActionHandler`. + /// * `thread_action` - An `Arc` reference to a custom `ThreadStateHandler`. + pub fn start_with_custom_action( + &self, + runtime: Arc, + action: Arc, + ) { + *self.stop_flag.lock().unwrap() = false; + let stop_flag = Arc::clone(&self.stop_flag); + let detection_time = self.detection_time; + let interval = self.interval; + let workers = Arc::clone(&self.workers); + thread::spawn(move || { + let mut rng = thread_rng(); + while !*stop_flag.lock().unwrap() { + probe(&runtime, detection_time, &workers, &action); + thread::sleep(Duration::from_micros( + rng.gen_range(10..=interval.as_micros().try_into().unwrap()), + )); + } + }); + } + + /// Stops the monitoring thread. Does nothing if LRTD is already stopped. + pub fn stop(&self) { + let mut sf = self.stop_flag.lock().unwrap(); + if !(*sf) { + *sf = true; + } + } +} + +impl Drop for LongRunningTaskDetector { + fn drop(&mut self) { + self.stop(); + } +} diff --git a/tests/lrtd.rs b/tests/lrtd.rs new file mode 100644 index 0000000..b087fbc --- /dev/null +++ b/tests/lrtd.rs @@ -0,0 +1,185 @@ +#![cfg(unix)] +mod lrtd_tests { + use std::backtrace::Backtrace; + use std::collections::HashMap; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex}; + use std::thread; + use std::time::{Duration, Instant}; + use tokio_metrics::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; + + async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(1)); + println!("slow done"); + } + + #[test] + fn test_blocking_detection_multi() { + let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.worker_threads(2).enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + println!("Done"); + }); + } + + #[test] + fn test_blocking_detection_current() { + let (lrtd, mut builder) = LongRunningTaskDetector::new_single_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.block_on(async { + run_blocking_stuff().await; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + println!("Done"); + }); + } + + #[test] + fn test_blocking_detection_stop_unstarted() { + let (_lrtd, _builder) = LongRunningTaskDetector::new_multi_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + } + + fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } + } + + static SIGNAL_COUNTER: AtomicUsize = AtomicUsize::new(0); + + static THREAD_DUMPS: Mutex>> = Mutex::new(None); + + extern "C" fn signal_handler(_: i32) { + // not signal safe, this needs to be rewritten to avoid mem allocations and use a pre-allocated buffer. + let backtrace = Backtrace::force_capture(); + let name = thread::current() + .name() + .map(|n| format!(" for thread \"{}\"", n)) + .unwrap_or_else(|| "".to_owned()); + let tid = get_thread_id(); + let detail = format!("Stack trace{}:{}\n{}", name, tid, backtrace); + let mut omap = THREAD_DUMPS.lock().unwrap(); + let map = omap.as_mut().unwrap(); + (*map).insert(tid, detail); + SIGNAL_COUNTER.fetch_sub(1, Ordering::SeqCst); + } + + fn install_thread_stack_stace_handler(signal: libc::c_int) { + unsafe { + libc::signal(signal, signal_handler as libc::sighandler_t); + } + } + + static GTI_MUTEX: Mutex<()> = Mutex::new(()); + + /// A naive stack trace capture implementation for threads for DEMO/TEST only purposes. + fn get_thread_info( + signal: libc::c_int, + targets: &[libc::pthread_t], + ) -> HashMap { + let _lock = GTI_MUTEX.lock(); + { + let mut omap = THREAD_DUMPS.lock().unwrap(); + *omap = Some(HashMap::new()); + SIGNAL_COUNTER.store(targets.len(), Ordering::SeqCst); + } + for thread_id in targets { + let result = unsafe { libc::pthread_kill(*thread_id, signal) }; + if result != 0 { + eprintln!("Error sending signal: {:?}", result); + } + } + let time_limit = Duration::from_secs(1); + let start_time = Instant::now(); + loop { + let signal_count = SIGNAL_COUNTER.load(Ordering::SeqCst); + if signal_count == 0 { + break; + } + if Instant::now() - start_time >= time_limit { + break; + } + std::thread::sleep(std::time::Duration::from_micros(10)); + } + { + let omap = THREAD_DUMPS.lock().unwrap(); + omap.clone().unwrap() + } + } + + struct DetailedCaptureBlockingActionHandler { + inner: Mutex>>, + } + + impl DetailedCaptureBlockingActionHandler { + fn new() -> Self { + DetailedCaptureBlockingActionHandler { + inner: Mutex::new(None), + } + } + + fn contains_symbol(&self, symbol_name: &str) -> bool { + // Iterate over the frames in the backtrace + let omap = self.inner.lock().unwrap(); + match omap.as_ref() { + Some(map) => { + if map.is_empty() { + false + } else { + let bt_str = map.values().next().unwrap(); + bt_str.contains(symbol_name) + } + } + None => false, + } + } + } + + impl BlockingActionHandler for DetailedCaptureBlockingActionHandler { + fn blocking_detected(&self, workers: &[libc::pthread_t]) { + let mut map = self.inner.lock().unwrap(); + let tinfo = get_thread_info(libc::SIGUSR1, workers); + eprintln!("Blocking detected with details: {:?}", tinfo); + *map = Some(tinfo); + } + } + + #[test] + fn test_blocking_detection_multi_capture_stack_traces() { + install_thread_stack_stace_handler(libc::SIGUSR1); + let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.worker_threads(2).enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + let blocking_action = Arc::new(DetailedCaptureBlockingActionHandler::new()); + let to_assert_blocking = blocking_action.clone(); + lrtd.start_with_custom_action(arc_runtime, blocking_action); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + println!("Done"); + }); + assert!(to_assert_blocking.contains_symbol("std::thread::sleep")); + lrtd.stop() + } +} From 2a020bdc3e066bb84c7727130e7b9553dbae18db Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Tue, 2 Jan 2024 13:37:18 -0500 Subject: [PATCH 02/15] fix docstrings. --- src/lrtd.rs | 84 +++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 65 insertions(+), 19 deletions(-) diff --git a/src/lrtd.rs b/src/lrtd.rs index 85b3c3d..0be03ff 100644 --- a/src/lrtd.rs +++ b/src/lrtd.rs @@ -106,20 +106,19 @@ fn probe( } } -/// Utility to help with "really nice to add a warning for tasks that might be blocking" -/// Example use: +/// Utility to help with detecting blocking in tokio workers. +/// +/// # Example +/// /// ``` /// use std::sync::Arc; -/// use tokio::runtime::lrtd::LongRunningTaskDetector; +/// use tokio_metrics::lrtd::LongRunningTaskDetector; /// -/// let mut builder = tokio::runtime::Builder::new_multi_thread(); -/// let mutable_builder = builder.worker_threads(2); -/// let lrtd = LongRunningTaskDetector::new( +/// let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( /// std::time::Duration::from_millis(10), -/// std::time::Duration::from_millis(100), -/// mutable_builder, +/// std::time::Duration::from_millis(100) /// ); -/// let runtime = builder.enable_all().build().unwrap(); +/// let runtime = builder.worker_threads(2).enable_all().build().unwrap(); /// let arc_runtime = Arc::new(runtime); /// let arc_runtime2 = arc_runtime.clone(); /// lrtd.start(arc_runtime); @@ -130,13 +129,14 @@ fn probe( /// ``` /// /// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. -/// The detail will look like: +/// The detail with default action handler will look like: /// /// ```text /// Detected blocking in worker threads: [123145318232064, 123145320341504] /// ``` /// -/// To get more details(like stack traces) start LongRunningTaskDetector with start_with_custom_action and provide a custom handler. +/// To get more details(like stack traces) start LongRunningTaskDetector with start_with_custom_action and provide a custom handler that can dump the thread stack traces. +/// (see poc in the tests) /// impl LongRunningTaskDetector { /// Creates a new `LongRunningTaskDetector` instance. @@ -146,12 +146,12 @@ impl LongRunningTaskDetector { /// * `interval` - The interval between probes. This interval is randomized. /// * `detection_time` - The maximum time allowed for a probe to succeed. /// A probe running for longer indicates something is blocking the worker threads. - /// * `runtime_builder` - A mutable reference to a `tokio::runtime::Builder`. + /// * `current_threaded` - true for returning a curent thread tokio runtime Builder, flase for a multithreaded one. /// /// # Returns /// /// Returns a new `LongRunningTaskDetector` instance. - pub fn new( + fn new( interval: Duration, detection_time: Duration, current_threaded: bool, @@ -194,19 +194,65 @@ impl LongRunningTaskDetector { } } + /// Creates a new instance of `LongRunningTaskDetector` linked to a single-threaded Tokio runtime. + /// + /// This function takes the `interval` and `detection_time` parameters and initializes a + /// `LongRunningTaskDetector` with a single-threaded Tokio runtime. + /// + /// # Parameters + /// + /// - `interval`: The time interval between probes. + /// - `detection_time`: The maximum blocking time allowed for detecting a long-running task. + /// + /// # Returns + /// + /// Returns a tuple containing the created `LongRunningTaskDetector` instance and the Tokio + /// runtime `Builder` used for configuration. + /// + /// # Example + /// + /// ``` + /// use tokio_metrics::lrtd::LongRunningTaskDetector; + /// use std::time::Duration; + /// + /// let (detector, builder) = LongRunningTaskDetector::new_single_threaded(Duration::from_secs(1), Duration::from_secs(5)); + /// ``` pub fn new_single_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { LongRunningTaskDetector::new(interval, detection_time, true) } + /// Creates a new instance of `LongRunningTaskDetector` linked to a multi-threaded Tokio runtime. + /// + /// This function takes the `interval` and `detection_time` parameters and initializes a + /// `LongRunningTaskDetector` with a multi-threaded Tokio runtime. + /// + /// # Parameters + /// + /// - `interval`: The time interval between probes. + /// - `detection_time`: The maximum blocking time allowed for detecting a long-running task. + /// + /// # Returns + /// + /// Returns a tuple containing the created `LongRunningTaskDetector` instance and the Tokio + /// runtime `Builder` used for configuration. + /// + /// # Example + /// + /// ``` + /// use tokio_metrics::lrtd::LongRunningTaskDetector; + /// use std::time::Duration; + /// + /// let (detector, builder) = LongRunningTaskDetector::new_multi_threaded(Duration::from_secs(1), Duration::from_secs(5)); + /// ``` pub fn new_multi_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { LongRunningTaskDetector::new(interval, detection_time, false) } /// Starts the monitoring thread with default action handlers (that write details to std err). /// - /// # Arguments + /// # Parameters /// - /// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. + /// - `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. pub fn start(&self, runtime: Arc) { self.start_with_custom_action(runtime, Arc::new(StdErrBlockingActionHandler)) } @@ -214,11 +260,11 @@ impl LongRunningTaskDetector { /// Starts the monitoring process with custom action handlers that /// allow you to customize what happens when blocking is detected. /// - /// # Arguments + /// # Parameters /// - /// * `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. - /// * `action` - An `Arc` reference to a custom `BlockingActionHandler`. - /// * `thread_action` - An `Arc` reference to a custom `ThreadStateHandler`. + /// - `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. + /// - `action` - An `Arc` reference to a custom `BlockingActionHandler`. + /// - `thread_action` - An `Arc` reference to a custom `ThreadStateHandler`. pub fn start_with_custom_action( &self, runtime: Arc, From 06a42a1ec5c09e95395e153817897ed0f7830611 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Wed, 3 Jan 2024 19:26:04 -0500 Subject: [PATCH 03/15] [add] make lrtd work in non-unix envs. --- src/lib.rs | 8 +------- src/lrtd.rs | 43 ++++++++++++++++++++++++++++++------------- tests/lrtd.rs | 34 ++++++++++++++++++++++++---------- 3 files changed, 55 insertions(+), 30 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d7e2542..6a02c7b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,10 +121,4 @@ cfg_rt! { mod task; pub use task::{Instrumented, TaskMetrics, TaskMonitor}; -#[cfg(unix)] -pub mod lrtd; -#[cfg(unix)] -pub use lrtd::{ - LongRunningTaskDetector, - BlockingActionHandler -}; \ No newline at end of file +pub mod lrtd; \ No newline at end of file diff --git a/src/lrtd.rs b/src/lrtd.rs index 0be03ff..2ca6ce2 100644 --- a/src/lrtd.rs +++ b/src/lrtd.rs @@ -5,6 +5,7 @@ use rand::Rng; use std::collections::HashSet; use std::sync::mpsc; use std::sync::{Arc, Mutex}; +use std::thread::ThreadId; use std::time::Duration; use std::{env, thread}; use tokio::runtime::{Builder, Runtime}; @@ -19,10 +20,28 @@ fn get_panic_worker_block_duration() -> Duration { .unwrap_or(PANIC_WORKER_BLOCK_DURATION_DEFAULT) } +#[cfg(unix)] fn get_thread_id() -> libc::pthread_t { unsafe { libc::pthread_self() } } +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct ThreadInfo { + pub id: ThreadId, + #[cfg(unix)] + pub pthread_id: libc::pthread_t, +} + +impl ThreadInfo { + fn new() -> Self { + ThreadInfo { + id: thread::current().id(), + #[cfg(unix)] + pthread_id: get_thread_id(), + } + } +} + /// A trait for handling actions when blocking is detected. /// /// This trait provides a method for handling the detection of a blocking action. @@ -33,21 +52,21 @@ pub trait BlockingActionHandler: Send + Sync { /// /// * `workers` - The list of thread IDs of the tokio runtime worker threads. /// # Returns /// - fn blocking_detected(&self, workers: &[libc::pthread_t]); + fn blocking_detected(&self, workers: &[ThreadInfo]); } struct StdErrBlockingActionHandler; /// BlockingActionHandler implementation that writes blocker details to standard error. impl BlockingActionHandler for StdErrBlockingActionHandler { - fn blocking_detected(&self, workers: &[libc::pthread_t]) { + fn blocking_detected(&self, workers: &[ThreadInfo]) { eprintln!("Detected blocking in worker threads: {:?}", workers); } } #[derive(Debug)] struct WorkerSet { - inner: Mutex>, + inner: Mutex>, } impl WorkerSet { @@ -57,17 +76,17 @@ impl WorkerSet { } } - fn add(&self, pid: libc::pthread_t) { + fn add(&self, pid: ThreadInfo) { let mut set = self.inner.lock().unwrap(); set.insert(pid); } - fn remove(&self, pid: libc::pthread_t) { + fn remove(&self, pid: ThreadInfo) { let mut set = self.inner.lock().unwrap(); set.remove(&pid); } - fn get_all(&self) -> Vec { + fn get_all(&self) -> Vec { let set = self.inner.lock().unwrap(); set.iter().cloned().collect() } @@ -107,9 +126,9 @@ fn probe( } /// Utility to help with detecting blocking in tokio workers. -/// +/// /// # Example -/// +/// /// ``` /// use std::sync::Arc; /// use tokio_metrics::lrtd::LongRunningTaskDetector; @@ -158,7 +177,7 @@ impl LongRunningTaskDetector { ) -> (Self, Builder) { let workers = Arc::new(WorkerSet::new()); if current_threaded { - workers.add(get_thread_id()); + workers.add(ThreadInfo::new()); let runtime_builder = tokio::runtime::Builder::new_current_thread(); ( LongRunningTaskDetector { @@ -175,12 +194,10 @@ impl LongRunningTaskDetector { let workers_clone2 = Arc::clone(&workers); runtime_builder .on_thread_start(move || { - let pid = get_thread_id(); - workers_clone.add(pid); + workers_clone.add(ThreadInfo::new()); }) .on_thread_stop(move || { - let pid = get_thread_id(); - workers_clone2.remove(pid); + workers_clone2.remove(ThreadInfo::new()); }); ( LongRunningTaskDetector { diff --git a/tests/lrtd.rs b/tests/lrtd.rs index b087fbc..23f75e7 100644 --- a/tests/lrtd.rs +++ b/tests/lrtd.rs @@ -1,12 +1,8 @@ -#![cfg(unix)] mod lrtd_tests { - use std::backtrace::Backtrace; - use std::collections::HashMap; - use std::sync::atomic::{AtomicUsize, Ordering}; - use std::sync::{Arc, Mutex}; + use std::sync::Arc; use std::thread; - use std::time::{Duration, Instant}; - use tokio_metrics::lrtd::{BlockingActionHandler, LongRunningTaskDetector}; + use std::time::Duration; + use tokio_metrics::lrtd::LongRunningTaskDetector; async fn run_blocking_stuff() { println!("slow start"); @@ -56,6 +52,24 @@ mod lrtd_tests { Duration::from_millis(100), ); } +} + +#[cfg(unix)] +mod unix_lrtd_tests { + + use std::backtrace::Backtrace; + use std::collections::HashMap; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex}; + use std::thread; + use std::time::{Duration, Instant}; + use tokio_metrics::lrtd::{BlockingActionHandler, LongRunningTaskDetector, ThreadInfo}; + + async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(1)); + println!("slow done"); + } fn get_thread_id() -> libc::pthread_t { unsafe { libc::pthread_self() } @@ -91,7 +105,7 @@ mod lrtd_tests { /// A naive stack trace capture implementation for threads for DEMO/TEST only purposes. fn get_thread_info( signal: libc::c_int, - targets: &[libc::pthread_t], + targets: &[ThreadInfo], ) -> HashMap { let _lock = GTI_MUTEX.lock(); { @@ -100,7 +114,7 @@ mod lrtd_tests { SIGNAL_COUNTER.store(targets.len(), Ordering::SeqCst); } for thread_id in targets { - let result = unsafe { libc::pthread_kill(*thread_id, signal) }; + let result = unsafe { libc::pthread_kill(thread_id.pthread_id, signal) }; if result != 0 { eprintln!("Error sending signal: {:?}", result); } @@ -152,7 +166,7 @@ mod lrtd_tests { } impl BlockingActionHandler for DetailedCaptureBlockingActionHandler { - fn blocking_detected(&self, workers: &[libc::pthread_t]) { + fn blocking_detected(&self, workers: &[ThreadInfo]) { let mut map = self.inner.lock().unwrap(); let tinfo = get_thread_info(libc::SIGUSR1, workers); eprintln!("Blocking detected with details: {:?}", tinfo); From 2f788007dd0a895f1b4f74ae4e07bf11ff7fdd6e Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Wed, 3 Jan 2024 19:41:20 -0500 Subject: [PATCH 04/15] add rt-multi-thread feature as dependency --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index ba5c721..a999606 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ rt = ["tokio"] tokio-stream = "0.1.11" futures-util = "0.3.19" pin-project-lite = "0.2.7" -tokio = { version = "1.31.0", features = ["rt", "time", "net"], optional = true } +tokio = { version = "1.31.0", features = ["rt", "time", "net", "rt-multi-thread"], optional = true } rand = "0.8.5" [target.'cfg(unix)'.dependencies] From 911be664fbb52d2000cfae9870b9095c7cb92f5f Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Wed, 3 Jan 2024 19:57:07 -0500 Subject: [PATCH 05/15] Refactor to replace lrtd with detectors --- src/{lrtd.rs => detectors.rs} | 0 src/lib.rs | 2 +- tests/{lrtd.rs => detectors.rs} | 4 ++-- 3 files changed, 3 insertions(+), 3 deletions(-) rename src/{lrtd.rs => detectors.rs} (100%) rename tests/{lrtd.rs => detectors.rs} (97%) diff --git a/src/lrtd.rs b/src/detectors.rs similarity index 100% rename from src/lrtd.rs rename to src/detectors.rs diff --git a/src/lib.rs b/src/lib.rs index 6a02c7b..dc9db9c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,4 +121,4 @@ cfg_rt! { mod task; pub use task::{Instrumented, TaskMetrics, TaskMonitor}; -pub mod lrtd; \ No newline at end of file +pub mod detectors; \ No newline at end of file diff --git a/tests/lrtd.rs b/tests/detectors.rs similarity index 97% rename from tests/lrtd.rs rename to tests/detectors.rs index 23f75e7..616ae0e 100644 --- a/tests/lrtd.rs +++ b/tests/detectors.rs @@ -2,7 +2,7 @@ mod lrtd_tests { use std::sync::Arc; use std::thread; use std::time::Duration; - use tokio_metrics::lrtd::LongRunningTaskDetector; + use tokio_metrics::detectors::LongRunningTaskDetector; async fn run_blocking_stuff() { println!("slow start"); @@ -63,7 +63,7 @@ mod unix_lrtd_tests { use std::sync::{Arc, Mutex}; use std::thread; use std::time::{Duration, Instant}; - use tokio_metrics::lrtd::{BlockingActionHandler, LongRunningTaskDetector, ThreadInfo}; + use tokio_metrics::detectors::{BlockingActionHandler, LongRunningTaskDetector, ThreadInfo}; async fn run_blocking_stuff() { println!("slow start"); From 0fc84d18ce8340464cc4a06954272fba2e8cbc31 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Fri, 5 Jan 2024 20:37:32 -0500 Subject: [PATCH 06/15] implement Fn for trait --- src/detectors.rs | 10 ++++++++++ tests/detectors.rs | 27 +++++++++++++++++++++++++++ 2 files changed, 37 insertions(+) diff --git a/src/detectors.rs b/src/detectors.rs index 2ca6ce2..6203bad 100644 --- a/src/detectors.rs +++ b/src/detectors.rs @@ -55,6 +55,16 @@ pub trait BlockingActionHandler: Send + Sync { fn blocking_detected(&self, workers: &[ThreadInfo]); } +impl BlockingActionHandler for F +where + F: Fn(&[ThreadInfo]) + Send + Sync, +{ + fn blocking_detected(&self, workers: &[ThreadInfo]) { + // Implement the behavior for blocking_detected using the provided function. + // You can call the function here with the given workers. + self(workers); + } +} struct StdErrBlockingActionHandler; /// BlockingActionHandler implementation that writes blocker details to standard error. diff --git a/tests/detectors.rs b/tests/detectors.rs index 616ae0e..9848bc0 100644 --- a/tests/detectors.rs +++ b/tests/detectors.rs @@ -1,4 +1,5 @@ mod lrtd_tests { + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; use std::time::Duration; @@ -52,6 +53,32 @@ mod lrtd_tests { Duration::from_millis(100), ); } + + #[test] + fn test_blocking_detection_lambda() { + let (lrtd, mut builder) = LongRunningTaskDetector::new_single_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + let my_atomic_bool = Arc::new(AtomicBool::new(false)); + let my_atomic_bool2 = my_atomic_bool.clone(); + lrtd.start_with_custom_action( + arc_runtime, + Arc::new(move |workers: &_| { + eprintln!("Blocking: {:?}", workers); + my_atomic_bool.store(true, Ordering::SeqCst); + }), + ); + arc_runtime2.block_on(async { + run_blocking_stuff().await; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + println!("Done"); + }); + assert!(my_atomic_bool2.load(Ordering::SeqCst)); + } } #[cfg(unix)] From 509edaf17392bffabb43d239375b0a2613ef139d Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sun, 7 Jan 2024 08:08:52 -0500 Subject: [PATCH 07/15] Add getters for ThreadInfo fields. --- src/detectors.rs | 15 +++++++++++++-- tests/detectors.rs | 4 ++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/detectors.rs b/src/detectors.rs index 6203bad..c185cd9 100644 --- a/src/detectors.rs +++ b/src/detectors.rs @@ -27,9 +27,9 @@ fn get_thread_id() -> libc::pthread_t { #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct ThreadInfo { - pub id: ThreadId, + id: ThreadId, #[cfg(unix)] - pub pthread_id: libc::pthread_t, + pthread_id: libc::pthread_t, } impl ThreadInfo { @@ -40,6 +40,17 @@ impl ThreadInfo { pthread_id: get_thread_id(), } } + + // Getter for the id field + pub fn get_id(&self) -> &ThreadId { + &self.id + } + + // Getter for the pthread_id field (only available on Unix) + #[cfg(unix)] + pub fn get_pthread_id(&self) -> &libc::pthread_t { + &self.pthread_id + } } /// A trait for handling actions when blocking is detected. diff --git a/tests/detectors.rs b/tests/detectors.rs index 9848bc0..c2bfcc3 100644 --- a/tests/detectors.rs +++ b/tests/detectors.rs @@ -140,8 +140,8 @@ mod unix_lrtd_tests { *omap = Some(HashMap::new()); SIGNAL_COUNTER.store(targets.len(), Ordering::SeqCst); } - for thread_id in targets { - let result = unsafe { libc::pthread_kill(thread_id.pthread_id, signal) }; + for thread_info in targets { + let result = unsafe { libc::pthread_kill(*thread_info.get_pthread_id(), signal) }; if result != 0 { eprintln!("Error sending signal: {:?}", result); } From 8b80df1133ada0d9aedafdcac2f0b631ba56f59e Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Mon, 8 Jan 2024 09:58:05 -0500 Subject: [PATCH 08/15] FIX getter names --- src/detectors.rs | 4 ++-- tests/detectors.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/detectors.rs b/src/detectors.rs index c185cd9..50dd115 100644 --- a/src/detectors.rs +++ b/src/detectors.rs @@ -42,13 +42,13 @@ impl ThreadInfo { } // Getter for the id field - pub fn get_id(&self) -> &ThreadId { + pub fn id(&self) -> &ThreadId { &self.id } // Getter for the pthread_id field (only available on Unix) #[cfg(unix)] - pub fn get_pthread_id(&self) -> &libc::pthread_t { + pub fn pthread_id(&self) -> &libc::pthread_t { &self.pthread_id } } diff --git a/tests/detectors.rs b/tests/detectors.rs index c2bfcc3..db22c2b 100644 --- a/tests/detectors.rs +++ b/tests/detectors.rs @@ -141,7 +141,7 @@ mod unix_lrtd_tests { SIGNAL_COUNTER.store(targets.len(), Ordering::SeqCst); } for thread_info in targets { - let result = unsafe { libc::pthread_kill(*thread_info.get_pthread_id(), signal) }; + let result = unsafe { libc::pthread_kill(*thread_info.pthread_id(), signal) }; if result != 0 { eprintln!("Error sending signal: {:?}", result); } From ac10b3ffb3786bc67bbf17cc32b9e3d4d31b3a08 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Thu, 25 Jan 2024 06:53:07 -0500 Subject: [PATCH 09/15] fix formatting --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index dc9db9c..daf2aa7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,4 +121,4 @@ cfg_rt! { mod task; pub use task::{Instrumented, TaskMetrics, TaskMonitor}; -pub mod detectors; \ No newline at end of file +pub mod detectors; From 0c5c0118d9de55928677c3e6eb1016bdeb7a9448 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Thu, 25 Jan 2024 22:26:33 -0500 Subject: [PATCH 10/15] Fix cargo check --no-default-features --- src/detectors.rs | 4 ++-- src/lib.rs | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/detectors.rs b/src/detectors.rs index 50dd115..10f4dcf 100644 --- a/src/detectors.rs +++ b/src/detectors.rs @@ -199,7 +199,7 @@ impl LongRunningTaskDetector { let workers = Arc::new(WorkerSet::new()); if current_threaded { workers.add(ThreadInfo::new()); - let runtime_builder = tokio::runtime::Builder::new_current_thread(); + let runtime_builder = Builder::new_current_thread(); ( LongRunningTaskDetector { interval, @@ -210,7 +210,7 @@ impl LongRunningTaskDetector { runtime_builder, ) } else { - let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); + let mut runtime_builder = Builder::new_multi_thread(); let workers_clone = Arc::clone(&workers); let workers_clone2 = Arc::clone(&workers); runtime_builder diff --git a/src/lib.rs b/src/lib.rs index daf2aa7..4a2cea5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,4 +121,5 @@ cfg_rt! { mod task; pub use task::{Instrumented, TaskMetrics, TaskMonitor}; +#[cfg(all(feature = "rt"))] pub mod detectors; From 348fef012d2368b68a60b9368af7a80963d1842d Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sun, 4 Feb 2024 23:29:32 -0500 Subject: [PATCH 11/15] fix clippy warning --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 4a2cea5..7b35e61 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,5 +121,5 @@ cfg_rt! { mod task; pub use task::{Instrumented, TaskMetrics, TaskMonitor}; -#[cfg(all(feature = "rt"))] +#[cfg(feature = "rt")] pub mod detectors; From e3e8f4f6c6e37d1d2ccc8ac495f62b3ed5229187 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Thu, 8 Feb 2024 18:19:00 -0500 Subject: [PATCH 12/15] FIX doc code. --- src/detectors.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/detectors.rs b/src/detectors.rs index 10f4dcf..9cc6f46 100644 --- a/src/detectors.rs +++ b/src/detectors.rs @@ -152,7 +152,7 @@ fn probe( /// /// ``` /// use std::sync::Arc; -/// use tokio_metrics::lrtd::LongRunningTaskDetector; +/// use tokio_metrics::detectors::LongRunningTaskDetector; /// /// let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( /// std::time::Duration::from_millis(10), @@ -250,7 +250,7 @@ impl LongRunningTaskDetector { /// # Example /// /// ``` - /// use tokio_metrics::lrtd::LongRunningTaskDetector; + /// use tokio_metrics::detectors::LongRunningTaskDetector; /// use std::time::Duration; /// /// let (detector, builder) = LongRunningTaskDetector::new_single_threaded(Duration::from_secs(1), Duration::from_secs(5)); @@ -277,7 +277,7 @@ impl LongRunningTaskDetector { /// # Example /// /// ``` - /// use tokio_metrics::lrtd::LongRunningTaskDetector; + /// use tokio_metrics::detectors::LongRunningTaskDetector; /// use std::time::Duration; /// /// let (detector, builder) = LongRunningTaskDetector::new_multi_threaded(Duration::from_secs(1), Duration::from_secs(5)); From c9a0ba46625d21dce16ec7a3edee7bd76eac12b8 Mon Sep 17 00:00:00 2001 From: Zoltan Farkas Date: Sat, 2 Mar 2024 11:26:08 -0500 Subject: [PATCH 13/15] Implement feedback from @Darksonn --- Cargo.toml | 7 +- src/detectors.rs | 240 +++++++++++++++++++++++++++++---------------- src/lib.rs | 2 +- tests/detectors.rs | 7 +- 4 files changed, 164 insertions(+), 92 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a999606..ed1af78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,17 +16,18 @@ keywords = ["async", "futures", "metrics", "debugging"] [features] default = ["rt"] -rt = ["tokio"] +rt = ["tokio/rt", "tokio/time", "tokio/net"] +detectors = ["tokio/rt", "tokio/time", "tokio/net", "tokio/rt-multi-thread", "rand", "libc"] [dependencies] tokio-stream = "0.1.11" futures-util = "0.3.19" pin-project-lite = "0.2.7" tokio = { version = "1.31.0", features = ["rt", "time", "net", "rt-multi-thread"], optional = true } -rand = "0.8.5" +rand = { version = "0.8.5", optional = true } [target.'cfg(unix)'.dependencies] -libc = { version = "0.2.149" } +libc = { version = "0.2.149", optional = true } [dev-dependencies] axum = "0.6" diff --git a/src/detectors.rs b/src/detectors.rs index 9cc6f46..af425f8 100644 --- a/src/detectors.rs +++ b/src/detectors.rs @@ -1,5 +1,69 @@ -//! Utility to help with "really nice to add a warning for tasks that might be blocking" -use libc; +//! # Tokio runtime health monitoring detector(s). +//! +//! ## Detecting blocked tokio workers: [`LongRunningTaskDetector`]. +//! +//! Blocking IO operations often end up in tokio workers negatively impacting the ability of tokio runtimes to process +//! async requests. The simplest example of this is the use of [`std::thread::sleep`] instead of [`tokio::time::sleep`] which we use +//! in the unit tests to test this utility. +//! +//! The aim of this utility is to detect these situations. +//! [`LongRunningTaskDetector`] is designed to be very low overhead so that it can be safely be run in production. +//! The overhead of this utility is vconfigurable via probing `interval` parameter. +//! +//! ### [`LongRunningTaskDetector`] Example: +//! +//! ``` +//! use std::sync::Arc; +//! use tokio_metrics::detectors::LongRunningTaskDetector; +//! +//! let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( +//! std::time::Duration::from_millis(10), +//! std::time::Duration::from_millis(100) +//! ); +//! let runtime = builder.worker_threads(2).enable_all().build().unwrap(); +//! let runtime_ref = Arc::new(runtime); +//! let lrtd_runtime_ref = runtime_ref.clone(); +//! lrtd.start(lrtd_runtime_ref); +//! runtime_ref.block_on(async { +//! print!("my async code") +//! }); +//! +//! ``` +//! +//! The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. +//! The detail with default action handler will look like: +//! +//! ```text +//! Detected blocking in worker threads: [ +//! ThreadInfo { id: ThreadId(10), pthread_id: 123145381474304 }, +//! ThreadInfo { id: ThreadId(11), pthread_id: 123145385693184 } +//! ] +//! ``` +//! +//! To get more details(like stack traces) start [`LongRunningTaskDetector`] with [`LongRunningTaskDetector::start_with_custom_action`] +//! and provide a custom handler([`BlockingActionHandler`]) that can dump the thread stack traces. The [`LongRunningTaskDetector`] integration tests +//! include an example implementation that is not signal safe as an example. +//! More detailed blocking can look like: +//! +//! ```text +//! Blocking detected with details: 123145387802624 +//! Stack trace for thread tokio-runtime-worker(123145387802624): +//! ... +//! 5: __sigtramp +//! 6: ___semwait_signal +//! 7: +//! 8: std::sys::pal::unix::thread::Thread::sleep +//! at /rustc/.../library/std/src/sys/pal/unix/thread.rs:243:20 +//! 9: std::thread::sleep +//! at /rustc/.../library/std/src/thread/mod.rs:869:5 +//! 10: detectors::unix_lrtd_tests::run_blocking_stuff::{{closure}} +//! at ./tests/detectors.rs:98:9 +//! 11: tokio::runtime::task::core::Core::poll::{{closure}} +//! ... +//! +//! which will help you easilly identify the blocking operation(s). +//! ``` + use rand::thread_rng; use rand::Rng; use std::collections::HashSet; @@ -32,6 +96,7 @@ pub struct ThreadInfo { pthread_id: libc::pthread_t, } +/// A structure to hold information about a thread, including its platform-specific identifiers. impl ThreadInfo { fn new() -> Self { ThreadInfo { @@ -41,13 +106,14 @@ impl ThreadInfo { } } - // Getter for the id field + /// Returns the id pub fn id(&self) -> &ThreadId { &self.id } - // Getter for the pthread_id field (only available on Unix) + /// Returns the `pthread_id` of this thread. #[cfg(unix)] + #[cfg_attr(docsrs, cfg(unix))] pub fn pthread_id(&self) -> &libc::pthread_t { &self.pthread_id } @@ -113,7 +179,69 @@ impl WorkerSet { } } -/// Utility to help with "really nice to add a warning for tasks that might be blocking" +/// Worker health monitoring detector to help with detecting blocking in tokio workers. +/// +/// Blocking IO operations often end up in tokio workers negatively impacting the ability of tokio runtimes to process +/// async requests. The simplest example of this is the use of [`std::thread::sleep`] instead of [`tokio::time::sleep`] which we use +/// in the unit tests to test this utility. +/// +/// The aim of this utility is to detect these situations. +/// [`LongRunningTaskDetector`] is designed to be very low overhead so that it can be safely be run in production. +/// The overhead of this utility is vconfigurable via probing `interval` parameter. +/// +/// # Example +/// +/// ``` +/// use std::sync::Arc; +/// use tokio_metrics::detectors::LongRunningTaskDetector; +/// +/// let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( +/// std::time::Duration::from_millis(10), +/// std::time::Duration::from_millis(100) +/// ); +/// let runtime = builder.worker_threads(2).enable_all().build().unwrap(); +/// let runtime_ref = Arc::new(runtime); +/// let lrtd_runtime_ref = runtime_ref.clone(); +/// lrtd.start(lrtd_runtime_ref); +/// runtime_ref.block_on(async { +/// print!("my async code") +/// }); +/// +/// ``` +/// +/// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. +/// The detail with default action handler will look like: +/// +/// ```text +/// Detected blocking in worker threads: [ +/// ThreadInfo { id: ThreadId(10), pthread_id: 123145381474304 }, +/// ThreadInfo { id: ThreadId(11), pthread_id: 123145385693184 } +/// ] +/// ``` +/// +/// To get more details(like stack traces) start [`LongRunningTaskDetector`] with [`LongRunningTaskDetector::start_with_custom_action`] +/// and provide a custom handler([`BlockingActionHandler`]) that can dump the thread stack traces. The [`LongRunningTaskDetector`] integration tests +/// include an example implementation that is not signal safe as an example. +/// More detailed blocking can look like: +/// +/// ```text +/// Blocking detected with details: 123145387802624 +/// Stack trace for thread tokio-runtime-worker(123145387802624): +/// ... +/// 5: __sigtramp +/// 6: ___semwait_signal +/// 7: +/// 8: std::sys::pal::unix::thread::Thread::sleep +/// at /rustc/.../library/std/src/sys/pal/unix/thread.rs:243:20 +/// 9: std::thread::sleep +/// at /rustc/.../library/std/src/thread/mod.rs:869:5 +/// 10: detectors::unix_lrtd_tests::run_blocking_stuff::{{closure}} +/// at ./tests/detectors.rs:98:9 +/// 11: tokio::runtime::task::core::Core::poll::{{closure}} +/// ... +/// +/// which will help you easilly identify the blocking operation(s). +/// ``` #[derive(Debug)] pub struct LongRunningTaskDetector { interval: Duration, @@ -146,51 +274,17 @@ fn probe( } } -/// Utility to help with detecting blocking in tokio workers. -/// -/// # Example -/// -/// ``` -/// use std::sync::Arc; -/// use tokio_metrics::detectors::LongRunningTaskDetector; -/// -/// let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( -/// std::time::Duration::from_millis(10), -/// std::time::Duration::from_millis(100) -/// ); -/// let runtime = builder.worker_threads(2).enable_all().build().unwrap(); -/// let arc_runtime = Arc::new(runtime); -/// let arc_runtime2 = arc_runtime.clone(); -/// lrtd.start(arc_runtime); -/// arc_runtime2.block_on(async { -/// print!("my async code") -/// }); -/// -/// ``` -/// -/// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. -/// The detail with default action handler will look like: -/// -/// ```text -/// Detected blocking in worker threads: [123145318232064, 123145320341504] -/// ``` -/// -/// To get more details(like stack traces) start LongRunningTaskDetector with start_with_custom_action and provide a custom handler that can dump the thread stack traces. -/// (see poc in the tests) -/// impl LongRunningTaskDetector { - /// Creates a new `LongRunningTaskDetector` instance. + /// Creates [`LongRunningTaskDetector`] and a [`tokio::runtime::Builder`]. /// - /// # Arguments + /// The `interval` argument determines the time interval between tokio runtime worker probing. + /// This interval is randomized. /// - /// * `interval` - The interval between probes. This interval is randomized. - /// * `detection_time` - The maximum time allowed for a probe to succeed. - /// A probe running for longer indicates something is blocking the worker threads. - /// * `current_threaded` - true for returning a curent thread tokio runtime Builder, flase for a multithreaded one. + /// The `detection_time` argument determines maximum time allowed for a probe to succeed. + /// A probe running for longer is considered a tokio worker health issue. (something is blocking the worker threads) /// - /// # Returns - /// - /// Returns a new `LongRunningTaskDetector` instance. + /// The `current_threaded` argument if true will result in returning a curent thread tokio runtime Builder, + /// false for a multithreaded one. fn new( interval: Duration, detection_time: Duration, @@ -232,20 +326,13 @@ impl LongRunningTaskDetector { } } - /// Creates a new instance of `LongRunningTaskDetector` linked to a single-threaded Tokio runtime. - /// - /// This function takes the `interval` and `detection_time` parameters and initializes a - /// `LongRunningTaskDetector` with a single-threaded Tokio runtime. + /// Creates [`LongRunningTaskDetector`] and a current threaded [`tokio::runtime::Builder`]. /// - /// # Parameters + /// The `interval` argument determines the time interval between tokio runtime worker probing. + /// This interval is randomized. /// - /// - `interval`: The time interval between probes. - /// - `detection_time`: The maximum blocking time allowed for detecting a long-running task. - /// - /// # Returns - /// - /// Returns a tuple containing the created `LongRunningTaskDetector` instance and the Tokio - /// runtime `Builder` used for configuration. + /// The `detection_time` argument determines maximum time allowed for a probe to succeed. + /// A probe running for longer is considered a tokio worker health issue. (something is blocking the worker threads) /// /// # Example /// @@ -253,26 +340,19 @@ impl LongRunningTaskDetector { /// use tokio_metrics::detectors::LongRunningTaskDetector; /// use std::time::Duration; /// - /// let (detector, builder) = LongRunningTaskDetector::new_single_threaded(Duration::from_secs(1), Duration::from_secs(5)); + /// let (detector, builder) = LongRunningTaskDetector::new_current_threaded(Duration::from_secs(1), Duration::from_secs(5)); /// ``` - pub fn new_single_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { + pub fn new_current_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { LongRunningTaskDetector::new(interval, detection_time, true) } - /// Creates a new instance of `LongRunningTaskDetector` linked to a multi-threaded Tokio runtime. - /// - /// This function takes the `interval` and `detection_time` parameters and initializes a - /// `LongRunningTaskDetector` with a multi-threaded Tokio runtime. - /// - /// # Parameters - /// - /// - `interval`: The time interval between probes. - /// - `detection_time`: The maximum blocking time allowed for detecting a long-running task. + /// Creates [`LongRunningTaskDetector`] and a multi threaded [`tokio::runtime::Builder`]. /// - /// # Returns + /// The `interval` argument determines the time interval between tokio runtime worker probing. + /// This `interval`` is randomized. /// - /// Returns a tuple containing the created `LongRunningTaskDetector` instance and the Tokio - /// runtime `Builder` used for configuration. + /// The `detection_time` argument determines maximum time allowed for a probe to succeed. + /// A probe running for longer is considered a tokio worker health issue. (something is blocking the worker threads) /// /// # Example /// @@ -286,23 +366,13 @@ impl LongRunningTaskDetector { LongRunningTaskDetector::new(interval, detection_time, false) } - /// Starts the monitoring thread with default action handlers (that write details to std err). - /// - /// # Parameters - /// - /// - `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. + /// Starts the monitoring thread with default action handlers (that write details to std err). pub fn start(&self, runtime: Arc) { self.start_with_custom_action(runtime, Arc::new(StdErrBlockingActionHandler)) } /// Starts the monitoring process with custom action handlers that - /// allow you to customize what happens when blocking is detected. - /// - /// # Parameters - /// - /// - `runtime` - An `Arc` reference to a `tokio::runtime::Runtime`. - /// - `action` - An `Arc` reference to a custom `BlockingActionHandler`. - /// - `thread_action` - An `Arc` reference to a custom `ThreadStateHandler`. + /// allow you to customize what happens when blocking is detected. pub fn start_with_custom_action( &self, runtime: Arc, @@ -324,7 +394,7 @@ impl LongRunningTaskDetector { }); } - /// Stops the monitoring thread. Does nothing if LRTD is already stopped. + /// Stops the monitoring thread. Does nothing if monitoring thread is already stopped. pub fn stop(&self) { let mut sf = self.stop_flag.lock().unwrap(); if !(*sf) { diff --git a/src/lib.rs b/src/lib.rs index 7b35e61..ef846e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -121,5 +121,5 @@ cfg_rt! { mod task; pub use task::{Instrumented, TaskMetrics, TaskMonitor}; -#[cfg(feature = "rt")] +#[cfg(feature = "detectors")] pub mod detectors; diff --git a/tests/detectors.rs b/tests/detectors.rs index db22c2b..9b49ecc 100644 --- a/tests/detectors.rs +++ b/tests/detectors.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "detectors")] mod lrtd_tests { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -31,7 +32,7 @@ mod lrtd_tests { #[test] fn test_blocking_detection_current() { - let (lrtd, mut builder) = LongRunningTaskDetector::new_single_threaded( + let (lrtd, mut builder) = LongRunningTaskDetector::new_current_threaded( Duration::from_millis(10), Duration::from_millis(100), ); @@ -56,7 +57,7 @@ mod lrtd_tests { #[test] fn test_blocking_detection_lambda() { - let (lrtd, mut builder) = LongRunningTaskDetector::new_single_threaded( + let (lrtd, mut builder) = LongRunningTaskDetector::new_current_threaded( Duration::from_millis(10), Duration::from_millis(100), ); @@ -81,7 +82,7 @@ mod lrtd_tests { } } -#[cfg(unix)] +#[cfg(all(unix, feature = "detectors"))] mod unix_lrtd_tests { use std::backtrace::Backtrace; From 2b5887cd8bbd57a5345d259bc207066ba0563038 Mon Sep 17 00:00:00 2001 From: zolyfarkas Date: Sun, 3 Mar 2024 04:48:58 -0500 Subject: [PATCH 14/15] Add detectors-multi-thread feature --- Cargo.toml | 3 +- src/detectors.rs | 91 ++++++++++++++++++---------------------------- tests/detectors.rs | 70 +++++++++++++++++++++-------------- 3 files changed, 80 insertions(+), 84 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ed1af78..426e68e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,8 @@ keywords = ["async", "futures", "metrics", "debugging"] [features] default = ["rt"] rt = ["tokio/rt", "tokio/time", "tokio/net"] -detectors = ["tokio/rt", "tokio/time", "tokio/net", "tokio/rt-multi-thread", "rand", "libc"] +detectors = ["tokio/rt", "tokio/time", "tokio/net", "rand", "libc"] +detectors-multi-thread = ["detectors", "tokio/rt-multi-thread"] [dependencies] tokio-stream = "0.1.11" diff --git a/src/detectors.rs b/src/detectors.rs index af425f8..84a2787 100644 --- a/src/detectors.rs +++ b/src/detectors.rs @@ -113,7 +113,7 @@ impl ThreadInfo { /// Returns the `pthread_id` of this thread. #[cfg(unix)] - #[cfg_attr(docsrs, cfg(unix))] + #[cfg_attr(docsrs, doc(cfg(unix)))] pub fn pthread_id(&self) -> &libc::pthread_t { &self.pthread_id } @@ -275,57 +275,6 @@ fn probe( } impl LongRunningTaskDetector { - /// Creates [`LongRunningTaskDetector`] and a [`tokio::runtime::Builder`]. - /// - /// The `interval` argument determines the time interval between tokio runtime worker probing. - /// This interval is randomized. - /// - /// The `detection_time` argument determines maximum time allowed for a probe to succeed. - /// A probe running for longer is considered a tokio worker health issue. (something is blocking the worker threads) - /// - /// The `current_threaded` argument if true will result in returning a curent thread tokio runtime Builder, - /// false for a multithreaded one. - fn new( - interval: Duration, - detection_time: Duration, - current_threaded: bool, - ) -> (Self, Builder) { - let workers = Arc::new(WorkerSet::new()); - if current_threaded { - workers.add(ThreadInfo::new()); - let runtime_builder = Builder::new_current_thread(); - ( - LongRunningTaskDetector { - interval, - detection_time, - stop_flag: Arc::new(Mutex::new(true)), - workers, - }, - runtime_builder, - ) - } else { - let mut runtime_builder = Builder::new_multi_thread(); - let workers_clone = Arc::clone(&workers); - let workers_clone2 = Arc::clone(&workers); - runtime_builder - .on_thread_start(move || { - workers_clone.add(ThreadInfo::new()); - }) - .on_thread_stop(move || { - workers_clone2.remove(ThreadInfo::new()); - }); - ( - LongRunningTaskDetector { - interval, - detection_time, - stop_flag: Arc::new(Mutex::new(true)), - workers, - }, - runtime_builder, - ) - } - } - /// Creates [`LongRunningTaskDetector`] and a current threaded [`tokio::runtime::Builder`]. /// /// The `interval` argument determines the time interval between tokio runtime worker probing. @@ -343,7 +292,18 @@ impl LongRunningTaskDetector { /// let (detector, builder) = LongRunningTaskDetector::new_current_threaded(Duration::from_secs(1), Duration::from_secs(5)); /// ``` pub fn new_current_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { - LongRunningTaskDetector::new(interval, detection_time, true) + let workers = Arc::new(WorkerSet::new()); + workers.add(ThreadInfo::new()); + let runtime_builder = Builder::new_current_thread(); + ( + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(true)), + workers, + }, + runtime_builder, + ) } /// Creates [`LongRunningTaskDetector`] and a multi threaded [`tokio::runtime::Builder`]. @@ -361,9 +321,30 @@ impl LongRunningTaskDetector { /// use std::time::Duration; /// /// let (detector, builder) = LongRunningTaskDetector::new_multi_threaded(Duration::from_secs(1), Duration::from_secs(5)); - /// ``` + /// ``` + #[cfg(feature = "detectors-multi-thread")] + #[cfg_attr(docsrs, doc(cfg(feature = "detectors-multi-thread")))] pub fn new_multi_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { - LongRunningTaskDetector::new(interval, detection_time, false) + let workers = Arc::new(WorkerSet::new()); + let mut runtime_builder = Builder::new_multi_thread(); + let workers_clone = Arc::clone(&workers); + let workers_clone2 = Arc::clone(&workers); + runtime_builder + .on_thread_start(move || { + workers_clone.add(ThreadInfo::new()); + }) + .on_thread_stop(move || { + workers_clone2.remove(ThreadInfo::new()); + }); + ( + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(true)), + workers, + }, + runtime_builder, + ) } /// Starts the monitoring thread with default action handlers (that write details to std err). diff --git a/tests/detectors.rs b/tests/detectors.rs index 9b49ecc..5ec0e5d 100644 --- a/tests/detectors.rs +++ b/tests/detectors.rs @@ -1,5 +1,5 @@ #[cfg(feature = "detectors")] -mod lrtd_tests { +mod lrtd_tests_current { use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread; @@ -12,24 +12,6 @@ mod lrtd_tests { println!("slow done"); } - #[test] - fn test_blocking_detection_multi() { - let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( - Duration::from_millis(10), - Duration::from_millis(100), - ); - let runtime = builder.worker_threads(2).enable_all().build().unwrap(); - let arc_runtime = Arc::new(runtime); - let arc_runtime2 = arc_runtime.clone(); - lrtd.start(arc_runtime); - arc_runtime2.spawn(run_blocking_stuff()); - arc_runtime2.spawn(run_blocking_stuff()); - arc_runtime2.block_on(async { - tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; - println!("Done"); - }); - } - #[test] fn test_blocking_detection_current() { let (lrtd, mut builder) = LongRunningTaskDetector::new_current_threaded( @@ -47,14 +29,6 @@ mod lrtd_tests { }); } - #[test] - fn test_blocking_detection_stop_unstarted() { - let (_lrtd, _builder) = LongRunningTaskDetector::new_multi_threaded( - Duration::from_millis(10), - Duration::from_millis(100), - ); - } - #[test] fn test_blocking_detection_lambda() { let (lrtd, mut builder) = LongRunningTaskDetector::new_current_threaded( @@ -82,7 +56,47 @@ mod lrtd_tests { } } -#[cfg(all(unix, feature = "detectors"))] +#[cfg(feature = "detectors-multi-thread")] +mod lrtd_tests_multi { + use std::sync::Arc; + use std::thread; + use std::time::Duration; + use tokio_metrics::detectors::LongRunningTaskDetector; + + async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(1)); + println!("slow done"); + } + + #[test] + fn test_blocking_detection_multi() { + let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.worker_threads(2).enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + println!("Done"); + }); + } + + #[test] + fn test_blocking_detection_stop_unstarted() { + let (_lrtd, _builder) = LongRunningTaskDetector::new_multi_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + } +} + +#[cfg(all(unix, feature = "detectors-multi-thread"))] mod unix_lrtd_tests { use std::backtrace::Backtrace; From 72a9d121cc71ead469bffa477a2e0dcec8cbb45e Mon Sep 17 00:00:00 2001 From: zolyfarkas Date: Sun, 3 Mar 2024 10:18:58 -0500 Subject: [PATCH 15/15] Fix cargo hack unused method reportd in current threaded feature. --- src/detectors.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/detectors.rs b/src/detectors.rs index 84a2787..88fde06 100644 --- a/src/detectors.rs +++ b/src/detectors.rs @@ -168,6 +168,7 @@ impl WorkerSet { set.insert(pid); } + #[cfg(feature = "detectors-multi-thread")] fn remove(&self, pid: ThreadInfo) { let mut set = self.inner.lock().unwrap(); set.remove(&pid);