diff --git a/CHANGELOG.md b/CHANGELOG.md index 46f0a2331..154804350 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Improve CI, track test coverage [#139](https://github.com/p2panda/aquadoggo/pull/139) - Fix compatibility with PostgreSQL, change sqlx runtime to `tokio` [#170](https://github.com/p2panda/aquadoggo/pull/170) - Use UPSERT for inserting or updating documents [#173](https://github.com/p2panda/aquadoggo/pull/173) +- Don't critically fail reduce task when document missing [#177](https://github.com/p2panda/aquadoggo/pull/177) ## [0.2.0] diff --git a/aquadoggo/src/db/stores/test_utils.rs b/aquadoggo/src/db/stores/test_utils.rs index 3dd98f294..5b2514bc2 100644 --- a/aquadoggo/src/db/stores/test_utils.rs +++ b/aquadoggo/src/db/stores/test_utils.rs @@ -12,9 +12,7 @@ use p2panda_rs::operation::{ PinnedRelation, PinnedRelationList, Relation, RelationList, VerifiedOperation, }; use p2panda_rs::schema::SchemaId; -use p2panda_rs::storage_provider::traits::{ - AsStorageEntry, AsStorageLog, EntryStore, LogStore, OperationStore, StorageProvider, -}; +use p2panda_rs::storage_provider::traits::{OperationStore, StorageProvider}; use p2panda_rs::test_utils::constants::{DEFAULT_PRIVATE_KEY, TEST_SCHEMA_ID}; use p2panda_rs::test_utils::fixtures::{operation, operation_fields}; use rstest::fixture; @@ -23,7 +21,6 @@ use sqlx::Any; use tokio::runtime::Builder; use crate::db::provider::SqlStorage; -use crate::db::stores::{StorageEntry, StorageLog}; use crate::db::traits::DocumentStore; use crate::db::{connection_pool, create_database, run_pending_migrations, Pool}; use crate::graphql::client::{EntryArgsRequest, PublishEntryRequest, PublishEntryResponse}; @@ -378,7 +375,7 @@ async fn create_test_db( } } -async fn send_to_store( +pub async fn send_to_store( store: &SqlStorage, operation: &Operation, document_id: Option<&DocumentId>, diff --git a/aquadoggo/src/materializer/service.rs 
b/aquadoggo/src/materializer/service.rs index c7b5ceb5f..f2bb8eb2e 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -132,15 +132,24 @@ pub async fn materializer_service( mod tests { use std::time::Duration; - use p2panda_rs::operation::{AsVerifiedOperation, OperationValue}; + use p2panda_rs::document::DocumentViewId; + use p2panda_rs::identity::KeyPair; + use p2panda_rs::operation::{ + AsVerifiedOperation, Operation, OperationValue, PinnedRelation, PinnedRelationList, + Relation, RelationList, + }; use p2panda_rs::storage_provider::traits::OperationStore; use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID; + use p2panda_rs::test_utils::fixtures::{ + key_pair, operation, operation_fields, random_document_id, random_document_view_id, + random_operation_id, + }; use rstest::rstest; use tokio::sync::broadcast; use tokio::task; use crate::context::Context; - use crate::db::stores::test_utils::{test_db, TestDatabase, TestDatabaseRunner}; + use crate::db::stores::test_utils::{send_to_store, test_db, TestDatabase, TestDatabaseRunner}; use crate::db::traits::DocumentStore; use crate::materializer::{Task, TaskInput}; use crate::Configuration; @@ -282,4 +291,193 @@ mod tests { ); }); } + + #[rstest] + fn materialize_update_document( + #[from(test_db)] + #[with(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("name", OperationValue::Text("panda".into()))])] + runner: TestDatabaseRunner, + ) { + // Prepare database which inserts data for one document + runner.with_db_teardown(|db: TestDatabase| async move { + // Identify key_pair, document and operation which was inserted for testing + let key_pair = db.key_pairs.first().unwrap(); + let document_id = db.documents.first().unwrap(); + let verified_operation = db + .store + .get_operations_by_document_id(document_id) + .await + .unwrap() + .first() + .unwrap() + .to_owned(); + + // Prepare arguments for service + let context = Context::new(db.store.clone(), 
Configuration::default()); + let shutdown = task::spawn(async { + loop { + // Do this forever .. this means that the shutdown handler will never resolve + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + let (tx, _) = broadcast::channel(1024); + + // Start materializer service + let tx_clone = tx.clone(); + let handle = tokio::spawn(async move { + materializer_service(context, shutdown, tx_clone) + .await + .unwrap(); + }); + + // Wait for service to be ready .. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Send a message over the bus which kicks in materialization + tx.send(crate::bus::ServiceMessage::NewOperation( + verified_operation.operation_id().to_owned(), + )) + .unwrap(); + + // Wait a little bit for work being done .. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Then straight away publish an UPDATE on this document and send it over the bus too. + let (entry_encoded, _) = send_to_store( + &db.store, + &operation( + Some(operation_fields(vec![( + "name", + OperationValue::Text("panda123".into()), + )])), + Some(verified_operation.operation_id().to_owned().into()), + Some(TEST_SCHEMA_ID.parse().unwrap()), + ), + Some(document_id), + key_pair, + ) + .await; + + // Send a message over the bus which kicks in materialization + tx.send(crate::bus::ServiceMessage::NewOperation( + entry_encoded.hash().into(), + )) + .unwrap(); + + // Wait a little bit for work being done .. 
+ tokio::time::sleep(Duration::from_millis(50)).await; + + // Make sure the service did not crash and is still running + assert_eq!(handle.is_finished(), false); + + // Check database for materialized documents + let document = db + .store + .get_document_by_id(document_id) + .await + .unwrap() + .expect("We expect that the document is `Some`"); + assert_eq!(document.id(), &entry_encoded.hash().into()); + assert_eq!( + document.fields().get("name").unwrap().value().to_owned(), + OperationValue::Text("panda123".into()) + ); + }); + } + + #[rstest] + #[case( + operation(Some(operation_fields(vec![( + "relation", + OperationValue::Relation(Relation::new( + random_document_id(), + )), + )])), None, None) + )] + #[case( + operation(Some(operation_fields(vec![( + "pinned_relation", + OperationValue::PinnedRelation(PinnedRelation::new( + DocumentViewId::new(&[random_operation_id(), random_operation_id()]).unwrap(), + )), + )])), None, None) + )] + #[case( + operation(Some(operation_fields(vec![( + "relation_list", + OperationValue::RelationList(RelationList::new( + vec![ + random_document_id(), + random_document_id(), + random_document_id() + ], + )), + )])), None, None) + )] + #[case( + operation(Some(operation_fields(vec![( + "pinned_relation_list", + OperationValue::PinnedRelationList(PinnedRelationList::new( + vec![ + DocumentViewId::new(&[random_operation_id(), random_operation_id()]).unwrap(), + DocumentViewId::new(&[random_operation_id(), random_operation_id(), random_operation_id()]).unwrap() + ], + )), + )])), None, None) + )] + fn materialize_complex_documents( + #[from(test_db)] + #[with(0, 0)] + runner: TestDatabaseRunner, + #[case] operation: Operation, + key_pair: KeyPair, + ) { + // Prepare empty database + runner.with_db_teardown(|db: TestDatabase| async move { + // Prepare arguments for service + let context = Context::new(db.store.clone(), Configuration::default()); + let shutdown = task::spawn(async { + loop { + // Do this forever .. 
this means that the shutdown handler will never resolve + tokio::time::sleep(Duration::from_millis(100)).await; + } + }); + let (tx, _) = broadcast::channel(1024); + + // Start materializer service + let tx_clone = tx.clone(); + let handle = tokio::spawn(async move { + materializer_service(context, shutdown, tx_clone) + .await + .unwrap(); + }); + + // Wait for service to be ready .. + tokio::time::sleep(Duration::from_millis(50)).await; + + // Then straight away publish a CREATE operation and send it to the bus. + let (entry_encoded, _) = send_to_store(&db.store, &operation, None, &key_pair).await; + + // Send a message over the bus which kicks in materialization + tx.send(crate::bus::ServiceMessage::NewOperation( + entry_encoded.hash().into(), + )) + .unwrap(); + + // Wait a little bit for work being done .. + tokio::time::sleep(Duration::from_millis(100)).await; + + // Make sure the service did not crash and is still running + assert_eq!(handle.is_finished(), false); + + // Check database for materialized documents + let document = db + .store + .get_document_by_id(&entry_encoded.hash().into()) + .await + .unwrap() + .expect("We expect that the document is `Some`"); + assert_eq!(document.id(), &entry_encoded.hash().into()); + }); + } } diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 5f8fb929b..3d0577b11 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -1,5 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use log::debug; use p2panda_rs::document::DocumentViewId; use crate::context::Context; @@ -19,25 +20,40 @@ use crate::materializer::TaskInput; /// Expects a _reduce_ task to have completed successfully for the given document view itself and /// returns a critical error otherwise. 
pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { + debug!("Working on dependency task {:#?}", input); + // Here we retrive the document view by document view id. let document_view = match input.document_view_id { Some(view_id) => context .store .get_document_view_by_id(&view_id) .await - .map_err(|_| TaskError::Critical) - , + .map_err(|err| { + debug!("Fatal error getting document view from storage"); + debug!("{}", err); + TaskError::Critical + }) + , // We expect to handle document_view_ids in a dependency task. None => Err(TaskError::Critical), }?; let document_view = match document_view { - Some(document_view) => Ok(document_view), + Some(document_view) => { + debug!( + "Document view retrieved from storage with id: {}", + document_view.id() + ); + Ok(document_view) + } // If no document view for the id passed into this task could be retrieved then this // document has been deleted or the document view id was invalid. As "dependency" tasks // are only dispatched after a successful "reduce" task, neither `None` case should // happen, so this is a critical error. - None => Err(TaskError::Critical), + None => { + debug!("Expected document view not found in the store."); + Err(TaskError::Critical) + } }?; let mut next_tasks = Vec::new(); @@ -51,15 +67,21 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { // same as above... + debug!("Relation list field found, no action required."); } p2panda_rs::operation::OperationValue::PinnedRelation(pinned_relation) => { // These are pinned relations. We may have the operations for these views in the db, // but this view wasn't pinned yet, so hasn't been materialised. To make sure it is // materialised when possible, we dispatch a "reduce" task for any pinned relations // which aren't found. 
+ debug!( + "Pinned relation field found refering to view id: {}", + pinned_relation.view_id() + ); next_tasks.push( construct_relation_task(&context, pinned_relation.view_id().clone()).await?, ); @@ -67,6 +89,10 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { // same as above... for document_view_id in pinned_relation_list.iter() { + debug!( + "Pinned relation list field found containing view id: {}", + document_view_id + ); next_tasks .push(construct_relation_task(&context, document_view_id.clone()).await?); } @@ -75,7 +101,11 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult> = next_tasks.into_iter().flatten().collect(); + + debug!("Dispatching {} reduce task(s)", next_tasks.len()); + + Ok(Some(next_tasks)) } /// Returns a _reduce_ task for a given document view only if that view does not yet exist in the @@ -84,17 +114,27 @@ async fn construct_relation_task( context: &Context, document_view_id: DocumentViewId, ) -> Result>, TaskError> { + debug!("Get view for pinned relation with id: {}", document_view_id); match context .store .get_document_view_by_id(&document_view_id) .await - .map_err(|_| TaskError::Critical)? - { - Some(_) => Ok(None), - None => Ok(Some(Task::new( - "reduce", - TaskInput::new(None, Some(document_view_id)), - ))), + .map_err(|err| { + debug!("Fatal error getting document view from storage"); + debug!("{}", err); + TaskError::Critical + })? 
{ + Some(_) => { + debug!("View found for pinned relation: {}", document_view_id); + Ok(None) + } + None => { + debug!("No view found for pinned relation: {}", document_view_id); + Ok(Some(Task::new( + "reduce", + TaskInput::new(None, Some(document_view_id)), + ))) + } } } diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 730fae9c2..baa8ffc10 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -21,7 +21,13 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Ok(document_id), + None => { + debug!("No document found for this view, exit without dispatching any other tasks"); + return Ok(None); + } + }?; // Get all operations for the requested document let operations = context @@ -65,27 +71,27 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Result { +) -> Result, TaskError> { match (&input.document_id, &input.document_view_id) { // The `DocumentId` is already given, we don't have to do anything - (Some(document_id), None) => Ok(document_id.to_owned()), + (Some(document_id), None) => Ok(Some(document_id.to_owned())), // A `DocumentViewId` is given, let's find out its document id (None, Some(document_view_id)) => { // @TODO: We can skip this step if we implement: // https://github.com/p2panda/aquadoggo/issues/148 + debug!("Find document for view with id: {}", document_view_id); let operation_id = document_view_id.clone().into_iter().next().unwrap(); - match context + context .store .get_document_by_operation_id(&operation_id) .await - .map_err(|_| TaskError::Critical)? 
- { - Some(document_id) => Ok(document_id), - None => Err(TaskError::Critical), - } + .map_err(|err| { + debug!("Fatal error getting document_id from storage"); + debug!("{}", err); + TaskError::Critical + }) } - // None or both have been provided which smells like a bug (_, _) => Err(TaskError::Critical), } @@ -189,27 +195,19 @@ async fn reduce_document( #[cfg(test)] mod tests { - use std::convert::TryFrom; - use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId}; - use p2panda_rs::entry::{sign_and_encode, Entry}; - use p2panda_rs::identity::Author; - use p2panda_rs::operation::{ - AsVerifiedOperation, OperationEncoded, OperationValue, VerifiedOperation, - }; - use p2panda_rs::storage_provider::traits::{ - AsStorageEntry, EntryStore, OperationStore, StorageProvider, - }; + use p2panda_rs::operation::{AsVerifiedOperation, OperationValue}; + use p2panda_rs::storage_provider::traits::OperationStore; use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID; - use p2panda_rs::test_utils::fixtures::{operation, operation_fields}; + use p2panda_rs::test_utils::fixtures::{ + operation, operation_fields, random_document_id, random_document_view_id, + }; use rstest::rstest; use crate::config::Configuration; use crate::context::Context; - use crate::db::stores::test_utils::{test_db, TestDatabase, TestDatabaseRunner}; - use crate::db::stores::StorageEntry; + use crate::db::stores::test_utils::{send_to_store, test_db, TestDatabase, TestDatabaseRunner}; use crate::db::traits::DocumentStore; - use crate::graphql::client::EntryArgsRequest; use crate::materializer::tasks::reduce_task; use crate::materializer::TaskInput; @@ -218,7 +216,7 @@ mod tests { #[from(test_db)] #[with( 2, - 20, + 5, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("username", OperationValue::Text("panda".into()))], @@ -254,7 +252,6 @@ mod tests { runner.with_db_teardown(|db: TestDatabase| async move { let document_id = db.documents.first().unwrap(); let key_pair = db.key_pairs.first().unwrap(); - 
let author = Author::try_from(key_pair.public_key().to_owned()).unwrap(); let context = Context::new(db.store.clone(), Configuration::default()); let input = TaskInput::new(Some(document_id.clone()), None); @@ -264,48 +261,20 @@ mod tests { assert!(reduce_task(context.clone(), input.clone()).await.is_ok()); // Now we create and insert an UPDATE operation for this document. - let entry_args = db - .store - .get_entry_args(&EntryArgsRequest { - author, - document: Some(document_id.clone()), - }) - .await - .unwrap(); - - let operation = operation( - Some(operation_fields(vec![( - "username", - OperationValue::Text("meeeeeee".to_string()), - )])), - Some(document_id.as_str().parse().unwrap()), - None, - ); - - let entry = Entry::new( - &entry_args.log_id, - Some(&operation), - entry_args.skiplink.as_ref(), - entry_args.backlink.as_ref(), - &entry_args.seq_num, + let (_, _) = send_to_store( + &db.store, + &operation( + Some(operation_fields(vec![( + "username", + OperationValue::Text("meeeeeee".to_string()), + )])), + Some(document_id.as_str().parse().unwrap()), + None, + ), + Some(document_id), + key_pair, ) - .unwrap(); - - let entry_signed = sign_and_encode(&entry, key_pair).unwrap(); - let operation_encoded = OperationEncoded::try_from(&operation).unwrap(); - - db.store - .insert_entry(StorageEntry::new(&entry_signed, &operation_encoded).unwrap()) - .await - .unwrap(); - - let verified_operation = - VerifiedOperation::new_from_entry(&entry_signed, &operation_encoded).unwrap(); - - db.store - .insert_operation(&verified_operation, document_id) - .await - .unwrap(); + .await; // This should now find the new UPDATE operation and perform an update on the document // in the documents table. 
@@ -472,4 +441,26 @@ mod tests { assert!(reduce_task(context.clone(), input).await.is_err()); }); } + + #[rstest] + fn does_not_error_when_document_missing( + #[from(test_db)] + #[with(0, 0)] + runner: TestDatabaseRunner, + #[from(random_document_id)] document_id: DocumentId, + #[from(random_document_view_id)] document_view_id: DocumentViewId, + ) { + // Prepare empty database. + runner.with_db_teardown(|db: TestDatabase| async move { + let context = Context::new(db.store.clone(), Configuration::default()); + + // Dispatch a reduce task for a document which doesn't exist by its document id. + let input = TaskInput::new(Some(document_id), None); + assert!(reduce_task(context.clone(), input).await.is_ok()); + + // Dispatch a reduce task for a document which doesn't exist by its document view id. + let input = TaskInput::new(None, Some(document_view_id)); + assert!(reduce_task(context.clone(), input).await.is_ok()); + }); + } }