diff --git a/Cargo.lock b/Cargo.lock index 983b4d9..13b3249 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -98,6 +98,15 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "busy_genserver_warning" +version = "0.1.0" +dependencies = [ + "spawned-concurrency", + "spawned-rt", + "tracing", +] + [[package]] name = "bytes" version = "1.10.1" @@ -1191,6 +1200,7 @@ name = "spawned-concurrency" version = "0.4.0" dependencies = [ "futures", + "pin-project-lite", "spawned-rt", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 6c6f691..802efaf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,11 +12,12 @@ members = [ "examples/updater", "examples/updater_threads", "examples/blocking_genserver", + "examples/busy_genserver_warning", ] [workspace.dependencies] -spawned-rt = { path = "rt", version = "0.4.0"} -spawned-concurrency = { path = "concurrency", version = "0.4.0"} +spawned-rt = { path = "rt", version = "0.4.0" } +spawned-concurrency = { path = "concurrency", version = "0.4.0" } tracing = { version = "0.1.41", features = ["log"] } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 198c166..f959dfc 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -10,6 +10,7 @@ spawned-rt = { workspace = true } tracing = { workspace = true } futures = "0.3.1" thiserror = "2.0.12" +pin-project-lite = "0.2" [dev-dependencies] # This tokio imports are only used in tests, we should not use them in the library code. @@ -18,3 +19,7 @@ tokio = { version = "1", features = ["full"] } [lib] path = "./src/lib.rs" + +[features] +# Enable this to log warnings when non-blocking GenServers block the runtime for too much time +warn-on-block = [] diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index c5275c7..3c05b04 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -1,13 +1,12 @@ //! GenServer trait and structs to create an abstraction similar to Erlang gen_server. //! See examples/name_server for a usage example. -use futures::future::FutureExt as _; -use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken}; -use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration}; - use crate::{ error::GenServerError, tasks::InitResult::{NoSuccess, Success}, }; +use futures::future::FutureExt as _; +use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken}; +use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration}; const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5); @@ -36,12 +35,19 @@ impl GenServerHandle { cancellation_token, }; let handle_clone = handle.clone(); - // Ignore the JoinHandle for now. Maybe we'll use it in the future - let _join_handle = rt::spawn(async move { + let inner_future = async move { if gen_server.run(&handle, &mut rx).await.is_err() { tracing::trace!("GenServer crashed") - }; - }); + } + }; + + #[cfg(feature = "warn-on-block")] + // Optionally warn if the GenServer future blocks for too much time + let inner_future = warn_on_block::WarnOnBlocking::new(inner_future); + + // Ignore the JoinHandle for now. Maybe we'll use it in the future + let _join_handle = rt::spawn(inner_future); + handle_clone } @@ -294,6 +300,46 @@ pub trait GenServer: Send + Sized { } } +#[cfg(feature = "warn-on-block")] +mod warn_on_block { + use super::*; + + use std::time::Instant; + use tracing::warn; + + pin_project_lite::pin_project! { + pub struct WarnOnBlocking{ + #[pin] + inner: F + } + } + + impl WarnOnBlocking { + pub fn new(inner: F) -> Self { + Self { inner } + } + } + + impl Future for WarnOnBlocking { + type Output = F::Output; + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let type_id = std::any::type_name::(); + let this = self.project(); + let now = Instant::now(); + let res = this.inner.poll(cx); + let elapsed = now.elapsed(); + if elapsed > Duration::from_millis(10) { + warn!(future = ?type_id, elapsed = ?elapsed, "Blocking operation detected"); + } + res + } + } +} + #[cfg(test)] mod tests { diff --git a/examples/busy_genserver_warning/Cargo.toml b/examples/busy_genserver_warning/Cargo.toml new file mode 100644 index 0000000..641bd87 --- /dev/null +++ b/examples/busy_genserver_warning/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "busy_genserver_warning" +version = "0.1.0" +edition = "2024" + +[dependencies] +spawned-rt = { workspace = true } +spawned-concurrency = { workspace = true } +tracing = { workspace = true } + +[[bin]] +name = "busy_genserver_warning" +path = "main.rs" diff --git a/examples/busy_genserver_warning/main.rs b/examples/busy_genserver_warning/main.rs new file mode 100644 index 0000000..e799996 --- /dev/null +++ b/examples/busy_genserver_warning/main.rs @@ -0,0 +1,73 @@ +use spawned_rt::tasks as rt; +use std::time::Duration; +use std::{process::exit, thread}; +use tracing::info; + +use spawned_concurrency::tasks::{CallResponse, CastResponse, GenServer, GenServerHandle}; + +// We test a scenario with a badly behaved task +struct BusyWorker; + +impl BusyWorker { + pub fn new() -> Self { + BusyWorker + } +} + +#[derive(Clone)] +pub enum InMessage { + GetCount, + Stop, +} + +#[derive(Clone)] +pub enum OutMsg { + Count(u64), +} + +impl GenServer for BusyWorker { + type CallMsg = InMessage; + type CastMsg = (); + type OutMsg = (); + type Error = (); + + async fn handle_call( + &mut self, + _: Self::CallMsg, + _: &GenServerHandle, + ) -> CallResponse { + CallResponse::Stop(()) + } + + async fn handle_cast( + &mut self, + _: Self::CastMsg, + handle: &GenServerHandle, + ) -> CastResponse { + info!("sleeping"); + thread::sleep(Duration::from_millis(542)); + handle.clone().cast(()).await.unwrap(); + // This sleep is needed to yield control to the runtime. + // If not, the future never returns and the warning isn't emitted. + rt::sleep(Duration::from_millis(0)).await; + CastResponse::NoReply + } +} + +/// Example of a program with a semi-blocking [`GenServer`]. +/// As mentioned in the `blocking_genserver` example, tasks that block can block +/// the entire runtime in cooperative multitasking models. This is easy to find +/// in practice, since it appears as if the whole world stopped. However, most +/// of the time, tasks simply take longer than expected, which can lead to +/// service degradation and increased latency. To tackle this, we print a warning +/// whenever we detect tasks that take too long to run. +pub fn main() { + rt::run(async move { + // If we change BusyWorker to start_blocking instead, it won't print the warning + let mut badboy = BusyWorker::new().start(); + let _ = badboy.cast(()).await; + + rt::sleep(Duration::from_secs(5)).await; + exit(0); + }) +}