Skip to content

Commit

Permalink
Replication protocol session manager (#363)
Browse files Browse the repository at this point in the history
* Introduce SyncManager handling session initiation and incoming messages

* WIP: Handle incoming SyncRequest message

* Move it into separate files

* Refactoring SyncManager methods

* Fix some clippy warnings in SyncManager code

* Distinct duplicate session errors by in- or outbound

* De-duplicate schema ids in target set, add some tests

* Use a fixture to generate random target set for tests

* Introduce scoped message

* Rename ScopedMessage -> StrategyMessage

* Make StrategyMessage public

* Define strategy trait

* Implement Strategy trait for NaiveStrategy

* Integrate strategies onto Session

* Fix test

* Remove duplicate Mode structs

* Add inbound test

* Move traits into own module and implement Clone for Strategy

* Update Session

* Update SessionManager to use new Session api

* Use latest version of p2panda-rs

* A little bit of clippy happyness

* Reject duplicate session if it came from remote

* Make clippy happy

* Add entry to CHANGELOG.md

---------

Co-authored-by: Sam Andreae <contact@samandreae.com>
  • Loading branch information
adzialocha and sandreae committed May 15, 2023
1 parent 20516f5 commit 11744f7
Show file tree
Hide file tree
Showing 12 changed files with 630 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- SQL query for collections [#311](https://github.com/p2panda/aquadoggo/pull/311)
- Add custom validation to GraphQL scalar types [#318](https://github.com/p2panda/aquadoggo/pull/318)
- Introduce property tests for GraphQL query API with `proptest` [#338](https://github.com/p2panda/aquadoggo/pull/338)
- Replication protocol session manager [#363](https://github.com/p2panda/aquadoggo/pull/363)

### Changed

Expand Down
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions aquadoggo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ lipmaa-link = "0.2.2"
log = "0.4.17"
once_cell = "1.17.0"
openssl-probe = "0.1.5"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "50f9f0b02570c8ed895baf499cb0d98bbadd1687", features = [
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "679107b636eea39e7713d9b21ac69df28aaad97d", features = [
"storage-provider",
] }
serde = { version = "1.0.152", features = ["derive"] }
Expand Down Expand Up @@ -81,7 +81,7 @@ env_logger = "0.9.0"
http = "0.2.9"
hyper = "0.14.19"
once_cell = "1.17.0"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "50f9f0b02570c8ed895baf499cb0d98bbadd1687", features = [
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "679107b636eea39e7713d9b21ac69df28aaad97d", features = [
"test-utils",
"storage-provider",
] }
Expand Down
3 changes: 3 additions & 0 deletions aquadoggo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ mod manager;
mod materializer;
mod network;
mod node;
// @TODO: Remove this as soon as we use the replication code
#[allow(dead_code)]
mod replication;
mod schema;
#[cfg(test)]
mod test_utils;
Expand Down
2 changes: 0 additions & 2 deletions aquadoggo/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use triggered::{Listener, Trigger};
/// Sends messages through the communication bus between services.
pub type Sender<T> = broadcast::Sender<T>;

// pub type ServiceReady = oneshot::channel<()>;

// Receives ready signal from services once they are ready to handle messages on the communication bus.
pub type ServiceReadyReceiver = oneshot::Receiver<()>;

Expand Down
15 changes: 15 additions & 0 deletions aquadoggo/src/replication/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use thiserror::Error;

#[derive(Error, Debug)]
pub enum ReplicationError {
#[error("Remote peer requested unsupported replication mode")]
UnsupportedMode,

#[error("Tried to initialise duplicate inbound replication session with id {0}")]
DuplicateInboundRequest(u64),

#[error("Tried to initialise duplicate outbound replication session with id {0}")]
DuplicateOutboundRequest(u64),
}
286 changes: 286 additions & 0 deletions aquadoggo/src/replication/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::collections::HashMap;

use anyhow::Result;

use crate::replication::errors::ReplicationError;
use crate::replication::{Mode, Session, SessionId, SessionState, SyncMessage, TargetSet};

pub const INITIAL_SESSION_ID: SessionId = 0;

pub const SUPPORTED_MODES: [Mode; 1] = [Mode::Naive];

#[derive(Debug)]
pub struct SyncManager<P> {
local_peer: P,
sessions: HashMap<P, Vec<Session>>,
}

impl<P> SyncManager<P>
where
P: Clone + std::hash::Hash + Eq + PartialOrd,
{
pub fn new(local_peer: P) -> Self {
Self {
local_peer,
sessions: HashMap::new(),
}
}

/// Get all sessions related to a remote peer.
fn get_sessions(&self, remote_peer: &P) -> Vec<Session> {
self.sessions
.get(remote_peer)
// Always return an array, even when it is empty
.unwrap_or(&vec![])
.to_owned()
}

/// Register a new session in manager.
fn insert_session(
&mut self,
remote_peer: &P,
session_id: &SessionId,
target_set: &TargetSet,
mode: &Mode,
local: bool,
) {
let session = Session::new(session_id, target_set, mode, local);

if let Some(sessions) = self.sessions.get_mut(remote_peer) {
sessions.push(session);
} else {
self.sessions.insert(remote_peer.clone(), vec![session]);
}
}

pub fn initiate_session(
&mut self,
remote_peer: &P,
target_set: &TargetSet,
mode: &Mode,
) -> Result<(), ReplicationError> {
SyncManager::<P>::is_mode_supported(mode)?;

let sessions = self.get_sessions(remote_peer);

// Make sure to not have duplicate sessions over the same schema ids
let session = sessions
.iter()
.find(|session| session.target_set() == *target_set);

if let Some(session) = session {
return Err(ReplicationError::DuplicateOutboundRequest(session.id));
}

// Determine next id for upcoming session
let session_id = {
if let Some(session) = sessions.last() {
session.id + 1
} else {
INITIAL_SESSION_ID
}
};

self.insert_session(remote_peer, &session_id, target_set, mode, true);

Ok(())
}

fn is_mode_supported(mode: &Mode) -> Result<(), ReplicationError> {
if !SUPPORTED_MODES.contains(mode) {
return Err(ReplicationError::UnsupportedMode);
}

Ok(())
}

fn handle_duplicate_session(
&mut self,
remote_peer: &P,
target_set: &TargetSet,
index: usize,
session: &Session,
) -> Result<(), ReplicationError> {
let accept_inbound_request = match session.state {
// Handle only duplicate sessions when they haven't started yet
SessionState::Pending => {
if &self.local_peer < remote_peer {
// Drop our pending session
let sessions = self
.sessions
.get_mut(remote_peer)
.expect("Expected at least one pending session");
sessions.remove(index);

// Accept the inbound request
true
} else {
// Keep our pending session, ignore inbound request
false
}
}
_ => return Err(ReplicationError::DuplicateInboundRequest(session.id)),
};

if accept_inbound_request {
self.insert_session(remote_peer, &session.id, target_set, &session.mode(), false);

// @TODO: Session needs to generate some messages on creation and
// it will pass them back up to us to then forward onto
// the swarm

// If we dropped our own outbound session request regarding a different target set, we
// need to re-establish it with another session id, otherwise it would get lost
if session.target_set() != *target_set {
self.initiate_session(remote_peer, target_set, &session.mode())?;
// @TODO: Again, the new session will generate a message
// which we send onto the swarm
}
}

Ok(())
}

fn handle_sync_request(
&mut self,
remote_peer: &P,
mode: &Mode,
session_id: &SessionId,
target_set: &TargetSet,
) -> Result<(), ReplicationError> {
SyncManager::<P>::is_mode_supported(mode)?;

let sessions = self.get_sessions(remote_peer);

// Check if a session with this id already exists for this peer, this can happen if both
// peers started to initiate a session at the same time, we can try to resolve this
if let Some((index, session)) = sessions
.iter()
.enumerate()
.find(|(_, session)| session.id == *session_id && session.local)
{
return self.handle_duplicate_session(remote_peer, target_set, index, session);
}

// Check if a session with this target set already exists for this peer, this always gets
// rejected because it is clearly redundant
if let Some(session) = sessions
.iter()
.find(|session| session.target_set() == *target_set)
{
return Err(ReplicationError::DuplicateInboundRequest(session.id));
}

self.insert_session(remote_peer, session_id, target_set, mode, false);

Ok(())
}

pub fn handle_message(
&mut self,
remote_peer: &P,
message: &SyncMessage,
) -> Result<(), ReplicationError> {
match message {
SyncMessage::SyncRequest(mode, session_id, target_set) => {
self.handle_sync_request(remote_peer, mode, session_id, target_set)
}
SyncMessage::Other => todo!(),
}
}
}

#[cfg(test)]
mod tests {
use p2panda_rs::schema::{SchemaId, SchemaName};
use p2panda_rs::test_utils::fixtures::random_document_view_id;
use rstest::rstest;

use crate::replication::errors::ReplicationError;
use crate::replication::{Mode, SyncMessage, TargetSet};

use super::{SyncManager, INITIAL_SESSION_ID};

const PEER_ID_LOCAL: &'static str = "local";
const PEER_ID_REMOTE: &'static str = "remote";

#[rstest::fixture]
fn random_target_set() -> TargetSet {
let document_view_id = random_document_view_id();
let schema_id =
SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id);
TargetSet::new(&[schema_id])
}

#[rstest]
fn initiate_outbound_session(
#[from(random_target_set)] target_set_1: TargetSet,
#[from(random_target_set)] target_set_2: TargetSet,
) {
let mode = Mode::Naive;

let mut manager = SyncManager::new(PEER_ID_LOCAL);
let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode);
assert!(result.is_ok());

let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_2, &mode);
assert!(result.is_ok());

// Expect error when initiating a session for the same target set
let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode);
assert!(matches!(
result,
Err(ReplicationError::DuplicateOutboundRequest(0))
));
}

#[rstest]
fn initiate_inbound_session(
#[from(random_target_set)] target_set_1: TargetSet,
#[from(random_target_set)] target_set_2: TargetSet,
) {
let mut manager = SyncManager::new(PEER_ID_LOCAL);

let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(result.is_ok());

let message = SyncMessage::SyncRequest(Mode::Naive, 1, target_set_2.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(result.is_ok());

// Reject attempt to create session again
let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(matches!(
result,
Err(ReplicationError::DuplicateInboundRequest(0))
));

// Reject different session concerning same target set
let message = SyncMessage::SyncRequest(Mode::Naive, 2, target_set_2.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(matches!(
result,
Err(ReplicationError::DuplicateInboundRequest(1))
));
}

#[rstest]
fn inbound_checks_supported_mode(#[from(random_target_set)] target_set: TargetSet) {
// Should not fail when requesting supported replication mode
let mut manager = SyncManager::new(PEER_ID_LOCAL);
let message = SyncMessage::SyncRequest(Mode::Naive, INITIAL_SESSION_ID, target_set.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(result.is_ok());

// Should fail when requesting unsupported replication mode
let mut manager = SyncManager::new(PEER_ID_LOCAL);
let message =
SyncMessage::SyncRequest(Mode::SetReconciliation, INITIAL_SESSION_ID, target_set);
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(result.is_err());
}
}
Loading

0 comments on commit 11744f7

Please sign in to comment.