From 7dd1adf948845241e192883ce1d39d4c4c2b4065 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Sat, 4 Jun 2022 11:38:24 +0100 Subject: [PATCH 01/33] WIP: implement document materialisation and storage --- aquadoggo/src/materializer/input.rs | 4 +-- aquadoggo/src/materializer/tasks/reduce.rs | 40 +++++++++++++++++++--- 2 files changed, 38 insertions(+), 6 deletions(-) diff --git a/aquadoggo/src/materializer/input.rs b/aquadoggo/src/materializer/input.rs index a2cfd97eb..036ac33d4 100644 --- a/aquadoggo/src/materializer/input.rs +++ b/aquadoggo/src/materializer/input.rs @@ -4,8 +4,8 @@ use p2panda_rs::document::{DocumentId, DocumentViewId}; #[derive(Clone, Eq, PartialEq, Debug, Hash)] pub struct TaskInput { - document_id: Option, - document_view_id: Option, + pub document_id: Option, + pub document_view_id: Option, } impl TaskInput { diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 3aecb888e..42600266e 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -1,17 +1,49 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use p2panda_rs::document::DocumentBuilder; +use p2panda_rs::storage_provider::traits::OperationStore; use crate::context::Context; +use crate::db::traits::DocumentStore; use crate::materializer::worker::{TaskError, TaskResult}; use crate::materializer::TaskInput; -pub async fn reduce_task(_context: Context, _input: TaskInput) -> TaskResult { - // @TODO: Load operations from database - let _document = DocumentBuilder::new(vec![]) +pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { + let document_id = match (input.document_id, input.document_view_id) { + (Some(document_id), None) => document_id, + // TODO: find document_id from document_view_id then get operations. + // + // We could have a `get_operations_by_document_view_id()` in `OperationStore`, or + // could we even do this with some fancy recursive SQL query? 
We might need the `previous_operations` + // table back for that. + (None, Some(_)) => todo!(), + (_, _) => todo!(), + }; + + let operations = context + .store + .get_operations_by_document_id(&document_id) + .await + .map_err(|_| TaskError::Failure)? + .into_iter() + // TODO: we can avoid this conversion if we do https://github.com/p2panda/p2panda/issues/320 + .map(|op| op.into()) + .collect(); + + // TODO: If we are resolving a document_view_id, but we are missing operations from it's graph, + // what do we want to return? + + let document = DocumentBuilder::new(operations) .build() .map_err(|_| TaskError::Failure)?; - // @TODO: Store document view in database + // TODO: If this was a document_view reduction, we want to call `insert_document_view()` instead. + + context + .store + .insert_document(&document) + .await + .map_err(|_| TaskError::Failure)?; + Ok(None) } From f19c13618bf00f410a3f4f9060a0eb664cf7b8dc Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Sat, 4 Jun 2022 15:08:24 +0100 Subject: [PATCH 02/33] WIP: Implement document_view materialisation --- aquadoggo/src/materializer/tasks/reduce.rs | 63 +++++++++++++++------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 42600266e..73ffe07be 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -9,16 +9,25 @@ use crate::materializer::worker::{TaskError, TaskResult}; use crate::materializer::TaskInput; pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { - let document_id = match (input.document_id, input.document_view_id) { - (Some(document_id), None) => document_id, - // TODO: find document_id from document_view_id then get operations. 
- // - // We could have a `get_operations_by_document_view_id()` in `OperationStore`, or + let document_id = match (&input.document_id, &input.document_view_id) { + (Some(document_id), None) => Ok(document_id.to_owned()), + // TODO: Alt approach: we could have a `get_operations_by_document_view_id()` in `OperationStore`, or // could we even do this with some fancy recursive SQL query? We might need the `previous_operations` // table back for that. - (None, Some(_)) => todo!(), + (None, Some(document_view_id)) => { + let operation_id = document_view_id.clone().into_iter().next().unwrap(); + let document_id = context + .store + .get_document_by_operation_id(operation_id) + .await + .map_err(|_| TaskError::Failure)?; + match document_id { + Some(id) => Ok(id), + None => Err(TaskError::Failure), + } + } (_, _) => todo!(), - }; + }?; let operations = context .store @@ -30,20 +39,38 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { + // TODO: If we are resolving a document_view_id, but we are missing operations from it's graph, + // what do we want to return? + let document = DocumentBuilder::new(operations) + .build_to_view_id(Some(document_view_id.to_owned())) + .map_err(|_| TaskError::Failure)?; - let document = DocumentBuilder::new(operations) - .build() - .map_err(|_| TaskError::Failure)?; + // If the document is deleted, there is no view, so we don't insert it. + if document.is_deleted() { + return Ok(None); + } - // TODO: If this was a document_view reduction, we want to call `insert_document_view()` instead. + context + .store + // Unwrap as all not deleted documents have a view. + .insert_document_view(&document.view().unwrap(), document.schema()) + .await + .map_err(|_| TaskError::Failure)? 
+ } + None => { + let document = DocumentBuilder::new(operations) + .build() + .map_err(|_| TaskError::Failure)?; - context - .store - .insert_document(&document) - .await - .map_err(|_| TaskError::Failure)?; + context + .store + .insert_document(&document) + .await + .map_err(|_| TaskError::Failure)? + } + }; Ok(None) } From be401bf9e9b768bf2ad5aa38ebfaa472b7143eb4 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Sat, 4 Jun 2022 15:14:14 +0100 Subject: [PATCH 03/33] Bump p2panda_rs branch --- Cargo.lock | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 414f49fbf..aa8b2f8b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1955,9 +1955,9 @@ checksum = "112c678d4050afce233f4f2852bb2eb519230b3cf12f33585275537d7e41578d" [[package]] name = "js-sys" -version = "0.3.57" +version = "0.3.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "671a26f820db17c2a2750743f1dd03bafd15b98c9f30c7c2628c024c05d73397" +checksum = "c3fac17f7123a73ca62df411b1bf727ccc805daa070338fda671c86dac1bdc27" dependencies = [ "wasm-bindgen", ] @@ -2391,7 +2391,7 @@ dependencies = [ [[package]] name = "p2panda-rs" version = "0.3.0" -source = "git+https://github.com/p2panda/p2panda?branch=main#2fba9c7fdbafbdc07fd0a943731150d8e81717e1" +source = "git+https://github.com/p2panda/p2panda?branch=main#2a73825700d914034d5da9b352a28f1acd7c710d" dependencies = [ "arrayvec 0.5.2", "async-trait", @@ -4070,9 +4070,9 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "wasm-bindgen" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27370197c907c55e3f1a9fbe26f44e937fe6451368324e009cba39e139dc08ad" +checksum = "7c53b543413a17a202f4be280a7e5c62a1c69345f5de525ee64f8cfdbc954994" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -4080,9 +4080,9 @@ dependencies = [ [[package]] name = 
"wasm-bindgen-backend" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53e04185bfa3a779273da532f5025e33398409573f348985af9a1cbf3774d3f4" +checksum = "5491a68ab4500fa6b4d726bd67408630c3dbe9c4fe7bda16d5c82a1fd8c7340a" dependencies = [ "bumpalo", "lazy_static", @@ -4095,9 +4095,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.30" +version = "0.4.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f741de44b75e14c35df886aff5f1eb73aa114fa5d4d00dcd37b5e01259bf3b2" +checksum = "de9a9cec1733468a8c657e57fa2413d2ae2c0129b95e87c5b72b8ace4d13f31f" dependencies = [ "cfg-if", "js-sys", @@ -4107,9 +4107,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17cae7ff784d7e83a2fe7611cfe766ecf034111b49deb850a3dc7699c08251f5" +checksum = "c441e177922bc58f1e12c022624b6216378e5febc2f0533e41ba443d505b80aa" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -4117,9 +4117,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "99ec0dc7a4756fffc231aab1b9f2f578d23cd391390ab27f952ae0c9b3ece20b" +checksum = "7d94ac45fcf608c1f45ef53e748d35660f168490c10b23704c7779ab8f5c3048" dependencies = [ "proc-macro2", "quote", @@ -4130,15 +4130,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.80" +version = "0.2.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d554b7f530dee5964d9a9468d95c1f8b8acae4f282807e7d27d4b03099a46744" +checksum = "6a89911bd99e5f3659ec4acf9c4d93b0a90fe4a2a11f15328472058edc5261be" [[package]] name = "web-sys" -version = "0.3.57" +version = "0.3.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"7b17e741662c70c8bd24ac5c5b18de314a2c26c32bf8346ee1e6f53de919c283" +checksum = "2fed94beee57daf8dd7d51f2b15dc2bcde92d7a72304cdf662a4371008b71b90" dependencies = [ "js-sys", "wasm-bindgen", From 50a7ea588420986682f85393e906609f8010bc93 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 8 Jun 2022 08:57:00 +0100 Subject: [PATCH 04/33] Tests for reduce_task --- aquadoggo/src/materializer/tasks/reduce.rs | 63 ++++++++++++++++++++++ aquadoggo/src/materializer/worker.rs | 1 + 2 files changed, 64 insertions(+) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 73ffe07be..c88c6a968 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -74,3 +74,66 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Task { pub type TaskResult = Result>>, TaskError>; /// Possible return values of a failed task. +#[derive(Debug)] pub enum TaskError { /// This tasks failed critically and will cause the whole program to panic. 
#[allow(dead_code)] From f8b5b8ef84fa31d442eb57feeab7512da81874fc Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 8 Jun 2022 08:57:32 +0100 Subject: [PATCH 05/33] Clipp --- aquadoggo/src/materializer/tasks/reduce.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index c88c6a968..be42f4199 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -55,7 +55,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Date: Wed, 8 Jun 2022 09:10:03 +0100 Subject: [PATCH 06/33] Test for reducing document_views --- aquadoggo/src/materializer/tasks/reduce.rs | 47 ++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index be42f4199..af03aa0d7 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -77,7 +77,9 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Date: Wed, 8 Jun 2022 11:01:49 +0100 Subject: [PATCH 07/33] Return next dependency tasks --- aquadoggo/src/db/stores/document.rs | 2 - aquadoggo/src/materializer/tasks/reduce.rs | 106 ++++++++++++++++++--- 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs index 37a07624d..9efa670fd 100644 --- a/aquadoggo/src/db/stores/document.rs +++ b/aquadoggo/src/db/stores/document.rs @@ -233,8 +233,6 @@ impl DocumentStore for SqlStorage { .await .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?; - println!("{:#?}", document_view_field_rows); - if document_view_field_rows.is_empty() { return Ok(None); } diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index af03aa0d7..a2d336726 100644 --- 
a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -1,11 +1,12 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use p2panda_rs::document::DocumentBuilder; +use p2panda_rs::document::{DocumentBuilder, DocumentViewValue}; +use p2panda_rs::operation::OperationValue; use p2panda_rs::storage_provider::traits::OperationStore; use crate::context::Context; use crate::db::traits::DocumentStore; -use crate::materializer::worker::{TaskError, TaskResult}; +use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { @@ -39,6 +40,27 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult = Vec::new(); + + let parse_new_tasks = |view_field: (&String, &DocumentViewValue)| { + match view_field.1.value() { + OperationValue::Relation(relation) => next_task_inputs.push(TaskInput::new( + Some(relation.document_id().to_owned()), + None, + )), + OperationValue::PinnedRelation(pinned_relation) => next_task_inputs.push( + TaskInput::new(None, Some(pinned_relation.view_id().to_owned())), + ), + OperationValue::RelationList(relation_list) => { + next_task_inputs.append(&mut relation_list.iter().map(|document_id|TaskInput::new(Some(document_id), None)).collect()); + } + OperationValue::PinnedRelationList(pinned_relation_list) => { + next_task_inputs.append(&mut pinned_relation_list.iter().map(|document_view_id|TaskInput::new(None, Some(document_view_id))).collect()); + } + _ => (), + }; + }; + match &input.document_view_id { Some(document_view_id) => { // TODO: If we are resolving a document_view_id, but we are missing operations from it's graph, @@ -48,16 +70,17 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { let document = DocumentBuilder::new(operations) @@ -68,19 +91,40 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Date: Wed, 8 Jun 2022 
11:04:27 +0100 Subject: [PATCH 08/33] fmt --- aquadoggo/src/materializer/tasks/reduce.rs | 45 ++++++++++++++-------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index a2d336726..69cb78fa9 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -52,10 +52,20 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { - next_task_inputs.append(&mut relation_list.iter().map(|document_id|TaskInput::new(Some(document_id), None)).collect()); + next_task_inputs.append( + &mut relation_list + .iter() + .map(|document_id| TaskInput::new(Some(document_id), None)) + .collect(), + ); } OperationValue::PinnedRelationList(pinned_relation_list) => { - next_task_inputs.append(&mut pinned_relation_list.iter().map(|document_view_id|TaskInput::new(None, Some(document_view_id))).collect()); + next_task_inputs.append( + &mut pinned_relation_list + .iter() + .map(|document_view_id| TaskInput::new(None, Some(document_view_id))) + .collect(), + ); } _ => (), }; @@ -97,7 +107,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult TaskResult TaskResult Date: Wed, 8 Jun 2022 13:21:03 +0100 Subject: [PATCH 09/33] Implement dependency task logic --- aquadoggo/src/db/models/mod.rs | 2 + aquadoggo/src/db/models/relation.rs | 13 +++ aquadoggo/src/db/provider.rs | 69 +++++++++++ .../src/materializer/tasks/dependency.rs | 110 +++++++++++++++++- aquadoggo/src/materializer/tasks/reduce.rs | 106 ++++------------- 5 files changed, 215 insertions(+), 85 deletions(-) create mode 100644 aquadoggo/src/db/models/relation.rs diff --git a/aquadoggo/src/db/models/mod.rs b/aquadoggo/src/db/models/mod.rs index 469eb1311..2aa01c3ed 100644 --- a/aquadoggo/src/db/models/mod.rs +++ b/aquadoggo/src/db/models/mod.rs @@ -4,7 +4,9 @@ pub mod document; mod entry; mod log; mod operation; +mod relation; pub use 
self::log::LogRow; pub use entry::EntryRow; pub use operation::{OperationFieldsJoinedRow, OperationRow}; +pub use relation::RelationRow; diff --git a/aquadoggo/src/db/models/relation.rs b/aquadoggo/src/db/models/relation.rs new file mode 100644 index 000000000..10b6a87f8 --- /dev/null +++ b/aquadoggo/src/db/models/relation.rs @@ -0,0 +1,13 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use sqlx::FromRow; + +/// A struct representing a single row with joins from the document_view_fields table. +#[derive(FromRow, Debug, Clone)] +pub struct RelationRow { + /// The type of this field. + pub relation_type: String, + + /// The actual value contained in this field. + pub value: String, +} diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index 8c40bfef8..b25644f25 100644 --- a/aquadoggo/src/db/provider.rs +++ b/aquadoggo/src/db/provider.rs @@ -1,5 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; +use p2panda_rs::document::DocumentViewId; +use sqlx::query_as; use sqlx::query_scalar; use p2panda_rs::document::DocumentId; @@ -14,6 +16,9 @@ use crate::graphql::client::{ EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse, }; +use super::errors::DocumentStorageError; +use super::models::RelationRow; + #[derive(Debug, Clone)] /// Sql based storage that implements `StorageProvider` pub struct SqlStorage { @@ -71,3 +76,67 @@ impl StorageProvider for SqlStorage { Ok(hash) } } + +impl SqlStorage { + pub async fn get_document_view_dependencies( + &self, + document_view_id: &DocumentViewId, + ) -> StorageProviderResult> { + let relation_ids = query_as::<_, RelationRow>( + " + WITH RECURSIVE cte_relation (document_id, document_view_id, operation_id, relation_type, value) AS ( + SELECT + documents.document_id, + document_view_fields.document_view_id, + document_view_fields.operation_id, + operation_fields_v1.field_type, + operation_fields_v1.value + FROM + document_view_fields + LEFT JOIN 
operation_fields_v1 + ON + operation_fields_v1.operation_id = document_view_fields.operation_id + AND + operation_fields_v1.name = document_view_fields.name + LEFT JOIN documents + ON + documents.document_view_id = document_view_fields.document_view_id + WHERE + document_view_fields.document_view_id = $1 AND operation_fields_v1.field_type LIKE '%relation%' + + UNION ALL + + SELECT + documents.document_id, + document_view_fields.document_view_id, + document_view_fields.operation_id, + operation_fields_v1.field_type, + operation_fields_v1.value + FROM + document_view_fields + LEFT JOIN operation_fields_v1 + ON + operation_fields_v1.operation_id = document_view_fields.operation_id + AND + operation_fields_v1.name = document_view_fields.name + LEFT JOIN documents + ON + documents.document_view_id = document_view_fields.document_view_id + JOIN cte_relation + ON + cte_relation.value = document_view_fields.document_view_id + OR + cte_relation.value = documents.document_id + ) + + SELECT relation_type, value FROM cte_relation; + ", + ) + .bind(document_view_id.as_str()) + .fetch_all(&self.pool) + .await + .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?; + + Ok(relation_ids) + } +} diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 389651fda..55c2fbbe5 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -1,9 +1,113 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use crate::context::Context; -use crate::materializer::worker::TaskResult; +use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; -pub async fn dependency_task(_context: Context, _input: TaskInput) -> TaskResult { - Ok(None) +pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { + let dependencies = context + .store + .get_document_view_dependencies(&input.document_view_id.unwrap()) + .await + 
.map_err(|_| TaskError::Failure)?; + + let next_tasks: Vec> = dependencies + .iter() + .map(|relation_row| match relation_row.relation_type.as_str() { + "relation" => Task::new( + "reduce", + TaskInput::new(Some(relation_row.value.parse().unwrap()), None), + ), + "relation_list" => Task::new( + "reduce", + TaskInput::new(Some(relation_row.value.parse().unwrap()), None), + ), + "pinned_relation" => Task::new( + "reduce", + TaskInput::new(None, Some(relation_row.value.parse().unwrap())), + ), + "pinned_relation_list" => Task::new( + "reduce", + TaskInput::new(None, Some(relation_row.value.parse().unwrap())), + ), + _ => panic!("Not a relation type"), + }) + .collect(); + + let next_tasks = if next_tasks.is_empty() { + None + } else { + Some(next_tasks) + }; + + Ok(next_tasks) +} + +#[cfg(test)] +mod tests { + use p2panda_rs::operation::{ + OperationValue, PinnedRelation, PinnedRelationList, Relation, RelationList, + }; + use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID; + use p2panda_rs::test_utils::fixtures::{random_document_id, random_document_view_id}; + use rstest::rstest; + + use crate::config::Configuration; + use crate::context::Context; + use crate::db::stores::test_utils::{test_db, TestSqlStore}; + use crate::db::traits::DocumentStore; + use crate::materializer::tasks::reduce_task; + use crate::materializer::TaskInput; + + use super::dependency_task; + + #[rstest] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], + vec![]), 1)] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("favorite_book_images", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], + vec![]), 6)] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("something_from_the_past", OperationValue::PinnedRelation(PinnedRelation::new(random_document_view_id())))], + vec![]), 1)] + #[case(test_db(1, 
1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("many_previous_drafts", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect())))], + vec![]), 2)] + #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), + ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], + vec![]), 8)] + #[tokio::test] + async fn returns_dependency_task_inputs( + #[case] + #[future] + db: TestSqlStore, + #[case] expected_next_tasks: usize, + ) { + let db = db.await; + let context = Context::new(db.store.clone(), Configuration::default()); + + for document_id in &db.documents { + let input = TaskInput::new(Some(document_id.clone()), None); + reduce_task(context.clone(), input).await.unwrap().unwrap(); + } + + for document_id in &db.documents { + let document_view = db + .store + .get_document_by_id(document_id) + .await + .unwrap() + .unwrap(); + + let input = TaskInput::new(None, Some(document_view.id().clone())); + + let reduce_tasks = dependency_task(context.clone(), input) + .await + .unwrap() + .unwrap(); + assert_eq!(reduce_tasks.len(), expected_next_tasks); + } + } } diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 69cb78fa9..b59183da4 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use p2panda_rs::document::{DocumentBuilder, DocumentViewValue}; -use p2panda_rs::operation::OperationValue; +use p2panda_rs::document::DocumentBuilder; use p2panda_rs::storage_provider::traits::OperationStore; use crate::context::Context; @@ -40,38 +39,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult = 
Vec::new(); - - let parse_new_tasks = |view_field: (&String, &DocumentViewValue)| { - match view_field.1.value() { - OperationValue::Relation(relation) => next_task_inputs.push(TaskInput::new( - Some(relation.document_id().to_owned()), - None, - )), - OperationValue::PinnedRelation(pinned_relation) => next_task_inputs.push( - TaskInput::new(None, Some(pinned_relation.view_id().to_owned())), - ), - OperationValue::RelationList(relation_list) => { - next_task_inputs.append( - &mut relation_list - .iter() - .map(|document_id| TaskInput::new(Some(document_id), None)) - .collect(), - ); - } - OperationValue::PinnedRelationList(pinned_relation_list) => { - next_task_inputs.append( - &mut pinned_relation_list - .iter() - .map(|document_view_id| TaskInput::new(None, Some(document_view_id))) - .collect(), - ); - } - _ => (), - }; - }; - - match &input.document_view_id { + let document_view_id = match &input.document_view_id { Some(document_view_id) => { // TODO: If we are resolving a document_view_id, but we are missing operations from it's graph, // what do we want to return? 
@@ -79,8 +47,9 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult TaskResult { @@ -103,41 +71,26 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Date: Wed, 8 Jun 2022 14:28:35 +0100 Subject: [PATCH 10/33] Get parent relations too --- aquadoggo/src/db/provider.rs | 13 +- .../src/materializer/tasks/dependency.rs | 182 +++++++++++++++++- 2 files changed, 187 insertions(+), 8 deletions(-) diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index b25644f25..d7a11be1e 100644 --- a/aquadoggo/src/db/provider.rs +++ b/aquadoggo/src/db/provider.rs @@ -19,14 +19,12 @@ use crate::graphql::client::{ use super::errors::DocumentStorageError; use super::models::RelationRow; -#[derive(Debug, Clone)] -/// Sql based storage that implements `StorageProvider` +#[derive(Clone)] pub struct SqlStorage { pub(crate) pool: Pool, } impl SqlStorage { - /// Create a new `SqlStorage` using the provided db `Pool` pub fn new(pool: Pool) -> Self { Self { pool } } @@ -104,7 +102,7 @@ impl SqlStorage { WHERE document_view_fields.document_view_id = $1 AND operation_fields_v1.field_type LIKE '%relation%' - UNION ALL + UNION SELECT documents.document_id, @@ -127,9 +125,14 @@ impl SqlStorage { cte_relation.value = document_view_fields.document_view_id OR cte_relation.value = documents.document_id + OR + cte_relation.document_id = operation_fields_v1.value + OR + cte_relation.document_view_id = operation_fields_v1.value ) - SELECT relation_type, value FROM cte_relation; + SELECT relation_type, value FROM cte_relation + WHERE cte_relation.relation_type LIKE '%relation%'; ", ) .bind(document_view_id.as_str()) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 55c2fbbe5..b6318fdb8 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -9,7 +9,10 @@ pub async fn dependency_task(context: Context, input: 
TaskInput) -> TaskResult> = dependencies .iter() @@ -45,16 +48,19 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult Date: Wed, 8 Jun 2022 17:57:58 +0100 Subject: [PATCH 11/33] Handle errors --- .../src/materializer/tasks/dependency.rs | 66 +++++++++++++------ aquadoggo/src/materializer/tasks/reduce.rs | 41 +++++++++--- 2 files changed, 78 insertions(+), 29 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index b6318fdb8..b567cd6c4 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -5,37 +5,40 @@ use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { + let document_view_id = match input.document_view_id { + Some(id) => id, + // We only accept handling dependency tasks for specific document views. + None => return Err(TaskError::Failure), + }; let dependencies = context .store - .get_document_view_dependencies(&input.document_view_id.unwrap()) + .get_document_view_dependencies(&document_view_id) .await - .map_err(|e| { - println!("{:#?}", e); - TaskError::Failure - })?; - - let next_tasks: Vec> = dependencies - .iter() - .map(|relation_row| match relation_row.relation_type.as_str() { - "relation" => Task::new( + .map_err(|_| TaskError::Critical)?; + + let mut next_tasks = Vec::new(); + + for relation_row in dependencies.iter() { + match relation_row.relation_type.as_str() { + "relation" => next_tasks.push(Task::new( "reduce", TaskInput::new(Some(relation_row.value.parse().unwrap()), None), - ), - "relation_list" => Task::new( + )), + "relation_list" => next_tasks.push(Task::new( "reduce", TaskInput::new(Some(relation_row.value.parse().unwrap()), None), - ), - "pinned_relation" => Task::new( + )), + "pinned_relation" => next_tasks.push(Task::new( "reduce", TaskInput::new(None, 
Some(relation_row.value.parse().unwrap())), - ), - "pinned_relation_list" => Task::new( + )), + "pinned_relation_list" => next_tasks.push(Task::new( "reduce", TaskInput::new(None, Some(relation_row.value.parse().unwrap())), - ), - _ => panic!("Not a relation type"), - }) - .collect(); + )), + _ => return Err(TaskError::Critical), + } + } let next_tasks = if next_tasks.is_empty() { None @@ -48,6 +51,7 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult, + #[case] document_view_id: Option, + #[from(test_db)] + #[future] + db: TestSqlStore, + ) { + let db = db.await; + let context = Context::new(db.store, Configuration::default()); + let input = TaskInput::new(document_id, document_view_id); + + let next_tasks = dependency_task(context.clone(), input).await.unwrap(); + assert!(next_tasks.is_none()) + } } diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index b59183da4..ba5275f3c 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -16,24 +16,24 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { let operation_id = document_view_id.clone().into_iter().next().unwrap(); - let document_id = context + match context .store .get_document_by_operation_id(operation_id) .await - .map_err(|_| TaskError::Failure)?; - match document_id { - Some(id) => Ok(id), + .map_err(|_| TaskError::Critical)? // We only get an error here on a critical database error. + { + Some(document_id) => Ok(document_id), None => Err(TaskError::Failure), } } - (_, _) => todo!(), + (_, _) => Err(TaskError::Failure), }?; let operations = context .store .get_operations_by_document_id(&document_id) .await - .map_err(|_| TaskError::Failure)? + .map_err(|_| TaskError::Critical)? 
.into_iter() // TODO: we can avoid this conversion if we do https://github.com/p2panda/p2panda/issues/320 .map(|op| op.into()) @@ -45,7 +45,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult TaskResult TaskResult, + #[case] document_view_id: Option, + #[from(test_db)] + #[future] + db: TestSqlStore, + ) { + let db = db.await; + let context = Context::new(db.store, Configuration::default()); + let input = TaskInput::new(document_id, document_view_id); + + reduce_task(context.clone(), input).await.unwrap(); + } } From ddda36c513ecc151cb2860f1652ac87ab4a1a333 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 9 Jun 2022 10:11:25 +0100 Subject: [PATCH 12/33] Completely redo dependency task logic :lolz: --- aquadoggo/src/db/models/mod.rs | 2 - aquadoggo/src/db/models/relation.rs | 13 - aquadoggo/src/db/provider.rs | 72 ++-- .../src/materializer/tasks/dependency.rs | 403 +++++++++++++++--- aquadoggo/src/materializer/tasks/reduce.rs | 2 +- aquadoggo/src/materializer/worker.rs | 2 +- 6 files changed, 383 insertions(+), 111 deletions(-) delete mode 100644 aquadoggo/src/db/models/relation.rs diff --git a/aquadoggo/src/db/models/mod.rs b/aquadoggo/src/db/models/mod.rs index 2aa01c3ed..469eb1311 100644 --- a/aquadoggo/src/db/models/mod.rs +++ b/aquadoggo/src/db/models/mod.rs @@ -4,9 +4,7 @@ pub mod document; mod entry; mod log; mod operation; -mod relation; pub use self::log::LogRow; pub use entry::EntryRow; pub use operation::{OperationFieldsJoinedRow, OperationRow}; -pub use relation::RelationRow; diff --git a/aquadoggo/src/db/models/relation.rs b/aquadoggo/src/db/models/relation.rs deleted file mode 100644 index 10b6a87f8..000000000 --- a/aquadoggo/src/db/models/relation.rs +++ /dev/null @@ -1,13 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-or-later - -use sqlx::FromRow; - -/// A struct representing a single row with joins from the document_view_fields table. 
-#[derive(FromRow, Debug, Clone)] -pub struct RelationRow { - /// The type of this field. - pub relation_type: String, - - /// The actual value contained in this field. - pub value: String, -} diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index d7a11be1e..1c097e882 100644 --- a/aquadoggo/src/db/provider.rs +++ b/aquadoggo/src/db/provider.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; use p2panda_rs::document::DocumentViewId; -use sqlx::query_as; use sqlx::query_scalar; use p2panda_rs::document::DocumentId; @@ -17,7 +16,6 @@ use crate::graphql::client::{ }; use super::errors::DocumentStorageError; -use super::models::RelationRow; #[derive(Clone)] pub struct SqlStorage { @@ -76,69 +74,71 @@ impl StorageProvider for SqlStorage { } impl SqlStorage { - pub async fn get_document_view_dependencies( + pub async fn get_parents_with_pinned_relation( &self, document_view_id: &DocumentViewId, - ) -> StorageProviderResult> { - let relation_ids = query_as::<_, RelationRow>( + ) -> StorageProviderResult> { + let relation_ids = query_scalar( " - WITH RECURSIVE cte_relation (document_id, document_view_id, operation_id, relation_type, value) AS ( + WITH parents_pinned_relation (document_view_id, relation_type, value) AS ( SELECT - documents.document_id, document_view_fields.document_view_id, - document_view_fields.operation_id, operation_fields_v1.field_type, operation_fields_v1.value FROM - document_view_fields - LEFT JOIN operation_fields_v1 + operation_fields_v1 + LEFT JOIN document_view_fields ON - operation_fields_v1.operation_id = document_view_fields.operation_id + document_view_fields.operation_id = operation_fields_v1.operation_id AND - operation_fields_v1.name = document_view_fields.name - LEFT JOIN documents - ON - documents.document_view_id = document_view_fields.document_view_id + document_view_fields.name = operation_fields_v1.name WHERE - document_view_fields.document_view_id = $1 AND 
operation_fields_v1.field_type LIKE '%relation%' + operation_fields_v1.value = $1 AND operation_fields_v1.field_type LIKE 'pinned_relation%' + ) + SELECT document_view_id FROM parents_pinned_relation; + ", + ) + .bind(document_view_id.as_str()) + .fetch_all(&self.pool) + .await + .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))? + .iter().map(|id: &String| id.parse().unwrap() ).collect(); - UNION + Ok(relation_ids) + } + pub async fn get_parents_with_unpinned_relation( + &self, + document_view_id: &DocumentId, + ) -> StorageProviderResult> { + let relation_ids: Vec = query_scalar( + " + WITH parents_unpinned_relation (document_view_id, relation_type, value) AS ( SELECT - documents.document_id, document_view_fields.document_view_id, - document_view_fields.operation_id, operation_fields_v1.field_type, operation_fields_v1.value FROM - document_view_fields - LEFT JOIN operation_fields_v1 + operation_fields_v1 + LEFT JOIN document_view_fields ON - operation_fields_v1.operation_id = document_view_fields.operation_id + document_view_fields.operation_id = operation_fields_v1.operation_id AND - operation_fields_v1.name = document_view_fields.name + document_view_fields.name = operation_fields_v1.name LEFT JOIN documents ON documents.document_view_id = document_view_fields.document_view_id - JOIN cte_relation - ON - cte_relation.value = document_view_fields.document_view_id - OR - cte_relation.value = documents.document_id - OR - cte_relation.document_id = operation_fields_v1.value - OR - cte_relation.document_view_id = operation_fields_v1.value + WHERE + operation_fields_v1.value = $1 AND operation_fields_v1.field_type LIKE 'relation%' ) - - SELECT relation_type, value FROM cte_relation - WHERE cte_relation.relation_type LIKE '%relation%'; + SELECT document_view_id FROM parents_unpinned_relation; ", ) .bind(document_view_id.as_str()) .fetch_all(&self.pool) .await - .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?; + .map_err(|e| 
DocumentStorageError::FatalStorageError(e.to_string()))? + .iter().map(|id: &String| id.parse().unwrap() ).collect(); Ok(relation_ids) } diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index b567cd6c4..13b2c6e86 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -1,52 +1,180 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use p2panda_rs::document::{DocumentId, DocumentViewId}; +use p2panda_rs::storage_provider::traits::OperationStore; + use crate::context::Context; +use crate::db::traits::DocumentStore; use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; -pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { - let document_view_id = match input.document_view_id { - Some(id) => id, - // We only accept handling dependency tasks for specific document views. - None => return Err(TaskError::Failure), - }; - let dependencies = context +async fn pinned_relation_task( + context: &Context, + document_view_id: DocumentViewId, +) -> Result>, TaskError> { + match context .store - .get_document_view_dependencies(&document_view_id) + .get_document_view_by_id(&document_view_id) .await - .map_err(|_| TaskError::Critical)?; + .map_err(|_| TaskError::Critical)? + { + Some(_) => Ok(None), + None => Ok(Some(Task::new( + "reduce", + TaskInput::new(None, Some(document_view_id)), + ))), + } +} - let mut next_tasks = Vec::new(); +async fn unpinned_relation_task( + context: &Context, + document_id: DocumentId, +) -> Result>, TaskError> { + match context + .store + .get_document_by_id(&document_id) + .await + .map_err(|_| TaskError::Critical)? 
+ { + Some(_) => Ok(None), + None => Ok(Some(Task::new( + "reduce", + TaskInput::new(Some(document_id), None), + ))), + } +} + +pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { + // PARSE INPUT ARGUMENTS // + + // Here we retrive the document view by document id or document view id depending on what was passed into + // the task. This is needed so we can access the document view id and also check if a document has been + // deleted. We also catch invalid task inputs here (both or neither ids were passed), failing the task + // in a non critical way if this is the case. + let (document_id, document_view) = match (&input.document_id, &input.document_view_id) { + (Some(document_id), None) => { + let document_view = context + .store + .get_document_by_id(document_id) + .await + .map_err(|_| TaskError::Critical)?; + Ok((document_id.to_owned(), document_view)) + } + // TODO: Alt approach: we could have a `get_operations_by_document_view_id()` in `OperationStore`, or + // could we even do this with some fancy recursive SQL query? We might need the `previous_operations` + // table back for that. + (None, Some(document_view_id)) => { + let document_view = context + .store + .get_document_view_by_id(document_view_id) + .await + .map_err(|_| TaskError::Critical)?; + + let operation_id = document_view_id.clone().into_iter().next().unwrap(); + match context + .store + .get_document_by_operation_id(operation_id) + .await + .map_err(|_| TaskError::Critical)? // We only get an error here on a critical database error. + { + Some(document_id) => Ok((document_id, document_view)), + None => Err(TaskError::Failure), + } + } + (_, _) => Err(TaskError::Failure), + }?; + + let document_view = match document_view { + Some(document_view) => document_view, + // If no document view for the id passed into this task could be retrieved then either + // it has been deleted or the id was somehow invalid. In that case we fail the task at this point. 
+ None => return Err(TaskError::Failure), + }; - for relation_row in dependencies.iter() { - match relation_row.relation_type.as_str() { - "relation" => next_tasks.push(Task::new( - "reduce", - TaskInput::new(Some(relation_row.value.parse().unwrap()), None), - )), - "relation_list" => next_tasks.push(Task::new( - "reduce", - TaskInput::new(Some(relation_row.value.parse().unwrap()), None), - )), - "pinned_relation" => next_tasks.push(Task::new( - "reduce", - TaskInput::new(None, Some(relation_row.value.parse().unwrap())), - )), - "pinned_relation_list" => next_tasks.push(Task::new( - "reduce", - TaskInput::new(None, Some(relation_row.value.parse().unwrap())), - )), - _ => return Err(TaskError::Critical), + // FETCH DEPENDENCIES & COMPOSE TASKS // + + let mut child_tasks = Vec::new(); + let mut parent_tasks = Vec::new(); + + // First we handle all pinned or unpinned relations defined in this tasks document view. + // We can think of these "child" relations. We query the store for every child, if a view is + // returned we do nothing. If it doesn't yet exist in the store (it hasn't been materialised) + // we compose a "reduce" task for it. 
+ + for (_key, document_view_value) in document_view.fields().iter() { + match document_view_value.value() { + p2panda_rs::operation::OperationValue::Relation(relation) => { + child_tasks + .push(unpinned_relation_task(&context, relation.document_id().clone()).await?); + } + p2panda_rs::operation::OperationValue::RelationList(relation_list) => { + for document_id in relation_list.iter() { + child_tasks.push(unpinned_relation_task(&context, document_id.clone()).await?); + } + } + p2panda_rs::operation::OperationValue::PinnedRelation(pinned_relation) => { + child_tasks + .push(pinned_relation_task(&context, pinned_relation.view_id().clone()).await?); + } + p2panda_rs::operation::OperationValue::PinnedRelationList(pinned_relation_list) => { + for document_view_id in pinned_relation_list.iter() { + child_tasks + .push(pinned_relation_task(&context, document_view_id.clone()).await?); + } + } + _ => (), } } - let next_tasks = if next_tasks.is_empty() { - None + // Next we want to find any existing documents in the store which relate to this document view OR document + // themselves. We do this for both pinned and unpinned relations incase this is the first time the tasks + // document is being materialised. Here we dispatch a "dependency" task for any documents found. + context + .store + .get_parents_with_unpinned_relation(&document_id) + .await + .map_err(|_| TaskError::Critical)? + .iter() + .for_each(|document_view_id| { + parent_tasks.push(Some(Task::new( + "dependency", + TaskInput::new(None, Some(document_view_id.clone())), + ))) + }); + + context + .store + .get_parents_with_pinned_relation(document_view.id()) + .await + .map_err(|_| TaskError::Critical)? 
+ .into_iter() + .for_each(|document_view_id| { + parent_tasks.push(Some(Task::new( + "dependency", + TaskInput::new(None, Some(document_view_id)), + ))) + }); + + let mut next_tasks = Vec::new(); + let mut child_tasks: Vec> = child_tasks.into_iter().flatten().collect(); + let mut parent_tasks: Vec> = parent_tasks.into_iter().flatten().collect(); + + if child_tasks.is_empty() { + // This means all dependencies this document view relates to are met and we + // should dispatch a schema task. We also dispatch all parent dependency tasks + // incase they now have all their dependencies met. + next_tasks.append(&mut vec![Task::new( + "schema", + TaskInput::new(None, Some(document_view.id().clone())), + )]); + next_tasks.append(&mut parent_tasks); } else { - Some(next_tasks) + // If not all dependencies were met, then we want to dispatch all children and parent tasks. + next_tasks.append(&mut child_tasks); + next_tasks.append(&mut parent_tasks); }; - Ok(next_tasks) + Ok(Some(next_tasks)) } #[cfg(test)] @@ -96,7 +224,7 @@ mod tests { ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 10].iter().map(|_|random_document_id()).collect())))], ), 12)] #[tokio::test] - async fn returns_dependency_task_inputs( + async fn dispatches_reduce_tasks_for_child_dependencies( #[case] #[future] db: TestSqlStore, @@ -125,6 +253,9 @@ mod tests { .unwrap() .unwrap(); assert_eq!(reduce_tasks.len(), expected_next_tasks); + for task in reduce_tasks { + assert_eq!(task.0, "reduce") + } } } @@ -143,11 +274,11 @@ mod tests { ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 10].iter().map(|_|random_document_id()).collect())))], ), 12)] #[tokio::test] - async fn gets_parent_dependencies_as_well( + async fn dispatches_task_for_parent_dependencies_as_well( #[case] #[future] db: TestSqlStore, - #[case] expected_next_tasks: usize, + #[case] expected_reduce_tasks: usize, ) { let db = db.await; let context = Context::new(db.store.clone(), 
Configuration::default()); @@ -156,6 +287,9 @@ mod tests { let input = TaskInput::new(Some(document_id.clone()), None); reduce_task(context.clone(), input).await.unwrap().unwrap(); + // Here we have one materialised document, (we are calling it a child as we will shortly be publishing parents) + // it contains relations which are not materialised yet so should dispatch a reduce task for each one. + let document_view_of_child = db .store .get_document_by_id(&document_id) @@ -166,12 +300,17 @@ mod tests { let document_view_id_of_child = document_view_of_child.id(); let input = TaskInput::new(None, Some(document_view_id_of_child.clone())); - let reduce_tasks = dependency_task(context.clone(), input) + let tasks = dependency_task(context.clone(), input) .await .unwrap() .unwrap(); - assert_eq!(reduce_tasks.len(), expected_next_tasks); + assert_eq!(tasks.len(), expected_reduce_tasks); + for task in tasks { + assert_eq!(task.0, "reduce") + } + + // Create a new document referencing the existing materialised document. let operation = create_operation(&[( "relation_to_existing_document", @@ -180,49 +319,130 @@ mod tests { let (_, document_view_id_of_parent) = insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; + // The child should now dispatch one dependency task for the parent as well as + // reduce tasks for it's children. 
+ let input = TaskInput::new(None, Some(document_view_id_of_child.clone())); - let reduce_tasks_of_child = dependency_task(context.clone(), input) + let child_tasks = dependency_task(context.clone(), input) .await .unwrap() .unwrap(); + let expected_task_types = [("reduce", expected_reduce_tasks), ("dependency", 1)]; + assert_eq!(child_tasks.len(), expected_reduce_tasks + 1); + for (task_type, count) in expected_task_types { + assert_eq!( + child_tasks + .iter() + .filter(|task| task.0 == task_type) + .count(), + count + ); + } + + // The parent should dispatch one schema task as it has one dependency which was already materialised. + let input = TaskInput::new(None, Some(document_view_id_of_parent.clone())); - let reduce_tasks_of_parent = dependency_task(context.clone(), input) + let parent_tasks = dependency_task(context.clone(), input) .await .unwrap() .unwrap(); - assert_eq!(reduce_tasks_of_child.len(), expected_next_tasks + 1); - assert_eq!(reduce_tasks_of_parent.len(), expected_next_tasks + 1); - - let operation = create_operation(&[( - "parent_of_a_parent", - OperationValue::PinnedRelation(PinnedRelation::new(document_view_id_of_parent.clone())), - )]); + assert_eq!(parent_tasks.len(), 1); + assert_eq!(parent_tasks[0].0, "schema"); + + // Now we create another document with a pinned relation to the parent and a pinned relation + // list with an item pointing to the child document. 
+ + let operation = create_operation(&[ + ( + "parent_of", + OperationValue::PinnedRelation(PinnedRelation::new( + document_view_id_of_parent.clone(), + )), + ), + ( + "grandparent_of", + OperationValue::PinnedRelationList(PinnedRelationList::new(vec![ + document_view_id_of_child.clone(), + ])), + ), + ]); let (_, document_view_id_of_grandparent) = insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; let input = TaskInput::new(None, Some(document_view_id_of_child.clone())); - let reduce_tasks_of_child = dependency_task(context.clone(), input) + let child_tasks = dependency_task(context.clone(), input) .await .unwrap() .unwrap(); let input = TaskInput::new(None, Some(document_view_id_of_parent.clone())); - let reduce_tasks_of_parent = dependency_task(context.clone(), input) + let parent_tasks = dependency_task(context.clone(), input) .await .unwrap() .unwrap(); let input = TaskInput::new(None, Some(document_view_id_of_grandparent)); - let reduce_tasks_of_grandparent = dependency_task(context.clone(), input) + let grandparent_tasks = dependency_task(context.clone(), input) .await .unwrap() .unwrap(); - assert_eq!(reduce_tasks_of_child.len(), expected_next_tasks + 2); - assert_eq!(reduce_tasks_of_parent.len(), expected_next_tasks + 2); - assert_eq!(reduce_tasks_of_grandparent.len(), expected_next_tasks + 2); + // The child dispatches an extra dependency task for it's grand parent. + + let expected_dependency_tasks = 2; + let expected_task_types = [ + ("reduce", expected_reduce_tasks), + ("dependency", expected_dependency_tasks), + ]; + assert_eq!( + child_tasks.len(), + expected_reduce_tasks + expected_dependency_tasks + ); + for (task_type, count) in expected_task_types { + assert_eq!( + child_tasks + .iter() + .filter(|task| task.0 == task_type) + .count(), + count + ); + } + + // The parent dispatches a dependency task for it's parent and schema task for itself. 
+ + let expected_schema_tasks = 1; + let expected_dependency_tasks = 1; + let expected_task_types = [ + ("schema", expected_schema_tasks), + ("dependency", expected_dependency_tasks), + ]; + assert_eq!( + parent_tasks.len(), + expected_schema_tasks + expected_dependency_tasks + ); + for (task_type, count) in expected_task_types { + assert_eq!( + parent_tasks + .iter() + .filter(|task| task.0 == task_type) + .count(), + count + ); + } + + // The grandparent dispatches one schema task as all it's dependencies are met. + + let expected_schema_tasks = 1; + assert_eq!(grandparent_tasks.len(), expected_schema_tasks); + assert_eq!( + grandparent_tasks + .iter() + .filter(|task| task.0 == "schema") + .count(), + expected_schema_tasks + ); } #[rstest] @@ -278,25 +498,31 @@ mod tests { insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; let input = TaskInput::new(None, Some(document_view_id.clone())); - let reduce_tasks = dependency_task(context.clone(), input) + let tasks = dependency_task(context.clone(), input) .await .unwrap() .unwrap(); let input = TaskInput::new(None, Some(document_view_id_of_unrelated_document.clone())); - let reduce_tasks_of_unrelated_document = - dependency_task(context.clone(), input).await.unwrap(); + let tasks_of_unrelated_document = dependency_task(context.clone(), input) + .await + .unwrap() + .unwrap(); - assert_eq!(reduce_tasks.len(), expected_next_tasks); - assert!(reduce_tasks_of_unrelated_document.is_none()); + assert_eq!(tasks.len(), expected_next_tasks); + assert_eq!(tasks_of_unrelated_document.len(), 1); + assert_eq!(tasks_of_unrelated_document[0].0, "schema"); } #[rstest] + #[should_panic(expected = "Failure")] #[case(None, Some(random_document_view_id()))] #[should_panic(expected = "Failure")] #[case(None, None)] #[should_panic(expected = "Failure")] #[case(Some(random_document_id()), None)] + #[should_panic(expected = "Failure")] + #[case(Some(random_document_id()), Some(random_document_view_id()))] 
#[tokio::test] async fn fails_correctly( #[case] document_id: Option, @@ -312,4 +538,65 @@ mod tests { let next_tasks = dependency_task(context.clone(), input).await.unwrap(); assert!(next_tasks.is_none()) } + + #[rstest] + #[tokio::test] + async fn gets_parent_relations( + #[from(test_db)] + #[future] + #[with(1, 1)] + db: TestSqlStore, + ) { + let db = db.await; + let context = Context::new(db.store.clone(), Configuration::default()); + let document_id = db.documents[0].clone(); + + let input = TaskInput::new(Some(document_id.clone()), None); + reduce_task(context.clone(), input).await.unwrap().unwrap(); + + // Here we have one materialised document, (we are calling it a child as we will shortly be publishing parents). + + let document_view_of_child = db + .store + .get_document_by_id(&document_id) + .await + .unwrap() + .unwrap(); + + let document_view_id_of_child = document_view_of_child.id(); + + // Create a new document referencing the existing materialised document by unpinned relation. + + let operation = create_operation(&[( + "relation_to_existing_document", + OperationValue::Relation(Relation::new(document_id.clone())), + )]); + let (_, document_view_id_of_parent) = + insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; + + let parent_dependencies_unpinned = db + .store + .get_parents_with_unpinned_relation(&document_id) + .await + .unwrap(); + assert_eq!(parent_dependencies_unpinned.len(), 1); + assert_eq!(parent_dependencies_unpinned[0], document_view_id_of_parent); + + // Create a new document referencing the existing materialised document by pinned relation. 
+ + let operation = create_operation(&[( + "pinned_relation_to_existing_document", + OperationValue::PinnedRelation(PinnedRelation::new(document_view_id_of_child.clone())), + )]); + let (_, document_view_id_of_parent) = + insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; + + let parent_dependencies_pinned = db + .store + .get_parents_with_pinned_relation(document_view_id_of_child) + .await + .unwrap(); + assert_eq!(parent_dependencies_pinned.len(), 1); + assert_eq!(parent_dependencies_pinned[0], document_view_id_of_parent); + } } diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index ba5275f3c..5bc102865 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -57,7 +57,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 393379a3d..84e1a6714 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -92,7 +92,7 @@ use triggered::{Listener, Trigger}; /// A task holding a generic input value and the name of the worker which will process it /// eventually. #[derive(Debug, Clone)] -pub struct Task(WorkerName, IN); +pub struct Task(pub WorkerName, IN); impl Task { /// Returns a new task. 
From 82188c2363aa9db31300b1faa44d18d6e886fcaf Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 9 Jun 2022 12:18:09 +0100 Subject: [PATCH 13/33] Add test for deleted documents --- .../src/materializer/tasks/dependency.rs | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 13b2c6e86..8c9b202cc 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -184,6 +184,7 @@ mod tests { use p2panda_rs::operation::{ OperationValue, PinnedRelation, PinnedRelationList, Relation, RelationList, }; + use p2panda_rs::storage_provider::traits::{AsStorageOperation, OperationStore}; use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID; use p2panda_rs::test_utils::fixtures::{ create_operation, random_document_id, random_document_view_id, @@ -539,6 +540,45 @@ mod tests { assert!(next_tasks.is_none()) } + #[rstest] + #[should_panic(expected = "Failure")] + #[case(test_db(2, 1, true, TEST_SCHEMA_ID.parse().unwrap(), + vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], + vec![]))] + #[should_panic(expected = "Failure")] + #[case(test_db(2, 1, true, TEST_SCHEMA_ID.parse().unwrap(), + vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), + ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], + vec![]))] + #[tokio::test] + async fn fails_on_deleted_documents( + #[case] + #[future] + db: TestSqlStore, + ) { + let db = db.await; + let context = Context::new(db.store.clone(), Configuration::default()); + let document_id = db.documents[0].clone(); + + let input = TaskInput::new(Some(document_id.clone()), None); + reduce_task(context.clone(), input).await.unwrap(); + + let document_operations = db + .store 
+ .get_operations_by_document_id(&document_id) + .await + .unwrap(); + + let document_view_id: DocumentViewId = document_operations[1].id().into(); + + let input = TaskInput::new(None, Some(document_view_id.clone())); + + dependency_task(context.clone(), input) + .await + .unwrap() + .unwrap(); + } + #[rstest] #[tokio::test] async fn gets_parent_relations( From 2b68e43459cf7d211b7ca0e8bb221004cfe14f56 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 9 Jun 2022 15:53:44 +0100 Subject: [PATCH 14/33] Schema task --- aquadoggo/src/materializer/tasks/schema.rs | 34 ++++++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/schema.rs b/aquadoggo/src/materializer/tasks/schema.rs index e27ea5b88..11e150136 100644 --- a/aquadoggo/src/materializer/tasks/schema.rs +++ b/aquadoggo/src/materializer/tasks/schema.rs @@ -1,9 +1,37 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use crate::context::Context; -use crate::materializer::worker::TaskResult; +use crate::db::errors::SchemaStoreError; +use crate::db::traits::SchemaStore; +use crate::materializer::worker::{TaskError, TaskResult}; use crate::materializer::TaskInput; -pub async fn schema_task(_context: Context, _input: TaskInput) -> TaskResult { - Ok(None) +pub async fn schema_task(context: Context, input: TaskInput) -> TaskResult { + match (&input.document_id, &input.document_view_id) { + (None, Some(document_view_id)) => { + match context.store.get_schema_by_id(document_view_id).await { + Ok(schema) => { + match schema { + Some(_schema) => { + // Get the schema into the schema service somehow // + + Ok(None) + } + None => Ok(None), + } + } + Err(e) => match e { + SchemaStoreError::MissingSchemaFieldDefinition(_, _) => { + // If this was a schema definition and it's dependencies aren't met, there's something wrong with our task logic + Err(TaskError::Critical) + } + // If there was a fatal storage error we should crash + 
SchemaStoreError::DocumentStorageError(_) => Err(TaskError::Critical), + // All other errors just mean this wasn't a schema definition + _ => Ok(None), + }, + } + } + (_, _) => Err(TaskError::Critical), + } } From 54e4a9bc2e6beb884d056f32bdd789ff8288165c Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 10 Jun 2022 13:03:34 +0100 Subject: [PATCH 15/33] Refactoring and commetns in reduce --- aquadoggo/src/materializer/tasks/reduce.rs | 132 ++++++++++++++------- 1 file changed, 87 insertions(+), 45 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 5bc102865..65f4e72da 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -9,74 +9,83 @@ use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { + // Parse the task input, if they are invalid (both or neither ids provided) we critically fail + // the task at this point. If only a document_view was passed we retrieve the document_id as + // it is needed later. let document_id = match (&input.document_id, &input.document_view_id) { (Some(document_id), None) => Ok(document_id.to_owned()), - // TODO: Alt approach: we could have a `get_operations_by_document_view_id()` in `OperationStore`, or - // could we even do this with some fancy recursive SQL query? We might need the `previous_operations` - // table back for that. (None, Some(document_view_id)) => { + // TODO: We can skip this step if we implement https://github.com/p2panda/aquadoggo/issues/148 let operation_id = document_view_id.clone().into_iter().next().unwrap(); match context .store .get_document_by_operation_id(operation_id) .await - .map_err(|_| TaskError::Critical)? // We only get an error here on a critical database error. + .map_err(|_| TaskError::Critical)? 
{ Some(document_id) => Ok(document_id), None => Err(TaskError::Failure), } } - (_, _) => Err(TaskError::Failure), + (_, _) => Err(TaskError::Critical), }?; + // Get all operations for the requested document. let operations = context .store .get_operations_by_document_id(&document_id) .await .map_err(|_| TaskError::Critical)? .into_iter() - // TODO: we can avoid this conversion if we do https://github.com/p2panda/p2panda/issues/320 .map(|op| op.into()) .collect(); let document_view_id = match &input.document_view_id { + // If this task was passed a document_view_id as input then we want to build to document only to the + // requested view. Some(document_view_id) => { - // TODO: If we are resolving a document_view_id, but we are missing operations from it's graph, - // what do we want to return? - let document = DocumentBuilder::new(operations) + let document = match DocumentBuilder::new(operations) .build_to_view_id(Some(document_view_id.to_owned())) + { + Ok(document) => { + // If the document was deleted, then we return nothing. + if document.is_deleted() { + return Ok(None); + }; + document + } + Err(_) => return Ok(None), + }; + + // Insert the document document_view into the database. + context + .store + .insert_document_view(document.view().unwrap(), document.schema()) + .await .map_err(|_| TaskError::Critical)?; - if document.is_deleted() { - return Ok(None); - } else { + // Return the view id to be used in the resulting dependency task. + document.view_id().to_owned() + } + // If no document_view_id was passed, this is a document_id reduce task. + None => match DocumentBuilder::new(operations).build() { + Ok(document) => { + // Insert this document into storage. If it already existed, this will update it's current + // view. context .store - // Unwrap as all not deleted documents have a view. 
- .insert_document_view(document.view().unwrap(), document.schema()) + .insert_document(&document) .await - .map_err(|_| TaskError::Critical)?; - - document.view_id().to_owned() - } - } - None => { - let document = DocumentBuilder::new(operations) - .build() - .map_err(|_| TaskError::Failure)?; + .map_err(|_| TaskError::Failure)?; - context - .store - .insert_document(&document) - .await - .map_err(|_| TaskError::Failure)?; - - if document.is_deleted() { - return Ok(None); - } else { + if document.is_deleted() { + return Ok(None); + } + // Return the document_view id to be used in the resulting dependency task. document.view_id().to_owned() } - } + Err(_) => return Ok(None), + }, }; Ok(Some(vec![Task::new( @@ -87,9 +96,9 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Date: Mon, 13 Jun 2022 11:14:51 +0100 Subject: [PATCH 16/33] Comment for helper tasks --- aquadoggo/src/materializer/tasks/dependency.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 8c9b202cc..3f35803cf 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -8,6 +8,8 @@ use crate::db::traits::DocumentStore; use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; +/// helper method for retrieving a document view by it's document view id and if it doesn't exist in +/// the store, composing a "reduce" task for this specific document view. async fn pinned_relation_task( context: &Context, document_view_id: DocumentViewId, @@ -26,6 +28,8 @@ async fn pinned_relation_task( } } +/// helper method for retrieving a document view by a documents' id and if it doesn't exist in +/// the store, composing a "reduce" task for this document. 
async fn unpinned_relation_task( context: &Context, document_id: DocumentId, From a5f941518c0d68df8710012a4cb924d210ff321a Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Mon, 13 Jun 2022 11:26:09 +0100 Subject: [PATCH 17/33] A few more comments --- aquadoggo/src/materializer/tasks/dependency.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 3f35803cf..1fb2f7ce0 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -57,6 +57,7 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { + // Fetch the current document view for this document. let document_view = context .store .get_document_by_id(document_id) @@ -68,13 +69,17 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { + // Fetch the document_view from the store by it's document view id. let document_view = context .store .get_document_view_by_id(document_view_id) .await .map_err(|_| TaskError::Critical)?; + // This is a tip operation which we can use to get the document_id from the store. let operation_id = document_view_id.clone().into_iter().next().unwrap(); + + // Get the document_id for the passed document_view_id. 
match context .store .get_document_by_operation_id(operation_id) @@ -101,7 +106,7 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult Date: Mon, 13 Jun 2022 11:33:22 +0100 Subject: [PATCH 18/33] Some more comments --- aquadoggo/src/materializer/tasks/dependency.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 1fb2f7ce0..19e8e2c75 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -172,6 +172,12 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult Date: Mon, 13 Jun 2022 14:57:08 +0100 Subject: [PATCH 19/33] We don't need to check parent dependencies in dependency task --- .../src/materializer/tasks/dependency.rs | 600 ++++++------------ 1 file changed, 199 insertions(+), 401 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 19e8e2c75..db9157170 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -28,25 +28,25 @@ async fn pinned_relation_task( } } -/// helper method for retrieving a document view by a documents' id and if it doesn't exist in -/// the store, composing a "reduce" task for this document. -async fn unpinned_relation_task( - context: &Context, - document_id: DocumentId, -) -> Result>, TaskError> { - match context - .store - .get_document_by_id(&document_id) - .await - .map_err(|_| TaskError::Critical)? - { - Some(_) => Ok(None), - None => Ok(Some(Task::new( - "reduce", - TaskInput::new(Some(document_id), None), - ))), - } -} +// /// helper method for retrieving a document view by a documents' id and if it doesn't exist in +// /// the store, composing a "reduce" task for this document. 
+// async fn unpinned_relation_task( +// context: &Context, +// document_id: DocumentId, +// ) -> Result>, TaskError> { +// match context +// .store +// .get_document_by_id(&document_id) +// .await +// .map_err(|_| TaskError::Critical)? +// { +// Some(_) => Ok(None), +// None => Ok(Some(Task::new( +// "reduce", +// TaskInput::new(Some(document_id), None), +// ))), +// } +// } pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { // PARSE INPUT ARGUMENTS // @@ -55,79 +55,60 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { - // Fetch the current document view for this document. - let document_view = context - .store - .get_document_by_id(document_id) - .await - .map_err(|_| TaskError::Critical)?; - Ok((document_id.to_owned(), document_view)) - } + let document_view = match (&input.document_id, &input.document_view_id) { // TODO: Alt approach: we could have a `get_operations_by_document_view_id()` in `OperationStore`, or // could we even do this with some fancy recursive SQL query? We might need the `previous_operations` // table back for that. (None, Some(document_view_id)) => { // Fetch the document_view from the store by it's document view id. - let document_view = context + context .store .get_document_view_by_id(document_view_id) .await - .map_err(|_| TaskError::Critical)?; - - // This is a tip operation which we can use to get the document_id from the store. - let operation_id = document_view_id.clone().into_iter().next().unwrap(); - - // Get the document_id for the passed document_view_id. - match context - .store - .get_document_by_operation_id(operation_id) - .await - .map_err(|_| TaskError::Critical)? // We only get an error here on a critical database error. 
- { - Some(document_id) => Ok((document_id, document_view)), - None => Err(TaskError::Failure), - } + .map_err(|_| TaskError::Critical) } - (_, _) => Err(TaskError::Failure), + (_, _) => Err(TaskError::Critical), }?; let document_view = match document_view { Some(document_view) => document_view, - // If no document view for the id passed into this task could be retrieved then either - // it has been deleted or the id was somehow invalid. In that case we fail the task at this point. - None => return Err(TaskError::Failure), + // If no document view for the id passed into this task could be retrieved then this + // document has been deleted or the document view id was invalid. As "dependency" tasks + // are only dispatched after a successful "reduce" task, neither `None` case should + // happen, so this is a critical error. + None => return Err(TaskError::Critical), }; // FETCH DEPENDENCIES & COMPOSE TASKS // - let mut child_tasks = Vec::new(); - let mut parent_tasks = Vec::new(); + let mut next_tasks = Vec::new(); // First we handle all pinned or unpinned relations defined in this tasks document view. - // We can think of these as "child" relations. We query the store for every child, if a view is - // returned we do nothing. If it doesn't yet exist in the store (it hasn't been materialised) - // we compose a "reduce" task for it. - + // We can think of these as "child" relations. for (_key, document_view_value) in document_view.fields().iter() { match document_view_value.value() { - p2panda_rs::operation::OperationValue::Relation(relation) => { - child_tasks - .push(unpinned_relation_task(&context, relation.document_id().clone()).await?); + p2panda_rs::operation::OperationValue::Relation(_relation) => { + // This is a relation to a document, if it doesn't exist in the db yet, then that + // means we either have no entries for this document, or we are not materialising + // it for some reason. 
We don't want to kick of a "reduce" or "dependency" task in + // either of these cases, however in the future we may want to flag it as a "wanted" + // schema. } - p2panda_rs::operation::OperationValue::RelationList(relation_list) => { - for document_id in relation_list.iter() { - child_tasks.push(unpinned_relation_task(&context, document_id.clone()).await?); - } + p2panda_rs::operation::OperationValue::RelationList(_relation_list) => { + // same as above... } p2panda_rs::operation::OperationValue::PinnedRelation(pinned_relation) => { - child_tasks + // These are pinned relations. We may have the operations for these views in the db, + // but this view wasn't pinned yet, so hasn't been materialised. To make sure it is + // materialised when possible, we dispatch a "reduce" task for any pinned relations + // which aren't found. + next_tasks .push(pinned_relation_task(&context, pinned_relation.view_id().clone()).await?); } p2panda_rs::operation::OperationValue::PinnedRelationList(pinned_relation_list) => { + // same as above... for document_view_id in pinned_relation_list.iter() { - child_tasks + next_tasks .push(pinned_relation_task(&context, document_view_id.clone()).await?); } } @@ -135,61 +116,7 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult> = child_tasks.into_iter().flatten().collect(); - let mut parent_tasks: Vec> = parent_tasks.into_iter().flatten().collect(); - - if child_tasks.is_empty() { - // This means all dependencies this document view relates to are met and we - // should dispatch a schema task. We also dispatch all parent dependency tasks - // incase they now have all their dependencies met. - // - // NOTE: We don't have access to the `SchemaId` of a document in this task at the moment, - // so we just dispatch a schema task for any documents who have their dependencies - // met. This is quite wasteful... 
we would need to add a storage method for accessing the - // schema id for any document/document_view id, or re-work `DocumentView` to contain the - // the schema id itself. - next_tasks.append(&mut vec![Task::new( - "schema", - TaskInput::new(None, Some(document_view.id().clone())), - )]); - next_tasks.append(&mut parent_tasks); - } else { - // If not all dependencies were met, then we want to dispatch all children and parent tasks. - next_tasks.append(&mut child_tasks); - next_tasks.append(&mut parent_tasks); - }; - - Ok(Some(next_tasks)) + Ok(Some(next_tasks.into_iter().flatten().collect())) } #[cfg(test)] @@ -218,10 +145,10 @@ mod tests { #[rstest] #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], - vec![]), 1)] + vec![]), 0)] #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("favorite_book_images", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], - vec![]), 6)] + vec![]), 0)] #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("something_from_the_past", OperationValue::PinnedRelation(PinnedRelation::new(random_document_view_id())))], vec![]), 1)] @@ -231,16 +158,16 @@ mod tests { #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], - vec![]), 8)] + vec![]), 2)] // This document has been updated #[case(test_db(4, 1, false, TEST_SCHEMA_ID.parse().unwrap(), vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 
6].iter().map(|_|random_document_id()).collect())))], - vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), + vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 3].iter().map(|_|random_document_view_id()).collect()))), ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 10].iter().map(|_|random_document_id()).collect())))], - ), 12)] + ), 3)] #[tokio::test] - async fn dispatches_reduce_tasks_for_child_dependencies( + async fn dispatches_reduce_tasks_for_pinned_child_dependencies( #[case] #[future] db: TestSqlStore, @@ -276,25 +203,12 @@ mod tests { } #[rstest] - #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], - vec![]), 1)] - #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], - vec![]), 8)] - #[case(test_db(4, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], - vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 10].iter().map(|_|random_document_id()).collect())))], - ), 12)] #[tokio::test] - async fn dispatches_task_for_parent_dependencies_as_well( - #[case] + async 
fn no_reduce_task_for_materialised_document_relations( + #[from(test_db)] + #[with(1, 1)] #[future] db: TestSqlStore, - #[case] expected_reduce_tasks: usize, ) { let db = db.await; let context = Context::new(db.store.clone(), Configuration::default()); @@ -315,229 +229,113 @@ mod tests { let document_view_id_of_child = document_view_of_child.id(); - let input = TaskInput::new(None, Some(document_view_id_of_child.clone())); - let tasks = dependency_task(context.clone(), input) - .await - .unwrap() - .unwrap(); - - assert_eq!(tasks.len(), expected_reduce_tasks); - for task in tasks { - assert_eq!(task.0, "reduce") - } - // Create a new document referencing the existing materialised document. - let operation = create_operation(&[( - "relation_to_existing_document", - OperationValue::Relation(Relation::new(document_id.clone())), - )]); - let (_, document_view_id_of_parent) = - insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; - - // The child should now dispatch one dependency task for the parent as well as - // reduce tasks for it's children. - - let input = TaskInput::new(None, Some(document_view_id_of_child.clone())); - let child_tasks = dependency_task(context.clone(), input) - .await - .unwrap() - .unwrap(); - - let expected_task_types = [("reduce", expected_reduce_tasks), ("dependency", 1)]; - assert_eq!(child_tasks.len(), expected_reduce_tasks + 1); - for (task_type, count) in expected_task_types { - assert_eq!( - child_tasks - .iter() - .filter(|task| task.0 == task_type) - .count(), - count - ); - } - - // The parent should dispatch one schema task as it has one dependency which was already materialised. 
- - let input = TaskInput::new(None, Some(document_view_id_of_parent.clone())); - let parent_tasks = dependency_task(context.clone(), input) - .await - .unwrap() - .unwrap(); - - assert_eq!(parent_tasks.len(), 1); - assert_eq!(parent_tasks[0].0, "schema"); - - // Now we create another document with a pinned relation to the parent and a pinned relation - // list with an item pointing to the child document. - let operation = create_operation(&[ ( - "parent_of", + "pinned_relation_to_existing_document", OperationValue::PinnedRelation(PinnedRelation::new( - document_view_id_of_parent.clone(), + document_view_id_of_child.clone(), )), ), ( - "grandparent_of", - OperationValue::PinnedRelationList(PinnedRelationList::new(vec![ - document_view_id_of_child.clone(), - ])), + "pinned_relation_to_not_existing_document", + OperationValue::PinnedRelation(PinnedRelation::new(random_document_view_id())), ), ]); - let (_, document_view_id_of_grandparent) = - insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; - - let input = TaskInput::new(None, Some(document_view_id_of_child.clone())); - let child_tasks = dependency_task(context.clone(), input) - .await - .unwrap() - .unwrap(); - - let input = TaskInput::new(None, Some(document_view_id_of_parent.clone())); - let parent_tasks = dependency_task(context.clone(), input) - .await - .unwrap() - .unwrap(); - - let input = TaskInput::new(None, Some(document_view_id_of_grandparent)); - let grandparent_tasks = dependency_task(context.clone(), input) - .await - .unwrap() - .unwrap(); - // The child dispatches an extra dependency task for it's grand parent. 
- - let expected_dependency_tasks = 2; - let expected_task_types = [ - ("reduce", expected_reduce_tasks), - ("dependency", expected_dependency_tasks), - ]; - assert_eq!( - child_tasks.len(), - expected_reduce_tasks + expected_dependency_tasks - ); - for (task_type, count) in expected_task_types { - assert_eq!( - child_tasks - .iter() - .filter(|task| task.0 == task_type) - .count(), - count - ); - } - - // The parent dispatches a dependency task for it's parent and schema task for itself. - - let expected_schema_tasks = 1; - let expected_dependency_tasks = 1; - let expected_task_types = [ - ("schema", expected_schema_tasks), - ("dependency", expected_dependency_tasks), - ]; - assert_eq!( - parent_tasks.len(), - expected_schema_tasks + expected_dependency_tasks - ); - for (task_type, count) in expected_task_types { - assert_eq!( - parent_tasks - .iter() - .filter(|task| task.0 == task_type) - .count(), - count - ); - } - - // The grandparent dispatches one schema task as all it's dependencies are met. 
- - let expected_schema_tasks = 1; - assert_eq!(grandparent_tasks.len(), expected_schema_tasks); - assert_eq!( - grandparent_tasks - .iter() - .filter(|task| task.0 == "schema") - .count(), - expected_schema_tasks - ); - } - - #[rstest] - #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], - vec![]), 1)] - #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], - vec![]), 8)] - #[case(test_db(4, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], - vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 10].iter().map(|_|random_document_id()).collect())))], - ), 12)] - #[tokio::test] - async fn ignores_fields_which_look_like_relations_but_are_not( - #[case] - #[future] - db: TestSqlStore, - #[case] expected_next_tasks: usize, - ) { - let db = db.await; - let context = Context::new(db.store.clone(), Configuration::default()); - let document_id = db.documents[0].clone(); - - let input = TaskInput::new(Some(document_id.clone()), None); - reduce_task(context.clone(), input).await.unwrap().unwrap(); - - let document_view = db - .store - .get_document_by_id(&document_id) - .await - .unwrap() - .unwrap(); - - let document_view_id = 
document_view.id(); - - let input = TaskInput::new(None, Some(document_view_id.clone())); - - dependency_task(context.clone(), input) - .await - .unwrap() - .unwrap(); - - let operation = create_operation(&[( - "not_a_relation_but_contains_a_hash", - OperationValue::Text(document_id.as_str().to_string()), - )]); - - let (_, document_view_id_of_unrelated_document) = + let (_, document_view_id) = insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; + // The new document should now dispatch one dependency task for the child relation which + // has not been materialised yet. let input = TaskInput::new(None, Some(document_view_id.clone())); let tasks = dependency_task(context.clone(), input) .await .unwrap() .unwrap(); - let input = TaskInput::new(None, Some(document_view_id_of_unrelated_document.clone())); - let tasks_of_unrelated_document = dependency_task(context.clone(), input) - .await - .unwrap() - .unwrap(); - - assert_eq!(tasks.len(), expected_next_tasks); - assert_eq!(tasks_of_unrelated_document.len(), 1); - assert_eq!(tasks_of_unrelated_document[0].0, "schema"); + assert_eq!(tasks.len(), 1); + assert_eq!(tasks[0].0, "reduce"); } + // #[rstest] + // #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + // vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], + // vec![]), 1)] + // #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + // vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), + // ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], + // vec![]), 8)] + // #[case(test_db(4, 1, false, TEST_SCHEMA_ID.parse().unwrap(), + // vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), + // 
("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], + // vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), + // ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 10].iter().map(|_|random_document_id()).collect())))], + // ), 12)] + // #[tokio::test] + // async fn ignores_fields_which_look_like_relations_but_are_not( + // #[case] + // #[future] + // db: TestSqlStore, + // #[case] expected_next_tasks: usize, + // ) { + // let db = db.await; + // let context = Context::new(db.store.clone(), Configuration::default()); + // let document_id = db.documents[0].clone(); + + // let input = TaskInput::new(Some(document_id.clone()), None); + // reduce_task(context.clone(), input).await.unwrap().unwrap(); + + // let document_view = db + // .store + // .get_document_by_id(&document_id) + // .await + // .unwrap() + // .unwrap(); + + // let document_view_id = document_view.id(); + + // let input = TaskInput::new(None, Some(document_view_id.clone())); + + // dependency_task(context.clone(), input) + // .await + // .unwrap() + // .unwrap(); + + // let operation = create_operation(&[( + // "not_a_relation_but_contains_a_hash", + // OperationValue::Text(document_id.as_str().to_string()), + // )]); + + // let (_, document_view_id_of_unrelated_document) = + // insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; + + // let input = TaskInput::new(None, Some(document_view_id.clone())); + // let tasks = dependency_task(context.clone(), input) + // .await + // .unwrap() + // .unwrap(); + + // let input = TaskInput::new(None, Some(document_view_id_of_unrelated_document.clone())); + // let tasks_of_unrelated_document = dependency_task(context.clone(), input) + // .await + // .unwrap() + // .unwrap(); + + // assert_eq!(tasks.len(), expected_next_tasks); + // 
assert_eq!(tasks_of_unrelated_document.len(), 1); + // assert_eq!(tasks_of_unrelated_document[0].0, "schema"); + // } + #[rstest] - #[should_panic(expected = "Failure")] + #[should_panic(expected = "Critical")] #[case(None, Some(random_document_view_id()))] - #[should_panic(expected = "Failure")] + #[should_panic(expected = "Critical")] #[case(None, None)] - #[should_panic(expected = "Failure")] + #[should_panic(expected = "Critical")] #[case(Some(random_document_id()), None)] - #[should_panic(expected = "Failure")] + #[should_panic(expected = "Critical")] #[case(Some(random_document_id()), Some(random_document_view_id()))] #[tokio::test] async fn fails_correctly( @@ -556,11 +354,11 @@ mod tests { } #[rstest] - #[should_panic(expected = "Failure")] + #[should_panic(expected = "Critical")] #[case(test_db(2, 1, true, TEST_SCHEMA_ID.parse().unwrap(), vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], vec![]))] - #[should_panic(expected = "Failure")] + #[should_panic(expected = "Critical")] #[case(test_db(2, 1, true, TEST_SCHEMA_ID.parse().unwrap(), vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], @@ -594,64 +392,64 @@ mod tests { .unwrap(); } - #[rstest] - #[tokio::test] - async fn gets_parent_relations( - #[from(test_db)] - #[future] - #[with(1, 1)] - db: TestSqlStore, - ) { - let db = db.await; - let context = Context::new(db.store.clone(), Configuration::default()); - let document_id = db.documents[0].clone(); - - let input = TaskInput::new(Some(document_id.clone()), None); - reduce_task(context.clone(), input).await.unwrap().unwrap(); - - // Here we have one materialised document, (we are calling it a child as we will shortly be publishing parents). 
- - let document_view_of_child = db - .store - .get_document_by_id(&document_id) - .await - .unwrap() - .unwrap(); - - let document_view_id_of_child = document_view_of_child.id(); - - // Create a new document referencing the existing materialised document by unpinned relation. - - let operation = create_operation(&[( - "relation_to_existing_document", - OperationValue::Relation(Relation::new(document_id.clone())), - )]); - let (_, document_view_id_of_parent) = - insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; - - let parent_dependencies_unpinned = db - .store - .get_parents_with_unpinned_relation(&document_id) - .await - .unwrap(); - assert_eq!(parent_dependencies_unpinned.len(), 1); - assert_eq!(parent_dependencies_unpinned[0], document_view_id_of_parent); - - // Create a new document referencing the existing materialised document by pinned relation. - - let operation = create_operation(&[( - "pinned_relation_to_existing_document", - OperationValue::PinnedRelation(PinnedRelation::new(document_view_id_of_child.clone())), - )]); - let (_, document_view_id_of_parent) = - insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; - - let parent_dependencies_pinned = db - .store - .get_parents_with_pinned_relation(document_view_id_of_child) - .await - .unwrap(); - assert_eq!(parent_dependencies_pinned.len(), 1); - assert_eq!(parent_dependencies_pinned[0], document_view_id_of_parent); - } + // #[rstest] + // #[tokio::test] + // async fn gets_parent_relations( + // #[from(test_db)] + // #[future] + // #[with(1, 1)] + // db: TestSqlStore, + // ) { + // let db = db.await; + // let context = Context::new(db.store.clone(), Configuration::default()); + // let document_id = db.documents[0].clone(); + + // let input = TaskInput::new(Some(document_id.clone()), None); + // reduce_task(context.clone(), input).await.unwrap().unwrap(); + + // // Here we have one materialised document, (we are calling it a child as we 
will shortly be publishing parents). + + // let document_view_of_child = db + // .store + // .get_document_by_id(&document_id) + // .await + // .unwrap() + // .unwrap(); + + // let document_view_id_of_child = document_view_of_child.id(); + + // // Create a new document referencing the existing materialised document by unpinned relation. + + // let operation = create_operation(&[( + // "relation_to_existing_document", + // OperationValue::Relation(Relation::new(document_id.clone())), + // )]); + // let (_, document_view_id_of_parent) = + // insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; + + // let parent_dependencies_unpinned = db + // .store + // .get_parents_with_unpinned_relation(&document_id) + // .await + // .unwrap(); + // assert_eq!(parent_dependencies_unpinned.len(), 1); + // assert_eq!(parent_dependencies_unpinned[0], document_view_id_of_parent); + + // // Create a new document referencing the existing materialised document by pinned relation. 
+ + // let operation = create_operation(&[( + // "pinned_relation_to_existing_document", + // OperationValue::PinnedRelation(PinnedRelation::new(document_view_id_of_child.clone())), + // )]); + // let (_, document_view_id_of_parent) = + // insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; + + // let parent_dependencies_pinned = db + // .store + // .get_parents_with_pinned_relation(document_view_id_of_child) + // .await + // .unwrap(); + // assert_eq!(parent_dependencies_pinned.len(), 1); + // assert_eq!(parent_dependencies_pinned[0], document_view_id_of_parent); + // } } From 89d4357b8e43d344a8c7710ca0e439f6dec39ac8 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Mon, 13 Jun 2022 15:07:06 +0100 Subject: [PATCH 20/33] Remove code relating to schema task --- aquadoggo/src/db/provider.rs | 74 --------- .../src/materializer/tasks/dependency.rs | 153 +----------------- aquadoggo/src/materializer/tasks/schema.rs | 34 +--- 3 files changed, 4 insertions(+), 257 deletions(-) diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index 1c097e882..d0a744b4a 100644 --- a/aquadoggo/src/db/provider.rs +++ b/aquadoggo/src/db/provider.rs @@ -1,6 +1,5 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; -use p2panda_rs::document::DocumentViewId; use sqlx::query_scalar; use p2panda_rs::document::DocumentId; @@ -15,8 +14,6 @@ use crate::graphql::client::{ EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse, }; -use super::errors::DocumentStorageError; - #[derive(Clone)] pub struct SqlStorage { pub(crate) pool: Pool, @@ -72,74 +69,3 @@ impl StorageProvider for SqlStorage { Ok(hash) } } - -impl SqlStorage { - pub async fn get_parents_with_pinned_relation( - &self, - document_view_id: &DocumentViewId, - ) -> StorageProviderResult> { - let relation_ids = query_scalar( - " - WITH parents_pinned_relation (document_view_id, relation_type, value) AS ( - SELECT - 
document_view_fields.document_view_id, - operation_fields_v1.field_type, - operation_fields_v1.value - FROM - operation_fields_v1 - LEFT JOIN document_view_fields - ON - document_view_fields.operation_id = operation_fields_v1.operation_id - AND - document_view_fields.name = operation_fields_v1.name - WHERE - operation_fields_v1.value = $1 AND operation_fields_v1.field_type LIKE 'pinned_relation%' - ) - SELECT document_view_id FROM parents_pinned_relation; - ", - ) - .bind(document_view_id.as_str()) - .fetch_all(&self.pool) - .await - .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))? - .iter().map(|id: &String| id.parse().unwrap() ).collect(); - - Ok(relation_ids) - } - - pub async fn get_parents_with_unpinned_relation( - &self, - document_view_id: &DocumentId, - ) -> StorageProviderResult> { - let relation_ids: Vec = query_scalar( - " - WITH parents_unpinned_relation (document_view_id, relation_type, value) AS ( - SELECT - document_view_fields.document_view_id, - operation_fields_v1.field_type, - operation_fields_v1.value - FROM - operation_fields_v1 - LEFT JOIN document_view_fields - ON - document_view_fields.operation_id = operation_fields_v1.operation_id - AND - document_view_fields.name = operation_fields_v1.name - LEFT JOIN documents - ON - documents.document_view_id = document_view_fields.document_view_id - WHERE - operation_fields_v1.value = $1 AND operation_fields_v1.field_type LIKE 'relation%' - ) - SELECT document_view_id FROM parents_unpinned_relation; - ", - ) - .bind(document_view_id.as_str()) - .fetch_all(&self.pool) - .await - .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))? 
- .iter().map(|id: &String| id.parse().unwrap() ).collect(); - - Ok(relation_ids) - } -} diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index db9157170..254298ba6 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -1,7 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use p2panda_rs::document::{DocumentId, DocumentViewId}; -use p2panda_rs::storage_provider::traits::OperationStore; +use p2panda_rs::document::DocumentViewId; use crate::context::Context; use crate::db::traits::DocumentStore; @@ -28,26 +27,6 @@ async fn pinned_relation_task( } } -// /// helper method for retrieving a document view by a documents' id and if it doesn't exist in -// /// the store, composing a "reduce" task for this document. -// async fn unpinned_relation_task( -// context: &Context, -// document_id: DocumentId, -// ) -> Result>, TaskError> { -// match context -// .store -// .get_document_by_id(&document_id) -// .await -// .map_err(|_| TaskError::Critical)? 
-// { -// Some(_) => Ok(None), -// None => Ok(Some(Task::new( -// "reduce", -// TaskInput::new(Some(document_id), None), -// ))), -// } -// } - pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { // PARSE INPUT ARGUMENTS // @@ -259,75 +238,6 @@ mod tests { assert_eq!(tasks[0].0, "reduce"); } - // #[rstest] - // #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - // vec![("profile_picture", OperationValue::Relation(Relation::new(random_document_id())))], - // vec![]), 1)] - // #[case(test_db(1, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - // vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - // ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], - // vec![]), 8)] - // #[case(test_db(4, 1, false, TEST_SCHEMA_ID.parse().unwrap(), - // vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - // ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 6].iter().map(|_|random_document_id()).collect())))], - // vec![("one_relation_field", OperationValue::PinnedRelationList(PinnedRelationList::new([0; 2].iter().map(|_|random_document_view_id()).collect()))), - // ("another_relation_field", OperationValue::RelationList(RelationList::new([0; 10].iter().map(|_|random_document_id()).collect())))], - // ), 12)] - // #[tokio::test] - // async fn ignores_fields_which_look_like_relations_but_are_not( - // #[case] - // #[future] - // db: TestSqlStore, - // #[case] expected_next_tasks: usize, - // ) { - // let db = db.await; - // let context = Context::new(db.store.clone(), Configuration::default()); - // let document_id = db.documents[0].clone(); - - // let input = TaskInput::new(Some(document_id.clone()), None); - // reduce_task(context.clone(), 
input).await.unwrap().unwrap(); - - // let document_view = db - // .store - // .get_document_by_id(&document_id) - // .await - // .unwrap() - // .unwrap(); - - // let document_view_id = document_view.id(); - - // let input = TaskInput::new(None, Some(document_view_id.clone())); - - // dependency_task(context.clone(), input) - // .await - // .unwrap() - // .unwrap(); - - // let operation = create_operation(&[( - // "not_a_relation_but_contains_a_hash", - // OperationValue::Text(document_id.as_str().to_string()), - // )]); - - // let (_, document_view_id_of_unrelated_document) = - // insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; - - // let input = TaskInput::new(None, Some(document_view_id.clone())); - // let tasks = dependency_task(context.clone(), input) - // .await - // .unwrap() - // .unwrap(); - - // let input = TaskInput::new(None, Some(document_view_id_of_unrelated_document.clone())); - // let tasks_of_unrelated_document = dependency_task(context.clone(), input) - // .await - // .unwrap() - // .unwrap(); - - // assert_eq!(tasks.len(), expected_next_tasks); - // assert_eq!(tasks_of_unrelated_document.len(), 1); - // assert_eq!(tasks_of_unrelated_document[0].0, "schema"); - // } - #[rstest] #[should_panic(expected = "Critical")] #[case(None, Some(random_document_view_id()))] @@ -391,65 +301,4 @@ mod tests { .unwrap() .unwrap(); } - - // #[rstest] - // #[tokio::test] - // async fn gets_parent_relations( - // #[from(test_db)] - // #[future] - // #[with(1, 1)] - // db: TestSqlStore, - // ) { - // let db = db.await; - // let context = Context::new(db.store.clone(), Configuration::default()); - // let document_id = db.documents[0].clone(); - - // let input = TaskInput::new(Some(document_id.clone()), None); - // reduce_task(context.clone(), input).await.unwrap().unwrap(); - - // // Here we have one materialised document, (we are calling it a child as we will shortly be publishing parents). 
- - // let document_view_of_child = db - // .store - // .get_document_by_id(&document_id) - // .await - // .unwrap() - // .unwrap(); - - // let document_view_id_of_child = document_view_of_child.id(); - - // // Create a new document referencing the existing materialised document by unpinned relation. - - // let operation = create_operation(&[( - // "relation_to_existing_document", - // OperationValue::Relation(Relation::new(document_id.clone())), - // )]); - // let (_, document_view_id_of_parent) = - // insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; - - // let parent_dependencies_unpinned = db - // .store - // .get_parents_with_unpinned_relation(&document_id) - // .await - // .unwrap(); - // assert_eq!(parent_dependencies_unpinned.len(), 1); - // assert_eq!(parent_dependencies_unpinned[0], document_view_id_of_parent); - - // // Create a new document referencing the existing materialised document by pinned relation. - - // let operation = create_operation(&[( - // "pinned_relation_to_existing_document", - // OperationValue::PinnedRelation(PinnedRelation::new(document_view_id_of_child.clone())), - // )]); - // let (_, document_view_id_of_parent) = - // insert_entry_operation_and_view(&db.store, &KeyPair::new(), None, &operation).await; - - // let parent_dependencies_pinned = db - // .store - // .get_parents_with_pinned_relation(document_view_id_of_child) - // .await - // .unwrap(); - // assert_eq!(parent_dependencies_pinned.len(), 1); - // assert_eq!(parent_dependencies_pinned[0], document_view_id_of_parent); - // } } diff --git a/aquadoggo/src/materializer/tasks/schema.rs b/aquadoggo/src/materializer/tasks/schema.rs index 11e150136..d4a35e4e2 100644 --- a/aquadoggo/src/materializer/tasks/schema.rs +++ b/aquadoggo/src/materializer/tasks/schema.rs @@ -1,37 +1,9 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use crate::context::Context; -use crate::db::errors::SchemaStoreError; -use crate::db::traits::SchemaStore; -use 
crate::materializer::worker::{TaskError, TaskResult}; +use crate::materializer::worker::TaskResult; use crate::materializer::TaskInput; -pub async fn schema_task(context: Context, input: TaskInput) -> TaskResult { - match (&input.document_id, &input.document_view_id) { - (None, Some(document_view_id)) => { - match context.store.get_schema_by_id(document_view_id).await { - Ok(schema) => { - match schema { - Some(_schema) => { - // Get the schema into the schema service somehow // - - Ok(None) - } - None => Ok(None), - } - } - Err(e) => match e { - SchemaStoreError::MissingSchemaFieldDefinition(_, _) => { - // If this was a schema definition and it's dependencies aren't met, there's something wrong with our task logic - Err(TaskError::Critical) - } - // If there was a fatal storage error we should crash - SchemaStoreError::DocumentStorageError(_) => Err(TaskError::Critical), - // All other errors just mean this wasn't a schema definition - _ => Ok(None), - }, - } - } - (_, _) => Err(TaskError::Critical), - } +pub async fn schema_task(_context: Context, _input: TaskInput) -> TaskResult { + todo!() } From 15ff1a0df0c77863d5bbc770d83a28440b95dc20 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Mon, 13 Jun 2022 15:20:42 +0100 Subject: [PATCH 21/33] Fix reduce error test --- aquadoggo/src/materializer/tasks/reduce.rs | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 65f4e72da..c33274880 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -24,7 +24,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Ok(document_id), - None => Err(TaskError::Failure), + None => return Ok(None), } } (_, _) => Err(TaskError::Critical), @@ -76,7 +76,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult, From e2193a7de664ff7eba07b7af13077edab9ecc83d Mon Sep 17 
00:00:00 2001 From: Sam Andreae Date: Mon, 13 Jun 2022 16:21:28 +0100 Subject: [PATCH 22/33] Make clippy happy --- aquadoggo/src/materializer/worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 84e1a6714..871790de6 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -110,10 +110,10 @@ pub type TaskResult = Result>>, TaskError>; #[derive(Debug)] pub enum TaskError { /// This tasks failed critically and will cause the whole program to panic. - #[allow(dead_code)] Critical, /// This task failed silently without any further effects. + #[allow(dead_code)] Failure, } From 6f8b43d0607be65e731a85035535fa6945330dae Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Mon, 13 Jun 2022 16:40:35 +0100 Subject: [PATCH 23/33] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 23c932250..a874e6dde 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - GraphQL endpoint for publishing entries [#123](https://github.com/p2panda/aquadoggo/pull/132) - Implement SQL `DocumentStore` [#118](https://github.com/p2panda/aquadoggo/pull/118) - Implement SQL `SchemaStore` [#130](https://github.com/p2panda/aquadoggo/pull/130) +- Reduce and dependency tasks [#144](https://github.com/p2panda/aquadoggo/pull/144) - GraphQL endpoints for replication [#100](https://github.com/p2panda/aquadoggo/pull/100) ### Changed From 89783ac47f0b6c7a95a2f151d173935dd28d0ad7 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Tue, 14 Jun 2022 12:14:54 +0100 Subject: [PATCH 24/33] Changes after rebase --- .../src/materializer/tasks/dependency.rs | 7 ++++--- aquadoggo/src/materializer/tasks/reduce.rs | 19 +++++-------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git 
a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 254298ba6..2e70fe261 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -103,9 +103,10 @@ mod tests { use p2panda_rs::document::{DocumentId, DocumentViewId}; use p2panda_rs::identity::KeyPair; use p2panda_rs::operation::{ - OperationValue, PinnedRelation, PinnedRelationList, Relation, RelationList, + AsVerifiedOperation, OperationValue, PinnedRelation, PinnedRelationList, Relation, + RelationList, }; - use p2panda_rs::storage_provider::traits::{AsStorageOperation, OperationStore}; + use p2panda_rs::storage_provider::traits::OperationStore; use p2panda_rs::test_utils::constants::TEST_SCHEMA_ID; use p2panda_rs::test_utils::fixtures::{ create_operation, random_document_id, random_document_view_id, @@ -292,7 +293,7 @@ mod tests { .await .unwrap(); - let document_view_id: DocumentViewId = document_operations[1].id().into(); + let document_view_id: DocumentViewId = document_operations[1].operation_id().clone().into(); let input = TaskInput::new(None, Some(document_view_id.clone())); diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index c33274880..4ac94b732 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -19,7 +19,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult TaskResult TaskResult Date: Tue, 14 Jun 2022 13:59:06 +0100 Subject: [PATCH 25/33] Review changes --- .../src/materializer/tasks/dependency.rs | 58 ++++++++----------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 2e70fe261..b01410735 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -7,9 +7,9 @@ use 
crate::db::traits::DocumentStore; use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; -/// helper method for retrieving a document view by it's document view id and if it doesn't exist in +/// Helper method for retrieving a document view by it's document view id and if it doesn't exist in /// the store, composing a "reduce" task for this specific document view. -async fn pinned_relation_task( +async fn construct_relation_task( context: &Context, document_view_id: DocumentViewId, ) -> Result>, TaskError> { @@ -28,38 +28,25 @@ async fn pinned_relation_task( } pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { - // PARSE INPUT ARGUMENTS // - - // Here we retrive the document view by document id or document view id depending on what was passed into - // the task. This is needed so we can access the document view id and also check if a document has been - // deleted. We also catch invalid task inputs here (both or neither ids were passed), failing the task - // in a non critical way if this is the case. - let document_view = match (&input.document_id, &input.document_view_id) { - // TODO: Alt approach: we could have a `get_operations_by_document_view_id()` in `OperationStore`, or - // could we even do this with some fancy recursive SQL query? We might need the `previous_operations` - // table back for that. - (None, Some(document_view_id)) => { - // Fetch the document_view from the store by it's document view id. - context - .store - .get_document_view_by_id(document_view_id) - .await - .map_err(|_| TaskError::Critical) - } - (_, _) => Err(TaskError::Critical), + // Here we retrive the document view by document view id. + let document_view = match input.document_view_id { + Some(view_id) => match context + .store + .get_document_view_by_id(&view_id) + .await + .map_err(|_| TaskError::Critical)? 
+ { + Some(document_view) => Ok(document_view), + // If no document view for the id passed into this task could be retrieved then this + // document has been deleted or the document view id was invalid. As "dependency" tasks + // are only dispatched after a successful "reduce" task, neither `None` case should + // happen, so this is a critical error. + None => Err(TaskError::Critical), + }, + // We only expect to handle document_view_ids in a dependency task. + None => Err(TaskError::Critical), }?; - let document_view = match document_view { - Some(document_view) => document_view, - // If no document view for the id passed into this task could be retrieved then this - // document has been deleted or the document view id was invalid. As "dependency" tasks - // are only dispatched after a successful "reduce" task, neither `None` case should - // happen, so this is a critical error. - None => return Err(TaskError::Critical), - }; - - // FETCH DEPENDENCIES & COMPOSE TASKS // - let mut next_tasks = Vec::new(); // First we handle all pinned or unpinned relations defined in this tasks document view. @@ -81,14 +68,15 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { // same as above... 
for document_view_id in pinned_relation_list.iter() { next_tasks - .push(pinned_relation_task(&context, document_view_id.clone()).await?); + .push(construct_relation_task(&context, document_view_id.clone()).await?); } } _ => (), From 1bb76ec8907db14661ac750ae2ff86e7a6d56027 Mon Sep 17 00:00:00 2001 From: Vincent Ahrend Date: Tue, 14 Jun 2022 20:37:36 +0200 Subject: [PATCH 26/33] Rephrase dependency task docstrings --- aquadoggo/src/materializer/tasks/dependency.rs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index b01410735..eaf5485e0 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -7,8 +7,8 @@ use crate::db::traits::DocumentStore; use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; -/// Helper method for retrieving a document view by it's document view id and if it doesn't exist in -/// the store, composing a "reduce" task for this specific document view. +/// Returns a _reduce_ task for a given document view only if that view does not yet exist in the +/// store. async fn construct_relation_task( context: &Context, document_view_id: DocumentViewId, @@ -27,6 +27,10 @@ async fn construct_relation_task( } } +/// A dependency task prepares _reduce_ tasks for all pinned relations of a given document view. +/// +/// Expects a _reduce_ task to have completed successfully for the given document view itself and +/// returns a critical error otherwise. pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { // Here we retrive the document view by document view id. 
let document_view = match input.document_view_id { @@ -49,7 +53,7 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult Date: Wed, 15 Jun 2022 09:57:10 +0100 Subject: [PATCH 27/33] Refactoring in dependency task --- .../src/materializer/tasks/dependency.rs | 64 ++++++++++--------- 1 file changed, 33 insertions(+), 31 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index eaf5485e0..b34ea5084 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -7,26 +7,6 @@ use crate::db::traits::DocumentStore; use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; -/// Returns a _reduce_ task for a given document view only if that view does not yet exist in the -/// store. -async fn construct_relation_task( - context: &Context, - document_view_id: DocumentViewId, -) -> Result>, TaskError> { - match context - .store - .get_document_view_by_id(&document_view_id) - .await - .map_err(|_| TaskError::Critical)? - { - Some(_) => Ok(None), - None => Ok(Some(Task::new( - "reduce", - TaskInput::new(None, Some(document_view_id)), - ))), - } -} - /// A dependency task prepares _reduce_ tasks for all pinned relations of a given document view. /// /// Expects a _reduce_ task to have completed successfully for the given document view itself and @@ -34,20 +14,22 @@ async fn construct_relation_task( pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { // Here we retrive the document view by document view id. let document_view = match input.document_view_id { - Some(view_id) => match context + Some(view_id) => context .store .get_document_view_by_id(&view_id) .await - .map_err(|_| TaskError::Critical)? 
- { - Some(document_view) => Ok(document_view), - // If no document view for the id passed into this task could be retrieved then this - // document has been deleted or the document view id was invalid. As "dependency" tasks - // are only dispatched after a successful "reduce" task, neither `None` case should - // happen, so this is a critical error. - None => Err(TaskError::Critical), - }, - // We only expect to handle document_view_ids in a dependency task. + .map_err(|_| TaskError::Critical) + , + // We expect to handle document_view_ids in a dependency task. + None => Err(TaskError::Critical), + }?; + + let document_view = match document_view { + Some(document_view) => Ok(document_view), + // If no document view for the id passed into this task could be retrieved then this + // document has been deleted or the document view id was invalid. As "dependency" tasks + // are only dispatched after a successful "reduce" task, neither `None` case should + // happen, so this is a critical error. None => Err(TaskError::Critical), }?; @@ -90,6 +72,26 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult Result>, TaskError> { + match context + .store + .get_document_view_by_id(&document_view_id) + .await + .map_err(|_| TaskError::Critical)? 
+ { + Some(_) => Ok(None), + None => Ok(Some(Task::new( + "reduce", + TaskInput::new(None, Some(document_view_id)), + ))), + } +} + #[cfg(test)] mod tests { use p2panda_rs::document::{DocumentId, DocumentViewId}; From 95abba92357cbded28b5f2a7e0844e8525c45b81 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 15 Jun 2022 10:05:21 +0100 Subject: [PATCH 28/33] More detailed docs for dependency task --- aquadoggo/src/materializer/tasks/dependency.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index b34ea5084..38df60a68 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -9,6 +9,13 @@ use crate::materializer::TaskInput; /// A dependency task prepares _reduce_ tasks for all pinned relations of a given document view. /// +/// This task is dispatched after a reduce task completes. It identifies any pinned relations +/// present in a given document view as we need to guarantee the required document views are +/// materialised and stored in the database. We may have the required operations on the node +/// already, but they were never materialised to the document view required by the pinned +/// relation. In order to guarantee all required document views are present we dispatch a +/// reduce task for the view of each pinned relation found. +/// /// Expects a _reduce_ task to have completed successfully for the given document view itself and /// returns a critical error otherwise. 
pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { From 7263bfc000ee886ae2d70264da76efe86627a07e Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 15 Jun 2022 10:08:57 +0100 Subject: [PATCH 29/33] Reduce task doc string --- aquadoggo/src/materializer/tasks/reduce.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 4ac94b732..09506d292 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -8,6 +8,12 @@ use crate::db::traits::DocumentStore; use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; +/// A reduce task is dispatched for every entry and operation pair which arrives at a node. They +/// may also be dispatched from a dependency task when a pinned relations is present on an already +/// materialised document view. +/// +/// After succesfully reducing and storing a document view an array of dependency tasks is returned. +/// If invalid inputs were passed or a fatal db error occured a critical error is returned. pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { // Parse the task input, if they are invalid (both or neither ids provided) we critically fail // the task at this point. 
If only a document_view was passed we retrieve the document_id as From 1f337436ee3794c282403055daad547fefa1074e Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 15 Jun 2022 10:11:38 +0100 Subject: [PATCH 30/33] Remove comment about wanted schema --- aquadoggo/src/materializer/tasks/dependency.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 38df60a68..3daa18f4c 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -50,8 +50,7 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { // same as above... From 43bd91a75b0e6336b39042f8a6f87987dde509e5 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Wed, 15 Jun 2022 17:05:30 +0200 Subject: [PATCH 31/33] Fix rebase --- aquadoggo/src/db/provider.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index d0a744b4a..22fd704c2 100644 --- a/aquadoggo/src/db/provider.rs +++ b/aquadoggo/src/db/provider.rs @@ -1,4 +1,5 @@ // SPDX-License-Identifier: AGPL-3.0-or-later + use async_trait::async_trait; use sqlx::query_scalar; @@ -14,7 +15,8 @@ use crate::graphql::client::{ EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse, }; -#[derive(Clone)] +/// Sql based storage that implements `StorageProvider` +#[derive(Clone, Debug)] pub struct SqlStorage { pub(crate) pool: Pool, } From ba3d64917057b84d98996470f1df675f02c1ef84 Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Wed, 15 Jun 2022 17:06:06 +0200 Subject: [PATCH 32/33] Fix missing docstring after rebase --- aquadoggo/src/db/provider.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index 22fd704c2..6c75ffd58 100644 --- a/aquadoggo/src/db/provider.rs +++ 
b/aquadoggo/src/db/provider.rs @@ -15,19 +15,21 @@ use crate::graphql::client::{ EntryArgsRequest, EntryArgsResponse, PublishEntryRequest, PublishEntryResponse, }; -/// Sql based storage that implements `StorageProvider` +/// Sql based storage that implements `StorageProvider`. #[derive(Clone, Debug)] pub struct SqlStorage { pub(crate) pool: Pool, } impl SqlStorage { + /// Create a new `SqlStorage` using the provided db `Pool`. pub fn new(pool: Pool) -> Self { Self { pool } } } -/// A `StorageProvider` implementation based on `sqlx` that supports SQLite and PostgreSQL databases. +/// A `StorageProvider` implementation based on `sqlx` that supports SQLite and PostgreSQL +/// databases. #[async_trait] impl StorageProvider for SqlStorage { type EntryArgsResponse = EntryArgsResponse; From b0d012425fb695e8251b3390461378350d2306ba Mon Sep 17 00:00:00 2001 From: Andreas Dzialocha Date: Wed, 15 Jun 2022 17:56:34 +0200 Subject: [PATCH 33/33] Try to improve control flow in reduce task as well --- aquadoggo/src/materializer/tasks/reduce.rs | 170 ++++++++++++++------- 1 file changed, 112 insertions(+), 58 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 09506d292..279d8ad04 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use p2panda_rs::document::DocumentBuilder; +use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId}; +use p2panda_rs::operation::VerifiedOperation; use p2panda_rs::storage_provider::traits::OperationStore; use crate::context::Context; @@ -8,20 +9,59 @@ use crate::db::traits::DocumentStore; use crate::materializer::worker::{Task, TaskError, TaskResult}; use crate::materializer::TaskInput; -/// A reduce task is dispatched for every entry and operation pair which arrives at a node. 
They -/// may also be dispatched from a dependency task when a pinned relations is present on an already -/// materialised document view. +/// A reduce task is dispatched for every entry and operation pair which arrives at a node. +/// +/// They may also be dispatched from a dependency task when a pinned relations is present on an +/// already materialised document view. /// /// After succesfully reducing and storing a document view an array of dependency tasks is returned. /// If invalid inputs were passed or a fatal db error occured a critical error is returned. pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { - // Parse the task input, if they are invalid (both or neither ids provided) we critically fail - // the task at this point. If only a document_view was passed we retrieve the document_id as - // it is needed later. - let document_id = match (&input.document_id, &input.document_view_id) { + // Find out which document we are handling + let document_id = resolve_document_id(&context, &input).await?; + + // Get all operations for the requested document + let operations = context + .store + .get_operations_by_document_id(&document_id) + .await + .map_err(|_| TaskError::Critical)?; + + let document_view_id = match &input.document_view_id { + // If this task was passed a document_view_id as input then we want to build to document + // only to the requested view + Some(view_id) => reduce_document_view(&context, view_id, operations).await?, + // If no document_view_id was passed, this is a document_id reduce task. + None => reduce_document(&context, operations).await?, + }; + + // Dispatch a "dependency" task if we created a new document view + match document_view_id { + Some(view_id) => Ok(Some(vec![Task::new( + "dependency", + TaskInput::new(None, Some(view_id)), + )])), + None => Ok(None), + } +} + +/// Helper method to resolve a `DocumentId` from task input. 
+/// +/// If the task input is invalid (both document_id and document_view_id missing or given) we +/// critically fail the task at this point. If only a document_view_id was passed we retrieve the +/// document_id as it is needed later. +async fn resolve_document_id( + context: &Context, + input: &TaskInput, +) -> Result { + match (&input.document_id, &input.document_view_id) { + // The `DocumentId` is already given, we don't have to do anything (Some(document_id), None) => Ok(document_id.to_owned()), + + // A `DocumentViewId` is given, let's find out its document id (None, Some(document_view_id)) => { - // TODO: We can skip this step if we implement https://github.com/p2panda/aquadoggo/issues/148 + // @TODO: We can skip this step if we implement: + // https://github.com/p2panda/aquadoggo/issues/148 let operation_id = document_view_id.clone().into_iter().next().unwrap(); match context .store @@ -30,71 +70,85 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult Ok(document_id), - None => return Ok(None), + None => Err(TaskError::Critical), } } + + // None or both have been provided which smells like a bug (_, _) => Err(TaskError::Critical), - }?; + } +} - // Get all operations for the requested document. - let operations = context +/// Helper method to reduce an operation graph to a specific document view, returning the +/// `DocumentViewId` of the just created new document view. +/// +/// It returns `None` if either that document view reached "deleted" status or we don't have enough +/// operations to materialise. 
+async fn reduce_document_view( + context: &Context, + document_view_id: &DocumentViewId, + operations: Vec, +) -> Result, TaskError> { + let document = match DocumentBuilder::new(operations) + .build_to_view_id(Some(document_view_id.to_owned())) + { + Ok(document) => { + // If the document was deleted, then we return nothing + if document.is_deleted() { + return Ok(None); + }; + + document + } + Err(_) => { + // There is not enough operations yet to materialise this view. Maybe next time! + return Ok(None); + } + }; + + // Insert the new document view into the database + context .store - .get_operations_by_document_id(&document_id) + .insert_document_view(document.view().unwrap(), document.schema()) .await .map_err(|_| TaskError::Critical)?; - let document_view_id = match &input.document_view_id { - // If this task was passed a document_view_id as input then we want to build to document only to the - // requested view. - Some(document_view_id) => { - let document = match DocumentBuilder::new(operations) - .build_to_view_id(Some(document_view_id.to_owned())) - { - Ok(document) => { - // If the document was deleted, then we return nothing. - if document.is_deleted() { - return Ok(None); - }; - document - } - Err(_) => return Ok(None), - }; + // Return the new view id to be used in the resulting dependency task + Ok(Some(document.view_id().to_owned())) +} - // Insert the document document_view into the database. +/// Helper method to reduce an operation graph to the latest document view, returning the +/// `DocumentViewId` of the just created new document view. +/// +/// It returns `None` if either that document view reached "deleted" status or we don't have enough +/// operations to materialise. +async fn reduce_document( + context: &Context, + operations: Vec, +) -> Result, TaskError> { + match DocumentBuilder::new(operations).build() { + Ok(document) => { + // Insert this document into storage. 
If it already existed, this will update it's + // current view context .store - .insert_document_view(document.view().unwrap(), document.schema()) + .insert_document(&document) .await .map_err(|_| TaskError::Critical)?; - // Return the view id to be used in the resulting dependency task. - document.view_id().to_owned() - } - // If no document_view_id was passed, this is a document_id reduce task. - None => match DocumentBuilder::new(operations).build() { - Ok(document) => { - // Insert this document into storage. If it already existed, this will update it's current - // view. - context - .store - .insert_document(&document) - .await - .map_err(|_| TaskError::Critical)?; - - if document.is_deleted() { - return Ok(None); - } - // Return the document_view id to be used in the resulting dependency task. - document.view_id().to_owned() + // If the document was deleted, then we return nothing + if document.is_deleted() { + return Ok(None); } - Err(_) => return Ok(None), - }, - }; - Ok(Some(vec![Task::new( - "dependency", - TaskInput::new(None, Some(document_view_id)), - )])) + // Return the new document_view id to be used in the resulting dependency task + Ok(Some(document.view_id().to_owned())) + } + Err(_) => { + // There is not enough operations yet to materialise this view. Maybe next time! + Ok(None) + } + } } #[cfg(test)]