Skip to content

Commit

Permalink
Enable deletion of dangling document_views and related `document_vi…
Browse files Browse the repository at this point in the history
…ew_fields` from db (#491)

* Add fk to document_view_fields with cascading DELETE

* Introduce `prune_document_views` method to `DocumentStore`

* Test for pruning document views

* Test that pinned views don't get deleted

* Update CHANGELOG

* Clippy

* Remove fk constraint on `operation_id` in `document_view_fields` table

* Change table creation order in documents migration

* Use IS NULL in SQL conditional

* Don't use alias in SQL query
  • Loading branch information
sandreae committed Aug 9, 2023
1 parent 3b38ff0 commit a6981fd
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Introduce peer sampling to the replication service [#463](https://github.com/p2panda/aquadoggo/pull/463)
- Only replicate and materialize configured "supported schema" [#569](https://github.com/p2panda/aquadoggo/pull/469)
- Parse supported schema ids from `config.toml` [#473](https://github.com/p2panda/aquadoggo/pull/473)
- Add method to store for pruning document views [#491](https://github.com/p2panda/aquadoggo/pull/491)
- Introduce `BlobStore` [#484](https://github.com/p2panda/aquadoggo/pull/484)

### Changed
Expand Down
18 changes: 9 additions & 9 deletions aquadoggo/migrations/20220510022755_create-documents.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
-- SPDX-License-Identifier: AGPL-3.0-or-later

CREATE TABLE IF NOT EXISTS document_view_fields (
document_view_id TEXT NOT NULL,
operation_id TEXT NOT NULL,
name TEXT NOT NULL,
FOREIGN KEY(operation_id) REFERENCES operations_v1(operation_id)
);

CREATE INDEX idx_document_view_fields ON document_view_fields (document_view_id, operation_id, name);

CREATE TABLE IF NOT EXISTS document_views (
document_view_id TEXT NOT NULL UNIQUE,
schema_id TEXT NOT NULL,
PRIMARY KEY (document_view_id)
);

CREATE TABLE IF NOT EXISTS document_view_fields (
document_view_id TEXT NOT NULL,
operation_id TEXT NOT NULL,
name TEXT NOT NULL,
FOREIGN KEY(document_view_id) REFERENCES document_views(document_view_id) ON DELETE CASCADE
);

CREATE INDEX idx_document_view_fields ON document_view_fields (document_view_id, operation_id, name);

CREATE TABLE IF NOT EXISTS documents (
document_id TEXT NOT NULL UNIQUE,
document_view_id TEXT NOT NULL,
Expand Down
188 changes: 186 additions & 2 deletions aquadoggo/src/db/stores/document.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,80 @@ impl SqlStore {
.await
.map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))
}

/// Iterate over all views of a document and delete any which:
/// - are not the current view
/// - _and_ no document field exists in the database which contains a pinned relation to this view
#[allow(dead_code)]
async fn prune_document_views(
&self,
document_id: &DocumentId,
) -> Result<(), DocumentStorageError> {
// Start a transaction, any db insertions after this point, and before the `commit()`
// will be rolled back in the event of an error.
let mut tx = self
.pool
.begin()
.await
.map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;

// Collect all views _except_ the current view for this document
let document_view_ids: Vec<String> = query_scalar(
"
SELECT
document_views.document_view_id,
documents.document_view_id
FROM
document_views
LEFT JOIN
documents
ON
documents.document_view_id = document_views.document_view_id
WHERE
document_views.document_id = $1
AND
documents.document_view_id IS NULL
",
)
.bind(document_id.as_str())
.fetch_all(&mut tx)
.await
.map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;

// Iterate over all document views and delete them if no document field exists in the
// database which contains a pinned relation to this view.
//
// Deletes on "document_views" cascade to "document_view_fields" so rows there are also removed
// from the database.
for document_view_id in document_view_ids {
query(
"
DELETE FROM
document_views
WHERE
document_views.document_view_id = $1
AND NOT EXISTS (
SELECT * FROM operation_fields_v1
WHERE
operation_fields_v1.field_type IN ('pinned_relation', 'pinned_relation_list')
AND
operation_fields_v1.value = $1
)
",
)
.bind(document_view_id)
.execute(&mut tx)
.await
.map_err(|err| DocumentStorageError::FatalStorageError(err.to_string()))?;
}

// Commit the tx here as no errors occurred.
tx.commit()
.await
.map_err(|e| DocumentStorageError::FatalStorageError(e.to_string()))?;

Ok(())
}
}

// Helper method for getting rows from the `document_view_fields` table.
Expand Down Expand Up @@ -532,20 +606,24 @@ mod tests {
use p2panda_rs::document::materialization::build_graph;
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewFields, DocumentViewId};
use p2panda_rs::identity::KeyPair;
use p2panda_rs::operation::traits::AsOperation;
use p2panda_rs::operation::{Operation, OperationId};
use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore};
use p2panda_rs::test_utils::constants;
use p2panda_rs::test_utils::fixtures::{
operation, random_document_id, random_document_view_id, random_operation_id,
key_pair, operation, random_document_id, random_document_view_id, random_operation_id,
};
use p2panda_rs::test_utils::memory_store::helpers::{populate_store, PopulateStoreConfig};
use p2panda_rs::WithId;
use rstest::rstest;

use crate::db::stores::document::DocumentView;
use crate::materializer::tasks::reduce_task;
use crate::materializer::TaskInput;
use crate::test_utils::{
build_document, populate_and_materialize, populate_store_config, test_runner, TestNode,
add_schema_and_documents, build_document, populate_and_materialize, populate_store_config,
test_runner, TestNode,
};

#[rstest]
Expand Down Expand Up @@ -928,4 +1006,110 @@ mod tests {
assert_eq!(schema_documents.len(), 10);
});
}

#[rstest]
fn prunes_document_views(
#[from(populate_store_config)]
#[with(2, 1, 1)]
config: PopulateStoreConfig,
) {
test_runner(|mut node: TestNode| async move {
// Populate the store and materialize all documents.
let (_, document_ids) = populate_and_materialize(&mut node, &config).await;
let document_id = document_ids[0].clone();
let first_document_view_id: DocumentViewId = document_id.as_str().parse().unwrap();

// Get the current document from the store.
let current_document = node.context.store.get_document(&document_id).await.unwrap();

// Get the current view id.
let current_document_view_id = current_document.unwrap().view_id().to_owned();

// Reduce a historic view of an existing document.
let _ = reduce_task(
node.context.clone(),
TaskInput::DocumentViewId(first_document_view_id.clone()),
)
.await;

// Get that view again to check it's in the db.
let document = node
.context
.store
.get_document_by_view_id(&first_document_view_id)
.await
.unwrap();
assert!(document.is_some());

// Now prune dangling views for the document.
let result = node.context.store.prune_document_views(&document_id).await;
assert!(result.is_ok());

// Get the first document view again, it should no longer be there.
let document = node
.context
.store
.get_document_by_view_id(&first_document_view_id)
.await
.unwrap();
assert!(document.is_none());

// Get the current view of the document to make sure that wasn't deleted too.
let document = node
.context
.store
.get_document_by_view_id(&current_document_view_id)
.await
.unwrap();
assert!(document.is_some());
});
}

#[rstest]
fn does_not_prune_pinned_views(
#[from(populate_store_config)]
#[with(2, 1, 1)]
config: PopulateStoreConfig,
key_pair: KeyPair,
) {
test_runner(|mut node: TestNode| async move {
// Populate the store and materialize all documents.
let (_, document_ids) = populate_and_materialize(&mut node, &config).await;
let document_id = document_ids[0].clone();
let first_document_view_id: DocumentViewId = document_id.as_str().parse().unwrap();

// Reduce a historic view of an existing document.
let _ = reduce_task(
node.context.clone(),
TaskInput::DocumentViewId(first_document_view_id.clone()),
)
.await;

// Add a new document to the store which pins the first view of the above document.
add_schema_and_documents(
&mut node,
"new_schema",
vec![vec![(
"pin_document",
first_document_view_id.clone().into(),
Some(config.schema.id().to_owned()),
)]],
&key_pair,
)
.await;

// Now prune dangling views for the document.
let result = node.context.store.prune_document_views(&document_id).await;
assert!(result.is_ok());

// Get the first document view, it should still be in the store as it was pinned.
let document = node
.context
.store
.get_document_by_view_id(&first_document_view_id)
.await
.unwrap();
assert!(document.is_some());
});
}
}

0 comments on commit a6981fd

Please sign in to comment.