From c5f0309793435e922814504859915975505f83bd Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 18 Jun 2025 10:18:32 -0300 Subject: [PATCH 1/6] Added init function to the trait --- concurrency/src/tasks/error.rs | 7 ++-- concurrency/src/tasks/gen_server.rs | 34 ++++++++++++++++--- concurrency/src/threads/error.rs | 7 ++-- concurrency/src/threads/gen_server.rs | 16 +++++++-- examples/bank/src/server.rs | 8 +++++ examples/bank_threads/src/server.rs | 8 +++++ examples/blocking_genserver/main.rs | 16 +++++++++ examples/name_server/src/server.rs | 8 +++++ examples/name_server_with_error/src/server.rs | 12 +++++-- examples/updater/src/server.rs | 8 +++++ examples/updater_threads/src/server.rs | 8 +++++ rt/src/tasks/tokio/mod.rs | 1 - 12 files changed, 117 insertions(+), 16 deletions(-) diff --git a/concurrency/src/tasks/error.rs b/concurrency/src/tasks/error.rs index 05653fa..498e8fb 100644 --- a/concurrency/src/tasks/error.rs +++ b/concurrency/src/tasks/error.rs @@ -1,11 +1,12 @@ #[derive(Debug)] pub enum GenServerError { - CallbackError, - ServerError, + Callback, + Initialization, + Server, } impl From> for GenServerError { fn from(_value: spawned_rt::tasks::mpsc::SendError) -> Self { - Self::ServerError + Self::Server } } diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 168b3c5..43b80dc 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -70,14 +70,14 @@ impl GenServerHandle { })?; match oneshot_rx.await { Ok(result) => result, - Err(_) => Err(GenServerError::ServerError), + Err(_) => Err(GenServerError::Server), } } pub async fn cast(&mut self, message: G::CastMsg) -> Result<(), GenServerError> { self.tx .send(GenServerInMsg::Cast { message }) - .map_err(|_error| GenServerError::ServerError) + .map_err(|_error| GenServerError::Server) } } @@ -133,11 +133,21 @@ where state: &mut Self::State, ) -> impl Future> + Send { async { + if let Err(err) = self.init(handle, state).await { + tracing::error!("Initialization failed: {err:?}"); + return Err(GenServerError::Initialization); + } self.main_loop(handle, rx, state).await?; Ok(()) } } + fn init( + &mut self, + handle: &GenServerHandle, + state: &mut Self::State, + ) -> impl Future> + Send; + fn main_loop( &mut self, handle: &GenServerHandle, @@ -178,7 +188,7 @@ where CallResponse::Reply(response) => (true, None, Ok(response)), CallResponse::Stop(response) => (false, None, Ok(response)), }, - Err(error) => (true, Some(error), Err(GenServerError::CallbackError)), + Err(error) => (true, Some(error), Err(GenServerError::Callback)), }; // Send response back if sender.send(response).is_err() { @@ -233,7 +243,7 @@ where mod tests { use super::*; use crate::tasks::send_after; - use std::{process::exit, thread, time::Duration}; + use std::{thread, time::Duration}; struct BadlyBehavedTask; #[derive(Clone)] @@ -257,6 +267,14 @@ mod tests { Self {} } + async fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn handle_call( &mut self, _: Self::CallMsg, @@ -296,6 +314,14 @@ mod tests { Self {} } + async fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/concurrency/src/threads/error.rs b/concurrency/src/threads/error.rs index 735e37e..8834e0f 100644 --- a/concurrency/src/threads/error.rs +++ b/concurrency/src/threads/error.rs @@ -1,11 +1,12 @@ #[derive(Debug)] pub enum GenServerError { - CallbackError, - ServerError, + Callback, + Initialization, + Server, } impl From> for GenServerError { fn from(_value: spawned_rt::threads::mpsc::SendError) -> Self { - Self::ServerError + Self::Server } } diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 1541c43..38f8310 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -51,14 +51,14 @@ impl GenServerHandle { })?; match oneshot_rx.recv() { Ok(result) => result, - Err(_) => Err(GenServerError::ServerError), + Err(_) => Err(GenServerError::Server), } } pub fn cast(&mut self, message: G::CastMsg) -> Result<(), GenServerError> { self.tx .send(GenServerInMsg::Cast { message }) - .map_err(|_error| GenServerError::ServerError) + .map_err(|_error| GenServerError::Server) } } @@ -110,10 +110,20 @@ where rx: &mut mpsc::Receiver>, state: &mut Self::State, ) -> Result<(), GenServerError> { + if let Err(err) = self.init(handle, state) { + tracing::error!("Initialization failed: {err:?}"); + return Err(GenServerError::Initialization); + } self.main_loop(handle, rx, state)?; Ok(()) } + fn init( + &mut self, + handle: &GenServerHandle, + state: &mut Self::State, + ) -> Result<(), Self::Error>; + fn main_loop( &mut self, handle: &GenServerHandle, @@ -149,7 +159,7 @@ where CallResponse::Reply(response) => (true, None, Ok(response)), CallResponse::Stop(response) => (false, None, Ok(response)), }, - Err(error) => (true, Some(error), Err(GenServerError::CallbackError)), + Err(error) => (true, Some(error), Err(GenServerError::Callback)), }; // Send response back if sender.send(response).is_err() { diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index 13287fc..fbb3a4d 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -51,6 +51,14 @@ impl GenServer for Bank { Self {} } + async fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/bank_threads/src/server.rs b/examples/bank_threads/src/server.rs index d5dab95..fee6b07 100644 --- a/examples/bank_threads/src/server.rs +++ b/examples/bank_threads/src/server.rs @@ -47,6 +47,14 @@ impl GenServer for Bank { Self {} } + fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 9a2b832..8dcf97c 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -30,6 +30,14 @@ impl GenServer for BadlyBehavedTask { Self {} } + async fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn handle_call( &mut self, _: Self::CallMsg, @@ -71,6 +79,14 @@ impl GenServer for WellBehavedTask { Self {} } + async fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/name_server/src/server.rs b/examples/name_server/src/server.rs index 77a8b58..e4bdf1b 100644 --- a/examples/name_server/src/server.rs +++ b/examples/name_server/src/server.rs @@ -36,6 +36,14 @@ impl GenServer for NameServer { Self {} } + async fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/name_server_with_error/src/server.rs b/examples/name_server_with_error/src/server.rs index 5906349..13f4af9 100644 --- a/examples/name_server_with_error/src/server.rs +++ b/examples/name_server_with_error/src/server.rs @@ -15,8 +15,8 @@ impl NameServer { pub async fn add(server: &mut NameServerHandle, key: String, value: String) -> OutMessage { match server.call(InMessage::Add { key, value }).await { Ok(_) => OutMessage::Ok, - Err(GenServerError::ServerError) => OutMessage::ServerError, - Err(GenServerError::CallbackError) => OutMessage::CallbackError, + Err(GenServerError::Callback) => OutMessage::CallbackError, + Err(_) => OutMessage::ServerError, } } @@ -39,6 +39,14 @@ impl GenServer for NameServer { NameServer {} } + async fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index fa75a8b..ee06be9 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -35,6 +35,14 @@ impl GenServer for UpdaterServer { Self {} } + async fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + async fn handle_call( &mut self, _message: Self::CallMsg, diff --git a/examples/updater_threads/src/server.rs b/examples/updater_threads/src/server.rs index d26e447..9b6a149 100644 --- a/examples/updater_threads/src/server.rs +++ b/examples/updater_threads/src/server.rs @@ -36,6 +36,14 @@ impl GenServer for UpdaterServer { Self {} } + fn init( + &mut self, + _handle: &GenServerHandle, + _state: &mut Self::State, + ) -> Result<(), Self::Error> { + Ok(()) + } + fn handle_call( &mut self, _message: Self::CallMsg, diff --git a/rt/src/tasks/tokio/mod.rs b/rt/src/tasks/tokio/mod.rs index 51a3877..8131b27 100644 --- a/rt/src/tasks/tokio/mod.rs +++ b/rt/src/tasks/tokio/mod.rs @@ -6,5 +6,4 @@ pub use tokio::{ runtime::Runtime, task::{spawn, spawn_blocking, JoinHandle}, time::sleep, - test, }; From 675c8b7d485c1fe3d62f6b0e2c494782cac41721 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 18 Jun 2025 12:36:03 -0300 Subject: [PATCH 2/6] init() function takes state ownership --- concurrency/src/tasks/gen_server.rs | 50 ++++++++----------- concurrency/src/threads/gen_server.rs | 31 ++++++------ examples/bank/src/server.rs | 8 --- examples/bank_threads/src/server.rs | 8 --- examples/blocking_genserver/main.rs | 16 ------ examples/name_server/src/server.rs | 8 --- examples/name_server_with_error/src/server.rs | 8 --- examples/updater/src/server.rs | 8 --- examples/updater_threads/src/server.rs | 8 --- 9 files changed, 37 insertions(+), 108 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 43b80dc..16f8824 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -20,7 +20,7 @@ impl Clone for GenServerHandle { } impl GenServerHandle { - pub(crate) fn new(mut initial_state: G::State) -> Self { + pub(crate) fn new(initial_state: G::State) -> Self { let (tx, mut rx) = mpsc::channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); @@ -28,7 +28,7 @@ impl GenServerHandle { // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn(async move { if gen_server - .run(&handle, &mut rx, &mut initial_state) + .run(&handle, &mut rx, initial_state) .await .is_err() { @@ -38,7 +38,7 @@ impl GenServerHandle { handle_clone } - pub(crate) fn new_blocking(mut initial_state: G::State) -> Self { + pub(crate) fn new_blocking(initial_state: G::State) -> Self { let (tx, mut rx) = mpsc::channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); @@ -47,7 +47,7 @@ impl GenServerHandle { let _join_handle = rt::spawn_blocking(|| { rt::block_on(async move { if gen_server - .run(&handle, &mut rx, &mut initial_state) + .run(&handle, &mut rx, initial_state) .await .is_err() { @@ -109,7 +109,7 @@ where type CastMsg: Send + Sized; type OutMsg: Send + Sized; type State: Clone + Send; - type Error: Debug; + type Error: Debug + Send; fn new() -> Self; @@ -130,23 +130,29 @@ where &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: &mut Self::State, + state: Self::State, ) -> impl Future> + Send { async { - if let Err(err) = self.init(handle, state).await { - tracing::error!("Initialization failed: {err:?}"); - return Err(GenServerError::Initialization); + match self.init(handle, state).await { + Ok(mut new_state) => { + self.main_loop(handle, rx, &mut new_state).await?; + Ok(()) + } + Err(err) => { + tracing::error!("Initialization failed: {err:?}"); + Err(GenServerError::Initialization) + } } - self.main_loop(handle, rx, state).await?; - Ok(()) } } fn init( &mut self, - handle: &GenServerHandle, - state: &mut Self::State, - ) -> impl Future> + Send; + _handle: &GenServerHandle, + state: Self::State, + ) -> impl Future> + Send { + async { Ok(state) } + } fn main_loop( &mut self, @@ -267,14 +273,6 @@ mod tests { Self {} } - async fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn handle_call( &mut self, _: Self::CallMsg, @@ -314,14 +312,6 @@ mod tests { Self {} } - async fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 38f8310..de39858 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -22,17 +22,14 @@ impl Clone for GenServerHandle { } impl GenServerHandle { - pub(crate) fn new(mut initial_state: G::State) -> Self { + pub(crate) fn new(initial_state: G::State) -> Self { let (tx, mut rx) = mpsc::channel::>(); let handle = GenServerHandle { tx }; let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn(move || { - if gen_server - .run(&handle, &mut rx, &mut initial_state) - .is_err() - { + if gen_server.run(&handle, &mut rx, initial_state).is_err() { tracing::trace!("GenServer crashed") }; }); @@ -108,21 +105,27 @@ where &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: &mut Self::State, + state: Self::State, ) -> Result<(), GenServerError> { - if let Err(err) = self.init(handle, state) { - tracing::error!("Initialization failed: {err:?}"); - return Err(GenServerError::Initialization); + match self.init(handle, state) { + Ok(mut new_state) => { + self.main_loop(handle, rx, &mut new_state)?; + Ok(()) + } + Err(err) => { + tracing::error!("Initialization failed: {err:?}"); + Err(GenServerError::Initialization) + } } - self.main_loop(handle, rx, state)?; - Ok(()) } fn init( &mut self, - handle: &GenServerHandle, - state: &mut Self::State, - ) -> Result<(), Self::Error>; + _handle: &GenServerHandle, + state: Self::State, + ) -> Result { + Ok(state) + } fn main_loop( &mut self, diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index fbb3a4d..13287fc 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -51,14 +51,6 @@ impl GenServer for Bank { Self {} } - async fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/bank_threads/src/server.rs b/examples/bank_threads/src/server.rs index fee6b07..d5dab95 100644 --- a/examples/bank_threads/src/server.rs +++ b/examples/bank_threads/src/server.rs @@ -47,14 +47,6 @@ impl GenServer for Bank { Self {} } - fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 8dcf97c..9a2b832 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -30,14 +30,6 @@ impl GenServer for BadlyBehavedTask { Self {} } - async fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn handle_call( &mut self, _: Self::CallMsg, @@ -79,14 +71,6 @@ impl GenServer for WellBehavedTask { Self {} } - async fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/name_server/src/server.rs b/examples/name_server/src/server.rs index e4bdf1b..77a8b58 100644 --- a/examples/name_server/src/server.rs +++ b/examples/name_server/src/server.rs @@ -36,14 +36,6 @@ impl GenServer for NameServer { Self {} } - async fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/name_server_with_error/src/server.rs b/examples/name_server_with_error/src/server.rs index 13f4af9..9da0d0f 100644 --- a/examples/name_server_with_error/src/server.rs +++ b/examples/name_server_with_error/src/server.rs @@ -39,14 +39,6 @@ impl GenServer for NameServer { NameServer {} } - async fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index ee06be9..fa75a8b 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -35,14 +35,6 @@ impl GenServer for UpdaterServer { Self {} } - async fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - async fn handle_call( &mut self, _message: Self::CallMsg, diff --git a/examples/updater_threads/src/server.rs b/examples/updater_threads/src/server.rs index 9b6a149..d26e447 100644 --- a/examples/updater_threads/src/server.rs +++ b/examples/updater_threads/src/server.rs @@ -36,14 +36,6 @@ impl GenServer for UpdaterServer { Self {} } - fn init( - &mut self, - _handle: &GenServerHandle, - _state: &mut Self::State, - ) -> Result<(), Self::Error> { - Ok(()) - } - fn handle_call( &mut self, _message: Self::CallMsg, From 3210230fc81a67cf73210e3037ffaca209d289a0 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 18 Jun 2025 15:25:22 -0300 Subject: [PATCH 3/6] Improve state ownership along GenServer code --- concurrency/src/tasks/gen_server.rs | 24 +++++++++++++----------- concurrency/src/threads/gen_server.rs | 22 ++++++++++++---------- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 16f8824..a35d3da 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -134,8 +134,8 @@ where ) -> impl Future> + Send { async { match self.init(handle, state).await { - Ok(mut new_state) => { - self.main_loop(handle, rx, &mut new_state).await?; + Ok(new_state) => { + self.main_loop(handle, rx, new_state).await?; Ok(()) } Err(err) => { @@ -158,13 +158,15 @@ where &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: &mut Self::State, + mut state: Self::State, ) -> impl Future> + Send { async { loop { - if !self.receive(handle, rx, state).await? { + let (new_state, cont) = self.receive(handle, rx, state).await?; + if !cont { break; } + state = new_state; } tracing::trace!("Stopping GenServer"); Ok(()) @@ -175,9 +177,9 @@ where &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: &mut Self::State, - ) -> impl std::future::Future> + Send { - async { + mut state: Self::State, + ) -> impl std::future::Future> + Send { + async move { let message = rx.recv().await; // Save current state in case of a rollback @@ -186,7 +188,7 @@ where let (keep_running, error) = match message { Some(GenServerInMsg::Call { sender, message }) => { let (keep_running, error, response) = - match AssertUnwindSafe(self.handle_call(message, handle, state)) + match AssertUnwindSafe(self.handle_call(message, handle, &mut state)) .catch_unwind() .await { @@ -205,7 +207,7 @@ where (keep_running, error) } Some(GenServerInMsg::Cast { message }) => { - match AssertUnwindSafe(self.handle_cast(message, handle, state)) + match AssertUnwindSafe(self.handle_cast(message, handle, &mut state)) .catch_unwind() .await { @@ -224,9 +226,9 @@ where if let Some(error) = error { tracing::trace!("Error in callback, reverting state - Error: '{error:?}'"); // Restore initial state (ie. dismiss any change) - *state = state_clone; + state = state_clone; }; - Ok(keep_running) + Ok((state, keep_running)) } } diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index de39858..fa7c77b 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -108,8 +108,8 @@ where state: Self::State, ) -> Result<(), GenServerError> { match self.init(handle, state) { - Ok(mut new_state) => { - self.main_loop(handle, rx, &mut new_state)?; + Ok(new_state) => { + self.main_loop(handle, rx, new_state)?; Ok(()) } Err(err) => { @@ -131,12 +131,14 @@ where &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: &mut Self::State, + mut state: Self::State, ) -> Result<(), GenServerError> { loop { - if !self.receive(handle, rx, state)? { + let (new_state, cont) = self.receive(handle, rx, state)?; + if !cont { break; } + state = new_state; } tracing::trace!("Stopping GenServer"); Ok(()) @@ -146,8 +148,8 @@ where &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: &mut Self::State, - ) -> Result { + mut state: Self::State, + ) -> Result<(Self::State, bool), GenServerError> { let message = rx.recv().ok(); // Save current state in case of a rollback @@ -156,7 +158,7 @@ where let (keep_running, error) = match message { Some(GenServerInMsg::Call { sender, message }) => { let (keep_running, error, response) = match catch_unwind(AssertUnwindSafe(|| { - self.handle_call(message, handle, state) + self.handle_call(message, handle, &mut state) })) { Ok(response) => match response { CallResponse::Reply(response) => (true, None, Ok(response)), @@ -172,7 +174,7 @@ where } Some(GenServerInMsg::Cast { message }) => { match catch_unwind(AssertUnwindSafe(|| { - self.handle_cast(message, handle, state) + self.handle_cast(message, handle, &mut state) })) { Ok(response) => match response { CastResponse::NoReply => (true, None), @@ -189,9 +191,9 @@ where if let Some(error) = error { tracing::trace!("Error in callback, reverting state - Error: '{error:?}'"); // Restore initial state (ie. dismiss any change) - *state = state_clone; + state = state_clone; }; - Ok(keep_running) + Ok((state, keep_running)) } fn handle_call( From 28d2db5ba7dc98e7ec03957691684113d287e05f Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Wed, 18 Jun 2025 17:26:41 -0300 Subject: [PATCH 4/6] Moved state ownership into and --- concurrency/src/tasks/gen_server.rs | 94 ++++++++++--------- concurrency/src/threads/gen_server.rs | 78 ++++++++------- examples/bank/src/main.rs | 12 ++- examples/bank/src/server.rs | 55 ++++++----- examples/bank_threads/src/main.rs | 12 ++- examples/bank_threads/src/server.rs | 63 ++++++++----- examples/blocking_genserver/main.rs | 23 +++-- examples/name_server/src/server.rs | 21 +++-- examples/name_server_with_error/src/server.rs | 21 +++-- examples/updater/src/server.rs | 12 +-- examples/updater_threads/src/server.rs | 12 +-- 11 files changed, 233 insertions(+), 170 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index a35d3da..6120219 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -81,23 +81,23 @@ impl GenServerHandle { } } -pub enum GenServerInMsg { +pub enum GenServerInMsg { Call { - sender: oneshot::Sender>, - message: A::CallMsg, + sender: oneshot::Sender>, + message: G::CallMsg, }, Cast { - message: A::CastMsg, + message: G::CastMsg, }, } -pub enum CallResponse { - Reply(U), - Stop(U), +pub enum CallResponse { + Reply(G::State, G::OutMsg), + Stop(G::OutMsg), } -pub enum CastResponse { - NoReply, +pub enum CastResponse { + NoReply(G::State), Stop, } @@ -177,7 +177,7 @@ where &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - mut state: Self::State, + state: Self::State, ) -> impl std::future::Future> + Send { async move { let message = rx.recv().await; @@ -185,18 +185,25 @@ where // Save current state in case of a rollback let state_clone = state.clone(); - let (keep_running, error) = match message { + let (keep_running, new_state) = match message { Some(GenServerInMsg::Call { sender, message }) => { - let (keep_running, error, response) = - match AssertUnwindSafe(self.handle_call(message, handle, &mut state)) + let (keep_running, new_state, response) = + match AssertUnwindSafe(self.handle_call(message, handle, state)) .catch_unwind() .await { Ok(response) => match response { - CallResponse::Reply(response) => (true, None, Ok(response)), - CallResponse::Stop(response) => (false, None, Ok(response)), + CallResponse::Reply(new_state, response) => { + (true, new_state, Ok(response)) + } + CallResponse::Stop(response) => (false, state_clone, Ok(response)), }, - Err(error) => (true, Some(error), Err(GenServerError::Callback)), + Err(error) => { + tracing::trace!( + "Error in callback, reverting state - Error: '{error:?}'" + ); + (true, state_clone, Err(GenServerError::Callback)) + } }; // Send response back if sender.send(response).is_err() { @@ -204,31 +211,31 @@ where "GenServer failed to send response back, client must have died" ) }; - (keep_running, error) + (keep_running, new_state) } Some(GenServerInMsg::Cast { message }) => { - match AssertUnwindSafe(self.handle_cast(message, handle, &mut state)) + match AssertUnwindSafe(self.handle_cast(message, handle, state)) .catch_unwind() .await { Ok(response) => match response { - CastResponse::NoReply => (true, None), - CastResponse::Stop => (false, None), + CastResponse::NoReply(new_state) => (true, new_state), + CastResponse::Stop => (false, state_clone), }, - Err(error) => (true, Some(error)), + Err(error) => { + tracing::trace!( + "Error in callback, reverting state - Error: '{error:?}'" + ); + (true, state_clone) + } } } None => { // Channel has been closed; won't receive further messages. Stop the server. - (false, None) + (false, state) } }; - if let Some(error) = error { - tracing::trace!("Error in callback, reverting state - Error: '{error:?}'"); - // Restore initial state (ie. dismiss any change) - state = state_clone; - }; - Ok((state, keep_running)) + Ok((new_state, keep_running)) } } @@ -236,15 +243,15 @@ where &mut self, message: Self::CallMsg, handle: &GenServerHandle, - state: &mut Self::State, - ) -> impl std::future::Future> + Send; + state: Self::State, + ) -> impl std::future::Future> + Send; fn handle_cast( &mut self, message: Self::CastMsg, handle: &GenServerHandle, - state: &mut Self::State, - ) -> impl std::future::Future + Send; + state: Self::State, + ) -> impl std::future::Future> + Send; } #[cfg(test)] @@ -279,8 +286,8 @@ mod tests { &mut self, _: Self::CallMsg, _: &GenServerHandle, - _: &mut Self::State, - ) -> CallResponse { + _: Self::State, + ) -> CallResponse { CallResponse::Stop(()) } @@ -288,8 +295,8 @@ mod tests { &mut self, _: Self::CastMsg, _: &GenServerHandle, - _: &mut Self::State, - ) -> CastResponse { + _: Self::State, + ) -> CastResponse { rt::sleep(Duration::from_millis(20)).await; thread::sleep(Duration::from_secs(2)); CastResponse::Stop @@ -318,10 +325,13 @@ mod tests { &mut self, message: Self::CallMsg, _: &GenServerHandle, - state: &mut Self::State, - ) -> CallResponse { + state: Self::State, + ) -> CallResponse { match message { - InMessage::GetCount => CallResponse::Reply(OutMsg::Count(state.count)), + InMessage::GetCount => { + let count = state.count; + CallResponse::Reply(state, OutMsg::Count(count)) + } InMessage::Stop => CallResponse::Stop(OutMsg::Count(state.count)), } } @@ -330,12 +340,12 @@ mod tests { &mut self, _: Self::CastMsg, handle: &GenServerHandle, - state: &mut Self::State, - ) -> CastResponse { + mut state: Self::State, + ) -> CastResponse { state.count += 1; println!("{:?}: good still alive", thread::current().id()); send_after(Duration::from_millis(100), handle.to_owned(), ()); - CastResponse::NoReply + CastResponse::NoReply(state) } } diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index fa7c77b..1e7226f 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -59,23 +59,23 @@ impl GenServerHandle { } } -pub enum GenServerInMsg { +pub enum GenServerInMsg { Call { - sender: oneshot::Sender>, - message: A::CallMsg, + sender: oneshot::Sender>, + message: G::CallMsg, }, Cast { - message: A::CastMsg, + message: G::CastMsg, }, } -pub enum CallResponse { - Reply(U), - Stop(U), +pub enum CallResponse { + Reply(G::State, G::OutMsg), + Stop(G::OutMsg), } -pub enum CastResponse { - NoReply, +pub enum CastResponse { + NoReply(G::State), Stop, } @@ -148,65 +148,71 @@ where &mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - mut state: Self::State, + state: Self::State, ) -> Result<(Self::State, bool), GenServerError> { let message = rx.recv().ok(); // Save current state in case of a rollback let state_clone = state.clone(); - let (keep_running, error) = match message { + let (keep_running, new_state) = match message { Some(GenServerInMsg::Call { sender, message }) => { - let (keep_running, error, response) = match catch_unwind(AssertUnwindSafe(|| { - self.handle_call(message, handle, &mut state) - })) { - Ok(response) => match response { - CallResponse::Reply(response) => (true, None, Ok(response)), - CallResponse::Stop(response) => (false, None, Ok(response)), - }, - Err(error) => (true, Some(error), Err(GenServerError::Callback)), - }; + let (keep_running, new_state, response) = + match catch_unwind(AssertUnwindSafe(|| { + self.handle_call(message, handle, state) + })) { + Ok(response) => match response { + CallResponse::Reply(new_state, response) => { + (true, new_state, Ok(response)) + } + CallResponse::Stop(response) => (false, state_clone, Ok(response)), + }, + Err(error) => { + tracing::trace!( + "Error in callback, reverting state - Error: '{error:?}'" + ); + (true, state_clone, Err(GenServerError::Callback)) + } + }; // Send response back if sender.send(response).is_err() { tracing::trace!("GenServer failed to send response back, client must have died") }; - (keep_running, error) + (keep_running, new_state) } Some(GenServerInMsg::Cast { message }) => { match catch_unwind(AssertUnwindSafe(|| { - self.handle_cast(message, handle, &mut state) + self.handle_cast(message, handle, state) })) { Ok(response) => match response { - CastResponse::NoReply => (true, None), - CastResponse::Stop => (false, None), + CastResponse::NoReply(new_state) => (true, new_state), + CastResponse::Stop => (false, state_clone), }, - Err(error) => (true, Some(error)), + Err(error) => { + tracing::trace!("Error in callback, reverting state - Error: '{error:?}'"); + (true, state_clone) + } } } None => { // Channel has been closed; won't receive further messages. Stop the server. - (false, None) + (false, state) } }; - if let Some(error) = error { - tracing::trace!("Error in callback, reverting state - Error: '{error:?}'"); - // Restore initial state (ie. dismiss any change) - state = state_clone; - }; - Ok((state, keep_running)) + Ok((new_state, keep_running)) } fn handle_call( &mut self, message: Self::CallMsg, handle: &GenServerHandle, - state: &mut Self::State, - ) -> CallResponse; + state: Self::State, + ) -> CallResponse; fn handle_cast( &mut self, message: Self::CastMsg, handle: &GenServerHandle, - state: &mut Self::State, - ) -> CastResponse; + state: Self::State, + ) -> CastResponse; } diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 90f97eb..8819627 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -78,11 +78,21 @@ fn main() { assert_eq!( result, Err(BankError::InsufficientBalance { - who: joe, + who: joe.clone(), amount: 25 }) ); + let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 25).await; + tracing::info!("Withdraw result {result:?}"); + assert_eq!( + result, + Ok(BankOutMessage::WidrawOk { + who: joe, + amount: 0 + }) + ); + let result = Bank::stop(&mut name_server).await; tracing::info!("Stop result {result:?}"); assert_eq!(result, Ok(BankOutMessage::Stopped)); diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index 13287fc..8ce8550 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -55,43 +55,54 @@ impl GenServer for Bank { &mut self, message: Self::CallMsg, _handle: &BankHandle, - state: &mut Self::State, - ) -> CallResponse { + mut state: Self::State, + ) -> CallResponse { match message.clone() { Self::CallMsg::New { who } => match state.get(&who) { - Some(_amount) => CallResponse::Reply(Err(BankError::AlreadyACustomer { who })), + Some(_amount) => { + CallResponse::Reply(state, Err(BankError::AlreadyACustomer { who })) + } None => { state.insert(who.clone(), 0); - CallResponse::Reply(Ok(OutMessage::Welcome { who })) + CallResponse::Reply(state, Ok(OutMessage::Welcome { who })) } }, Self::CallMsg::Add { who, amount } => match state.get(&who) { Some(current) => { let new_amount = current + amount; state.insert(who.clone(), new_amount); - CallResponse::Reply(Ok(OutMessage::Balance { - who, - amount: new_amount, - })) + CallResponse::Reply( + state, + Ok(OutMessage::Balance { + who, + amount: new_amount, + }), + ) } - None => CallResponse::Reply(Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(state, Err(BankError::NotACustomer { who })), }, Self::CallMsg::Remove { who, amount } => match state.get(&who) { - Some(current) => match current < &amount { - true => CallResponse::Reply(Err(BankError::InsufficientBalance { - who, - amount: *current, - })), + Some(¤t) => match current < amount { + true => CallResponse::Reply( + state, + Err(BankError::InsufficientBalance { + who, + amount: current, + }), + ), false => { let new_amount = current - amount; state.insert(who.clone(), new_amount); - CallResponse::Reply(Ok(OutMessage::WidrawOk { - who, - amount: new_amount, - })) + CallResponse::Reply( + state, + Ok(OutMessage::WidrawOk { + who, + amount: new_amount, + }), + ) } }, - None => CallResponse::Reply(Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(state, Err(BankError::NotACustomer { who })), }, Self::CallMsg::Stop => CallResponse::Stop(Ok(OutMessage::Stopped)), } @@ -101,8 +112,8 @@ impl GenServer for Bank { &mut self, _message: Self::CastMsg, _handle: &BankHandle, - _state: &mut Self::State, - ) -> CastResponse { - CastResponse::NoReply + state: Self::State, + ) -> CastResponse { + CastResponse::NoReply(state) } } diff --git a/examples/bank_threads/src/main.rs b/examples/bank_threads/src/main.rs index c04a6ac..5a28dc6 100644 --- a/examples/bank_threads/src/main.rs +++ b/examples/bank_threads/src/main.rs @@ -78,11 +78,21 @@ fn main() { assert_eq!( result, Err(BankError::InsufficientBalance { - who: joe, + who: joe.clone(), amount: 25 }) ); + let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 25); + tracing::info!("Withdraw result {result:?}"); + assert_eq!( + result, + Ok(BankOutMessage::WidrawOk { + who: joe, + amount: 0 + }) + ); + let result = Bank::stop(&mut name_server); tracing::info!("Stop result {result:?}"); assert_eq!(result, Ok(BankOutMessage::Stopped)); diff --git a/examples/bank_threads/src/server.rs b/examples/bank_threads/src/server.rs index d5dab95..02fb21c 100644 --- a/examples/bank_threads/src/server.rs +++ b/examples/bank_threads/src/server.rs @@ -51,45 +51,56 @@ impl GenServer for Bank { &mut self, message: Self::CallMsg, _handle: &BankHandle, - state: &mut Self::State, - ) -> CallResponse { + mut state: Self::State, + ) -> CallResponse { match message.clone() { - InMessage::New { who } => match state.get(&who) { - Some(_amount) => CallResponse::Reply(Err(BankError::AlreadyACustomer { who })), + Self::CallMsg::New { who } => match state.get(&who) { + Some(_amount) => { + CallResponse::Reply(state, Err(BankError::AlreadyACustomer { who })) + } None => { state.insert(who.clone(), 0); - CallResponse::Reply(Ok(OutMessage::Welcome { who })) + CallResponse::Reply(state, Ok(OutMessage::Welcome { who })) } }, - InMessage::Add { who, amount } => match state.get(&who) { + Self::CallMsg::Add { who, amount } => match state.get(&who) { Some(current) => { let new_amount = current + amount; state.insert(who.clone(), new_amount); - CallResponse::Reply(Ok(OutMessage::Balance { - who, - amount: new_amount, - })) + CallResponse::Reply( + state, + Ok(OutMessage::Balance { + who, + amount: new_amount, + }), + ) } - None => CallResponse::Reply(Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(state, Err(BankError::NotACustomer { who })), }, - InMessage::Remove { who, amount } => match state.get(&who) { - Some(current) => match current < &amount { - true => CallResponse::Reply(Err(BankError::InsufficientBalance { - who, - amount: *current, - })), + Self::CallMsg::Remove { who, amount } => match state.get(&who) { + Some(¤t) => match current < amount { + true => CallResponse::Reply( + state, + Err(BankError::InsufficientBalance { + who, + amount: current, + }), + ), false => { let new_amount = current - amount; state.insert(who.clone(), new_amount); - CallResponse::Reply(Ok(OutMessage::WidrawOk { - who, - amount: new_amount, - })) + CallResponse::Reply( + state, + Ok(OutMessage::WidrawOk { + who, + amount: new_amount, + }), + ) } }, - None => CallResponse::Reply(Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(state, Err(BankError::NotACustomer { who })), }, - InMessage::Stop => CallResponse::Stop(Ok(OutMessage::Stopped)), + Self::CallMsg::Stop => CallResponse::Stop(Ok(OutMessage::Stopped)), } } @@ -97,8 +108,8 @@ impl GenServer for Bank { &mut self, _message: Self::CastMsg, _handle: &BankHandle, - _state: &mut Self::State, - ) -> CastResponse { - CastResponse::NoReply + state: Self::State, + ) -> CastResponse { + CastResponse::NoReply(state) } } diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 9a2b832..8dead78 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -34,8 +34,8 @@ impl GenServer for BadlyBehavedTask { &mut self, _: Self::CallMsg, _: &GenServerHandle, - _: &mut Self::State, - ) -> CallResponse { + _: Self::State, + ) -> CallResponse { CallResponse::Stop(()) } @@ -43,8 +43,8 @@ impl GenServer for BadlyBehavedTask { &mut self, _: Self::CastMsg, _: &GenServerHandle, - _: &mut Self::State, - ) -> CastResponse { + _: Self::State, + ) -> CastResponse { rt::sleep(Duration::from_millis(20)).await; loop { println!("{:?}: bad still alive", thread::current().id()); @@ -75,10 +75,13 @@ impl GenServer for WellBehavedTask { &mut self, message: Self::CallMsg, _: &GenServerHandle, - state: &mut Self::State, - ) -> CallResponse { + state: Self::State, + ) -> CallResponse { match message { - InMessage::GetCount => CallResponse::Reply(OutMsg::Count(state.count)), + InMessage::GetCount => { + let count = state.count; + CallResponse::Reply(state, OutMsg::Count(count)) + } InMessage::Stop => CallResponse::Stop(OutMsg::Count(state.count)), } } @@ -87,12 +90,12 @@ impl GenServer for WellBehavedTask { &mut self, _: Self::CastMsg, handle: &GenServerHandle, - state: &mut Self::State, - ) -> CastResponse { + mut state: Self::State, + ) -> CastResponse { state.count += 1; println!("{:?}: good still alive", thread::current().id()); send_after(Duration::from_millis(100), handle.to_owned(), ()); - CastResponse::NoReply + CastResponse::NoReply(state) } } diff --git a/examples/name_server/src/server.rs b/examples/name_server/src/server.rs index 77a8b58..6bf6b30 100644 --- a/examples/name_server/src/server.rs +++ b/examples/name_server/src/server.rs @@ -40,18 +40,19 @@ impl GenServer for NameServer { &mut self, message: Self::CallMsg, _handle: &NameServerHandle, - state: &mut Self::State, - ) -> CallResponse { + mut state: Self::State, + ) -> CallResponse { match message.clone() { Self::CallMsg::Add { key, value } => { state.insert(key, value); - CallResponse::Reply(Self::OutMsg::Ok) + CallResponse::Reply(state, Self::OutMsg::Ok) } Self::CallMsg::Find { key } => match state.get(&key) { - Some(value) => CallResponse::Reply(Self::OutMsg::Found { - value: value.to_string(), - }), - None => CallResponse::Reply(Self::OutMsg::NotFound), + Some(result) => { + let value = result.to_string(); + CallResponse::Reply(state, Self::OutMsg::Found { value }) + } + None => CallResponse::Reply(state, Self::OutMsg::NotFound), }, } } @@ -60,8 +61,8 @@ impl GenServer for NameServer { &mut self, _message: Self::CastMsg, _handle: &NameServerHandle, - _state: &mut Self::State, - ) -> CastResponse { - CastResponse::NoReply + state: Self::State, + ) -> CastResponse { + CastResponse::NoReply(state) } } diff --git a/examples/name_server_with_error/src/server.rs b/examples/name_server_with_error/src/server.rs index 9da0d0f..96c1091 100644 --- a/examples/name_server_with_error/src/server.rs +++ b/examples/name_server_with_error/src/server.rs @@ -43,22 +43,23 @@ impl GenServer for NameServer { &mut self, message: Self::CallMsg, _handle: &NameServerHandle, - state: &mut Self::State, - ) -> CallResponse { + mut state: Self::State, + ) -> CallResponse { match message.clone() { Self::CallMsg::Add { key, value } => { state.insert(key.clone(), value); if key == "error" { panic!("error!") } else { - CallResponse::Reply(Self::OutMsg::Ok) + CallResponse::Reply(state, Self::OutMsg::Ok) } } Self::CallMsg::Find { key } => match state.get(&key) { - Some(value) => CallResponse::Reply(Self::OutMsg::Found { - value: value.to_string(), - }), - None => CallResponse::Reply(Self::OutMsg::NotFound), + Some(result) => { + let value = result.to_string(); + CallResponse::Reply(state, Self::OutMsg::Found { value }) + } + None => CallResponse::Reply(state, Self::OutMsg::NotFound), }, } } @@ -67,8 +68,8 @@ impl GenServer for NameServer { &mut self, _message: Self::CastMsg, _handle: &NameServerHandle, - _state: &mut Self::State, - ) -> CastResponse { - CastResponse::NoReply + state: Self::State, + ) -> CastResponse { + CastResponse::NoReply(state) } } diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index fa75a8b..1196ab8 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -39,17 +39,17 @@ impl GenServer for UpdaterServer { &mut self, _message: Self::CallMsg, _handle: &UpdateServerHandle, - _state: &mut Self::State, - ) -> CallResponse { - CallResponse::Reply(OutMessage::Ok) + state: Self::State, + ) -> CallResponse { + CallResponse::Reply(state, OutMessage::Ok) } async fn handle_cast( &mut self, message: Self::CastMsg, handle: &UpdateServerHandle, - state: &mut Self::State, - ) -> CastResponse { + state: Self::State, + ) -> CastResponse { match message { Self::CastMsg::Check => { send_after(state.periodicity, handle.clone(), InMessage::Check); @@ -59,7 +59,7 @@ impl GenServer for UpdaterServer { tracing::info!("Response: {resp:?}"); - CastResponse::NoReply + CastResponse::NoReply(state) } } } diff --git a/examples/updater_threads/src/server.rs b/examples/updater_threads/src/server.rs index d26e447..4cfa76b 100644 --- a/examples/updater_threads/src/server.rs +++ b/examples/updater_threads/src/server.rs @@ -40,17 +40,17 @@ impl GenServer for UpdaterServer { &mut self, _message: Self::CallMsg, _handle: &UpdateServerHandle, - _state: &mut Self::State, - ) -> CallResponse { - CallResponse::Reply(OutMessage::Ok) + state: Self::State, + ) -> CallResponse { + CallResponse::Reply(state, OutMessage::Ok) } fn handle_cast( &mut self, message: Self::CastMsg, handle: &UpdateServerHandle, - state: &mut Self::State, - ) -> CastResponse { + state: Self::State, + ) -> CastResponse { match message { Self::CastMsg::Check => { send_after(state.periodicity, handle.clone(), InMessage::Check); @@ -60,7 +60,7 @@ impl GenServer for UpdaterServer { tracing::info!("Response: {resp:?}"); - CastResponse::NoReply + CastResponse::NoReply(state) } } } From d750b15178627f7737a89d47244894946f0547a0 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 19 Jun 2025 14:27:13 -0300 Subject: [PATCH 5/6] Added documentation and examples for the init() callback --- concurrency/src/tasks/gen_server.rs | 3 +++ concurrency/src/threads/gen_server.rs | 3 +++ examples/bank/src/main.rs | 20 ++++++++++++++++++++ examples/bank/src/server.rs | 10 ++++++++++ examples/bank_threads/src/main.rs | 20 ++++++++++++++++++++ examples/bank_threads/src/server.rs | 10 ++++++++++ 6 files changed, 66 insertions(+) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 6120219..f800a2f 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -146,6 +146,9 @@ where } } + /// Initialization function. It's called before main loop. It + /// can be overrided on implementations in case initial steps are + /// required. fn init( &mut self, _handle: &GenServerHandle, diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 1e7226f..912067b 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -119,6 +119,9 @@ where } } + /// Initialization function. It's called before main loop. It + /// can be overrided on implementations in case initial steps are + /// required. fn init( &mut self, _handle: &GenServerHandle, diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 8819627..7284745 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -31,18 +31,33 @@ use spawned_rt::tasks as rt; fn main() { rt::run(async { + // Starting the bank let mut name_server = Bank::start(HashMap::new()); + // Testing initial balance for "main" account + let result = Bank::withdraw(&mut name_server, "main".to_string(), 15).await; + tracing::info!("Withdraw result {result:?}"); + assert_eq!( + result, + Ok(BankOutMessage::WidrawOk { + who: "main".to_string(), + amount: 985 + }) + ); + let joe = "Joe".to_string(); + // Error on deposit for an unexistent account let result = Bank::deposit(&mut name_server, joe.clone(), 10).await; tracing::info!("Deposit result {result:?}"); assert_eq!(result, Err(BankError::NotACustomer { who: joe.clone() })); + // Account creation let result = Bank::new_account(&mut name_server, "Joe".to_string()).await; tracing::info!("New account result {result:?}"); assert_eq!(result, Ok(BankOutMessage::Welcome { who: joe.clone() })); + // Deposit let result = Bank::deposit(&mut name_server, "Joe".to_string(), 10).await; tracing::info!("Deposit result {result:?}"); assert_eq!( @@ -53,6 +68,7 @@ fn main() { }) ); + // Deposit let result = Bank::deposit(&mut name_server, "Joe".to_string(), 30).await; tracing::info!("Deposit result {result:?}"); assert_eq!( @@ -63,6 +79,7 @@ fn main() { }) ); + // Withdrawal let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 15).await; tracing::info!("Withdraw result {result:?}"); assert_eq!( @@ -73,6 +90,7 @@ fn main() { }) ); + // Withdrawal with not enough balance let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 45).await; tracing::info!("Withdraw result {result:?}"); assert_eq!( @@ -83,6 +101,7 @@ fn main() { }) ); + // Full withdrawal let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 25).await; tracing::info!("Withdraw result {result:?}"); assert_eq!( @@ -93,6 +112,7 @@ fn main() { }) ); + // Stopping the bank let result = Bank::stop(&mut name_server).await; tracing::info!("Stop result {result:?}"); assert_eq!(result, Ok(BankOutMessage::Stopped)); diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index 8ce8550..793a4ce 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -51,6 +51,16 @@ impl GenServer for Bank { Self {} } + // Initializing "main" account with 1000 in balance to test init() callback. + async fn init( + &mut self, + _handle: &GenServerHandle, + mut state: Self::State, + ) -> Result { + state.insert("main".to_string(), 1000); + Ok(state) + } + async fn handle_call( &mut self, message: Self::CallMsg, diff --git a/examples/bank_threads/src/main.rs b/examples/bank_threads/src/main.rs index 5a28dc6..ced28da 100644 --- a/examples/bank_threads/src/main.rs +++ b/examples/bank_threads/src/main.rs @@ -31,18 +31,33 @@ use spawned_rt::threads as rt; fn main() { rt::run(|| { + // Starting the bank let mut name_server = Bank::start(HashMap::new()); + // Testing initial balance for "main" account + let result = Bank::withdraw(&mut name_server, "main".to_string(), 15); + tracing::info!("Withdraw result {result:?}"); + assert_eq!( + result, + Ok(BankOutMessage::WidrawOk { + who: "main".to_string(), + amount: 985 + }) + ); + let joe = "Joe".to_string(); + // Error on deposit for an unexistent account let result = Bank::deposit(&mut name_server, joe.clone(), 10); tracing::info!("Deposit result {result:?}"); assert_eq!(result, Err(BankError::NotACustomer { who: joe.clone() })); + // Account creation let result = Bank::new_account(&mut name_server, "Joe".to_string()); tracing::info!("New account result {result:?}"); assert_eq!(result, Ok(BankOutMessage::Welcome { who: joe.clone() })); + // Deposit let result = Bank::deposit(&mut name_server, "Joe".to_string(), 10); tracing::info!("Deposit result {result:?}"); assert_eq!( @@ -53,6 +68,7 @@ fn main() { }) ); + // Deposit let result = Bank::deposit(&mut name_server, "Joe".to_string(), 30); tracing::info!("Deposit result {result:?}"); assert_eq!( @@ -63,6 +79,7 @@ fn main() { }) ); + // Withdrawal let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 15); tracing::info!("Withdraw result {result:?}"); assert_eq!( @@ -73,6 +90,7 @@ fn main() { }) ); + // Withdrawal with not enough balance let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 45); tracing::info!("Withdraw result {result:?}"); assert_eq!( @@ -83,6 +101,7 @@ fn main() { }) ); + // Full withdrawal let result = Bank::withdraw(&mut name_server, "Joe".to_string(), 25); tracing::info!("Withdraw result {result:?}"); assert_eq!( @@ -93,6 +112,7 @@ fn main() { }) ); + // Stopping the bank let result = Bank::stop(&mut name_server); tracing::info!("Stop result {result:?}"); assert_eq!(result, Ok(BankOutMessage::Stopped)); diff --git a/examples/bank_threads/src/server.rs b/examples/bank_threads/src/server.rs index 02fb21c..69820a3 100644 --- a/examples/bank_threads/src/server.rs +++ b/examples/bank_threads/src/server.rs @@ -47,6 +47,16 @@ impl GenServer for Bank { Self {} } + // Initializing "main" account with 1000 in balance to test init() callback. + fn init( + &mut self, + _handle: &GenServerHandle, + mut state: Self::State, + ) -> Result { + state.insert("main".to_string(), 1000); + Ok(state) + } + fn handle_call( &mut self, message: Self::CallMsg, From 620da61ecd58f5b0b19d3115827f6bf149f5b0c9 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Mon, 23 Jun 2025 10:51:18 -0300 Subject: [PATCH 6/6] Added periodic check initialization in init() function --- examples/updater/src/main.rs | 7 +------ examples/updater/src/server.rs | 19 ++++++++++--------- examples/updater_threads/src/main.rs | 7 +------ examples/updater_threads/src/server.rs | 19 ++++++++++--------- 4 files changed, 22 insertions(+), 30 deletions(-) diff --git a/examples/updater/src/main.rs b/examples/updater/src/main.rs index 5119b9e..c01f3c6 100644 --- a/examples/updater/src/main.rs +++ b/examples/updater/src/main.rs @@ -8,22 +8,17 @@ mod server; use std::{thread, time::Duration}; -use messages::UpdaterOutMessage; use server::{UpdateServerState, UpdaterServer}; use spawned_concurrency::tasks::GenServer as _; use spawned_rt::tasks as rt; fn main() { rt::run(async { - let mut update_server = UpdaterServer::start(UpdateServerState { + UpdaterServer::start(UpdateServerState { url: "https://httpbin.org/ip".to_string(), periodicity: Duration::from_millis(1000), }); - let result = UpdaterServer::check(&mut update_server).await; - tracing::info!("Update check done: {result:?}"); - assert_eq!(result, UpdaterOutMessage::Ok); - // giving it some time before ending thread::sleep(Duration::from_secs(10)); }) diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index 1196ab8..5c610a2 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -15,15 +15,6 @@ pub struct UpdateServerState { } pub struct UpdaterServer {} -impl UpdaterServer { - pub async fn check(server: &mut UpdateServerHandle) -> OutMessage { - match server.cast(InMessage::Check).await { - Ok(_) => OutMessage::Ok, - Err(_) => OutMessage::Error, - } - } -} - impl GenServer for UpdaterServer { type CallMsg = (); type CastMsg = InMessage; @@ -35,6 +26,16 @@ impl GenServer for UpdaterServer { Self {} } + // Initializing GenServer to start periodic checks + async fn init( + &mut self, + handle: &GenServerHandle, + state: Self::State, + ) -> Result { + send_after(state.periodicity, handle.clone(), InMessage::Check); + Ok(state) + } + async fn handle_call( &mut self, _message: Self::CallMsg, diff --git a/examples/updater_threads/src/main.rs b/examples/updater_threads/src/main.rs index 64236be..b4409b5 100644 --- a/examples/updater_threads/src/main.rs +++ b/examples/updater_threads/src/main.rs @@ -8,22 +8,17 @@ mod server; use std::{thread, time::Duration}; -use messages::UpdaterOutMessage; use server::{UpdateServerState, UpdaterServer}; use spawned_concurrency::threads::GenServer as _; use spawned_rt::threads as rt; fn main() { rt::run(|| { - let mut update_server = UpdaterServer::start(UpdateServerState { + UpdaterServer::start(UpdateServerState { url: "https://httpbin.org/ip".to_string(), periodicity: Duration::from_millis(1000), }); - let result = UpdaterServer::check(&mut update_server); - tracing::info!("Update check done: {result:?}"); - assert_eq!(result, UpdaterOutMessage::Ok); - // giving it some time before ending thread::sleep(Duration::from_secs(10)); }) diff --git a/examples/updater_threads/src/server.rs b/examples/updater_threads/src/server.rs index 4cfa76b..bd5e6cd 100644 --- a/examples/updater_threads/src/server.rs +++ b/examples/updater_threads/src/server.rs @@ -16,15 +16,6 @@ pub struct UpdateServerState { } pub struct UpdaterServer {} -impl UpdaterServer { - pub fn check(server: &mut UpdateServerHandle) -> OutMessage { - match server.cast(InMessage::Check) { - Ok(_) => OutMessage::Ok, - Err(_) => OutMessage::Error, - } - } -} - impl GenServer for UpdaterServer { type CallMsg = (); type CastMsg = InMessage; @@ -36,6 +27,16 @@ impl GenServer for UpdaterServer { Self {} } + // Initializing GenServer to start periodic checks. + fn init( + &mut self, + handle: &GenServerHandle, + state: Self::State, + ) -> Result { + send_after(state.periodicity, handle.clone(), InMessage::Check); + Ok(state) + } + fn handle_call( &mut self, _message: Self::CallMsg,