Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: vote DKG non_participants off
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi authored and dirvine committed Apr 21, 2021
1 parent 2121dc4 commit c4d6067
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 50 deletions.
92 changes: 68 additions & 24 deletions src/agreement/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ use itertools::Itertools;
use serde::{Deserialize, Serialize};
use sn_messaging::DstLocation;
use std::{
collections::{HashMap, VecDeque},
collections::{BTreeSet, HashMap, VecDeque},
fmt::{self, Debug, Formatter},
iter, mem,
net::SocketAddr,
time::Duration,
};
use tiny_keccak::{Hasher, Sha3};
use xor_name::XorName;

// Interval to progress DKG timed phase
const DKG_PROGRESS_INTERVAL: Duration = Duration::from_secs(30);
Expand Down Expand Up @@ -218,11 +219,12 @@ impl DkgVoter {
pub fn process_failure(
&mut self,
dkg_key: &DkgKey,
non_participants: &BTreeSet<XorName>,
proof: DkgFailureProof,
) -> Option<DkgCommand> {
self.sessions
.get_mut(dkg_key)?
.process_failure(dkg_key, proof)
.process_failure(dkg_key, non_participants, proof)
}
}

Expand Down Expand Up @@ -319,7 +321,7 @@ impl Session {
}
Err(error) => {
trace!("DKG for {} failed: {}", self.elders_info, error);
self.report_failure(dkg_key, keypair)
self.report_failure(dkg_key, BTreeSet::new(), keypair)
}
}
}
Expand Down Expand Up @@ -348,7 +350,20 @@ impl Session {
participants.iter().format(", ")
);

return self.report_failure(dkg_key, keypair);
let non_participants: BTreeSet<_> = self
.elders_info
.elders
.keys()
.filter_map(|elder| {
if !participants.contains(elder) {
Some(*elder)
} else {
None
}
})
.collect();

return self.report_failure(dkg_key, non_participants, keypair);
}

// Corrupted DKG outcome. This can happen when a DKG session is restarted using the same set
Expand All @@ -361,7 +376,7 @@ impl Session {
!= outcome.secret_key_share.public_key_share()
{
trace!("DKG for {} failed: corrupted outcome", self.elders_info);
return self.report_failure(dkg_key, keypair);
return self.report_failure(dkg_key, BTreeSet::new(), keypair);
}

trace!(
Expand All @@ -384,10 +399,15 @@ impl Session {
}]
}

fn report_failure(&mut self, dkg_key: &DkgKey, keypair: &Keypair) -> Vec<DkgCommand> {
let proof = DkgFailureProof::new(keypair, dkg_key);
fn report_failure(
&mut self,
dkg_key: &DkgKey,
non_participants: BTreeSet<XorName>,
keypair: &Keypair,
) -> Vec<DkgCommand> {
let proof = DkgFailureProof::new(keypair, &non_participants, dkg_key);

if !self.failures.insert(proof) {
if !self.failures.insert(proof, &non_participants) {
return vec![];
}

Expand All @@ -397,11 +417,17 @@ impl Session {
recipients: self.recipients(),
dkg_key: *dkg_key,
proof,
non_participants,
}))
.collect()
}

fn process_failure(&mut self, dkg_key: &DkgKey, proof: DkgFailureProof) -> Option<DkgCommand> {
fn process_failure(
&mut self,
dkg_key: &DkgKey,
non_participants: &BTreeSet<XorName>,
proof: DkgFailureProof,
) -> Option<DkgCommand> {
if !self
.elders_info
.elders
Expand All @@ -410,11 +436,11 @@ impl Session {
return None;
}

if !proof.verify(dkg_key) {
if !proof.verify(dkg_key, non_participants) {
return None;
}

if !self.failures.insert(proof) {
if !self.failures.insert(proof, non_participants) {
return None;
}

Expand Down Expand Up @@ -449,32 +475,38 @@ pub(crate) struct DkgFailureProof {
}

impl DkgFailureProof {
fn new(keypair: &Keypair, dkg_key: &DkgKey) -> Self {
fn new(keypair: &Keypair, non_participants: &BTreeSet<XorName>, dkg_key: &DkgKey) -> Self {
Self {
public_key: keypair.public,
signature: crypto::sign(&failure_proof_hash(dkg_key), keypair),
signature: crypto::sign(&failure_proof_hash(dkg_key, non_participants), keypair),
}
}

fn verify(&self, dkg_key: &DkgKey) -> bool {
let hash = failure_proof_hash(dkg_key);
fn verify(&self, dkg_key: &DkgKey, non_participants: &BTreeSet<XorName>) -> bool {
let hash = failure_proof_hash(dkg_key, non_participants);
self.public_key.verify(&hash, &self.signature).is_ok()
}
}

#[derive(Default, Clone, Eq, PartialEq, Debug, Serialize, Deserialize)]
pub(crate) struct DkgFailureProofSet(Vec<DkgFailureProof>);
pub(crate) struct DkgFailureProofSet {
proofs: Vec<DkgFailureProof>,
pub non_participants: BTreeSet<XorName>,
}

impl DkgFailureProofSet {
// Insert a proof into this set. The proof is assumed valid. Returns `true` if the proof was
// not already present in the set and `false` otherwise.
fn insert(&mut self, proof: DkgFailureProof) -> bool {
fn insert(&mut self, proof: DkgFailureProof, non_participants: &BTreeSet<XorName>) -> bool {
if self.non_participants.is_empty() {
self.non_participants = non_participants.clone();
}
if self
.0
.proofs
.iter()
.all(|existing_proof| existing_proof.public_key != proof.public_key)
{
self.0.push(proof);
self.proofs.push(proof);
true
} else {
false
Expand All @@ -484,13 +516,16 @@ impl DkgFailureProofSet {
// Check whether we have enough proofs to reach agreement on the failure. The contained proofs
// are assumed valid.
fn has_agreement(&self, elders_info: &EldersInfo) -> bool {
has_failure_agreement(elders_info.elders.len(), self.0.len())
has_failure_agreement(elders_info.elders.len(), self.proofs.len())
}

pub fn verify(&self, elders_info: &EldersInfo, generation: u64) -> bool {
let hash = failure_proof_hash(&DkgKey::new(elders_info, generation));
let hash = failure_proof_hash(
&DkgKey::new(elders_info, generation),
&self.non_participants,
);
let votes = self
.0
.proofs
.iter()
.filter(|proof| {
elders_info
Expand All @@ -513,11 +548,14 @@ fn has_failure_agreement(num_participants: usize, num_votes: usize) -> bool {

// Create a value whose signature serves as the proof that a failure of a DKG session with the given
// `dkg_key` was observed.
fn failure_proof_hash(dkg_key: &DkgKey) -> Digest256 {
fn failure_proof_hash(dkg_key: &DkgKey, non_participants: &BTreeSet<XorName>) -> Digest256 {
let mut hasher = Sha3::v256();
let mut hash = Digest256::default();
hasher.update(&dkg_key.hash);
hasher.update(&dkg_key.generation.to_le_bytes());
for name in non_participants.iter() {
hasher.update(&name.0);
}
hasher.update(b"failure");
hasher.finalize(&mut hash);
hash
Expand Down Expand Up @@ -580,6 +618,7 @@ pub(crate) enum DkgCommand {
recipients: Vec<SocketAddr>,
dkg_key: DkgKey,
proof: DkgFailureProof,
non_participants: BTreeSet<XorName>,
},
HandleFailureAgreement(DkgFailureProofSet),
}
Expand Down Expand Up @@ -615,8 +654,13 @@ impl DkgCommand {
recipients,
dkg_key,
proof,
non_participants,
} => {
let variant = Variant::DkgFailureObservation { dkg_key, proof };
let variant = Variant::DkgFailureObservation {
dkg_key,
proof,
non_participants,
};
let message = Message::single_src(node, DstLocation::Direct, variant, None, None)?;

Ok(Command::send_message_to_nodes(
Expand Down
10 changes: 8 additions & 2 deletions src/messages/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use hex_fmt::HexFmt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
collections::{BTreeSet, VecDeque},
fmt::{self, Debug, Formatter},
};
use xor_name::XorName;
Expand Down Expand Up @@ -98,6 +98,7 @@ pub(crate) enum Variant {
DkgFailureObservation {
dkg_key: DkgKey,
proof: DkgFailureProof,
non_participants: BTreeSet<XorName>,
},
/// Sent to the current elders by the DKG participants when at least majority of them observe
/// a DKG failure.
Expand Down Expand Up @@ -227,10 +228,15 @@ impl Debug for Variant {
.field("dkg_key", &dkg_key)
.field("message", message)
.finish(),
Self::DkgFailureObservation { dkg_key, proof } => f
Self::DkgFailureObservation {
dkg_key,
proof,
non_participants,
} => f
.debug_struct("DkgFailureObservation")
.field("dkg_key", dkg_key)
.field("proof", proof)
.field("non_participants", non_participants)
.finish(),
Self::DkgFailureAgreement(proofs) => {
f.debug_tuple("DkgFailureAgreement").field(proofs).finish()
Expand Down
69 changes: 45 additions & 24 deletions src/routing/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -492,23 +492,29 @@ impl Core {
}

pub fn propose_offline(&self, name: XorName) -> Result<Vec<Command>> {
if let Some(info) = self.section.members().get(&name) {
let info = info.clone().leave()?;

// Don't send the `Offline` proposal to the peer being lost as that send would fail,
// triggering a chain of further `Offline` proposals.
let elders: Vec<_> = self
.section
.elders_info()
.peers()
.filter(|peer| peer.name() != info.peer.name())
.copied()
.collect();
self.cast_offline_proposals(&iter::once(name).collect())
}

self.send_proposal(&elders, Proposal::Offline(info))
} else {
Ok(vec![])
fn cast_offline_proposals(&self, names: &BTreeSet<XorName>) -> Result<Vec<Command>> {
// Don't send the `Offline` proposal to the peer being lost as that send would fail,
// triggering a chain of further `Offline` proposals.
let elders: Vec<_> = self
.section
.elders_info()
.peers()
.filter(|peer| !names.contains(peer.name()))
.copied()
.collect();
let mut result: Vec<Command> = Vec::new();
for name in names.iter() {
if let Some(info) = self.section.members().get(name) {
let info = info.clone().leave()?;
if let Ok(commands) = self.send_proposal(&elders, Proposal::Offline(info)) {
result.extend(commands);
}
}
}
Ok(result)
}

pub fn handle_dkg_outcome(
Expand Down Expand Up @@ -763,9 +769,11 @@ impl Core {
Variant::DkgMessage { dkg_key, message } => {
self.handle_dkg_message(*dkg_key, message.clone(), msg.src().name())
}
Variant::DkgFailureObservation { dkg_key, proof } => {
self.handle_dkg_failure_observation(*dkg_key, *proof)
}
Variant::DkgFailureObservation {
dkg_key,
proof,
non_participants,
} => self.handle_dkg_failure_observation(*dkg_key, non_participants, *proof),
Variant::DkgFailureAgreement(proofs) => {
self.handle_dkg_failure_agreement(&msg.src().name(), proofs)
}
Expand Down Expand Up @@ -1431,10 +1439,11 @@ impl Core {
fn handle_dkg_failure_observation(
&mut self,
dkg_key: DkgKey,
non_participants: &BTreeSet<XorName>,
proof: DkgFailureProof,
) -> Result<Vec<Command>> {
self.dkg_voter
.process_failure(&dkg_key, proof)
.process_failure(&dkg_key, non_participants, proof)
.into_commands(&self.node)
}

Expand Down Expand Up @@ -1463,12 +1472,24 @@ impl Core {
return Ok(vec![]);
};

trace!(
"Received DKG failure agreement - restarting: {}",
elders_info
);
if proofs.non_participants.is_empty() {
// The DKG failure is a corrupted one due to lagging.
trace!(
"Received DKG failure agreement of corrupted result - restarting: {}",
elders_info
);

self.send_dkg_start_to(elders_info, slice::from_ref(sender))
self.send_dkg_start_to(elders_info, slice::from_ref(sender))
} else {
// The DKG failure is regarding non_participants, i.e. potential unresponsive node.
trace!(
"Received DKG failure agreement of non_participants {:?} , DKG generation({}) {:?}",
proofs.non_participants,
generation,
elders_info
);
self.cast_offline_proposals(&proofs.non_participants)
}
}

fn handle_connectivity_complaint(
Expand Down

0 comments on commit c4d6067

Please sign in to comment.