diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..521c9cf --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +max_width = 110 \ No newline at end of file diff --git a/src/log.rs b/src/log.rs index 43aa4e5..55c46ba 100644 --- a/src/log.rs +++ b/src/log.rs @@ -25,7 +25,7 @@ pub fn setup() -> Result<(), SetLoggerError> { fern::Dispatch::new() .format(move |out, message, record| { out.finish(format_args!( - "[{} {: <25} {: <5}] {}", + "[{} {: <30} {: <5}] {}", humantime::format_rfc3339_seconds(SystemTime::now()), record.target(), colors.color(record.level()), diff --git a/src/service.rs b/src/service.rs index 55d6721..10131cc 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,531 +1,13 @@ -use downcast_rs::{impl_downcast, DowncastSync}; -use log::{error, info, warn}; -use std::{ - any::Any, - cmp::Ordering, - error::Error, - fmt::Display, - future::Future, - hash::{Hash, Hasher}, - mem, - pin::Pin, - sync::Arc, - time::Duration, -}; -use tokio::{sync::RwLock, time::timeout}; - -use crate::setlock::SetLock; - pub mod discord; - -pub type BoxedFuture<'a, T> = Box + 'a>; -pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>>; - -pub type PinnedBoxedFuture<'a, T> = Pin + 'a>>; -pub type PinnedBoxedFutureResult<'a, T> = - PinnedBoxedFuture<'a, Result>>; - -#[derive(Debug)] -pub enum Status { - Started, - Stopped, - Starting, - Stopping, - FailedToStart(Box), //TODO: Test out if it'd be better to use a String instead - FailedToStop(Box), - RuntimeError(Box), -} - -impl Display for Status { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Status::Started => write!(f, "Started"), - Status::Stopped => write!(f, "Stopped"), - Status::Starting => write!(f, "Starting"), - Status::Stopping => write!(f, "Stopping"), - Status::FailedToStart(error) => write!(f, "Failed to start: {}", error), - Status::FailedToStop(error) => write!(f, "Failed to stop: {}", error), - Status::RuntimeError(error) => write!(f, "Runtime error: {}", error), - } - } -} - -impl PartialEq for Status { - fn eq(&self, other: &Self) -> bool { - matches!( - (self, other), - (Status::Started, Status::Started) - | (Status::Stopped, Status::Stopped) - | (Status::Starting, Status::Starting) - | (Status::Stopping, Status::Stopping) - | (Status::FailedToStart(_), Status::FailedToStart(_)) - | (Status::FailedToStop(_), Status::FailedToStop(_)) - | (Status::RuntimeError(_), Status::RuntimeError(_)) - ) - } -} - -impl Eq for Status {} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] -pub enum OverallStatus { - Healthy, - Unhealthy, -} - -impl Display for OverallStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - OverallStatus::Healthy => write!(f, "Healthy"), - OverallStatus::Unhealthy => write!(f, "Unhealthy"), - } - } -} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] -pub enum Priority { - Essential, - Optional, -} - -impl Display for Priority { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Priority::Essential => write!(f, "Essential"), - Priority::Optional => write!(f, "Optional"), - } - } -} - -#[derive(Debug)] -pub struct ServiceInfo { - id: String, - pub name: String, - pub priority: Priority, - - pub status: Arc>, -} - -impl ServiceInfo { - pub fn new(id: &str, name: &str, priority: Priority) -> Self { - Self { - id: id.to_string(), - name: name.to_string(), - priority, - status: Arc::new(RwLock::new(Status::Stopped)), - } - } - - pub async fn set_status(&self, status: Status) { - *(self.status.write().await) = status - } -} - -impl PartialEq for ServiceInfo { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for ServiceInfo {} - -impl Ord for ServiceInfo { - fn cmp(&self, other: &Self) -> Ordering { - self.name.cmp(&other.name) - } -} - -impl PartialOrd for ServiceInfo { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Hash for ServiceInfo { - fn hash(&self, state: &mut H) { - self.id.hash(state); - } -} -//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a PinnedBoxedFutureResult -pub trait Service: DowncastSync { - fn info(&self) -> &ServiceInfo; - fn start(&mut self, service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()>; - fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()>; - - fn is_available(&self) -> PinnedBoxedFuture<'_, bool> { - Box::pin(async move { matches!(&*(self.info().status.read().await), Status::Started) }) - } -} - -impl_downcast!(sync Service); - -impl Eq for dyn Service {} - -impl PartialEq for dyn Service { - fn eq(&self, other: &Self) -> bool { - self.info() == other.info() - } -} - -impl Ord for dyn Service { - fn cmp(&self, other: &Self) -> Ordering { - self.info().cmp(other.info()) - } -} - -impl PartialOrd for dyn Service { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Hash for dyn Service { - fn hash(&self, state: &mut H) { - self.info().hash(state); - } -} - -#[derive(Default)] -pub struct ServiceManagerBuilder { - services: Vec>>, -} - -impl ServiceManagerBuilder { - pub fn new() -> Self { - Self { - services: Vec::new(), - } - } - - //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub async fn with_service(mut self, service: Arc>) -> Self { - let lock = service.read().await; - - let mut found = false; - for registered_service in self.services.iter() { - let registered_service = registered_service.read().await; - - if registered_service.info().id == lock.info().id { - found = true; - } - } - - if found { - warn!( - "Tried to add service {} ({}), but a service with that ID already exists. Ignoring.", - lock.info().name, lock.info().id - ); - return self; - } - - drop(lock); - - self.services.push(service); - self - } - - pub async fn build(self) -> Arc { - let service_manager = ServiceManager { - services: self.services, - arc: RwLock::new(SetLock::new()), - }; - - let self_arc = Arc::new(service_manager); - - match self_arc.arc.write().await.set(Arc::clone(&self_arc)) { - Ok(()) => {} - Err(err) => { - panic!( - "Failed to set ServiceManager in SetLock for self_arc: {}", - err - ); - } - } - - self_arc - } -} - -pub struct ServiceManager { - pub services: Vec>>, - arc: RwLock>>, -} - -impl ServiceManager { - pub fn builder() -> ServiceManagerBuilder { - ServiceManagerBuilder::new() - } - - pub async fn manages_service(&self, service: &Arc>) -> bool { - for registered_service in self.services.iter() { - if registered_service.read().await.info().id == service.read().await.info().id { - return true; - } - } - - false - } - - pub async fn start_service(&self, service: Arc>) { - if !self.manages_service(&service).await { - warn!( - "Tried to start service {} ({}), but it's not managed by this Service Manager. Ignoring start request.", - service.read().await.info().name, - service.read().await.info().id - ); - return; - } - - let mut service = service.write().await; - - let mut status = service.info().status.write().await; - if !matches!(&*status, Status::Stopped) { - warn!( - "Tried to start service {} while it was in state {}. Ignoring start request.", - service.info().name, - status - ); - return; - } - *status = Status::Starting; - drop(status); - - let service_manager = Arc::clone(self.arc.read().await.unwrap()); - - let start = service.start(service_manager); - - let duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding - let timeout_result = timeout(duration, start); - - match timeout_result.await { - Ok(start_result) => match start_result { - Ok(()) => { - info!("Started service: {}", service.info().name); - service.info().set_status(Status::Started).await; - } - Err(error) => { - error!("Failed to start service {}: {}", service.info().name, error); - service - .info() - .set_status(Status::FailedToStart(error)) - .await; - } - }, - Err(error) => { - error!( - "Failed to start service {}: Timeout of {} seconds reached.", - service.info().name, - duration.as_secs() - ); - service - .info() - .set_status(Status::FailedToStart(Box::new(error))) - .await; - } - } - } - - pub async fn stop_service(&self, service: Arc>) { - if !self.manages_service(&service).await { - warn!( - "Tried to stop service {} ({}), but it's not managed by this Service Manager. Ignoring stop request.", - service.read().await.info().name, - service.read().await.info().id - ); - return; - } - - let mut service = service.write().await; - - let mut status = service.info().status.write().await; - if !matches!(&*status, Status::Started) { - warn!( - "Tried to stop service {} while it was in state {}. Ignoring stop request.", - service.info().name, - status - ); - return; - } - *status = Status::Stopping; - drop(status); - - let stop = service.stop(); - - let duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding - let timeout_result = timeout(duration, stop); - - match timeout_result.await { - Ok(stop_result) => match stop_result { - Ok(()) => { - info!("Stopped service: {}", service.info().name); - service.info().set_status(Status::Stopped).await; - } - Err(error) => { - error!("Failed to stop service {}: {}", service.info().name, error); - service.info().set_status(Status::FailedToStop(error)).await; - } - }, - Err(error) => { - error!( - "Failed to stop service {}: Timeout of {} seconds reached.", - service.info().name, - duration.as_secs() - ); - service - .info() - .set_status(Status::FailedToStop(Box::new(error))) - .await; - } - } - } - - pub async fn start_services(&self) { - for service in &self.services { - self.start_service(Arc::clone(service)).await; - } - } - - pub async fn stop_services(&self) { - for service in &self.services { - self.stop_service(Arc::clone(service)).await; - } - } - - pub async fn get_service(&self) -> Option>> - where - T: Service + Any + Send + Sync + 'static, - { - for service in self.services.iter() { - let lock = service.read().await; - let is_t = lock.as_any().is::(); - drop(lock); - - if is_t { - let arc_clone = Arc::clone(service); - let service_ptr: *const Arc> = &arc_clone; - - /* - I tried to do this in safe rust for 3 days, but I couldn't figure it out - Should you come up with a way to do this in safe rust, please make a PR! :) - Anyways, this should never cause any issues, since we checked if the service is of type T - */ - unsafe { - let t_ptr: *const Arc> = mem::transmute(service_ptr); - return Some(Arc::clone(&*t_ptr)); - } - } - } - - None - } - - //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn overall_status(&self) -> PinnedBoxedFuture<'_, OverallStatus> { - Box::pin(async move { - for service in self.services.iter() { - let service = service.read().await; - let status = service.info().status.read().await; - - if !matches!(&*status, Status::Started) { - return OverallStatus::Unhealthy; - } - } - - OverallStatus::Healthy - }) - } - - //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn status_tree(&self) -> PinnedBoxedFuture<'_, String> { - Box::pin(async move { - let mut text_buffer = String::new(); - - let mut failed_essentials = String::new(); - let mut failed_optionals = String::new(); - let mut non_failed_essentials = String::new(); - let mut non_failed_optionals = String::new(); - let mut others = String::new(); - - for service in self.services.iter() { - let service = service.read().await; - let info = service.info(); - let priority = &info.priority; - let status = info.status.read().await; - - match *status { - Status::Started | Status::Stopped => match priority { - Priority::Essential => { - non_failed_essentials - .push_str(&format!(" - {}: {}\n", info.name, status)); - } - Priority::Optional => { - non_failed_optionals - .push_str(&format!(" - {}: {}\n", info.name, status)); - } - }, - Status::FailedToStart(_) - | Status::FailedToStop(_) - | Status::RuntimeError(_) => match priority { - Priority::Essential => { - failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); - } - Priority::Optional => { - failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); - } - }, - _ => { - others.push_str(&format!(" - {}: {}\n", info.name, status)); - } - } - } - - if !failed_essentials.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Failed essential services")); - text_buffer.push_str(&failed_essentials); - } - - if !failed_optionals.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Failed optional services")); - text_buffer.push_str(&failed_optionals); - } - - if !non_failed_essentials.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Essential services")); - text_buffer.push_str(&non_failed_essentials); - } - - if !non_failed_optionals.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Optional services")); - text_buffer.push_str(&non_failed_optionals); - } - - if !others.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Other services")); - text_buffer.push_str(&others); - } - - text_buffer - }) - } -} - -impl Display for ServiceManager { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Services: ")?; - - if self.services.is_empty() { - write!(f, "None")?; - return Ok(()); - } - - let mut services = self.services.iter().peekable(); - while let Some(service) = services.next() { - let service = service.blocking_read(); - write!(f, "{} ({})", service.info().name, service.info().id)?; - if services.peek().is_some() { - write!(f, ", ")?; - } - } - Ok(()) - } -} +pub mod service; +pub mod service_manager; +pub mod types; +pub mod watchdog; + +pub use service::{Service, ServiceInfo}; +pub use service_manager::{ServiceManager, ServiceManagerBuilder}; +pub use types::{ + BoxedError, BoxedFuture, BoxedFutureResult, OverallStatus, PinnedBoxedFuture, PinnedBoxedFutureResult, + Priority, StartupError, Status, +}; +pub use watchdog::Watchdog; diff --git a/src/service/discord.rs b/src/service/discord.rs index 8a84419..d6f31ff 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -57,11 +57,11 @@ impl Service for DiscordService { fn start(&mut self, _service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()> { Box::pin(async move { + let client_ready_notify = Arc::new(Notify::new()); + let framework = StandardFramework::new(); framework.configure(Configuration::new().prefix("!")); - let client_ready_notify = Arc::new(Notify::new()); - let mut client = Client::builder(self.discord_token.as_str(), GatewayIntents::all()) .framework(framework) .event_handler(EventHandler::new( @@ -124,10 +124,27 @@ impl Service for DiscordService { result?; } - info!("Discord client stopped"); Ok(()) }) } + + fn task<'a>(&self) -> Option> { + Some(Box::pin(async move { + let mut i = 0; + loop { + sleep(Duration::from_secs(1)).await; + if i < 5 { + i += 1; + info!("Wohoo!"); + } else { + info!("Bye!"); + break; + } + } + + Err("Sheesh".into()) + })) + } } // If the thread ended WITHOUT a JoinError from aborting, the client already stopped unexpectedly @@ -145,10 +162,7 @@ struct EventHandler { impl EventHandler { pub fn new(client: Arc>>, ready_notify: Arc) -> Self { - Self { - client, - ready_notify, - } + Self { client, ready_notify } } } diff --git a/src/service/service.rs b/src/service/service.rs new file mode 100644 index 0000000..185f06a --- /dev/null +++ b/src/service/service.rs @@ -0,0 +1,105 @@ +use std::{ + cmp::Ordering, + hash::{Hash, Hasher}, + sync::Arc, +}; + +use downcast_rs::{impl_downcast, DowncastSync}; +use tokio::sync::RwLock; + +use super::{ + service_manager::ServiceManager, + types::{PinnedBoxedFuture, PinnedBoxedFutureResult, Priority, Status}, +}; + +#[derive(Debug)] +pub struct ServiceInfo { + pub id: String, + pub name: String, + pub priority: Priority, + + pub status: Arc>, +} + +impl ServiceInfo { + pub fn new(id: &str, name: &str, priority: Priority) -> Self { + Self { + id: id.to_string(), + name: name.to_string(), + priority, + status: Arc::new(RwLock::new(Status::Stopped)), + } + } + + pub async fn set_status(&self, status: Status) { + let mut lock = self.status.write().await; + *(lock) = status + } +} + +impl PartialEq for ServiceInfo { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for ServiceInfo {} + +impl Ord for ServiceInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.name.cmp(&other.name) + } +} + +impl PartialOrd for ServiceInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Hash for ServiceInfo { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} +//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a PinnedBoxedFutureResult +pub trait Service: DowncastSync { + fn info(&self) -> &ServiceInfo; + fn start(&mut self, service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()>; + fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()>; + fn task<'a>(&self) -> Option> { + None + } + + fn is_available(&self) -> PinnedBoxedFuture<'_, bool> { + Box::pin(async move { matches!(&*(self.info().status.read().await), Status::Started) }) + } +} + +impl_downcast!(sync Service); + +impl Eq for dyn Service {} + +impl PartialEq for dyn Service { + fn eq(&self, other: &Self) -> bool { + self.info() == other.info() + } +} + +impl Ord for dyn Service { + fn cmp(&self, other: &Self) -> Ordering { + self.info().cmp(other.info()) + } +} + +impl PartialOrd for dyn Service { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Hash for dyn Service { + fn hash(&self, state: &mut H) { + self.info().hash(state); + } +} diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs new file mode 100644 index 0000000..91fb842 --- /dev/null +++ b/src/service/service_manager.rs @@ -0,0 +1,453 @@ +use crate::{service::Watchdog, setlock::SetLock}; +use log::{error, info, warn}; +use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; +use tokio::{ + spawn, + sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + task::JoinHandle, + time::timeout, +}; +use super::{ + service::Service, + types::{OverallStatus, PinnedBoxedFuture, Priority, ShutdownError, StartupError, Status}, +}; + +#[derive(Default)] +pub struct ServiceManagerBuilder { + services: Vec>>, +} + +impl ServiceManagerBuilder { + pub fn new() -> Self { + Self { services: Vec::new() } + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub async fn with_service(mut self, service: Arc>) -> Self { + let lock = service.read().await; + + let mut found = false; + for registered_service in self.services.iter() { + let registered_service = registered_service.read().await; + + if registered_service.info().id == lock.info().id { + found = true; + } + } + + if found { + warn!( + "Tried to add service {} ({}), but a service with that ID already exists. Ignoring.", + lock.info().name, + lock.info().id + ); + return self; + } + + drop(lock); + + self.services.push(service); + self + } + + pub async fn build(self) -> Arc { + let service_manager = ServiceManager { + arc: RwLock::new(SetLock::new()), + services: self.services, + background_tasks: RwLock::new(HashMap::new()), + }; + + let self_arc = Arc::new(service_manager); + + match self_arc.arc.write().await.set(Arc::clone(&self_arc)) { + Ok(()) => {} + Err(err) => { + panic!("Failed to set ServiceManager in SetLock for self_arc: {}", err); + } + } + + self_arc + } +} + +pub struct ServiceManager { + arc: RwLock>>, + services: Vec>>, + background_tasks: RwLock>>, +} + +impl ServiceManager { + pub fn builder() -> ServiceManagerBuilder { + ServiceManagerBuilder::new() + } + + pub async fn manages_service(&self, service_id: &str) -> bool { + for registered_service in self.services.iter() { + if registered_service.read().await.info().id == service_id { + return true; + } + } + + false + } + + pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { + let service_lock = service.read().await; + + let service_id = service_lock.info().id.clone(); + + if !self.manages_service(&service_id).await { + return Err(StartupError::ServiceNotManaged(service_id)); + } + + if !self.is_service_stopped(&service_lock).await { + return Err(StartupError::ServiceNotStopped(service_id)); + } + + if self.has_background_task_running(&service_lock).await { + return Err(StartupError::BackgroundTaskAlreadyRunning(service_id)); + } + + drop(service_lock); + let mut service_lock = service.write().await; + + service_lock.info().set_status(Status::Starting).await; + self.init_service(&mut service_lock).await?; + self.start_background_task(&service_lock, Arc::clone(&service)).await; + + info!("Started service {}", service_lock.info().name); + + Ok(()) + } + + pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { + let service_lock = service.read().await; + + if !(self.manages_service(service_lock.info().id.as_str()).await) { + return Err(ShutdownError::ServiceNotManaged(service_lock.info().id.clone())); + } + + if !self.is_service_started(&service_lock).await { + return Err(ShutdownError::ServiceNotStarted(service_lock.info().id.clone())); + } + + self.stop_background_task(&service_lock).await; + + drop(service_lock); + let mut service_lock = service.write().await; + + service_lock.info().set_status(Status::Stopping).await; + self.shutdown_service(&mut service_lock).await?; + + info!("Stopped service {}", service_lock.info().name); + + Ok(()) + } + + pub async fn start_services(&self) -> Vec> { + let mut results = Vec::new(); + + for service in &self.services { + results.push(self.start_service(Arc::clone(service)).await); + } + + results + } + + pub async fn stop_services(&self) -> Vec> { + let mut results = Vec::new(); + + for service in &self.services { + results.push(self.stop_service(Arc::clone(service)).await); + } + + results + } + + pub async fn get_service(&self) -> Option>> + where + T: Service, + { + for service in self.services.iter() { + let lock = service.read().await; + let is_t = lock.as_any().is::(); + drop(lock); + + if is_t { + let arc_clone = Arc::clone(service); + let service_ptr: *const Arc> = &arc_clone; + + /* + I tried to do this in safe rust for 3 days, but I couldn't figure it out + Should you come up with a way to do this in safe rust, please make a PR! :) + Anyways, this should never cause any issues, since we checked if the service is of type T + */ + unsafe { + let t_ptr: *const Arc> = mem::transmute(service_ptr); + return Some(Arc::clone(&*t_ptr)); + } + } + } + + None + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub fn overall_status(&self) -> PinnedBoxedFuture<'_, OverallStatus> { + Box::pin(async move { + for service in self.services.iter() { + let service = service.read().await; + let status = service.info().status.read().await; + + if !matches!(&*status, Status::Started) { + return OverallStatus::Unhealthy; + } + } + + OverallStatus::Healthy + }) + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub fn status_tree(&self) -> PinnedBoxedFuture<'_, String> { + Box::pin(async move { + let mut text_buffer = String::new(); + + let mut failed_essentials = String::new(); + let mut failed_optionals = String::new(); + let mut non_failed_essentials = String::new(); + let mut non_failed_optionals = String::new(); + let mut others = String::new(); + + for service in self.services.iter() { + let service = service.read().await; + let info = service.info(); + let priority = &info.priority; + let status = info.status.read().await; + + match *status { + Status::Started | Status::Stopped => match priority { + Priority::Essential => { + non_failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + } + Priority::Optional => { + non_failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + } + }, + Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { + match priority { + Priority::Essential => { + failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + } + Priority::Optional => { + failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + } + } + } + _ => { + others.push_str(&format!(" - {}: {}\n", info.name, status)); + } + } + } + + if !failed_essentials.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Failed essential services")); + text_buffer.push_str(&failed_essentials); + } + + if !failed_optionals.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Failed optional services")); + text_buffer.push_str(&failed_optionals); + } + + if !non_failed_essentials.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Essential services")); + text_buffer.push_str(&non_failed_essentials); + } + + if !non_failed_optionals.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Optional services")); + text_buffer.push_str(&non_failed_optionals); + } + + if !others.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Other services")); + text_buffer.push_str(&others); + } + + text_buffer + }) + } + + // Helper methods for start_service and stop_service + + async fn has_background_task_running( + &self, + service: &RwLockReadGuard<'_, dyn Service>, + ) -> bool { + let tasks = self.background_tasks.read().await; + tasks.contains_key(service.info().id.as_str()) + } + + async fn is_service_started( + &self, + service: &RwLockReadGuard<'_, dyn Service>, + ) -> bool { + let status = service.info().status.read().await; + matches!(&*status, Status::Started) + } + + async fn is_service_stopped( + &self, + service: &RwLockReadGuard<'_, dyn Service>, + ) -> bool { + let status = service.info().status.read().await; + matches!(&*status, Status::Stopped) + } + + async fn init_service( + &self, + service: &mut RwLockWriteGuard<'_, dyn Service>, + ) -> Result<(), StartupError> { + let service_manager = Arc::clone(self.arc.read().await.unwrap()); + + //TODO: Add to config instead of hardcoding duration + let start = service.start(service_manager); + let timeout_result = timeout(Duration::from_secs(10), start).await; + + match timeout_result { + Ok(start_result) => match start_result { + Ok(()) => { + service.info().set_status(Status::Started).await; + } + Err(error) => { + service.info().set_status(Status::FailedToStart(error)).await; + return Err(StartupError::FailedToStartService(service.info().id.clone())); + } + }, + Err(error) => { + service + .info() + .set_status(Status::FailedToStart(Box::new(error))) + .await; + return Err(StartupError::FailedToStartService(service.info().id.clone())); + } + } + + Ok(()) + } + + async fn shutdown_service( + &self, + service: &mut RwLockWriteGuard<'_, dyn Service>, + ) -> Result<(), ShutdownError> { + //TODO: Add to config instead of hardcoding duration + let stop = service.stop(); + let timeout_result = timeout(Duration::from_secs(10), stop).await; + + match timeout_result { + Ok(stop_result) => match stop_result { + Ok(()) => { + service.info().set_status(Status::Stopped).await; + } + Err(error) => { + service.info().set_status(Status::FailedToStop(error)).await; + return Err(ShutdownError::FailedToStopService(service.info().id.clone())); + } + }, + Err(error) => { + service + .info() + .set_status(Status::FailedToStop(Box::new(error))) + .await; + return Err(ShutdownError::FailedToStopService(service.info().id.clone())); + } + } + + Ok(()) + } + + async fn start_background_task(&self, service_lock: &RwLockWriteGuard<'_, dyn Service>, service: Arc>) { + let task = service_lock.task(); + if let Some(task) = task { + let mut watchdog = Watchdog::new(task); + + watchdog.append(|result| async move { + let service = service; + + /* + We technically only need a read lock here, but we want to immediately stop + other services from accessing the service, so we acquire a write lock instead. + */ + let service = service.write().await; + + match result { + Ok(()) => { + error!( + "Background task of service {} ended unexpectedly! Service will be marked as failed.", + service.info().name + ); + + service + .info() + .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) + .await; + } + + Err(error) => { + error!( + "Background task of service {} ended with error: {}. Service will be marked as failed.", + service.info().name, + error + ); + + service + .info() + .set_status(Status::RuntimeError( + format!("Background task ended with error: {}", error).into(), + )) + .await; + } + } + Ok(()) + }); + + let join_handle = spawn(watchdog.run()); + + self.background_tasks + .write() + .await + .insert(service_lock.info().id.clone(), join_handle); + } + } + + async fn stop_background_task(&self, service_lock: &RwLockReadGuard<'_, dyn Service>) { + if self.has_background_task_running(service_lock).await { + let mut tasks_lock = self.background_tasks.write().await; + let task = tasks_lock.get(service_lock.info().id.as_str()).unwrap(); + task.abort(); + tasks_lock.remove(service_lock.info().id.as_str()); + } + } +} + +impl Display for ServiceManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Services: ")?; + + if self.services.is_empty() { + write!(f, "None")?; + return Ok(()); + } + + let mut services = self.services.iter().peekable(); + while let Some(service) = services.next() { + let service = service.blocking_read(); + write!(f, "{} ({})", service.info().name, service.info().id)?; + if services.peek().is_some() { + write!(f, ", ")?; + } + } + Ok(()) + } +} diff --git a/src/service/types.rs b/src/service/types.rs new file mode 100644 index 0000000..5ae3760 --- /dev/null +++ b/src/service/types.rs @@ -0,0 +1,105 @@ +use std::{error::Error, fmt::Display, future::Future, pin::Pin}; + +use thiserror::Error; + +pub type BoxedError = Box; + +pub type BoxedFuture<'a, T> = Box + Send + 'a>; +pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>; + +pub type PinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; +pub type PinnedBoxedFutureResult<'a, T> = PinnedBoxedFuture<'a, Result>; + +#[derive(Debug)] +pub enum Status { + Started, + Stopped, + Starting, + Stopping, + FailedToStart(BoxedError), //TODO: Test out if it'd be better to use a String instead + FailedToStop(BoxedError), + RuntimeError(BoxedError), +} + +impl Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Status::Started => write!(f, "Started"), + Status::Stopped => write!(f, "Stopped"), + Status::Starting => write!(f, "Starting"), + Status::Stopping => write!(f, "Stopping"), + Status::FailedToStart(error) => write!(f, "Failed to start: {}", error), + Status::FailedToStop(error) => write!(f, "Failed to stop: {}", error), + Status::RuntimeError(error) => write!(f, "Runtime error: {}", error), + } + } +} + +impl PartialEq for Status { + fn eq(&self, other: &Self) -> bool { + matches!( + (self, other), + (Status::Started, Status::Started) + | (Status::Stopped, Status::Stopped) + | (Status::Starting, Status::Starting) + | (Status::Stopping, Status::Stopping) + | (Status::FailedToStart(_), Status::FailedToStart(_)) + | (Status::FailedToStop(_), Status::FailedToStop(_)) + | (Status::RuntimeError(_), Status::RuntimeError(_)) + ) + } +} + +impl Eq for Status {} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub enum OverallStatus { + Healthy, + Unhealthy, +} + +impl Display for OverallStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OverallStatus::Healthy => write!(f, "Healthy"), + OverallStatus::Unhealthy => write!(f, "Unhealthy"), + } + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub enum Priority { + Essential, + Optional, +} + +impl Display for Priority { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Priority::Essential => write!(f, "Essential"), + Priority::Optional => write!(f, "Optional"), + } + } +} + +#[derive(Debug, Error)] +pub enum StartupError { + #[error("Service {0} is not managed by this Service Manager")] + ServiceNotManaged(String), + #[error("Service {0} already has a background task running")] + BackgroundTaskAlreadyRunning(String), + #[error("Service {0} is not stopped")] + ServiceNotStopped(String), + #[error("Service {0} failed to start")] + FailedToStartService(String), +} + +#[derive(Debug, Error)] +pub enum ShutdownError { + #[error("Service {0} is not managed by this Service Manager")] + ServiceNotManaged(String), + #[error("Service {0} is not started")] + ServiceNotStarted(String), + #[error("Service {0} failed to stop")] + FailedToStopService(String), +} diff --git a/src/service/watchdog.rs b/src/service/watchdog.rs new file mode 100644 index 0000000..8102f98 --- /dev/null +++ b/src/service/watchdog.rs @@ -0,0 +1,59 @@ +use log::error; +use serenity::FutureExt; +use std::{future::Future, mem::replace, sync::Arc}; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, +}; + +use super::PinnedBoxedFuture; + +pub struct Watchdog<'a, T: Send> { + task: PinnedBoxedFuture<'a, T>, + subscribers: Arc>>>>, +} + +impl<'a, T: 'a + Send> Watchdog<'a, T> { + pub fn new(task: PinnedBoxedFuture<'a, T>) -> Self { + Self { + task, + subscribers: Arc::new(Mutex::new(Vec::new())), + } + } + + pub fn append(&mut self, task: FN) + where + FN: FnOnce(T) -> FUT + Send + 'a, + FUT: Future + Send + 'a, + { + let previous_task = replace( + &mut self.task, + Box::pin(async { unreachable!("Undefined watchdog task") }), + ); + + let task = previous_task.then(task); + + self.task = Box::pin(task); + } + + pub async fn subscribe(&self) -> Receiver> { + let (tx, rx) = channel(1); + self.subscribers.lock().await.push(tx); + rx + } + + pub async fn run(self) { + let result = self.task.await; + let result = Arc::new(result); + for subscriber in self.subscribers.lock().await.iter() { + let send_result = subscriber.send(Arc::clone(&result)).await; + + if let Err(e) = send_result { + error!( + "Failed to send a watchdog task result to one of its subscribers: {}", + e + ); + } + } + } +}