diff --git a/CHANGELOG.md b/CHANGELOG.md
index e367678f7..83ee86c04 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Serve static files from `blobs` directory [#480](https://github.com/p2panda/aquadoggo/pull/480)
 - Add method to store for pruning document views [#491](https://github.com/p2panda/aquadoggo/pull/491)
 - Introduce `BlobStore` [#484](https://github.com/p2panda/aquadoggo/pull/484)
+- Task for automatic garbage collection of unused documents and views [#500](https://github.com/p2panda/aquadoggo/pull/500)

 ## [0.5.0]

diff --git a/aquadoggo/migrations/20220509090252_create-operations.sql b/aquadoggo/migrations/20220509090252_create-operations.sql
index d33712c6d..383de95a8 100644
--- a/aquadoggo/migrations/20220509090252_create-operations.sql
+++ b/aquadoggo/migrations/20220509090252_create-operations.sql
@@ -16,7 +16,7 @@ CREATE TABLE IF NOT EXISTS operation_fields_v1 (
     field_type      TEXT      NOT NULL,
     value           TEXT      NULL,
     list_index      INT       NOT NULL,
-    FOREIGN KEY(operation_id) REFERENCES operations_v1(operation_id)
+    FOREIGN KEY(operation_id) REFERENCES operations_v1(operation_id) ON DELETE CASCADE
 );

 CREATE INDEX idx_operation_fields_v1 ON operation_fields_v1 (operation_id, name);

diff --git a/aquadoggo/migrations/20230114140233_alter-documents.sql b/aquadoggo/migrations/20230114140233_alter-documents.sql
index ec8e11be6..023475477 100644
--- a/aquadoggo/migrations/20230114140233_alter-documents.sql
+++ b/aquadoggo/migrations/20230114140233_alter-documents.sql
@@ -1,3 +1,3 @@
 -- SPDX-License-Identifier: AGPL-3.0-or-later

-ALTER TABLE document_views ADD COLUMN document_id TEXT NOT NULL REFERENCES documents(document_id);
\ No newline at end of file
+ALTER TABLE document_views ADD COLUMN document_id TEXT NOT NULL REFERENCES documents(document_id) ON DELETE CASCADE;
\ No newline at end of file

diff --git a/aquadoggo/src/db/errors.rs b/aquadoggo/src/db/errors.rs
index ae7b1253c..d76f6b061 100644
--- a/aquadoggo/src/db/errors.rs
+++ b/aquadoggo/src/db/errors.rs
@@ -17,6 +17,10 @@ pub enum SqlStoreError {
     /// Error returned from BlobStore.
     #[error(transparent)]
     BlobStoreError(#[from] BlobStoreError),
+
+    /// Error returned from `DocumentStore` methods.
+    #[error(transparent)]
+    DocumentStorage(#[from] DocumentStorageError),
 }

 /// `SchemaStore` errors.
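Both migrations above switch foreign keys to `ON DELETE CASCADE`, which is what lets the purge queries later in this changeset issue a single `DELETE` per parent table. A minimal sketch of the effect, assuming an sqlx `AnyPool` connected to these tables (the `delete_operation` helper is hypothetical and not part of the changeset):

```rust
use sqlx::AnyPool;

// Hypothetical helper: with `ON DELETE CASCADE` in place, deleting an
// operation row also removes its dependent `operation_fields_v1` rows.
// The database handles the second table itself, no extra DELETE needed.
async fn delete_operation(pool: &AnyPool, operation_id: &str) -> Result<(), sqlx::Error> {
    sqlx::query("DELETE FROM operations_v1 WHERE operation_id = $1")
        .bind(operation_id)
        .execute(pool)
        .await?;
    Ok(())
}
```

The same pattern applies to `document_views` rows, which now disappear automatically when their parent `documents` row is deleted.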
diff --git a/aquadoggo/src/db/stores/blob.rs b/aquadoggo/src/db/stores/blob.rs
index e413e32e8..1cc491d67 100644
--- a/aquadoggo/src/db/stores/blob.rs
+++ b/aquadoggo/src/db/stores/blob.rs
@@ -7,8 +7,9 @@ use p2panda_rs::document::{DocumentId, DocumentViewId};
 use p2panda_rs::operation::OperationValue;
 use p2panda_rs::schema::{Schema, SchemaId};
 use p2panda_rs::storage_provider::traits::DocumentStore;
+use sqlx::{query_scalar, AnyPool};

-use crate::db::errors::BlobStoreError;
+use crate::db::errors::{BlobStoreError, SqlStoreError};
 use crate::db::query::{Field, Filter, Order, Pagination, Select};
 use crate::db::stores::query::{Query, RelationList};
 use crate::db::SqlStore;
@@ -53,6 +54,109 @@ impl SqlStore {
         };
         document_to_blob_data(self, blob_document).await
     }
+
+    /// Purge blob data from the node _if_ no other document relates to it.
+    pub async fn purge_blob(&self, document_id: &DocumentId) -> Result<(), SqlStoreError> {
+        // Collect the view id of any existing document views which contain a relation to the
+        // blob which is the purge target.
+        let blob_reverse_relations = reverse_relations(&self.pool, document_id, None).await?;
+
+        // If there are no documents referring to the blob then we continue with the purge.
+        if blob_reverse_relations.is_empty() {
+            // Collect the document view ids of all pieces this blob has ever referred to in
+            // its `pieces` field.
+            let blob_piece_ids: Vec<String> = query_scalar(
+                "
+                SELECT
+                    operation_fields_v1.value
+                FROM
+                    operation_fields_v1
+                LEFT JOIN
+                    operations_v1
+                ON
+                    operations_v1.operation_id = operation_fields_v1.operation_id
+                WHERE
+                    operations_v1.document_id = $1
+                AND
+                    operation_fields_v1.name = 'pieces'
+                ",
+            )
+            .bind(document_id.to_string())
+            .fetch_all(&self.pool)
+            .await
+            .map_err(|e| SqlStoreError::Transaction(e.to_string()))?;
+
+            // Purge the blob document itself.
+            self.purge_document(document_id).await?;
+
+            // Now iterate over each collected blob piece in order to check if they are still
+            // needed by any other blob document, and if not purge them as well.
+            for blob_piece_id in blob_piece_ids {
+                let blob_piece_id: DocumentId = blob_piece_id
+                    .parse()
+                    .expect("Document ids from the store are valid");
+
+                // Collect reverse relations for this blob piece.
+                let blob_piece_reverse_relations =
+                    reverse_relations(&self.pool, &blob_piece_id, Some(SchemaId::Blob(1))).await?;
+
+                // If there are none then purge the blob piece.
+                if blob_piece_reverse_relations.is_empty() {
+                    self.purge_document(&blob_piece_id).await?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Helper for getting the document ids of any document which relates to the specified document.
+///
+/// Optionally pass in a `SchemaId` to restrict the results to documents of a certain schema.
+async fn reverse_relations(
+    pool: &AnyPool,
+    document_id: &DocumentId,
+    schema_id: Option<SchemaId>,
+) -> Result<Vec<String>, SqlStoreError> {
+    let schema_id_condition = match schema_id {
+        Some(schema_id) => format!("AND document_views.schema_id = '{}'", schema_id),
+        None => String::new(),
+    };
+
+    query_scalar(&format!(
+        "
+        SELECT
+            document_view_fields.document_view_id
+        FROM
+            document_view_fields
+        LEFT JOIN
+            operation_fields_v1
+        ON
+            document_view_fields.operation_id = operation_fields_v1.operation_id
+        AND
+            document_view_fields.name = operation_fields_v1.name
+        LEFT JOIN
+            document_views
+        ON
+            document_view_fields.document_view_id = document_views.document_view_id
+        WHERE
+            operation_fields_v1.field_type
+        IN
+            ('pinned_relation', 'pinned_relation_list', 'relation', 'relation_list')
+        {schema_id_condition}
+        AND
+            operation_fields_v1.value IN (
+                SELECT document_views.document_view_id
+                FROM document_views
+                WHERE document_views.document_id = $1
+            ) OR operation_fields_v1.value = $1
+        ",
+    ))
+    .bind(document_id.to_string())
+    .fetch_all(pool)
+    .await
+    .map_err(|e| SqlStoreError::Transaction(e.to_string()))
 }

 /// Helper method for validating and parsing a document into blob data.
@@ -72,7 +176,7 @@ async fn document_to_blob_data(
         _ => panic!(), // We should never hit this as we already validated that this is a blob document.
     };

-    // Now collect all exiting pieces for the blob.
+    // Now collect all existing pieces for the blob.
     //
     // We do this using the store's query method, targeting pieces which are in the relation
     // list of the blob.
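Because `purge_blob` checks reverse relations first, callers may invoke it unconditionally: when the blob (or one of its pieces) is still referenced, the call simply leaves all rows in place. A rough usage sketch from inside the crate (the `try_purge` wrapper is hypothetical):

```rust
use p2panda_rs::document::DocumentId;

use crate::db::SqlStore;

// Hypothetical wrapper around the new `purge_blob` method; `store` is the
// node's SqlStore as used throughout the tests in this changeset.
async fn try_purge(store: &SqlStore, blob_document_id: &DocumentId) {
    // If any document still relates to the blob (or one of its pieces),
    // `purge_blob` is a no-op and returns Ok(()).
    match store.purge_blob(blob_document_id).await {
        Ok(()) => println!("purge attempted, any unreferenced blob data is gone"),
        Err(err) => eprintln!("purge failed: {err}"),
    }
}
```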
@@ -129,46 +233,20 @@ mod tests {
     use p2panda_rs::identity::KeyPair;
     use p2panda_rs::schema::SchemaId;
     use p2panda_rs::test_utils::fixtures::{key_pair, random_document_view_id};
+    use p2panda_rs::test_utils::memory_store::helpers::PopulateStoreConfig;
     use rstest::rstest;

     use crate::db::errors::BlobStoreError;
-    use crate::test_utils::{add_document, test_runner, TestNode};
+    use crate::test_utils::{
+        add_blob, add_document, add_schema_and_documents, assert_query, populate_and_materialize,
+        populate_store_config, test_runner, update_document, TestNode,
+    };

     #[rstest]
     fn get_blob(key_pair: KeyPair) {
         test_runner(|mut node: TestNode| async move {
             let blob_data = "Hello, World!".to_string();
-
-            // Publish blob pieces and blob.
-            let blob_piece_view_id_1 = add_document(
-                &mut node,
-                &SchemaId::BlobPiece(1),
-                vec![("data", blob_data[..5].into())],
-                &key_pair,
-            )
-            .await;
-
-            let blob_piece_view_id_2 = add_document(
-                &mut node,
-                &SchemaId::BlobPiece(1),
-                vec![("data", blob_data[5..].into())],
-                &key_pair,
-            )
-            .await;
-            let blob_view_id = add_document(
-                &mut node,
-                &SchemaId::Blob(1),
-                vec![
-                    ("length", { blob_data.len() as i64 }.into()),
-                    ("mime_type", "text/plain".into()),
-                    (
-                        "pieces",
-                        vec![blob_piece_view_id_1, blob_piece_view_id_2].into(),
-                    ),
-                ],
-                &key_pair,
-            )
-            .await;
+            let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;

             let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
@@ -294,4 +372,180 @@ mod tests {
             );
         })
     }
+
+    #[rstest]
+    fn purge_blob(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            let blob_data = "Hello, World!".to_string();
+            let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
+
+            // There is one blob and two blob pieces in the database.
+            //
+            // These are the rows we expect to exist in each table.
+            assert_query(&node, "SELECT entry_hash FROM entries", 3).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 3).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 6).await;
+            assert_query(&node, "SELECT log_id FROM logs", 3).await;
+            assert_query(&node, "SELECT document_id FROM documents", 3).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 3).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 5).await;
+
+            // Purge this blob from the database; we now expect all tables to be empty (except
+            // the logs table).
+            let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+            let result = node.context.store.purge_blob(&document_id).await;
+            assert!(result.is_ok(), "{:#?}", result);
+            assert_query(&node, "SELECT entry_hash FROM entries", 0).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 0).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 0).await;
+            assert_query(&node, "SELECT log_id FROM logs", 3).await;
+            assert_query(&node, "SELECT document_id FROM documents", 0).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 0).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 0).await;
+
+            // Purging a second time is a no-op and must not error.
+            let result = node.context.store.purge_blob(&document_id).await;
+
+            assert!(result.is_ok(), "{:#?}", result)
+        })
+    }
+
+    #[rstest]
+    fn purge_blob_only_purges_blobs(
+        #[from(populate_store_config)]
+        #[with(1, 1, 1)]
+        config: PopulateStoreConfig,
+        key_pair: KeyPair,
+    ) {
+        test_runner(|mut node: TestNode| async move {
+            let _ = populate_and_materialize(&mut node, &config).await;
+
+            let blob_data = "Hello, World!".to_string();
+            let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
+
+            // There is one blob and two blob pieces in the database, plus one document from
+            // populating the store.
+            //
+            // These are the rows we expect to exist in each table.
+            assert_query(&node, "SELECT entry_hash FROM entries", 4).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 4).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 19).await;
+            assert_query(&node, "SELECT log_id FROM logs", 4).await;
+            assert_query(&node, "SELECT document_id FROM documents", 4).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 4).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 15).await;
+
+            // Purge the blob; only the blob and its pieces are removed, the other document
+            // remains untouched.
+            let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+            let result = node.context.store.purge_blob(&document_id).await;
+            assert!(result.is_ok(), "{:#?}", result);
+            assert_query(&node, "SELECT entry_hash FROM entries", 1).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 1).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 13).await;
+            assert_query(&node, "SELECT log_id FROM logs", 4).await;
+            assert_query(&node, "SELECT document_id FROM documents", 1).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 1).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 10).await;
+
+            let result = node.context.store.purge_blob(&document_id).await;
+
+            assert!(result.is_ok(), "{:#?}", result)
+        })
+    }
+
+    #[rstest]
+    fn does_not_purge_blob_if_still_pinned(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            let blob_data = "Hello, World!".to_string();
+            let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
+
+            let _ = add_schema_and_documents(
+                &mut node,
+                "img",
+                vec![vec![(
+                    "blob",
+                    blob_view_id.clone().into(),
+                    Some(SchemaId::Blob(1)),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            assert_query(&node, "SELECT entry_hash FROM entries", 6).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 6).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 12).await;
+            assert_query(&node, "SELECT log_id FROM logs", 6).await;
+            assert_query(&node, "SELECT document_id FROM documents", 6).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 6).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 11).await;
+
+            // Attempt to purge this blob
+            // from the database. As the blob is still pinned by another document the purge is
+            // refused and all rows remain in place.
+            let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+            let result = node.context.store.purge_blob(&document_id).await;
+            assert!(result.is_ok(), "{:#?}", result);
+            assert_query(&node, "SELECT entry_hash FROM entries", 6).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 6).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 12).await;
+            assert_query(&node, "SELECT log_id FROM logs", 6).await;
+            assert_query(&node, "SELECT document_id FROM documents", 6).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 6).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 11).await;
+
+            let result = node.context.store.purge_blob(&document_id).await;
+
+            assert!(result.is_ok(), "{:#?}", result)
+        })
+    }
+
+    #[rstest]
+    fn purge_all_pieces_of_updated_blob(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            let blob_data = "Hello, World!".to_string();
+            let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
+
+            // Create a new blob piece.
+            let new_blob_pieces = add_document(
+                &mut node,
+                &SchemaId::BlobPiece(1),
+                vec![("data", "more blob data".into())],
+                &key_pair,
+            )
+            .await;
+
+            // Update the blob document to point at the new blob piece.
+            let _ = update_document(
+                &mut node,
+                &SchemaId::Blob(1),
+                vec![("pieces", vec![new_blob_pieces].into())],
+                &blob_view_id,
+                &key_pair,
+            )
+            .await;
+
+            // There is one blob and three blob pieces in the database.
+            //
+            // These are the rows we expect to exist in each table.
+            assert_query(&node, "SELECT entry_hash FROM entries", 5).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 5).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 8).await;
+            assert_query(&node, "SELECT log_id FROM logs", 4).await;
+            assert_query(&node, "SELECT document_id FROM documents", 4).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 5).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 9).await;
+
+            // Purge this blob from the database; all pieces it ever referred to are purged
+            // with it, so we now expect all tables to be empty (except the logs table).
+            let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+            let result = node.context.store.purge_blob(&document_id).await;
+            assert!(result.is_ok(), "{:#?}", result);
+            assert_query(&node, "SELECT entry_hash FROM entries", 0).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 0).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 0).await;
+            assert_query(&node, "SELECT log_id FROM logs", 4).await;
+            assert_query(&node, "SELECT document_id FROM documents", 0).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 0).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 0).await;
+
+            let result = node.context.store.purge_blob(&document_id).await;
+
+            assert!(result.is_ok(), "{:#?}", result)
+        })
+    }
 }
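The store methods added to `document.rs` below are the primitives the garbage collection task composes: list all materialized views of a document, skip the current one, try to prune the rest. A condensed sketch of that loop (hypothetical helper; the `?` conversions lean on the new `DocumentStorage` variant of `SqlStoreError` added in `errors.rs` above):

```rust
use p2panda_rs::document::DocumentId;

use crate::db::errors::SqlStoreError;
use crate::db::SqlStore;

// Condensed sketch of the per-document pruning loop; the real task also
// collects affected child documents and dispatches follow-up tasks for them.
async fn prune_views(store: &SqlStore, document_id: &DocumentId) -> Result<(), SqlStoreError> {
    for view_id in store.get_all_document_view_ids(document_id).await? {
        // The current view of a document must always be retained.
        if store.is_current_view(&view_id).await? {
            continue;
        }
        // Deletion only goes ahead when no pinned relation points at the view.
        let deleted = store.prune_document_view(&view_id).await?;
        log::debug!("view {} deleted: {}", view_id, deleted);
    }
    Ok(())
}
```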
diff --git a/aquadoggo/src/db/stores/document.rs b/aquadoggo/src/db/stores/document.rs
index 22db8423b..6aa16d3e0 100644
--- a/aquadoggo/src/db/stores/document.rs
+++ b/aquadoggo/src/db/stores/document.rs
@@ -30,6 +30,7 @@
 //! retained, we use a system of "pinned relations" to identify and materialise only views we
 //! explicitly wish to keep.
 use async_trait::async_trait;
+use log::debug;
 use p2panda_rs::document::traits::AsDocument;
 use p2panda_rs::document::{DocumentId, DocumentView, DocumentViewId};
 use p2panda_rs::schema::SchemaId;
@@ -366,73 +367,215 @@ impl SqlStore {
             .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))
     }

-    /// Iterate over all views of a document and delete any which:
-    /// - are not the current view
-    /// - _and_ no document field exists in the database which contains a pinned relation to this view
-    #[allow(dead_code)]
-    async fn prune_document_views(
+    /// Get the ids of all document views for a document which are currently materialized to
+    /// the store.
+    pub async fn get_all_document_view_ids(
         &self,
         document_id: &DocumentId,
-    ) -> Result<(), DocumentStorageError> {
-        // Start a transaction, any db insertions after this point, and before the `commit()`
-        // will be rolled back in the event of an error.
-        let mut tx = self
-            .pool
-            .begin()
-            .await
-            .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;
-
-        // Collect all views _except_ the current view for this document
+    ) -> Result<Vec<DocumentViewId>, DocumentStorageError> {
         let document_view_ids: Vec<String> = query_scalar(
             "
             SELECT
-                document_views.document_view_id,
-                documents.document_view_id
+                document_views.document_view_id
             FROM
                 document_views
-            LEFT JOIN
-                documents
-            ON
-                documents.document_view_id = document_views.document_view_id
             WHERE
                 document_views.document_id = $1
-            AND
-                documents.document_view_id IS NULL
             ",
         )
         .bind(document_id.as_str())
-        .fetch_all(&mut tx)
+        .fetch_all(&self.pool)
+        .await
+        .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
+
+        Ok(document_view_ids
+            .iter()
+            .map(|document_id_str| {
+                document_id_str
+                    .parse::<DocumentViewId>()
+                    .expect("Document ids coming from the store should be valid")
+            })
+            .collect())
+    }
+
+    /// Get the ids of all documents which the given document view relates to via pinned
+    /// relations.
+    pub async fn get_child_document_ids(
+        &self,
+        document_view_id: &DocumentViewId,
+    ) -> Result<Vec<DocumentId>, DocumentStorageError> {
+        let document_view_ids: Vec<String> = query_scalar(
+            "
+            SELECT DISTINCT
+                document_views.document_id
+            FROM
+                document_views
+            WHERE
+                document_views.document_view_id
+            IN (
+                SELECT
+                    operation_fields_v1.value
+                FROM
+                    document_view_fields
+                LEFT JOIN
+                    operation_fields_v1
+                ON
+                    document_view_fields.operation_id = operation_fields_v1.operation_id
+                AND
+                    document_view_fields.name = operation_fields_v1.name
+                WHERE
+                    operation_fields_v1.field_type IN ('pinned_relation', 'pinned_relation_list')
+                AND
+                    document_view_fields.document_view_id = $1
+            )
+            ",
+        )
+        .bind(document_view_id.to_string())
+        .fetch_all(&self.pool)
         .await
         .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;

-        // Iterate over all document views and delete them if no document field exists in the
-        // database which contains a pinned relation to this view.
-        //
-        // Deletes on "document_views" cascade to "document_view_fields" so rows there are also removed
-        // from the database.
-        for document_view_id in document_view_ids {
-            query(
+        Ok(document_view_ids
+            .iter()
+            .map(|document_id_str| {
+                document_id_str
+                    .parse::<DocumentId>()
+                    .expect("Document ids coming from the store should be valid")
+            })
+            .collect())
+    }
+
+    /// Attempt to remove a document view from the store. Returns a boolean which indicates if
+    /// the removal took place.
+    ///
+    /// This operation only succeeds if the view is "dangling", meaning no other document view
+    /// exists which relates to this view, AND it is not the current view of any document.
+    pub async fn prune_document_view(
+        &self,
+        document_view_id: &DocumentViewId,
+    ) -> Result<bool, DocumentStorageError> {
+        // Attempt to delete the view. If it is pinned from an existing view, or it is the
+        // current view of a document, the deletion will not go ahead.
+        let result = query(
             "
             DELETE FROM document_views
             WHERE
                 document_views.document_view_id = $1
             AND NOT EXISTS (
-                SELECT * FROM operation_fields_v1
+                SELECT
+                    document_view_fields.document_view_id
+                FROM
+                    document_view_fields
+                LEFT JOIN
+                    operation_fields_v1
+                ON
+                    document_view_fields.operation_id = operation_fields_v1.operation_id
+                AND
+                    document_view_fields.name = operation_fields_v1.name
                 WHERE
                     operation_fields_v1.field_type IN ('pinned_relation', 'pinned_relation_list')
                 AND
                     operation_fields_v1.value = $1
             )
-            ",
+            AND NOT EXISTS (
+                SELECT documents.document_id FROM documents
+                WHERE documents.document_view_id = $1
+            )
+            "
         )
-        .bind(document_view_id)
-        .execute(&mut tx)
+        .bind(document_view_id.to_string())
+        .execute(&self.pool)
         .await
         .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
+
+        // If any rows were affected the deletion went ahead.
+        if result.rows_affected() > 0 {
+            debug!("Deleted view: {}", document_view_id);
+            Ok(true)
+        } else {
+            debug!("Did not delete view: {}", document_view_id);
+            Ok(false)
         }
+    }

-        // Commit the tx here as no errors occurred.
+    /// Check if this view is the current view of its document.
+    pub async fn is_current_view(
+        &self,
+        document_view_id: &DocumentViewId,
+    ) -> Result<bool, DocumentStorageError> {
+        let document_view_id: Option<String> = query_scalar(
+            "
+            SELECT documents.document_view_id FROM documents
+            WHERE documents.document_view_id = $1
+            ",
+        )
+        .bind(document_view_id.to_string())
+        .fetch_optional(&self.pool)
+        .await
+        .map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
+
+        Ok(document_view_id.is_some())
+    }
+
+    /// Purge a document from the store by its id.
+    ///
+    /// This removes entries, operations and any materialized documents which exist.
+    ///
+    /// The only unaffected table after deletion is the `logs` table as we still want to
+    /// remember which log ids an author has already used so we can continue to avoid
+    /// collisions.
+    pub async fn purge_document(
+        &self,
+        document_id: &DocumentId,
+    ) -> Result<(), DocumentStorageError> {
+        // Start a transaction; any db changes after this point, and before the `commit()`,
+        // will be rolled back in the event of an error.
+        let mut tx = self
+            .pool
+            .begin()
+            .await
+            .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;
+
+        // Delete rows from `documents` table; this cascades up to `document_views` and
+        // `document_view_fields` tables.
+        query(
+            "
+            DELETE FROM documents
+            WHERE documents.document_id = $1
+            ",
+        )
+        .bind(document_id.to_string())
+        .fetch_all(&mut tx)
+        .await
+        .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;
+
+        // Delete rows from `entries` table.
+        query(
+            "
+            DELETE FROM entries
+            WHERE entries.entry_hash IN (
+                SELECT operations_v1.operation_id FROM operations_v1
+                WHERE operations_v1.document_id = $1
+            )
+            ",
+        )
+        .bind(document_id.to_string())
+        .fetch_all(&mut tx)
+        .await
+        .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;
+
+        // Delete rows from `operations_v1` table; this cascades up to the
+        // `operation_fields_v1` table as well.
+ query( + " + DELETE FROM operations_v1 + WHERE operations_v1.document_id = $1 + ", + ) + .bind(document_id.to_string()) + .fetch_all(&mut tx) + .await + .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?; + + // Commit the transaction if all queries succeeded. tx.commit() .await .map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?; @@ -465,16 +608,18 @@ async fn get_document_view_field_rows( operation_fields_v1.list_index, operation_fields_v1.field_type, operation_fields_v1.value - FROM + FROM document_view_fields - LEFT JOIN document_views - ON - document_view_fields.document_view_id = document_views.document_view_id - LEFT JOIN operation_fields_v1 - ON - document_view_fields.operation_id = operation_fields_v1.operation_id - AND - document_view_fields.name = operation_fields_v1.name + LEFT JOIN + operation_fields_v1 + ON + document_view_fields.operation_id = operation_fields_v1.operation_id + AND + document_view_fields.name = operation_fields_v1.name + LEFT JOIN + document_views + ON + document_view_fields.document_view_id = document_views.document_view_id WHERE document_view_fields.document_view_id = $1 ORDER BY @@ -603,9 +748,11 @@ async fn insert_document( #[cfg(test)] mod tests { + use p2panda_rs::api::next_args; use p2panda_rs::document::materialization::build_graph; use p2panda_rs::document::traits::AsDocument; use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewFields, DocumentViewId}; + use p2panda_rs::entry::{LogId, SeqNum}; use p2panda_rs::identity::KeyPair; use p2panda_rs::operation::traits::AsOperation; use p2panda_rs::operation::{Operation, OperationId}; @@ -622,8 +769,8 @@ mod tests { use crate::materializer::tasks::reduce_task; use crate::materializer::TaskInput; use crate::test_utils::{ - add_schema_and_documents, build_document, populate_and_materialize, populate_store_config, - test_runner, TestNode, + add_schema_and_documents, assert_query, build_document, populate_and_materialize, + populate_store_config, test_runner, TestNode, }; #[rstest] @@ -1008,7 +1155,7 @@ mod tests { } #[rstest] - fn prunes_document_views( + fn prunes_document_view( #[from(populate_store_config)] #[with(2, 1, 1)] config: PopulateStoreConfig, @@ -1041,9 +1188,15 @@ mod tests { .unwrap(); assert!(document.is_some()); - // Now prune dangling views for the document. - let result = node.context.store.prune_document_views(&document_id).await; + // Prune the first document view. + let result = node + .context + .store + .prune_document_view(&first_document_view_id) + .await; assert!(result.is_ok()); + // Returns `true` when pruning succeeded. + assert!(result.unwrap()); // Get the first document view again, it should no longer be there. let document = node @@ -1098,9 +1251,15 @@ mod tests { ) .await; - // Now prune dangling views for the document. - let result = node.context.store.prune_document_views(&document_id).await; + // Attempt to prune the first document view. + let result = node + .context + .store + .prune_document_view(&first_document_view_id) + .await; assert!(result.is_ok()); + // Returns `false` when pruning failed. + assert!(!result.unwrap()); // Get the first document view, it should still be in the store as it was pinned. 
             let document = node
                 .context
                 .store
                 .get_document_by_view_id(&first_document_view_id)
                 .await
                 .unwrap();
             assert!(document.is_some());
         });
     }
+
+    #[rstest]
+    fn does_not_prune_current_view(
+        #[from(populate_store_config)]
+        #[with(1, 1, 1)]
+        config: PopulateStoreConfig,
+    ) {
+        test_runner(|mut node: TestNode| async move {
+            // Populate the store and materialize all documents.
+            let (_, document_ids) = populate_and_materialize(&mut node, &config).await;
+            let document_id = document_ids[0].clone();
+            let current_document_view_id: DocumentViewId = document_id.as_str().parse().unwrap();
+
+            // Attempt to prune the current document view.
+            let result = node
+                .context
+                .store
+                .prune_document_view(&current_document_view_id)
+                .await;
+            assert!(result.is_ok());
+            // Returns `false` when pruning failed.
+            assert!(!result.unwrap());
+
+            // Get the current document view, it should still be in the store.
+            let document = node
+                .context
+                .store
+                .get_document_by_view_id(&current_document_view_id)
+                .await
+                .unwrap();
+            assert!(document.is_some());
+        });
+    }
+
+    #[rstest]
+    fn purge_document(
+        #[from(populate_store_config)]
+        #[with(2, 1, 1)]
+        config: PopulateStoreConfig,
+    ) {
+        test_runner(|mut node: TestNode| async move {
+            // Populate the store and materialize all documents.
+            let (_, document_ids) = populate_and_materialize(&mut node, &config).await;
+            let document_id = document_ids[0].clone();
+
+            // There is one document in the database which contains a CREATE and an UPDATE
+            // operation, both published by the same author. These are the number of rows we
+            // expect for each table.
+            assert_query(&node, "SELECT entry_hash FROM entries", 2).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 2).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 26).await;
+            assert_query(&node, "SELECT log_id FROM logs", 1).await;
+            assert_query(&node, "SELECT document_id FROM documents", 1).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 1).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 10).await;
+
+            // Purge this document from the database; we now expect all tables to be empty.
+            let result = node.context.store.purge_document(&document_id).await;
+            assert!(result.is_ok(), "{:#?}", result);
+            assert_query(&node, "SELECT entry_hash FROM entries", 0).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 0).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 0).await;
+            assert_query(&node, "SELECT log_id FROM logs", 1).await;
+            assert_query(&node, "SELECT document_id FROM documents", 0).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 0).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 0).await;
+        });
+    }
+
+    #[rstest]
+    fn purging_only_effects_target_document(
+        #[from(populate_store_config)]
+        #[with(1, 2, 1)]
+        config: PopulateStoreConfig,
+    ) {
+        test_runner(|mut node: TestNode| async move {
+            // Populate the store and materialize all documents.
+            let (_, document_ids) = populate_and_materialize(&mut node, &config).await;
+            let document_id = document_ids[0].clone();
+
+            // There are two documents in the database which each contain a single CREATE
+            // operation, both published by the same author. These are the number of rows we
+            // expect for each table.
+            assert_query(&node, "SELECT entry_hash FROM entries", 2).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 2).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 26).await;
+            assert_query(&node, "SELECT log_id FROM logs", 2).await;
+            assert_query(&node, "SELECT document_id FROM documents", 2).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 2).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 20).await;
+
+            // Purge one document from the database; we now expect half of the rows to remain.
+            let result = node.context.store.purge_document(&document_id).await;
+            assert!(result.is_ok(), "{:#?}", result);
+
+            assert_query(&node, "SELECT entry_hash FROM entries", 1).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 1).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 13).await;
+            assert_query(&node, "SELECT log_id FROM logs", 2).await;
+            assert_query(&node, "SELECT document_id FROM documents", 1).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 1).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 10).await;
+        });
+    }
+
+    #[rstest]
+    fn next_args_after_purge(
+        #[from(populate_store_config)]
+        #[with(2, 1, 1)]
+        config: PopulateStoreConfig,
+    ) {
+        test_runner(|mut node: TestNode| async move {
+            // Populate the store and materialize all documents.
+            let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await;
+            let document_id = document_ids[0].clone();
+            let public_key = key_pairs[0].public_key();
+
+            let _ = node.context.store.purge_document(&document_id).await;
+
+            // Requesting next args for the purged document now fails.
+            let result = next_args(
+                &node.context.store,
+                &public_key,
+                Some(&document_id.as_str().parse().unwrap()),
+            )
+            .await;
+            assert!(result.is_err());
+
+            let result = next_args(&node.context.store, &public_key, None).await;
+
+            assert!(result.is_ok());
+            let next_args = result.unwrap();
+            assert_eq!(next_args, (None, None, SeqNum::default(), LogId::new(1)));
+        });
+    }
 }
diff --git a/aquadoggo/src/materializer/service.rs b/aquadoggo/src/materializer/service.rs
index 7465ab0b0..7b1ebb45c 100644
--- a/aquadoggo/src/materializer/service.rs
+++ b/aquadoggo/src/materializer/service.rs
@@ -8,7 +8,9 @@ use tokio::task;
 use crate::bus::{ServiceMessage, ServiceSender};
 use crate::context::Context;
 use crate::manager::{ServiceReadySender, Shutdown};
-use crate::materializer::tasks::{blob_task, dependency_task, reduce_task, schema_task};
+use crate::materializer::tasks::{
+    blob_task, dependency_task, garbage_collection_task, reduce_task, schema_task,
+};
 use crate::materializer::worker::{Factory, Task, TaskStatus};
 use crate::materializer::TaskInput;

@@ -39,6 +41,7 @@ pub async fn materializer_service(
     factory.register("dependency", pool_size, dependency_task);
     factory.register("schema", pool_size, schema_task);
     factory.register("blob", pool_size, blob_task);
+    factory.register("garbage_collection", pool_size, garbage_collection_task);

     // Get a listener for error signal from factory
     let on_error = factory.on_error();
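With the worker registered, any code holding a `Context` can queue garbage collection by constructing a task whose worker name matches the registration above. A small sketch (the `gc_task_for` helper is hypothetical):

```rust
use p2panda_rs::document::DocumentId;

use crate::materializer::{Task, TaskInput};

// Hypothetical helper: build a "garbage_collection" task for a document. The
// worker name must match the string registered with the factory above.
fn gc_task_for(document_id: DocumentId) -> Task<TaskInput> {
    Task::new("garbage_collection", TaskInput::DocumentId(document_id))
}
```

The reduce task further down produces exactly this shape whenever it materializes an edited or deleted document.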
diff --git a/aquadoggo/src/materializer/tasks/dependency.rs b/aquadoggo/src/materializer/tasks/dependency.rs
index 02effd0c5..83c3d58d4 100644
--- a/aquadoggo/src/materializer/tasks/dependency.rs
+++ b/aquadoggo/src/materializer/tasks/dependency.rs
@@ -995,8 +995,9 @@ mod tests {
             .await
             .unwrap()
             .expect("Should have returned new tasks");
-        assert_eq!(tasks.len(), 1);
-        assert_eq!(tasks[0].worker_name(), &String::from("dependency"));
+        assert_eq!(tasks.len(), 2);
+        assert_eq!(tasks[0].worker_name(), &String::from("garbage_collection"));
+        assert_eq!(tasks[1].worker_name(), &String::from("dependency"));

         // We should now have a materialized latest post and comment document but not the
         // pinned historical version of the post, where the comment was pointing at!
@@ -1026,7 +1027,7 @@ mod tests {

         // 2. The "dependency" task which followed materialising the "post" found a reverse
         //    relation to a "comment" document .. it dispatches another "dependency" task for it
-        let tasks = dependency_task(node_b.context.clone(), tasks[0].input().clone())
+        let tasks = dependency_task(node_b.context.clone(), tasks[1].input().clone())
             .await
             .unwrap();
         assert_eq!(
diff --git a/aquadoggo/src/materializer/tasks/garbage_collection.rs b/aquadoggo/src/materializer/tasks/garbage_collection.rs
new file mode 100644
index 000000000..36963b3a7
--- /dev/null
+++ b/aquadoggo/src/materializer/tasks/garbage_collection.rs
@@ -0,0 +1,650 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use log::debug;
+use p2panda_rs::document::DocumentViewId;
+use p2panda_rs::operation::traits::AsOperation;
+use p2panda_rs::schema::SchemaId;
+use p2panda_rs::storage_provider::traits::OperationStore;
+use p2panda_rs::Human;
+
+use crate::context::Context;
+use crate::materializer::worker::{TaskError, TaskResult};
+use crate::materializer::{Task, TaskInput};
+
+pub async fn garbage_collection_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
+    debug!("Working on {}", input);
+
+    match input {
+        TaskInput::DocumentId(document_id) => {
+            // This task is concerned with a document which may now have dangling views. We want
+            // to check for this and delete any views which are no longer needed.
+            debug!(
+                "Prune document views for document: {}",
+                document_id.display()
+            );
+
+            // Collect the ids of all views for this document.
+            let all_document_view_ids: Vec<DocumentViewId> = context
+                .store
+                .get_all_document_view_ids(&document_id)
+                .await
+                .map_err(|err| TaskError::Critical(err.to_string()))?;
+
+            // Iterate over all document views and delete them if no document view exists which
+            // refers to them in a pinned relation field AND they are not the current view of a
+            // document.
+            //
+            // Deletes on "document_views" cascade to "document_view_fields" so rows there are
+            // also removed from the database.
+            let mut all_effected_child_relations = vec![];
+            let mut deleted_views_count = 0;
+            for document_view_id in &all_document_view_ids {
+                // Check if this is the current view of its document. This will still return
+                // true if the document in question is deleted.
+                let is_current_view = context
+                    .store
+                    .is_current_view(document_view_id)
+                    .await
+                    .map_err(|err| TaskError::Critical(err.to_string()))?;
+
+                let mut effected_child_relations = vec![];
+                let mut view_deleted = false;
+
+                if !is_current_view {
+                    // Before attempting to delete this view we need to fetch the ids of any
+                    // child documents which might have views that could become unpinned as a
+                    // result of this delete. These will be returned if the deletion is
+                    // successful.
+                    effected_child_relations = context
+                        .store
+                        .get_child_document_ids(document_view_id)
+                        .await
+                        .map_err(|err| TaskError::Critical(err.to_string()))?;
+
+                    // Attempt to delete the view. If it is pinned from an existing view the
+                    // deletion will not go ahead.
+                    view_deleted = context
+                        .store
+                        .prune_document_view(document_view_id)
+                        .await
+                        .map_err(|err| TaskError::Critical(err.to_string()))?;
+                }
+
+                // If the view was deleted then push the affected children to the return array.
+                if view_deleted {
+                    debug!("Deleted view: {}", document_view_id);
+                    deleted_views_count += 1;
+                    all_effected_child_relations.extend(effected_child_relations);
+                } else {
+                    debug!("Did not delete view: {}", document_view_id);
+                }
+            }
+
+            // If the number of deleted views equals the total existing views (minus one for the
+            // current view), then there is a chance this document became completely detached.
+            // In this case we should check if this document is a blob document and then try to
+            // purge it.
+            if all_document_view_ids.len() - 1 == deleted_views_count {
+                let operation = context
+                    .store
+                    .get_operation(&document_id.as_str().parse().unwrap())
+                    .await
+                    .map_err(|err| TaskError::Failure(err.to_string()))?
+                    .expect("Operation exists in store");
+
+                if let SchemaId::Blob(_) = operation.schema_id() {
+                    // Purge the blob and all its pieces. This only succeeds if no document
+                    // refers to the blob document by either a relation or pinned relation.
+                    context
+                        .store
+                        .purge_blob(&document_id)
+                        .await
+                        .map_err(|err| TaskError::Failure(err.to_string()))?;
+                }
+            }
+
+            // We compose some more prune tasks based on the affected documents returned above.
+            let next_tasks: Vec<Task<TaskInput>> = all_effected_child_relations
+                .iter()
+                .map(|document_id| {
+                    debug!("Issue prune task for document: {document_id:#?}");
+                    Task::new(
+                        "garbage_collection",
+                        TaskInput::DocumentId(document_id.to_owned()),
+                    )
+                })
+                .collect();
+
+            if next_tasks.is_empty() {
+                Ok(None)
+            } else {
+                Ok(Some(next_tasks))
+            }
+        }
+        _ => Err(TaskError::Critical("Invalid task input".into())),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use p2panda_rs::document::DocumentId;
+    use p2panda_rs::identity::KeyPair;
+    use p2panda_rs::schema::SchemaId;
+    use p2panda_rs::storage_provider::traits::DocumentStore;
+    use p2panda_rs::test_utils::fixtures::{key_pair, random_document_view_id};
+    use rstest::rstest;
+
+    use crate::materializer::tasks::garbage_collection_task;
+    use crate::materializer::{Task, TaskInput};
+    use crate::test_utils::{
+        add_blob, add_schema_and_documents, assert_query, test_runner, update_document, TestNode,
+    };
+
+    #[rstest]
+    fn e2e_pruning(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            // Publish some documents which we will later point relations at.
+            let (child_schema, child_document_view_ids) = add_schema_and_documents(
+                &mut node,
+                "schema_for_child",
+                vec![
+                    vec![("uninteresting_field", 1.into(), None)],
+                    vec![("uninteresting_field", 2.into(), None)],
+                ],
+                &key_pair,
+            )
+            .await;
+
+            // Create some parent documents which contain a pinned relation list pointing to the
+            // children created above.
+            let (parent_schema, parent_document_view_ids) = add_schema_and_documents(
+                &mut node,
+                "schema_for_parent",
+                vec![vec![
+                    ("name", "parent".into(), None),
+                    (
+                        "children",
+                        child_document_view_ids.clone().into(),
+                        Some(child_schema.id().to_owned()),
+                    ),
+                ]],
+                &key_pair,
+            )
+            .await;
+
+            // Convert view id to document id.
+            let parent_document_id: DocumentId = parent_document_view_ids[0]
+                .clone()
+                .to_string()
+                .parse()
+                .unwrap();
+
+            // Update the parent document so that there are now two views stored in the db, one
+            // current and one dangling.
+ let updated_parent_view_id = update_document( + &mut node, + parent_schema.id(), + vec![("name", "Parent".into())], + &parent_document_view_ids[0], + &key_pair, + ) + .await; + + // Get the historic (dangling) view to check it's actually there. + let historic_document_view = node + .context + .store + .get_document_by_view_id(&parent_document_view_ids[0].clone()) + .await + .unwrap(); + + // It is there... + assert!(historic_document_view.is_some()); + + // Create another document, which has a pinned relation to the parent document created + // above. Now the relation graph looks like this + // + // GrandParent --> Parent --> Child1 + // \ + // --> Child2 + // + let (schema_for_grand_parent, grand_parent_document_view_ids) = + add_schema_and_documents( + &mut node, + "schema_for_grand_parent", + vec![vec![ + ("name", "grand parent".into(), None), + ( + "child", + parent_document_view_ids[0].clone().into(), + Some(parent_schema.id().to_owned()), + ), + ]], + &key_pair, + ) + .await; + + // Convert view id to document id. + let grand_parent_document_id: DocumentId = grand_parent_document_view_ids[0] + .clone() + .to_string() + .parse() + .unwrap(); + + // Update the grand parent document to a new view, leaving the previous one dangling. + // + // Note: this test method _does not_ dispatch "garbage_collection" tasks. + update_document( + &mut node, + schema_for_grand_parent.id(), + vec![ + ("name", "Grand Parent".into()), + ("child", updated_parent_view_id.into()), + ], + &grand_parent_document_view_ids[0], + &key_pair, + ) + .await; + + // Get the historic (dangling) view to make sure it exists. + let historic_document_view = node + .context + .store + .get_document_by_view_id(&grand_parent_document_view_ids[0].clone()) + .await + .unwrap(); + + // It does... + assert!(historic_document_view.is_some()); + + // Now prune dangling views for the grand parent document. This method deletes any + // dangling views (not pinned, not current) from the database for this document. It + // returns the document ids of any documents which may have views which have become + // "un-pinned" as a result of this view being removed. In this case, that's the + // document id of the "parent" document. + let next_tasks = garbage_collection_task( + node.context.clone(), + TaskInput::DocumentId(grand_parent_document_id), + ) + .await + .unwrap() + .unwrap(); + + // One new prune task is issued. + assert_eq!(next_tasks.len(), 1); + // It is the parent (which this grand parent relates to) as we expect. + assert_eq!( + next_tasks[0], + Task::new( + "garbage_collection", + TaskInput::DocumentId(parent_document_id) + ) + ); + + // Check the historic view has been deleted. + let historic_document_view = node + .context + .store + .get_document_by_view_id(&grand_parent_document_view_ids[0].clone()) + .await + .unwrap(); + + // It has... + assert!(historic_document_view.is_none()); + + // Now prune dangling views for the parent document. + let next_tasks = + garbage_collection_task(node.context.clone(), next_tasks[0].input().to_owned()) + .await + .unwrap() + .unwrap(); + + // Two new prune tasks issued. + assert_eq!(next_tasks.len(), 2); + // These are the two final child documents. + assert_eq!( + next_tasks, + child_document_view_ids + .iter() + .rev() + .map(|document_view_id| { + let document_id: DocumentId = document_view_id.to_string().parse().unwrap(); + Task::new("garbage_collection", TaskInput::DocumentId(document_id)) + }) + .collect::>>() + ); + + // Check the historic view has been deleted. 
+            let historic_document_view = node
+                .context
+                .store
+                .get_document_by_view_id(&grand_parent_document_view_ids[0].clone())
+                .await
+                .unwrap();
+
+            // It has...
+            assert!(historic_document_view.is_none());
+
+            // Now prune dangling views for the parent document.
+            let next_tasks =
+                garbage_collection_task(node.context.clone(), next_tasks[0].input().to_owned())
+                    .await
+                    .unwrap()
+                    .unwrap();
+
+            // Two new prune tasks issued.
+            assert_eq!(next_tasks.len(), 2);
+            // These are the two final child documents.
+            assert_eq!(
+                next_tasks,
+                child_document_view_ids
+                    .iter()
+                    .rev()
+                    .map(|document_view_id| {
+                        let document_id: DocumentId =
+                            document_view_id.to_string().parse().unwrap();
+                        Task::new("garbage_collection", TaskInput::DocumentId(document_id))
+                    })
+                    .collect::<Vec<Task<TaskInput>>>()
+            );
+
+            // Check the historic view has been deleted.
+            let historic_document_view = node
+                .context
+                .store
+                .get_document_by_view_id(&parent_document_view_ids[0].clone())
+                .await
+                .unwrap();
+
+            // It has.
+            assert!(historic_document_view.is_none());
+
+            // Running the child tasks returns no new tasks.
+            let next_tasks =
+                garbage_collection_task(node.context.clone(), next_tasks[0].input().to_owned())
+                    .await
+                    .unwrap();
+
+            assert!(next_tasks.is_none());
+        });
+    }
+
+    #[rstest]
+    fn no_new_tasks_issued_when_no_views_pruned(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            // Create a child document.
+            let (child_schema, child_document_view_ids) = add_schema_and_documents(
+                &mut node,
+                "schema_for_child",
+                vec![vec![("uninteresting_field", 1.into(), None)]],
+                &key_pair,
+            )
+            .await;
+
+            // Create a parent document which contains a pinned relation list pointing to the
+            // child created above.
+            let (_, parent_document_view_ids) = add_schema_and_documents(
+                &mut node,
+                "schema_for_parent",
+                vec![vec![
+                    ("name", "parent".into(), None),
+                    (
+                        "children",
+                        child_document_view_ids.clone().into(),
+                        Some(child_schema.id().to_owned()),
+                    ),
+                ]],
+                &key_pair,
+            )
+            .await;
+
+            // Run a garbage collection task for the parent.
+            let document_id: DocumentId = parent_document_view_ids[0].to_string().parse().unwrap();
+            let next_tasks =
+                garbage_collection_task(node.context.clone(), TaskInput::DocumentId(document_id))
+                    .await
+                    .unwrap();
+
+            // No views were pruned so we expect no new tasks to be issued.
+            assert!(next_tasks.is_none());
+        })
+    }
+
+    #[rstest]
+    fn purges_blobs(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            // Publish a blob.
+            let blob_document_view = add_blob(&mut node, "Hello World!", &key_pair).await;
+            let blob_document_id: DocumentId = blob_document_view.to_string().parse().unwrap();
+
+            // Check the blob is there.
+            let blob = node
+                .context
+                .store
+                .get_blob(&blob_document_id)
+                .await
+                .unwrap();
+            assert!(blob.is_some());
+
+            // Run a garbage collection task for the blob document.
+            let next_tasks = garbage_collection_task(
+                node.context.clone(),
+                TaskInput::DocumentId(blob_document_id.clone()),
+            )
+            .await
+            .unwrap();
+
+            // It shouldn't return any new tasks.
+            assert!(next_tasks.is_none());
+
+            // The blob should no longer be available.
+            let blob = node
+                .context
+                .store
+                .get_blob(&blob_document_id)
+                .await
+                .unwrap();
+            assert!(blob.is_none());
+
+            // And all expected rows have been deleted from the database.
+            assert_query(&node, "SELECT entry_hash FROM entries", 0).await;
+            assert_query(&node, "SELECT operation_id FROM operations_v1", 0).await;
+            assert_query(&node, "SELECT operation_id FROM operation_fields_v1", 0).await;
+            assert_query(&node, "SELECT log_id FROM logs", 3).await;
+            assert_query(&node, "SELECT document_id FROM documents", 0).await;
+            assert_query(&node, "SELECT document_id FROM document_views", 0).await;
+            assert_query(&node, "SELECT name FROM document_view_fields", 0).await;
+        });
+    }
+
+    #[rstest]
+    fn purges_newly_detached_blobs(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            // Create a blob document.
+            let blob_data = "Hello, World!".to_string();
+            let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
+            let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+
+            // Relate to the blob from a new document.
+            let (schema, documents_pinning_blob) = add_schema_and_documents(
+                &mut node,
+                "img",
+                vec![vec![(
+                    "blob",
+                    blob_view_id.clone().into(),
+                    Some(SchemaId::Blob(1)),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            // Now update the document to relate to another blob. This means the previously
+            // created blob is now "dangling".
+            update_document(
+                &mut node,
+                schema.id(),
+                vec![("blob", random_document_view_id().into())],
+                &documents_pinning_blob[0].clone(),
+                &key_pair,
+            )
+            .await;
+
+            // Run a task for the parent document.
+            let document_id: DocumentId = documents_pinning_blob[0].to_string().parse().unwrap();
+            let next_tasks =
+                garbage_collection_task(node.context.clone(), TaskInput::DocumentId(document_id))
+                    .await
+                    .unwrap()
+                    .unwrap();
+
+            // It issues one new task which is for the blob document.
+            assert_eq!(next_tasks.len(), 1);
+            let next_tasks =
+                garbage_collection_task(node.context.clone(), next_tasks[0].input().to_owned())
+                    .await
+                    .unwrap();
+            // No new tasks issued.
+            assert!(next_tasks.is_none());
+
+            // The blob has correctly been purged.
+            let blob = node
+                .context
+                .store
+                .get_blob(&blob_document_id)
+                .await
+                .unwrap();
+
+            assert!(blob.is_none());
+        })
+    }
+
+    #[rstest]
+    fn other_documents_keep_blob_alive(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            // Create a blob document.
+            let blob_data = "Hello, World!".to_string();
+            let blob_view_id = add_blob(&mut node, &blob_data, &key_pair).await;
+            let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+
+            // Relate to the blob from a new document.
+            let (schema, documents_pinning_blob) = add_schema_and_documents(
+                &mut node,
+                "img",
+                vec![vec![(
+                    "blob",
+                    blob_view_id.clone().into(),
+                    Some(SchemaId::Blob(1)),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            // Now update the document to relate to another blob. This means the previously
+            // created blob is now "dangling".
+            update_document(
+                &mut node,
+                schema.id(),
+                vec![("blob", random_document_view_id().into())],
+                &documents_pinning_blob[0].clone(),
+                &key_pair,
+            )
+            .await;
+
+            // Another document relating to the blob (this time from a relation field).
+            let _ = add_schema_and_documents(
+                &mut node,
+                "img",
+                vec![vec![(
+                    "blob",
+                    blob_document_id.clone().into(),
+                    Some(SchemaId::Blob(1)),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            // Run a task for the parent document.
+            let document_id: DocumentId = documents_pinning_blob[0].to_string().parse().unwrap();
+            let next_tasks =
+                garbage_collection_task(node.context.clone(), TaskInput::DocumentId(document_id))
+                    .await
+                    .unwrap()
+                    .unwrap();
+
+            // It issues one new task which is for the blob document.
+            assert_eq!(next_tasks.len(), 1);
+            let next_tasks =
+                garbage_collection_task(node.context.clone(), next_tasks[0].input().to_owned())
+                    .await
+                    .unwrap();
+            // No new tasks issued.
+            assert!(next_tasks.is_none());
+
+            // The blob should still be there as it was kept alive by a different document.
+            let blob = node
+                .context
+                .store
+                .get_blob(&blob_document_id)
+                .await
+                .unwrap();
+
+            assert!(blob.is_some());
+        })
+    }
+
+    #[rstest]
+    fn all_relation_types_keep_blobs_alive(key_pair: KeyPair) {
+        test_runner(|mut node: TestNode| async move {
+            let blob_data = "Hello, World!".to_string();
+
+            // Any type of relation can keep a blob alive; here we create one of each and run
+            // garbage collection tasks for each blob.
+
+            let blob_view_id_1 = add_blob(&mut node, &blob_data, &key_pair).await;
+            let _ = add_schema_and_documents(
+                &mut node,
+                "img",
+                vec![vec![(
+                    "blob",
+                    blob_view_id_1.clone().into(),
+                    Some(SchemaId::Blob(1)),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            let blob_view_id_2 = add_blob(&mut node, &blob_data, &key_pair).await;
+            let _ = add_schema_and_documents(
+                &mut node,
+                "img",
+                vec![vec![(
+                    "blob",
+                    vec![blob_view_id_2.clone()].into(),
+                    Some(SchemaId::Blob(1)),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            let blob_view_id_3 = add_blob(&mut node, &blob_data, &key_pair).await;
+            let _ = add_schema_and_documents(
+                &mut node,
+                "img",
+                vec![vec![(
+                    "blob",
+                    blob_view_id_3
+                        .to_string()
+                        .parse::<DocumentId>()
+                        .unwrap()
+                        .into(),
+                    Some(SchemaId::Blob(1)),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            let blob_view_id_4 = add_blob(&mut node, &blob_data, &key_pair).await;
+            let _ = add_schema_and_documents(
+                &mut node,
+                "img",
+                vec![vec![(
+                    "blob",
+                    vec![blob_view_id_4.to_string().parse::<DocumentId>().unwrap()].into(),
+                    Some(SchemaId::Blob(1)),
+                )]],
+                &key_pair,
+            )
+            .await;
+
+            for blob_view_id in [
+                blob_view_id_1,
+                blob_view_id_2,
+                blob_view_id_3,
+                blob_view_id_4,
+            ] {
+                let blob_document_id: DocumentId = blob_view_id.to_string().parse().unwrap();
+                let next_tasks = garbage_collection_task(
+                    node.context.clone(),
+                    TaskInput::DocumentId(blob_document_id.clone()),
+                )
+                .await
+                .unwrap();
+
+                assert!(next_tasks.is_none());
+
+                // All blobs should be kept alive.
+                let blob = node
+                    .context
+                    .store
+                    .get_blob(&blob_document_id)
+                    .await
+                    .unwrap();
+
+                assert!(blob.is_some());
+            }
+        })
+    }
+}
diff --git a/aquadoggo/src/materializer/tasks/mod.rs b/aquadoggo/src/materializer/tasks/mod.rs
index 674fab8f5..4f53e1bec 100644
--- a/aquadoggo/src/materializer/tasks/mod.rs
+++ b/aquadoggo/src/materializer/tasks/mod.rs
@@ -2,10 +2,12 @@

 mod blob;
 mod dependency;
+mod garbage_collection;
 mod reduce;
 mod schema;

 pub use blob::blob_task;
 pub use dependency::dependency_task;
+pub use garbage_collection::garbage_collection_task;
 pub use reduce::reduce_task;
 pub use schema::schema_task;
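The `reduce.rs` change below reworks what a reduce task returns: instead of always dispatching a single `dependency` task, the follow-up tasks are derived from the document's state. Sketched as a standalone function (hypothetical, mirroring the dispatch rules in the diff):

```rust
// Expected follow-up workers out of `reduce_task`, by document state:
//
//   created -> ["dependency"]
//   edited  -> ["garbage_collection", "dependency"]
//   deleted -> ["garbage_collection"]
fn expected_workers(is_edited: bool, is_deleted: bool) -> Vec<&'static str> {
    let mut workers = vec![];
    // Edits and deletes may leave dangling views behind, so queue pruning.
    if is_deleted || is_edited {
        workers.push("garbage_collection");
    }
    // Deleted documents have no current view to resolve dependencies for.
    if !is_deleted {
        workers.push("dependency");
    }
    workers
}
```

The `returns_correct_dependency_and_prune_tasks` test cases below assert exactly these two non-trivial rows of the table.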
diff --git a/aquadoggo/src/materializer/tasks/reduce.rs b/aquadoggo/src/materializer/tasks/reduce.rs
index 1e3a4b408..f24aeb3c5 100644
--- a/aquadoggo/src/materializer/tasks/reduce.rs
+++ b/aquadoggo/src/materializer/tasks/reduce.rs
@@ -231,6 +231,8 @@ async fn reduce_document<O: AsOperation + WithPublicKey>(
         .await
         .map_err(|err| TaskError::Critical(err.to_string()))?;

+            let mut tasks = vec![];
+
             // If the document was deleted we no longer return early here, a garbage
             // collection task still needs to be dispatched for it below
             if document.is_deleted() {
                 debug!(
                     document.display(),
                     document.view_id().display()
                 );
-                return Ok(None);
             }

             if document.is_edited() {
@@ -251,14 +252,31 @@ async fn reduce_document<O: AsOperation + WithPublicKey>(
                 debug!("Created {}", document.display());
             };

-            debug!(
-                "Dispatch dependency task for view with id: {}",
-                document.view_id()
-            );
-            Ok(Some(vec![Task::new(
-                "dependency",
-                TaskInput::DocumentViewId(document.view_id().to_owned()),
-            )]))
+            if document.is_deleted() || document.is_edited() {
+                debug!(
+                    "Dispatch prune task for document with id: {}",
+                    document.id()
+                );
+
+                tasks.push(Task::new(
+                    "garbage_collection",
+                    TaskInput::DocumentId(document.id().to_owned()),
+                ))
+            }
+
+            if !document.is_deleted() {
+                debug!(
+                    "Dispatch dependency task for view with id: {}",
+                    document.view_id()
+                );
+
+                tasks.push(Task::new(
+                    "dependency",
+                    TaskInput::DocumentViewId(document.view_id().to_owned()),
+                ));
+            }
+
+            Ok(Some(tasks))
         }
         Err(err) => {
             // There are not enough operations yet to materialise this view. Maybe next time!
@@ -500,7 +518,7 @@ mod tests {
         for document_id in &document_ids {
             let input = TaskInput::DocumentId(document_id.clone());
             let tasks = reduce_task(node.context.clone(), input).await.unwrap();
-            assert!(tasks.is_none());
+            assert_eq!(tasks.unwrap().len(), 1);
         }

         for document_id in &document_ids {
@@ -527,16 +545,16 @@ mod tests {
     #[rstest]
     #[case(
         populate_store_config(3, 1, 1, false, doggo_schema(), doggo_fields(), doggo_fields()),
-        true
+        vec!["garbage_collection".to_string(), "dependency".to_string()]
    )]
    // This document is deleted, it shouldn't spawn a dependency task.
    #[case(
        populate_store_config(3, 1, 1, true, doggo_schema(), doggo_fields(), doggo_fields()),
-        false
+        vec!["garbage_collection".to_string()]
    )]
-    fn returns_dependency_task_inputs(
+    fn returns_correct_dependency_and_prune_tasks(
         #[case] config: PopulateStoreConfig,
-        #[case] is_next_task: bool,
+        #[case] expected_worker_names: Vec<String>,
     ) {
         test_runner(move |node: TestNode| async move {
             // Populate the store with some entries and operations but DON'T materialise any
             // resulting documents.
             let (_, document_ids) = populate_store(&node.context.store, &config).await;
             let document_id = document_ids
                 .first()
                 .expect("There should be at least one document id");

             let input = TaskInput::DocumentId(document_id.clone());
-            let next_task_inputs = reduce_task(node.context.clone(), input).await.unwrap();
+            let next_tasks = reduce_task(node.context.clone(), input)
+                .await
+                .expect("Ok result")
+                .expect("Some tasks returned");

-            assert_eq!(next_task_inputs.is_some(), is_next_task);
+            assert_eq!(next_tasks.len(), expected_worker_names.len());
+
+            for (index, worker_name) in expected_worker_names.iter().enumerate() {
+                assert_eq!(next_tasks[index].worker_name(), worker_name);
+            }
         });
     }

diff --git a/aquadoggo/src/test_utils/mod.rs b/aquadoggo/src/test_utils/mod.rs
index 19d444074..eb0548ee3 100644
--- a/aquadoggo/src/test_utils/mod.rs
+++ b/aquadoggo/src/test_utils/mod.rs
@@ -12,7 +12,7 @@ pub use config::TestConfiguration;
 pub use db::{drop_database, initialize_db, initialize_sqlite_db};
 pub use helpers::{build_document, doggo_fields, doggo_schema, schema_from_fields};
 pub use node::{
-    add_document, add_schema, add_schema_and_documents, populate_and_materialize,
-    populate_store_config, TestNode,
+    add_blob, add_document, add_schema, add_schema_and_documents, assert_query,
+    populate_and_materialize, populate_store_config, update_document, TestNode,
 };
 pub use runner::{test_runner, test_runner_with_manager, TestNodeManager};
diff --git a/aquadoggo/src/test_utils/node.rs b/aquadoggo/src/test_utils/node.rs
index 85a05cd43..5c7fae147 100644
--- a/aquadoggo/src/test_utils/node.rs
+++ b/aquadoggo/src/test_utils/node.rs
@@ -4,12 +4,14 @@ use log::{debug, info};
 use p2panda_rs::document::{DocumentId, DocumentViewId};
 use p2panda_rs::entry::traits::AsEncodedEntry;
 use p2panda_rs::identity::KeyPair;
-use p2panda_rs::operation::{OperationBuilder, OperationValue};
+use p2panda_rs::operation::{OperationAction, OperationBuilder, OperationId, OperationValue};
 use p2panda_rs::schema::{FieldType, Schema, SchemaId, SchemaName};
+use p2panda_rs::storage_provider::traits::OperationStore;
 use p2panda_rs::test_utils::memory_store::helpers::{
     populate_store, send_to_store, PopulateStoreConfig,
 };
 use rstest::fixture;
+use sqlx::query_scalar;

 use crate::context::Context;
 use crate::db::SqlStore;
@@ -97,13 +99,18 @@ pub async fn populate_and_materialize(
         // Create reduce task input.
         let input = TaskInput::DocumentId(document_id);
         // Run reduce task and collect returned dependency tasks.
-        let dependency_tasks = reduce_task(node.context.clone(), input.clone())
+        let next_tasks = reduce_task(node.context.clone(), input.clone())
             .await
             .expect("Reduce document");

         // Run dependency tasks.
-        if let Some(tasks) = dependency_tasks {
-            for task in tasks {
+        if let Some(tasks) = next_tasks {
+            // We only want to issue dependency tasks.
+            let dependency_tasks = tasks
+                .iter()
+                .filter(|task| task.worker_name() == "dependency");
+
+            for task in dependency_tasks {
                 dependency_task(node.context.clone(), task.input().to_owned())
                     .await
                     .expect("Run dependency task");
@@ -145,13 +152,18 @@ pub async fn add_document(
         .expect("Publish CREATE operation");

     let input = TaskInput::DocumentId(DocumentId::from(entry_signed.hash()));
-    let dependency_tasks = reduce_task(node.context.clone(), input.clone())
+    let next_tasks = reduce_task(node.context.clone(), input.clone())
         .await
         .expect("Reduce document");

     // Run dependency tasks
-    if let Some(tasks) = dependency_tasks {
-        for task in tasks {
+    if let Some(tasks) = next_tasks {
+        // We only want to issue dependency tasks.
+        let dependency_tasks = tasks
+            .iter()
+            .filter(|task| task.worker_name() == "dependency");
+
+        for task in dependency_tasks {
             dependency_task(node.context.clone(), task.input().to_owned())
                 .await
                 .expect("Run dependency task");
@@ -264,3 +276,105 @@ pub async fn add_schema_and_documents(

     (schema, view_ids)
 }
+
+/// Helper method for updating documents.
+pub async fn update_document(
+    node: &mut TestNode,
+    schema_id: &SchemaId,
+    fields: Vec<(&str, OperationValue)>,
+    previous: &DocumentViewId,
+    key_pair: &KeyPair,
+) -> DocumentViewId {
+    // Get requested schema from store.
+    let schema = node
+        .context
+        .schema_provider
+        .get(schema_id)
+        .await
+        .expect("Schema not found");
+
+    // Build, publish and reduce an update operation for the document.
+    let update_op = OperationBuilder::new(schema.id())
+        .action(OperationAction::Update)
+        .fields(&fields)
+        .previous(previous)
+        .build()
+        .expect("Build operation");
+
+    let (entry_signed, _) = send_to_store(&node.context.store, &update_op, &schema, key_pair)
+        .await
+        .expect("Publish UPDATE operation");
+
+    let document_id = node
+        .context
+        .store
+        .get_document_id_by_operation_id(&OperationId::from(entry_signed.hash()))
+        .await
+        .expect("No db errors")
+        .expect("Can get document id");
+
+    let input = TaskInput::DocumentId(document_id);
+    let next_tasks = reduce_task(node.context.clone(), input.clone())
+        .await
+        .expect("Reduce document");

+    // Run dependency tasks.
+    if let Some(tasks) = next_tasks {
+        // We only want to issue dependency tasks.
+        let dependency_tasks = tasks
+            .iter()
+            .filter(|task| task.worker_name() == "dependency");
+
+        for task in dependency_tasks {
+            dependency_task(node.context.clone(), task.input().to_owned())
+                .await
+                .expect("Run dependency task");
+        }
+    }
+    DocumentViewId::from(entry_signed.hash())
+}
+
+/// Helper method for publishing a blob (two pieces plus the blob document) and materializing it.
+pub async fn add_blob(node: &mut TestNode, blob_data: &str, key_pair: &KeyPair) -> DocumentViewId {
+    // Publish blob pieces and blob.
+    let (blob_data_a, blob_data_b) = blob_data.split_at(blob_data.len() / 2);
+    let blob_piece_view_id_1 = add_document(
+        node,
+        &SchemaId::BlobPiece(1),
+        vec![("data", blob_data_a.into())],
+        &key_pair,
+    )
+    .await;
+
+    let blob_piece_view_id_2 = add_document(
+        node,
+        &SchemaId::BlobPiece(1),
+        vec![("data", blob_data_b.into())],
+        &key_pair,
+    )
+    .await;
+    let blob_view_id = add_document(
+        node,
+        &SchemaId::Blob(1),
+        vec![
+            ("length", { blob_data.len() as i64 }.into()),
+            ("mime_type", "text/plain".into()),
+            (
+                "pieces",
+                vec![blob_piece_view_id_1, blob_piece_view_id_2].into(),
+            ),
+        ],
+        &key_pair,
+    )
+    .await;
+
+    blob_view_id
+}
+
+/// Helper for asserting expected number of items yielded from a SQL query.
+pub async fn assert_query(node: &TestNode, sql: &str, expected_len: usize) {
+    let result: Result<Vec<String>, _> =
+        query_scalar(sql).fetch_all(&node.context.store.pool).await;
+
+    assert!(result.is_ok(), "{:#?}", result);
+    assert_eq!(result.unwrap().len(), expected_len, "{:?}", sql);
+}
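Taken together, `add_blob`, `update_document` and `assert_query` make garbage collection scenarios compact to write. A hypothetical test sketch composed from these helpers, mirroring the `purge_blob` test earlier in this changeset:

```rust
use p2panda_rs::document::DocumentId;
use p2panda_rs::identity::KeyPair;
use rstest::rstest;

use crate::test_utils::{add_blob, assert_query, test_runner, TestNode};

// Hypothetical round-trip test: publish a blob, purge it, then check that
// the rows are gone. Not part of the changeset itself.
#[rstest]
fn blob_purge_roundtrip(key_pair: KeyPair) {
    test_runner(|mut node: TestNode| async move {
        let blob_view_id = add_blob(&mut node, "Hello, World!", &key_pair).await;
        let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();

        node.context
            .store
            .purge_blob(&document_id)
            .await
            .expect("Purge blob");

        // `assert_query` runs the SQL and asserts on the returned row count.
        assert_query(&node, "SELECT entry_hash FROM entries", 0).await;
    })
}
```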