Skip to content

Commit

Permalink
"blob" materializer task (#493)
Browse files Browse the repository at this point in the history
* Introduce blob materializer task

* fmt

* Materialize blobs to filesystem in "blob" task

* Issue a "blob" task when dependencies met.

* Some errors are not Critical errors

* fmt

* Use tempfile for data dir during tests

* Test for blob filesystem materialization

* fmt
  • Loading branch information
sandreae committed Aug 22, 2023
1 parent 81cae95 commit ba98934
Show file tree
Hide file tree
Showing 6 changed files with 297 additions and 5 deletions.
4 changes: 3 additions & 1 deletion aquadoggo/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ const DEFAULT_SQLITE_NAME: &str = "aquadoggo-node.sqlite3";
/// Blobs directory
pub const BLOBS_DIR_NAME: &str = "blobs";

pub const BLOBS_SYMLINK_DIR_NAME: &str = "documents";

/// Configuration object holding all important variables throughout the application.
///
/// Each configuration also assures that a data directory exists on the host machine where database
Expand Down Expand Up @@ -81,7 +83,7 @@ impl Configuration {
});

// Create folders when they don't exist yet
fs::create_dir_all(base_path.join(BLOBS_DIR_NAME))?;
fs::create_dir_all(base_path.join(BLOBS_DIR_NAME).join(BLOBS_SYMLINK_DIR_NAME))?;

Ok(base_path)
}
Expand Down
3 changes: 2 additions & 1 deletion aquadoggo/src/materializer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use tokio::task;
use crate::bus::{ServiceMessage, ServiceSender};
use crate::context::Context;
use crate::manager::{ServiceReadySender, Shutdown};
use crate::materializer::tasks::{dependency_task, reduce_task, schema_task};
use crate::materializer::tasks::{blob_task, dependency_task, reduce_task, schema_task};
use crate::materializer::worker::{Factory, Task, TaskStatus};
use crate::materializer::TaskInput;

Expand Down Expand Up @@ -38,6 +38,7 @@ pub async fn materializer_service(
factory.register("reduce", pool_size, reduce_task);
factory.register("dependency", pool_size, dependency_task);
factory.register("schema", pool_size, schema_task);
factory.register("blob", pool_size, blob_task);

// Get a listener for error signal from factory
let on_error = factory.on_error();
Expand Down
274 changes: 274 additions & 0 deletions aquadoggo/src/materializer/tasks/blob.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::fs::{self, File};
use std::io::Write;
use std::os::unix::fs::symlink;

use log::{debug, info};
use p2panda_rs::document::traits::AsDocument;
use p2panda_rs::document::DocumentViewId;
use p2panda_rs::operation::OperationValue;
use p2panda_rs::schema::SchemaId;
use p2panda_rs::storage_provider::traits::{DocumentStore, OperationStore};

use crate::config::{BLOBS_DIR_NAME, BLOBS_SYMLINK_DIR_NAME};
use crate::context::Context;
use crate::db::types::StorageDocument;
use crate::db::SqlStore;
use crate::materializer::worker::{TaskError, TaskResult};
use crate::materializer::TaskInput;

/// A blob task assembles and persists blobs to the filesystem.
///
/// Blob tasks are dispatched whenever a blob or blob piece document has all its immediate
/// dependencies available in the store.
pub async fn blob_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
    debug!("Working on {}", input);

    // Blob tasks are always issued with a document view id; any other input is a
    // programming error in the task dispatcher.
    let input_view_id = match input {
        TaskInput::DocumentViewId(view_id) => view_id,
        _ => return Err(TaskError::Critical("Invalid task input".into())),
    };

    // Determine the schema of the updated view id.
    let schema = context
        .store
        .get_schema_by_document_view(&input_view_id)
        .await
        .map_err(|err| TaskError::Critical(err.to_string()))?
        // The schema may not be materialized yet; this is a retryable failure rather
        // than a node-level bug, so we don't panic or raise a critical error here.
        .ok_or_else(|| TaskError::Failure("Schema for view not found (yet)".into()))?;

    let updated_blobs: Vec<StorageDocument> = match schema {
        // This task is about an updated blob document so we only handle that.
        SchemaId::Blob(_) => {
            let document = context
                .store
                .get_document_by_view_id(&input_view_id)
                .await
                .map_err(|err| TaskError::Failure(err.to_string()))?
                .ok_or_else(|| TaskError::Failure("Blob document not found (yet)".into()))?;
            Ok(vec![document])
        }

        // This task is about an updated blob piece document that may be used in one or
        // more blob documents.
        SchemaId::BlobPiece(_) => get_related_blobs(&input_view_id, &context).await,
        _ => Err(TaskError::Critical(format!(
            "Unknown system schema id: {}",
            schema
        ))),
    }?;

    // The related blobs are not known yet to this node so we mark this task failed.
    if updated_blobs.is_empty() {
        return Err(TaskError::Failure(
            "Related blob does not exist (yet)".into(),
        ));
    }

    // Materialize all updated blobs to the filesystem.
    for blob_document in updated_blobs.iter() {
        // Get the raw blob data.
        let blob_data = context
            .store
            .get_blob_by_view_id(blob_document.view_id())
            .await
            // We don't raise a critical error here, as it is possible that this method
            // returns an error (for example when not all blob pieces arrived yet).
            .map_err(|err| TaskError::Failure(err.to_string()))?
            .ok_or_else(|| TaskError::Failure("Blob data not available (yet)".into()))?;

        // Compose, and when needed create, the path for the blob file.
        let base_path = match &context.config.base_path {
            Some(base_path) => base_path,
            None => return Err(TaskError::Critical("No base path configured".to_string())),
        };

        let blob_dir = base_path
            .join(BLOBS_DIR_NAME)
            .join(blob_document.id().as_str());

        fs::create_dir_all(&blob_dir).map_err(|err| TaskError::Critical(err.to_string()))?;
        let blob_view_path = blob_dir.join(blob_document.view_id().to_string());

        // Write the blob to the filesystem. Filesystem errors at this point indicate a
        // misconfigured or broken node environment, so they are raised as critical
        // instead of panicking the worker.
        info!("Creating blob at path {blob_view_path:?}");

        let mut file = File::create(&blob_view_path)
            .map_err(|err| TaskError::Critical(err.to_string()))?;
        file.write_all(blob_data.as_bytes())
            .map_err(|err| TaskError::Critical(err.to_string()))?;

        // Create a symlink from `../documents/<document_id>` -> `../<document_id>/<current_view_id>`
        if is_current_view(&context.store, blob_document.view_id()).await? {
            info!("Creating symlink from document id to current view");

            let link_path = base_path
                .join(BLOBS_DIR_NAME)
                .join(BLOBS_SYMLINK_DIR_NAME)
                .join(blob_document.id().as_str());

            // Remove any previous link first as `symlink` fails when the link path
            // already exists; ignore the error as the link may simply not exist yet.
            let _ = fs::remove_file(&link_path);

            symlink(blob_view_path, link_path)
                .map_err(|err| TaskError::Critical(err.to_string()))?;
        }
    }

    // Materializing blobs doesn't dispatch any follow-up tasks.
    Ok(None)
}

/// Retrieve blobs that use the targeted blob piece as one of their fields.
async fn get_related_blobs(
    target_blob_piece: &DocumentViewId,
    context: &Context,
) -> Result<Vec<StorageDocument>, TaskError> {
    // Retrieve all blob documents from the store. Storage errors are propagated as
    // critical task errors instead of panicking the worker (the original `unwrap` made
    // the preceding `map_err` dead code).
    let blobs = context
        .store
        .get_documents_by_schema(&SchemaId::Blob(1))
        .await
        .map_err(|err| TaskError::Critical(err.to_string()))?;

    // Collect all blobs that use the targeted blob piece
    let mut related_blobs = vec![];
    for blob in blobs {
        // We can unwrap the value here as all documents returned from the storage method above
        // have a current view (they are not deleted).
        let fields_value = blob.get("pieces").unwrap();

        match fields_value {
            OperationValue::PinnedRelationList(fields) => {
                if fields
                    .iter()
                    .any(|field_view_id| field_view_id == target_blob_piece)
                {
                    related_blobs.push(blob)
                }
            }
            // It is a critical error if there are blobs in the store that don't match
            // the blob schema.
            _ => {
                return Err(TaskError::Critical(
                    "Blob operation does not have a 'pieces' operation field".into(),
                ))
            }
        }
    }

    Ok(related_blobs)
}

/// Check if the given view id is the current view of its blob document.
async fn is_current_view(
    store: &SqlStore,
    document_view_id: &DocumentViewId,
) -> Result<bool, TaskError> {
    // A document view id always contains at least one operation id, so taking the
    // first graph tip cannot fail.
    let operation_id = document_view_id
        .graph_tips()
        .first()
        .expect("Document view id contains at least one operation id");

    let blob_document_id = store
        .get_document_id_by_operation_id(operation_id)
        .await
        .map_err(|err| TaskError::Critical(err.to_string()))?
        // The blob document may not be materialized yet; treat this as a retryable
        // failure instead of panicking the worker.
        .ok_or_else(|| TaskError::Failure("Document for blob not found (yet)".into()))?;

    let current_blob_document = store
        .get_document(&blob_document_id)
        .await
        .map_err(|err| TaskError::Critical(err.to_string()))?
        .ok_or_else(|| TaskError::Failure("Document for blob not found (yet)".into()))?;

    Ok(current_blob_document.view_id() == document_view_id)
}

#[cfg(test)]
mod tests {
    use std::fs;

    use p2panda_rs::document::DocumentId;
    use p2panda_rs::identity::KeyPair;
    use p2panda_rs::schema::SchemaId;
    use p2panda_rs::test_utils::fixtures::key_pair;
    use rstest::rstest;

    use crate::config::{BLOBS_DIR_NAME, BLOBS_SYMLINK_DIR_NAME};
    use crate::materializer::tasks::blob_task;
    use crate::materializer::TaskInput;
    use crate::test_utils::{add_document, test_runner, TestNode};

    #[rstest]
    fn materializes_blob_to_filesystem(key_pair: KeyPair) {
        test_runner(|mut node: TestNode| async move {
            let blob_data = "Hello, World!".to_string();

            // Split the blob data over two published blob piece documents.
            let first_piece_view_id = add_document(
                &mut node,
                &SchemaId::BlobPiece(1),
                vec![("data", blob_data[..5].into())],
                &key_pair,
            )
            .await;

            let second_piece_view_id = add_document(
                &mut node,
                &SchemaId::BlobPiece(1),
                vec![("data", blob_data[5..].into())],
                &key_pair,
            )
            .await;

            // Publish the blob document referring to both pieces.
            let blob_view_id = add_document(
                &mut node,
                &SchemaId::Blob(1),
                vec![
                    ("length", { blob_data.len() as i64 }.into()),
                    ("mime_type", "text/plain".into()),
                    (
                        "pieces",
                        vec![first_piece_view_id, second_piece_view_id].into(),
                    ),
                ],
                &key_pair,
            )
            .await;

            // Run the blob task against the freshly published blob view.
            let task_result = blob_task(
                node.context.clone(),
                TaskInput::DocumentViewId(blob_view_id.clone()),
            )
            .await;

            // The task must succeed and must not schedule any follow-up tasks.
            assert!(task_result.is_ok(), "{:#?}", task_result);
            assert!(task_result.unwrap().is_none());

            // Derive the blob's document id from its (single-operation) view id.
            let document_id: DocumentId = blob_view_id.to_string().parse().unwrap();

            // Build the expected on-disk location of the materialized blob view.
            let base_path = node.context.config.base_path.as_ref().unwrap();
            let view_file_path = base_path
                .join(BLOBS_DIR_NAME)
                .join(document_id.as_str())
                .join(blob_view_id.to_string());

            // The file contents must equal the complete published blob data.
            let view_file_contents = fs::read_to_string(view_file_path);
            assert!(view_file_contents.is_ok(), "{:?}", view_file_contents);
            assert_eq!(blob_data, view_file_contents.unwrap());

            // Build the expected location of the "current view" symlink.
            let symlink_path = base_path
                .join(BLOBS_DIR_NAME)
                .join(BLOBS_SYMLINK_DIR_NAME)
                .join(document_id.as_str());

            // Reading through the symlink must also yield the full blob data.
            let symlink_contents = fs::read_to_string(symlink_path);
            assert!(symlink_contents.is_ok(), "{:?}", symlink_contents);
            assert_eq!(blob_data, symlink_contents.unwrap())
        })
    }
}
9 changes: 8 additions & 1 deletion aquadoggo/src/materializer/tasks/dependency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,17 @@ pub async fn dependency_task(context: Context, input: TaskInput) -> TaskResult<T
TaskInput::DocumentViewId(document_view.id().clone()),
));
}
// Start `blob` task when a blob or blob_piece view is completed with
// dependencies.
SchemaId::Blob(_) | SchemaId::BlobPiece(_) => {
next_tasks.push(Task::new(
"blob",
TaskInput::DocumentViewId(document_view.id().clone()),
));
}
_ => {}
}
}

// Now we check all the "parent" or "inverse" relations, that is _other_ documents pointing at
// the one we're currently looking at
let mut reverse_tasks = get_inverse_relation_tasks(&context, document.schema_id()).await?;
Expand Down
2 changes: 2 additions & 0 deletions aquadoggo/src/materializer/tasks/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

//! Task implementations which are registered with and executed by the materializer
//! worker factory (see `materializer::service`).

mod blob;
mod dependency;
mod reduce;
mod schema;

pub use blob::blob_task;
pub use dependency::dependency_task;
pub use reduce::reduce_task;
pub use schema::schema_task;
10 changes: 8 additions & 2 deletions aquadoggo/src/test_utils/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,11 @@ impl TestNodeManager {
// Initialise test store using pool.
let store = SqlStore::new(pool.clone());

// Construct tempfile directory for the test runner.
let tmp_dir = tempfile::TempDir::new().unwrap();

// Construct node config supporting any schema.
let cfg = Configuration::default();
let cfg = Configuration::new(Some(tmp_dir.path().to_path_buf())).unwrap();

// Construct the actual test node
let test_node = TestNode {
Expand Down Expand Up @@ -101,8 +104,11 @@ pub fn test_runner<F: AsyncTestFn + Send + Sync + 'static>(test: F) {
let (_config, pool) = initialize_db().await;
let store = SqlStore::new(pool);

// Construct tempfile directory for the test runner.
let tmp_dir = tempfile::TempDir::new().unwrap();

// Construct node config supporting any schema.
let cfg = Configuration::default();
let cfg = Configuration::new(Some(tmp_dir.path().to_path_buf())).unwrap();

// Construct the actual test node
let node = TestNode {
Expand Down

0 comments on commit ba98934

Please sign in to comment.