Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add service ready signal #218

Merged
merged 6 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- GraphQL replication service gets and verifies new entries and inserts them into the db [#137](https://github.com/p2panda/aquadoggo/pull/137)
- Add schema task and schema provider that update when new schema views are materialised [#166](https://github.com/p2panda/aquadoggo/pull/166)
- Service ready signal [#218](https://github.com/p2panda/aquadoggo/pull/218)

### Changed

Expand Down
14 changes: 13 additions & 1 deletion aquadoggo/src/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use axum::extract::Extension;
use axum::http::Method;
use axum::routing::get;
use axum::Router;
use log::{debug, warn};
use tokio::sync::oneshot;
use tower_http::cors::{Any, CorsLayer};

use crate::bus::ServiceSender;
Expand Down Expand Up @@ -38,7 +40,12 @@ pub fn build_server(http_context: HttpServiceContext) -> Router {
}

/// Start HTTP server.
pub async fn http_service(context: Context, signal: Shutdown, tx: ServiceSender) -> Result<()> {
pub async fn http_service(
context: Context,
signal: Shutdown,
tx: ServiceSender,
tx_ready: oneshot::Sender<()>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe give the sender also an own type like you already do for the receiver (ServiceReady)? I think that would also ready easier as well.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Like this?

) -> Result<()> {
let http_port = context.config.http_port;
let http_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), http_port);

Expand All @@ -48,6 +55,11 @@ pub async fn http_service(context: Context, signal: Shutdown, tx: ServiceSender)
axum::Server::try_bind(&http_address)?
.serve(build_server(http_context).into_make_service())
.with_graceful_shutdown(async {
debug!("HTTP service is ready");
if tx_ready.send(()).is_err() {
warn!("No subscriber informed about HTTP service being ready");
};

signal.await.ok();
})
.await?;
Expand Down
139 changes: 102 additions & 37 deletions aquadoggo/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ use std::future::Future;

use anyhow::Result;
use log::{error, info};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, oneshot};
use tokio::task;
use tokio::task::JoinHandle;
use triggered::{Listener, Trigger};

/// Sends messages through the communication bus between services.
pub type Sender<T> = broadcast::Sender<T>;

// Receives ready signal from services once they are ready to handle messages on the communication bus.
pub type ServiceReady = oneshot::Receiver<()>;

/// Receives shutdown signal for services so they can react accordingly.
pub type Shutdown = JoinHandle<()>;

Expand All @@ -27,15 +30,21 @@ where
D: Clone + Send + Sync + 'static,
M: Clone + Send + Sync + 'static,
{
async fn call(&self, context: D, shutdown: Shutdown, tx: Sender<M>) -> Result<()>;
async fn call(
&self,
context: D,
shutdown: Shutdown,
tx: Sender<M>,
tx_ready: oneshot::Sender<()>,
) -> Result<()>;
}

/// Implements our `Service` trait for a generic async function.
#[async_trait::async_trait]
impl<FN, F, D, M> Service<D, M> for FN
where
// Function accepting a context and our communication channels, returning a future.
FN: Fn(D, Shutdown, Sender<M>) -> F + Sync,
FN: Fn(D, Shutdown, Sender<M>, oneshot::Sender<()>) -> F + Sync,
// A future
F: Future<Output = Result<()>> + Send + 'static,
// Generic context type.
Expand All @@ -48,8 +57,14 @@ where
///
/// This gets automatically wrapped in a static, boxed and pinned function signature by the
/// `async_trait` macro so we don't need to do it ourselves.
async fn call(&self, context: D, shutdown: Shutdown, tx: Sender<M>) -> Result<()> {
(self)(context, shutdown, tx).await
async fn call(
&self,
context: D,
shutdown: Shutdown,
tx: Sender<M>,
tx_ready: oneshot::Sender<()>,
) -> Result<()> {
(self)(context, shutdown, tx, tx_ready).await
}
}

Expand Down Expand Up @@ -136,11 +151,13 @@ where
///
/// Errors returned and panics by the service will send an exit signal which can be subscribed
/// to via the `on_exit` method.
///
///
cafca marked this conversation as resolved.
Show resolved Hide resolved
pub fn add<F: Service<D, M> + Send + Sync + 'static>(
&mut self,
name: &'static str,
service: F,
) {
) -> ServiceReady {
// Sender for communication bus
let tx = self.tx.clone();

Expand All @@ -156,14 +173,18 @@ where
// Sender for exit signal
let exit_signal = self.exit_signal.clone();

// Oneshot channel on which services send a message once they are listening on the
// communication bus.
let (tx_ready, rx_ready) = oneshot::channel::<()>();

// Reference to shared context
let context = self.context.clone();

task::spawn(async move {
info!("Start {} service", name);

// Run the service!
let handle = service.call(context, signal, tx).await;
let handle = service.call(context, signal, tx, tx_ready).await;

// Drop the shutdown sender of this service when we're done, this signals the shutdown
// process that this service has finally stopped
Expand All @@ -179,6 +200,9 @@ where
// `Drop` trait on `Signal` we will be able to fire a signal also when this task panics
// or stops.
});

// Return ready signal receiver so this method can be awaited.
rx_ready
}

/// Future which resolves as soon as a service returned an error, panicked or stopped.
Expand Down Expand Up @@ -215,6 +239,8 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use tokio::sync::oneshot;

use super::{Sender, ServiceManager, Shutdown};

type Counter = Arc<AtomicUsize>;
Expand All @@ -223,7 +249,7 @@ mod tests {
async fn service_manager() {
let mut manager = ServiceManager::<usize, usize>::new(16, 0);

manager.add("test", |_, signal: Shutdown, _| async {
manager.add("test", |_, signal: Shutdown, _, _| async {
let work = tokio::task::spawn(async {
loop {
// Doing some very important work here ..
Expand Down Expand Up @@ -258,21 +284,24 @@ mod tests {

// Create five services waiting for message
for _ in 0..5 {
manager.add("rx", |data: Counter, _, tx: Sender<Message>| async move {
let mut rx = tx.subscribe();
let message = rx.recv().await.unwrap();

// Increment counter as soon as we received the right message
if matches!(message, Message::Hello) {
data.fetch_add(1, Ordering::Relaxed);
}

Ok(())
});
manager.add(
"rx",
|data: Counter, _, tx: Sender<Message>, _| async move {
let mut rx = tx.subscribe();
let message = rx.recv().await.unwrap();

// Increment counter as soon as we received the right message
if matches!(message, Message::Hello) {
data.fetch_add(1, Ordering::Relaxed);
}

Ok(())
},
);
}

// Create another service sending message over communication bus
manager.add("tx", |_, _, tx: Sender<Message>| async move {
manager.add("tx", |_, _, tx: Sender<Message>, _| async move {
tx.send(Message::Hello).unwrap();
Ok(())
});
Expand All @@ -288,29 +317,32 @@ mod tests {
let counter: Counter = Arc::new(AtomicUsize::new(0));
let mut manager = ServiceManager::<Counter, usize>::new(32, counter.clone());

manager.add("one", |counter: Counter, signal: Shutdown, _| async move {
let counter_clone = counter.clone();
manager.add(
"one",
|counter: Counter, signal: Shutdown, _, _| async move {
let counter_clone = counter.clone();

let work = tokio::task::spawn(async move {
// Increment counter once within the work task
counter_clone.fetch_add(1, Ordering::Relaxed);
let work = tokio::task::spawn(async move {
// Increment counter once within the work task
counter_clone.fetch_add(1, Ordering::Relaxed);

loop {
// We stay here forever now and make sure this task will not stop until we
// receive the shutdown signal
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
});
loop {
// We stay here forever now and make sure this task will not stop until we
// receive the shutdown signal
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
});

tokio::select! { _ = work => (), _ = signal => () };
tokio::select! { _ = work => (), _ = signal => () };

// Increment counter another time during shutdown
counter.fetch_add(1, Ordering::Relaxed);
// Increment counter another time during shutdown
counter.fetch_add(1, Ordering::Relaxed);

Ok(())
});
Ok(())
},
);

manager.add("two", |_, _, _| async move {
manager.add("two", |_, _, _, _| async move {
// Wait a little bit for the first task to do its work
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
panic!("This went wrong");
Expand All @@ -325,4 +357,37 @@ mod tests {
// Check if we could do our work and shutdown procedure
assert_eq!(counter.load(Ordering::Relaxed), 2);
}

#[tokio::test]
async fn ready_signal() {
let mut manager = ServiceManager::<usize, usize>::new(16, 0);

let service_ready = manager.add(
"ready_signal",
|_, _, _, tx_ready: oneshot::Sender<()>| async {
// Send a message to indicate that this service is ready for some WORK!
tx_ready.send(()).unwrap();
Ok(())
},
);

// Blocking here until service has signalled that it's ready.
assert!(service_ready.await.is_ok());
}

#[tokio::test]
async fn ready_signal_error() {
let mut manager = ServiceManager::<usize, usize>::new(16, 0);

let service_ready = manager.add(
"ready_signal",
|_, _, _, _tx_ready: oneshot::Sender<()>| async {
// This service doesn't indicate that it's ready!
Ok(())
},
);

// We panic when trying to wait for the service to become ready.
assert!(service_ready.await.is_err());
}
}
Loading