Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions src/event/event_repeater.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
use std::{collections::HashMap, sync::Arc};
use log::error;
use std::{
collections::HashMap,
sync::{Arc, Weak},
};
use thiserror::Error;
use tokio::{sync::Mutex, task::JoinHandle};
use uuid::Uuid;

use crate::setlock::{SetLock, SetLockError};

use super::{Event, Subscription};

#[derive(Debug, Error)]
pub enum AttachError {
#[error("Tried to attach event {event_name} to EventRepeater {repeater_name} before it was initialized. Did you not use EventRepeater<T>::new()?")]
#[error("Tried to attach event {event_name} to EventRepeater {repeater_name} while it was uninitialized. Did you not use EventRepeater<T>::new()?")]
NotInitialized {
event_name: String,
repeater_name: String,
},

#[error(
"Tried to attach event {event_name} to EventRepeater {repeater_name}, which was already attached."
"Tried to attach event {event_name} to EventRepeater {repeater_name}, which was already attached to it."
)]
AlreadyAttached {
event_name: String,
Expand Down Expand Up @@ -47,7 +53,7 @@ where
T: Send + Sync + 'static,
{
pub event: Event<T>,
self_arc: Mutex<Option<Arc<Self>>>,
weak: Mutex<SetLock<Weak<Self>>>,
subscriptions: Mutex<HashMap<Uuid, (Subscription, JoinHandle<()>)>>,
}

Expand All @@ -62,27 +68,38 @@ where
{
let event = Event::new(name);
let event_repeater = Self {
self_arc: Mutex::new(None),
weak: Mutex::new(SetLock::new()),
event,
subscriptions: Mutex::new(HashMap::new()),
};

let self_arc = Arc::new(event_repeater);
let mut lock = self_arc.self_arc.lock().await;
let self_arc_clone = Arc::clone(&self_arc);
*lock = Some(self_arc_clone);
drop(lock);
let arc = Arc::new(event_repeater);
let weak = Arc::downgrade(&arc);

let result = arc.weak.lock().await.set(weak);
if let Err(err) = result {
match err {
SetLockError::AlreadySet => {
error!("Failed to set EventRepeater {}'s Weak self-reference because it was already set. This should never happen. Shutting down ungracefully to prevent further undefined behavior.", arc.event.name);
unreachable!(
"Unable to set EventRepeater {}'s Weak self-reference because it was already set.",
arc.event.name
);
}
}
}

self_arc
arc
}

pub async fn subscription_count(&self) -> usize {
self.subscriptions.lock().await.len()
}

pub async fn attach(&self, event: &Event<T>, buffer: usize) -> Result<(), AttachError> {
let self_arc = match self.self_arc.lock().await.as_ref() {
Some(arc) => Arc::clone(arc),
let lock = self.weak.lock().await;
let weak = match lock.get() {
Some(weak) => weak,
None => {
return Err(AttachError::NotInitialized {
event_name: event.name.clone(),
Expand All @@ -91,6 +108,15 @@ where
}
};

// This can't fail because the Arc is guaranteed to be valid as long as &self is valid.
let arc = match weak.upgrade() {
Some(arc) => arc,
None => {
error!("EventRepeater {}'s Weak self-reference could not be upgraded to Arc while attaching event {}. This should never happen. Shutting down ungracefully to prevent further undefined behavior.", self.event.name, event.name);
unreachable!("EventRepeater {}'s Weak self-reference could not be upgraded to Arc while attaching event {}.", self.event.name, event.name);
}
};

let mut subscriptions = self.subscriptions.lock().await;
if subscriptions.contains_key(&event.uuid) {
return Err(AttachError::AlreadyAttached {
Expand All @@ -108,7 +134,7 @@ where

let join_handle = tokio::spawn(async move {
while let Some(value) = receiver.recv().await {
let _ = self_arc.event.dispatch(value).await;
let _ = arc.event.dispatch(value).await;
}
});
subscriptions.insert(event.uuid, (subscription, join_handle));
Expand Down
39 changes: 28 additions & 11 deletions src/service/service_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
event::EventRepeater, service::Watchdog, setlock::{SetLock, SetLockError}
};
use log::{error, info, warn};
use std::{collections::HashMap, fmt::Display, mem, sync::Arc, time::Duration};
use std::{collections::HashMap, fmt::Display, mem, sync::{Arc, Weak}, time::Duration};
use tokio::{
spawn,
sync::{Mutex, MutexGuard},
Expand Down Expand Up @@ -54,31 +54,31 @@ pub fn new() -> Self {

pub async fn build(self) -> Arc<ServiceManager> {
let service_manager = ServiceManager {
arc: Mutex::new(SetLock::new()),
weak: Mutex::new(SetLock::new()),
services: self.services,
background_tasks: Mutex::new(HashMap::new()),
on_status_change: EventRepeater::new("service_manager_on_status_change").await,
};

let self_arc = Arc::new(service_manager);
let self_arc_clone = Arc::clone(&self_arc);

let result = self_arc_clone.arc.lock().await.set(Arc::clone(&self_arc_clone));
let arc = Arc::new(service_manager);
let weak = Arc::downgrade(&arc);

let result = arc.weak.lock().await.set(weak);
if let Err(err) = result {
match err {
SetLockError::AlreadySet => {
unreachable!("Unable to set ServiceManager's self-arc in ServiceManagerBuilder because it was already set. This should never happen. How did you...?");
error!("Unable to set ServiceManager's Weak self-reference in ServiceManagerBuilder because it was already set. This should never happen. Shutting down ungracefully to prevent further undefined behavior.");
unreachable!("Unable to set ServiceManager's Weak self-reference in ServiceManagerBuilder because it was already set.");
}
}
}

self_arc
arc
}
}

pub struct ServiceManager {
arc: Mutex<SetLock<Arc<Self>>>,
weak: Mutex<SetLock<Weak<Self>>>,
background_tasks: Mutex<HashMap<String, JoinHandle<()>>>,

pub services: Vec<Arc<Mutex<dyn Service>>>,
Expand Down Expand Up @@ -326,10 +326,27 @@ impl ServiceManager {
&self,
service: &mut MutexGuard<'_, dyn Service>,
) -> Result<(), StartupError> {
let service_manager = Arc::clone(self.arc.lock().await.unwrap());
let lock = self.weak.lock().await;
let weak = match lock.get() {
Some(weak) => weak,
None => {
error!("ServiceManager's Weak self-reference was None while initializing service {}. This should never happen. Did you not use a ServiceManagerBuilder? Shutting down ungracefully to prevent further undefined behavior.", service.info().name);
unreachable!("ServiceManager's Weak self-reference was None while initializing service {}.", service.info().name);
}
};

// This can't fail because the Arc is guaranteed to be valid as long as &self is valid.
let arc = match weak.upgrade() {
Some(arc) => arc,
None => {
error!("ServiceManager's Weak self-reference could not be upgraded to Arc while initializing service {}. This should never happen. Shutting down ungracefully to prevent further undefined behavior.", service.info().name);
unreachable!("ServiceManager's Weak self-reference could not be upgraded to Arc while initializing service {}.", service.info().name);
}
};


//TODO: Add to config instead of hardcoding duration
let start = service.start(service_manager);
let start = service.start(arc);
let timeout_result = timeout(Duration::from_secs(10), start).await;

match timeout_result {
Expand Down
Loading