Skip to content

Commit

Permalink
fix: adding target to the bundle
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f committed Jan 12, 2023
1 parent d52f9a9 commit 443d283
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 68 deletions.
34 changes: 18 additions & 16 deletions src/mvba/abba/mod.rs
Expand Up @@ -86,17 +86,17 @@ impl Abba {
self.broadcast(action)
}

// receive_message process the received message 'msg` from `sender`
pub fn receive_message(&mut self, sender: NodeId, msg: Message) -> Result<()> {
// receive_message process the received message 'msg` from `initiator`
pub fn receive_message(&mut self, initiator: NodeId, msg: Message) -> Result<()> {
log::debug!(
"received {} message: {:?} from {}",
msg.action_str(),
msg,
sender
initiator
);

self.check_message(&sender, &msg)?;
if !self.add_message(&sender, &msg)? {
self.check_message(&initiator, &msg)?;
if !self.add_message(&initiator, &msg)? {
return Ok(());
}

Expand Down Expand Up @@ -317,42 +317,42 @@ impl Abba {
}
}

fn add_message(&mut self, sender: &NodeId, msg: &Message) -> Result<bool> {
fn add_message(&mut self, initiator: &NodeId, msg: &Message) -> Result<bool> {
match &msg.action {
Action::PreVote(action) => {
let pre_votes = self.get_mut_pre_votes_by_round(action.round);
if let Some(exist) = pre_votes.get(sender) {
if let Some(exist) = pre_votes.get(initiator) {
if exist != action {
return Err(Error::InvalidMessage(format!(
"double pre-vote detected from {:?}",
sender
initiator
)));
}
return Ok(false);
}

pre_votes.insert(*sender, action.clone());
pre_votes.insert(*initiator, action.clone());
}
Action::MainVote(action) => {
let main_votes = self.get_mut_main_votes_by_round(action.round);
if let Some(exist) = main_votes.get(sender) {
if let Some(exist) = main_votes.get(initiator) {
if exist != action {
return Err(Error::InvalidMessage(format!(
"double main-vote detected from {:?}",
sender
initiator
)));
}
return Ok(false);
}

main_votes.insert(*sender, action.clone());
main_votes.insert(*initiator, action.clone());
}
Action::Decision(_action) => (),
}
Ok(true)
}

fn check_message(&mut self, sender: &NodeId, msg: &Message) -> Result<()> {
fn check_message(&mut self, initiator: &NodeId, msg: &Message) -> Result<()> {
if msg.id != self.id {
return Err(Error::InvalidMessage(format!(
"invalid ID. expected: {}, got {}",
Expand All @@ -373,7 +373,7 @@ impl Abba {
let sign_bytes = self.pre_vote_bytes_to_sign(action.round, &action.value)?;
if !self
.pub_key_set
.public_key_share(sender)
.public_key_share(initiator)
.verify(&action.sig_share, sign_bytes)
{
return Err(Error::InvalidMessage("invalid signature share".to_string()));
Expand Down Expand Up @@ -446,7 +446,7 @@ impl Abba {
let sign_bytes = self.main_vote_bytes_to_sign(action.round, &action.value)?;
if !self
.pub_key_set
.public_key_share(sender)
.public_key_share(initiator)
.verify(&action.sig_share, sign_bytes)
{
return Err(Error::InvalidMessage("invalid signature share".to_string()));
Expand Down Expand Up @@ -536,7 +536,9 @@ impl Abba {
action,
};
let data = bincode::serialize(&msg)?;
self.broadcaster.borrow_mut().broadcast(MODULE_NAME, data);
self.broadcaster
.borrow_mut()
.broadcast(MODULE_NAME, Some(self.j), data);
self.receive_message(self.i, msg)?;
Ok(())
}
Expand Down
17 changes: 12 additions & 5 deletions src/mvba/broadcaster.rs
Expand Up @@ -21,19 +21,26 @@ impl Broadcaster {
self.self_id
}

pub fn send_to(&mut self, module: &str, payload: Vec<u8>, recipient: NodeId) {
let bdl = self.make_bundle(module, payload);
pub fn send_to(
&mut self,
module: &str,
target: Option<NodeId>,
payload: Vec<u8>,
recipient: NodeId,
) {
let bdl = self.make_bundle(module, target, payload);
self.outgoings.push(Outgoing::Direct(recipient, bdl));
}

pub fn broadcast(&mut self, module: &str, payload: Vec<u8>) {
let bdl = self.make_bundle(module, payload);
pub fn broadcast(&mut self, module: &str, target: Option<NodeId>, payload: Vec<u8>) {
let bdl = self.make_bundle(module, target, payload);
self.outgoings.push(Outgoing::Gossip(bdl));
}

fn make_bundle(&self, module: &str, payload: Vec<u8>) -> Bundle {
fn make_bundle(&self, module: &str, target: Option<NodeId>, payload: Vec<u8>) -> Bundle {
Bundle {
initiator: self.self_id,
target,
module: module.to_string(),
payload,
}
Expand Down
2 changes: 1 addition & 1 deletion src/mvba/bundle.rs
Expand Up @@ -7,7 +7,7 @@ pub struct Bundle {
/// This is the initiator node and in the most cases is same as `i` in specs.
pub initiator: NodeId,
/// This is the target node and in the most cases is same as `j` in specs.
// pub target: Option<NodeId>,
pub target: Option<NodeId>,
/// This is the destination module, it can be ABBA, VCBC or MVBA.
pub module: String,
/// This is the actual message
Expand Down
58 changes: 35 additions & 23 deletions src/mvba/consensus.rs
Expand Up @@ -97,38 +97,50 @@ impl Consensus {
}

match bundle.module.as_ref() {
vcbc::MODULE_NAME => match self.vcbc_map.get_mut(&bundle.initiator) {
Some(vcbc) => {
let msg = bincode::deserialize(&bundle.payload)?;
vcbc.receive_message(bundle.initiator, msg)?;
if vcbc.is_delivered() {
let (proposal, sig) = vcbc.delivered_message();
self.mvba.set_proposal(bundle.initiator, proposal, sig)?;
vcbc::MODULE_NAME => match bundle.target {
Some(target) => match self.vcbc_map.get_mut(&target) {
Some(vcbc) => {
let msg = bincode::deserialize(&bundle.payload)?;
vcbc.receive_message(bundle.initiator, msg)?;
if vcbc.is_delivered() {
let (proposal, sig) = vcbc.delivered_message();
self.mvba.set_proposal(target, proposal, sig)?;
}
}
}
None => return Err(Error::UnknownNodeId(bundle.initiator)),
None => {
return Err(Error::InvalidMessage(format!("target {target} not found")))
}
},
None => return Err(Error::InvalidMessage("no target is defined".to_string())),
},
abba::MODULE_NAME => match self.abba_map.get_mut(&bundle.initiator) {
Some(abba) => {
let msg = bincode::deserialize(&bundle.payload)?;
abba.receive_message(bundle.initiator, msg)?;
if abba.is_decided() {
if abba.decided_value() {
self.finished = true;
} else {
self.mvba.move_to_next_proposal()?;

abba::MODULE_NAME => match bundle.target {
Some(target) => match self.abba_map.get_mut(&target) {
Some(abba) => {
let msg = bincode::deserialize(&bundle.payload)?;
abba.receive_message(bundle.initiator, msg)?;
if abba.is_decided() {
if abba.decided_value() {
self.finished = true;
} else {
self.mvba.move_to_next_proposal()?;
}
}
}
}
None => {
return Err(Error::UnknownNodeId(bundle.initiator));
}
None => {
return Err(Error::InvalidMessage(format!("target {target} not found")))
}
},
None => return Err(Error::InvalidMessage("no target is defined".to_string())),
},
mvba::MODULE_NAME => {
let msg = bincode::deserialize(&bundle.payload)?;
self.mvba.receive_message(msg)?;
if self.mvba.is_completed() {
let abba = self.abba_map.get_mut(&bundle.initiator).unwrap();
let abba = self
.abba_map
.get_mut(&self.mvba.current_proposer())
.unwrap();
if self.mvba.completed_vote() {
abba.pre_vote_zero()?;
} else {
Expand Down
4 changes: 1 addition & 3 deletions src/mvba/error.rs
@@ -1,7 +1,7 @@
use core::fmt::Debug;
use thiserror::Error;

use super::{abba, mvba, vcbc, NodeId};
use super::{abba, mvba, vcbc};

#[derive(Error, Debug)]
pub enum Error {
Expand All @@ -17,8 +17,6 @@ pub enum Error {
InvalidMessage(String),
#[error("generic error {0}")]
Generic(String),
#[error("unknown node id {0}")]
UnknownNodeId(NodeId),
}

pub type Result<T> = std::result::Result<T, Error>;
24 changes: 18 additions & 6 deletions src/mvba/mvba/mod.rs
Expand Up @@ -78,6 +78,10 @@ impl Mvba {
self.vote()
}

pub fn current_proposer(&self) -> NodeId {
*self.parties.get(self.l).unwrap()
}

pub fn is_completed(&self) -> bool {
self.v.is_some()
}
Expand All @@ -87,7 +91,10 @@ impl Mvba {
}

pub fn completed_vote_one(&self) -> (Proposal, Signature) {
self.proposals.get(&self.l).unwrap().clone()
self.proposals
.get(&self.current_proposer())
.unwrap()
.clone()
}

fn check_message(&mut self, msg: &Message) -> Result<()> {
Expand Down Expand Up @@ -147,17 +154,20 @@ impl Mvba {
// (by sending it the message (ID|vcbc.a.0, c-request)).
let data = vcbc::make_c_request_message(&self.id, msg.vote.proposer)?;

self.broadcaster
.borrow_mut()
.send_to(vcbc::MODULE_NAME, data, msg.voter);
self.broadcaster.borrow_mut().send_to(
vcbc::MODULE_NAME,
Some(msg.vote.proposer),
data,
msg.voter,
);

Ok(false)
} else {
Ok(true)
}
}

/// receive_message process the received message 'msg` from `sender`
/// receive_message process the received message 'msg`
pub fn receive_message(&mut self, msg: Message) -> Result<()> {
self.check_message(&msg)?;
if !self.add_vote(&msg)? {
Expand Down Expand Up @@ -251,7 +261,9 @@ impl Mvba {
signature: sig,
};
let data = bincode::serialize(&msg)?;
self.broadcaster.borrow_mut().broadcast(MODULE_NAME, data);
self.broadcaster
.borrow_mut()
.broadcast(MODULE_NAME, None, data);
self.receive_message(msg)?;
Ok(())
}
Expand Down
32 changes: 18 additions & 14 deletions src/mvba/vcbc/mod.rs
Expand Up @@ -119,8 +119,8 @@ impl Vcbc {
self.broadcast(send_msg)
}

/// receive_message process the received message 'msg` from `sender`
pub fn receive_message(&mut self, sender: NodeId, msg: Message) -> Result<()> {
/// receive_message process the received message 'msg` from `initiator`
pub fn receive_message(&mut self, initiator: NodeId, msg: Message) -> Result<()> {
if msg.tag.id != self.tag.id {
return Err(Error::InvalidMessage(format!(
"invalid ID. expected: {}, got {}",
Expand All @@ -137,14 +137,14 @@ impl Vcbc {
"received {} message: {:?} from {}",
msg.action_str(),
msg,
sender
initiator
);
match msg.action.clone() {
Action::Send(m) => {
// Upon receiving message (ID.j.s, c-send, m) from Pl:
// if j = l and m̄ = ⊥ then
if sender == self.tag.j && self.m_bar.is_none() {
if !(self.message_validity)(sender, &m) {
if initiator == self.tag.j && self.m_bar.is_none() {
if !(self.message_validity)(initiator, &m) {
return Err(Error::InvalidMessage("invalid proposal".to_string()));
}
// m̄ ← m
Expand Down Expand Up @@ -179,10 +179,10 @@ impl Vcbc {
}

// Upon receiving message (ID.j.s, c-ready, d, νl) from Pl for the first time:
if let Vacant(e) = self.wd.entry(sender) {
if let Vacant(e) = self.wd.entry(initiator) {
let valid_sig = self
.pub_key_set
.public_key_share(sender)
.public_key_share(initiator)
.verify(&sig_share, sign_bytes);

if !valid_sig {
Expand Down Expand Up @@ -220,13 +220,13 @@ impl Vcbc {
Some(d) => d,
None => {
warn!("received c-final before receiving s-send, logging message");
try_insert(&mut self.final_messages, sender, msg)?;
try_insert(&mut self.final_messages, initiator, msg)?;
// requesting for the proposal
let request_msg = Message {
tag: self.tag.clone(),
action: Action::Request,
};
self.send_to(request_msg, sender)?;
self.send_to(request_msg, initiator)?;

return Ok(());
}
Expand Down Expand Up @@ -258,7 +258,7 @@ impl Vcbc {
};

// send (ID.j.s, c-answer, m̄, µ̄) to Pl
self.send_to(answer_msg, sender)?;
self.send_to(answer_msg, initiator)?;
}
}
}
Expand All @@ -279,8 +279,8 @@ impl Vcbc {
}
}

for (sender, final_msg) in std::mem::take(&mut self.final_messages) {
self.receive_message(sender, final_msg)?;
for (initiator, final_msg) in std::mem::take(&mut self.final_messages) {
self.receive_message(initiator, final_msg)?;
}

Ok(())
Expand Down Expand Up @@ -315,7 +315,9 @@ impl Vcbc {
if to == self.i {
self.receive_message(self.i, msg)?;
} else {
self.broadcaster.borrow_mut().send_to(MODULE_NAME, data, to);
self.broadcaster
.borrow_mut()
.send_to(MODULE_NAME, Some(self.i), data, to);
}
Ok(())
}
Expand All @@ -324,7 +326,9 @@ impl Vcbc {
// It adds the message to our messages log.
fn broadcast(&mut self, msg: self::Message) -> Result<()> {
let data = bincode::serialize(&msg)?;
self.broadcaster.borrow_mut().broadcast(MODULE_NAME, data);
self.broadcaster
.borrow_mut()
.broadcast(MODULE_NAME, Some(self.i), data);
self.receive_message(self.i, msg)?;
Ok(())
}
Expand Down

0 comments on commit 443d283

Please sign in to comment.