Commit

Fix bug when materialising documents containing (non-existent) pinned relations (#177)

* Add logging

* Pretty print input

* Introduce send_to_store helper method

* Some more comments

* Update CHANGELOG

* Make env_logger a dev-dependency

* Failing materializer tests

* Logging in dependency task

* More logging in reduce task

* Don't critically fail when document for requested view not found (see the sketch after this list)

* Tests

* Make clippy happy

* fmt

* Update CHANGELOG

* Remove env_logger

* Wait a little longer...

* Wait after both reduce tasks

* Fix method visibility after merge
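
Note: the reduce task change behind "Don't critically fail when document for requested view not found" lives in one of the five changed files whose diff is not rendered below. The following is only a rough sketch of that pattern, reusing the Context, TaskInput, TaskResult and TaskError types visible in dependency.rs further down; the reduce_task signature, the resolve_document_id helper and the log message are assumptions, not code taken from this commit.

// Sketch only: exit the task gracefully instead of returning a critical
// error when no document exists yet for the requested view.
pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
    // Hypothetical helper standing in for the task's document id resolution.
    let _document_id = match resolve_document_id(&context, &input).await? {
        Some(document_id) => document_id,
        None => {
            // Previously a missing document was treated as TaskError::Critical;
            // now the task completes as a no-op and schedules no follow-ups.
            debug!("No document found for this view, exiting task");
            return Ok(None);
        }
    };

    // ... reduce the document's operations into a view, store it and
    // dispatch follow-up "dependency" tasks as before ...
    todo!()
}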

Co-authored-by: Andreas Dzialocha <adzialocha@users.noreply.github.com>
Co-authored-by: Andreas Dzialocha <x12@adz.garden>
3 people committed Jun 29, 2022
1 parent 07235c8 commit d3c4df4
Showing 5 changed files with 313 additions and 86 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -48,6 +48,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Improve CI, track test coverage [#139](https://github.com/p2panda/aquadoggo/pull/139)
- Fix compatibility with PostgreSQL, change sqlx runtime to `tokio` [#170](https://github.com/p2panda/aquadoggo/pull/170)
- Use UPSERT for inserting or updating documents [#173](https://github.com/p2panda/aquadoggo/pull/173)
- Don't critically fail reduce task when document missing [#177](https://github.com/p2panda/aquadoggo/pull/177)

## [0.2.0]

7 changes: 2 additions & 5 deletions aquadoggo/src/db/stores/test_utils.rs
@@ -12,9 +12,7 @@ use p2panda_rs::operation::{
PinnedRelation, PinnedRelationList, Relation, RelationList, VerifiedOperation,
};
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::traits::{
AsStorageEntry, AsStorageLog, EntryStore, LogStore, OperationStore, StorageProvider,
};
use p2panda_rs::storage_provider::traits::{OperationStore, StorageProvider};
use p2panda_rs::test_utils::constants::{DEFAULT_PRIVATE_KEY, TEST_SCHEMA_ID};
use p2panda_rs::test_utils::fixtures::{operation, operation_fields};
use rstest::fixture;
@@ -23,7 +21,6 @@ use sqlx::Any;
use tokio::runtime::Builder;

use crate::db::provider::SqlStorage;
use crate::db::stores::{StorageEntry, StorageLog};
use crate::db::traits::DocumentStore;
use crate::db::{connection_pool, create_database, run_pending_migrations, Pool};
use crate::graphql::client::{EntryArgsRequest, PublishEntryRequest, PublishEntryResponse};
@@ -378,7 +375,7 @@ async fn create_test_db(
}
}

async fn send_to_store(
pub async fn send_to_store(
store: &SqlStorage,
operation: &Operation,
document_id: Option<&DocumentId>,
202 changes: 200 additions & 2 deletions aquadoggo/src/materializer/service.rs
@@ -132,15 +132,24 @@ pub async fn materializer_service(
mod tests {
use std::time::Duration;

use p2panda_rs::operation::{AsVerifiedOperation, OperationValue};
use p2panda_rs::document::DocumentViewId;
use p2panda_rs::identity::KeyPair;
use p2panda_rs::operation::{
AsVerifiedOperation, Operation, OperationValue, PinnedRelation, PinnedRelationList,
Relation, RelationList,
};
use p2panda_rs::storage_provider::traits::OperationStore;
use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID;
use p2panda_rs::test_utils::fixtures::{
key_pair, operation, operation_fields, random_document_id, random_document_view_id,
random_operation_id,
};
use rstest::rstest;
use tokio::sync::broadcast;
use tokio::task;

use crate::context::Context;
use crate::db::stores::test_utils::{test_db, TestDatabase, TestDatabaseRunner};
use crate::db::stores::test_utils::{send_to_store, test_db, TestDatabase, TestDatabaseRunner};
use crate::db::traits::DocumentStore;
use crate::materializer::{Task, TaskInput};
use crate::Configuration;
@@ -282,4 +291,193 @@ mod tests {
);
});
}

#[rstest]
fn materialize_update_document(
#[from(test_db)]
#[with(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("name", OperationValue::Text("panda".into()))])]
runner: TestDatabaseRunner,
) {
// Prepare database which inserts data for one document
runner.with_db_teardown(|db: TestDatabase| async move {
// Identify the key pair, document and operation which were inserted for testing
let key_pair = db.key_pairs.first().unwrap();
let document_id = db.documents.first().unwrap();
let verified_operation = db
.store
.get_operations_by_document_id(document_id)
.await
.unwrap()
.first()
.unwrap()
.to_owned();

// Prepare arguments for service
let context = Context::new(db.store.clone(), Configuration::default());
let shutdown = task::spawn(async {
loop {
// Do this forever .. this means that the shutdown handler will never resolve
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
let (tx, _) = broadcast::channel(1024);

// Start materializer service
let tx_clone = tx.clone();
let handle = tokio::spawn(async move {
materializer_service(context, shutdown, tx_clone)
.await
.unwrap();
});

// Wait for service to be ready ..
tokio::time::sleep(Duration::from_millis(50)).await;

// Send a message over the bus which kicks in materialization
tx.send(crate::bus::ServiceMessage::NewOperation(
verified_operation.operation_id().to_owned(),
))
.unwrap();

// Wait a little bit for work being done ..
tokio::time::sleep(Duration::from_millis(50)).await;

// Then straight away publish an UPDATE on this document and send it over the bus too.
let (entry_encoded, _) = send_to_store(
&db.store,
&operation(
Some(operation_fields(vec![(
"name",
OperationValue::Text("panda123".into()),
)])),
Some(verified_operation.operation_id().to_owned().into()),
Some(TEST_SCHEMA_ID.parse().unwrap()),
),
Some(document_id),
key_pair,
)
.await;

// Send a message over the bus which kicks in materialization
tx.send(crate::bus::ServiceMessage::NewOperation(
entry_encoded.hash().into(),
))
.unwrap();

// Wait a little bit for work being done ..
tokio::time::sleep(Duration::from_millis(50)).await;

// Make sure the service did not crash and is still running
assert_eq!(handle.is_finished(), false);

// Check database for materialized documents
let document = db
.store
.get_document_by_id(document_id)
.await
.unwrap()
.expect("We expect that the document is `Some`");
assert_eq!(document.id(), &entry_encoded.hash().into());
assert_eq!(
document.fields().get("name").unwrap().value().to_owned(),
OperationValue::Text("panda123".into())
);
});
}

#[rstest]
#[case(
operation(Some(operation_fields(vec![(
"relation",
OperationValue::Relation(Relation::new(
random_document_id(),
)),
)])), None, None)
)]
#[case(
operation(Some(operation_fields(vec![(
"pinned_relation",
OperationValue::PinnedRelation(PinnedRelation::new(
DocumentViewId::new(&[random_operation_id(), random_operation_id()]).unwrap(),
)),
)])), None, None)
)]
#[case(
operation(Some(operation_fields(vec![(
"relation_list",
OperationValue::RelationList(RelationList::new(
vec![
random_document_id(),
random_document_id(),
random_document_id()
],
)),
)])), None, None)
)]
#[case(
operation(Some(operation_fields(vec![(
"pinned_relation_list",
OperationValue::PinnedRelationList(PinnedRelationList::new(
vec![
DocumentViewId::new(&[random_operation_id(), random_operation_id()]).unwrap(),
DocumentViewId::new(&[random_operation_id(), random_operation_id(), random_operation_id()]).unwrap()
],
)),
)])), None, None)
)]
fn materialize_complex_documents(
#[from(test_db)]
#[with(0, 0)]
runner: TestDatabaseRunner,
#[case] operation: Operation,
key_pair: KeyPair,
) {
// Prepare empty database
runner.with_db_teardown(|db: TestDatabase| async move {
// Prepare arguments for service
let context = Context::new(db.store.clone(), Configuration::default());
let shutdown = task::spawn(async {
loop {
// Do this forever .. this means that the shutdown handler will never resolve
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
let (tx, _) = broadcast::channel(1024);

// Start materializer service
let tx_clone = tx.clone();
let handle = tokio::spawn(async move {
materializer_service(context, shutdown, tx_clone)
.await
.unwrap();
});

// Wait for service to be ready ..
tokio::time::sleep(Duration::from_millis(50)).await;

// Then straight away publish a CREATE operation and send it to the bus.
let (entry_encoded, _) = send_to_store(&db.store, &operation, None, &key_pair).await;

// Send a message over the bus which kicks in materialization
tx.send(crate::bus::ServiceMessage::NewOperation(
entry_encoded.hash().into(),
))
.unwrap();

// Wait a little bit for work being done ..
tokio::time::sleep(Duration::from_millis(100)).await;

// Make sure the service did not crash and is still running
assert_eq!(handle.is_finished(), false);

// Check database for materialized documents
let document = db
.store
.get_document_by_id(&entry_encoded.hash().into())
.await
.unwrap()
.expect("We expect that the document is `Some`");
assert_eq!(document.id(), &entry_encoded.hash().into());
});
}
}
64 changes: 52 additions & 12 deletions aquadoggo/src/materializer/tasks/dependency.rs
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use log::debug;
use p2panda_rs::document::DocumentViewId;

use crate::context::Context;
@@ -19,25 +20,40 @@ use crate::materializer::TaskInput;
/// Expects a _reduce_ task to have completed successfully for the given document view itself and
/// returns a critical error otherwise.
pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
debug!("Working on dependency task {:#?}", input);

// Here we retrieve the document view by document view id.
let document_view = match input.document_view_id {
Some(view_id) => context
.store
.get_document_view_by_id(&view_id)
.await
.map_err(|_| TaskError::Critical)
,
.map_err(|err| {
debug!("Fatal error getting document view from storage");
debug!("{}", err);
TaskError::Critical
})
,
// We expect to handle document_view_ids in a dependency task.
None => Err(TaskError::Critical),
}?;

let document_view = match document_view {
Some(document_view) => Ok(document_view),
Some(document_view) => {
debug!(
"Document view retrieved from storage with id: {}",
document_view.id()
);
Ok(document_view)
}
// If no document view for the id passed into this task could be retrieved then this
// document has been deleted or the document view id was invalid. As "dependency" tasks
// are only dispatched after a successful "reduce" task, neither `None` case should
// happen, so this is a critical error.
None => Err(TaskError::Critical),
None => {
debug!("Expected document view not found in the store.");
Err(TaskError::Critical)
}
}?;

let mut next_tasks = Vec::new();
@@ -51,22 +67,32 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult<T
// means we either have no entries for this document, or we are not materialising
// it for some reason. We don't want to kick off a "reduce" or "dependency" task in
// either of these cases.
debug!("Relation field found, no action required.");
}
p2panda_rs::operation::OperationValue::RelationList(_relation_list) => {
// same as above...
debug!("Relation list field found, no action required.");
}
p2panda_rs::operation::OperationValue::PinnedRelation(pinned_relation) => {
// These are pinned relations. We may have the operations for these views in the db,
// but this view wasn't pinned yet, so hasn't been materialised. To make sure it is
// materialised when possible, we dispatch a "reduce" task for any pinned relations
// which aren't found.
debug!(
"Pinned relation field found refering to view id: {}",
pinned_relation.view_id()
);
next_tasks.push(
construct_relation_task(&context, pinned_relation.view_id().clone()).await?,
);
}
p2panda_rs::operation::OperationValue::PinnedRelationList(pinned_relation_list) => {
// same as above...
for document_view_id in pinned_relation_list.iter() {
debug!(
"Pinned relation list field found containing view id: {}",
document_view_id
);
next_tasks
.push(construct_relation_task(&context, document_view_id.clone()).await?);
}
Expand All @@ -75,7 +101,11 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult<T
}
}

Ok(Some(next_tasks.into_iter().flatten().collect()))
let next_tasks: Vec<Task<TaskInput>> = next_tasks.into_iter().flatten().collect();

debug!("Dispatching {} reduce task(s)", next_tasks.len());

Ok(Some(next_tasks))
}

/// Returns a _reduce_ task for a given document view only if that view does not yet exist in the
@@ -84,17 +114,27 @@ async fn construct_relation_task(
context: &Context,
document_view_id: DocumentViewId,
) -> Result<Option<Task<TaskInput>>, TaskError> {
debug!("Get view for pinned relation with id: {}", document_view_id);
match context
.store
.get_document_view_by_id(&document_view_id)
.await
.map_err(|_| TaskError::Critical)?
{
Some(_) => Ok(None),
None => Ok(Some(Task::new(
"reduce",
TaskInput::new(None, Some(document_view_id)),
))),
.map_err(|err| {
debug!("Fatal error getting document view from storage");
debug!("{}", err);
TaskError::Critical
})? {
Some(_) => {
debug!("View found for pinned relation: {}", document_view_id);
Ok(None)
}
None => {
debug!("No view found for pinned relation: {}", document_view_id);
Ok(Some(Task::new(
"reduce",
TaskInput::new(None, Some(document_view_id)),
)))
}
}
}

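
The debug! calls added to dependency.rs above go through the log facade, so they only produce output when a logger implementation is installed. The commit history shows env_logger was briefly added as a dev-dependency and removed again before merging, so wiring a logger up locally is left to whoever needs the output, for example when running the new tests with RUST_LOG=debug. A minimal, assumed setup could look like the sketch below; env_logger is not a dependency of the crate after this commit.

// Illustrative only: assumes env_logger has been (re)added as a dependency.
fn main() {
    // Reads the RUST_LOG environment variable, e.g. RUST_LOG=debug.
    env_logger::init();

    // Any debug! call made after this point will now be printed.
    log::debug!("Working on dependency task ...");
}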

