diff --git a/CHANGELOG.md b/CHANGELOG.md index ad6e20993..bf1514994 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - e2e publish entry tests [#167](https://github.com/p2panda/aquadoggo/pull/167) - Reschedule pending tasks on startup [#168](https://github.com/p2panda/aquadoggo/pull/168) - Add schema task and schema provider that update when new schema views are materialised [#166](https://github.com/p2panda/aquadoggo/pull/166/files) +- Debug logging in reduce task [#175](https://github.com/p2panda/aquadoggo/pull/175) ### Changed @@ -40,6 +41,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Refactor tests to use fixtures exported from `p2panda-rs` [#147](https://github.com/p2panda/aquadoggo/pull/147) - Use `DocumentViewId` for previous operations [#147](https://github.com/p2panda/aquadoggo/pull/147) - Use `VerifiedOperation` [#158](https://github.com/p2panda/aquadoggo/pull/158) +- Refactor test_db creation [#176](https://github.com/p2panda/aquadoggo/pull/176) ### Fixed diff --git a/aquadoggo/src/db/stores/test_utils.rs b/aquadoggo/src/db/stores/test_utils.rs index 609669f55..3dd98f294 100644 --- a/aquadoggo/src/db/stores/test_utils.rs +++ b/aquadoggo/src/db/stores/test_utils.rs @@ -4,7 +4,7 @@ use std::convert::TryFrom; use futures::Future; use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId}; -use p2panda_rs::entry::{sign_and_encode, Entry}; +use p2panda_rs::entry::{sign_and_encode, Entry, EntrySigned}; use p2panda_rs::hash::Hash; use p2panda_rs::identity::{Author, KeyPair}; use p2panda_rs::operation::{ @@ -16,7 +16,7 @@ use p2panda_rs::storage_provider::traits::{ AsStorageEntry, AsStorageLog, EntryStore, LogStore, OperationStore, StorageProvider, }; use p2panda_rs::test_utils::constants::{DEFAULT_PRIVATE_KEY, TEST_SCHEMA_ID}; -use p2panda_rs::test_utils::fixtures::{create_operation, delete_operation, update_operation}; +use p2panda_rs::test_utils::fixtures::{operation, operation_fields}; use rstest::fixture; use sqlx::migrate::MigrateDatabase; use sqlx::Any; @@ -26,7 +26,7 @@ use crate::db::provider::SqlStorage; use crate::db::stores::{StorageEntry, StorageLog}; use crate::db::traits::DocumentStore; use crate::db::{connection_pool, create_database, run_pending_migrations, Pool}; -use crate::graphql::client::{EntryArgsRequest, PublishEntryRequest}; +use crate::graphql::client::{EntryArgsRequest, PublishEntryRequest, PublishEntryResponse}; use crate::test_helpers::TEST_CONFIG; /// The fields used as defaults in the tests. @@ -313,7 +313,7 @@ async fn create_test_db( no_of_entries: usize, no_of_authors: usize, with_delete: bool, - schema: SchemaId, + schema_id: SchemaId, create_operation_fields: Vec<(&'static str, OperationValue)>, update_operation_fields: Vec<(&'static str, OperationValue)>, ) -> TestDatabase { @@ -333,68 +333,41 @@ async fn create_test_db( } for key_pair in &key_pairs { - let mut document: Option = None; - let author = Author::try_from(key_pair.public_key().to_owned()).unwrap(); + let mut document_id: Option = None; + let mut previous_operation: Option = None; for index in 0..no_of_entries { - let next_entry_args = store - .get_entry_args(&EntryArgsRequest { - author: author.clone(), - document: document.as_ref().cloned(), - }) - .await - .unwrap(); - - let next_operation = if index == 0 { - create_operation(&create_operation_fields) - } else if index == (no_of_entries - 1) && with_delete { - delete_operation(&next_entry_args.backlink.clone().unwrap().into()) - } else { - update_operation( - &update_operation_fields, - &next_entry_args.backlink.clone().unwrap().into(), - ) + // Create an operation based on the current index and whether this document should contain + // a DELETE operation. + let next_operation_fields = match index { + // First operation is a CREATE. + 0 => Some(operation_fields(create_operation_fields.clone())), + // Last operation is a DELETE if the with_delete flag is set. + seq if seq == (no_of_entries - 1) && with_delete => None, + // All other operations are UPDATE. + _ => Some(operation_fields(update_operation_fields.clone())), }; - let next_entry = Entry::new( - &next_entry_args.log_id, - Some(&next_operation), - next_entry_args.skiplink.as_ref(), - next_entry_args.backlink.as_ref(), - &next_entry_args.seq_num, + // Publish the operation encoded on an entry to storage. + let (entry_encoded, publish_entry_response) = send_to_store( + &store, + &operation( + next_operation_fields, + previous_operation, + Some(schema_id.to_owned()), + ), + document_id.as_ref(), + key_pair, ) - .unwrap(); + .await; - let entry_encoded = sign_and_encode(&next_entry, key_pair).unwrap(); - let operation_encoded = OperationEncoded::try_from(&next_operation).unwrap(); + // Set the previous_operations based on the backlink. + previous_operation = publish_entry_response.backlink.map(|hash| hash.into()); + // If this was the first entry in the document, store the doucment id for later. if index == 0 { - document = Some(entry_encoded.hash().into()); - documents.push(entry_encoded.hash().into()); - } - - let storage_entry = StorageEntry::new(&entry_encoded, &operation_encoded).unwrap(); - - store.insert_entry(storage_entry).await.unwrap(); - - let storage_log = StorageLog::new( - &author, - &schema, - &document.clone().unwrap(), - &next_entry_args.log_id, - ); - - if next_entry_args.seq_num.is_first() { - store.insert_log(storage_log).await.unwrap(); + document_id = Some(entry_encoded.hash().into()); + documents.push(document_id.clone().unwrap()); } - - let verified_operation = - VerifiedOperation::new(&author, &entry_encoded.hash().into(), &next_operation) - .unwrap(); - - store - .insert_operation(&verified_operation, &document.clone().unwrap()) - .await - .unwrap(); } } @@ -405,6 +378,63 @@ async fn create_test_db( } } +async fn send_to_store( + store: &SqlStorage, + operation: &Operation, + document_id: Option<&DocumentId>, + key_pair: &KeyPair, +) -> (EntrySigned, PublishEntryResponse) { + // Get an Author from the key_pair. + let author = Author::try_from(key_pair.public_key().to_owned()).unwrap(); + + // Get the next entry arguments for this author and the passed document id. + let next_entry_args = store + .get_entry_args(&EntryArgsRequest { + author: author.clone(), + document: document_id.cloned(), + }) + .await + .unwrap(); + + // Construct the next entry. + let next_entry = Entry::new( + &next_entry_args.log_id, + Some(operation), + next_entry_args.skiplink.as_ref(), + next_entry_args.backlink.as_ref(), + &next_entry_args.seq_num, + ) + .unwrap(); + + // Encode both the entry and operation. + let entry_encoded = sign_and_encode(&next_entry, key_pair).unwrap(); + let operation_encoded = OperationEncoded::try_from(operation).unwrap(); + + // Publish the entry and get the next entry args. + let publish_entry_request = PublishEntryRequest { + entry_encoded: entry_encoded.clone(), + operation_encoded, + }; + let publish_entry_response = store.publish_entry(&publish_entry_request).await.unwrap(); + + // Set or unwrap the passed document_id. + let document_id = if operation.is_create() { + entry_encoded.hash().into() + } else { + document_id.unwrap().to_owned() + }; + + // Also insert the operation into the store. + let verified_operation = + VerifiedOperation::new(&author, &entry_encoded.hash().into(), operation).unwrap(); + store + .insert_operation(&verified_operation, &document_id) + .await + .unwrap(); + + (entry_encoded, publish_entry_response) +} + /// Create test database. async fn initialize_db() -> Pool { // Reset database first diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index e713a2192..431118b81 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -18,7 +18,7 @@ use crate::materializer::TaskInput; /// After succesfully reducing and storing a document view an array of dependency tasks is returned. /// If invalid inputs were passed or a fatal db error occured a critical error is returned. pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { - debug!("Working on {}", input); + debug!("Working on reduce task {:#?}", input); // Find out which document we are handling let document_id = resolve_document_id(&context, &input).await?; @@ -28,7 +28,10 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult TaskResult Ok(Some(vec![Task::new( - "dependency", - TaskInput::new(None, Some(view_id)), - )])), - None => Ok(None), + Some(view_id) => { + debug!("Dispatch dependency task for view with id: {}", view_id); + Ok(Some(vec![Task::new( + "dependency", + TaskInput::new(None, Some(view_id)), + )])) + } + None => { + debug!("No dependency tasks to dispatch"); + Ok(None) + } } } @@ -97,14 +106,23 @@ async fn reduce_document_view( { Ok(document) => { // If the document was deleted, then we return nothing + debug!( + "Document materialized to view with id: {}", + document_view_id + ); if document.is_deleted() { return Ok(None); }; document } - Err(_) => { - debug!("Reduce task: failed building"); + Err(err) => { + debug!( + "Document view materialization failed view with id: {}", + document_view_id + ); + debug!("{}", err); + // There is not enough operations yet to materialise this view. Maybe next time! return Ok(None); } @@ -115,7 +133,15 @@ async fn reduce_document_view( .store .insert_document_view(document.view().unwrap(), document.schema()) .await - .map_err(|_| TaskError::Critical)?; + .map_err(|err| { + debug!( + "Failed to insert document view into database: {}", + document_view_id + ); + debug!("{}", err); + + TaskError::Critical + })?; info!("Stored {} view {}", document, document.view_id()); @@ -140,7 +166,11 @@ async fn reduce_document( .store .insert_document(&document) .await - .map_err(|_| TaskError::Critical)?; + .map_err(|err| { + debug!("Failed to insert document into database: {}", document.id()); + debug!("{}", err); + TaskError::Critical + })?; // If the document was deleted, then we return nothing if document.is_deleted() { @@ -152,8 +182,10 @@ async fn reduce_document( // Return the new document_view id to be used in the resulting dependency task Ok(Some(document.view_id().to_owned())) } - Err(_) => { + Err(err) => { // There is not enough operations yet to materialise this view. Maybe next time! + debug!("Document materialization error: {}", err); + Ok(None) } }