Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix(dkg): avoid mixing DKG messages from different generations
Browse files Browse the repository at this point in the history
Make generation part of the DkgKey, so that DKGs that have the same participants but different generations don't get mixed up.
  • Loading branch information
madadam committed Mar 4, 2021
1 parent 92dfc70 commit e68ba2a
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 127 deletions.
86 changes: 34 additions & 52 deletions src/consensus/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,17 @@ const DKG_PROGRESS_INTERVAL: Duration = Duration::from_secs(30);
const BACKLOG_CAPACITY: usize = 100;

/// Unique identified of a DKG session.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub struct DkgKey(Digest256);
#[derive(Copy, Clone, Eq, PartialEq, Hash, Serialize, Deserialize)]
pub struct DkgKey {
hash: Digest256,
generation: u64,
}

impl DkgKey {
pub fn new(elders_info: &EldersInfo) -> Self {
pub fn new(elders_info: &EldersInfo, generation: u64) -> Self {
// Calculate the hash without involving serialization to avoid having to return `Result`.
let mut hasher = Sha3::v256();
let mut output = Digest256::default();
let mut hash = Digest256::default();

for peer in elders_info.elders.values() {
hasher.update(&peer.name().0);
Expand All @@ -52,15 +55,15 @@ impl DkgKey {

hasher.update(&elders_info.prefix.name().0);
hasher.update(&elders_info.prefix.bit_count().to_le_bytes());
hasher.finalize(&mut output);
hasher.finalize(&mut hash);

Self(output)
Self { hash, generation }
}
}

impl Debug for DkgKey {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "DkgKey({:10})", HexFmt(&self.0))
write!(f, "DkgKey({:10}/{})", HexFmt(&self.hash), self.generation)
}
}

Expand Down Expand Up @@ -105,13 +108,10 @@ impl DkgVoter {
keypair: &Keypair,
dkg_key: DkgKey,
elders_info: EldersInfo,
key_index: u64,
) -> Vec<DkgCommand> {
if let Some(session) = self.sessions.get(&dkg_key) {
if !session.complete && session.key_index >= key_index {
trace!("DKG for {} already in progress", elders_info);
return vec![];
}
if self.sessions.contains_key(&dkg_key) {
trace!("DKG for {} already in progress", elders_info);
return vec![];
}

let name = crypto::name(&keypair.public);
Expand Down Expand Up @@ -154,7 +154,6 @@ impl DkgVoter {
let mut session = Session {
key_gen,
elders_info,
key_index,
participant_index,
timer_token: 0,
failures: Default::default(),
Expand All @@ -172,7 +171,7 @@ impl DkgVoter {

let _ = self.sessions.insert(dkg_key, session);
self.sessions
.retain(|_, session| session.key_index >= key_index);
.retain(|old_dkg_key, _| old_dkg_key.generation >= dkg_key.generation);

commands
}
Expand Down Expand Up @@ -226,9 +225,6 @@ impl DkgVoter {
// Data for a DKG participant.
struct Session {
elders_info: EldersInfo,
// Section chain index of the key to be generated.
key_index: u64,
// Our participant index.
participant_index: usize,
key_gen: KeyGen,
timer_token: u64,
Expand Down Expand Up @@ -411,10 +407,9 @@ impl Session {
if self.failures.has_agreement(&self.elders_info) {
self.complete = true;

Some(DkgCommand::HandleFailureAgreement {
elders_info: self.elders_info.clone(),
proofs: mem::take(&mut self.failures),
})
Some(DkgCommand::HandleFailureAgreement(mem::take(
&mut self.failures,
)))
} else {
None
}
Expand Down Expand Up @@ -474,8 +469,8 @@ impl DkgFailureProofSet {
self.0.len() >= majority(elders_info.elders.len())
}

pub fn verify(&self, elders_info: &EldersInfo) -> bool {
let hash = failure_proof_hash(&DkgKey::new(elders_info));
pub fn verify(&self, elders_info: &EldersInfo, generation: u64) -> bool {
let hash = failure_proof_hash(&DkgKey::new(elders_info, generation));
let votes = self
.0
.iter()
Expand All @@ -494,17 +489,13 @@ impl DkgFailureProofSet {
// Create a value whose signature serves as the proof that a failure of a DKG session with the given
// `dkg_key` was observed.
fn failure_proof_hash(dkg_key: &DkgKey) -> Digest256 {
// Scramble the bytes by XOR-ing them with this, so that the signature of the resulting digest
// is different from a signature of just the `dkg_key`.
const SCRAMBLE: &[u8] = b"failure";

let mut output = dkg_key.0;

for (o, s) in output.iter_mut().zip(SCRAMBLE.iter().cycle()) {
*o ^= s;
}

output
let mut hasher = Sha3::v256();
let mut hash = Digest256::default();
hasher.update(&dkg_key.hash);
hasher.update(&dkg_key.generation.to_le_bytes());
hasher.update(b"failure");
hasher.finalize(&mut hash);
hash
}

struct Backlog(VecDeque<(DkgKey, DkgMessage)>);
Expand Down Expand Up @@ -560,10 +551,7 @@ pub(crate) enum DkgCommand {
dkg_key: DkgKey,
proof: DkgFailureProof,
},
HandleFailureAgreement {
elders_info: EldersInfo,
proofs: DkgFailureProofSet,
},
HandleFailureAgreement(DkgFailureProofSet),
}

impl DkgCommand {
Expand Down Expand Up @@ -607,13 +595,7 @@ impl DkgCommand {
message.to_bytes(),
))
}
Self::HandleFailureAgreement {
elders_info,
proofs,
} => Ok(Command::HandleDkgFailure {
elders_info,
proofs,
}),
Self::HandleFailureAgreement(proofs) => Ok(Command::HandleDkgFailure(proofs)),
}
}
}
Expand Down Expand Up @@ -662,8 +644,8 @@ mod tests {
let elders_info0 = EldersInfo::new(iter::once(peer0), Prefix::default());
let elders_info1 = EldersInfo::new(iter::once(peer1), Prefix::default());

let key0 = DkgKey::new(&elders_info0);
let key1 = DkgKey::new(&elders_info1);
let key0 = DkgKey::new(&elders_info0, 0);
let key1 = DkgKey::new(&elders_info1, 0);

assert_ne!(key0, key1);
}
Expand All @@ -676,9 +658,9 @@ mod tests {

let node = Node::new(crypto::gen_keypair(), gen_addr());
let elders_info = EldersInfo::new(iter::once(node.peer()), Prefix::default());
let dkg_key = DkgKey::new(&elders_info);
let dkg_key = DkgKey::new(&elders_info, 0);

let commands = voter.start(&node.keypair, dkg_key, elders_info, 0);
let commands = voter.start(&node.keypair, dkg_key, elders_info);
assert_matches!(&commands[..], &[DkgCommand::HandleOutcome { .. }]);
}

Expand All @@ -698,7 +680,7 @@ mod tests {
let mut messages = Vec::new();

let elders_info = EldersInfo::new(nodes.iter().map(Node::peer), Prefix::default());
let dkg_key = DkgKey::new(&elders_info);
let dkg_key = DkgKey::new(&elders_info, 0);

let mut actors: HashMap<_, _> = nodes
.into_iter()
Expand All @@ -708,7 +690,7 @@ mod tests {
for actor in actors.values_mut() {
let commands = actor
.voter
.start(&actor.node.keypair, dkg_key, elders_info.clone(), 0);
.start(&actor.node.keypair, dkg_key, elders_info.clone());

for command in commands {
messages.extend(actor.handle(command, &dkg_key))
Expand Down
22 changes: 4 additions & 18 deletions src/messages/variant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,6 @@ pub(crate) enum Variant {
dkg_key: DkgKey,
/// The DKG particpants.
elders_info: EldersInfo,
/// A DKG "generation". A DKG with higher generation supersedes a DKG with lower but DKGs
/// with the same generation need to executed in parallel as we can't tell which one "wins"
/// before they complete.
generation: u64,
},
/// Message exchanged for DKG process.
DKGMessage {
Expand All @@ -103,10 +99,7 @@ pub(crate) enum Variant {
},
/// Sent to the current elders by the DKG participants when at least majority of them observe
/// a DKG failure.
DKGFailureAgreement {
elders_info: EldersInfo,
proofs: DkgFailureProofSet,
},
DKGFailureAgreement(DkgFailureProofSet),
/// Message containing a single `Vote` to be accumulated in the vote accumulator.
Vote {
content: Vote,
Expand Down Expand Up @@ -217,12 +210,10 @@ impl Debug for Variant {
Self::DKGStart {
dkg_key,
elders_info,
generation,
} => f
.debug_struct("DKGStart")
.field("dkg_key", dkg_key)
.field("elders_info", elders_info)
.field("generation", generation)
.finish(),
Self::DKGMessage { dkg_key, message } => f
.debug_struct("DKGMessage")
Expand All @@ -234,14 +225,9 @@ impl Debug for Variant {
.field("dkg_key", dkg_key)
.field("proof", proof)
.finish(),
Self::DKGFailureAgreement {
elders_info,
proofs,
} => f
.debug_struct("DKGFailureAgreement")
.field("elders_info", elders_info)
.field("proofs", proofs)
.finish(),
Self::DKGFailureAgreement(proofs) => {
f.debug_tuple("DKGFailureAgreement").field(proofs).finish()
}
Self::Vote {
content,
proof_share,
Expand Down
56 changes: 16 additions & 40 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -455,15 +455,8 @@ impl Approved {
result
}

pub fn handle_dkg_failure(
&mut self,
elders_info: EldersInfo,
proofs: DkgFailureProofSet,
) -> Result<Command> {
let variant = Variant::DKGFailureAgreement {
elders_info,
proofs,
};
pub fn handle_dkg_failure(&mut self, proofs: DkgFailureProofSet) -> Result<Command> {
let variant = Variant::DKGFailureAgreement(proofs);
let message = Message::single_src(&self.node, DstLocation::Direct, variant, None, None)?;
Ok(self.send_message_to_our_elders(message.to_bytes()))
}
Expand Down Expand Up @@ -696,22 +689,16 @@ impl Approved {
Variant::DKGStart {
dkg_key,
elders_info,
generation,
} => self.handle_dkg_start(*dkg_key, elders_info.clone(), *generation),
} => self.handle_dkg_start(*dkg_key, elders_info.clone()),
Variant::DKGMessage { dkg_key, message } => {
self.handle_dkg_message(*dkg_key, message.clone(), msg.src().to_node_name()?)
}
Variant::DKGFailureObservation { dkg_key, proof } => {
self.handle_dkg_failure_observation(*dkg_key, *proof)
}
Variant::DKGFailureAgreement {
elders_info,
proofs,
} => self.handle_dkg_failure_agreement(
&msg.src().to_node_name()?,
elders_info.clone(),
proofs,
),
Variant::DKGFailureAgreement(proofs) => {
self.handle_dkg_failure_agreement(&msg.src().to_node_name()?, proofs)
}
Variant::Vote {
content,
proof_share,
Expand Down Expand Up @@ -1355,11 +1342,10 @@ impl Approved {
&mut self,
dkg_key: DkgKey,
new_elders_info: EldersInfo,
generation: u64,
) -> Result<Vec<Command>> {
trace!("Received DKGStart for {}", new_elders_info);
self.dkg_voter
.start(&self.node.keypair, dkg_key, new_elders_info, generation)
.start(&self.node.keypair, dkg_key, new_elders_info)
.into_commands(&self.node)
}

Expand Down Expand Up @@ -1389,7 +1375,6 @@ impl Approved {
fn handle_dkg_failure_agreement(
&self,
sender: &XorName,
elders_info: EldersInfo,
proofs: &DkgFailureProofSet,
) -> Result<Vec<Command>> {
let sender = &self
Expand All @@ -1399,25 +1384,17 @@ impl Approved {
.ok_or(Error::InvalidSrcLocation)?
.peer;

if !proofs.verify(&elders_info) {
error!(
"Ignore DKG failure agreement with invalid proofs: {}",
elders_info
);
return Ok(vec![]);
}

if !self
let elders_info = self
.section
.promote_and_demote_elders(&self.node.name())
.contains(&elders_info)
{
trace!(
"Ignore DKG failure agreement for outdated participants: {}",
elders_info
);
.into_iter()
.find(|elders_info| proofs.verify(elders_info, self.section.chain().len() as u64));
let elders_info = if let Some(elders_info) = elders_info {
elders_info
} else {
trace!("Ignore DKG failure agreement with invalid proofs or outdated participants",);
return Ok(vec![]);
}
};

trace!(
"Received DKG failure agreement - restarting: {}",
Expand Down Expand Up @@ -2016,11 +1993,10 @@ impl Approved {
) -> Result<Vec<Command>> {
trace!("Send DKGStart for {} to {:?}", elders_info, recipients);

let dkg_key = DkgKey::new(&elders_info);
let dkg_key = DkgKey::new(&elders_info, self.section.chain().len() as u64);
let variant = Variant::DKGStart {
dkg_key,
elders_info,
generation: self.section.chain().len() as u64,
};
let vote = self.create_send_message_vote(DstLocation::Direct, variant, None)?;
self.send_vote(recipients, vote)
Expand Down
16 changes: 4 additions & 12 deletions src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,7 @@ pub(crate) enum Command {
outcome: SectionKeyShare,
},
/// Handle a DKG failure that was observed by a majority of the DKG participants.
HandleDkgFailure {
elders_info: EldersInfo,
proofs: DkgFailureProofSet,
},
HandleDkgFailure(DkgFailureProofSet),
/// Send a message to `delivery_group_size` peers out of the given `recipients`.
SendMessage {
recipients: Vec<SocketAddr>,
Expand Down Expand Up @@ -147,14 +144,9 @@ impl Debug for Command {
.field("elders_info", elders_info)
.field("outcome", &outcome.public_key_set.public_key())
.finish(),
Self::HandleDkgFailure {
elders_info,
proofs,
} => f
.debug_struct("HandleDkgFailure")
.field("elders_info", elders_info)
.field("proofs", proofs)
.finish(),
Self::HandleDkgFailure(proofs) => {
f.debug_tuple("HandleDkgFailure").field(proofs).finish()
}
Self::SendMessage {
recipients,
delivery_group_size,
Expand Down

0 comments on commit e68ba2a

Please sign in to comment.