From 2e2d0e48d00320d9dfb8e152959a042d0c2ad80a Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 29 Dec 2023 13:40:23 +0100 Subject: [PATCH 01/25] Bump main to staging (#7) * Setup (#1) - Add tokio, serde and sqlx crates - Setup .gitignore - Setup Cargo.toml - Setup pipelines * Enable Dependabot (#3) Add dependabot.yml with daily cargo updates configured * Fix staging pipelines (#4) Fix staging pipelines not triggering on pull request * Implement Cargo caching (#5) Add Swatinem's Rust-Cache@v2 action to build, test and deploy pipelines * Improve README.md (#6) - Add deployment badges - Add collaborating info (board + issues) --- README.md | 2 +- src/main.rs | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 03a82eb..985ef57 100644 --- a/README.md +++ b/README.md @@ -14,4 +14,4 @@ Beta: [![Deploy](https://github.com/Kitt3120/lum/actions/workflows/deploy_prerel # Collaborating -Checkout out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues) +Check out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues) diff --git a/src/main.rs b/src/main.rs index 7412001..bba659f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,10 +23,7 @@ async fn main() { let config = match config_handler.load_config() { Ok(config) => config, Err(err) => { - error!( - "Error reading config file: {}\n{} will exit.", - err, BOT_NAME - ); + error!("Error reading config file: {}\n{} will exit.", err, BOT_NAME); return; } @@ -43,10 +40,7 @@ async fn main() { fn setup_logger() { if let Err(error) = log::setup() { - panic!( - "Error setting up the Logger: {}\n{} will exit.", - error, BOT_NAME - ); + panic!("Error setting up the Logger: {}\n{} will exit.", error, BOT_NAME); } } From 04ad42bfc82fc185b52b5ec5b6067058bb7aaf2c Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Mon, 29 Jan 2024 14:25:28 +0100 Subject: [PATCH 02/25] Clonable Status - Made Status clonable by not using BoxedErrors anymore but Strings for holding the error information - Add get_status() to Service - Made status property of Service private --- src/service/service.rs | 8 +++++++- src/service/types.rs | 8 ++++---- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/service/service.rs b/src/service/service.rs index 185f06a..cdb9132 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -18,7 +18,7 @@ pub struct ServiceInfo { pub name: String, pub priority: Priority, - pub status: Arc>, + status: Arc>, } impl ServiceInfo { @@ -31,6 +31,12 @@ impl ServiceInfo { } } + pub async fn get_status(&self) -> Status { + let lock = self.status.read().await; + let clone = lock.clone(); + clone + } + pub async fn set_status(&self, status: Status) { let mut lock = self.status.write().await; *(lock) = status diff --git a/src/service/types.rs b/src/service/types.rs index 5ae3760..d878a4a 100644 --- a/src/service/types.rs +++ b/src/service/types.rs @@ -10,15 +10,15 @@ pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>; pub type PinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; pub type PinnedBoxedFutureResult<'a, T> = PinnedBoxedFuture<'a, Result>; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Status { Started, Stopped, Starting, Stopping, - FailedToStart(BoxedError), //TODO: Test out if it'd be better to use a String instead - FailedToStop(BoxedError), - RuntimeError(BoxedError), + FailedToStart(String), + FailedToStop(String), + RuntimeError(String), } impl Display for Status { From b37e205fa6960ecfe2e6171eeb48a2a0980d837e Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Mon, 29 Jan 2024 19:00:50 +0100 Subject: [PATCH 03/25] Adapt Service Manager to new Status enum --- src/service/service.rs | 3 +-- src/service/service_manager.rs | 26 +++++++++++++------------- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/src/service/service.rs b/src/service/service.rs index cdb9132..5d1ba0f 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -33,8 +33,7 @@ impl ServiceInfo { pub async fn get_status(&self) -> Status { let lock = self.status.read().await; - let clone = lock.clone(); - clone + lock.clone() } pub async fn set_status(&self, status: Status) { diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index 91fb842..6ae6afa 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -197,9 +197,9 @@ impl ServiceManager { Box::pin(async move { for service in self.services.iter() { let service = service.read().await; - let status = service.info().status.read().await; + let status = service.info().get_status().await; - if !matches!(&*status, Status::Started) { + if !matches!(status, Status::Started) { return OverallStatus::Unhealthy; } } @@ -223,9 +223,9 @@ impl ServiceManager { let service = service.read().await; let info = service.info(); let priority = &info.priority; - let status = info.status.read().await; + let status = info.get_status().await; - match *status { + match status { Status::Started | Status::Stopped => match priority { Priority::Essential => { non_failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); @@ -293,16 +293,16 @@ impl ServiceManager { &self, service: &RwLockReadGuard<'_, dyn Service>, ) -> bool { - let status = service.info().status.read().await; - matches!(&*status, Status::Started) + let status = service.info().get_status().await; + matches!(status, Status::Started) } async fn is_service_stopped( &self, service: &RwLockReadGuard<'_, dyn Service>, ) -> bool { - let status = service.info().status.read().await; - matches!(&*status, Status::Stopped) + let status = service.info().get_status().await; + matches!(status, Status::Stopped) } async fn init_service( @@ -321,14 +321,14 @@ impl ServiceManager { service.info().set_status(Status::Started).await; } Err(error) => { - service.info().set_status(Status::FailedToStart(error)).await; + service.info().set_status(Status::FailedToStart(error.to_string())).await; return Err(StartupError::FailedToStartService(service.info().id.clone())); } }, Err(error) => { service .info() - .set_status(Status::FailedToStart(Box::new(error))) + .set_status(Status::FailedToStart(error.to_string())) .await; return Err(StartupError::FailedToStartService(service.info().id.clone())); } @@ -351,14 +351,14 @@ impl ServiceManager { service.info().set_status(Status::Stopped).await; } Err(error) => { - service.info().set_status(Status::FailedToStop(error)).await; + service.info().set_status(Status::FailedToStop(error.to_string())).await; return Err(ShutdownError::FailedToStopService(service.info().id.clone())); } }, Err(error) => { service .info() - .set_status(Status::FailedToStop(Box::new(error))) + .set_status(Status::FailedToStop(error.to_string())) .await; return Err(ShutdownError::FailedToStopService(service.info().id.clone())); } @@ -404,7 +404,7 @@ impl ServiceManager { service .info() .set_status(Status::RuntimeError( - format!("Background task ended with error: {}", error).into(), + format!("Background task ended with error: {}", error), )) .await; } From 41e11c54a7602ff1ac1270e04a81991649e453c6 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Thu, 8 Feb 2024 23:28:54 +0100 Subject: [PATCH 04/25] Events - Implement Event - Add status_changed Event to ServiceInfo --- src/event.rs | 67 ++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 1 + src/service/service.rs | 12 +++++++- 3 files changed, 79 insertions(+), 1 deletion(-) create mode 100644 src/event.rs diff --git a/src/event.rs b/src/event.rs new file mode 100644 index 0000000..1c96707 --- /dev/null +++ b/src/event.rs @@ -0,0 +1,67 @@ +use crate::service::BoxedError; +use std::{fmt::Debug, sync::Arc}; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, +}; + +pub struct Event { + receivers: Mutex>>>, + closures: Mutex) -> Result<(), BoxedError> + Send>>>, +} + +impl Event { + pub fn new() -> Self { + Self { + receivers: Mutex::new(Vec::new()), + closures: Mutex::new(Vec::new()), + } + } + + pub async fn get_receiver(&self, buffer: usize) -> Receiver> { + let (sender, receiver) = channel(buffer); + let mut subscribers = self.receivers.lock().await; + subscribers.push(sender); + receiver + } + + pub async fn subscribe(&self, closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + 'static) { + let mut closures = self.closures.lock().await; + closures.push(Box::new(closure)); + } + + pub async fn dispatch(&self, data: T) { + let subscribers = self.receivers.lock().await; + let data = Arc::new(data); + + for subscriber in subscribers.iter() { + let data = Arc::clone(&data); + let result = subscriber.send(data).await; + if let Err(err) = result { + log::error!("Event failed to dispatch data to one of its receivers: {}", err); + } + } + + let closures = self.closures.lock().await; + for closure in closures.iter() { + let data = Arc::clone(&data); + let result = closure(data); + if let Err(err) = result { + log::error!("Event failed to dispatch data to one of its closures: {}", err); + } + } + } +} + +impl Default for Event { + fn default() -> Self { + Self::new() + } +} + +impl Debug for Event { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct(format!("Event of type {}", std::any::type_name::()).as_str()) + .finish() + } +} diff --git a/src/lib.rs b/src/lib.rs index f59cd5b..db7c2fb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ use std::time::SystemTime; pub mod bot; pub mod config; +pub mod event; pub mod log; pub mod service; pub mod setlock; diff --git a/src/service/service.rs b/src/service/service.rs index 5d1ba0f..6a71992 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -7,6 +7,8 @@ use std::{ use downcast_rs::{impl_downcast, DowncastSync}; use tokio::sync::RwLock; +use crate::event::Event; + use super::{ service_manager::ServiceManager, types::{PinnedBoxedFuture, PinnedBoxedFutureResult, Priority, Status}, @@ -19,6 +21,7 @@ pub struct ServiceInfo { pub priority: Priority, status: Arc>, + pub status_changed: Event, } impl ServiceInfo { @@ -28,6 +31,7 @@ impl ServiceInfo { name: name.to_string(), priority, status: Arc::new(RwLock::new(Status::Stopped)), + status_changed: Event::new(), } } @@ -38,7 +42,13 @@ impl ServiceInfo { pub async fn set_status(&self, status: Status) { let mut lock = self.status.write().await; - *(lock) = status + + let previous_status = lock.clone(); + *(lock) = status; + + if previous_status != *lock { + self.status_changed.dispatch(lock.clone()).await; + } } } From c6ce2368b7c17627c02782a7f4472fd4f76fac05 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 10 Feb 2024 16:38:25 +0100 Subject: [PATCH 05/25] Event improvements - Add name attribute to Event - Unify Channel and Closure subscribers by using an Enum - Propagate errors when dispatching events - Add error log when errors occur while dispatching events - Subscribers are now removed from an event when they run into an error while dispatching --- src/event.rs | 89 ++++++++++++++++++++++++++++++------------ src/service.rs | 2 +- src/service/service.rs | 4 +- 3 files changed, 66 insertions(+), 29 deletions(-) diff --git a/src/event.rs b/src/event.rs index 1c96707..9a35fdf 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,61 +1,98 @@ use crate::service::BoxedError; use std::{fmt::Debug, sync::Arc}; use tokio::sync::{ - mpsc::{channel, Receiver, Sender}, + mpsc::{channel, error::SendError, Receiver, Sender}, Mutex, }; +pub enum Subscriber { + Channel(Sender>), + Closure(Box) -> Result<(), BoxedError> + Send + Sync>), +} + +pub enum EventError { + ChannelSend(SendError>), + Closure(BoxedError), +} + pub struct Event { - receivers: Mutex>>>, - closures: Mutex) -> Result<(), BoxedError> + Send>>>, + pub name: String, + subscribers: Mutex>>, } impl Event { - pub fn new() -> Self { + pub fn new(name: &str) -> Self { Self { - receivers: Mutex::new(Vec::new()), - closures: Mutex::new(Vec::new()), + name: name.to_string(), + subscribers: Mutex::new(Vec::new()), } } - pub async fn get_receiver(&self, buffer: usize) -> Receiver> { + pub async fn subscriber_count(&self) -> usize { + let subscribers = self.subscribers.lock().await; + subscribers.len() + } + + pub async fn open_channel(&self, buffer: usize) -> Receiver> { let (sender, receiver) = channel(buffer); - let mut subscribers = self.receivers.lock().await; - subscribers.push(sender); + let mut subscribers = self.subscribers.lock().await; + subscribers.push(Subscriber::Channel(sender)); receiver } - pub async fn subscribe(&self, closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + 'static) { - let mut closures = self.closures.lock().await; - closures.push(Box::new(closure)); + pub async fn subscribe( + &self, + closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, + ) { + let mut subscribers = self.subscribers.lock().await; + subscribers.push(Subscriber::Closure(Box::new(closure))); } - pub async fn dispatch(&self, data: T) { - let subscribers = self.receivers.lock().await; + pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { + let mut subscribers = self.subscribers.lock().await; let data = Arc::new(data); - for subscriber in subscribers.iter() { + let mut errors = Vec::new(); + let mut subscribers_to_remove = Vec::new(); + + for (index, subscriber) in subscribers.iter().enumerate() { let data = Arc::clone(&data); - let result = subscriber.send(data).await; - if let Err(err) = result { - log::error!("Event failed to dispatch data to one of its receivers: {}", err); + + match subscriber { + Subscriber::Channel(sender) => { + let result = sender.send(data).await; + if let Err(err) = result { + log::error!("Event \"{}\" failed to dispatch data to receiver {}: {}. Receiver will be unregistered from event.", self.name, index, err); + errors.push(EventError::ChannelSend(err)); + subscribers_to_remove.push(index); + } + } + Subscriber::Closure(closure) => { + let result = closure(data); + if let Err(err) = result { + log::error!("Event \"{}\" failed to dispatch data to closure {}: {}. Closure will be unregistered from event.", self.name, index, err); + errors.push(EventError::Closure(err)); + subscribers_to_remove.push(index); + } + } } } - let closures = self.closures.lock().await; - for closure in closures.iter() { - let data = Arc::clone(&data); - let result = closure(data); - if let Err(err) = result { - log::error!("Event failed to dispatch data to one of its closures: {}", err); - } + for index in subscribers_to_remove.into_iter().rev() { + subscribers.remove(index); + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) } } } impl Default for Event { fn default() -> Self { - Self::new() + Self::new("Unnamed Event") } } diff --git a/src/service.rs b/src/service.rs index 10131cc..378506a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,5 @@ pub mod discord; -pub mod service; +pub mod service; // Will be fixed when lum gets seperated into multiple workspaces pub mod service_manager; pub mod types; pub mod watchdog; diff --git a/src/service/service.rs b/src/service/service.rs index 6a71992..5f6e6ca 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -31,7 +31,7 @@ impl ServiceInfo { name: name.to_string(), priority, status: Arc::new(RwLock::new(Status::Stopped)), - status_changed: Event::new(), + status_changed: Event::new(format!("{}-status-changed", name).as_str()), } } @@ -47,7 +47,7 @@ impl ServiceInfo { *(lock) = status; if previous_status != *lock { - self.status_changed.dispatch(lock.clone()).await; + let _ = self.status_changed.dispatch(lock.clone()).await; } } } From e500458ce2602b4af82d50040be25ca31c1062d5 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 29 Mar 2024 22:43:56 +0100 Subject: [PATCH 06/25] Event improvements Make the removal of Event subscribers on error optional --- src/event.rs | 14 ++++++++++---- src/service/service.rs | 2 +- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/event.rs b/src/event.rs index 9a35fdf..5568358 100644 --- a/src/event.rs +++ b/src/event.rs @@ -18,13 +18,15 @@ pub enum EventError { pub struct Event { pub name: String, subscribers: Mutex>>, + remove_subscriber_on_error: bool, } impl Event { - pub fn new(name: &str) -> Self { + pub fn new(name: &str, remove_subscriber_on_error: bool) -> Self { Self { name: name.to_string(), subscribers: Mutex::new(Vec::new()), + remove_subscriber_on_error, } } @@ -61,6 +63,7 @@ impl Event { match subscriber { Subscriber::Channel(sender) => { let result = sender.send(data).await; + if let Err(err) = result { log::error!("Event \"{}\" failed to dispatch data to receiver {}: {}. Receiver will be unregistered from event.", self.name, index, err); errors.push(EventError::ChannelSend(err)); @@ -69,6 +72,7 @@ impl Event { } Subscriber::Closure(closure) => { let result = closure(data); + if let Err(err) = result { log::error!("Event \"{}\" failed to dispatch data to closure {}: {}. Closure will be unregistered from event.", self.name, index, err); errors.push(EventError::Closure(err)); @@ -78,8 +82,10 @@ impl Event { } } - for index in subscribers_to_remove.into_iter().rev() { - subscribers.remove(index); + if self.remove_subscriber_on_error { + for index in subscribers_to_remove.into_iter().rev() { + subscribers.remove(index); + } } if errors.is_empty() { @@ -92,7 +98,7 @@ impl Event { impl Default for Event { fn default() -> Self { - Self::new("Unnamed Event") + Self::new("Unnamed Event", false) } } diff --git a/src/service/service.rs b/src/service/service.rs index 5f6e6ca..19856f7 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -31,7 +31,7 @@ impl ServiceInfo { name: name.to_string(), priority, status: Arc::new(RwLock::new(Status::Stopped)), - status_changed: Event::new(format!("{}-status-changed", name).as_str()), + status_changed: Event::new(format!("{}-status-changed", name).as_str(), false), } } From 6c7477dfcbf9b8ba0d3f51864d197a4008af00f1 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Mon, 1 Apr 2024 00:34:27 +0200 Subject: [PATCH 07/25] Slight refactors - Refactors in service_manager.rs - Refactors in watchdog.rs --- src/service/service_manager.rs | 53 ++++++++++++++++++++-------------- src/service/watchdog.rs | 4 +-- 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index 6ae6afa..af742cf 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -1,4 +1,4 @@ -use crate::{service::Watchdog, setlock::SetLock}; +use crate::{service::Watchdog, setlock::{SetLock, SetLockError}}; use log::{error, info, warn}; use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; use tokio::{ @@ -58,11 +58,15 @@ impl ServiceManagerBuilder { }; let self_arc = Arc::new(service_manager); + let self_arc_clone = Arc::clone(&self_arc); - match self_arc.arc.write().await.set(Arc::clone(&self_arc)) { - Ok(()) => {} - Err(err) => { - panic!("Failed to set ServiceManager in SetLock for self_arc: {}", err); + let result = self_arc_clone.arc.write().await.set(Arc::clone(&self_arc_clone)); + + if let Err(err) = result { + match err { + SetLockError::AlreadySet => { + unreachable!("Unable to set ServiceManager's self-arc in ServiceManagerBuilder because it was already set. This should never happen. How did you...?"); + } } } @@ -82,8 +86,10 @@ impl ServiceManager { } pub async fn manages_service(&self, service_id: &str) -> bool { - for registered_service in self.services.iter() { - if registered_service.read().await.info().id == service_id { + for service in self.services.iter() { + let service_lock = service.read().await; + + if service_lock.info().id == service_id { return true; } } @@ -93,7 +99,6 @@ impl ServiceManager { pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { let service_lock = service.read().await; - let service_id = service_lock.info().id.clone(); if !self.manages_service(&service_id).await { @@ -104,7 +109,7 @@ impl ServiceManager { return Err(StartupError::ServiceNotStopped(service_id)); } - if self.has_background_task_running(&service_lock).await { + if self.has_background_task_registered(&service_id).await { return Err(StartupError::BackgroundTaskAlreadyRunning(service_id)); } @@ -112,6 +117,7 @@ impl ServiceManager { let mut service_lock = service.write().await; service_lock.info().set_status(Status::Starting).await; + self.init_service(&mut service_lock).await?; self.start_background_task(&service_lock, Arc::clone(&service)).await; @@ -122,13 +128,14 @@ impl ServiceManager { pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { let service_lock = service.read().await; + let service_id = service_lock.info().id.clone(); - if !(self.manages_service(service_lock.info().id.as_str()).await) { - return Err(ShutdownError::ServiceNotManaged(service_lock.info().id.clone())); + if !(self.manages_service(&service_id).await) { + return Err(ShutdownError::ServiceNotManaged(service_id)); } if !self.is_service_started(&service_lock).await { - return Err(ShutdownError::ServiceNotStarted(service_lock.info().id.clone())); + return Err(ShutdownError::ServiceNotStarted(service_id)); } self.stop_background_task(&service_lock).await; @@ -281,12 +288,12 @@ impl ServiceManager { // Helper methods for start_service and stop_service - async fn has_background_task_running( + async fn has_background_task_registered( &self, - service: &RwLockReadGuard<'_, dyn Service>, + service_id: &str, ) -> bool { let tasks = self.background_tasks.read().await; - tasks.contains_key(service.info().id.as_str()) + tasks.contains_key(service_id) } async fn is_service_started( @@ -368,13 +375,15 @@ impl ServiceManager { } async fn start_background_task(&self, service_lock: &RwLockWriteGuard<'_, dyn Service>, service: Arc>) { + if self.has_background_task_registered(&service_lock.info().id).await { + return; + } + let task = service_lock.task(); if let Some(task) = task { let mut watchdog = Watchdog::new(task); watchdog.append(|result| async move { - let service = service; - /* We technically only need a read lock here, but we want to immediately stop other services from accessing the service, so we acquire a write lock instead. @@ -390,7 +399,7 @@ impl ServiceManager { service .info() - .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) + .set_status(Status::RuntimeError("Background task ended unexpectedly!".to_string())) .await; } @@ -422,12 +431,14 @@ impl ServiceManager { } async fn stop_background_task(&self, service_lock: &RwLockReadGuard<'_, dyn Service>) { - if self.has_background_task_running(service_lock).await { - let mut tasks_lock = self.background_tasks.write().await; + if !self.has_background_task_registered(&service_lock.info().id).await { + return; + } + + let mut tasks_lock = self.background_tasks.write().await; let task = tasks_lock.get(service_lock.info().id.as_str()).unwrap(); task.abort(); tasks_lock.remove(service_lock.info().id.as_str()); - } } } diff --git a/src/service/watchdog.rs b/src/service/watchdog.rs index 8102f98..9d826d7 100644 --- a/src/service/watchdog.rs +++ b/src/service/watchdog.rs @@ -1,3 +1,4 @@ +use super::PinnedBoxedFuture; use log::error; use serenity::FutureExt; use std::{future::Future, mem::replace, sync::Arc}; @@ -6,8 +7,7 @@ use tokio::sync::{ Mutex, }; -use super::PinnedBoxedFuture; - +//TODO: Rename to TaskChain and use Event instead of manual subscriber handling pub struct Watchdog<'a, T: Send> { task: PinnedBoxedFuture<'a, T>, subscribers: Arc>>>>, From bdf8ce905b45c492552fa9393c75e16f04802931 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 28 Jun 2024 20:20:20 +0200 Subject: [PATCH 08/25] WIP: Idk lol I made these changes many months ago. Reviewed them for like half an hour, looks good. I know what I was working on. Will continue now :) --- build.rs | 2 +- src/bot.rs | 19 +++++++ src/event.rs | 53 +++++++++++++++---- src/lib.rs | 11 +--- src/main.rs | 1 - src/service.rs | 2 +- src/service/discord.rs | 1 + src/service/service.rs | 14 ++--- src/service/service_manager.rs | 93 +++++++++++++++++++--------------- 9 files changed, 126 insertions(+), 70 deletions(-) diff --git a/build.rs b/build.rs index 7609593..d506869 100644 --- a/build.rs +++ b/build.rs @@ -2,4 +2,4 @@ fn main() { // trigger recompilation when a new migration is added println!("cargo:rerun-if-changed=migrations"); -} \ No newline at end of file +} diff --git a/src/bot.rs b/src/bot.rs index 69bda29..f002d95 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -1,9 +1,15 @@ use std::sync::Arc; +use log::info; use tokio::sync::RwLock; use crate::service::{PinnedBoxedFuture, Service, ServiceManager, ServiceManagerBuilder}; +pub enum ExitReason { + SIGINT, + EssentialServiceFailed(String), +} + pub struct BotBuilder { name: String, service_manager: ServiceManagerBuilder, @@ -64,4 +70,17 @@ impl Bot { //TODO: Potential for further deinitialization here, like modules }) } + + pub async fn join(&self) -> ExitReason { + match tokio::signal::ctrl_c().await { + Ok(_) => { + info!("Received SIGINT, {} will now shut down", self.name); + } + Err(error) => { + panic!("Error receiving SIGINT: {}\n{} will exit.", error, self.name); + } + } + + ExitReason::SIGINT + } } diff --git a/src/event.rs b/src/event.rs index 5568358..6954ebc 100644 --- a/src/event.rs +++ b/src/event.rs @@ -17,19 +17,29 @@ pub enum EventError { pub struct Event { pub name: String, + subscribers: Mutex>>, + log_on_error: bool, remove_subscriber_on_error: bool, } impl Event { - pub fn new(name: &str, remove_subscriber_on_error: bool) -> Self { + pub fn new(name: &str, log_on_error: bool, remove_subscriber_on_error: bool) -> Self { Self { name: name.to_string(), subscribers: Mutex::new(Vec::new()), + log_on_error, remove_subscriber_on_error, } } + pub fn new_with_defaults(name: &str) -> Self { + Self { + name: name.to_string(), + ..Default::default() + } + } + pub async fn subscriber_count(&self) -> usize { let subscribers = self.subscribers.lock().await; subscribers.len() @@ -65,27 +75,50 @@ impl Event { let result = sender.send(data).await; if let Err(err) = result { - log::error!("Event \"{}\" failed to dispatch data to receiver {}: {}. Receiver will be unregistered from event.", self.name, index, err); + if self.log_on_error { + log::error!( + "Event \"{}\" failed to dispatch data to receiver {}: {}.", + self.name, + index, + err + ); + } + + if self.remove_subscriber_on_error { + log::error!("Receiver will be unregistered from event."); + subscribers_to_remove.push(index); + } + errors.push(EventError::ChannelSend(err)); - subscribers_to_remove.push(index); } } + Subscriber::Closure(closure) => { let result = closure(data); if let Err(err) = result { - log::error!("Event \"{}\" failed to dispatch data to closure {}: {}. Closure will be unregistered from event.", self.name, index, err); + if self.log_on_error { + log::error!( + "Event \"{}\" failed to dispatch data to closure {}: {}.", + self.name, + index, + err + ); + } + + if self.remove_subscriber_on_error { + log::error!("Closure will be unregistered from event."); + subscribers_to_remove.push(index); + } + errors.push(EventError::Closure(err)); - subscribers_to_remove.push(index); } } } } - if self.remove_subscriber_on_error { - for index in subscribers_to_remove.into_iter().rev() { - subscribers.remove(index); - } + for index in subscribers_to_remove.into_iter().rev() { + subscribers.remove(index); } if errors.is_empty() { @@ -98,7 +131,7 @@ impl Event { impl Default for Event { fn default() -> Self { - Self::new("Unnamed Event", false) + Self::new("Unnamed Event", true, false) } } diff --git a/src/lib.rs b/src/lib.rs index db7c2fb..793ec3f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -17,7 +17,6 @@ pub fn is_debug() -> bool { pub async fn run(mut bot: Bot) { if !log::is_set_up() { eprintln!("Logger has not been set up!\n{} will exit.", bot.name); - return; } @@ -32,7 +31,6 @@ pub async fn run(mut bot: Bot) { "Error getting elapsed startup time: {}\n{} will exit.", error, bot.name ); - return; } }; @@ -50,14 +48,7 @@ pub async fn run(mut bot: Bot) { info!("{} is alive", bot.name,); //TODO: Add CLI commands - match tokio::signal::ctrl_c().await { - Ok(_) => { - info!("Received SIGINT, {} will now shut down", bot.name); - } - Err(error) => { - panic!("Error receiving SIGINT: {}\n{} will exit.", error, bot.name); - } - } + let exit_reason = bot.join().await; bot.stop().await; diff --git a/src/main.rs b/src/main.rs index bba659f..871eb86 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,7 +24,6 @@ async fn main() { Ok(config) => config, Err(err) => { error!("Error reading config file: {}\n{} will exit.", err, BOT_NAME); - return; } }; diff --git a/src/service.rs b/src/service.rs index 378506a..1ac1c28 100644 --- a/src/service.rs +++ b/src/service.rs @@ -8,6 +8,6 @@ pub use service::{Service, ServiceInfo}; pub use service_manager::{ServiceManager, ServiceManagerBuilder}; pub use types::{ BoxedError, BoxedFuture, BoxedFutureResult, OverallStatus, PinnedBoxedFuture, PinnedBoxedFutureResult, - Priority, StartupError, Status, + Priority, ShutdownError, StartupError, Status, }; pub use watchdog::Watchdog; diff --git a/src/service/discord.rs b/src/service/discord.rs index d6f31ff..2c4b37f 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -20,6 +20,7 @@ use tokio::{ time::sleep, }; +//TODO: Restructure pub struct DiscordService { info: ServiceInfo, discord_token: String, diff --git a/src/service/service.rs b/src/service/service.rs index 19856f7..f6bf03a 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -21,7 +21,8 @@ pub struct ServiceInfo { pub priority: Priority, status: Arc>, - pub status_changed: Event, + + pub on_status_change: Event, } impl ServiceInfo { @@ -31,7 +32,7 @@ impl ServiceInfo { name: name.to_string(), priority, status: Arc::new(RwLock::new(Status::Stopped)), - status_changed: Event::new(format!("{}-status-changed", name).as_str(), false), + on_status_change: Event::new_with_defaults(format!("{}::on_status_change", name).as_str()), } } @@ -42,13 +43,14 @@ impl ServiceInfo { pub async fn set_status(&self, status: Status) { let mut lock = self.status.write().await; - let previous_status = lock.clone(); - *(lock) = status; - if previous_status != *lock { - let _ = self.status_changed.dispatch(lock.clone()).await; + if previous_status == status { + return; } + + *(lock) = status; + let _ = self.on_status_change.dispatch(lock.clone()).await; } } diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index af742cf..1dd5dec 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -1,4 +1,11 @@ -use crate::{service::Watchdog, setlock::{SetLock, SetLockError}}; +use super::{ + service::Service, + types::{OverallStatus, PinnedBoxedFuture, Priority, ShutdownError, StartupError, Status}, +}; +use crate::{ + service::Watchdog, + setlock::{SetLock, SetLockError}, +}; use log::{error, info, warn}; use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; use tokio::{ @@ -7,10 +14,6 @@ use tokio::{ task::JoinHandle, time::timeout, }; -use super::{ - service::Service, - types::{OverallStatus, PinnedBoxedFuture, Priority, ShutdownError, StartupError, Status}, -}; #[derive(Default)] pub struct ServiceManagerBuilder { @@ -18,7 +21,7 @@ pub struct ServiceManagerBuilder { } impl ServiceManagerBuilder { - pub fn new() -> Self { +pub fn new() -> Self { Self { services: Vec::new() } } @@ -99,43 +102,44 @@ impl ServiceManager { pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { let service_lock = service.read().await; - let service_id = service_lock.info().id.clone(); + let service_id = &service_lock.info().id; - if !self.manages_service(&service_id).await { - return Err(StartupError::ServiceNotManaged(service_id)); + if !self.manages_service(service_id).await { + return Err(StartupError::ServiceNotManaged(service_id.clone())); } if !self.is_service_stopped(&service_lock).await { - return Err(StartupError::ServiceNotStopped(service_id)); + return Err(StartupError::ServiceNotStopped(service_id.clone())); } - if self.has_background_task_registered(&service_id).await { - return Err(StartupError::BackgroundTaskAlreadyRunning(service_id)); + if self.has_background_task_registered(service_id).await { + return Err(StartupError::BackgroundTaskAlreadyRunning(service_id.clone())); } - + drop(service_lock); let mut service_lock = service.write().await; service_lock.info().set_status(Status::Starting).await; self.init_service(&mut service_lock).await?; - self.start_background_task(&service_lock, Arc::clone(&service)).await; + self.start_background_task(&service_lock, Arc::clone(&service)) + .await; info!("Started service {}", service_lock.info().name); - + Ok(()) } pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { let service_lock = service.read().await; - let service_id = service_lock.info().id.clone(); + let service_id = &service_lock.info().id; - if !(self.manages_service(&service_id).await) { - return Err(ShutdownError::ServiceNotManaged(service_id)); + if !(self.manages_service(service_id).await) { + return Err(ShutdownError::ServiceNotManaged(service_id.clone())); } if !self.is_service_started(&service_lock).await { - return Err(ShutdownError::ServiceNotStarted(service_id)); + return Err(ShutdownError::ServiceNotStarted(service_id.clone())); } self.stop_background_task(&service_lock).await; @@ -147,7 +151,7 @@ impl ServiceManager { self.shutdown_service(&mut service_lock).await?; info!("Stopped service {}", service_lock.info().name); - + Ok(()) } @@ -155,7 +159,10 @@ impl ServiceManager { let mut results = Vec::new(); for service in &self.services { - results.push(self.start_service(Arc::clone(service)).await); + let service_arc_clone = Arc::clone(service); + let result = self.start_service(service_arc_clone).await; + + results.push(result); } results @@ -165,7 +172,10 @@ impl ServiceManager { let mut results = Vec::new(); for service in &self.services { - results.push(self.stop_service(Arc::clone(service)).await); + let service_arc_clone = Arc::clone(service); + let result = self.stop_service(service_arc_clone).await; + + results.push(result); } results @@ -288,26 +298,17 @@ impl ServiceManager { // Helper methods for start_service and stop_service - async fn has_background_task_registered( - &self, - service_id: &str, - ) -> bool { + async fn has_background_task_registered(&self, service_id: &str) -> bool { let tasks = self.background_tasks.read().await; tasks.contains_key(service_id) } - async fn is_service_started( - &self, - service: &RwLockReadGuard<'_, dyn Service>, - ) -> bool { + async fn is_service_started(&self, service: &RwLockReadGuard<'_, dyn Service>) -> bool { let status = service.info().get_status().await; matches!(status, Status::Started) } - async fn is_service_stopped( - &self, - service: &RwLockReadGuard<'_, dyn Service>, - ) -> bool { + async fn is_service_stopped(&self, service: &RwLockReadGuard<'_, dyn Service>) -> bool { let status = service.info().get_status().await; matches!(status, Status::Stopped) } @@ -328,7 +329,10 @@ impl ServiceManager { service.info().set_status(Status::Started).await; } Err(error) => { - service.info().set_status(Status::FailedToStart(error.to_string())).await; + service + .info() + .set_status(Status::FailedToStart(error.to_string())) + .await; return Err(StartupError::FailedToStartService(service.info().id.clone())); } }, @@ -358,7 +362,10 @@ impl ServiceManager { service.info().set_status(Status::Stopped).await; } Err(error) => { - service.info().set_status(Status::FailedToStop(error.to_string())).await; + service + .info() + .set_status(Status::FailedToStop(error.to_string())) + .await; return Err(ShutdownError::FailedToStopService(service.info().id.clone())); } }, @@ -374,7 +381,11 @@ impl ServiceManager { Ok(()) } - async fn start_background_task(&self, service_lock: &RwLockWriteGuard<'_, dyn Service>, service: Arc>) { + async fn start_background_task( + &self, + service_lock: &RwLockWriteGuard<'_, dyn Service>, + service: Arc>, + ) { if self.has_background_task_registered(&service_lock.info().id).await { return; } @@ -382,7 +393,7 @@ impl ServiceManager { let task = service_lock.task(); if let Some(task) = task { let mut watchdog = Watchdog::new(task); - + watchdog.append(|result| async move { /* We technically only need a read lock here, but we want to immediately stop @@ -436,9 +447,9 @@ impl ServiceManager { } let mut tasks_lock = self.background_tasks.write().await; - let task = tasks_lock.get(service_lock.info().id.as_str()).unwrap(); - task.abort(); - tasks_lock.remove(service_lock.info().id.as_str()); + let task = tasks_lock.get(&service_lock.info().id).unwrap(); + task.abort(); + tasks_lock.remove(&service_lock.info().id); } } From 2f8850bca0cb9a93660c661630ab09d53afb47bc Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 28 Jun 2024 21:07:48 +0200 Subject: [PATCH 09/25] add: allow clippy::multiple_bound_locations for service module --- src/service.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/service.rs b/src/service.rs index 1ac1c28..e835fc2 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,4 +1,6 @@ pub mod discord; +// TODO: Used for downcast_rs. Maybe this can be removed when updating the crate. +#[allow(clippy::multiple_bound_locations)] pub mod service; // Will be fixed when lum gets seperated into multiple workspaces pub mod service_manager; pub mod types; From 02e244133cd719914c8c5511550ae6cb26acb68b Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 26 Jul 2024 22:13:13 +0200 Subject: [PATCH 10/25] add: observables --- src/event.rs | 148 ++---------------------------- src/event/arc_observable.rs | 60 +++++++++++++ src/event/event.rs | 160 +++++++++++++++++++++++++++++++++ src/event/observable.rs | 56 ++++++++++++ src/service/service.rs | 29 +----- src/service/service_manager.rs | 34 ++++--- 6 files changed, 306 insertions(+), 181 deletions(-) create mode 100644 src/event/arc_observable.rs create mode 100644 src/event/event.rs create mode 100644 src/event/observable.rs diff --git a/src/event.rs b/src/event.rs index 6954ebc..dddafe9 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,143 +1,7 @@ -use crate::service::BoxedError; -use std::{fmt::Debug, sync::Arc}; -use tokio::sync::{ - mpsc::{channel, error::SendError, Receiver, Sender}, - Mutex, -}; +pub mod arc_observable; +pub mod event; +pub mod observable; -pub enum Subscriber { - Channel(Sender>), - Closure(Box) -> Result<(), BoxedError> + Send + Sync>), -} - -pub enum EventError { - ChannelSend(SendError>), - Closure(BoxedError), -} - -pub struct Event { - pub name: String, - - subscribers: Mutex>>, - log_on_error: bool, - remove_subscriber_on_error: bool, -} - -impl Event { - pub fn new(name: &str, log_on_error: bool, remove_subscriber_on_error: bool) -> Self { - Self { - name: name.to_string(), - subscribers: Mutex::new(Vec::new()), - log_on_error, - remove_subscriber_on_error, - } - } - - pub fn new_with_defaults(name: &str) -> Self { - Self { - name: name.to_string(), - ..Default::default() - } - } - - pub async fn subscriber_count(&self) -> usize { - let subscribers = self.subscribers.lock().await; - subscribers.len() - } - - pub async fn open_channel(&self, buffer: usize) -> Receiver> { - let (sender, receiver) = channel(buffer); - let mut subscribers = self.subscribers.lock().await; - subscribers.push(Subscriber::Channel(sender)); - receiver - } - - pub async fn subscribe( - &self, - closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, - ) { - let mut subscribers = self.subscribers.lock().await; - subscribers.push(Subscriber::Closure(Box::new(closure))); - } - - pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { - let mut subscribers = self.subscribers.lock().await; - let data = Arc::new(data); - - let mut errors = Vec::new(); - let mut subscribers_to_remove = Vec::new(); - - for (index, subscriber) in subscribers.iter().enumerate() { - let data = Arc::clone(&data); - - match subscriber { - Subscriber::Channel(sender) => { - let result = sender.send(data).await; - - if let Err(err) = result { - if self.log_on_error { - log::error!( - "Event \"{}\" failed to dispatch data to receiver {}: {}.", - self.name, - index, - err - ); - } - - if self.remove_subscriber_on_error { - log::error!("Receiver will be unregistered from event."); - subscribers_to_remove.push(index); - } - - errors.push(EventError::ChannelSend(err)); - } - } - - Subscriber::Closure(closure) => { - let result = closure(data); - - if let Err(err) = result { - if self.log_on_error { - log::error!( - "Event \"{}\" failed to dispatch data to closure {}: {}.", - self.name, - index, - err - ); - } - - if self.remove_subscriber_on_error { - log::error!("Closure will be unregistered from event."); - subscribers_to_remove.push(index); - } - - errors.push(EventError::Closure(err)); - } - } - } - } - - for index in subscribers_to_remove.into_iter().rev() { - subscribers.remove(index); - } - - if errors.is_empty() { - Ok(()) - } else { - Err(errors) - } - } -} - -impl Default for Event { - fn default() -> Self { - Self::new("Unnamed Event", true, false) - } -} - -impl Debug for Event { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct(format!("Event of type {}", std::any::type_name::()).as_str()) - .finish() - } -} +pub use arc_observable::ArcObservable; +pub use event::{Event, EventError, Subscriber}; +pub use observable::{Observable, ObservableResult}; diff --git a/src/event/arc_observable.rs b/src/event/arc_observable.rs new file mode 100644 index 0000000..bc62aa4 --- /dev/null +++ b/src/event/arc_observable.rs @@ -0,0 +1,60 @@ +use std::{ + hash::{DefaultHasher, Hash, Hasher}, + sync::Arc, +}; + +use tokio::sync::Mutex; + +use super::{Event, ObservableResult}; + +#[derive(Debug)] +pub struct ArcObservable +where + T: Hash, +{ + value: Arc>, + on_change: Event>>, +} + +impl ArcObservable +where + T: Hash, +{ + pub fn new(value: T, event_name: impl Into) -> Self { + Self { + value: Arc::new(Mutex::new(value)), + on_change: Event::from(event_name), + } + } + + pub async fn get(&self) -> Arc> { + Arc::clone(&self.value) + } + + pub async fn set(&self, value: T) -> ObservableResult>> { + let mut lock = self.value.lock().await; + + let mut hasher = DefaultHasher::new(); + (*lock).hash(&mut hasher); + let current_value = hasher.finish(); + + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + let new_value = hasher.finish(); + + if current_value == new_value { + return ObservableResult::Unchanged; + } + + *lock = value; + drop(lock); + + let value = Arc::clone(&self.value); + let dispatch_result = self.on_change.dispatch(value).await; + + match dispatch_result { + Ok(_) => ObservableResult::Changed(Ok(())), + Err(errors) => ObservableResult::Changed(Err(errors)), + } + } +} diff --git a/src/event/event.rs b/src/event/event.rs new file mode 100644 index 0000000..fc62704 --- /dev/null +++ b/src/event/event.rs @@ -0,0 +1,160 @@ +use crate::service::BoxedError; +use std::{ + any::type_name, + fmt::{self, Debug, Formatter}, + sync::Arc, +}; +use thiserror::Error; +use tokio::sync::{ + mpsc::{channel, error::SendError, Receiver, Sender}, + RwLock, +}; + +pub enum Subscriber { + Channel(Sender>), + Closure(Box) -> Result<(), BoxedError> + Send + Sync>), +} + +#[derive(Debug, Error)] +pub enum EventError { + ChannelSend(SendError>), + Closure(BoxedError), +} + +pub struct Event { + pub name: String, + + log_on_error: bool, + remove_subscriber_on_error: bool, + + subscribers: RwLock>>, +} + +impl Event { + pub fn new(name: impl Into, log_on_error: bool, remove_subscriber_on_error: bool) -> Self { + Self { + name: name.into(), + log_on_error, + remove_subscriber_on_error, + subscribers: RwLock::new(Vec::new()), + } + } + + pub async fn subscriber_count(&self) -> usize { + let subscribers = self.subscribers.read().await; + subscribers.len() + } + + pub async fn open_channel(&self, buffer: usize) -> Receiver> { + let (sender, receiver) = channel(buffer); + let mut subscribers = self.subscribers.write().await; + subscribers.push(Subscriber::Channel(sender)); + + receiver + } + + pub async fn subscribe( + &self, + closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, + ) { + let mut subscribers = self.subscribers.write().await; + subscribers.push(Subscriber::Closure(Box::new(closure))); + } + + pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { + let data = Arc::new(data); + + let mut errors = Vec::new(); + let mut subscribers_to_remove = Vec::new(); + + let mut subscribers = self.subscribers.write().await; + for (index, subscriber) in subscribers.iter().enumerate() { + let data = Arc::clone(&data); + + match subscriber { + Subscriber::Channel(sender) => { + let result = sender.send(data).await; + + if let Err(err) = result { + if self.log_on_error { + log::error!( + "Event \"{}\" failed to dispatch data to receiver {}: {}.", + self.name, + index, + err + ); + } + + if self.remove_subscriber_on_error { + log::error!("Receiver will be unregistered from event."); + subscribers_to_remove.push(index); + } + + errors.push(EventError::ChannelSend(err)); + } + } + + Subscriber::Closure(closure) => { + let result = closure(data); + + if let Err(err) = result { + if self.log_on_error { + log::error!( + "Event \"{}\" failed to dispatch data to closure {}: {}.", + self.name, + index, + err + ); + } + + if self.remove_subscriber_on_error { + log::error!("Closure will be unregistered from event."); + subscribers_to_remove.push(index); + } + + errors.push(EventError::Closure(err)); + } + } + } + } + + for index in subscribers_to_remove.into_iter().rev() { + subscribers.remove(index); + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } +} + +impl Default for Event { + fn default() -> Self { + Self::new("Unnamed Event", true, false) + } +} + +impl From for Event +where + I: Into, +{ + fn from(name: I) -> Self { + Self { + name: name.into(), + ..Default::default() + } + } +} + +impl Debug for Event { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct(type_name::()) + .field("name", &self.name) + .field("log_on_error", &self.log_on_error) + .field("remove_subscriber_on_error", &self.remove_subscriber_on_error) + .field("subscribers", &self.subscribers.blocking_read().len()) + .finish() + } +} diff --git a/src/event/observable.rs b/src/event/observable.rs new file mode 100644 index 0000000..0ff16f1 --- /dev/null +++ b/src/event/observable.rs @@ -0,0 +1,56 @@ +use log::{debug, info}; +use tokio::sync::Mutex; + +use super::{Event, EventError}; + +#[derive(Debug)] +pub enum ObservableResult { + Unchanged, + Changed(Result<(), Vec>>), +} + +#[derive(Debug)] +pub struct Observable +where + T: Clone + PartialEq, +{ + value: Mutex, + on_change: Event, +} + +impl Observable +where + T: Clone + PartialEq, +{ + pub fn new(value: T, event_name: I) -> Self + where + I: Into, + { + Self { + value: Mutex::new(value), + on_change: Event::from(event_name), + } + } + + pub async fn get(&self) -> T { + let lock = self.value.lock().await; + lock.clone() + } + + pub async fn set(&self, value: T) -> ObservableResult { + let mut lock = self.value.lock().await; + let current_value = lock.clone(); + + if current_value == value { + return ObservableResult::Unchanged; + } + + *lock = value; + let dispatch_result = self.on_change.dispatch(lock.clone()).await; + + match dispatch_result { + Ok(_) => ObservableResult::Changed(Ok(())), + Err(errors) => ObservableResult::Changed(Err(errors)), + } + } +} diff --git a/src/service/service.rs b/src/service/service.rs index f6bf03a..04bcaba 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -5,9 +5,8 @@ use std::{ }; use downcast_rs::{impl_downcast, DowncastSync}; -use tokio::sync::RwLock; -use crate::event::Event; +use crate::event::Observable; use super::{ service_manager::ServiceManager, @@ -20,9 +19,7 @@ pub struct ServiceInfo { pub name: String, pub priority: Priority, - status: Arc>, - - pub on_status_change: Event, + pub status: Observable, } impl ServiceInfo { @@ -31,26 +28,8 @@ impl ServiceInfo { id: id.to_string(), name: name.to_string(), priority, - status: Arc::new(RwLock::new(Status::Stopped)), - on_status_change: Event::new_with_defaults(format!("{}::on_status_change", name).as_str()), - } - } - - pub async fn get_status(&self) -> Status { - let lock = self.status.read().await; - lock.clone() - } - - pub async fn set_status(&self, status: Status) { - let mut lock = self.status.write().await; - let previous_status = lock.clone(); - - if previous_status == status { - return; + status: Observable::new(Status::Stopped, format!("{}_status_change", id)), } - - *(lock) = status; - let _ = self.on_status_change.dispatch(lock.clone()).await; } } @@ -89,7 +68,7 @@ pub trait Service: DowncastSync { } fn is_available(&self) -> PinnedBoxedFuture<'_, bool> { - Box::pin(async move { matches!(&*(self.info().status.read().await), Status::Started) }) + Box::pin(async move { matches!(self.info().status.get().await, Status::Started) }) } } diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index 1dd5dec..a189d6f 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -119,7 +119,7 @@ impl ServiceManager { drop(service_lock); let mut service_lock = service.write().await; - service_lock.info().set_status(Status::Starting).await; + service_lock.info().status.set(Status::Starting).await; self.init_service(&mut service_lock).await?; self.start_background_task(&service_lock, Arc::clone(&service)) @@ -147,7 +147,7 @@ impl ServiceManager { drop(service_lock); let mut service_lock = service.write().await; - service_lock.info().set_status(Status::Stopping).await; + service_lock.info().status.set(Status::Stopping).await; self.shutdown_service(&mut service_lock).await?; info!("Stopped service {}", service_lock.info().name); @@ -214,7 +214,7 @@ impl ServiceManager { Box::pin(async move { for service in self.services.iter() { let service = service.read().await; - let status = service.info().get_status().await; + let status = service.info().status.get().await; if !matches!(status, Status::Started) { return OverallStatus::Unhealthy; @@ -240,7 +240,7 @@ impl ServiceManager { let service = service.read().await; let info = service.info(); let priority = &info.priority; - let status = info.get_status().await; + let status = info.status.get().await; match status { Status::Started | Status::Stopped => match priority { @@ -304,12 +304,12 @@ impl ServiceManager { } async fn is_service_started(&self, service: &RwLockReadGuard<'_, dyn Service>) -> bool { - let status = service.info().get_status().await; + let status = service.info().status.get().await; matches!(status, Status::Started) } async fn is_service_stopped(&self, service: &RwLockReadGuard<'_, dyn Service>) -> bool { - let status = service.info().get_status().await; + let status = service.info().status.get().await; matches!(status, Status::Stopped) } @@ -326,12 +326,13 @@ impl ServiceManager { match timeout_result { Ok(start_result) => match start_result { Ok(()) => { - service.info().set_status(Status::Started).await; + service.info().status.set(Status::Started).await; } Err(error) => { service .info() - .set_status(Status::FailedToStart(error.to_string())) + .status + .set(Status::FailedToStart(error.to_string())) .await; return Err(StartupError::FailedToStartService(service.info().id.clone())); } @@ -339,7 +340,8 @@ impl ServiceManager { Err(error) => { service .info() - .set_status(Status::FailedToStart(error.to_string())) + .status + .set(Status::FailedToStart(error.to_string())) .await; return Err(StartupError::FailedToStartService(service.info().id.clone())); } @@ -359,12 +361,13 @@ impl ServiceManager { match timeout_result { Ok(stop_result) => match stop_result { Ok(()) => { - service.info().set_status(Status::Stopped).await; + service.info().status.set(Status::Stopped).await; } Err(error) => { service .info() - .set_status(Status::FailedToStop(error.to_string())) + .status + .set(Status::FailedToStop(error.to_string())) .await; return Err(ShutdownError::FailedToStopService(service.info().id.clone())); } @@ -372,7 +375,8 @@ impl ServiceManager { Err(error) => { service .info() - .set_status(Status::FailedToStop(error.to_string())) + .status + .set(Status::FailedToStop(error.to_string())) .await; return Err(ShutdownError::FailedToStopService(service.info().id.clone())); } @@ -410,7 +414,8 @@ impl ServiceManager { service .info() - .set_status(Status::RuntimeError("Background task ended unexpectedly!".to_string())) + .status + .set(Status::RuntimeError("Background task ended unexpectedly!".to_string())) .await; } @@ -423,7 +428,8 @@ impl ServiceManager { service .info() - .set_status(Status::RuntimeError( + .status + .set(Status::RuntimeError( format!("Background task ended with error: {}", error), )) .await; From 782f704c6a61fde2c6cac83389483f92a85ae00a Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 26 Jul 2024 23:11:31 +0200 Subject: [PATCH 11/25] refactor: use Mutex instead of RwLock everywhere --- src/bot.rs | 6 +- src/event/event.rs | 16 ++--- src/event/observable.rs | 1 - src/main.rs | 6 +- src/service/discord.rs | 10 +-- src/service/service_manager.rs | 118 +++++++++++++++------------------ 6 files changed, 71 insertions(+), 86 deletions(-) diff --git a/src/bot.rs b/src/bot.rs index f002d95..60e2598 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use log::info; -use tokio::sync::RwLock; +use tokio::sync::Mutex; use crate::service::{PinnedBoxedFuture, Service, ServiceManager, ServiceManagerBuilder}; @@ -23,13 +23,13 @@ impl BotBuilder { } } - pub async fn with_service(mut self, service: Arc>) -> Self { + pub async fn with_service(mut self, service: Arc>) -> Self { self.service_manager = self.service_manager.with_service(service).await; // The ServiceManagerBuilder itself will warn when adding a service multiple times self } - pub async fn with_services(mut self, services: Vec>>) -> Self { + pub async fn with_services(mut self, services: Vec>>) -> Self { for service in services { self.service_manager = self.service_manager.with_service(service).await; } diff --git a/src/event/event.rs b/src/event/event.rs index fc62704..cb6e884 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -7,7 +7,7 @@ use std::{ use thiserror::Error; use tokio::sync::{ mpsc::{channel, error::SendError, Receiver, Sender}, - RwLock, + Mutex, }; pub enum Subscriber { @@ -27,7 +27,7 @@ pub struct Event { log_on_error: bool, remove_subscriber_on_error: bool, - subscribers: RwLock>>, + subscribers: Mutex>>, } impl Event { @@ -36,18 +36,18 @@ impl Event { name: name.into(), log_on_error, remove_subscriber_on_error, - subscribers: RwLock::new(Vec::new()), + subscribers: Mutex::new(Vec::new()), } } pub async fn subscriber_count(&self) -> usize { - let subscribers = self.subscribers.read().await; + let subscribers = self.subscribers.lock().await; subscribers.len() } pub async fn open_channel(&self, buffer: usize) -> Receiver> { let (sender, receiver) = channel(buffer); - let mut subscribers = self.subscribers.write().await; + let mut subscribers = self.subscribers.lock().await; subscribers.push(Subscriber::Channel(sender)); receiver @@ -57,7 +57,7 @@ impl Event { &self, closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, ) { - let mut subscribers = self.subscribers.write().await; + let mut subscribers = self.subscribers.lock().await; subscribers.push(Subscriber::Closure(Box::new(closure))); } @@ -67,7 +67,7 @@ impl Event { let mut errors = Vec::new(); let mut subscribers_to_remove = Vec::new(); - let mut subscribers = self.subscribers.write().await; + let mut subscribers = self.subscribers.lock().await; for (index, subscriber) in subscribers.iter().enumerate() { let data = Arc::clone(&data); @@ -154,7 +154,7 @@ impl Debug for Event { .field("name", &self.name) .field("log_on_error", &self.log_on_error) .field("remove_subscriber_on_error", &self.remove_subscriber_on_error) - .field("subscribers", &self.subscribers.blocking_read().len()) + .field("subscribers", &self.subscribers.blocking_lock().len()) .finish() } } diff --git a/src/event/observable.rs b/src/event/observable.rs index 0ff16f1..d8cd64c 100644 --- a/src/event/observable.rs +++ b/src/event/observable.rs @@ -1,4 +1,3 @@ -use log::{debug, info}; use tokio::sync::Mutex; use super::{Event, EventError}; diff --git a/src/main.rs b/src/main.rs index 871eb86..9cc1d1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use lum::{ log, service::{discord::DiscordService, Service}, }; -use tokio::sync::RwLock; +use tokio::sync::Mutex; const BOT_NAME: &str = "Lum"; @@ -43,11 +43,11 @@ fn setup_logger() { } } -fn initialize_services(config: &Config) -> Vec>> { +fn initialize_services(config: &Config) -> Vec>> { //TODO: Add services //... let discord_service = DiscordService::new(config.discord_token.as_str()); - vec![Arc::new(RwLock::new(discord_service))] + vec![Arc::new(Mutex::new(discord_service))] } diff --git a/src/service/discord.rs b/src/service/discord.rs index 2c4b37f..098fc8e 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -24,7 +24,7 @@ use tokio::{ pub struct DiscordService { info: ServiceInfo, discord_token: String, - pub ready: Arc>>, + pub ready: Arc>>, client_handle: Option>>, pub cache: SetLock>, pub data: SetLock>>, @@ -39,7 +39,7 @@ impl DiscordService { Self { info: ServiceInfo::new("lum_builtin_discord", "Discord", Priority::Essential), discord_token: discord_token.to_string(), - ready: Arc::new(RwLock::new(SetLock::new())), + ready: Arc::new(Mutex::new(SetLock::new())), client_handle: None, cache: SetLock::new(), data: SetLock::new(), @@ -157,12 +157,12 @@ async fn convert_thread_result(client_handle: JoinHandle>) -> } struct EventHandler { - client: Arc>>, + client: Arc>>, ready_notify: Arc, } impl EventHandler { - pub fn new(client: Arc>>, ready_notify: Arc) -> Self { + pub fn new(client: Arc>>, ready_notify: Arc) -> Self { Self { client, ready_notify } } } @@ -171,7 +171,7 @@ impl EventHandler { impl client::EventHandler for EventHandler { async fn ready(&self, _ctx: Context, data_about_bot: Ready) { info!("Connected to Discord as {}", data_about_bot.user.tag()); - if let Err(error) = self.client.write().await.set(data_about_bot) { + if let Err(error) = self.client.lock().await.set(data_about_bot) { error!("Failed to set client SetLock: {}", error); panic!("Failed to set client SetLock: {}", error); } diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index a189d6f..507a667 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -10,14 +10,14 @@ use log::{error, info, warn}; use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; use tokio::{ spawn, - sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{Mutex, MutexGuard}, task::JoinHandle, time::timeout, }; #[derive(Default)] pub struct ServiceManagerBuilder { - services: Vec>>, + services: Vec>>, } impl ServiceManagerBuilder { @@ -26,12 +26,12 @@ pub fn new() -> Self { } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub async fn with_service(mut self, service: Arc>) -> Self { - let lock = service.read().await; + pub async fn with_service(mut self, service: Arc>) -> Self { + let lock = service.lock().await; let mut found = false; for registered_service in self.services.iter() { - let registered_service = registered_service.read().await; + let registered_service = registered_service.lock().await; if registered_service.info().id == lock.info().id { found = true; @@ -55,15 +55,15 @@ pub fn new() -> Self { pub async fn build(self) -> Arc { let service_manager = ServiceManager { - arc: RwLock::new(SetLock::new()), + arc: Mutex::new(SetLock::new()), services: self.services, - background_tasks: RwLock::new(HashMap::new()), + background_tasks: Mutex::new(HashMap::new()), }; let self_arc = Arc::new(service_manager); let self_arc_clone = Arc::clone(&self_arc); - let result = self_arc_clone.arc.write().await.set(Arc::clone(&self_arc_clone)); + let result = self_arc_clone.arc.lock().await.set(Arc::clone(&self_arc_clone)); if let Err(err) = result { match err { @@ -78,9 +78,9 @@ pub fn new() -> Self { } pub struct ServiceManager { - arc: RwLock>>, - services: Vec>>, - background_tasks: RwLock>>, + arc: Mutex>>, + services: Vec>>, + background_tasks: Mutex>>, } impl ServiceManager { @@ -88,9 +88,10 @@ impl ServiceManager { ServiceManagerBuilder::new() } - pub async fn manages_service(&self, service_id: &str) -> bool { + pub async fn manages_service(&self, service_id: &str) -> bool + { for service in self.services.iter() { - let service_lock = service.read().await; + let service_lock = service.lock().await; if service_lock.info().id == service_id { return true; @@ -100,25 +101,23 @@ impl ServiceManager { false } - pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { - let service_lock = service.read().await; - let service_id = &service_lock.info().id; - - if !self.manages_service(service_id).await { + pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { + let service_id = service.lock().await.info().id.clone(); + if !self.manages_service(&service_id).await { return Err(StartupError::ServiceNotManaged(service_id.clone())); } - if !self.is_service_stopped(&service_lock).await { + let mut service_lock = service.lock().await; + + let status = service_lock.info().status.get().await; + if !matches!(status, Status::Stopped) { return Err(StartupError::ServiceNotStopped(service_id.clone())); } - if self.has_background_task_registered(service_id).await { + if self.has_background_task_registered(&service_id).await { return Err(StartupError::BackgroundTaskAlreadyRunning(service_id.clone())); } - drop(service_lock); - let mut service_lock = service.write().await; - service_lock.info().status.set(Status::Starting).await; self.init_service(&mut service_lock).await?; @@ -130,24 +129,24 @@ impl ServiceManager { Ok(()) } - pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { - let service_lock = service.read().await; - let service_id = &service_lock.info().id; - - if !(self.manages_service(service_id).await) { + //TODO: Clean up + pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { + let service_id = service.lock().await.info().id.clone(); + if !(self.manages_service(&service_id).await) { return Err(ShutdownError::ServiceNotManaged(service_id.clone())); } - if !self.is_service_started(&service_lock).await { + let mut service_lock = service.lock().await; + + let status = service_lock.info().status.get().await; + if !matches!(status, Status::Started) { return Err(ShutdownError::ServiceNotStarted(service_id.clone())); } self.stop_background_task(&service_lock).await; - drop(service_lock); - let mut service_lock = service.write().await; - service_lock.info().status.set(Status::Stopping).await; + self.shutdown_service(&mut service_lock).await?; info!("Stopped service {}", service_lock.info().name); @@ -181,18 +180,17 @@ impl ServiceManager { results } - pub async fn get_service(&self) -> Option>> + pub async fn get_service(&self) -> Option>> where T: Service, { for service in self.services.iter() { - let lock = service.read().await; + let lock = service.lock().await; let is_t = lock.as_any().is::(); - drop(lock); if is_t { let arc_clone = Arc::clone(service); - let service_ptr: *const Arc> = &arc_clone; + let service_ptr: *const Arc> = &arc_clone; /* I tried to do this in safe rust for 3 days, but I couldn't figure it out @@ -200,7 +198,7 @@ impl ServiceManager { Anyways, this should never cause any issues, since we checked if the service is of type T */ unsafe { - let t_ptr: *const Arc> = mem::transmute(service_ptr); + let t_ptr: *const Arc> = mem::transmute(service_ptr); return Some(Arc::clone(&*t_ptr)); } } @@ -213,7 +211,7 @@ impl ServiceManager { pub fn overall_status(&self) -> PinnedBoxedFuture<'_, OverallStatus> { Box::pin(async move { for service in self.services.iter() { - let service = service.read().await; + let service = service.lock().await; let status = service.info().status.get().await; if !matches!(status, Status::Started) { @@ -237,7 +235,7 @@ impl ServiceManager { let mut others = String::new(); for service in self.services.iter() { - let service = service.read().await; + let service = service.lock().await; let info = service.info(); let priority = &info.priority; let status = info.status.get().await; @@ -296,28 +294,11 @@ impl ServiceManager { }) } - // Helper methods for start_service and stop_service - - async fn has_background_task_registered(&self, service_id: &str) -> bool { - let tasks = self.background_tasks.read().await; - tasks.contains_key(service_id) - } - - async fn is_service_started(&self, service: &RwLockReadGuard<'_, dyn Service>) -> bool { - let status = service.info().status.get().await; - matches!(status, Status::Started) - } - - async fn is_service_stopped(&self, service: &RwLockReadGuard<'_, dyn Service>) -> bool { - let status = service.info().status.get().await; - matches!(status, Status::Stopped) - } - async fn init_service( &self, - service: &mut RwLockWriteGuard<'_, dyn Service>, + service: &mut MutexGuard<'_, dyn Service>, ) -> Result<(), StartupError> { - let service_manager = Arc::clone(self.arc.read().await.unwrap()); + let service_manager = Arc::clone(self.arc.lock().await.unwrap()); //TODO: Add to config instead of hardcoding duration let start = service.start(service_manager); @@ -352,7 +333,7 @@ impl ServiceManager { async fn shutdown_service( &self, - service: &mut RwLockWriteGuard<'_, dyn Service>, + service: &mut MutexGuard<'_, dyn Service>, ) -> Result<(), ShutdownError> { //TODO: Add to config instead of hardcoding duration let stop = service.stop(); @@ -385,10 +366,15 @@ impl ServiceManager { Ok(()) } + async fn has_background_task_registered(&self, service_id: &str) -> bool { + let tasks = self.background_tasks.lock().await; + tasks.contains_key(service_id) + } + async fn start_background_task( &self, - service_lock: &RwLockWriteGuard<'_, dyn Service>, - service: Arc>, + service_lock: &MutexGuard<'_, dyn Service>, + service: Arc>, ) { if self.has_background_task_registered(&service_lock.info().id).await { return; @@ -403,7 +389,7 @@ impl ServiceManager { We technically only need a read lock here, but we want to immediately stop other services from accessing the service, so we acquire a write lock instead. */ - let service = service.write().await; + let service = service.lock().await; match result { Ok(()) => { @@ -441,18 +427,18 @@ impl ServiceManager { let join_handle = spawn(watchdog.run()); self.background_tasks - .write() + .lock() .await .insert(service_lock.info().id.clone(), join_handle); } } - async fn stop_background_task(&self, service_lock: &RwLockReadGuard<'_, dyn Service>) { + async fn stop_background_task(&self, service_lock: &MutexGuard<'_, dyn Service>) { if !self.has_background_task_registered(&service_lock.info().id).await { return; } - let mut tasks_lock = self.background_tasks.write().await; + let mut tasks_lock = self.background_tasks.lock().await; let task = tasks_lock.get(&service_lock.info().id).unwrap(); task.abort(); tasks_lock.remove(&service_lock.info().id); @@ -470,7 +456,7 @@ impl Display for ServiceManager { let mut services = self.services.iter().peekable(); while let Some(service) = services.next() { - let service = service.blocking_read(); + let service = service.blocking_lock(); write!(f, "{} ({})", service.info().name, service.info().id)?; if services.peek().is_some() { write!(f, ", ")?; From 198296bccd55e76057581aede373a088aafba202 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 27 Jul 2024 14:39:54 +0200 Subject: [PATCH 12/25] refactor: make remove_on_error work on per-subscriber basis --- src/event/event.rs | 42 ++++++++++++++++++++----------------- src/event/event_repeater.rs | 0 2 files changed, 23 insertions(+), 19 deletions(-) create mode 100644 src/event/event_repeater.rs diff --git a/src/event/event.rs b/src/event/event.rs index cb6e884..209301a 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -11,8 +11,8 @@ use tokio::sync::{ }; pub enum Subscriber { - Channel(Sender>), - Closure(Box) -> Result<(), BoxedError> + Send + Sync>), + Channel(Sender>, bool), + Closure(Box) -> Result<(), BoxedError> + Send + Sync>, bool), } #[derive(Debug, Error)] @@ -25,17 +25,15 @@ pub struct Event { pub name: String, log_on_error: bool, - remove_subscriber_on_error: bool, subscribers: Mutex>>, } impl Event { - pub fn new(name: impl Into, log_on_error: bool, remove_subscriber_on_error: bool) -> Self { + pub fn new(name: impl Into, log_on_error: bool) -> Self { Self { name: name.into(), log_on_error, - remove_subscriber_on_error, subscribers: Mutex::new(Vec::new()), } } @@ -45,10 +43,10 @@ impl Event { subscribers.len() } - pub async fn open_channel(&self, buffer: usize) -> Receiver> { + pub async fn open_channel(&self, buffer: usize, remove_on_error: bool) -> Receiver> { let (sender, receiver) = channel(buffer); let mut subscribers = self.subscribers.lock().await; - subscribers.push(Subscriber::Channel(sender)); + subscribers.push(Subscriber::Channel(sender, remove_on_error)); receiver } @@ -56,9 +54,10 @@ impl Event { pub async fn subscribe( &self, closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, + remove_on_error: bool, ) { let mut subscribers = self.subscribers.lock().await; - subscribers.push(Subscriber::Closure(Box::new(closure))); + subscribers.push(Subscriber::Closure(Box::new(closure), remove_on_error)); } pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { @@ -72,7 +71,7 @@ impl Event { let data = Arc::clone(&data); match subscriber { - Subscriber::Channel(sender) => { + Subscriber::Channel(sender, remove_on_error) => { let result = sender.send(data).await; if let Err(err) = result { @@ -85,8 +84,11 @@ impl Event { ); } - if self.remove_subscriber_on_error { - log::error!("Receiver will be unregistered from event."); + if *remove_on_error { + if self.log_on_error { + log::error!("Receiver will be unregistered from event."); + } + subscribers_to_remove.push(index); } @@ -94,7 +96,7 @@ impl Event { } } - Subscriber::Closure(closure) => { + Subscriber::Closure(closure, remove_on_error) => { let result = closure(data); if let Err(err) = result { @@ -107,8 +109,11 @@ impl Event { ); } - if self.remove_subscriber_on_error { - log::error!("Closure will be unregistered from event."); + if *remove_on_error { + if self.log_on_error { + log::error!("Closure will be unregistered from event."); + } + subscribers_to_remove.push(index); } @@ -132,15 +137,15 @@ impl Event { impl Default for Event { fn default() -> Self { - Self::new("Unnamed Event", true, false) + Self::new("Unnamed Event", true) } } -impl From for Event +impl From for Event where - I: Into, + S: Into, { - fn from(name: I) -> Self { + fn from(name: S) -> Self { Self { name: name.into(), ..Default::default() @@ -153,7 +158,6 @@ impl Debug for Event { f.debug_struct(type_name::()) .field("name", &self.name) .field("log_on_error", &self.log_on_error) - .field("remove_subscriber_on_error", &self.remove_subscriber_on_error) .field("subscribers", &self.subscribers.blocking_lock().len()) .finish() } diff --git a/src/event/event_repeater.rs b/src/event/event_repeater.rs new file mode 100644 index 0000000..e69de29 From 88a28c1c864168d4c8ba44e3c6524f28e0dc8402 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 27 Jul 2024 16:14:40 +0200 Subject: [PATCH 13/25] refactor: make subscribers identifiable --- Cargo.toml | 1 + src/event.rs | 4 +- src/event/event.rs | 66 +++++++++++++++++--------- src/event/subscriber.rs | 102 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 150 insertions(+), 23 deletions(-) create mode 100644 src/event/subscriber.rs diff --git a/Cargo.toml b/Cargo.toml index adbdd2a..3022bda 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,3 +23,4 @@ serenity = { version = "0.12.0", default-features=false, features = ["builder", sqlx = { version = "0.8.0", features = ["runtime-tokio", "any", "postgres", "mysql", "sqlite", "tls-native-tls", "migrate", "macros", "uuid", "chrono", "json"] } thiserror = "1.0.52" tokio = { version = "1.35.1", features = ["full"] } +uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] } diff --git a/src/event.rs b/src/event.rs index dddafe9..77847de 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,7 +1,9 @@ pub mod arc_observable; pub mod event; pub mod observable; +pub mod subscriber; pub use arc_observable::ArcObservable; -pub use event::{Event, EventError, Subscriber}; +pub use event::{Event, EventError}; pub use observable::{Observable, ObservableResult}; +pub use subscriber::{Callback, Subscriber}; diff --git a/src/event/event.rs b/src/event/event.rs index 209301a..d2eb25a 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -6,14 +6,14 @@ use std::{ }; use thiserror::Error; use tokio::sync::{ - mpsc::{channel, error::SendError, Receiver, Sender}, + mpsc::{channel, error::SendError, Receiver}, Mutex, }; -pub enum Subscriber { - Channel(Sender>, bool), - Closure(Box) -> Result<(), BoxedError> + Send + Sync>, bool), -} +use super::{ + subscriber::{ReceiverSubscription, Subscription}, + Callback, Subscriber, +}; #[derive(Debug, Error)] pub enum EventError { @@ -43,21 +43,43 @@ impl Event { subscribers.len() } - pub async fn open_channel(&self, buffer: usize, remove_on_error: bool) -> Receiver> { + pub async fn open_channel( + &self, + name: S, + buffer: usize, + remove_on_error: bool, + ) -> ReceiverSubscription> + where + S: Into, + { let (sender, receiver) = channel(buffer); + let subscriber = Subscriber::new(name, remove_on_error, Callback::Channel(sender)); + + let subscription = Subscription::from(&subscriber); + let receiver_subscription = ReceiverSubscription::new(subscription, receiver); + let mut subscribers = self.subscribers.lock().await; - subscribers.push(Subscriber::Channel(sender, remove_on_error)); + subscribers.push(subscriber); - receiver + receiver_subscription } - pub async fn subscribe( + pub async fn register_callback( &self, + name: S, closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, remove_on_error: bool, - ) { + ) -> Subscription + where + S: Into, + { + let subscriber = Subscriber::new(name, remove_on_error, Callback::Closure(Box::new(closure))); + let subscription = Subscription::from(&subscriber); + let mut subscribers = self.subscribers.lock().await; - subscribers.push(Subscriber::Closure(Box::new(closure), remove_on_error)); + subscribers.push(subscriber); + + subscription } pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { @@ -70,23 +92,23 @@ impl Event { for (index, subscriber) in subscribers.iter().enumerate() { let data = Arc::clone(&data); - match subscriber { - Subscriber::Channel(sender, remove_on_error) => { + match &subscriber.callback { + Callback::Channel(sender) => { let result = sender.send(data).await; if let Err(err) = result { if self.log_on_error { log::error!( - "Event \"{}\" failed to dispatch data to receiver {}: {}.", + "Event \"{}\" failed to dispatch data to Channel callback of subscriber {}: {}.", self.name, - index, + subscriber.name, err ); } - if *remove_on_error { + if subscriber.remove_on_error { if self.log_on_error { - log::error!("Receiver will be unregistered from event."); + log::error!("Subscriber will be unregistered from event."); } subscribers_to_remove.push(index); @@ -96,22 +118,22 @@ impl Event { } } - Subscriber::Closure(closure, remove_on_error) => { + Callback::Closure(closure) => { let result = closure(data); if let Err(err) = result { if self.log_on_error { log::error!( - "Event \"{}\" failed to dispatch data to closure {}: {}.", + "Event \"{}\" failed to dispatch data to Closure callback of subscriber {}: {}.", self.name, - index, + subscriber.name, err ); } - if *remove_on_error { + if subscriber.remove_on_error { if self.log_on_error { - log::error!("Closure will be unregistered from event."); + log::error!("Subscriber will be unregistered from event."); } subscribers_to_remove.push(index); diff --git a/src/event/subscriber.rs b/src/event/subscriber.rs new file mode 100644 index 0000000..b4dae81 --- /dev/null +++ b/src/event/subscriber.rs @@ -0,0 +1,102 @@ +use std::sync::Arc; + +use tokio::sync::mpsc::{Receiver, Sender}; +use uuid::Uuid; + +use crate::service::BoxedError; + +pub enum Callback { + Channel(Sender>), + Closure(Box) -> Result<(), BoxedError> + Send + Sync>), +} + +pub struct Subscriber { + pub name: String, + pub remove_on_error: bool, + pub callback: Callback, + + pub uuid: Uuid, +} + +impl Subscriber { + pub fn new(name: S, remove_on_error: bool, callback: Callback) -> Self + where + S: Into, + { + Self { + name: name.into(), + remove_on_error, + callback, + uuid: Uuid::new_v4(), + } + } +} + +impl PartialEq for Subscriber { + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for Subscriber {} + +#[derive(Debug, PartialEq, Eq)] +pub struct Subscription { + pub uuid: Uuid, +} + +impl From> for Subscription { + fn from(subscriber: Subscriber) -> Self { + Self { + uuid: subscriber.uuid, + } + } +} + +impl From<&Subscriber> for Subscription { + fn from(subscriber: &Subscriber) -> Self { + Self { + uuid: subscriber.uuid, + } + } +} + +impl AsRef for Subscription { + fn as_ref(&self) -> &Uuid { + &self.uuid + } +} + +pub struct ReceiverSubscription { + pub subscription: Subscription, + pub receiver: Receiver, +} + +impl ReceiverSubscription { + pub fn new(subscription: Subscription, receiver: Receiver) -> Self { + Self { + subscription, + receiver, + } + } +} + +impl PartialEq for ReceiverSubscription { + fn eq(&self, other: &Self) -> bool { + self.subscription == other.subscription + } +} + +impl Eq for ReceiverSubscription {} + +impl AsRef for ReceiverSubscription { + fn as_ref(&self) -> &Subscription { + &self.subscription + } +} + +impl AsRef for ReceiverSubscription { + fn as_ref(&self) -> &Uuid { + self.subscription.as_ref() + } +} From 8f4d6483897ebc50de14bce222f403697ca7d8d9 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 27 Jul 2024 16:37:50 +0200 Subject: [PATCH 14/25] refactor: move dispatch logic to Subscriber --- src/event.rs | 4 +-- src/event/event.rs | 78 ++++++++++------------------------------- src/event/observable.rs | 4 +-- src/event/subscriber.rs | 19 +++++++++- 4 files changed, 41 insertions(+), 64 deletions(-) diff --git a/src/event.rs b/src/event.rs index 77847de..108f203 100644 --- a/src/event.rs +++ b/src/event.rs @@ -4,6 +4,6 @@ pub mod observable; pub mod subscriber; pub use arc_observable::ArcObservable; -pub use event::{Event, EventError}; +pub use event::Event; pub use observable::{Observable, ObservableResult}; -pub use subscriber::{Callback, Subscriber}; +pub use subscriber::{Callback, DispatchError, Subscriber}; diff --git a/src/event/event.rs b/src/event/event.rs index d2eb25a..d66890c 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -4,23 +4,13 @@ use std::{ fmt::{self, Debug, Formatter}, sync::Arc, }; -use thiserror::Error; -use tokio::sync::{ - mpsc::{channel, error::SendError, Receiver}, - Mutex, -}; +use tokio::sync::{mpsc::channel, Mutex}; use super::{ subscriber::{ReceiverSubscription, Subscription}, - Callback, Subscriber, + Callback, DispatchError, Subscriber, }; -#[derive(Debug, Error)] -pub enum EventError { - ChannelSend(SendError>), - Closure(BoxedError), -} - pub struct Event { pub name: String, @@ -82,7 +72,7 @@ impl Event { subscription } - pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { + pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { let data = Arc::new(data); let mut errors = Vec::new(); @@ -92,56 +82,26 @@ impl Event { for (index, subscriber) in subscribers.iter().enumerate() { let data = Arc::clone(&data); - match &subscriber.callback { - Callback::Channel(sender) => { - let result = sender.send(data).await; - - if let Err(err) = result { - if self.log_on_error { - log::error!( - "Event \"{}\" failed to dispatch data to Channel callback of subscriber {}: {}.", - self.name, - subscriber.name, - err - ); - } - - if subscriber.remove_on_error { - if self.log_on_error { - log::error!("Subscriber will be unregistered from event."); - } - - subscribers_to_remove.push(index); - } - - errors.push(EventError::ChannelSend(err)); - } + let result = subscriber.dispatch(data).await; + if let Err(err) = result { + if self.log_on_error { + log::error!( + "Event \"{}\" failed to dispatch data to subscriber {}: {}.", + self.name, + subscriber.name, + err + ); } - Callback::Closure(closure) => { - let result = closure(data); - - if let Err(err) = result { - if self.log_on_error { - log::error!( - "Event \"{}\" failed to dispatch data to Closure callback of subscriber {}: {}.", - self.name, - subscriber.name, - err - ); - } - - if subscriber.remove_on_error { - if self.log_on_error { - log::error!("Subscriber will be unregistered from event."); - } - - subscribers_to_remove.push(index); - } - - errors.push(EventError::Closure(err)); + if subscriber.remove_on_error { + if self.log_on_error { + log::error!("Subscriber will be unregistered from event."); } + + subscribers_to_remove.push(index); } + + errors.push(err); } } diff --git a/src/event/observable.rs b/src/event/observable.rs index d8cd64c..c06917b 100644 --- a/src/event/observable.rs +++ b/src/event/observable.rs @@ -1,11 +1,11 @@ use tokio::sync::Mutex; -use super::{Event, EventError}; +use super::{DispatchError, Event}; #[derive(Debug)] pub enum ObservableResult { Unchanged, - Changed(Result<(), Vec>>), + Changed(Result<(), Vec>>), } #[derive(Debug)] diff --git a/src/event/subscriber.rs b/src/event/subscriber.rs index b4dae81..c062fec 100644 --- a/src/event/subscriber.rs +++ b/src/event/subscriber.rs @@ -1,6 +1,7 @@ use std::sync::Arc; -use tokio::sync::mpsc::{Receiver, Sender}; +use thiserror::Error; +use tokio::sync::mpsc::{error::SendError, Receiver, Sender}; use uuid::Uuid; use crate::service::BoxedError; @@ -10,6 +11,15 @@ pub enum Callback { Closure(Box) -> Result<(), BoxedError> + Send + Sync>), } +#[derive(Debug, Error)] +pub enum DispatchError { + #[error("Failed to send data to channel: {0}")] + ChannelSend(#[from] SendError>), + + #[error("Failed to dispatch data to closure: {0}")] + Closure(#[from] BoxedError), +} + pub struct Subscriber { pub name: String, pub remove_on_error: bool, @@ -30,6 +40,13 @@ impl Subscriber { uuid: Uuid::new_v4(), } } + + pub async fn dispatch(&self, data: Arc) -> Result<(), DispatchError> { + match &self.callback { + Callback::Channel(sender) => sender.send(data).await.map_err(DispatchError::ChannelSend), + Callback::Closure(closure) => closure(data).map_err(DispatchError::Closure), + } + } } impl PartialEq for Subscriber { From 740f6d3f6d8a9741fec12ddefbef3505720603c2 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 27 Jul 2024 17:02:25 +0200 Subject: [PATCH 15/25] add: AsyncClosure Callback type --- src/bot.rs | 6 +++--- src/event/event.rs | 20 +++++++++++++++++++- src/event/subscriber.rs | 9 +++++++-- src/service/discord.rs | 8 ++++---- src/service/service.rs | 10 +++++----- src/service/service_manager.rs | 6 +++--- src/service/types.rs | 11 +++++++---- src/service/watchdog.rs | 6 +++--- 8 files changed, 51 insertions(+), 25 deletions(-) diff --git a/src/bot.rs b/src/bot.rs index 60e2598..79df9a5 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use log::info; use tokio::sync::Mutex; -use crate::service::{PinnedBoxedFuture, Service, ServiceManager, ServiceManagerBuilder}; +use crate::service::{types::LifetimedPinnedBoxedFuture, Service, ServiceManager, ServiceManagerBuilder}; pub enum ExitReason { SIGINT, @@ -56,7 +56,7 @@ impl Bot { } //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future - pub fn start(&mut self) -> PinnedBoxedFuture<'_, ()> { + pub fn start(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> { Box::pin(async move { self.service_manager.start_services().await; //TODO: Potential for further initialization here, like modules @@ -64,7 +64,7 @@ impl Bot { } //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future - pub fn stop(&mut self) -> PinnedBoxedFuture<'_, ()> { + pub fn stop(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> { Box::pin(async move { self.service_manager.stop_services().await; //TODO: Potential for further deinitialization here, like modules diff --git a/src/event/event.rs b/src/event/event.rs index d66890c..6b0aed5 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -1,4 +1,4 @@ -use crate::service::BoxedError; +use crate::service::{BoxedError, PinnedBoxedFutureResult}; use std::{ any::type_name, fmt::{self, Debug, Formatter}, @@ -72,6 +72,24 @@ impl Event { subscription } + pub async fn register_async_callback( + &self, + name: S, + closure: impl Fn(Arc) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static, + remove_on_error: bool, + ) -> Subscription + where + S: Into, + { + let subscriber = Subscriber::new(name, remove_on_error, Callback::AsyncClosure(Box::new(closure))); + let subscription = Subscription::from(&subscriber); + + let mut subscribers = self.subscribers.lock().await; + subscribers.push(subscriber); + + subscription + } + pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { let data = Arc::new(data); diff --git a/src/event/subscriber.rs b/src/event/subscriber.rs index c062fec..50a90ef 100644 --- a/src/event/subscriber.rs +++ b/src/event/subscriber.rs @@ -4,10 +4,11 @@ use thiserror::Error; use tokio::sync::mpsc::{error::SendError, Receiver, Sender}; use uuid::Uuid; -use crate::service::BoxedError; +use crate::service::{BoxedError, PinnedBoxedFutureResult}; pub enum Callback { Channel(Sender>), + AsyncClosure(Box) -> PinnedBoxedFutureResult<()> + Send + Sync>), Closure(Box) -> Result<(), BoxedError> + Send + Sync>), } @@ -16,8 +17,11 @@ pub enum DispatchError { #[error("Failed to send data to channel: {0}")] ChannelSend(#[from] SendError>), + #[error("Failed to dispatch data to async closure: {0}")] + AsyncClosure(BoxedError), + #[error("Failed to dispatch data to closure: {0}")] - Closure(#[from] BoxedError), + Closure(BoxedError), } pub struct Subscriber { @@ -44,6 +48,7 @@ impl Subscriber { pub async fn dispatch(&self, data: Arc) -> Result<(), DispatchError> { match &self.callback { Callback::Channel(sender) => sender.send(data).await.map_err(DispatchError::ChannelSend), + Callback::AsyncClosure(closure) => closure(data).await.map_err(DispatchError::AsyncClosure), Callback::Closure(closure) => closure(data).map_err(DispatchError::Closure), } } diff --git a/src/service/discord.rs b/src/service/discord.rs index 098fc8e..bd77d13 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -1,6 +1,6 @@ use crate::setlock::SetLock; -use super::{PinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; +use super::{types::LifetimedPinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; use log::{error, info}; use serenity::{ all::{GatewayIntents, Ready}, @@ -56,7 +56,7 @@ impl Service for DiscordService { &self.info } - fn start(&mut self, _service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()> { + fn start(&mut self, _service_manager: Arc) -> LifetimedPinnedBoxedFutureResult<'_, ()> { Box::pin(async move { let client_ready_notify = Arc::new(Notify::new()); @@ -115,7 +115,7 @@ impl Service for DiscordService { }) } - fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()> { + fn stop(&mut self) -> LifetimedPinnedBoxedFutureResult<'_, ()> { Box::pin(async move { if let Some(client_handle) = self.client_handle.take() { info!("Waiting for Discord client to stop..."); @@ -129,7 +129,7 @@ impl Service for DiscordService { }) } - fn task<'a>(&self) -> Option> { + fn task<'a>(&self) -> Option> { Some(Box::pin(async move { let mut i = 0; loop { diff --git a/src/service/service.rs b/src/service/service.rs index 04bcaba..4c0c83a 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -10,7 +10,7 @@ use crate::event::Observable; use super::{ service_manager::ServiceManager, - types::{PinnedBoxedFuture, PinnedBoxedFutureResult, Priority, Status}, + types::{LifetimedPinnedBoxedFuture, LifetimedPinnedBoxedFutureResult, Priority, Status}, }; #[derive(Debug)] @@ -61,13 +61,13 @@ impl Hash for ServiceInfo { //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a PinnedBoxedFutureResult pub trait Service: DowncastSync { fn info(&self) -> &ServiceInfo; - fn start(&mut self, service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()>; - fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()>; - fn task<'a>(&self) -> Option> { + fn start(&mut self, service_manager: Arc) -> LifetimedPinnedBoxedFutureResult<'_, ()>; + fn stop(&mut self) -> LifetimedPinnedBoxedFutureResult<'_, ()>; + fn task<'a>(&self) -> Option> { None } - fn is_available(&self) -> PinnedBoxedFuture<'_, bool> { + fn is_available(&self) -> LifetimedPinnedBoxedFuture<'_, bool> { Box::pin(async move { matches!(self.info().status.get().await, Status::Started) }) } } diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index 507a667..3028997 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -1,6 +1,6 @@ use super::{ service::Service, - types::{OverallStatus, PinnedBoxedFuture, Priority, ShutdownError, StartupError, Status}, + types::{LifetimedPinnedBoxedFuture, OverallStatus, Priority, ShutdownError, StartupError, Status}, }; use crate::{ service::Watchdog, @@ -208,7 +208,7 @@ impl ServiceManager { } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn overall_status(&self) -> PinnedBoxedFuture<'_, OverallStatus> { + pub fn overall_status(&self) -> LifetimedPinnedBoxedFuture<'_, OverallStatus> { Box::pin(async move { for service in self.services.iter() { let service = service.lock().await; @@ -224,7 +224,7 @@ impl ServiceManager { } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn status_tree(&self) -> PinnedBoxedFuture<'_, String> { + pub fn status_tree(&self) -> LifetimedPinnedBoxedFuture<'_, String> { Box::pin(async move { let mut text_buffer = String::new(); diff --git a/src/service/types.rs b/src/service/types.rs index d878a4a..32d7b8c 100644 --- a/src/service/types.rs +++ b/src/service/types.rs @@ -4,11 +4,14 @@ use thiserror::Error; pub type BoxedError = Box; -pub type BoxedFuture<'a, T> = Box + Send + 'a>; -pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>; +pub type BoxedFuture = Box + Send>; +pub type BoxedFutureResult = BoxedFuture>; -pub type PinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; -pub type PinnedBoxedFutureResult<'a, T> = PinnedBoxedFuture<'a, Result>; +pub type PinnedBoxedFuture = Pin + Send>>; +pub type PinnedBoxedFutureResult = PinnedBoxedFuture>; + +pub type LifetimedPinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; +pub type LifetimedPinnedBoxedFutureResult<'a, T> = LifetimedPinnedBoxedFuture<'a, Result>; #[derive(Debug, Clone)] pub enum Status { diff --git a/src/service/watchdog.rs b/src/service/watchdog.rs index 9d826d7..2eb53a9 100644 --- a/src/service/watchdog.rs +++ b/src/service/watchdog.rs @@ -1,4 +1,4 @@ -use super::PinnedBoxedFuture; +use super::types::LifetimedPinnedBoxedFuture; use log::error; use serenity::FutureExt; use std::{future::Future, mem::replace, sync::Arc}; @@ -9,12 +9,12 @@ use tokio::sync::{ //TODO: Rename to TaskChain and use Event instead of manual subscriber handling pub struct Watchdog<'a, T: Send> { - task: PinnedBoxedFuture<'a, T>, + task: LifetimedPinnedBoxedFuture<'a, T>, subscribers: Arc>>>>, } impl<'a, T: 'a + Send> Watchdog<'a, T> { - pub fn new(task: PinnedBoxedFuture<'a, T>) -> Self { + pub fn new(task: LifetimedPinnedBoxedFuture<'a, T>) -> Self { Self { task, subscribers: Arc::new(Mutex::new(Vec::new())), From 7b3345c8860f2a48cef00338b93644e0d4c3cf45 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sun, 28 Jul 2024 12:56:01 +0200 Subject: [PATCH 16/25] WIP: EventRepeater --- src/event.rs | 2 ++ src/event/event.rs | 12 ++++---- src/event/event_repeater.rs | 57 +++++++++++++++++++++++++++++++++++++ 3 files changed, 65 insertions(+), 6 deletions(-) diff --git a/src/event.rs b/src/event.rs index 108f203..4c6721c 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1,9 +1,11 @@ pub mod arc_observable; pub mod event; +pub mod event_repeater; pub mod observable; pub mod subscriber; pub use arc_observable::ArcObservable; pub use event::Event; +pub use event_repeater::EventRepeater; pub use observable::{Observable, ObservableResult}; pub use subscriber::{Callback, DispatchError, Subscriber}; diff --git a/src/event/event.rs b/src/event/event.rs index 6b0aed5..258fbe8 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -54,16 +54,16 @@ impl Event { receiver_subscription } - pub async fn register_callback( + pub async fn subscribe_async( &self, name: S, - closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, + closure: impl Fn(Arc) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static, remove_on_error: bool, ) -> Subscription where S: Into, { - let subscriber = Subscriber::new(name, remove_on_error, Callback::Closure(Box::new(closure))); + let subscriber = Subscriber::new(name, remove_on_error, Callback::AsyncClosure(Box::new(closure))); let subscription = Subscription::from(&subscriber); let mut subscribers = self.subscribers.lock().await; @@ -72,16 +72,16 @@ impl Event { subscription } - pub async fn register_async_callback( + pub async fn subscribe( &self, name: S, - closure: impl Fn(Arc) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static, + closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, remove_on_error: bool, ) -> Subscription where S: Into, { - let subscriber = Subscriber::new(name, remove_on_error, Callback::AsyncClosure(Box::new(closure))); + let subscriber = Subscriber::new(name, remove_on_error, Callback::Closure(Box::new(closure))); let subscription = Subscription::from(&subscriber); let mut subscribers = self.subscribers.lock().await; diff --git a/src/event/event_repeater.rs b/src/event/event_repeater.rs index e69de29..99d0c46 100644 --- a/src/event/event_repeater.rs +++ b/src/event/event_repeater.rs @@ -0,0 +1,57 @@ +use std::sync::Arc; + +use tokio::{spawn, task::JoinHandle}; + +use crate::service::{BoxedError, PinnedBoxedFutureResult}; + +use super::{ + subscriber::{ReceiverSubscription, Subscription}, + Event, +}; + +pub struct EventRepeater { + pub name: String, + + channel_task: JoinHandle<()>, + + subscription_channel: ReceiverSubscription>, + subscription_async_closure: Subscription, + subscription_closure: Subscription, +} + +impl EventRepeater { + pub async fn new(name: S, event: &Event, buffer: usize) -> Self + where + T: 'static, + S: Into, + { + let name = name.into(); + + let subscription_channel = event.open_channel(name.clone(), buffer, false).await; + + let subscription_async_closure = event.subscribe_async(name.clone(), async_closure, false).await; + let subscription_closure = event.subscribe(name.clone(), closure, false).await; + + let channel_task = spawn(run_channel_task()); + + Self { + name, + + channel_task, + + subscription_channel, + subscription_async_closure, + subscription_closure, + } + } +} + +async fn run_channel_task() {} + +fn async_closure(data: Arc) -> PinnedBoxedFutureResult<()> { + Box::pin(async move { Ok(()) }) +} + +fn closure(data: Arc) -> Result<(), BoxedError> { + Ok(()) +} From 4cdff2fbcd14a46cc72b8c12d592d3da0e6eaeeb Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Mon, 5 Aug 2024 22:24:25 +0200 Subject: [PATCH 17/25] add: EventRepeater --- Cargo.toml | 2 +- src/event/arc_observable.rs | 10 ++--- src/event/event.rs | 65 ++++++++++++++------------- src/event/event_repeater.rs | 89 ++++++++++++++++++------------------- src/event/observable.rs | 19 +++++--- src/event/subscriber.rs | 68 +++++++++++++++++++++------- 6 files changed, 149 insertions(+), 104 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3022bda..4aedd4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,4 +23,4 @@ serenity = { version = "0.12.0", default-features=false, features = ["builder", sqlx = { version = "0.8.0", features = ["runtime-tokio", "any", "postgres", "mysql", "sqlite", "tls-native-tls", "migrate", "macros", "uuid", "chrono", "json"] } thiserror = "1.0.52" tokio = { version = "1.35.1", features = ["full"] } -uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] } +uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] } \ No newline at end of file diff --git a/src/event/arc_observable.rs b/src/event/arc_observable.rs index bc62aa4..b399b67 100644 --- a/src/event/arc_observable.rs +++ b/src/event/arc_observable.rs @@ -10,20 +10,20 @@ use super::{Event, ObservableResult}; #[derive(Debug)] pub struct ArcObservable where - T: Hash, + T: Send + 'static + Hash, { value: Arc>, - on_change: Event>>, + on_change: Event>, } impl ArcObservable where - T: Hash, + T: Send + 'static + Hash, { pub fn new(value: T, event_name: impl Into) -> Self { Self { value: Arc::new(Mutex::new(value)), - on_change: Event::from(event_name), + on_change: Event::new(event_name), } } @@ -31,7 +31,7 @@ where Arc::clone(&self.value) } - pub async fn set(&self, value: T) -> ObservableResult>> { + pub async fn set(&self, value: T) -> ObservableResult> { let mut lock = self.value.lock().await; let mut hasher = DefaultHasher::new(); diff --git a/src/event/event.rs b/src/event/event.rs index 258fbe8..9a45919 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -11,19 +11,25 @@ use super::{ Callback, DispatchError, Subscriber, }; -pub struct Event { +pub struct Event +where + T: Send + Sync + 'static, +{ pub name: String, - log_on_error: bool, - subscribers: Mutex>>, } -impl Event { - pub fn new(name: impl Into, log_on_error: bool) -> Self { +impl Event +where + T: Send + Sync + 'static, +{ + pub fn new(name: S) -> Self + where + S: Into, + { Self { name: name.into(), - log_on_error, subscribers: Mutex::new(Vec::new()), } } @@ -37,13 +43,14 @@ impl Event { &self, name: S, buffer: usize, + log_on_error: bool, remove_on_error: bool, ) -> ReceiverSubscription> where S: Into, { let (sender, receiver) = channel(buffer); - let subscriber = Subscriber::new(name, remove_on_error, Callback::Channel(sender)); + let subscriber = Subscriber::new(name, log_on_error, remove_on_error, Callback::Channel(sender)); let subscription = Subscription::from(&subscriber); let receiver_subscription = ReceiverSubscription::new(subscription, receiver); @@ -58,12 +65,18 @@ impl Event { &self, name: S, closure: impl Fn(Arc) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static, + log_on_error: bool, remove_on_error: bool, ) -> Subscription where S: Into, { - let subscriber = Subscriber::new(name, remove_on_error, Callback::AsyncClosure(Box::new(closure))); + let subscriber = Subscriber::new( + name, + log_on_error, + remove_on_error, + Callback::AsyncClosure(Box::new(closure)), + ); let subscription = Subscription::from(&subscriber); let mut subscribers = self.subscribers.lock().await; @@ -76,12 +89,18 @@ impl Event { &self, name: S, closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, + log_on_error: bool, remove_on_error: bool, ) -> Subscription where S: Into, { - let subscriber = Subscriber::new(name, remove_on_error, Callback::Closure(Box::new(closure))); + let subscriber = Subscriber::new( + name, + log_on_error, + remove_on_error, + Callback::Closure(Box::new(closure)), + ); let subscription = Subscription::from(&subscriber); let mut subscribers = self.subscribers.lock().await; @@ -90,9 +109,7 @@ impl Event { subscription } - pub async fn dispatch(&self, data: T) -> Result<(), Vec>> { - let data = Arc::new(data); - + pub async fn dispatch(&self, data: Arc) -> Result<(), Vec>> { let mut errors = Vec::new(); let mut subscribers_to_remove = Vec::new(); @@ -102,7 +119,7 @@ impl Event { let result = subscriber.dispatch(data).await; if let Err(err) = result { - if self.log_on_error { + if subscriber.log_on_error { log::error!( "Event \"{}\" failed to dispatch data to subscriber {}: {}.", self.name, @@ -112,7 +129,7 @@ impl Event { } if subscriber.remove_on_error { - if self.log_on_error { + if subscriber.log_on_error { log::error!("Subscriber will be unregistered from event."); } @@ -135,29 +152,13 @@ impl Event { } } -impl Default for Event { - fn default() -> Self { - Self::new("Unnamed Event", true) - } -} - -impl From for Event +impl Debug for Event where - S: Into, + T: Send + Sync + 'static, { - fn from(name: S) -> Self { - Self { - name: name.into(), - ..Default::default() - } - } -} - -impl Debug for Event { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct(type_name::()) .field("name", &self.name) - .field("log_on_error", &self.log_on_error) .field("subscribers", &self.subscribers.blocking_lock().len()) .finish() } diff --git a/src/event/event_repeater.rs b/src/event/event_repeater.rs index 99d0c46..e3abeab 100644 --- a/src/event/event_repeater.rs +++ b/src/event/event_repeater.rs @@ -1,57 +1,56 @@ use std::sync::Arc; +use tokio::{sync::Mutex, task::JoinHandle}; -use tokio::{spawn, task::JoinHandle}; +use super::Event; -use crate::service::{BoxedError, PinnedBoxedFutureResult}; - -use super::{ - subscriber::{ReceiverSubscription, Subscription}, - Event, -}; - -pub struct EventRepeater { - pub name: String, - - channel_task: JoinHandle<()>, - - subscription_channel: ReceiverSubscription>, - subscription_async_closure: Subscription, - subscription_closure: Subscription, +pub struct EventRepeater +where + T: Send + Sync + 'static, +{ + pub event: Event, + self_arc: Mutex>>, + tasks: Mutex>>, } -impl EventRepeater { - pub async fn new(name: S, event: &Event, buffer: usize) -> Self +impl EventRepeater +where + T: Send + Sync + 'static, +{ + pub async fn new(name: S) -> Arc where T: 'static, S: Into, { - let name = name.into(); - - let subscription_channel = event.open_channel(name.clone(), buffer, false).await; - - let subscription_async_closure = event.subscribe_async(name.clone(), async_closure, false).await; - let subscription_closure = event.subscribe(name.clone(), closure, false).await; - - let channel_task = spawn(run_channel_task()); - - Self { - name, - - channel_task, - - subscription_channel, - subscription_async_closure, - subscription_closure, - } + let event = Event::new(name); + let event_repeater = Self { + self_arc: Mutex::new(None), + event, + tasks: Mutex::new(Vec::new()), + }; + + let self_arc = Arc::new(event_repeater); + let mut lock = self_arc.self_arc.lock().await; + let self_arc_clone = Arc::clone(&self_arc); + *lock = Some(self_arc_clone); + drop(lock); + + self_arc } -} - -async fn run_channel_task() {} -fn async_closure(data: Arc) -> PinnedBoxedFutureResult<()> { - Box::pin(async move { Ok(()) }) -} - -fn closure(data: Arc) -> Result<(), BoxedError> { - Ok(()) + pub async fn attach(&self, event: &Event, buffer: usize) { + let self_arc = match self.self_arc.lock().await.as_ref() { + Some(arc) => Arc::clone(arc), + None => panic!("Tried to attach event {} to EventRepeater {} before it was initialized. Did you not use EventRepeater::new()?", event.name, self.event.name), + }; + + let mut receiver = event.open_channel(&self.event.name, buffer, true, true).await; + let join_handle = tokio::spawn(async move { + while let Some(value) = receiver.receiver.recv().await { + let _ = self_arc.event.dispatch(value).await; + } + }); + + let mut tasks = self.tasks.lock().await; + tasks.push(join_handle); + } } diff --git a/src/event/observable.rs b/src/event/observable.rs index c06917b..788d7cd 100644 --- a/src/event/observable.rs +++ b/src/event/observable.rs @@ -1,9 +1,14 @@ +use std::sync::Arc; + use tokio::sync::Mutex; use super::{DispatchError, Event}; #[derive(Debug)] -pub enum ObservableResult { +pub enum ObservableResult +where + T: Send + Sync + 'static, +{ Unchanged, Changed(Result<(), Vec>>), } @@ -11,7 +16,7 @@ pub enum ObservableResult { #[derive(Debug)] pub struct Observable where - T: Clone + PartialEq, + T: Send + Sync + 'static + Clone + PartialEq, { value: Mutex, on_change: Event, @@ -19,7 +24,7 @@ where impl Observable where - T: Clone + PartialEq, + T: Send + Sync + 'static + Clone + PartialEq, { pub fn new(value: T, event_name: I) -> Self where @@ -27,7 +32,7 @@ where { Self { value: Mutex::new(value), - on_change: Event::from(event_name), + on_change: Event::new(event_name), } } @@ -44,8 +49,10 @@ where return ObservableResult::Unchanged; } - *lock = value; - let dispatch_result = self.on_change.dispatch(lock.clone()).await; + *lock = value.clone(); + + let value = Arc::new(value); + let dispatch_result = self.on_change.dispatch(value).await; match dispatch_result { Ok(_) => ObservableResult::Changed(Ok(())), diff --git a/src/event/subscriber.rs b/src/event/subscriber.rs index 50a90ef..48b29f6 100644 --- a/src/event/subscriber.rs +++ b/src/event/subscriber.rs @@ -6,14 +6,20 @@ use uuid::Uuid; use crate::service::{BoxedError, PinnedBoxedFutureResult}; -pub enum Callback { +pub enum Callback +where + T: Send + Sync + 'static, +{ Channel(Sender>), AsyncClosure(Box) -> PinnedBoxedFutureResult<()> + Send + Sync>), Closure(Box) -> Result<(), BoxedError> + Send + Sync>), } #[derive(Debug, Error)] -pub enum DispatchError { +pub enum DispatchError +where + T: Send + Sync + 'static, +{ #[error("Failed to send data to channel: {0}")] ChannelSend(#[from] SendError>), @@ -24,21 +30,29 @@ pub enum DispatchError { Closure(BoxedError), } -pub struct Subscriber { +pub struct Subscriber +where + T: Send + Sync + 'static, +{ pub name: String, + pub log_on_error: bool, pub remove_on_error: bool, pub callback: Callback, pub uuid: Uuid, } -impl Subscriber { - pub fn new(name: S, remove_on_error: bool, callback: Callback) -> Self +impl Subscriber +where + T: Send + Sync + 'static, +{ + pub fn new(name: S, log_on_error: bool, remove_on_error: bool, callback: Callback) -> Self where S: Into, { Self { name: name.into(), + log_on_error, remove_on_error, callback, uuid: Uuid::new_v4(), @@ -54,20 +68,26 @@ impl Subscriber { } } -impl PartialEq for Subscriber { +impl PartialEq for Subscriber +where + T: Send + Sync + 'static, +{ fn eq(&self, other: &Self) -> bool { self.uuid == other.uuid } } -impl Eq for Subscriber {} +impl Eq for Subscriber where T: Send + Sync {} #[derive(Debug, PartialEq, Eq)] pub struct Subscription { pub uuid: Uuid, } -impl From> for Subscription { +impl From> for Subscription +where + T: Send + Sync + 'static, +{ fn from(subscriber: Subscriber) -> Self { Self { uuid: subscriber.uuid, @@ -75,7 +95,10 @@ impl From> for Subscription { } } -impl From<&Subscriber> for Subscription { +impl From<&Subscriber> for Subscription +where + T: Send + Sync + 'static, +{ fn from(subscriber: &Subscriber) -> Self { Self { uuid: subscriber.uuid, @@ -89,12 +112,18 @@ impl AsRef for Subscription { } } -pub struct ReceiverSubscription { +pub struct ReceiverSubscription +where + T: Send + Sync + 'static, +{ pub subscription: Subscription, pub receiver: Receiver, } -impl ReceiverSubscription { +impl ReceiverSubscription +where + T: Send + Sync + 'static, +{ pub fn new(subscription: Subscription, receiver: Receiver) -> Self { Self { subscription, @@ -103,21 +132,30 @@ impl ReceiverSubscription { } } -impl PartialEq for ReceiverSubscription { +impl PartialEq for ReceiverSubscription +where + T: Send + Sync + 'static, +{ fn eq(&self, other: &Self) -> bool { self.subscription == other.subscription } } -impl Eq for ReceiverSubscription {} +impl Eq for ReceiverSubscription where T: Send + Sync {} -impl AsRef for ReceiverSubscription { +impl AsRef for ReceiverSubscription +where + T: Send + Sync + 'static, +{ fn as_ref(&self) -> &Subscription { &self.subscription } } -impl AsRef for ReceiverSubscription { +impl AsRef for ReceiverSubscription +where + T: Send + Sync + 'static, +{ fn as_ref(&self) -> &Uuid { self.subscription.as_ref() } From 2e6f93394db9e80a1260e94e08beb6020ef4e983 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 21 Sep 2024 16:50:31 +0200 Subject: [PATCH 18/25] refactor: event subscribe method names --- src/event/event.rs | 6 +++--- src/event/event_repeater.rs | 5 ++++- src/event/subscriber.rs | 10 +++++----- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/src/event/event.rs b/src/event/event.rs index 9a45919..4b3ef5e 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -39,7 +39,7 @@ where subscribers.len() } - pub async fn open_channel( + pub async fn subscribe_channel( &self, name: S, buffer: usize, @@ -61,7 +61,7 @@ where receiver_subscription } - pub async fn subscribe_async( + pub async fn subscribe_async_closure( &self, name: S, closure: impl Fn(Arc) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static, @@ -85,7 +85,7 @@ where subscription } - pub async fn subscribe( + pub async fn subscribe_closure( &self, name: S, closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, diff --git a/src/event/event_repeater.rs b/src/event/event_repeater.rs index e3abeab..724d0f8 100644 --- a/src/event/event_repeater.rs +++ b/src/event/event_repeater.rs @@ -43,7 +43,10 @@ where None => panic!("Tried to attach event {} to EventRepeater {} before it was initialized. Did you not use EventRepeater::new()?", event.name, self.event.name), }; - let mut receiver = event.open_channel(&self.event.name, buffer, true, true).await; + let mut receiver = event + .subscribe_channel(&self.event.name, buffer, true, true) + .await; + let join_handle = tokio::spawn(async move { while let Some(value) = receiver.receiver.recv().await { let _ = self_arc.event.dispatch(value).await; diff --git a/src/event/subscriber.rs b/src/event/subscriber.rs index 48b29f6..4f5aa58 100644 --- a/src/event/subscriber.rs +++ b/src/event/subscriber.rs @@ -11,8 +11,8 @@ where T: Send + Sync + 'static, { Channel(Sender>), - AsyncClosure(Box) -> PinnedBoxedFutureResult<()> + Send + Sync>), Closure(Box) -> Result<(), BoxedError> + Send + Sync>), + AsyncClosure(Box) -> PinnedBoxedFutureResult<()> + Send + Sync>), } #[derive(Debug, Error)] @@ -23,11 +23,11 @@ where #[error("Failed to send data to channel: {0}")] ChannelSend(#[from] SendError>), - #[error("Failed to dispatch data to async closure: {0}")] - AsyncClosure(BoxedError), - #[error("Failed to dispatch data to closure: {0}")] Closure(BoxedError), + + #[error("Failed to dispatch data to async closure: {0}")] + AsyncClosure(BoxedError), } pub struct Subscriber @@ -62,8 +62,8 @@ where pub async fn dispatch(&self, data: Arc) -> Result<(), DispatchError> { match &self.callback { Callback::Channel(sender) => sender.send(data).await.map_err(DispatchError::ChannelSend), - Callback::AsyncClosure(closure) => closure(data).await.map_err(DispatchError::AsyncClosure), Callback::Closure(closure) => closure(data).map_err(DispatchError::Closure), + Callback::AsyncClosure(closure) => closure(data).await.map_err(DispatchError::AsyncClosure), } } } From abe190afb9ca4f9dd87f4ba3c582b9264eb2000c Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 21 Sep 2024 16:59:00 +0200 Subject: [PATCH 19/25] refactor: move subscription into own module --- src/event.rs | 2 + src/event/event.rs | 5 +-- src/event/subscriber.rs | 84 +-------------------------------------- src/event/subscription.rs | 71 +++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 87 deletions(-) create mode 100644 src/event/subscription.rs diff --git a/src/event.rs b/src/event.rs index 4c6721c..0eab11f 100644 --- a/src/event.rs +++ b/src/event.rs @@ -3,9 +3,11 @@ pub mod event; pub mod event_repeater; pub mod observable; pub mod subscriber; +pub mod subscription; pub use arc_observable::ArcObservable; pub use event::Event; pub use event_repeater::EventRepeater; pub use observable::{Observable, ObservableResult}; pub use subscriber::{Callback, DispatchError, Subscriber}; +pub use subscription::{ReceiverSubscription, Subscription}; diff --git a/src/event/event.rs b/src/event/event.rs index 4b3ef5e..eea2473 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -6,10 +6,7 @@ use std::{ }; use tokio::sync::{mpsc::channel, Mutex}; -use super::{ - subscriber::{ReceiverSubscription, Subscription}, - Callback, DispatchError, Subscriber, -}; +use super::{Callback, DispatchError, ReceiverSubscription, Subscriber, Subscription}; pub struct Event where diff --git a/src/event/subscriber.rs b/src/event/subscriber.rs index 4f5aa58..8fd8e51 100644 --- a/src/event/subscriber.rs +++ b/src/event/subscriber.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use thiserror::Error; -use tokio::sync::mpsc::{error::SendError, Receiver, Sender}; +use tokio::sync::mpsc::{error::SendError, Sender}; use uuid::Uuid; use crate::service::{BoxedError, PinnedBoxedFutureResult}; @@ -78,85 +78,3 @@ where } impl Eq for Subscriber where T: Send + Sync {} - -#[derive(Debug, PartialEq, Eq)] -pub struct Subscription { - pub uuid: Uuid, -} - -impl From> for Subscription -where - T: Send + Sync + 'static, -{ - fn from(subscriber: Subscriber) -> Self { - Self { - uuid: subscriber.uuid, - } - } -} - -impl From<&Subscriber> for Subscription -where - T: Send + Sync + 'static, -{ - fn from(subscriber: &Subscriber) -> Self { - Self { - uuid: subscriber.uuid, - } - } -} - -impl AsRef for Subscription { - fn as_ref(&self) -> &Uuid { - &self.uuid - } -} - -pub struct ReceiverSubscription -where - T: Send + Sync + 'static, -{ - pub subscription: Subscription, - pub receiver: Receiver, -} - -impl ReceiverSubscription -where - T: Send + Sync + 'static, -{ - pub fn new(subscription: Subscription, receiver: Receiver) -> Self { - Self { - subscription, - receiver, - } - } -} - -impl PartialEq for ReceiverSubscription -where - T: Send + Sync + 'static, -{ - fn eq(&self, other: &Self) -> bool { - self.subscription == other.subscription - } -} - -impl Eq for ReceiverSubscription where T: Send + Sync {} - -impl AsRef for ReceiverSubscription -where - T: Send + Sync + 'static, -{ - fn as_ref(&self) -> &Subscription { - &self.subscription - } -} - -impl AsRef for ReceiverSubscription -where - T: Send + Sync + 'static, -{ - fn as_ref(&self) -> &Uuid { - self.subscription.as_ref() - } -} diff --git a/src/event/subscription.rs b/src/event/subscription.rs new file mode 100644 index 0000000..e10f4dc --- /dev/null +++ b/src/event/subscription.rs @@ -0,0 +1,71 @@ +use tokio::sync::mpsc::Receiver; +use uuid::Uuid; + +use super::Subscriber; + +#[derive(Debug, PartialEq, Eq)] +pub struct Subscription { + pub uuid: Uuid, +} + +impl From> for Subscription +where + T: Send + Sync + 'static, +{ + fn from(subscriber: Subscriber) -> Self { + Self { + uuid: subscriber.uuid, + } + } +} + +impl From<&Subscriber> for Subscription +where + T: Send + Sync + 'static, +{ + fn from(subscriber: &Subscriber) -> Self { + Self { + uuid: subscriber.uuid, + } + } +} + +pub struct ReceiverSubscription +where + T: Send + Sync + 'static, +{ + pub subscription: Subscription, + pub receiver: Receiver, +} + +impl ReceiverSubscription +where + T: Send + Sync + 'static, +{ + pub fn new(subscription: Subscription, receiver: Receiver) -> Self { + Self { + subscription, + receiver, + } + } +} + +impl PartialEq for ReceiverSubscription +where + T: Send + Sync + 'static, +{ + fn eq(&self, other: &Self) -> bool { + self.subscription == other.subscription + } +} + +impl Eq for ReceiverSubscription where T: Send + Sync {} + +impl AsRef for ReceiverSubscription +where + T: Send + Sync + 'static, +{ + fn as_ref(&self) -> &Subscription { + &self.subscription + } +} From 05b6d49e1e5c20ccb10426c61104af7bc8e70591 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 21 Sep 2024 18:35:27 +0200 Subject: [PATCH 20/25] add: AsRef> --- src/event/event_repeater.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/src/event/event_repeater.rs b/src/event/event_repeater.rs index 724d0f8..61a3fd7 100644 --- a/src/event/event_repeater.rs +++ b/src/event/event_repeater.rs @@ -57,3 +57,12 @@ where tasks.push(join_handle); } } + +impl AsRef> for EventRepeater +where + T: Send + Sync + 'static, +{ + fn as_ref(&self) -> &Event { + &self.event + } +} From cb8fa7e528843c82bdb62071a73cb93e4acd5357 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 21 Sep 2024 20:00:33 +0200 Subject: [PATCH 21/25] add: UUID, PartialEq/Eq, unsubscribe() --- src/event/event.rs | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/src/event/event.rs b/src/event/event.rs index eea2473..d714381 100644 --- a/src/event/event.rs +++ b/src/event/event.rs @@ -5,6 +5,7 @@ use std::{ sync::Arc, }; use tokio::sync::{mpsc::channel, Mutex}; +use uuid::Uuid; use super::{Callback, DispatchError, ReceiverSubscription, Subscriber, Subscription}; @@ -14,6 +15,7 @@ where { pub name: String, + pub uuid: Uuid, subscribers: Mutex>>, } @@ -27,6 +29,7 @@ where { Self { name: name.into(), + uuid: Uuid::new_v4(), subscribers: Mutex::new(Vec::new()), } } @@ -106,6 +109,25 @@ where subscription } + pub async fn unsubscribe(&self, subscription: S) -> Option + where + S: Into, + { + let subscription_to_remove = subscription.into(); + + let mut subscribers = self.subscribers.lock().await; + let index = subscribers + .iter() + .position(|subscription_of_event| subscription_of_event.uuid == subscription_to_remove.uuid); + + if let Some(index) = index { + subscribers.remove(index); + None + } else { + Some(subscription_to_remove) + } + } + pub async fn dispatch(&self, data: Arc) -> Result<(), Vec>> { let mut errors = Vec::new(); let mut subscribers_to_remove = Vec::new(); @@ -149,12 +171,24 @@ where } } +impl PartialEq for Event +where + T: Send + Sync + 'static, +{ + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for Event where T: Send + Sync {} + impl Debug for Event where T: Send + Sync + 'static, { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { f.debug_struct(type_name::()) + .field("uuid", &self.uuid) .field("name", &self.name) .field("subscribers", &self.subscribers.blocking_lock().len()) .finish() From 0d3c1c1214694bf79a0aaa2fac6fdcbb2c1a0cf2 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sun, 22 Sep 2024 15:34:01 +0200 Subject: [PATCH 22/25] add: event_repeater detach(), close() --- src/event/event_repeater.rs | 106 ++++++++++++++++++++++++++++++++---- src/event/observable.rs | 2 +- 2 files changed, 97 insertions(+), 11 deletions(-) diff --git a/src/event/event_repeater.rs b/src/event/event_repeater.rs index 61a3fd7..d585df7 100644 --- a/src/event/event_repeater.rs +++ b/src/event/event_repeater.rs @@ -1,7 +1,46 @@ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; +use thiserror::Error; use tokio::{sync::Mutex, task::JoinHandle}; +use uuid::Uuid; -use super::Event; +use super::{Event, Subscription}; + +#[derive(Debug, Error)] +pub enum AttachError { + #[error("Tried to attach event {event_name} to EventRepeater {repeater_name} before it was initialized. Did you not use EventRepeater::new()?")] + NotInitialized { + event_name: String, + repeater_name: String, + }, + + #[error( + "Tried to attach event {event_name} to EventRepeater {repeater_name}, which was already attached." + )] + AlreadyAttached { + event_name: String, + repeater_name: String, + }, +} + +#[derive(Debug, Error)] +pub enum DetachError { + #[error( + "Tried to detach event {event_name} from EventRepeater {repeater_name}, which was not attached." + )] + NotAttached { + event_name: String, + repeater_name: String, + }, +} + +#[derive(Error)] +pub enum CloseError +where + T: Send + Sync + 'static, +{ + #[error("EventRepeater still has attached events. Detach all events before closing.")] + AttachedEvents(EventRepeater), +} pub struct EventRepeater where @@ -9,7 +48,7 @@ where { pub event: Event, self_arc: Mutex>>, - tasks: Mutex>>, + subscriptions: Mutex)>>, } impl EventRepeater @@ -25,7 +64,7 @@ where let event_repeater = Self { self_arc: Mutex::new(None), event, - tasks: Mutex::new(Vec::new()), + subscriptions: Mutex::new(HashMap::new()), }; let self_arc = Arc::new(event_repeater); @@ -37,24 +76,71 @@ where self_arc } - pub async fn attach(&self, event: &Event, buffer: usize) { + pub async fn subscription_count(&self) -> usize { + self.subscriptions.lock().await.len() + } + + pub async fn attach(&self, event: &Event, buffer: usize) -> Result<(), AttachError> { let self_arc = match self.self_arc.lock().await.as_ref() { Some(arc) => Arc::clone(arc), - None => panic!("Tried to attach event {} to EventRepeater {} before it was initialized. Did you not use EventRepeater::new()?", event.name, self.event.name), + None => { + return Err(AttachError::NotInitialized { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }) + } }; - let mut receiver = event + let mut subscriptions = self.subscriptions.lock().await; + if subscriptions.contains_key(&event.uuid) { + return Err(AttachError::AlreadyAttached { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }); + } + + let receiver_subscription = event .subscribe_channel(&self.event.name, buffer, true, true) .await; + let subscription = receiver_subscription.subscription; + let mut receiver = receiver_subscription.receiver; + let join_handle = tokio::spawn(async move { - while let Some(value) = receiver.receiver.recv().await { + while let Some(value) = receiver.recv().await { let _ = self_arc.event.dispatch(value).await; } }); + subscriptions.insert(event.uuid, (subscription, join_handle)); + + Ok(()) + } + + pub async fn detach(&self, event: &Event) -> Result<(), DetachError> { + let mut subscriptions = self.subscriptions.lock().await; + + let subscription = match subscriptions.remove(&event.uuid) { + Some(subscription) => subscription, + None => { + return Err(DetachError::NotAttached { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }) + } + }; + subscription.1.abort(); + + Ok(()) + } + + pub async fn close(self) -> Result<(), CloseError> { + let subscription_count = self.subscription_count().await; + + if subscription_count > 0 { + return Err(CloseError::AttachedEvents(self)); + } - let mut tasks = self.tasks.lock().await; - tasks.push(join_handle); + Ok(()) } } diff --git a/src/event/observable.rs b/src/event/observable.rs index 788d7cd..7ce3775 100644 --- a/src/event/observable.rs +++ b/src/event/observable.rs @@ -16,7 +16,7 @@ where #[derive(Debug)] pub struct Observable where - T: Send + Sync + 'static + Clone + PartialEq, + T: Send + Sync + 'static + Clone + PartialEq, //TODO: Try out if we can remove Sync here { value: Mutex, on_change: Event, From 4ff9c8bc04617384b669c3b4fc47ff3d7a68ee2d Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sun, 22 Sep 2024 16:28:13 +0200 Subject: [PATCH 23/25] add: attach/deattach EventRepeater on start/stop of service --- src/event/observable.rs | 9 +++++++++ src/service/service_manager.rs | 19 ++++++++++++++++--- src/service/types.rs | 21 +++++++++++++++++++-- 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/src/event/observable.rs b/src/event/observable.rs index 7ce3775..e3fd726 100644 --- a/src/event/observable.rs +++ b/src/event/observable.rs @@ -60,3 +60,12 @@ where } } } + +impl AsRef> for Observable +where + T: Send + Sync + 'static + Clone + PartialEq, +{ + fn as_ref(&self) -> &Event { + &self.on_change + } +} diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index 3028997..8bd2b16 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -3,8 +3,7 @@ use super::{ types::{LifetimedPinnedBoxedFuture, OverallStatus, Priority, ShutdownError, StartupError, Status}, }; use crate::{ - service::Watchdog, - setlock::{SetLock, SetLockError}, + event::EventRepeater, service::Watchdog, setlock::{SetLock, SetLockError} }; use log::{error, info, warn}; use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; @@ -58,6 +57,7 @@ pub fn new() -> Self { arc: Mutex::new(SetLock::new()), services: self.services, background_tasks: Mutex::new(HashMap::new()), + on_status_change: EventRepeater::new("service_manager_on_status_change").await, }; let self_arc = Arc::new(service_manager); @@ -81,6 +81,8 @@ pub struct ServiceManager { arc: Mutex>>, services: Vec>>, background_tasks: Mutex>>, + + pub on_status_change: Arc>, } impl ServiceManager { @@ -118,8 +120,13 @@ impl ServiceManager { return Err(StartupError::BackgroundTaskAlreadyRunning(service_id.clone())); } - service_lock.info().status.set(Status::Starting).await; + let service_status_event = service_lock.info().status.as_ref(); + let attachment_result = self.on_status_change.attach(service_status_event, 2).await; + if let Err(err) = attachment_result { + return Err(StartupError::StatusAttachmentFailed(service_id.clone(), err)); + } + service_lock.info().status.set(Status::Starting).await; self.init_service(&mut service_lock).await?; self.start_background_task(&service_lock, Arc::clone(&service)) .await; @@ -149,6 +156,12 @@ impl ServiceManager { self.shutdown_service(&mut service_lock).await?; + let service_status_event = service_lock.info().status.as_ref(); + let detach_result = self.on_status_change.detach(service_status_event).await; + if let Err(err) = detach_result { + return Err(ShutdownError::StatusDetachmentFailed(service_id.clone(), err)); + } + info!("Stopped service {}", service_lock.info().name); Ok(()) diff --git a/src/service/types.rs b/src/service/types.rs index 32d7b8c..6d388de 100644 --- a/src/service/types.rs +++ b/src/service/types.rs @@ -2,6 +2,8 @@ use std::{error::Error, fmt::Display, future::Future, pin::Pin}; use thiserror::Error; +use crate::event::event_repeater::{AttachError, DetachError}; + pub type BoxedError = Box; pub type BoxedFuture = Box + Send>; @@ -89,10 +91,18 @@ impl Display for Priority { pub enum StartupError { #[error("Service {0} is not managed by this Service Manager")] ServiceNotManaged(String), - #[error("Service {0} already has a background task running")] - BackgroundTaskAlreadyRunning(String), + #[error("Service {0} is not stopped")] ServiceNotStopped(String), + + #[error("Service {0} already has a background task running")] + BackgroundTaskAlreadyRunning(String), + + #[error( + "Failed to attach Service Manager's status_change EventRepeater to {0}'s status_change Event: {1}" + )] + StatusAttachmentFailed(String, AttachError), + #[error("Service {0} failed to start")] FailedToStartService(String), } @@ -101,8 +111,15 @@ pub enum StartupError { pub enum ShutdownError { #[error("Service {0} is not managed by this Service Manager")] ServiceNotManaged(String), + #[error("Service {0} is not started")] ServiceNotStarted(String), + #[error("Service {0} failed to stop")] FailedToStopService(String), + + #[error( + "Failed to detach Service Manager's status_change EventRepeater from {0}'s status_change Event: {1}" + )] + StatusDetachmentFailed(String, DetachError), } From 2155cbfde29c882226818563d2be79fdbc08cdec Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 4 Oct 2024 15:44:53 +0200 Subject: [PATCH 24/25] add: service runtime failure handling --- src/bot.rs | 62 ++++++++++++++++++++++++++------- src/lib.rs | 27 ++++++++++----- src/service/discord.rs | 39 ++++++--------------- src/service/service_manager.rs | 63 +++++++++++++++++++++------------- 4 files changed, 118 insertions(+), 73 deletions(-) diff --git a/src/bot.rs b/src/bot.rs index 79df9a5..37dce6e 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -1,13 +1,26 @@ -use std::sync::Arc; +use core::fmt; +use std::{fmt::Display, sync::Arc}; -use log::info; -use tokio::sync::Mutex; +use log::error; +use tokio::{signal, sync::Mutex}; -use crate::service::{types::LifetimedPinnedBoxedFuture, Service, ServiceManager, ServiceManagerBuilder}; +use crate::service::{ + types::LifetimedPinnedBoxedFuture, OverallStatus, Service, ServiceManager, ServiceManagerBuilder, +}; +#[derive(Debug, Clone, Copy)] pub enum ExitReason { SIGINT, - EssentialServiceFailed(String), + EssentialServiceFailed, +} + +impl Display for ExitReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::SIGINT => write!(f, "SIGINT"), + Self::EssentialServiceFailed => write!(f, "Essential Service Failed"), + } + } } pub struct BotBuilder { @@ -72,15 +85,40 @@ impl Bot { } pub async fn join(&self) -> ExitReason { - match tokio::signal::ctrl_c().await { - Ok(_) => { - info!("Received SIGINT, {} will now shut down", self.name); + let name_clone = self.name.clone(); + let signal_task = tokio::spawn(async move { + let name = name_clone; + + let result = signal::ctrl_c().await; + if let Err(error) = result { + error!( + "Error receiving SIGINT: {}. {} will exit ungracefully immediately to prevent undefined behavior.", + error, name + ); + panic!("Error receiving SIGINT: {}", error); } - Err(error) => { - panic!("Error receiving SIGINT: {}\n{} will exit.", error, self.name); + }); + + let service_manager_clone = self.service_manager.clone(); + let mut receiver = self + .service_manager + .on_status_change + .event + .subscribe_channel("t", 2, true, true) + .await; + let status_task = tokio::spawn(async move { + let service_manager = service_manager_clone; + while (receiver.receiver.recv().await).is_some() { + let overall_status = service_manager.overall_status().await; + if overall_status == OverallStatus::Unhealthy { + return; + } } - } + }); - ExitReason::SIGINT + tokio::select! { + _ = signal_task => ExitReason::SIGINT, + _ = status_task => ExitReason::EssentialServiceFailed, + } } } diff --git a/src/lib.rs b/src/lib.rs index 793ec3f..dc2f19b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,9 +21,7 @@ pub async fn run(mut bot: Bot) { } let now = SystemTime::now(); - bot.start().await; - match now.elapsed() { Ok(elapsed) => info!("Startup took {}ms", elapsed.as_millis()), Err(error) => { @@ -36,21 +34,34 @@ pub async fn run(mut bot: Bot) { }; if bot.service_manager.overall_status().await != OverallStatus::Healthy { - let status_tree = bot.service_manager.status_tree().await; + let status_overview = bot.service_manager.status_overview().await; - error!("{} is not healthy! Some essential services did not start up successfully. Please check the logs.\nService status tree:\n{}\n{} will exit.", + error!("{} is not healthy! Some essential services did not start up successfully. {} will now exit ungracefully.\n\n{}", + bot.name, bot.name, - status_tree, - bot.name); + status_overview); return; } info!("{} is alive", bot.name,); //TODO: Add CLI commands + let exit_reason = bot.join().await; + match exit_reason { + bot::ExitReason::SIGINT => info!( + "{} received a SIGINT signal! Attempting to shut down gracefully.", + bot.name + ), + bot::ExitReason::EssentialServiceFailed => { + let status_overview = bot.service_manager.status_overview().await; + error!( + "An essential service failed! Attempting to shut down gracefully.\n{}", + status_overview + ); + } + } bot.stop().await; - - info!("{} has shut down", bot.name); + info!("Oyasumi 💤"); } diff --git a/src/service/discord.rs b/src/service/discord.rs index bd77d13..47ec3d2 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -97,7 +97,6 @@ impl Service for DiscordService { return Err(format!("Failed to set ws_url SetLock: {}", error).into()); } - info!("Connecting to Discord"); let client_handle = spawn(async move { client.start().await }); select! { @@ -120,40 +119,22 @@ impl Service for DiscordService { if let Some(client_handle) = self.client_handle.take() { info!("Waiting for Discord client to stop..."); - client_handle.abort(); - let result = convert_thread_result(client_handle).await; + client_handle.abort(); // Should trigger a JoinError in the client_handle, if the task hasn't already ended + + // If the thread ended WITHOUT a JoinError, the client already stopped unexpectedly + let result = async move { + match client_handle.await { + Ok(result) => result, + Err(_) => Ok(()), + } + } + .await; result?; } Ok(()) }) } - - fn task<'a>(&self) -> Option> { - Some(Box::pin(async move { - let mut i = 0; - loop { - sleep(Duration::from_secs(1)).await; - if i < 5 { - i += 1; - info!("Wohoo!"); - } else { - info!("Bye!"); - break; - } - } - - Err("Sheesh".into()) - })) - } -} - -// If the thread ended WITHOUT a JoinError from aborting, the client already stopped unexpectedly -async fn convert_thread_result(client_handle: JoinHandle>) -> Result<(), Error> { - match client_handle.await { - Ok(result) => result, - Err(_) => Ok(()), - } } struct EventHandler { diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index 8bd2b16..6d25e44 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -79,9 +79,9 @@ pub fn new() -> Self { pub struct ServiceManager { arc: Mutex>>, - services: Vec>>, background_tasks: Mutex>>, + pub services: Vec>>, pub on_status_change: Arc>, } @@ -225,9 +225,13 @@ impl ServiceManager { Box::pin(async move { for service in self.services.iter() { let service = service.lock().await; - let status = service.info().status.get().await; - if !matches!(status, Status::Started) { + if service.info().priority != Priority::Essential { + continue; + } + + let status = service.info().status.get().await; + if status != Status::Started { return OverallStatus::Unhealthy; } } @@ -237,15 +241,15 @@ impl ServiceManager { } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn status_tree(&self) -> LifetimedPinnedBoxedFuture<'_, String> { + pub fn status_overview(&self) -> LifetimedPinnedBoxedFuture<'_, String> { Box::pin(async move { let mut text_buffer = String::new(); - let mut failed_essentials = String::new(); - let mut failed_optionals = String::new(); - let mut non_failed_essentials = String::new(); - let mut non_failed_optionals = String::new(); - let mut others = String::new(); + let mut failed_essentials = Vec::new(); + let mut failed_optionals = Vec::new(); + let mut non_failed_essentials = Vec::new(); + let mut non_failed_optionals = Vec::new(); + let mut others = Vec::new(); for service in self.services.iter() { let service = service.lock().await; @@ -256,53 +260,64 @@ impl ServiceManager { match status { Status::Started | Status::Stopped => match priority { Priority::Essential => { - non_failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + non_failed_essentials.push(format!(" - {}: {}", info.name, status)); } Priority::Optional => { - non_failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + non_failed_optionals.push(format!(" - {}: {}", info.name, status)); } }, Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { match priority { Priority::Essential => { - failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + failed_essentials.push(format!(" - {}: {}", info.name, status)); } Priority::Optional => { - failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + failed_optionals.push(format!(" - {}: {}", info.name, status)); } } } _ => { - others.push_str(&format!(" - {}: {}\n", info.name, status)); + others.push(format!(" - {}: {}", info.name, status)); } } } if !failed_essentials.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Failed essential services")); - text_buffer.push_str(&failed_essentials); + text_buffer.push_str(&format!("{}:\n", "Failed essential services")); + text_buffer.push_str(failed_essentials.join("\n").as_str()); } if !failed_optionals.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Failed optional services")); - text_buffer.push_str(&failed_optionals); + text_buffer.push_str(&format!("{}:\n", "Failed optional services")); + text_buffer.push_str(failed_optionals.join("\n").as_str()); } if !non_failed_essentials.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Essential services")); - text_buffer.push_str(&non_failed_essentials); + text_buffer.push_str(&format!("{}:\n", "Essential services")); + text_buffer.push_str(non_failed_essentials.join("\n").as_str()); } if !non_failed_optionals.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Optional services")); - text_buffer.push_str(&non_failed_optionals); + text_buffer.push_str(&format!("{}:\n", "Optional services")); + text_buffer.push_str(non_failed_optionals.join("\n").as_str()); } if !others.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Other services")); - text_buffer.push_str(&others); + text_buffer.push_str(&format!("{}:\n", "Other services")); + text_buffer.push_str(others.join("\n").as_str()); } + let longest_width = text_buffer + .lines() + .map(|line| line.len()) + .max() + .unwrap_or(0); + + let mut headline = String::from("Status overview\n"); + headline.push_str("─".repeat(longest_width).as_str()); + headline.push('\n'); + text_buffer.insert_str(0, &headline); + text_buffer }) } From 456e86c18f66b37a1fe29cc1d21cfbf59159e254 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 4 Oct 2024 15:58:41 +0200 Subject: [PATCH 25/25] fix: bump version to 0.2.1 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 4aedd4e..6f9421f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lum" -version = "0.1.0" +version = "0.2.1" edition = "2021" description = "Lum Discord Bot" license= "MIT"