Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ members = [
"examples/updater",
"examples/updater_threads",
"examples/blocking_genserver",
"examples/busy_genserver_warning",
]

[workspace.dependencies]
spawned-rt = { path = "rt", version = "0.4.0"}
spawned-concurrency = { path = "concurrency", version = "0.4.0"}
spawned-rt = { path = "rt", version = "0.4.0" }
spawned-concurrency = { path = "concurrency", version = "0.4.0" }
tracing = { version = "0.1.41", features = ["log"] }
tracing-subscriber = { version = "0.3.19", features = ["env-filter"] }

Expand Down
5 changes: 5 additions & 0 deletions concurrency/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ spawned-rt = { workspace = true }
tracing = { workspace = true }
futures = "0.3.1"
thiserror = "2.0.12"
pin-project-lite = "0.2"

[dev-dependencies]
# This tokio imports are only used in tests, we should not use them in the library code.
Expand All @@ -18,3 +19,7 @@ tokio = { version = "1", features = ["full"] }

[lib]
path = "./src/lib.rs"

[features]
# Enable this to log warnings when non-blocking GenServers block the runtime for too much time
warn-on-block = []
62 changes: 54 additions & 8 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
//! GenServer trait and structs to create an abstraction similar to Erlang gen_server.
//! See examples/name_server for a usage example.
use futures::future::FutureExt as _;
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};

use crate::{
error::GenServerError,
tasks::InitResult::{NoSuccess, Success},
};
use futures::future::FutureExt as _;
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};

const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);

Expand Down Expand Up @@ -36,12 +35,19 @@ impl<G: GenServer> GenServerHandle<G> {
cancellation_token,
};
let handle_clone = handle.clone();
// Ignore the JoinHandle for now. Maybe we'll use it in the future
let _join_handle = rt::spawn(async move {
let inner_future = async move {
if gen_server.run(&handle, &mut rx).await.is_err() {
tracing::trace!("GenServer crashed")
};
});
}
};

#[cfg(feature = "warn-on-block")]
// Optionally warn if the GenServer future blocks for too much time
let inner_future = warn_on_block::WarnOnBlocking::new(inner_future);

// Ignore the JoinHandle for now. Maybe we'll use it in the future
let _join_handle = rt::spawn(inner_future);

handle_clone
}

Expand Down Expand Up @@ -294,6 +300,46 @@ pub trait GenServer: Send + Sized {
}
}

#[cfg(feature = "warn-on-block")]
mod warn_on_block {
use super::*;

use std::time::Instant;
use tracing::warn;

pin_project_lite::pin_project! {
pub struct WarnOnBlocking<F: Future>{
#[pin]
inner: F
}
}

impl<F: Future> WarnOnBlocking<F> {
pub fn new(inner: F) -> Self {
Self { inner }
}
}

impl<F: Future> Future for WarnOnBlocking<F> {
type Output = F::Output;

fn poll(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
let type_id = std::any::type_name::<F>();
let this = self.project();
let now = Instant::now();
let res = this.inner.poll(cx);
let elapsed = now.elapsed();
if elapsed > Duration::from_millis(10) {
warn!(future = ?type_id, elapsed = ?elapsed, "Blocking operation detected");
}
res
}
}
}

#[cfg(test)]
mod tests {

Expand Down
13 changes: 13 additions & 0 deletions examples/busy_genserver_warning/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
[package]
name = "busy_genserver_warning"
version = "0.1.0"
edition = "2024"

[dependencies]
spawned-rt = { workspace = true }
spawned-concurrency = { workspace = true }
tracing = { workspace = true }

[[bin]]
name = "busy_genserver_warning"
path = "main.rs"
73 changes: 73 additions & 0 deletions examples/busy_genserver_warning/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use spawned_rt::tasks as rt;
use std::time::Duration;
use std::{process::exit, thread};
use tracing::info;

use spawned_concurrency::tasks::{CallResponse, CastResponse, GenServer, GenServerHandle};

// We test a scenario with a badly behaved task
struct BusyWorker;

impl BusyWorker {
pub fn new() -> Self {
BusyWorker
}
}

#[derive(Clone)]
pub enum InMessage {
GetCount,
Stop,
}

#[derive(Clone)]
pub enum OutMsg {
Count(u64),
}

impl GenServer for BusyWorker {
type CallMsg = InMessage;
type CastMsg = ();
type OutMsg = ();
type Error = ();

async fn handle_call(
&mut self,
_: Self::CallMsg,
_: &GenServerHandle<Self>,
) -> CallResponse<Self> {
CallResponse::Stop(())
}

async fn handle_cast(
&mut self,
_: Self::CastMsg,
handle: &GenServerHandle<Self>,
) -> CastResponse {
info!("sleeping");
thread::sleep(Duration::from_millis(542));
handle.clone().cast(()).await.unwrap();
// This sleep is needed to yield control to the runtime.
// If not, the future never returns and the warning isn't emitted.
rt::sleep(Duration::from_millis(0)).await;
CastResponse::NoReply
}
}

/// Example of a program with a semi-blocking [`GenServer`].
/// As mentioned in the `blocking_genserver` example, tasks that block can block
/// the entire runtime in cooperative multitasking models. This is easy to find
/// in practice, since it appears as if the whole world stopped. However, most
/// of the time, tasks simply take longer than expected, which can lead to
/// service degradation and increased latency. To tackle this, we print a warning
/// whenever we detect tasks that take too long to run.
pub fn main() {
rt::run(async move {
// If we change BusyWorker to start_blocking instead, it won't print the warning
let mut badboy = BusyWorker::new().start();
let _ = badboy.cast(()).await;

rt::sleep(Duration::from_secs(5)).await;
exit(0);
})
}