diff --git a/Cargo.toml b/Cargo.toml index adbdd2a..6f9421f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "lum" -version = "0.1.0" +version = "0.2.1" edition = "2021" description = "Lum Discord Bot" license= "MIT" @@ -23,3 +23,4 @@ serenity = { version = "0.12.0", default-features=false, features = ["builder", sqlx = { version = "0.8.0", features = ["runtime-tokio", "any", "postgres", "mysql", "sqlite", "tls-native-tls", "migrate", "macros", "uuid", "chrono", "json"] } thiserror = "1.0.52" tokio = { version = "1.35.1", features = ["full"] } +uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] } \ No newline at end of file diff --git a/README.md b/README.md index 03a82eb..985ef57 100644 --- a/README.md +++ b/README.md @@ -14,4 +14,4 @@ Beta: [![Deploy](https://github.com/Kitt3120/lum/actions/workflows/deploy_prerel # Collaborating -Checkout out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues) +Check out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues) diff --git a/build.rs b/build.rs index 7609593..d506869 100644 --- a/build.rs +++ b/build.rs @@ -2,4 +2,4 @@ fn main() { // trigger recompilation when a new migration is added println!("cargo:rerun-if-changed=migrations"); -} \ No newline at end of file +} diff --git a/src/bot.rs b/src/bot.rs index 69bda29..37dce6e 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -1,8 +1,27 @@ -use std::sync::Arc; +use core::fmt; +use std::{fmt::Display, sync::Arc}; -use tokio::sync::RwLock; +use log::error; +use tokio::{signal, sync::Mutex}; -use crate::service::{PinnedBoxedFuture, Service, ServiceManager, ServiceManagerBuilder}; +use crate::service::{ + types::LifetimedPinnedBoxedFuture, OverallStatus, Service, ServiceManager, ServiceManagerBuilder, +}; + +#[derive(Debug, Clone, Copy)] +pub enum ExitReason { + SIGINT, + EssentialServiceFailed, +} + +impl Display for ExitReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::SIGINT => write!(f, "SIGINT"), + Self::EssentialServiceFailed => write!(f, "Essential Service Failed"), + } + } +} pub struct BotBuilder { name: String, @@ -17,13 +36,13 @@ impl BotBuilder { } } - pub async fn with_service(mut self, service: Arc>) -> Self { + pub async fn with_service(mut self, service: Arc>) -> Self { self.service_manager = self.service_manager.with_service(service).await; // The ServiceManagerBuilder itself will warn when adding a service multiple times self } - pub async fn with_services(mut self, services: Vec>>) -> Self { + pub async fn with_services(mut self, services: Vec>>) -> Self { for service in services { self.service_manager = self.service_manager.with_service(service).await; } @@ -50,7 +69,7 @@ impl Bot { } //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future - pub fn start(&mut self) -> PinnedBoxedFuture<'_, ()> { + pub fn start(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> { Box::pin(async move { self.service_manager.start_services().await; //TODO: Potential for further initialization here, like modules @@ -58,10 +77,48 @@ impl Bot { } //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future - pub fn stop(&mut self) -> PinnedBoxedFuture<'_, ()> { + pub fn stop(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> { Box::pin(async move { self.service_manager.stop_services().await; //TODO: Potential for further deinitialization here, like modules }) } + + pub async fn join(&self) -> ExitReason { + let name_clone = self.name.clone(); + let signal_task = tokio::spawn(async move { + let name = name_clone; + + let result = signal::ctrl_c().await; + if let Err(error) = result { + error!( + "Error receiving SIGINT: {}. {} will exit ungracefully immediately to prevent undefined behavior.", + error, name + ); + panic!("Error receiving SIGINT: {}", error); + } + }); + + let service_manager_clone = self.service_manager.clone(); + let mut receiver = self + .service_manager + .on_status_change + .event + .subscribe_channel("t", 2, true, true) + .await; + let status_task = tokio::spawn(async move { + let service_manager = service_manager_clone; + while (receiver.receiver.recv().await).is_some() { + let overall_status = service_manager.overall_status().await; + if overall_status == OverallStatus::Unhealthy { + return; + } + } + }); + + tokio::select! { + _ = signal_task => ExitReason::SIGINT, + _ = status_task => ExitReason::EssentialServiceFailed, + } + } } diff --git a/src/event.rs b/src/event.rs new file mode 100644 index 0000000..0eab11f --- /dev/null +++ b/src/event.rs @@ -0,0 +1,13 @@ +pub mod arc_observable; +pub mod event; +pub mod event_repeater; +pub mod observable; +pub mod subscriber; +pub mod subscription; + +pub use arc_observable::ArcObservable; +pub use event::Event; +pub use event_repeater::EventRepeater; +pub use observable::{Observable, ObservableResult}; +pub use subscriber::{Callback, DispatchError, Subscriber}; +pub use subscription::{ReceiverSubscription, Subscription}; diff --git a/src/event/arc_observable.rs b/src/event/arc_observable.rs new file mode 100644 index 0000000..b399b67 --- /dev/null +++ b/src/event/arc_observable.rs @@ -0,0 +1,60 @@ +use std::{ + hash::{DefaultHasher, Hash, Hasher}, + sync::Arc, +}; + +use tokio::sync::Mutex; + +use super::{Event, ObservableResult}; + +#[derive(Debug)] +pub struct ArcObservable +where + T: Send + 'static + Hash, +{ + value: Arc>, + on_change: Event>, +} + +impl ArcObservable +where + T: Send + 'static + Hash, +{ + pub fn new(value: T, event_name: impl Into) -> Self { + Self { + value: Arc::new(Mutex::new(value)), + on_change: Event::new(event_name), + } + } + + pub async fn get(&self) -> Arc> { + Arc::clone(&self.value) + } + + pub async fn set(&self, value: T) -> ObservableResult> { + let mut lock = self.value.lock().await; + + let mut hasher = DefaultHasher::new(); + (*lock).hash(&mut hasher); + let current_value = hasher.finish(); + + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + let new_value = hasher.finish(); + + if current_value == new_value { + return ObservableResult::Unchanged; + } + + *lock = value; + drop(lock); + + let value = Arc::clone(&self.value); + let dispatch_result = self.on_change.dispatch(value).await; + + match dispatch_result { + Ok(_) => ObservableResult::Changed(Ok(())), + Err(errors) => ObservableResult::Changed(Err(errors)), + } + } +} diff --git a/src/event/event.rs b/src/event/event.rs new file mode 100644 index 0000000..d714381 --- /dev/null +++ b/src/event/event.rs @@ -0,0 +1,196 @@ +use crate::service::{BoxedError, PinnedBoxedFutureResult}; +use std::{ + any::type_name, + fmt::{self, Debug, Formatter}, + sync::Arc, +}; +use tokio::sync::{mpsc::channel, Mutex}; +use uuid::Uuid; + +use super::{Callback, DispatchError, ReceiverSubscription, Subscriber, Subscription}; + +pub struct Event +where + T: Send + Sync + 'static, +{ + pub name: String, + + pub uuid: Uuid, + subscribers: Mutex>>, +} + +impl Event +where + T: Send + Sync + 'static, +{ + pub fn new(name: S) -> Self + where + S: Into, + { + Self { + name: name.into(), + uuid: Uuid::new_v4(), + subscribers: Mutex::new(Vec::new()), + } + } + + pub async fn subscriber_count(&self) -> usize { + let subscribers = self.subscribers.lock().await; + subscribers.len() + } + + pub async fn subscribe_channel( + &self, + name: S, + buffer: usize, + log_on_error: bool, + remove_on_error: bool, + ) -> ReceiverSubscription> + where + S: Into, + { + let (sender, receiver) = channel(buffer); + let subscriber = Subscriber::new(name, log_on_error, remove_on_error, Callback::Channel(sender)); + + let subscription = Subscription::from(&subscriber); + let receiver_subscription = ReceiverSubscription::new(subscription, receiver); + + let mut subscribers = self.subscribers.lock().await; + subscribers.push(subscriber); + + receiver_subscription + } + + pub async fn subscribe_async_closure( + &self, + name: S, + closure: impl Fn(Arc) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static, + log_on_error: bool, + remove_on_error: bool, + ) -> Subscription + where + S: Into, + { + let subscriber = Subscriber::new( + name, + log_on_error, + remove_on_error, + Callback::AsyncClosure(Box::new(closure)), + ); + let subscription = Subscription::from(&subscriber); + + let mut subscribers = self.subscribers.lock().await; + subscribers.push(subscriber); + + subscription + } + + pub async fn subscribe_closure( + &self, + name: S, + closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, + log_on_error: bool, + remove_on_error: bool, + ) -> Subscription + where + S: Into, + { + let subscriber = Subscriber::new( + name, + log_on_error, + remove_on_error, + Callback::Closure(Box::new(closure)), + ); + let subscription = Subscription::from(&subscriber); + + let mut subscribers = self.subscribers.lock().await; + subscribers.push(subscriber); + + subscription + } + + pub async fn unsubscribe(&self, subscription: S) -> Option + where + S: Into, + { + let subscription_to_remove = subscription.into(); + + let mut subscribers = self.subscribers.lock().await; + let index = subscribers + .iter() + .position(|subscription_of_event| subscription_of_event.uuid == subscription_to_remove.uuid); + + if let Some(index) = index { + subscribers.remove(index); + None + } else { + Some(subscription_to_remove) + } + } + + pub async fn dispatch(&self, data: Arc) -> Result<(), Vec>> { + let mut errors = Vec::new(); + let mut subscribers_to_remove = Vec::new(); + + let mut subscribers = self.subscribers.lock().await; + for (index, subscriber) in subscribers.iter().enumerate() { + let data = Arc::clone(&data); + + let result = subscriber.dispatch(data).await; + if let Err(err) = result { + if subscriber.log_on_error { + log::error!( + "Event \"{}\" failed to dispatch data to subscriber {}: {}.", + self.name, + subscriber.name, + err + ); + } + + if subscriber.remove_on_error { + if subscriber.log_on_error { + log::error!("Subscriber will be unregistered from event."); + } + + subscribers_to_remove.push(index); + } + + errors.push(err); + } + } + + for index in subscribers_to_remove.into_iter().rev() { + subscribers.remove(index); + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } +} + +impl PartialEq for Event +where + T: Send + Sync + 'static, +{ + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for Event where T: Send + Sync {} + +impl Debug for Event +where + T: Send + Sync + 'static, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct(type_name::()) + .field("uuid", &self.uuid) + .field("name", &self.name) + .field("subscribers", &self.subscribers.blocking_lock().len()) + .finish() + } +} diff --git a/src/event/event_repeater.rs b/src/event/event_repeater.rs new file mode 100644 index 0000000..d585df7 --- /dev/null +++ b/src/event/event_repeater.rs @@ -0,0 +1,154 @@ +use std::{collections::HashMap, sync::Arc}; +use thiserror::Error; +use tokio::{sync::Mutex, task::JoinHandle}; +use uuid::Uuid; + +use super::{Event, Subscription}; + +#[derive(Debug, Error)] +pub enum AttachError { + #[error("Tried to attach event {event_name} to EventRepeater {repeater_name} before it was initialized. Did you not use EventRepeater::new()?")] + NotInitialized { + event_name: String, + repeater_name: String, + }, + + #[error( + "Tried to attach event {event_name} to EventRepeater {repeater_name}, which was already attached." + )] + AlreadyAttached { + event_name: String, + repeater_name: String, + }, +} + +#[derive(Debug, Error)] +pub enum DetachError { + #[error( + "Tried to detach event {event_name} from EventRepeater {repeater_name}, which was not attached." + )] + NotAttached { + event_name: String, + repeater_name: String, + }, +} + +#[derive(Error)] +pub enum CloseError +where + T: Send + Sync + 'static, +{ + #[error("EventRepeater still has attached events. Detach all events before closing.")] + AttachedEvents(EventRepeater), +} + +pub struct EventRepeater +where + T: Send + Sync + 'static, +{ + pub event: Event, + self_arc: Mutex>>, + subscriptions: Mutex)>>, +} + +impl EventRepeater +where + T: Send + Sync + 'static, +{ + pub async fn new(name: S) -> Arc + where + T: 'static, + S: Into, + { + let event = Event::new(name); + let event_repeater = Self { + self_arc: Mutex::new(None), + event, + subscriptions: Mutex::new(HashMap::new()), + }; + + let self_arc = Arc::new(event_repeater); + let mut lock = self_arc.self_arc.lock().await; + let self_arc_clone = Arc::clone(&self_arc); + *lock = Some(self_arc_clone); + drop(lock); + + self_arc + } + + pub async fn subscription_count(&self) -> usize { + self.subscriptions.lock().await.len() + } + + pub async fn attach(&self, event: &Event, buffer: usize) -> Result<(), AttachError> { + let self_arc = match self.self_arc.lock().await.as_ref() { + Some(arc) => Arc::clone(arc), + None => { + return Err(AttachError::NotInitialized { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }) + } + }; + + let mut subscriptions = self.subscriptions.lock().await; + if subscriptions.contains_key(&event.uuid) { + return Err(AttachError::AlreadyAttached { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }); + } + + let receiver_subscription = event + .subscribe_channel(&self.event.name, buffer, true, true) + .await; + + let subscription = receiver_subscription.subscription; + let mut receiver = receiver_subscription.receiver; + + let join_handle = tokio::spawn(async move { + while let Some(value) = receiver.recv().await { + let _ = self_arc.event.dispatch(value).await; + } + }); + subscriptions.insert(event.uuid, (subscription, join_handle)); + + Ok(()) + } + + pub async fn detach(&self, event: &Event) -> Result<(), DetachError> { + let mut subscriptions = self.subscriptions.lock().await; + + let subscription = match subscriptions.remove(&event.uuid) { + Some(subscription) => subscription, + None => { + return Err(DetachError::NotAttached { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }) + } + }; + subscription.1.abort(); + + Ok(()) + } + + pub async fn close(self) -> Result<(), CloseError> { + let subscription_count = self.subscription_count().await; + + if subscription_count > 0 { + return Err(CloseError::AttachedEvents(self)); + } + + Ok(()) + } +} + +impl AsRef> for EventRepeater +where + T: Send + Sync + 'static, +{ + fn as_ref(&self) -> &Event { + &self.event + } +} diff --git a/src/event/observable.rs b/src/event/observable.rs new file mode 100644 index 0000000..e3fd726 --- /dev/null +++ b/src/event/observable.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; + +use super::{DispatchError, Event}; + +#[derive(Debug)] +pub enum ObservableResult +where + T: Send + Sync + 'static, +{ + Unchanged, + Changed(Result<(), Vec>>), +} + +#[derive(Debug)] +pub struct Observable +where + T: Send + Sync + 'static + Clone + PartialEq, //TODO: Try out if we can remove Sync here +{ + value: Mutex, + on_change: Event, +} + +impl Observable +where + T: Send + Sync + 'static + Clone + PartialEq, +{ + pub fn new(value: T, event_name: I) -> Self + where + I: Into, + { + Self { + value: Mutex::new(value), + on_change: Event::new(event_name), + } + } + + pub async fn get(&self) -> T { + let lock = self.value.lock().await; + lock.clone() + } + + pub async fn set(&self, value: T) -> ObservableResult { + let mut lock = self.value.lock().await; + let current_value = lock.clone(); + + if current_value == value { + return ObservableResult::Unchanged; + } + + *lock = value.clone(); + + let value = Arc::new(value); + let dispatch_result = self.on_change.dispatch(value).await; + + match dispatch_result { + Ok(_) => ObservableResult::Changed(Ok(())), + Err(errors) => ObservableResult::Changed(Err(errors)), + } + } +} + +impl AsRef> for Observable +where + T: Send + Sync + 'static + Clone + PartialEq, +{ + fn as_ref(&self) -> &Event { + &self.on_change + } +} diff --git a/src/event/subscriber.rs b/src/event/subscriber.rs new file mode 100644 index 0000000..8fd8e51 --- /dev/null +++ b/src/event/subscriber.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; + +use thiserror::Error; +use tokio::sync::mpsc::{error::SendError, Sender}; +use uuid::Uuid; + +use crate::service::{BoxedError, PinnedBoxedFutureResult}; + +pub enum Callback +where + T: Send + Sync + 'static, +{ + Channel(Sender>), + Closure(Box) -> Result<(), BoxedError> + Send + Sync>), + AsyncClosure(Box) -> PinnedBoxedFutureResult<()> + Send + Sync>), +} + +#[derive(Debug, Error)] +pub enum DispatchError +where + T: Send + Sync + 'static, +{ + #[error("Failed to send data to channel: {0}")] + ChannelSend(#[from] SendError>), + + #[error("Failed to dispatch data to closure: {0}")] + Closure(BoxedError), + + #[error("Failed to dispatch data to async closure: {0}")] + AsyncClosure(BoxedError), +} + +pub struct Subscriber +where + T: Send + Sync + 'static, +{ + pub name: String, + pub log_on_error: bool, + pub remove_on_error: bool, + pub callback: Callback, + + pub uuid: Uuid, +} + +impl Subscriber +where + T: Send + Sync + 'static, +{ + pub fn new(name: S, log_on_error: bool, remove_on_error: bool, callback: Callback) -> Self + where + S: Into, + { + Self { + name: name.into(), + log_on_error, + remove_on_error, + callback, + uuid: Uuid::new_v4(), + } + } + + pub async fn dispatch(&self, data: Arc) -> Result<(), DispatchError> { + match &self.callback { + Callback::Channel(sender) => sender.send(data).await.map_err(DispatchError::ChannelSend), + Callback::Closure(closure) => closure(data).map_err(DispatchError::Closure), + Callback::AsyncClosure(closure) => closure(data).await.map_err(DispatchError::AsyncClosure), + } + } +} + +impl PartialEq for Subscriber +where + T: Send + Sync + 'static, +{ + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for Subscriber where T: Send + Sync {} diff --git a/src/event/subscription.rs b/src/event/subscription.rs new file mode 100644 index 0000000..e10f4dc --- /dev/null +++ b/src/event/subscription.rs @@ -0,0 +1,71 @@ +use tokio::sync::mpsc::Receiver; +use uuid::Uuid; + +use super::Subscriber; + +#[derive(Debug, PartialEq, Eq)] +pub struct Subscription { + pub uuid: Uuid, +} + +impl From> for Subscription +where + T: Send + Sync + 'static, +{ + fn from(subscriber: Subscriber) -> Self { + Self { + uuid: subscriber.uuid, + } + } +} + +impl From<&Subscriber> for Subscription +where + T: Send + Sync + 'static, +{ + fn from(subscriber: &Subscriber) -> Self { + Self { + uuid: subscriber.uuid, + } + } +} + +pub struct ReceiverSubscription +where + T: Send + Sync + 'static, +{ + pub subscription: Subscription, + pub receiver: Receiver, +} + +impl ReceiverSubscription +where + T: Send + Sync + 'static, +{ + pub fn new(subscription: Subscription, receiver: Receiver) -> Self { + Self { + subscription, + receiver, + } + } +} + +impl PartialEq for ReceiverSubscription +where + T: Send + Sync + 'static, +{ + fn eq(&self, other: &Self) -> bool { + self.subscription == other.subscription + } +} + +impl Eq for ReceiverSubscription where T: Send + Sync {} + +impl AsRef for ReceiverSubscription +where + T: Send + Sync + 'static, +{ + fn as_ref(&self) -> &Subscription { + &self.subscription + } +} diff --git a/src/lib.rs b/src/lib.rs index f59cd5b..dc2f19b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ use std::time::SystemTime; pub mod bot; pub mod config; +pub mod event; pub mod log; pub mod service; pub mod setlock; @@ -16,14 +17,11 @@ pub fn is_debug() -> bool { pub async fn run(mut bot: Bot) { if !log::is_set_up() { eprintln!("Logger has not been set up!\n{} will exit.", bot.name); - return; } let now = SystemTime::now(); - bot.start().await; - match now.elapsed() { Ok(elapsed) => info!("Startup took {}ms", elapsed.as_millis()), Err(error) => { @@ -31,34 +29,39 @@ pub async fn run(mut bot: Bot) { "Error getting elapsed startup time: {}\n{} will exit.", error, bot.name ); - return; } }; if bot.service_manager.overall_status().await != OverallStatus::Healthy { - let status_tree = bot.service_manager.status_tree().await; + let status_overview = bot.service_manager.status_overview().await; - error!("{} is not healthy! Some essential services did not start up successfully. Please check the logs.\nService status tree:\n{}\n{} will exit.", + error!("{} is not healthy! Some essential services did not start up successfully. {} will now exit ungracefully.\n\n{}", + bot.name, bot.name, - status_tree, - bot.name); + status_overview); return; } info!("{} is alive", bot.name,); //TODO: Add CLI commands - match tokio::signal::ctrl_c().await { - Ok(_) => { - info!("Received SIGINT, {} will now shut down", bot.name); - } - Err(error) => { - panic!("Error receiving SIGINT: {}\n{} will exit.", error, bot.name); + + let exit_reason = bot.join().await; + match exit_reason { + bot::ExitReason::SIGINT => info!( + "{} received a SIGINT signal! Attempting to shut down gracefully.", + bot.name + ), + bot::ExitReason::EssentialServiceFailed => { + let status_overview = bot.service_manager.status_overview().await; + error!( + "An essential service failed! Attempting to shut down gracefully.\n{}", + status_overview + ); } } bot.stop().await; - - info!("{} has shut down", bot.name); + info!("Oyasumi 💤"); } diff --git a/src/main.rs b/src/main.rs index 7412001..9cc1d1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,7 @@ use lum::{ log, service::{discord::DiscordService, Service}, }; -use tokio::sync::RwLock; +use tokio::sync::Mutex; const BOT_NAME: &str = "Lum"; @@ -23,11 +23,7 @@ async fn main() { let config = match config_handler.load_config() { Ok(config) => config, Err(err) => { - error!( - "Error reading config file: {}\n{} will exit.", - err, BOT_NAME - ); - + error!("Error reading config file: {}\n{} will exit.", err, BOT_NAME); return; } }; @@ -43,18 +39,15 @@ async fn main() { fn setup_logger() { if let Err(error) = log::setup() { - panic!( - "Error setting up the Logger: {}\n{} will exit.", - error, BOT_NAME - ); + panic!("Error setting up the Logger: {}\n{} will exit.", error, BOT_NAME); } } -fn initialize_services(config: &Config) -> Vec>> { +fn initialize_services(config: &Config) -> Vec>> { //TODO: Add services //... let discord_service = DiscordService::new(config.discord_token.as_str()); - vec![Arc::new(RwLock::new(discord_service))] + vec![Arc::new(Mutex::new(discord_service))] } diff --git a/src/service.rs b/src/service.rs index 10131cc..e835fc2 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,5 +1,7 @@ pub mod discord; -pub mod service; +// TODO: Used for downcast_rs. Maybe this can be removed when updating the crate. +#[allow(clippy::multiple_bound_locations)] +pub mod service; // Will be fixed when lum gets seperated into multiple workspaces pub mod service_manager; pub mod types; pub mod watchdog; @@ -8,6 +10,6 @@ pub use service::{Service, ServiceInfo}; pub use service_manager::{ServiceManager, ServiceManagerBuilder}; pub use types::{ BoxedError, BoxedFuture, BoxedFutureResult, OverallStatus, PinnedBoxedFuture, PinnedBoxedFutureResult, - Priority, StartupError, Status, + Priority, ShutdownError, StartupError, Status, }; pub use watchdog::Watchdog; diff --git a/src/service/discord.rs b/src/service/discord.rs index d6f31ff..47ec3d2 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -1,6 +1,6 @@ use crate::setlock::SetLock; -use super::{PinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; +use super::{types::LifetimedPinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; use log::{error, info}; use serenity::{ all::{GatewayIntents, Ready}, @@ -20,10 +20,11 @@ use tokio::{ time::sleep, }; +//TODO: Restructure pub struct DiscordService { info: ServiceInfo, discord_token: String, - pub ready: Arc>>, + pub ready: Arc>>, client_handle: Option>>, pub cache: SetLock>, pub data: SetLock>>, @@ -38,7 +39,7 @@ impl DiscordService { Self { info: ServiceInfo::new("lum_builtin_discord", "Discord", Priority::Essential), discord_token: discord_token.to_string(), - ready: Arc::new(RwLock::new(SetLock::new())), + ready: Arc::new(Mutex::new(SetLock::new())), client_handle: None, cache: SetLock::new(), data: SetLock::new(), @@ -55,7 +56,7 @@ impl Service for DiscordService { &self.info } - fn start(&mut self, _service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()> { + fn start(&mut self, _service_manager: Arc) -> LifetimedPinnedBoxedFutureResult<'_, ()> { Box::pin(async move { let client_ready_notify = Arc::new(Notify::new()); @@ -96,7 +97,6 @@ impl Service for DiscordService { return Err(format!("Failed to set ws_url SetLock: {}", error).into()); } - info!("Connecting to Discord"); let client_handle = spawn(async move { client.start().await }); select! { @@ -114,54 +114,36 @@ impl Service for DiscordService { }) } - fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()> { + fn stop(&mut self) -> LifetimedPinnedBoxedFutureResult<'_, ()> { Box::pin(async move { if let Some(client_handle) = self.client_handle.take() { info!("Waiting for Discord client to stop..."); - client_handle.abort(); - let result = convert_thread_result(client_handle).await; + client_handle.abort(); // Should trigger a JoinError in the client_handle, if the task hasn't already ended + + // If the thread ended WITHOUT a JoinError, the client already stopped unexpectedly + let result = async move { + match client_handle.await { + Ok(result) => result, + Err(_) => Ok(()), + } + } + .await; result?; } Ok(()) }) } - - fn task<'a>(&self) -> Option> { - Some(Box::pin(async move { - let mut i = 0; - loop { - sleep(Duration::from_secs(1)).await; - if i < 5 { - i += 1; - info!("Wohoo!"); - } else { - info!("Bye!"); - break; - } - } - - Err("Sheesh".into()) - })) - } -} - -// If the thread ended WITHOUT a JoinError from aborting, the client already stopped unexpectedly -async fn convert_thread_result(client_handle: JoinHandle>) -> Result<(), Error> { - match client_handle.await { - Ok(result) => result, - Err(_) => Ok(()), - } } struct EventHandler { - client: Arc>>, + client: Arc>>, ready_notify: Arc, } impl EventHandler { - pub fn new(client: Arc>>, ready_notify: Arc) -> Self { + pub fn new(client: Arc>>, ready_notify: Arc) -> Self { Self { client, ready_notify } } } @@ -170,7 +152,7 @@ impl EventHandler { impl client::EventHandler for EventHandler { async fn ready(&self, _ctx: Context, data_about_bot: Ready) { info!("Connected to Discord as {}", data_about_bot.user.tag()); - if let Err(error) = self.client.write().await.set(data_about_bot) { + if let Err(error) = self.client.lock().await.set(data_about_bot) { error!("Failed to set client SetLock: {}", error); panic!("Failed to set client SetLock: {}", error); } diff --git a/src/service/service.rs b/src/service/service.rs index 185f06a..4c0c83a 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -5,11 +5,12 @@ use std::{ }; use downcast_rs::{impl_downcast, DowncastSync}; -use tokio::sync::RwLock; + +use crate::event::Observable; use super::{ service_manager::ServiceManager, - types::{PinnedBoxedFuture, PinnedBoxedFutureResult, Priority, Status}, + types::{LifetimedPinnedBoxedFuture, LifetimedPinnedBoxedFutureResult, Priority, Status}, }; #[derive(Debug)] @@ -18,7 +19,7 @@ pub struct ServiceInfo { pub name: String, pub priority: Priority, - pub status: Arc>, + pub status: Observable, } impl ServiceInfo { @@ -27,14 +28,9 @@ impl ServiceInfo { id: id.to_string(), name: name.to_string(), priority, - status: Arc::new(RwLock::new(Status::Stopped)), + status: Observable::new(Status::Stopped, format!("{}_status_change", id)), } } - - pub async fn set_status(&self, status: Status) { - let mut lock = self.status.write().await; - *(lock) = status - } } impl PartialEq for ServiceInfo { @@ -65,14 +61,14 @@ impl Hash for ServiceInfo { //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a PinnedBoxedFutureResult pub trait Service: DowncastSync { fn info(&self) -> &ServiceInfo; - fn start(&mut self, service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()>; - fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()>; - fn task<'a>(&self) -> Option> { + fn start(&mut self, service_manager: Arc) -> LifetimedPinnedBoxedFutureResult<'_, ()>; + fn stop(&mut self) -> LifetimedPinnedBoxedFutureResult<'_, ()>; + fn task<'a>(&self) -> Option> { None } - fn is_available(&self) -> PinnedBoxedFuture<'_, bool> { - Box::pin(async move { matches!(&*(self.info().status.read().await), Status::Started) }) + fn is_available(&self) -> LifetimedPinnedBoxedFuture<'_, bool> { + Box::pin(async move { matches!(self.info().status.get().await, Status::Started) }) } } diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index 91fb842..6d25e44 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -1,34 +1,36 @@ -use crate::{service::Watchdog, setlock::SetLock}; +use super::{ + service::Service, + types::{LifetimedPinnedBoxedFuture, OverallStatus, Priority, ShutdownError, StartupError, Status}, +}; +use crate::{ + event::EventRepeater, service::Watchdog, setlock::{SetLock, SetLockError} +}; use log::{error, info, warn}; use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; use tokio::{ spawn, - sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + sync::{Mutex, MutexGuard}, task::JoinHandle, time::timeout, }; -use super::{ - service::Service, - types::{OverallStatus, PinnedBoxedFuture, Priority, ShutdownError, StartupError, Status}, -}; #[derive(Default)] pub struct ServiceManagerBuilder { - services: Vec>>, + services: Vec>>, } impl ServiceManagerBuilder { - pub fn new() -> Self { +pub fn new() -> Self { Self { services: Vec::new() } } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub async fn with_service(mut self, service: Arc>) -> Self { - let lock = service.read().await; + pub async fn with_service(mut self, service: Arc>) -> Self { + let lock = service.lock().await; let mut found = false; for registered_service in self.services.iter() { - let registered_service = registered_service.read().await; + let registered_service = registered_service.lock().await; if registered_service.info().id == lock.info().id { found = true; @@ -52,17 +54,22 @@ impl ServiceManagerBuilder { pub async fn build(self) -> Arc { let service_manager = ServiceManager { - arc: RwLock::new(SetLock::new()), + arc: Mutex::new(SetLock::new()), services: self.services, - background_tasks: RwLock::new(HashMap::new()), + background_tasks: Mutex::new(HashMap::new()), + on_status_change: EventRepeater::new("service_manager_on_status_change").await, }; let self_arc = Arc::new(service_manager); + let self_arc_clone = Arc::clone(&self_arc); + + let result = self_arc_clone.arc.lock().await.set(Arc::clone(&self_arc_clone)); - match self_arc.arc.write().await.set(Arc::clone(&self_arc)) { - Ok(()) => {} - Err(err) => { - panic!("Failed to set ServiceManager in SetLock for self_arc: {}", err); + if let Err(err) = result { + match err { + SetLockError::AlreadySet => { + unreachable!("Unable to set ServiceManager's self-arc in ServiceManagerBuilder because it was already set. This should never happen. How did you...?"); + } } } @@ -71,9 +78,11 @@ impl ServiceManagerBuilder { } pub struct ServiceManager { - arc: RwLock>>, - services: Vec>>, - background_tasks: RwLock>>, + arc: Mutex>>, + background_tasks: Mutex>>, + + pub services: Vec>>, + pub on_status_change: Arc>, } impl ServiceManager { @@ -81,9 +90,12 @@ impl ServiceManager { ServiceManagerBuilder::new() } - pub async fn manages_service(&self, service_id: &str) -> bool { - for registered_service in self.services.iter() { - if registered_service.read().await.info().id == service_id { + pub async fn manages_service(&self, service_id: &str) -> bool + { + for service in self.services.iter() { + let service_lock = service.lock().await; + + if service_lock.info().id == service_id { return true; } } @@ -91,56 +103,67 @@ impl ServiceManager { false } - pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { - let service_lock = service.read().await; + pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { + let service_id = service.lock().await.info().id.clone(); + if !self.manages_service(&service_id).await { + return Err(StartupError::ServiceNotManaged(service_id.clone())); + } - let service_id = service_lock.info().id.clone(); + let mut service_lock = service.lock().await; - if !self.manages_service(&service_id).await { - return Err(StartupError::ServiceNotManaged(service_id)); + let status = service_lock.info().status.get().await; + if !matches!(status, Status::Stopped) { + return Err(StartupError::ServiceNotStopped(service_id.clone())); } - if !self.is_service_stopped(&service_lock).await { - return Err(StartupError::ServiceNotStopped(service_id)); + if self.has_background_task_registered(&service_id).await { + return Err(StartupError::BackgroundTaskAlreadyRunning(service_id.clone())); } - if self.has_background_task_running(&service_lock).await { - return Err(StartupError::BackgroundTaskAlreadyRunning(service_id)); + let service_status_event = service_lock.info().status.as_ref(); + let attachment_result = self.on_status_change.attach(service_status_event, 2).await; + if let Err(err) = attachment_result { + return Err(StartupError::StatusAttachmentFailed(service_id.clone(), err)); } - - drop(service_lock); - let mut service_lock = service.write().await; - service_lock.info().set_status(Status::Starting).await; + service_lock.info().status.set(Status::Starting).await; self.init_service(&mut service_lock).await?; - self.start_background_task(&service_lock, Arc::clone(&service)).await; + self.start_background_task(&service_lock, Arc::clone(&service)) + .await; info!("Started service {}", service_lock.info().name); - + Ok(()) } - pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { - let service_lock = service.read().await; - - if !(self.manages_service(service_lock.info().id.as_str()).await) { - return Err(ShutdownError::ServiceNotManaged(service_lock.info().id.clone())); + //TODO: Clean up + pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { + let service_id = service.lock().await.info().id.clone(); + if !(self.manages_service(&service_id).await) { + return Err(ShutdownError::ServiceNotManaged(service_id.clone())); } - if !self.is_service_started(&service_lock).await { - return Err(ShutdownError::ServiceNotStarted(service_lock.info().id.clone())); + let mut service_lock = service.lock().await; + + let status = service_lock.info().status.get().await; + if !matches!(status, Status::Started) { + return Err(ShutdownError::ServiceNotStarted(service_id.clone())); } self.stop_background_task(&service_lock).await; - drop(service_lock); - let mut service_lock = service.write().await; + service_lock.info().status.set(Status::Stopping).await; - service_lock.info().set_status(Status::Stopping).await; self.shutdown_service(&mut service_lock).await?; + let service_status_event = service_lock.info().status.as_ref(); + let detach_result = self.on_status_change.detach(service_status_event).await; + if let Err(err) = detach_result { + return Err(ShutdownError::StatusDetachmentFailed(service_id.clone(), err)); + } + info!("Stopped service {}", service_lock.info().name); - + Ok(()) } @@ -148,7 +171,10 @@ impl ServiceManager { let mut results = Vec::new(); for service in &self.services { - results.push(self.start_service(Arc::clone(service)).await); + let service_arc_clone = Arc::clone(service); + let result = self.start_service(service_arc_clone).await; + + results.push(result); } results @@ -158,24 +184,26 @@ impl ServiceManager { let mut results = Vec::new(); for service in &self.services { - results.push(self.stop_service(Arc::clone(service)).await); + let service_arc_clone = Arc::clone(service); + let result = self.stop_service(service_arc_clone).await; + + results.push(result); } results } - pub async fn get_service(&self) -> Option>> + pub async fn get_service(&self) -> Option>> where T: Service, { for service in self.services.iter() { - let lock = service.read().await; + let lock = service.lock().await; let is_t = lock.as_any().is::(); - drop(lock); if is_t { let arc_clone = Arc::clone(service); - let service_ptr: *const Arc> = &arc_clone; + let service_ptr: *const Arc> = &arc_clone; /* I tried to do this in safe rust for 3 days, but I couldn't figure it out @@ -183,7 +211,7 @@ impl ServiceManager { Anyways, this should never cause any issues, since we checked if the service is of type T */ unsafe { - let t_ptr: *const Arc> = mem::transmute(service_ptr); + let t_ptr: *const Arc> = mem::transmute(service_ptr); return Some(Arc::clone(&*t_ptr)); } } @@ -193,13 +221,17 @@ impl ServiceManager { } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn overall_status(&self) -> PinnedBoxedFuture<'_, OverallStatus> { + pub fn overall_status(&self) -> LifetimedPinnedBoxedFuture<'_, OverallStatus> { Box::pin(async move { for service in self.services.iter() { - let service = service.read().await; - let status = service.info().status.read().await; + let service = service.lock().await; + + if service.info().priority != Priority::Essential { + continue; + } - if !matches!(&*status, Status::Started) { + let status = service.info().status.get().await; + if status != Status::Started { return OverallStatus::Unhealthy; } } @@ -209,107 +241,92 @@ impl ServiceManager { } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn status_tree(&self) -> PinnedBoxedFuture<'_, String> { + pub fn status_overview(&self) -> LifetimedPinnedBoxedFuture<'_, String> { Box::pin(async move { let mut text_buffer = String::new(); - let mut failed_essentials = String::new(); - let mut failed_optionals = String::new(); - let mut non_failed_essentials = String::new(); - let mut non_failed_optionals = String::new(); - let mut others = String::new(); + let mut failed_essentials = Vec::new(); + let mut failed_optionals = Vec::new(); + let mut non_failed_essentials = Vec::new(); + let mut non_failed_optionals = Vec::new(); + let mut others = Vec::new(); for service in self.services.iter() { - let service = service.read().await; + let service = service.lock().await; let info = service.info(); let priority = &info.priority; - let status = info.status.read().await; + let status = info.status.get().await; - match *status { + match status { Status::Started | Status::Stopped => match priority { Priority::Essential => { - non_failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + non_failed_essentials.push(format!(" - {}: {}", info.name, status)); } Priority::Optional => { - non_failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + non_failed_optionals.push(format!(" - {}: {}", info.name, status)); } }, Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { match priority { Priority::Essential => { - failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + failed_essentials.push(format!(" - {}: {}", info.name, status)); } Priority::Optional => { - failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + failed_optionals.push(format!(" - {}: {}", info.name, status)); } } } _ => { - others.push_str(&format!(" - {}: {}\n", info.name, status)); + others.push(format!(" - {}: {}", info.name, status)); } } } if !failed_essentials.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Failed essential services")); - text_buffer.push_str(&failed_essentials); + text_buffer.push_str(&format!("{}:\n", "Failed essential services")); + text_buffer.push_str(failed_essentials.join("\n").as_str()); } if !failed_optionals.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Failed optional services")); - text_buffer.push_str(&failed_optionals); + text_buffer.push_str(&format!("{}:\n", "Failed optional services")); + text_buffer.push_str(failed_optionals.join("\n").as_str()); } if !non_failed_essentials.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Essential services")); - text_buffer.push_str(&non_failed_essentials); + text_buffer.push_str(&format!("{}:\n", "Essential services")); + text_buffer.push_str(non_failed_essentials.join("\n").as_str()); } if !non_failed_optionals.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Optional services")); - text_buffer.push_str(&non_failed_optionals); + text_buffer.push_str(&format!("{}:\n", "Optional services")); + text_buffer.push_str(non_failed_optionals.join("\n").as_str()); } if !others.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Other services")); - text_buffer.push_str(&others); + text_buffer.push_str(&format!("{}:\n", "Other services")); + text_buffer.push_str(others.join("\n").as_str()); } + let longest_width = text_buffer + .lines() + .map(|line| line.len()) + .max() + .unwrap_or(0); + + let mut headline = String::from("Status overview\n"); + headline.push_str("─".repeat(longest_width).as_str()); + headline.push('\n'); + text_buffer.insert_str(0, &headline); + text_buffer }) } - // Helper methods for start_service and stop_service - - async fn has_background_task_running( - &self, - service: &RwLockReadGuard<'_, dyn Service>, - ) -> bool { - let tasks = self.background_tasks.read().await; - tasks.contains_key(service.info().id.as_str()) - } - - async fn is_service_started( - &self, - service: &RwLockReadGuard<'_, dyn Service>, - ) -> bool { - let status = service.info().status.read().await; - matches!(&*status, Status::Started) - } - - async fn is_service_stopped( - &self, - service: &RwLockReadGuard<'_, dyn Service>, - ) -> bool { - let status = service.info().status.read().await; - matches!(&*status, Status::Stopped) - } - async fn init_service( &self, - service: &mut RwLockWriteGuard<'_, dyn Service>, + service: &mut MutexGuard<'_, dyn Service>, ) -> Result<(), StartupError> { - let service_manager = Arc::clone(self.arc.read().await.unwrap()); + let service_manager = Arc::clone(self.arc.lock().await.unwrap()); //TODO: Add to config instead of hardcoding duration let start = service.start(service_manager); @@ -318,17 +335,22 @@ impl ServiceManager { match timeout_result { Ok(start_result) => match start_result { Ok(()) => { - service.info().set_status(Status::Started).await; + service.info().status.set(Status::Started).await; } Err(error) => { - service.info().set_status(Status::FailedToStart(error)).await; + service + .info() + .status + .set(Status::FailedToStart(error.to_string())) + .await; return Err(StartupError::FailedToStartService(service.info().id.clone())); } }, Err(error) => { service .info() - .set_status(Status::FailedToStart(Box::new(error))) + .status + .set(Status::FailedToStart(error.to_string())) .await; return Err(StartupError::FailedToStartService(service.info().id.clone())); } @@ -339,7 +361,7 @@ impl ServiceManager { async fn shutdown_service( &self, - service: &mut RwLockWriteGuard<'_, dyn Service>, + service: &mut MutexGuard<'_, dyn Service>, ) -> Result<(), ShutdownError> { //TODO: Add to config instead of hardcoding duration let stop = service.stop(); @@ -348,17 +370,22 @@ impl ServiceManager { match timeout_result { Ok(stop_result) => match stop_result { Ok(()) => { - service.info().set_status(Status::Stopped).await; + service.info().status.set(Status::Stopped).await; } Err(error) => { - service.info().set_status(Status::FailedToStop(error)).await; + service + .info() + .status + .set(Status::FailedToStop(error.to_string())) + .await; return Err(ShutdownError::FailedToStopService(service.info().id.clone())); } }, Err(error) => { service .info() - .set_status(Status::FailedToStop(Box::new(error))) + .status + .set(Status::FailedToStop(error.to_string())) .await; return Err(ShutdownError::FailedToStopService(service.info().id.clone())); } @@ -367,19 +394,30 @@ impl ServiceManager { Ok(()) } - async fn start_background_task(&self, service_lock: &RwLockWriteGuard<'_, dyn Service>, service: Arc>) { + async fn has_background_task_registered(&self, service_id: &str) -> bool { + let tasks = self.background_tasks.lock().await; + tasks.contains_key(service_id) + } + + async fn start_background_task( + &self, + service_lock: &MutexGuard<'_, dyn Service>, + service: Arc>, + ) { + if self.has_background_task_registered(&service_lock.info().id).await { + return; + } + let task = service_lock.task(); if let Some(task) = task { let mut watchdog = Watchdog::new(task); - - watchdog.append(|result| async move { - let service = service; + watchdog.append(|result| async move { /* We technically only need a read lock here, but we want to immediately stop other services from accessing the service, so we acquire a write lock instead. */ - let service = service.write().await; + let service = service.lock().await; match result { Ok(()) => { @@ -390,7 +428,8 @@ impl ServiceManager { service .info() - .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) + .status + .set(Status::RuntimeError("Background task ended unexpectedly!".to_string())) .await; } @@ -403,8 +442,9 @@ impl ServiceManager { service .info() - .set_status(Status::RuntimeError( - format!("Background task ended with error: {}", error).into(), + .status + .set(Status::RuntimeError( + format!("Background task ended with error: {}", error), )) .await; } @@ -415,19 +455,21 @@ impl ServiceManager { let join_handle = spawn(watchdog.run()); self.background_tasks - .write() + .lock() .await .insert(service_lock.info().id.clone(), join_handle); } } - async fn stop_background_task(&self, service_lock: &RwLockReadGuard<'_, dyn Service>) { - if self.has_background_task_running(service_lock).await { - let mut tasks_lock = self.background_tasks.write().await; - let task = tasks_lock.get(service_lock.info().id.as_str()).unwrap(); - task.abort(); - tasks_lock.remove(service_lock.info().id.as_str()); + async fn stop_background_task(&self, service_lock: &MutexGuard<'_, dyn Service>) { + if !self.has_background_task_registered(&service_lock.info().id).await { + return; } + + let mut tasks_lock = self.background_tasks.lock().await; + let task = tasks_lock.get(&service_lock.info().id).unwrap(); + task.abort(); + tasks_lock.remove(&service_lock.info().id); } } @@ -442,7 +484,7 @@ impl Display for ServiceManager { let mut services = self.services.iter().peekable(); while let Some(service) = services.next() { - let service = service.blocking_read(); + let service = service.blocking_lock(); write!(f, "{} ({})", service.info().name, service.info().id)?; if services.peek().is_some() { write!(f, ", ")?; diff --git a/src/service/types.rs b/src/service/types.rs index 5ae3760..6d388de 100644 --- a/src/service/types.rs +++ b/src/service/types.rs @@ -2,23 +2,28 @@ use std::{error::Error, fmt::Display, future::Future, pin::Pin}; use thiserror::Error; +use crate::event::event_repeater::{AttachError, DetachError}; + pub type BoxedError = Box; -pub type BoxedFuture<'a, T> = Box + Send + 'a>; -pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>; +pub type BoxedFuture = Box + Send>; +pub type BoxedFutureResult = BoxedFuture>; + +pub type PinnedBoxedFuture = Pin + Send>>; +pub type PinnedBoxedFutureResult = PinnedBoxedFuture>; -pub type PinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; -pub type PinnedBoxedFutureResult<'a, T> = PinnedBoxedFuture<'a, Result>; +pub type LifetimedPinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; +pub type LifetimedPinnedBoxedFutureResult<'a, T> = LifetimedPinnedBoxedFuture<'a, Result>; -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum Status { Started, Stopped, Starting, Stopping, - FailedToStart(BoxedError), //TODO: Test out if it'd be better to use a String instead - FailedToStop(BoxedError), - RuntimeError(BoxedError), + FailedToStart(String), + FailedToStop(String), + RuntimeError(String), } impl Display for Status { @@ -86,10 +91,18 @@ impl Display for Priority { pub enum StartupError { #[error("Service {0} is not managed by this Service Manager")] ServiceNotManaged(String), - #[error("Service {0} already has a background task running")] - BackgroundTaskAlreadyRunning(String), + #[error("Service {0} is not stopped")] ServiceNotStopped(String), + + #[error("Service {0} already has a background task running")] + BackgroundTaskAlreadyRunning(String), + + #[error( + "Failed to attach Service Manager's status_change EventRepeater to {0}'s status_change Event: {1}" + )] + StatusAttachmentFailed(String, AttachError), + #[error("Service {0} failed to start")] FailedToStartService(String), } @@ -98,8 +111,15 @@ pub enum StartupError { pub enum ShutdownError { #[error("Service {0} is not managed by this Service Manager")] ServiceNotManaged(String), + #[error("Service {0} is not started")] ServiceNotStarted(String), + #[error("Service {0} failed to stop")] FailedToStopService(String), + + #[error( + "Failed to detach Service Manager's status_change EventRepeater from {0}'s status_change Event: {1}" + )] + StatusDetachmentFailed(String, DetachError), } diff --git a/src/service/watchdog.rs b/src/service/watchdog.rs index 8102f98..2eb53a9 100644 --- a/src/service/watchdog.rs +++ b/src/service/watchdog.rs @@ -1,3 +1,4 @@ +use super::types::LifetimedPinnedBoxedFuture; use log::error; use serenity::FutureExt; use std::{future::Future, mem::replace, sync::Arc}; @@ -6,15 +7,14 @@ use tokio::sync::{ Mutex, }; -use super::PinnedBoxedFuture; - +//TODO: Rename to TaskChain and use Event instead of manual subscriber handling pub struct Watchdog<'a, T: Send> { - task: PinnedBoxedFuture<'a, T>, + task: LifetimedPinnedBoxedFuture<'a, T>, subscribers: Arc>>>>, } impl<'a, T: 'a + Send> Watchdog<'a, T> { - pub fn new(task: PinnedBoxedFuture<'a, T>) -> Self { + pub fn new(task: LifetimedPinnedBoxedFuture<'a, T>) -> Self { Self { task, subscribers: Arc::new(Mutex::new(Vec::new())),