Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replication protocol session manager #363

Merged
merged 27 commits into from
May 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
544f39e
Introduce SyncManager handling session initiation and incoming messages
adzialocha May 10, 2023
3513e44
WIP: Handle incoming SyncRequest message
sandreae May 10, 2023
2224aad
Move it into separate files
adzialocha May 11, 2023
49491f7
Refactoring SyncManager methods
adzialocha May 11, 2023
8500c35
Fix some clippy warnings in SyncManager code
adzialocha May 11, 2023
2fa86d9
Distinct duplicate session errors by in- or outbound
adzialocha May 11, 2023
54370c4
De-duplicate schema ids in target set, add some tests
adzialocha May 11, 2023
f68b95b
Use a fixture to generate random target set for tests
adzialocha May 11, 2023
573c229
Introduce scoped message
sandreae May 11, 2023
737996d
Rename ScopedMessage -> StrategyMessage
sandreae May 11, 2023
4679583
Make StrategyMessage public
sandreae May 11, 2023
2ebbf59
Define strategy trait
sandreae May 11, 2023
651616a
Implement Strategy trait for NaiveStrategy
sandreae May 11, 2023
7ffb120
Integrate strategies onto Session
sandreae May 11, 2023
7940e4d
Fix test
adzialocha May 11, 2023
e5eae31
Remove duplicate Mode structs
sandreae May 11, 2023
9a043f3
Add inbound test
adzialocha May 11, 2023
f9ee186
Move traits into own module and implement Clone for Strategy
sandreae May 11, 2023
4637cc1
Update Session
sandreae May 11, 2023
8885652
Update SessionManager to use new Session api
sandreae May 11, 2023
99a02ba
Merge branch 'sync-session' into replication-protocol
adzialocha May 11, 2023
8d2aaa1
Merge branch 'main' into replication-protocol
adzialocha May 11, 2023
a039487
Use latest version of p2panda-rs
adzialocha May 11, 2023
6e2deab
A little bit of clippy happyness
adzialocha May 11, 2023
521dd09
Reject duplicate session if it came from remote
adzialocha May 11, 2023
5e721aa
Make clippy happy
adzialocha May 15, 2023
3c19701
Add entry to CHANGELOG.md
adzialocha May 15, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- SQL query for collections [#311](https://github.com/p2panda/aquadoggo/pull/311)
- Add custom validation to GraphQL scalar types [#318](https://github.com/p2panda/aquadoggo/pull/318)
- Introduce property tests for GraphQL query API with `proptest` [#338](https://github.com/p2panda/aquadoggo/pull/338)
- Replication protocol session manager [#363](https://github.com/p2panda/aquadoggo/pull/363)

### Changed

Expand Down
14 changes: 7 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions aquadoggo/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ lipmaa-link = "0.2.2"
log = "0.4.17"
once_cell = "1.17.0"
openssl-probe = "0.1.5"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "50f9f0b02570c8ed895baf499cb0d98bbadd1687", features = [
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "679107b636eea39e7713d9b21ac69df28aaad97d", features = [
"storage-provider",
] }
serde = { version = "1.0.152", features = ["derive"] }
Expand Down Expand Up @@ -81,7 +81,7 @@ env_logger = "0.9.0"
http = "0.2.9"
hyper = "0.14.19"
once_cell = "1.17.0"
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "50f9f0b02570c8ed895baf499cb0d98bbadd1687", features = [
p2panda-rs = { git = "https://github.com/p2panda/p2panda", rev = "679107b636eea39e7713d9b21ac69df28aaad97d", features = [
"test-utils",
"storage-provider",
] }
Expand Down
3 changes: 3 additions & 0 deletions aquadoggo/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ mod manager;
mod materializer;
mod network;
mod node;
// @TODO: Remove this as soon as we use the replication code
#[allow(dead_code)]
mod replication;
mod schema;
#[cfg(test)]
mod test_utils;
Expand Down
2 changes: 0 additions & 2 deletions aquadoggo/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ use triggered::{Listener, Trigger};
/// Sends messages through the communication bus between services.
pub type Sender<T> = broadcast::Sender<T>;

// pub type ServiceReady = oneshot::channel<()>;

// Receives ready signal from services once they are ready to handle messages on the communication bus.
pub type ServiceReadyReceiver = oneshot::Receiver<()>;

Expand Down
15 changes: 15 additions & 0 deletions aquadoggo/src/replication/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use thiserror::Error;

#[derive(Error, Debug)]
pub enum ReplicationError {
#[error("Remote peer requested unsupported replication mode")]
UnsupportedMode,

#[error("Tried to initialise duplicate inbound replication session with id {0}")]
DuplicateInboundRequest(u64),

#[error("Tried to initialise duplicate outbound replication session with id {0}")]
DuplicateOutboundRequest(u64),
}
286 changes: 286 additions & 0 deletions aquadoggo/src/replication/manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,286 @@
// SPDX-License-Identifier: AGPL-3.0-or-later

use std::collections::HashMap;

use anyhow::Result;

use crate::replication::errors::ReplicationError;
use crate::replication::{Mode, Session, SessionId, SessionState, SyncMessage, TargetSet};

pub const INITIAL_SESSION_ID: SessionId = 0;

pub const SUPPORTED_MODES: [Mode; 1] = [Mode::Naive];

#[derive(Debug)]
pub struct SyncManager<P> {
local_peer: P,
sessions: HashMap<P, Vec<Session>>,
}

impl<P> SyncManager<P>
where
P: Clone + std::hash::Hash + Eq + PartialOrd,
{
pub fn new(local_peer: P) -> Self {
Self {
local_peer,
sessions: HashMap::new(),
}
}

/// Get all sessions related to a remote peer.
fn get_sessions(&self, remote_peer: &P) -> Vec<Session> {
self.sessions
.get(remote_peer)
// Always return an array, even when it is empty
.unwrap_or(&vec![])
.to_owned()
}

/// Register a new session in manager.
fn insert_session(
&mut self,
remote_peer: &P,
session_id: &SessionId,
target_set: &TargetSet,
mode: &Mode,
local: bool,
) {
let session = Session::new(session_id, target_set, mode, local);

if let Some(sessions) = self.sessions.get_mut(remote_peer) {
sessions.push(session);
} else {
self.sessions.insert(remote_peer.clone(), vec![session]);
}
}

pub fn initiate_session(
&mut self,
remote_peer: &P,
target_set: &TargetSet,
mode: &Mode,
) -> Result<(), ReplicationError> {
SyncManager::<P>::is_mode_supported(mode)?;

let sessions = self.get_sessions(remote_peer);

// Make sure to not have duplicate sessions over the same schema ids
let session = sessions
.iter()
.find(|session| session.target_set() == *target_set);

if let Some(session) = session {
return Err(ReplicationError::DuplicateOutboundRequest(session.id));
}

// Determine next id for upcoming session
let session_id = {
if let Some(session) = sessions.last() {
session.id + 1
} else {
INITIAL_SESSION_ID
}
};

self.insert_session(remote_peer, &session_id, target_set, mode, true);

Ok(())
}

fn is_mode_supported(mode: &Mode) -> Result<(), ReplicationError> {
if !SUPPORTED_MODES.contains(mode) {
return Err(ReplicationError::UnsupportedMode);
}

Ok(())
}

fn handle_duplicate_session(
&mut self,
remote_peer: &P,
target_set: &TargetSet,
index: usize,
session: &Session,
) -> Result<(), ReplicationError> {
let accept_inbound_request = match session.state {
// Handle only duplicate sessions when they haven't started yet
SessionState::Pending => {
if &self.local_peer < remote_peer {
// Drop our pending session
let sessions = self
.sessions
.get_mut(remote_peer)
.expect("Expected at least one pending session");
sessions.remove(index);

// Accept the inbound request
true
} else {
// Keep our pending session, ignore inbound request
false
}
}
_ => return Err(ReplicationError::DuplicateInboundRequest(session.id)),
};

if accept_inbound_request {
self.insert_session(remote_peer, &session.id, target_set, &session.mode(), false);

// @TODO: Session needs to generate some messages on creation and
// it will pass them back up to us to then forward onto
// the swarm

// If we dropped our own outbound session request regarding a different target set, we
// need to re-establish it with another session id, otherwise it would get lost
if session.target_set() != *target_set {
self.initiate_session(remote_peer, target_set, &session.mode())?;
// @TODO: Again, the new session will generate a message
// which we send onto the swarm
}
}

Ok(())
}

fn handle_sync_request(
&mut self,
remote_peer: &P,
mode: &Mode,
session_id: &SessionId,
target_set: &TargetSet,
) -> Result<(), ReplicationError> {
SyncManager::<P>::is_mode_supported(mode)?;

let sessions = self.get_sessions(remote_peer);

// Check if a session with this id already exists for this peer, this can happen if both
// peers started to initiate a session at the same time, we can try to resolve this
if let Some((index, session)) = sessions
.iter()
.enumerate()
.find(|(_, session)| session.id == *session_id && session.local)
{
return self.handle_duplicate_session(remote_peer, target_set, index, session);
}

// Check if a session with this target set already exists for this peer, this always gets
// rejected because it is clearly redundant
if let Some(session) = sessions
.iter()
.find(|session| session.target_set() == *target_set)
{
return Err(ReplicationError::DuplicateInboundRequest(session.id));
}

self.insert_session(remote_peer, session_id, target_set, mode, false);

Ok(())
}

pub fn handle_message(
&mut self,
remote_peer: &P,
message: &SyncMessage,
) -> Result<(), ReplicationError> {
match message {
SyncMessage::SyncRequest(mode, session_id, target_set) => {
self.handle_sync_request(remote_peer, mode, session_id, target_set)
}
SyncMessage::Other => todo!(),
}
}
}

#[cfg(test)]
mod tests {
use p2panda_rs::schema::{SchemaId, SchemaName};
use p2panda_rs::test_utils::fixtures::random_document_view_id;
use rstest::rstest;

use crate::replication::errors::ReplicationError;
use crate::replication::{Mode, SyncMessage, TargetSet};

use super::{SyncManager, INITIAL_SESSION_ID};

const PEER_ID_LOCAL: &'static str = "local";
const PEER_ID_REMOTE: &'static str = "remote";

#[rstest::fixture]
fn random_target_set() -> TargetSet {
let document_view_id = random_document_view_id();
let schema_id =
SchemaId::new_application(&SchemaName::new("messages").unwrap(), &document_view_id);
TargetSet::new(&[schema_id])
}

#[rstest]
fn initiate_outbound_session(
#[from(random_target_set)] target_set_1: TargetSet,
#[from(random_target_set)] target_set_2: TargetSet,
) {
let mode = Mode::Naive;

let mut manager = SyncManager::new(PEER_ID_LOCAL);
let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode);
assert!(result.is_ok());

let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_2, &mode);
assert!(result.is_ok());

// Expect error when initiating a session for the same target set
let result = manager.initiate_session(&PEER_ID_REMOTE, &target_set_1, &mode);
assert!(matches!(
result,
Err(ReplicationError::DuplicateOutboundRequest(0))
));
}

#[rstest]
fn initiate_inbound_session(
#[from(random_target_set)] target_set_1: TargetSet,
#[from(random_target_set)] target_set_2: TargetSet,
) {
let mut manager = SyncManager::new(PEER_ID_LOCAL);

let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(result.is_ok());

let message = SyncMessage::SyncRequest(Mode::Naive, 1, target_set_2.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(result.is_ok());

// Reject attempt to create session again
let message = SyncMessage::SyncRequest(Mode::Naive, 0, target_set_1.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(matches!(
result,
Err(ReplicationError::DuplicateInboundRequest(0))
));

// Reject different session concerning same target set
let message = SyncMessage::SyncRequest(Mode::Naive, 2, target_set_2.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(matches!(
result,
Err(ReplicationError::DuplicateInboundRequest(1))
));
}

#[rstest]
fn inbound_checks_supported_mode(#[from(random_target_set)] target_set: TargetSet) {
// Should not fail when requesting supported replication mode
let mut manager = SyncManager::new(PEER_ID_LOCAL);
let message = SyncMessage::SyncRequest(Mode::Naive, INITIAL_SESSION_ID, target_set.clone());
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(result.is_ok());

// Should fail when requesting unsupported replication mode
let mut manager = SyncManager::new(PEER_ID_LOCAL);
let message =
SyncMessage::SyncRequest(Mode::SetReconciliation, INITIAL_SESSION_ID, target_set);
let result = manager.handle_message(&PEER_ID_REMOTE, &message);
assert!(result.is_err());
}
}
Loading
Loading