Skip to content

Commit

Permalink
Merge branch 'main' into config-galore
Browse files Browse the repository at this point in the history
* main:
  Announce supported schema ids in network before replication (#515)
  • Loading branch information
adzialocha committed Aug 25, 2023
2 parents 626277c + 0c208d4 commit 4e0a167
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 31 deletions.
33 changes: 19 additions & 14 deletions aquadoggo/src/replication/announcement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@ pub fn now() -> u64 {

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Announcement {
/// This contains a list of schema ids this peer is interested in.
pub target_set: TargetSet,
/// This contains a list of schema ids this peer allowed to support.
// @TODO: `TargetSet` is not a good name anymore in this context. See related issue:
// https://github.com/p2panda/aquadoggo/issues/527
pub supported_schema_ids: TargetSet,

/// Timestamp of this announcement. Helps to understand if we can override the previous
/// announcement with a newer one.
pub timestamp: u64,
}

impl Announcement {
pub fn new(target_set: TargetSet) -> Self {
pub fn new(supported_schema_ids: TargetSet) -> Self {
Self {
timestamp: now(),
target_set,
supported_schema_ids,
}
}
}
Expand Down Expand Up @@ -65,7 +67,7 @@ impl Serialize for AnnouncementMessage {
seq.serialize_element(&ANNOUNCE_TYPE)?;
seq.serialize_element(&self.0)?;
seq.serialize_element(&self.1.timestamp)?;
seq.serialize_element(&self.1.target_set)?;
seq.serialize_element(&self.1.supported_schema_ids)?;
seq.end()
}
}
Expand Down Expand Up @@ -106,10 +108,10 @@ impl<'de> Deserialize<'de> for AnnouncementMessage {
serde::de::Error::custom("missing timestamp in announce message")
})?;

let target_set: TargetSet = seq.next_element()?.ok_or_else(|| {
let supported_schema_ids: TargetSet = seq.next_element()?.ok_or_else(|| {
serde::de::Error::custom("missing target set in announce message")
})?;
target_set.validate().map_err(|_| {
supported_schema_ids.validate().map_err(|_| {
serde::de::Error::custom("invalid target set in announce message")
})?;

Expand All @@ -124,7 +126,7 @@ impl<'de> Deserialize<'de> for AnnouncementMessage {
Ok(AnnouncementMessage(
protocol_version,
Announcement {
target_set,
supported_schema_ids,
timestamp,
},
))
Expand All @@ -148,24 +150,27 @@ mod tests {
use super::{Announcement, AnnouncementMessage};

#[rstest]
fn serialize(#[from(random_target_set)] target_set: TargetSet) {
let announcement = Announcement::new(target_set.clone());
fn serialize(#[from(random_target_set)] supported_schema_ids: TargetSet) {
let announcement = Announcement::new(supported_schema_ids.clone());
assert_eq!(
serialize_from(AnnouncementMessage::new(announcement.clone())),
serialize_value(cbor!([0, 1, announcement.timestamp, target_set]))
serialize_value(cbor!([0, 1, announcement.timestamp, supported_schema_ids]))
);
}

#[rstest]
fn deserialize(#[from(random_target_set)] target_set: TargetSet) {
fn deserialize(#[from(random_target_set)] supported_schema_ids: TargetSet) {
assert_eq!(
deserialize_into::<AnnouncementMessage>(&serialize_value(cbor!([
0, 1, 12345678, target_set
0,
1,
12345678,
supported_schema_ids
])))
.unwrap(),
AnnouncementMessage::new(Announcement {
timestamp: 12345678,
target_set,
supported_schema_ids,
})
);
}
Expand Down
36 changes: 19 additions & 17 deletions aquadoggo/src/replication/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,17 @@ impl ConnectionManager {

// If this is a SyncRequest message first we check if the contained target set matches our
// own locally configured one.
if let Message::SyncRequest(_, remote_target_set) = message.message() {
let local_target_set = &self
if let Message::SyncRequest(_, remote_supported_schema_ids) = message.message() {
let local_supported_schema_ids = &self
.announcement
.as_ref()
.expect("Announcement state needs to be set with 'update_announcement'")
.target_set;
.supported_schema_ids;

// If this node has been configured with an allow-list of schema ids then we check the
// If this node has been configured with a whitelist of schema ids then we check the
// target set of the requests matches our own, otherwise we skip this step and accept
// any target set.
if self.schema_provider.is_allow_list_active()
&& !local_target_set.is_valid_set(remote_target_set)
{
if self.schema_provider.is_allow_list_active() {
// If it doesn't match we signal that an error occurred and return at this point.
self.on_replication_error(peer, session_id, ReplicationError::UnsupportedTargetSet)
.await;
Expand Down Expand Up @@ -334,11 +332,11 @@ impl ConnectionManager {
/// Determine if we can attempt new replication sessions with the peers we currently know
/// about.
async fn update_sessions(&mut self) {
let local_target_set = &self
let local_supported_schema_ids = &self
.announcement
.as_ref()
.expect("Announcement state needs to be set with 'update_announcement'")
.target_set;
.supported_schema_ids;

// Iterate through all currently connected peers
let mut attempt_peers: Vec<(Peer, TargetSet)> = self
Expand All @@ -350,15 +348,19 @@ impl ConnectionManager {

// 1. Did we already receive this peers announcement state? If not we can't do
// anything yet and need to wait.
let remote_target_set: TargetSet = if let Some(announcement) = status.announcement {
announcement.target_set
} else {
return None;
};

// 2. Calculate intersection of local and remote target set. Do we have any
let remote_supported_schema_ids: TargetSet =
if let Some(announcement) = status.announcement {
announcement.supported_schema_ids
} else {
return None;
};

// 2. Calculate intersection of local and remote schema id sets. Do we have any
// supported schema id's in common?
let target_set = TargetSet::from_intersection(local_target_set, &remote_target_set);
let target_set = TargetSet::from_intersection(
local_supported_schema_ids,
&remote_supported_schema_ids,
);
if target_set.is_empty() {
return None;
}
Expand Down

0 comments on commit 4e0a167

Please sign in to comment.