Replicate operations in topo order #442

Merged: 6 commits, Jul 9, 2023
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

4 changes: 2 additions & 2 deletions aquadoggo/Cargo.toml
@@ -51,7 +51,7 @@ lipmaa-link = "0.2.2"
log = "0.4.19"
once_cell = "1.18.0"
openssl-probe = "0.1.5"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "ae60bf754a60a1daf9c6862e9ae51ee7501b007f", features = [
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "a535ee229b367c79f18a50626c6bcab61581e32b", features = [
"storage-provider",
] }
serde = { version = "1.0.152", features = ["derive"] }
@@ -84,7 +84,7 @@ http = "0.2.9"
hyper = "0.14.19"
libp2p-swarm-test = "0.2.0"
once_cell = "1.17.0"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "ae60bf754a60a1daf9c6862e9ae51ee7501b007f", features = [
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "a535ee229b367c79f18a50626c6bcab61581e32b", features = [
"test-utils",
"storage-provider",
] }
12 changes: 0 additions & 12 deletions aquadoggo/src/graphql/mutations/publish.rs
@@ -680,18 +680,6 @@ mod tests {
},
"Previous operation 0020b177ec1bf26dfb3b7010d473e6d44713b29b765b99c6e60ecbfae742de496543 not found in store"
)]
#[case::claimed_log_id_does_not_match_expected(
&entry_signed_encoded_unvalidated(
1,
2,
None,
None,
Some(EncodedOperation::from_bytes(&OPERATION_ENCODED)),
key_pair(PRIVATE_KEY)
).to_string(),
&OPERATION_ENCODED,
"Entry's claimed log id of 2 does not match expected next log id of 1 for given public key"
)]
fn validation_of_entry_and_operation_values(
#[case] entry_encoded: &str,
#[case] encoded_operation: &[u8],
145 changes: 16 additions & 129 deletions aquadoggo/src/replication/ingest.rs
@@ -1,114 +1,19 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use log::trace;
use p2panda_rs::api::validation::{
ensure_document_not_deleted, get_checked_document_id_for_view_id, get_expected_skiplink,
is_next_seq_num, validate_claimed_schema_id,
};
use p2panda_rs::api::DomainError;
use p2panda_rs::document::DocumentId;
use p2panda_rs::entry::decode::decode_entry;
use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry};
use p2panda_rs::api::publish;
use p2panda_rs::entry::traits::AsEncodedEntry;
use p2panda_rs::entry::EncodedEntry;
use p2panda_rs::operation::decode::decode_operation;
use p2panda_rs::operation::plain::PlainOperation;
use p2panda_rs::operation::traits::{AsOperation, Schematic};
use p2panda_rs::operation::validate::validate_operation_with_entry;
use p2panda_rs::operation::{EncodedOperation, Operation, OperationAction, OperationId};
use p2panda_rs::schema::Schema;
use p2panda_rs::storage_provider::traits::{EntryStore, LogStore, OperationStore};
use p2panda_rs::operation::traits::Schematic;
use p2panda_rs::operation::{EncodedOperation, OperationId};
use p2panda_rs::storage_provider::traits::EntryStore;

use crate::bus::{ServiceMessage, ServiceSender};
use crate::db::SqlStore;
use crate::replication::errors::IngestError;
use crate::schema::SchemaProvider;

// @TODO: This method exists in `p2panda-rs`, we need to make it public there
async fn validate_entry_and_operation<S: EntryStore + OperationStore + LogStore>(
store: &S,
schema: &Schema,
entry: &impl AsEntry,
encoded_entry: &impl AsEncodedEntry,
plain_operation: &PlainOperation,
encoded_operation: &EncodedOperation,
) -> Result<(Operation, OperationId), DomainError> {
// Verify that the claimed seq num matches the expected seq num for this public_key and log.
let latest_entry = store
.get_latest_entry(entry.public_key(), entry.log_id())
.await?;
let latest_seq_num = latest_entry.as_ref().map(|entry| entry.seq_num());
is_next_seq_num(latest_seq_num, entry.seq_num())?;

// If a skiplink is claimed, get the expected skiplink from the database, errors if it can't be found.
let skiplink = match entry.skiplink() {
Some(_) => Some(
get_expected_skiplink(store, entry.public_key(), entry.log_id(), entry.seq_num())
.await?,
),
None => None,
};

// Construct params as `validate_operation_with_entry` expects.
let skiplink_params = skiplink.as_ref().map(|entry| {
let hash = entry.hash();
(entry.clone(), hash)
});

// The backlink for this entry is the latest entry from this public key's log.
let backlink_params = latest_entry.as_ref().map(|entry| {
let hash = entry.hash();
(entry.clone(), hash)
});

// Perform validation of the entry and it's operation.
let (operation, operation_id) = validate_operation_with_entry(
entry,
encoded_entry,
skiplink_params.as_ref().map(|(entry, hash)| (entry, hash)),
backlink_params.as_ref().map(|(entry, hash)| (entry, hash)),
plain_operation,
encoded_operation,
schema,
)?;

Ok((operation, operation_id))
}

// @TODO: This method exists in `p2panda-rs`, we need to make it public there
async fn determine_document_id<S: OperationStore>(
store: &S,
operation: &impl AsOperation,
operation_id: &OperationId,
) -> Result<DocumentId, DomainError> {
let document_id = match operation.action() {
OperationAction::Create => {
// Derive the document id for this new document.
Ok::<DocumentId, DomainError>(DocumentId::new(operation_id))
}
_ => {
// We can unwrap previous operations here as we know all UPDATE and DELETE operations contain them.
let previous = operation.previous().unwrap();

// Validate claimed schema for this operation matches the expected found in the
// previous operations.
validate_claimed_schema_id(store, &operation.schema_id(), &previous).await?;

// Get the document_id for the document_view_id contained in previous operations.
// This performs several validation steps (check method doc string).
let document_id = get_checked_document_id_for_view_id(store, &previous).await?;

Ok(document_id)
}
}?;

// Ensure the document isn't deleted.
ensure_document_not_deleted(store, &document_id)
.await
.map_err(|_| DomainError::DeletedDocument)?;

Ok(document_id)
}

#[derive(Debug, Clone)]
pub struct SyncIngest {
tx: ServiceSender,
@@ -129,8 +34,6 @@ impl SyncIngest {
encoded_entry: &EncodedEntry,
encoded_operation: &EncodedOperation,
) -> Result<(), IngestError> {
let entry = decode_entry(encoded_entry)?;

trace!("Received entry and operation: {}", encoded_entry.hash());

// Check if we already have this entry. This can happen if another peer sent it to us
@@ -152,42 +55,26 @@ impl SyncIngest {
.await
.ok_or_else(|| IngestError::UnsupportedSchema)?;

let (operation, operation_id) = validate_entry_and_operation(
/////////////////////////////////////
// PUBLISH THE ENTRY AND OPERATION //
/////////////////////////////////////

let _ = publish(
store,
&schema,
&entry,
encoded_entry,
&plain_operation,
encoded_operation,
)
.await?;

let document_id = determine_document_id(store, &operation, &operation_id).await?;

// If the entries' seq num is 1 we insert a new log here
if entry.seq_num().is_first() {
store
.insert_log(
entry.log_id(),
entry.public_key(),
Schematic::schema_id(&operation),
&document_id,
)
.await
.expect("Fatal database error");
}
////////////////////////////////////////
// SEND THE OPERATION TO MATERIALIZER //
////////////////////////////////////////

store
.insert_entry(&entry, encoded_entry, Some(encoded_operation))
.await
.expect("Fatal database error");

store
.insert_operation(&operation_id, entry.public_key(), &operation, &document_id)
.await
.expect("Fatal database error");
// Send new operation on service communication bus, this will arrive eventually at
// the materializer service

// Inform other services about received data
let operation_id: OperationId = encoded_entry.hash().into();

if self
@@ -197,7 +84,7 @@ impl SyncIngest {
{
// Silently fail here as we don't mind if there are no subscribers. We have
// tests in other places to check if messages arrive.
}
};

Ok(())
}
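Taken together, the ingest.rs changes reduce `SyncIngest::handle_entry` to a thin wrapper around `p2panda_rs::api::publish`. The following is a minimal sketch of the resulting flow, not the verbatim post-merge code; the free function `ingest_sketch` is hypothetical and error conversions into `IngestError` are assumed to exist.

use p2panda_rs::api::publish;
use p2panda_rs::entry::traits::AsEncodedEntry;
use p2panda_rs::entry::EncodedEntry;
use p2panda_rs::operation::decode::decode_operation;
use p2panda_rs::operation::{EncodedOperation, OperationId};
use p2panda_rs::schema::Schema;

use crate::db::SqlStore;
use crate::replication::errors::IngestError;

// Hypothetical helper mirroring the simplified ingest path: validation, log
// bookkeeping and storage of entry and operation are all delegated to
// `publish`, and the operation id is derived directly from the entry hash.
async fn ingest_sketch(
    store: &SqlStore,
    schema: &Schema,
    encoded_entry: &EncodedEntry,
    encoded_operation: &EncodedOperation,
) -> Result<OperationId, IngestError> {
    // Decode the operation into its schema-agnostic "plain" form.
    let plain_operation = decode_operation(encoded_operation)?;

    // `publish` replaces the local `validate_entry_and_operation` and
    // `determine_document_id` helpers removed in this PR.
    let _ = publish(
        store,
        schema,
        encoded_entry,
        &plain_operation,
        encoded_operation,
    )
    .await?;

    // The operation id is the hash of the signed, encoded entry.
    Ok(encoded_entry.hash().into())
}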
40 changes: 35 additions & 5 deletions aquadoggo/src/replication/session.rs
@@ -183,7 +183,10 @@ mod tests {
use crate::replication::manager::INITIAL_SESSION_ID;
use crate::replication::{Message, Mode, SessionState, TargetSet};
use crate::test_utils::helpers::random_target_set;
use crate::test_utils::{populate_store_config, test_runner, TestNode};
use crate::test_utils::{
populate_and_materialize, populate_store_config, test_runner, test_runner_with_manager,
TestNode, TestNodeManager,
};

use super::Session;

@@ -221,9 +224,7 @@ mod tests {
#[with(5, 2, 1)]
config: PopulateStoreConfig,
) {
test_runner(move |node: TestNode| async move {
populate_store(&node.context.store, &config).await;

test_runner_with_manager(move |manager: TestNodeManager| async move {
let target_set = TargetSet::new(&vec![config.schema.id().to_owned()]);
let mut session = Session::new(
&INITIAL_SESSION_ID,
@@ -233,13 +234,42 @@ mod tests {
false,
);

let mut node_a = manager.create().await;
populate_and_materialize(&mut node_a, &config).await;

let response_messages = session
.handle_message(&node.context.store, &Message::Have(vec![]))
.handle_message(&node_a.context.store, &Message::Have(vec![]))
.await
.unwrap();

// This node has materialized their documents already so we expect the following
// messages.
//
// 1x Have + 10x Entry + 1x SyncDone = 12 messages
assert_eq!(response_messages.len(), 12);

let target_set = TargetSet::new(&vec![config.schema.id().to_owned()]);
let mut session = Session::new(
&INITIAL_SESSION_ID,
&target_set,
&Mode::LogHeight,
true,
false,
);

let node_b: TestNode = manager.create().await;
populate_store(&node_b.context.store, &config).await;

let response_messages = session
.handle_message(&node_b.context.store, &Message::Have(vec![]))
.await
.unwrap();

// This node has not materialized any documents so they can't compose their return
// entries yet.
//
// 1x Have + 1x SyncDone = 2 messages
assert_eq!(response_messages.len(), 2);
});
}
}
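For reference, the expected message counts in the two assertions above follow from the `#[with(5, 2, 1)]` populate config, assuming it yields 5 entries per log, 2 logs and 1 author; the standalone check below is illustrative only.

fn main() {
    // Assumed meaning of `#[with(5, 2, 1)]`: 5 entries per log x 2 logs
    // x 1 author = 10 entries in total.
    let entries = 5 * 2 * 1;

    // Node A materialized its documents, so the LogHeight strategy can
    // answer with every entry: 1x Have + 10x Entry + 1x SyncDone.
    assert_eq!(1 + entries + 1, 12);

    // Node B only populated its store without materializing, so it cannot
    // compose any Entry messages yet: 1x Have + 1x SyncDone.
    assert_eq!(1 + 1, 2);
}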