Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

serialize churn #1864

Merged
merged 15 commits into from
Oct 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
31 changes: 30 additions & 1 deletion src/chain/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ pub struct Chain {
/// Temporary. Counts the accumulated prune events. Only used in tests until tests that
/// actually test pruning are in place.
parsec_prune_accumulated: usize,
/// Marker indicating we are processing churn event
churn_in_progress: bool,
}

#[allow(clippy::len_without_is_empty)]
Expand Down Expand Up @@ -123,6 +125,7 @@ impl Chain {
chain_accumulator: Default::default(),
event_cache: Default::default(),
parsec_prune_accumulated: 0,
churn_in_progress: false,
}
}

Expand Down Expand Up @@ -252,6 +255,16 @@ impl Chain {
/// If the event is a `EldersInfo` or `NeighbourInfo`, it also updates the corresponding
/// containers.
pub fn poll(&mut self) -> Result<Option<(AccumulatingEvent, EldersChange)>, RoutingError> {
if self.state.handled_genesis_event
&& !self.churn_in_progress
&& self.state.change == PrefixChange::None
madadam marked this conversation as resolved.
Show resolved Hide resolved
{
if let Some(event) = self.state.churn_event_backlog.pop_back() {
trace!("{} churn backlog poll Accumulating event {:?}", self, event);
return Ok(Some((event, EldersChange::default())));
}
}

let (event, proofs) = {
let opt_event = self
.chain_accumulator
Expand Down Expand Up @@ -332,6 +345,17 @@ impl Chain {
| AccumulatingEvent::Relocate(_) => (),
}

let start_churn_event = match event {
AccumulatingEvent::Online(_) | AccumulatingEvent::Offline(_) => true,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also Relocate.

_ => false,
};

if start_churn_event && self.churn_in_progress {
trace!("{} churn backlog Accumulating event {:?}", self, event);
self.state.churn_event_backlog.push_front(event);
return Ok(None);
}

Ok(Some((event, EldersChange::default())))
}

Expand All @@ -358,6 +382,7 @@ impl Chain {

/// Adds a member to our section.
pub fn add_member(&mut self, p2p_node: P2pNode, age: u8) {
self.churn_in_progress = true;
self.assert_no_prefix_change("add member");

let pub_id = *p2p_node.public_id();
Expand Down Expand Up @@ -385,6 +410,7 @@ impl Chain {

/// Remove a member from our section.
pub fn remove_member(&mut self, pub_id: &PublicId) {
self.churn_in_progress = true;
self.assert_no_prefix_change("remove member");

if let Some(info) = self.state.our_members.get_mut(&pub_id) {
Expand Down Expand Up @@ -504,6 +530,7 @@ impl Chain {
let merges = mem::replace(&mut self.state.merging, Default::default())
.into_iter()
.map(|digest| AccumulatingEvent::NeighbourMerge(digest).into_network_event());
self.state.handled_genesis_event = false;

info!("{} - finalise_prefix_change: {:?}", self, self.our_prefix());
trace!("{} - finalise_prefix_change state: {:?}", self, self.state);
Expand Down Expand Up @@ -709,8 +736,9 @@ impl Chain {
self.state
.their_knowledge
.iter()
.find(|(prefix, _)| prefix.matches(&target.name()))
.filter(|(prefix, _)| target.is_compatible(prefix))
.map(|(_, index)| *index)
.min()
.unwrap_or(0)
}

Expand Down Expand Up @@ -896,6 +924,7 @@ impl Chain {
if is_new_elder {
self.is_elder = true;
}
self.churn_in_progress = false;
self.check_and_clean_neighbour_infos(None);
} else {
let ppfx = elders_info.prefix().popped();
Expand Down
152 changes: 87 additions & 65 deletions src/chain/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::{
bls_emu::BlsPublicKeyForSectionKeyInfo, AccumulatingProof, AgeCounter, EldersInfo, MemberInfo,
MemberPersona, MemberState, MIN_AGE_COUNTER,
bls_emu::BlsPublicKeyForSectionKeyInfo, AccumulatingEvent, AccumulatingProof, AgeCounter,
EldersInfo, MemberInfo, MemberPersona, MemberState, MIN_AGE_COUNTER,
};
use crate::{
crypto::Digest256, error::RoutingError, id::PublicId, utils::LogIdent, BlsPublicKey,
Expand All @@ -32,6 +32,8 @@ const MAX_THEIR_RECENT_KEYS: usize = 10;
/// Section state that is shared among all elders of a section via Parsec consensus.
#[derive(Debug, PartialEq, Eq)]
pub struct SharedState {
/// Indicates whether this shared state has already been updated with genesis-related info
/// (i.e. the genesis event was seen and handled).
pub handled_genesis_event: bool,
/// The new self elders info, that doesn't necessarily have a full set of signatures yet.
pub new_info: EldersInfo,
/// The latest few fully signed infos of our own sections.
Expand All @@ -57,6 +59,8 @@ pub struct SharedState {
pub their_knowledge: BTreeMap<Prefix<XorName>, u64>,
/// Recent keys removed from their_keys
pub their_recent_keys: VecDeque<(Prefix<XorName>, SectionKeyInfo)>,
/// Backlog of completed events that need to be processed when churn completes.
pub churn_event_backlog: VecDeque<AccumulatingEvent>,
}

impl SharedState {
Expand All @@ -81,6 +85,7 @@ impl SharedState {
.collect();

Self {
handled_genesis_event: false,
new_info: elders_info.clone(),
our_infos: NonEmptyList::new(elders_info),
neighbour_infos: Default::default(),
Expand All @@ -92,6 +97,7 @@ impl SharedState {
their_keys,
their_knowledge: Default::default(),
their_recent_keys: Default::default(),
churn_event_backlog: Default::default(),
}
}

Expand All @@ -100,6 +106,14 @@ impl SharedState {
related_info: &[u8],
log_ident: &LogIdent,
) -> Result<(), RoutingError> {
update_with_genesis_related_info_check_same(
log_ident,
"handled_genesis_event",
&self.handled_genesis_event,
&false,
);
self.handled_genesis_event = true;

if related_info.is_empty() {
return Ok(());
}
Expand All @@ -112,72 +126,58 @@ impl SharedState {
their_keys,
their_knowledge,
their_recent_keys,
churn_event_backlog,
) = serialisation::deserialise(related_info)?;
if self.our_infos.len() != 1 {
// Check nodes with a history before genesis match the genesis block:
if self.our_infos != our_infos {
log_or_panic!(
LogLevel::Error,
"{} - update_with_genesis_related_info different our_infos:\n{:?},\n{:?}",
log_ident,
self.our_infos,
our_infos
);
}
if self.our_history != our_history {
log_or_panic!(
LogLevel::Error,
"{} - update_with_genesis_related_info different our_history:\n{:?},\n{:?}",
log_ident,
self.our_history,
our_history
);
}
if self.our_members != our_members {
log_or_panic!(
LogLevel::Error,
"{} - update_with_genesis_related_info different our_members:\n{:?},\n{:?}",
log_ident,
self.our_members,
our_members
);
}
if self.neighbour_infos != neighbour_infos {
log_or_panic!(
LogLevel::Error,
"{} - update_with_genesis_related_info different neighbour_infos:\n{:?},\n{:?}",
log_ident,
self.neighbour_infos,
neighbour_infos
);
}
if self.their_keys != their_keys {
log_or_panic!(
LogLevel::Error,
"{} - update_with_genesis_related_info different their_keys:\n{:?},\n{:?}",
log_ident,
self.their_keys,
their_keys
);
}
if self.their_knowledge != their_knowledge {
log_or_panic!(
LogLevel::Error,
"{} - update_with_genesis_related_info different their_knowledge:\n{:?},\n{:?}",
log_ident,
self.their_knowledge,
their_knowledge
);
}
if self.their_recent_keys != their_recent_keys {
log_or_panic!(
LogLevel::Error,
"{} - update_with_genesis_related_info different their_recent_keys:\n{:?},\n{:?}",
log_ident,
self.their_recent_keys,
their_recent_keys
);
}
update_with_genesis_related_info_check_same(
log_ident,
"our_infos",
&self.our_infos,
&our_infos,
);
update_with_genesis_related_info_check_same(
log_ident,
"our_history",
&self.our_history,
&our_history,
);
update_with_genesis_related_info_check_same(
log_ident,
"our_members",
&self.our_members,
&our_members,
);
update_with_genesis_related_info_check_same(
log_ident,
"neighbour_infos",
&self.neighbour_infos,
&neighbour_infos,
);
update_with_genesis_related_info_check_same(
log_ident,
"their_keys",
&self.their_keys,
&their_keys,
);
update_with_genesis_related_info_check_same(
log_ident,
"their_knowledge",
&self.their_knowledge,
&their_knowledge,
);
update_with_genesis_related_info_check_same(
log_ident,
"their_recent_keys",
&self.their_recent_keys,
&their_recent_keys,
);
update_with_genesis_related_info_check_same(
log_ident,
"churn_event_backlog",
&self.churn_event_backlog,
&churn_event_backlog,
);
}
self.our_infos = our_infos;
self.our_history = our_history;
Expand All @@ -186,6 +186,7 @@ impl SharedState {
self.their_keys = their_keys;
self.their_knowledge = their_knowledge;
self.their_recent_keys = their_recent_keys;
self.churn_event_backlog = churn_event_backlog;

Ok(())
}
Expand All @@ -199,6 +200,7 @@ impl SharedState {
&self.their_keys,
&self.their_knowledge,
&self.their_recent_keys,
&self.churn_event_backlog,
))?)
}

Expand Down Expand Up @@ -420,6 +422,26 @@ impl SharedState {
}
}

/// Reports a mismatch between our locally held value and the value carried in
/// the genesis-related info, via `log_or_panic!` at `Error` level.
///
/// * `log_ident` - identity used to prefix the log line.
/// * `id` - human-readable name of the field being compared (e.g. "our_infos").
/// * `self_info` - the value we currently hold.
/// * `to_use_info` - the incoming value from the genesis-related info.
///
/// Does nothing when the two values compare equal.
fn update_with_genesis_related_info_check_same<T>(
    log_ident: &LogIdent,
    id: &str,
    self_info: &T,
    to_use_info: &T,
) where
    T: Eq + Debug,
{
    if self_info != to_use_info {
        // Fix: the format string is "{} - ... different {}: ..." — the first
        // placeholder is the log identity and the second is the field name,
        // matching the pre-refactor inline messages. The two arguments were
        // previously passed in swapped order (`id` first, `log_ident` second),
        // producing garbled log lines.
        log_or_panic!(
            LogLevel::Error,
            "{} - update_with_genesis_related_info_check_same different {}:\n{:?},\n{:?}",
            log_ident,
            id,
            self_info,
            to_use_info
        );
    }
}

/// The prefix-affecting change (split or merge) to our own section that is currently in progress.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PrefixChange {
Expand Down
9 changes: 9 additions & 0 deletions src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,15 @@ impl Node {
self.chain().map(Chain::prefixes).unwrap_or_default()
}

/// Returns the elder info version of a section with the given prefix.
/// Prefix must be either our prefix or of one of our neighbours. 0 otherwise.
pub fn section_elder_info_version(&self, prefix: &Prefix<XorName>) -> u64 {
    // `u64::default()` is 0, so an unknown prefix (or no chain yet) yields 0.
    match self.chain().and_then(|chain| chain.get_section(prefix)) {
        Some(info) => *info.version(),
        None => 0,
    }
}

/// Returns the elder of a section with the given prefix.
/// Prefix must be either our prefix or of one of our neighbours. Returns empty set otherwise.
pub fn section_elders(&self, prefix: &Prefix<XorName>) -> BTreeSet<XorName> {
Expand Down
3 changes: 2 additions & 1 deletion src/pause.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
NetworkEvent, NetworkService,
};
use crossbeam_channel as mpmc;
use std::collections::VecDeque;

/// A type that wraps the internal state of a node while it is paused in order to be upgraded and/or
/// restarted. A value of this type is obtained by pausing a node and can be then used to resume
Expand All @@ -31,7 +32,7 @@ pub struct PausedState {
pub(super) full_id: FullId,
pub(super) gen_pfx_info: GenesisPfxInfo,
pub(super) msg_filter: RoutingMessageFilter,
pub(super) msg_queue: Vec<SignedRoutingMessage>,
pub(super) msg_queue: VecDeque<SignedRoutingMessage>,
// TODO: instead of storing both network_service and network_rx, store only the network config.
pub(super) network_service: NetworkService,
pub(super) network_rx: Option<mpmc::Receiver<NetworkEvent>>,
Expand Down
8 changes: 8 additions & 0 deletions src/routing_table/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ impl<N: Xorable + Clone + Copy + Binary + Default> Authority<N> {
Authority::PrefixSection(prefix) => prefix.lower_bound(),
}
}

/// Returns whether the authority is compatible with the given prefix.
///
/// For name-based authorities (`Node`, `Section`) this checks that the prefix
/// matches the name; for `PrefixSection` it checks prefix compatibility.
pub fn is_compatible(&self, other_prefix: &Prefix<N>) -> bool {
    match self {
        Authority::PrefixSection(prefix) => other_prefix.is_compatible(prefix),
        Authority::Node(name) | Authority::Section(name) => other_prefix.matches(name),
    }
}
}

impl Authority<XorName> {
Expand Down
9 changes: 7 additions & 2 deletions src/signature_accumulator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;

/// Time (in seconds) within which a message and a quorum of signatures need to arrive to
/// accumulate.
pub const ACCUMULATION_TIMEOUT: Duration = Duration::from_secs(30);
pub const ACCUMULATION_TIMEOUT: Duration = Duration::from_secs(120);

#[derive(Default)]
pub struct SignatureAccumulator {
Expand Down Expand Up @@ -52,7 +52,12 @@ impl SignatureAccumulator {
.map(|(hash, _)| *hash)
.collect_vec();
for hash in expired_msgs {
let _ = self.msgs.remove(&hash);
if let Some((Some(existing_msg), clock)) = self.msgs.remove(&hash) {
error!(
"Remove unaccumulated expired message clock {:?}, msg {:?}",
clock, existing_msg,
);
}
}
}

Expand Down