diff --git a/CHANGELOG.md b/CHANGELOG.md index dccd127e3..ac6e1d395 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - GraphQL replication service gets and verifies new entries and inserts them into the db [#137](https://github.com/p2panda/aquadoggo/pull/137) - Add schema task and schema provider that update when new schema views are materialised [#166](https://github.com/p2panda/aquadoggo/pull/166) +- Service ready signal [#218](https://github.com/p2panda/aquadoggo/pull/218) ### Changed diff --git a/aquadoggo/src/http/service.rs b/aquadoggo/src/http/service.rs index 1fb9992c4..7fdc29be4 100644 --- a/aquadoggo/src/http/service.rs +++ b/aquadoggo/src/http/service.rs @@ -7,13 +7,14 @@ use axum::extract::Extension; use axum::http::Method; use axum::routing::get; use axum::Router; +use log::{debug, warn}; use tower_http::cors::{Any, CorsLayer}; use crate::bus::ServiceSender; use crate::context::Context; use crate::http::api::{handle_graphql_playground, handle_graphql_query}; use crate::http::context::HttpServiceContext; -use crate::manager::Shutdown; +use crate::manager::{ServiceReadySender, Shutdown}; const GRAPHQL_ROUTE: &str = "/graphql"; @@ -38,7 +39,12 @@ pub fn build_server(http_context: HttpServiceContext) -> Router { } /// Start HTTP server. -pub async fn http_service(context: Context, signal: Shutdown, tx: ServiceSender) -> Result<()> { +pub async fn http_service( + context: Context, + signal: Shutdown, + tx: ServiceSender, + tx_ready: ServiceReadySender, +) -> Result<()> { let http_port = context.config.http_port; let http_address = SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), http_port); @@ -48,6 +54,11 @@ pub async fn http_service(context: Context, signal: Shutdown, tx: ServiceSender) axum::Server::try_bind(&http_address)? .serve(build_server(http_context).into_make_service()) .with_graceful_shutdown(async { + debug!("HTTP service is ready"); + if tx_ready.send(()).is_err() { + warn!("No subscriber informed about HTTP service being ready"); + }; + signal.await.ok(); }) .await?; diff --git a/aquadoggo/src/manager.rs b/aquadoggo/src/manager.rs index 54275ed21..eb254fa16 100644 --- a/aquadoggo/src/manager.rs +++ b/aquadoggo/src/manager.rs @@ -4,8 +4,8 @@ use std::future::Future; use anyhow::Result; use log::{error, info}; -use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; +use tokio::sync::{broadcast, oneshot}; use tokio::task; use tokio::task::JoinHandle; use triggered::{Listener, Trigger}; @@ -13,6 +13,14 @@ use triggered::{Listener, Trigger}; /// Sends messages through the communication bus between services. pub type Sender = broadcast::Sender; +// pub type ServiceReady = oneshot::channel<()>; + +// Receives ready signal from services once they are ready to handle messages on the communication bus. +pub type ServiceReadyReceiver = oneshot::Receiver<()>; + +/// Transmits ready signal from services once they are ready to handle messages on the communication bus. +pub type ServiceReadySender = oneshot::Sender<()>; + /// Receives shutdown signal for services so they can react accordingly. pub type Shutdown = JoinHandle<()>; @@ -27,7 +35,13 @@ where D: Clone + Send + Sync + 'static, M: Clone + Send + Sync + 'static, { - async fn call(&self, context: D, shutdown: Shutdown, tx: Sender) -> Result<()>; + async fn call( + &self, + context: D, + shutdown: Shutdown, + tx: Sender, + tx_ready: ServiceReadySender, + ) -> Result<()>; } /// Implements our `Service` trait for a generic async function. @@ -35,7 +49,7 @@ where impl Service for FN where // Function accepting a context and our communication channels, returning a future. - FN: Fn(D, Shutdown, Sender) -> F + Sync, + FN: Fn(D, Shutdown, Sender, ServiceReadySender) -> F + Sync, // A future F: Future> + Send + 'static, // Generic context type. @@ -48,8 +62,14 @@ where /// /// This gets automatically wrapped in a static, boxed and pinned function signature by the /// `async_trait` macro so we don't need to do it ourselves. - async fn call(&self, context: D, shutdown: Shutdown, tx: Sender) -> Result<()> { - (self)(context, shutdown, tx).await + async fn call( + &self, + context: D, + shutdown: Shutdown, + tx: Sender, + tx_ready: ServiceReadySender, + ) -> Result<()> { + (self)(context, shutdown, tx, tx_ready).await } } @@ -140,7 +160,7 @@ where &mut self, name: &'static str, service: F, - ) { + ) -> ServiceReadyReceiver { // Sender for communication bus let tx = self.tx.clone(); @@ -156,6 +176,10 @@ where // Sender for exit signal let exit_signal = self.exit_signal.clone(); + // Oneshot channel on which services send a message once they are listening on the + // communication bus. + let (tx_ready, rx_ready) = oneshot::channel::<()>(); + // Reference to shared context let context = self.context.clone(); @@ -163,7 +187,7 @@ where info!("Start {} service", name); // Run the service! - let handle = service.call(context, signal, tx).await; + let handle = service.call(context, signal, tx, tx_ready).await; // Drop the shutdown sender of this service when we're done, this signals the shutdown // process that this service has finally stopped @@ -179,6 +203,9 @@ where // `Drop` trait on `Signal` we will be able to fire a signal also when this task panics // or stops. }); + + // Return ready signal receiver so this method can be awaited. + rx_ready } /// Future which resolves as soon as a service returned an error, panicked or stopped. @@ -215,7 +242,7 @@ mod tests { use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - use super::{Sender, ServiceManager, Shutdown}; + use super::{Sender, ServiceManager, ServiceReadySender, Shutdown}; type Counter = Arc; @@ -223,7 +250,7 @@ mod tests { async fn service_manager() { let mut manager = ServiceManager::::new(16, 0); - manager.add("test", |_, signal: Shutdown, _| async { + manager.add("test", |_, signal: Shutdown, _, _| async { let work = tokio::task::spawn(async { loop { // Doing some very important work here .. @@ -258,21 +285,24 @@ mod tests { // Create five services waiting for message for _ in 0..5 { - manager.add("rx", |data: Counter, _, tx: Sender| async move { - let mut rx = tx.subscribe(); - let message = rx.recv().await.unwrap(); - - // Increment counter as soon as we received the right message - if matches!(message, Message::Hello) { - data.fetch_add(1, Ordering::Relaxed); - } - - Ok(()) - }); + manager.add( + "rx", + |data: Counter, _, tx: Sender, _| async move { + let mut rx = tx.subscribe(); + let message = rx.recv().await.unwrap(); + + // Increment counter as soon as we received the right message + if matches!(message, Message::Hello) { + data.fetch_add(1, Ordering::Relaxed); + } + + Ok(()) + }, + ); } // Create another service sending message over communication bus - manager.add("tx", |_, _, tx: Sender| async move { + manager.add("tx", |_, _, tx: Sender, _| async move { tx.send(Message::Hello).unwrap(); Ok(()) }); @@ -288,29 +318,32 @@ mod tests { let counter: Counter = Arc::new(AtomicUsize::new(0)); let mut manager = ServiceManager::::new(32, counter.clone()); - manager.add("one", |counter: Counter, signal: Shutdown, _| async move { - let counter_clone = counter.clone(); + manager.add( + "one", + |counter: Counter, signal: Shutdown, _, _| async move { + let counter_clone = counter.clone(); - let work = tokio::task::spawn(async move { - // Increment counter once within the work task - counter_clone.fetch_add(1, Ordering::Relaxed); + let work = tokio::task::spawn(async move { + // Increment counter once within the work task + counter_clone.fetch_add(1, Ordering::Relaxed); - loop { - // We stay here forever now and make sure this task will not stop until we - // receive the shutdown signal - tokio::time::sleep(std::time::Duration::from_millis(50)).await; - } - }); + loop { + // We stay here forever now and make sure this task will not stop until we + // receive the shutdown signal + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + }); - tokio::select! { _ = work => (), _ = signal => () }; + tokio::select! { _ = work => (), _ = signal => () }; - // Increment counter another time during shutdown - counter.fetch_add(1, Ordering::Relaxed); + // Increment counter another time during shutdown + counter.fetch_add(1, Ordering::Relaxed); - Ok(()) - }); + Ok(()) + }, + ); - manager.add("two", |_, _, _| async move { + manager.add("two", |_, _, _, _| async move { // Wait a little bit for the first task to do its work tokio::time::sleep(std::time::Duration::from_millis(50)).await; panic!("This went wrong"); @@ -325,4 +358,37 @@ mod tests { // Check if we could do our work and shutdown procedure assert_eq!(counter.load(Ordering::Relaxed), 2); } + + #[tokio::test] + async fn ready_signal() { + let mut manager = ServiceManager::::new(16, 0); + + let service_ready = manager.add( + "ready_signal", + |_, _, _, tx_ready: ServiceReadySender| async { + // Send a message to indicate that this service is ready for some WORK! + tx_ready.send(()).unwrap(); + Ok(()) + }, + ); + + // Blocking here until service has signalled that it's ready. + assert!(service_ready.await.is_ok()); + } + + #[tokio::test] + async fn ready_signal_error() { + let mut manager = ServiceManager::::new(16, 0); + + let service_ready = manager.add( + "ready_signal", + |_, _, _, _tx_ready: ServiceReadySender| async { + // This service doesn't indicate that it's ready! + Ok(()) + }, + ); + + // We panic when trying to wait for the service to become ready. + assert!(service_ready.await.is_err()); + } } diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index e6299029b..373aa3e3e 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -1,13 +1,13 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use anyhow::Result; -use log::debug; +use log::{debug, warn}; use p2panda_rs::storage_provider::traits::OperationStore; use tokio::task; use crate::bus::{ServiceMessage, ServiceSender}; use crate::context::Context; -use crate::manager::Shutdown; +use crate::manager::{ServiceReadySender, Shutdown}; use crate::materializer::tasks::{dependency_task, reduce_task, schema_task}; use crate::materializer::worker::{Factory, Task, TaskStatus}; use crate::materializer::TaskInput; @@ -28,6 +28,7 @@ pub async fn materializer_service( context: Context, shutdown: Shutdown, tx: ServiceSender, + tx_ready: ServiceReadySender, ) -> Result<()> { // Create worker factory with task queue let pool_size = context.config.worker_pool_size as usize; @@ -114,6 +115,11 @@ pub async fn materializer_service( } }); + debug!("Materialiser service is ready"); + if tx_ready.send(()).is_err() { + warn!("No subscriber informed about materialiser service being ready"); + }; + // Wait until we received the application shutdown signal or handle closed tokio::select! { _ = handle => (), @@ -144,7 +150,7 @@ mod tests { key_pair, operation, operation_fields, random_document_id, random_operation_id, }; use rstest::rstest; - use tokio::sync::broadcast; + use tokio::sync::{broadcast, oneshot}; use tokio::task; use crate::context::Context; @@ -196,17 +202,19 @@ mod tests { } }); let (tx, _) = broadcast::channel(1024); + let (tx_ready, rx_ready) = oneshot::channel::<()>(); // Start materializer service let tx_clone = tx.clone(); let handle = tokio::spawn(async move { - materializer_service(context, shutdown, tx_clone) + materializer_service(context, shutdown, tx_clone, tx_ready) .await .unwrap(); }); - // Wait for service to be ready .. - tokio::time::sleep(Duration::from_millis(50)).await; + if rx_ready.await.is_err() { + panic!("Service dropped"); + } // Send a message over the bus which kicks in materialization tx.send(crate::bus::ServiceMessage::NewOperation( @@ -269,18 +277,23 @@ mod tests { } }); let (tx, _) = broadcast::channel(1024); + let (tx_ready, rx_ready) = oneshot::channel::<()>(); // Start materializer service let tx_clone = tx.clone(); let handle = tokio::spawn(async move { - materializer_service(context, shutdown, tx_clone) + materializer_service(context, shutdown, tx_clone, tx_ready) .await .unwrap(); }); + if rx_ready.await.is_err() { + panic!("Service dropped"); + } + // Wait for service to be done .. it should materialize the document since it was waiting // as a "pending" task in the database - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(200)).await; // Make sure the service did not crash and is still running assert_eq!(handle.is_finished(), false); @@ -333,17 +346,19 @@ mod tests { } }); let (tx, _) = broadcast::channel(1024); + let (tx_ready, rx_ready) = oneshot::channel::<()>(); // Start materializer service let tx_clone = tx.clone(); let handle = tokio::spawn(async move { - materializer_service(context, shutdown, tx_clone) + materializer_service(context, shutdown, tx_clone, tx_ready) .await .unwrap(); }); - // Wait for service to be ready .. - tokio::time::sleep(Duration::from_millis(50)).await; + if rx_ready.await.is_err() { + panic!("Service dropped"); + } // Send a message over the bus which kicks in materialization tx.send(crate::bus::ServiceMessage::NewOperation( @@ -462,14 +477,17 @@ mod tests { // Start materializer service let tx_clone = tx.clone(); + let (tx_ready, rx_ready) = oneshot::channel::<()>(); + let handle = tokio::spawn(async move { - materializer_service(context, shutdown, tx_clone) + materializer_service(context, shutdown, tx_clone, tx_ready) .await .unwrap(); }); - // Wait for service to be ready .. - tokio::time::sleep(Duration::from_millis(50)).await; + if rx_ready.await.is_err() { + panic!("Service dropped"); + } // Then straight away publish a CREATE operation and send it to the bus. let (entry_encoded, _) = send_to_store(&db.store, &operation, None, &key_pair).await; @@ -481,7 +499,7 @@ mod tests { .unwrap(); // Wait a little bit for work being done .. - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(200)).await; // Make sure the service did not crash and is still running assert_eq!(handle.is_finished(), false); diff --git a/aquadoggo/src/node.rs b/aquadoggo/src/node.rs index d00b69461..4bba0fca3 100644 --- a/aquadoggo/src/node.rs +++ b/aquadoggo/src/node.rs @@ -60,13 +60,26 @@ impl Node { let mut manager = ServiceManager::::new(1024, context); // Start materializer service. - manager.add("materializer", materializer_service); - + if manager + .add("materializer", materializer_service) + .await + .is_err() + { + panic!("Failed starting materialiser service"); + } // Start replication service - manager.add("replication", replication_service); + if manager + .add("replication", replication_service) + .await + .is_err() + { + panic!("Failed starting replication service"); + } // Start HTTP server with GraphQL API - manager.add("http", http_service); + if manager.add("http", http_service).await.is_err() { + panic!("Failed starting HTTP service"); + } Self { pool, manager } } diff --git a/aquadoggo/src/replication/service.rs b/aquadoggo/src/replication/service.rs index f1885989b..574d60a3c 100644 --- a/aquadoggo/src/replication/service.rs +++ b/aquadoggo/src/replication/service.rs @@ -21,7 +21,7 @@ use crate::context::Context; use crate::db::request::PublishEntryRequest; use crate::db::stores::StorageEntry; use crate::graphql::replication::client; -use crate::manager::Shutdown; +use crate::manager::{ServiceReadySender, Shutdown}; /// Replication service polling other nodes frequently to ask them about new entries from a defined /// set of authors and log ids. @@ -29,6 +29,7 @@ pub async fn replication_service( context: Context, shutdown: Shutdown, tx: ServiceSender, + tx_ready: ServiceReadySender, ) -> Result<()> { // Prepare replication configuration let config = &context.config.replication; @@ -101,6 +102,11 @@ pub async fn replication_service( } }); + debug!("Replication service is ready"); + if tx_ready.send(()).is_err() { + warn!("No subscriber informed about replication service being ready"); + }; + tokio::select! { _ = handle => (), _ = shutdown => (), @@ -275,7 +281,7 @@ mod tests { use p2panda_rs::storage_provider::traits::EntryStore; use p2panda_rs::test_utils::constants::SCHEMA_ID; use rstest::rstest; - use tokio::sync::broadcast; + use tokio::sync::{broadcast, oneshot}; use tokio::task; use crate::context::Context; @@ -323,13 +329,18 @@ mod tests { }, SchemaProvider::default(), ); + let (tx_ready, rx_ready) = oneshot::channel::<()>(); let http_server_billie = task::spawn(async { - http_service(context_billie, shutdown_billie, tx_billie) + http_service(context_billie, shutdown_billie, tx_billie, tx_ready) .await .unwrap(); }); + if rx_ready.await.is_err() { + panic!("Service dropped"); + } + // Our test database helper already populated the database for us. We retreive the // public keys here of the authors who created these test data entries let public_key = billie_db @@ -359,14 +370,19 @@ mod tests { Context::new(ada_db.store.clone(), config_ada, SchemaProvider::default()); let tx_ada = tx.clone(); let shutdown_ada = shutdown_handle(); + let (tx_ready, rx_ready) = oneshot::channel::<()>(); // Ada starts replication service to get data from Billies GraphQL API let replication_service_ada = task::spawn(async { - replication_service(context_ada, shutdown_ada, tx_ada) + replication_service(context_ada, shutdown_ada, tx_ada, tx_ready) .await .unwrap(); }); + if rx_ready.await.is_err() { + panic!("Service dropped"); + } + // Wait a little bit for replication to take place tokio::time::sleep(Duration::from_millis(500)).await;