Skip to content

Commit

Permalink
Add service ready signal (#218)
Browse files Browse the repository at this point in the history
* Make waits in materialiser service tests longer

* Add service ready signal

* Cargo fmt

* Add tests

* Update changelog

* Review changes

Co-authored-by: Sam Andreae <contact@samandreae.com>
  • Loading branch information
cafca and sandreae committed Aug 3, 2022
1 parent 4d2e672 commit dd73fe5
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

- GraphQL replication service gets and verifies new entries and inserts them into the db [#137](https://github.com/p2panda/aquadoggo/pull/137)
- Add schema task and schema provider that update when new schema views are materialised [#166](https://github.com/p2panda/aquadoggo/pull/166)
- Service ready signal [#218](https://github.com/p2panda/aquadoggo/pull/218)

### Changed

Expand Down
15 changes: 13 additions & 2 deletions aquadoggo/src/http/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ use axum::extract::Extension;
use axum::http::Method;
use axum::routing::get;
use axum::Router;
use log::{debug, warn};
use tower_http::cors::{Any, CorsLayer};

use crate::bus::ServiceSender;
use crate::context::Context;
use crate::http::api::{handle_graphql_playground, handle_graphql_query};
use crate::http::context::HttpServiceContext;
use crate::manager::Shutdown;
use crate::manager::{ServiceReadySender, Shutdown};

const GRAPHQL_ROUTE: &str = "/graphql";

Expand All @@ -38,7 +39,12 @@ pub fn build_server(http_context: HttpServiceContext) -> Router {
}

/// Start HTTP server.
pub async fn http_service(context: Context, signal: Shutdown, tx: ServiceSender) -> Result<()> {
pub async fn http_service(
context: Context,
signal: Shutdown,
tx: ServiceSender,
tx_ready: ServiceReadySender,
) -> Result<()> {
let http_port = context.config.http_port;
let http_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), http_port);

Expand All @@ -48,6 +54,11 @@ pub async fn http_service(context: Context, signal: Shutdown, tx: ServiceSender)
axum::Server::try_bind(&http_address)?
.serve(build_server(http_context).into_make_service())
.with_graceful_shutdown(async {
debug!("HTTP service is ready");
if tx_ready.send(()).is_err() {
warn!("No subscriber informed about HTTP service being ready");
};

signal.await.ok();
})
.await?;
Expand Down
142 changes: 104 additions & 38 deletions aquadoggo/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,23 @@ use std::future::Future;

use anyhow::Result;
use log::{error, info};
use tokio::sync::broadcast;
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, oneshot};
use tokio::task;
use tokio::task::JoinHandle;
use triggered::{Listener, Trigger};

/// Sends messages through the communication bus between services.
pub type Sender<T> = broadcast::Sender<T>;

// pub type ServiceReady = oneshot::channel<()>;

// Receives ready signal from services once they are ready to handle messages on the communication bus.
pub type ServiceReadyReceiver = oneshot::Receiver<()>;

/// Transmits ready signal from services once they are ready to handle messages on the communication bus.
pub type ServiceReadySender = oneshot::Sender<()>;

/// Receives shutdown signal for services so they can react accordingly.
pub type Shutdown = JoinHandle<()>;

Expand All @@ -27,15 +35,21 @@ where
D: Clone + Send + Sync + 'static,
M: Clone + Send + Sync + 'static,
{
async fn call(&self, context: D, shutdown: Shutdown, tx: Sender<M>) -> Result<()>;
async fn call(
&self,
context: D,
shutdown: Shutdown,
tx: Sender<M>,
tx_ready: ServiceReadySender,
) -> Result<()>;
}

/// Implements our `Service` trait for a generic async function.
#[async_trait::async_trait]
impl<FN, F, D, M> Service<D, M> for FN
where
// Function accepting a context and our communication channels, returning a future.
FN: Fn(D, Shutdown, Sender<M>) -> F + Sync,
FN: Fn(D, Shutdown, Sender<M>, ServiceReadySender) -> F + Sync,
// A future
F: Future<Output = Result<()>> + Send + 'static,
// Generic context type.
Expand All @@ -48,8 +62,14 @@ where
///
/// This gets automatically wrapped in a static, boxed and pinned function signature by the
/// `async_trait` macro so we don't need to do it ourselves.
async fn call(&self, context: D, shutdown: Shutdown, tx: Sender<M>) -> Result<()> {
(self)(context, shutdown, tx).await
async fn call(
&self,
context: D,
shutdown: Shutdown,
tx: Sender<M>,
tx_ready: ServiceReadySender,
) -> Result<()> {
(self)(context, shutdown, tx, tx_ready).await
}
}

Expand Down Expand Up @@ -140,7 +160,7 @@ where
&mut self,
name: &'static str,
service: F,
) {
) -> ServiceReadyReceiver {
// Sender for communication bus
let tx = self.tx.clone();

Expand All @@ -156,14 +176,18 @@ where
// Sender for exit signal
let exit_signal = self.exit_signal.clone();

// Oneshot channel on which services send a message once they are listening on the
// communication bus.
let (tx_ready, rx_ready) = oneshot::channel::<()>();

// Reference to shared context
let context = self.context.clone();

task::spawn(async move {
info!("Start {} service", name);

// Run the service!
let handle = service.call(context, signal, tx).await;
let handle = service.call(context, signal, tx, tx_ready).await;

// Drop the shutdown sender of this service when we're done, this signals the shutdown
// process that this service has finally stopped
Expand All @@ -179,6 +203,9 @@ where
// `Drop` trait on `Signal` we will be able to fire a signal also when this task panics
// or stops.
});

// Return ready signal receiver so this method can be awaited.
rx_ready
}

/// Future which resolves as soon as a service returned an error, panicked or stopped.
Expand Down Expand Up @@ -215,15 +242,15 @@ mod tests {
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

use super::{Sender, ServiceManager, Shutdown};
use super::{Sender, ServiceManager, ServiceReadySender, Shutdown};

type Counter = Arc<AtomicUsize>;

#[tokio::test]
async fn service_manager() {
let mut manager = ServiceManager::<usize, usize>::new(16, 0);

manager.add("test", |_, signal: Shutdown, _| async {
manager.add("test", |_, signal: Shutdown, _, _| async {
let work = tokio::task::spawn(async {
loop {
// Doing some very important work here ..
Expand Down Expand Up @@ -258,21 +285,24 @@ mod tests {

// Create five services waiting for message
for _ in 0..5 {
manager.add("rx", |data: Counter, _, tx: Sender<Message>| async move {
let mut rx = tx.subscribe();
let message = rx.recv().await.unwrap();

// Increment counter as soon as we received the right message
if matches!(message, Message::Hello) {
data.fetch_add(1, Ordering::Relaxed);
}

Ok(())
});
manager.add(
"rx",
|data: Counter, _, tx: Sender<Message>, _| async move {
let mut rx = tx.subscribe();
let message = rx.recv().await.unwrap();

// Increment counter as soon as we received the right message
if matches!(message, Message::Hello) {
data.fetch_add(1, Ordering::Relaxed);
}

Ok(())
},
);
}

// Create another service sending message over communication bus
manager.add("tx", |_, _, tx: Sender<Message>| async move {
manager.add("tx", |_, _, tx: Sender<Message>, _| async move {
tx.send(Message::Hello).unwrap();
Ok(())
});
Expand All @@ -288,29 +318,32 @@ mod tests {
let counter: Counter = Arc::new(AtomicUsize::new(0));
let mut manager = ServiceManager::<Counter, usize>::new(32, counter.clone());

manager.add("one", |counter: Counter, signal: Shutdown, _| async move {
let counter_clone = counter.clone();
manager.add(
"one",
|counter: Counter, signal: Shutdown, _, _| async move {
let counter_clone = counter.clone();

let work = tokio::task::spawn(async move {
// Increment counter once within the work task
counter_clone.fetch_add(1, Ordering::Relaxed);
let work = tokio::task::spawn(async move {
// Increment counter once within the work task
counter_clone.fetch_add(1, Ordering::Relaxed);

loop {
// We stay here forever now and make sure this task will not stop until we
// receive the shutdown signal
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
});
loop {
// We stay here forever now and make sure this task will not stop until we
// receive the shutdown signal
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
});

tokio::select! { _ = work => (), _ = signal => () };
tokio::select! { _ = work => (), _ = signal => () };

// Increment counter another time during shutdown
counter.fetch_add(1, Ordering::Relaxed);
// Increment counter another time during shutdown
counter.fetch_add(1, Ordering::Relaxed);

Ok(())
});
Ok(())
},
);

manager.add("two", |_, _, _| async move {
manager.add("two", |_, _, _, _| async move {
// Wait a little bit for the first task to do its work
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
panic!("This went wrong");
Expand All @@ -325,4 +358,37 @@ mod tests {
// Check if we could do our work and shutdown procedure
assert_eq!(counter.load(Ordering::Relaxed), 2);
}

#[tokio::test]
async fn ready_signal() {
let mut manager = ServiceManager::<usize, usize>::new(16, 0);

let service_ready = manager.add(
"ready_signal",
|_, _, _, tx_ready: ServiceReadySender| async {
// Send a message to indicate that this service is ready for some WORK!
tx_ready.send(()).unwrap();
Ok(())
},
);

// Blocking here until service has signalled that it's ready.
assert!(service_ready.await.is_ok());
}

#[tokio::test]
async fn ready_signal_error() {
let mut manager = ServiceManager::<usize, usize>::new(16, 0);

let service_ready = manager.add(
"ready_signal",
|_, _, _, _tx_ready: ServiceReadySender| async {
// This service doesn't indicate that it's ready!
Ok(())
},
);

// We panic when trying to wait for the service to become ready.
assert!(service_ready.await.is_err());
}
}
Loading

0 comments on commit dd73fe5

Please sign in to comment.