From 2d77668102583f611bf01134f88905a70c317170 Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Mon, 7 Aug 2023 18:06:29 +0100
Subject: [PATCH 01/14] Fix: only overwrite `http_port` when CLI arg is passed
 (#489)

* Only override http_port when CLI arg is passed

* Update CHANGELOG
---
 CHANGELOG.md              | 1 +
 aquadoggo_cli/src/main.rs | 4 +++-
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 820e259e1..cf102c13a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -83,6 +83,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Do nothing on log insertion conflict [#468](https://github.com/p2panda/aquadoggo/pull/468)
 - Don't update or announce an update in schema provider if a schema with this id exists already [#472](https://github.com/p2panda/aquadoggo/pull/472)
 - Do nothing on document_view insertion conflicts [#474](https://github.com/p2panda/aquadoggo/pull/474)
+- Only overwrite `http_port` when CLI arg is passed [#489](https://github.com/p2panda/aquadoggo/pull/489)
 
 ### Open Sauce
 
diff --git a/aquadoggo_cli/src/main.rs b/aquadoggo_cli/src/main.rs
index 273865601..077b082d8 100644
--- a/aquadoggo_cli/src/main.rs
+++ b/aquadoggo_cli/src/main.rs
@@ -147,7 +147,9 @@ impl TryFrom<Cli> for Configuration {
             None
         };
 
-        config.http_port = cli.http_port.unwrap_or(2020);
+        if let Some(http_port) = cli.http_port {
+            config.http_port = http_port
+        }
 
         config.network = NetworkConfiguration {
             autonat: cli.autonat.unwrap_or(false),
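
[Editor's note: patch 01 applies a common pattern for layering CLI arguments over
config-file defaults: optional flags parse into `Option<T>` and are only applied
when actually passed. A minimal sketch of the pattern — `Config` and `Cli` here
are hypothetical stand-ins, not aquadoggo's real types:

    struct Config {
        http_port: u16, // already set to 2020 (or a config.toml value) elsewhere
    }

    struct Cli {
        http_port: Option<u16>, // None when the flag was not passed
    }

    fn apply_cli_overrides(config: &mut Config, cli: &Cli) {
        // The old `config.http_port = cli.http_port.unwrap_or(2020)` reset a port
        // configured via config.toml back to 2020 whenever the flag was absent;
        // overriding only on `Some` preserves the configured value.
        if let Some(http_port) = cli.http_port {
            config.http_port = http_port;
        }
    }
]
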
From 7c4495048206464a315bd898941a476202035048 Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Mon, 7 Aug 2023 22:43:50 +0100
Subject: [PATCH 02/14] `aquadoggo` configured to support any schema by
 default (#487)

* Make `supported_schema_ids` an optional field on the config

* Update tests

* Set `supported_schema_ids` to None when no config.toml file present

* Adjust `SchemaProvider` constructor and getter methods

* Replication service targets and accepts all schema by default

* `clippy`

* Update CHANGELOG
---
 CHANGELOG.md                            |  1 +
 aquadoggo/src/config.rs                 | 10 ++--
 aquadoggo/src/node.rs                   | 30 +++--------
 aquadoggo/src/replication/ingest.rs     | 14 +++---
 aquadoggo/src/replication/manager.rs    |  6 +--
 aquadoggo/src/replication/service.rs    | 31 +++++++++---
 aquadoggo/src/schema/schema_provider.rs | 67 ++++++-------------------
 aquadoggo/src/test_utils/runner.rs      |  6 +--
 aquadoggo/src/tests.rs                  |  4 +-
 aquadoggo_cli/src/main.rs               | 10 ++--
 10 files changed, 68 insertions(+), 111 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index cf102c13a..8a8d40004 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -53,6 +53,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Remove `quick_commit` from materialization service [#450](https://github.com/p2panda/aquadoggo/pull/450)
 - Reduce `warn` logging in network and replication services [#467](https://github.com/p2panda/aquadoggo/pull/467)
 - `mdns` and `autonat` disabled by default [#475](https://github.com/p2panda/aquadoggo/pull/475)
+- By default, nodes support _any_ schema [#487](https://github.com/p2panda/aquadoggo/pull/487)
 
 ### Fixed
 
diff --git a/aquadoggo/src/config.rs b/aquadoggo/src/config.rs
index aef6b0d65..1712a3fed 100644
--- a/aquadoggo/src/config.rs
+++ b/aquadoggo/src/config.rs
@@ -46,10 +46,9 @@ pub struct Configuration {
     pub worker_pool_size: u32,
 
     /// The ids of schema this node supports.
-    pub supported_schema_ids: Vec<SchemaId>,
-
-    /// If set to true then the node will dynamically support any new schema it replicates.
-    pub dynamic_schema: bool,
+    ///
+    /// If `None` then the node will support all system schema and any new schema it discovers.
+    pub supported_schema_ids: Option<Vec<SchemaId>>,
 }
 
 impl Default for Configuration {
@@ -61,8 +60,7 @@ impl Default for Configuration {
             http_port: 2020,
             network: NetworkConfiguration::default(),
             worker_pool_size: 16,
-            supported_schema_ids: vec![],
-            dynamic_schema: false,
+            supported_schema_ids: None,
         }
     }
 }
diff --git a/aquadoggo/src/node.rs b/aquadoggo/src/node.rs
index 4d2187c18..027466ad3 100644
--- a/aquadoggo/src/node.rs
+++ b/aquadoggo/src/node.rs
@@ -1,9 +1,7 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 
 use anyhow::Result;
-use log::{debug, info};
 use p2panda_rs::identity::KeyPair;
-use p2panda_rs::schema::SYSTEM_SCHEMAS;
 
 use crate::bus::ServiceMessage;
 use crate::config::Configuration;
@@ -60,29 +58,13 @@ impl Node {
         // Prepare storage and schema providers using connection pool
         let store = SqlStore::new(pool.clone());
 
-        // If the `dynamic_schema` flag is set then this node will be configured to support any
-        // schema that it discovers. Otherwise it is configured to support only the schema
-        // identified by their id in the `config.toml` file.
-        let schema_provider = if config.dynamic_schema {
-            SchemaProvider::default()
-        } else {
-            SchemaProvider::new_with_supported_schema(config.supported_schema_ids.clone())
-        };
-
-        // Attempt to add any known schema to the schema provider.
-        let mut all_schemas = SYSTEM_SCHEMAS.clone();
+        // Initialise the `SchemaProvider` with all schema currently known to the store.
+        //
+        // If `supported_schema_ids` is provided then only schema identified in this list will be
+        // added to the provider and supported by the node.
         let application_schema = store.get_all_schema().await.unwrap();
-        all_schemas.extend(&application_schema);
-
-        for schema in all_schemas {
-            match schema_provider.update(schema.clone()).await {
-                Ok(_) => info!("Schema added to schema provider: {}", schema.id()),
-                Err(_) => debug!(
-                    "Schema not added to schema provider: not supported {}",
-                    schema.id()
-                ),
-            }
-        }
+        let schema_provider =
+            SchemaProvider::new(application_schema, config.supported_schema_ids.clone());
 
         // Create service manager with shared data between services
         let context = Context::new(store, key_pair, config, schema_provider);
diff --git a/aquadoggo/src/replication/ingest.rs b/aquadoggo/src/replication/ingest.rs
index c58eb2502..bfb52ba32 100644
--- a/aquadoggo/src/replication/ingest.rs
+++ b/aquadoggo/src/replication/ingest.rs
@@ -49,14 +49,12 @@ impl SyncIngest {
 
         let plain_operation = decode_operation(encoded_operation)?;
 
-        // Check that the sent operation follows one of our supported schema.
-        if !self
-            .schema_provider
-            .supported_schema()
-            .await
-            .contains(plain_operation.schema_id())
-        {
-            return Err(IngestError::UnsupportedSchema);
+        // If the node has been configured with supported_schema_ids, check that the sent
+        // operation follows one of our supported schema.
+        if let Some(supported_schema_ids) = self.schema_provider.supported_schema_ids() {
+            if !supported_schema_ids.contains(plain_operation.schema_id()) {
+                return Err(IngestError::UnsupportedSchema);
+            }
         }
 
         // Retrieve the schema if it has been materialized on the node.
diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs
index 97b59f9f8..675b41801 100644
--- a/aquadoggo/src/replication/manager.rs
+++ b/aquadoggo/src/replication/manager.rs
@@ -466,9 +466,9 @@ where
             )
             .await
         {
-            // Duplicate entries arriving at a node, or the case where a schema has not been
-            // materialized yet, we don't want to treat as an error. This is expected
-            // behavior which may occur when concurrent sync sessions are running.
+            // When duplicate entries arrive at a node, or a schema is not materialized yet,
+            // we don't want to treat this as an error. This is expected behavior which may
+            // occur when concurrent sync sessions are running.
             Ok(_) | Err(IngestError::DuplicateEntry(_)) | Err(IngestError::SchemaNotFound) => {
                 Ok(SyncResult {
                     messages: vec![],
diff --git a/aquadoggo/src/replication/service.rs b/aquadoggo/src/replication/service.rs
index b813773a8..66b5fade9 100644
--- a/aquadoggo/src/replication/service.rs
+++ b/aquadoggo/src/replication/service.rs
@@ -137,7 +137,19 @@ impl ConnectionManager {
 
     /// Returns set of schema ids we are interested in and support on this node.
     async fn target_set(&self) -> TargetSet {
-        TargetSet::new(&self.schema_provider.supported_schema().await)
+        let supported_schema = match self.schema_provider.supported_schema_ids() {
+            // If supported_schema_ids is set return this list.
+            Some(supported_schema_ids) => supported_schema_ids.to_owned(),
+            // Otherwise return ids for all schema we know about on this node.
+            None => self
+                .schema_provider
+                .all()
+                .await
+                .iter()
+                .map(|schema| schema.id().to_owned())
+                .collect(),
+        };
+        TargetSet::new(&supported_schema)
     }
 
     /// Register a new peer connection on the manager.
@@ -178,7 +190,12 @@ impl ConnectionManager {
         // If this is a SyncRequest message first we check if the contained TargetSet matches our
         // own locally configured TargetSet.
         if let Message::SyncRequest(_, target_set) = message.message() {
-            if target_set != &self.target_set().await {
+            // If this node has been configured with supported_schema_ids then we check that the
+            // target set of the request matches our own, otherwise we skip this step and accept
+            // any target set.
+            if self.schema_provider.supported_schema_ids().is_some()
+                && target_set != &self.target_set().await
+            {
                 // If it doesn't match we signal that an error occurred and return at this point.
                self.on_replication_error(peer, session_id, ReplicationError::UnsupportedTargetSet)
                    .await;
@@ -382,6 +399,7 @@ mod tests {
     use crate::network::Peer;
     use crate::replication::service::PeerStatus;
     use crate::replication::{Message, Mode, SyncMessage, TargetSet};
+    use crate::schema::SchemaProvider;
     use crate::test_utils::{test_runner, TestNode};
 
     use super::ConnectionManager;
@@ -452,12 +470,9 @@ mod tests {
         test_runner(move |node: TestNode| async move {
             let (tx, mut rx) = broadcast::channel::<ServiceMessage>(10);
 
-            let mut manager = ConnectionManager::new(
-                &node.context.schema_provider,
-                &node.context.store,
-                &tx,
-                local_peer_id,
-            );
+            let schema_provider = SchemaProvider::new(vec![], Some(vec![]));
+            let mut manager =
+                ConnectionManager::new(&schema_provider, &node.context.store, &tx, local_peer_id);
 
             let remote_peer = Peer::new(remote_peer_id, ConnectionId::new_unchecked(1));
 
diff --git a/aquadoggo/src/schema/schema_provider.rs b/aquadoggo/src/schema/schema_provider.rs
index d093918f1..d548e68b5 100644
--- a/aquadoggo/src/schema/schema_provider.rs
+++ b/aquadoggo/src/schema/schema_provider.rs
@@ -20,7 +20,7 @@ pub struct SchemaProvider {
 
     /// Optional list of schema this provider supports. If set only these schema will be added to the schema
     /// registry once materialized.
-    supported_schema: Option<Vec<SchemaId>>,
+    supported_schema_ids: Option<Vec<SchemaId>>,
 
     /// Sender for broadcast channel informing subscribers about updated schemas.
     tx: Sender<SchemaId>,
 }
@@ -28,7 +28,10 @@
 impl SchemaProvider {
     /// Returns a `SchemaProvider` containing the given application schemas and all system schemas.
-    pub fn new(application_schemas: Vec<Schema>) -> Self {
+    pub fn new(
+        application_schemas: Vec<Schema>,
+        supported_schema_ids: Option<Vec<SchemaId>>,
+    ) -> Self {
         // Collect all system and application schemas.
         let mut schemas = SYSTEM_SCHEMAS.clone();
         schemas.extend(&application_schemas);
@@ -39,41 +42,9 @@
             index.insert(schema.id().to_owned(), schema.to_owned());
         }
 
-        let (tx, _) = channel(64);
-
-        debug!(
-            "Initialised schema provider:\n- {}",
-            index
-                .values()
-                .map(|schema| schema.to_string())
-                .collect::<Vec<String>>()
-                .join("\n- ")
-        );
-
-        Self {
-            schemas: Arc::new(Mutex::new(index)),
-            supported_schema: None,
-            tx,
-        }
-    }
-
-    pub fn new_with_supported_schema(supported_schema: Vec<SchemaId>) -> Self {
-        // Validate that the passed known schema are all mentioned in the supported schema list.
-
-        // Collect all system and application schemas.
-        let system_schemas = SYSTEM_SCHEMAS.clone();
-
-        // Filter system schema against passed supported schema and collect into index Hashmap.
-        let index: HashMap<SchemaId, Schema> = system_schemas
-            .into_iter()
-            .filter_map(|schema| {
-                if supported_schema.contains(schema.id()) {
-                    Some((schema.id().to_owned(), schema.to_owned()))
-                } else {
-                    None
-                }
-            })
-            .collect();
+        if let Some(supported_schema_ids) = &supported_schema_ids {
+            index.retain(|schema_id, _| supported_schema_ids.contains(schema_id));
+        };
 
         let (tx, _) = channel(64);
 
         debug!(
             "Initialised schema provider:\n- {}",
             index
                 .values()
                 .map(|schema| schema.to_string())
                 .collect::<Vec<String>>()
                 .join("\n- ")
         );
 
         Self {
             schemas: Arc::new(Mutex::new(index)),
-            supported_schema: Some(supported_schema),
+            supported_schema_ids,
             tx,
         }
     }
@@ -113,7 +84,7 @@ impl SchemaProvider {
     /// Returns `true` if a schema was updated or it already existed in it's current state, and
     /// `false` if it was inserted.
     pub async fn update(&self, schema: Schema) -> Result<bool> {
-        if let Some(supported_schema) = self.supported_schema.as_ref() {
+        if let Some(supported_schema) = self.supported_schema_ids.as_ref() {
             if !supported_schema.contains(schema.id()) {
                 return Err(anyhow!(
                     "Attempted to add unsupported schema to schema provider"
@@ -143,21 +114,15 @@ impl SchemaProvider {
         Ok(is_update)
     }
 
-    // Return the configured supported schema, or all schema we know about if no restrictions on
-    // schema have been set.
-    pub async fn supported_schema(&self) -> Vec<SchemaId> {
-        match &self.supported_schema {
-            Some(supported_schema) => supported_schema.to_owned(),
-            // If `supported_schema` is None it means there are no limits on schema and so we
-            // support all schema we know about.
-            None => self.schemas.lock().await.keys().cloned().collect(),
-        }
+    // Return the configured supported schema.
+    pub fn supported_schema_ids(&self) -> Option<&Vec<SchemaId>> {
+        self.supported_schema_ids.as_ref()
     }
 }
 
 impl Default for SchemaProvider {
     fn default() -> Self {
-        Self::new(Vec::new())
+        Self::new(Vec::new(), None)
     }
 }
@@ -214,7 +179,7 @@ mod test {
             &[("test_field", FieldType::String)],
         )
         .unwrap();
-        let provider = SchemaProvider::new_with_supported_schema(vec![new_schema_id.clone()]);
+        let provider = SchemaProvider::new(vec![], Some(vec![new_schema_id.clone()]));
         let result = provider.update(new_schema).await;
         assert!(result.is_ok());
         assert!(!result.unwrap());
@@ -224,7 +189,7 @@ mod test {
 
     #[tokio::test]
     async fn update_unsupported_schemas() {
-        let provider = SchemaProvider::new_with_supported_schema(vec![]);
+        let provider = SchemaProvider::new(vec![], Some(vec![]));
         let new_schema_id = SchemaId::Application(
             SchemaName::new("test_schema").unwrap(),
             random_document_view_id(),
diff --git a/aquadoggo/src/test_utils/runner.rs b/aquadoggo/src/test_utils/runner.rs
index 7360d8c91..634890fba 100644
--- a/aquadoggo/src/test_utils/runner.rs
+++ b/aquadoggo/src/test_utils/runner.rs
@@ -66,8 +66,7 @@ impl TestNodeManager {
         let store = SqlStore::new(pool.clone());
 
         // Construct node config supporting any schema.
-        let mut cfg = Configuration::default();
-        cfg.dynamic_schema = true;
+        let cfg = Configuration::default();
 
         // Construct the actual test node
         let test_node = TestNode {
@@ -103,8 +102,7 @@ pub fn test_runner(test: F) {
     let store = SqlStore::new(pool);
 
     // Construct node config supporting any schema.
-    let mut cfg = Configuration::default();
-    cfg.dynamic_schema = true;
+    let cfg = Configuration::default();
 
     // Construct the actual test node
     let node = TestNode {
diff --git a/aquadoggo/src/tests.rs b/aquadoggo/src/tests.rs
index 0babda9ea..00ba33122 100644
--- a/aquadoggo/src/tests.rs
+++ b/aquadoggo/src/tests.rs
@@ -43,9 +43,7 @@ async fn e2e() {
     // default options. The only thing we want to do change is the database config. We want an
     // in-memory sqlite database for this test.
 
-    let mut config = Configuration::new_ephemeral();
-    // In this demo we any schema to be automatically supported by the node.
-    config.dynamic_schema = true;
+    let config = Configuration::new_ephemeral();
     let key_pair = KeyPair::new();
 
     // Start the node.
diff --git a/aquadoggo_cli/src/main.rs b/aquadoggo_cli/src/main.rs
index 077b082d8..f5a03ba22 100644
--- a/aquadoggo_cli/src/main.rs
+++ b/aquadoggo_cli/src/main.rs
@@ -182,10 +182,12 @@ async fn main() {
 
     // Read schema ids from config.toml file or
     let supported_schemas = match File::open(CONFIG_FILE_PATH) {
-        Ok(mut file) => schemas::read_schema_ids_from_file(&mut file),
-        Err(_) => Ok(vec![]),
-    }
-    .expect("Reading schema ids from config.toml failed");
+        Ok(mut file) => Some(
+            schemas::read_schema_ids_from_file(&mut file)
+                .expect("Reading schema ids from config.toml failed"),
+        ),
+        Err(_) => None,
+    };
     config.supported_schema_ids = supported_schemas;
 
     // We unwrap the path as we know it has been initialised during the conversion step before
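
[Editor's note: after patch 02, `supported_schema_ids: Option<Vec<SchemaId>>`
distinguishes three configurations: `None` (support every schema the node knows
about or later discovers — the new default), `Some(ids)` (support only the listed
schema), and `Some(vec![])` (support none, as some tests above use). A rough
usage sketch against the constructor introduced here, assuming `schemas` and
`ids` are already in scope:

    // Support any schema, including ones discovered later during replication:
    let open_provider = SchemaProvider::new(schemas.clone(), None);

    // Restrict the node to an explicit allow-list, e.g. one read from config.toml:
    let restricted_provider = SchemaProvider::new(schemas, Some(ids));
]
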
From 170e8619fb38d1d67f820b1f015fe41a597b01e1 Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Thu, 3 Aug 2023 17:32:03 +0100
Subject: [PATCH 03/14] Use p2panda-rs version which has blob schema

---
 Cargo.lock               | 3 +--
 aquadoggo/Cargo.toml     | 4 ++--
 aquadoggo_cli/Cargo.toml | 2 +-
 3 files changed, 4 insertions(+), 5 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9099ebd00..3c30d6fa4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3081,8 +3081,7 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
 [[package]]
 name = "p2panda-rs"
 version = "0.7.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "462f0e5a6df45b0b9ad387fdf77f9d43c75610d72036c1b1de0aebfe10f434b0"
+source = "git+https://github.com/p2panda/p2panda?rev=17f4fcb1dcf7cebabd6d9b5a824399e9384d96b2#17f4fcb1dcf7cebabd6d9b5a824399e9384d96b2"
 dependencies = [
  "arrayvec 0.5.2",
  "async-trait",
diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml
index 8d7bec918..ba2fb75b4 100644
--- a/aquadoggo/Cargo.toml
+++ b/aquadoggo/Cargo.toml
@@ -55,7 +55,7 @@ lipmaa-link = "0.2.2"
 log = "0.4.19"
 once_cell = "1.18.0"
 openssl-probe = "0.1.5"
-p2panda-rs = { version = "0.7.1", features = [
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "17f4fcb1dcf7cebabd6d9b5a824399e9384d96b2", features = [
   "storage-provider",
 ] }
 rand = "0.8.5"
@@ -90,7 +90,7 @@ http = "0.2.9"
 hyper = "0.14.19"
 libp2p-swarm-test = "0.2.0"
 once_cell = "1.17.0"
-p2panda-rs = { version = "0.7.1", features = [
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "17f4fcb1dcf7cebabd6d9b5a824399e9384d96b2", features = [
   "test-utils",
   "storage-provider",
 ] }
diff --git a/aquadoggo_cli/Cargo.toml b/aquadoggo_cli/Cargo.toml
index 959217228..747ec56df 100644
--- a/aquadoggo_cli/Cargo.toml
+++ b/aquadoggo_cli/Cargo.toml
@@ -25,7 +25,7 @@ clap = { version = "4.1.8", features = ["derive"] }
 env_logger = "0.9.0"
 hex = "0.4.3"
 libp2p = "0.52.0"
-p2panda-rs = "0.7.1"
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "17f4fcb1dcf7cebabd6d9b5a824399e9384d96b2" }
 tokio = { version = "1.28.2", features = ["full"] }
 toml = "0.7.6"

From 131b6822c01d73eb9bf87404db32865b37a2df97 Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Thu, 3 Aug 2023 18:05:18 +0100
Subject: [PATCH 04/14] `SchemaProvider` now has 4 system schema :-)

---
 aquadoggo/src/schema/schema_provider.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aquadoggo/src/schema/schema_provider.rs b/aquadoggo/src/schema/schema_provider.rs
index d548e68b5..2fef78630 100644
--- a/aquadoggo/src/schema/schema_provider.rs
+++ b/aquadoggo/src/schema/schema_provider.rs
@@ -137,7 +137,7 @@ mod test {
     async fn get_all_schemas() {
         let provider = SchemaProvider::default();
         let result = provider.all().await;
-        assert_eq!(result.len(), 2);
+        assert_eq!(result.len(), 4);
     }
 
     #[tokio::test]

From 618fbe4680a0590c7a6e62899b86aa34493a8dd5 Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Fri, 4 Aug 2023 10:08:14 +0100
Subject: [PATCH 05/14] Update proptests

---
 aquadoggo/src/proptests/tests.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aquadoggo/src/proptests/tests.rs b/aquadoggo/src/proptests/tests.rs
index 60db777c6..8aa72e367 100644
--- a/aquadoggo/src/proptests/tests.rs
+++ b/aquadoggo/src/proptests/tests.rs
@@ -28,7 +28,7 @@ async fn sanity_checks(
     schemas: &Vec<SchemaId>,
 ) {
     let node_schemas = node.context.schema_provider.all().await;
-    assert_eq!(schemas.len(), node_schemas.len() - 2); // minus 2 for system schema
+    assert_eq!(schemas.len(), node_schemas.len() - 4); // minus 4 for system schema
     for schema_id in schemas {
         let result = node
            .context

From 5f2ccbdebb781aa6fa66abed0cbe840a57bd525d Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Fri, 4 Aug 2023 13:22:09 +0100
Subject: [PATCH 06/14] Implement BlobStore with `get_blob` method

---
 aquadoggo/src/db/errors.rs      |  44 +++++++--
 aquadoggo/src/db/stores/blob.rs | 152 ++++++++++++++++++++++++++++++++
 aquadoggo/src/db/stores/mod.rs  |   1 +
 3 files changed, 192 insertions(+), 5 deletions(-)
 create mode 100644 aquadoggo/src/db/stores/blob.rs

diff --git a/aquadoggo/src/db/errors.rs b/aquadoggo/src/db/errors.rs
index 76fc4724e..8cca038ee 100644
--- a/aquadoggo/src/db/errors.rs
+++ b/aquadoggo/src/db/errors.rs
@@ -13,16 +13,15 @@ pub enum SqlStoreError {
     #[error("Deletion of row from table {0} did not show any effect")]
     Deletion(String),
+
+    /// Error returned from BlobStore.
+    #[error(transparent)]
+    BlobStoreError(#[from] BlobStoreError),
 }
 
 /// `SchemaStore` errors.
 #[derive(Error, Debug)]
 pub enum SchemaStoreError {
-    /// Catch all error which implementers can use for passing their own errors up the chain.
-    #[error("Error occured in DocumentStore: {0}")]
-    #[allow(dead_code)]
-    Custom(String),
-
     /// Error returned from converting p2panda-rs `DocumentView` into `SchemaView.
     #[error(transparent)]
     SystemSchemaError(#[from] SystemSchemaError),
@@ -43,3 +42,38 @@ pub enum SchemaStoreError {
     #[error(transparent)]
     OperationStorageError(#[from] OperationStorageError),
 }
+
+#[derive(Error, Debug)]
+pub enum BlobStoreError {
+    /// Error when no "pieces" field found on blob document.
+    #[error("Missing \"pieces\" field on blob document")]
+    MissingPiecesField,
+
+    /// Error when no "length" field found on blob document.
+    #[error("Missing \"length\" field on blob document")]
+    MissingLengthField,
+
+    /// Error when no "data" field found on blob pieces document.
+    #[error("Missing \"data\" field on blob pieces document")]
+    MissingDataField,
+
+    /// Error when the requested document is not a blob document.
+    #[error("Requested document is not a blob")]
+    NotBlobDocument,
+
+    /// Error when no pieces found for existing blob document.
+    #[error("No pieces found for the requested blob")]
+    NoBlobPiecesFound,
+
+    /// Error when some pieces not found for existing blob document.
+    #[error("Some pieces missing for the requested blob")]
+    MissingPieces,
+
+    /// Error when combined pieces length and claimed blob length don't match.
+    #[error("The combined pieces length and claimed blob length don't match")]
+    IncorrectLength,
+
+    /// Error returned from `DocumentStore` methods.
+    #[error(transparent)]
+    DocumentStorageError(#[from] DocumentStorageError),
+}
diff --git a/aquadoggo/src/db/stores/blob.rs b/aquadoggo/src/db/stores/blob.rs
new file mode 100644
index 000000000..8e62c68c0
--- /dev/null
+++ b/aquadoggo/src/db/stores/blob.rs
@@ -0,0 +1,152 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use std::num::NonZeroU64;
+
+use p2panda_rs::document::traits::AsDocument;
+use p2panda_rs::document::DocumentId;
+use p2panda_rs::operation::OperationValue;
+use p2panda_rs::schema::{Schema, SchemaId};
+use p2panda_rs::storage_provider::traits::DocumentStore;
+
+use crate::db::errors::BlobStoreError;
+use crate::db::query::{Field, Filter, Order, Pagination, Select};
+use crate::db::stores::query::{Query, RelationList};
+use crate::db::SqlStore;
+
+/// The maximum allowed number of blob pieces per blob.
+/// @TODO: do we want this? If so, what value should it be? We should add this to
+/// p2panda-rs blob validation too.
+const MAX_BLOB_PIECES: u64 = 10000;
+
+pub type BlobData = String;
+
+impl SqlStore {
+    /// Get the data for one blob from the store, identified by its document id.
+    pub async fn get_blob(&self, id: &DocumentId) -> Result<Option<BlobData>, BlobStoreError> {
+        // Get the root blob document.
+        let blob = match self.get_document(id).await? {
+            Some(document) => {
+                if document.schema_id != SchemaId::Blob(1) {
+                    return Err(BlobStoreError::NotBlobDocument);
+                }
+                document
+            }
+            None => return Ok(None),
+        };
+
+        // Get the length of the blob.
+        let length = match blob.get("length").unwrap() {
+            OperationValue::Integer(length) => length,
+            _ => return Err(BlobStoreError::MissingLengthField),
+        };
+
+        // Get the number of pieces in the blob.
+        let num_pieces = match blob.get("pieces").unwrap() {
+            OperationValue::PinnedRelationList(list) => list.len(),
+            _ => return Err(BlobStoreError::MissingPiecesField),
+        };
+
+        // Now collect all existing pieces for the blob.
+        //
+        // We do this using the store's query method, targeting pieces which are in the relation
+        // list of the blob.
+        let schema = Schema::get_system(SchemaId::BlobPiece(1)).unwrap();
+        let list = RelationList::new_pinned(&blob.view_id(), "pieces".into());
+        let mut pagination = Pagination::default();
+
+        pagination.first = NonZeroU64::new(MAX_BLOB_PIECES).unwrap();
+        let args = Query::new(
+            &pagination,
+            &Select::new(&[Field::new("data")]),
+            &Filter::default(),
+            &Order::default(),
+        );
+
+        let (_, results) = self.query(&schema, &args, Some(&list)).await?;
+
+        // No pieces were found.
+        if results.is_empty() {
+            return Err(BlobStoreError::NoBlobPiecesFound);
+        };
+
+        // Not all pieces were found.
+        if results.len() != num_pieces {
+            return Err(BlobStoreError::MissingPieces);
+        }
+
+        // Now we construct the blob data.
+        let mut blob_data = "".to_string();
+
+        for (_, blob_piece_document) in results {
+            match blob_piece_document
+                .get("data")
+                .expect("Blob piece document without \"data\" field")
+            {
+                OperationValue::String(data_str) => blob_data += data_str,
+                _ => return Err(BlobStoreError::MissingDataField),
+            }
+        }
+
+        // Combined blob data length doesn't match the claimed length.
+        if blob_data.len() != *length as usize {
+            return Err(BlobStoreError::IncorrectLength);
+        };
+
+        Ok(Some(blob_data))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use p2panda_rs::document::DocumentId;
+    use p2panda_rs::identity::KeyPair;
+    use p2panda_rs::schema::SchemaId;
+    use p2panda_rs::test_utils::fixtures::key_pair;
+    use rstest::rstest;
+
+    use crate::test_utils::{add_document, test_runner, TestNode};
+
+    #[rstest]
+    fn get_blob(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            let blob_data = "Hello, World!".to_string();
+
+            let blob_piece_view_id_1 = add_document(
+                &mut node,
+                &SchemaId::BlobPiece(1),
+                vec![("data", blob_data[..5].into())],
+                &key_pair,
+            )
+            .await;
+
+            let blob_piece_view_id_2 = add_document(
+                &mut node,
+                &SchemaId::BlobPiece(1),
+                vec![("data", blob_data[5..].into())],
+                &key_pair,
+            )
+            .await;
+            let blob_piece_view_id = add_document(
+                &mut node,
+                &SchemaId::Blob(1),
+                vec![
+                    ("length", { blob_data.len() as i64 }.into()),
+                    ("mime_type", "text/plain".into()),
+                    (
+                        "pieces",
+                        vec![blob_piece_view_id_1, blob_piece_view_id_2].into(),
+                    ),
+                ],
+                &key_pair,
+            )
+            .await;
+
+            let document_id: DocumentId = blob_piece_view_id.to_string().parse().unwrap();
+
+            let blob = node.context.store.get_blob(&document_id).await.unwrap();
+
+            assert!(blob.is_some());
+            assert_eq!(blob.unwrap(), blob_data)
+        })
+    }
+}
diff --git a/aquadoggo/src/db/stores/mod.rs b/aquadoggo/src/db/stores/mod.rs
index 440b4b3e4..e5ccebf8e 100644
--- a/aquadoggo/src/db/stores/mod.rs
+++ b/aquadoggo/src/db/stores/mod.rs
@@ -2,6 +2,7 @@
 
 //! Implementations of all `p2panda-rs` defined storage provider traits and additionally
 //! `aquadoggo` specific interfaces.
+mod blob;
 pub mod document;
 mod entry;
 mod log;
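
[Editor's note: a hedged sketch of how the new `get_blob` method might be called
from node code — the surrounding `store` handle and error handling are
illustrative, only the method itself comes from this patch:

    match store.get_blob(&document_id).await {
        // All pieces were present and their combined length matched the claim
        Ok(Some(blob_data)) => println!("blob is {} bytes long", blob_data.len()),
        // No document with this id exists
        Ok(None) => println!("unknown document id"),
        // Not a blob document, pieces missing, or a length mismatch
        Err(err) => eprintln!("could not assemble blob: {}", err),
    }
]
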
From 9d4c8ccacfba49d5d7afdc3f5c4230467893fa1f Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Fri, 4 Aug 2023 17:24:22 +0100
Subject: [PATCH 07/14] Clippy happy

---
 aquadoggo/src/db/errors.rs      | 10 +++++-----
 aquadoggo/src/db/stores/blob.rs | 11 ++++++-----
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/aquadoggo/src/db/errors.rs b/aquadoggo/src/db/errors.rs
index 8cca038ee..95c03e040 100644
--- a/aquadoggo/src/db/errors.rs
+++ b/aquadoggo/src/db/errors.rs
@@ -24,23 +24,23 @@ pub enum SchemaStoreError {
     /// Error returned from converting p2panda-rs `DocumentView` into `SchemaView.
     #[error(transparent)]
-    SystemSchemaError(#[from] SystemSchemaError),
+    SystemSchema(#[from] SystemSchemaError),
 
     /// Error returned from p2panda-rs `Schema` methods.
     #[error(transparent)]
-    SchemaError(#[from] SchemaError),
+    Schema(#[from] SchemaError),
 
     /// Error returned from p2panda-rs `SchemaId` methods.
     #[error(transparent)]
-    SchemaIdError(#[from] SchemaIdError),
+    SchemaId(#[from] SchemaIdError),
 
     /// Error returned from `DocumentStore` methods.
     #[error(transparent)]
-    DocumentStorageError(#[from] DocumentStorageError),
+    DocumentStorage(#[from] DocumentStorageError),
 
     /// Error returned from `OperationStore` methods.
     #[error(transparent)]
-    OperationStorageError(#[from] OperationStorageError),
+    OperationStorage(#[from] OperationStorageError),
 }
diff --git a/aquadoggo/src/db/stores/blob.rs b/aquadoggo/src/db/stores/blob.rs
index 8e62c68c0..110952253 100644
--- a/aquadoggo/src/db/stores/blob.rs
+++ b/aquadoggo/src/db/stores/blob.rs
@@ -51,10 +51,12 @@ impl SqlStore {
         // We do this using the store's query method, targeting pieces which are in the relation
         // list of the blob.
         let schema = Schema::get_system(SchemaId::BlobPiece(1)).unwrap();
-        let list = RelationList::new_pinned(&blob.view_id(), "pieces".into());
-        let mut pagination = Pagination::default();
+        let list = RelationList::new_pinned(blob.view_id(), "pieces");
+        let pagination = Pagination {
+            first: NonZeroU64::new(MAX_BLOB_PIECES).unwrap(),
+            ..Default::default()
+        };
 
-        pagination.first = NonZeroU64::new(MAX_BLOB_PIECES).unwrap();
         let args = Query::new(
             &pagination,
             &Select::new(&[Field::new("data")]),
@@ -62,7 +64,7 @@
             &Order::default(),
         );
 
-        let (_, results) = self.query(&schema, &args, Some(&list)).await?;
+        let (_, results) = self.query(schema, &args, Some(&list)).await?;
 
         // No pieces were found.
         if results.is_empty() {

From e44b6cbb3fdf5e8314f391ae221842b721e0c708 Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Fri, 4 Aug 2023 17:29:45 +0100
Subject: [PATCH 08/14] Fix imports

---
 aquadoggo/src/db/stores/blob.rs | 1 +
 1 file changed, 1 insertion(+)

diff --git a/aquadoggo/src/db/stores/blob.rs b/aquadoggo/src/db/stores/blob.rs
index 110952253..ccdf61850 100644
--- a/aquadoggo/src/db/stores/blob.rs
+++ b/aquadoggo/src/db/stores/blob.rs
@@ -101,6 +101,7 @@ impl SqlStore {
 #[cfg(test)]
 mod tests {
     use p2panda_rs::identity::KeyPair;
+    use p2panda_rs::document::DocumentId;
     use p2panda_rs::schema::SchemaId;
     use p2panda_rs::test_utils::fixtures::key_pair;
     use rstest::rstest;

From 805c3a83c42afe77ec659c310010a725fc97a18a Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Fri, 4 Aug 2023 17:34:32 +0100
Subject: [PATCH 09/14] fmt

---
 aquadoggo/src/db/stores/blob.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/aquadoggo/src/db/stores/blob.rs b/aquadoggo/src/db/stores/blob.rs
index ccdf61850..9a6c3da31 100644
--- a/aquadoggo/src/db/stores/blob.rs
+++ b/aquadoggo/src/db/stores/blob.rs
@@ -100,8 +100,8 @@ impl SqlStore {
 
 #[cfg(test)]
 mod tests {
-    use p2panda_rs::identity::KeyPair;
     use p2panda_rs::document::DocumentId;
+    use p2panda_rs::identity::KeyPair;
     use p2panda_rs::schema::SchemaId;
     use p2panda_rs::test_utils::fixtures::key_pair;
     use rstest::rstest;

From 3c9aa827240951a78ce56b3bdec185048141039e Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Tue, 8 Aug 2023 15:48:36 +0100
Subject: [PATCH 10/14] Remove some BlobStoreErrors and error tests

---
 aquadoggo/src/db/errors.rs      |  12 ----
 aquadoggo/src/db/stores/blob.rs | 117 ++++++++++++++++++++++++++++++--
 2 files changed, 111 insertions(+), 18 deletions(-)

diff --git a/aquadoggo/src/db/errors.rs b/aquadoggo/src/db/errors.rs
index 95c03e040..ae7b1253c 100644
--- a/aquadoggo/src/db/errors.rs
+++ b/aquadoggo/src/db/errors.rs
@@ -45,18 +45,6 @@ pub enum SchemaStoreError {
 
 #[derive(Error, Debug)]
 pub enum BlobStoreError {
-    /// Error when no "pieces" field found on blob document.
-    #[error("Missing \"pieces\" field on blob document")]
-    MissingPiecesField,
-
-    /// Error when no "length" field found on blob document.
-    #[error("Missing \"length\" field on blob document")]
-    MissingLengthField,
-
-    /// Error when no "data" field found on blob pieces document.
-    #[error("Missing \"data\" field on blob pieces document")]
-    MissingDataField,
-
     /// Error when the requested document is not a blob document.
#[error("Missing \"pieces\" field on blob document")] NotBlobDocument, diff --git a/aquadoggo/src/db/stores/blob.rs b/aquadoggo/src/db/stores/blob.rs index 9a6c3da31..9a6d255c7 100644 --- a/aquadoggo/src/db/stores/blob.rs +++ b/aquadoggo/src/db/stores/blob.rs @@ -37,13 +37,13 @@ impl SqlStore { // Get the length of the blob. let length = match blob.get("length").unwrap() { OperationValue::Integer(length) => length, - _ => return Err(BlobStoreError::MissingLengthField), + _ => panic!(), // We should never hit this as we already validated that this is a blob document. }; // Get the number of pieces in the blob. let num_pieces = match blob.get("pieces").unwrap() { OperationValue::PinnedRelationList(list) => list.len(), - _ => return Err(BlobStoreError::MissingPiecesField), + _ => panic!(), // We should never hit this as we already validated that this is a blob document. }; // Now collect all exiting pieces for the blob. @@ -85,7 +85,7 @@ impl SqlStore { .expect("Blob piece document without \"data\" field") { OperationValue::String(data_str) => blob_data += data_str, - _ => return Err(BlobStoreError::MissingPiecesField), + _ => panic!(), // We should never hit this as we only queried for blob piece documents. } } @@ -103,9 +103,10 @@ mod tests { use p2panda_rs::document::DocumentId; use p2panda_rs::identity::KeyPair; use p2panda_rs::schema::SchemaId; - use p2panda_rs::test_utils::fixtures::key_pair; + use p2panda_rs::test_utils::fixtures::{key_pair, random_document_view_id}; use rstest::rstest; + use crate::db::errors::BlobStoreError; use crate::test_utils::{add_document, test_runner, TestNode}; #[rstest] @@ -128,7 +129,7 @@ mod tests { &key_pair, ) .await; - let blob_piece_view_id = add_document( + let blob_view_id = add_document( &mut node, &SchemaId::Blob(1), vec![ @@ -143,7 +144,7 @@ mod tests { ) .await; - let document_id: DocumentId = blob_piece_view_id.to_string().parse().unwrap(); + let document_id: DocumentId = blob_view_id.to_string().parse().unwrap(); let blob = node.context.store.get_blob(&document_id).await.unwrap(); @@ -151,4 +152,108 @@ mod tests { assert_eq!(blob.unwrap(), blob_data) }) } + + #[rstest] + fn get_blob_errors(key_pair: KeyPair) { + test_runner(|mut node: TestNode| async move { + let blob_data = "Hello, World!".to_string(); + + // Publish a blob containing pieces which aren't in the store. + let blob_view_id = add_document( + &mut node, + &SchemaId::Blob(1), + vec![ + ("length", { blob_data.len() as i64 }.into()), + ("mime_type", "text/plain".into()), + ( + "pieces", + vec![random_document_view_id(), random_document_view_id()].into(), + ), + ], + &key_pair, + ) + .await; + + let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap(); + + // We get the correct `NoBlobPiecesFound` error. + let result = node.context.store.get_blob(&blob_document_id).await; + assert!( + matches!(result, Err(BlobStoreError::NoBlobPiecesFound)), + "{:?}", + result + ); + + // Publish one blob piece. + let blob_piece_view_id_1 = add_document( + &mut node, + &SchemaId::BlobPiece(1), + vec![("data", blob_data[..5].into())], + &key_pair, + ) + .await; + + // Publish a blob with one piece that is in the store and one that isn't. 
+            let blob_view_id = add_document(
+                &mut node,
+                &SchemaId::Blob(1),
+                vec![
+                    ("length", { blob_data.len() as i64 }.into()),
+                    ("mime_type", "text/plain".into()),
+                    (
+                        "pieces",
+                        vec![blob_piece_view_id_1.clone(), random_document_view_id()].into(),
+                    ),
+                ],
+                &key_pair,
+            )
+            .await;
+
+            let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+
+            // We should get the correct `MissingPieces` error.
+            let result = node.context.store.get_blob(&blob_document_id).await;
+            assert!(
+                matches!(result, Err(BlobStoreError::MissingPieces)),
+                "{:?}",
+                result
+            );
+
+            // Publish one more blob piece, but it doesn't contain the correct number of bytes.
+            let blob_piece_view_id_2 = add_document(
+                &mut node,
+                &SchemaId::BlobPiece(1),
+                vec![("data", blob_data[9..].into())],
+                &key_pair,
+            )
+            .await;
+
+            // Publish a blob with two pieces that are in the store but they don't add up to the
+            // right byte length.
+            let blob_view_id = add_document(
+                &mut node,
+                &SchemaId::Blob(1),
+                vec![
+                    ("length", { blob_data.len() as i64 }.into()),
+                    ("mime_type", "text/plain".into()),
+                    (
+                        "pieces",
+                        vec![blob_piece_view_id_1, blob_piece_view_id_2].into(),
+                    ),
+                ],
+                &key_pair,
+            )
+            .await;
+
+            let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+
+            // We get the correct `IncorrectLength` error.
+            let result = node.context.store.get_blob(&blob_document_id).await;
+            assert!(
+                matches!(result, Err(BlobStoreError::IncorrectLength)),
+                "{:?}",
+                result
+            );
+        })
+    }
 }

From 8f774266ed9064f4072ae4e18e33ce08d22be06c Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Tue, 8 Aug 2023 15:49:40 +0100
Subject: [PATCH 11/14] Update CHANGELOG

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8a8d40004..96dd642cd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Introduce peer sampling to the replication service [#463](https://github.com/p2panda/aquadoggo/pull/463)
 - Only replicate and materialize configured "supported schema" [#569](https://github.com/p2panda/aquadoggo/pull/469)
 - Parse supported schema ids from `config.toml` [#473](https://github.com/p2panda/aquadoggo/pull/473)
+- Introduce `BlobStore` [#484](https://github.com/p2panda/aquadoggo/pull/484)
 
 ### Changed

From dc832089446b3a4059a031473a7fb65e3679632f Mon Sep 17 00:00:00 2001
From: Sam Andreae
Date: Tue, 8 Aug 2023 17:07:43 +0100
Subject: [PATCH 12/14] add `get_blob_by_view_id` method

---
 aquadoggo/src/db/stores/blob.rs | 152 ++++++++++++++++++++------------
 1 file changed, 95 insertions(+), 57 deletions(-)

diff --git a/aquadoggo/src/db/stores/blob.rs b/aquadoggo/src/db/stores/blob.rs
index 9a6d255c7..1fc07549e 100644
--- a/aquadoggo/src/db/stores/blob.rs
+++ b/aquadoggo/src/db/stores/blob.rs
@@ -3,7 +3,7 @@
 use std::num::NonZeroU64;
 
 use p2panda_rs::document::traits::AsDocument;
-use p2panda_rs::document::DocumentId;
+use p2panda_rs::document::{DocumentId, DocumentViewId};
 use p2panda_rs::operation::OperationValue;
 use p2panda_rs::schema::{Schema, SchemaId};
 use p2panda_rs::storage_provider::traits::DocumentStore;
@@ -24,7 +24,7 @@ impl SqlStore {
     /// Get the data for one blob from the store, identified by its document id.
     pub async fn get_blob(&self, id: &DocumentId) -> Result<Option<BlobData>, BlobStoreError> {
         // Get the root blob document.
-        let blob = match self.get_document(id).await? {
+        let blob_document = match self.get_document(id).await? {
             Some(document) => {
                 if document.schema_id != SchemaId::Blob(1) {
                     return Err(BlobStoreError::NotBlobDocument);
                 }
                 document
             }
             None => return Ok(None),
         };
+        document_to_blob_data(&self, blob_document).await
+    }
+
+    /// Get the data for one blob from the store, identified by its document view id.
+    pub async fn get_blob_by_view_id(
+        &self,
+        view_id: &DocumentViewId,
+    ) -> Result<Option<BlobData>, BlobStoreError> {
+        // Get the root blob document.
+        let blob_document = match self.get_document_by_view_id(view_id).await? {
+            Some(document) => {
+                if document.schema_id != SchemaId::Blob(1) {
+                    return Err(BlobStoreError::NotBlobDocument);
+                }
+                document
+            }
+            None => return Ok(None),
+        };
+        document_to_blob_data(&self, blob_document).await
+    }
+}
+
+/// Helper method for validation and parsing a document into blob data.
+async fn document_to_blob_data(
+    store: &SqlStore,
+    blob: impl AsDocument,
+) -> Result<Option<BlobData>, BlobStoreError> {
+    // Get the length of the blob.
+    let length = match blob.get("length").unwrap() {
+        OperationValue::Integer(length) => length,
+        _ => panic!(), // We should never hit this as we already validated that this is a blob document.
+    };
+
+    // Get the number of pieces in the blob.
+    let num_pieces = match blob.get("pieces").unwrap() {
+        OperationValue::PinnedRelationList(list) => list.len(),
+        _ => panic!(), // We should never hit this as we already validated that this is a blob document.
+    };
+
+    // Now collect all existing pieces for the blob.
+    //
+    // We do this using the store's query method, targeting pieces which are in the relation
+    // list of the blob.
+    let schema = Schema::get_system(SchemaId::BlobPiece(1)).unwrap();
+    let list = RelationList::new_pinned(blob.view_id(), "pieces");
+    let pagination = Pagination {
+        first: NonZeroU64::new(MAX_BLOB_PIECES).unwrap(),
+        ..Default::default()
+    };
+
+    let args = Query::new(
+        &pagination,
+        &Select::new(&[Field::new("data")]),
+        &Filter::default(),
+        &Order::default(),
+    );
+
+    let (_, results) = store.query(schema, &args, Some(&list)).await?;
+
+    // No pieces were found.
+    if results.is_empty() {
+        return Err(BlobStoreError::NoBlobPiecesFound);
+    };
+
+    // Not all pieces were found.
+    if results.len() != num_pieces {
+        return Err(BlobStoreError::MissingPieces);
+    }
+
+    // Now we construct the blob data.
+    let mut blob_data = "".to_string();
+
+    for (_, blob_piece_document) in results {
+        match blob_piece_document
+            .get("data")
+            .expect("Blob piece document without \"data\" field")
+        {
+            OperationValue::String(data_str) => blob_data += data_str,
+            _ => panic!(), // We should never hit this as we only queried for blob piece documents.
+        }
+    }
+
+    // Combined blob data length doesn't match the claimed length.
+    if blob_data.len() != *length as usize {
+        return Err(BlobStoreError::IncorrectLength);
+    };
+
+    Ok(Some(blob_data))
+}
 
 #[cfg(test)]
 mod tests {
     use p2panda_rs::document::DocumentId;
     use p2panda_rs::identity::KeyPair;
     use p2panda_rs::schema::SchemaId;
     use p2panda_rs::test_utils::fixtures::{key_pair, random_document_view_id};
     use rstest::rstest;
 
     use crate::db::errors::BlobStoreError;
     use crate::test_utils::{add_document, test_runner, TestNode};
 
     #[rstest]
     fn get_blob(key_pair: KeyPair) {
         test_runner(|mut node: TestNode| async move {
             let blob_data = "Hello, World!".to_string();
 
+            // Publish blob pieces and blob.
             let blob_piece_view_id_1 = add_document(
                 &mut node,
                 &SchemaId::BlobPiece(1),
                 vec![("data", blob_data[..5].into())],
                 &key_pair,
             )
             .await;
 
             let blob_piece_view_id_2 = add_document(
                 &mut node,
                 &SchemaId::BlobPiece(1),
                 vec![("data", blob_data[5..].into())],
                 &key_pair,
             )
             .await;
             let blob_view_id = add_document(
                 &mut node,
                 &SchemaId::Blob(1),
                 vec![
                     ("length", { blob_data.len() as i64 }.into()),
                     ("mime_type", "text/plain".into()),
                     (
                         "pieces",
                         vec![blob_piece_view_id_1, blob_piece_view_id_2].into(),
                     ),
                 ],
                 &key_pair,
             )
             .await;
 
             let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
 
+            // Get blob by document id.
             let blob = node.context.store.get_blob(&document_id).await.unwrap();
+            assert!(blob.is_some());
+            assert_eq!(blob.unwrap(), blob_data);
+
+            // Get blob by view id.
+            let blob = node
+                .context
+                .store
+                .get_blob_by_view_id(&blob_view_id)
+                .await
+                .unwrap();
 
             assert!(blob.is_some());
             assert_eq!(blob.unwrap(), blob_data)
         })
     }
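
[Editor's note: with `get_blob_by_view_id` a blob can now be fetched either in
its latest state or pinned at one historic version. A short sketch of the
distinction (identifiers illustrative):

    // Latest state of the blob document:
    let latest = store.get_blob(&document_id).await?;

    // The blob exactly as it was at a specific document view:
    let pinned = store.get_blob_by_view_id(&view_id).await?;
]
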
@@ -51,7 +51,7 @@ impl SqlStore { } None => return Ok(None), }; - document_to_blob_data(&self, blob_document).await + document_to_blob_data(self, blob_document).await } } From a6981fd2a5ef82c739b0014cc4570fa5f12f3c44 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 9 Aug 2023 10:36:47 +0100 Subject: [PATCH 14/14] Enable deletion of dangling `document_views` and related `document_view_fields` from db (#491) * Add fk to document_view_fields with cascading DELETE * Introduce `prune_document_views` method to `DocumentStore` * Test for pruning document views * Test that pinned views don't get deleted * Update CHANGELOG * Clippy * Remove fk constraint on `operation_id` in `document_view_fields` table * Change table creation order in documents migration * Use IS NULL in SQL conditional * Don't use alias in SQL query --- CHANGELOG.md | 1 + .../20220510022755_create-documents.sql | 18 +- aquadoggo/src/db/stores/document.rs | 188 +++++++++++++++++- 3 files changed, 196 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 96dd642cd..a16e92971 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Introduce peer sampling to the replication service [#463](https://github.com/p2panda/aquadoggo/pull/463) - Only replicate and materialize configured "supported schema" [#569](https://github.com/p2panda/aquadoggo/pull/469) - Parse supported schema ids from `config.toml` [#473](https://github.com/p2panda/aquadoggo/pull/473) +- Add method to store for pruning document views [#491](https://github.com/p2panda/aquadoggo/pull/491) - Introduce `BlobStore` [#484](https://github.com/p2panda/aquadoggo/pull/484) ### Changed diff --git a/aquadoggo/migrations/20220510022755_create-documents.sql b/aquadoggo/migrations/20220510022755_create-documents.sql index e1b2d0850..cc32860c5 100644 --- a/aquadoggo/migrations/20220510022755_create-documents.sql +++ b/aquadoggo/migrations/20220510022755_create-documents.sql @@ -1,20 +1,20 @@ -- SPDX-License-Identifier: AGPL-3.0-or-later -CREATE TABLE IF NOT EXISTS document_view_fields ( - document_view_id TEXT NOT NULL, - operation_id TEXT NOT NULL, - name TEXT NOT NULL, - FOREIGN KEY(operation_id) REFERENCES operations_v1(operation_id) -); - -CREATE INDEX idx_document_view_fields ON document_view_fields (document_view_id, operation_id, name); - CREATE TABLE IF NOT EXISTS document_views ( document_view_id TEXT NOT NULL UNIQUE, schema_id TEXT NOT NULL, PRIMARY KEY (document_view_id) ); +CREATE TABLE IF NOT EXISTS document_view_fields ( + document_view_id TEXT NOT NULL, + operation_id TEXT NOT NULL, + name TEXT NOT NULL, + FOREIGN KEY(document_view_id) REFERENCES document_views(document_view_id) ON DELETE CASCADE +); + +CREATE INDEX idx_document_view_fields ON document_view_fields (document_view_id, operation_id, name); + CREATE TABLE IF NOT EXISTS documents ( document_id TEXT NOT NULL UNIQUE, document_view_id TEXT NOT NULL, diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs index 463880a2d..22db8423b 100644 --- a/aquadoggo/src/db/stores/document.rs +++ b/aquadoggo/src/db/stores/document.rs @@ -365,6 +365,80 @@ impl SqlStore { .await .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string())) } + + /// Iterate over all views of a document and delete any which: + /// - are not the current view + /// - _and_ no document field exists in the database which contains a pinned relation to this view + #[allow(dead_code)] + async fn 
+    async fn prune_document_views(
+        &self,
+        document_id: &DocumentId,
+    ) -> Result<(), DocumentStorageError> {
+        // Start a transaction; any db insertions after this point and before the `commit()`
+        // will be rolled back in the event of an error.
+        let mut tx = self
+            .pool
+            .begin()
+            .await
+            .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;
+
+        // Collect all views _except_ the current view for this document
+        let document_view_ids: Vec<String> = query_scalar(
+            "
+            SELECT
+                document_views.document_view_id,
+                documents.document_view_id
+            FROM
+                document_views
+            LEFT JOIN
+                documents
+            ON
+                documents.document_view_id = document_views.document_view_id
+            WHERE
+                document_views.document_id = $1
+            AND
+                documents.document_view_id IS NULL
+            ",
+        )
+        .bind(document_id.as_str())
+        .fetch_all(&mut tx)
+        .await
+        .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
+
+        // Iterate over all document views and delete them if no document field exists in the
+        // database which contains a pinned relation to this view.
+        //
+        // Deletes on "document_views" cascade to "document_view_fields" so rows there are also
+        // removed from the database.
+        for document_view_id in document_view_ids {
+            query(
+                "
+                DELETE FROM
+                    document_views
+                WHERE
+                    document_views.document_view_id = $1
+                AND NOT EXISTS (
+                    SELECT * FROM operation_fields_v1
+                    WHERE
+                        operation_fields_v1.field_type IN ('pinned_relation', 'pinned_relation_list')
+                    AND
+                        operation_fields_v1.value = $1
+                )
+                ",
+            )
+            .bind(document_view_id)
+            .execute(&mut tx)
+            .await
+            .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
+        }
+
+        // Commit the tx here as no errors occurred.
+        tx.commit()
+            .await
+            .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;
+
+        Ok(())
+    }
 }
 
 // Helper method for getting rows from the `document_view_fields` table.
@@ -532,20 +606,24 @@ mod tests {
     use p2panda_rs::document::materialization::build_graph;
     use p2panda_rs::document::traits::AsDocument;
     use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewFields, DocumentViewId};
+    use p2panda_rs::identity::KeyPair;
     use p2panda_rs::operation::traits::AsOperation;
     use p2panda_rs::operation::{Operation, OperationId};
     use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore};
     use p2panda_rs::test_utils::constants;
     use p2panda_rs::test_utils::fixtures::{
-        operation, random_document_id, random_document_view_id, random_operation_id,
+        key_pair, operation, random_document_id, random_document_view_id, random_operation_id,
     };
     use p2panda_rs::test_utils::memory_store::helpers::{populate_store, PopulateStoreConfig};
     use p2panda_rs::WithId;
     use rstest::rstest;
 
     use crate::db::stores::document::DocumentView;
+    use crate::materializer::tasks::reduce_task;
+    use crate::materializer::TaskInput;
     use crate::test_utils::{
-        build_document, populate_and_materialize, populate_store_config, test_runner, TestNode,
+        add_schema_and_documents, build_document, populate_and_materialize, populate_store_config,
+        test_runner, TestNode,
     };
@@ -928,4 +1006,110 @@ mod tests {
         assert_eq!(schema_documents.len(), 10);
     });
     }
+
+    #[rstest]
+    fn prunes_document_views(
+        #[from(populate_store_config)]
+        #[with(2, 1, 1)]
+        config: PopulateStoreConfig,
+    ) {
+        test_runner(|mut node: TestNode| async move {
+            // Populate the store and materialize all documents.
+            let (_, document_ids) = populate_and_materialize(&mut node, &config).await;
+            let document_id = document_ids[0].clone();
+            let first_document_view_id: DocumentViewId = document_id.as_str().parse().unwrap();
+
+            // Get the current document from the store.
+            let current_document = node.context.store.get_document(&document_id).await.unwrap();
+
+            // Get the current view id.
+            let current_document_view_id = current_document.unwrap().view_id().to_owned();
+
+            // Reduce a historic view of an existing document.
+            let _ = reduce_task(
+                node.context.clone(),
+                TaskInput::DocumentViewId(first_document_view_id.clone()),
+            )
+            .await;
+
+            // Get that view again to check it's in the db.
+            let document = node
+                .context
+                .store
+                .get_document_by_view_id(&first_document_view_id)
+                .await
+                .unwrap();
+            assert!(document.is_some());
+
+            // Now prune dangling views for the document.
+            let result = node.context.store.prune_document_views(&document_id).await;
+            assert!(result.is_ok());
+
+            // Get the first document view again, it should no longer be there.
+            let document = node
+                .context
+                .store
+                .get_document_by_view_id(&first_document_view_id)
+                .await
+                .unwrap();
+            assert!(document.is_none());
+
+            // Get the current view of the document to make sure that wasn't deleted too.
+            let document = node
+                .context
+                .store
+                .get_document_by_view_id(&current_document_view_id)
+                .await
+                .unwrap();
+            assert!(document.is_some());
+        });
+    }
+
+    #[rstest]
+    fn does_not_prune_pinned_views(
+        #[from(populate_store_config)]
+        #[with(2, 1, 1)]
+        config: PopulateStoreConfig,
+        key_pair: KeyPair,
+    ) {
+        test_runner(|mut node: TestNode| async move {
+            // Populate the store and materialize all documents.
+            let (_, document_ids) = populate_and_materialize(&mut node, &config).await;
+            let document_id = document_ids[0].clone();
+            let first_document_view_id: DocumentViewId = document_id.as_str().parse().unwrap();
+
+            // Reduce a historic view of an existing document.
+            let _ = reduce_task(
+                node.context.clone(),
+                TaskInput::DocumentViewId(first_document_view_id.clone()),
+            )
+            .await;
+
+            // Add a new document to the store which pins the first view of the above document.
+            add_schema_and_documents(
+                &mut node,
+                "new_schema",
+                vec![vec![(
+                    "pin_document",
+                    first_document_view_id.clone().into(),
+                    Some(config.schema.id().to_owned()),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            // Now prune dangling views for the document.
+            let result = node.context.store.prune_document_views(&document_id).await;
+            assert!(result.is_ok());
+
+            // Get the first document view, it should still be in the store as it was pinned.
+            let document = node
+                .context
+                .store
+                .get_document_by_view_id(&first_document_view_id)
+                .await
+                .unwrap();
+            assert!(document.is_some());
+        });
+    }
 }
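
[Editor's note: `prune_document_views` is still marked `#[allow(dead_code)]` in
this patch, so nothing calls it yet. Presumably a later materializer or
garbage-collection step would invoke it after a document advances to a new
current view, along these lines (hypothetical call site):

    // After materializing a new current view for `document_id`, drop all views
    // that are neither current nor pinned by any `pinned_relation(_list)` field:
    store.prune_document_views(&document_id).await?;
]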