diff --git a/CHANGELOG.md b/CHANGELOG.md
index 23c932250..a874e6dde 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - GraphQL endpoint for publishing entries [#123](https://github.com/p2panda/aquadoggo/pull/132)
 - Implement SQL `DocumentStore` [#118](https://github.com/p2panda/aquadoggo/pull/118)
 - Implement SQL `SchemaStore` [#130](https://github.com/p2panda/aquadoggo/pull/130)
+- Reduce and dependency tasks [#144](https://github.com/p2panda/aquadoggo/pull/144)
 - GraphQL endpoints for replication [#100](https://github.com/p2panda/aquadoggo/pull/100)
 
 ### Changed
diff --git a/Cargo.lock b/Cargo.lock
index 414f49fbf..aa8b2f8b6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1955,9 +1955,9 @@ checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d"
 
 [[package]]
 name = "js-sys"
-version = "0.3.57"
+version = "0.3.58"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "671a26f820db17c2a2750743f1dd03bafd15b98c9f30c7c2628c024c05d73397"
+checksum = "c3fac17f7123a73ca62df411b1bf727ccc805daa070338fda671c86dac1bdc27"
 dependencies = [
  "wasm-bindgen",
 ]
@@ -2391,7 +2391,7 @@ dependencies = [
 [[package]]
 name = "p2panda-rs"
 version = "0.3.0"
-source = "git+https://github.com/p2panda/p2panda?branch=main#2fba9c7fdbafbdc07fd0a943731150d8e81717e1"
+source = "git+https://github.com/p2panda/p2panda?branch=main#2a73825700d914034d5da9b352a28f1acd7c710d"
 dependencies = [
  "arrayvec 0.5.2",
  "async-trait",
@@ -4070,9 +4070,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
 
 [[package]]
 name = "wasm-bindgen"
-version = "0.2.80"
+version = "0.2.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "27370197c907c55e3f1a9fbe26f44e937fe6451368324e009cba39e139dc08ad"
+checksum = "7c53b543413a17a202f4be280a7e5c62a1c69345f5de525ee64f8cfdbc954994"
 dependencies = [
  "cfg-if",
  "wasm-bindgen-macro",
@@ -4080,9 +4080,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-backend"
-version = "0.2.80"
+version = "0.2.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "53e04185bfa3a779273da532f5025e33398409573f348985af9a1cbf3774d3f4"
+checksum = "5491a68ab4500fa6b4d726bd67408630c3dbe9c4fe7bda16d5c82a1fd8c7340a"
 dependencies = [
  "bumpalo",
  "lazy_static",
@@ -4095,9 +4095,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-futures"
-version = "0.4.30"
+version = "0.4.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6f741de44b75e14c35df886aff5f1eb73aa114fa5d4d00dcd37b5e01259bf3b2"
+checksum = "de9a9cec1733468a8c657e57fa2413d2ae2c0129b95e87c5b72b8ace4d13f31f"
 dependencies = [
  "cfg-if",
  "js-sys",
@@ -4107,9 +4107,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-macro"
-version = "0.2.80"
+version = "0.2.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "17cae7ff784d7e83a2fe7611cfe766ecf034111b49deb850a3dc7699c08251f5"
+checksum = "c441e177922bc58f1e12c022624b6216378e5febc2f0533e41ba443d505b80aa"
 dependencies = [
  "quote",
  "wasm-bindgen-macro-support",
@@ -4117,9 +4117,9 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-macro-support"
-version = "0.2.80"
+version = "0.2.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "99ec0dc7a4756fffc231aab1b9f2f578d23cd391390ab27f952ae0c9b3ece20b"
+checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -4130,15 +4130,15 @@ dependencies = [
 
 [[package]]
 name = "wasm-bindgen-shared"
-version = "0.2.80"
+version = "0.2.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d554b7f530dee5964d9a9468d95c1f8b8acae4f282807e7d27d4b03099a46744"
+checksum = "6a89911bd99e5f3659ec4acf9c4d93b0a90fe4a2a11f15328472058edc5261be"
 
 [[package]]
 name = "web-sys"
-version = "0.3.57"
+version = "0.3.58"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7b17e741662c70c8bd24ac5c5b18de314a2c26c32bf8346ee1e6f53de919c283"
+checksum = "2fed94beee57daf8dd7d51f2b15dc2bcde92d7a72304cdf662a4371008b71b90"
 dependencies = [
  "js-sys",
  "wasm-bindgen",
diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs
index 8c40bfef8..6c75ffd58 100644
--- a/aquadoggo/src/db/provider.rs
+++ b/aquadoggo/src/db/provider.rs
@@ -1,4 +1,5 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
+
 use async_trait::async_trait;
 use sqlx::query_scalar;
 
@@ -14,20 +15,21 @@ use crate::graphql::client::{
     EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse,
 };
 
-#[derive(Debug, Clone)]
-/// Sql based storage that implements `StorageProvider`
+/// SQL based storage that implements `StorageProvider`.
+#[derive(Clone, Debug)]
 pub struct SqlStorage {
     pub(crate) pool: Pool,
 }
 
 impl SqlStorage {
-    /// Create a new `SqlStorage` using the provided db `Pool`
+    /// Create a new `SqlStorage` using the provided db `Pool`.
     pub fn new(pool: Pool) -> Self {
         Self { pool }
     }
 }
 
-/// A `StorageProvider` implementation based on `sqlx` that supports SQLite and PostgreSQL databases.
+/// A `StorageProvider` implementation based on `sqlx` that supports SQLite and PostgreSQL
+/// databases.
 #[async_trait]
 impl StorageProvider for SqlStorage {
     type EntryArgsResponse = EntryArgsResponse;
diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs
index 37a07624d..9efa670fd 100644
--- a/aquadoggo/src/db/stores/document.rs
+++ b/aquadoggo/src/db/stores/document.rs
@@ -233,8 +233,6 @@ impl DocumentStore for SqlStorage {
         .await
         .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;
 
-        println!("{:#?}", document_view_field_rows);
-
         if document_view_field_rows.is_empty() {
             return Ok(None);
         }
diff --git a/aquadoggo/src/materializer/input.rs b/aquadoggo/src/materializer/input.rs
index a2cfd97eb..036ac33d4 100644
--- a/aquadoggo/src/materializer/input.rs
+++ b/aquadoggo/src/materializer/input.rs
@@ -4,8 +4,8 @@ use p2panda_rs::document::{DocumentId, DocumentViewId};
 
 #[derive(Clone, Eq, PartialEq, Debug, Hash)]
 pub struct TaskInput {
-    document_id: Option<DocumentId>,
-    document_view_id: Option<DocumentViewId>,
+    pub document_id: Option<DocumentId>,
+    pub document_view_id: Option<DocumentViewId>,
 }
 
 impl TaskInput {
diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs
index 389651fda..3daa18f4c 100644
--- a/aquadoggo/src/materializer/tasks/dependency.rs
+++ b/aquadoggo/src/materializer/tasks/dependency.rs
@@ -1,9 +1,305 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 
+use p2panda_rs::document::DocumentViewId;
+
 use crate::context::Context;
-use crate::materializer::worker::TaskResult;
+use crate::db::traits::DocumentStore;
+use crate::materializer::worker::{Task, TaskError, TaskResult};
 use crate::materializer::TaskInput;
 
-pub async fn dependency_task(_context: Context, _input: TaskInput) -> TaskResult<TaskInput> {
-    Ok(None)
+/// A dependency task prepares _reduce_ tasks for all pinned relations of a given document view.
+///
+/// This task is dispatched after a reduce task completes. It identifies any pinned relations
+/// present in a given document view, as we need to guarantee that the required document views
+/// are materialised and stored in the database. We may already have the required operations on
+/// the node, but they were never materialised into the document view required by the pinned
+/// relation. To guarantee that all required document views are present, we dispatch a reduce
+/// task for the view of each pinned relation found.
+///
+/// Expects a _reduce_ task to have completed successfully for the given document view itself and
+/// returns a critical error otherwise.
+pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
+    // Here we retrieve the document view by its document view id.
+    let document_view = match input.document_view_id {
+        Some(view_id) => context
+            .store
+            .get_document_view_by_id(&view_id)
+            .await
+            .map_err(|_| TaskError::Critical),
+        // We only expect to handle document_view_ids in a dependency task.
+        None => Err(TaskError::Critical),
+    }?;
+
+    let document_view = match document_view {
+        Some(document_view) => Ok(document_view),
+        // If no document view for the id passed into this task could be retrieved then this
+        // document has been deleted or the document view id was invalid. As "dependency" tasks
+        // are only dispatched after a successful "reduce" task, neither case should happen, so
+        // this is a critical error.
+        None => Err(TaskError::Critical),
+    }?;
+
+    let mut next_tasks = Vec::new();
+
+    // First we handle all pinned or unpinned relations defined in this task's document view.
+    // We can think of these as "child" relations.
+    for (_key, document_view_value) in document_view.fields().iter() {
+        match document_view_value.value() {
+            p2panda_rs::operation::OperationValue::Relation(_relation) => {
+                // This is a relation to a document. If it doesn't exist in the db yet, then that
+                // means we either have no entries for this document, or we are not materialising
+                // it for some reason. We don't want to kick off a "reduce" or "dependency" task
+                // in either of these cases.
+            }
+            p2panda_rs::operation::OperationValue::RelationList(_relation_list) => {
+                // Same as above...
+            }
+            p2panda_rs::operation::OperationValue::PinnedRelation(pinned_relation) => {
+                // These are pinned relations. We may have the operations for these views in the
+                // db, but this view wasn't pinned yet, so it hasn't been materialised. To make
+                // sure it is materialised when possible, we dispatch a "reduce" task for any
+                // pinned relations which aren't found.
+                next_tasks.push(
+                    construct_relation_task(&context, pinned_relation.view_id().clone()).await?,
+                );
+            }
+            p2panda_rs::operation::OperationValue::PinnedRelationList(pinned_relation_list) => {
+                // Same as above...
+                for document_view_id in pinned_relation_list.iter() {
+                    next_tasks
+                        .push(construct_relation_task(&context, document_view_id.clone()).await?);
+                }
+            }
+            _ => (),
+        }
+    }
+
+    Ok(Some(next_tasks.into_iter().flatten().collect()))
+}
+
+/// Returns a _reduce_ task for a given document view only if that view does not yet exist in the
+/// store.
+async fn construct_relation_task(
+    context: &Context,
+    document_view_id: DocumentViewId,
+) -> Result<Option<Task<TaskInput>>, TaskError> {
+    match context
+        .store
+        .get_document_view_by_id(&document_view_id)
+        .await
+        .map_err(|_| TaskError::Critical)?
+ { + Some(_) => Ok(None), + None => Ok(Some(Task::new( + "reduce", + TaskInput::new(None, Some(document_view_id)), + ))), + } +} + +#[cfg(test)] +mod tests { + use p2panda_rs::document::{DocumentId, DocumentViewId}; + use p2panda_rs::identity::KeyPair; + use p2panda_rs::operation::{ + AsVerifiedOperation, OperationValue, PinnedRelation, PinnedRelationList, Relation, + RelationList, + }; + use p2panda_rs::storage_provider::traits::OperationStore; + use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID; + use p2panda_rs::test_utils::fixtures::{ + create_operation, random_document_id, random_document_view_id, + }; + use rstest::rstest; + + use crate::config::Configuration; + use crate::context::Context; + use crate::db::stores::test_utils::{insert_entry_operation_and_view, test_db, TestSqlStore}; + use crate::db::traits::DocumentStore; + use crate::materializer::tasks::reduce_task; + use crate::materializer::TaskInput; + + use super::dependency_task; + + #[rstest] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], + vec![]), 0)] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("favorite_book_images", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], + vec![]), 0)] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("something_from_the_past", OperationValue::PinnedRelation(PinnedRelation::new(random_document_view_id())))], + vec![]), 1)] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("many_previous_drafts", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect())))], + vec![]), 2)] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), + ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], + vec![]), 2)] + // This document has been updated + #[case(test_db(4, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), + ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], + vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 3].iter().map(|_|random_document_view_id()).collect()))), + ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 10].iter().map(|_|random_document_id()).collect())))], + ), 3)] + #[tokio::test] + async fn dispatches_reduce_tasks_for_pinned_child_dependencies( + #[case] + #[future] + db: TestSqlStore, + #[case] expected_next_tasks: usize, + ) { + let db = db.await; + let context = Context::new(db.store.clone(), Configuration::default()); + + for document_id in &db.documents { + let input = TaskInput::new(Some(document_id.clone()), None); + reduce_task(context.clone(), input).await.unwrap().unwrap(); + } + + for document_id in &db.documents { + let document_view = db + .store + .get_document_by_id(document_id) + .await + .unwrap() + .unwrap(); + + let input = TaskInput::new(None, Some(document_view.id().clone())); + + let reduce_tasks = dependency_task(context.clone(), input) + .await + 
+                .unwrap()
+                .unwrap();
+            assert_eq!(reduce_tasks.len(), expected_next_tasks);
+            for task in reduce_tasks {
+                assert_eq!(task.0, "reduce")
+            }
+        }
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn no_reduce_task_for_materialised_document_relations(
+        #[from(test_db)]
+        #[with(1, 1)]
+        #[future]
+        db: TestSqlStore,
+    ) {
+        let db = db.await;
+        let context = Context::new(db.store.clone(), Configuration::default());
+        let document_id = db.documents[0].clone();
+
+        let input = TaskInput::new(Some(document_id.clone()), None);
+        reduce_task(context.clone(), input).await.unwrap().unwrap();
+
+        // Here we have one materialised document (we call it a "child" as we will shortly
+        // publish a parent referring to it). It contains relations which are not materialised
+        // yet, so it should dispatch a reduce task for each one.
+
+        let document_view_of_child = db
+            .store
+            .get_document_by_id(&document_id)
+            .await
+            .unwrap()
+            .unwrap();
+
+        let document_view_id_of_child = document_view_of_child.id();
+
+        // Create a new document referencing the existing materialised document.
+
+        let operation = create_operation(&[
+            (
+                "pinned_relation_to_existing_document",
+                OperationValue::PinnedRelation(PinnedRelation::new(
+                    document_view_id_of_child.clone(),
+                )),
+            ),
+            (
+                "pinned_relation_to_not_existing_document",
+                OperationValue::PinnedRelation(PinnedRelation::new(random_document_view_id())),
+            ),
+        ]);
+
+        let (_, document_view_id) =
+            insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await;
+
+        // The new document should now dispatch one reduce task for the pinned relation which
+        // has not been materialised yet.
+        let input = TaskInput::new(None, Some(document_view_id.clone()));
+        let tasks = dependency_task(context.clone(), input)
+            .await
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(tasks.len(), 1);
+        assert_eq!(tasks[0].0, "reduce");
+    }
+
+    #[rstest]
+    #[should_panic(expected = "Critical")]
+    #[case(None, Some(random_document_view_id()))]
+    #[should_panic(expected = "Critical")]
+    #[case(None, None)]
+    #[should_panic(expected = "Critical")]
+    #[case(Some(random_document_id()), None)]
+    #[should_panic(expected = "Critical")]
+    #[case(Some(random_document_id()), Some(random_document_view_id()))]
+    #[tokio::test]
+    async fn fails_correctly(
+        #[case] document_id: Option<DocumentId>,
+        #[case] document_view_id: Option<DocumentViewId>,
+        #[from(test_db)]
+        #[future]
+        db: TestSqlStore,
+    ) {
+        let db = db.await;
+        let context = Context::new(db.store, Configuration::default());
+        let input = TaskInput::new(document_id, document_view_id);
+
+        let next_tasks = dependency_task(context.clone(), input).await.unwrap();
+        assert!(next_tasks.is_none())
+    }
+
+    #[rstest]
+    #[should_panic(expected = "Critical")]
+    #[case(test_db(2, 1, true, TEST_SCHEMA_ID.parse().unwrap(),
+        vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))],
+        vec![]))]
+    #[should_panic(expected = "Critical")]
+    #[case(test_db(2, 1, true, TEST_SCHEMA_ID.parse().unwrap(),
+        vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_| random_document_view_id()).collect()))),
+             ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_| random_document_id()).collect())))],
+        vec![]))]
+    #[tokio::test]
+    async fn fails_on_deleted_documents(
+        #[case]
+        #[future]
+        db: TestSqlStore,
+    ) {
+        let db = db.await;
+        let context = Context::new(db.store.clone(), Configuration::default());
+        let document_id = db.documents[0].clone();
+
+        let input = TaskInput::new(Some(document_id.clone()), None);
+        reduce_task(context.clone(), input).await.unwrap();
+
+        let document_operations = db
+            .store
+            .get_operations_by_document_id(&document_id)
+            .await
+            .unwrap();
+
+        let document_view_id: DocumentViewId = document_operations[1].operation_id().clone().into();
+
+        let input = TaskInput::new(None, Some(document_view_id.clone()));
+
+        dependency_task(context.clone(), input)
+            .await
+            .unwrap()
+            .unwrap();
+    }
 }
diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs
index 3aecb888e..279d8ad04 100644
--- a/aquadoggo/src/materializer/tasks/reduce.rs
+++ b/aquadoggo/src/materializer/tasks/reduce.rs
@@ -1,17 +1,330 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 
-use p2panda_rs::document::DocumentBuilder;
+use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId};
+use p2panda_rs::operation::VerifiedOperation;
+use p2panda_rs::storage_provider::traits::OperationStore;
 
 use crate::context::Context;
-use crate::materializer::worker::{TaskError, TaskResult};
+use crate::db::traits::DocumentStore;
+use crate::materializer::worker::{Task, TaskError, TaskResult};
 use crate::materializer::TaskInput;
 
-pub async fn reduce_task(_context: Context, _input: TaskInput) -> TaskResult<TaskInput> {
-    // @TODO: Load operations from database
-    let _document = DocumentBuilder::new(vec![])
-        .build()
-        .map_err(|_| TaskError::Failure)?;
+/// A reduce task is dispatched for every entry and operation pair which arrives at a node.
+///
+/// They may also be dispatched from a dependency task when a pinned relation is present on an
+/// already materialised document view.
+///
+/// After successfully reducing and storing a document view, an array of dependency tasks is
+/// returned. If invalid inputs were passed or a fatal db error occurred, a critical error is
+/// returned.
+pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
+    // Find out which document we are handling
+    let document_id = resolve_document_id(&context, &input).await?;
 
-    // @TODO: Store document view in database
-    Ok(None)
+    // Get all operations for the requested document
+    let operations = context
+        .store
+        .get_operations_by_document_id(&document_id)
+        .await
+        .map_err(|_| TaskError::Critical)?;
+
+    let document_view_id = match &input.document_view_id {
+        // If this task was passed a document_view_id as input then we only want to build the
+        // document up to the requested view
+        Some(view_id) => reduce_document_view(&context, view_id, operations).await?,
+        // If no document_view_id was passed, this is a document_id reduce task.
+        None => reduce_document(&context, operations).await?,
+    };
+
+    // Dispatch a "dependency" task if we created a new document view
+    match document_view_id {
+        Some(view_id) => Ok(Some(vec![Task::new(
+            "dependency",
+            TaskInput::new(None, Some(view_id)),
+        )])),
+        None => Ok(None),
+    }
+}
+
+/// Helper method to resolve a `DocumentId` from task input.
+///
+/// If the task input is invalid (both document_id and document_view_id missing or both given) we
+/// critically fail the task at this point. If only a document_view_id was passed we retrieve the
+/// document_id as it is needed later.
+async fn resolve_document_id(
+    context: &Context,
+    input: &TaskInput,
+) -> Result<DocumentId, TaskError> {
+    match (&input.document_id, &input.document_view_id) {
+        // The `DocumentId` is already given, we don't have to do anything
+        (Some(document_id), None) => Ok(document_id.to_owned()),
+
+        // A `DocumentViewId` is given, let's find out its document id
+        (None, Some(document_view_id)) => {
+            // @TODO: We can skip this step if we implement:
+            // https://github.com/p2panda/aquadoggo/issues/148
+            let operation_id = document_view_id.clone().into_iter().next().unwrap();
+            match context
+                .store
+                .get_document_by_operation_id(&operation_id)
+                .await
+                .map_err(|_| TaskError::Critical)?
+            {
+                Some(document_id) => Ok(document_id),
+                None => Err(TaskError::Critical),
+            }
+        }
+
+        // None or both have been provided which smells like a bug
+        (_, _) => Err(TaskError::Critical),
+    }
+}
+
+/// Helper method to reduce an operation graph to a specific document view, returning the
+/// `DocumentViewId` of the newly created document view.
+///
+/// It returns `None` if either that document view reached "deleted" status or we don't have
+/// enough operations to materialise it.
+async fn reduce_document_view(
+    context: &Context,
+    document_view_id: &DocumentViewId,
+    operations: Vec<VerifiedOperation>,
+) -> Result<Option<DocumentViewId>, TaskError> {
+    let document = match DocumentBuilder::new(operations)
+        .build_to_view_id(Some(document_view_id.to_owned()))
+    {
+        Ok(document) => {
+            // If the document was deleted, then we return nothing
+            if document.is_deleted() {
+                return Ok(None);
+            };
+
+            document
+        }
+        Err(_) => {
+            // There are not enough operations yet to materialise this view. Maybe next time!
+            return Ok(None);
+        }
+    };
+
+    // Insert the new document view into the database
+    context
+        .store
+        .insert_document_view(document.view().unwrap(), document.schema())
+        .await
+        .map_err(|_| TaskError::Critical)?;
+
+    // Return the new view id to be used in the resulting dependency task
+    Ok(Some(document.view_id().to_owned()))
+}
+
+/// Helper method to reduce an operation graph to the latest document view, returning the
+/// `DocumentViewId` of the newly created document view.
+///
+/// It returns `None` if either that document view reached "deleted" status or we don't have
+/// enough operations to materialise it.
+async fn reduce_document(
+    context: &Context,
+    operations: Vec<VerifiedOperation>,
+) -> Result<Option<DocumentViewId>, TaskError> {
+    match DocumentBuilder::new(operations).build() {
+        Ok(document) => {
+            // Insert this document into storage. If it already existed, this will update its
+            // current view
+            context
+                .store
+                .insert_document(&document)
+                .await
+                .map_err(|_| TaskError::Critical)?;
+
+            // If the document was deleted, then we return nothing
+            if document.is_deleted() {
+                return Ok(None);
+            }
+
+            // Return the new document_view id to be used in the resulting dependency task
+            Ok(Some(document.view_id().to_owned()))
+        }
+        Err(_) => {
+            // There are not enough operations yet to materialise this view. Maybe next time!
+            Ok(None)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId};
+    use p2panda_rs::operation::{AsVerifiedOperation, OperationValue};
+    use p2panda_rs::storage_provider::traits::OperationStore;
+    use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID;
+    use rstest::rstest;
+
+    use crate::config::Configuration;
+    use crate::context::Context;
+    use crate::db::stores::test_utils::{test_db, TestSqlStore};
+    use crate::db::traits::DocumentStore;
+    use crate::materializer::tasks::reduce_task;
+    use crate::materializer::TaskInput;
+
+    #[rstest]
+    #[tokio::test]
+    async fn reduces_documents(
+        #[from(test_db)]
+        #[with(2, 20, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("username", OperationValue::Text("panda".into()))], vec![("username", OperationValue::Text("PANDA".into()))])]
+        #[future]
+        db: TestSqlStore,
+    ) {
+        let db = db.await;
+        let context = Context::new(db.store, Configuration::default());
+
+        for document_id in &db.documents {
+            let input = TaskInput::new(Some(document_id.clone()), None);
+            assert!(reduce_task(context.clone(), input).await.is_ok());
+        }
+
+        for document_id in &db.documents {
+            let document_view = context.store.get_document_by_id(document_id).await.unwrap();
+
+            assert_eq!(
+                document_view.unwrap().get("username").unwrap().value(),
+                &OperationValue::Text("PANDA".to_string())
+            )
+        }
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn reduces_document_to_specific_view_id(
+        #[from(test_db)]
+        #[with(2, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("username", OperationValue::Text("panda".into()))], vec![("username", OperationValue::Text("PANDA".into()))])]
+        #[future]
+        db: TestSqlStore,
+    ) {
+        let db = db.await;
+
+        let document_operations = db
+            .store
+            .get_operations_by_document_id(&db.documents[0])
+            .await
+            .unwrap();
+
+        let document = DocumentBuilder::new(document_operations).build().unwrap();
+        let mut sorted_document_operations = document.operations().clone();
+
+        let document_view_id: DocumentViewId = sorted_document_operations
+            .pop()
+            .unwrap()
+            .operation_id()
+            .clone()
+            .into();
+
+        let context = Context::new(db.store.clone(), Configuration::default());
+        let input = TaskInput::new(None, Some(document_view_id.clone()));
+
+        assert!(reduce_task(context.clone(), input).await.is_ok());
+
+        let document_view = db
+            .store
+            .get_document_view_by_id(&document_view_id)
+            .await
+            .unwrap();
+
+        assert_eq!(
+            document_view.unwrap().get("username").unwrap().value(),
+            &OperationValue::Text("PANDA".to_string())
+        );
+
+        // We didn't reduce this document_view_id so it shouldn't exist in the db.
+        let document_view_id: DocumentViewId = sorted_document_operations
+            .pop()
+            .unwrap()
+            .operation_id()
+            .clone()
+            .into();
+
+        let document_view = db
+            .store
+            .get_document_view_by_id(&document_view_id)
+            .await
+            .unwrap();
+
+        assert!(document_view.is_none());
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn deleted_documents_have_no_view(
+        #[from(test_db)]
+        #[with(3, 20, true)]
+        #[future]
+        db: TestSqlStore,
+    ) {
+        let db = db.await;
+        let context = Context::new(db.store.clone(), Configuration::default());
+
+        for document_id in &db.documents {
+            let input = TaskInput::new(Some(document_id.clone()), None);
+            let tasks = reduce_task(context.clone(), input).await.unwrap();
+            assert!(tasks.is_none());
+        }
+
+        for document_id in &db.documents {
+            let document_view = context.store.get_document_by_id(document_id).await.unwrap();
+            assert!(document_view.is_none())
+        }
+
+        let document_operations = context
+            .store
+            .get_operations_by_document_id(&db.documents[0])
+            .await
+            .unwrap();
+
+        let document = DocumentBuilder::new(document_operations).build().unwrap();
+
+        let input = TaskInput::new(None, Some(document.view_id().clone()));
+        let tasks = reduce_task(context.clone(), input).await.unwrap();
+
+        assert!(tasks.is_none());
+    }
+
+    #[rstest]
+    #[case(test_db(3, 1, false, TEST_SCHEMA_ID.parse().unwrap(),
+        vec![("username", OperationValue::Text("panda".into()))], vec![("username", OperationValue::Text("PANDA".into()))]), true)]
+    // This document is deleted, it shouldn't spawn a dependency task.
+    #[case(test_db(3, 1, true, TEST_SCHEMA_ID.parse().unwrap(),
+        vec![("username", OperationValue::Text("panda".into()))], vec![("username", OperationValue::Text("PANDA".into()))]), false)]
+    #[tokio::test]
+    async fn returns_dependency_task_inputs(
+        #[case]
+        #[future]
+        db: TestSqlStore,
+        #[case] is_next_task: bool,
+    ) {
+        let db = db.await;
+        let context = Context::new(db.store.clone(), Configuration::default());
+        let document_id = db.documents[0].clone();
+
+        let input = TaskInput::new(Some(document_id.clone()), None);
+        let next_task_inputs = reduce_task(context.clone(), input).await.unwrap();
+
+        assert_eq!(next_task_inputs.is_some(), is_next_task);
+    }
+
+    #[rstest]
+    #[should_panic(expected = "Critical")]
+    #[case(None, None)]
+    #[tokio::test]
+    async fn fails_correctly(
+        #[case] document_id: Option<DocumentId>,
+        #[case] document_view_id: Option<DocumentViewId>,
+        #[from(test_db)]
+        #[future]
+        db: TestSqlStore,
+    ) {
+        let db = db.await;
+        let context = Context::new(db.store, Configuration::default());
+        let input = TaskInput::new(document_id, document_view_id);
+
+        reduce_task(context.clone(), input).await.unwrap();
+    }
 }
diff --git a/aquadoggo/src/materializer/tasks/schema.rs b/aquadoggo/src/materializer/tasks/schema.rs
index e27ea5b88..d4a35e4e2 100644
--- a/aquadoggo/src/materializer/tasks/schema.rs
+++ b/aquadoggo/src/materializer/tasks/schema.rs
@@ -5,5 +5,5 @@ use crate::materializer::worker::TaskResult;
 use crate::materializer::TaskInput;
 
 pub async fn schema_task(_context: Context, _input: TaskInput) -> TaskResult<TaskInput> {
-    Ok(None)
+    todo!()
 }
diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs
index 166b81637..871790de6 100644
--- a/aquadoggo/src/materializer/worker.rs
+++ b/aquadoggo/src/materializer/worker.rs
@@ -92,7 +92,7 @@ use triggered::{Listener, Trigger};
 /// A task holding a generic input value and the name of the worker which will process it
 /// eventually.
 #[derive(Debug, Clone)]
-pub struct Task<IN>(WorkerName, IN);
+pub struct Task<IN>(pub WorkerName, IN);
 
 impl<IN> Task<IN> {
     /// Returns a new task.
@@ -107,12 +107,13 @@ impl<IN> Task<IN> {
 pub type TaskResult<IN> = Result<Option<Vec<Task<IN>>>, TaskError>;
 
 /// Possible return values of a failed task.
+#[derive(Debug)]
 pub enum TaskError {
     /// This tasks failed critically and will cause the whole program to panic.
-    #[allow(dead_code)]
     Critical,
 
     /// This task failed silently without any further effects.
+    #[allow(dead_code)]
     Failure,
 }
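For reviewers who want to see the task hand-off in isolation: below is a minimal, self-contained sketch of the `Task`/`TaskResult` shapes from `worker.rs` and the way a completed reduce chains into a dependency task via a `TaskInput` carrying only a view id. The `reduce` body, the string ids and `view-1` suffix are illustrative stand-ins, not the aquadoggo implementation (no database, no p2panda types):

```rust
// Simplified mirror of the worker types in this patch.
type WorkerName = String;

#[derive(Debug, Clone)]
pub struct Task<IN>(pub WorkerName, IN);

impl<IN> Task<IN> {
    pub fn new(worker_name: &str, input: IN) -> Self {
        Self(worker_name.into(), input)
    }
}

#[derive(Debug)]
pub enum TaskError {
    Critical,
    #[allow(dead_code)]
    Failure,
}

pub type TaskResult<IN> = Result<Option<Vec<Task<IN>>>, TaskError>;

// Stand-in for `TaskInput`, with strings instead of p2panda ids.
#[derive(Debug, Clone)]
pub struct TaskInput {
    pub document_id: Option<String>,
    pub document_view_id: Option<String>,
}

impl TaskInput {
    pub fn new(document_id: Option<String>, document_view_id: Option<String>) -> Self {
        Self { document_id, document_view_id }
    }
}

// Toy "reduce": after "materialising" a view it returns one follow-up
// "dependency" task addressed by the new view id only, mirroring the
// `Task::new("dependency", TaskInput::new(None, Some(view_id)))` call above.
fn reduce(input: TaskInput) -> TaskResult<TaskInput> {
    let view_id = input
        .document_id
        .map(|id| format!("{id}/view-1"))
        .ok_or(TaskError::Critical)?;

    Ok(Some(vec![Task::new(
        "dependency",
        TaskInput::new(None, Some(view_id)),
    )]))
}

fn main() {
    let next = reduce(TaskInput::new(Some("doc-1".into()), None)).unwrap();
    for task in next.unwrap() {
        // Prints: dependency task with document_view_id = Some("doc-1/view-1")
        println!("{} task with document_view_id = {:?}", task.0, task.1.document_view_id);
    }
}
```

This also shows why the patch makes `Task`'s first field `pub`: the tests assert on the worker name via `task.0`.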
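One design note on `dependency_task`'s return value: `construct_relation_task` yields an `Option<Task<...>>` per relation, so the task collects them all and flattens once at the end instead of branching at every push. A tiny sketch of that pattern (illustrative values only):

```rust
// Collect-then-flatten as used in `dependency_task`: each pinned relation
// contributes `Some(task)` only when its view is missing from the store;
// `flatten()` drops the `None`s in a single pass.
fn main() {
    let maybe_tasks: Vec<Option<&str>> = vec![Some("reduce"), None, Some("reduce")];
    let next_tasks: Vec<&str> = maybe_tasks.into_iter().flatten().collect();
    assert_eq!(next_tasks, vec!["reduce", "reduce"]);
}
```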