From 067ee1d50b3947983e315796eeec029a31283d23 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 09:42:34 +0100 Subject: [PATCH 1/7] Check materialized blob file is complete before aborting task --- aquadoggo/src/materializer/tasks/blob.rs | 29 +++++++++++++++++++----- 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/blob.rs b/aquadoggo/src/materializer/tasks/blob.rs index 6dcd2a493..6591aaf43 100644 --- a/aquadoggo/src/materializer/tasks/blob.rs +++ b/aquadoggo/src/materializer/tasks/blob.rs @@ -3,6 +3,7 @@ use futures::{pin_mut, StreamExt}; use log::{debug, info}; use p2panda_rs::document::traits::AsDocument; +use p2panda_rs::operation::OperationValue; use p2panda_rs::schema::SchemaId; use p2panda_rs::storage_provider::traits::DocumentStore; use tokio::fs::OpenOptions; @@ -46,13 +47,29 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult { + let metadata = file + .metadata() + .await + .expect("Can retrieve blob file metadata"); + + let expected_blob_length = match blob_document.get("length").unwrap() { + OperationValue::Integer(length) => length, + _ => unreachable!(), + }; + + if metadata.len() < *expected_blob_length as u64 { + false + } else { + true + } + } + Err(_) => false, + }; if is_blob_materialized { return Err(TaskError::Failure(format!( "Blob file already exists at {}", From f5a2677c4ef4ab3c7db76a1d7365016bf1167485 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 10:01:14 +0100 Subject: [PATCH 2/7] Add test --- aquadoggo/src/materializer/tasks/blob.rs | 40 +++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/aquadoggo/src/materializer/tasks/blob.rs b/aquadoggo/src/materializer/tasks/blob.rs index 6591aaf43..5244e6be4 100644 --- a/aquadoggo/src/materializer/tasks/blob.rs +++ b/aquadoggo/src/materializer/tasks/blob.rs @@ -61,7 +61,7 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult length, _ => unreachable!(), }; - + if metadata.len() < *expected_blob_length as u64 { false } else { @@ -266,4 +266,42 @@ mod tests { assert!(result.is_err()); }) } + + #[rstest] + fn re_materialize_blob_after_previous_task_did_not_complete(key_pair: KeyPair) { + test_runner(|mut node: TestNode| async move { + // Publish blob + // Publish blob + let blob_data = "Hello, World!"; + let blob_view_id = + add_blob(&mut node, blob_data.as_bytes(), 5, "plain/text", &key_pair).await; + + // Construct the expected path to the blob view file + let base_path = &node.context.config.blobs_base_path; + let blob_path = base_path.join(blob_view_id.to_string()); + + // Write some bytes to the expected blob path which are < than the actual blob + // bytes length. We expect this file to be overwritten when we run the blob task. + fs::write(blob_path.clone(), vec![0, 1, 2]).await.unwrap(); + + // Run blob task + let result = blob_task( + node.context.clone(), + TaskInput::DocumentViewId(blob_view_id.clone()), + ) + .await; + + // It shouldn't fail + assert!(result.is_ok()); + // It should return no extra tasks + assert!(result.unwrap().is_none()); + + // Read from this file + let retrieved_blob_data = fs::read(blob_path).await; + + // Number of bytes for the publish and materialized blob should be the same + assert!(retrieved_blob_data.is_ok()); + assert_eq!(blob_data.len(), retrieved_blob_data.unwrap().len()); + }) + } } From d22e0e36169d49bf64e5251a82129795cbf229e1 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 10:07:23 +0100 Subject: [PATCH 3/7] fmt --- aquadoggo_cli/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo_cli/src/config.rs b/aquadoggo_cli/src/config.rs index 4cddc5546..758beec51 100644 --- a/aquadoggo_cli/src/config.rs +++ b/aquadoggo_cli/src/config.rs @@ -27,7 +27,7 @@ type ConfigFilePath = Option; /// Returns a partly unchecked configuration object which results from all of these sources. It /// still needs to be converted for aquadoggo as it might still contain invalid values. pub fn load_config() -> Result<(ConfigFilePath, ConfigFile)> { - // Parse command line arguments and CONFIG environment variable first to get optional config + // Parse command line arguments and CONFIG environment variable first to get optional config // file path let cli = Cli::parse(); From cc445e4dae2f572b4b735ff16011a27478183660 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 10:09:08 +0100 Subject: [PATCH 4/7] Update CHANGELOG --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b506b7417..11436e985 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed + +- Re-materialize blobs which were only partially written to disc due to node crash [#618](https://github.com/p2panda/aquadoggo/pull/618) + ## [0.7.3] ### Fixed From ff864e0a58d720e41bc0a975935d661223f9fb24 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 10:11:48 +0100 Subject: [PATCH 5/7] Clippy --- aquadoggo/src/materializer/tasks/blob.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/aquadoggo/src/materializer/tasks/blob.rs b/aquadoggo/src/materializer/tasks/blob.rs index 5244e6be4..fcf3afe26 100644 --- a/aquadoggo/src/materializer/tasks/blob.rs +++ b/aquadoggo/src/materializer/tasks/blob.rs @@ -62,11 +62,7 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult unreachable!(), }; - if metadata.len() < *expected_blob_length as u64 { - false - } else { - true - } + metadata.len() < *expected_blob_length as u64 } Err(_) => false, }; From 8a43f0c7c3eb896a4fb10faba2ec653bb618a27e Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Fri, 14 Jun 2024 10:13:18 +0100 Subject: [PATCH 6/7] Correct cmp logic --- aquadoggo/src/materializer/tasks/blob.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/materializer/tasks/blob.rs b/aquadoggo/src/materializer/tasks/blob.rs index fcf3afe26..ac017dea3 100644 --- a/aquadoggo/src/materializer/tasks/blob.rs +++ b/aquadoggo/src/materializer/tasks/blob.rs @@ -62,7 +62,7 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult unreachable!(), }; - metadata.len() < *expected_blob_length as u64 + metadata.len() == *expected_blob_length as u64 } Err(_) => false, }; From 913ea5f87a543558825ea33ce3849af7229c70df Mon Sep 17 00:00:00 2001 From: adz Date: Fri, 14 Jun 2024 11:20:38 +0200 Subject: [PATCH 7/7] Remove double comment --- aquadoggo/src/materializer/tasks/blob.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/aquadoggo/src/materializer/tasks/blob.rs b/aquadoggo/src/materializer/tasks/blob.rs index ac017dea3..73219fb4f 100644 --- a/aquadoggo/src/materializer/tasks/blob.rs +++ b/aquadoggo/src/materializer/tasks/blob.rs @@ -266,7 +266,6 @@ mod tests { #[rstest] fn re_materialize_blob_after_previous_task_did_not_complete(key_pair: KeyPair) { test_runner(|mut node: TestNode| async move { - // Publish blob // Publish blob let blob_data = "Hello, World!"; let blob_view_id =