Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
Merge c97b453 into 579db9c
Browse files Browse the repository at this point in the history
  • Loading branch information
oetyng committed May 9, 2021
2 parents 579db9c + c97b453 commit 18f2eb0
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 67 deletions.
26 changes: 21 additions & 5 deletions examples/minimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,23 @@ async fn handle_event(index: usize, node: &mut Routing, event: Event) -> bool {
Event::MemberLeft { name, age } => {
info!("Node #{} member left - name: {}, age: {}", index, name, age);
}
Event::EldersChanged {
Event::SectionSplit {
elders,
sibling_elders,
self_status_change,
} => {
info!(
"Node #{} elders changed - prefix: {:b}, key: {:?}, sibling elders: {:?}, elders: {:?}, node elder status change: {:?}",
index, elders.prefix, elders.key, sibling_elders, elders.elders, self_status_change
"Node #{} section split - elders: {:?}, sibling elders: {:?}, node elder status change: {:?}",
index, elders, sibling_elders, self_status_change
);
}
Event::EldersChanged {
elders,
self_status_change,
} => {
info!(
"Node #{} elders changed - elders: {:?}, node elder status change: {:?}",
index, elders, self_status_change
);
}
Event::MessageReceived {
Expand Down Expand Up @@ -275,8 +284,15 @@ async fn handle_event(index: usize, node: &mut Routing, event: Event) -> bool {
index, user, msg
),
Event::ClientLost(addr) => info!("Node #{} received ClientLost({:?})", index, addr),
Event::AdultsChanged(adult_list) => {
info!("Node #{} received AdultsChanged({:?})", index, adult_list)
Event::AdultsChanged {
existing,
added,
removed,
} => {
info!(
"Node #{} adults changed - existing: {:?}, added: {:?}, removed: {:?}",
index, existing, added, removed
)
}
}

Expand Down
4 changes: 2 additions & 2 deletions examples/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,10 @@ impl Network {
{
*prefix = elders.prefix;

if elders.elders.contains(name) {
if elders.added.iter().any(|p| p.name() == name) {
*elder = Some(ElderState {
key: elders.key,
num_elders: elders.elders.len(),
num_elders: elders.existing.len() + elders.added.len(),
});
} else {
*elder = None;
Expand Down
54 changes: 44 additions & 10 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::section::SectionChain;
use crate::{peer::Peer, section::SectionChain};
use bls_signature_aggregator::Proof;
use bytes::Bytes;
use ed25519_dalek::Keypair;
Expand Down Expand Up @@ -34,14 +34,18 @@ pub enum NodeElderChange {
}

/// Bound name of elders and section_key, section_prefix info together.
#[derive(Debug, Clone, PartialEq)]
#[derive(Debug, Clone)]
pub struct Elders {
/// The prefix of the section.
pub prefix: Prefix,
/// The BLS public key of a section.
pub key: bls::PublicKey,
/// The set of elders of a section.
pub elders: BTreeSet<XorName>,
/// Existing Elders in our section.
pub existing: BTreeSet<Peer>,
/// New Elders in our section.
pub added: BTreeSet<Peer>,
/// Removed Elders in our section.
pub removed: BTreeSet<Peer>,
}

/// An Event raised by a `Node` or `Client` via its event sender.
Expand Down Expand Up @@ -82,13 +86,19 @@ pub enum Event {
/// Age of the node
age: u8,
},
/// Our section has split.
SectionSplit {
/// The Elders of our section.
elders: Elders,
/// The Elders of the sibling section.
sibling_elders: Elders,
/// Promoted, demoted or no change?
self_status_change: NodeElderChange,
},
/// The set of elders in our section has changed.
EldersChanged {
/// The Elders of our section.
elders: Elders,
/// The Elders of the sibling section, if this event is fired during a split.
/// Otherwise `None`.
sibling_elders: Option<Elders>,
/// Promoted, demoted or no change?
self_status_change: NodeElderChange,
},
Expand Down Expand Up @@ -118,7 +128,14 @@ pub enum Event {
/// Failed in sending a message to client, or connection to client is lost
ClientLost(SocketAddr),
/// Notify the current list of adult nodes, in case of churning.
AdultsChanged(BTreeSet<XorName>),
AdultsChanged {
/// Existing Adults in our section.
existing: BTreeSet<Peer>,
/// New Adults in our section.
added: BTreeSet<Peer>,
/// Removed Adults in our section.
removed: BTreeSet<Peer>,
},
}

impl Debug for Event {
Expand Down Expand Up @@ -148,7 +165,7 @@ impl Debug for Event {
.field("name", name)
.field("age", age)
.finish(),
Self::EldersChanged {
Self::SectionSplit {
elders,
sibling_elders,
self_status_change,
Expand All @@ -158,6 +175,14 @@ impl Debug for Event {
.field("sibling_elders", sibling_elders)
.field("self_status_change", self_status_change)
.finish(),
Self::EldersChanged {
elders,
self_status_change,
} => formatter
.debug_struct("EldersChanged")
.field("elders", elders)
.field("self_status_change", self_status_change)
.finish(),
Self::RelocationStarted { previous_name } => formatter
.debug_struct("RelocationStarted")
.field("previous_name", previous_name)
Expand All @@ -177,7 +202,16 @@ impl Debug for Event {
msg, user,
),
Self::ClientLost(addr) => write!(formatter, "ClientLost({:?})", addr),
Self::AdultsChanged(adult_list) => write!(formatter, "AdultsChanged({:?})", adult_list),
Self::AdultsChanged {
existing,
added,
removed,
} => formatter
.debug_struct("AdultsChanged")
.field("existing", existing)
.field("added", added)
.field("removed", removed)
.finish(),
}
}
}
24 changes: 12 additions & 12 deletions src/routing/core/messaging/handling/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -448,11 +448,7 @@ impl Core {
return Ok(vec![]);
}

let old_adults: BTreeSet<_> = self
.section
.live_adults()
.map(|peer| *peer.name())
.collect();
let old_adults: BTreeSet<_> = self.section.live_adults().copied().collect();

let snapshot = self.state_snapshot();
trace!(
Expand All @@ -464,13 +460,17 @@ impl Core {
self.network.merge(network, self.section.chain());

if !self.is_elder() {
let new_adults: BTreeSet<_> = self
.section
.live_adults()
.map(|peer| *peer.name())
.collect();
if old_adults != new_adults {
self.send_event(Event::AdultsChanged(new_adults));
let current_adults: BTreeSet<_> = self.section.live_adults().copied().collect();
let added = current_adults.difference(&old_adults).copied().collect();
let removed = old_adults.difference(&current_adults).copied().collect();
let existing = old_adults.intersection(&current_adults).copied().collect();

if old_adults != current_adults {
self.send_event(Event::AdultsChanged {
existing,
added,
removed,
});
}
}

Expand Down
72 changes: 46 additions & 26 deletions src/routing/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@ use crate::{
messages::Message,
network::Network,
node::Node,
peer::Peer,
relocation::RelocateState,
section::{Section, SectionKeyShare, SectionKeysProvider},
};
use bls_signature_aggregator::SignatureAggregator;
use itertools::Itertools;
use resource_proof::ResourceProof;
use std::collections::BTreeSet;
use tokio::sync::mpsc;
use xor_name::{Prefix, XorName};

Expand Down Expand Up @@ -113,6 +115,7 @@ impl Core {
is_elder: self.is_elder(),
last_key: *self.section.chain().last_key(),
prefix: *self.section.prefix(),
elders: self.section().elders().copied().collect(),
}
}

Expand Down Expand Up @@ -152,20 +155,6 @@ impl Core {
commands.extend(self.send_sync(self.section.clone(), self.network.clone())?);
}

let sibling_elders = if new.prefix != old.prefix {
if let Some(sibling_key) = self.section_key(&new.prefix.sibling()) {
self.network.get(&new.prefix.sibling()).map(|info| Elders {
prefix: new.prefix.sibling(),
key: *sibling_key,
elders: info.elders.keys().copied().collect(),
})
} else {
None
}
} else {
None
};

let self_status_change = if !old.is_elder && new.is_elder {
info!("Promoted to elder");
NodeElderChange::Promoted
Expand All @@ -178,23 +167,53 @@ impl Core {
NodeElderChange::None
};

let current: BTreeSet<_> = self.section.elders().copied().collect();
let added = current.difference(&old.elders).copied().collect();
let removed = old.elders.difference(&current).copied().collect();
let existing = old.elders.intersection(&current).copied().collect();

let elders = Elders {
prefix: new.prefix,
key: new.last_key,
elders: self
.section
.authority_provider()
.elders
.keys()
.copied()
.collect(),
existing,
added,
removed,
};

let sibling_elders = if new.prefix != old.prefix {
if let Some(sibling_key) = self.section_key(&new.prefix.sibling()) {
self.network.get(&new.prefix.sibling()).map(|info| Elders {
prefix: new.prefix.sibling(),
key: *sibling_key,
existing: info
.elders
.iter()
.map(|(name, addr)| Peer::new(*name, *addr))
.collect(),
added: BTreeSet::new(),
removed: BTreeSet::new(),
})
} else {
None
}
} else {
None
};

let event = if let Some(sibling_elders) = sibling_elders {
Event::SectionSplit {
elders,
sibling_elders,
self_status_change,
}
} else {
Event::EldersChanged {
elders,
self_status_change,
}
};

self.send_event(Event::EldersChanged {
elders,
sibling_elders,
self_status_change,
});
self.send_event(event);
}

if !new.is_elder {
Expand Down Expand Up @@ -232,4 +251,5 @@ pub(crate) struct StateSnapshot {
is_elder: bool,
last_key: bls::PublicKey,
prefix: Prefix,
elders: BTreeSet<Peer>,
}
12 changes: 4 additions & 8 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use sn_messaging::{
section_info::{Error as TargetSectionError, Message as SectionInfoMsg},
DstLocation, EndUser, Itinerary, MessageType, WireMsg,
};
use std::{net::SocketAddr, sync::Arc};
use std::{collections::BTreeSet, net::SocketAddr, sync::Arc};
use tokio::{sync::mpsc, task};
use xor_name::{Prefix, XorName};

Expand Down Expand Up @@ -117,17 +117,13 @@ impl Routing {
let elders = Elders {
prefix: *section.prefix(),
key: *section.chain().last_key(),
elders: section
.authority_provider()
.elders
.keys()
.copied()
.collect(),
existing: BTreeSet::new(),
added: section.elders().copied().collect(),
removed: BTreeSet::new(),
};

state.send_event(Event::EldersChanged {
elders,
sibling_elders: None,
self_status_change: NodeElderChange::Promoted,
});

Expand Down
10 changes: 7 additions & 3 deletions src/routing/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ async fn handle_sync() -> Result<()> {
.chain(iter::once(new_peer)),
old_section_auth.prefix,
);
let new_elders: BTreeSet<_> = new_section_auth.elders.keys().copied().collect();
let new_section_elders: BTreeSet<_> = new_section_auth.elders.keys().copied().collect();
let proven_new_section_auth = proven(&sk2, new_section_auth)?;
let new_section = Section::new(pk0, chain, proven_new_section_auth)?;

Expand Down Expand Up @@ -1275,7 +1275,9 @@ async fn handle_sync() -> Result<()> {
event_rx.recv().await,
Some(Event::EldersChanged { elders, .. }) => {
assert_eq!(elders.key, pk2);
assert_eq!(elders.elders, new_elders);
assert!(elders.added.iter().all(|a| new_section_elders.contains(a.name())));
assert!(elders.existing.iter().all(|a| new_section_elders.contains(a.name())));
assert!(elders.removed.iter().all(|r| !new_section_elders.contains(r.name())));
}
);

Expand Down Expand Up @@ -1690,7 +1692,9 @@ async fn handle_elders_update() -> Result<()> {
event_rx.recv().await,
Some(Event::EldersChanged { elders, .. }) => {
assert_eq!(elders.key, pk1);
assert_eq!(elders.elders, elder_names1);
assert!(elders.added.iter().all(|a| elder_names1.contains(a.name())));
assert!(elders.existing.iter().all(|a| elder_names1.contains(a.name())));
assert!(elders.removed.iter().all(|r| !elder_names1.contains(r.name())));
}
);

Expand Down
7 changes: 7 additions & 0 deletions src/section/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ impl Section {
.map(|info| &info.peer)
}

/// Returns elders from our section.
pub fn elders(&self) -> impl Iterator<Item = &Peer> {
self.members
.mature()
.filter(move |peer| self.is_elder(peer.name()))
}

/// Returns adults from our section.
pub fn adults(&self) -> impl Iterator<Item = &Peer> {
self.members
Expand Down

0 comments on commit 18f2eb0

Please sign in to comment.