diff --git a/Cargo.lock b/Cargo.lock
index bd0962183..39a13c2d9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2908,7 +2908,7 @@ checksum = "ceedf44fb00f2d1984b0bc98102627ce622e083e49a5bacdb3e514fa4238e267"
 [[package]]
 name = "p2panda-rs"
 version = "0.7.0"
-source = "git+https://github.com/p2panda/p2panda?rev=ae60bf754a60a1daf9c6862e9ae51ee7501b007f#ae60bf754a60a1daf9c6862e9ae51ee7501b007f"
+source = "git+https://github.com/p2panda/p2panda?rev=a535ee229b367c79f18a50626c6bcab61581e32b#a535ee229b367c79f18a50626c6bcab61581e32b"
 dependencies = [
  "arrayvec 0.5.2",
  "async-trait",
diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml
index efd941f3a..a25f01655 100644
--- a/aquadoggo/Cargo.toml
+++ b/aquadoggo/Cargo.toml
@@ -51,7 +51,7 @@ lipmaa-link = "0.2.2"
 log = "0.4.19"
 once_cell = "1.18.0"
 openssl-probe = "0.1.5"
-p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "ae60bf754a60a1daf9c6862e9ae51ee7501b007f", features = [
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "a535ee229b367c79f18a50626c6bcab61581e32b", features = [
     "storage-provider",
 ] }
 serde = { version = "1.0.152", features = ["derive"] }
@@ -84,7 +84,7 @@ http = "0.2.9"
 hyper = "0.14.19"
 libp2p-swarm-test = "0.2.0"
 once_cell = "1.17.0"
-p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "ae60bf754a60a1daf9c6862e9ae51ee7501b007f", features = [
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "a535ee229b367c79f18a50626c6bcab61581e32b", features = [
     "test-utils",
     "storage-provider",
 ] }
diff --git a/aquadoggo/src/graphql/mutations/publish.rs b/aquadoggo/src/graphql/mutations/publish.rs
index 03a8e3273..774a963ba 100644
--- a/aquadoggo/src/graphql/mutations/publish.rs
+++ b/aquadoggo/src/graphql/mutations/publish.rs
@@ -680,18 +680,6 @@ mod tests {
         },
         "Previous operation 0020b177ec1bf26dfb3b7010d473e6d44713b29b765b99c6e60ecbfae742de496543 not found in store"
     )]
-    #[case::claimed_log_id_does_not_match_expected(
-        &entry_signed_encoded_unvalidated(
-            1,
-            2,
-            None,
-            None,
-            Some(EncodedOperation::from_bytes(&OPERATION_ENCODED)),
-            key_pair(PRIVATE_KEY)
-        ).to_string(),
-        &OPERATION_ENCODED,
-        "Entry's claimed log id of 2 does not match expected next log id of 1 for given public key"
-    )]
     fn validation_of_entry_and_operation_values(
         #[case] entry_encoded: &str,
         #[case] encoded_operation: &[u8],
diff --git a/aquadoggo/src/replication/ingest.rs b/aquadoggo/src/replication/ingest.rs
index eb3c7a5e4..e5f01adbf 100644
--- a/aquadoggo/src/replication/ingest.rs
+++ b/aquadoggo/src/replication/ingest.rs
@@ -1,114 +1,19 @@
 // SPDX-License-Identifier: AGPL-3.0-or-later
 
 use log::trace;
-use p2panda_rs::api::validation::{
-    ensure_document_not_deleted, get_checked_document_id_for_view_id, get_expected_skiplink,
-    is_next_seq_num, validate_claimed_schema_id,
-};
-use p2panda_rs::api::DomainError;
-use p2panda_rs::document::DocumentId;
-use p2panda_rs::entry::decode::decode_entry;
-use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry};
+use p2panda_rs::api::publish;
+use p2panda_rs::entry::traits::AsEncodedEntry;
 use p2panda_rs::entry::EncodedEntry;
 use p2panda_rs::operation::decode::decode_operation;
-use p2panda_rs::operation::plain::PlainOperation;
-use p2panda_rs::operation::traits::{AsOperation, Schematic};
-use p2panda_rs::operation::validate::validate_operation_with_entry;
-use p2panda_rs::operation::{EncodedOperation, Operation, OperationAction, OperationId};
-use p2panda_rs::schema::Schema;
-use p2panda_rs::storage_provider::traits::{EntryStore, LogStore, OperationStore};
+use p2panda_rs::operation::traits::Schematic;
+use p2panda_rs::operation::{EncodedOperation, OperationId};
+use p2panda_rs::storage_provider::traits::EntryStore;
 
 use crate::bus::{ServiceMessage, ServiceSender};
 use crate::db::SqlStore;
 use crate::replication::errors::IngestError;
 use crate::schema::SchemaProvider;
 
-// @TODO: This method exists in `p2panda-rs`, we need to make it public there
-async fn validate_entry_and_operation<S: EntryStore + OperationStore + LogStore>(
-    store: &S,
-    schema: &Schema,
-    entry: &impl AsEntry,
-    encoded_entry: &impl AsEncodedEntry,
-    plain_operation: &PlainOperation,
-    encoded_operation: &EncodedOperation,
-) -> Result<(Operation, OperationId), DomainError> {
-    // Verify that the claimed seq num matches the expected seq num for this public_key and log.
-    let latest_entry = store
-        .get_latest_entry(entry.public_key(), entry.log_id())
-        .await?;
-    let latest_seq_num = latest_entry.as_ref().map(|entry| entry.seq_num());
-    is_next_seq_num(latest_seq_num, entry.seq_num())?;
-
-    // If a skiplink is claimed, get the expected skiplink from the database, errors if it can't be found.
-    let skiplink = match entry.skiplink() {
-        Some(_) => Some(
-            get_expected_skiplink(store, entry.public_key(), entry.log_id(), entry.seq_num())
-                .await?,
-        ),
-        None => None,
-    };
-
-    // Construct params as `validate_operation_with_entry` expects.
-    let skiplink_params = skiplink.as_ref().map(|entry| {
-        let hash = entry.hash();
-        (entry.clone(), hash)
-    });
-
-    // The backlink for this entry is the latest entry from this public key's log.
-    let backlink_params = latest_entry.as_ref().map(|entry| {
-        let hash = entry.hash();
-        (entry.clone(), hash)
-    });
-
-    // Perform validation of the entry and it's operation.
-    let (operation, operation_id) = validate_operation_with_entry(
-        entry,
-        encoded_entry,
-        skiplink_params.as_ref().map(|(entry, hash)| (entry, hash)),
-        backlink_params.as_ref().map(|(entry, hash)| (entry, hash)),
-        plain_operation,
-        encoded_operation,
-        schema,
-    )?;
-
-    Ok((operation, operation_id))
-}
-
-// @TODO: This method exists in `p2panda-rs`, we need to make it public there
-async fn determine_document_id<S: EntryStore + OperationStore + LogStore>(
-    store: &S,
-    operation: &impl AsOperation,
-    operation_id: &OperationId,
-) -> Result<DocumentId, DomainError> {
-    let document_id = match operation.action() {
-        OperationAction::Create => {
-            // Derive the document id for this new document.
-            Ok::<DocumentId, DomainError>(DocumentId::new(operation_id))
-        }
-        _ => {
-            // We can unwrap previous operations here as we know all UPDATE and DELETE operations contain them.
-            let previous = operation.previous().unwrap();
-
-            // Validate claimed schema for this operation matches the expected found in the
-            // previous operations.
-            validate_claimed_schema_id(store, &operation.schema_id(), &previous).await?;
-
-            // Get the document_id for the document_view_id contained in previous operations.
-            // This performs several validation steps (check method doc string).
-            let document_id = get_checked_document_id_for_view_id(store, &previous).await?;
-
-            Ok(document_id)
-        }
-    }?;
-
-    // Ensure the document isn't deleted.
-    ensure_document_not_deleted(store, &document_id)
-        .await
-        .map_err(|_| DomainError::DeletedDocument)?;
-
-    Ok(document_id)
-}
-
 #[derive(Debug, Clone)]
 pub struct SyncIngest {
     tx: ServiceSender,
@@ -129,8 +34,6 @@ impl SyncIngest {
         encoded_entry: &EncodedEntry,
         encoded_operation: &EncodedOperation,
     ) -> Result<(), IngestError> {
-        let entry = decode_entry(encoded_entry)?;
-
         trace!("Received entry and operation: {}", encoded_entry.hash());
 
         // Check if we already have this entry. This can happen if another peer sent it to us
@@ -152,42 +55,26 @@ impl SyncIngest {
             .await
             .ok_or_else(|| IngestError::UnsupportedSchema)?;
 
-        let (operation, operation_id) = validate_entry_and_operation(
+        /////////////////////////////////////
+        // PUBLISH THE ENTRY AND OPERATION //
+        /////////////////////////////////////
+
+        let _ = publish(
             store,
             &schema,
-            &entry,
             encoded_entry,
             &plain_operation,
             encoded_operation,
         )
         .await?;
 
-        let document_id = determine_document_id(store, &operation, &operation_id).await?;
-
-        // If the entries' seq num is 1 we insert a new log here
-        if entry.seq_num().is_first() {
-            store
-                .insert_log(
-                    entry.log_id(),
-                    entry.public_key(),
-                    Schematic::schema_id(&operation),
-                    &document_id,
-                )
-                .await
-                .expect("Fatal database error");
-        }
+        ////////////////////////////////////////
+        // SEND THE OPERATION TO MATERIALIZER //
+        ////////////////////////////////////////
 
-        store
-            .insert_entry(&entry, encoded_entry, Some(encoded_operation))
-            .await
-            .expect("Fatal database error");
-
-        store
-            .insert_operation(&operation_id, entry.public_key(), &operation, &document_id)
-            .await
-            .expect("Fatal database error");
+        // Send the new operation on the service communication bus; it will eventually
+        // arrive at the materializer service.
 
-        // Inform other services about received data
         let operation_id: OperationId = encoded_entry.hash().into();
 
         if self
@@ -197,7 +84,7 @@ impl SyncIngest {
         {
             // Silently fail here as we don't mind if there are no subscribers. We have
             // tests in other places to check if messages arrive.
-        }
+        };
 
         Ok(())
     }
diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs
index 6df3f9272..eccc0372f 100644
--- a/aquadoggo/src/replication/session.rs
+++ b/aquadoggo/src/replication/session.rs
@@ -183,7 +183,10 @@ mod tests {
     use crate::replication::manager::INITIAL_SESSION_ID;
     use crate::replication::{Message, Mode, SessionState, TargetSet};
     use crate::test_utils::helpers::random_target_set;
-    use crate::test_utils::{populate_store_config, test_runner, TestNode};
+    use crate::test_utils::{
+        populate_and_materialize, populate_store_config, test_runner, test_runner_with_manager,
+        TestNode, TestNodeManager,
+    };
 
     use super::Session;
 
@@ -221,9 +224,7 @@ mod tests {
         #[with(5, 2, 1)]
         config: PopulateStoreConfig,
     ) {
-        test_runner(move |node: TestNode| async move {
-            populate_store(&node.context.store, &config).await;
-
+        test_runner_with_manager(move |manager: TestNodeManager| async move {
             let target_set = TargetSet::new(&vec![config.schema.id().to_owned()]);
             let mut session = Session::new(
                 &INITIAL_SESSION_ID,
@@ -233,13 +234,42 @@ mod tests {
                 false,
             );
 
+            let mut node_a = manager.create().await;
+            populate_and_materialize(&mut node_a, &config).await;
+
             let response_messages = session
-                .handle_message(&node.context.store, &Message::Have(vec![]))
+                .handle_message(&node_a.context.store, &Message::Have(vec![]))
                 .await
                 .unwrap();
+
+            // This node has already materialized its documents so we expect the following
+            // messages.
+            //
+            // 1x Have + 10x Entry + 1x SyncDone = 12 messages
             assert_eq!(response_messages.len(), 12);
+
+            let target_set = TargetSet::new(&vec![config.schema.id().to_owned()]);
+            let mut session = Session::new(
+                &INITIAL_SESSION_ID,
+                &target_set,
+                &Mode::LogHeight,
+                true,
+                false,
+            );
+
+            let node_b: TestNode = manager.create().await;
+            populate_store(&node_b.context.store, &config).await;
+
+            let response_messages = session
+                .handle_message(&node_b.context.store, &Message::Have(vec![]))
+                .await
+                .unwrap();
+
+            // This node has not materialized any documents so it can't compose its return
+            // entries yet.
+            //
+            // 1x Have + 1x SyncDone = 2 messages
+            assert_eq!(response_messages.len(), 2);
         });
     }
 }
diff --git a/aquadoggo/src/replication/strategies/log_height.rs b/aquadoggo/src/replication/strategies/log_height.rs
index 25d800913..4f99a7762 100644
--- a/aquadoggo/src/replication/strategies/log_height.rs
+++ b/aquadoggo/src/replication/strategies/log_height.rs
@@ -5,17 +5,65 @@ use std::collections::HashMap;
 use anyhow::Result;
 use async_trait::async_trait;
 use log::trace;
-use p2panda_rs::entry::traits::AsEntry;
+use p2panda_rs::document::DocumentId;
+use p2panda_rs::entry::traits::{AsEncodedEntry, AsEntry};
 use p2panda_rs::entry::{LogId, SeqNum};
 use p2panda_rs::identity::PublicKey;
+use p2panda_rs::storage_provider::traits::OperationStore;
 use p2panda_rs::Human;
 
+use crate::db::types::StorageEntry;
 use crate::db::SqlStore;
 use crate::replication::errors::ReplicationError;
 use crate::replication::strategies::diff_log_heights;
 use crate::replication::traits::Strategy;
 use crate::replication::{LogHeights, Message, Mode, StrategyResult, TargetSet};
 
+type SortedIndex = i32;
+
+/// Retrieve entries from the store, group the result by document id and then sub-order them by
+/// their sorted index.
+async fn retrieve_entries(
+    store: &SqlStore,
+    remote_needs: &[LogHeights],
+) -> Vec<(StorageEntry, DocumentId, SortedIndex)> {
+    let mut entries = Vec::new();
+
+    for (public_key, log_heights) in remote_needs {
+        for (log_id, seq_num) in log_heights {
+            // Get the entries the remote needs for each log.
+            let log_entries = store
+                .get_entries_from(public_key, log_id, seq_num)
+                .await
+                .expect("Fatal database error");
+
+            for entry in log_entries {
+                // Get the operation as well, as we need some additional information in order
+                // to send the entries in the correct order.
+                let operation = store
+                    .get_operation(&entry.hash().into())
+                    .await
+                    .expect("Fatal database error")
+                    .expect("Operation should be in store");
+
+                // We only send entries if their operation has been materialized.
+                if let Some(sorted_index) = operation.sorted_index {
+                    entries.push((entry, operation.document_id, sorted_index));
+                }
+            }
+        }
+    }
+
+    // Sort all entries by document_id & sorted_index.
+    entries.sort_by(
+        |(_, document_id_a, sorted_index_a), (_, document_id_b, sorted_index_b)| {
+            (document_id_a, sorted_index_a).cmp(&(document_id_b, sorted_index_b))
+        },
+    );
+
+    entries
+}
+
 #[derive(Clone, Debug)]
 pub struct LogHeightStrategy {
     target_set: TargetSet,
@@ -32,6 +80,7 @@ impl LogHeightStrategy {
         }
     }
 
+    // Calculate the heights of all logs which contain contributions to documents in the current `TargetSet`.
     async fn local_log_heights(
         &self,
         store: &SqlStore,
@@ -57,42 +106,40 @@ impl LogHeightStrategy {
         log_heights
     }
 
+    // Prepare entry responses based on a remote's log heights. The response contains all entries
+    // the remote needs to bring their state in line with our own. The returned entries are
+    // grouped by document and ordered by the `sorted_index` of the operations they carry. With
+    // this ordering they can be ingested and validated easily on the remote.
     async fn entry_responses(
         &self,
         store: &SqlStore,
         remote_log_heights: &[LogHeights],
     ) -> Vec<Message> {
-        let mut messages = Vec::new();
-
+        // Get local log heights for the configured target set.
         let local_log_heights = self.local_log_heights(store).await;
+
+        // Compare local and remote log heights to determine what they need from us.
         let remote_needs = diff_log_heights(
             &local_log_heights,
             &remote_log_heights.iter().cloned().collect(),
         );
 
-        for (public_key, log_heights) in remote_needs {
-            for (log_id, seq_num) in log_heights {
-                let entry_messages: Vec<Message> = store
-                    .get_entries_from(&public_key, &log_id, &seq_num)
-                    .await
-                    .expect("Fatal database error")
-                    .iter()
-                    .map(|entry| {
-                        trace!(
-                            "Prepare message containing entry at {:?} on {:?} for {}",
-                            entry.seq_num(),
-                            entry.log_id(),
-                            entry.public_key().display()
-                        );
-
-                        Message::Entry(entry.clone().encoded_entry, entry.payload().cloned())
-                    })
-                    .collect();
-                messages.extend(entry_messages);
-            }
-        }
+        let entries = retrieve_entries(store, &remote_needs).await;
 
-        messages
+        // Compose the actual messages.
+        entries
+            .iter()
+            .map(|(entry, _, _)| {
+                trace!(
+                    "Prepare message containing entry at {:?} on {:?} for {}",
+                    entry.seq_num(),
+                    entry.log_id(),
+                    entry.public_key().display()
+                );
+
+                Message::Entry(entry.clone().encoded_entry, entry.payload().cloned())
+            })
+            .collect()
     }
 }
 
@@ -155,3 +202,266 @@ impl Strategy for LogHeightStrategy {
         Ok(result)
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use p2panda_rs::document::DocumentId;
+    use p2panda_rs::entry::{EncodedEntry, LogId, SeqNum};
+    use p2panda_rs::identity::KeyPair;
+    use p2panda_rs::operation::{
+        EncodedOperation, OperationAction, OperationBuilder, OperationValue,
+    };
+    use p2panda_rs::schema::Schema;
+    use p2panda_rs::test_utils::memory_store::helpers::{send_to_store, PopulateStoreConfig};
+    use rstest::rstest;
+    use tokio::sync::broadcast;
+
+    use crate::materializer::tasks::reduce_task;
+    use crate::materializer::TaskInput;
+    use crate::replication::ingest::SyncIngest;
+    use crate::replication::strategies::log_height::{retrieve_entries, SortedIndex};
+    use crate::replication::{LogHeightStrategy, LogHeights, Message, TargetSet};
+    use crate::test_utils::{
+        populate_and_materialize, populate_store_config, test_runner_with_manager, TestNode,
+        TestNodeManager,
+    };
+
+    // Helper for retrieving entries ordered as expected for replication and testing the result.
+    async fn retrieve_and_test_entries(
+        node: &TestNode,
+        remote_needs: &[LogHeights],
+        expected_entries: &Vec<(DocumentId, SortedIndex)>,
+    ) {
+        // Retrieve the entries.
+        let entries = retrieve_entries(&node.context.store, &remote_needs).await;
+
+        // Map the returned value into a more easily testable form (we assume the entries are
+        // correct; here we are mainly testing the entry retrieval logic).
+        let entries: Vec<(DocumentId, SortedIndex)> = entries
+            .into_iter()
+            .map(|(_, document_id, sorted_index)| (document_id, sorted_index))
+            .collect();
+
+        assert_eq!(&entries, expected_entries, "{remote_needs:#?}");
+    }
+
+    // Helper for updating a document.
+    async fn update_and_materialize_document(
+        node: &TestNode,
+        key_pair: &KeyPair,
+        schema: &Schema,
+        document_id: &DocumentId,
+        values: Vec<(&str, impl Into<OperationValue>)>,
+    ) {
+        let values: Vec<(&str, OperationValue)> = values
+            .into_iter()
+            .map(|(key, value)| (key, value.into()))
+            .collect();
+
+        // Publish an update into this document's past.
+        let update_operation = OperationBuilder::new(schema.id())
+            .action(OperationAction::Update)
+            .previous(&document_id.as_str().parse().unwrap())
+            .fields(&values)
+            .build()
+            .unwrap();
+
+        send_to_store(&node.context.store, &update_operation, schema, key_pair)
+            .await
+            .unwrap();
+
+        // Reduce the updated document.
+        let input = TaskInput::DocumentId(document_id.clone());
+        let _ = reduce_task(node.context.clone(), input).await.unwrap();
+    }
+
+    #[rstest]
+    fn retrieves_and_sorts_entries(
+        #[from(populate_store_config)]
+        #[with(3, 1, 2)]
+        config: PopulateStoreConfig,
+    ) {
+        test_runner_with_manager(move |manager: TestNodeManager| async move {
+            // Create one node and materialize some documents on it.
+            let mut node = manager.create().await;
+            let (key_pairs, document_ids) = populate_and_materialize(&mut node, &config).await;
+            let schema = config.schema.clone();
+
+            // Collect the values for the two authors and documents.
+            let key_pair_a = key_pairs.get(0).unwrap();
+            let key_pair_b = key_pairs.get(1).unwrap();
+
+            let document_a = document_ids.get(0).unwrap();
+            let document_b = document_ids.get(1).unwrap();
+
+            // Compose the list of logs a remote might need.
+            let mut remote_needs_all = vec![
+                (
+                    key_pair_a.public_key(),
+                    vec![(LogId::default(), SeqNum::default())],
+                ),
+                (
+                    key_pair_b.public_key(),
+                    vec![(LogId::default(), SeqNum::default())],
+                ),
+            ];
+
+            // Compose expected returned entries for each document, identified by the document id
+            // and sorted_index value. Entries should always be grouped into documents and then
+            // ordered by their index number.
+            let expected_entries = vec![
+                (document_a.to_owned(), 0),
+                (document_a.to_owned(), 1),
+                (document_a.to_owned(), 2),
+                (document_b.to_owned(), 0),
+                (document_b.to_owned(), 1),
+                (document_b.to_owned(), 2),
+            ];
+
+            // Retrieve entries and test against expected.
+            retrieve_and_test_entries(&node, &remote_needs_all, &expected_entries).await;
+
+            // Compose a different "needs" list with values which should only return a
+            // section of each log.
+            let remote_needs_some = [
+                (
+                    key_pair_a.public_key(),
+                    vec![(LogId::default(), SeqNum::new(3).unwrap())],
+                ),
+                (
+                    key_pair_b.public_key(),
+                    vec![(LogId::default(), SeqNum::new(3).unwrap())],
+                ),
+            ];
+
+            // Compose expected value and test.
+            let expected_entries = vec![(document_a.to_owned(), 2), (document_b.to_owned(), 2)];
+            retrieve_and_test_entries(&node, &remote_needs_some, &expected_entries).await;
+
+            // We also want to make sure this works with documents which contain concurrent
+            // updates. Here we publish two operations into the document's past, causing
+            // branches to occur.
+
+            // Create a new author.
+            let key_pair = KeyPair::new();
+
+            // Publish a concurrent update.
+            update_and_materialize_document(
+                &node,
+                &key_pair,
+                &schema,
+                &document_a,
+                vec![("username", "よつば")],
+            )
+            .await;
+
+            // Publish another concurrent update.
+            update_and_materialize_document(
+                &node,
+                &key_pair,
+                &schema,
+                &document_a,
+                vec![("username", "ヂャンボ")],
+            )
+            .await;
+
+            // Add the new author to the "needs" list.
+            remote_needs_all.push((
+                key_pair.public_key(),
+                vec![(LogId::default(), SeqNum::default())],
+            ));
+
+            // Now we expect 5 entries in document a, still ordered by sorted index.
+            let expected_entries = vec![
+                (document_a.to_owned(), 0),
+                (document_a.to_owned(), 1),
+                (document_a.to_owned(), 2),
+                (document_a.to_owned(), 3),
+                (document_a.to_owned(), 4),
+                (document_b.to_owned(), 0),
+                (document_b.to_owned(), 1),
+                (document_b.to_owned(), 2),
+            ];
+
+            // Retrieve entries and test against expected.
+            retrieve_and_test_entries(&node, &remote_needs_all, &expected_entries).await;
+        })
+    }
+
+    #[rstest]
+    fn entry_responses_can_be_ingested(
+        #[from(populate_store_config)]
+        #[with(5, 2, 1)]
+        config: PopulateStoreConfig,
+    ) {
+        test_runner_with_manager(move |manager: TestNodeManager| async move {
+            let schema = config.schema.clone();
+            let target_set = TargetSet::new(&vec![schema.id().to_owned()]);
+
+            let strategy_a = LogHeightStrategy::new(&target_set);
+            let mut node_a = manager.create().await;
+            populate_and_materialize(&mut node_a, &config).await;
+
+            let node_b = manager.create().await;
+            let schema_provider = node_b.context.schema_provider.clone();
+            schema_provider.update(schema).await;
+            let (tx, _) = broadcast::channel(50);
+            let ingest = SyncIngest::new(schema_provider, tx);
+
+            let entry_responses: Vec<(EncodedEntry, Option<EncodedOperation>)> = strategy_a
+                .entry_responses(&node_a.context.store, &[])
+                .await
+                .into_iter()
+                .map(|message| match message {
+                    Message::Entry(entry, operation) => (entry, operation),
+                    _ => panic!(),
+                })
+                .collect();
+
+            for (entry, operation) in &entry_responses {
+                let result = ingest
+                    .handle_entry(
+                        &node_b.context.store,
+                        entry,
+                        operation
+                            .as_ref()
+                            .expect("All messages contain an operation"),
+                    )
+                    .await;
+
+                assert!(result.is_ok());
+            }
+        });
+    }
+
+    #[rstest]
+    fn calculates_log_heights(
+        #[from(populate_store_config)]
+        #[with(5, 2, 1)]
+        config: PopulateStoreConfig,
+    ) {
+        test_runner_with_manager(move |manager: TestNodeManager| async move {
+            let target_set = TargetSet::new(&vec![config.schema.id().to_owned()]);
+
+            let strategy_a = LogHeightStrategy::new(&target_set);
+            let mut node_a = manager.create().await;
+            let (key_pairs, _) = populate_and_materialize(&mut node_a, &config).await;
+
+            let log_heights = strategy_a.local_log_heights(&node_a.context.store).await;
+
+            let expected_log_heights = key_pairs
+                .into_iter()
+                .map(|key_pair| {
+                    (
+                        key_pair.public_key(),
+                        vec![
+                            (LogId::default(), SeqNum::new(5).unwrap()),
+                            (LogId::new(1), SeqNum::new(5).unwrap()),
+                        ],
+                    )
+                })
+                .collect();
+
+            assert_eq!(log_heights, expected_log_heights);
+        });
+    }
+}
diff --git a/aquadoggo_cli/Cargo.toml b/aquadoggo_cli/Cargo.toml
index 7fbc86a8d..51192a75e 100644
--- a/aquadoggo_cli/Cargo.toml
+++ b/aquadoggo_cli/Cargo.toml
@@ -25,7 +25,7 @@ clap = { version = "4.1.8", features = ["derive"] }
 env_logger = "0.9.0"
 hex = "0.4.3"
 libp2p = "0.52.0"
-p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "ae60bf754a60a1daf9c6862e9ae51ee7501b007f" }
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "a535ee229b367c79f18a50626c6bcab61581e32b" }
 tokio = { version = "1.28.2", features = ["full"] }
 
 [dependencies.aquadoggo]