Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
feat: support multiple concurrent DKGs
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Jan 18, 2021
1 parent 476c3e8 commit 98fc101
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 49 deletions.
97 changes: 50 additions & 47 deletions src/consensus/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use hex_fmt::HexFmt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{
collections::VecDeque,
collections::{HashMap, VecDeque},
fmt::{self, Debug, Formatter},
iter, mem,
net::SocketAddr,
Expand Down Expand Up @@ -81,7 +81,7 @@ impl Debug for DkgKey {
/// successfully. Some kind of disambiguation strategy needs to be employed in that case, but that
/// is currently not a responsibility of this module.
pub(crate) struct DkgVoter {
session: Option<Session>,
sessions: HashMap<DkgKey, Session>,

// Due to the asyncronous nature of the network we might sometimes receive a DKG message before
// we created the corresponding session. To avoid losing those messages, we store them in this
Expand All @@ -92,7 +92,7 @@ pub(crate) struct DkgVoter {
impl Default for DkgVoter {
fn default() -> Self {
Self {
session: None,
sessions: HashMap::default(),
backlog: Backlog::new(),
}
}
Expand All @@ -106,8 +106,8 @@ impl DkgVoter {
dkg_key: DkgKey,
elders_info: EldersInfo,
) -> Vec<DkgCommand> {
if let Some(session) = &self.session {
if session.dkg_key == dkg_key && !session.complete {
if let Some(session) = self.sessions.get(&dkg_key) {
if !session.complete {
trace!("DKG for {} already in progress", elders_info);
return vec![];
}
Expand Down Expand Up @@ -151,7 +151,6 @@ impl DkgVoter {
trace!("DKG for {} starting", elders_info);

let mut session = Session {
dkg_key,
key_gen,
elders_info,
index,
Expand All @@ -161,15 +160,15 @@ impl DkgVoter {
};

let mut commands = vec![];
commands.extend(session.broadcast(keypair, message));
commands.extend(session.broadcast(&dkg_key, keypair, message));
commands.extend(
self.backlog
.take(&dkg_key)
.into_iter()
.flat_map(|message| session.process_message(keypair, message)),
.flat_map(|message| session.process_message(&dkg_key, keypair, message)),
);

self.session = Some(session);
let _ = self.sessions.insert(dkg_key, session);

commands
}
Expand All @@ -183,8 +182,12 @@ impl DkgVoter {

// Make key generator progress with timed phase.
pub fn handle_timeout(&mut self, keypair: &Keypair, timer_token: u64) -> Vec<DkgCommand> {
if let Some(session) = self.session.as_mut() {
session.handle_timeout(keypair, timer_token)
if let Some((dkg_key, session)) = self
.sessions
.iter_mut()
.find(|(_, session)| session.timer_token == timer_token)
{
session.handle_timeout(dkg_key, keypair)
} else {
vec![]
}
Expand All @@ -194,30 +197,25 @@ impl DkgVoter {
pub fn process_message(
&mut self,
keypair: &Keypair,
dkg_key: DkgKey,
dkg_key: &DkgKey,
message: DkgMessage,
) -> Vec<DkgCommand> {
if let Some(session) = self
.session
.as_mut()
.filter(|session| session.dkg_key == dkg_key)
{
session.process_message(keypair, message)
if let Some(session) = self.sessions.get_mut(dkg_key) {
session.process_message(dkg_key, keypair, message)
} else {
self.backlog.push(dkg_key, message);
self.backlog.push(*dkg_key, message);
vec![]
}
}

pub fn process_failure(
&mut self,
dkg_key: DkgKey,
dkg_key: &DkgKey,
proof: DkgFailureProof,
) -> Option<DkgCommand> {
self.session
.as_mut()
.filter(|session| session.dkg_key == dkg_key)?
.process_failure(proof)
self.sessions
.get_mut(dkg_key)?
.process_failure(dkg_key, proof)
}
}

Expand All @@ -226,7 +224,6 @@ struct Session {
elders_info: EldersInfo,
// Our participant index.
index: usize,
dkg_key: DkgKey,
key_gen: KeyGen,
timer_token: u64,
failures: DkgFailureProofSet,
Expand All @@ -237,7 +234,12 @@ struct Session {
}

impl Session {
fn process_message(&mut self, keypair: &Keypair, message: DkgMessage) -> Vec<DkgCommand> {
fn process_message(
&mut self,
dkg_key: &DkgKey,
keypair: &Keypair,
message: DkgMessage,
) -> Vec<DkgCommand> {
trace!("process DKG message {:?}", message);
let responses = self
.key_gen
Expand All @@ -253,10 +255,10 @@ impl Session {

let mut commands: Vec<_> = responses
.into_iter()
.flat_map(|response| self.broadcast(keypair, response))
.flat_map(|response| self.broadcast(dkg_key, keypair, response))
.chain(reset_timer)
.collect();
commands.extend(self.check(keypair));
commands.extend(self.check(dkg_key, keypair));
commands
}

Expand All @@ -270,28 +272,29 @@ impl Session {
.collect()
}

fn broadcast(&mut self, keypair: &Keypair, message: DkgMessage) -> Vec<DkgCommand> {
fn broadcast(
&mut self,
dkg_key: &DkgKey,
keypair: &Keypair,
message: DkgMessage,
) -> Vec<DkgCommand> {
let mut commands = vec![];

let recipients = self.recipients();
if !recipients.is_empty() {
trace!("broadcasting DKG message {:?} to {:?}", message, recipients);
commands.push(DkgCommand::SendMessage {
recipients,
dkg_key: self.dkg_key,
dkg_key: *dkg_key,
message: message.clone(),
});
}

commands.extend(self.process_message(keypair, message));
commands.extend(self.process_message(dkg_key, keypair, message));
commands
}

fn handle_timeout(&mut self, keypair: &Keypair, timer_token: u64) -> Vec<DkgCommand> {
if self.timer_token != timer_token {
return vec![];
}

fn handle_timeout(&mut self, dkg_key: &DkgKey, keypair: &Keypair) -> Vec<DkgCommand> {
if self.complete {
return vec![];
}
Expand All @@ -302,21 +305,21 @@ impl Session {
Ok(messages) => {
let mut commands: Vec<_> = messages
.into_iter()
.flat_map(|message| self.broadcast(keypair, message))
.flat_map(|message| self.broadcast(dkg_key, keypair, message))
.collect();
commands.push(self.reset_timer());
commands.extend(self.check(keypair));
commands.extend(self.check(dkg_key, keypair));
commands
}
Err(error) => {
trace!("DKG for {} failed: {}", self.elders_info, error);
self.report_failure(keypair)
self.report_failure(dkg_key, keypair)
}
}
}

// Check whether a key generator is finalized to give a DKG outcome.
fn check(&mut self, keypair: &Keypair) -> Vec<DkgCommand> {
fn check(&mut self, dkg_key: &DkgKey, keypair: &Keypair) -> Vec<DkgCommand> {
if self.complete {
return vec![];
}
Expand Down Expand Up @@ -357,12 +360,12 @@ impl Session {
participants.iter().format(", ")
);

self.report_failure(keypair)
self.report_failure(dkg_key, keypair)
}
}

fn report_failure(&mut self, keypair: &Keypair) -> Vec<DkgCommand> {
let proof = DkgFailureProof::new(keypair, &self.dkg_key);
fn report_failure(&mut self, dkg_key: &DkgKey, keypair: &Keypair) -> Vec<DkgCommand> {
let proof = DkgFailureProof::new(keypair, dkg_key);

if !self.failures.insert(proof) {
return vec![];
Expand All @@ -372,13 +375,13 @@ impl Session {
.into_iter()
.chain(iter::once(DkgCommand::SendFailureObservation {
recipients: self.recipients(),
dkg_key: self.dkg_key,
dkg_key: *dkg_key,
proof,
}))
.collect()
}

fn process_failure(&mut self, proof: DkgFailureProof) -> Option<DkgCommand> {
fn process_failure(&mut self, dkg_key: &DkgKey, proof: DkgFailureProof) -> Option<DkgCommand> {
if !self
.elders_info
.elders
Expand All @@ -387,7 +390,7 @@ impl Session {
return None;
}

if !proof.verify(&self.dkg_key) {
if !proof.verify(dkg_key) {
return None;
}

Expand Down Expand Up @@ -726,7 +729,7 @@ mod tests {
let actor = actors.get_mut(&addr).expect("unknown message recipient");
let commands = actor
.voter
.process_message(&actor.node.keypair, dkg_key, message);
.process_message(&actor.node.keypair, &dkg_key, message);

for command in commands {
messages.extend(actor.handle(command, &dkg_key))
Expand Down
4 changes: 2 additions & 2 deletions src/routing/approved.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1113,7 +1113,7 @@ impl Approved {
trace!("handle DKG message {:?} from {}", message, sender);

self.dkg_voter
.process_message(&self.node.keypair, dkg_key, message)
.process_message(&self.node.keypair, &dkg_key, message)
.into_commands(&self.node)
}

Expand All @@ -1123,7 +1123,7 @@ impl Approved {
proof: DkgFailureProof,
) -> Result<Vec<Command>> {
self.dkg_voter
.process_failure(dkg_key, proof)
.process_failure(&dkg_key, proof)
.into_commands(&self.node)
}

Expand Down

0 comments on commit 98fc101

Please sign in to comment.