diff --git a/CHANGELOG.md b/CHANGELOG.md index de71dd987..33b9885e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Move GraphQL `types` into separate modules [#343](https://github.com/p2panda/aquadoggo/pull/343) - Set default order for root queries to document id [#352](https://github.com/p2panda/aquadoggo/pull/352) - Remove property tests again because of concurrency bug [#347](https://github.com/p2panda/aquadoggo/pull/347) +- Incrementally update documents in materializer [#280](https://github.com/p2panda/aquadoggo/pull/280) ### Fixed diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs index afce47dc1..a73f00670 100644 --- a/aquadoggo/src/db/stores/document.rs +++ b/aquadoggo/src/db/stores/document.rs @@ -31,7 +31,7 @@ //! explicitly wish to keep. use async_trait::async_trait; use p2panda_rs::document::traits::AsDocument; -use p2panda_rs::document::{Document, DocumentId, DocumentView, DocumentViewId}; +use p2panda_rs::document::{DocumentId, DocumentView, DocumentViewId}; use p2panda_rs::schema::SchemaId; use p2panda_rs::storage_provider::error::DocumentStorageError; use p2panda_rs::storage_provider::traits::DocumentStore; @@ -284,7 +284,10 @@ impl SqlStore { /// /// Note: "out-of-date" document views will remain in storage when a document already existed /// and is updated. If they are not needed for anything else they can be garbage collected. - pub async fn insert_document(&self, document: &Document) -> Result<(), DocumentStorageError> { + pub async fn insert_document( + &self, + document: &impl AsDocument, + ) -> Result<(), DocumentStorageError> { // Start a transaction, any db insertions after this point, and before the `commit()` // can be rolled back in the event of an error. let mut tx = self @@ -473,7 +476,7 @@ async fn insert_document_view( // `documents`, `document_views` and `document_view_fields` tables. 
async fn insert_document( tx: &mut Transaction<'_, Any>, - document: &Document, + document: &impl AsDocument, ) -> Result<(), DocumentStorageError> { // Insert or update the document to the `documents` table. query( diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index 90ac41ac8..d61e71d97 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -2,11 +2,14 @@ use anyhow::Result; use log::{debug, warn}; -use p2panda_rs::storage_provider::traits::OperationStore; +use p2panda_rs::document::traits::AsDocument; +use p2panda_rs::operation::OperationId; +use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore}; use tokio::task; use crate::bus::{ServiceMessage, ServiceSender}; use crate::context::Context; +use crate::db::types::StorageDocument; use crate::manager::{ServiceReadySender, Shutdown}; use crate::materializer::tasks::{dependency_task, reduce_task, schema_task}; use crate::materializer::worker::{Factory, Task, TaskStatus}; @@ -75,7 +78,7 @@ pub async fn materializer_service( .store .get_tasks() .await - .expect("Failed retreiving pending tasks from database"); + .expect("Failed retrieving pending tasks from database"); debug!("Dispatch {} pending tasks from last runtime", tasks.len()); @@ -90,19 +93,58 @@ pub async fn materializer_service( let handle = task::spawn(async move { while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await { // Resolve document id of regarding operation - match context + let document_id = context .store .get_document_id_by_operation_id(&operation_id) .await .unwrap_or_else(|_| { panic!( - "Failed database query when retreiving document for operation_id {}", + "Failed database query when retrieving document id by operation_id {}", operation_id ) - }) { + }); + + match document_id { Some(document_id) => { - // Dispatch "reduce" task which will materialize the regarding document - factory.queue(Task::new("reduce", 
TaskInput::new(Some(document_id), None))); + // Get the document by its document id. + let document = context + .store + .get_document(&document_id) + .await + .unwrap_or_else(|_| { + panic!( + "Failed database query when retrieving document {}", + document_id + ) + }); + + let mut quick_commit_success = false; + + // If a document was found we can try to incrementally update the document. + if document.is_some() { + // Attempt a quick commit of the document. + // + // This succeeds if the operation passed on the bus refers to the document's + // current view in its previous field. + if let Some(mut document) = document { + quick_commit_success = + quick_commit(&context, &mut document, &operation_id).await; + + // If the commit succeeded and the document isn't now deleted, dispatch "dependency" task for the document's new view. + if quick_commit_success && !document.is_deleted() { + factory.queue(Task::new( + "dependency", + TaskInput::new(None, Some(document.view_id().to_owned())), + )) + }; + }; + } + + if !quick_commit_success { + // We couldn't perform a quick commit for this document. + // Dispatch "reduce" task which will materialize the regarding document. + factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None))) + } } None => { // Panic when we couldn't find the regarding document in the database. We can
+ context + .store + .insert_document(document) + .await + .unwrap_or_else(|_| { + panic!( + "Failed inserting document with view {} into database", + document.view_id() + ) + }); + debug!("Incrementally updated document {}", document.view_id()); + true + } + Err(_) => false, + } +} + #[cfg(test)] mod tests { use std::time::Duration; @@ -154,10 +234,12 @@ mod tests { use tokio::task; use crate::context::Context; + use crate::materializer::service::quick_commit; use crate::materializer::{Task, TaskInput}; use crate::schema::SchemaProvider; use crate::test_utils::{ - doggo_fields, doggo_schema, populate_store_config, test_runner, TestNode, + doggo_fields, doggo_schema, populate_and_materialize, populate_store_config, test_runner, + TestNode, }; use crate::Configuration; @@ -415,18 +497,12 @@ mod tests { #[rstest] fn materialize_complex_documents( - #[from(populate_store_config)] - #[with(0, 0, 0)] - config: PopulateStoreConfig, #[from(operation)] #[with(Some(operation_fields(doggo_fields())), None, doggo_schema().id().to_owned())] operation: Operation, key_pair: KeyPair, ) { test_runner(move |node: TestNode| async move { - // Populate the store with some entries and operations but DON'T materialise any resulting documents. 
- populate_store(&node.context.store, &config).await; - // Prepare arguments for service let context = Context::new( node.context.store.clone(), @@ -484,4 +560,101 @@ mod tests { assert_eq!(document.id(), &entry_encoded.hash().into()); }); } + + #[rstest] + fn performs_quick_commit( + #[from(populate_store_config)] + #[with(1, 1, 1)] + config: PopulateStoreConfig, + ) { + test_runner(move |mut node: TestNode| async move { + let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await; + let document_id = document_ids[0].clone(); + let key_pair = &key_pairs[0]; + let schema = config.schema; + let store = node.context.store.clone(); + + // Now we create and insert an UPDATE operation for this document which is pointing at + // the root CREATE operation. + let (encoded_entry, _) = send_to_store( + &node.context.store, + &operation( + Some(operation_fields(vec![( + "username", + OperationValue::String("melon".to_string()), + )])), + Some(document_id.as_str().parse().unwrap()), + schema.id().to_owned(), + ), + &schema, + key_pair, + ) + .await + .unwrap(); + + // Get the document. + let mut document = store.get_document(&document_id).await.unwrap().unwrap(); + + // We expect the quick commit to succeed as the new operation is pointing at the + // current document view id. + assert!(quick_commit(&node.context, &mut document, &encoded_entry.hash().into()).await); + + // Get the document again. + let document = store.get_document(&document_id).await.unwrap().unwrap(); + // It should have an updated value. 
+ assert_eq!( + *document.get("username").unwrap(), + "melon".to_string().into() + ) + }) + } + + #[rstest] + fn does_not_performs_quick_commit( + #[from(populate_store_config)] + #[with(2, 1, 1)] + config: PopulateStoreConfig, + ) { + test_runner(move |mut node: TestNode| async move { + let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await; + let document_id = document_ids[0].clone(); + let key_pair = &key_pairs[0]; + let schema = config.schema; + let store = node.context.store.clone(); + + // Now we create and insert an UPDATE operation for this document which is pointing at + // the root CREATE operation. + let (encoded_entry, _) = send_to_store( + &node.context.store, + &operation( + Some(operation_fields(vec![( + "username", + OperationValue::String("melon".to_string()), + )])), + Some(document_id.as_str().parse().unwrap()), + schema.id().to_owned(), + ), + &schema, + key_pair, + ) + .await + .unwrap(); + + // Get the document. + let mut document = store.get_document(&document_id).await.unwrap().unwrap(); + // We expect the quick commit to fail as the operation isn't pointing at the current + // document view id. + assert!( + !quick_commit(&node.context, &mut document, &encoded_entry.hash().into()).await + ); + + // Get the document again. + let document = store.get_document(&document_id).await.unwrap().unwrap(); + // It should be the original value still. + assert_eq!( + *document.get("username").unwrap(), + "bubu".to_string().into() + ) + }) + } }