Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix(dkg): detect corrupted DKG outcome
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Mar 4, 2021
1 parent e68ba2a commit ec53c63
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 30 deletions.
20 changes: 11 additions & 9 deletions examples/stress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// KIND, either express or implied. Please review the Licences for the specific language governing
// permissions and limitations relating to use of the SAFE Network Software.

use anyhow::{format_err, Error, Result};
use anyhow::{format_err, Context, Error, Result};
use bls_signature_aggregator::{ProofShare, SignatureAggregator};
use futures::{
future,
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn main() -> Result<()> {
let mut network = Network::new();

// Create the genesis node
network.create_node(event_tx.clone()).await?;
network.create_node(event_tx.clone()).await;

let mut churn_events = schedule.events();

Expand All @@ -140,7 +140,7 @@ async fn main() -> Result<()> {
event = churn_events.next() => {
match event {
Some(ChurnEvent::Join) => {
network.create_node(event_tx.clone()).await?
network.create_node(event_tx.clone()).await
}
Some(ChurnEvent::Drop) => {
network.remove_random_node()
Expand Down Expand Up @@ -211,7 +211,7 @@ impl Network {
}

// Create new node and let it join the network.
async fn create_node(&mut self, event_tx: UnboundedSender<Event>) -> Result<()> {
async fn create_node(&mut self, event_tx: UnboundedSender<Event>) {
let bootstrap_addrs = self.get_bootstrap_addrs();

let id = self.new_node_id();
Expand All @@ -231,8 +231,6 @@ impl Network {
let _ = task::spawn(add_node(id, config, event_tx));

self.try_print_status();

Ok(())
}

// Remove a random node where the probability of a node to be removed is inversely proportional
Expand Down Expand Up @@ -445,9 +443,13 @@ impl Network {
let bytes = bincode::serialize(&dst)?;
let signature_share = node
.sign_as_elder(&bytes, &public_key_set.public_key())
.await?;
.await
.context("failed to sign probe")?;

let index = node.our_index().await?;
let index = node
.our_index()
.await
.context("failed to retrieve key share index")?;

let message = ProbeMessage {
proof_share: ProofShare {
Expand All @@ -473,7 +475,7 @@ impl Network {
match node.send_message(itinerary, bytes).await {
Ok(()) => Ok(true),
Err(RoutingError::InvalidSrcLocation) => Ok(false), // node name changed
Err(error) => Err(error.into()),
Err(error) => Err(Error::from(error).context("failed to send probe")),
}
}

Expand Down
64 changes: 43 additions & 21 deletions src/consensus/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,11 @@ impl DkgVoter {
);

let _ = self.sessions.insert(dkg_key, session);

// Remove uneeded old sessions.
self.sessions
.retain(|old_dkg_key, _| old_dkg_key.generation >= dkg_key.generation);
self.backlog.prune(&dkg_key);

commands
}
Expand Down Expand Up @@ -336,34 +339,48 @@ impl Session {
return vec![];
};

if participants.iter().eq(self.elders_info.elders.keys()) {
trace!(
"DKG for {} complete: {:?}",
self.elders_info,
outcome.public_key_set.public_key()
);

self.complete = true;

let outcome = SectionKeyShare {
public_key_set: outcome.public_key_set,
index: self.participant_index,
secret_key_share: outcome.secret_key_share,
};

vec![DkgCommand::HandleOutcome {
elders_info: self.elders_info.clone(),
outcome,
}]
} else {
// Less than 100% participation
if !participants.iter().eq(self.elders_info.elders.keys()) {
trace!(
"DKG for {} failed: unexpected participants: {:?}",
self.elders_info,
participants.iter().format(", ")
);

self.report_failure(dkg_key, keypair)
return self.report_failure(dkg_key, keypair);
}

// Corrupted DKG outcome. This can happen when a DKG session is restarted using the same set
// of participants and the same generation, but some of the participants are unaware of the
// restart (due to lag, etc...) and keep sending messages for the original session which
// then get mixed with the messages of the restarted session.
if outcome
.public_key_set
.public_key_share(self.participant_index)
!= outcome.secret_key_share.public_key_share()
{
trace!("DKG for {} failed: corrupted outcome", self.elders_info);
return self.report_failure(dkg_key, keypair);
}

trace!(
"DKG for {} complete: {:?}",
self.elders_info,
outcome.public_key_set.public_key()
);

self.complete = true;

let outcome = SectionKeyShare {
public_key_set: outcome.public_key_set,
index: self.participant_index,
secret_key_share: outcome.secret_key_share,
};

vec![DkgCommand::HandleOutcome {
elders_info: self.elders_info.clone(),
outcome,
}]
}

fn report_failure(&mut self, dkg_key: &DkgKey, keypair: &Keypair) -> Vec<DkgCommand> {
Expand Down Expand Up @@ -529,6 +546,11 @@ impl Backlog {

output
}

fn prune(&mut self, dkg_key: &DkgKey) {
self.0
.retain(|(old_dkg_key, _)| old_dkg_key.generation >= dkg_key.generation)
}
}

#[derive(Debug)]
Expand Down

0 comments on commit ec53c63

Please sign in to comment.