diff --git a/.devcontainer/.env b/.devcontainer/.env
new file mode 100644
index 0000000..e4f79bd
--- /dev/null
+++ b/.devcontainer/.env
@@ -0,0 +1,5 @@
+POSTGRES_USER=postgres
+POSTGRES_PASSWORD=postgres
+POSTGRES_DB=postgres
+POSTGRES_HOSTNAME=localhost
+POSTGRES_PORT=5432
\ No newline at end of file
diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile
new file mode 100644
index 0000000..90b4ce9
--- /dev/null
+++ b/.devcontainer/Dockerfile
@@ -0,0 +1,6 @@
+FROM mcr.microsoft.com/devcontainers/rust:1-1-bullseye
+
+# Include the lld linker to improve build times, either via the environment variable
+# RUSTFLAGS="-C link-arg=-fuse-ld=lld" or via Cargo's configuration file (i.e., see .cargo/config.toml).
+RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
+    && apt-get autoremove -y && apt-get clean -y
diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
new file mode 100644
index 0000000..4266384
--- /dev/null
+++ b/.devcontainer/devcontainer.json
@@ -0,0 +1,22 @@
+{
+    "name": "lum",
+    "dockerComposeFile": "docker-compose.yml",
+    "service": "app",
+    "workspaceFolder": "/workspaces/${localWorkspaceFolderBasename}",
+
+    // Comment out the extensions you do not want to install
+    "customizations": {
+        "vscode": {
+            "extensions": [
+                "github.copilot-chat",
+                "github.copilot",
+                "JScearcy.rust-doc-viewer",
+                "swellaby.vscode-rust-test-adapter",
+                "Gruntfuggly.todo-tree",
+                "usernamehw.errorlens"
+            ]
+        }
+    },
+
+    "remoteUser": "vscode"
+}
diff --git a/.devcontainer/docker-compose.yml b/.devcontainer/docker-compose.yml
new file mode 100644
index 0000000..2605b33
--- /dev/null
+++ b/.devcontainer/docker-compose.yml
@@ -0,0 +1,37 @@
+version: '3.8'
+
+volumes:
+  postgres-data:
+
+services:
+  app:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    env_file:
+      # Ensure that the variables in .env match the same variables in devcontainer.json
+      - .env
+
+    volumes:
+      - ../..:/workspaces:cached
+
+    # Overrides the default command so things don't shut down after the process ends.
+    command: sleep infinity
+
+    # Runs the app on the same network as the database container, which allows "forwardPorts" in devcontainer.json to function.
+    network_mode: service:db
+
+    # Use "forwardPorts" in **devcontainer.json** to forward an app port locally.
+    # (Adding the "ports" property to this file will not forward from a Codespace.)
+
+  db:
+    image: postgres
+    restart: unless-stopped
+    volumes:
+      - postgres-data:/var/lib/postgresql/data
+    env_file:
+      # Ensure that the variables in .env match the same variables in devcontainer.json
+      - .env
+
+    # Add "forwardPorts": ["5432"] to **devcontainer.json** to forward PostgreSQL locally.
+    # (Adding the "ports" property to this file will not forward from a Codespace.)
\ No newline at end of file
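The Dockerfile comment above points at Cargo's configuration file as the second way to enable lld. A minimal sketch of what that `.cargo/config.toml` could look like (the file itself is not part of this diff; the flags simply mirror the `RUSTFLAGS` value from the comment):

```toml
# .cargo/config.toml — hypothetical companion to the Dockerfile comment above.
[build]
# Link with lld instead of the default linker to speed up (re)builds.
rustflags = ["-C", "link-arg=-fuse-ld=lld"]
```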
diff --git a/.github/assets/portrait.png b/.github/assets/portrait.png
new file mode 100644
index 0000000..02d2ea8
Binary files /dev/null and b/.github/assets/portrait.png differ
diff --git a/Cargo.toml b/Cargo.toml
index d39ef20..6f9421f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "lum"
-version = "0.1.0"
+version = "0.2.1"
 edition = "2021"
 description = "Lum Discord Bot"
 license= "MIT"
@@ -12,7 +12,15 @@ repository = "https://github.com/Kitt3120/lum"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
+dirs = "5.0.1"
+downcast-rs = "1.2.0"
+fern = { version = "0.6.2", features = ["chrono", "colored", "date-based"] }
+humantime = "2.1.0"
+log = { version = "0.4.20", features = ["serde"] }
 serde = { version = "1.0.193", features = ["derive"] }
 serde_json = "1.0.108"
-sqlx = { version = "0.7.3", features = ["runtime-tokio", "any", "postgres", "mysql", "sqlite", "tls-native-tls", "migrate", "macros", "uuid", "chrono", "json"] }
+serenity = { version = "0.12.0", default-features = false, features = ["builder", "cache", "collector", "client", "framework", "gateway", "http", "model", "standard_framework", "utils", "voice", "default_native_tls", "tokio_task_builder", "unstable_discord_api", "simd_json", "temp_cache", "chrono", "interactions_endpoint"] }
+sqlx = { version = "0.8.0", features = ["runtime-tokio", "any", "postgres", "mysql", "sqlite", "tls-native-tls", "migrate", "macros", "uuid", "chrono", "json"] }
+thiserror = "1.0.52"
 tokio = { version = "1.35.1", features = ["full"] }
+uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] }
\ No newline at end of file
diff --git a/README.md b/README.md
index 3756181..2e34a93 100644
--- a/README.md
+++ b/README.md
@@ -1,15 +1,17 @@
+<div align="center">
+  <img src=".github/assets/portrait.png" alt="Lum">
+</div>
+ # lum Lum Discord Bot -# Deployment +## Deployment Stable: [![Deploy](https://github.com/Kitt3120/lum/actions/workflows/deploy_release.yml/badge.svg)](https://github.com/Kitt3120/lum/actions/workflows/deploy_release.yml) Beta: [![Deploy](https://github.com/Kitt3120/lum/actions/workflows/deploy_prerelease.yml/badge.svg)](https://github.com/Kitt3120/lum/actions/workflows/deploy_prerelease.yml) -# Collaborating - -A board can be viewed [here](https://github.com/users/Kitt3120/projects/3) +## Collaborating -Issues can be viewed [here](https://github.com/Kitt3120/lum/issues) +Check out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues) diff --git a/build.rs b/build.rs index 7609593..d506869 100644 --- a/build.rs +++ b/build.rs @@ -2,4 +2,4 @@ fn main() { // trigger recompilation when a new migration is added println!("cargo:rerun-if-changed=migrations"); -} \ No newline at end of file +} diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..521c9cf --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +max_width = 110 \ No newline at end of file diff --git a/src/bot.rs b/src/bot.rs new file mode 100644 index 0000000..37dce6e --- /dev/null +++ b/src/bot.rs @@ -0,0 +1,124 @@ +use core::fmt; +use std::{fmt::Display, sync::Arc}; + +use log::error; +use tokio::{signal, sync::Mutex}; + +use crate::service::{ + types::LifetimedPinnedBoxedFuture, OverallStatus, Service, ServiceManager, ServiceManagerBuilder, +}; + +#[derive(Debug, Clone, Copy)] +pub enum ExitReason { + SIGINT, + EssentialServiceFailed, +} + +impl Display for ExitReason { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Self::SIGINT => write!(f, "SIGINT"), + Self::EssentialServiceFailed => write!(f, "Essential Service Failed"), + } + } +} + +pub struct BotBuilder { + name: String, + service_manager: ServiceManagerBuilder, +} + +impl BotBuilder { + pub fn new(name: &str) -> Self { + Self { + name: name.to_string(), + service_manager: ServiceManager::builder(), + } + } + + pub async fn with_service(mut self, service: Arc>) -> Self { + self.service_manager = self.service_manager.with_service(service).await; // The ServiceManagerBuilder itself will warn when adding a service multiple times + + self + } + + pub async fn with_services(mut self, services: Vec>>) -> Self { + for service in services { + self.service_manager = self.service_manager.with_service(service).await; + } + + self + } + + pub async fn build(self) -> Bot { + Bot { + name: self.name, + service_manager: self.service_manager.build().await, + } + } +} + +pub struct Bot { + pub name: String, + pub service_manager: Arc, +} + +impl Bot { + pub fn builder(name: &str) -> BotBuilder { + BotBuilder::new(name) + } + + //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future + pub fn start(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> { + Box::pin(async move { + self.service_manager.start_services().await; + //TODO: Potential for further initialization here, like modules + }) + } + + //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future + pub fn stop(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> { + Box::pin(async move { + self.service_manager.stop_services().await; + //TODO: Potential for further deinitialization here, like modules + }) + } + + pub async fn join(&self) -> ExitReason { + let 
name_clone = self.name.clone(); + let signal_task = tokio::spawn(async move { + let name = name_clone; + + let result = signal::ctrl_c().await; + if let Err(error) = result { + error!( + "Error receiving SIGINT: {}. {} will exit ungracefully immediately to prevent undefined behavior.", + error, name + ); + panic!("Error receiving SIGINT: {}", error); + } + }); + + let service_manager_clone = self.service_manager.clone(); + let mut receiver = self + .service_manager + .on_status_change + .event + .subscribe_channel("t", 2, true, true) + .await; + let status_task = tokio::spawn(async move { + let service_manager = service_manager_clone; + while (receiver.receiver.recv().await).is_some() { + let overall_status = service_manager.overall_status().await; + if overall_status == OverallStatus::Unhealthy { + return; + } + } + }); + + tokio::select! { + _ = signal_task => ExitReason::SIGINT, + _ = status_task => ExitReason::EssentialServiceFailed, + } + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..41e4ed1 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,129 @@ +use core::fmt; +use serde::{Deserialize, Serialize}; +use std::{ + fmt::{Display, Formatter}, + fs, io, + path::PathBuf, +}; +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum ConfigPathError { + #[error("Unable to get OS config directory")] + UnknownBasePath, +} + +#[derive(Debug, Error)] +pub enum ConfigInitError { + #[error("Unable to get config path: {0}")] + Path(#[from] ConfigPathError), + #[error("I/O error: {0}")] + IO(#[from] io::Error), +} + +#[derive(Debug, Error)] +pub enum ConfigParseError { + #[error("Unable to get config path: {0}")] + Path(#[from] ConfigPathError), + #[error("Unable to initialize config: {0}")] + Init(#[from] ConfigInitError), + #[error("Unable to serialize or deserialize config: {0}")] + Serde(#[from] serde_json::Error), + #[error("I/O error: {0}")] + IO(#[from] io::Error), +} + +fn discord_token_default() -> String { + String::from("Please provide a token") +} + +#[derive(Debug, PartialEq, PartialOrd, Serialize, Deserialize, Clone)] +pub struct Config { + #[serde(rename = "discordToken", default = "discord_token_default")] + pub discord_token: String, +} + +impl Default for Config { + fn default() -> Self { + Config { + discord_token: discord_token_default(), + } + } +} + +impl Display for Config { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + let content = match serde_json::to_string(self) { + Ok(content) => content, + Err(error) => { + return write!(f, "Unable to serialize config: {}", error); + } + }; + + write!(f, "{}", content) + } +} + +#[derive(Debug)] +pub struct ConfigHandler { + pub app_name: String, +} + +impl ConfigHandler { + pub fn new(app_name: &str) -> Self { + ConfigHandler { + app_name: app_name.to_string(), + } + } + + pub fn get_config_dir_path(&self) -> Result { + let mut path = match dirs::config_dir() { + Some(path) => path, + None => return Err(ConfigPathError::UnknownBasePath), + }; + + path.push(&self.app_name); + Ok(path) + } + + pub fn create_config_dir_path(&self) -> Result<(), ConfigInitError> { + let path = self.get_config_dir_path()?; + fs::create_dir_all(path)?; + Ok(()) + } + + pub fn get_config_file_path(&self) -> Result { + let mut path = self.get_config_dir_path()?; + path.push("config.json"); + Ok(path) + } + + pub fn save_config(&self, config: &Config) -> Result<(), ConfigParseError> { + let path = self.get_config_file_path()?; + + if !path.exists() { + self.create_config_dir_path()?; + } + + let config_json = 
serde_json::to_string_pretty(config)?; + + fs::write(path, config_json)?; + + Ok(()) + } + + pub fn load_config(&self) -> Result { + let path = self.get_config_file_path()?; + if !path.exists() { + self.create_config_dir_path()?; + fs::write(&path, "{}")?; + } + + let config_json = fs::read_to_string(path)?; + let config: Config = serde_json::from_str(&config_json)?; + + self.save_config(&config)?; // In case the config file was missing some fields which serde used the defaults for + + Ok(config) + } +} diff --git a/src/event.rs b/src/event.rs new file mode 100644 index 0000000..0eab11f --- /dev/null +++ b/src/event.rs @@ -0,0 +1,13 @@ +pub mod arc_observable; +pub mod event; +pub mod event_repeater; +pub mod observable; +pub mod subscriber; +pub mod subscription; + +pub use arc_observable::ArcObservable; +pub use event::Event; +pub use event_repeater::EventRepeater; +pub use observable::{Observable, ObservableResult}; +pub use subscriber::{Callback, DispatchError, Subscriber}; +pub use subscription::{ReceiverSubscription, Subscription}; diff --git a/src/event/arc_observable.rs b/src/event/arc_observable.rs new file mode 100644 index 0000000..b399b67 --- /dev/null +++ b/src/event/arc_observable.rs @@ -0,0 +1,60 @@ +use std::{ + hash::{DefaultHasher, Hash, Hasher}, + sync::Arc, +}; + +use tokio::sync::Mutex; + +use super::{Event, ObservableResult}; + +#[derive(Debug)] +pub struct ArcObservable +where + T: Send + 'static + Hash, +{ + value: Arc>, + on_change: Event>, +} + +impl ArcObservable +where + T: Send + 'static + Hash, +{ + pub fn new(value: T, event_name: impl Into) -> Self { + Self { + value: Arc::new(Mutex::new(value)), + on_change: Event::new(event_name), + } + } + + pub async fn get(&self) -> Arc> { + Arc::clone(&self.value) + } + + pub async fn set(&self, value: T) -> ObservableResult> { + let mut lock = self.value.lock().await; + + let mut hasher = DefaultHasher::new(); + (*lock).hash(&mut hasher); + let current_value = hasher.finish(); + + let mut hasher = DefaultHasher::new(); + value.hash(&mut hasher); + let new_value = hasher.finish(); + + if current_value == new_value { + return ObservableResult::Unchanged; + } + + *lock = value; + drop(lock); + + let value = Arc::clone(&self.value); + let dispatch_result = self.on_change.dispatch(value).await; + + match dispatch_result { + Ok(_) => ObservableResult::Changed(Ok(())), + Err(errors) => ObservableResult::Changed(Err(errors)), + } + } +} diff --git a/src/event/event.rs b/src/event/event.rs new file mode 100644 index 0000000..d714381 --- /dev/null +++ b/src/event/event.rs @@ -0,0 +1,196 @@ +use crate::service::{BoxedError, PinnedBoxedFutureResult}; +use std::{ + any::type_name, + fmt::{self, Debug, Formatter}, + sync::Arc, +}; +use tokio::sync::{mpsc::channel, Mutex}; +use uuid::Uuid; + +use super::{Callback, DispatchError, ReceiverSubscription, Subscriber, Subscription}; + +pub struct Event +where + T: Send + Sync + 'static, +{ + pub name: String, + + pub uuid: Uuid, + subscribers: Mutex>>, +} + +impl Event +where + T: Send + Sync + 'static, +{ + pub fn new(name: S) -> Self + where + S: Into, + { + Self { + name: name.into(), + uuid: Uuid::new_v4(), + subscribers: Mutex::new(Vec::new()), + } + } + + pub async fn subscriber_count(&self) -> usize { + let subscribers = self.subscribers.lock().await; + subscribers.len() + } + + pub async fn subscribe_channel( + &self, + name: S, + buffer: usize, + log_on_error: bool, + remove_on_error: bool, + ) -> ReceiverSubscription> + where + S: Into, + { + let (sender, receiver) = 
channel(buffer); + let subscriber = Subscriber::new(name, log_on_error, remove_on_error, Callback::Channel(sender)); + + let subscription = Subscription::from(&subscriber); + let receiver_subscription = ReceiverSubscription::new(subscription, receiver); + + let mut subscribers = self.subscribers.lock().await; + subscribers.push(subscriber); + + receiver_subscription + } + + pub async fn subscribe_async_closure( + &self, + name: S, + closure: impl Fn(Arc) -> PinnedBoxedFutureResult<()> + Send + Sync + 'static, + log_on_error: bool, + remove_on_error: bool, + ) -> Subscription + where + S: Into, + { + let subscriber = Subscriber::new( + name, + log_on_error, + remove_on_error, + Callback::AsyncClosure(Box::new(closure)), + ); + let subscription = Subscription::from(&subscriber); + + let mut subscribers = self.subscribers.lock().await; + subscribers.push(subscriber); + + subscription + } + + pub async fn subscribe_closure( + &self, + name: S, + closure: impl Fn(Arc) -> Result<(), BoxedError> + Send + Sync + 'static, + log_on_error: bool, + remove_on_error: bool, + ) -> Subscription + where + S: Into, + { + let subscriber = Subscriber::new( + name, + log_on_error, + remove_on_error, + Callback::Closure(Box::new(closure)), + ); + let subscription = Subscription::from(&subscriber); + + let mut subscribers = self.subscribers.lock().await; + subscribers.push(subscriber); + + subscription + } + + pub async fn unsubscribe(&self, subscription: S) -> Option + where + S: Into, + { + let subscription_to_remove = subscription.into(); + + let mut subscribers = self.subscribers.lock().await; + let index = subscribers + .iter() + .position(|subscription_of_event| subscription_of_event.uuid == subscription_to_remove.uuid); + + if let Some(index) = index { + subscribers.remove(index); + None + } else { + Some(subscription_to_remove) + } + } + + pub async fn dispatch(&self, data: Arc) -> Result<(), Vec>> { + let mut errors = Vec::new(); + let mut subscribers_to_remove = Vec::new(); + + let mut subscribers = self.subscribers.lock().await; + for (index, subscriber) in subscribers.iter().enumerate() { + let data = Arc::clone(&data); + + let result = subscriber.dispatch(data).await; + if let Err(err) = result { + if subscriber.log_on_error { + log::error!( + "Event \"{}\" failed to dispatch data to subscriber {}: {}.", + self.name, + subscriber.name, + err + ); + } + + if subscriber.remove_on_error { + if subscriber.log_on_error { + log::error!("Subscriber will be unregistered from event."); + } + + subscribers_to_remove.push(index); + } + + errors.push(err); + } + } + + for index in subscribers_to_remove.into_iter().rev() { + subscribers.remove(index); + } + + if errors.is_empty() { + Ok(()) + } else { + Err(errors) + } + } +} + +impl PartialEq for Event +where + T: Send + Sync + 'static, +{ + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for Event where T: Send + Sync {} + +impl Debug for Event +where + T: Send + Sync + 'static, +{ + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct(type_name::()) + .field("uuid", &self.uuid) + .field("name", &self.name) + .field("subscribers", &self.subscribers.blocking_lock().len()) + .finish() + } +} diff --git a/src/event/event_repeater.rs b/src/event/event_repeater.rs new file mode 100644 index 0000000..d585df7 --- /dev/null +++ b/src/event/event_repeater.rs @@ -0,0 +1,154 @@ +use std::{collections::HashMap, sync::Arc}; +use thiserror::Error; +use tokio::{sync::Mutex, task::JoinHandle}; +use uuid::Uuid; + +use 
super::{Event, Subscription}; + +#[derive(Debug, Error)] +pub enum AttachError { + #[error("Tried to attach event {event_name} to EventRepeater {repeater_name} before it was initialized. Did you not use EventRepeater::new()?")] + NotInitialized { + event_name: String, + repeater_name: String, + }, + + #[error( + "Tried to attach event {event_name} to EventRepeater {repeater_name}, which was already attached." + )] + AlreadyAttached { + event_name: String, + repeater_name: String, + }, +} + +#[derive(Debug, Error)] +pub enum DetachError { + #[error( + "Tried to detach event {event_name} from EventRepeater {repeater_name}, which was not attached." + )] + NotAttached { + event_name: String, + repeater_name: String, + }, +} + +#[derive(Error)] +pub enum CloseError +where + T: Send + Sync + 'static, +{ + #[error("EventRepeater still has attached events. Detach all events before closing.")] + AttachedEvents(EventRepeater), +} + +pub struct EventRepeater +where + T: Send + Sync + 'static, +{ + pub event: Event, + self_arc: Mutex>>, + subscriptions: Mutex)>>, +} + +impl EventRepeater +where + T: Send + Sync + 'static, +{ + pub async fn new(name: S) -> Arc + where + T: 'static, + S: Into, + { + let event = Event::new(name); + let event_repeater = Self { + self_arc: Mutex::new(None), + event, + subscriptions: Mutex::new(HashMap::new()), + }; + + let self_arc = Arc::new(event_repeater); + let mut lock = self_arc.self_arc.lock().await; + let self_arc_clone = Arc::clone(&self_arc); + *lock = Some(self_arc_clone); + drop(lock); + + self_arc + } + + pub async fn subscription_count(&self) -> usize { + self.subscriptions.lock().await.len() + } + + pub async fn attach(&self, event: &Event, buffer: usize) -> Result<(), AttachError> { + let self_arc = match self.self_arc.lock().await.as_ref() { + Some(arc) => Arc::clone(arc), + None => { + return Err(AttachError::NotInitialized { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }) + } + }; + + let mut subscriptions = self.subscriptions.lock().await; + if subscriptions.contains_key(&event.uuid) { + return Err(AttachError::AlreadyAttached { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }); + } + + let receiver_subscription = event + .subscribe_channel(&self.event.name, buffer, true, true) + .await; + + let subscription = receiver_subscription.subscription; + let mut receiver = receiver_subscription.receiver; + + let join_handle = tokio::spawn(async move { + while let Some(value) = receiver.recv().await { + let _ = self_arc.event.dispatch(value).await; + } + }); + subscriptions.insert(event.uuid, (subscription, join_handle)); + + Ok(()) + } + + pub async fn detach(&self, event: &Event) -> Result<(), DetachError> { + let mut subscriptions = self.subscriptions.lock().await; + + let subscription = match subscriptions.remove(&event.uuid) { + Some(subscription) => subscription, + None => { + return Err(DetachError::NotAttached { + event_name: event.name.clone(), + repeater_name: self.event.name.clone(), + }) + } + }; + subscription.1.abort(); + + Ok(()) + } + + pub async fn close(self) -> Result<(), CloseError> { + let subscription_count = self.subscription_count().await; + + if subscription_count > 0 { + return Err(CloseError::AttachedEvents(self)); + } + + Ok(()) + } +} + +impl AsRef> for EventRepeater +where + T: Send + Sync + 'static, +{ + fn as_ref(&self) -> &Event { + &self.event + } +} diff --git a/src/event/observable.rs b/src/event/observable.rs new file mode 100644 index 0000000..e3fd726 --- 
/dev/null +++ b/src/event/observable.rs @@ -0,0 +1,71 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; + +use super::{DispatchError, Event}; + +#[derive(Debug)] +pub enum ObservableResult +where + T: Send + Sync + 'static, +{ + Unchanged, + Changed(Result<(), Vec>>), +} + +#[derive(Debug)] +pub struct Observable +where + T: Send + Sync + 'static + Clone + PartialEq, //TODO: Try out if we can remove Sync here +{ + value: Mutex, + on_change: Event, +} + +impl Observable +where + T: Send + Sync + 'static + Clone + PartialEq, +{ + pub fn new(value: T, event_name: I) -> Self + where + I: Into, + { + Self { + value: Mutex::new(value), + on_change: Event::new(event_name), + } + } + + pub async fn get(&self) -> T { + let lock = self.value.lock().await; + lock.clone() + } + + pub async fn set(&self, value: T) -> ObservableResult { + let mut lock = self.value.lock().await; + let current_value = lock.clone(); + + if current_value == value { + return ObservableResult::Unchanged; + } + + *lock = value.clone(); + + let value = Arc::new(value); + let dispatch_result = self.on_change.dispatch(value).await; + + match dispatch_result { + Ok(_) => ObservableResult::Changed(Ok(())), + Err(errors) => ObservableResult::Changed(Err(errors)), + } + } +} + +impl AsRef> for Observable +where + T: Send + Sync + 'static + Clone + PartialEq, +{ + fn as_ref(&self) -> &Event { + &self.on_change + } +} diff --git a/src/event/subscriber.rs b/src/event/subscriber.rs new file mode 100644 index 0000000..8fd8e51 --- /dev/null +++ b/src/event/subscriber.rs @@ -0,0 +1,80 @@ +use std::sync::Arc; + +use thiserror::Error; +use tokio::sync::mpsc::{error::SendError, Sender}; +use uuid::Uuid; + +use crate::service::{BoxedError, PinnedBoxedFutureResult}; + +pub enum Callback +where + T: Send + Sync + 'static, +{ + Channel(Sender>), + Closure(Box) -> Result<(), BoxedError> + Send + Sync>), + AsyncClosure(Box) -> PinnedBoxedFutureResult<()> + Send + Sync>), +} + +#[derive(Debug, Error)] +pub enum DispatchError +where + T: Send + Sync + 'static, +{ + #[error("Failed to send data to channel: {0}")] + ChannelSend(#[from] SendError>), + + #[error("Failed to dispatch data to closure: {0}")] + Closure(BoxedError), + + #[error("Failed to dispatch data to async closure: {0}")] + AsyncClosure(BoxedError), +} + +pub struct Subscriber +where + T: Send + Sync + 'static, +{ + pub name: String, + pub log_on_error: bool, + pub remove_on_error: bool, + pub callback: Callback, + + pub uuid: Uuid, +} + +impl Subscriber +where + T: Send + Sync + 'static, +{ + pub fn new(name: S, log_on_error: bool, remove_on_error: bool, callback: Callback) -> Self + where + S: Into, + { + Self { + name: name.into(), + log_on_error, + remove_on_error, + callback, + uuid: Uuid::new_v4(), + } + } + + pub async fn dispatch(&self, data: Arc) -> Result<(), DispatchError> { + match &self.callback { + Callback::Channel(sender) => sender.send(data).await.map_err(DispatchError::ChannelSend), + Callback::Closure(closure) => closure(data).map_err(DispatchError::Closure), + Callback::AsyncClosure(closure) => closure(data).await.map_err(DispatchError::AsyncClosure), + } + } +} + +impl PartialEq for Subscriber +where + T: Send + Sync + 'static, +{ + fn eq(&self, other: &Self) -> bool { + self.uuid == other.uuid + } +} + +impl Eq for Subscriber where T: Send + Sync {} diff --git a/src/event/subscription.rs b/src/event/subscription.rs new file mode 100644 index 0000000..e10f4dc --- /dev/null +++ b/src/event/subscription.rs @@ -0,0 +1,71 @@ +use tokio::sync::mpsc::Receiver; +use 
uuid::Uuid; + +use super::Subscriber; + +#[derive(Debug, PartialEq, Eq)] +pub struct Subscription { + pub uuid: Uuid, +} + +impl From> for Subscription +where + T: Send + Sync + 'static, +{ + fn from(subscriber: Subscriber) -> Self { + Self { + uuid: subscriber.uuid, + } + } +} + +impl From<&Subscriber> for Subscription +where + T: Send + Sync + 'static, +{ + fn from(subscriber: &Subscriber) -> Self { + Self { + uuid: subscriber.uuid, + } + } +} + +pub struct ReceiverSubscription +where + T: Send + Sync + 'static, +{ + pub subscription: Subscription, + pub receiver: Receiver, +} + +impl ReceiverSubscription +where + T: Send + Sync + 'static, +{ + pub fn new(subscription: Subscription, receiver: Receiver) -> Self { + Self { + subscription, + receiver, + } + } +} + +impl PartialEq for ReceiverSubscription +where + T: Send + Sync + 'static, +{ + fn eq(&self, other: &Self) -> bool { + self.subscription == other.subscription + } +} + +impl Eq for ReceiverSubscription where T: Send + Sync {} + +impl AsRef for ReceiverSubscription +where + T: Send + Sync + 'static, +{ + fn as_ref(&self) -> &Subscription { + &self.subscription + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..dc2f19b --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,67 @@ +use crate::service::OverallStatus; +use ::log::{error, info}; +use bot::Bot; +use std::time::SystemTime; + +pub mod bot; +pub mod config; +pub mod event; +pub mod log; +pub mod service; +pub mod setlock; + +pub fn is_debug() -> bool { + cfg!(debug_assertions) +} + +pub async fn run(mut bot: Bot) { + if !log::is_set_up() { + eprintln!("Logger has not been set up!\n{} will exit.", bot.name); + return; + } + + let now = SystemTime::now(); + bot.start().await; + match now.elapsed() { + Ok(elapsed) => info!("Startup took {}ms", elapsed.as_millis()), + Err(error) => { + error!( + "Error getting elapsed startup time: {}\n{} will exit.", + error, bot.name + ); + return; + } + }; + + if bot.service_manager.overall_status().await != OverallStatus::Healthy { + let status_overview = bot.service_manager.status_overview().await; + + error!("{} is not healthy! Some essential services did not start up successfully. {} will now exit ungracefully.\n\n{}", + bot.name, + bot.name, + status_overview); + return; + } + + info!("{} is alive", bot.name,); + + //TODO: Add CLI commands + + let exit_reason = bot.join().await; + match exit_reason { + bot::ExitReason::SIGINT => info!( + "{} received a SIGINT signal! Attempting to shut down gracefully.", + bot.name + ), + bot::ExitReason::EssentialServiceFailed => { + let status_overview = bot.service_manager.status_overview().await; + error!( + "An essential service failed! 
Attempting to shut down gracefully.\n{}", + status_overview + ); + } + } + + bot.stop().await; + info!("Oyasumi 💤"); +} diff --git a/src/log.rs b/src/log.rs new file mode 100644 index 0000000..55c46ba --- /dev/null +++ b/src/log.rs @@ -0,0 +1,55 @@ +use fern::colors::{Color, ColoredLevelConfig}; +use log::{LevelFilter, SetLoggerError}; +use std::{ + io, + sync::atomic::{AtomicBool, Ordering}, + time::SystemTime, +}; + +use crate::is_debug; + +static IS_LOGGER_SET_UP: AtomicBool = AtomicBool::new(false); + +pub fn is_set_up() -> bool { + IS_LOGGER_SET_UP.load(Ordering::Relaxed) +} + +pub fn setup() -> Result<(), SetLoggerError> { + let colors = ColoredLevelConfig::new() + .info(Color::Green) + .debug(Color::Magenta) + .warn(Color::Yellow) + .error(Color::Red) + .trace(Color::Cyan); + + fern::Dispatch::new() + .format(move |out, message, record| { + out.finish(format_args!( + "[{} {: <30} {: <5}] {}", + humantime::format_rfc3339_seconds(SystemTime::now()), + record.target(), + colors.color(record.level()), + message + )) + }) + .level(get_min_log_level()) + .level_for("serenity", LevelFilter::Warn) + .level_for("hyper", LevelFilter::Warn) + .level_for("tracing", LevelFilter::Warn) + .level_for("reqwest", LevelFilter::Warn) + .level_for("tungstenite", LevelFilter::Warn) + .chain(io::stdout()) + .apply()?; + + IS_LOGGER_SET_UP.store(true, Ordering::Relaxed); + + Ok(()) +} + +fn get_min_log_level() -> LevelFilter { + if is_debug() { + LevelFilter::Debug + } else { + LevelFilter::Info + } +} diff --git a/src/main.rs b/src/main.rs index e7a11a9..9cc1d1c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,53 @@ -fn main() { - println!("Hello, world!"); +use std::sync::Arc; + +use ::log::{error, warn}; +use lum::{ + bot::Bot, + config::{Config, ConfigHandler}, + log, + service::{discord::DiscordService, Service}, +}; +use tokio::sync::Mutex; + +const BOT_NAME: &str = "Lum"; + +#[tokio::main] +async fn main() { + setup_logger(); + + if lum::is_debug() { + warn!("THIS IS A DEBUG RELEASE!"); + } + + let config_handler = ConfigHandler::new(BOT_NAME.to_lowercase().as_str()); + let config = match config_handler.load_config() { + Ok(config) => config, + Err(err) => { + error!("Error reading config file: {}\n{} will exit.", err, BOT_NAME); + return; + } + }; + + let bot = Bot::builder(BOT_NAME) + .with_services(initialize_services(&config)) + .await + .build() + .await; + + lum::run(bot).await; +} + +fn setup_logger() { + if let Err(error) = log::setup() { + panic!("Error setting up the Logger: {}\n{} will exit.", error, BOT_NAME); + } +} + +fn initialize_services(config: &Config) -> Vec>> { + //TODO: Add services + //... + + let discord_service = DiscordService::new(config.discord_token.as_str()); + + vec![Arc::new(Mutex::new(discord_service))] } diff --git a/src/service.rs b/src/service.rs new file mode 100644 index 0000000..e835fc2 --- /dev/null +++ b/src/service.rs @@ -0,0 +1,15 @@ +pub mod discord; +// TODO: Used for downcast_rs. Maybe this can be removed when updating the crate. 
+#[allow(clippy::multiple_bound_locations)] +pub mod service; // Will be fixed when lum gets seperated into multiple workspaces +pub mod service_manager; +pub mod types; +pub mod watchdog; + +pub use service::{Service, ServiceInfo}; +pub use service_manager::{ServiceManager, ServiceManagerBuilder}; +pub use types::{ + BoxedError, BoxedFuture, BoxedFutureResult, OverallStatus, PinnedBoxedFuture, PinnedBoxedFutureResult, + Priority, ShutdownError, StartupError, Status, +}; +pub use watchdog::Watchdog; diff --git a/src/service/discord.rs b/src/service/discord.rs new file mode 100644 index 0000000..47ec3d2 --- /dev/null +++ b/src/service/discord.rs @@ -0,0 +1,161 @@ +use crate::setlock::SetLock; + +use super::{types::LifetimedPinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; +use log::{error, info}; +use serenity::{ + all::{GatewayIntents, Ready}, + async_trait, + client::{self, Cache, Context}, + framework::{standard::Configuration, StandardFramework}, + gateway::{ShardManager, VoiceGatewayManager}, + http::Http, + prelude::TypeMap, + Client, Error, +}; +use std::{sync::Arc, time::Duration}; +use tokio::{ + select, spawn, + sync::{Mutex, Notify, RwLock}, + task::JoinHandle, + time::sleep, +}; + +//TODO: Restructure +pub struct DiscordService { + info: ServiceInfo, + discord_token: String, + pub ready: Arc>>, + client_handle: Option>>, + pub cache: SetLock>, + pub data: SetLock>>, + pub http: SetLock>, + pub shard_manager: SetLock>, + pub voice_manager: SetLock>, + pub ws_url: SetLock>>, +} + +impl DiscordService { + pub fn new(discord_token: &str) -> Self { + Self { + info: ServiceInfo::new("lum_builtin_discord", "Discord", Priority::Essential), + discord_token: discord_token.to_string(), + ready: Arc::new(Mutex::new(SetLock::new())), + client_handle: None, + cache: SetLock::new(), + data: SetLock::new(), + http: SetLock::new(), + shard_manager: SetLock::new(), + voice_manager: SetLock::new(), + ws_url: SetLock::new(), + } + } +} + +impl Service for DiscordService { + fn info(&self) -> &ServiceInfo { + &self.info + } + + fn start(&mut self, _service_manager: Arc) -> LifetimedPinnedBoxedFutureResult<'_, ()> { + Box::pin(async move { + let client_ready_notify = Arc::new(Notify::new()); + + let framework = StandardFramework::new(); + framework.configure(Configuration::new().prefix("!")); + + let mut client = Client::builder(self.discord_token.as_str(), GatewayIntents::all()) + .framework(framework) + .event_handler(EventHandler::new( + Arc::clone(&self.ready), + Arc::clone(&client_ready_notify), + )) + .await?; + + if let Err(error) = self.cache.set(Arc::clone(&client.cache)) { + return Err(format!("Failed to set cache SetLock: {}", error).into()); + } + + if let Err(error) = self.data.set(Arc::clone(&client.data)) { + return Err(format!("Failed to set data SetLock: {}", error).into()); + } + + if let Err(error) = self.http.set(Arc::clone(&client.http)) { + return Err(format!("Failed to set http SetLock: {}", error).into()); + } + + if let Err(error) = self.shard_manager.set(Arc::clone(&client.shard_manager)) { + return Err(format!("Failed to set shard_manager SetLock: {}", error).into()); + } + + if let Some(voice_manager) = &client.voice_manager { + if let Err(error) = self.voice_manager.set(Arc::clone(voice_manager)) { + return Err(format!("Failed to set voice_manager SetLock: {}", error).into()); + } + } + + if let Err(error) = self.ws_url.set(Arc::clone(&client.ws_url)) { + return Err(format!("Failed to set ws_url SetLock: {}", error).into()); + } + + let 
client_handle = spawn(async move { client.start().await }); + + select! { + _ = client_ready_notify.notified() => {}, + _ = sleep(Duration::from_secs(2)) => {}, + } + + if client_handle.is_finished() { + client_handle.await??; + return Err("Discord client stopped unexpectedly".into()); + } + + self.client_handle = Some(client_handle); + Ok(()) + }) + } + + fn stop(&mut self) -> LifetimedPinnedBoxedFutureResult<'_, ()> { + Box::pin(async move { + if let Some(client_handle) = self.client_handle.take() { + info!("Waiting for Discord client to stop..."); + + client_handle.abort(); // Should trigger a JoinError in the client_handle, if the task hasn't already ended + + // If the thread ended WITHOUT a JoinError, the client already stopped unexpectedly + let result = async move { + match client_handle.await { + Ok(result) => result, + Err(_) => Ok(()), + } + } + .await; + result?; + } + + Ok(()) + }) + } +} + +struct EventHandler { + client: Arc>>, + ready_notify: Arc, +} + +impl EventHandler { + pub fn new(client: Arc>>, ready_notify: Arc) -> Self { + Self { client, ready_notify } + } +} + +#[async_trait] +impl client::EventHandler for EventHandler { + async fn ready(&self, _ctx: Context, data_about_bot: Ready) { + info!("Connected to Discord as {}", data_about_bot.user.tag()); + if let Err(error) = self.client.lock().await.set(data_about_bot) { + error!("Failed to set client SetLock: {}", error); + panic!("Failed to set client SetLock: {}", error); + } + self.ready_notify.notify_one(); + } +} diff --git a/src/service/service.rs b/src/service/service.rs new file mode 100644 index 0000000..4c0c83a --- /dev/null +++ b/src/service/service.rs @@ -0,0 +1,101 @@ +use std::{ + cmp::Ordering, + hash::{Hash, Hasher}, + sync::Arc, +}; + +use downcast_rs::{impl_downcast, DowncastSync}; + +use crate::event::Observable; + +use super::{ + service_manager::ServiceManager, + types::{LifetimedPinnedBoxedFuture, LifetimedPinnedBoxedFutureResult, Priority, Status}, +}; + +#[derive(Debug)] +pub struct ServiceInfo { + pub id: String, + pub name: String, + pub priority: Priority, + + pub status: Observable, +} + +impl ServiceInfo { + pub fn new(id: &str, name: &str, priority: Priority) -> Self { + Self { + id: id.to_string(), + name: name.to_string(), + priority, + status: Observable::new(Status::Stopped, format!("{}_status_change", id)), + } + } +} + +impl PartialEq for ServiceInfo { + fn eq(&self, other: &Self) -> bool { + self.id == other.id + } +} + +impl Eq for ServiceInfo {} + +impl Ord for ServiceInfo { + fn cmp(&self, other: &Self) -> Ordering { + self.name.cmp(&other.name) + } +} + +impl PartialOrd for ServiceInfo { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Hash for ServiceInfo { + fn hash(&self, state: &mut H) { + self.id.hash(state); + } +} +//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a PinnedBoxedFutureResult +pub trait Service: DowncastSync { + fn info(&self) -> &ServiceInfo; + fn start(&mut self, service_manager: Arc) -> LifetimedPinnedBoxedFutureResult<'_, ()>; + fn stop(&mut self) -> LifetimedPinnedBoxedFutureResult<'_, ()>; + fn task<'a>(&self) -> Option> { + None + } + + fn is_available(&self) -> LifetimedPinnedBoxedFuture<'_, bool> { + Box::pin(async move { matches!(self.info().status.get().await, Status::Started) }) + } +} + +impl_downcast!(sync Service); + +impl Eq for dyn Service {} + +impl PartialEq for dyn Service { + fn eq(&self, other: &Self) -> bool { + self.info() == 
other.info() + } +} + +impl Ord for dyn Service { + fn cmp(&self, other: &Self) -> Ordering { + self.info().cmp(other.info()) + } +} + +impl PartialOrd for dyn Service { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Hash for dyn Service { + fn hash(&self, state: &mut H) { + self.info().hash(state); + } +} diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs new file mode 100644 index 0000000..6d25e44 --- /dev/null +++ b/src/service/service_manager.rs @@ -0,0 +1,495 @@ +use super::{ + service::Service, + types::{LifetimedPinnedBoxedFuture, OverallStatus, Priority, ShutdownError, StartupError, Status}, +}; +use crate::{ + event::EventRepeater, service::Watchdog, setlock::{SetLock, SetLockError} +}; +use log::{error, info, warn}; +use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration}; +use tokio::{ + spawn, + sync::{Mutex, MutexGuard}, + task::JoinHandle, + time::timeout, +}; + +#[derive(Default)] +pub struct ServiceManagerBuilder { + services: Vec>>, +} + +impl ServiceManagerBuilder { +pub fn new() -> Self { + Self { services: Vec::new() } + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub async fn with_service(mut self, service: Arc>) -> Self { + let lock = service.lock().await; + + let mut found = false; + for registered_service in self.services.iter() { + let registered_service = registered_service.lock().await; + + if registered_service.info().id == lock.info().id { + found = true; + } + } + + if found { + warn!( + "Tried to add service {} ({}), but a service with that ID already exists. Ignoring.", + lock.info().name, + lock.info().id + ); + return self; + } + + drop(lock); + + self.services.push(service); + self + } + + pub async fn build(self) -> Arc { + let service_manager = ServiceManager { + arc: Mutex::new(SetLock::new()), + services: self.services, + background_tasks: Mutex::new(HashMap::new()), + on_status_change: EventRepeater::new("service_manager_on_status_change").await, + }; + + let self_arc = Arc::new(service_manager); + let self_arc_clone = Arc::clone(&self_arc); + + let result = self_arc_clone.arc.lock().await.set(Arc::clone(&self_arc_clone)); + + if let Err(err) = result { + match err { + SetLockError::AlreadySet => { + unreachable!("Unable to set ServiceManager's self-arc in ServiceManagerBuilder because it was already set. This should never happen. 
How did you...?"); + } + } + } + + self_arc + } +} + +pub struct ServiceManager { + arc: Mutex>>, + background_tasks: Mutex>>, + + pub services: Vec>>, + pub on_status_change: Arc>, +} + +impl ServiceManager { + pub fn builder() -> ServiceManagerBuilder { + ServiceManagerBuilder::new() + } + + pub async fn manages_service(&self, service_id: &str) -> bool + { + for service in self.services.iter() { + let service_lock = service.lock().await; + + if service_lock.info().id == service_id { + return true; + } + } + + false + } + + pub async fn start_service(&self, service: Arc>) -> Result<(), StartupError> { + let service_id = service.lock().await.info().id.clone(); + if !self.manages_service(&service_id).await { + return Err(StartupError::ServiceNotManaged(service_id.clone())); + } + + let mut service_lock = service.lock().await; + + let status = service_lock.info().status.get().await; + if !matches!(status, Status::Stopped) { + return Err(StartupError::ServiceNotStopped(service_id.clone())); + } + + if self.has_background_task_registered(&service_id).await { + return Err(StartupError::BackgroundTaskAlreadyRunning(service_id.clone())); + } + + let service_status_event = service_lock.info().status.as_ref(); + let attachment_result = self.on_status_change.attach(service_status_event, 2).await; + if let Err(err) = attachment_result { + return Err(StartupError::StatusAttachmentFailed(service_id.clone(), err)); + } + + service_lock.info().status.set(Status::Starting).await; + self.init_service(&mut service_lock).await?; + self.start_background_task(&service_lock, Arc::clone(&service)) + .await; + + info!("Started service {}", service_lock.info().name); + + Ok(()) + } + + //TODO: Clean up + pub async fn stop_service(&self, service: Arc>) -> Result<(), ShutdownError> { + let service_id = service.lock().await.info().id.clone(); + if !(self.manages_service(&service_id).await) { + return Err(ShutdownError::ServiceNotManaged(service_id.clone())); + } + + let mut service_lock = service.lock().await; + + let status = service_lock.info().status.get().await; + if !matches!(status, Status::Started) { + return Err(ShutdownError::ServiceNotStarted(service_id.clone())); + } + + self.stop_background_task(&service_lock).await; + + service_lock.info().status.set(Status::Stopping).await; + + self.shutdown_service(&mut service_lock).await?; + + let service_status_event = service_lock.info().status.as_ref(); + let detach_result = self.on_status_change.detach(service_status_event).await; + if let Err(err) = detach_result { + return Err(ShutdownError::StatusDetachmentFailed(service_id.clone(), err)); + } + + info!("Stopped service {}", service_lock.info().name); + + Ok(()) + } + + pub async fn start_services(&self) -> Vec> { + let mut results = Vec::new(); + + for service in &self.services { + let service_arc_clone = Arc::clone(service); + let result = self.start_service(service_arc_clone).await; + + results.push(result); + } + + results + } + + pub async fn stop_services(&self) -> Vec> { + let mut results = Vec::new(); + + for service in &self.services { + let service_arc_clone = Arc::clone(service); + let result = self.stop_service(service_arc_clone).await; + + results.push(result); + } + + results + } + + pub async fn get_service(&self) -> Option>> + where + T: Service, + { + for service in self.services.iter() { + let lock = service.lock().await; + let is_t = lock.as_any().is::(); + + if is_t { + let arc_clone = Arc::clone(service); + let service_ptr: *const Arc> = &arc_clone; + + /* + I tried to do this in safe rust 
for 3 days, but I couldn't figure it out + Should you come up with a way to do this in safe rust, please make a PR! :) + Anyways, this should never cause any issues, since we checked if the service is of type T + */ + unsafe { + let t_ptr: *const Arc> = mem::transmute(service_ptr); + return Some(Arc::clone(&*t_ptr)); + } + } + } + + None + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub fn overall_status(&self) -> LifetimedPinnedBoxedFuture<'_, OverallStatus> { + Box::pin(async move { + for service in self.services.iter() { + let service = service.lock().await; + + if service.info().priority != Priority::Essential { + continue; + } + + let status = service.info().status.get().await; + if status != Status::Started { + return OverallStatus::Unhealthy; + } + } + + OverallStatus::Healthy + }) + } + + //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop + pub fn status_overview(&self) -> LifetimedPinnedBoxedFuture<'_, String> { + Box::pin(async move { + let mut text_buffer = String::new(); + + let mut failed_essentials = Vec::new(); + let mut failed_optionals = Vec::new(); + let mut non_failed_essentials = Vec::new(); + let mut non_failed_optionals = Vec::new(); + let mut others = Vec::new(); + + for service in self.services.iter() { + let service = service.lock().await; + let info = service.info(); + let priority = &info.priority; + let status = info.status.get().await; + + match status { + Status::Started | Status::Stopped => match priority { + Priority::Essential => { + non_failed_essentials.push(format!(" - {}: {}", info.name, status)); + } + Priority::Optional => { + non_failed_optionals.push(format!(" - {}: {}", info.name, status)); + } + }, + Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { + match priority { + Priority::Essential => { + failed_essentials.push(format!(" - {}: {}", info.name, status)); + } + Priority::Optional => { + failed_optionals.push(format!(" - {}: {}", info.name, status)); + } + } + } + _ => { + others.push(format!(" - {}: {}", info.name, status)); + } + } + } + + if !failed_essentials.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Failed essential services")); + text_buffer.push_str(failed_essentials.join("\n").as_str()); + } + + if !failed_optionals.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Failed optional services")); + text_buffer.push_str(failed_optionals.join("\n").as_str()); + } + + if !non_failed_essentials.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Essential services")); + text_buffer.push_str(non_failed_essentials.join("\n").as_str()); + } + + if !non_failed_optionals.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Optional services")); + text_buffer.push_str(non_failed_optionals.join("\n").as_str()); + } + + if !others.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Other services")); + text_buffer.push_str(others.join("\n").as_str()); + } + + let longest_width = text_buffer + .lines() + .map(|line| line.len()) + .max() + .unwrap_or(0); + + let mut headline = String::from("Status overview\n"); + headline.push_str("─".repeat(longest_width).as_str()); + headline.push('\n'); + text_buffer.insert_str(0, &headline); + + text_buffer + }) + } + + async fn init_service( + &self, + service: &mut MutexGuard<'_, dyn Service>, + ) -> Result<(), StartupError> { + let service_manager = Arc::clone(self.arc.lock().await.unwrap()); + + //TODO: Add to config instead of hardcoding 
duration + let start = service.start(service_manager); + let timeout_result = timeout(Duration::from_secs(10), start).await; + + match timeout_result { + Ok(start_result) => match start_result { + Ok(()) => { + service.info().status.set(Status::Started).await; + } + Err(error) => { + service + .info() + .status + .set(Status::FailedToStart(error.to_string())) + .await; + return Err(StartupError::FailedToStartService(service.info().id.clone())); + } + }, + Err(error) => { + service + .info() + .status + .set(Status::FailedToStart(error.to_string())) + .await; + return Err(StartupError::FailedToStartService(service.info().id.clone())); + } + } + + Ok(()) + } + + async fn shutdown_service( + &self, + service: &mut MutexGuard<'_, dyn Service>, + ) -> Result<(), ShutdownError> { + //TODO: Add to config instead of hardcoding duration + let stop = service.stop(); + let timeout_result = timeout(Duration::from_secs(10), stop).await; + + match timeout_result { + Ok(stop_result) => match stop_result { + Ok(()) => { + service.info().status.set(Status::Stopped).await; + } + Err(error) => { + service + .info() + .status + .set(Status::FailedToStop(error.to_string())) + .await; + return Err(ShutdownError::FailedToStopService(service.info().id.clone())); + } + }, + Err(error) => { + service + .info() + .status + .set(Status::FailedToStop(error.to_string())) + .await; + return Err(ShutdownError::FailedToStopService(service.info().id.clone())); + } + } + + Ok(()) + } + + async fn has_background_task_registered(&self, service_id: &str) -> bool { + let tasks = self.background_tasks.lock().await; + tasks.contains_key(service_id) + } + + async fn start_background_task( + &self, + service_lock: &MutexGuard<'_, dyn Service>, + service: Arc>, + ) { + if self.has_background_task_registered(&service_lock.info().id).await { + return; + } + + let task = service_lock.task(); + if let Some(task) = task { + let mut watchdog = Watchdog::new(task); + + watchdog.append(|result| async move { + /* + We technically only need a read lock here, but we want to immediately stop + other services from accessing the service, so we acquire a write lock instead. + */ + let service = service.lock().await; + + match result { + Ok(()) => { + error!( + "Background task of service {} ended unexpectedly! Service will be marked as failed.", + service.info().name + ); + + service + .info() + .status + .set(Status::RuntimeError("Background task ended unexpectedly!".to_string())) + .await; + } + + Err(error) => { + error!( + "Background task of service {} ended with error: {}. 
Service will be marked as failed.", + service.info().name, + error + ); + + service + .info() + .status + .set(Status::RuntimeError( + format!("Background task ended with error: {}", error), + )) + .await; + } + } + Ok(()) + }); + + let join_handle = spawn(watchdog.run()); + + self.background_tasks + .lock() + .await + .insert(service_lock.info().id.clone(), join_handle); + } + } + + async fn stop_background_task(&self, service_lock: &MutexGuard<'_, dyn Service>) { + if !self.has_background_task_registered(&service_lock.info().id).await { + return; + } + + let mut tasks_lock = self.background_tasks.lock().await; + let task = tasks_lock.get(&service_lock.info().id).unwrap(); + task.abort(); + tasks_lock.remove(&service_lock.info().id); + } +} + +impl Display for ServiceManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "Services: ")?; + + if self.services.is_empty() { + write!(f, "None")?; + return Ok(()); + } + + let mut services = self.services.iter().peekable(); + while let Some(service) = services.next() { + let service = service.blocking_lock(); + write!(f, "{} ({})", service.info().name, service.info().id)?; + if services.peek().is_some() { + write!(f, ", ")?; + } + } + Ok(()) + } +} diff --git a/src/service/types.rs b/src/service/types.rs new file mode 100644 index 0000000..6d388de --- /dev/null +++ b/src/service/types.rs @@ -0,0 +1,125 @@ +use std::{error::Error, fmt::Display, future::Future, pin::Pin}; + +use thiserror::Error; + +use crate::event::event_repeater::{AttachError, DetachError}; + +pub type BoxedError = Box; + +pub type BoxedFuture = Box + Send>; +pub type BoxedFutureResult = BoxedFuture>; + +pub type PinnedBoxedFuture = Pin + Send>>; +pub type PinnedBoxedFutureResult = PinnedBoxedFuture>; + +pub type LifetimedPinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; +pub type LifetimedPinnedBoxedFutureResult<'a, T> = LifetimedPinnedBoxedFuture<'a, Result>; + +#[derive(Debug, Clone)] +pub enum Status { + Started, + Stopped, + Starting, + Stopping, + FailedToStart(String), + FailedToStop(String), + RuntimeError(String), +} + +impl Display for Status { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Status::Started => write!(f, "Started"), + Status::Stopped => write!(f, "Stopped"), + Status::Starting => write!(f, "Starting"), + Status::Stopping => write!(f, "Stopping"), + Status::FailedToStart(error) => write!(f, "Failed to start: {}", error), + Status::FailedToStop(error) => write!(f, "Failed to stop: {}", error), + Status::RuntimeError(error) => write!(f, "Runtime error: {}", error), + } + } +} + +impl PartialEq for Status { + fn eq(&self, other: &Self) -> bool { + matches!( + (self, other), + (Status::Started, Status::Started) + | (Status::Stopped, Status::Stopped) + | (Status::Starting, Status::Starting) + | (Status::Stopping, Status::Stopping) + | (Status::FailedToStart(_), Status::FailedToStart(_)) + | (Status::FailedToStop(_), Status::FailedToStop(_)) + | (Status::RuntimeError(_), Status::RuntimeError(_)) + ) + } +} + +impl Eq for Status {} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub enum OverallStatus { + Healthy, + Unhealthy, +} + +impl Display for OverallStatus { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + OverallStatus::Healthy => write!(f, "Healthy"), + OverallStatus::Unhealthy => write!(f, "Unhealthy"), + } + } +} + +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +pub enum Priority { + 
Essential, + Optional, +} + +impl Display for Priority { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Priority::Essential => write!(f, "Essential"), + Priority::Optional => write!(f, "Optional"), + } + } +} + +#[derive(Debug, Error)] +pub enum StartupError { + #[error("Service {0} is not managed by this Service Manager")] + ServiceNotManaged(String), + + #[error("Service {0} is not stopped")] + ServiceNotStopped(String), + + #[error("Service {0} already has a background task running")] + BackgroundTaskAlreadyRunning(String), + + #[error( + "Failed to attach Service Manager's status_change EventRepeater to {0}'s status_change Event: {1}" + )] + StatusAttachmentFailed(String, AttachError), + + #[error("Service {0} failed to start")] + FailedToStartService(String), +} + +#[derive(Debug, Error)] +pub enum ShutdownError { + #[error("Service {0} is not managed by this Service Manager")] + ServiceNotManaged(String), + + #[error("Service {0} is not started")] + ServiceNotStarted(String), + + #[error("Service {0} failed to stop")] + FailedToStopService(String), + + #[error( + "Failed to detach Service Manager's status_change EventRepeater from {0}'s status_change Event: {1}" + )] + StatusDetachmentFailed(String, DetachError), +} diff --git a/src/service/watchdog.rs b/src/service/watchdog.rs new file mode 100644 index 0000000..2eb53a9 --- /dev/null +++ b/src/service/watchdog.rs @@ -0,0 +1,59 @@ +use super::types::LifetimedPinnedBoxedFuture; +use log::error; +use serenity::FutureExt; +use std::{future::Future, mem::replace, sync::Arc}; +use tokio::sync::{ + mpsc::{channel, Receiver, Sender}, + Mutex, +}; + +//TODO: Rename to TaskChain and use Event instead of manual subscriber handling +pub struct Watchdog<'a, T: Send> { + task: LifetimedPinnedBoxedFuture<'a, T>, + subscribers: Arc>>>>, +} + +impl<'a, T: 'a + Send> Watchdog<'a, T> { + pub fn new(task: LifetimedPinnedBoxedFuture<'a, T>) -> Self { + Self { + task, + subscribers: Arc::new(Mutex::new(Vec::new())), + } + } + + pub fn append(&mut self, task: FN) + where + FN: FnOnce(T) -> FUT + Send + 'a, + FUT: Future + Send + 'a, + { + let previous_task = replace( + &mut self.task, + Box::pin(async { unreachable!("Undefined watchdog task") }), + ); + + let task = previous_task.then(task); + + self.task = Box::pin(task); + } + + pub async fn subscribe(&self) -> Receiver> { + let (tx, rx) = channel(1); + self.subscribers.lock().await.push(tx); + rx + } + + pub async fn run(self) { + let result = self.task.await; + let result = Arc::new(result); + for subscriber in self.subscribers.lock().await.iter() { + let send_result = subscriber.send(Arc::clone(&result)).await; + + if let Err(e) = send_result { + error!( + "Failed to send a watchdog task result to one of its subscribers: {}", + e + ); + } + } + } +} diff --git a/src/setlock.rs b/src/setlock.rs new file mode 100644 index 0000000..7bee17c --- /dev/null +++ b/src/setlock.rs @@ -0,0 +1,70 @@ +use std::{error::Error, fmt::Display}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub enum SetLockError { + AlreadySet, +} + +impl Display for SetLockError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + SetLockError::AlreadySet => write!(f, "AlreadySet"), + } + } +} + +impl Error for SetLockError {} + +pub struct SetLock { + data: Option, +} + +impl SetLock { + pub fn new() -> Self { + Self { data: None } + } + + pub fn set(&mut self, data: T) -> Result<(), SetLockError> { + if self.data.is_some() { + 
return Err(SetLockError::AlreadySet); + } + + self.data = Some(data); + + Ok(()) + } + + pub fn is_set(&self) -> bool { + self.data.is_some() + } + + pub fn unwrap(&self) -> &T { + if self.data.is_none() { + panic!("unwrap called on an unset SetLock"); + } + + self.data.as_ref().unwrap() + } + + pub fn unwrap_mut(&mut self) -> &mut T { + if self.data.is_none() { + panic!("unwrap_mut called on an unset SetLock"); + } + + self.data.as_mut().unwrap() + } + + pub fn get(&self) -> Option<&T> { + self.data.as_ref() + } + + pub fn get_mut(&mut self) -> Option<&mut T> { + self.data.as_mut() + } +} + +impl Default for SetLock { + fn default() -> Self { + Self::new() + } +}
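The `unsafe` block in `ServiceManager::get_service` explicitly invites a safe replacement ("Should you come up with a way to do this in safe rust, please make a PR! :)"). A hypothetical sketch of one such alternative, not part of this diff: if the builder additionally stored each service as a type-erased `Arc<dyn Any + Send + Sync>` keyed by `TypeId` while the concrete type is still known, lookup could rely on `Arc::downcast` instead of `mem::transmute`:

```rust
use std::{
    any::{Any, TypeId},
    collections::HashMap,
    sync::Arc,
};
use tokio::sync::Mutex;

// Hypothetical registry kept alongside the existing Vec<Arc<Mutex<dyn Service>>>.
// Each stored value is really an Arc<Mutex<T>>, erased to `dyn Any`.
struct TypedServices {
    by_type: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl TypedServices {
    // Registration happens where the concrete `T` is still known,
    // so the TypeId key and the erased handle stay consistent.
    fn insert<T: Send + 'static>(&mut self, service: Arc<Mutex<T>>) {
        self.by_type.insert(TypeId::of::<T>(), service);
    }

    // Safe counterpart to get_service: Arc::downcast verifies the
    // TypeId at runtime and returns the strongly typed handle.
    fn get<T: Send + 'static>(&self) -> Option<Arc<Mutex<T>>> {
        let any = self.by_type.get(&TypeId::of::<T>())?;
        Arc::clone(any).downcast::<Mutex<T>>().ok()
    }
}
```

The cost is one extra `Arc` handle per service; in exchange, the fat-pointer-to-thin-pointer transmute and its layout assumptions disappear.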