Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
fix(dkg): backlog messages with unknown DKG key
Browse files Browse the repository at this point in the history
  • Loading branch information
madadam committed Nov 23, 2020
1 parent d6be64f commit 03873c1
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 14 deletions.
79 changes: 66 additions & 13 deletions src/consensus/dkg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use hex_fmt::HexFmt;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::{
collections::{HashMap, HashSet},
collections::{HashMap, HashSet, VecDeque},
fmt::{self, Debug, Formatter},
net::SocketAddr,
time::Duration,
Expand All @@ -32,6 +32,8 @@ use xor_name::XorName;
// Interval to progress DKG timed phase
const DKG_PROGRESS_INTERVAL: Duration = Duration::from_secs(30);

const BACKLOG_CAPACITY: usize = 100;

/// Unique identified of a DKG session.
#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)]
pub struct DkgKey(Digest256);
Expand Down Expand Up @@ -88,13 +90,19 @@ impl Debug for DkgKey {
pub(crate) struct DkgVoter {
participant: Option<Participant>,
observers: HashMap<DkgKey, Observer>,

// Due to the asyncronous nature of the network we might sometimes receive a DKG message before
// we created the corresponding `Participant` session. To avoid losing those messages, we store
// them in this backlog and replay them once we create the session.
backlog: Backlog,
}

impl Default for DkgVoter {
fn default() -> Self {
Self {
participant: None,
observers: HashMap::new(),
backlog: Backlog::new(),
}
}
}
Expand Down Expand Up @@ -135,6 +143,13 @@ impl DkgVoter {

let mut commands = session.broadcast(&our_name, dkg_key, message);

commands.extend(
self.backlog
.take(&dkg_key)
.into_iter()
.flat_map(|message| session.process_message(&our_name, dkg_key, message)),
);

if let Some(command) = session.check() {
// Already completed.
commands.push(command)
Expand Down Expand Up @@ -228,9 +243,14 @@ impl DkgVoter {
dkg_key: DkgKey,
message: DkgMessage,
) -> Vec<DkgCommand> {
let session = if let Some(session) = &mut self.participant {
let session = if let Some(session) = self
.participant
.as_mut()
.filter(|session| session.dkg_key == dkg_key)
{
session
} else {
self.backlog.push(dkg_key, message);
return vec![];
};

Expand Down Expand Up @@ -352,10 +372,6 @@ impl Participant {
dkg_key: DkgKey,
message: DkgMessage,
) -> Vec<DkgCommand> {
if self.dkg_key != dkg_key {
return vec![];
}

trace!("process DKG message {:?}", message);
let responses = self
.key_gen
Expand All @@ -374,6 +390,8 @@ impl Participant {
dkg_key: DkgKey,
message: DkgMessage,
) -> Vec<DkgCommand> {
let mut commands = vec![];

let recipients: Vec<_> = self
.elders_info
.as_ref()
Expand All @@ -384,14 +402,15 @@ impl Participant {
.copied()
.collect();

trace!("broadcasting DKG message {:?} to {:?}", message, recipients);
if !recipients.is_empty() {
trace!("broadcasting DKG message {:?} to {:?}", message, recipients);
commands.push(DkgCommand::SendMessage {
recipients,
dkg_key,
message: message.clone(),
});
}

let mut commands = vec![];
commands.push(DkgCommand::SendMessage {
recipients,
dkg_key,
message: message.clone(),
});
commands.extend(self.process_message(our_name, dkg_key, message));
commands
}
Expand Down Expand Up @@ -448,6 +467,40 @@ struct Observer {
accumulator: HashMap<Result<bls::PublicKey, ()>, HashSet<XorName>>,
}

struct Backlog(VecDeque<(DkgKey, DkgMessage)>);

impl Backlog {
fn new() -> Self {
Self(VecDeque::with_capacity(BACKLOG_CAPACITY))
}

fn push(&mut self, dkg_key: DkgKey, message: DkgMessage) {
if self.0.len() == self.0.capacity() {
let _ = self.0.pop_front();
}

self.0.push_back((dkg_key, message))
}

fn take(&mut self, dkg_key: &DkgKey) -> Vec<DkgMessage> {
let mut output = Vec::new();
let max = self.0.len();

for _ in 0..max {
if let Some((message_dkg_key, message)) = self.0.pop_front() {
if &message_dkg_key == dkg_key {
output.push(message)
} else {
self.0.push_back((message_dkg_key, message))
}
}
}

output
}
}

#[derive(Debug)]
pub(crate) enum DkgCommand {
SendMessage {
recipients: Vec<SocketAddr>,
Expand Down
7 changes: 6 additions & 1 deletion src/routing/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,12 @@ impl Debug for Command {
.debug_struct("HandleDkgParticipationResult")
.field("dkg_key", dkg_key)
.field("elders_info", elders_info)
.field("result", result)
.field(
"result",
&result
.as_ref()
.map(|outcome| outcome.public_key_set.public_key()),
)
.finish(),
Self::HandleDkgObservationResult {
elders_info,
Expand Down
17 changes: 17 additions & 0 deletions tests/drop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ async fn test_node_drop() -> Result<()> {
assert_next_event!(nodes[1].1, Event::RelocationStarted { .. });
assert_next_event!(nodes[1].1, Event::Relocated { .. });

// Wait for the DKG(s) to complete, to make sure there are no more messages being exchanged.
let node_count = nodes.len();
for (node, events) in &mut nodes {
if node.our_elders().await.len() == node_count {
continue;
}

while let Some(event) = events.next().await {
match event {
Event::EldersChanged { elders, .. } if elders.len() == node_count => continue,
_ => {}
}
}

panic!("event stream closed before receiving Event::EldersChanged");
}

// Drop one node
let dropped_name = nodes.remove(1).0.name().await;

Expand Down

0 comments on commit 03873c1

Please sign in to comment.