Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: kill elder received too many connectivity complaints
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi authored and S-Coyle committed Apr 14, 2021
1 parent c8fc00c commit cc9ca8a
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 2 deletions.
4 changes: 4 additions & 0 deletions src/messages/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::{
collections::VecDeque,
fmt::{self, Debug, Formatter},
};
use xor_name::XorName;

#[derive(Clone, Eq, PartialEq, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
Expand Down Expand Up @@ -113,6 +114,8 @@ pub(crate) enum Variant {
nonce: [u8; 32],
nonce_signature: Signature,
},
/// Direct complaint sent from an adult to elders regarding the connectivity issue of an elder.
ConnectivityComplaint(XorName),
}

impl Variant {
Expand Down Expand Up @@ -249,6 +252,7 @@ impl Debug for Variant {
.field("data_size", data_size)
.field("difficulty", difficulty)
.finish(),
Self::ConnectivityComplaint(name) => write!(f, "ConnectivityComplaint({:?})", name),
}
}
}
Expand Down
80 changes: 80 additions & 0 deletions src/routing/connectivity_complaints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2018 MaidSafe.net limited.
//
// This SAFE Network Software is licensed to you under The General Public License (GPL), version 3.
// Unless required by applicable law or agreed to in writing, the SAFE Network Software distributed
// under the GPL Licence is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use crate::{supermajority, ELDER_SIZE};
use lru_time_cache::{Entry, LruCache};
use std::{collections::BTreeSet, iter, time::Duration};
use xor_name::XorName;

const COMPLAINT_EXPIRY_DURATION: Duration = Duration::from_secs(60);

// Structure to retain connectivity complaints from adults against an elder.
// The flag indicates whehter an accumulation has been reported.
pub(crate) struct ConnectivityComplaints {
complaints: LruCache<XorName, (BTreeSet<XorName>, bool)>,
}

impl ConnectivityComplaints {
pub fn new() -> Self {
Self {
complaints: LruCache::with_expiry_duration_and_capacity(
COMPLAINT_EXPIRY_DURATION,
ELDER_SIZE,
),
}
}

// Received a complaint from an adult against an elder.
pub fn add_complaint(&mut self, adult_name: XorName, elder_name: XorName) {
match self.complaints.entry(elder_name) {
Entry::Vacant(entry) => {
let _ = entry.insert((iter::once(adult_name).collect(), false));
}
Entry::Occupied(entry) => {
let _ = entry.into_mut().0.insert(adult_name);
}
}
}

// Check whether an elder got too many complaints among weighing adults.
pub fn is_complained(
&mut self,
elder_name: XorName,
weighing_adults: &BTreeSet<XorName>,
) -> bool {
let threshold = supermajority(weighing_adults.len());
match self.complaints.entry(elder_name) {
Entry::Vacant(_) => false,
Entry::Occupied(entry) => {
let mut records = entry.into_mut();
if records.1 {
// Already complained, return with false to avoid duplication.
false
} else {
let complained_adults = records
.0
.iter()
.filter(|name| weighing_adults.contains(name))
.count();
if complained_adults >= threshold {
records.1 = true;
true
} else {
false
}
}
}
}
}
}

impl Default for ConnectivityComplaints {
fn default() -> Self {
Self::new()
}
}
62 changes: 60 additions & 2 deletions src/routing/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
// permissions and limitations relating to use of the SAFE Network Software.

use super::{
connectivity_complaints::ConnectivityComplaints,
enduser_registry::{EndUserRegistry, SocketId},
lazy_messaging,
split_barrier::SplitBarrier,
Expand Down Expand Up @@ -55,6 +56,7 @@ use sn_messaging::{
};
use std::{
cmp::{self, Ordering},
collections::BTreeSet,
iter,
net::SocketAddr,
slice,
Expand Down Expand Up @@ -83,6 +85,7 @@ pub(crate) struct Core {
joins_allowed: bool,
resource_proof: ResourceProof,
end_users: EndUserRegistry,
connectivity_complaints: ConnectivityComplaints,
}

impl Core {
Expand Down Expand Up @@ -116,6 +119,7 @@ impl Core {
joins_allowed: true,
resource_proof: ResourceProof::new(RESOURCE_PROOF_DATA_SIZE, RESOURCE_PROOF_DIFFICULTY),
end_users: EndUserRegistry::new(),
connectivity_complaints: ConnectivityComplaints::new(),
}
}

Expand Down Expand Up @@ -458,9 +462,36 @@ impl Core {
};

if !self.is_elder() {
return Ok(vec![]);
// When self is not an elder, then the peer has to be an elder, and we shall complaint
// the lost to other elders.
let variant = Variant::ConnectivityComplaint(name);
let recipients: Vec<_> = self
.section
.elders_info()
.elders
.values()
.filter(|peer| *peer.name() != name)
.copied()
.collect();
trace!(
"Casting connectivity complaint against {:?} {:?}",
name,
recipients
);

return self.send_message_for_dst_accumulation(
self.node.name(),
DstLocation::Direct,
variant,
None,
&recipients,
);
}

self.propose_offline(name)
}

fn propose_offline(&self, name: XorName) -> Result<Vec<Command>> {
if let Some(info) = self.section.members().get(&name) {
let info = info.clone().leave()?;

Expand Down Expand Up @@ -581,7 +612,7 @@ impl Core {

fn decide_message_status(&self, msg: &Message) -> Result<MessageStatus> {
match msg.variant() {
Variant::OtherSection { .. } => {
Variant::OtherSection { .. } | Variant::ConnectivityComplaint(_) => {
if !self.is_elder() {
return Ok(MessageStatus::Unknown);
}
Expand Down Expand Up @@ -752,6 +783,9 @@ impl Core {
commands.extend(result?);
Ok(commands)
}
Variant::ConnectivityComplaint(elder_name) => {
self.handle_connectivity_complaint(msg.src().name(), *elder_name)
}
Variant::NodeApproval { .. }
| Variant::JoinRetry { .. }
| Variant::ResourceChallenge { .. } => {
Expand Down Expand Up @@ -1437,6 +1471,30 @@ impl Core {
self.send_dkg_start_to(elders_info, slice::from_ref(sender))
}

fn handle_connectivity_complaint(
&mut self,
sender: XorName,
elder_name: XorName,
) -> Result<Vec<Command>> {
self.connectivity_complaints
.add_complaint(sender, elder_name);

let weighing_adults: BTreeSet<XorName> = self
.section
.members()
.joined()
.map(|info| *info.peer.name())
.collect();
if self
.connectivity_complaints
.is_complained(elder_name, &weighing_adults)
{
self.propose_offline(elder_name)
} else {
Ok(vec![])
}
}

// Generate a new section info based on the current set of members and if it differs from the
// current elders, trigger a DKG.
fn promote_and_demote_elders(&mut self) -> Result<Vec<Command>> {
Expand Down
1 change: 1 addition & 0 deletions src/routing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) mod command;

mod bootstrap;
mod comm;
mod connectivity_complaints;
mod core;
mod dispatcher;
mod enduser_registry;
Expand Down

0 comments on commit cc9ca8a

Please sign in to comment.