Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perform "quick commit" in materializer service #280

Merged
merged 7 commits into from
May 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Move GraphQL `types` into separate modules [#343](https://github.com/p2panda/aquadoggo/pull/343)
- Set default order for root queries to document id [#352](https://github.com/p2panda/aquadoggo/pull/352)
- Remove property tests again because of concurrency bug [#347](https://github.com/p2panda/aquadoggo/pull/347)
- Incrementally update documents in materializer [#280](https://github.com/p2panda/aquadoggo/pull/280)

### Fixed

Expand Down
9 changes: 6 additions & 3 deletions aquadoggo/src/db/stores/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
//! explicitly wish to keep.
use async_trait::async_trait;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::{Document, DocumentId, DocumentView, DocumentViewId};
use p2panda_rs::document::{DocumentId, DocumentView, DocumentViewId};
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::error::DocumentStorageError;
use p2panda_rs::storage_provider::traits::DocumentStore;
Expand Down Expand Up @@ -284,7 +284,10 @@ impl SqlStore {
///
/// Note: "out-of-date" document views will remain in storage when a document already existed
/// and is updated. If they are not needed for anything else they can be garbage collected.
pub async fn insert_document(&self, document: &Document) -> Result<(), DocumentStorageError> {
pub async fn insert_document(
&self,
document: &impl AsDocument,
) -> Result<(), DocumentStorageError> {
// Start a transaction, any db insertions after this point, and before the `commit()`
// can be rolled back in the event of an error.
let mut tx = self
Expand Down Expand Up @@ -473,7 +476,7 @@ async fn insert_document_view(
// `documents`, `document_views` and `document_view_fields` tables.
async fn insert_document(
tx: &mut Transaction<'_, Any>,
document: &Document,
document: &impl AsDocument,
) -> Result<(), DocumentStorageError> {
// Insert or update the document to the `documents` table.
query(
Expand Down
201 changes: 187 additions & 14 deletions aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

use anyhow::Result;
use log::{debug, warn};
use p2panda_rs::storage_provider::traits::OperationStore;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::operation::OperationId;
use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore};
use tokio::task;

use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;
use crate::db::types::StorageDocument;
use crate::manager::{ServiceReadySender, Shutdown};
use crate::materializer::tasks::{dependency_task, reduce_task, schema_task};
use crate::materializer::worker::{Factory, Task, TaskStatus};
Expand Down Expand Up @@ -75,7 +78,7 @@
.store
.get_tasks()
.await
.expect("Failed retreiving pending tasks from database");
.expect("Failed retrieving pending tasks from database");

debug!("Dispatch {} pending tasks from last runtime", tasks.len());

Expand All @@ -90,19 +93,58 @@
let handle = task::spawn(async move {
while let Ok(ServiceMessage::NewOperation(operation_id)) = rx.recv().await {
// Resolve document id of regarding operation
match context
let document_id = context
.store
.get_document_id_by_operation_id(&operation_id)
.await
.unwrap_or_else(|_| {
panic!(
"Failed database query when retreiving document for operation_id {}",
"Failed database query when retrieving document id by operation_id {}",
operation_id
)
}) {
});

match document_id {
Some(document_id) => {
// Dispatch "reduce" task which will materialize the regarding document
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)));
// Get the document by its document id.
let document = context
.store
.get_document(&document_id)
.await
.unwrap_or_else(|_| {
panic!(

Check warning on line 115 in aquadoggo/src/materializer/service.rs

View check run for this annotation

Codecov / codecov/patch

aquadoggo/src/materializer/service.rs#L115

Added line #L115 was not covered by tests
"Failed database query when retrieving document {}",
document_id
)
});

let mut quick_commit_success = false;

// If a document was found we can try to incrementally update the document.
if document.is_some() {
// Attempt a quick commit of the document.
//
// This succeeds if the operation passed on the bus refers to the document's
// current view in its previous field.
if let Some(mut document) = document {
quick_commit_success =
quick_commit(&context, &mut document, &operation_id).await;

// If the commit succeeded and the document isn't now deleted, dispatch a "dependency" task for the document's new view.
if quick_commit_success && !document.is_deleted() {
factory.queue(Task::new(
"dependency",
TaskInput::new(None, Some(document.view_id().to_owned())),
))
};
};
}

if !quick_commit_success {
// We couldn't perform a quick commit for this document.
// Dispatch "reduce" task which will materialize the regarding document.
factory.queue(Task::new("reduce", TaskInput::new(Some(document_id), None)))
}
}
None => {
// Panic when we couldn't find the regarding document in the database. We can
Expand Down Expand Up @@ -134,6 +176,44 @@
Ok(())
}

/// Attempt to incrementally update ("quick commit") an already materialized document with a
/// single new operation.
///
/// Returns `true` when the operation could be committed onto the document's current view and
/// the updated document was persisted to the store. Returns `false` when the commit was
/// rejected — for example because the operation does not refer to the document's current view
/// in its `previous` field — in which case the caller should fall back to dispatching a full
/// "reduce" task for the document.
///
/// # Panics
///
/// Panics on any failed database query, and when no operation exists for the given
/// `operation_id` (every id passed on the bus is expected to be present in the store).
async fn quick_commit(
    context: &Context,
    document: &mut StorageDocument,
    operation_id: &OperationId,
) -> bool {
    let operation = context
        .store
        .get_operation(operation_id)
        .await
        .unwrap_or_else(|_| {
            panic!(
                "Failed database query when retrieving operation {}",
                operation_id
            )
        })
        // An operation should exist for every operation id passed on the bus
        .unwrap();

    match document.commit(&operation) {
        Ok(_) => {
            // The quick commit was successful so we now insert the updated document.
            context
                .store
                .insert_document(document)
                .await
                .unwrap_or_else(|_| {
                    panic!(
                        "Failed inserting document with view {} into database",
                        document.view_id()
                    )
                });
            debug!("Incrementally updated document {}", document.view_id());
            true
        }
        Err(_) => false,
    }
}

#[cfg(test)]
mod tests {
use std::time::Duration;
Expand All @@ -154,10 +234,12 @@
use tokio::task;

use crate::context::Context;
use crate::materializer::service::quick_commit;
use crate::materializer::{Task, TaskInput};
use crate::schema::SchemaProvider;
use crate::test_utils::{
doggo_fields, doggo_schema, populate_store_config, test_runner, TestNode,
doggo_fields, doggo_schema, populate_and_materialize, populate_store_config, test_runner,
TestNode,
};
use crate::Configuration;

Expand Down Expand Up @@ -415,18 +497,12 @@

#[rstest]
fn materialize_complex_documents(
#[from(populate_store_config)]
#[with(0, 0, 0)]
config: PopulateStoreConfig,
#[from(operation)]
#[with(Some(operation_fields(doggo_fields())), None, doggo_schema().id().to_owned())]
operation: Operation,
key_pair: KeyPair,
) {
test_runner(move |node: TestNode| async move {
// Populate the store with some entries and operations but DON'T materialise any resulting documents.
populate_store(&node.context.store, &config).await;

// Prepare arguments for service
let context = Context::new(
node.context.store.clone(),
Expand Down Expand Up @@ -484,4 +560,101 @@
assert_eq!(document.id(), &entry_encoded.hash().into());
});
}

// Test that `quick_commit` succeeds when the new operation points at the document's
// current view: the store is populated with one CREATE operation per document, so an
// UPDATE referring to the root CREATE operation targets the latest view. Afterwards
// the stored document should reflect the updated field value.
#[rstest]
fn performs_quick_commit(
    #[from(populate_store_config)]
    #[with(1, 1, 1)]
    config: PopulateStoreConfig,
) {
    test_runner(move |mut node: TestNode| async move {
        // Populate the store and materialize documents so a current view exists to commit onto.
        let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await;
        let document_id = document_ids[0].clone();
        let key_pair = &key_pairs[0];
        let schema = config.schema;
        let store = node.context.store.clone();

        // Now we create and insert an UPDATE operation for this document which is pointing at
        // the root CREATE operation.
        let (encoded_entry, _) = send_to_store(
            &node.context.store,
            &operation(
                Some(operation_fields(vec![(
                    "username",
                    OperationValue::String("melon".to_string()),
                )])),
                Some(document_id.as_str().parse().unwrap()),
                schema.id().to_owned(),
            ),
            &schema,
            key_pair,
        )
        .await
        .unwrap();

        // Get the document.
        let mut document = store.get_document(&document_id).await.unwrap().unwrap();

        // We expect the quick commit to succeed as the new operation is pointing at the
        // current document view id.
        assert!(quick_commit(&node.context, &mut document, &encoded_entry.hash().into()).await);

        // Get the document again.
        let document = store.get_document(&document_id).await.unwrap().unwrap();
        // It should have an updated value.
        assert_eq!(
            *document.get("username").unwrap(),
            "melon".to_string().into()
        )
    })
}

// Test that `quick_commit` is rejected when the new operation does NOT point at the
// document's current view: the store is populated with two operations per document, so
// an UPDATE referring to the root CREATE operation no longer targets the latest view.
// The stored document must remain unchanged.
//
// NOTE: renamed from `does_not_performs_quick_commit` to fix the grammar in the
// identifier; test functions are discovered by attribute, not referenced by name.
#[rstest]
fn does_not_perform_quick_commit(
    #[from(populate_store_config)]
    #[with(2, 1, 1)]
    config: PopulateStoreConfig,
) {
    test_runner(move |mut node: TestNode| async move {
        let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await;
        let document_id = document_ids[0].clone();
        let key_pair = &key_pairs[0];
        let schema = config.schema;
        let store = node.context.store.clone();

        // Now we create and insert an UPDATE operation for this document which is pointing at
        // the root CREATE operation.
        let (encoded_entry, _) = send_to_store(
            &node.context.store,
            &operation(
                Some(operation_fields(vec![(
                    "username",
                    OperationValue::String("melon".to_string()),
                )])),
                Some(document_id.as_str().parse().unwrap()),
                schema.id().to_owned(),
            ),
            &schema,
            key_pair,
        )
        .await
        .unwrap();

        // Get the document.
        let mut document = store.get_document(&document_id).await.unwrap().unwrap();
        // We expect the quick commit to fail as the operation isn't pointing at the current
        // document view id.
        assert!(
            !quick_commit(&node.context, &mut document, &encoded_entry.hash().into()).await
        );

        // Get the document again.
        let document = store.get_document(&document_id).await.unwrap().unwrap();
        // It should be the original value still.
        assert_eq!(
            *document.get("username").unwrap(),
            "bubu".to_string().into()
        )
    })
}
}
Loading