Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
2e2d0e4
Bump main to staging (#7)
Kitt3120 Dec 29, 2023
04ad42b
Clonable Status
Kitt3120 Jan 29, 2024
b37e205
Adapt Service Manager to new Status enum
Kitt3120 Jan 29, 2024
41e11c5
Events
Kitt3120 Feb 8, 2024
c6ce236
Event improvements
Kitt3120 Feb 10, 2024
e500458
Event improvements
Kitt3120 Mar 29, 2024
6c7477d
Slight refactors
Kitt3120 Mar 31, 2024
bdf8ce9
WIP: Idk lol
Kitt3120 Jun 28, 2024
2f8850b
add: allow clippy::multiple_bound_locations for service module
Kitt3120 Jun 28, 2024
02e2441
add: observables
Kitt3120 Jul 26, 2024
782f704
refactor: use Mutex instead of RwLock everywhere
Kitt3120 Jul 26, 2024
198296b
refactor: make remove_on_error work on per-subscriber basis
Kitt3120 Jul 27, 2024
88a28c1
refactor: make subscribers identifiable
Kitt3120 Jul 27, 2024
8f4d648
refactor: move dispatch logic to Subscriber
Kitt3120 Jul 27, 2024
740f6d3
add: AsyncClosure Callback type
Kitt3120 Jul 27, 2024
7b3345c
WIP: EventRepeater
Kitt3120 Jul 28, 2024
4cdff2f
add: EventRepeater
Kitt3120 Aug 5, 2024
2e6f933
refactor: event subscribe method names
Kitt3120 Sep 21, 2024
abe190a
refactor: move subscription into own module
Kitt3120 Sep 21, 2024
05b6d49
add: AsRef<Event<T>>
Kitt3120 Sep 21, 2024
cb8fa7e
add: UUID, PartialEq/Eq, unsubscribe()
Kitt3120 Sep 21, 2024
0d3c1c1
add: event_repeater detach(), close()
Kitt3120 Sep 22, 2024
4ff9c8b
add: attach/deattach EventRepeater on start/stop of service
Kitt3120 Sep 22, 2024
2155cbf
add: service runtime failure handling
Kitt3120 Oct 4, 2024
456e86c
fix: bump version to 0.2.1
Kitt3120 Oct 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "lum"
version = "0.1.0"
version = "0.2.1"
edition = "2021"
description = "Lum Discord Bot"
license= "MIT"
Expand All @@ -23,3 +23,4 @@ serenity = { version = "0.12.0", default-features=false, features = ["builder",
sqlx = { version = "0.8.0", features = ["runtime-tokio", "any", "postgres", "mysql", "sqlite", "tls-native-tls", "migrate", "macros", "uuid", "chrono", "json"] }
thiserror = "1.0.52"
tokio = { version = "1.35.1", features = ["full"] }
uuid = { version = "1.10.0", features = ["fast-rng", "macro-diagnostics", "v4"] }
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ Beta: [![Deploy](https://github.com/Kitt3120/lum/actions/workflows/deploy_prerel

# Collaborating

Checkout out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues)
Check out [Milestones](https://github.com/Kitt3120/lum/milestones), [Board](https://github.com/users/Kitt3120/projects/3), and [Issues](https://github.com/Kitt3120/lum/issues)
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
fn main() {
// trigger recompilation when a new migration is added
println!("cargo:rerun-if-changed=migrations");
}
}
71 changes: 64 additions & 7 deletions src/bot.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
use std::sync::Arc;
use core::fmt;
use std::{fmt::Display, sync::Arc};

use tokio::sync::RwLock;
use log::error;
use tokio::{signal, sync::Mutex};

use crate::service::{PinnedBoxedFuture, Service, ServiceManager, ServiceManagerBuilder};
use crate::service::{
types::LifetimedPinnedBoxedFuture, OverallStatus, Service, ServiceManager, ServiceManagerBuilder,
};

#[derive(Debug, Clone, Copy)]
pub enum ExitReason {
SIGINT,
EssentialServiceFailed,
}

impl Display for ExitReason {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::SIGINT => write!(f, "SIGINT"),
Self::EssentialServiceFailed => write!(f, "Essential Service Failed"),
}
}
}

pub struct BotBuilder {
name: String,
Expand All @@ -17,13 +36,13 @@ impl BotBuilder {
}
}

pub async fn with_service(mut self, service: Arc<RwLock<dyn Service>>) -> Self {
pub async fn with_service(mut self, service: Arc<Mutex<dyn Service>>) -> Self {
self.service_manager = self.service_manager.with_service(service).await; // The ServiceManagerBuilder itself will warn when adding a service multiple times

self
}

pub async fn with_services(mut self, services: Vec<Arc<RwLock<dyn Service>>>) -> Self {
pub async fn with_services(mut self, services: Vec<Arc<Mutex<dyn Service>>>) -> Self {
for service in services {
self.service_manager = self.service_manager.with_service(service).await;
}
Expand All @@ -50,18 +69,56 @@ impl Bot {
}

//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future
pub fn start(&mut self) -> PinnedBoxedFuture<'_, ()> {
pub fn start(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> {
Box::pin(async move {
self.service_manager.start_services().await;
//TODO: Potential for further initialization here, like modules
})
}

//TODO: When Rust allows async trait methods to be object-safe, refactor this to use async instead of returning a future
pub fn stop(&mut self) -> PinnedBoxedFuture<'_, ()> {
pub fn stop(&mut self) -> LifetimedPinnedBoxedFuture<'_, ()> {
Box::pin(async move {
self.service_manager.stop_services().await;
//TODO: Potential for further deinitialization here, like modules
})
}

pub async fn join(&self) -> ExitReason {
let name_clone = self.name.clone();
let signal_task = tokio::spawn(async move {
let name = name_clone;

let result = signal::ctrl_c().await;
if let Err(error) = result {
error!(
"Error receiving SIGINT: {}. {} will exit ungracefully immediately to prevent undefined behavior.",
error, name
);
panic!("Error receiving SIGINT: {}", error);
}
});

let service_manager_clone = self.service_manager.clone();
let mut receiver = self
.service_manager
.on_status_change
.event
.subscribe_channel("t", 2, true, true)
.await;
let status_task = tokio::spawn(async move {
let service_manager = service_manager_clone;
while (receiver.receiver.recv().await).is_some() {
let overall_status = service_manager.overall_status().await;
if overall_status == OverallStatus::Unhealthy {
return;
}
}
});

tokio::select! {
_ = signal_task => ExitReason::SIGINT,
_ = status_task => ExitReason::EssentialServiceFailed,
}
}
}
13 changes: 13 additions & 0 deletions src/event.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
pub mod arc_observable;
pub mod event;
pub mod event_repeater;
pub mod observable;
pub mod subscriber;
pub mod subscription;

pub use arc_observable::ArcObservable;
pub use event::Event;
pub use event_repeater::EventRepeater;
pub use observable::{Observable, ObservableResult};
pub use subscriber::{Callback, DispatchError, Subscriber};
pub use subscription::{ReceiverSubscription, Subscription};
60 changes: 60 additions & 0 deletions src/event/arc_observable.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::{
hash::{DefaultHasher, Hash, Hasher},
sync::Arc,
};

use tokio::sync::Mutex;

use super::{Event, ObservableResult};

#[derive(Debug)]
pub struct ArcObservable<T>
where
T: Send + 'static + Hash,
{
value: Arc<Mutex<T>>,
on_change: Event<Mutex<T>>,
}

impl<T> ArcObservable<T>
where
T: Send + 'static + Hash,
{
pub fn new(value: T, event_name: impl Into<String>) -> Self {
Self {
value: Arc::new(Mutex::new(value)),
on_change: Event::new(event_name),
}
}

pub async fn get(&self) -> Arc<Mutex<T>> {
Arc::clone(&self.value)
}

pub async fn set(&self, value: T) -> ObservableResult<Mutex<T>> {
let mut lock = self.value.lock().await;

let mut hasher = DefaultHasher::new();
(*lock).hash(&mut hasher);
let current_value = hasher.finish();

let mut hasher = DefaultHasher::new();
value.hash(&mut hasher);
let new_value = hasher.finish();

if current_value == new_value {
return ObservableResult::Unchanged;
}

*lock = value;
drop(lock);

let value = Arc::clone(&self.value);
let dispatch_result = self.on_change.dispatch(value).await;

match dispatch_result {
Ok(_) => ObservableResult::Changed(Ok(())),
Err(errors) => ObservableResult::Changed(Err(errors)),
}
}
}
Loading
Loading