Skip to content

Commit

Permalink
Logging in reduce task (#175)
Browse files Browse the repository at this point in the history
* Add logging

* Pretty print input

* Update CHANGELOG

* Minor change to import group

Co-authored-by: Andreas Dzialocha <x12@adz.garden>
  • Loading branch information
sandreae and adzialocha committed Jun 29, 2022
1 parent 4b9f18f commit 07235c8
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Inform materialization service about new operations [#161](https://github.com/p2panda/aquadoggo/pull/161)
- e2e publish entry tests [#167](https://github.com/p2panda/aquadoggo/pull/167)
- Reschedule pending tasks on startup [#168](https://github.com/p2panda/aquadoggo/pull/168)
- Debug logging in reduce task [#175](https://github.com/p2panda/aquadoggo/pull/175)

### Changed

Expand Down
56 changes: 46 additions & 10 deletions aquadoggo/src/materializer/tasks/reduce.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use log::debug;
use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId};
use p2panda_rs::operation::VerifiedOperation;
use p2panda_rs::storage_provider::traits::OperationStore;
Expand All @@ -17,6 +18,8 @@ use crate::materializer::TaskInput;
/// After succesfully reducing and storing a document view an array of dependency tasks is returned.
/// If invalid inputs were passed or a fatal db error occured a critical error is returned.
pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
debug!("Working on reduce task {:#?}", input);

// Find out which document we are handling
let document_id = resolve_document_id(&context, &input).await?;

Expand All @@ -25,7 +28,10 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskI
.store
.get_operations_by_document_id(&document_id)
.await
.map_err(|_| TaskError::Critical)?;
.map_err(|err| {
debug!("Failed loading operations from storage: {}", err);
TaskError::Critical
})?;

let document_view_id = match &input.document_view_id {
// If this task was passed a document_view_id as input then we want to build to document
Expand All @@ -37,11 +43,17 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskI

// Dispatch a "dependency" task if we created a new document view
match document_view_id {
Some(view_id) => Ok(Some(vec![Task::new(
"dependency",
TaskInput::new(None, Some(view_id)),
)])),
None => Ok(None),
Some(view_id) => {
debug!("Dispatch dependency task for view with id: {}", view_id);
Ok(Some(vec![Task::new(
"dependency",
TaskInput::new(None, Some(view_id)),
)]))
}
None => {
debug!("No dependency tasks to dispatch");
Ok(None)
}
}
}

Expand Down Expand Up @@ -94,13 +106,23 @@ async fn reduce_document_view(
{
Ok(document) => {
// If the document was deleted, then we return nothing
debug!(
"Document materialized to view with id: {}",
document_view_id
);
if document.is_deleted() {
return Ok(None);
};

document
}
Err(_) => {
Err(err) => {
debug!(
"Document view materialization failed view with id: {}",
document_view_id
);
debug!("{}", err);

// There is not enough operations yet to materialise this view. Maybe next time!
return Ok(None);
}
Expand All @@ -111,7 +133,15 @@ async fn reduce_document_view(
.store
.insert_document_view(document.view().unwrap(), document.schema())
.await
.map_err(|_| TaskError::Critical)?;
.map_err(|err| {
debug!(
"Failed to insert document view into database: {}",
document_view_id
);
debug!("{}", err);

TaskError::Critical
})?;

// Return the new view id to be used in the resulting dependency task
Ok(Some(document.view_id().to_owned()))
Expand All @@ -134,7 +164,11 @@ async fn reduce_document(
.store
.insert_document(&document)
.await
.map_err(|_| TaskError::Critical)?;
.map_err(|err| {
debug!("Failed to insert document into database: {}", document.id());
debug!("{}", err);
TaskError::Critical
})?;

// If the document was deleted, then we return nothing
if document.is_deleted() {
Expand All @@ -144,8 +178,10 @@ async fn reduce_document(
// Return the new document_view id to be used in the resulting dependency task
Ok(Some(document.view_id().to_owned()))
}
Err(_) => {
Err(err) => {
// There is not enough operations yet to materialise this view. Maybe next time!
debug!("Document materialization error: {}", err);

Ok(None)
}
}
Expand Down

0 comments on commit 07235c8

Please sign in to comment.