Skip to content

Commit

Permalink
Re-run tasks for partially materialized blobs (#618)
Browse files Browse the repository at this point in the history
* Check materialized blob file is complete before aborting task

* Add test

* fmt

* Update CHANGELOG

* Clippy

* Correct cmp logic

* Remove double comment

---------

Co-authored-by: adz <x12@adz.garden>
  • Loading branch information
sandreae and adzialocha committed Jun 14, 2024
1 parent b342e25 commit 4b11f2b
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 7 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed

- Re-materialize blobs which were only partially written to disc due to node crash [#618](https://github.com/p2panda/aquadoggo/pull/618)

## [0.7.3]

### Fixed
Expand Down
62 changes: 56 additions & 6 deletions aquadoggo/src/materializer/tasks/blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use futures::{pin_mut, StreamExt};
use log::{debug, info};
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::operation::OperationValue;
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::traits::DocumentStore;
use tokio::fs::OpenOptions;
Expand Down Expand Up @@ -46,13 +47,25 @@ pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult<TaskInp
.blobs_base_path
.join(blob_document.view_id().to_string());

// Check if the blob has already been materialized and return early from this task
// Check if the blob has already been fully materialized and return early from this task
// with an error if it has.
let is_blob_materialized = OpenOptions::new()
.read(true)
.open(&blob_view_path)
.await
.is_ok();
let is_blob_materialized =
match OpenOptions::new().read(true).open(&blob_view_path).await {
Ok(file) => {
let metadata = file
.metadata()
.await
.expect("Can retrieve blob file metadata");

let expected_blob_length = match blob_document.get("length").unwrap() {
OperationValue::Integer(length) => length,
_ => unreachable!(),
};

metadata.len() == *expected_blob_length as u64
}
Err(_) => false,
};
if is_blob_materialized {
return Err(TaskError::Failure(format!(
"Blob file already exists at {}",
Expand Down Expand Up @@ -249,4 +262,41 @@ mod tests {
assert!(result.is_err());
})
}

#[rstest]
fn re_materialize_blob_after_previous_task_did_not_complete(key_pair: KeyPair) {
test_runner(|mut node: TestNode| async move {
// Publish blob
let blob_data = "Hello, World!";
let blob_view_id =
add_blob(&mut node, blob_data.as_bytes(), 5, "plain/text", &key_pair).await;

// Construct the expected path to the blob view file
let base_path = &node.context.config.blobs_base_path;
let blob_path = base_path.join(blob_view_id.to_string());

// Write some bytes to the expected blob path which are < than the actual blob
// bytes length. We expect this file to be overwritten when we run the blob task.
fs::write(blob_path.clone(), vec![0, 1, 2]).await.unwrap();

// Run blob task
let result = blob_task(
node.context.clone(),
TaskInput::DocumentViewId(blob_view_id.clone()),
)
.await;

// It shouldn't fail
assert!(result.is_ok());
// It should return no extra tasks
assert!(result.unwrap().is_none());

// Read from this file
let retrieved_blob_data = fs::read(blob_path).await;

// Number of bytes for the publish and materialized blob should be the same
assert!(retrieved_blob_data.is_ok());
assert_eq!(blob_data.len(), retrieved_blob_data.unwrap().len());
})
}
}
2 changes: 1 addition & 1 deletion aquadoggo_cli/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type ConfigFilePath = Option<PathBuf>;
/// Returns a partly unchecked configuration object which results from all of these sources. It
/// still needs to be converted for aquadoggo as it might still contain invalid values.
pub fn load_config() -> Result<(ConfigFilePath, ConfigFile)> {
// Parse command line arguments and CONFIG environment variable first to get optional config
// Parse command line arguments and CONFIG environment variable first to get optional config
// file path
let cli = Cli::parse();

Expand Down

0 comments on commit 4b11f2b

Please sign in to comment.