From 4d2e6723c9159ee5f201fbb0fd5852353ac444dc Mon Sep 17 00:00:00 2001 From: Vincent Ahrend Date: Thu, 28 Jul 2022 15:29:48 +0200 Subject: [PATCH] Schema provider and task (#166) * Add schema service * Add schema task * Fix docstring * Rename to SchemaProvider and add to node context * Remove temp file helper from this branch * Remove doctest that uses private items * Rewrite schema provider without storage provider * Add comment * Reset p2panda-rs dep * Remove custom error type * Allow dead code for now * Fix url * Add test * Remove unused import * Add default impl * Init env_logger when tests start * Consistent naming and default use * Update p2panda-rs api * Add task logging * Kick off schema task * Log all the things * Don't rely on all schemas to be in the provider * Add tests * Remove unused import * Remove unused imports * Update api after merge * Consistent logging * Update changelog * Add test for new functionality in dependency task * Move to unreleased section in CHANGELOG * Minor clean ups * Bring back receiver to make cov tests work * Move serde_json back to dev dependencies * Improve error logging in worker (#194) * Add an error message to worker error types * Use Display for QueueItem and TaskInput * Fix test * Add entry to CHANGELOG.md * Use Tokio Mutex * Update docstrings * Refactor schema task tests * Fix comments * Remove unused dependencies * Handle document view being deleted while processing task * Less verbose logging from tasks The task module from which the log is coming is always displayed in the beginning of the log line as the log source. * Formatting and docstrings * Replace helper function with store call * Rewrite test using test node * Improve test fidelity * Rename module * Remove unused method * Extend tests * Update docstring * Fix branch for upstream changes * Oops Co-authored-by: Andreas Dzialocha Co-authored-by: Andreas Dzialocha Co-authored-by: Sam Andreae --- CHANGELOG.md | 2 + Cargo.lock | 5 +- aquadoggo/Cargo.toml | 1 + aquadoggo/src/context.rs | 16 +- aquadoggo/src/db/provider.rs | 114 +++++++- aquadoggo/src/db/stores/test_utils.rs | 6 +- aquadoggo/src/lib.rs | 24 ++ aquadoggo/src/manager.rs | 16 +- aquadoggo/src/materializer/input.rs | 18 ++ aquadoggo/src/materializer/service.rs | 25 +- .../src/materializer/tasks/dependency.rs | 240 ++++++++++++++--- aquadoggo/src/materializer/tasks/reduce.rs | 82 +++--- aquadoggo/src/materializer/tasks/schema.rs | 247 +++++++++++++++++- aquadoggo/src/materializer/worker.rs | 88 +++++-- aquadoggo/src/node.rs | 13 +- aquadoggo/src/replication/service.rs | 5 +- aquadoggo/src/schema/mod.rs | 5 + aquadoggo/src/schema/schema_provider.rs | 96 +++++++ 18 files changed, 886 insertions(+), 117 deletions(-) create mode 100644 aquadoggo/src/schema/mod.rs create mode 100644 aquadoggo/src/schema/schema_provider.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index f2ff27382..dccd127e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,10 +10,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - GraphQL replication service gets and verifies new entries and inserts them into the db [#137](https://github.com/p2panda/aquadoggo/pull/137) +- Add schema task and schema provider that update when new schema views are materialised [#166](https://github.com/p2panda/aquadoggo/pull/166) ### Changed - Refactor scalars and replication API, replace `graphql-client` with `gql_client` [#184](https://github.com/p2panda/aquadoggo/pull/184) +- Give error types of worker a string for better debugging [#194](https://github.com/p2panda/aquadoggo/pull/194) - Bump `p2panda-rs` which now supports log id's starting from `0` [#207](https://github.com/p2panda/aquadoggo/pull/207) ### Fixed diff --git a/Cargo.lock b/Cargo.lock index ee35e959e..ecde301b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -108,6 +108,7 @@ dependencies = [ "axum", "bamboo-rs-core-ed25519-yasmf", "ciborium", + "ctor", "deadqueue", "directories", "env_logger", @@ -3459,9 +3460,9 @@ checksum = "5bd2fe26506023ed7b5e1e315add59d6f584c621d037f9368fea9cfb988f368c" [[package]] name = "unicode-normalization" -version = "0.1.20" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dee68f85cab8cf68dec42158baf3a79a1cdc065a8b103025965d6ccb7f6cbd" +checksum = "854cbdc4f7bc6ae19c820d44abdc3277ac3e1b2b93db20a636825d9322fb60e6" dependencies = [ "tinyvec", ] diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index fe1c5a32f..e6a7e75be 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -54,6 +54,7 @@ tower-http = { version = "^0.3.4", default-features = false, features = [ triggered = "^0.1.2" [dev-dependencies] +ctor = "^0.1.22" ciborium = "0.2.0" env_logger = "^0.9.0" hex = "0.4.3" diff --git a/aquadoggo/src/context.rs b/aquadoggo/src/context.rs index 95d7dde18..d3f3f61a8 100644 --- a/aquadoggo/src/context.rs +++ b/aquadoggo/src/context.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use crate::config::Configuration; use crate::db::provider::SqlStorage; +use crate::schema::SchemaProvider; /// Inner data shared across all services. #[derive(Debug)] @@ -14,11 +15,18 @@ pub struct Data { /// Storage provider with database connection pool. pub store: SqlStorage, + + /// Schema provider gives access to system and application schemas. + pub schema_provider: SchemaProvider, } impl Data { - pub fn new(store: SqlStorage, config: Configuration) -> Self { - Self { config, store } + pub fn new(store: SqlStorage, config: Configuration, schema_provider: SchemaProvider) -> Self { + Self { + config, + store, + schema_provider, + } } } @@ -28,8 +36,8 @@ pub struct Context(pub Arc); impl Context { /// Returns a new instance of `Context`. - pub fn new(store: SqlStorage, config: Configuration) -> Self { - Self(Arc::new(Data::new(store, config))) + pub fn new(store: SqlStorage, config: Configuration, schema_provider: SchemaProvider) -> Self { + Self(Arc::new(Data::new(store, config, schema_provider))) } } diff --git a/aquadoggo/src/db/provider.rs b/aquadoggo/src/db/provider.rs index 16af351ec..f435edd5a 100644 --- a/aquadoggo/src/db/provider.rs +++ b/aquadoggo/src/db/provider.rs @@ -1,9 +1,11 @@ // SPDX-License-Identifier: AGPL-3.0-or-later use async_trait::async_trait; -use p2panda_rs::document::DocumentId; +use p2panda_rs::document::{DocumentId, DocumentViewId}; use p2panda_rs::hash::Hash; use p2panda_rs::operation::VerifiedOperation; +use p2panda_rs::schema::SchemaId; +use p2panda_rs::storage_provider::errors::OperationStorageError; use p2panda_rs::storage_provider::traits::StorageProvider; use sqlx::query_scalar; @@ -71,3 +73,113 @@ impl StorageProvider for SqlStorage Ok(hash) } } + +impl SqlStorage { + /// Returns the schema id for a document view. + /// + /// Returns `None` if this document view is not found. + pub async fn get_schema_by_document_view( + &self, + view_id: &DocumentViewId, + ) -> StorageProviderResult> { + let result: Option = query_scalar( + " + SELECT + schema_id + FROM + document_views + WHERE + document_view_id = $1 + ", + ) + .bind(view_id.as_str()) + .fetch_optional(&self.pool) + .await + .map_err(|e| OperationStorageError::FatalStorageError(e.to_string()))?; + + // Unwrap because we expect no invalid schema ids in the db. + Ok(result.map(|id_str| id_str.parse().unwrap())) + } +} + +#[cfg(test)] +mod tests { + use std::convert::TryFrom; + use std::str::FromStr; + + use p2panda_rs::document::{DocumentView, DocumentViewFields, DocumentViewId}; + use p2panda_rs::entry::{LogId, SeqNum}; + use p2panda_rs::identity::Author; + use p2panda_rs::operation::{AsOperation, OperationId}; + use p2panda_rs::schema::SchemaId; + use p2panda_rs::storage_provider::traits::{AsStorageEntry, EntryStore}; + use p2panda_rs::test_utils::constants::SCHEMA_ID; + use p2panda_rs::test_utils::fixtures::random_document_view_id; + use rstest::rstest; + + use crate::db::stores::test_utils::{test_db, TestDatabase, TestDatabaseRunner}; + use crate::db::traits::DocumentStore; + + /// Inserts a `DocumentView` into the db and returns its view id. + async fn insert_document_view(db: &TestDatabase) -> DocumentViewId { + let author = Author::try_from(db.test_data.key_pairs[0].public_key().to_owned()).unwrap(); + let entry = db + .store + .get_entry_at_seq_num(&author, &LogId::new(0), &SeqNum::new(1).unwrap()) + .await + .unwrap() + .unwrap(); + let operation_id: OperationId = entry.hash().into(); + let document_view_id: DocumentViewId = operation_id.clone().into(); + let document_view = DocumentView::new( + &document_view_id, + &DocumentViewFields::new_from_operation_fields( + &operation_id, + &entry.operation().fields().unwrap(), + ), + ); + let result = db + .store + .insert_document_view(&document_view, &SchemaId::from_str(SCHEMA_ID).unwrap()) + .await; + + assert!(result.is_ok()); + document_view_id + } + + #[rstest] + fn test_get_schema_for_view( + #[from(test_db)] + #[with(1, 1, 1)] + runner: TestDatabaseRunner, + ) { + runner.with_db_teardown(|db: TestDatabase| async move { + let document_view_id = insert_document_view(&db).await; + let result = db + .store + .get_schema_by_document_view(&document_view_id) + .await; + + assert!(result.is_ok()); + assert_eq!(result.unwrap().unwrap().name(), "venue"); + }); + } + + #[rstest] + fn test_get_schema_for_missing_view( + random_document_view_id: DocumentViewId, + #[from(test_db)] + #[with(1, 1, 1)] + runner: TestDatabaseRunner, + ) { + runner.with_db_teardown(|db: TestDatabase| async move { + let result = db + .store + .get_schema_by_document_view(&random_document_view_id) + .await; + + assert!(result.is_ok()); + assert!(result.unwrap().is_none()); + }); + } +} diff --git a/aquadoggo/src/db/stores/test_utils.rs b/aquadoggo/src/db/stores/test_utils.rs index 1702507c8..a196ee5b3 100644 --- a/aquadoggo/src/db/stores/test_utils.rs +++ b/aquadoggo/src/db/stores/test_utils.rs @@ -340,9 +340,9 @@ pub fn with_db_manager_teardown where D: Clone + Send + Sync + 'static, diff --git a/aquadoggo/src/materializer/input.rs b/aquadoggo/src/materializer/input.rs index 78eca34af..acc63206f 100644 --- a/aquadoggo/src/materializer/input.rs +++ b/aquadoggo/src/materializer/input.rs @@ -1,5 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use std::fmt::Display; + use p2panda_rs::document::{DocumentId, DocumentViewId}; /// Input of every task worker containing all information we need to process. @@ -26,3 +28,19 @@ impl TaskInput { } } } + +impl Display for TaskInput { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let document_id = match &self.document_id { + Some(id) => format!("{}", id), + None => "-".to_string(), + }; + + let view_id = match &self.document_view_id { + Some(view_id) => format!("{}", view_id), + None => "-".to_string(), + }; + + write!(f, "", document_id, view_id) + } +} diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs index dc86167b3..e6299029b 100644 --- a/aquadoggo/src/materializer/service.rs +++ b/aquadoggo/src/materializer/service.rs @@ -151,6 +151,7 @@ mod tests { use crate::db::stores::test_utils::{send_to_store, test_db, TestDatabase, TestDatabaseRunner}; use crate::db::traits::DocumentStore; use crate::materializer::{Task, TaskInput}; + use crate::schema::SchemaProvider; use crate::Configuration; use super::materializer_service; @@ -183,7 +184,11 @@ mod tests { .is_none()); // Prepare arguments for service - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let shutdown = task::spawn(async { loop { // Do this forever .. this means that the shutdown handler will never resolve @@ -252,7 +257,11 @@ mod tests { .unwrap(); // Prepare arguments for service - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let shutdown = task::spawn(async { loop { // Do this forever .. this means that the shutdown handler will never resolve @@ -312,7 +321,11 @@ mod tests { .to_owned(); // Prepare arguments for service - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let shutdown = task::spawn(async { loop { // Do this forever .. this means that the shutdown handler will never resolve @@ -434,7 +447,11 @@ mod tests { // Prepare empty database runner.with_db_teardown(|db: TestDatabase| async move { // Prepare arguments for service - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let shutdown = task::spawn(async { loop { // Do this forever .. this means that the shutdown handler will never resolve diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs index 44c7adc8f..c50161f20 100644 --- a/aquadoggo/src/materializer/tasks/dependency.rs +++ b/aquadoggo/src/materializer/tasks/dependency.rs @@ -2,6 +2,7 @@ use log::debug; use p2panda_rs::document::DocumentViewId; +use p2panda_rs::schema::SchemaId; use crate::context::Context; use crate::db::traits::DocumentStore; @@ -20,22 +21,20 @@ use crate::materializer::TaskInput; /// Expects a _reduce_ task to have completed successfully for the given document view itself and /// returns a critical error otherwise. pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { - debug!("Working on dependency task {:#?}", input); + debug!("Working on {}", input); // Here we retrive the document view by document view id. - let document_view = match input.document_view_id { + let document_view = match &input.document_view_id { Some(view_id) => context .store - .get_document_view_by_id(&view_id) + .get_document_view_by_id(view_id) .await .map_err(|err| { - debug!("Fatal error getting document view from storage"); - debug!("{}", err); - TaskError::Critical + TaskError::Critical(err.to_string()) }) , // We expect to handle document_view_ids in a dependency task. - None => Err(TaskError::Critical), + None => Err(TaskError::Critical("Missing document_view_id in task input".into())), }?; let document_view = match document_view { @@ -50,10 +49,10 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { - debug!("Expected document view not found in the store."); - Err(TaskError::Critical) - } + None => Err(TaskError::Critical(format!( + "Expected document view {} not found in store", + &input.document_view_id.unwrap() + ))), }?; let mut next_tasks = Vec::new(); @@ -62,14 +61,14 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult { + p2panda_rs::operation::OperationValue::Relation(_) => { // This is a relation to a document, if it doesn't exist in the db yet, then that // means we either have no entries for this document, or we are not materialising // it for some reason. We don't want to kick of a "reduce" or "dependency" task in // either of these cases. debug!("Relation field found, no action required."); } - p2panda_rs::operation::OperationValue::RelationList(_relation_list) => { + p2panda_rs::operation::OperationValue::RelationList(_) => { // same as above... debug!("Relation list field found, no action required."); } @@ -101,11 +100,45 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult> = next_tasks.into_iter().flatten().collect(); + // Construct additional tasks if the task input matches certain system schemas and all + // dependencies have been reduced. + let all_dependencies_met = !next_tasks.iter().any(|task| task.is_some()); + if all_dependencies_met { + let task_input_schema = context + .store + .get_schema_by_document_view(document_view.id()) + .await + .map_err(|err| TaskError::Critical(err.to_string()))? + .ok_or_else(|| { + TaskError::Failure(format!( + "{} was deleted while processing task", + document_view + )) + })?; + + // Helper that returns a schema task for the current task input. + let schema_task = || { + Some(Task::new( + "schema", + TaskInput::new(None, Some(document_view.id().clone())), + )) + }; + + match task_input_schema { + // Start `schema` task when a schema (field) definition view is completed with + // dependencies + SchemaId::SchemaDefinition(_) => next_tasks.push(schema_task()), + SchemaId::SchemaFieldDefinition(_) => next_tasks.push(schema_task()), + _ => {} + } + } - debug!("Dispatching {} reduce task(s)", next_tasks.len()); + debug!( + "Scheduling {} reduce tasks", + next_tasks.iter().filter(|t| t.is_some()).count() + ); - Ok(Some(next_tasks)) + Ok(Some(next_tasks.into_iter().flatten().collect())) } /// Returns a _reduce_ task for a given document view only if that view does not yet exist in the @@ -119,11 +152,8 @@ async fn construct_relation_task( .store .get_document_view_by_id(&document_view_id) .await - .map_err(|err| { - debug!("Fatal error getting document view from storage"); - debug!("{}", err); - TaskError::Critical - })? { + .map_err(|err| TaskError::Critical(err.to_string()))? + { Some(_) => { debug!("View found for pinned relation: {}", document_view_id); Ok(None) @@ -140,27 +170,30 @@ async fn construct_relation_task( #[cfg(test)] mod tests { + use p2panda_rs::document::{DocumentId, DocumentViewId}; use p2panda_rs::identity::KeyPair; use p2panda_rs::operation::{ - AsVerifiedOperation, OperationValue, PinnedRelation, PinnedRelationList, Relation, - RelationList, + AsVerifiedOperation, Operation, OperationValue, PinnedRelation, PinnedRelationList, + Relation, RelationList, }; + use p2panda_rs::schema::{FieldType, SchemaId}; use p2panda_rs::storage_provider::traits::OperationStore; use p2panda_rs::test_utils::constants::SCHEMA_ID; use p2panda_rs::test_utils::fixtures::{ - create_operation, random_document_id, random_document_view_id, + create_operation, operation_fields, random_document_id, random_document_view_id, }; use rstest::rstest; use crate::config::Configuration; use crate::context::Context; use crate::db::stores::test_utils::{ - insert_entry_operation_and_view, test_db, TestDatabase, TestDatabaseRunner, + insert_entry_operation_and_view, send_to_store, test_db, TestDatabase, TestDatabaseRunner, }; use crate::db::traits::DocumentStore; use crate::materializer::tasks::reduce_task; use crate::materializer::TaskInput; + use crate::schema::SchemaProvider; use super::dependency_task; @@ -274,7 +307,11 @@ mod tests { #[case] expected_next_tasks: usize, ) { runner.with_db_teardown(move |db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); for document_id in &db.test_data.documents { let input = TaskInput::new(Some(document_id.clone()), None); @@ -310,7 +347,11 @@ mod tests { runner: TestDatabaseRunner, ) { runner.with_db_teardown(|db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let document_id = db.test_data.documents[0].clone(); let input = TaskInput::new(Some(document_id.clone()), None); @@ -370,9 +411,12 @@ mod tests { #[from(test_db)] runner: TestDatabaseRunner, ) { runner.with_db_teardown(|db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store, + Configuration::default(), + SchemaProvider::default(), + ); let input = TaskInput::new(document_id, document_view_id); - let next_tasks = dependency_task(context.clone(), input).await; assert!(next_tasks.is_err()) }); @@ -413,7 +457,11 @@ mod tests { )] fn fails_on_deleted_documents(#[case] runner: TestDatabaseRunner) { runner.with_db_teardown(|db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let document_id = db.test_data.documents[0].clone(); let input = TaskInput::new(Some(document_id.clone()), None); @@ -435,4 +483,136 @@ mod tests { assert!(result.is_err()) }); } + + #[rstest] + fn dispatches_schema_tasks_for_field_definitions( + #[from(test_db)] + #[with(1, 1, 1, false, SchemaId::SchemaFieldDefinition(1), vec![ + ("name", OperationValue::Text("field_name".to_string())), + ("type", FieldType::String.into()), + ])] + runner: TestDatabaseRunner, + ) { + runner.with_db_teardown(|db: TestDatabase| async move { + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); + + // The document id for a schema_field_definition who's operation already exists in the + // store. + let document_id = db.test_data.documents.first().unwrap(); + // Materialise the schema field definition. + let input = TaskInput::new(Some(document_id.to_owned()), None); + reduce_task(context.clone(), input.clone()).await.unwrap(); + + // Parse the document_id into a document_view_id. + let document_view_id = document_id.as_str().parse().unwrap(); + // Dispatch a dependency task for this document_view_id. + let input = TaskInput::new(None, Some(document_view_id)); + let tasks = dependency_task(context.clone(), input) + .await + .unwrap() + .unwrap(); + + // Inserting a schema field definition, we expect a schema task because schema field + // definitions have no dependencies - every new completed field definition could be the + // last puzzle piece for a new schema. + let schema_tasks = tasks.iter().filter(|t| t.worker_name() == "schema").count(); + + assert_eq!(schema_tasks, 1); + }); + } + + #[rstest] + #[case::schema_definition_with_dependencies_met_dispatches_one_schema_task( + Operation::new_create( + SchemaId::SchemaDefinition(1), + operation_fields(vec![ + ("name", OperationValue::Text("schema_name".to_string())), + ( + "description", + OperationValue::Text("description".to_string()), + ), + ( + "fields", + OperationValue::PinnedRelationList(PinnedRelationList::new(vec![ + // The view_id for a schema field which exists in the database. Can be + // recreated from variable `schema_field_document_id` in the task below. + "0020d9cae33aed5742e06a24801195999e105e73081c474d8b25e988787c2ada025f" + .parse().unwrap(), + ])), + ), + ]), + ).unwrap(), + 1 + )] + #[case::schema_definition_without_dependencies_met_dispatches_zero_schema_task( + Operation::new_create( + SchemaId::SchemaDefinition(1), + operation_fields(vec![ + ("name", OperationValue::Text("schema_name".to_string())), + ( + "description", + OperationValue::Text("description".to_string()), + ), + ( + "fields", + OperationValue::PinnedRelationList(PinnedRelationList::new(vec![ + // This schema field does not exist in the database + random_document_view_id(), + ])), + ), + ]), + ).unwrap(), + 0 + )] + fn dispatches_schema_tasks_for_schema_definitions( + #[case] schema_create_operation: Operation, + #[case] expected_schema_tasks: usize, + #[from(test_db)] + #[with(1, 1, 1, false, SchemaId::SchemaFieldDefinition(1), vec![ + ("name", OperationValue::Text("field_name".to_string())), + ("type", FieldType::String.into()), + ])] + runner: TestDatabaseRunner, + ) { + runner.with_db_teardown(move |db: TestDatabase| async move { + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); + + // The document id for the schema_field_definition who's operation already exists in + // the store. + let schema_field_document_id = db.test_data.documents.first().unwrap(); + + println!("THIS {}", schema_field_document_id.as_str()); + + // Materialise the schema field definition. + let input = TaskInput::new(Some(schema_field_document_id.to_owned()), None); + reduce_task(context.clone(), input.clone()).await.unwrap(); + + // Persist a schema definition entry and operation to the store. + let (entry_signed, _) = + send_to_store(&db.store, &schema_create_operation, None, &KeyPair::new()).await; + + // Materialise the schema definition. + let document_view_id: DocumentViewId = entry_signed.hash().into(); + let input = TaskInput::new(None, Some(document_view_id.clone())); + reduce_task(context.clone(), input.clone()).await.unwrap(); + + // Dispatch a dependency task for the schema definition. + let input = TaskInput::new(None, Some(document_view_id)); + let tasks = dependency_task(context.clone(), input) + .await + .unwrap() + .unwrap(); + + let schema_tasks = tasks.iter().filter(|t| t.worker_name() == "schema").count(); + assert_eq!(schema_tasks, expected_schema_tasks); + }); + } } diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs index 65d746c85..9672932ea 100644 --- a/aquadoggo/src/materializer/tasks/reduce.rs +++ b/aquadoggo/src/materializer/tasks/reduce.rs @@ -1,6 +1,6 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use log::debug; +use log::{debug, info}; use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId}; use p2panda_rs::operation::VerifiedOperation; use p2panda_rs::storage_provider::traits::OperationStore; @@ -18,7 +18,7 @@ use crate::materializer::TaskInput; /// After succesfully reducing and storing a document view an array of dependency tasks is returned. /// If invalid inputs were passed or a fatal db error occured a critical error is returned. pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult { - debug!("Working on reduce task {:#?}", input); + debug!("Working on {}", input); // Find out which document we are handling let document_id = match resolve_document_id(&context, &input).await? { @@ -34,10 +34,7 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult TaskResult { debug!("Dispatch dependency task for view with id: {}", view_id); + Ok(Some(vec![Task::new( "dependency", TaskInput::new(None, Some(view_id)), @@ -81,19 +79,17 @@ async fn resolve_document_id( // @TODO: We can skip this step if we implement: // https://github.com/p2panda/aquadoggo/issues/148 debug!("Find document for view with id: {}", document_view_id); + let operation_id = document_view_id.clone().into_iter().next().unwrap(); + context .store .get_document_by_operation_id(&operation_id) .await - .map_err(|err| { - debug!("Fatal error getting document_id from storage"); - debug!("{}", err); - TaskError::Critical - }) + .map_err(|err| TaskError::Critical(err.to_string())) } // None or both have been provided which smells like a bug - (_, _) => Err(TaskError::Critical), + (_, _) => Err(TaskError::Critical("Invalid task input".into())), } } @@ -116,6 +112,7 @@ async fn reduce_document_view( "Document materialized to view with id: {}", document_view_id ); + if document.is_deleted() { return Ok(None); }; @@ -139,15 +136,9 @@ async fn reduce_document_view( .store .insert_document_view(document.view().unwrap(), document.schema()) .await - .map_err(|err| { - debug!( - "Failed to insert document view into database: {}", - document_view_id - ); - debug!("{}", err); + .map_err(|err| TaskError::Critical(err.to_string()))?; - TaskError::Critical - })?; + info!("Stored {} view {}", document, document.view_id()); // Return the new view id to be used in the resulting dependency task Ok(Some(document.view_id().to_owned())) @@ -170,17 +161,15 @@ async fn reduce_document( .store .insert_document(&document) .await - .map_err(|err| { - debug!("Failed to insert document into database: {}", document.id()); - debug!("{}", err); - TaskError::Critical - })?; + .map_err(|err| TaskError::Critical(err.to_string()))?; // If the document was deleted, then we return nothing if document.is_deleted() { return Ok(None); } + info!("Stored {} view {}", document, document.view_id()); + // Return the new document_view id to be used in the resulting dependency task Ok(Some(document.view_id().to_owned())) } @@ -210,6 +199,7 @@ mod tests { use crate::db::traits::DocumentStore; use crate::materializer::tasks::reduce_task; use crate::materializer::TaskInput; + use crate::schema::SchemaProvider; #[rstest] fn reduces_documents( @@ -226,7 +216,11 @@ mod tests { runner: TestDatabaseRunner, ) { runner.with_db_teardown(|db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store, + Configuration::default(), + SchemaProvider::default(), + ); for document_id in &db.test_data.documents { let input = TaskInput::new(Some(document_id.clone()), None); @@ -254,7 +248,11 @@ mod tests { let document_id = db.test_data.documents.first().unwrap(); let key_pair = db.test_data.key_pairs.first().unwrap(); - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let input = TaskInput::new(Some(document_id.clone()), None); // There is one CREATE operation for this document in the db, it should create a document @@ -313,7 +311,11 @@ mod tests { .clone() .into(); - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let input = TaskInput::new(None, Some(document_view_id.clone())); assert!(reduce_task(context.clone(), input).await.is_ok()); @@ -354,7 +356,11 @@ mod tests { runner: TestDatabaseRunner, ) { runner.with_db_teardown(|db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); for document_id in &db.test_data.documents { let input = TaskInput::new(Some(document_id.clone()), None); @@ -397,7 +403,11 @@ mod tests { #[case] is_next_task: bool, ) { runner.with_db_teardown(move |db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); let document_id = db.test_data.documents[0].clone(); let input = TaskInput::new(Some(document_id.clone()), None); @@ -415,7 +425,11 @@ mod tests { #[from(test_db)] runner: TestDatabaseRunner, ) { runner.with_db_teardown(|db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store, + Configuration::default(), + SchemaProvider::default(), + ); let input = TaskInput::new(document_id, document_view_id); assert!(reduce_task(context.clone(), input).await.is_err()); @@ -432,7 +446,11 @@ mod tests { ) { // Prepare empty database. runner.with_db_teardown(|db: TestDatabase| async move { - let context = Context::new(db.store.clone(), Configuration::default()); + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); // Dispatch a reduce task for a document which doesn't exist by it's document id. let input = TaskInput::new(Some(document_id), None); diff --git a/aquadoggo/src/materializer/tasks/schema.rs b/aquadoggo/src/materializer/tasks/schema.rs index d4a35e4e2..fd4b5aecb 100644 --- a/aquadoggo/src/materializer/tasks/schema.rs +++ b/aquadoggo/src/materializer/tasks/schema.rs @@ -1,9 +1,250 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use log::debug; +use p2panda_rs::document::DocumentViewId; +use p2panda_rs::operation::OperationValue; +use p2panda_rs::schema::SchemaId; + use crate::context::Context; -use crate::materializer::worker::TaskResult; +use crate::db::traits::{DocumentStore, SchemaStore}; +use crate::materializer::worker::{TaskError, TaskResult}; use crate::materializer::TaskInput; -pub async fn schema_task(_context: Context, _input: TaskInput) -> TaskResult { - todo!() +/// A schema task assembles and stores schemas from their views. +/// +/// Schema tasks are dispatched whenever a schema definition or schema field definition document +/// has all its immediate dependencies available in the store. It collects all required views for +/// the schema, instantiates it and adds it to the schema provider. +pub async fn schema_task(context: Context, input: TaskInput) -> TaskResult { + debug!("Working on {}", input); + + let input_view_id = match (input.document_id, input.document_view_id) { + (None, Some(view_id)) => Ok(view_id), + // The task input must contain only a view id. + (_, _) => Err(TaskError::Critical("Invalid task input".into())), + }?; + + // Determine the schema of the updated view id. + let schema = context + .store + .get_schema_by_document_view(&input_view_id) + .await + .map_err(|err| TaskError::Critical(err.to_string()))? + .unwrap(); + + let updated_schema_definitions: Vec = match schema { + // This task is about an updated schema definition document so we only handle that. + SchemaId::SchemaDefinition(_) => Ok(vec![input_view_id.clone()]), + + // This task is about an updated schema field definition document that may be used by + // multiple schema definition documents so we must handle all of those. + SchemaId::SchemaFieldDefinition(_) => { + get_related_schema_definitions(&input_view_id, &context).await + } + _ => Err(TaskError::Critical(format!( + "Unknown system schema id: {}", + schema + ))), + }?; + + // The related schema definitions are not known yet to this node so we mark this task failed. + if updated_schema_definitions.is_empty() { + return Err(TaskError::Failure( + "Related schema definition not given (yet)".into(), + )); + } + + for view_id in updated_schema_definitions.iter() { + match context + .store + .get_schema_by_id(view_id) + .await + .map_err(|err| TaskError::Critical(err.to_string()))? + { + // Updated schema was assembled successfully and is now passed to schema provider. + Some(schema) => { + context.schema_provider.update(schema.clone()).await; + } + // This schema was not ready to be assembled after all so it is ignored. + None => { + debug!("Not yet ready to build schema for {}", view_id) + } + }; + } + + Ok(None) +} + +/// Retrieve schema definitions that use the targeted schema field definition as one of their +/// fields. +async fn get_related_schema_definitions( + target_field_definition: &DocumentViewId, + context: &Context, +) -> Result, TaskError> { + // Retrieve all schema definition documents from the store + let schema_definitions = context + .store + .get_documents_by_schema(&SchemaId::SchemaDefinition(1)) + .await + .map_err(|err| TaskError::Critical(err.to_string())) + .unwrap(); + + // Collect all schema definitions that use the targeted field definition + let mut related_schema_definitions = vec![]; + for schema in schema_definitions { + let fields_value = schema.fields().get("fields").unwrap().value(); + + if let OperationValue::PinnedRelationList(fields) = fields_value { + if fields + .iter() + .any(|field_view_id| &field_view_id == target_field_definition) + { + related_schema_definitions.push(schema.id().clone()) + } else { + continue; + } + } else { + // Abort if there are schema definitions in the store that don't match the schema + // definition schema. + Err(TaskError::Critical( + "Schema definition operation does not have a 'fields' operation field".into(), + ))? + } + } + + Ok(related_schema_definitions) +} + +#[cfg(test)] +mod tests { + use log::debug; + use p2panda_rs::document::{DocumentId, DocumentViewId}; + use p2panda_rs::identity::KeyPair; + use p2panda_rs::operation::{Operation, OperationValue, PinnedRelationList}; + use p2panda_rs::schema::{FieldType, SchemaId}; + use p2panda_rs::test_utils::fixtures::operation_fields; + use rstest::rstest; + + use crate::context::Context; + use crate::db::stores::test_utils::{send_to_store, test_db, TestDatabase, TestDatabaseRunner}; + use crate::db::traits::DocumentStore; + use crate::materializer::tasks::reduce_task; + use crate::materializer::TaskInput; + use crate::schema::SchemaProvider; + use crate::Configuration; + + use super::schema_task; + + /// Insert a test schema definition and schema field definition and run reduce tasks for both. + async fn create_schema_documents( + context: &Context, + db: &TestDatabase, + ) -> (DocumentViewId, DocumentViewId) { + // Create field definition + let create_field_definition = Operation::new_create( + SchemaId::SchemaFieldDefinition(1), + operation_fields(vec![ + ("name", OperationValue::Text("field_name".to_string())), + ("type", FieldType::String.into()), + ]), + ) + .unwrap(); + + let (entry_signed, _) = + send_to_store(&db.store, &create_field_definition, None, &KeyPair::new()).await; + let field_definition_id: DocumentId = entry_signed.hash().into(); + + let input = TaskInput::new(Some(field_definition_id.clone()), None); + reduce_task(context.clone(), input).await.unwrap(); + let field_view_id = db + .store + .get_document_by_id(&field_definition_id) + .await + .unwrap() + .unwrap() + .id() + .to_owned(); + debug!("Created field definition {}", &field_view_id); + + // Create schema definition + let create_schema_definition = Operation::new_create( + SchemaId::SchemaDefinition(1), + operation_fields(vec![ + ("name", OperationValue::Text("schema_name".to_string())), + ( + "description", + OperationValue::Text("description".to_string()), + ), + ( + "fields", + OperationValue::PinnedRelationList(PinnedRelationList::new(vec![ + field_view_id.clone(), + ])), + ), + ]), + ) + .unwrap(); + + let (entry_signed, _) = + send_to_store(&db.store, &create_schema_definition, None, &KeyPair::new()).await; + let schema_definition_id: DocumentId = entry_signed.hash().into(); + + let input = TaskInput::new(Some(schema_definition_id.clone()), None); + reduce_task(context.clone(), input).await.unwrap(); + let definition_view_id = db + .store + .get_document_by_id(&schema_definition_id) + .await + .unwrap() + .unwrap() + .id() + .to_owned(); + debug!("Created schema definition {}", definition_view_id); + + return (definition_view_id, field_view_id); + } + + #[rstest] + fn assembles_schema( + #[from(test_db)] + #[with(1, 1)] + runner: TestDatabaseRunner, + ) { + runner.with_db_teardown(|db: TestDatabase| async move { + let context = Context::new( + db.store.clone(), + Configuration::default(), + SchemaProvider::default(), + ); + + // Prepare schema definition and schema field definition + let (definition_view_id, field_view_id) = create_schema_documents(&context, &db).await; + + // Start a task with each as input + let input = TaskInput::new(None, Some(definition_view_id.clone())); + assert!(schema_task(context.clone(), input).await.is_ok()); + + let input = TaskInput::new(None, Some(field_view_id)); + assert!(schema_task(context.clone(), input).await.is_ok()); + + // The new schema should be available on storage provider. + let schema = context + .schema_provider + .get(&SchemaId::Application( + "schema_name".to_string(), + definition_view_id.clone(), + )) + .await; + assert!(schema.is_some()); + assert_eq!( + schema + .unwrap() + .fields() + .get("field_name") + .unwrap() + .to_owned(), + FieldType::String + ); + }); + } } diff --git a/aquadoggo/src/materializer/worker.rs b/aquadoggo/src/materializer/worker.rs index 55b0297ad..f2fe461d0 100644 --- a/aquadoggo/src/materializer/worker.rs +++ b/aquadoggo/src/materializer/worker.rs @@ -76,14 +76,14 @@ //! Task 1 results in "25", Task 2 in "64", Task 4 in "9". //! ``` use std::collections::{HashMap, HashSet}; -use std::fmt::Debug; +use std::fmt::{Debug, Display}; use std::future::Future; use std::hash::Hash; use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, Mutex}; use deadqueue::unlimited::Queue; -use log::{error, info}; +use log::{debug, error, info}; use tokio::sync::broadcast::error::RecvError; use tokio::sync::broadcast::{channel, Receiver, Sender}; use tokio::task; @@ -120,11 +120,10 @@ pub type TaskResult = Result>>, TaskError>; #[derive(Debug)] pub enum TaskError { /// This tasks failed critically and will cause the whole program to panic. - Critical, + Critical(String), /// This task failed silently without any further effects. - #[allow(dead_code)] - Failure, + Failure(String), } /// Enum representing status of a task. @@ -144,7 +143,7 @@ pub type WorkerName = String; /// this registered work and an index of all current inputs in the task queue. struct WorkerManager where - IN: Send + Sync + Clone + Hash + Eq + 'static, + IN: Send + Sync + Clone + Hash + Eq + Display + 'static, { /// Index of all current inputs inside the task queue organized in a hash set. /// @@ -158,7 +157,7 @@ where impl WorkerManager where - IN: Send + Sync + Clone + Hash + Eq + 'static, + IN: Send + Sync + Clone + Hash + Eq + Display + 'static, { /// Returns a new worker manager. pub fn new() -> Self { @@ -210,7 +209,7 @@ where #[derive(Debug)] pub struct QueueItem where - IN: Send + Sync + Clone + 'static, + IN: Send + Sync + Clone + Display + 'static, { /// Unique task identifier. id: u64, @@ -219,9 +218,18 @@ where input: IN, } +impl Display for QueueItem +where + IN: Send + Sync + Clone + Display + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "", self.id, self.input) + } +} + impl QueueItem where - IN: Send + Sync + Clone + 'static, + IN: Send + Sync + Clone + Display + 'static, { /// Returns a new queue item. pub fn new(id: u64, input: IN) -> Self { @@ -243,7 +251,7 @@ where /// This factory serves as a main entry interface to dispatch, schedule and process tasks. pub struct Factory where - IN: Send + Sync + Clone + Hash + Eq + Debug + 'static, + IN: Send + Sync + Clone + Hash + Eq + Debug + Display + 'static, D: Send + Sync + Clone + 'static, { /// Shared context between all tasks. @@ -269,7 +277,7 @@ where impl Factory where - IN: Send + Sync + Clone + Hash + Eq + Debug + 'static, + IN: Send + Sync + Clone + Hash + Eq + Debug + Display + 'static, D: Send + Sync + Clone + 'static, { /// Initialises a new factory. @@ -478,7 +486,11 @@ where index.remove(&item.input()); } Err(err) => { - error!("Error while locking input index: {}", err); + error!( + "Error while locking input index in worker {} for task {:?}: {}", + name, item, err + ); + error_signal.trigger(); } } @@ -499,13 +511,20 @@ where } } } - Err(TaskError::Critical) => { + Err(TaskError::Critical(err)) => { // Something really horrible happened, we need to crash! - error!("Critical error in task {:?}", item); + error!( + "Critical error in worker {} with task {}: {}", + name, item, err + ); + error_signal.trigger(); } - Err(TaskError::Failure) => { - // Silently fail .. maybe write something to the log or retry? + Err(TaskError::Failure(err)) => { + debug!( + "Silently failing worker {} with task {}: {}", + name, item, err + ); } _ => (), // Task succeeded, but nothing to dispatch } @@ -518,6 +537,7 @@ where #[cfg(test)] mod tests { use std::collections::HashMap; + use std::fmt::Display; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -539,14 +559,18 @@ mod tests { // Define two workers async fn first(database: Data, input: Input) -> TaskResult { - let mut db = database.lock().map_err(|_| TaskError::Critical)?; + let mut db = database + .lock() + .map_err(|err| TaskError::Critical(err.to_string()))?; db.push(format!("first-{}", input)); Ok(None) } // .. the second worker dispatches a task for "first" at the end async fn second(database: Data, input: Input) -> TaskResult { - let mut db = database.lock().map_err(|_| TaskError::Critical)?; + let mut db = database + .lock() + .map_err(|err| TaskError::Critical(err.to_string()))?; db.push(format!("second-{}", input)); Ok(Some(vec![Task::new("first", input)])) } @@ -630,6 +654,12 @@ mod tests { relations: Vec, } + impl Display for JigsawPiece { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{} {:?}", self.id, self.relations) + } + } + // This is a whole puzzle, which is simply a list of puzzle pieces. It has a "complete" // flag, which turns true as soon as we finished the puzzle! #[derive(Hash, Clone, Debug)] @@ -656,7 +686,9 @@ mod tests { // This tasks "picks" a single piece out of the box and sorts it into the database async fn pick(database: Data, input: JigsawPiece) -> TaskResult { - let mut db = database.lock().map_err(|_| TaskError::Critical)?; + let mut db = database + .lock() + .map_err(|err| TaskError::Critical(err.to_string()))?; // 1. Take incoming puzzle piece from box and move it into the database first db.pieces.insert(input.id, input.clone()); @@ -677,7 +709,9 @@ mod tests { // This task finds fitting pieces and tries to combine them to a puzzle async fn find(database: Data, input: JigsawPiece) -> TaskResult { - let mut db = database.lock().map_err(|_| TaskError::Critical)?; + let mut db = database + .lock() + .map_err(|err| TaskError::Critical(err.to_string()))?; // 1. Merge all known and related pieces into one large list let mut ids: Vec = Vec::new(); @@ -759,7 +793,9 @@ mod tests { // This task checks if a puzzle was completed async fn finish(database: Data, input: JigsawPiece) -> TaskResult { - let mut db = database.lock().map_err(|_| TaskError::Critical)?; + let mut db = database + .lock() + .map_err(|err| TaskError::Critical(err.to_string()))?; // 1. Identify unfinished puzzle related to this piece let puzzle: Option = db @@ -770,15 +806,19 @@ mod tests { // 2. Check if all piece dependencies are met match puzzle { - None => Err(TaskError::Failure), + None => Err(TaskError::Failure("No puzzle given".into())), Some(mut puzzle) => { for piece_id in &puzzle.piece_ids { match db.pieces.get(piece_id) { - None => return Err(TaskError::Failure), + None => { + return Err(TaskError::Failure("Dependencies are not met".into())) + } Some(piece) => { for relation_piece_id in &piece.relations { if !puzzle.piece_ids.contains(relation_piece_id) { - return Err(TaskError::Failure); + return Err(TaskError::Failure( + "Dependencies are not met".into(), + )); } } } diff --git a/aquadoggo/src/node.rs b/aquadoggo/src/node.rs index 14da42656..d00b69461 100644 --- a/aquadoggo/src/node.rs +++ b/aquadoggo/src/node.rs @@ -6,11 +6,13 @@ use crate::bus::ServiceMessage; use crate::config::Configuration; use crate::context::Context; use crate::db::provider::SqlStorage; +use crate::db::traits::SchemaStore; use crate::db::{connection_pool, create_database, run_pending_migrations, Pool}; use crate::http::http_service; use crate::manager::ServiceManager; use crate::materializer::materializer_service; use crate::replication::replication_service; +use crate::schema::SchemaProvider; /// Makes sure database is created and migrated before returning connection pool. async fn initialize_db(config: &Configuration) -> Result { @@ -44,19 +46,20 @@ impl Node { /// Start p2panda node with your configuration. This method can be used to run the node within /// other applications. pub async fn start(config: Configuration) -> Self { - // Initialize database and get connection pool + // Initialize database and get connection pool. let pool = initialize_db(&config) .await .expect("Could not initialize database"); - // Prepare storage provider using connection pool + // Prepare storage and schema providers using connection pool. let store = SqlStorage::new(pool.clone()); + let schemas = SchemaProvider::new(store.get_all_schema().await.unwrap()); - // Create service manager with shared data between services - let context = Context::new(store, config.clone()); + // Create service manager with shared data between services. + let context = Context::new(store, config, schemas); let mut manager = ServiceManager::::new(1024, context); - // Start materializer service + // Start materializer service. manager.add("materializer", materializer_service); // Start replication service diff --git a/aquadoggo/src/replication/service.rs b/aquadoggo/src/replication/service.rs index 6cef5f34f..f1885989b 100644 --- a/aquadoggo/src/replication/service.rs +++ b/aquadoggo/src/replication/service.rs @@ -284,6 +284,7 @@ mod tests { }; use crate::http::http_service; use crate::replication::ReplicationConfiguration; + use crate::schema::SchemaProvider; use crate::test_helpers::shutdown_handle; use crate::Configuration; @@ -320,6 +321,7 @@ mod tests { http_port: 3022, ..Configuration::default() }, + SchemaProvider::default(), ); let http_server_billie = task::spawn(async { @@ -353,7 +355,8 @@ mod tests { ..Configuration::default() }; let ada_db = db_manager.create("sqlite::memory:").await; - let context_ada = Context::new(ada_db.store.clone(), config_ada); + let context_ada = + Context::new(ada_db.store.clone(), config_ada, SchemaProvider::default()); let tx_ada = tx.clone(); let shutdown_ada = shutdown_handle(); diff --git a/aquadoggo/src/schema/mod.rs b/aquadoggo/src/schema/mod.rs new file mode 100644 index 000000000..4b7ccd75d --- /dev/null +++ b/aquadoggo/src/schema/mod.rs @@ -0,0 +1,5 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +mod schema_provider; + +pub use schema_provider::SchemaProvider; diff --git a/aquadoggo/src/schema/schema_provider.rs b/aquadoggo/src/schema/schema_provider.rs new file mode 100644 index 000000000..8742d86c2 --- /dev/null +++ b/aquadoggo/src/schema/schema_provider.rs @@ -0,0 +1,96 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::collections::HashMap; +use std::sync::Arc; + +use log::info; +use p2panda_rs::schema::{Schema, SchemaId, SYSTEM_SCHEMAS}; +use tokio::sync::Mutex; + +/// Provides fast thread-safe access to system and application schemas. +/// +/// Application schemas can be added and updated. +#[derive(Clone, Debug)] +pub struct SchemaProvider(Arc>>); + +// Dead code allowed until this is used for https://github.com/p2panda/aquadoggo/pull/141 +#[allow(dead_code)] +impl SchemaProvider { + /// Returns a `SchemaProvider` containing the given application schemas and all system schemas. + pub fn new(application_schemas: Vec) -> Self { + // Collect all system and application schemas. + let mut schemas = SYSTEM_SCHEMAS.clone(); + schemas.extend(&application_schemas); + + // Build hash map from schemas for fast lookup. + let mut index = HashMap::new(); + for schema in schemas { + index.insert(schema.id().to_owned(), schema.to_owned()); + } + Self(Arc::new(Mutex::new(index))) + } + + /// Retrieve a schema that may be a system or application schema by its schema id. + pub async fn get(&self, schema_id: &SchemaId) -> Option { + self.0.lock().await.get(schema_id).cloned() + } + + /// Returns all system and application schemas. + pub async fn all(&self) -> Vec { + self.0.lock().await.values().cloned().collect() + } + + /// Inserts or updates the given schema in this provider. + /// + /// Returns `true` if a schema was updated and `false` if it was inserted. + pub async fn update(&self, schema: Schema) -> bool { + info!("Updating {}", schema); + let mut schemas = self.0.lock().await; + schemas.insert(schema.id().clone(), schema).is_some() + } +} + +impl Default for SchemaProvider { + fn default() -> Self { + Self::new(Vec::new()) + } +} + +#[cfg(test)] +mod test { + use p2panda_rs::schema::FieldType; + use p2panda_rs::test_utils::fixtures::random_document_view_id; + + use super::*; + + #[tokio::test] + async fn get_all_schemas() { + let provider = SchemaProvider::default(); + let result = provider.all().await; + assert_eq!(result.len(), 2); + } + + #[tokio::test] + async fn get_single_schema() { + let provider = SchemaProvider::default(); + let schema_definition_schema = provider.get(&SchemaId::SchemaDefinition(1)).await; + assert!(schema_definition_schema.is_some()); + } + + #[tokio::test] + async fn update_schemas() { + let provider = SchemaProvider::default(); + let new_schema_id = + SchemaId::Application("test_schema".to_string(), random_document_view_id()); + let new_schema = Schema::new( + &new_schema_id, + "description", + vec![("test_field", FieldType::String)], + ) + .unwrap(); + let is_update = provider.update(new_schema).await; + assert!(!is_update); + + assert!(provider.get(&new_schema_id).await.is_some()); + } +}