453 changes: 320 additions & 133 deletions src/mvba/abba/tests.rs

Large diffs are not rendered by default.

103 changes: 56 additions & 47 deletions src/mvba/broadcaster.rs
@@ -1,83 +1,92 @@
use super::{bundle::Bundle, NodeId};
use blsttc::SecretKeyShare;
use super::{
bundle::{Bundle, Outgoing},
NodeId,
};

// Broadcaster holds information required to broadcast the messages.
#[derive(Debug)]
pub struct Broadcaster {
bundle_id: u32,
self_id: NodeId,
_sec_key_share: SecretKeyShare, // TODO: SecretKeyShare or SecretKey?
broadcast_bundles: Vec<Bundle>,
send_bundles: Vec<(NodeId, Bundle)>,
outgoings: Vec<Outgoing>,
}

impl Broadcaster {
pub fn new(bundle_id: u32, self_id: NodeId, sec_key_share: SecretKeyShare) -> Self {
pub fn new(self_id: NodeId) -> Self {
Self {
bundle_id,
self_id,
_sec_key_share: sec_key_share,
broadcast_bundles: Vec::new(),
send_bundles: Vec::new(),
outgoings: Vec::new(),
}
}

pub fn self_id(&self) -> NodeId {
self.self_id
}

pub fn send_to(&mut self, module: &str, message: Vec<u8>, recipient: NodeId) {
let bdl = Bundle {
id: self.bundle_id,
sender: self.self_id,
module: module.to_string(),
message,
};
self.send_bundles.push((recipient, bdl));
pub fn send_to(
&mut self,
module: &str,
target: Option<NodeId>,
payload: Vec<u8>,
recipient: NodeId,
) {
let bdl = self.make_bundle(module, target, payload);
self.outgoings.push(Outgoing::Direct(recipient, bdl));
}

pub fn broadcast(&mut self, module: &str, message: Vec<u8>) {
let bdl = Bundle {
id: self.bundle_id,
sender: self.self_id,
module: module.to_string(),
message,
};
self.broadcast_bundles.push(bdl);
pub fn broadcast(&mut self, module: &str, target: Option<NodeId>, payload: Vec<u8>) {
let bdl = self.make_bundle(module, target, payload);
self.outgoings.push(Outgoing::Gossip(bdl));
}

#[allow(dead_code)]
pub fn take_broadcast_bundles(&mut self) -> Vec<Vec<u8>> {
let mut data = Vec::with_capacity(self.broadcast_bundles.len());
for bdl in std::mem::take(&mut self.broadcast_bundles) {
data.push(bincode::serialize(&bdl).unwrap())
fn make_bundle(&self, module: &str, target: Option<NodeId>, payload: Vec<u8>) -> Bundle {
Bundle {
initiator: self.self_id,
target,
module: module.to_string(),
payload,
}
data
}

#[allow(dead_code)]
pub fn take_send_bundles(&mut self) -> Vec<(NodeId, Vec<u8>)> {
let mut data = Vec::with_capacity(self.send_bundles.len());
for (recipient, bdl) in std::mem::take(&mut self.send_bundles) {
data.push((recipient, bincode::serialize(&bdl).unwrap()))
pub fn take_outgoings(&mut self) -> Vec<Outgoing> {
std::mem::take(&mut self.outgoings)
}

#[allow(clippy::type_complexity)]
#[cfg(test)]
pub fn take_bundles(&mut self) -> (Vec<Vec<u8>>, Vec<(NodeId, Vec<u8>)>) {
let mut gossips = Vec::with_capacity(self.outgoings.len());
let mut directs = Vec::with_capacity(self.outgoings.len());

for out in std::mem::take(&mut self.outgoings) {
match out {
Outgoing::Gossip(bdl) => gossips.push(bincode::serialize(&bdl).unwrap()),
Outgoing::Direct(recipient, bdl) => {
directs.push((recipient, bincode::serialize(&bdl).unwrap()))
}
}
}
data
(gossips, directs)
}

#[cfg(test)]
pub fn has_broadcast_message(&self, msg: &[u8]) -> bool {
for bdl in &self.broadcast_bundles {
if bdl.message.eq(&msg) {
return true;
pub fn has_gossip_message(&self, pld: &[u8]) -> bool {
for out in &self.outgoings {
if let Outgoing::Gossip(bdl) = out {
if bdl.payload.eq(&pld) {
return true;
}
}
}
false
}

#[cfg(test)]
pub fn has_send_message(&self, to: &NodeId, msg: &[u8]) -> bool {
for (receiver, bdl) in &self.send_bundles {
if bdl.message.eq(&msg) {
return receiver == to;
pub fn has_direct_message(&self, to: &NodeId, pld: &[u8]) -> bool {
for out in &self.outgoings {
if let Outgoing::Direct(recipient, bdl) = out {
if bdl.payload == pld && recipient == to {
return true;
}
}
}
false
Expand Down
22 changes: 17 additions & 5 deletions src/mvba/bundle.rs
@@ -1,10 +1,22 @@
use super::NodeId;
use serde::{Deserialize, Serialize};

#[derive(Debug, Serialize, Deserialize)]
/// Bundle is a wrapper around the actual message
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bundle {
pub id: u32,
pub sender: NodeId,
pub module: String, // TODO: use enum
pub message: Vec<u8>, // TODO: use enum
/// This is the initiator node and in the most cases is same as `i` in specs.
pub initiator: NodeId,
/// This is the target node and in the most cases is same as `j` in specs.
pub target: Option<NodeId>,
/// This is the destination module, it can be ABBA, VCBC or MVBA.
pub module: String,
/// This is the actual message
pub payload: Vec<u8>,
}

/// Ongoing messages definition
#[derive(Debug)]
pub enum Outgoing {
Gossip(Bundle),
Direct(NodeId, Bundle),
}
331 changes: 257 additions & 74 deletions src/mvba/consensus.rs
@@ -1,139 +1,322 @@
use crate::mvba::{
broadcaster::Broadcaster, proposal::Proposal, vcbc::Vcbc, NodeId, ProposalChecker,
};
use blsttc::{PublicKeySet, SecretKeyShare};
use std::{cell::RefCell, collections::HashMap, rc::Rc};

use super::{
abba::{self, Abba},
bundle::Bundle,
vcbc::{self, message::Tag},
bundle::{Bundle, Outgoing},
error::Error,
error::Result,
hash::Hash32,
mvba::{self, Mvba},
vcbc::{self},
Proposal,
};
use crate::mvba::{broadcaster::Broadcaster, vcbc::Vcbc, MessageValidity, NodeId};
use blsttc::{PublicKeySet, SecretKeyShare};
use std::{cell::RefCell, collections::HashMap, rc::Rc};

pub struct Consensus {
id: String,
self_id: NodeId,
threshold: usize,
abba_map: HashMap<NodeId, Abba>,
vcbc_map: HashMap<NodeId, Vcbc>,
#[allow(unused)]
mvba: Mvba,
decided_party: Option<NodeId>,
broadcaster: Rc<RefCell<Broadcaster>>,
}

impl Consensus {
pub fn init(
bundle_id: u32,
id: String,
self_id: NodeId,
sec_key_share: SecretKeyShare,
pub_key_set: PublicKeySet,
parties: Vec<NodeId>,
_proposal_checker: ProposalChecker,
message_validity: MessageValidity,
) -> Consensus {
let broadcaster = Broadcaster::new(bundle_id, self_id, sec_key_share.clone());
let broadcaster = Broadcaster::new(self_id);
let broadcaster_rc = Rc::new(RefCell::new(broadcaster));

// TODO: uncomment me
// let abba = Abba::new(
// pub_key_set.clone(),
// number,
// threshold,
// broadcaster_rc.clone(),
// );
let mut abba_map = HashMap::new();
let mut vcbc_map = HashMap::new();

for id in &parties {
let tag = Tag::new("vcbc", *id, 0);
for party in &parties {
let vcbc = Vcbc::new(
id.clone(),
self_id,
tag.clone(),
*party,
pub_key_set.clone(),
sec_key_share.clone(),
message_validity,
broadcaster_rc.clone(),
);
vcbc_map.insert(*id, vcbc).unwrap();
vcbc_map.insert(*party, vcbc);

let abba = Abba::new(
format!("{}", id),
id.clone(),
self_id,
tag,
*party,
pub_key_set.clone(),
sec_key_share.clone(),
broadcaster_rc.clone(),
);
abba_map.insert(*id, abba).unwrap();
abba_map.insert(*party, abba);
}

let mvba = Mvba::new(
id.clone(),
self_id,
sec_key_share,
pub_key_set,
parties,
broadcaster_rc.clone(),
);

Consensus {
id,
self_id,
threshold: pub_key_set.threshold(),
vcbc_map,
abba_map,
mvba,
decided_party: None,
broadcaster: broadcaster_rc,
}
}

// start the consensus by proposing a proposal and broadcasting it.
pub fn start(&mut self, _proposal: Proposal) -> Vec<Vec<u8>> {
/// starts the consensus by proposing the `proposal`.
pub fn propose(&mut self, proposal: Proposal) -> Result<Vec<Outgoing>> {
match self.vcbc_map.get_mut(&self.self_id) {
Some(_vcbc) => {}
Some(vcbc) => {
// verifiably authenticatedly c-broadcast message (v-echo, w, π) tagged with ID|vcbc.i.0
vcbc.c_broadcast(proposal)?;
}
None => {
log::warn!("this node is an observer node")
}
}
Ok(self.broadcaster.borrow_mut().take_outgoings())
}

pub fn process_bundle(&mut self, bundle: &Bundle) -> Result<Vec<Outgoing>> {
if self.decided_party.is_some() {
return Ok(vec![]);
}

// TODO:
// self.broadcaster.borrow_mut().take_bundles()

// TODO: fixme
// just fixing cargo clippy issees
match self.abba_map.get_mut(&self.self_id) {
Some(abba) => {
let c_final = vcbc::message::Message {
tag: vcbc::message::Tag::new("id", 0, 0),
action: vcbc::message::Action::Send(vec![]),
};
abba.pre_vote_one(c_final).unwrap();
abba.pre_vote_zero().unwrap();
abba.is_decided();
match bundle.module.as_ref() {
vcbc::MODULE_NAME => match bundle.target {
Some(target) => match self.vcbc_map.get_mut(&target) {
Some(vcbc) => {
let msg = bincode::deserialize(&bundle.payload)?;
vcbc.receive_message(bundle.initiator, msg)?;
if vcbc.is_delivered() {
// Check if we have agreed on this proposal before.
// There might be a situation that we receive the agreement
// before receiving the actual proposal.
let abba = self.abba_map.get_mut(&target).unwrap();
if let Some(decided_value) = abba.decided_value() {
if decided_value {
// We re done! We have both proposal and agreement
log::info!("halted. proposer: {target}");
self.decided_party = Some(target);
}
}

let (proposal, sig) = vcbc.delivered_message();
self.mvba.set_proposal(target, proposal, sig)?;
}
}
None => {
return Err(Error::InvalidMessage(format!("target {target} not found")))
}
},
None => return Err(Error::InvalidMessage("no target is defined".to_string())),
},

abba::MODULE_NAME => match bundle.target {
Some(target) => match self.abba_map.get_mut(&target) {
Some(abba) => {
let msg = bincode::deserialize(&bundle.payload)?;
abba.receive_message(bundle.initiator, msg)?;
if let Some(decided_value) = abba.decided_value() {
if decided_value {
let vcbc = self.vcbc_map.get_mut(&target).unwrap();
if vcbc.is_delivered() {
// We re done! We have both proposal and agreement
log::info!("halted. proposer: {target}");
self.decided_party = Some(target);
} else {
// abba is finished but still we don't have the proposal
// request it from the initiator
let data = vcbc::make_c_request_message(&self.id, target)?;

self.broadcaster.borrow_mut().broadcast(
vcbc::MODULE_NAME,
Some(target),
data,
);
}
} else if !self.mvba.move_to_next_proposal()? {
log::warn!("no more proposal");
}
Comment on lines +154 to +156
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should continue in a loop forever going back to the first proposer.

It could be that we hit the first guy too early. Now that some time has passed their proposal had enough time to reach the other nodes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if it is a good idea, in this case we might have double votes per proposal, unless we increase the sequence from 0 to 1.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, you're worried about a tag re-use issue. Yeah good point. Ok, let's leave it as is for now and I'll create an issue to track this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually... I don't think this is possible to happen.

A node will not move from the VCBC stage until it receives confirmation that a threshold of voters have seen the proposal. So we no at the very least that 2/3rd's will vote 1 for at least one of the proposers.

Still, perhaps more randomized testing will help build our confidence here.

}
}
None => {
return Err(Error::InvalidMessage(format!("target {target} not found")))
}
},
None => return Err(Error::InvalidMessage("no target is defined".to_string())),
},
mvba::MODULE_NAME => {
let msg = bincode::deserialize(&bundle.payload)?;
self.mvba.receive_message(msg)?;
}
None => {
log::warn!("this node is an observer node")

_ => {
return Err(Error::InvalidMessage(format!(
"unknown module {}",
bundle.module
)));
}
}
vec![]
}
};

if let Some(completed_vote) = self.mvba.completed_vote() {
let abba = self
.abba_map
.get_mut(&self.mvba.current_proposer())
.unwrap();

pub fn process_bundle(&mut self, sender: NodeId, bundle: &Bundle) -> Vec<Vec<u8>> {
let mut delivered_count = 0;
for vcbc in self.vcbc_map.values() {
if vcbc.is_delivered() {
delivered_count += 1;
if completed_vote {
// The proposal is c-delivered and we have proof for that.
// Let's start binary agreement by voting 1
if let Some((proposal, sig)) = self.mvba.completed_vote_value() {
let digest = Hash32::calculate(proposal);
abba.pre_vote_one(digest, sig.clone())?;
}
} else {
// The proposal is NOT c-delivered.
// Let's start binary agreement by voting 0,
abba.pre_vote_zero()?;
}
}

match bundle.module.as_ref() {
vcbc::MODULE_NAME => {
let vcbc = self.vcbc_map.get_mut(&sender).unwrap();
let msg = bincode::deserialize(&bundle.message).unwrap();
vcbc.receive_message(sender, msg).unwrap();
if delivered_count >= self.super_majority_num() {}
Ok(self.broadcaster.borrow_mut().take_outgoings())
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use super::Consensus;
use crate::mvba::{bundle::Outgoing, *};
use blsttc::SecretKeySet;
use rand::{thread_rng, Rng};

fn valid_proposal(_id: NodeId, _: &Proposal) -> bool {
true
}

struct TestNet {
cons: Vec<Consensus>,
buffer: Vec<Outgoing>,
}

impl TestNet {
pub fn new() -> Self {
let id = "test-id".to_string();
let mut rng = thread_rng();
//let (t, n) = (5, 7);
let (t, n) = (2, 4);
let sec_key_set = SecretKeySet::random(t, &mut rng);
let mut parties = Vec::new();
let mut cons = Vec::new();

for index in 0..n {
parties.push(index as usize)
}
abba::MODULE_NAME => {
let abba = self.abba_map.get_mut(&sender).unwrap();
let msg = bincode::deserialize(&bundle.message).unwrap();
abba.receive_message(sender, msg).unwrap();
if delivered_count >= self.super_majority_num() {}

for p in &parties {
let consensus = Consensus::init(
id.clone(),
*p,
sec_key_set.secret_key_share(p),
sec_key_set.public_keys(),
parties.clone(),
valid_proposal,
);

cons.push(consensus);
}
_ => {
//

Self {
cons,
buffer: Vec::new(),
}
}
// TODO:
// self.broadcaster.borrow_mut().take_bundles()
vec![]
}

fn super_majority_num(&self) -> usize {
self.vcbc_map.len() - self.threshold
#[test]
fn test_random() {
Comment on lines +254 to +255
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice to see some randomized testing, but can we modify this to be a quickcheck test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test is really useful to fix some issues. Despite its name, the random here means picking a random message from the message buffer. I don't think quickcheck can be helpful here, even I thought about it before.

let _ = env_logger::builder()
.is_test(true)
.filter_level(log::LevelFilter::Debug)
.try_init();

for test_id in 0..10 {
log::info!("--- starting test {test_id}");
let mut net = TestNet::new();
let mut rng = thread_rng();

for c in &mut net.cons {
let proposal = (0..4).map(|_| rng.gen_range(0..64)).collect();
let mut msgs = c.propose(proposal).unwrap();
net.buffer.append(&mut msgs);
}

loop {
let rand_index = rng.gen_range(0..net.buffer.len());
let rand_msg = &net.buffer.remove(rand_index);
let mut msgs = Vec::new();
log::debug!("random message: {:?}", rand_msg);

for c in &mut net.cons {
msgs.append(&mut match rand_msg {
Outgoing::Direct(id, bundle) => {
if id == &c.self_id {
c.process_bundle(bundle).unwrap()
} else {
Vec::new()
}
}
Outgoing::Gossip(bundle) => c.process_bundle(bundle).unwrap(),
});
}

net.buffer.append(&mut msgs);

let mut decisions = HashMap::new();
for c in &mut net.cons {
if c.decided_party.is_some() {
let value = c
.abba_map
.get(&c.decided_party.unwrap())
.unwrap()
.decided_value()
.unwrap();

println!(
"test {test_id} for consensus {} finished on proposal {} with {value}",
c.self_id,
c.decided_party.unwrap(),
);
decisions.insert(c.self_id, (c.decided_party.unwrap(), value));
}
}

if decisions.len() == net.cons.len() {
// check if all consensus results are equal:
// https://sts10.github.io/2019/06/06/is-all-equal-function.html
let first = decisions.iter().next().unwrap().1;
assert!(decisions.iter().all(|(_, item)| item == first));
break;
}
}
}
}
}
22 changes: 22 additions & 0 deletions src/mvba/error.rs
@@ -0,0 +1,22 @@
use core::fmt::Debug;
use thiserror::Error;

use super::{abba, mvba, vcbc};

#[derive(Error, Debug)]
pub enum Error {
#[error("encoding/decoding error {0:?}")]
Encoding(#[from] bincode::Error),
#[error("vcbc error {0:?}")]
Vcbc(#[from] vcbc::error::Error),
#[error("vcbc error {0:?}")]
Abba(#[from] abba::error::Error),
#[error("abba {0}")]
Mvba(#[from] mvba::error::Error),
#[error("mvba error {0}")]
InvalidMessage(String),
#[error("generic error {0}")]
Generic(String),
}

pub type Result<T> = std::result::Result<T, Error>;
15 changes: 12 additions & 3 deletions src/mvba/hash.rs
@@ -1,11 +1,10 @@
use std::fmt::Display;

use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display};
use thiserror::Error;

const HASH32_SIZE: usize = 32;

#[derive(Debug, Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash)]
#[derive(Clone, Copy, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct Hash32([u8; HASH32_SIZE]);

#[derive(Error, Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -49,6 +48,16 @@ impl From<[u8; 32]> for Hash32 {
}
}

impl Debug for Hash32 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for index in 0..4 {
f.write_str(&format!("{:02x}", self.0[index]))?;
}

Ok(())
}
}

impl Display for Hash32 {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
for v in self.0 {
Expand Down
11 changes: 11 additions & 0 deletions src/mvba/message.rs
@@ -0,0 +1,11 @@
use super::{vcbc, NodeId};

#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]

/// VoteMessage definition.
/// This is same as `v-vote` message in spec
pub struct VoteMessage {
pub party: NodeId, // this is same as $a$ in spec
pub value: bool, // this is same as $0$ or $1$ in spec
pub proof: Option<vcbc::message::Message>, // this is same as $⊥$ or $ρ$ in spec
}
17 changes: 13 additions & 4 deletions src/mvba/mod.rs
@@ -1,12 +1,21 @@
pub mod consensus;

pub(crate) mod proposal;
pub mod error;
pub mod hash;

mod abba;
mod broadcaster;
mod bundle;
mod hash;
// TODO: remove me
#[allow(clippy::module_inception)]
mod mvba;
mod vcbc;

pub type NodeId = usize;
pub type ProposalChecker = fn(&proposal::Proposal) -> bool;

/// A proposed data with the proof inside. It is the same as $(w, π)$ in the spec.
pub type Proposal = Vec<u8>;

/// MessageValidity is same as &Q_{ID}$ ins spec: a global polynomial-time computable
/// predicate QID known to all parties, which is determined by an external application.
/// Each party may propose a value v together with a proof π that should satisfy QID .
pub type MessageValidity = fn(NodeId, &Proposal) -> bool;
54 changes: 0 additions & 54 deletions src/mvba/mvba/context.rs

This file was deleted.

20 changes: 0 additions & 20 deletions src/mvba/mvba/deliver.rs

This file was deleted.

37 changes: 0 additions & 37 deletions src/mvba/mvba/echo.rs

This file was deleted.

25 changes: 9 additions & 16 deletions src/mvba/mvba/error.rs
@@ -1,22 +1,15 @@
use crate::mvba::{proposal::Proposal, NodeId};
use thiserror::Error;

#[derive(Error, Debug, PartialEq)]
#[derive(Error, Debug)]
pub enum Error {
#[error("invalid proposer. expected {0:?}, get {1:?}")]
InvalidProposer(NodeId, NodeId),
#[error("invalid proposal: {0:?}")]
InvalidProposal(Proposal),
#[error("duplicated proposal: {0:?}")]
DuplicatedProposal(Proposal),
#[error("encoding/decoding error: {0}")]
Encoding(String),
}

impl From<bincode::Error> for Error {
fn from(err: bincode::Error) -> Self {
Error::Encoding(format!("{}", err))
}
#[error("encoding/decoding error {0:?}")]
Encoding(#[from] bincode::Error),
#[error("blsttc Error {0}")]
Blsttc(#[from] blsttc::error::Error),
#[error("invalid message {0}")]
InvalidMessage(String),
#[error("{0}")]
Generic(String),
}

pub type Result<T> = std::result::Result<T, Error>;
25 changes: 20 additions & 5 deletions src/mvba/mvba/message.rs
@@ -1,7 +1,22 @@
use crate::mvba::proposal::Proposal;
use blsttc::{Signature, SignatureShare};
use serde::{Deserialize, Serialize};

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub(crate) enum Message {
Propose(Proposal),
Echo(Proposal),
use super::NodeId;
use crate::mvba::hash::Hash32;

/// VoteAction definition.
/// This is same as `v-vote` message in spec: (ID, v-vote, a, uj, ρj)
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Vote {
pub id: String, // this is same as $id$ in spec
pub proposer: NodeId, // this is same as $a$ in spec
pub value: bool, // this is same as $0$ or $1$ in spec
pub proof: Option<(Hash32, Signature)>, // this is same as $ρ$ in spec
}

#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct Message {
pub vote: Vote,
pub voter: NodeId,
pub signature: SignatureShare,
}
306 changes: 253 additions & 53 deletions src/mvba/mvba/mod.rs
@@ -1,75 +1,275 @@
pub(super) mod context;
pub(super) mod message;
pub(super) mod state;

mod deliver;
mod echo;
mod error;
mod propose;

use self::error::Result;
use self::message::Message;
use self::propose::ProposeState;
use self::state::State;
use super::{NodeId, ProposalChecker};
use crate::mvba::{broadcaster::Broadcaster, proposal::Proposal};

use std::cell::RefCell;
use std::rc::Rc;

pub(crate) const MODULE_NAME: &str = "vcbc";

// VCBC is a verifiably authenticatedly c-broadcast protocol.
// Each party $P_i$ c-broadcasts the value that it proposes to all other parties
// using verifiable authenticated consistent broadcast.
pub(crate) struct Vcbc {
ctx: context::Context,
state: Box<dyn State>,
pub(crate) mod error;
mod message;

use self::message::{Message, Vote};

use self::{error::Error, error::Result};
use super::vcbc;
use super::{hash::Hash32, Proposal};
use crate::mvba::{broadcaster::Broadcaster, NodeId};
use blsttc::{PublicKeySet, SecretKeyShare, Signature};
use std::{cell::RefCell, collections::HashMap, rc::Rc};

pub(crate) const MODULE_NAME: &str = "mvba";

pub struct Mvba {
id: String, // this is same as $ID$ in spec
i: NodeId, // this is same as $i$ in spec
l: usize, // this is same as $a$ in spec
v: Option<bool>, // this is same as $v$ in spec
proposals: HashMap<NodeId, (Proposal, Signature)>,
votes_per_proposer: HashMap<NodeId, HashMap<NodeId, Vote>>,
voted: bool,
pub_key_set: PublicKeySet,
sec_key_share: SecretKeyShare,
parties: Vec<NodeId>,
broadcaster: Rc<RefCell<Broadcaster>>,
}

impl Vcbc {
impl Mvba {
pub fn new(
number: usize,
threshold: usize,
proposer_id: NodeId,
id: String,
self_id: NodeId,
sec_key_share: SecretKeyShare,
pub_key_set: PublicKeySet,
parties: Vec<NodeId>,
broadcaster: Rc<RefCell<Broadcaster>>,
proposal_checker: ProposalChecker,
) -> Self {
Self {
ctx : context::Context::new(
number,
threshold,
proposer_id,
broadcaster,
proposal_checker,
),
state: Box::new(ProposeState),
id,
i: self_id,
l: 0,
v: None,
voted: false,
proposals: HashMap::new(),
votes_per_proposer: HashMap::new(),
pub_key_set,
sec_key_share,
parties,
broadcaster,
}
}

pub fn set_proposal(
&mut self,
proposer: NodeId,
proposal: Proposal,
signature: Signature,
) -> Result<()> {
debug_assert!(self.parties.contains(&proposer));

let digest = Hash32::calculate(&proposal);
let sign_bytes = vcbc::c_ready_bytes_to_sign(&self.id, &proposer, &digest).unwrap();
if !self.pub_key_set.public_key().verify(&signature, sign_bytes) {
return Err(Error::InvalidMessage(
"proposal with an invalid proof".to_string(),
));
}

self.proposals.insert(proposer, (proposal, signature));
self.vote()
}

pub fn move_to_next_proposal(&mut self) -> Result<bool> {
if self.l + 1 == self.parties.len() {
// no more proposal
return Ok(false);
}
self.l += 1;
self.v = None;
self.voted = false;

self.vote()?;
Ok(true)
}

pub fn current_proposer(&self) -> NodeId {
*self.parties.get(self.l).unwrap()
}

pub fn completed_vote(&self) -> Option<bool> {
self.v
}

pub fn completed_vote_value(&self) -> Option<&(Proposal, Signature)> {
self.proposals.get(&self.current_proposer())
}

fn check_message(&mut self, msg: &Message) -> Result<()> {
if msg.vote.id != self.id {
return Err(Error::InvalidMessage(format!(
"invalid ID. expected: {}, got {}",
self.id, msg.vote.id
)));
}

let sign_bytes = bincode::serialize(&msg.vote)?;
if !self
.pub_key_set
.public_key_share(msg.voter)
.verify(&msg.signature, sign_bytes)
{
return Err(Error::InvalidMessage("invalid signature".to_string()));
}

if !msg.vote.value && msg.vote.proof.is_some() {
return Err(Error::InvalidMessage("no vote with proof".to_string()));
}

if msg.vote.value && msg.vote.proof.is_none() {
return Err(Error::InvalidMessage("yes vote without proof".to_string()));
}

if let Some((digest, signature)) = &msg.vote.proof {
let sign_bytes =
vcbc::c_ready_bytes_to_sign(&self.id, &msg.vote.proposer, digest).unwrap();
if !self.pub_key_set.public_key().verify(signature, sign_bytes) {
return Err(Error::InvalidMessage(
"proposal with an invalid proof".to_string(),
));
}
};

Ok(())
}

// propose sets the proposal and broadcast propose message.
pub fn propose(&mut self, proposal: &Proposal) -> Result<()> {
debug_assert_eq!(proposal.proposer_id, self.ctx.proposer_id);
pub fn add_vote(&mut self, msg: &Message) -> Result<bool> {
let votes = self.must_get_proposer_votes(&msg.vote.proposer);
if let Some(exist) = votes.get(&msg.voter) {
if exist != &msg.vote {
return Err(Error::InvalidMessage(format!(
"double vote detected from {:?}",
msg.voter
)));
}
return Ok(false);
}

self.state.set_proposal(proposal, &mut self.ctx)?;
if let Some(s) = self.state.decide(&mut self.ctx)? {
self.state = s;
votes.insert(msg.voter, msg.vote.clone());

if msg.vote.value && !self.proposals.contains_key(&msg.vote.proposer) {
// If a v-vote from Pj indicates 1 but Pi has not yet received Pa ’s proposal,
// ignore the vote and ask Pj to supply Pa ’s proposal
// (by sending it the message (ID|vcbc.a.0, c-request)).
let data = vcbc::make_c_request_message(&self.id, msg.vote.proposer)?;

self.broadcaster.borrow_mut().send_to(
vcbc::MODULE_NAME,
Some(msg.vote.proposer),
data,
msg.voter,
);

Ok(false)
} else {
Ok(true)
}
}

/// receive_message process the received message 'msg`
pub fn receive_message(&mut self, msg: Message) -> Result<()> {
log::debug!("received message: {:?}", msg);

self.check_message(&msg)?;
if !self.add_vote(&msg)? {
return Ok(());
}

// Message is for another proposal, not current one
// TODO: test me!
if msg.vote.proposer != self.current_proposer() {
return Ok(());
}

// wait for n − t messages (v-echo, wj , πj ) to be c-delivered with tag ID|vcbc.j.0
//from distinct Pj such that QID (wj , πj ) holds
let threshold = self.threshold();
if self.proposals.len() >= threshold && self.v.is_none() {
// wait for n − t messages (ID, v-vote, a, uj , ρj ) from distinct Pj such
// that VID|a (uj , ρj) holds
let votes = self.must_get_proposer_votes(&msg.vote.proposer);
if votes.len() >= threshold {
if votes.values().any(|v| v.value) {
// if there is some uj = 1 then
// v ← 1; ρ ← ρj
self.v = Some(true);
} else {
// else
// v ← 0; ρ ← ⊥
self.v = Some(false);
}
}
}

Ok(())
}

pub fn process_message(&mut self, sender: &NodeId, message: &[u8]) -> Result<()> {
let msg: Message = bincode::deserialize(message)?;
// TODO: make me better, no unwrap?
fn must_get_proposer_votes(&mut self, proposer: &NodeId) -> &mut HashMap<NodeId, Vote> {
if !self.votes_per_proposer.contains_key(proposer) {
self.votes_per_proposer.insert(*proposer, HashMap::new());
}
self.votes_per_proposer.get_mut(proposer).unwrap()
}
fn vote(&mut self) -> Result<()> {
// wait for n − t messages (v-echo, wj , πj ) to be c-delivered with tag ID|vcbc.j.0
//from distinct Pj such that QID (wj , πj ) holds
if self.proposals.len() >= self.threshold() && !self.voted {
let a = self.current_proposer();
let vote = match self.proposals.get(&a) {
None => {
// if wa = ⊥ then
// send the message (ID, v-vote, a, 0, ⊥) to all parties
Vote {
id: self.id.clone(),
proposer: a,
value: false,
proof: None,
}
}
Some((proposal, signature)) => {
// else
// let ρ be the message that completes the c-broadcast with tag ID|vcbc.a.0
// send the message (ID, v-vote, a, 1, ρ) to all parties
let digest = Hash32::calculate(proposal);
Vote {
id: self.id.clone(),
proposer: a,
value: true,
proof: Some((digest, signature.clone())),
}
}
};

self.state.process_message(sender, &msg, &mut self.ctx)?;
if let Some(s) = self.state.decide(&mut self.ctx)? {
self.state = s;
self.broadcast(vote)?;
self.voted = true;
}

Ok(())
}

// broadcast sends the message `msg` to all other peers in the network.
// It adds the message to our messages log.
fn broadcast(&mut self, vote: Vote) -> Result<()> {
let sign_bytes = bincode::serialize(&vote)?;
let sig = self.sec_key_share.sign(sign_bytes);
let msg = Message {
vote,
voter: self.i,
signature: sig,
};
let data = bincode::serialize(&msg)?;
self.broadcaster
.borrow_mut()
.broadcast(MODULE_NAME, None, data);
self.receive_message(msg)?;
Ok(())
}

pub fn is_delivered(&self) -> bool {
self.ctx.delivered
// threshold return the threshold of the public key set.
// It SHOULD be `n-t` according to the spec
fn threshold(&self) -> usize {
self.pub_key_set.threshold() + 1
}
}

Expand Down
32 changes: 0 additions & 32 deletions src/mvba/mvba/propose.rs

This file was deleted.

65 changes: 0 additions & 65 deletions src/mvba/mvba/state.rs

This file was deleted.

354 changes: 237 additions & 117 deletions src/mvba/mvba/tests.rs

Large diffs are not rendered by default.

16 changes: 0 additions & 16 deletions src/mvba/proposal.rs

This file was deleted.

2 changes: 2 additions & 0 deletions src/mvba/vcbc/error.rs
Expand Up @@ -12,6 +12,8 @@ pub enum Error {
InvalidHashLength(#[from] hash::InvalidLength),
#[error("duplicated message {0} from {1:?}")]
DuplicatedMessage(NodeId, String),
#[error("invalid message {0}")]
InvalidMessage(String),
#[error("generic error {0}")]
Generic(String),
}
Expand Down
9 changes: 6 additions & 3 deletions src/mvba/vcbc/message.rs
@@ -1,4 +1,4 @@
use crate::mvba::hash::Hash32;
use crate::mvba::{hash::Hash32, Proposal};
use blsttc::{Signature, SignatureShare};

#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
Expand All @@ -20,10 +20,11 @@ impl Tag {

#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
pub enum Action {
// TODO: use serde trait
Send(Vec<u8>), // this is same as $c-send$ in spec
Send(Proposal), // this is same as $c-send$ in spec
Ready(Hash32, SignatureShare), // this is same as $c-ready$ in spec
Final(Hash32, Signature), // this is same as $c-final$ in spec
Request, // this is same as $c-request$ in spec
Answer(Proposal, Signature), // this is same as $c-answer$ in spec
}

#[derive(Debug, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
Expand All @@ -38,6 +39,8 @@ impl Message {
Action::Send(_) => "c-send",
Action::Ready(_, _) => "c-ready",
Action::Final(_, _) => "c-final",
Action::Request => "c-request",
Action::Answer(_, _) => "c-answer",
}
}
}
187 changes: 132 additions & 55 deletions src/mvba/vcbc/mod.rs
@@ -1,42 +1,62 @@
mod error;
pub(crate) mod message;
pub(crate) mod error;
pub mod message;

use self::error::{Error, Result};
use self::message::{Action, Message, Tag};
use super::hash::Hash32;
use super::NodeId;
use crate::mvba::broadcaster::Broadcaster;
use blsttc::{PublicKeySet, SecretKeyShare, Signature, SignatureShare};
use log::warn;
use std::cell::RefCell;
use std::collections::hash_map::Entry::Vacant;
use std::collections::HashMap;
use std::rc::Rc;

use blsttc::{PublicKeySet, SecretKeyShare, Signature, SignatureShare};

use self::error::{Error, Result};
use self::message::{Action, Message, Tag};
use super::hash::Hash32;
use super::{MessageValidity, NodeId, Proposal};
use crate::mvba::broadcaster::Broadcaster;

pub(crate) const MODULE_NAME: &str = "vcbc";

// make_c_request_message creates the payload message to request a proposal
// from the the proposer
pub fn make_c_request_message(
id: &str,
proposer: NodeId,
) -> std::result::Result<Vec<u8>, bincode::Error> {
let msg = Message {
tag: Tag {
id: id.to_string(),
j: proposer,
s: 0,
},
action: Action::Request,
};
bincode::serialize(&msg)
}

// c_ready_bytes_to_sign generates bytes that should be signed by each party
// as a wittiness of receiving the message.
// c_ready_bytes_to_sign is same as serialized of $(ID.j.s, c-ready, H(m))$ in spec.
// c_ready_bytes_to_sign is same as serialized of $(ID.j.s, c-ready, H(m))$ in spec
pub fn c_ready_bytes_to_sign(
tag: &Tag,
digest: Hash32,
id: &str,
proposer: &NodeId,
digest: &Hash32,
) -> std::result::Result<Vec<u8>, bincode::Error> {
bincode::serialize(&(tag, "c-ready", digest))
bincode::serialize(&(id, proposer, 0, "c-ready", digest))
}

// Protocol VCBC for verifiable and authenticated consistent broadcast.
pub(crate) struct Vcbc {
tag: Tag, // this is same as $Tag$ in spec
i: NodeId, // this is same as $i$ in spec
m_bar: Option<Vec<u8>>, // this is same as $\bar{m}$ in spec
m_bar: Option<Proposal>, // this is same as $\bar{m}$ in spec
u_bar: Option<Signature>, // this is same as $\bar{\mu}$ in spec
wd: HashMap<NodeId, SignatureShare>, // this is same as $W_d$ in spec
rd: usize, // this is same as $r_d$ in spec
d: Option<Hash32>, // Memorizing the message digest
pub_key_set: PublicKeySet,
sec_key_share: SecretKeyShare,
final_messages: HashMap<NodeId, Message>,
message_validity: MessageValidity,
broadcaster: Rc<RefCell<Broadcaster>>,
}

Expand All @@ -56,17 +76,21 @@ fn try_insert(map: &mut HashMap<NodeId, Message>, k: NodeId, v: Message) -> Resu

impl Vcbc {
pub fn new(
i: NodeId,
tag: Tag,
id: String,
self_id: NodeId,
proposer: NodeId,
pub_key_set: PublicKeySet,
sec_key_share: SecretKeyShare,
message_validity: MessageValidity,
broadcaster: Rc<RefCell<Broadcaster>>,
) -> Self {
debug_assert_eq!(i, broadcaster.borrow().self_id());
debug_assert_eq!(self_id, broadcaster.borrow().self_id());

let tag = Tag::new(&id, proposer, 0);

Self {
i,
tag,
i: self_id,
m_bar: None,
u_bar: None,
wd: HashMap::new(),
Expand All @@ -75,16 +99,14 @@ impl Vcbc {
final_messages: HashMap::new(),
pub_key_set,
sec_key_share,
message_validity,
broadcaster,
}
}

// TODO: remove me
#[allow(dead_code)]

// c_broadcast sends the messages `m` to all other parties.
// It also adds the message to message_log and process it.
pub fn c_broadcast(&mut self, m: Vec<u8>) -> Result<()> {
/// c_broadcast sends the messages `m` to all other parties.
/// It also adds the message to message_log and process it.
pub fn c_broadcast(&mut self, m: Proposal) -> Result<()> {
debug_assert_eq!(self.i, self.tag.j);

// Upon receiving message (ID.j.s, in, c-broadcast, m):
Expand All @@ -96,32 +118,38 @@ impl Vcbc {
self.broadcast(send_msg)
}

// receive_message process the received message 'msg` from `sender`
pub fn receive_message(&mut self, sender: NodeId, msg: Message) -> Result<()> {
if msg.tag != self.tag {
log::trace!("invalid tag, ignoring message.: {:?}. ", msg);
return Ok(());
}

/// receive_message process the received message 'msg` from `initiator`
pub fn receive_message(&mut self, initiator: NodeId, msg: Message) -> Result<()> {
log::debug!(
"received {} message: {:?} from {}",
msg.action_str(),
msg,
sender
initiator
);

if msg.tag != self.tag {
return Err(Error::InvalidMessage(format!(
"invalid tag. expected {:?}, got {:?}",
self.tag, msg.tag
)));
}

match msg.action.clone() {
Action::Send(m) => {
// Upon receiving message (ID.j.s, c-send, m) from Pl:
// if j = l and m̄ = ⊥ then
if sender == self.tag.j && self.m_bar.is_none() {
if initiator == self.tag.j && self.m_bar.is_none() {
if !(self.message_validity)(initiator, &m) {
return Err(Error::InvalidMessage("invalid proposal".to_string()));
}
// m̄ ← m
self.m_bar = Some(m.clone());

let d = Hash32::calculate(&m);
self.d = Some(d);

// compute an S1-signature share ν on (ID.j.s, c-ready, H(m))
let sign_bytes = c_ready_bytes_to_sign(&self.tag, d)?;
let sign_bytes = c_ready_bytes_to_sign(&self.tag.id, &self.tag.j, &d)?;
let s1 = self.sec_key_share.sign(sign_bytes);

let ready_msg = Message {
Expand All @@ -138,25 +166,22 @@ impl Vcbc {
Some(d) => d,
None => return Err(Error::Generic("protocol violated. no digest".to_string())),
};
let sign_bytes = c_ready_bytes_to_sign(&self.tag, d)?;
let sign_bytes = c_ready_bytes_to_sign(&self.tag.id, &self.tag.j, &d)?;

if d != msg_d {
warn!(
"c-ready has unknown digest. expected {:?}, got {:?}",
d, msg_d
);
log::warn!("c-ready has unknown digest. expected {d:?}, got {msg_d:?}");
return Err(Error::Generic("Invalid digest".to_string()));
}

// Upon receiving message (ID.j.s, c-ready, d, νl) from Pl for the first time:
if let Vacant(e) = self.wd.entry(sender) {
if let Vacant(e) = self.wd.entry(initiator) {
let valid_sig = self
.pub_key_set
.public_key_share(sender)
.verify(&sig_share, &sign_bytes);
.public_key_share(initiator)
.verify(&sig_share, sign_bytes);

if !valid_sig {
warn!("c-ready has has invalid signature share");
log::warn!("c-ready has has invalid signature share");
}

// if i = j and νl is a valid S1-signature share then
Expand Down Expand Up @@ -189,17 +214,23 @@ impl Vcbc {
let d = match self.d {
Some(d) => d,
None => {
warn!("received c-final before receiving s-send, logging message");
try_insert(&mut self.final_messages, sender, msg)?;
log::warn!("received c-final before receiving c-send, logging message");
try_insert(&mut self.final_messages, initiator, msg)?;
// requesting for the proposal
let request_msg = Message {
tag: self.tag.clone(),
action: Action::Request,
};
self.send_to(request_msg, initiator)?;

return Ok(());
}
};

let sign_bytes = c_ready_bytes_to_sign(&self.tag, d)?;
let sign_bytes = c_ready_bytes_to_sign(&self.tag.id, &self.tag.j, &d)?;
let valid_sig = self.pub_key_set.public_key().verify(&sig, sign_bytes);

if !valid_sig {
warn!("c-ready has has invalid signature share");
log::warn!("c-ready has has invalid signature share");
}

// if H(m̄) = d and µ̄ = ⊥ and µ is a valid S1-signature then
Expand All @@ -208,10 +239,43 @@ impl Vcbc {
self.u_bar = Some(sig);
}
}
Action::Request => {
// Upon receiving message (ID.j.s, c-request) from Pl :
if let Some(u) = &self.u_bar {
// if µ̄ != ⊥ then

// proposal is known because we have the valid signature
debug_assert!(self.m_bar.is_some());
if let Some(m) = &self.m_bar {
let answer_msg = Message {
tag: self.tag.clone(),
action: Action::Answer(m.clone(), u.clone()),
};

// send (ID.j.s, c-answer, m̄, µ̄) to Pl
self.send_to(answer_msg, initiator)?;
}
}
}
Action::Answer(m, u) => {
// Upon receiving message (ID.j.s, c-answer, m, µ) from Pl :
if self.u_bar.is_none() {
// if µ̄ = ⊥ and ...
let d = Hash32::calculate(&m);
let sign_bytes = c_ready_bytes_to_sign(&self.tag.id, &self.tag.j, &d)?;
if self.pub_key_set.public_key().verify(&u, sign_bytes) {
// ... µ is a valid S1 -signature on (ID.j.s, c-ready, H(m)) then
// µ̄ ← µ
// m̄ ← m
self.u_bar = Some(u);
self.m_bar = Some(m);
}
}
}
}

for (sender, final_msg) in std::mem::take(&mut self.final_messages) {
self.receive_message(sender, final_msg)?;
for (initiator, final_msg) in std::mem::take(&mut self.final_messages) {
self.receive_message(initiator, final_msg)?;
}

Ok(())
Expand All @@ -221,8 +285,17 @@ impl Vcbc {
self.u_bar.is_some()
}

#[allow(dead_code)]
pub fn read_delivered(&self) -> Option<(Vec<u8>, Signature)> {
pub fn delivered_message(&self) -> (Proposal, Signature) {
debug_assert!(self.u_bar.is_some(), "message should be delivered");

(
self.m_bar.as_ref().unwrap().clone(),
self.u_bar.as_ref().unwrap().clone(),
)
}

#[cfg(test)]
fn read_delivered(&self) -> Option<(Vec<u8>, Signature)> {
if let (Some(m), Some(u)) = (&self.m_bar, &self.u_bar) {
Some((m.clone(), u.clone()))
} else {
Expand All @@ -233,11 +306,13 @@ impl Vcbc {
// send_to sends the message `msg` to the corresponding peer `to`.
// If the `to` is us, it adds the message to our messages log.
fn send_to(&mut self, msg: self::Message, to: NodeId) -> Result<()> {
let data = bincode::serialize(&msg).unwrap();
let data = bincode::serialize(&msg)?;
if to == self.i {
self.receive_message(self.i, msg)?;
} else {
self.broadcaster.borrow_mut().send_to(MODULE_NAME, data, to);
self.broadcaster
.borrow_mut()
.send_to(MODULE_NAME, Some(self.tag.j), data, to);
}
Ok(())
}
Expand All @@ -246,7 +321,9 @@ impl Vcbc {
// It adds the message to our messages log.
fn broadcast(&mut self, msg: self::Message) -> Result<()> {
let data = bincode::serialize(&msg)?;
self.broadcaster.borrow_mut().broadcast(MODULE_NAME, data);
self.broadcaster
.borrow_mut()
.broadcast(MODULE_NAME, Some(self.i), data);
self.receive_message(self.i, msg)?;
Ok(())
}
Expand Down
219 changes: 145 additions & 74 deletions src/mvba/vcbc/tests.rs

Large diffs are not rendered by default.