diff --git a/CHANGELOG.md b/CHANGELOG.md
index 05d209402..24998e759 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - SQL query for collections [#311](https://github.com/p2panda/aquadoggo/pull/311)
 - Add custom validation to GraphQL scalar types [#318](https://github.com/p2panda/aquadoggo/pull/318)
 - Introduce property tests for GraphQL query API with `proptest` [#338](https://github.com/p2panda/aquadoggo/pull/338)
+- Replication protocol session manager [#363](https://github.com/p2panda/aquadoggo/pull/363)
 
 ### Changed
 
diff --git a/Cargo.lock b/Cargo.lock
index 37d51e3c8..d54e59690 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1869,9 +1869,9 @@ checksum = "eabb4a44450da02c90444cf74558da904edde8fb4e9035a9a6a4e15445af0bd7"
 
 [[package]]
 name = "handlebars"
-version = "4.3.6"
+version = "4.3.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "035ef95d03713f2c347a72547b7cd38cbc9af7cd51e6099fb62d586d4a6dee3a"
+checksum = "83c3372087601b532857d332f5957cbae686da52bb7810bf038c3e3c3cc2fa0d"
 dependencies = [
  "log",
  "pest",
@@ -3229,7 +3229,7 @@ dependencies = [
 [[package]]
 name = "p2panda-rs"
 version = "0.7.0"
-source = "git+https://github.com/p2panda/p2panda?rev=50f9f0b02570c8ed895baf499cb0d98bbadd1687#50f9f0b02570c8ed895baf499cb0d98bbadd1687"
+source = "git+https://github.com/p2panda/p2panda?rev=679107b636eea39e7713d9b21ac69df28aaad97d#679107b636eea39e7713d9b21ac69df28aaad97d"
 dependencies = [
  "arrayvec 0.5.2",
  "async-trait",
@@ -4141,9 +4141,9 @@ checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
 
 [[package]]
 name = "serde"
-version = "1.0.162"
+version = "1.0.163"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "71b2f6e1ab5c2b98c05f0f35b236b22e8df7ead6ffbf51d7808da7f8817e7ab6"
+checksum = "2113ab51b87a539ae008b5c6c02dc020ffa39afd2d83cffcb3f4eb2722cebec2"
 dependencies = [
  "serde_derive",
 ]
@@ -4179,9 +4179,9 @@ dependencies = [
 
 [[package]]
 name = "serde_derive"
-version = "1.0.162"
+version = "1.0.163"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2a0814352fd64b58489904a44ea8d90cb1a91dcb6b4f5ebabc32c8318e93cb6"
+checksum = "8c805777e3930c8883389c602315a24224bcc738b63905ef87cd1420353ea93e"
 dependencies = [
  "proc-macro2",
  "quote",
diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml
index b53fade83..d43e8d0ce 100644
--- a/aquadoggo/Cargo.toml
+++ b/aquadoggo/Cargo.toml
@@ -50,7 +50,7 @@ lipmaa-link = "0.2.2"
 log = "0.4.17"
 once_cell = "1.17.0"
 openssl-probe = "0.1.5"
-p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "50f9f0b02570c8ed895baf499cb0d98bbadd1687", features = [
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "679107b636eea39e7713d9b21ac69df28aaad97d", features = [
   "storage-provider",
 ] }
 serde = { version = "1.0.152", features = ["derive"] }
@@ -81,7 +81,7 @@ env_logger = "0.9.0"
 http = "0.2.9"
 hyper = "0.14.19"
 once_cell = "1.17.0"
-p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "50f9f0b02570c8ed895baf499cb0d98bbadd1687", features = [
+p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "679107b636eea39e7713d9b21ac69df28aaad97d", features = [
   "test-utils",
   "storage-provider",
 ] }
diff --git a/aquadoggo/src/lib.rs b/aquadoggo/src/lib.rs
index 9a73fe762..30c2afdde 100644
--- a/aquadoggo/src/lib.rs
+++ b/aquadoggo/src/lib.rs
@@ -52,6 +52,9 @@ mod manager;
 mod materializer;
 mod network;
 mod node;
+// @TODO: Remove this as soon as we use the replication code
+#[allow(dead_code)]
+mod replication;
 mod schema;
 #[cfg(test)]
 mod test_utils;
diff --git a/aquadoggo/src/manager.rs b/aquadoggo/src/manager.rs
index 339476256..16da4106f 100644
--- a/aquadoggo/src/manager.rs
+++ b/aquadoggo/src/manager.rs
@@ -13,8 +13,6 @@ use triggered::{Listener, Trigger};
 /// Sends messages through the communication bus between services.
 pub type Sender<T> = broadcast::Sender<T>;
 
-// pub type ServiceReady = oneshot::channel<()>;
-
 // Receives ready signal from services once they are ready to handle messages on the communication bus.
 pub type ServiceReadyReceiver = oneshot::Receiver<()>;
 
diff --git a/aquadoggo/src/replication/errors.rs b/aquadoggo/src/replication/errors.rs
new file mode 100644
index 000000000..ba7110b0b
--- /dev/null
+++ b/aquadoggo/src/replication/errors.rs
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use thiserror::Error;
+
+#[derive(Error, Debug)]
+pub enum ReplicationError {
+    #[error("Remote peer requested unsupported replication mode")]
+    UnsupportedMode,
+
+    #[error("Tried to initialise duplicate inbound replication session with id {0}")]
+    DuplicateInboundRequest(u64),
+
+    #[error("Tried to initialise duplicate outbound replication session with id {0}")]
+    DuplicateOutboundRequest(u64),
+}
diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs
new file mode 100644
index 000000000..b289f92bd
--- /dev/null
+++ b/aquadoggo/src/replication/manager.rs
@@ -0,0 +1,286 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use std::collections::HashMap;
+
+use anyhow::Result;
+
+use crate::replication::errors::ReplicationError;
+use crate::replication::{Mode, Session, SessionId, SessionState, SyncMessage, TargetSet};
+
+pub const INITIAL_SESSION_ID: SessionId = 0;
+
+pub const SUPPORTED_MODES: [Mode; 1] = [Mode::Naive];
+
+#[derive(Debug)]
+pub struct SyncManager<P> {
+    local_peer: P,
+    sessions: HashMap<P, Vec<Session>>,
+}
+
+impl<P> SyncManager<P>
+where
+    P: Clone + std::hash::Hash + Eq + PartialOrd,
+{
+    pub fn new(local_peer: P) -> Self {
+        Self {
+            local_peer,
+            sessions: HashMap::new(),
+        }
+    }
+
+    /// Get all sessions related to a remote peer.
+    fn get_sessions(&self, remote_peer: &P) -> Vec<Session> {
+        self.sessions
+            .get(remote_peer)
+            // Always return an array, even when it is empty
+            .unwrap_or(&vec![])
+            .to_owned()
+    }
+
+    /// Register a new session in manager.
+    fn insert_session(
+        &mut self,
+        remote_peer: &P,
+        session_id: &SessionId,
+        target_set: &TargetSet,
+        mode: &Mode,
+        local: bool,
+    ) {
+        let session = Session::new(session_id, target_set, mode, local);
+
+        if let Some(sessions) = self.sessions.get_mut(remote_peer) {
+            sessions.push(session);
+        } else {
+            self.sessions.insert(remote_peer.clone(), vec![session]);
+        }
+    }
+
+    pub fn initiate_session(
+        &mut self,
+        remote_peer: &P,
+        target_set: &TargetSet,
+        mode: &Mode,
+    ) -> Result<(), ReplicationError> {
+        SyncManager::<P>::is_mode_supported(mode)?;
+
+        let sessions = self.get_sessions(remote_peer);
+
+        // Make sure not to have duplicate sessions over the same schema ids
+        let session = sessions
+            .iter()
+            .find(|session| session.target_set() == *target_set);
+
+        if let Some(session) = session {
+            return Err(ReplicationError::DuplicateOutboundRequest(session.id));
+        }
+
+        // Determine next id for upcoming session
+        let session_id = {
+            if let Some(session) = sessions.last() {
+                session.id + 1
+            } else {
+                INITIAL_SESSION_ID
+            }
+        };
+
+        self.insert_session(remote_peer, &session_id, target_set, mode, true);
+
+        Ok(())
+    }
+
+    fn is_mode_supported(mode: &Mode) -> Result<(), ReplicationError> {
+        if !SUPPORTED_MODES.contains(mode) {
+            return Err(ReplicationError::UnsupportedMode);
+        }
+
+        Ok(())
+    }
+
+    fn handle_duplicate_session(
+        &mut self,
+        remote_peer: &P,
+        target_set: &TargetSet,
+        index: usize,
+        session: &Session,
+    ) -> Result<(), ReplicationError> {
+        let accept_inbound_request = match session.state {
+            // Handle only duplicate sessions when they haven't started yet
+            SessionState::Pending => {
+                if &self.local_peer < remote_peer {
+                    // Drop our pending session
+                    let sessions = self
+                        .sessions
+                        .get_mut(remote_peer)
+                        .expect("Expected at least one pending session");
+                    sessions.remove(index);
+
+                    // Accept the inbound request
+                    true
+                } else {
+                    // Keep our pending session, ignore inbound request
+                    false
+                }
+            }
+            _ => return Err(ReplicationError::DuplicateInboundRequest(session.id)),
+        };
+
+        if accept_inbound_request {
+            self.insert_session(remote_peer, &session.id, target_set, &session.mode(), false);
+
+            // @TODO: Session needs to generate some messages on creation and
+            //        it will pass them back up to us to then forward onto
+            //        the swarm
+
+            // If we dropped our own outbound session request regarding a different target set, we
+            // need to re-establish it with another session id; otherwise it would get lost
+            if session.target_set() != *target_set {
+                self.initiate_session(remote_peer, target_set, &session.mode())?;
+                // @TODO: Again, the new session will generate a message
+                //        which we send onto the swarm
+            }
+        }
+
+        Ok(())
+    }
+
+    fn handle_sync_request(
+        &mut self,
+        remote_peer: &P,
+        mode: &Mode,
+        session_id: &SessionId,
+        target_set: &TargetSet,
+    ) -> Result<(), ReplicationError> {
+        SyncManager::<P>::is_mode_supported(mode)?;
+
+        let sessions = self.get_sessions(remote_peer);
+
+        // Check if a session with this id already exists for this peer; this can happen if both
+        // peers started to initiate a session at the same time, so we can try to resolve this
+        if let Some((index, session)) = sessions
+            .iter()
+            .enumerate()
+            .find(|(_, session)| session.id == *session_id && session.local)
+        {
+            return self.handle_duplicate_session(remote_peer, target_set, index, session);
+        }
+
+        // Check if a session with this target set already exists for this peer; this always gets
+        // rejected because it is clearly redundant
+        if let Some(session) = sessions
+            .iter()
+            .find(|session| session.target_set() == *target_set)
+        {
+            return Err(ReplicationError::DuplicateInboundRequest(session.id));
+        }
+
+        self.insert_session(remote_peer, session_id, target_set, mode, false);
+
+        Ok(())
+    }
+
+    pub fn handle_message(
+        &mut self,
+        remote_peer: &P,
+        message: &SyncMessage,
+    ) -> Result<(), ReplicationError> {
+        match message {
+            SyncMessage::SyncRequest(mode, session_id, target_set) => {
+                self.handle_sync_request(remote_peer, mode, session_id, target_set)
+            }
+            SyncMessage::Other => todo!(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use p2panda_rs::schema::{SchemaId, SchemaName};
+    use p2panda_rs::test_utils::fixtures::random_document_view_id;
+    use rstest::rstest;
+
+    use crate::replication::errors::ReplicationError;
+    use crate::replication::{Mode, SyncMessage, TargetSet};
+
+    use super::{SyncManager, INITIAL_SESSION_ID};
+
+    const PEER_ID_LOCAL: &'static str = "local";
+    const PEER_ID_REMOTE: &'static str = "remote";
+
+    #[rstest::fixture]
+    fn random_target_set() -> TargetSet {
+        let document_view_id = random_document_view_id();
+        let schema_id =
+            SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id);
+        TargetSet::new(&[schema_id])
+    }
+
+    #[rstest]
+    fn initiate_outbound_session(
+        #[from(random_target_set)] target_set_1: TargetSet,
+        #[from(random_target_set)] target_set_2: TargetSet,
+    ) {
+        let mode = Mode::Naive;
+
+        let mut manager = SyncManager::new(PEER_ID_LOCAL);
+        let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode);
+        assert!(result.is_ok());
+
+        let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_2, &mode);
+        assert!(result.is_ok());
+
+        // Expect error when initiating a session for the same target set
+        let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode);
+        assert!(matches!(
+            result,
+            Err(ReplicationError::DuplicateOutboundRequest(0))
+        ));
+    }
+
+    #[rstest]
+    fn initiate_inbound_session(
+        #[from(random_target_set)] target_set_1: TargetSet,
+        #[from(random_target_set)] target_set_2: TargetSet,
+    ) {
+        let mut manager = SyncManager::new(PEER_ID_LOCAL);
+
+        let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone());
+        let result = manager.handle_message(&PEER_ID_REMOTE, &message);
+        assert!(result.is_ok());
+
+        let message = SyncMessage::SyncRequest(Mode::Naive, 1, target_set_2.clone());
+        let result = manager.handle_message(&PEER_ID_REMOTE, &message);
+        assert!(result.is_ok());
+
+        // Reject attempt to create session again
+        let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone());
+        let result = manager.handle_message(&PEER_ID_REMOTE, &message);
+        assert!(matches!(
+            result,
+            Err(ReplicationError::DuplicateInboundRequest(0))
+        ));
+
+        // Reject different session concerning same target set
+        let message = SyncMessage::SyncRequest(Mode::Naive, 2, target_set_2.clone());
+        let result = manager.handle_message(&PEER_ID_REMOTE, &message);
+        assert!(matches!(
+            result,
+            Err(ReplicationError::DuplicateInboundRequest(1))
+        ));
+    }
+
+    #[rstest]
+    fn inbound_checks_supported_mode(#[from(random_target_set)] target_set: TargetSet) {
+        // Should not fail when requesting supported replication mode
+        let mut manager = SyncManager::new(PEER_ID_LOCAL);
+        let message = SyncMessage::SyncRequest(Mode::Naive, INITIAL_SESSION_ID, target_set.clone());
+        let result = manager.handle_message(&PEER_ID_REMOTE, &message);
+        assert!(result.is_ok());
+
+        // Should fail when requesting unsupported replication mode
+        let mut manager = SyncManager::new(PEER_ID_LOCAL);
+        let message =
+            SyncMessage::SyncRequest(Mode::SetReconciliation, INITIAL_SESSION_ID, target_set);
+        let result = manager.handle_message(&PEER_ID_REMOTE, &message);
+        assert!(result.is_err());
+    }
+}
diff --git a/aquadoggo/src/replication/message.rs b/aquadoggo/src/replication/message.rs
new file mode 100644
index 000000000..958e2c205
--- /dev/null
+++ b/aquadoggo/src/replication/message.rs
@@ -0,0 +1,15 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use crate::replication::{Mode, SessionId, TargetSet};
+
+#[derive(Debug)]
+pub enum SyncMessage {
+    SyncRequest(Mode, SessionId, TargetSet),
+    Other,
+}
+
+#[derive(Clone, Debug)]
+pub enum StrategyMessage {
+    Have,
+    Entry,
+}
diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs
new file mode 100644
index 000000000..c43557e14
--- /dev/null
+++ b/aquadoggo/src/replication/mod.rs
@@ -0,0 +1,83 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use p2panda_rs::schema::SchemaId;
+
+pub mod errors;
+mod manager;
+mod message;
+mod session;
+mod strategies;
+pub mod traits;
+
+pub use manager::SyncManager;
+pub use message::{StrategyMessage, SyncMessage};
+pub use session::{Session, SessionId, SessionState};
+pub use strategies::{NaiveStrategy, SetReconciliationStrategy, StrategyResult};
+
+/// De-duplicated and sorted set of schema ids which define the target data for the replication
+/// session.
+#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)]
+pub struct TargetSet(Vec<SchemaId>);
+
+impl TargetSet {
+    pub fn new(schema_ids: &[SchemaId]) -> Self {
+        // Remove duplicates
+        let mut deduplicated_set: Vec<SchemaId> = Vec::new();
+        for schema_id in schema_ids {
+            if !deduplicated_set.contains(schema_id) {
+                deduplicated_set.push(schema_id.clone());
+            }
+        }
+
+        // Sort schema ids to compare target sets easily
+        deduplicated_set.sort();
+
+        Self(deduplicated_set)
+    }
+}
+
+#[derive(Clone, Debug, Eq, PartialEq)]
+pub enum Mode {
+    Naive,
+    SetReconciliation,
+    Unknown,
+}
+
+#[cfg(test)]
+mod tests {
+    use p2panda_rs::document::DocumentViewId;
+    use p2panda_rs::schema::{SchemaId, SchemaName};
+    use p2panda_rs::test_utils::fixtures::random_document_view_id;
+    use rstest::rstest;
+
+    use super::TargetSet;
+
+    #[rstest]
+    fn compare_target_sets(
+        #[from(random_document_view_id)] document_view_id_1: DocumentViewId,
+        #[from(random_document_view_id)] document_view_id_2: DocumentViewId,
+    ) {
+        let schema_id_1 =
+            SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id_1);
+        let schema_id_2 =
+            SchemaId::new_application(&SchemaName::new("profiles").unwrap(), &document_view_id_2);
+
+        // Sort schema ids
+        assert_eq!(
+            TargetSet::new(&[schema_id_1.clone(), schema_id_2.clone()]),
+            TargetSet::new(&[schema_id_2.clone(), schema_id_1.clone()]),
+        );
+
+        // De-duplicate schema ids
+        assert_eq!(
+            TargetSet::new(&[schema_id_1.clone(), schema_id_1.clone()]),
+            TargetSet::new(&[schema_id_1.clone()]),
+        );
+
+        // Distinguish different target sets from each other
+        assert_ne!(
+            TargetSet::new(&[schema_id_1.clone()]),
+            TargetSet::new(&[schema_id_2.clone()]),
+        );
+    }
+}
diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs
new file mode 100644
index 000000000..9bb54a5b0
--- /dev/null
+++ b/aquadoggo/src/replication/session.rs
@@ -0,0 +1,55 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use crate::replication::traits::Strategy;
+use crate::replication::{Mode, NaiveStrategy, SetReconciliationStrategy, TargetSet};
+
+pub type SessionId = u64;
+
+#[derive(Clone, Debug)]
+pub enum SessionState {
+    Pending,
+    Established,
+    Done,
+}
+
+#[derive(Clone, Debug)]
+pub struct Session {
+    // @TODO: Access to the store
+    // store: Store
+    /// Unique identifier of this session for that peer.
+    pub id: SessionId,
+
+    /// Current state of this session.
+    pub state: SessionState,
+
+    /// True if session was established by us.
+    pub local: bool,
+
+    /// Replication strategy handler.
+    pub strategy: Box<dyn Strategy>,
+}
+
+impl Session {
+    pub fn new(id: &SessionId, target_set: &TargetSet, mode: &Mode, local: bool) -> Self {
+        let strategy: Box<dyn Strategy> = match mode {
+            Mode::Naive => Box::new(NaiveStrategy::new(target_set, mode)),
+            Mode::SetReconciliation => Box::new(SetReconciliationStrategy::new()),
+            Mode::Unknown => panic!("Unknown replication mode found"),
+        };
+
+        Self {
+            id: *id,
+            state: SessionState::Pending,
+            strategy,
+            local,
+        }
+    }
+
+    pub fn mode(&self) -> Mode {
+        self.strategy.mode()
+    }
+
+    pub fn target_set(&self) -> TargetSet {
+        self.strategy.target_set()
+    }
+}
diff --git a/aquadoggo/src/replication/strategies.rs b/aquadoggo/src/replication/strategies.rs
new file mode 100644
index 000000000..bafbf12fa
--- /dev/null
+++ b/aquadoggo/src/replication/strategies.rs
@@ -0,0 +1,95 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use anyhow::Result;
+use async_trait::async_trait;
+
+use crate::replication::traits::Strategy;
+use crate::replication::{Mode, StrategyMessage, TargetSet};
+
+#[derive(Clone, Debug)]
+pub struct StrategyResult {
+    is_done: bool,
+    messages: Vec<StrategyMessage>,
+}
+
+#[derive(Clone, Debug)]
+pub struct NaiveStrategy {
+    target_set: TargetSet,
+    mode: Mode,
+}
+
+impl NaiveStrategy {
+    pub fn new(target_set: &TargetSet, mode: &Mode) -> Self {
+        Self {
+            target_set: target_set.clone(),
+            mode: mode.clone(),
+        }
+    }
+}
+
+#[async_trait]
+impl Strategy for NaiveStrategy {
+    fn mode(&self) -> Mode {
+        self.mode.clone()
+    }
+
+    fn target_set(&self) -> TargetSet {
+        self.target_set.clone()
+    }
+
+    async fn initial_messages(&self) -> Vec<StrategyMessage> {
+        // TODO: Access the store and compose a have message which contains our local log heights over
+        // the TargetSet.
+        let _target_set = self.target_set();
+
+        vec![StrategyMessage::Have]
+    }
+
+    async fn handle_message(&self, message: StrategyMessage) -> Result<StrategyResult> {
+        // TODO: Verify that the TargetSet contained in the message is a sub-set of the passed
+        // local TargetSet.
+        let _target_set = self.target_set();
+        let messages = Vec::new();
+        let mut is_done = false;
+
+        match message {
+            StrategyMessage::Have => {
+                // Compose Have message and push to messages
+                is_done = true;
+            }
+            StrategyMessage::Entry => {
+                // self.handle_entry(..)
+            }
+        }
+
+        Ok(StrategyResult { is_done, messages })
+    }
+}
+
+#[derive(Clone, Debug)]
+pub struct SetReconciliationStrategy();
+
+impl SetReconciliationStrategy {
+    pub fn new() -> Self {
+        Self()
+    }
+}
+
+#[async_trait]
+impl Strategy for SetReconciliationStrategy {
+    fn mode(&self) -> Mode {
+        todo!()
+    }
+
+    fn target_set(&self) -> TargetSet {
+        todo!()
+    }
+
+    async fn initial_messages(&self) -> Vec<StrategyMessage> {
+        todo!()
+    }
+
+    async fn handle_message(&self, _message: StrategyMessage) -> Result<StrategyResult> {
+        todo!()
+    }
+}
diff --git a/aquadoggo/src/replication/traits.rs b/aquadoggo/src/replication/traits.rs
new file mode 100644
index 000000000..323bb2931
--- /dev/null
+++ b/aquadoggo/src/replication/traits.rs
@@ -0,0 +1,68 @@
+// SPDX-License-Identifier: AGPL-3.0-or-later
+
+use anyhow::Result;
+use async_trait::async_trait;
+use p2panda_rs::schema::SchemaId;
+
+use crate::replication::{Mode, StrategyMessage, StrategyResult, TargetSet};
+
+#[async_trait]
+pub trait Strategy: std::fmt::Debug + StrategyClone {
+    /// Replication mode of this strategy.
+    fn mode(&self) -> Mode;
+
+    /// Target set replication is occurring over.
+    fn target_set(&self) -> TargetSet;
+
+    // Generate initial messages.
+    //
+    // @TODO: we want to pass the store in here too eventually.
+    async fn initial_messages(&self) -> Vec<StrategyMessage>;
+
+    // Handle incoming message and return response.
+    //
+    // @TODO: we want to pass the store in here too eventually.
+    async fn handle_message(&self, message: StrategyMessage) -> Result<StrategyResult>;
+
+    // Validate and store entry and operation.
+    //
+    // @TODO: we want to pass the store in here too eventually.
+    async fn handle_entry(
+        &self,
+        _schema_id: &SchemaId,
+        _entry_bytes: Vec<u8>,
+        _operation_bytes: Vec<u8>,
+    ) -> Result<()> {
+        // Validation:
+        // Check against schema_id and target_set if entry is what we've asked for
+        let _target_set = self.target_set();
+
+        // Further validation through our publish api stuff (?!)
+
+        // Have an entry waiting lobby service here, to batch stuff?!
+        // Nice to check certificate pool in one go.
+        // Nice to not put too much pressure on the database.
+        Ok(())
+    }
+}
+
+/// This is a little trick so we can clone trait objects.
+pub trait StrategyClone {
+    fn clone_box(&self) -> Box<dyn Strategy>;
+}
+
+impl<T> StrategyClone for T
+where
+    T: 'static + Strategy + Clone,
+{
+    fn clone_box(&self) -> Box<dyn Strategy> {
+        Box::new(self.clone())
+    }
+}
+
+// We can now implement Clone manually by forwarding to clone_box.
+impl Clone for Box<dyn Strategy> {
+    fn clone(&self) -> Box<dyn Strategy> {
+        self.clone_box()
+    }
+}
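
Usage sketch (not part of the patch): a minimal example of how crate-internal code might drive the new SyncManager, mirroring the unit tests above. The peer identifier strings and the "messages" schema are made-up placeholders, and the random_document_view_id fixture assumes the "test-utils" feature of p2panda-rs.

    use p2panda_rs::schema::{SchemaId, SchemaName};
    use p2panda_rs::test_utils::fixtures::random_document_view_id;

    use crate::replication::errors::ReplicationError;
    use crate::replication::{Mode, SyncManager, SyncMessage, TargetSet};

    fn replication_round() -> Result<(), ReplicationError> {
        // Hypothetical peer identifiers; any `Clone + Hash + Eq + PartialOrd` type works.
        let local_peer = "local";
        let remote_peer = "remote";

        // One manager per node, parameterised over our own peer identifier.
        let mut manager = SyncManager::new(local_peer);

        // Replicate all documents of a single (made-up) application schema.
        let schema_id =
            SchemaId::new_application(&SchemaName::new("messages").unwrap(), &random_document_view_id());
        let target_set = TargetSet::new(&[schema_id]);

        // Outbound: open a `Naive` mode session towards the remote peer.
        manager.initiate_session(&remote_peer, &target_set, &Mode::Naive)?;

        // Inbound: the remote peer raced us with its own request for the same target
        // set and session id; the manager resolves the duplicate by comparing peer
        // identifiers and keeps exactly one session alive.
        let incoming = SyncMessage::SyncRequest(Mode::Naive, 0, target_set);
        manager.handle_message(&remote_peer, &incoming)?;

        Ok(())
    }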