From 2345f9749d27c969d845b1743385fadeec9b5f3d Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Thu, 18 Jan 2024 21:40:47 +0100 Subject: [PATCH 1/9] Service framework background task - Implement an optional background task for services that starts automatically on service start. - Implement a watchdog that updates the service's status when its task fails at runtime --- src/service.rs | 158 +++++++++++++++++++++++++++++++---------- src/service/discord.rs | 22 +++++- 2 files changed, 142 insertions(+), 38 deletions(-) diff --git a/src/service.rs b/src/service.rs index 55d6721..516031d 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,8 +1,9 @@ use downcast_rs::{impl_downcast, DowncastSync}; use log::{error, info, warn}; +use serenity::FutureExt; use std::{ - any::Any, cmp::Ordering, + collections::HashMap, error::Error, fmt::Display, future::Future, @@ -12,18 +13,19 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::{sync::RwLock, time::timeout}; +use tokio::{spawn, sync::RwLock, task::JoinHandle, time::timeout}; use crate::setlock::SetLock; pub mod discord; -pub type BoxedFuture<'a, T> = Box + 'a>; -pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>>; +pub type BoxedError = Box; -pub type PinnedBoxedFuture<'a, T> = Pin + 'a>>; -pub type PinnedBoxedFutureResult<'a, T> = - PinnedBoxedFuture<'a, Result>>; +pub type BoxedFuture<'a, T> = Box + Send + 'a>; +pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>; + +pub type PinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; +pub type PinnedBoxedFutureResult<'a, T> = PinnedBoxedFuture<'a, Result>; #[derive(Debug)] pub enum Status { @@ -31,9 +33,9 @@ pub enum Status { Stopped, Starting, Stopping, - FailedToStart(Box), //TODO: Test out if it'd be better to use a String instead - FailedToStop(Box), - RuntimeError(Box), + FailedToStart(BoxedError), //TODO: Test out if it'd be better to use a String instead + FailedToStop(BoxedError), + RuntimeError(BoxedError), } impl Display for Status { @@ -151,6 +153,9 
@@ pub trait Service: DowncastSync { fn info(&self) -> &ServiceInfo; fn start(&mut self, service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()>; fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()>; + fn task<'a>(&self) -> Option> { + None + } fn is_available(&self) -> PinnedBoxedFuture<'_, bool> { Box::pin(async move { matches!(&*(self.info().status.read().await), Status::Started) }) @@ -228,6 +233,7 @@ impl ServiceManagerBuilder { let service_manager = ServiceManager { services: self.services, arc: RwLock::new(SetLock::new()), + tasks: RwLock::new(HashMap::new()), }; let self_arc = Arc::new(service_manager); @@ -247,8 +253,9 @@ impl ServiceManagerBuilder { } pub struct ServiceManager { - pub services: Vec>>, + services: Vec>>, arc: RwLock>>, + tasks: RwLock>>, } impl ServiceManager { @@ -256,7 +263,7 @@ impl ServiceManager { ServiceManagerBuilder::new() } - pub async fn manages_service(&self, service: &Arc>) -> bool { + pub async fn manages_service(&self, service: Arc>) -> bool { for registered_service in self.services.iter() { if registered_service.read().await.info().id == service.read().await.info().id { return true; @@ -267,45 +274,70 @@ impl ServiceManager { } pub async fn start_service(&self, service: Arc>) { - if !self.manages_service(&service).await { + let service_lock = service.read().await; + + // Check if the service is managed by this Service Manager + if !self.manages_service(Arc::clone(&service)).await { warn!( "Tried to start service {} ({}), but it's not managed by this Service Manager. Ignoring start request.", - service.read().await.info().name, - service.read().await.info().id + service_lock.info().name, + service_lock.info().id ); return; } - let mut service = service.write().await; + // Check if the service already has a background task running + if self + .tasks + .read() + .await + .contains_key(service_lock.info().id.as_str()) + { + warn!( + "Tried to start service {} ({}), which already has a background task running. 
Ignoring start request.", + service_lock.info().name, + service_lock.info().id + ); + return; + } - let mut status = service.info().status.write().await; + // Upgrade the read lock to a write lock + drop(service_lock); + let mut service_lock = service.write().await; + + // Check if the service is already running and cancel the start request if it is + let mut status = service_lock.info().status.write().await; if !matches!(&*status, Status::Stopped) { warn!( "Tried to start service {} while it was in state {}. Ignoring start request.", - service.info().name, + service_lock.info().name, status ); return; } + + // Set the status to Starting *status = Status::Starting; drop(status); + // Start the service let service_manager = Arc::clone(self.arc.read().await.unwrap()); - - let start = service.start(service_manager); - - let duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding - let timeout_result = timeout(duration, start); - - match timeout_result.await { + let start = service_lock.start(service_manager); + let timeout_duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding + let timeout_result = timeout(timeout_duration, start).await; + match timeout_result { Ok(start_result) => match start_result { Ok(()) => { - info!("Started service: {}", service.info().name); - service.info().set_status(Status::Started).await; + info!("Started service: {}", service_lock.info().name); + service_lock.info().set_status(Status::Started).await; } Err(error) => { - error!("Failed to start service {}: {}", service.info().name, error); - service + error!( + "Failed to start service {}: {}", + service_lock.info().name, + error + ); + service_lock .info() .set_status(Status::FailedToStart(error)) .await; @@ -314,23 +346,77 @@ impl ServiceManager { Err(error) => { error!( "Failed to start service {}: Timeout of {} seconds reached.", - service.info().name, - duration.as_secs() + service_lock.info().name, + timeout_duration.as_secs() ); - 
service + service_lock .info() .set_status(Status::FailedToStart(Box::new(error))) .await; } } + + // Start the background task if one is defined + let task = service_lock.task(); + if let Some(task) = task { + drop(service_lock); + + let service_clone = Arc::clone(&service); + let task_with_watchdog = task.then(|result| async move { + let service = service_clone; + + /* + We technically only need a read lock here, but we want to immediately stop other + services from accessing the service, so we acquire a write lock instead. + */ + let service = service.write().await; + + match result { + Ok(()) => { + error!("Background task of service {} ended unexpectedly! Service will be marked as failed.", + service.info().name + ); + service + .info() + .set_status(Status::RuntimeError( + "Background task ended unexpectedly!".into(), + )) + .await; + } + Err(error) => { + error!( + "Background task of service {} ended with error: {}! Service will be marked as failed.", + service.info().name, error + ); + service + .info() + .set_status(Status::RuntimeError( + format!("Background task ended with error: {}", error).into(), + )) + .await; + } + } + }); + + let join_handle = spawn(task_with_watchdog); + self.tasks + .write() + .await + .insert(service.read().await.info().id.clone(), join_handle); + info!( + "Started background task for service {}", + service.read().await.info().name + ); + } } pub async fn stop_service(&self, service: Arc>) { - if !self.manages_service(&service).await { + if !self.manages_service(Arc::clone(&service)).await { + let service = service.read().await; warn!( "Tried to stop service {} ({}), but it's not managed by this Service Manager. 
Ignoring stop request.", - service.read().await.info().name, - service.read().await.info().id + service.info().name, + service.info().id ); return; } @@ -393,7 +479,7 @@ impl ServiceManager { pub async fn get_service(&self) -> Option>> where - T: Service + Any + Send + Sync + 'static, + T: Service, { for service in self.services.iter() { let lock = service.read().await; diff --git a/src/service/discord.rs b/src/service/discord.rs index 8a84419..c5a44fb 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -1,6 +1,6 @@ use crate::setlock::SetLock; -use super::{PinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; +use super::{BoxedError, PinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; use log::{error, info}; use serenity::{ all::{GatewayIntents, Ready}, @@ -12,7 +12,7 @@ use serenity::{ prelude::TypeMap, Client, Error, }; -use std::{sync::Arc, time::Duration}; +use std::{any::Any, future::Future, sync::Arc, time::Duration}; use tokio::{ select, spawn, sync::{Mutex, Notify, RwLock}, @@ -128,6 +128,24 @@ impl Service for DiscordService { Ok(()) }) } + + fn task<'a>(&self) -> Option> { + Some(Box::pin(async move { + let mut i = 0; + loop { + sleep(Duration::from_secs(1)).await; + if i < 5 { + i += 1; + info!("Wohoo!"); + } else { + info!("Bye!"); + break; + } + } + + Err("Sheesh".into()) + })) + } } // If the thread ended WITHOUT a JoinError from aborting, the client already stopped unexpectedly From 6dc6c9a696f15f3eace389bc0fcd408a462e4e46 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 19 Jan 2024 23:00:49 +0100 Subject: [PATCH 2/9] WIP: Partially implemented - Background task startup + watchdog implemented for service startup - Implementation for service shutdown missing - Handling of watchdog triggers of essential tasks missing --- rustfmt.toml | 1 + src/service.rs | 126 +++++++++++++++++++++++-------------------------- 2 files changed, 60 insertions(+), 67 deletions(-) create mode 100644 
rustfmt.toml diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..521c9cf --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +max_width = 110 \ No newline at end of file diff --git a/src/service.rs b/src/service.rs index 516031d..31b94b0 100644 --- a/src/service.rs +++ b/src/service.rs @@ -13,6 +13,7 @@ use std::{ sync::Arc, time::Duration, }; +use thiserror::Error; use tokio::{spawn, sync::RwLock, task::JoinHandle, time::timeout}; use crate::setlock::SetLock; @@ -197,9 +198,7 @@ pub struct ServiceManagerBuilder { impl ServiceManagerBuilder { pub fn new() -> Self { - Self { - services: Vec::new(), - } + Self { services: Vec::new() } } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop @@ -217,9 +216,10 @@ impl ServiceManagerBuilder { if found { warn!( - "Tried to add service {} ({}), but a service with that ID already exists. Ignoring.", - lock.info().name, lock.info().id - ); + "Tried to add service {} ({}), but a service with that ID already exists. 
Ignoring.", + lock.info().name, + lock.info().id + ); return self; } @@ -241,10 +241,7 @@ impl ServiceManagerBuilder { match self_arc.arc.write().await.set(Arc::clone(&self_arc)) { Ok(()) => {} Err(err) => { - panic!( - "Failed to set ServiceManager in SetLock for self_arc: {}", - err - ); + panic!("Failed to set ServiceManager in SetLock for self_arc: {}", err); } } @@ -252,6 +249,18 @@ impl ServiceManagerBuilder { } } +#[derive(Debug, Error)] +pub enum StartupError { + #[error("Service {0} is not managed by this Service Manager")] + ServiceNotManaged(String), + #[error("Service {0} already has a background task running")] + BackgroundTaskAlreadyRunning(String), + #[error("Service {0} is already running")] + ServiceAlreadyRunning(String), + #[error("Failed to start service {0}")] + FailedToStartService(String), +} + pub struct ServiceManager { services: Vec>>, arc: RwLock>>, @@ -264,8 +273,9 @@ impl ServiceManager { } pub async fn manages_service(&self, service: Arc>) -> bool { + let service_id = service.read().await.info().id.clone(); for registered_service in self.services.iter() { - if registered_service.read().await.info().id == service.read().await.info().id { + if registered_service.read().await.info().id == service_id { return true; } } @@ -273,17 +283,12 @@ impl ServiceManager { false } - pub async fn start_service(&self, service: Arc>) { + pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { let service_lock = service.read().await; // Check if the service is managed by this Service Manager if !self.manages_service(Arc::clone(&service)).await { - warn!( - "Tried to start service {} ({}), but it's not managed by this Service Manager. 
Ignoring start request.", - service_lock.info().name, - service_lock.info().id - ); - return; + return Err(StartupError::ServiceNotManaged(service_lock.info().id.clone())); } // Check if the service already has a background task running @@ -293,12 +298,9 @@ impl ServiceManager { .await .contains_key(service_lock.info().id.as_str()) { - warn!( - "Tried to start service {} ({}), which already has a background task running. Ignoring start request.", - service_lock.info().name, - service_lock.info().id - ); - return; + return Err(StartupError::BackgroundTaskAlreadyRunning( + service_lock.info().id.clone(), + )); } // Upgrade the read lock to a write lock @@ -308,12 +310,9 @@ impl ServiceManager { // Check if the service is already running and cancel the start request if it is let mut status = service_lock.info().status.write().await; if !matches!(&*status, Status::Stopped) { - warn!( - "Tried to start service {} while it was in state {}. Ignoring start request.", - service_lock.info().name, - status - ); - return; + return Err(StartupError::ServiceAlreadyRunning( + service_lock.info().id.clone(), + )); } // Set the status to Starting @@ -328,39 +327,27 @@ impl ServiceManager { match timeout_result { Ok(start_result) => match start_result { Ok(()) => { - info!("Started service: {}", service_lock.info().name); service_lock.info().set_status(Status::Started).await; } Err(error) => { - error!( - "Failed to start service {}: {}", - service_lock.info().name, - error - ); - service_lock - .info() - .set_status(Status::FailedToStart(error)) - .await; + service_lock.info().set_status(Status::FailedToStart(error)).await; + return Err(StartupError::FailedToStartService(service_lock.info().id.clone())); } }, Err(error) => { - error!( - "Failed to start service {}: Timeout of {} seconds reached.", - service_lock.info().name, - timeout_duration.as_secs() - ); service_lock .info() .set_status(Status::FailedToStart(Box::new(error))) .await; + return 
Err(StartupError::FailedToStartService(service_lock.info().id.clone())); } } // Start the background task if one is defined let task = service_lock.task(); - if let Some(task) = task { - drop(service_lock); + drop(service_lock); + if let Some(task) = task { let service_clone = Arc::clone(&service); let task_with_watchdog = task.then(|result| async move { let service = service_clone; @@ -371,22 +358,23 @@ impl ServiceManager { */ let service = service.write().await; + //TODO: Better handling of this. For example, send a message to a channel and let ServiceManager know. match result { Ok(()) => { - error!("Background task of service {} ended unexpectedly! Service will be marked as failed.", + error!( + "Background task of service {} ended unexpectedly! Service will be marked as failed.", service.info().name ); service .info() - .set_status(Status::RuntimeError( - "Background task ended unexpectedly!".into(), - )) + .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) .await; } Err(error) => { error!( "Background task of service {} ended with error: {}! 
Service will be marked as failed.", - service.info().name, error + service.info().name, + error ); service .info() @@ -408,6 +396,8 @@ impl ServiceManager { service.read().await.info().name ); } + + Ok(()) } pub async fn stop_service(&self, service: Arc>) { @@ -465,10 +455,14 @@ impl ServiceManager { } } - pub async fn start_services(&self) { + pub async fn start_services(&self) -> Vec> { + let mut results = Vec::new(); + for service in &self.services { - self.start_service(Arc::clone(service)).await; + results.push(self.start_service(Arc::clone(service)).await); } + + results } pub async fn stop_services(&self) { @@ -541,24 +535,22 @@ impl ServiceManager { match *status { Status::Started | Status::Stopped => match priority { Priority::Essential => { - non_failed_essentials - .push_str(&format!(" - {}: {}\n", info.name, status)); + non_failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); } Priority::Optional => { - non_failed_optionals - .push_str(&format!(" - {}: {}\n", info.name, status)); + non_failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); } }, - Status::FailedToStart(_) - | Status::FailedToStop(_) - | Status::RuntimeError(_) => match priority { - Priority::Essential => { - failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { + match priority { + Priority::Essential => { + failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + } + Priority::Optional => { + failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + } } - Priority::Optional => { - failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); - } - }, + } _ => { others.push_str(&format!(" - {}: {}\n", info.name, status)); } From 78ae5574230814abe8c89d7b7012f03bcffefa20 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 20 Jan 2024 17:45:34 +0100 Subject: [PATCH 3/9] Refactor of service framework Split 
service framework into multiple submodules --- src/service.rs | 618 +-------------------------------- src/service/service.rs | 104 ++++++ src/service/service_manager.rs | 415 ++++++++++++++++++++++ src/service/types.rs | 95 +++++ 4 files changed, 624 insertions(+), 608 deletions(-) create mode 100644 src/service/service.rs create mode 100644 src/service/service_manager.rs create mode 100644 src/service/types.rs diff --git a/src/service.rs b/src/service.rs index 31b94b0..ff1ee87 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,609 +1,11 @@ -use downcast_rs::{impl_downcast, DowncastSync}; -use log::{error, info, warn}; -use serenity::FutureExt; -use std::{ - cmp::Ordering, - collections::HashMap, - error::Error, - fmt::Display, - future::Future, - hash::{Hash, Hasher}, - mem, - pin::Pin, - sync::Arc, - time::Duration, -}; -use thiserror::Error; -use tokio::{spawn, sync::RwLock, task::JoinHandle, time::timeout}; - -use crate::setlock::SetLock; - pub mod discord; - -pub type BoxedError = Box; - -pub type BoxedFuture<'a, T> = Box + Send + 'a>; -pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>; - -pub type PinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; -pub type PinnedBoxedFutureResult<'a, T> = PinnedBoxedFuture<'a, Result>; - -#[derive(Debug)] -pub enum Status { - Started, - Stopped, - Starting, - Stopping, - FailedToStart(BoxedError), //TODO: Test out if it'd be better to use a String instead - FailedToStop(BoxedError), - RuntimeError(BoxedError), -} - -impl Display for Status { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Status::Started => write!(f, "Started"), - Status::Stopped => write!(f, "Stopped"), - Status::Starting => write!(f, "Starting"), - Status::Stopping => write!(f, "Stopping"), - Status::FailedToStart(error) => write!(f, "Failed to start: {}", error), - Status::FailedToStop(error) => write!(f, "Failed to stop: {}", error), - Status::RuntimeError(error) => write!(f, "Runtime error: {}", error), - 
} - } -} - -impl PartialEq for Status { - fn eq(&self, other: &Self) -> bool { - matches!( - (self, other), - (Status::Started, Status::Started) - | (Status::Stopped, Status::Stopped) - | (Status::Starting, Status::Starting) - | (Status::Stopping, Status::Stopping) - | (Status::FailedToStart(_), Status::FailedToStart(_)) - | (Status::FailedToStop(_), Status::FailedToStop(_)) - | (Status::RuntimeError(_), Status::RuntimeError(_)) - ) - } -} - -impl Eq for Status {} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] -pub enum OverallStatus { - Healthy, - Unhealthy, -} - -impl Display for OverallStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - OverallStatus::Healthy => write!(f, "Healthy"), - OverallStatus::Unhealthy => write!(f, "Unhealthy"), - } - } -} - -#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] -pub enum Priority { - Essential, - Optional, -} - -impl Display for Priority { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Priority::Essential => write!(f, "Essential"), - Priority::Optional => write!(f, "Optional"), - } - } -} - -#[derive(Debug)] -pub struct ServiceInfo { - id: String, - pub name: String, - pub priority: Priority, - - pub status: Arc>, -} - -impl ServiceInfo { - pub fn new(id: &str, name: &str, priority: Priority) -> Self { - Self { - id: id.to_string(), - name: name.to_string(), - priority, - status: Arc::new(RwLock::new(Status::Stopped)), - } - } - - pub async fn set_status(&self, status: Status) { - *(self.status.write().await) = status - } -} - -impl PartialEq for ServiceInfo { - fn eq(&self, other: &Self) -> bool { - self.id == other.id - } -} - -impl Eq for ServiceInfo {} - -impl Ord for ServiceInfo { - fn cmp(&self, other: &Self) -> Ordering { - self.name.cmp(&other.name) - } -} - -impl PartialOrd for ServiceInfo { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Hash for 
ServiceInfo { - fn hash(&self, state: &mut H) { - self.id.hash(state); - } -} -//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a PinnedBoxedFutureResult -pub trait Service: DowncastSync { - fn info(&self) -> &ServiceInfo; - fn start(&mut self, service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()>; - fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()>; - fn task<'a>(&self) -> Option> { - None - } - - fn is_available(&self) -> PinnedBoxedFuture<'_, bool> { - Box::pin(async move { matches!(&*(self.info().status.read().await), Status::Started) }) - } -} - -impl_downcast!(sync Service); - -impl Eq for dyn Service {} - -impl PartialEq for dyn Service { - fn eq(&self, other: &Self) -> bool { - self.info() == other.info() - } -} - -impl Ord for dyn Service { - fn cmp(&self, other: &Self) -> Ordering { - self.info().cmp(other.info()) - } -} - -impl PartialOrd for dyn Service { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Hash for dyn Service { - fn hash(&self, state: &mut H) { - self.info().hash(state); - } -} - -#[derive(Default)] -pub struct ServiceManagerBuilder { - services: Vec>>, -} - -impl ServiceManagerBuilder { - pub fn new() -> Self { - Self { services: Vec::new() } - } - - //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub async fn with_service(mut self, service: Arc>) -> Self { - let lock = service.read().await; - - let mut found = false; - for registered_service in self.services.iter() { - let registered_service = registered_service.read().await; - - if registered_service.info().id == lock.info().id { - found = true; - } - } - - if found { - warn!( - "Tried to add service {} ({}), but a service with that ID already exists. 
Ignoring.", - lock.info().name, - lock.info().id - ); - return self; - } - - drop(lock); - - self.services.push(service); - self - } - - pub async fn build(self) -> Arc { - let service_manager = ServiceManager { - services: self.services, - arc: RwLock::new(SetLock::new()), - tasks: RwLock::new(HashMap::new()), - }; - - let self_arc = Arc::new(service_manager); - - match self_arc.arc.write().await.set(Arc::clone(&self_arc)) { - Ok(()) => {} - Err(err) => { - panic!("Failed to set ServiceManager in SetLock for self_arc: {}", err); - } - } - - self_arc - } -} - -#[derive(Debug, Error)] -pub enum StartupError { - #[error("Service {0} is not managed by this Service Manager")] - ServiceNotManaged(String), - #[error("Service {0} already has a background task running")] - BackgroundTaskAlreadyRunning(String), - #[error("Service {0} is already running")] - ServiceAlreadyRunning(String), - #[error("Failed to start service {0}")] - FailedToStartService(String), -} - -pub struct ServiceManager { - services: Vec>>, - arc: RwLock>>, - tasks: RwLock>>, -} - -impl ServiceManager { - pub fn builder() -> ServiceManagerBuilder { - ServiceManagerBuilder::new() - } - - pub async fn manages_service(&self, service: Arc>) -> bool { - let service_id = service.read().await.info().id.clone(); - for registered_service in self.services.iter() { - if registered_service.read().await.info().id == service_id { - return true; - } - } - - false - } - - pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { - let service_lock = service.read().await; - - // Check if the service is managed by this Service Manager - if !self.manages_service(Arc::clone(&service)).await { - return Err(StartupError::ServiceNotManaged(service_lock.info().id.clone())); - } - - // Check if the service already has a background task running - if self - .tasks - .read() - .await - .contains_key(service_lock.info().id.as_str()) - { - return Err(StartupError::BackgroundTaskAlreadyRunning( - 
service_lock.info().id.clone(), - )); - } - - // Upgrade the read lock to a write lock - drop(service_lock); - let mut service_lock = service.write().await; - - // Check if the service is already running and cancel the start request if it is - let mut status = service_lock.info().status.write().await; - if !matches!(&*status, Status::Stopped) { - return Err(StartupError::ServiceAlreadyRunning( - service_lock.info().id.clone(), - )); - } - - // Set the status to Starting - *status = Status::Starting; - drop(status); - - // Start the service - let service_manager = Arc::clone(self.arc.read().await.unwrap()); - let start = service_lock.start(service_manager); - let timeout_duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding - let timeout_result = timeout(timeout_duration, start).await; - match timeout_result { - Ok(start_result) => match start_result { - Ok(()) => { - service_lock.info().set_status(Status::Started).await; - } - Err(error) => { - service_lock.info().set_status(Status::FailedToStart(error)).await; - return Err(StartupError::FailedToStartService(service_lock.info().id.clone())); - } - }, - Err(error) => { - service_lock - .info() - .set_status(Status::FailedToStart(Box::new(error))) - .await; - return Err(StartupError::FailedToStartService(service_lock.info().id.clone())); - } - } - - // Start the background task if one is defined - let task = service_lock.task(); - drop(service_lock); - - if let Some(task) = task { - let service_clone = Arc::clone(&service); - let task_with_watchdog = task.then(|result| async move { - let service = service_clone; - - /* - We technically only need a read lock here, but we want to immediately stop other - services from accessing the service, so we acquire a write lock instead. - */ - let service = service.write().await; - - //TODO: Better handling of this. For example, send a message to a channel and let ServiceManager know. 
- match result { - Ok(()) => { - error!( - "Background task of service {} ended unexpectedly! Service will be marked as failed.", - service.info().name - ); - service - .info() - .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) - .await; - } - Err(error) => { - error!( - "Background task of service {} ended with error: {}! Service will be marked as failed.", - service.info().name, - error - ); - service - .info() - .set_status(Status::RuntimeError( - format!("Background task ended with error: {}", error).into(), - )) - .await; - } - } - }); - - let join_handle = spawn(task_with_watchdog); - self.tasks - .write() - .await - .insert(service.read().await.info().id.clone(), join_handle); - info!( - "Started background task for service {}", - service.read().await.info().name - ); - } - - Ok(()) - } - - pub async fn stop_service(&self, service: Arc>) { - if !self.manages_service(Arc::clone(&service)).await { - let service = service.read().await; - warn!( - "Tried to stop service {} ({}), but it's not managed by this Service Manager. Ignoring stop request.", - service.info().name, - service.info().id - ); - return; - } - - let mut service = service.write().await; - - let mut status = service.info().status.write().await; - if !matches!(&*status, Status::Started) { - warn!( - "Tried to stop service {} while it was in state {}. 
Ignoring stop request.", - service.info().name, - status - ); - return; - } - *status = Status::Stopping; - drop(status); - - let stop = service.stop(); - - let duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding - let timeout_result = timeout(duration, stop); - - match timeout_result.await { - Ok(stop_result) => match stop_result { - Ok(()) => { - info!("Stopped service: {}", service.info().name); - service.info().set_status(Status::Stopped).await; - } - Err(error) => { - error!("Failed to stop service {}: {}", service.info().name, error); - service.info().set_status(Status::FailedToStop(error)).await; - } - }, - Err(error) => { - error!( - "Failed to stop service {}: Timeout of {} seconds reached.", - service.info().name, - duration.as_secs() - ); - service - .info() - .set_status(Status::FailedToStop(Box::new(error))) - .await; - } - } - } - - pub async fn start_services(&self) -> Vec> { - let mut results = Vec::new(); - - for service in &self.services { - results.push(self.start_service(Arc::clone(service)).await); - } - - results - } - - pub async fn stop_services(&self) { - for service in &self.services { - self.stop_service(Arc::clone(service)).await; - } - } - - pub async fn get_service(&self) -> Option>> - where - T: Service, - { - for service in self.services.iter() { - let lock = service.read().await; - let is_t = lock.as_any().is::(); - drop(lock); - - if is_t { - let arc_clone = Arc::clone(service); - let service_ptr: *const Arc> = &arc_clone; - - /* - I tried to do this in safe rust for 3 days, but I couldn't figure it out - Should you come up with a way to do this in safe rust, please make a PR! 
:) - Anyways, this should never cause any issues, since we checked if the service is of type T - */ - unsafe { - let t_ptr: *const Arc> = mem::transmute(service_ptr); - return Some(Arc::clone(&*t_ptr)); - } - } - } - - None - } - - //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn overall_status(&self) -> PinnedBoxedFuture<'_, OverallStatus> { - Box::pin(async move { - for service in self.services.iter() { - let service = service.read().await; - let status = service.info().status.read().await; - - if !matches!(&*status, Status::Started) { - return OverallStatus::Unhealthy; - } - } - - OverallStatus::Healthy - }) - } - - //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn status_tree(&self) -> PinnedBoxedFuture<'_, String> { - Box::pin(async move { - let mut text_buffer = String::new(); - - let mut failed_essentials = String::new(); - let mut failed_optionals = String::new(); - let mut non_failed_essentials = String::new(); - let mut non_failed_optionals = String::new(); - let mut others = String::new(); - - for service in self.services.iter() { - let service = service.read().await; - let info = service.info(); - let priority = &info.priority; - let status = info.status.read().await; - - match *status { - Status::Started | Status::Stopped => match priority { - Priority::Essential => { - non_failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); - } - Priority::Optional => { - non_failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); - } - }, - Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { - match priority { - Priority::Essential => { - failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); - } - Priority::Optional => { - failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); - } - } - } - _ => { - others.push_str(&format!(" - {}: {}\n", info.name, 
status)); - } - } - } - - if !failed_essentials.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Failed essential services")); - text_buffer.push_str(&failed_essentials); - } - - if !failed_optionals.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Failed optional services")); - text_buffer.push_str(&failed_optionals); - } - - if !non_failed_essentials.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Essential services")); - text_buffer.push_str(&non_failed_essentials); - } - - if !non_failed_optionals.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Optional services")); - text_buffer.push_str(&non_failed_optionals); - } - - if !others.is_empty() { - text_buffer.push_str(&format!("- {}:\n", "Other services")); - text_buffer.push_str(&others); - } - - text_buffer - }) - } -} - -impl Display for ServiceManager { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "Services: ")?; - - if self.services.is_empty() { - write!(f, "None")?; - return Ok(()); - } - - let mut services = self.services.iter().peekable(); - while let Some(service) = services.next() { - let service = service.blocking_read(); - write!(f, "{} ({})", service.info().name, service.info().id)?; - if services.peek().is_some() { - write!(f, ", ")?; - } - } - Ok(()) - } -} +pub mod service; +pub mod service_manager; +pub mod types; + +pub use service::{Service, ServiceInfo}; +pub use service_manager::{ServiceManager, ServiceManagerBuilder}; +pub use types::{ + BoxedError, BoxedFuture, BoxedFutureResult, OverallStatus, PinnedBoxedFuture, PinnedBoxedFutureResult, + Priority, StartupError, Status, +}; diff --git a/src/service/service.rs b/src/service/service.rs new file mode 100644 index 0000000..be332ad --- /dev/null +++ b/src/service/service.rs @@ -0,0 +1,104 @@ +use std::{ + cmp::Ordering, + hash::{Hash, Hasher}, + sync::Arc, +}; + +use downcast_rs::{impl_downcast, DowncastSync}; +use tokio::sync::RwLock; + +use super::{ + 
service_manager::ServiceManager, + types::{PinnedBoxedFuture, PinnedBoxedFutureResult, Priority, Status}, +}; + +#[derive(Debug)] +pub struct ServiceInfo { + pub id: String, + pub name: String, + pub priority: Priority, + + pub status: Arc>, +} + +impl ServiceInfo { + pub fn new(id: &str, name: &str, priority: Priority) -> Self { + Self { + id: id.to_string(), + name: name.to_string(), + priority, + status: Arc::new(RwLock::new(Status::Stopped)), + } + } + + pub async fn set_status(&self, status: Status) { + *(self.status.write().await) = status + } +} + +impl PartialEq for ServiceInfo { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for ServiceInfo {} + +impl Ord for ServiceInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.name.cmp(&other.name) + } +} + +impl PartialOrd for ServiceInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Hash for ServiceInfo { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} +//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a PinnedBoxedFutureResult +pub trait Service: DowncastSync { + fn info(&self) -> &ServiceInfo; + fn start(&mut self, service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()>; + fn stop(&mut self) -> PinnedBoxedFutureResult<'_, ()>; + fn task<'a>(&self) -> Option> { + None + } + + fn is_available(&self) -> PinnedBoxedFuture<'_, bool> { + Box::pin(async move { matches!(&*(self.info().status.read().await), Status::Started) }) + } +} + +impl_downcast!(sync Service); + +impl Eq for dyn Service {} + +impl PartialEq for dyn Service { + fn eq(&self, other: &Self) -> bool { + self.info() == other.info() + } +} + +impl Ord for dyn Service { + fn cmp(&self, other: &Self) -> Ordering { + self.info().cmp(other.info()) + } +} + +impl PartialOrd for dyn Service { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Hash for dyn Service { 
+ fn hash(&self, state: &mut H) { + self.info().hash(state); + } +} diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs new file mode 100644 index 0000000..bafe6a7 --- /dev/null +++ b/src/service/service_manager.rs @@ -0,0 +1,415 @@ +use crate::setlock::SetLock; +use log::{error, info, warn}; +use serenity::FutureExt; +use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; +use tokio::{spawn, sync::RwLock, task::JoinHandle, time::timeout}; + +use super::{ + service::Service, + types::{OverallStatus, PinnedBoxedFuture, Priority, StartupError, Status}, +}; + +#[derive(Default)] +pub struct ServiceManagerBuilder { + services: Vec>>, +} + +impl ServiceManagerBuilder { + pub fn new() -> Self { + Self { services: Vec::new() } + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub async fn with_service(mut self, service: Arc>) -> Self { + let lock = service.read().await; + + let mut found = false; + for registered_service in self.services.iter() { + let registered_service = registered_service.read().await; + + if registered_service.info().id == lock.info().id { + found = true; + } + } + + if found { + warn!( + "Tried to add service {} ({}), but a service with that ID already exists. 
Ignoring.", + lock.info().name, + lock.info().id + ); + return self; + } + + drop(lock); + + self.services.push(service); + self + } + + pub async fn build(self) -> Arc { + let service_manager = ServiceManager { + services: self.services, + arc: RwLock::new(SetLock::new()), + tasks: RwLock::new(HashMap::new()), + }; + + let self_arc = Arc::new(service_manager); + + match self_arc.arc.write().await.set(Arc::clone(&self_arc)) { + Ok(()) => {} + Err(err) => { + panic!("Failed to set ServiceManager in SetLock for self_arc: {}", err); + } + } + + self_arc + } +} + +pub struct ServiceManager { + services: Vec>>, + arc: RwLock>>, + tasks: RwLock>>, +} + +impl ServiceManager { + pub fn builder() -> ServiceManagerBuilder { + ServiceManagerBuilder::new() + } + + pub async fn manages_service(&self, service: Arc>) -> bool { + let service_id = service.read().await.info().id.clone(); + for registered_service in self.services.iter() { + if registered_service.read().await.info().id == service_id { + return true; + } + } + + false + } + + pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { + let service_lock = service.read().await; + + // Check if the service is managed by this Service Manager + if !self.manages_service(Arc::clone(&service)).await { + return Err(StartupError::ServiceNotManaged(service_lock.info().id.clone())); + } + + // Check if the service already has a background task running + if self + .tasks + .read() + .await + .contains_key(service_lock.info().id.as_str()) + { + return Err(StartupError::BackgroundTaskAlreadyRunning( + service_lock.info().id.clone(), + )); + } + + // Upgrade the read lock to a write lock + drop(service_lock); + let mut service_lock = service.write().await; + + // Check if the service is already running and cancel the start request if it is + let mut status = service_lock.info().status.write().await; + if !matches!(&*status, Status::Stopped) { + return Err(StartupError::ServiceAlreadyRunning( + 
service_lock.info().id.clone(), + )); + } + + // Set the status to Starting + *status = Status::Starting; + drop(status); + + // Start the service + let service_manager = Arc::clone(self.arc.read().await.unwrap()); + let start = service_lock.start(service_manager); + let timeout_duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding + let timeout_result = timeout(timeout_duration, start).await; + match timeout_result { + Ok(start_result) => match start_result { + Ok(()) => { + service_lock.info().set_status(Status::Started).await; + } + Err(error) => { + service_lock.info().set_status(Status::FailedToStart(error)).await; + return Err(StartupError::FailedToStartService(service_lock.info().id.clone())); + } + }, + Err(error) => { + service_lock + .info() + .set_status(Status::FailedToStart(Box::new(error))) + .await; + return Err(StartupError::FailedToStartService(service_lock.info().id.clone())); + } + } + + // Start the background task if one is defined + let task = service_lock.task(); + drop(service_lock); + + if let Some(task) = task { + let service_clone = Arc::clone(&service); + let task_with_watchdog = task.then(|result| async move { + let service = service_clone; + + /* + We technically only need a read lock here, but we want to immediately stop other + services from accessing the service, so we acquire a write lock instead. + */ + let service = service.write().await; + + //TODO: Better handling of this. For example, send a message to a channel and let ServiceManager know. + match result { + Ok(()) => { + error!( + "Background task of service {} ended unexpectedly! Service will be marked as failed.", + service.info().name + ); + service + .info() + .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) + .await; + } + Err(error) => { + error!( + "Background task of service {} ended with error: {}! 
Service will be marked as failed.", + service.info().name, + error + ); + service + .info() + .set_status(Status::RuntimeError( + format!("Background task ended with error: {}", error).into(), + )) + .await; + } + } + }); + + let join_handle = spawn(task_with_watchdog); + self.tasks + .write() + .await + .insert(service.read().await.info().id.clone(), join_handle); + info!( + "Started background task for service {}", + service.read().await.info().name + ); + } + + Ok(()) + } + + pub async fn stop_service(&self, service: Arc>) { + if !self.manages_service(Arc::clone(&service)).await { + let service = service.read().await; + warn!( + "Tried to stop service {} ({}), but it's not managed by this Service Manager. Ignoring stop request.", + service.info().name, + service.info().id + ); + return; + } + + let mut service = service.write().await; + + let mut status = service.info().status.write().await; + if !matches!(&*status, Status::Started) { + warn!( + "Tried to stop service {} while it was in state {}. 
Ignoring stop request.", + service.info().name, + status + ); + return; + } + *status = Status::Stopping; + drop(status); + + let stop = service.stop(); + + let duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding + let timeout_result = timeout(duration, stop); + + match timeout_result.await { + Ok(stop_result) => match stop_result { + Ok(()) => { + info!("Stopped service: {}", service.info().name); + service.info().set_status(Status::Stopped).await; + } + Err(error) => { + error!("Failed to stop service {}: {}", service.info().name, error); + service.info().set_status(Status::FailedToStop(error)).await; + } + }, + Err(error) => { + error!( + "Failed to stop service {}: Timeout of {} seconds reached.", + service.info().name, + duration.as_secs() + ); + service + .info() + .set_status(Status::FailedToStop(Box::new(error))) + .await; + } + } + } + + pub async fn start_services(&self) -> Vec> { + let mut results = Vec::new(); + + for service in &self.services { + results.push(self.start_service(Arc::clone(service)).await); + } + + results + } + + pub async fn stop_services(&self) { + for service in &self.services { + self.stop_service(Arc::clone(service)).await; + } + } + + pub async fn get_service(&self) -> Option>> + where + T: Service, + { + for service in self.services.iter() { + let lock = service.read().await; + let is_t = lock.as_any().is::(); + drop(lock); + + if is_t { + let arc_clone = Arc::clone(service); + let service_ptr: *const Arc> = &arc_clone; + + /* + I tried to do this in safe rust for 3 days, but I couldn't figure it out + Should you come up with a way to do this in safe rust, please make a PR! 
:) + Anyways, this should never cause any issues, since we checked if the service is of type T + */ + unsafe { + let t_ptr: *const Arc> = mem::transmute(service_ptr); + return Some(Arc::clone(&*t_ptr)); + } + } + } + + None + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub fn overall_status(&self) -> PinnedBoxedFuture<'_, OverallStatus> { + Box::pin(async move { + for service in self.services.iter() { + let service = service.read().await; + let status = service.info().status.read().await; + + if !matches!(&*status, Status::Started) { + return OverallStatus::Unhealthy; + } + } + + OverallStatus::Healthy + }) + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub fn status_tree(&self) -> PinnedBoxedFuture<'_, String> { + Box::pin(async move { + let mut text_buffer = String::new(); + + let mut failed_essentials = String::new(); + let mut failed_optionals = String::new(); + let mut non_failed_essentials = String::new(); + let mut non_failed_optionals = String::new(); + let mut others = String::new(); + + for service in self.services.iter() { + let service = service.read().await; + let info = service.info(); + let priority = &info.priority; + let status = info.status.read().await; + + match *status { + Status::Started | Status::Stopped => match priority { + Priority::Essential => { + non_failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + } + Priority::Optional => { + non_failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + } + }, + Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { + match priority { + Priority::Essential => { + failed_essentials.push_str(&format!(" - {}: {}\n", info.name, status)); + } + Priority::Optional => { + failed_optionals.push_str(&format!(" - {}: {}\n", info.name, status)); + } + } + } + _ => { + others.push_str(&format!(" - {}: {}\n", info.name, 
status)); + } + } + } + + if !failed_essentials.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Failed essential services")); + text_buffer.push_str(&failed_essentials); + } + + if !failed_optionals.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Failed optional services")); + text_buffer.push_str(&failed_optionals); + } + + if !non_failed_essentials.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Essential services")); + text_buffer.push_str(&non_failed_essentials); + } + + if !non_failed_optionals.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Optional services")); + text_buffer.push_str(&non_failed_optionals); + } + + if !others.is_empty() { + text_buffer.push_str(&format!("- {}:\n", "Other services")); + text_buffer.push_str(&others); + } + + text_buffer + }) + } +} + +impl Display for ServiceManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Services: ")?; + + if self.services.is_empty() { + write!(f, "None")?; + return Ok(()); + } + + let mut services = self.services.iter().peekable(); + while let Some(service) = services.next() { + let service = service.blocking_read(); + write!(f, "{} ({})", service.info().name, service.info().id)?; + if services.peek().is_some() { + write!(f, ", ")?; + } + } + Ok(()) + } +} diff --git a/src/service/types.rs b/src/service/types.rs new file mode 100644 index 0000000..8226c42 --- /dev/null +++ b/src/service/types.rs @@ -0,0 +1,95 @@ +use std::{error::Error, fmt::Display, future::Future, pin::Pin}; + +use thiserror::Error; + +pub type BoxedError = Box; + +pub type BoxedFuture<'a, T> = Box + Send + 'a>; +pub type BoxedFutureResult<'a, T> = BoxedFuture<'a, Result>; + +pub type PinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; +pub type PinnedBoxedFutureResult<'a, T> = PinnedBoxedFuture<'a, Result>; + +#[derive(Debug)] +pub enum Status { + Started, + Stopped, + Starting, + Stopping, + FailedToStart(BoxedError), //TODO: Test out if it'd be better to use a 
String instead + FailedToStop(BoxedError), + RuntimeError(BoxedError), +} + +impl Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Status::Started => write!(f, "Started"), + Status::Stopped => write!(f, "Stopped"), + Status::Starting => write!(f, "Starting"), + Status::Stopping => write!(f, "Stopping"), + Status::FailedToStart(error) => write!(f, "Failed to start: {}", error), + Status::FailedToStop(error) => write!(f, "Failed to stop: {}", error), + Status::RuntimeError(error) => write!(f, "Runtime error: {}", error), + } + } +} + +impl PartialEq for Status { + fn eq(&self, other: &Self) -> bool { + matches!( + (self, other), + (Status::Started, Status::Started) + | (Status::Stopped, Status::Stopped) + | (Status::Starting, Status::Starting) + | (Status::Stopping, Status::Stopping) + | (Status::FailedToStart(_), Status::FailedToStart(_)) + | (Status::FailedToStop(_), Status::FailedToStop(_)) + | (Status::RuntimeError(_), Status::RuntimeError(_)) + ) + } +} + +impl Eq for Status {} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub enum OverallStatus { + Healthy, + Unhealthy, +} + +impl Display for OverallStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OverallStatus::Healthy => write!(f, "Healthy"), + OverallStatus::Unhealthy => write!(f, "Unhealthy"), + } + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub enum Priority { + Essential, + Optional, +} + +impl Display for Priority { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Priority::Essential => write!(f, "Essential"), + Priority::Optional => write!(f, "Optional"), + } + } +} + +#[derive(Debug, Error)] +pub enum StartupError { + #[error("Service {0} is not managed by this Service Manager")] + ServiceNotManaged(String), + #[error("Service {0} already has a background task running")] + 
BackgroundTaskAlreadyRunning(String), + #[error("Service {0} is already running")] + ServiceAlreadyRunning(String), + #[error("Failed to start service {0}")] + FailedToStartService(String), +} From 889aa7c2a3cdad6dfaa5fb219a7c37773510bed3 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sun, 21 Jan 2024 15:46:24 +0100 Subject: [PATCH 4/9] Implement Watchdog module --- src/service.rs | 2 ++ src/service/watchdog.rs | 56 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 58 insertions(+) create mode 100644 src/service/watchdog.rs diff --git a/src/service.rs b/src/service.rs index ff1ee87..10131cc 100644 --- a/src/service.rs +++ b/src/service.rs @@ -2,6 +2,7 @@ pub mod discord; pub mod service; pub mod service_manager; pub mod types; +pub mod watchdog; pub use service::{Service, ServiceInfo}; pub use service_manager::{ServiceManager, ServiceManagerBuilder}; @@ -9,3 +10,4 @@ pub use types::{ BoxedError, BoxedFuture, BoxedFutureResult, OverallStatus, PinnedBoxedFuture, PinnedBoxedFutureResult, Priority, StartupError, Status, }; +pub use watchdog::Watchdog; diff --git a/src/service/watchdog.rs b/src/service/watchdog.rs new file mode 100644 index 0000000..7edca86 --- /dev/null +++ b/src/service/watchdog.rs @@ -0,0 +1,56 @@ +use log::error; +use serenity::FutureExt; +use std::{mem::replace, sync::Arc}; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, +}; + +use super::PinnedBoxedFuture; + +pub struct Watchdog<'a, T: Send> { + task: PinnedBoxedFuture<'a, T>, + subscribers: Arc>>>>, +} + +impl<'a, T: 'a + Send> Watchdog<'a, T> { + pub fn new(task: PinnedBoxedFuture<'a, T>) -> Self { + Self { + task, + subscribers: Arc::new(Mutex::new(Vec::new())), + } + } + + pub fn append(&mut self, new_task: F) + where + F: FnOnce(T) -> PinnedBoxedFuture<'a, T> + 'a + Send, + { + let previous_task = replace( + &mut self.task, + Box::pin(async { unreachable!("Undefined watchdog task") }), + ); + + self.task = Box::pin(previous_task.then(new_task)); + } + + pub 
async fn subscribe(&self) -> Receiver> { + let (tx, rx) = channel(1); + self.subscribers.lock().await.push(tx); + rx + } + + pub async fn run(self) { + let result = self.task.await; + let result = Arc::new(result); + for subscriber in self.subscribers.lock().await.iter() { + let send_result = subscriber.send(Arc::clone(&result)).await; + + if let Err(e) = send_result { + error!( + "Failed to send watchdog task result to one of the subscribers: {}", + e + ); + } + } + } +} From ddf5ca5ba474e12a2b3a8493b670c05460f56c6b Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Mon, 22 Jan 2024 04:38:08 +0100 Subject: [PATCH 5/9] Optimize Watchdog module Make usage of Watchdog more flexible --- src/service/watchdog.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/service/watchdog.rs b/src/service/watchdog.rs index 7edca86..8102f98 100644 --- a/src/service/watchdog.rs +++ b/src/service/watchdog.rs @@ -1,6 +1,6 @@ use log::error; use serenity::FutureExt; -use std::{mem::replace, sync::Arc}; +use std::{future::Future, mem::replace, sync::Arc}; use tokio::sync::{ mpsc::{channel, Receiver, Sender}, Mutex, @@ -21,16 +21,19 @@ impl<'a, T: 'a + Send> Watchdog<'a, T> { } } - pub fn append(&mut self, new_task: F) + pub fn append(&mut self, task: FN) where - F: FnOnce(T) -> PinnedBoxedFuture<'a, T> + 'a + Send, + FN: FnOnce(T) -> FUT + Send + 'a, + FUT: Future + Send + 'a, { let previous_task = replace( &mut self.task, Box::pin(async { unreachable!("Undefined watchdog task") }), ); - self.task = Box::pin(previous_task.then(new_task)); + let task = previous_task.then(task); + + self.task = Box::pin(task); } pub async fn subscribe(&self) -> Receiver> { @@ -47,7 +50,7 @@ impl<'a, T: 'a + Send> Watchdog<'a, T> { if let Err(e) = send_result { error!( - "Failed to send watchdog task result to one of the subscribers: {}", + "Failed to send a watchdog task result to one of its subscribers: {}", e ); } From 73170d124c4d9205b34d408d4b18df51752ba48e Mon Sep 
17 00:00:00 2001 From: Torben Schweren Date: Wed, 24 Jan 2024 20:52:36 +0100 Subject: [PATCH 6/9] Refactor start_service Refactor the giant block of code into smaller helper methods where possible --- src/service/discord.rs | 9 +- src/service/service_manager.rs | 168 ++++++++++++++++++++------------- src/service/types.rs | 14 ++- 3 files changed, 119 insertions(+), 72 deletions(-) diff --git a/src/service/discord.rs b/src/service/discord.rs index c5a44fb..cfb9cce 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -1,6 +1,6 @@ use crate::setlock::SetLock; -use super::{BoxedError, PinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; +use super::{PinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; use log::{error, info}; use serenity::{ all::{GatewayIntents, Ready}, @@ -12,7 +12,7 @@ use serenity::{ prelude::TypeMap, Client, Error, }; -use std::{any::Any, future::Future, sync::Arc, time::Duration}; +use std::{sync::Arc, time::Duration}; use tokio::{ select, spawn, sync::{Mutex, Notify, RwLock}, @@ -163,10 +163,7 @@ struct EventHandler { impl EventHandler { pub fn new(client: Arc>>, ready_notify: Arc) -> Self { - Self { - client, - ready_notify, - } + Self { client, ready_notify } } } diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index bafe6a7..f743fed 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -1,8 +1,12 @@ -use crate::setlock::SetLock; +use crate::{service::Watchdog, setlock::SetLock}; use log::{error, info, warn}; -use serenity::FutureExt; use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; -use tokio::{spawn, sync::RwLock, task::JoinHandle, time::timeout}; +use tokio::{ + spawn, + sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}, + task::JoinHandle, + time::timeout, +}; use super::{ service::Service, @@ -49,9 +53,9 @@ impl ServiceManagerBuilder { pub async fn build(self) -> Arc { let service_manager =
ServiceManager { - services: self.services, arc: RwLock::new(SetLock::new()), - tasks: RwLock::new(HashMap::new()), + services: self.services, + background_tasks: RwLock::new(HashMap::new()), }; let self_arc = Arc::new(service_manager); @@ -68,9 +72,9 @@ impl ServiceManagerBuilder { } pub struct ServiceManager { - services: Vec>>, arc: RwLock>>, - tasks: RwLock>>, + services: Vec>>, + background_tasks: RwLock>>, } impl ServiceManager { @@ -78,8 +82,7 @@ impl ServiceManager { ServiceManagerBuilder::new() } - pub async fn manages_service(&self, service: Arc>) -> bool { - let service_id = service.read().await.info().id.clone(); + pub async fn manages_service(&self, service_id: &str) -> bool { for registered_service in self.services.iter() { if registered_service.read().await.info().id == service_id { return true; @@ -92,62 +95,18 @@ impl ServiceManager { pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { let service_lock = service.read().await; - // Check if the service is managed by this Service Manager - if !self.manages_service(Arc::clone(&service)).await { - return Err(StartupError::ServiceNotManaged(service_lock.info().id.clone())); - } - - // Check if the service already has a background task running - if self - .tasks - .read() - .await - .contains_key(service_lock.info().id.as_str()) - { - return Err(StartupError::BackgroundTaskAlreadyRunning( - service_lock.info().id.clone(), - )); - } + self.check_is_service_managed(&service_lock).await?; + self.check_no_known_background_task(&service_lock).await?; + self.check_is_service_stopped(&service_lock).await?; - // Upgrade the read lock to a write lock drop(service_lock); let mut service_lock = service.write().await; - // Check if the service is already running and cancel the start request if it is let mut status = service_lock.info().status.write().await; - if !matches!(&*status, Status::Stopped) { - return Err(StartupError::ServiceAlreadyRunning( - service_lock.info().id.clone(), - )); 
- } - - // Set the status to Starting *status = Status::Starting; drop(status); - // Start the service - let service_manager = Arc::clone(self.arc.read().await.unwrap()); - let start = service_lock.start(service_manager); - let timeout_duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding - let timeout_result = timeout(timeout_duration, start).await; - match timeout_result { - Ok(start_result) => match start_result { - Ok(()) => { - service_lock.info().set_status(Status::Started).await; - } - Err(error) => { - service_lock.info().set_status(Status::FailedToStart(error)).await; - return Err(StartupError::FailedToStartService(service_lock.info().id.clone())); - } - }, - Err(error) => { - service_lock - .info() - .set_status(Status::FailedToStart(Box::new(error))) - .await; - return Err(StartupError::FailedToStartService(service_lock.info().id.clone())); - } - } + self.boot_service(&mut service_lock).await?; // Start the background task if one is defined let task = service_lock.task(); @@ -155,33 +114,37 @@ impl ServiceManager { if let Some(task) = task { let service_clone = Arc::clone(&service); - let task_with_watchdog = task.then(|result| async move { + let mut watchdog = Watchdog::new(task); + + watchdog.append(|result| async move { let service = service_clone; /* - We technically only need a read lock here, but we want to immediately stop other - services from accessing the service, so we acquire a write lock instead. + We technically only need a read lock here, but we want to immediately stop + otherservices from accessing the service, so we acquire a write lock instead. */ let service = service.write().await; - //TODO: Better handling of this. For example, send a message to a channel and let ServiceManager know. match result { Ok(()) => { error!( "Background task of service {} ended unexpectedly! 
Service will be marked as failed.", service.info().name ); + service .info() .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) .await; } + Err(error) => { error!( "Background task of service {} ended with error: {}! Service will be marked as failed.", service.info().name, error ); + service .info() .set_status(Status::RuntimeError( @@ -190,13 +153,16 @@ impl ServiceManager { .await; } } + Ok(()) }); - let join_handle = spawn(task_with_watchdog); - self.tasks + let join_handle = spawn(watchdog.run()); + + self.background_tasks .write() .await .insert(service.read().await.info().id.clone(), join_handle); + info!( "Started background task for service {}", service.read().await.info().name @@ -207,7 +173,7 @@ impl ServiceManager { } pub async fn stop_service(&self, service: Arc>) { - if !self.manages_service(Arc::clone(&service)).await { + if !self.manages_service(&service.read().await.info().id).await { let service = service.read().await; warn!( "Tried to stop service {} ({}), but it's not managed by this Service Manager. 
Ignoring stop request.", @@ -391,6 +357,80 @@ impl ServiceManager { text_buffer }) } + + // Helper methods for start_service + + async fn check_is_service_managed( + &self, + service: &RwLockReadGuard<'_, dyn Service>, + ) -> Result<(), StartupError> { + let service_id = service.info().id.clone(); + let manages_service = self.manages_service(&service_id).await; + + match manages_service { + true => Ok(()), + false => Err(StartupError::ServiceNotManaged(service_id)), + } + } + + async fn check_no_known_background_task( + &self, + service: &RwLockReadGuard<'_, dyn Service>, + ) -> Result<(), StartupError> { + let tasks = self.background_tasks.read().await; + let is_background_task_running = tasks.contains_key(service.info().id.as_str()); + + match is_background_task_running { + true => Err(StartupError::BackgroundTaskAlreadyRunning( + service.info().id.clone(), + )), + false => Ok(()), + } + } + + async fn check_is_service_stopped( + &self, + service: &RwLockReadGuard<'_, dyn Service>, + ) -> Result<(), StartupError> { + let status = service.info().status.read().await; + + match &*status { + Status::Stopped => Ok(()), + _ => Err(StartupError::ServiceNotStopped(service.info().id.clone())), + } + } + + async fn boot_service( + &self, + service: &mut RwLockWriteGuard<'_, dyn Service>, + ) -> Result<(), StartupError> { + let service_manager = Arc::clone(self.arc.read().await.unwrap()); + + //TODO: Add to config instead of hardcoding duration + let start = service.start(service_manager); + let timeout_result = timeout(Duration::from_secs(10), start).await; + + match timeout_result { + Ok(start_result) => match start_result { + Ok(()) => { + service.info().set_status(Status::Started).await; + } + Err(error) => { + service.info().set_status(Status::FailedToStart(error)).await; + return Err(StartupError::FailedToStartService(service.info().id.clone())); + } + }, + Err(error) => { + service + .info() + .set_status(Status::FailedToStart(Box::new(error))) + .await; + return 
Err(StartupError::FailedToStartService(service.info().id.clone())); + } + } + + Ok(()) + } } impl Display for ServiceManager { diff --git a/src/service/types.rs b/src/service/types.rs index 8226c42..71346b1 100644 --- a/src/service/types.rs +++ b/src/service/types.rs @@ -88,8 +88,18 @@ pub enum StartupError { ServiceNotManaged(String), #[error("Service {0} already has a background task running")] BackgroundTaskAlreadyRunning(String), - #[error("Service {0} is already running")] - ServiceAlreadyRunning(String), + #[error("Service {0} is not stopped")] + ServiceNotStopped(String), #[error("Failed to start service {0}")] FailedToStartService(String), } + +#[derive(Debug, Error)] +pub enum ShutdownError { + #[error("Service {0} is not managed by this Service Manager")] + ServiceNotManaged(String), + #[error("Service {0} is not started")] + ServiceNotStarted(String), + #[error("Failed to stop service {0}")] + FailedToStopService(String), +} From 7fcf40a40943cc30a4b4647573e58383c7d01aaf Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Wed, 24 Jan 2024 22:06:19 +0100 Subject: [PATCH 7/9] Edit StartupError string representations --- src/service/types.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/service/types.rs b/src/service/types.rs index 71346b1..5ae3760 100644 --- a/src/service/types.rs +++ b/src/service/types.rs @@ -90,7 +90,7 @@ pub enum StartupError { BackgroundTaskAlreadyRunning(String), #[error("Service {0} is not stopped")] ServiceNotStopped(String), - #[error("Failed to start service {0}")] + #[error("Service {0} failed to start")] FailedToStartService(String), } @@ -100,6 +100,6 @@ pub enum ShutdownError { ServiceNotManaged(String), #[error("Service {0} is not started")] ServiceNotStarted(String), - #[error("Failed to stop service {0}")] + #[error("Service {0} failed to stop")] FailedToStopService(String), } From ac58035f018562c625e167225e6c680ad64e8bd3 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 26 Jan 2024 
20:06:21 +0100 Subject: [PATCH 8/9] Finish refactor of start_service method --- src/service/service_manager.rs | 131 ++++++++++++++++----------------- 1 file changed, 65 insertions(+), 66 deletions(-) diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index f743fed..ac0b2c0 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -7,7 +7,6 @@ use tokio::{ task::JoinHandle, time::timeout, }; - use super::{ service::Service, types::{OverallStatus, PinnedBoxedFuture, Priority, StartupError, Status}, @@ -98,7 +97,7 @@ impl ServiceManager { self.check_is_service_managed(&service_lock).await?; self.check_no_known_background_task(&service_lock).await?; self.check_is_service_stopped(&service_lock).await?; - + drop(service_lock); let mut service_lock = service.write().await; @@ -106,69 +105,9 @@ impl ServiceManager { *status = Status::Starting; drop(status); - self.boot_service(&mut service_lock).await?; - - // Start the background task if one is defined - let task = service_lock.task(); - drop(service_lock); - - if let Some(task) = task { - let service_clone = Arc::clone(&service); - let mut watchdog = Watchdog::new(task); - - watchdog.append(|result| async move { - let service = service_clone; - - /* - We technically only need a read lock here, but we want to immediately stop - otherservices from accessing the service, so we acquire a write lock instead. - */ - let service = service.write().await; - - match result { - Ok(()) => { - error!( - "Background task of service {} ended unexpectedly! Service will be marked as failed.", - service.info().name - ); - - service - .info() - .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) - .await; - } - - Err(error) => { - error!( - "Background task of service {} ended with error: {}! 
Service will be marked as failed.", - service.info().name, - error - ); - - service - .info() - .set_status(Status::RuntimeError( - format!("Background task ended with error: {}", error).into(), - )) - .await; - } - } - Ok(()) - }); - - let join_handle = spawn(watchdog.run()); - - self.background_tasks - .write() - .await - .insert(service.read().await.info().id.clone(), join_handle); - - info!( - "Started background task for service {}", - service.read().await.info().name - ); - } - + self.init_service(&mut service_lock).await?; + self.start_background_task(&service_lock, Arc::clone(&service)).await; + Ok(()) } @@ -400,7 +339,7 @@ impl ServiceManager { } } - async fn boot_service( + async fn init_service( &self, service: &mut RwLockWriteGuard<'_, dyn Service>, ) -> Result<(), StartupError> { @@ -431,6 +370,66 @@ impl ServiceManager { Ok(()) } + + async fn start_background_task(&self, service_lock: &RwLockWriteGuard<'_, dyn Service>, service: Arc>) { + let task = service_lock.task(); + if let Some(task) = task { + let service_clone = Arc::clone(&service); + let mut watchdog = Watchdog::new(task); + + watchdog.append(|result| async move { + let service = service_clone; + + /* + We technically only need a read lock here, but we want to immediately stop + otherservices from accessing the service, so we acquire a write lock instead. + */ + let service = service.write().await; + + match result { + Ok(()) => { + error!( + "Background task of service {} ended unexpectedly! Service will be marked as failed.", + service.info().name + ); + + service + .info() + .set_status(Status::RuntimeError("Background task ended unexpectedly!".into())) + .await; + } + + Err(error) => { + error!( + "Background task of service {} ended with error: {}. 
Service will be marked as failed.", + service.info().name, + error + ); + + service + .info() + .set_status(Status::RuntimeError( + format!("Background task ended with error: {}", error).into(), + )) + .await; + } + } + Ok(()) + }); + + let join_handle = spawn(watchdog.run()); + + self.background_tasks + .write() + .await + .insert(service.read().await.info().id.clone(), join_handle); + + info!( + "Started background task for service {}", + service.read().await.info().name + ); + } + } } impl Display for ServiceManager { From b789ca4433c15b44da765be56051d3267beba9e4 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Fri, 26 Jan 2024 23:34:20 +0100 Subject: [PATCH 9/9] Refactor stop_service - Apply same refactoring to stop_service - Increase name placeholder space to 30 in logger --- src/log.rs | 2 +- src/service/discord.rs | 5 +- src/service/service.rs | 3 +- src/service/service_manager.rs | 185 ++++++++++++++++----------------- 4 files changed, 97 insertions(+), 98 deletions(-) diff --git a/src/log.rs b/src/log.rs index 43aa4e5..55c46ba 100644 --- a/src/log.rs +++ b/src/log.rs @@ -25,7 +25,7 @@ pub fn setup() -> Result<(), SetLoggerError> { fern::Dispatch::new() .format(move |out, message, record| { out.finish(format_args!( - "[{} {: <25} {: <5}] {}", + "[{} {: <30} {: <5}] {}", humantime::format_rfc3339_seconds(SystemTime::now()), record.target(), colors.color(record.level()), diff --git a/src/service/discord.rs b/src/service/discord.rs index cfb9cce..d6f31ff 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -57,11 +57,11 @@ impl Service for DiscordService { fn start(&mut self, _service_manager: Arc) -> PinnedBoxedFutureResult<'_, ()> { Box::pin(async move { + let client_ready_notify = Arc::new(Notify::new()); + let framework = StandardFramework::new(); framework.configure(Configuration::new().prefix("!")); - let client_ready_notify = Arc::new(Notify::new()); - let mut client = Client::builder(self.discord_token.as_str(), 
GatewayIntents::all()) .framework(framework) .event_handler(EventHandler::new( @@ -124,7 +124,6 @@ impl Service for DiscordService { result?; } - info!("Discord client stopped"); Ok(()) }) } diff --git a/src/service/service.rs b/src/service/service.rs index be332ad..185f06a 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -32,7 +32,8 @@ impl ServiceInfo { } pub async fn set_status(&self, status: Status) { - *(self.status.write().await) = status + let mut lock = self.status.write().await; + *(lock) = status } } diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index ac0b2c0..91fb842 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -9,7 +9,7 @@ use tokio::{ }; use super::{ service::Service, - types::{OverallStatus, PinnedBoxedFuture, Priority, StartupError, Status}, + types::{OverallStatus, PinnedBoxedFuture, Priority, ShutdownError, StartupError, Status}, }; #[derive(Default)] @@ -94,76 +94,54 @@ impl ServiceManager { pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { let service_lock = service.read().await; - self.check_is_service_managed(&service_lock).await?; - self.check_no_known_background_task(&service_lock).await?; - self.check_is_service_stopped(&service_lock).await?; + let service_id = service_lock.info().id.clone(); + + if !self.manages_service(&service_id).await { + return Err(StartupError::ServiceNotManaged(service_id)); + } + + if !self.is_service_stopped(&service_lock).await { + return Err(StartupError::ServiceNotStopped(service_id)); + } + + if self.has_background_task_running(&service_lock).await { + return Err(StartupError::BackgroundTaskAlreadyRunning(service_id)); + } drop(service_lock); let mut service_lock = service.write().await; - let mut status = service_lock.info().status.write().await; - *status = Status::Starting; - drop(status); - + service_lock.info().set_status(Status::Starting).await; self.init_service(&mut service_lock).await?; 
self.start_background_task(&service_lock, Arc::clone(&service)).await; + + info!("Started service {}", service_lock.info().name); Ok(()) } - pub async fn stop_service(&self, service: Arc>) { - if !self.manages_service(&service.read().await.info().id).await { - let service = service.read().await; - warn!( - "Tried to stop service {} ({}), but it's not managed by this Service Manager. Ignoring stop request.", - service.info().name, - service.info().id - ); - return; - } + pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { + let service_lock = service.read().await; - let mut service = service.write().await; + if !(self.manages_service(service_lock.info().id.as_str()).await) { + return Err(ShutdownError::ServiceNotManaged(service_lock.info().id.clone())); + } - let mut status = service.info().status.write().await; - if !matches!(&*status, Status::Started) { - warn!( - "Tried to stop service {} while it was in state {}. Ignoring stop request.", - service.info().name, - status - ); - return; + if !self.is_service_started(&service_lock).await { + return Err(ShutdownError::ServiceNotStarted(service_lock.info().id.clone())); } - *status = Status::Stopping; - drop(status); - let stop = service.stop(); + self.stop_background_task(&service_lock).await; - let duration = Duration::from_secs(10); //TODO: Add to config instead of hardcoding - let timeout_result = timeout(duration, stop); + drop(service_lock); + let mut service_lock = service.write().await; - match timeout_result.await { - Ok(stop_result) => match stop_result { - Ok(()) => { - info!("Stopped service: {}", service.info().name); - service.info().set_status(Status::Stopped).await; - } - Err(error) => { - error!("Failed to stop service {}: {}", service.info().name, error); - service.info().set_status(Status::FailedToStop(error)).await; - } - }, - Err(error) => { - error!( - "Failed to stop service {}: Timeout of {} seconds reached.", - service.info().name, - duration.as_secs() - ); - service 
- .info() - .set_status(Status::FailedToStop(Box::new(error))) - .await; - } - } + service_lock.info().set_status(Status::Stopping).await; + self.shutdown_service(&mut service_lock).await?; + + info!("Stopped service {}", service_lock.info().name); + + Ok(()) } pub async fn start_services(&self) -> Vec> { @@ -176,10 +154,14 @@ impl ServiceManager { results } - pub async fn stop_services(&self) { + pub async fn stop_services(&self) -> Vec> { + let mut results = Vec::new(); + for service in &self.services { - self.stop_service(Arc::clone(service)).await; + results.push(self.stop_service(Arc::clone(service)).await); } + + results } pub async fn get_service(&self) -> Option>> @@ -297,46 +279,30 @@ impl ServiceManager { }) } - // Helper methods for start_service + // Helper methods for start_service and stop_service - async fn check_is_service_managed( + async fn has_background_task_running( &self, service: &RwLockReadGuard<'_, dyn Service>, - ) -> Result<(), StartupError> { - let service_id = service.info().id.clone(); - let manages_service = self.manages_service(&service_id).await; - - match manages_service { - true => Ok(()), - false => Err(StartupError::ServiceNotManaged(service_id)), - } + ) -> bool { + let tasks = self.background_tasks.read().await; + tasks.contains_key(service.info().id.as_str()) } - async fn check_no_known_background_task( + async fn is_service_started( &self, service: &RwLockReadGuard<'_, dyn Service>, - ) -> Result<(), StartupError> { - let tasks = self.background_tasks.read().await; - let is_background_task_running = tasks.contains_key(service.info().id.as_str()); - - match is_background_task_running { - true => Err(StartupError::BackgroundTaskAlreadyRunning( - service.info().id.clone(), - )), - false => Ok(()), - } + ) -> bool { + let status = service.info().status.read().await; + matches!(&*status, Status::Started) } - async fn check_is_service_stopped( + async fn is_service_stopped( &self, service: &RwLockReadGuard<'_, dyn Service>, - ) -> 
Result<(), StartupError> { + ) -> bool { let status = service.info().status.read().await; - - match &*status { - Status::Stopped => Ok(()), - _ => Err(StartupError::ServiceNotStopped(service.info().id.clone())), - } + matches!(&*status, Status::Stopped) } async fn init_service( @@ -371,18 +337,47 @@ impl ServiceManager { Ok(()) } + async fn shutdown_service( + &self, + service: &mut RwLockWriteGuard<'_, dyn Service>, + ) -> Result<(), ShutdownError> { + //TODO: Add to config instead of hardcoding duration + let stop = service.stop(); + let timeout_result = timeout(Duration::from_secs(10), stop).await; + + match timeout_result { + Ok(stop_result) => match stop_result { + Ok(()) => { + service.info().set_status(Status::Stopped).await; + } + Err(error) => { + service.info().set_status(Status::FailedToStop(error)).await; + return Err(ShutdownError::FailedToStopService(service.info().id.clone())); + } + }, + Err(error) => { + service + .info() + .set_status(Status::FailedToStop(Box::new(error))) + .await; + return Err(ShutdownError::FailedToStopService(service.info().id.clone())); + } + } + + Ok(()) + } + async fn start_background_task(&self, service_lock: &RwLockWriteGuard<'_, dyn Service>, service: Arc>) { let task = service_lock.task(); if let Some(task) = task { - let service_clone = Arc::clone(&service); let mut watchdog = Watchdog::new(task); watchdog.append(|result| async move { - let service = service_clone; + let service = service; /* We technically only need a read lock here, but we want to immediately stop - otherservices from accessing the service, so we acquire a write lock instead. + other services from accessing the service, so we acquire a write lock instead. 
*/ let service = service.write().await; @@ -422,12 +417,16 @@ impl ServiceManager { self.background_tasks .write() .await - .insert(service.read().await.info().id.clone(), join_handle); + .insert(service_lock.info().id.clone(), join_handle); + } + } - info!( - "Started background task for service {}", - service.read().await.info().name - ); + async fn stop_background_task(&self, service_lock: &RwLockReadGuard<'_, dyn Service>) { + if self.has_background_task_running(service_lock).await { + let mut tasks_lock = self.background_tasks.write().await; + let task = tasks_lock.get(service_lock.info().id.as_str()).unwrap(); + task.abort(); + tasks_lock.remove(service_lock.info().id.as_str()); } } }