From 66ded9592918c6ba6034a175cf01d16ddf182061 Mon Sep 17 00:00:00 2001 From: Torben Schweren Date: Sat, 5 Oct 2024 23:12:11 +0200 Subject: [PATCH] refactor: use async_trait where possible --- Cargo.lock | 1 + Cargo.toml | 3 +- src/bot.rs | 22 ++--- src/service.rs | 6 +- src/service/discord.rs | 135 ++++++++++++++-------------- src/service/service.rs | 15 ++-- src/service/service_manager.rs | 156 ++++++++++++++++----------------- src/service/types.rs | 20 +++-- src/service/watchdog.rs | 7 +- 9 files changed, 180 insertions(+), 185 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e6b5872..2f0e3c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1182,6 +1182,7 @@ dependencies = [ name = "lum" version = "0.2.1" dependencies = [ + "async-trait", "dirs", "downcast-rs", "fern", diff --git a/Cargo.toml b/Cargo.toml index 6f9421f..ea753bc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ repository = "https://github.com/Kitt3120/lum" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.83" dirs = "5.0.1" downcast-rs = "1.2.0" fern = { version = "0.6.2", features = ["chrono", "colored", "date-based"] } @@ -23,4 +24,4 @@ serenity = { version = "0.12.0", default-features=false, features = ["builder", sqlx = { version = "0.8.0", features = ["runtime-tokio", "any", "postgres", "mysql", "sqlite", "tls-native-tls", "migrate", "macros", "uuid", "chrono", "json"] } thiserror = "1.0.52" tokio = { version = "1.35.1", features = ["full"] } -uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] } \ No newline at end of file +uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] } diff --git a/src/bot.rs b/src/bot.rs index 37dce6e..7f693ab 100644 --- a/src/bot.rs +++ b/src/bot.rs @@ -4,9 +4,7 @@ use std::{fmt::Display, sync::Arc}; use log::error; use tokio::{signal, sync::Mutex}; -use crate::service::{ - types::LifetimedPinnedBoxedFuture, OverallStatus, Service, ServiceManager, ServiceManagerBuilder, -}; +use crate::service::{OverallStatus, Service, ServiceManager, ServiceManagerBuilder}; #[derive(Debug, Clone, Copy)] pub enum ExitReason { @@ -68,20 +66,14 @@ impl Bot { BotBuilder::new(name) } - //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future - pub fn start(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> { - Box::pin(async move { - self.service_manager.start_services().await; - //TODO: Potential for further initialization here, like modules - }) + pub async fn start(&mut self) { + self.service_manager.start_services().await; + //TODO: Potential for further initialization here, like modules } - //TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future - pub fn stop(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> { - Box::pin(async move { - self.service_manager.stop_services().await; - //TODO: Potential for further deinitialization here, like modules - }) + pub async fn stop(&mut self) { + self.service_manager.stop_services().await; + //TODO: Potential for further deinitialization here, like modules } pub async fn join(&self) -> ExitReason { diff --git a/src/service.rs b/src/service.rs index e835fc2..b45e5b7 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,6 +1,4 @@ pub mod discord; -// TODO: Used for downcast_rs. Maybe this can be removed when updating the crate. -#[allow(clippy::multiple_bound_locations)] pub mod service; // Will be fixed when lum gets seperated into multiple workspaces pub mod service_manager; pub mod types; @@ -9,7 +7,7 @@ pub mod watchdog; pub use service::{Service, ServiceInfo}; pub use service_manager::{ServiceManager, ServiceManagerBuilder}; pub use types::{ - BoxedError, BoxedFuture, BoxedFutureResult, OverallStatus, PinnedBoxedFuture, PinnedBoxedFutureResult, - Priority, ShutdownError, StartupError, Status, + BoxedError, LifetimedPinnedBoxedFuture, LifetimedPinnedBoxedFutureResult, OverallStatus, + PinnedBoxedFuture, PinnedBoxedFutureResult, Priority, ShutdownError, StartupError, Status, }; pub use watchdog::Watchdog; diff --git a/src/service/discord.rs b/src/service/discord.rs index 5825363..5fea4a6 100644 --- a/src/service/discord.rs +++ b/src/service/discord.rs @@ -1,4 +1,4 @@ -use super::{types::LifetimedPinnedBoxedFutureResult, Priority, Service, ServiceInfo, ServiceManager}; +use super::{BoxedError, Priority, Service, ServiceInfo, ServiceManager}; use log::{error, info, warn}; use serenity::{ all::{GatewayIntents, Ready}, @@ -52,97 +52,97 @@ impl DiscordService { } } +//TODO: When Rust allows async trait methods to be object-safe, refactor this to not use async_trait anymore +#[async_trait] impl Service for DiscordService { fn info(&self) -> &ServiceInfo { &self.info } - fn start(&mut self, _service_manager: Arc) -> LifetimedPinnedBoxedFutureResult<'_, ()> { - Box::pin(async move { - let client_ready_notify = Arc::new(Notify::new()); + async fn start(&mut self, _service_manager: Arc) -> Result<(), BoxedError> { + let client_ready_notify = Arc::new(Notify::new()); - let framework = StandardFramework::new(); - framework.configure(Configuration::new().prefix("!")); + let framework = StandardFramework::new(); + framework.configure(Configuration::new().prefix("!")); - let mut client = Client::builder(self.discord_token.as_str(), GatewayIntents::all()) - .framework(framework) - .event_handler(EventHandler::new( - Arc::clone(&self.ready), - Arc::clone(&client_ready_notify), - )) - .await?; + let mut client = Client::builder(self.discord_token.as_str(), GatewayIntents::all()) + .framework(framework) + .event_handler(EventHandler::new( + Arc::clone(&self.ready), + Arc::clone(&client_ready_notify), + )) + .await?; - if self.cache.set(Arc::clone(&client.cache)).is_err() { - error!("Could not set cache OnceLock because it was already set. This should never happen."); - return Err("Could not set cache OnceLock because it was already set.".into()); - } + if self.cache.set(Arc::clone(&client.cache)).is_err() { + error!("Could not set cache OnceLock because it was already set. This should never happen."); + return Err("Could not set cache OnceLock because it was already set.".into()); + } - if self.data.set(Arc::clone(&client.data)).is_err() { - error!("Could not set data OnceLock because it was already set. This should never happen."); - return Err("Could not set data OnceLock because it was already set.".into()); - } + if self.data.set(Arc::clone(&client.data)).is_err() { + error!("Could not set data OnceLock because it was already set. This should never happen."); + return Err("Could not set data OnceLock because it was already set.".into()); + } - if self.http.set(Arc::clone(&client.http)).is_err() { - error!("Could not set http OnceLock because it was already set. This should never happen."); - return Err("Could not set http OnceLock because it was already set.".into()); - } + if self.http.set(Arc::clone(&client.http)).is_err() { + error!("Could not set http OnceLock because it was already set. This should never happen."); + return Err("Could not set http OnceLock because it was already set.".into()); + } - if self.shard_manager.set(Arc::clone(&client.shard_manager)).is_err() { - error!("Could not set shard_manager OnceLock because it was already set. This should never happen."); - return Err("Could not set shard_manager OnceLock because it was already set.".into()); - } + if self.shard_manager.set(Arc::clone(&client.shard_manager)).is_err() { + error!( + "Could not set shard_manager OnceLock because it was already set. This should never happen." + ); + return Err("Could not set shard_manager OnceLock because it was already set.".into()); + } - if let Some(voice_manager) = &client.voice_manager { - if self.voice_manager.set(Arc::clone(voice_manager)).is_err() { - error!("Could not set voice_manager OnceLock because it was already set. This should never happen."); - return Err("Could not set voice_manager OnceLock because it was already set.".into()); - } - } else { - warn!("Voice manager is not available"); + if let Some(voice_manager) = &client.voice_manager { + if self.voice_manager.set(Arc::clone(voice_manager)).is_err() { + error!("Could not set voice_manager OnceLock because it was already set. This should never happen."); + return Err("Could not set voice_manager OnceLock because it was already set.".into()); } + } else { + warn!("Voice manager is not available"); + } - if self.ws_url.set(Arc::clone(&client.ws_url)).is_err() { - error!("Could not set ws_url OnceLock because it was already set. This should never happen."); - return Err("Could not set ws_url OnceLock because it was already set.".into()); - } + if self.ws_url.set(Arc::clone(&client.ws_url)).is_err() { + error!("Could not set ws_url OnceLock because it was already set. This should never happen."); + return Err("Could not set ws_url OnceLock because it was already set.".into()); + } - let client_handle = spawn(async move { client.start().await }); + let client_handle = spawn(async move { client.start().await }); - select! { - _ = client_ready_notify.notified() => {}, - _ = sleep(Duration::from_secs(2)) => {}, - } + select! { + _ = client_ready_notify.notified() => {}, + _ = sleep(Duration::from_secs(2)) => {}, + } - if client_handle.is_finished() { - client_handle.await??; - return Err("Discord client stopped unexpectedly".into()); - } + if client_handle.is_finished() { + client_handle.await??; + return Err("Discord client stopped unexpectedly".into()); + } - self.client_handle = Some(client_handle); - Ok(()) - }) + self.client_handle = Some(client_handle); + Ok(()) } - fn stop(&mut self) -> LifetimedPinnedBoxedFutureResult<'_, ()> { - Box::pin(async move { - if let Some(client_handle) = self.client_handle.take() { - info!("Waiting for Discord client to stop..."); + async fn stop(&mut self) -> Result<(), BoxedError> { + if let Some(client_handle) = self.client_handle.take() { + info!("Waiting for Discord client to stop..."); - client_handle.abort(); // Should trigger a JoinError in the client_handle, if the task hasn't already ended + client_handle.abort(); // Should trigger a JoinError in the client_handle, if the task hasn't already ended - // If the thread ended WITHOUT a JoinError, the client already stopped unexpectedly - let result = async move { - match client_handle.await { - Ok(result) => result, - Err(_) => Ok(()), - } + // If the thread ended WITHOUT a JoinError, the client already stopped unexpectedly + let result = async move { + match client_handle.await { + Ok(result) => result, + Err(_) => Ok(()), } - .await; - result?; } + .await; + result?; + } - Ok(()) - }) + Ok(()) } } @@ -157,6 +157,7 @@ impl EventHandler { } } +//TODO: When Rust allows async trait methods to be object-safe, refactor this to not use async_trait anymore #[async_trait] impl client::EventHandler for EventHandler { async fn ready(&self, _ctx: Context, data_about_bot: Ready) { diff --git a/src/service/service.rs b/src/service/service.rs index 4c0c83a..b12fb24 100644 --- a/src/service/service.rs +++ b/src/service/service.rs @@ -4,13 +4,15 @@ use std::{ sync::Arc, }; +use async_trait::async_trait; use downcast_rs::{impl_downcast, DowncastSync}; use crate::event::Observable; use super::{ service_manager::ServiceManager, - types::{LifetimedPinnedBoxedFuture, LifetimedPinnedBoxedFutureResult, Priority, Status}, + types::{Priority, Status}, + BoxedError, LifetimedPinnedBoxedFutureResult, }; #[derive(Debug)] @@ -58,17 +60,18 @@ impl Hash for ServiceInfo { self.id.hash(state); } } -//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a PinnedBoxedFutureResult +//TODO: When Rust allows async trait methods to be object-safe, refactor this to not use async_trait anymore +#[async_trait] pub trait Service: DowncastSync { fn info(&self) -> &ServiceInfo; - fn start(&mut self, service_manager: Arc) -> LifetimedPinnedBoxedFutureResult<'_, ()>; - fn stop(&mut self) -> LifetimedPinnedBoxedFutureResult<'_, ()>; + async fn start(&mut self, service_manager: Arc) -> Result<(), BoxedError>; + async fn stop(&mut self) -> Result<(), BoxedError>; fn task<'a>(&self) -> Option> { None } - fn is_available(&self) -> LifetimedPinnedBoxedFuture<'_, bool> { - Box::pin(async move { matches!(self.info().status.get().await, Status::Started) }) + async fn is_available(&self) -> bool { + matches!(self.info().status.get().await, Status::Started) } } diff --git a/src/service/service_manager.rs b/src/service/service_manager.rs index f715e5b..ad5e3a9 100644 --- a/src/service/service_manager.rs +++ b/src/service/service_manager.rs @@ -1,12 +1,12 @@ use super::{ service::Service, - types::{LifetimedPinnedBoxedFuture, OverallStatus, Priority, ShutdownError, StartupError, Status}, + types::{OverallStatus, Priority, ShutdownError, StartupError, Status}, }; use crate::{ event::EventRepeater, service::Watchdog }; use log::{error, info, warn}; -use std::{collections::HashMap, fmt::Display, mem, sync::{Arc, OnceLock, Weak}, time::Duration}; +use std::{collections::HashMap, fmt::{self, Display}, mem, sync::{Arc, OnceLock, Weak}, time::Duration}; use tokio::{ spawn, sync::{Mutex, MutexGuard}, @@ -217,105 +217,101 @@ impl ServiceManager { } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn overall_status(&self) -> LifetimedPinnedBoxedFuture<'_, OverallStatus> { - Box::pin(async move { - for service in self.services.iter() { - let service = service.lock().await; + pub async fn overall_status(&self) -> OverallStatus { + for service in self.services.iter() { + let service = service.lock().await; - if service.info().priority != Priority::Essential { - continue; - } + if service.info().priority != Priority::Essential { + continue; + } - let status = service.info().status.get().await; - if status != Status::Started { - return OverallStatus::Unhealthy; - } + let status = service.info().status.get().await; + if status != Status::Started { + return OverallStatus::Unhealthy; } + } - OverallStatus::Healthy - }) + OverallStatus::Healthy } //TODO: When Rust allows async closures, refactor this to use iterator methods instead of for loop - pub fn status_overview(&self) -> LifetimedPinnedBoxedFuture<'_, String> { - Box::pin(async move { - let mut text_buffer = String::new(); - - let mut failed_essentials = Vec::new(); - let mut failed_optionals = Vec::new(); - let mut non_failed_essentials = Vec::new(); - let mut non_failed_optionals = Vec::new(); - let mut others = Vec::new(); + pub async fn status_overview(&self) -> String { + let mut text_buffer = String::new(); - for service in self.services.iter() { - let service = service.lock().await; - let info = service.info(); - let priority = &info.priority; - let status = info.status.get().await; + let mut failed_essentials = Vec::new(); + let mut failed_optionals = Vec::new(); + let mut non_failed_essentials = Vec::new(); + let mut non_failed_optionals = Vec::new(); + let mut others = Vec::new(); - match status { - Status::Started | Status::Stopped => match priority { + for service in self.services.iter() { + let service = service.lock().await; + let info = service.info(); + let priority = &info.priority; + let status = info.status.get().await; + + match status { + Status::Started | Status::Stopped => match priority { + Priority::Essential => { + non_failed_essentials.push(format!(" - {}: {}", info.name, status)); + } + Priority::Optional => { + non_failed_optionals.push(format!(" - {}: {}", info.name, status)); + } + }, + Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { + match priority { Priority::Essential => { - non_failed_essentials.push(format!(" - {}: {}", info.name, status)); + failed_essentials.push(format!(" - {}: {}", info.name, status)); } Priority::Optional => { - non_failed_optionals.push(format!(" - {}: {}", info.name, status)); + failed_optionals.push(format!(" - {}: {}", info.name, status)); } - }, - Status::FailedToStart(_) | Status::FailedToStop(_) | Status::RuntimeError(_) => { - match priority { - Priority::Essential => { - failed_essentials.push(format!(" - {}: {}", info.name, status)); - } - Priority::Optional => { - failed_optionals.push(format!(" - {}: {}", info.name, status)); - } - } - } - _ => { - others.push(format!(" - {}: {}", info.name, status)); } } + _ => { + others.push(format!(" - {}: {}", info.name, status)); + } } + } - if !failed_essentials.is_empty() { - text_buffer.push_str(&format!("{}:\n", "Failed essential services")); - text_buffer.push_str(failed_essentials.join("\n").as_str()); - } - - if !failed_optionals.is_empty() { - text_buffer.push_str(&format!("{}:\n", "Failed optional services")); - text_buffer.push_str(failed_optionals.join("\n").as_str()); - } + if !failed_essentials.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Failed essential services")); + text_buffer.push_str(failed_essentials.join("\n").as_str()); + } - if !non_failed_essentials.is_empty() { - text_buffer.push_str(&format!("{}:\n", "Essential services")); - text_buffer.push_str(non_failed_essentials.join("\n").as_str()); - } + if !failed_optionals.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Failed optional services")); + text_buffer.push_str(failed_optionals.join("\n").as_str()); + } - if !non_failed_optionals.is_empty() { - text_buffer.push_str(&format!("{}:\n", "Optional services")); - text_buffer.push_str(non_failed_optionals.join("\n").as_str()); - } + if !non_failed_essentials.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Essential services")); + text_buffer.push_str(non_failed_essentials.join("\n").as_str()); + } - if !others.is_empty() { - text_buffer.push_str(&format!("{}:\n", "Other services")); - text_buffer.push_str(others.join("\n").as_str()); - } + if !non_failed_optionals.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Optional services")); + text_buffer.push_str(non_failed_optionals.join("\n").as_str()); + } - let longest_width = text_buffer - .lines() - .map(|line| line.len()) - .max() - .unwrap_or(0); - - let mut headline = String::from("Status overview\n"); - headline.push_str("─".repeat(longest_width).as_str()); - headline.push('\n'); - text_buffer.insert_str(0, &headline); + if !others.is_empty() { + text_buffer.push_str(&format!("{}:\n", "Other services")); + text_buffer.push_str(others.join("\n").as_str()); + } - text_buffer - }) + let longest_width = text_buffer + .lines() + .map(|line| line.len()) + .max() + .unwrap_or(0); + + let mut headline = String::from("Status overview\n"); + headline.push_str("─".repeat(longest_width).as_str()); + headline.push('\n'); + text_buffer.insert_str(0, &headline); + + text_buffer } async fn init_service( @@ -486,7 +482,7 @@ impl ServiceManager { } impl Display for ServiceManager { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Services: ")?; if self.services.is_empty() { diff --git a/src/service/types.rs b/src/service/types.rs index 6d388de..70c383c 100644 --- a/src/service/types.rs +++ b/src/service/types.rs @@ -1,4 +1,9 @@ -use std::{error::Error, fmt::Display, future::Future, pin::Pin}; +use std::{ + error::Error, + fmt::{self, Display}, + future::Future, + pin::Pin, +}; use thiserror::Error; @@ -6,13 +11,10 @@ use crate::event::event_repeater::{AttachError, DetachError}; pub type BoxedError = Box; -pub type BoxedFuture = Box + Send>; -pub type BoxedFutureResult = BoxedFuture>; - -pub type PinnedBoxedFuture = Pin + Send>>; +pub type PinnedBoxedFuture = Pin + Send + Sync>>; pub type PinnedBoxedFutureResult = PinnedBoxedFuture>; -pub type LifetimedPinnedBoxedFuture<'a, T> = Pin + Send + 'a>>; +pub type LifetimedPinnedBoxedFuture<'a, T> = Pin + Send + Sync + 'a>>; pub type LifetimedPinnedBoxedFutureResult<'a, T> = LifetimedPinnedBoxedFuture<'a, Result>; #[derive(Debug, Clone)] @@ -27,7 +29,7 @@ pub enum Status { } impl Display for Status { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Status::Started => write!(f, "Started"), Status::Stopped => write!(f, "Stopped"), @@ -64,7 +66,7 @@ pub enum OverallStatus { } impl Display for OverallStatus { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { OverallStatus::Healthy => write!(f, "Healthy"), OverallStatus::Unhealthy => write!(f, "Unhealthy"), @@ -79,7 +81,7 @@ pub enum Priority { } impl Display for Priority { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { Priority::Essential => write!(f, "Essential"), Priority::Optional => write!(f, "Optional"), diff --git a/src/service/watchdog.rs b/src/service/watchdog.rs index 2eb53a9..6b686db 100644 --- a/src/service/watchdog.rs +++ b/src/service/watchdog.rs @@ -1,4 +1,3 @@ -use super::types::LifetimedPinnedBoxedFuture; use log::error; use serenity::FutureExt; use std::{future::Future, mem::replace, sync::Arc}; @@ -7,6 +6,8 @@ use tokio::sync::{ Mutex, }; +use super::LifetimedPinnedBoxedFuture; + //TODO: Rename to TaskChain and use Event instead of manual subscriber handling pub struct Watchdog<'a, T: Send> { task: LifetimedPinnedBoxedFuture<'a, T>, @@ -23,8 +24,8 @@ impl<'a, T: 'a + Send> Watchdog<'a, T> { pub fn append(&mut self, task: FN) where - FN: FnOnce(T) -> FUT + Send + 'a, - FUT: Future + Send + 'a, + FN: FnOnce(T) -> FUT + Send + Sync + 'a, + FUT: Future + Send + Sync + 'a, { let previous_task = replace( &mut self.task,