From 544f39e0bb239fe7be7198f5d3313eda6db82719 Mon Sep 17 00:00:00 2001 From: adz Date: Wed, 10 May 2023 13:00:06 +0200 Subject: [PATCH 01/25] Introduce SyncManager handling session initiation and incoming messages --- Cargo.lock | 2 +- aquadoggo/Cargo.toml | 4 +- aquadoggo/src/lib.rs | 1 + aquadoggo/src/replication/mod.rs | 91 ++++++++++++++++++++++++++++++++ 4 files changed, 95 insertions(+), 3 deletions(-) create mode 100644 aquadoggo/src/replication/mod.rs diff --git a/Cargo.lock b/Cargo.lock index fb44952d6..ce016b9a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3286,7 +3286,7 @@ dependencies = [ [[package]] name = "p2panda-rs" version = "0.7.0" -source = "git+https://github.com/p2panda/p2panda?rev=00ea109e74f236cbfcc877bfbcd17d4504233896#00ea109e74f236cbfcc877bfbcd17d4504233896" +source = "git+https://github.com/p2panda/p2panda?rev=40d300cfa88279d3baa6e92e6f156f868bf52780#40d300cfa88279d3baa6e92e6f156f868bf52780" dependencies = [ "arrayvec 0.5.2", "async-trait", diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index b6b4bfc6d..0de6f3128 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -50,7 +50,7 @@ lipmaa-link = "^0.2.2" log = "^0.4.17" once_cell = "^1.12.0" openssl-probe = "^0.1.5" -p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "00ea109e74f236cbfcc877bfbcd17d4504233896", features = [ +p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "40d300cfa88279d3baa6e92e6f156f868bf52780", features = [ "storage-provider", ] } serde = { version = "^1.0.144", features = ["derive"] } @@ -81,7 +81,7 @@ env_logger = "^0.9.0" http = "^0.2.8" hyper = "^0.14.19" once_cell = "^1.12.0" -p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "00ea109e74f236cbfcc877bfbcd17d4504233896", features = [ +p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "40d300cfa88279d3baa6e92e6f156f868bf52780", features = [ "test-utils", "storage-provider", ] } diff --git a/aquadoggo/src/lib.rs b/aquadoggo/src/lib.rs index 9a73fe762..79270f8d6 100644 --- a/aquadoggo/src/lib.rs +++ b/aquadoggo/src/lib.rs @@ -52,6 +52,7 @@ mod manager; mod materializer; mod network; mod node; +mod replication; mod schema; #[cfg(test)] mod test_utils; diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs new file mode 100644 index 000000000..2e0945da8 --- /dev/null +++ b/aquadoggo/src/replication/mod.rs @@ -0,0 +1,91 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::collections::HashMap; + +use anyhow::{bail, Result}; +use p2panda_rs::schema::SchemaId; + +const INITIAL_SESSION_ID: SessionId = 0; + +type SessionId = u64; + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] +pub struct TargetSet(Vec); + +impl TargetSet { + pub fn new(schema_ids: &[SchemaId]) -> Self { + // Sort schema ids to compare target sets easily + let mut schema_ids = schema_ids.to_vec(); + schema_ids.sort(); + + Self(schema_ids) + } +} + +#[derive(Debug)] +pub enum SyncMessage {} + +#[derive(Debug)] +pub struct Session { + id: SessionId, + target_set: TargetSet, +} + +#[derive(Debug)] +pub struct SyncManager

{ + local_peer: P, + sessions: HashMap>, +} + +impl

SyncManager

+where + P: std::hash::Hash + Eq, +{ + pub fn new(local_peer: P) -> Self { + Self { + local_peer, + sessions: HashMap::new(), + } + } + + pub fn initiate_session(&self, remote_peer: P, schema_ids: &[SchemaId]) -> Result<()> { + let target_set = TargetSet::new(schema_ids); + + let session_id = if let Some(sessions) = self.sessions.get(&remote_peer) { + // Make sure to not have duplicate sessions over the same schema ids + let session = sessions + .iter() + .find(|session| session.target_set == target_set); + + if session.is_some() { + bail!("Session concerning same schema ids already exists"); + } + + if let Some(session) = sessions.last() { + session.id + 1 + } else { + INITIAL_SESSION_ID + } + } else { + INITIAL_SESSION_ID + }; + + println!("{}", session_id); + + Ok(()) + } + + pub fn handle_message(&self, remote_peer: P, message: SyncMessage) -> Result<()> { + // @TODO: Handle `SyncRequest` + // @TODO: If message = SyncRequest, then check if a) session id doesn't exist yet b) we're + // not already handling the same schema id's in a running session + // + // If session id exists, then check if we just initialised it, in that case use peer id as + // tie-breaker to decide who continues + + Ok(()) + } +} + +#[cfg(test)] +mod tests {} From 3513e4486a6964cd170be8d17fedba10207fb23f Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Wed, 10 May 2023 15:57:33 +0100 Subject: [PATCH 02/25] WIP: Handle incoming SyncRequest message --- aquadoggo/src/replication/mod.rs | 142 ++++++++++++++++++++++++++++--- 1 file changed, 131 insertions(+), 11 deletions(-) diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index 2e0945da8..1d32c2eff 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -6,10 +6,11 @@ use anyhow::{bail, Result}; use p2panda_rs::schema::SchemaId; const INITIAL_SESSION_ID: SessionId = 0; +const SUPPORTED_MODES: [u64; 1] = [0]; type SessionId = u64; -#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] +#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct TargetSet(Vec); impl TargetSet { @@ -23,12 +24,33 @@ impl TargetSet { } #[derive(Debug)] -pub enum SyncMessage {} +pub enum SyncMessage { + SyncRequest(u64, u64, TargetSet), + Other, +} -#[derive(Debug)] +#[derive(Clone, Debug)] +pub enum SessionState { + Pending, + Established, + Done, +} + +#[derive(Clone, Debug)] pub struct Session { id: SessionId, target_set: TargetSet, + state: SessionState, +} + +impl Session { + pub fn new(id: &SessionId, target_set: &TargetSet) -> Self { + Session { + id: id.clone(), + state: SessionState::Pending, + target_set: target_set.clone(), + } + } } #[derive(Debug)] @@ -39,7 +61,7 @@ pub struct SyncManager

{ impl

SyncManager

where - P: std::hash::Hash + Eq, + P: Clone + std::hash::Hash + Eq + PartialOrd, { pub fn new(local_peer: P) -> Self { Self { @@ -48,14 +70,27 @@ where } } - pub fn initiate_session(&self, remote_peer: P, schema_ids: &[SchemaId]) -> Result<()> { - let target_set = TargetSet::new(schema_ids); + fn insert_session(&mut self, remote_peer: &P, session_id: &SessionId, target_set: &TargetSet) { + let session = Session::new(session_id, target_set); + if let Some(sessions) = self.sessions.get_mut(remote_peer) { + sessions.push(session); + } else { + let sessions = vec![session]; + self.sessions.insert(remote_peer.clone(), sessions); + } + } + + fn get_sessions(&self, remote_peer: &P) -> Option<&Vec> { + self.sessions.get(remote_peer) + } + + pub fn initiate_session(&mut self, remote_peer: &P, target_set: &TargetSet) -> Result<()> { let session_id = if let Some(sessions) = self.sessions.get(&remote_peer) { // Make sure to not have duplicate sessions over the same schema ids let session = sessions .iter() - .find(|session| session.target_set == target_set); + .find(|session| &session.target_set == target_set); if session.is_some() { bail!("Session concerning same schema ids already exists"); @@ -70,19 +105,104 @@ where INITIAL_SESSION_ID }; - println!("{}", session_id); + self.insert_session(remote_peer, &session_id, target_set); Ok(()) } - pub fn handle_message(&self, remote_peer: P, message: SyncMessage) -> Result<()> { - // @TODO: Handle `SyncRequest` - // @TODO: If message = SyncRequest, then check if a) session id doesn't exist yet b) we're + pub fn handle_message(&mut self, remote_peer: &P, message: &SyncMessage) -> Result<()> { + // Handle `SyncRequest` + // If message = SyncRequest, then check if a) session id doesn't exist yet b) we're // not already handling the same schema id's in a running session // // If session id exists, then check if we just initialised it, in that case use peer id as // tie-breaker to decide who continues + match message { + SyncMessage::SyncRequest(mode, session_id, target_set) => { + // Check we support this replication mode + if !SUPPORTED_MODES.contains(&mode) { + bail!("Unsupported replication mode requested") + } + + // Check if any sessions for this peer already exist + if let Some(sessions) = self.sessions.get_mut(remote_peer) { + // lolz... + let sessions_clone = sessions.clone(); + + // Check if a session with this session id already exists for this peer + let session = sessions_clone + .iter() + .enumerate() + .find(|(_, session)| session.id == *session_id); + + match session { + Some((index, session)) => { + // Check session state, if it is already established then this is an error + // if it is in initial state, then we need to compare peer ids and drop + // the connection depending on which is "lower". + let accept_sync_request = match session.state { + SessionState::Pending => { + // Use peer ids as tie breaker as this SyncRequest contains a + // existing sessin id. + if &self.local_peer < remote_peer { + // Drop our pending session + sessions.remove(index); + + // We want to accept the incoming session + true + } else { + // Keep our pending session + // Ignore incoming sync request + false + } + } + _ => { + bail!("Invalid SyncRequest received: duplicate session id used") + } + }; + + if accept_sync_request { + // Accept incoming sync request + self.insert_session(remote_peer, &session_id, &target_set); + // @TODO: Session needs to generate some messages on creation and + // it will pass them back up to us to then forward onto + // the swarm + + // Check if the target sets match + if &session.target_set != target_set { + // Send a new sync request for the dropped session + // with new session id + + self.initiate_session(remote_peer, &target_set)?; + // @TODO: Again, the new session will generate a message + // which we send onto the swarm + } + } + + // We handled the request we return here. + return Ok(()); + } + // No duplicate session ids exist, move on. + None => (), + } + + // Check we don't already have a session handling this target set + if sessions + .iter() + .find(|session| &session.target_set == target_set) + .is_some() + { + bail!("SyncRequest containing duplicate target set found") + } + }; + + self.insert_session(remote_peer, &session_id, &target_set); + // @TODO: Handle messages that the new session will generate + } + SyncMessage::Other => todo!(), + } + Ok(()) } } From 2224aade104e0a35124bca187c50c1891bc50e8f Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 11:12:15 +0200 Subject: [PATCH 03/25] Move it into separate files --- aquadoggo/src/replication/manager.rs | 175 +++++++++++++++++++++++ aquadoggo/src/replication/message.rs | 9 ++ aquadoggo/src/replication/mod.rs | 199 +-------------------------- aquadoggo/src/replication/session.rs | 30 ++++ 4 files changed, 220 insertions(+), 193 deletions(-) create mode 100644 aquadoggo/src/replication/manager.rs create mode 100644 aquadoggo/src/replication/message.rs create mode 100644 aquadoggo/src/replication/session.rs diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs new file mode 100644 index 000000000..bac520bc5 --- /dev/null +++ b/aquadoggo/src/replication/manager.rs @@ -0,0 +1,175 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use std::collections::HashMap; + +use anyhow::{bail, Result}; + +use crate::replication::{Session, SessionId, SessionState, SyncMessage, TargetSet}; + +pub const INITIAL_SESSION_ID: SessionId = 0; + +// @TODO: Can the modes be an enum? +pub const SUPPORTED_MODES: [u64; 1] = [0]; + +#[derive(Debug)] +pub struct SyncManager

{ + local_peer: P, + sessions: HashMap>, +} + +impl

SyncManager

+where + P: Clone + std::hash::Hash + Eq + PartialOrd, +{ + pub fn new(local_peer: P) -> Self { + Self { + local_peer, + sessions: HashMap::new(), + } + } + + fn insert_session(&mut self, remote_peer: &P, session_id: &SessionId, target_set: &TargetSet) { + let session = Session::new(session_id, target_set); + + if let Some(sessions) = self.sessions.get_mut(remote_peer) { + sessions.push(session); + } else { + let sessions = vec![session]; + self.sessions.insert(remote_peer.clone(), sessions); + } + } + + fn get_sessions(&self, remote_peer: &P) -> Option<&Vec> { + self.sessions.get(remote_peer) + } + + pub fn initiate_session(&mut self, remote_peer: &P, target_set: &TargetSet) -> Result<()> { + let session_id = if let Some(sessions) = self.sessions.get(&remote_peer) { + // Make sure to not have duplicate sessions over the same schema ids + let session = sessions + .iter() + .find(|session| &session.target_set == target_set); + + if session.is_some() { + bail!("Session concerning same schema ids already exists"); + } + + if let Some(session) = sessions.last() { + session.id + 1 + } else { + INITIAL_SESSION_ID + } + } else { + INITIAL_SESSION_ID + }; + + self.insert_session(remote_peer, &session_id, target_set); + + Ok(()) + } + + pub fn handle_message(&mut self, remote_peer: &P, message: &SyncMessage) -> Result<()> { + // Handle `SyncRequest` + // If message = SyncRequest, then check if a) session id doesn't exist yet b) we're + // not already handling the same schema id's in a running session + // + // If session id exists, then check if we just initialised it, in that case use peer id as + // tie-breaker to decide who continues + + match message { + SyncMessage::SyncRequest(mode, session_id, target_set) => { + // Check we support this replication mode + if !SUPPORTED_MODES.contains(&mode) { + // @TODO: We want custom error types for all of these + bail!("Unsupported replication mode requested") + } + + // Check if any sessions for this peer already exist + if let Some(sessions) = self.sessions.get_mut(remote_peer) { + // lolz... + let sessions_clone = sessions.clone(); + + // Check if a session with this session id already exists for this peer + let session = sessions_clone + .iter() + .enumerate() + .find(|(_, session)| session.id == *session_id); + + match session { + Some((index, session)) => { + // Check session state, if it is already established then this is an error + // if it is in initial state, then we need to compare peer ids and drop + // the connection depending on which is "lower". + let accept_sync_request = match session.state { + SessionState::Pending => { + // Use peer ids as tie breaker as this SyncRequest contains a + // existing sessin id. + if &self.local_peer < remote_peer { + // Drop our pending session + sessions.remove(index); + + // We want to accept the incoming session + true + } else { + // Keep our pending session + // Ignore incoming sync request + false + } + } + _ => { + bail!("Invalid SyncRequest received: duplicate session id used") + } + }; + + if accept_sync_request { + // Accept incoming sync request + self.insert_session(remote_peer, &session_id, &target_set); + // @TODO: Session needs to generate some messages on creation and + // it will pass them back up to us to then forward onto + // the swarm + + // Check if the target sets match + if &session.target_set != target_set { + // Send a new sync request for the dropped session + // with new session id + + self.initiate_session(remote_peer, &target_set)?; + // @TODO: Again, the new session will generate a message + // which we send onto the swarm + } + } + + // We handled the request we return here. + return Ok(()); + } + // No duplicate session ids exist, move on. + None => (), + } + + // Check we don't already have a session handling this target set + if sessions + .iter() + .find(|session| &session.target_set == target_set) + .is_some() + { + bail!("SyncRequest containing duplicate target set found") + } + }; + + self.insert_session(remote_peer, &session_id, &target_set); + // @TODO: Handle messages that the new session will generate + } + SyncMessage::Other => todo!(), + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + #[test] + fn session_manager() { + // @TODO + } +} diff --git a/aquadoggo/src/replication/message.rs b/aquadoggo/src/replication/message.rs new file mode 100644 index 000000000..394d37ade --- /dev/null +++ b/aquadoggo/src/replication/message.rs @@ -0,0 +1,9 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use crate::replication::TargetSet; + +#[derive(Debug)] +pub enum SyncMessage { + SyncRequest(u64, u64, TargetSet), + Other, +} diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index 1d32c2eff..c6ebeef46 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -1,14 +1,14 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use std::collections::HashMap; - -use anyhow::{bail, Result}; use p2panda_rs::schema::SchemaId; -const INITIAL_SESSION_ID: SessionId = 0; -const SUPPORTED_MODES: [u64; 1] = [0]; +mod manager; +mod message; +mod session; -type SessionId = u64; +pub use manager::SyncManager; +pub use message::SyncMessage; +pub use session::{Session, SessionId, SessionState}; #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct TargetSet(Vec); @@ -22,190 +22,3 @@ impl TargetSet { Self(schema_ids) } } - -#[derive(Debug)] -pub enum SyncMessage { - SyncRequest(u64, u64, TargetSet), - Other, -} - -#[derive(Clone, Debug)] -pub enum SessionState { - Pending, - Established, - Done, -} - -#[derive(Clone, Debug)] -pub struct Session { - id: SessionId, - target_set: TargetSet, - state: SessionState, -} - -impl Session { - pub fn new(id: &SessionId, target_set: &TargetSet) -> Self { - Session { - id: id.clone(), - state: SessionState::Pending, - target_set: target_set.clone(), - } - } -} - -#[derive(Debug)] -pub struct SyncManager

{ - local_peer: P, - sessions: HashMap>, -} - -impl

SyncManager

-where - P: Clone + std::hash::Hash + Eq + PartialOrd, -{ - pub fn new(local_peer: P) -> Self { - Self { - local_peer, - sessions: HashMap::new(), - } - } - - fn insert_session(&mut self, remote_peer: &P, session_id: &SessionId, target_set: &TargetSet) { - let session = Session::new(session_id, target_set); - - if let Some(sessions) = self.sessions.get_mut(remote_peer) { - sessions.push(session); - } else { - let sessions = vec![session]; - self.sessions.insert(remote_peer.clone(), sessions); - } - } - - fn get_sessions(&self, remote_peer: &P) -> Option<&Vec> { - self.sessions.get(remote_peer) - } - - pub fn initiate_session(&mut self, remote_peer: &P, target_set: &TargetSet) -> Result<()> { - let session_id = if let Some(sessions) = self.sessions.get(&remote_peer) { - // Make sure to not have duplicate sessions over the same schema ids - let session = sessions - .iter() - .find(|session| &session.target_set == target_set); - - if session.is_some() { - bail!("Session concerning same schema ids already exists"); - } - - if let Some(session) = sessions.last() { - session.id + 1 - } else { - INITIAL_SESSION_ID - } - } else { - INITIAL_SESSION_ID - }; - - self.insert_session(remote_peer, &session_id, target_set); - - Ok(()) - } - - pub fn handle_message(&mut self, remote_peer: &P, message: &SyncMessage) -> Result<()> { - // Handle `SyncRequest` - // If message = SyncRequest, then check if a) session id doesn't exist yet b) we're - // not already handling the same schema id's in a running session - // - // If session id exists, then check if we just initialised it, in that case use peer id as - // tie-breaker to decide who continues - - match message { - SyncMessage::SyncRequest(mode, session_id, target_set) => { - // Check we support this replication mode - if !SUPPORTED_MODES.contains(&mode) { - bail!("Unsupported replication mode requested") - } - - // Check if any sessions for this peer already exist - if let Some(sessions) = self.sessions.get_mut(remote_peer) { - // lolz... - let sessions_clone = sessions.clone(); - - // Check if a session with this session id already exists for this peer - let session = sessions_clone - .iter() - .enumerate() - .find(|(_, session)| session.id == *session_id); - - match session { - Some((index, session)) => { - // Check session state, if it is already established then this is an error - // if it is in initial state, then we need to compare peer ids and drop - // the connection depending on which is "lower". - let accept_sync_request = match session.state { - SessionState::Pending => { - // Use peer ids as tie breaker as this SyncRequest contains a - // existing sessin id. - if &self.local_peer < remote_peer { - // Drop our pending session - sessions.remove(index); - - // We want to accept the incoming session - true - } else { - // Keep our pending session - // Ignore incoming sync request - false - } - } - _ => { - bail!("Invalid SyncRequest received: duplicate session id used") - } - }; - - if accept_sync_request { - // Accept incoming sync request - self.insert_session(remote_peer, &session_id, &target_set); - // @TODO: Session needs to generate some messages on creation and - // it will pass them back up to us to then forward onto - // the swarm - - // Check if the target sets match - if &session.target_set != target_set { - // Send a new sync request for the dropped session - // with new session id - - self.initiate_session(remote_peer, &target_set)?; - // @TODO: Again, the new session will generate a message - // which we send onto the swarm - } - } - - // We handled the request we return here. - return Ok(()); - } - // No duplicate session ids exist, move on. - None => (), - } - - // Check we don't already have a session handling this target set - if sessions - .iter() - .find(|session| &session.target_set == target_set) - .is_some() - { - bail!("SyncRequest containing duplicate target set found") - } - }; - - self.insert_session(remote_peer, &session_id, &target_set); - // @TODO: Handle messages that the new session will generate - } - SyncMessage::Other => todo!(), - } - - Ok(()) - } -} - -#[cfg(test)] -mod tests {} diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs new file mode 100644 index 000000000..2c72110b6 --- /dev/null +++ b/aquadoggo/src/replication/session.rs @@ -0,0 +1,30 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use crate::replication::TargetSet; + +pub type SessionId = u64; + +#[derive(Clone, Debug)] +pub enum SessionState { + Pending, + Established, + Done, +} + +#[derive(Clone, Debug)] +pub struct Session { + // @TODO: Access to the store + pub id: SessionId, + pub target_set: TargetSet, + pub state: SessionState, +} + +impl Session { + pub fn new(id: &SessionId, target_set: &TargetSet) -> Self { + Session { + id: id.clone(), + state: SessionState::Pending, + target_set: target_set.clone(), + } + } +} From 49491f773201ece0ffe0b238ee42ed584d97ca4d Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 12:39:44 +0200 Subject: [PATCH 04/25] Refactoring SyncManager methods --- aquadoggo/src/lib.rs | 2 + aquadoggo/src/manager.rs | 2 - aquadoggo/src/replication/errors.rs | 12 ++ aquadoggo/src/replication/manager.rs | 257 +++++++++++++++------------ aquadoggo/src/replication/message.rs | 4 +- aquadoggo/src/replication/mod.rs | 8 + 6 files changed, 170 insertions(+), 115 deletions(-) create mode 100644 aquadoggo/src/replication/errors.rs diff --git a/aquadoggo/src/lib.rs b/aquadoggo/src/lib.rs index 79270f8d6..30c2afdde 100644 --- a/aquadoggo/src/lib.rs +++ b/aquadoggo/src/lib.rs @@ -52,6 +52,8 @@ mod manager; mod materializer; mod network; mod node; +// @TODO: Remove this as soon as we use the replication code +#[allow(dead_code)] mod replication; mod schema; #[cfg(test)] diff --git a/aquadoggo/src/manager.rs b/aquadoggo/src/manager.rs index 339476256..16da4106f 100644 --- a/aquadoggo/src/manager.rs +++ b/aquadoggo/src/manager.rs @@ -13,8 +13,6 @@ use triggered::{Listener, Trigger}; /// Sends messages through the communication bus between services. pub type Sender = broadcast::Sender; -// pub type ServiceReady = oneshot::channel<()>; - // Receives ready signal from services once they are ready to handle messages on the communication bus. pub type ServiceReadyReceiver = oneshot::Receiver<()>; diff --git a/aquadoggo/src/replication/errors.rs b/aquadoggo/src/replication/errors.rs new file mode 100644 index 000000000..89833ffe5 --- /dev/null +++ b/aquadoggo/src/replication/errors.rs @@ -0,0 +1,12 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ReplicationError { + #[error("Remote peer requested unsupported replication mode")] + UnsupportedMode, + + #[error("Tried to initialise duplicate session with id {0}")] + DuplicateSession(u64), +} diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index bac520bc5..9591c7f18 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -2,14 +2,14 @@ use std::collections::HashMap; -use anyhow::{bail, Result}; +use anyhow::Result; -use crate::replication::{Session, SessionId, SessionState, SyncMessage, TargetSet}; +use crate::replication::errors::ReplicationError; +use crate::replication::{Mode, Session, SessionId, SessionState, SyncMessage, TargetSet}; pub const INITIAL_SESSION_ID: SessionId = 0; -// @TODO: Can the modes be an enum? -pub const SUPPORTED_MODES: [u64; 1] = [0]; +pub const SUPPORTED_MODES: [Mode; 1] = [Mode::Naive]; #[derive(Debug)] pub struct SyncManager

{ @@ -28,39 +28,49 @@ where } } + /// Get all sessions related to a remote peer. + fn get_sessions(&self, remote_peer: &P) -> Vec { + self.sessions + .get(&remote_peer) + // Always return an array, even when it is empty + .unwrap_or(&vec![]) + .to_owned() + } + + /// Register a new session in manager. fn insert_session(&mut self, remote_peer: &P, session_id: &SessionId, target_set: &TargetSet) { let session = Session::new(session_id, target_set); if let Some(sessions) = self.sessions.get_mut(remote_peer) { sessions.push(session); } else { - let sessions = vec![session]; - self.sessions.insert(remote_peer.clone(), sessions); + self.sessions.insert(remote_peer.clone(), vec![session]); } } - fn get_sessions(&self, remote_peer: &P) -> Option<&Vec> { - self.sessions.get(remote_peer) - } + pub fn initiate_session( + &mut self, + remote_peer: &P, + target_set: &TargetSet, + ) -> Result<(), ReplicationError> { + let sessions = self.get_sessions(remote_peer); - pub fn initiate_session(&mut self, remote_peer: &P, target_set: &TargetSet) -> Result<()> { - let session_id = if let Some(sessions) = self.sessions.get(&remote_peer) { - // Make sure to not have duplicate sessions over the same schema ids - let session = sessions - .iter() - .find(|session| &session.target_set == target_set); + // Make sure to not have duplicate sessions over the same schema ids + let session = sessions + .iter() + .find(|session| &session.target_set == target_set); - if session.is_some() { - bail!("Session concerning same schema ids already exists"); - } + if let Some(session) = session { + return Err(ReplicationError::DuplicateSession(session.id)); + } + // Determine next id for upcoming session + let session_id = { if let Some(session) = sessions.last() { session.id + 1 } else { INITIAL_SESSION_ID } - } else { - INITIAL_SESSION_ID }; self.insert_session(remote_peer, &session_id, target_set); @@ -68,108 +78,133 @@ where Ok(()) } - pub fn handle_message(&mut self, remote_peer: &P, message: &SyncMessage) -> Result<()> { - // Handle `SyncRequest` - // If message = SyncRequest, then check if a) session id doesn't exist yet b) we're - // not already handling the same schema id's in a running session - // - // If session id exists, then check if we just initialised it, in that case use peer id as - // tie-breaker to decide who continues + fn is_mode_supported(mode: &Mode) -> Result<(), ReplicationError> { + if !SUPPORTED_MODES.contains(mode) { + return Err(ReplicationError::UnsupportedMode); + } - match message { - SyncMessage::SyncRequest(mode, session_id, target_set) => { - // Check we support this replication mode - if !SUPPORTED_MODES.contains(&mode) { - // @TODO: We want custom error types for all of these - bail!("Unsupported replication mode requested") + Ok(()) + } + + fn handle_duplicate_session( + &mut self, + remote_peer: &P, + target_set: &TargetSet, + index: usize, + session: &Session, + ) -> Result<(), ReplicationError> { + let accept_inbound_request = match session.state { + // Handle only duplicate sessions when they haven't started yet + SessionState::Pending => { + if &self.local_peer < remote_peer { + // Drop our pending session + let mut sessions = self.get_sessions(remote_peer); + sessions.remove(index); + + // Accept the inbound request + true + } else { + // Keep our pending session, ignore inbound request + false } + } + _ => return Err(ReplicationError::DuplicateSession(session.id)), + }; + + if accept_inbound_request { + self.insert_session(remote_peer, &session.id, &target_set); - // Check if any sessions for this peer already exist - if let Some(sessions) = self.sessions.get_mut(remote_peer) { - // lolz... - let sessions_clone = sessions.clone(); - - // Check if a session with this session id already exists for this peer - let session = sessions_clone - .iter() - .enumerate() - .find(|(_, session)| session.id == *session_id); - - match session { - Some((index, session)) => { - // Check session state, if it is already established then this is an error - // if it is in initial state, then we need to compare peer ids and drop - // the connection depending on which is "lower". - let accept_sync_request = match session.state { - SessionState::Pending => { - // Use peer ids as tie breaker as this SyncRequest contains a - // existing sessin id. - if &self.local_peer < remote_peer { - // Drop our pending session - sessions.remove(index); - - // We want to accept the incoming session - true - } else { - // Keep our pending session - // Ignore incoming sync request - false - } - } - _ => { - bail!("Invalid SyncRequest received: duplicate session id used") - } - }; - - if accept_sync_request { - // Accept incoming sync request - self.insert_session(remote_peer, &session_id, &target_set); - // @TODO: Session needs to generate some messages on creation and - // it will pass them back up to us to then forward onto - // the swarm - - // Check if the target sets match - if &session.target_set != target_set { - // Send a new sync request for the dropped session - // with new session id - - self.initiate_session(remote_peer, &target_set)?; - // @TODO: Again, the new session will generate a message - // which we send onto the swarm - } - } - - // We handled the request we return here. - return Ok(()); - } - // No duplicate session ids exist, move on. - None => (), - } - - // Check we don't already have a session handling this target set - if sessions - .iter() - .find(|session| &session.target_set == target_set) - .is_some() - { - bail!("SyncRequest containing duplicate target set found") - } - }; - - self.insert_session(remote_peer, &session_id, &target_set); - // @TODO: Handle messages that the new session will generate + // @TODO: Session needs to generate some messages on creation and + // it will pass them back up to us to then forward onto + // the swarm + + // If we dropped our own outbound session request regarding a different target set, we + // need to re-establish it with another session id, otherwise it would get lost + if &session.target_set != target_set { + self.initiate_session(remote_peer, &target_set)?; + // @TODO: Again, the new session will generate a message + // which we send onto the swarm } - SyncMessage::Other => todo!(), } Ok(()) } + + fn handle_sync_request( + &mut self, + remote_peer: &P, + mode: &Mode, + session_id: &SessionId, + target_set: &TargetSet, + ) -> Result<(), ReplicationError> { + SyncManager::

::is_mode_supported(&mode)?; + + let sessions = self.get_sessions(remote_peer); + + // Check if a session with this id already exists for this peer + if let Some((index, session)) = sessions + .iter() + .enumerate() + .find(|(_, session)| session.id == *session_id) + { + return self.handle_duplicate_session(remote_peer, target_set, index, session); + } + + // Check if a session with this target set already exists for this peer + if let Some(session) = sessions + .iter() + .find(|session| &session.target_set == target_set) + { + return Err(ReplicationError::DuplicateSession(session.id)); + } + + self.insert_session(remote_peer, &session_id, &target_set); + + Ok(()) + } + + pub fn handle_message( + &mut self, + remote_peer: &P, + message: &SyncMessage, + ) -> Result<(), ReplicationError> { + match message { + SyncMessage::SyncRequest(mode, session_id, target_set) => { + return self.handle_sync_request(remote_peer, mode, session_id, target_set); + } + SyncMessage::Other => todo!(), + } + } } #[cfg(test)] mod tests { - #[test] - fn session_manager() { - // @TODO + use p2panda_rs::schema::SchemaId; + use p2panda_rs::test_utils::fixtures::schema_id; + use rstest::rstest; + + use crate::replication::{Mode, SyncMessage, TargetSet}; + + use super::{SyncManager, INITIAL_SESSION_ID}; + + const PEER_ID_LOCAL: &'static str = "local"; + const PEER_ID_REMOTE: &'static str = "remote"; + + #[rstest] + fn checks_supported_mode(schema_id: SchemaId) { + let target_set = TargetSet::new(&[schema_id]); + + // Should not fail when requesting supported replication mode + let mut manager = SyncManager::new(PEER_ID_LOCAL); + let message = SyncMessage::SyncRequest(Mode::Naive, INITIAL_SESSION_ID, target_set.clone()); + let result = manager.handle_message(&PEER_ID_REMOTE, &message); + assert!(result.is_ok()); + + // Should fail when requesting unsupported replication mode + let mut manager = SyncManager::new(PEER_ID_LOCAL); + let message = + SyncMessage::SyncRequest(Mode::SetReconciliation, INITIAL_SESSION_ID, target_set); + let result = manager.handle_message(&PEER_ID_REMOTE, &message); + assert!(result.is_err()); } } diff --git a/aquadoggo/src/replication/message.rs b/aquadoggo/src/replication/message.rs index 394d37ade..2c576f6af 100644 --- a/aquadoggo/src/replication/message.rs +++ b/aquadoggo/src/replication/message.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use crate::replication::TargetSet; +use crate::replication::{Mode, SessionId, TargetSet}; #[derive(Debug)] pub enum SyncMessage { - SyncRequest(u64, u64, TargetSet), + SyncRequest(Mode, SessionId, TargetSet), Other, } diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index c6ebeef46..035ef40aa 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -2,6 +2,7 @@ use p2panda_rs::schema::SchemaId; +pub mod errors; mod manager; mod message; mod session; @@ -22,3 +23,10 @@ impl TargetSet { Self(schema_ids) } } + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum Mode { + Naive, + SetReconciliation, + Unknown, +} From 8500c35d11271692e1e17149316e7100cec6a4af Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 12:45:30 +0200 Subject: [PATCH 05/25] Fix some clippy warnings in SyncManager code --- aquadoggo/src/replication/manager.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index 9591c7f18..cf8cf5cc7 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -31,7 +31,7 @@ where /// Get all sessions related to a remote peer. fn get_sessions(&self, remote_peer: &P) -> Vec { self.sessions - .get(&remote_peer) + .get(remote_peer) // Always return an array, even when it is empty .unwrap_or(&vec![]) .to_owned() @@ -112,7 +112,7 @@ where }; if accept_inbound_request { - self.insert_session(remote_peer, &session.id, &target_set); + self.insert_session(remote_peer, &session.id, target_set); // @TODO: Session needs to generate some messages on creation and // it will pass them back up to us to then forward onto @@ -121,7 +121,7 @@ where // If we dropped our own outbound session request regarding a different target set, we // need to re-establish it with another session id, otherwise it would get lost if &session.target_set != target_set { - self.initiate_session(remote_peer, &target_set)?; + self.initiate_session(remote_peer, target_set)?; // @TODO: Again, the new session will generate a message // which we send onto the swarm } @@ -137,7 +137,7 @@ where session_id: &SessionId, target_set: &TargetSet, ) -> Result<(), ReplicationError> { - SyncManager::

::is_mode_supported(&mode)?; + SyncManager::

::is_mode_supported(mode)?; let sessions = self.get_sessions(remote_peer); @@ -158,7 +158,7 @@ where return Err(ReplicationError::DuplicateSession(session.id)); } - self.insert_session(remote_peer, &session_id, &target_set); + self.insert_session(remote_peer, session_id, target_set); Ok(()) } @@ -170,7 +170,7 @@ where ) -> Result<(), ReplicationError> { match message { SyncMessage::SyncRequest(mode, session_id, target_set) => { - return self.handle_sync_request(remote_peer, mode, session_id, target_set); + self.handle_sync_request(remote_peer, mode, session_id, target_set) } SyncMessage::Other => todo!(), } From 2fa86d9131fe726a119148a2d453d52980835a6f Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 12:52:23 +0200 Subject: [PATCH 06/25] Distinct duplicate session errors by in- or outbound --- aquadoggo/src/replication/errors.rs | 7 +++++-- aquadoggo/src/replication/manager.rs | 18 +++++++++++++----- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/aquadoggo/src/replication/errors.rs b/aquadoggo/src/replication/errors.rs index 89833ffe5..ba7110b0b 100644 --- a/aquadoggo/src/replication/errors.rs +++ b/aquadoggo/src/replication/errors.rs @@ -7,6 +7,9 @@ pub enum ReplicationError { #[error("Remote peer requested unsupported replication mode")] UnsupportedMode, - #[error("Tried to initialise duplicate session with id {0}")] - DuplicateSession(u64), + #[error("Tried to initialise duplicate inbound replication session with id {0}")] + DuplicateInboundRequest(u64), + + #[error("Tried to initialise duplicate outbound replication session with id {0}")] + DuplicateOutboundRequest(u64), } diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index cf8cf5cc7..7aa000512 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -61,7 +61,7 @@ where .find(|session| &session.target_set == target_set); if let Some(session) = session { - return Err(ReplicationError::DuplicateSession(session.id)); + return Err(ReplicationError::DuplicateOutboundRequest(session.id)); } // Determine next id for upcoming session @@ -108,7 +108,7 @@ where false } } - _ => return Err(ReplicationError::DuplicateSession(session.id)), + _ => return Err(ReplicationError::DuplicateInboundRequest(session.id)), }; if accept_inbound_request { @@ -141,7 +141,8 @@ where let sessions = self.get_sessions(remote_peer); - // Check if a session with this id already exists for this peer + // Check if a session with this id already exists for this peer, this can happen if both + // peers started to initiate a session at the same time, we can try to resolve this if let Some((index, session)) = sessions .iter() .enumerate() @@ -150,12 +151,13 @@ where return self.handle_duplicate_session(remote_peer, target_set, index, session); } - // Check if a session with this target set already exists for this peer + // Check if a session with this target set already exists for this peer, this always gets + // rejected because it is clearly redundant if let Some(session) = sessions .iter() .find(|session| &session.target_set == target_set) { - return Err(ReplicationError::DuplicateSession(session.id)); + return Err(ReplicationError::DuplicateInboundRequest(session.id)); } self.insert_session(remote_peer, session_id, target_set); @@ -207,4 +209,10 @@ mod tests { let result = manager.handle_message(&PEER_ID_REMOTE, &message); assert!(result.is_err()); } + + #[rstest] + fn initiate_outbound_session() {} + + #[rstest] + fn initiate_inbound_session() {} } From 54370c407cc364f622560bdaf173bb4b6d43c6ef Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 13:14:23 +0200 Subject: [PATCH 07/25] De-duplicate schema ids in target set, add some tests --- aquadoggo/src/replication/manager.rs | 34 ++++++++++++++++-- aquadoggo/src/replication/mod.rs | 54 ++++++++++++++++++++++++++-- 2 files changed, 82 insertions(+), 6 deletions(-) diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index 7aa000512..cc7b75275 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -181,10 +181,12 @@ where #[cfg(test)] mod tests { - use p2panda_rs::schema::SchemaId; - use p2panda_rs::test_utils::fixtures::schema_id; + use p2panda_rs::document::DocumentViewId; + use p2panda_rs::schema::{SchemaId, SchemaName}; + use p2panda_rs::test_utils::fixtures::{random_document_view_id, schema_id}; use rstest::rstest; + use crate::replication::errors::ReplicationError; use crate::replication::{Mode, SyncMessage, TargetSet}; use super::{SyncManager, INITIAL_SESSION_ID}; @@ -211,7 +213,33 @@ mod tests { } #[rstest] - fn initiate_outbound_session() {} + fn initiate_outbound_session( + #[from(random_document_view_id)] document_view_id_1: DocumentViewId, + #[from(random_document_view_id)] document_view_id_2: DocumentViewId, + ) { + let schema_id_1 = + SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id_1); + let schema_id_2 = + SchemaId::new_application(&SchemaName::new("profiles").unwrap(), &document_view_id_2); + + let target_set_1 = TargetSet::new(&[schema_id_1]); + let target_set_2 = TargetSet::new(&[schema_id_2]); + + let mut manager = SyncManager::new(PEER_ID_LOCAL); + let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1); + assert!(result.is_ok()); + + let mut manager = SyncManager::new(PEER_ID_LOCAL); + let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_2); + assert!(result.is_ok()); + + // Expect error when initiating a session for the same target set + let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1); + assert!(matches!( + result, + Err(ReplicationError::DuplicateOutboundRequest(0)) + )); + } #[rstest] fn initiate_inbound_session() {} diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index 035ef40aa..072aed48b 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -11,16 +11,25 @@ pub use manager::SyncManager; pub use message::SyncMessage; pub use session::{Session, SessionId, SessionState}; +/// De-duplicated and sorted set of schema ids which define the target data for the replication +/// session. #[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd)] pub struct TargetSet(Vec); impl TargetSet { pub fn new(schema_ids: &[SchemaId]) -> Self { + // Remove duplicates + let mut deduplicated_set: Vec = Vec::new(); + for schema_id in schema_ids { + if !deduplicated_set.contains(schema_id) { + deduplicated_set.push(schema_id.clone()); + } + } + // Sort schema ids to compare target sets easily - let mut schema_ids = schema_ids.to_vec(); - schema_ids.sort(); + deduplicated_set.sort(); - Self(schema_ids) + Self(deduplicated_set) } } @@ -30,3 +39,42 @@ pub enum Mode { SetReconciliation, Unknown, } + +#[cfg(test)] +mod tests { + use p2panda_rs::document::DocumentViewId; + use p2panda_rs::schema::{SchemaId, SchemaName}; + use p2panda_rs::test_utils::fixtures::random_document_view_id; + use rstest::rstest; + + use super::TargetSet; + + #[rstest] + fn compare_target_sets( + #[from(random_document_view_id)] document_view_id_1: DocumentViewId, + #[from(random_document_view_id)] document_view_id_2: DocumentViewId, + ) { + let schema_id_1 = + SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id_1); + let schema_id_2 = + SchemaId::new_application(&SchemaName::new("profiles").unwrap(), &document_view_id_2); + + // Sort schema ids + assert_eq!( + TargetSet::new(&[schema_id_1.clone(), schema_id_2.clone()]), + TargetSet::new(&[schema_id_2.clone(), schema_id_1.clone()]), + ); + + // De-duplicate schema ids + assert_eq!( + TargetSet::new(&[schema_id_1.clone(), schema_id_1.clone()]), + TargetSet::new(&[schema_id_1.clone()]), + ); + + // Distinct different target sets from each other + assert_ne!( + TargetSet::new(&[schema_id_1.clone()]), + TargetSet::new(&[schema_id_2.clone()]), + ); + } +} From f68b95b5dfb0866ba7ae461f498e46a21609296f Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 13:21:43 +0200 Subject: [PATCH 08/25] Use a fixture to generate random target set for tests --- aquadoggo/src/replication/manager.rs | 30 +++++++++++++++------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index cc7b75275..a11fed0a3 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -194,10 +194,16 @@ mod tests { const PEER_ID_LOCAL: &'static str = "local"; const PEER_ID_REMOTE: &'static str = "remote"; - #[rstest] - fn checks_supported_mode(schema_id: SchemaId) { - let target_set = TargetSet::new(&[schema_id]); + #[rstest::fixture] + fn random_target_set() -> TargetSet { + let document_view_id = random_document_view_id(); + let schema_id = + SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id); + TargetSet::new(&[schema_id]) + } + #[rstest] + fn checks_supported_mode(#[from(random_target_set)] target_set: TargetSet) { // Should not fail when requesting supported replication mode let mut manager = SyncManager::new(PEER_ID_LOCAL); let message = SyncMessage::SyncRequest(Mode::Naive, INITIAL_SESSION_ID, target_set.clone()); @@ -214,17 +220,9 @@ mod tests { #[rstest] fn initiate_outbound_session( - #[from(random_document_view_id)] document_view_id_1: DocumentViewId, - #[from(random_document_view_id)] document_view_id_2: DocumentViewId, + #[from(random_target_set)] target_set_1: TargetSet, + #[from(random_target_set)] target_set_2: TargetSet, ) { - let schema_id_1 = - SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id_1); - let schema_id_2 = - SchemaId::new_application(&SchemaName::new("profiles").unwrap(), &document_view_id_2); - - let target_set_1 = TargetSet::new(&[schema_id_1]); - let target_set_2 = TargetSet::new(&[schema_id_2]); - let mut manager = SyncManager::new(PEER_ID_LOCAL); let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1); assert!(result.is_ok()); @@ -242,5 +240,9 @@ mod tests { } #[rstest] - fn initiate_inbound_session() {} + fn initiate_inbound_session( + #[from(random_target_set)] target_set_1: TargetSet, + #[from(random_target_set)] target_set_2: TargetSet, + ) { + } } From 573c229f320d35c285ecfdac34ddb3b3e6eabf74 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 10:26:11 +0100 Subject: [PATCH 09/25] Introduce scoped message --- aquadoggo/src/replication/message.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/aquadoggo/src/replication/message.rs b/aquadoggo/src/replication/message.rs index 2c576f6af..e3909636f 100644 --- a/aquadoggo/src/replication/message.rs +++ b/aquadoggo/src/replication/message.rs @@ -7,3 +7,9 @@ pub enum SyncMessage { SyncRequest(Mode, SessionId, TargetSet), Other, } + +#[derive(Debug)] +enum ScopedMessage { + Have, + Entry, +} From 737996d538aaeab1114834d9c419a0ff5a4a5265 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 10:28:02 +0100 Subject: [PATCH 10/25] Rename ScopedMessage -> StrategyMessage --- aquadoggo/src/replication/message.rs | 2 +- aquadoggo/src/replication/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/aquadoggo/src/replication/message.rs b/aquadoggo/src/replication/message.rs index e3909636f..11f57b083 100644 --- a/aquadoggo/src/replication/message.rs +++ b/aquadoggo/src/replication/message.rs @@ -9,7 +9,7 @@ pub enum SyncMessage { } #[derive(Debug)] -enum ScopedMessage { +enum StrategyMessage { Have, Entry, } diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index 072aed48b..00ad1671d 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -8,7 +8,7 @@ mod message; mod session; pub use manager::SyncManager; -pub use message::SyncMessage; +pub use message::{SyncMessage, StrategyMessage}; pub use session::{Session, SessionId, SessionState}; /// De-duplicated and sorted set of schema ids which define the target data for the replication From 46795839bb72a4044d550f1ac4bda71081e07114 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 10:28:35 +0100 Subject: [PATCH 11/25] Make StrategyMessage public --- aquadoggo/src/replication/message.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aquadoggo/src/replication/message.rs b/aquadoggo/src/replication/message.rs index 11f57b083..b6aed3a42 100644 --- a/aquadoggo/src/replication/message.rs +++ b/aquadoggo/src/replication/message.rs @@ -9,7 +9,7 @@ pub enum SyncMessage { } #[derive(Debug)] -enum StrategyMessage { +pub enum StrategyMessage { Have, Entry, } From 2ebbf5978fe4a1675735b621fc7d0b1698055f4b Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 10:43:59 +0100 Subject: [PATCH 12/25] Define strategy trait --- aquadoggo/src/replication/mod.rs | 6 ++++++ aquadoggo/src/replication/strategies.rs | 22 ++++++++++++++++++++++ 2 files changed, 28 insertions(+) create mode 100644 aquadoggo/src/replication/strategies.rs diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index 00ad1671d..440d4d75a 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -6,6 +6,7 @@ pub mod errors; mod manager; mod message; mod session; +mod strategies; pub use manager::SyncManager; pub use message::{SyncMessage, StrategyMessage}; @@ -78,3 +79,8 @@ mod tests { ); } } + +pub enum ReplicationMode { + Naive, + SetReconciliation +} \ No newline at end of file diff --git a/aquadoggo/src/replication/strategies.rs b/aquadoggo/src/replication/strategies.rs new file mode 100644 index 000000000..ca6edc5eb --- /dev/null +++ b/aquadoggo/src/replication/strategies.rs @@ -0,0 +1,22 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use async_trait::async_trait; + +use crate::replication::{ReplicationMode, StrategyMessage}; + +struct StrategyResult { + is_done: bool, + messages: Vec, +} + +#[async_trait] +pub trait Strategy { + /// Replication mode of this strategy. + fn mode() -> ReplicationMode; + + // Generate initial messages. + async fn initial_messages(&self) -> Vec; + + // Handle incoming message and return response. + async fn handle_message(&self, message: StrategyMessage) -> Result; +} \ No newline at end of file From 651616a9870612d6e238d0214dbe342404bfa4fa Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 11:42:40 +0100 Subject: [PATCH 13/25] Implement Strategy trait for NaiveStrategy --- aquadoggo/src/replication/strategies.rs | 79 ++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/aquadoggo/src/replication/strategies.rs b/aquadoggo/src/replication/strategies.rs index ca6edc5eb..2a2c5ee23 100644 --- a/aquadoggo/src/replication/strategies.rs +++ b/aquadoggo/src/replication/strategies.rs @@ -1,10 +1,12 @@ // SPDX-License-Identifier: AGPL-3.0-or-later +use anyhow::Result; use async_trait::async_trait; +use p2panda_rs::schema::SchemaId; -use crate::replication::{ReplicationMode, StrategyMessage}; +use crate::replication::{ReplicationMode, StrategyMessage, TargetSet}; -struct StrategyResult { +pub struct StrategyResult { is_done: bool, messages: Vec, } @@ -14,9 +16,80 @@ pub trait Strategy { /// Replication mode of this strategy. fn mode() -> ReplicationMode; + /// Target set replication is occurring over. + fn target_set(&self) -> TargetSet; + // Generate initial messages. + // + // @TODO: we want to pass the store in here too eventually. async fn initial_messages(&self) -> Vec; // Handle incoming message and return response. + // + // @TODO: we want to pass the store in here too eventually. async fn handle_message(&self, message: StrategyMessage) -> Result; -} \ No newline at end of file + + // Validate and store entry and operation. + // + // @TODO: we want to pass the store in here too eventually. + async fn handle_entry( + &self, + schema_id: &SchemaId, + entry_bytes: Vec, + operation_bytes: Vec, + ) -> Result<()> { + // Validation: + // Check against schema_id and target_set if entry is what we've asked for + let _target_set = self.target_set(); + + // Further validation through our publish api stuff (?!) + + // Have an entry waiting lobby service here, to batch stuff?! + // Nice to check certificate pool in one go. + // Nice to not put too much pressure on the database. + Ok(()) + } +} + +pub struct NaiveStrategy { + target_set: TargetSet, +} + +#[async_trait] +impl Strategy for NaiveStrategy { + fn mode() -> ReplicationMode { + ReplicationMode::Naive + } + + fn target_set(&self) -> TargetSet { + self.target_set.clone() + } + + async fn initial_messages(&self) -> Vec { + // TODO: Access the store and compose a have message which contains our local log heights over + // the TargetSet. + let _target_set = self.target_set(); + + vec![StrategyMessage::Have] + } + + async fn handle_message(&self, message: StrategyMessage) -> Result { + // TODO: Verify that the TargetSet contained in the message is a sub-set of the passed + // local TargetSet. + let _target_set = self.target_set(); + let mut messages = Vec::new(); + let mut is_done = false; + + match message { + StrategyMessage::Have => { + // Compose Have message and push to messages + is_done = true; + } + StrategyMessage::Entry => { + // self.handle_entry(..) + } + } + + Ok(StrategyResult { is_done, messages }) + } +} From 7ffb12054f7527c98650150bc7fb523a6b744a49 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 12:22:34 +0100 Subject: [PATCH 14/25] Integrate strategies onto Session --- aquadoggo/src/replication/message.rs | 2 +- aquadoggo/src/replication/mod.rs | 8 +++-- aquadoggo/src/replication/session.rs | 46 ++++++++++++++++++++----- aquadoggo/src/replication/strategies.rs | 33 +++++++++++++++--- 4 files changed, 73 insertions(+), 16 deletions(-) diff --git a/aquadoggo/src/replication/message.rs b/aquadoggo/src/replication/message.rs index b6aed3a42..958e2c205 100644 --- a/aquadoggo/src/replication/message.rs +++ b/aquadoggo/src/replication/message.rs @@ -8,7 +8,7 @@ pub enum SyncMessage { Other, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub enum StrategyMessage { Have, Entry, diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index 440d4d75a..6611a9365 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -9,8 +9,9 @@ mod session; mod strategies; pub use manager::SyncManager; -pub use message::{SyncMessage, StrategyMessage}; +pub use message::{StrategyMessage, SyncMessage}; pub use session::{Session, SessionId, SessionState}; +pub use strategies::{NaiveStrategy, SetReconciliationStrategy, Strategy}; /// De-duplicated and sorted set of schema ids which define the target data for the replication /// session. @@ -80,7 +81,8 @@ mod tests { } } +#[derive(Clone, Debug)] pub enum ReplicationMode { Naive, - SetReconciliation -} \ No newline at end of file + SetReconciliation, +} diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs index 2c72110b6..b128d3db4 100644 --- a/aquadoggo/src/replication/session.rs +++ b/aquadoggo/src/replication/session.rs @@ -1,6 +1,11 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use crate::replication::TargetSet; +use anyhow::Result; +use p2panda_rs::schema::SchemaId; + +use crate::replication::{ + NaiveStrategy, ReplicationMode, SetReconciliationStrategy, Strategy, StrategyMessage, TargetSet, +}; pub type SessionId = u64; @@ -11,20 +16,45 @@ pub enum SessionState { Done, } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct Session { // @TODO: Access to the store + // store: Store pub id: SessionId, - pub target_set: TargetSet, pub state: SessionState, + pub strategy: Box, } impl Session { - pub fn new(id: &SessionId, target_set: &TargetSet) -> Self { - Session { - id: id.clone(), - state: SessionState::Pending, - target_set: target_set.clone(), + pub fn new(id: &SessionId, target_set: &TargetSet, mode: ReplicationMode) -> Self { + match mode { + ReplicationMode::Naive => { + let strategy = Box::new(NaiveStrategy { + mode, + target_set: target_set.clone(), + }); + return Session { + id: id.clone(), + state: SessionState::Pending, + strategy, + }; + } + ReplicationMode::SetReconciliation => { + let strategy = Box::new(SetReconciliationStrategy()); + return Session { + id: id.clone(), + state: SessionState::Pending, + strategy, + }; + } } } + + pub fn mode(&self) -> &ReplicationMode { + &self.strategy.mode() + } + + pub fn target_set(&self) -> &TargetSet { + &self.strategy.target_set() + } } diff --git a/aquadoggo/src/replication/strategies.rs b/aquadoggo/src/replication/strategies.rs index 2a2c5ee23..99ed6b320 100644 --- a/aquadoggo/src/replication/strategies.rs +++ b/aquadoggo/src/replication/strategies.rs @@ -6,15 +6,16 @@ use p2panda_rs::schema::SchemaId; use crate::replication::{ReplicationMode, StrategyMessage, TargetSet}; +#[derive(Clone, Debug)] pub struct StrategyResult { is_done: bool, messages: Vec, } #[async_trait] -pub trait Strategy { +pub trait Strategy: std::fmt::Debug { /// Replication mode of this strategy. - fn mode() -> ReplicationMode; + fn mode(&self) -> ReplicationMode; /// Target set replication is occurring over. fn target_set(&self) -> TargetSet; @@ -51,14 +52,16 @@ pub trait Strategy { } } +#[derive(Clone, Debug)] pub struct NaiveStrategy { target_set: TargetSet, + mode: ReplicationMode, } #[async_trait] impl Strategy for NaiveStrategy { - fn mode() -> ReplicationMode { - ReplicationMode::Naive + fn mode(&self) -> ReplicationMode { + self.mode.clone() } fn target_set(&self) -> TargetSet { @@ -93,3 +96,25 @@ impl Strategy for NaiveStrategy { Ok(StrategyResult { is_done, messages }) } } + +#[derive(Clone, Debug)] +pub struct SetReconciliationStrategy(); + +#[async_trait] +impl Strategy for SetReconciliationStrategy { + fn mode(&self) -> ReplicationMode { + todo!() + } + + fn target_set(&self) -> TargetSet { + todo!() + } + + async fn initial_messages(&self) -> Vec { + todo!() + } + + async fn handle_message(&self, message: StrategyMessage) -> Result { + todo!() + } +} From 7940e4d972f41fd4b3aad9c3d661c14686ca34c5 Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 13:25:41 +0200 Subject: [PATCH 15/25] Fix test --- aquadoggo/src/replication/manager.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index a11fed0a3..4c8070944 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -227,7 +227,6 @@ mod tests { let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1); assert!(result.is_ok()); - let mut manager = SyncManager::new(PEER_ID_LOCAL); let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_2); assert!(result.is_ok()); From e5eae31ef1eb5ab71f8b8389c8dde3881cd34c98 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 12:30:24 +0100 Subject: [PATCH 16/25] Remove duplicate Mode structs --- aquadoggo/src/replication/mod.rs | 6 ------ aquadoggo/src/replication/session.rs | 11 ++++++----- aquadoggo/src/replication/strategies.rs | 10 +++++----- 3 files changed, 11 insertions(+), 16 deletions(-) diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index 6611a9365..4aa7cd1c7 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -80,9 +80,3 @@ mod tests { ); } } - -#[derive(Clone, Debug)] -pub enum ReplicationMode { - Naive, - SetReconciliation, -} diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs index b128d3db4..2892eb738 100644 --- a/aquadoggo/src/replication/session.rs +++ b/aquadoggo/src/replication/session.rs @@ -4,7 +4,7 @@ use anyhow::Result; use p2panda_rs::schema::SchemaId; use crate::replication::{ - NaiveStrategy, ReplicationMode, SetReconciliationStrategy, Strategy, StrategyMessage, TargetSet, + NaiveStrategy, Mode, SetReconciliationStrategy, Strategy, StrategyMessage, TargetSet, }; pub type SessionId = u64; @@ -26,9 +26,9 @@ pub struct Session { } impl Session { - pub fn new(id: &SessionId, target_set: &TargetSet, mode: ReplicationMode) -> Self { + pub fn new(id: &SessionId, target_set: &TargetSet, mode: Mode) -> Self { match mode { - ReplicationMode::Naive => { + Mode::Naive => { let strategy = Box::new(NaiveStrategy { mode, target_set: target_set.clone(), @@ -39,7 +39,7 @@ impl Session { strategy, }; } - ReplicationMode::SetReconciliation => { + Mode::SetReconciliation => { let strategy = Box::new(SetReconciliationStrategy()); return Session { id: id.clone(), @@ -47,10 +47,11 @@ impl Session { strategy, }; } + Unknown => panic!("Unknown replication mode found"), } } - pub fn mode(&self) -> &ReplicationMode { + pub fn mode(&self) -> &Mode { &self.strategy.mode() } diff --git a/aquadoggo/src/replication/strategies.rs b/aquadoggo/src/replication/strategies.rs index 99ed6b320..8ca3ace25 100644 --- a/aquadoggo/src/replication/strategies.rs +++ b/aquadoggo/src/replication/strategies.rs @@ -4,7 +4,7 @@ use anyhow::Result; use async_trait::async_trait; use p2panda_rs::schema::SchemaId; -use crate::replication::{ReplicationMode, StrategyMessage, TargetSet}; +use crate::replication::{Mode, StrategyMessage, TargetSet}; #[derive(Clone, Debug)] pub struct StrategyResult { @@ -15,7 +15,7 @@ pub struct StrategyResult { #[async_trait] pub trait Strategy: std::fmt::Debug { /// Replication mode of this strategy. - fn mode(&self) -> ReplicationMode; + fn mode(&self) -> Mode; /// Target set replication is occurring over. fn target_set(&self) -> TargetSet; @@ -55,12 +55,12 @@ pub trait Strategy: std::fmt::Debug { #[derive(Clone, Debug)] pub struct NaiveStrategy { target_set: TargetSet, - mode: ReplicationMode, + mode: Mode, } #[async_trait] impl Strategy for NaiveStrategy { - fn mode(&self) -> ReplicationMode { + fn mode(&self) -> Mode { self.mode.clone() } @@ -102,7 +102,7 @@ pub struct SetReconciliationStrategy(); #[async_trait] impl Strategy for SetReconciliationStrategy { - fn mode(&self) -> ReplicationMode { + fn mode(&self) -> Mode { todo!() } From 9a043f31606120a8347ec04e5510d495593d42f3 Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 13:37:40 +0200 Subject: [PATCH 17/25] Add inbound test --- aquadoggo/src/replication/manager.rs | 60 +++++++++++++++++++--------- 1 file changed, 42 insertions(+), 18 deletions(-) diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index 4c8070944..e93b161d7 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -181,9 +181,8 @@ where #[cfg(test)] mod tests { - use p2panda_rs::document::DocumentViewId; use p2panda_rs::schema::{SchemaId, SchemaName}; - use p2panda_rs::test_utils::fixtures::{random_document_view_id, schema_id}; + use p2panda_rs::test_utils::fixtures::random_document_view_id; use rstest::rstest; use crate::replication::errors::ReplicationError; @@ -202,22 +201,6 @@ mod tests { TargetSet::new(&[schema_id]) } - #[rstest] - fn checks_supported_mode(#[from(random_target_set)] target_set: TargetSet) { - // Should not fail when requesting supported replication mode - let mut manager = SyncManager::new(PEER_ID_LOCAL); - let message = SyncMessage::SyncRequest(Mode::Naive, INITIAL_SESSION_ID, target_set.clone()); - let result = manager.handle_message(&PEER_ID_REMOTE, &message); - assert!(result.is_ok()); - - // Should fail when requesting unsupported replication mode - let mut manager = SyncManager::new(PEER_ID_LOCAL); - let message = - SyncMessage::SyncRequest(Mode::SetReconciliation, INITIAL_SESSION_ID, target_set); - let result = manager.handle_message(&PEER_ID_REMOTE, &message); - assert!(result.is_err()); - } - #[rstest] fn initiate_outbound_session( #[from(random_target_set)] target_set_1: TargetSet, @@ -243,5 +226,46 @@ mod tests { #[from(random_target_set)] target_set_1: TargetSet, #[from(random_target_set)] target_set_2: TargetSet, ) { + let mut manager = SyncManager::new(PEER_ID_LOCAL); + + let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone()); + let result = manager.handle_message(&PEER_ID_REMOTE, &message); + assert!(result.is_ok()); + + let message = SyncMessage::SyncRequest(Mode::Naive, 1, target_set_2.clone()); + let result = manager.handle_message(&PEER_ID_REMOTE, &message); + assert!(result.is_ok()); + + // Reject session with duplicate session id + let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone()); + let result = manager.handle_message(&PEER_ID_REMOTE, &message); + assert!(matches!( + result, + Err(ReplicationError::DuplicateInboundRequest(0)) + )); + + // Reject different session concerning same target set + let message = SyncMessage::SyncRequest(Mode::Naive, 2, target_set_2.clone()); + let result = manager.handle_message(&PEER_ID_REMOTE, &message); + assert!(matches!( + result, + Err(ReplicationError::DuplicateInboundRequest(1)) + )); + } + + #[rstest] + fn inbound_checks_supported_mode(#[from(random_target_set)] target_set: TargetSet) { + // Should not fail when requesting supported replication mode + let mut manager = SyncManager::new(PEER_ID_LOCAL); + let message = SyncMessage::SyncRequest(Mode::Naive, INITIAL_SESSION_ID, target_set.clone()); + let result = manager.handle_message(&PEER_ID_REMOTE, &message); + assert!(result.is_ok()); + + // Should fail when requesting unsupported replication mode + let mut manager = SyncManager::new(PEER_ID_LOCAL); + let message = + SyncMessage::SyncRequest(Mode::SetReconciliation, INITIAL_SESSION_ID, target_set); + let result = manager.handle_message(&PEER_ID_REMOTE, &message); + assert!(result.is_err()); } } From f9ee186a065b76034b2ca9155a92430382566acb Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 13:08:23 +0100 Subject: [PATCH 18/25] Move traits into own module and implement Clone for Strategy --- aquadoggo/src/replication/mod.rs | 3 +- aquadoggo/src/replication/strategies.rs | 56 ++++++-------------- aquadoggo/src/replication/traits.rs | 68 +++++++++++++++++++++++++ 3 files changed, 86 insertions(+), 41 deletions(-) create mode 100644 aquadoggo/src/replication/traits.rs diff --git a/aquadoggo/src/replication/mod.rs b/aquadoggo/src/replication/mod.rs index 4aa7cd1c7..c43557e14 100644 --- a/aquadoggo/src/replication/mod.rs +++ b/aquadoggo/src/replication/mod.rs @@ -7,11 +7,12 @@ mod manager; mod message; mod session; mod strategies; +pub mod traits; pub use manager::SyncManager; pub use message::{StrategyMessage, SyncMessage}; pub use session::{Session, SessionId, SessionState}; -pub use strategies::{NaiveStrategy, SetReconciliationStrategy, Strategy}; +pub use strategies::{NaiveStrategy, SetReconciliationStrategy, StrategyResult}; /// De-duplicated and sorted set of schema ids which define the target data for the replication /// session. diff --git a/aquadoggo/src/replication/strategies.rs b/aquadoggo/src/replication/strategies.rs index 8ca3ace25..90806d1db 100644 --- a/aquadoggo/src/replication/strategies.rs +++ b/aquadoggo/src/replication/strategies.rs @@ -4,6 +4,7 @@ use anyhow::Result; use async_trait::async_trait; use p2panda_rs::schema::SchemaId; +use crate::replication::traits::Strategy; use crate::replication::{Mode, StrategyMessage, TargetSet}; #[derive(Clone, Debug)] @@ -12,52 +13,21 @@ pub struct StrategyResult { messages: Vec, } -#[async_trait] -pub trait Strategy: std::fmt::Debug { - /// Replication mode of this strategy. - fn mode(&self) -> Mode; - - /// Target set replication is occurring over. - fn target_set(&self) -> TargetSet; - - // Generate initial messages. - // - // @TODO: we want to pass the store in here too eventually. - async fn initial_messages(&self) -> Vec; - - // Handle incoming message and return response. - // - // @TODO: we want to pass the store in here too eventually. - async fn handle_message(&self, message: StrategyMessage) -> Result; - - // Validate and store entry and operation. - // - // @TODO: we want to pass the store in here too eventually. - async fn handle_entry( - &self, - schema_id: &SchemaId, - entry_bytes: Vec, - operation_bytes: Vec, - ) -> Result<()> { - // Validation: - // Check against schema_id and target_set if entry is what we've asked for - let _target_set = self.target_set(); - - // Further validation through our publish api stuff (?!) - - // Have an entry waiting lobby service here, to batch stuff?! - // Nice to check certificate pool in one go. - // Nice to not put too much pressure on the database. - Ok(()) - } -} - #[derive(Clone, Debug)] pub struct NaiveStrategy { target_set: TargetSet, mode: Mode, } +impl NaiveStrategy { + pub fn new(target_set: &TargetSet, mode: &Mode) -> Self { + Self { + target_set: target_set.clone(), + mode: mode.clone(), + } + } +} + #[async_trait] impl Strategy for NaiveStrategy { fn mode(&self) -> Mode { @@ -100,6 +70,12 @@ impl Strategy for NaiveStrategy { #[derive(Clone, Debug)] pub struct SetReconciliationStrategy(); +impl SetReconciliationStrategy { + pub fn new() -> Self { + Self() + } +} + #[async_trait] impl Strategy for SetReconciliationStrategy { fn mode(&self) -> Mode { diff --git a/aquadoggo/src/replication/traits.rs b/aquadoggo/src/replication/traits.rs new file mode 100644 index 000000000..23936e0ea --- /dev/null +++ b/aquadoggo/src/replication/traits.rs @@ -0,0 +1,68 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later + +use anyhow::Result; +use async_trait::async_trait; +use p2panda_rs::schema::SchemaId; + +use crate::replication::{Mode, StrategyMessage, StrategyResult, TargetSet}; + +#[async_trait] +pub trait Strategy: std::fmt::Debug + StrategyClone { + /// Replication mode of this strategy. + fn mode(&self) -> Mode; + + /// Target set replication is occurring over. + fn target_set(&self) -> TargetSet; + + // Generate initial messages. + // + // @TODO: we want to pass the store in here too eventually. + async fn initial_messages(&self) -> Vec; + + // Handle incoming message and return response. + // + // @TODO: we want to pass the store in here too eventually. + async fn handle_message(&self, message: StrategyMessage) -> Result; + + // Validate and store entry and operation. + // + // @TODO: we want to pass the store in here too eventually. + async fn handle_entry( + &self, + schema_id: &SchemaId, + entry_bytes: Vec, + operation_bytes: Vec, + ) -> Result<()> { + // Validation: + // Check against schema_id and target_set if entry is what we've asked for + let _target_set = self.target_set(); + + // Further validation through our publish api stuff (?!) + + // Have an entry waiting lobby service here, to batch stuff?! + // Nice to check certificate pool in one go. + // Nice to not put too much pressure on the database. + Ok(()) + } +} + +/// This is a little trick so we can clone trait objects. +pub trait StrategyClone { + fn clone_box(&self) -> Box; +} + +impl StrategyClone for T +where + T: 'static + Strategy + Clone, +{ + fn clone_box(&self) -> Box { + Box::new(self.clone()) + } +} + +// We can now implement Clone manually by forwarding to clone_box. +impl Clone for Box { + fn clone(&self) -> Box { + self.clone_box() + } +} From 4637cc162f2198fb1a9efac04a34ba3e9714f61b Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 13:08:54 +0100 Subject: [PATCH 19/25] Update Session --- aquadoggo/src/replication/session.rs | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs index 2892eb738..70caaf719 100644 --- a/aquadoggo/src/replication/session.rs +++ b/aquadoggo/src/replication/session.rs @@ -3,8 +3,9 @@ use anyhow::Result; use p2panda_rs::schema::SchemaId; +use crate::replication::traits::Strategy; use crate::replication::{ - NaiveStrategy, Mode, SetReconciliationStrategy, Strategy, StrategyMessage, TargetSet, + Mode, NaiveStrategy, SetReconciliationStrategy, StrategyMessage, TargetSet, }; pub type SessionId = u64; @@ -16,7 +17,7 @@ pub enum SessionState { Done, } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct Session { // @TODO: Access to the store // store: Store @@ -26,36 +27,33 @@ pub struct Session { } impl Session { - pub fn new(id: &SessionId, target_set: &TargetSet, mode: Mode) -> Self { + pub fn new(id: &SessionId, target_set: &TargetSet, mode: &Mode) -> Self { match mode { Mode::Naive => { - let strategy = Box::new(NaiveStrategy { - mode, - target_set: target_set.clone(), - }); - return Session { + let strategy = Box::new(NaiveStrategy::new(target_set, mode)); + return Self { id: id.clone(), state: SessionState::Pending, strategy, }; } Mode::SetReconciliation => { - let strategy = Box::new(SetReconciliationStrategy()); - return Session { + let strategy = Box::new(SetReconciliationStrategy::new()); + return Self { id: id.clone(), state: SessionState::Pending, strategy, }; } - Unknown => panic!("Unknown replication mode found"), + Mode::Unknown => panic!("Unknown replication mode found"), } } - pub fn mode(&self) -> &Mode { - &self.strategy.mode() + pub fn mode(&self) -> Mode { + self.strategy.mode() } - pub fn target_set(&self) -> &TargetSet { - &self.strategy.target_set() + pub fn target_set(&self) -> TargetSet { + self.strategy.target_set() } } From 88856523628e54cfd4390230d00fa0eff8410681 Mon Sep 17 00:00:00 2001 From: Sam Andreae Date: Thu, 11 May 2023 13:09:46 +0100 Subject: [PATCH 20/25] Update SessionManager to use new Session api --- aquadoggo/src/replication/manager.rs | 35 ++++++++++++++++++---------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index a11fed0a3..9d2dd6cd6 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -38,8 +38,14 @@ where } /// Register a new session in manager. - fn insert_session(&mut self, remote_peer: &P, session_id: &SessionId, target_set: &TargetSet) { - let session = Session::new(session_id, target_set); + fn insert_session( + &mut self, + remote_peer: &P, + session_id: &SessionId, + target_set: &TargetSet, + mode: &Mode, + ) { + let session = Session::new(session_id, target_set, mode); if let Some(sessions) = self.sessions.get_mut(remote_peer) { sessions.push(session); @@ -52,13 +58,16 @@ where &mut self, remote_peer: &P, target_set: &TargetSet, + mode: &Mode, ) -> Result<(), ReplicationError> { + SyncManager::

::is_mode_supported(mode)?; + let sessions = self.get_sessions(remote_peer); // Make sure to not have duplicate sessions over the same schema ids let session = sessions .iter() - .find(|session| &session.target_set == target_set); + .find(|session| session.target_set() == *target_set); if let Some(session) = session { return Err(ReplicationError::DuplicateOutboundRequest(session.id)); @@ -73,7 +82,7 @@ where } }; - self.insert_session(remote_peer, &session_id, target_set); + self.insert_session(remote_peer, &session_id, target_set, mode); Ok(()) } @@ -112,7 +121,7 @@ where }; if accept_inbound_request { - self.insert_session(remote_peer, &session.id, target_set); + self.insert_session(remote_peer, &session.id, target_set, &session.mode()); // @TODO: Session needs to generate some messages on creation and // it will pass them back up to us to then forward onto @@ -120,8 +129,8 @@ where // If we dropped our own outbound session request regarding a different target set, we // need to re-establish it with another session id, otherwise it would get lost - if &session.target_set != target_set { - self.initiate_session(remote_peer, target_set)?; + if session.target_set() != *target_set { + self.initiate_session(remote_peer, target_set, &session.mode())?; // @TODO: Again, the new session will generate a message // which we send onto the swarm } @@ -155,12 +164,12 @@ where // rejected because it is clearly redundant if let Some(session) = sessions .iter() - .find(|session| &session.target_set == target_set) + .find(|session| session.target_set() == *target_set) { return Err(ReplicationError::DuplicateInboundRequest(session.id)); } - self.insert_session(remote_peer, session_id, target_set); + self.insert_session(remote_peer, session_id, target_set, mode); Ok(()) } @@ -223,16 +232,18 @@ mod tests { #[from(random_target_set)] target_set_1: TargetSet, #[from(random_target_set)] target_set_2: TargetSet, ) { + let mode = Mode::Naive; + let mut manager = SyncManager::new(PEER_ID_LOCAL); - let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1); + let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode); assert!(result.is_ok()); let mut manager = SyncManager::new(PEER_ID_LOCAL); - let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_2); + let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_2, &mode); assert!(result.is_ok()); // Expect error when initiating a session for the same target set - let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1); + let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode); assert!(matches!( result, Err(ReplicationError::DuplicateOutboundRequest(0)) From a0394873384e5e70699c7fff658c53268820d3c2 Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 14:59:48 +0200 Subject: [PATCH 21/25] Use latest version of p2panda-rs --- Cargo.lock | 2 +- aquadoggo/Cargo.toml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e200b090a..d54e59690 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3229,7 +3229,7 @@ dependencies = [ [[package]] name = "p2panda-rs" version = "0.7.0" -source = "git+https://github.com/p2panda/p2panda?rev=50f9f0b02570c8ed895baf499cb0d98bbadd1687#50f9f0b02570c8ed895baf499cb0d98bbadd1687" +source = "git+https://github.com/p2panda/p2panda?rev=679107b636eea39e7713d9b21ac69df28aaad97d#679107b636eea39e7713d9b21ac69df28aaad97d" dependencies = [ "arrayvec 0.5.2", "async-trait", diff --git a/aquadoggo/Cargo.toml b/aquadoggo/Cargo.toml index b53fade83..d43e8d0ce 100644 --- a/aquadoggo/Cargo.toml +++ b/aquadoggo/Cargo.toml @@ -50,7 +50,7 @@ lipmaa-link = "0.2.2" log = "0.4.17" once_cell = "1.17.0" openssl-probe = "0.1.5" -p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "50f9f0b02570c8ed895baf499cb0d98bbadd1687", features = [ +p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "679107b636eea39e7713d9b21ac69df28aaad97d", features = [ "storage-provider", ] } serde = { version = "1.0.152", features = ["derive"] } @@ -81,7 +81,7 @@ env_logger = "0.9.0" http = "0.2.9" hyper = "0.14.19" once_cell = "1.17.0" -p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "50f9f0b02570c8ed895baf499cb0d98bbadd1687", features = [ +p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "679107b636eea39e7713d9b21ac69df28aaad97d", features = [ "test-utils", "storage-provider", ] } From 6e2deaba90e00f2ec9f853a05f927a5ad226bbe1 Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 15:03:17 +0200 Subject: [PATCH 22/25] A little bit of clippy happyness --- aquadoggo/src/replication/session.rs | 33 ++++++++----------------- aquadoggo/src/replication/strategies.rs | 3 +-- 2 files changed, 11 insertions(+), 25 deletions(-) diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs index 70caaf719..82dc7a42b 100644 --- a/aquadoggo/src/replication/session.rs +++ b/aquadoggo/src/replication/session.rs @@ -1,12 +1,7 @@ // SPDX-License-Identifier: AGPL-3.0-or-later -use anyhow::Result; -use p2panda_rs::schema::SchemaId; - use crate::replication::traits::Strategy; -use crate::replication::{ - Mode, NaiveStrategy, SetReconciliationStrategy, StrategyMessage, TargetSet, -}; +use crate::replication::{Mode, NaiveStrategy, SetReconciliationStrategy, TargetSet}; pub type SessionId = u64; @@ -28,24 +23,16 @@ pub struct Session { impl Session { pub fn new(id: &SessionId, target_set: &TargetSet, mode: &Mode) -> Self { - match mode { - Mode::Naive => { - let strategy = Box::new(NaiveStrategy::new(target_set, mode)); - return Self { - id: id.clone(), - state: SessionState::Pending, - strategy, - }; - } - Mode::SetReconciliation => { - let strategy = Box::new(SetReconciliationStrategy::new()); - return Self { - id: id.clone(), - state: SessionState::Pending, - strategy, - }; - } + let strategy: Box = match mode { + Mode::Naive => Box::new(NaiveStrategy::new(target_set, mode)), + Mode::SetReconciliation => Box::new(SetReconciliationStrategy::new()), Mode::Unknown => panic!("Unknown replication mode found"), + }; + + Self { + id: *id, + state: SessionState::Pending, + strategy, } } diff --git a/aquadoggo/src/replication/strategies.rs b/aquadoggo/src/replication/strategies.rs index 90806d1db..8e0050bf6 100644 --- a/aquadoggo/src/replication/strategies.rs +++ b/aquadoggo/src/replication/strategies.rs @@ -2,7 +2,6 @@ use anyhow::Result; use async_trait::async_trait; -use p2panda_rs::schema::SchemaId; use crate::replication::traits::Strategy; use crate::replication::{Mode, StrategyMessage, TargetSet}; @@ -50,7 +49,7 @@ impl Strategy for NaiveStrategy { // TODO: Verify that the TargetSet contained in the message is a sub-set of the passed // local TargetSet. let _target_set = self.target_set(); - let mut messages = Vec::new(); + let messages = Vec::new(); let mut is_done = false; match message { From 521dd0949705e12a78af265c83bada412b86996a Mon Sep 17 00:00:00 2001 From: adz Date: Thu, 11 May 2023 15:40:33 +0200 Subject: [PATCH 23/25] Reject duplicate session if it came from remote --- aquadoggo/src/replication/manager.rs | 18 +++++++++++------- aquadoggo/src/replication/session.rs | 11 ++++++++++- 2 files changed, 21 insertions(+), 8 deletions(-) diff --git a/aquadoggo/src/replication/manager.rs b/aquadoggo/src/replication/manager.rs index de960c266..b289f92bd 100644 --- a/aquadoggo/src/replication/manager.rs +++ b/aquadoggo/src/replication/manager.rs @@ -44,8 +44,9 @@ where session_id: &SessionId, target_set: &TargetSet, mode: &Mode, + local: bool, ) { - let session = Session::new(session_id, target_set, mode); + let session = Session::new(session_id, target_set, mode, local); if let Some(sessions) = self.sessions.get_mut(remote_peer) { sessions.push(session); @@ -82,7 +83,7 @@ where } }; - self.insert_session(remote_peer, &session_id, target_set, mode); + self.insert_session(remote_peer, &session_id, target_set, mode, true); Ok(()) } @@ -107,7 +108,10 @@ where SessionState::Pending => { if &self.local_peer < remote_peer { // Drop our pending session - let mut sessions = self.get_sessions(remote_peer); + let sessions = self + .sessions + .get_mut(remote_peer) + .expect("Expected at least one pending session"); sessions.remove(index); // Accept the inbound request @@ -121,7 +125,7 @@ where }; if accept_inbound_request { - self.insert_session(remote_peer, &session.id, target_set, &session.mode()); + self.insert_session(remote_peer, &session.id, target_set, &session.mode(), false); // @TODO: Session needs to generate some messages on creation and // it will pass them back up to us to then forward onto @@ -155,7 +159,7 @@ where if let Some((index, session)) = sessions .iter() .enumerate() - .find(|(_, session)| session.id == *session_id) + .find(|(_, session)| session.id == *session_id && session.local) { return self.handle_duplicate_session(remote_peer, target_set, index, session); } @@ -169,7 +173,7 @@ where return Err(ReplicationError::DuplicateInboundRequest(session.id)); } - self.insert_session(remote_peer, session_id, target_set, mode); + self.insert_session(remote_peer, session_id, target_set, mode, false); Ok(()) } @@ -247,7 +251,7 @@ mod tests { let result = manager.handle_message(&PEER_ID_REMOTE, &message); assert!(result.is_ok()); - // Reject session with duplicate session id + // Reject attempt to create session again let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone()); let result = manager.handle_message(&PEER_ID_REMOTE, &message); assert!(matches!( diff --git a/aquadoggo/src/replication/session.rs b/aquadoggo/src/replication/session.rs index 82dc7a42b..9bb54a5b0 100644 --- a/aquadoggo/src/replication/session.rs +++ b/aquadoggo/src/replication/session.rs @@ -16,13 +16,21 @@ pub enum SessionState { pub struct Session { // @TODO: Access to the store // store: Store + /// Unique identifier of this session for that peer. pub id: SessionId, + + /// Current state of this session. pub state: SessionState, + + /// True if session was established by us. + pub local: bool, + + /// Replication strategy handler. pub strategy: Box, } impl Session { - pub fn new(id: &SessionId, target_set: &TargetSet, mode: &Mode) -> Self { + pub fn new(id: &SessionId, target_set: &TargetSet, mode: &Mode, local: bool) -> Self { let strategy: Box = match mode { Mode::Naive => Box::new(NaiveStrategy::new(target_set, mode)), Mode::SetReconciliation => Box::new(SetReconciliationStrategy::new()), @@ -33,6 +41,7 @@ impl Session { id: *id, state: SessionState::Pending, strategy, + local, } } From 5e721aa547e17aafa60437bd0a45d2e867040aac Mon Sep 17 00:00:00 2001 From: adz Date: Mon, 15 May 2023 11:41:22 +0200 Subject: [PATCH 24/25] Make clippy happy --- aquadoggo/src/replication/strategies.rs | 2 +- aquadoggo/src/replication/traits.rs | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/aquadoggo/src/replication/strategies.rs b/aquadoggo/src/replication/strategies.rs index 8e0050bf6..bafbf12fa 100644 --- a/aquadoggo/src/replication/strategies.rs +++ b/aquadoggo/src/replication/strategies.rs @@ -89,7 +89,7 @@ impl Strategy for SetReconciliationStrategy { todo!() } - async fn handle_message(&self, message: StrategyMessage) -> Result { + async fn handle_message(&self, _message: StrategyMessage) -> Result { todo!() } } diff --git a/aquadoggo/src/replication/traits.rs b/aquadoggo/src/replication/traits.rs index 23936e0ea..323bb2931 100644 --- a/aquadoggo/src/replication/traits.rs +++ b/aquadoggo/src/replication/traits.rs @@ -29,9 +29,9 @@ pub trait Strategy: std::fmt::Debug + StrategyClone { // @TODO: we want to pass the store in here too eventually. async fn handle_entry( &self, - schema_id: &SchemaId, - entry_bytes: Vec, - operation_bytes: Vec, + _schema_id: &SchemaId, + _entry_bytes: Vec, + _operation_bytes: Vec, ) -> Result<()> { // Validation: // Check against schema_id and target_set if entry is what we've asked for From 3c1970175516c9807fee9e0e5103a48840f69650 Mon Sep 17 00:00:00 2001 From: adz Date: Mon, 15 May 2023 11:50:21 +0200 Subject: [PATCH 25/25] Add entry to CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05d209402..24998e759 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - SQL query for collections [#311](https://github.com/p2panda/aquadoggo/pull/311) - Add custom validation to GraphQL scalar types [#318](https://github.com/p2panda/aquadoggo/pull/318) - Introduce property tests for GraphQL query API with `proptest` [#338](https://github.com/p2panda/aquadoggo/pull/338) +- Replication protocol session manager [#363](https://github.com/p2panda/aquadoggo/pull/363) ### Changed