From 0c892a85659590658d860b1194141ccff7df71a7 Mon Sep 17 00:00:00 2001 From: qima Date: Tue, 23 Nov 2021 15:19:15 +0800 Subject: [PATCH] feat!: avoid broadcasting direct messages BREAKING CHANGE: specify the receiver explicity to avoid it to be broadcasted unnecessarily. --- src/key_gen/mod.rs | 109 +++++++++++++++++++++++++++++++++---------- src/key_gen/tests.rs | 42 +++++++++-------- 2 files changed, 107 insertions(+), 44 deletions(-) diff --git a/src/key_gen/mod.rs b/src/key_gen/mod.rs index 1303ccd..c83fea6 100644 --- a/src/key_gen/mod.rs +++ b/src/key_gen/mod.rs @@ -102,7 +102,7 @@ impl Debug for Part { /// For each node, it contains `proposal_index, receiver_index, serialised value for the receiver, /// encrypted values from the sender`. #[derive(Deserialize, Serialize, Clone, Hash, Eq, PartialEq, PartialOrd, Ord)] -pub struct Acknowledgment(u64, u64, Vec, Vec>); +pub struct Acknowledgment(pub u64, u64, Vec, Vec>); impl Debug for Acknowledgment { fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { @@ -281,6 +281,8 @@ impl ComplaintsAccumulator { } } +pub type MessageAndTarget = (XorName, Message); + /// An algorithm for dealerless distributed key generation. /// /// This is trying to follow the protocol as suggested at @@ -329,7 +331,7 @@ impl KeyGen { our_id: XorName, threshold: usize, names: BTreeSet, - ) -> Result<(KeyGen, Message), Error> { + ) -> Result<(KeyGen, Vec), Error> { if names.len() < threshold { return Err(Error::Unknown); } @@ -353,15 +355,15 @@ impl KeyGen { message_cache: BTreeMap::new(), }; - Ok(( - key_gen, - Message::Initialization { - key_gen_id: our_index, - m: threshold, - n: names.len(), - member_list: names, - }, - )) + let msg = Message::Initialization { + key_gen_id: our_index, + m: threshold, + n: names.len(), + member_list: names.clone(), + }; + let messages: Vec<_> = names.iter().map(|name| (*name, msg.clone())).collect(); + + Ok((key_gen, messages)) } pub fn phase(&self) -> Phase { @@ -373,7 +375,7 @@ impl KeyGen { &mut self, rng: &mut R, msg: Message, - ) -> Result, Error> { + ) -> Result, Error> { if self.is_finalized() { return Ok(Vec::new()); } @@ -415,7 +417,7 @@ impl KeyGen { &mut self, rng: &mut R, mut cache_messages: Vec, - ) -> (Vec, Vec<(XorName, Message)>) { + ) -> (Vec, Vec) { let mut msgs = Vec::new(); let mut updated = false; loop { @@ -461,7 +463,7 @@ impl KeyGen { &mut self, rng: &mut R, msg: Message, - ) -> Result, Error> { + ) -> Result, Error> { trace!( "{:?} with phase {:?} handle DKG message {:?}-{:?}", self, @@ -469,7 +471,7 @@ impl KeyGen { msg.id(), msg ); - match msg.clone() { + let result = match msg.clone() { Message::Initialization { key_gen_id, m, @@ -479,10 +481,7 @@ impl KeyGen { let _ = self.message_cache.insert(msg.id(), msg); self.handle_initialization(rng, m, n, key_gen_id, member_list) } - Message::Proposal { key_gen_id, part } => { - let _ = self.message_cache.insert(msg.id(), msg); - self.handle_proposal(key_gen_id, part) - } + Message::Proposal { key_gen_id, part } => self.handle_proposal(key_gen_id, part), Message::Complaint { key_gen_id, target, @@ -493,7 +492,8 @@ impl KeyGen { keys_map, } => self.handle_justification(key_gen_id, keys_map), Message::Acknowledgment { key_gen_id, ack } => self.handle_ack(key_gen_id, ack), - } + }; + self.multicasting_messages(result) } // Handles an incoming initialize message. Creates the `Proposal` message once quorumn @@ -738,9 +738,9 @@ impl KeyGen { pub fn timed_phase_transition( &mut self, rng: &mut R, - ) -> Result, Error> { + ) -> Result, Error> { trace!("{:?} current phase is {:?}", self, self.phase); - match self.phase { + let result = match self.phase { Phase::Contribution => self.finalize_contributing_phase(), Phase::Complaining => self.finalize_complaining_phase(rng), Phase::Initialization => Err(Error::UnexpectedPhase { @@ -753,6 +753,58 @@ impl KeyGen { }), Phase::Finalization => Ok(Vec::new()), + }; + self.multicasting_messages(result) + } + + // Specify the receiver of the DKG messages explicitly + // to avoid un-necessary broadcasting. + fn multicasting_messages( + &mut self, + result: Result, Error>, + ) -> Result, Error> { + match result { + Ok(messages) => { + let mut messaging = Vec::new(); + for message in messages { + match message { + Message::Proposal { ref part, .. } => { + // Proposal to us cannot be used by other. + // So the cache must be carried out on sender side. + let _ = self.message_cache.insert(message.id(), message.clone()); + + let receiver = + if let Some(name) = self.node_id_from_index(part.receiver) { + name + } else { + warn!( + "For a Proposal, Cannot get name of index {:?} among {:?}", + part.receiver, self.names + ); + continue; + }; + messaging.push((receiver, message)); + } + Message::Acknowledgment { ref ack, .. } => { + let receiver = if let Some(name) = self.node_id_from_index(ack.1) { + name + } else { + warn!("For an Acknowledgement, Cannot get name of index {:?} among {:?}", + ack.1, self.names); + continue; + }; + messaging.push((receiver, message)); + } + _ => { + for name in &self.names { + messaging.push((*name, message.clone())); + } + } + } + } + Ok(messaging) + } + Err(err) => Err(err), } } @@ -881,7 +933,7 @@ impl KeyGen { } /// Returns the id of the index, or `None` if it is unknown. - fn node_id_from_index(&self, node_index: u64) -> Option { + pub fn node_id_from_index(&self, node_index: u64) -> Option { for (i, name) in self.names.iter().enumerate() { if i == node_index as usize { return Some(*name); @@ -906,7 +958,16 @@ impl KeyGen { /// Returns `true` if in the phase of Finalization. pub fn is_finalized(&self) -> bool { - self.phase == Phase::Finalization + let result = self.phase == Phase::Finalization; + + if !result { + trace!("incompleted DKG session containing:"); + for (key, part) in self.parts.iter() { + let acks: Vec = part.values.keys().cloned().collect(); + trace!(" Part from {:?}, and acks from {:?}", key, acks); + } + } + result } /// Returns the new secret key share and the public key set. diff --git a/src/key_gen/tests.rs b/src/key_gen/tests.rs index 0115a16..f595ba8 100644 --- a/src/key_gen/tests.rs +++ b/src/key_gen/tests.rs @@ -8,7 +8,7 @@ // Software. use crate::dev_utils::{create_ids, PeerId}; -use crate::key_gen::{message::Message, Error, KeyGen}; +use crate::key_gen::{message::Message, Error, KeyGen, MessageAndTarget}; use anyhow::{format_err, Result}; use bincode::serialize; use blsttc::{PublicKeySet, SignatureShare}; @@ -48,7 +48,7 @@ fn create_generators( let mut proposals = Vec::new(); for peer_id in peer_ids.iter() { let key_gen = { - let (key_gen, proposal) = + let (key_gen, messaging) = match KeyGen::initialize(peer_id.name(), threshold, names.clone()) { Ok(result) => result, Err(err) => { @@ -59,7 +59,7 @@ fn create_generators( )) } }; - proposals.push(proposal); + proposals.extend(messaging); key_gen }; @@ -74,7 +74,7 @@ fn create_generators( fn messaging( mut rng: &mut R, generators: &mut Vec, - proposals: &mut Vec, + proposals: &mut Vec, non_responsives: BTreeSet, ) { // Simulating the AE pattern @@ -84,7 +84,7 @@ fn messaging( // The proposal from non_responsive nodes shall be ignored. while !proposals.is_empty() { let proposals_local = std::mem::take(proposals); - for proposal in &proposals_local { + for (receiver, proposal) in &proposals_local { match proposal { Message::Initialization { .. } | Message::Proposal { .. } => { let _ = cached_msg.insert(proposal.id(), proposal.clone()); @@ -92,21 +92,23 @@ fn messaging( _ => {} } for (index, generator) in generators.iter_mut().enumerate() { - let proposal_vec = if let Ok(proposal_vec) = - generator.handle_message(&mut rng, proposal.clone()) - { - proposal_vec - } else { - let mut messages: Vec = cached_msg.values().cloned().collect(); - messages.push(proposal.clone()); - let (proposal_vec, _unhandable) = - generator.handle_pre_session_messages(&mut rng, messages); - proposal_vec - }; - if !non_responsives.contains(&(index as u64)) { - proposal_vec - .iter() - .for_each(|prop| proposals.push(prop.clone())); + if receiver == &generator.our_id { + let messaging_vec = if let Ok(messaging_vec) = + generator.handle_message(&mut rng, proposal.clone()) + { + messaging_vec + } else { + let mut messages: Vec = cached_msg.values().cloned().collect(); + messages.push(proposal.clone()); + let (messaging_vec, _unhandable) = + generator.handle_pre_session_messages(&mut rng, messages); + messaging_vec + }; + if !non_responsives.contains(&(index as u64)) { + messaging_vec + .iter() + .for_each(|prop| proposals.push(prop.clone())); + } } } }