Skip to content

Commit

Permalink
Merge 0cf9873 into 523d26c
Browse files Browse the repository at this point in the history
  • Loading branch information
maqi committed Nov 19, 2021
2 parents 523d26c + 0cf9873 commit ef635c4
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 65 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Expand Up @@ -21,6 +21,7 @@ rand_core = "~0.5.1"
serde = "1.0.106"
serde_derive = "1.0.106"
thiserror = "1.0.23"
tiny-keccak = { version = "2.0.2", features = ["sha3"] }
xor_name = "3.0.0"

[dev-dependencies]
Expand Down
30 changes: 30 additions & 0 deletions src/key_gen/message.rs
Expand Up @@ -12,8 +12,12 @@ use super::{Acknowledgment, Part};
use serde_derive::{Deserialize, Serialize};
use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use tiny_keccak::{Hasher, Sha3};
use xor_name::XorName;

/// SHA3-256 hash digest.
type Digest256 = [u8; 32];

/// Messages used for running BLS DKG.
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(bound = "")]
Expand Down Expand Up @@ -43,6 +47,32 @@ pub enum Message {
},
}

impl Message {
// Creator of the message.
pub fn creator(&self) -> u64 {
match &*self {
Message::Initialization { key_gen_id, .. }
| Message::Proposal { key_gen_id, .. }
| Message::Complaint { key_gen_id, .. }
| Message::Justification { key_gen_id, .. }
| Message::Acknowledgment { key_gen_id, .. } => *key_gen_id,
}
}

// Identifier of the message.
pub fn id(&self) -> XorName {
let mut hasher = Sha3::v256();
let mut hash = Digest256::default();

if let Ok(serialized) = bincode::serialize(self) {
hasher.update(&serialized);
hasher.finalize(&mut hash);
}

XorName::from_content(&hash)
}
}

impl fmt::Debug for Message {
fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
match &*self {
Expand Down
148 changes: 92 additions & 56 deletions src/key_gen/mod.rs
Expand Up @@ -318,8 +318,8 @@ pub struct KeyGen {
complaints_accumulator: ComplaintsAccumulator,
/// Pending complain messages.
pending_complain_messages: Vec<Message>,
/// Pending messages that cannot handle yet.
pending_messages: Vec<Message>,
/// Cached messages to be used for reply unhandable.
caching_messages: BTreeMap<XorName, Message>,
}

impl KeyGen {
Expand Down Expand Up @@ -350,7 +350,7 @@ impl KeyGen {
initalization_accumulator: InitializationAccumulator::new(),
complaints_accumulator: ComplaintsAccumulator::new(names.clone(), threshold),
pending_complain_messages: Vec::new(),
pending_messages: Vec::new(),
caching_messages: BTreeMap::new(),
};

Ok((
Expand All @@ -377,36 +377,60 @@ impl KeyGen {
if self.is_finalized() {
return Ok(Vec::new());
}
let result = self.process_message(rng, msg.clone());
match result {
Ok(mut msgs) => {
msgs.extend(self.poll_pending_messages(rng));
Ok(msgs)
}
Err(Error::UnexpectedPhase { .. }) | Err(Error::MissingPart) => {
self.pending_messages.push(msg);
Ok(Vec::new())
}
Err(_) => result,
}

self.process_message(rng, msg)
}

fn poll_pending_messages<R: RngCore>(&mut self, rng: &mut R) -> Vec<Message> {
/// Cached message will be returned as a list,
/// with Initialization messages on top and Proposal behind.
/// They shall get handled in such order on receiver side as well.
pub fn get_cached_message(&self) -> Vec<Message> {
let mut result = Vec::new();
result.extend(
self.caching_messages
.iter()
.filter_map(|(_, msg)| match msg {
Message::Initialization { .. } => Some(msg.clone()),
_ => None,
})
.collect::<Vec<Message>>(),
);
result.extend(
self.caching_messages
.iter()
.filter_map(|(_, msg)| match msg {
Message::Proposal { .. } => Some(msg.clone()),
_ => None,
})
.collect::<Vec<Message>>(),
);
result
}

/// Handle upper layer cached messages even before this DKG session got started.
/// Returns with messages need to be broadcast to all,
/// AND unhandable messages need to be sent to the creator.
/// It is also being used when handle message_history due to unhandable.
pub fn handle_pre_session_messages<R: RngCore>(
&mut self,
rng: &mut R,
mut cache_messages: Vec<Message>,
) -> (Vec<Message>, Vec<(XorName, Message)>) {
let mut msgs = Vec::new();
let mut updated = false;
loop {
trace!("new round polling pending messages");
let pending_messages = std::mem::take(&mut self.pending_messages);
trace!("new round polling history messages");
let pending_messages = std::mem::take(&mut cache_messages);
for message in pending_messages {
if let Ok(new_messages) = self.process_message(rng, message.clone()) {
if self.is_finalized() {
return Vec::new();
return (Vec::new(), Vec::new());
}
msgs.extend(new_messages);
updated = true;
} else {
trace!("pushing back pending message {:?}", message);
self.pending_messages.push(message);
trace!("pushing back history message {:?}", message);
cache_messages.push(message);
}
}
if !updated {
Expand All @@ -415,26 +439,50 @@ impl KeyGen {
updated = false;
}
}
msgs

let mut unhandables = Vec::new();
for msg in cache_messages {
let sender = if let Some(name) = self.node_id_from_index(msg.creator()) {
name
} else {
warn!(
"cannot get name of index {:?} among {:?}",
msg.creator(),
self.names
);
continue;
};
unhandables.push((sender, msg));
}
(msgs, unhandables)
}

fn process_message<R: RngCore>(
&mut self,
rng: &mut R,
msg: Message,
) -> Result<Vec<Message>, Error> {
debug!(
"{:?} with phase {:?} handle DKG message {:?}",
self, self.phase, msg
trace!(
"{:?} with phase {:?} handle DKG message {:?}-{:?}",
self,
self.phase,
msg.id(),
msg
);
match msg {
match msg.clone() {
Message::Initialization {
key_gen_id,
m,
n,
member_list,
} => self.handle_initialization(rng, m, n, key_gen_id, member_list),
Message::Proposal { key_gen_id, part } => self.handle_proposal(key_gen_id, part),
} => {
let _ = self.caching_messages.insert(msg.id(), msg);
self.handle_initialization(rng, m, n, key_gen_id, member_list)
}
Message::Proposal { key_gen_id, part } => {
let _ = self.caching_messages.insert(msg.id(), msg);
self.handle_proposal(key_gen_id, part)
}
Message::Complaint {
key_gen_id,
target,
Expand All @@ -459,10 +507,7 @@ impl KeyGen {
member_list: BTreeSet<XorName>,
) -> Result<Vec<Message>, Error> {
if self.phase != Phase::Initialization {
return Err(Error::UnexpectedPhase {
expected: Phase::Initialization,
actual: self.phase,
});
return Ok(Vec::new());
}

if let Some((m, _n, member_list)) =
Expand Down Expand Up @@ -512,11 +557,13 @@ impl KeyGen {
// When there is an invalidation happens, holds the `Complaint` message till broadcast out
// when `finalize_contributing` being called.
fn handle_proposal(&mut self, sender_index: u64, part: Part) -> Result<Vec<Message>, Error> {
if !(self.phase == Phase::Contribution || self.phase == Phase::Commitment) {
if self.phase == Phase::Initialization {
return Err(Error::UnexpectedPhase {
expected: Phase::Contribution,
actual: self.phase,
});
} else if !(self.phase == Phase::Contribution || self.phase == Phase::Commitment) {
return Ok(Vec::new());
}

let row = match self.handle_part_or_fault(sender_index, part.clone()) {
Expand Down Expand Up @@ -576,12 +623,15 @@ impl KeyGen {
sender_index: u64,
ack: Acknowledgment,
) -> Result<Vec<Message>, Error> {
if !(self.phase == Phase::Contribution || self.phase == Phase::Commitment) {
if self.phase == Phase::Initialization {
return Err(Error::UnexpectedPhase {
expected: Phase::Contribution,
actual: self.phase,
});
} else if !(self.phase == Phase::Contribution || self.phase == Phase::Commitment) {
return Ok(Vec::new());
}

match self.handle_ack_or_fault(sender_index, ack.clone()) {
Ok(()) => {
if self.all_contribution_received() {
Expand Down Expand Up @@ -656,7 +706,6 @@ impl KeyGen {
self.become_finalization();
}

self.pending_messages.clear();
Ok(mem::take(&mut self.pending_complain_messages))
}

Expand Down Expand Up @@ -690,22 +739,10 @@ impl KeyGen {
&mut self,
rng: &mut R,
) -> Result<Vec<Message>, Error> {
debug!("{:?} current phase is {:?}", self, self.phase);
trace!("{:?} current phase is {:?}", self, self.phase);
match self.phase {
Phase::Contribution => match self.finalize_contributing_phase() {
Ok(mut messages) => {
messages.extend(self.poll_pending_messages(rng));
Ok(messages)
}
Err(err) => Err(err),
},
Phase::Complaining => match self.finalize_complaining_phase(rng) {
Ok(mut messages) => {
messages.extend(self.poll_pending_messages(rng));
Ok(messages)
}
Err(err) => Err(err),
},
Phase::Contribution => self.finalize_contributing_phase(),
Phase::Complaining => self.finalize_complaining_phase(rng),
Phase::Initialization => Err(Error::UnexpectedPhase {
expected: Phase::Contribution,
actual: self.phase,
Expand All @@ -727,10 +764,8 @@ impl KeyGen {
invalid_msg: Vec<u8>,
) -> Result<Vec<Message>, Error> {
if self.phase != Phase::Complaining {
return Err(Error::UnexpectedPhase {
expected: Phase::Complaining,
actual: self.phase,
});
trace!("To avoid triggering AE pattern, skip this so far");
return Ok(Vec::new());
}

let sender_id = self
Expand All @@ -750,13 +785,15 @@ impl KeyGen {
rng: &mut R,
) -> Result<Vec<Message>, Error> {
let failings = self.complaints_accumulator.finalize_complaining_phase();

if failings.len() >= self.names.len() - self.threshold {
let mut result = BTreeSet::new();
failings.iter().for_each(|pk| {
if let Some(index) = self.node_index(pk) {
let _ = result.insert(index);
}
});
trace!("Finalized with too many failing voters");
return Err(Error::TooManyNonVoters(result));
}

Expand Down Expand Up @@ -832,7 +869,6 @@ impl KeyGen {

fn become_finalization(&mut self) {
self.phase = Phase::Finalization;
self.pending_messages.clear();
self.pending_complain_messages.clear();
}

Expand Down Expand Up @@ -1048,7 +1084,7 @@ impl KeyGen {
initalization_accumulator: InitializationAccumulator::new(),
complaints_accumulator: ComplaintsAccumulator::new(names, threshold),
pending_complain_messages: Vec::new(),
pending_messages: Vec::new(),
caching_messages: BTreeMap::new(),
}
}
}
Expand Down

0 comments on commit ef635c4

Please sign in to comment.