Skip to content

Commit

Permalink
Perform "quick commit" in materializer service (#280)
Browse files Browse the repository at this point in the history
* Perform quick commit in materializer service

* Clippy Happy

* Clippy more happy

* Update CHANGELOG

* Small refactor

* Tests for quick commit

* Remove incorrect test comments
  • Loading branch information
sandreae committed May 24, 2023
1 parent 8cd21bd commit a5b3d80
Show file tree
Hide file tree
Showing 3 changed files with 194 additions and 17 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Move GraphQL `types` into separate modules [#343](https://github.com/p2panda/aquadoggo/pull/343)
- Set default order for root queries to document id [#352](https://github.com/p2panda/aquadoggo/pull/352)
- Remove property tests again because of concurrency bug [#347](https://github.com/p2panda/aquadoggo/pull/347)
- Incrementally update documents in materializer [#280](https://github.com/p2panda/aquadoggo/pull/280)

### Fixed

Expand Down
9 changes: 6 additions & 3 deletions aquadoggo/src/db/stores/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
//! explicitly wish to keep.
use async_trait::async_trait;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::{Document, DocumentId, DocumentView, DocumentViewId};
use p2panda_rs::document::{DocumentId, DocumentView, DocumentViewId};
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::error::DocumentStorageError;
use p2panda_rs::storage_provider::traits::DocumentStore;
Expand Down Expand Up @@ -284,7 +284,10 @@ impl SqlStore {
///
/// Note: "out-of-date" document views will remain in storage when a document already existed
/// and is updated. If they are not needed for anything else they can be garbage collected.
pub async fn insert_document(&self, document: &Document) -> Result<(), DocumentStorageError> {
pub async fn insert_document(
&self,
document: &impl AsDocument,
) -> Result<(), DocumentStorageError> {
// Start a transaction, any db insertions after this point, and before the `commit()`
// can be rolled back in the event of an error.
let mut tx = self
Expand Down Expand Up @@ -473,7 +476,7 @@ async fn insert_document_view(
// `documents`, `document_views` and `document_view_fields` tables.
async fn insert_document(
tx: &mut Transaction<'_, Any>,
document: &Document,
document: &impl AsDocument,
) -> Result<(), DocumentStorageError> {
// Insert or update the document to the `documents` table.
query(
Expand Down
201 changes: 187 additions & 14 deletions aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

use anyhow::Result;
use log::{debug, warn};
use p2panda_rs::storage_provider::traits::OperationStore;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::operation::OperationId;
use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore};
use tokio::task;

use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;
use crate::db::types::StorageDocument;
use crate::manager::{ServiceReadySender, Shutdown};
use crate::materializer::tasks::{dependency_task, reduce_task, schema_task};
use crate::materializer::worker::{Factory, Task, TaskStatus};
Expand Down Expand Up @@ -75,7 +78,7 @@ pub async fn materializer_service(
.store
.get_tasks()
.await
.expect("Failed retreiving pending tasks from database");
.expect("Failed retrieving pending tasks from database");

debug!("Dispatch {} pending tasks from last runtime", tasks.len());

Expand All @@ -90,19 +93,58 @@ pub async fn materializer_service(
let handle = task::spawn(async move {
while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
match context
let document_id = context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retreiving document for operation_id {}",
"Failed database query when retrieving document id by operation_id {}",
operation_id
)
}) {
});

match document_id {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)));
// Get the document by it's document id.
let document = context
.store
.get_document(&document_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retrieving document {}",
document_id
)
});

let mut quick_commit_success = false;

// If a document was found we can try to incrementally update the document.
if document.is_some() {
// Attempt a quick commit of the document.
//
// This succeeds if the operation passed on the bus refers to the documents'
// current view in it's previous field.
if let Some(mut document) = document {
quick_commit_success =
quick_commit(&context, &mut document, &operation_id).await;

// If the commit succeeded and the document isn't now deleted dispatch "dependency" task for the documents new view.
if quick_commit_success && !document.is_deleted() {
factory.queue(Task::new(
"dependency",
TaskInput::new(None, Some(document.view_id().to_owned())),
))
};
};
}

if !quick_commit_success {
// We couldn't perform a quick commit for this document.
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)))
}
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
Expand Down Expand Up @@ -134,6 +176,44 @@ pub async fn materializer_service(
Ok(())
}

async fn quick_commit(
context: &Context,
document: &mut StorageDocument,
operation_id: &OperationId,
) -> bool {
let operation = context
.store
.get_operation(operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retrieving operation {}",
operation_id
)
})
// An operation should exist for every operation id passed on the bus
.unwrap();

match document.commit(&operation) {
Ok(_) => {
// The quick commit was successful so we now insert the updated document.
context
.store
.insert_document(document)
.await
.unwrap_or_else(|_| {
panic!(
"Failed inserting document with view {} into database",
document.view_id()
)
});
debug!("Incrementally updated document {}", document.view_id());
true
}
Err(_) => false,
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand All @@ -154,10 +234,12 @@ mod tests {
use tokio::task;

use crate::context::Context;
use crate::materializer::service::quick_commit;
use crate::materializer::{Task, TaskInput};
use crate::schema::SchemaProvider;
use crate::test_utils::{
doggo_fields, doggo_schema, populate_store_config, test_runner, TestNode,
doggo_fields, doggo_schema, populate_and_materialize, populate_store_config, test_runner,
TestNode,
};
use crate::Configuration;

Expand Down Expand Up @@ -415,18 +497,12 @@ mod tests {

#[rstest]
fn materialize_complex_documents(
#[from(populate_store_config)]
#[with(0, 0, 0)]
config: PopulateStoreConfig,
#[from(operation)]
#[with(Some(operation_fields(doggo_fields())), None, doggo_schema().id().to_owned())]
operation: Operation,
key_pair: KeyPair,
) {
test_runner(move |node: TestNode| async move {
// Populate the store with some entries and operations but DON'T materialise any resulting documents.
populate_store(&node.context.store, &config).await;

// Prepare arguments for service
let context = Context::new(
node.context.store.clone(),
Expand Down Expand Up @@ -484,4 +560,101 @@ mod tests {
assert_eq!(document.id(), &entry_encoded.hash().into());
});
}

#[rstest]
fn performs_quick_commit(
#[from(populate_store_config)]
#[with(1, 1, 1)]
config: PopulateStoreConfig,
) {
test_runner(move |mut node: TestNode| async move {
let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await;
let document_id = document_ids[0].clone();
let key_pair = &key_pairs[0];
let schema = config.schema;
let store = node.context.store.clone();

// Now we create and insert an UPDATE operation for this document which is pointing at
// the root CREATE operation.
let (encoded_entry, _) = send_to_store(
&node.context.store,
&operation(
Some(operation_fields(vec![(
"username",
OperationValue::String("melon".to_string()),
)])),
Some(document_id.as_str().parse().unwrap()),
schema.id().to_owned(),
),
&schema,
key_pair,
)
.await
.unwrap();

// Get the document.
let mut document = store.get_document(&document_id).await.unwrap().unwrap();

// We expect the quick commit to succeed as the new operation is pointing at the
// current document view id.
assert!(quick_commit(&node.context, &mut document, &encoded_entry.hash().into()).await);

// Get the document again.
let document = store.get_document(&document_id).await.unwrap().unwrap();
// It should have an updated value.
assert_eq!(
*document.get("username").unwrap(),
"melon".to_string().into()
)
})
}

#[rstest]
fn does_not_performs_quick_commit(
#[from(populate_store_config)]
#[with(2, 1, 1)]
config: PopulateStoreConfig,
) {
test_runner(move |mut node: TestNode| async move {
let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await;
let document_id = document_ids[0].clone();
let key_pair = &key_pairs[0];
let schema = config.schema;
let store = node.context.store.clone();

// Now we create and insert an UPDATE operation for this document which is pointing at
// the root CREATE operation.
let (encoded_entry, _) = send_to_store(
&node.context.store,
&operation(
Some(operation_fields(vec![(
"username",
OperationValue::String("melon".to_string()),
)])),
Some(document_id.as_str().parse().unwrap()),
schema.id().to_owned(),
),
&schema,
key_pair,
)
.await
.unwrap();

// Get the document.
let mut document = store.get_document(&document_id).await.unwrap().unwrap();
// We expect the quick commit to fail as the operation isn't pointing at the current
// document view id.
assert!(
!quick_commit(&node.context, &mut document, &encoded_entry.hash().into()).await
);

// Get the document again.
let document = store.get_document(&document_id).await.unwrap().unwrap();
// It should be the original value still.
assert_eq!(
*document.get("username").unwrap(),
"bubu".to_string().into()
)
})
}
}

0 comments on commit a5b3d80

Please sign in to comment.