Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Efficient Signal in service manager #95

Merged
merged 2 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions aquadoggo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ tokio = { version = "1.17.0", features = [
tower-http = { version = "0.2.4", default-features = false, features = [
"cors",
] }
triggered = "0.1.2"

[dev-dependencies]
reqwest = { version = "0.11.9", default-features = false, features = [
Expand Down
6 changes: 3 additions & 3 deletions aquadoggo/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ impl Runtime {

/// Close all running concurrent tasks and wait until they are fully shut down.
pub async fn shutdown(self) {
// Close connection pool
self.pool.close().await;

// Wait until all tasks are shut down
self.manager.shutdown().await;

// Close connection pool
self.pool.close().await;
}
}
86 changes: 30 additions & 56 deletions aquadoggo/src/service_manager.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use anyhow::Result;
use log::{error, info};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::task;
use tokio::task::JoinHandle;
use triggered::{Listener, Trigger};

/// Sends messages through the communication bus between services.
pub type Sender<T> = broadcast::Sender<T>;
Expand Down Expand Up @@ -56,51 +53,24 @@ where
}
}

/// Future which resolves as soon as the internal boolean flag gets flipped to "true".
///
/// The flag can be manually set with the `fire` method or automatically whenever the signal
/// instance gets dropped. The latter is a trick to fire a signal when something panics.
struct Signal(Arc<AtomicBool>);
/// Wrapper around `Trigger` which sends a signal as soon as `Signal` gets dropped.
#[derive(Clone)]
struct Signal(Trigger);

impl Signal {
/// Returns a new signal instance.
pub fn new() -> Self {
Self(Arc::new(AtomicBool::new(false)))
}

/// Sets the boolean flag to "true" which will resolve the future.
pub fn fire(&mut self) {
self.0.swap(true, Ordering::Relaxed);
}
}

impl Future for Signal {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.0.load(Ordering::Relaxed) {
true => Poll::Ready(()),
false => {
cx.waker().clone().wake();
Poll::Pending
}
}
}
}

impl Clone for Signal {
fn clone(&self) -> Self {
Self(self.0.clone())
/// Fires the signal manually.
pub fn trigger(&self) {
self.0.trigger();
}
}

impl Drop for Signal {
fn drop(&mut self) {
// Fire signal whenever signal handler goes out of scope
self.fire();
// Fires the signal automatically on drop
self.trigger();

// Drop object
drop(self)
// And now, drop it!
drop(self);
}
}

Expand All @@ -121,21 +91,24 @@ where
context: D,

/// Sender of our communication bus.
///
/// This is a broadcast channel where any amount of senders and receivers can be derived from.
tx: Sender<M>,

/// Sender of exit signal.
///
/// The manager catches returned errors and panics from services and notifies this channel.
/// This can be used to listen to incoming errors and react to them, for example by quitting
/// the program.
/// The manager catches returned errors or panics from services and sends the exit signal.
exit_signal: Signal,

/// Sender of the shutdown signal.
/// Receiver of exit signal.
///
/// This can be used to react to service errors, for example by quitting the program.
exit_handle: Listener,

/// Sender of shutdown signal.
///
/// All services can subscribe to this broadcast channel and accordingly react to it.
///
/// All services can subscribe to this broadcast channel and accordingly react to it if they
/// need to.
/// This needs to be a broadcast channel as we keep count of the subscribers and stop the
/// service manager as soon as all of them have been dropped.
shutdown_signal: broadcast::Sender<bool>,
}

Expand All @@ -146,17 +119,18 @@ where
{
/// Returns a new instance of a service manager.
///
/// The capacity argument defines the maximum bound of messages on the communication bus which
/// get broadcasted across all services.
/// The `capacity` argument defines the maximum bound of messages on the communication bus
/// which get broadcasted across all services.
pub fn new(capacity: usize, context: D) -> Self {
let (tx, _) = broadcast::channel(capacity);
let (shutdown_signal, _) = broadcast::channel(16);
let exit_signal = Signal::new();
let (exit_signal, exit_handle) = triggered::trigger();

Self {
context,
tx,
exit_signal,
exit_signal: Signal(exit_signal),
exit_handle,
shutdown_signal,
}
}
Expand All @@ -183,7 +157,7 @@ where
});

// Sender for exit signal
let mut exit_signal = self.exit_signal.clone();
let exit_signal = self.exit_signal.clone();

// Reference to shared context
let context = self.context.clone();
Expand All @@ -201,7 +175,7 @@ where
// Handle potential errors which have been returned by the service.
if let Some(err) = handle.err() {
error!("Error in {} service: {}", name, err);
exit_signal.fire();
exit_signal.trigger();
}

// `exit_signal` will go out of scope now and drops here. Since we also implemented the
Expand All @@ -212,7 +186,7 @@ where

/// Future which resolves as soon as a service returned an error, panicked or stopped.
pub async fn on_exit(&self) {
self.exit_signal.clone().await;
self.exit_handle.clone().await;
}

/// Informs all services about graceful shutdown and waits for them until they all stopped.
Expand Down