Replicate operations in topo order (#442)
* Send entries ordered by document and by their operations' `sorted_index`

* Ingest replicated entries and operations using p2panda-rs `publish`

* Remove test case which should correctly pass now

* Bump p2panda-rs

* Doc strings and tests for log height strategy

* Clippy & fmt
sandreae committed Jul 9, 2023
1 parent 17a1096 commit 88e1605
Showing 7 changed files with 390 additions and 175 deletions.
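The first bullet above is the core of the change: entries are sent grouped by document, with each document's operations ordered by their sorted_index, so the receiving peer ingests them in topological order. A minimal sketch of that ordering in Rust, assuming a hypothetical SortableEntry type (the real aquadoggo types differ):

    // Hypothetical sketch of the replication ordering described above.
    // `SortableEntry` and its fields are illustrative stand-ins, not aquadoggo types.
    #[derive(Debug)]
    struct SortableEntry {
        document_id: String, // stand-in for p2panda_rs::document::DocumentId
        sorted_index: u64,   // topological position of the operation within its document
        payload: Vec<u8>,    // encoded entry and operation bytes
    }

    // Group entries by document, then order each group topologically.
    fn order_for_replication(mut entries: Vec<SortableEntry>) -> Vec<SortableEntry> {
        entries.sort_by(|a, b| {
            a.document_id
                .cmp(&b.document_id)
                .then(a.sorted_index.cmp(&b.sorted_index))
        });
        entries
    }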
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions aquadoggo/Cargo.toml
@@ -51,7 +51,7 @@ lipmaa-link = "0.2.2"
 log = "0.4.19"
 once_cell = "1.18.0"
 openssl-probe = "0.1.5"
-p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "ae60bf754a60a1daf9c6862e9ae51ee7501b007f", features = [
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "a535ee229b367c79f18a50626c6bcab61581e32b", features = [
     "storage-provider",
 ] }
 serde = { version = "1.0.152", features = ["derive"] }
@@ -84,7 +84,7 @@ http = "0.2.9"
 hyper = "0.14.19"
 libp2p-swarm-test = "0.2.0"
 once_cell = "1.17.0"
-p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "ae60bf754a60a1daf9c6862e9ae51ee7501b007f", features = [
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "a535ee229b367c79f18a50626c6bcab61581e32b", features = [
     "test-utils",
     "storage-provider",
 ] }
12 changes: 0 additions & 12 deletions aquadoggo/src/graphql/mutations/publish.rs
@@ -680,18 +680,6 @@ mod tests {
         },
         "Previous operation 0020b177ec1bf26dfb3b7010d473e6d44713b29b765b99c6e60ecbfae742de496543 not found in store"
     )]
-    #[case::claimed_log_id_does_not_match_expected(
-        &entry_signed_encoded_unvalidated(
-            1,
-            2,
-            None,
-            None,
-            Some(EncodedOperation::from_bytes(&OPERATION_ENCODED)),
-            key_pair(PRIVATE_KEY)
-        ).to_string(),
-        &OPERATION_ENCODED,
-        "Entry's claimed log id of 2 does not match expected next log id of 1 for given public key"
-    )]
    fn validation_of_entry_and_operation_values(
        #[case] entry_encoded: &str,
        #[case] encoded_operation: &[u8],
145 changes: 16 additions & 129 deletions aquadoggo/src/replication/ingest.rs
@@ -1,114 +1,19 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 
 use log::trace;
-use p2panda_rs::api::validation::{
-    ensure_document_not_deleted, get_checked_document_id_for_view_id, get_expected_skiplink,
-    is_next_seq_num, validate_claimed_schema_id,
-};
-use p2panda_rs::api::DomainError;
-use p2panda_rs::document::DocumentId;
-use p2panda_rs::entry::decode::decode_entry;
-use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry};
+use p2panda_rs::api::publish;
+use p2panda_rs::entry::traits::AsEncodedEntry;
 use p2panda_rs::entry::EncodedEntry;
 use p2panda_rs::operation::decode::decode_operation;
-use p2panda_rs::operation::plain::PlainOperation;
-use p2panda_rs::operation::traits::{AsOperation, Schematic};
-use p2panda_rs::operation::validate::validate_operation_with_entry;
-use p2panda_rs::operation::{EncodedOperation, Operation, OperationAction, OperationId};
-use p2panda_rs::schema::Schema;
-use p2panda_rs::storage_provider::traits::{EntryStore, LogStore, OperationStore};
+use p2panda_rs::operation::traits::Schematic;
+use p2panda_rs::operation::{EncodedOperation, OperationId};
+use p2panda_rs::storage_provider::traits::EntryStore;
 
 use crate::bus::{ServiceMessage, ServiceSender};
 use crate::db::SqlStore;
 use crate::replication::errors::IngestError;
 use crate::schema::SchemaProvider;
 
-// @TODO: This method exists in `p2panda-rs`, we need to make it public there
-async fn validate_entry_and_operation<S: EntryStore + OperationStore + LogStore>(
-    store: &S,
-    schema: &Schema,
-    entry: &impl AsEntry,
-    encoded_entry: &impl AsEncodedEntry,
-    plain_operation: &PlainOperation,
-    encoded_operation: &EncodedOperation,
-) -> Result<(Operation, OperationId), DomainError> {
-    // Verify that the claimed seq num matches the expected seq num for this public_key and log.
-    let latest_entry = store
-        .get_latest_entry(entry.public_key(), entry.log_id())
-        .await?;
-    let latest_seq_num = latest_entry.as_ref().map(|entry| entry.seq_num());
-    is_next_seq_num(latest_seq_num, entry.seq_num())?;
-
-    // If a skiplink is claimed, get the expected skiplink from the database, errors if it can't be found.
-    let skiplink = match entry.skiplink() {
-        Some(_) => Some(
-            get_expected_skiplink(store, entry.public_key(), entry.log_id(), entry.seq_num())
-                .await?,
-        ),
-        None => None,
-    };
-
-    // Construct params as `validate_operation_with_entry` expects.
-    let skiplink_params = skiplink.as_ref().map(|entry| {
-        let hash = entry.hash();
-        (entry.clone(), hash)
-    });
-
-    // The backlink for this entry is the latest entry from this public key's log.
-    let backlink_params = latest_entry.as_ref().map(|entry| {
-        let hash = entry.hash();
-        (entry.clone(), hash)
-    });
-
-    // Perform validation of the entry and it's operation.
-    let (operation, operation_id) = validate_operation_with_entry(
-        entry,
-        encoded_entry,
-        skiplink_params.as_ref().map(|(entry, hash)| (entry, hash)),
-        backlink_params.as_ref().map(|(entry, hash)| (entry, hash)),
-        plain_operation,
-        encoded_operation,
-        schema,
-    )?;
-
-    Ok((operation, operation_id))
-}
-
-// @TODO: This method exists in `p2panda-rs`, we need to make it public there
-async fn determine_document_id<S: OperationStore>(
-    store: &S,
-    operation: &impl AsOperation,
-    operation_id: &OperationId,
-) -> Result<DocumentId, DomainError> {
-    let document_id = match operation.action() {
-        OperationAction::Create => {
-            // Derive the document id for this new document.
-            Ok::<DocumentId, DomainError>(DocumentId::new(operation_id))
-        }
-        _ => {
-            // We can unwrap previous operations here as we know all UPDATE and DELETE operations contain them.
-            let previous = operation.previous().unwrap();
-
-            // Validate claimed schema for this operation matches the expected found in the
-            // previous operations.
-            validate_claimed_schema_id(store, &operation.schema_id(), &previous).await?;
-
-            // Get the document_id for the document_view_id contained in previous operations.
-            // This performs several validation steps (check method doc string).
-            let document_id = get_checked_document_id_for_view_id(store, &previous).await?;
-
-            Ok(document_id)
-        }
-    }?;
-
-    // Ensure the document isn't deleted.
-    ensure_document_not_deleted(store, &document_id)
-        .await
-        .map_err(|_| DomainError::DeletedDocument)?;
-
-    Ok(document_id)
-}
-
 #[derive(Debug, Clone)]
 pub struct SyncIngest {
     tx: ServiceSender,
@@ -129,8 +34,6 @@ impl SyncIngest {
         encoded_entry: &EncodedEntry,
         encoded_operation: &EncodedOperation,
     ) -> Result<(), IngestError> {
-        let entry = decode_entry(encoded_entry)?;
-
         trace!("Received entry and operation: {}", encoded_entry.hash());
 
         // Check if we already have this entry. This can happen if another peer sent it to us
@@ -152,42 +55,26 @@ impl SyncIngest {
             .await
             .ok_or_else(|| IngestError::UnsupportedSchema)?;
 
-        let (operation, operation_id) = validate_entry_and_operation(
+        /////////////////////////////////////
+        // PUBLISH THE ENTRY AND OPERATION //
+        /////////////////////////////////////
+
+        let _ = publish(
             store,
             &schema,
-            &entry,
             encoded_entry,
             &plain_operation,
             encoded_operation,
         )
         .await?;
 
-        let document_id = determine_document_id(store, &operation, &operation_id).await?;
-
-        // If the entries' seq num is 1 we insert a new log here
-        if entry.seq_num().is_first() {
-            store
-                .insert_log(
-                    entry.log_id(),
-                    entry.public_key(),
-                    Schematic::schema_id(&operation),
-                    &document_id,
-                )
-                .await
-                .expect("Fatal database error");
-        }
+        ////////////////////////////////////////
+        // SEND THE OPERATION TO MATERIALIZER //
+        ////////////////////////////////////////
 
-        store
-            .insert_entry(&entry, encoded_entry, Some(encoded_operation))
-            .await
-            .expect("Fatal database error");
-
-        store
-            .insert_operation(&operation_id, entry.public_key(), &operation, &document_id)
-            .await
-            .expect("Fatal database error");
-
-        // Send new operation on service communication bus, this will arrive eventually at
-        // the materializer service
+        // Inform other services about received data
+        let operation_id: OperationId = encoded_entry.hash().into();
 
         if self
@@ -197,7 +84,7 @@ impl SyncIngest {
         {
             // Silently fail here as we don't mind if there are no subscribers. We have
            // tests in other places to check if messages arrive.
-        }
+        };
 
         Ok(())
     }
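The net effect in this file: the hand-rolled validation helpers and manual store writes are replaced by a single call to p2panda-rs's publish. A condensed sketch of the resulting flow, assembled only from calls visible in the diff above (the decode step is assumed to happen outside the hunks shown, and the snippet is not standalone-compilable):

    // Validation and store writes that the removed helpers did by hand now
    // happen inside p2panda-rs `publish`.
    let plain_operation = decode_operation(encoded_operation)?;
    let _ = publish(store, &schema, encoded_entry, &plain_operation, encoded_operation).await?;

    // The operation id is now derived directly from the entry hash.
    let operation_id: OperationId = encoded_entry.hash().into();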
40 changes: 35 additions & 5 deletions aquadoggo/src/replication/session.rs
@@ -183,7 +183,10 @@ mod tests {
     use crate::replication::manager::INITIAL_SESSION_ID;
     use crate::replication::{Message, Mode, SessionState, TargetSet};
     use crate::test_utils::helpers::random_target_set;
-    use crate::test_utils::{populate_store_config, test_runner, TestNode};
+    use crate::test_utils::{
+        populate_and_materialize, populate_store_config, test_runner, test_runner_with_manager,
+        TestNode, TestNodeManager,
+    };
 
     use super::Session;
@@ -221,9 +224,7 @@
         #[with(5, 2, 1)]
         config: PopulateStoreConfig,
     ) {
-        test_runner(move |node: TestNode| async move {
-            populate_store(&node.context.store, &config).await;
-
+        test_runner_with_manager(move |manager: TestNodeManager| async move {
             let target_set = TargetSet::new(&vec![config.schema.id().to_owned()]);
             let mut session = Session::new(
                 &INITIAL_SESSION_ID,
@@ -233,13 +234,42 @@
                 false,
             );
 
+            let mut node_a = manager.create().await;
+            populate_and_materialize(&mut node_a, &config).await;
+
             let response_messages = session
-                .handle_message(&node.context.store, &Message::Have(vec![]))
+                .handle_message(&node_a.context.store, &Message::Have(vec![]))
                 .await
                 .unwrap();
 
+            // This node has materialized its documents already, so we expect the
+            // following messages.
+            //
+            // 1x Have + 10x Entry + 1x SyncDone = 12 messages
+            assert_eq!(response_messages.len(), 12);
+
+            let target_set = TargetSet::new(&vec![config.schema.id().to_owned()]);
+            let mut session = Session::new(
+                &INITIAL_SESSION_ID,
+                &target_set,
+                &Mode::LogHeight,
+                true,
+                false,
+            );
+
+            let node_b: TestNode = manager.create().await;
+            populate_store(&node_b.context.store, &config).await;
+
+            let response_messages = session
+                .handle_message(&node_b.context.store, &Message::Have(vec![]))
+                .await
+                .unwrap();
+
+            // This node has not materialized any documents, so it can't compose its
+            // return entries yet.
+            //
+            // 1x Have + 1x SyncDone = 2 messages
+            assert_eq!(response_messages.len(), 2);
         });
     }
 }
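The two assertions above encode the log-height strategy's expected message counts. A worked version of the arithmetic, assuming #[with(5, 2, 1)] populates 5 entries in each of 2 documents (an inference from the test comments, not a confirmed reading of PopulateStoreConfig):

    // Worked message-count arithmetic for the assertions above. The reading of
    // `#[with(5, 2, 1)]` as 5 entries x 2 documents is an assumption.
    fn main() {
        let have = 1;
        let sync_done = 1;
        let entry_messages = 5 * 2; // entries_per_document x documents = 10

        // Materialized node: can answer with all of its entries.
        assert_eq!(have + entry_messages + sync_done, 12);

        // Un-materialized node: nothing to compose return entries from yet.
        assert_eq!(have + sync_done, 2);
    }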