From bad01c1feb4257e07e6e6f839b6030ed8b308bf2 Mon Sep 17 00:00:00 2001 From: Francisco Xavier Gauna Date: Fri, 13 Jun 2025 18:56:25 -0300 Subject: [PATCH 1/3] Adding sstart_blocking to gen servers And an example --- Cargo.lock | 9 ++ Cargo.toml | 2 +- concurrency/src/tasks/gen_server.rs | 170 +++++++++++++++++++++++++ concurrency/src/threads/gen_server.rs | 6 + examples/blocking_genserver/Cargo.toml | 13 ++ examples/blocking_genserver/main.rs | 139 ++++++++++++++++++++ rt/src/tasks/mod.rs | 8 +- rt/src/tasks/tokio/mod.rs | 2 +- rt/src/threads/mod.rs | 9 ++ 9 files changed, 355 insertions(+), 3 deletions(-) create mode 100644 examples/blocking_genserver/Cargo.toml create mode 100644 examples/blocking_genserver/main.rs diff --git a/Cargo.lock b/Cargo.lock index c4bb721..b9f9150 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -83,6 +83,15 @@ version = "2.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b8e56985ec62d17e9c1001dc89c88ecd7dc08e47eba5ec7c29c7b5eeecde967" +[[package]] +name = "blocking_genserver" +version = "0.1.0" +dependencies = [ + "spawned-concurrency", + "spawned-rt", + "tracing", +] + [[package]] name = "bumpalo" version = "3.17.0" diff --git a/Cargo.toml b/Cargo.toml index a6f68bf..725d92b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ members = [ "examples/ping_pong", "examples/ping_pong_threads", "examples/updater", - "examples/updater_threads", + "examples/updater_threads", "examples/blocking_genserver", ] [workspace.dependencies] diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 196fb5c..1875c1d 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -38,6 +38,26 @@ impl GenServerHandle { handle_clone } + pub(crate) fn new_blocking(mut initial_state: G::State) -> Self { + let (tx, mut rx) = mpsc::channel::>(); + let handle = GenServerHandle { tx }; + let mut gen_server: G = GenServer::new(); + let handle_clone = handle.clone(); + // Ignore the JoinHandle for now. Maybe we'll use it in the future + let _join_handle = rt::spawn_blocking(|| { + rt::block_on(async move { + if gen_server + .run(&handle, &mut rx, &mut initial_state) + .await + .is_err() + { + tracing::trace!("GenServer crashed") + }; + }) + }); + handle_clone + } + pub fn sender(&self) -> mpsc::Sender> { self.tx.clone() } @@ -98,6 +118,15 @@ where GenServerHandle::new(initial_state) } + /// Tokio tasks depend on a coolaborative multitasking model. "work stealing" can't + /// happen if the task is blocking the thread. As such, for sync compute task + /// or other blocking tasks need to be in their own separate thread, and the OS + /// will manage them through hardware interrupts. + /// Start blocking provides such thread. + fn start_blocking(initial_state: Self::State) -> GenServerHandle { + GenServerHandle::new_blocking(initial_state) + } + fn run( &mut self, handle: &GenServerHandle, @@ -201,3 +230,144 @@ where state: &mut Self::State, ) -> impl std::future::Future + Send; } + +#[cfg(test)] +mod tests { + use crate::tasks::send_after; + + use super::*; + use std::thread; + use std::time::Duration; + + // We test a scenario with a badly behaved task + struct BadlyBehavedTask; + + #[derive(Clone)] + pub enum InMessage { + GetCount, + } + #[derive(Clone)] + pub enum OutMsg { + Count(u64), + } + + impl GenServer for BadlyBehavedTask { + type CallMsg = (); + type CastMsg = (); + type OutMsg = (); + type State = (); + type Error = (); + + fn new() -> Self { + Self {} + } + + async fn handle_call( + &mut self, + _: Self::CallMsg, + _: &GenServerHandle, + _: &mut Self::State, + ) -> CallResponse { + todo!() + } + + async fn handle_cast( + &mut self, + _: Self::CastMsg, + _: &GenServerHandle, + _: &mut Self::State, + ) -> CastResponse { + let orig_thread_id = format!("{:?}", thread::current().id()); + loop { + println!("{:?}: bad still alive", thread::current().id()); + loop { + // here we loop and sleep until we switch threads, once we do, we never call await again + // blocking all progress on all other tasks forever + let thread_id = format!("{:?}", thread::current().id()); + if thread_id == orig_thread_id { + rt::sleep(Duration::from_millis(100)).await; + } else { + break; + } + } + thread::sleep(Duration::from_secs(10)); + } + } + } + + struct WellBehavedTask; + + #[derive(Clone)] + struct CountState { + pub count: u64, + } + + impl GenServer for WellBehavedTask { + type CallMsg = InMessage; + type CastMsg = (); + type OutMsg = OutMsg; + type State = CountState; + type Error = (); + + fn new() -> Self { + Self {} + } + + async fn handle_call( + &mut self, + _: Self::CallMsg, + _: &GenServerHandle, + state: &mut Self::State, + ) -> CallResponse { + CallResponse::Reply(OutMsg::Count(state.count)) + } + + async fn handle_cast( + &mut self, + _: Self::CastMsg, + handle: &GenServerHandle, + state: &mut Self::State, + ) -> CastResponse { + state.count += 1; + println!("{:?}: good still alive", thread::current().id()); + send_after(Duration::from_millis(100), handle.to_owned(), ()); + CastResponse::NoReply + } + } + + /* #[test] + pub fn badly_behaved_non_thread() { + rt::run(async move { + let mut badboy = BadlyBehavedTask::start(()); + let _ = badboy.cast(()).await; + let mut goodboy = WellBehavedTask::start(CountState{count: 0}); + let _ = goodboy.cast(()).await; + rt::sleep(Duration::from_secs(1)).await; + let count = goodboy.call(InMessage::GetCount).await.unwrap(); + + match count { + OutMsg::Count(num) => { + assert!(num == 10); + } + } + }) + } */ + + #[test] + pub fn badly_behaved_thread() { + rt::block_on(async move { + let mut badboy = BadlyBehavedTask::start_blocking(()); + let _ = badboy.cast(()).await; + let mut goodboy = WellBehavedTask::start(CountState { count: 0 }); + let _ = goodboy.cast(()).await; + rt::sleep(Duration::from_secs(1)).await; + let count = goodboy.call(InMessage::GetCount).await.unwrap(); + + match count { + OutMsg::Count(num) => { + assert!(num == 10); + } + } + }) + } +} diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 6ce3a64..b73ff9a 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -98,6 +98,12 @@ where GenServerHandle::new(initial_state) } + /// We copy the same interface than threads, but all threads can work + /// while blocking by default + fn start_blocking(initial_state: Self::State) -> GenServerHandle { + GenServerHandle::new(initial_state) + } + fn run( &mut self, handle: &GenServerHandle, diff --git a/examples/blocking_genserver/Cargo.toml b/examples/blocking_genserver/Cargo.toml new file mode 100644 index 0000000..e09f82a --- /dev/null +++ b/examples/blocking_genserver/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "blocking_genserver" +version = "0.1.0" +edition = "2024" + +[dependencies] +spawned-rt = { workspace = true } +spawned-concurrency = { workspace = true } +tracing = { workspace = true } + +[[bin]] +name = "blocking_genserver" +path = "main.rs" diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs new file mode 100644 index 0000000..37914d8 --- /dev/null +++ b/examples/blocking_genserver/main.rs @@ -0,0 +1,139 @@ +use spawned_rt::tasks as rt; +use std::time::Duration; +use std::{process::exit, thread}; + +use spawned_concurrency::tasks::{ + CallResponse, CastResponse, GenServer, GenServerHandle, send_after, +}; + +// We test a scenario with a badly behaved task +struct BadlyBehavedTask; + +#[derive(Clone)] +pub enum InMessage { + GetCount, + Stop, +} +#[derive(Clone)] +pub enum OutMsg { + Count(u64), +} + +impl GenServer for BadlyBehavedTask { + type CallMsg = InMessage; + type CastMsg = (); + type OutMsg = (); + type State = (); + type Error = (); + + fn new() -> Self { + Self {} + } + + async fn handle_call( + &mut self, + _: Self::CallMsg, + _: &GenServerHandle, + _: &mut Self::State, + ) -> CallResponse { + CallResponse::Stop(()) + } + + async fn handle_cast( + &mut self, + _: Self::CastMsg, + _: &GenServerHandle, + _: &mut Self::State, + ) -> CastResponse { + rt::sleep(Duration::from_millis(20)).await; + loop { + println!("{:?}: bad still alive", thread::current().id()); + thread::sleep(Duration::from_millis(50)); + } + } +} + +struct WellBehavedTask; + +#[derive(Clone)] +struct CountState { + pub count: u64, +} + +impl GenServer for WellBehavedTask { + type CallMsg = InMessage; + type CastMsg = (); + type OutMsg = OutMsg; + type State = CountState; + type Error = (); + + fn new() -> Self { + Self {} + } + + async fn handle_call( + &mut self, + message: Self::CallMsg, + _: &GenServerHandle, + state: &mut Self::State, + ) -> CallResponse { + match message { + InMessage::GetCount => CallResponse::Reply(OutMsg::Count(state.count)), + InMessage::Stop => CallResponse::Stop(OutMsg::Count(state.count)), + } + } + + async fn handle_cast( + &mut self, + _: Self::CastMsg, + handle: &GenServerHandle, + state: &mut Self::State, + ) -> CastResponse { + state.count += 1; + println!("{:?}: good still alive", thread::current().id()); + send_after(Duration::from_millis(100), handle.to_owned(), ()); + CastResponse::NoReply + } +} + +/* #[test] +pub fn badly_behaved_non_thread() { + rt::run(async move { + let mut badboy = BadlyBehavedTask::start(()); + let _ = badboy.cast(()).await; + let mut goodboy = WellBehavedTask::start(CountState{count: 0}); + let _ = goodboy.cast(()).await; + rt::sleep(Duration::from_secs(1)).await; + let count = goodboy.call(InMessage::GetCount).await.unwrap(); + + match count { + OutMsg::Count(num) => {x + assert!(num == 10); + } + } + }) +} */ + +/// Example of start_blocking to fix issues #8 https://github.com/lambdaclass/spawned/issues/8 +/// Tasks that block can block the entire tokio runtime (and other cooperative multitasking models) +/// To fix this we implement start_blocking, which under the hood launches a new thread to deal with the issue +pub fn main() { + rt::run(async move { + // If we change BadlyBehavedTask to start instead, it can stop the entire program + let mut badboy = BadlyBehavedTask::start_blocking(()); + let _ = badboy.cast(()).await; + let mut goodboy = WellBehavedTask::start(CountState { count: 0 }); + let _ = goodboy.cast(()).await; + rt::sleep(Duration::from_secs(1)).await; + let count = goodboy.call(InMessage::GetCount).await.unwrap(); + + match count { + OutMsg::Count(num) => { + assert!(num == 10); + } + } + + goodboy.call(InMessage::Stop).await.unwrap(); + exit(0); + }) +} diff --git a/rt/src/tasks/mod.rs b/rt/src/tasks/mod.rs index 5cb9a41..7cf9373 100644 --- a/rt/src/tasks/mod.rs +++ b/rt/src/tasks/mod.rs @@ -9,12 +9,14 @@ mod tokio; +use ::tokio::runtime::Handle; + use crate::tracing::init_tracing; pub use crate::tasks::tokio::mpsc; pub use crate::tasks::tokio::oneshot; pub use crate::tasks::tokio::sleep; -pub use crate::tasks::tokio::{spawn, JoinHandle, Runtime}; +pub use crate::tasks::tokio::{spawn, spawn_blocking, JoinHandle, Runtime}; use std::future::Future; pub fn run(future: F) -> F::Output { @@ -23,3 +25,7 @@ pub fn run(future: F) -> F::Output { let rt = Runtime::new().unwrap(); rt.block_on(future) } + +pub fn block_on(future: F) -> F::Output { + Handle::current().block_on(future) +} diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index 7d7ba9a..8131b27 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -4,6 +4,6 @@ pub mod oneshot; pub use tokio::{ runtime::Runtime, - task::{spawn, JoinHandle}, + task::{spawn, spawn_blocking, JoinHandle}, time::sleep, }; diff --git a/rt/src/threads/mod.rs b/rt/src/threads/mod.rs index cd8b543..29dc6c0 100644 --- a/rt/src/threads/mod.rs +++ b/rt/src/threads/mod.rs @@ -20,3 +20,12 @@ pub fn block_on(future: F) -> F::Output { let rt = Runtime::new().unwrap(); rt.block_on(future) } + +/// Spawn blocking is the same as spawn for pure threaded usage. +pub fn spawn_blocking(f: F) -> JoinHandle +where + F: FnOnce() -> R + Send + 'static, + R: Send + 'static, +{ + spawn(f) +} From 9a438418b1f0668bd73d9c91f7de556c143ee9fa Mon Sep 17 00:00:00 2001 From: Francisco Xavier Gauna Date: Tue, 17 Jun 2025 12:42:35 -0300 Subject: [PATCH 2/3] Fixed auto test --- concurrency/src/tasks/gen_server.rs | 62 +++++++++++++---------------- examples/blocking_genserver/main.rs | 18 --------- rt/src/tasks/tokio/mod.rs | 1 + 3 files changed, 28 insertions(+), 53 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 1875c1d..465832e 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -233,18 +233,15 @@ where #[cfg(test)] mod tests { - use crate::tasks::send_after; - use super::*; - use std::thread; - use std::time::Duration; - - // We test a scenario with a badly behaved task + use crate::tasks::send_after; + use std::{process::exit, thread, time::Duration}; struct BadlyBehavedTask; #[derive(Clone)] pub enum InMessage { GetCount, + Stop, } #[derive(Clone)] pub enum OutMsg { @@ -252,7 +249,7 @@ mod tests { } impl GenServer for BadlyBehavedTask { - type CallMsg = (); + type CallMsg = InMessage; type CastMsg = (); type OutMsg = (); type State = (); @@ -268,7 +265,7 @@ mod tests { _: &GenServerHandle, _: &mut Self::State, ) -> CallResponse { - todo!() + CallResponse::Stop(()) } async fn handle_cast( @@ -277,21 +274,9 @@ mod tests { _: &GenServerHandle, _: &mut Self::State, ) -> CastResponse { - let orig_thread_id = format!("{:?}", thread::current().id()); - loop { - println!("{:?}: bad still alive", thread::current().id()); - loop { - // here we loop and sleep until we switch threads, once we do, we never call await again - // blocking all progress on all other tasks forever - let thread_id = format!("{:?}", thread::current().id()); - if thread_id == orig_thread_id { - rt::sleep(Duration::from_millis(100)).await; - } else { - break; - } - } - thread::sleep(Duration::from_secs(10)); - } + rt::sleep(Duration::from_millis(20)).await; + thread::sleep(Duration::from_secs(2)); + CastResponse::Stop } } @@ -315,11 +300,14 @@ mod tests { async fn handle_call( &mut self, - _: Self::CallMsg, + message: Self::CallMsg, _: &GenServerHandle, state: &mut Self::State, ) -> CallResponse { - CallResponse::Reply(OutMsg::Count(state.count)) + match message { + InMessage::GetCount => CallResponse::Reply(OutMsg::Count(state.count)), + InMessage::Stop => CallResponse::Stop(OutMsg::Count(state.count)), + } } async fn handle_cast( @@ -335,27 +323,30 @@ mod tests { } } - /* #[test] - pub fn badly_behaved_non_thread() { - rt::run(async move { + #[test] + pub fn badly_behaved_thread_non_blocking() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { let mut badboy = BadlyBehavedTask::start(()); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask::start(CountState{count: 0}); + let mut goodboy = WellBehavedTask::start(CountState { count: 0 }); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); match count { OutMsg::Count(num) => { - assert!(num == 10); + assert_ne!(num, 10); } } - }) - } */ + goodboy.call(InMessage::Stop).await.unwrap(); + }); + } #[test] pub fn badly_behaved_thread() { - rt::block_on(async move { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { let mut badboy = BadlyBehavedTask::start_blocking(()); let _ = badboy.cast(()).await; let mut goodboy = WellBehavedTask::start(CountState { count: 0 }); @@ -365,9 +356,10 @@ mod tests { match count { OutMsg::Count(num) => { - assert!(num == 10); + assert_eq!(num, 10); } } - }) + goodboy.call(InMessage::Stop).await.unwrap(); + }); } } diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 37914d8..9a2b832 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -96,24 +96,6 @@ impl GenServer for WellBehavedTask { } } -/* #[test] -pub fn badly_behaved_non_thread() { - rt::run(async move { - let mut badboy = BadlyBehavedTask::start(()); - let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask::start(CountState{count: 0}); - let _ = goodboy.cast(()).await; - rt::sleep(Duration::from_secs(1)).await; - let count = goodboy.call(InMessage::GetCount).await.unwrap(); - - match count { - OutMsg::Count(num) => {x - assert!(num == 10); - } - } - }) -} */ - /// Example of start_blocking to fix issues #8 https://github.com/lambdaclass/spawned/issues/8 /// Tasks that block can block the entire tokio runtime (and other cooperative multitasking models) /// To fix this we implement start_blocking, which under the hood launches a new thread to deal with the issue diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index 8131b27..51a3877 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -6,4 +6,5 @@ pub use tokio::{ runtime::Runtime, task::{spawn, spawn_blocking, JoinHandle}, time::sleep, + test, }; From 73396b54971768b0155d644558ccdafb60620b4d Mon Sep 17 00:00:00 2001 From: Francisco Xavier Gauna Date: Tue, 17 Jun 2025 16:06:31 -0300 Subject: [PATCH 3/3] fixed comment --- concurrency/src/threads/gen_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index b73ff9a..1541c43 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -98,7 +98,7 @@ where GenServerHandle::new(initial_state) } - /// We copy the same interface than threads, but all threads can work + /// We copy the same interface as tasks, but all threads can work /// while blocking by default fn start_blocking(initial_state: Self::State) -> GenServerHandle { GenServerHandle::new(initial_state)