Merge branch 'main' into graphql-document-query-experimental
adzialocha committed Jun 29, 2022
2 parents 85de0be + 07235c8 commit 7e2f198
Showing 3 changed files with 134 additions and 70 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- e2e publish entry tests [#167](https://github.com/p2panda/aquadoggo/pull/167)
- Reschedule pending tasks on startup [#168](https://github.com/p2panda/aquadoggo/pull/168)
- Add schema task and schema provider that update when new schema views are materialised [#166](https://github.com/p2panda/aquadoggo/pull/166/files)
- Debug logging in reduce task [#175](https://github.com/p2panda/aquadoggo/pull/175)

### Changed

@@ -40,6 +41,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Refactor tests to use fixtures exported from `p2panda-rs` [#147](https://github.com/p2panda/aquadoggo/pull/147)
- Use `DocumentViewId` for previous operations [#147](https://github.com/p2panda/aquadoggo/pull/147)
- Use `VerifiedOperation` [#158](https://github.com/p2panda/aquadoggo/pull/158)
- Refactor test_db creation [#176](https://github.com/p2panda/aquadoggo/pull/176)

### Fixed

146 changes: 88 additions & 58 deletions aquadoggo/src/db/stores/test_utils.rs
@@ -4,7 +4,7 @@ use std::convert::TryFrom;

use futures::Future;
use p2panda_rs::document::{DocumentBuilder, DocumentId, DocumentViewId};
use p2panda_rs::entry::{sign_and_encode, Entry};
use p2panda_rs::entry::{sign_and_encode, Entry, EntrySigned};
use p2panda_rs::hash::Hash;
use p2panda_rs::identity::{Author, KeyPair};
use p2panda_rs::operation::{
@@ -16,7 +16,7 @@ use p2panda_rs::storage_provider::traits::{
AsStorageEntry, AsStorageLog, EntryStore, LogStore, OperationStore, StorageProvider,
};
use p2panda_rs::test_utils::constants::{DEFAULT_PRIVATE_KEY, TEST_SCHEMA_ID};
use p2panda_rs::test_utils::fixtures::{create_operation, delete_operation, update_operation};
use p2panda_rs::test_utils::fixtures::{operation, operation_fields};
use rstest::fixture;
use sqlx::migrate::MigrateDatabase;
use sqlx::Any;
@@ -26,7 +26,7 @@ use crate::db::provider::SqlStorage;
use crate::db::stores::{StorageEntry, StorageLog};
use crate::db::traits::DocumentStore;
use crate::db::{connection_pool, create_database, run_pending_migrations, Pool};
use crate::graphql::client::{EntryArgsRequest, PublishEntryRequest};
use crate::graphql::client::{EntryArgsRequest, PublishEntryRequest, PublishEntryResponse};
use crate::test_helpers::TEST_CONFIG;

/// The fields used as defaults in the tests.
@@ -313,7 +313,7 @@ async fn create_test_db(
no_of_entries: usize,
no_of_authors: usize,
with_delete: bool,
schema: SchemaId,
schema_id: SchemaId,
create_operation_fields: Vec<(&'static str, OperationValue)>,
update_operation_fields: Vec<(&'static str, OperationValue)>,
) -> TestDatabase {
@@ -333,68 +333,41 @@ async fn create_test_db(
}

for key_pair in &key_pairs {
let mut document: Option<DocumentId> = None;
let author = Author::try_from(key_pair.public_key().to_owned()).unwrap();
let mut document_id: Option<DocumentId> = None;
let mut previous_operation: Option<DocumentViewId> = None;
for index in 0..no_of_entries {
let next_entry_args = store
.get_entry_args(&EntryArgsRequest {
author: author.clone(),
document: document.as_ref().cloned(),
})
.await
.unwrap();

let next_operation = if index == 0 {
create_operation(&create_operation_fields)
} else if index == (no_of_entries - 1) && with_delete {
delete_operation(&next_entry_args.backlink.clone().unwrap().into())
} else {
update_operation(
&update_operation_fields,
&next_entry_args.backlink.clone().unwrap().into(),
)
// Create an operation based on the current index and whether this document should contain
// a DELETE operation.
let next_operation_fields = match index {
// First operation is a CREATE.
0 => Some(operation_fields(create_operation_fields.clone())),
// Last operation is a DELETE if the with_delete flag is set.
seq if seq == (no_of_entries - 1) && with_delete => None,
// All other operations are UPDATE.
_ => Some(operation_fields(update_operation_fields.clone())),
};

let next_entry = Entry::new(
&next_entry_args.log_id,
Some(&next_operation),
next_entry_args.skiplink.as_ref(),
next_entry_args.backlink.as_ref(),
&next_entry_args.seq_num,
// Publish the operation encoded on an entry to storage.
let (entry_encoded, publish_entry_response) = send_to_store(
&store,
&operation(
next_operation_fields,
previous_operation,
Some(schema_id.to_owned()),
),
document_id.as_ref(),
key_pair,
)
.unwrap();
.await;

let entry_encoded = sign_and_encode(&next_entry, key_pair).unwrap();
let operation_encoded = OperationEncoded::try_from(&next_operation).unwrap();
// Set the previous_operations based on the backlink.
previous_operation = publish_entry_response.backlink.map(|hash| hash.into());

// If this was the first entry in the document, store the document id for later.
if index == 0 {
document = Some(entry_encoded.hash().into());
documents.push(entry_encoded.hash().into());
}

let storage_entry = StorageEntry::new(&entry_encoded, &operation_encoded).unwrap();

store.insert_entry(storage_entry).await.unwrap();

let storage_log = StorageLog::new(
&author,
&schema,
&document.clone().unwrap(),
&next_entry_args.log_id,
);

if next_entry_args.seq_num.is_first() {
store.insert_log(storage_log).await.unwrap();
document_id = Some(entry_encoded.hash().into());
documents.push(document_id.clone().unwrap());
}

let verified_operation =
VerifiedOperation::new(&author, &entry_encoded.hash().into(), &next_operation)
.unwrap();

store
.insert_operation(&verified_operation, &document.clone().unwrap())
.await
.unwrap();
}
}

@@ -405,6 +378,63 @@ async fn create_test_db(
}
}
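Aside, not part of this commit: the operation-selection logic in the loop above boils down to a three-way match on the entry index. A standalone sketch follows, with plain strings standing in for the real operation variants and a hypothetical `operation_kind` helper name.

```rust
/// Standalone illustration of how `create_test_db` picks the kind of operation
/// carried by the n-th entry of a document (names and return type are made up).
fn operation_kind(index: usize, no_of_entries: usize, with_delete: bool) -> &'static str {
    match index {
        // The first operation always creates the document.
        0 => "CREATE",
        // The last operation deletes it, if the `with_delete` flag is set.
        i if i == no_of_entries - 1 && with_delete => "DELETE",
        // Everything in between updates it.
        _ => "UPDATE",
    }
}

fn main() {
    assert_eq!(operation_kind(0, 5, true), "CREATE");
    assert_eq!(operation_kind(2, 5, true), "UPDATE");
    assert_eq!(operation_kind(4, 5, true), "DELETE");
    // Without the flag the final entry is just another UPDATE.
    assert_eq!(operation_kind(4, 5, false), "UPDATE");
}
```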

async fn send_to_store(
store: &SqlStorage,
operation: &Operation,
document_id: Option<&DocumentId>,
key_pair: &KeyPair,
) -> (EntrySigned, PublishEntryResponse) {
// Get an Author from the key_pair.
let author = Author::try_from(key_pair.public_key().to_owned()).unwrap();

// Get the next entry arguments for this author and the passed document id.
let next_entry_args = store
.get_entry_args(&EntryArgsRequest {
author: author.clone(),
document: document_id.cloned(),
})
.await
.unwrap();

// Construct the next entry.
let next_entry = Entry::new(
&next_entry_args.log_id,
Some(operation),
next_entry_args.skiplink.as_ref(),
next_entry_args.backlink.as_ref(),
&next_entry_args.seq_num,
)
.unwrap();

// Encode both the entry and operation.
let entry_encoded = sign_and_encode(&next_entry, key_pair).unwrap();
let operation_encoded = OperationEncoded::try_from(operation).unwrap();

// Publish the entry and get the next entry args.
let publish_entry_request = PublishEntryRequest {
entry_encoded: entry_encoded.clone(),
operation_encoded,
};
let publish_entry_response = store.publish_entry(&publish_entry_request).await.unwrap();

// Set or unwrap the passed document_id.
let document_id = if operation.is_create() {
entry_encoded.hash().into()
} else {
document_id.unwrap().to_owned()
};

// Also insert the operation into the store.
let verified_operation =
VerifiedOperation::new(&author, &entry_encoded.hash().into(), operation).unwrap();
store
.insert_operation(&verified_operation, &document_id)
.await
.unwrap();

(entry_encoded, publish_entry_response)
}
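Usage note, not part of the diff: a test could drive the new `send_to_store` helper roughly as sketched below. The function name `publish_create_and_update`, the `username` field and its values are hypothetical, `store` and `schema_id` are assumed to come from the surrounding test setup, and the `OperationValue::Text` variant is assumed from the p2panda-rs version in use.

```rust
// Illustrative sketch only: `store`, `schema_id` and the field contents are
// assumptions, not part of this commit.
async fn publish_create_and_update(store: &SqlStorage, schema_id: SchemaId) {
    let key_pair = KeyPair::new();

    // Publish a CREATE operation; no document exists yet, so no document id is passed.
    let create_op = operation(
        Some(operation_fields(vec![(
            "username",
            OperationValue::Text("panda".into()),
        )])),
        None,
        Some(schema_id.clone()),
    );
    let (entry_encoded, response) = send_to_store(store, &create_op, None, &key_pair).await;

    // The hash of the first entry doubles as the document id for follow-up operations.
    let document_id: DocumentId = entry_encoded.hash().into();

    // Publish an UPDATE whose previous operations point at the returned backlink.
    let update_op = operation(
        Some(operation_fields(vec![(
            "username",
            OperationValue::Text("polar bear".into()),
        )])),
        response.backlink.map(|hash| hash.into()),
        Some(schema_id),
    );
    send_to_store(store, &update_op, Some(&document_id), &key_pair).await;
}
```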

/// Create test database.
async fn initialize_db() -> Pool {
// Reset database first
56 changes: 44 additions & 12 deletions aquadoggo/src/materializer/tasks/reduce.rs
@@ -18,7 +18,7 @@ use crate::materializer::TaskInput;
/// After successfully reducing and storing a document view, an array of dependency tasks is returned.
/// If invalid inputs were passed or a fatal db error occurred, a critical error is returned.
pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskInput> {
debug!("Working on {}", input);
debug!("Working on reduce task {:#?}", input);

// Find out which document we are handling
let document_id = resolve_document_id(&context, &input).await?;
@@ -28,7 +28,10 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskI
.store
.get_operations_by_document_id(&document_id)
.await
.map_err(|_| TaskError::Critical)?;
.map_err(|err| {
debug!("Failed loading operations from storage: {}", err);
TaskError::Critical
})?;

let document_view_id = match &input.document_view_id {
// If this task was passed a document_view_id as input then we want to build to document
@@ -40,11 +43,17 @@ pub async fn reduce_task(context: Context, input: TaskInput) -> TaskResult<TaskI

// Dispatch a "dependency" task if we created a new document view
match document_view_id {
Some(view_id) => Ok(Some(vec![Task::new(
"dependency",
TaskInput::new(None, Some(view_id)),
)])),
None => Ok(None),
Some(view_id) => {
debug!("Dispatch dependency task for view with id: {}", view_id);
Ok(Some(vec![Task::new(
"dependency",
TaskInput::new(None, Some(view_id)),
)]))
}
None => {
debug!("No dependency tasks to dispatch");
Ok(None)
}
}
}

@@ -97,14 +106,23 @@ async fn reduce_document_view(
{
Ok(document) => {
// If the document was deleted, then we return nothing
debug!(
"Document materialized to view with id: {}",
document_view_id
);
if document.is_deleted() {
return Ok(None);
};

document
}
Err(_) => {
debug!("Reduce task: failed building");
Err(err) => {
debug!(
"Document view materialization failed for view with id: {}",
document_view_id
);
debug!("{}", err);

// There is not enough operations yet to materialise this view. Maybe next time!
return Ok(None);
}
Expand All @@ -115,7 +133,15 @@ async fn reduce_document_view(
.store
.insert_document_view(document.view().unwrap(), document.schema())
.await
.map_err(|_| TaskError::Critical)?;
.map_err(|err| {
debug!(
"Failed to insert document view into database: {}",
document_view_id
);
debug!("{}", err);

TaskError::Critical
})?;

info!("Stored {} view {}", document, document.view_id());

Expand All @@ -140,7 +166,11 @@ async fn reduce_document(
.store
.insert_document(&document)
.await
.map_err(|_| TaskError::Critical)?;
.map_err(|err| {
debug!("Failed to insert document into database: {}", document.id());
debug!("{}", err);
TaskError::Critical
})?;

// If the document was deleted, then we return nothing
if document.is_deleted() {
Expand All @@ -152,8 +182,10 @@ async fn reduce_document(
// Return the new document_view id to be used in the resulting dependency task
Ok(Some(document.view_id().to_owned()))
}
Err(_) => {
Err(err) => {
// There is not enough operations yet to materialise this view. Maybe next time!
debug!("Document materialization error: {}", err);

Ok(None)
}
}
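For context on the pattern these hunks introduce: rather than discarding storage errors with `map_err(|_| TaskError::Critical)`, the error is now logged at debug level before being mapped. A minimal, self-contained sketch of that pattern, with stand-in types instead of the real aquadoggo store and error APIs:

```rust
use log::debug;

// Stand-in for `TaskError::Critical`, for illustration only.
#[derive(Debug)]
enum TaskError {
    Critical,
}

// Placeholder for a storage call such as `store.get_operations_by_document_id(..)`.
fn load_operations(document_id: &str) -> Result<Vec<String>, String> {
    Err(format!("no operations found for {}", document_id))
}

fn reduce(document_id: &str) -> Result<Vec<String>, TaskError> {
    // Log the underlying storage error before mapping it to a critical task
    // error, so failed tasks can be diagnosed from the debug logs.
    load_operations(document_id).map_err(|err| {
        debug!("Failed loading operations from storage: {}", err);
        TaskError::Critical
    })
}

fn main() {
    // A logger implementation (e.g. env_logger) would need to be initialised
    // for the debug output to show up; the code compiles and runs without it.
    match reduce("0020...") {
        Ok(operations) => println!("loaded {} operations", operations.len()),
        Err(err) => println!("task failed: {:?}", err),
    }
}
```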
