diff --git a/Cargo.toml b/Cargo.toml index b368b68..426e68e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,13 +16,19 @@ keywords = ["async", "futures", "metrics", "debugging"] [features] default = ["rt"] -rt = ["tokio"] +rt = ["tokio/rt", "tokio/time", "tokio/net"] +detectors = ["tokio/rt", "tokio/time", "tokio/net", "rand", "libc"] +detectors-multi-thread = ["detectors", "tokio/rt-multi-thread"] [dependencies] tokio-stream = "0.1.11" futures-util = "0.3.19" pin-project-lite = "0.2.7" -tokio = { version = "1.31.0", features = ["rt", "time", "net"], optional = true } +tokio = { version = "1.31.0", features = ["rt", "time", "net", "rt-multi-thread"], optional = true } +rand = { version = "0.8.5", optional = true } + +[target.'cfg(unix)'.dependencies] +libc = { version = "0.2.149", optional = true } [dev-dependencies] axum = "0.6" @@ -33,6 +39,9 @@ serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" tokio = { version = "1.26.0", features = ["full", "rt", "time", "macros", "test-util"] } +[target.'cfg(unix)'.dev-dependencies] +libc = { version = "0.2.149"} + [[example]] name = "runtime" required-features = ["rt"] diff --git a/src/detectors.rs b/src/detectors.rs new file mode 100644 index 0000000..88fde06 --- /dev/null +++ b/src/detectors.rs @@ -0,0 +1,392 @@ +//! # Tokio runtime health monitoring detector(s). +//! +//! ## Detecting blocked tokio workers: [`LongRunningTaskDetector`]. +//! +//! Blocking IO operations often end up in tokio workers negatively impacting the ability of tokio runtimes to process +//! async requests. The simplest example of this is the use of [`std::thread::sleep`] instead of [`tokio::time::sleep`] which we use +//! in the unit tests to test this utility. +//! +//! The aim of this utility is to detect these situations. +//! [`LongRunningTaskDetector`] is designed to be very low overhead so that it can be safely be run in production. +//! The overhead of this utility is vconfigurable via probing `interval` parameter. +//! +//! ### [`LongRunningTaskDetector`] Example: +//! +//! ``` +//! use std::sync::Arc; +//! use tokio_metrics::detectors::LongRunningTaskDetector; +//! +//! let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( +//! std::time::Duration::from_millis(10), +//! std::time::Duration::from_millis(100) +//! ); +//! let runtime = builder.worker_threads(2).enable_all().build().unwrap(); +//! let runtime_ref = Arc::new(runtime); +//! let lrtd_runtime_ref = runtime_ref.clone(); +//! lrtd.start(lrtd_runtime_ref); +//! runtime_ref.block_on(async { +//! print!("my async code") +//! }); +//! +//! ``` +//! +//! The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. +//! The detail with default action handler will look like: +//! +//! ```text +//! Detected blocking in worker threads: [ +//! ThreadInfo { id: ThreadId(10), pthread_id: 123145381474304 }, +//! ThreadInfo { id: ThreadId(11), pthread_id: 123145385693184 } +//! ] +//! ``` +//! +//! To get more details(like stack traces) start [`LongRunningTaskDetector`] with [`LongRunningTaskDetector::start_with_custom_action`] +//! and provide a custom handler([`BlockingActionHandler`]) that can dump the thread stack traces. The [`LongRunningTaskDetector`] integration tests +//! include an example implementation that is not signal safe as an example. +//! More detailed blocking can look like: +//! +//! ```text +//! Blocking detected with details: 123145387802624 +//! Stack trace for thread tokio-runtime-worker(123145387802624): +//! ... +//! 5: __sigtramp +//! 6: ___semwait_signal +//! 7: +//! 8: std::sys::pal::unix::thread::Thread::sleep +//! at /rustc/.../library/std/src/sys/pal/unix/thread.rs:243:20 +//! 9: std::thread::sleep +//! at /rustc/.../library/std/src/thread/mod.rs:869:5 +//! 10: detectors::unix_lrtd_tests::run_blocking_stuff::{{closure}} +//! at ./tests/detectors.rs:98:9 +//! 11: tokio::runtime::task::core::Core::poll::{{closure}} +//! ... +//! +//! which will help you easilly identify the blocking operation(s). +//! ``` + +use rand::thread_rng; +use rand::Rng; +use std::collections::HashSet; +use std::sync::mpsc; +use std::sync::{Arc, Mutex}; +use std::thread::ThreadId; +use std::time::Duration; +use std::{env, thread}; +use tokio::runtime::{Builder, Runtime}; + +const PANIC_WORKER_BLOCK_DURATION_DEFAULT: Duration = Duration::from_secs(60); + +fn get_panic_worker_block_duration() -> Duration { + let duration_str = env::var("MY_DURATION_ENV").unwrap_or_else(|_| "60".to_string()); + duration_str + .parse::() + .map(Duration::from_secs) + .unwrap_or(PANIC_WORKER_BLOCK_DURATION_DEFAULT) +} + +#[cfg(unix)] +fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } +} + +#[derive(Debug, PartialEq, Eq, Hash, Clone)] +pub struct ThreadInfo { + id: ThreadId, + #[cfg(unix)] + pthread_id: libc::pthread_t, +} + +/// A structure to hold information about a thread, including its platform-specific identifiers. +impl ThreadInfo { + fn new() -> Self { + ThreadInfo { + id: thread::current().id(), + #[cfg(unix)] + pthread_id: get_thread_id(), + } + } + + /// Returns the id + pub fn id(&self) -> &ThreadId { + &self.id + } + + /// Returns the `pthread_id` of this thread. + #[cfg(unix)] + #[cfg_attr(docsrs, doc(cfg(unix)))] + pub fn pthread_id(&self) -> &libc::pthread_t { + &self.pthread_id + } +} + +/// A trait for handling actions when blocking is detected. +/// +/// This trait provides a method for handling the detection of a blocking action. +pub trait BlockingActionHandler: Send + Sync { + /// Called when a blocking action is detected and prior to thread signaling. + /// + /// # Arguments + /// + /// * `workers` - The list of thread IDs of the tokio runtime worker threads. /// # Returns + /// + fn blocking_detected(&self, workers: &[ThreadInfo]); +} + +impl BlockingActionHandler for F +where + F: Fn(&[ThreadInfo]) + Send + Sync, +{ + fn blocking_detected(&self, workers: &[ThreadInfo]) { + // Implement the behavior for blocking_detected using the provided function. + // You can call the function here with the given workers. + self(workers); + } +} +struct StdErrBlockingActionHandler; + +/// BlockingActionHandler implementation that writes blocker details to standard error. +impl BlockingActionHandler for StdErrBlockingActionHandler { + fn blocking_detected(&self, workers: &[ThreadInfo]) { + eprintln!("Detected blocking in worker threads: {:?}", workers); + } +} + +#[derive(Debug)] +struct WorkerSet { + inner: Mutex>, +} + +impl WorkerSet { + fn new() -> Self { + WorkerSet { + inner: Mutex::new(HashSet::new()), + } + } + + fn add(&self, pid: ThreadInfo) { + let mut set = self.inner.lock().unwrap(); + set.insert(pid); + } + + #[cfg(feature = "detectors-multi-thread")] + fn remove(&self, pid: ThreadInfo) { + let mut set = self.inner.lock().unwrap(); + set.remove(&pid); + } + + fn get_all(&self) -> Vec { + let set = self.inner.lock().unwrap(); + set.iter().cloned().collect() + } +} + +/// Worker health monitoring detector to help with detecting blocking in tokio workers. +/// +/// Blocking IO operations often end up in tokio workers negatively impacting the ability of tokio runtimes to process +/// async requests. The simplest example of this is the use of [`std::thread::sleep`] instead of [`tokio::time::sleep`] which we use +/// in the unit tests to test this utility. +/// +/// The aim of this utility is to detect these situations. +/// [`LongRunningTaskDetector`] is designed to be very low overhead so that it can be safely be run in production. +/// The overhead of this utility is vconfigurable via probing `interval` parameter. +/// +/// # Example +/// +/// ``` +/// use std::sync::Arc; +/// use tokio_metrics::detectors::LongRunningTaskDetector; +/// +/// let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( +/// std::time::Duration::from_millis(10), +/// std::time::Duration::from_millis(100) +/// ); +/// let runtime = builder.worker_threads(2).enable_all().build().unwrap(); +/// let runtime_ref = Arc::new(runtime); +/// let lrtd_runtime_ref = runtime_ref.clone(); +/// lrtd.start(lrtd_runtime_ref); +/// runtime_ref.block_on(async { +/// print!("my async code") +/// }); +/// +/// ``` +/// +/// The above will allow you to get details on what is blocking your tokio worker threads for longer that 100ms. +/// The detail with default action handler will look like: +/// +/// ```text +/// Detected blocking in worker threads: [ +/// ThreadInfo { id: ThreadId(10), pthread_id: 123145381474304 }, +/// ThreadInfo { id: ThreadId(11), pthread_id: 123145385693184 } +/// ] +/// ``` +/// +/// To get more details(like stack traces) start [`LongRunningTaskDetector`] with [`LongRunningTaskDetector::start_with_custom_action`] +/// and provide a custom handler([`BlockingActionHandler`]) that can dump the thread stack traces. The [`LongRunningTaskDetector`] integration tests +/// include an example implementation that is not signal safe as an example. +/// More detailed blocking can look like: +/// +/// ```text +/// Blocking detected with details: 123145387802624 +/// Stack trace for thread tokio-runtime-worker(123145387802624): +/// ... +/// 5: __sigtramp +/// 6: ___semwait_signal +/// 7: +/// 8: std::sys::pal::unix::thread::Thread::sleep +/// at /rustc/.../library/std/src/sys/pal/unix/thread.rs:243:20 +/// 9: std::thread::sleep +/// at /rustc/.../library/std/src/thread/mod.rs:869:5 +/// 10: detectors::unix_lrtd_tests::run_blocking_stuff::{{closure}} +/// at ./tests/detectors.rs:98:9 +/// 11: tokio::runtime::task::core::Core::poll::{{closure}} +/// ... +/// +/// which will help you easilly identify the blocking operation(s). +/// ``` +#[derive(Debug)] +pub struct LongRunningTaskDetector { + interval: Duration, + detection_time: Duration, + stop_flag: Arc>, + workers: Arc, +} + +async fn do_nothing(tx: mpsc::Sender<()>) { + // signal I am done + tx.send(()).unwrap(); +} + +fn probe( + tokio_runtime: &Arc, + detection_time: Duration, + workers: &Arc, + action: &Arc, +) { + let (tx, rx) = mpsc::channel(); + let _nothing_handle = tokio_runtime.spawn(do_nothing(tx)); + let is_probe_success = match rx.recv_timeout(detection_time) { + Ok(_result) => true, + Err(_) => false, + }; + if !is_probe_success { + let targets = workers.get_all(); + action.blocking_detected(&targets); + rx.recv_timeout(get_panic_worker_block_duration()).unwrap(); + } +} + +impl LongRunningTaskDetector { + /// Creates [`LongRunningTaskDetector`] and a current threaded [`tokio::runtime::Builder`]. + /// + /// The `interval` argument determines the time interval between tokio runtime worker probing. + /// This interval is randomized. + /// + /// The `detection_time` argument determines maximum time allowed for a probe to succeed. + /// A probe running for longer is considered a tokio worker health issue. (something is blocking the worker threads) + /// + /// # Example + /// + /// ``` + /// use tokio_metrics::detectors::LongRunningTaskDetector; + /// use std::time::Duration; + /// + /// let (detector, builder) = LongRunningTaskDetector::new_current_threaded(Duration::from_secs(1), Duration::from_secs(5)); + /// ``` + pub fn new_current_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { + let workers = Arc::new(WorkerSet::new()); + workers.add(ThreadInfo::new()); + let runtime_builder = Builder::new_current_thread(); + ( + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(true)), + workers, + }, + runtime_builder, + ) + } + + /// Creates [`LongRunningTaskDetector`] and a multi threaded [`tokio::runtime::Builder`]. + /// + /// The `interval` argument determines the time interval between tokio runtime worker probing. + /// This `interval`` is randomized. + /// + /// The `detection_time` argument determines maximum time allowed for a probe to succeed. + /// A probe running for longer is considered a tokio worker health issue. (something is blocking the worker threads) + /// + /// # Example + /// + /// ``` + /// use tokio_metrics::detectors::LongRunningTaskDetector; + /// use std::time::Duration; + /// + /// let (detector, builder) = LongRunningTaskDetector::new_multi_threaded(Duration::from_secs(1), Duration::from_secs(5)); + /// ``` + #[cfg(feature = "detectors-multi-thread")] + #[cfg_attr(docsrs, doc(cfg(feature = "detectors-multi-thread")))] + pub fn new_multi_threaded(interval: Duration, detection_time: Duration) -> (Self, Builder) { + let workers = Arc::new(WorkerSet::new()); + let mut runtime_builder = Builder::new_multi_thread(); + let workers_clone = Arc::clone(&workers); + let workers_clone2 = Arc::clone(&workers); + runtime_builder + .on_thread_start(move || { + workers_clone.add(ThreadInfo::new()); + }) + .on_thread_stop(move || { + workers_clone2.remove(ThreadInfo::new()); + }); + ( + LongRunningTaskDetector { + interval, + detection_time, + stop_flag: Arc::new(Mutex::new(true)), + workers, + }, + runtime_builder, + ) + } + + /// Starts the monitoring thread with default action handlers (that write details to std err). + pub fn start(&self, runtime: Arc) { + self.start_with_custom_action(runtime, Arc::new(StdErrBlockingActionHandler)) + } + + /// Starts the monitoring process with custom action handlers that + /// allow you to customize what happens when blocking is detected. + pub fn start_with_custom_action( + &self, + runtime: Arc, + action: Arc, + ) { + *self.stop_flag.lock().unwrap() = false; + let stop_flag = Arc::clone(&self.stop_flag); + let detection_time = self.detection_time; + let interval = self.interval; + let workers = Arc::clone(&self.workers); + thread::spawn(move || { + let mut rng = thread_rng(); + while !*stop_flag.lock().unwrap() { + probe(&runtime, detection_time, &workers, &action); + thread::sleep(Duration::from_micros( + rng.gen_range(10..=interval.as_micros().try_into().unwrap()), + )); + } + }); + } + + /// Stops the monitoring thread. Does nothing if monitoring thread is already stopped. + pub fn stop(&self) { + let mut sf = self.stop_flag.lock().unwrap(); + if !(*sf) { + *sf = true; + } + } +} + +impl Drop for LongRunningTaskDetector { + fn drop(&mut self) { + self.stop(); + } +} diff --git a/src/lib.rs b/src/lib.rs index 23d02cd..ef846e5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,3 +120,6 @@ cfg_rt! { mod task; pub use task::{Instrumented, TaskMetrics, TaskMonitor}; + +#[cfg(feature = "detectors")] +pub mod detectors; diff --git a/tests/detectors.rs b/tests/detectors.rs new file mode 100644 index 0000000..5ec0e5d --- /dev/null +++ b/tests/detectors.rs @@ -0,0 +1,241 @@ +#[cfg(feature = "detectors")] +mod lrtd_tests_current { + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + use std::thread; + use std::time::Duration; + use tokio_metrics::detectors::LongRunningTaskDetector; + + async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(1)); + println!("slow done"); + } + + #[test] + fn test_blocking_detection_current() { + let (lrtd, mut builder) = LongRunningTaskDetector::new_current_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.block_on(async { + run_blocking_stuff().await; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + println!("Done"); + }); + } + + #[test] + fn test_blocking_detection_lambda() { + let (lrtd, mut builder) = LongRunningTaskDetector::new_current_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + let my_atomic_bool = Arc::new(AtomicBool::new(false)); + let my_atomic_bool2 = my_atomic_bool.clone(); + lrtd.start_with_custom_action( + arc_runtime, + Arc::new(move |workers: &_| { + eprintln!("Blocking: {:?}", workers); + my_atomic_bool.store(true, Ordering::SeqCst); + }), + ); + arc_runtime2.block_on(async { + run_blocking_stuff().await; + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + println!("Done"); + }); + assert!(my_atomic_bool2.load(Ordering::SeqCst)); + } +} + +#[cfg(feature = "detectors-multi-thread")] +mod lrtd_tests_multi { + use std::sync::Arc; + use std::thread; + use std::time::Duration; + use tokio_metrics::detectors::LongRunningTaskDetector; + + async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(1)); + println!("slow done"); + } + + #[test] + fn test_blocking_detection_multi() { + let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.worker_threads(2).enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + lrtd.start(arc_runtime); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + println!("Done"); + }); + } + + #[test] + fn test_blocking_detection_stop_unstarted() { + let (_lrtd, _builder) = LongRunningTaskDetector::new_multi_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + } +} + +#[cfg(all(unix, feature = "detectors-multi-thread"))] +mod unix_lrtd_tests { + + use std::backtrace::Backtrace; + use std::collections::HashMap; + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::{Arc, Mutex}; + use std::thread; + use std::time::{Duration, Instant}; + use tokio_metrics::detectors::{BlockingActionHandler, LongRunningTaskDetector, ThreadInfo}; + + async fn run_blocking_stuff() { + println!("slow start"); + thread::sleep(Duration::from_secs(1)); + println!("slow done"); + } + + fn get_thread_id() -> libc::pthread_t { + unsafe { libc::pthread_self() } + } + + static SIGNAL_COUNTER: AtomicUsize = AtomicUsize::new(0); + + static THREAD_DUMPS: Mutex>> = Mutex::new(None); + + extern "C" fn signal_handler(_: i32) { + // not signal safe, this needs to be rewritten to avoid mem allocations and use a pre-allocated buffer. + let backtrace = Backtrace::force_capture(); + let name = thread::current() + .name() + .map(|n| format!(" for thread \"{}\"", n)) + .unwrap_or_else(|| "".to_owned()); + let tid = get_thread_id(); + let detail = format!("Stack trace{}:{}\n{}", name, tid, backtrace); + let mut omap = THREAD_DUMPS.lock().unwrap(); + let map = omap.as_mut().unwrap(); + (*map).insert(tid, detail); + SIGNAL_COUNTER.fetch_sub(1, Ordering::SeqCst); + } + + fn install_thread_stack_stace_handler(signal: libc::c_int) { + unsafe { + libc::signal(signal, signal_handler as libc::sighandler_t); + } + } + + static GTI_MUTEX: Mutex<()> = Mutex::new(()); + + /// A naive stack trace capture implementation for threads for DEMO/TEST only purposes. + fn get_thread_info( + signal: libc::c_int, + targets: &[ThreadInfo], + ) -> HashMap { + let _lock = GTI_MUTEX.lock(); + { + let mut omap = THREAD_DUMPS.lock().unwrap(); + *omap = Some(HashMap::new()); + SIGNAL_COUNTER.store(targets.len(), Ordering::SeqCst); + } + for thread_info in targets { + let result = unsafe { libc::pthread_kill(*thread_info.pthread_id(), signal) }; + if result != 0 { + eprintln!("Error sending signal: {:?}", result); + } + } + let time_limit = Duration::from_secs(1); + let start_time = Instant::now(); + loop { + let signal_count = SIGNAL_COUNTER.load(Ordering::SeqCst); + if signal_count == 0 { + break; + } + if Instant::now() - start_time >= time_limit { + break; + } + std::thread::sleep(std::time::Duration::from_micros(10)); + } + { + let omap = THREAD_DUMPS.lock().unwrap(); + omap.clone().unwrap() + } + } + + struct DetailedCaptureBlockingActionHandler { + inner: Mutex>>, + } + + impl DetailedCaptureBlockingActionHandler { + fn new() -> Self { + DetailedCaptureBlockingActionHandler { + inner: Mutex::new(None), + } + } + + fn contains_symbol(&self, symbol_name: &str) -> bool { + // Iterate over the frames in the backtrace + let omap = self.inner.lock().unwrap(); + match omap.as_ref() { + Some(map) => { + if map.is_empty() { + false + } else { + let bt_str = map.values().next().unwrap(); + bt_str.contains(symbol_name) + } + } + None => false, + } + } + } + + impl BlockingActionHandler for DetailedCaptureBlockingActionHandler { + fn blocking_detected(&self, workers: &[ThreadInfo]) { + let mut map = self.inner.lock().unwrap(); + let tinfo = get_thread_info(libc::SIGUSR1, workers); + eprintln!("Blocking detected with details: {:?}", tinfo); + *map = Some(tinfo); + } + } + + #[test] + fn test_blocking_detection_multi_capture_stack_traces() { + install_thread_stack_stace_handler(libc::SIGUSR1); + let (lrtd, mut builder) = LongRunningTaskDetector::new_multi_threaded( + Duration::from_millis(10), + Duration::from_millis(100), + ); + let runtime = builder.worker_threads(2).enable_all().build().unwrap(); + let arc_runtime = Arc::new(runtime); + let arc_runtime2 = arc_runtime.clone(); + let blocking_action = Arc::new(DetailedCaptureBlockingActionHandler::new()); + let to_assert_blocking = blocking_action.clone(); + lrtd.start_with_custom_action(arc_runtime, blocking_action); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.spawn(run_blocking_stuff()); + arc_runtime2.block_on(async { + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + println!("Done"); + }); + assert!(to_assert_blocking.contains_symbol("std::thread::sleep")); + lrtd.stop() + } +}