Skip to content

Commit

Permalink
Try to improve control flow in reduce task as well
Browse files Browse the repository at this point in the history
  • Loading branch information
adzialocha committed Jun 15, 2022
1 parent ba3d649 commit b0d0124
Showing 1 changed file with 112 additions and 58 deletions.
170 changes: 112 additions & 58 deletions aquadoggo/src/materializer/tasks/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,67 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use p2panda_rs::document::DocumentBuilder;
use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId};
use p2panda_rs::operation::VerifiedOperation;
use p2panda_rs::storage_provider::traits::OperationStore;

use crate::context::Context;
use crate::db::traits::DocumentStore;
use crate::materializer::worker::{Task, TaskError, TaskResult};
use crate::materializer::TaskInput;

/// A reduce task is dispatched for every entry and operation pair which arrives at a node. They
/// may also be dispatched from a dependency task when a pinned relations is present on an already
/// materialised document view.
/// A reduce task is dispatched for every entry and operation pair which arrives at a node.
///
/// They may also be dispatched from a dependency task when a pinned relations is present on an
/// already materialised document view.
///
/// After succesfully reducing and storing a document view an array of dependency tasks is returned.
/// If invalid inputs were passed or a fatal db error occured a critical error is returned.
pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
// Parse the task input, if they are invalid (both or neither ids provided) we critically fail
// the task at this point. If only a document_view was passed we retrieve the document_id as
// it is needed later.
let document_id = match (&input.document_id, &input.document_view_id) {
// Find out which document we are handling
let document_id = resolve_document_id(&context, &input).await?;

// Get all operations for the requested document
let operations = context
.store
.get_operations_by_document_id(&document_id)
.await
.map_err(|_| TaskError::Critical)?;

let document_view_id = match &input.document_view_id {
// If this task was passed a document_view_id as input then we want to build to document
// only to the requested view
Some(view_id) => reduce_document_view(&context, view_id, operations).await?,
// If no document_view_id was passed, this is a document_id reduce task.
None => reduce_document(&context, operations).await?,
};

// Dispatch a "dependency" task if we created a new document view
match document_view_id {
Some(view_id) => Ok(Some(vec![Task::new(
"dependency",
TaskInput::new(None, Some(view_id)),
)])),
None => Ok(None),
}
}

/// Helper method to resolve a `DocumentId` from task input.
///
/// If the task input is invalid (both document_id and document_view_id missing or given) we
/// critically fail the task at this point. If only a document_view_id was passed we retrieve the
/// document_id as it is needed later.
async fn resolve_document_id(
context: &Context,
input: &TaskInput,
) -> Result<DocumentId, TaskError> {
match (&input.document_id, &input.document_view_id) {
// The `DocumentId` is already given, we don't have to do anything
(Some(document_id), None) => Ok(document_id.to_owned()),

// A `DocumentViewId` is given, let's find out its document id
(None, Some(document_view_id)) => {
// TODO: We can skip this step if we implement https://github.com/p2panda/aquadoggo/issues/148
// @TODO: We can skip this step if we implement:
// https://github.com/p2panda/aquadoggo/issues/148
let operation_id = document_view_id.clone().into_iter().next().unwrap();
match context
.store
Expand All @@ -30,71 +70,85 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskI
.map_err(|_| TaskError::Critical)?
{
Some(document_id) => Ok(document_id),
None => return Ok(None),
None => Err(TaskError::Critical),
}
}

// None or both have been provided which smells like a bug
(_, _) => Err(TaskError::Critical),
}?;
}
}

// Get all operations for the requested document.
let operations = context
/// Helper method to reduce an operation graph to a specific document view, returning the
/// `DocumentViewId` of the just created new document view.
///
/// It returns `None` if either that document view reached "deleted" status or we don't have enough
/// operations to materialise.
async fn reduce_document_view(
context: &Context,
document_view_id: &DocumentViewId,
operations: Vec<VerifiedOperation>,
) -> Result<Option<DocumentViewId>, TaskError> {
let document = match DocumentBuilder::new(operations)
.build_to_view_id(Some(document_view_id.to_owned()))
{
Ok(document) => {
// If the document was deleted, then we return nothing
if document.is_deleted() {
return Ok(None);
};

document
}
Err(_) => {
// There is not enough operations yet to materialise this view. Maybe next time!
return Ok(None);
}
};

// Insert the new document view into the database
context
.store
.get_operations_by_document_id(&document_id)
.insert_document_view(document.view().unwrap(), document.schema())
.await
.map_err(|_| TaskError::Critical)?;

let document_view_id = match &input.document_view_id {
// If this task was passed a document_view_id as input then we want to build to document only to the
// requested view.
Some(document_view_id) => {
let document = match DocumentBuilder::new(operations)
.build_to_view_id(Some(document_view_id.to_owned()))
{
Ok(document) => {
// If the document was deleted, then we return nothing.
if document.is_deleted() {
return Ok(None);
};
document
}
Err(_) => return Ok(None),
};
// Return the new view id to be used in the resulting dependency task
Ok(Some(document.view_id().to_owned()))
}

// Insert the document document_view into the database.
/// Helper method to reduce an operation graph to the latest document view, returning the
/// `DocumentViewId` of the just created new document view.
///
/// It returns `None` if either that document view reached "deleted" status or we don't have enough
/// operations to materialise.
async fn reduce_document(
context: &Context,
operations: Vec<VerifiedOperation>,
) -> Result<Option<DocumentViewId>, TaskError> {
match DocumentBuilder::new(operations).build() {
Ok(document) => {
// Insert this document into storage. If it already existed, this will update it's
// current view
context
.store
.insert_document_view(document.view().unwrap(), document.schema())
.insert_document(&document)
.await
.map_err(|_| TaskError::Critical)?;

// Return the view id to be used in the resulting dependency task.
document.view_id().to_owned()
}
// If no document_view_id was passed, this is a document_id reduce task.
None => match DocumentBuilder::new(operations).build() {
Ok(document) => {
// Insert this document into storage. If it already existed, this will update it's current
// view.
context
.store
.insert_document(&document)
.await
.map_err(|_| TaskError::Critical)?;

if document.is_deleted() {
return Ok(None);
}
// Return the document_view id to be used in the resulting dependency task.
document.view_id().to_owned()
// If the document was deleted, then we return nothing
if document.is_deleted() {
return Ok(None);
}
Err(_) => return Ok(None),
},
};

Ok(Some(vec![Task::new(
"dependency",
TaskInput::new(None, Some(document_view_id)),
)]))
// Return the new document_view id to be used in the resulting dependency task
Ok(Some(document.view_id().to_owned()))
}
Err(_) => {
// There is not enough operations yet to materialise this view. Maybe next time!
Ok(None)
}
}
}

#[cfg(test)]
Expand Down

0 comments on commit b0d0124

Please sign in to comment.