
feat/parsec: refactor tie-breaking rules when computing consensus

We spotted an issue with the current tie-breaking rules: first a
routing test, and now a newly created test, exposed it by having
nodes vote for the same payload in single consensus mode.

We know that we want the following properties:
* if all nodes vote for payload X before any node votes for payload
Y, X should be polled first
* everything else being equal, we would rather have payloads with
earlier votes come out first
* everything else being equal, we would rather have payloads with
more votes come out first

To achieve the above, the tie-breaking rules are, in pseudo-code
(a sketch follows the list below):
- sort by the smallest index of this payload in any node's
interesting content
- then by larger vote count
- then by lexicographical order
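
Below is a minimal, self-contained sketch of this ordering, assuming
stand-in types: plain `&str` payloads instead of `ObservationKey`, and a
plain lexicographical comparison in place of
`ObservationKey::consistent_cmp` against the peer list.

```rust
use std::collections::BTreeMap;

// Hypothetical stand-in: each inner Vec is one decided peer's
// interesting content, in the order that peer listed it.
fn order_payloads(interesting_content_per_peer: &[Vec<&str>]) -> Vec<String> {
    // Accumulate, per payload, its vote count and the smallest index
    // at which any peer listed it.
    let mut stats: BTreeMap<&str, (usize, usize)> = BTreeMap::new();
    for content in interesting_content_per_peer {
        for (idx, &payload) in content.iter().enumerate() {
            let entry = stats.entry(payload).or_insert((0, idx));
            entry.0 += 1;
            entry.1 = entry.1.min(idx);
        }
    }

    // Tie-breaking rules: smallest index first, then larger count,
    // then lexicographical order.
    let mut ordered: Vec<_> = stats.into_iter().collect();
    ordered.sort_by(|(lhs_key, (lhs_count, lhs_min)), (rhs_key, (rhs_count, rhs_min))| {
        lhs_min
            .cmp(rhs_min)
            .then_with(|| lhs_count.cmp(rhs_count).reverse())
            .then_with(|| lhs_key.cmp(rhs_key))
    });
    ordered.into_iter().map(|(key, _)| key.to_string()).collect()
}

fn main() {
    // "A" and "B" both appear at index 0 for some peer, but "A" has
    // more votes; "C" only ever appears at index 1.
    let per_peer = vec![vec!["B", "A"], vec!["A", "B"], vec!["A", "C"]];
    assert_eq!(order_payloads(&per_peer), vec!["A", "B", "C"]);
}
```

In `parsec.rs` the same accumulation is done with a `fold` over the
decided meta-votes' interesting content, and the final tie-break uses
`consistent_cmp` so that every peer arrives at the same order.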

The commit also contains a new integration test which makes nodes
vote for the same payload in single consensus mode. This test fails
with the previous ordering rules and passes with this fix.
maqi committed Aug 15, 2019
1 parent 96dc0d6 commit 33101fce52cad4cfad809499060d8e4133612dcb
Showing with 81 additions and 65 deletions.
  1. +14 −5 src/dev_utils/network.rs
  2. +26 −4 src/dev_utils/schedule.rs
  3. +27 −56 src/parsec.rs
  4. +14 −0 tests/integration_tests.rs
@@ -49,7 +49,7 @@ pub struct Network {
#[derive(Debug)]
pub struct BlocksOrder {
peer: PeerId,
order: Vec<Observation>,
order: Vec<(Observation, Option<PeerId>)>,
}

pub struct DifferingBlocksOrder {
@@ -173,11 +173,11 @@ impl Network {
Err(ConsensusError::DifferingBlocksOrder(DifferingBlocksOrder {
order_1: BlocksOrder {
peer: first_peer.id().clone(),
order: payloads.into_iter().cloned().collect(),
order: self.block_keys(&first_peer),
},
order_2: BlocksOrder {
peer: peer.id().clone(),
order: peer.blocks_payloads().into_iter().cloned().collect(),
order: self.block_keys(&peer),
},
}))
} else {
@@ -306,11 +306,11 @@ impl Network {
return Err(ConsensusError::DifferingBlocksOrder(DifferingBlocksOrder {
order_1: BlocksOrder {
peer: peer.id().clone(),
order: peer.blocks_payloads().into_iter().cloned().collect(),
order: self.block_keys(peer),
},
order_2: BlocksOrder {
peer: old_peer.id().clone(),
order: old_peer.blocks_payloads().into_iter().cloned().collect(),
order: self.block_keys(&old_peer),
},
}));
}
@@ -320,6 +320,15 @@ impl Network {
Ok(())
}

fn block_keys(&self, peer: &Peer) -> Vec<(Observation, Option<PeerId>)> {
peer.blocks()
.map(|block| {
let (obs, opt_peer_id) = self.block_key(block);
(obs.clone(), opt_peer_id.cloned())
})
.collect()
}

fn block_key<'a>(
&self,
block: &'a Block<Transaction, PeerId>,
@@ -280,6 +280,8 @@ pub struct ScheduleOptions {
pub intermediate_consistency_checks: bool,
/// The only genesis members that will compute consensus if provided. All if none.
pub genesis_restrict_consensus_to: Option<BTreeSet<PeerId>>,
/// Allows for voting for the same OpaquePayload. This applies only when `ConsensusMode::Single`
pub vote_for_same: bool,
}

impl ScheduleOptions {
@@ -337,6 +339,7 @@ impl Default for ScheduleOptions {
transparent_voters: Sampling::Fraction(1.0, 1.0),
intermediate_consistency_checks: true,
genesis_restrict_consensus_to: None,
vote_for_same: false,
}
}
}
@@ -574,9 +577,16 @@ impl Schedule {
) -> Schedule {
let mut pending = PendingObservations::new(options);

let observation_multiplier =
if options.vote_for_same && ConsensusMode::Single == env.network.consensus_mode() {
options.genesis_size
} else {
1
};
// the +1 below is to account for genesis
let max_observations =
obs_schedule.count_observations() + obs_schedule.count_expected_accusations() + 1;
let max_observations = obs_schedule.count_observations() * observation_multiplier
+ obs_schedule.count_expected_accusations()
+ 1;

let mut peers = PeerStatuses::new(&obs_schedule.genesis.all_ids());
let mut added_peers: BTreeSet<_> = peers.all_peers().cloned().collect();
@@ -590,7 +600,13 @@ impl Schedule {
if options.votes_before_gossip {
let opaque_transactions = obs_schedule.extract_opaque();
let sampling = match env.network.consensus_mode() {
ConsensusMode::Single => Sampling::Constant(1),
ConsensusMode::Single => {
if options.vote_for_same {
Sampling::Constant(peers.all_peers().count())
} else {
Sampling::Constant(1)
}
}
ConsensusMode::Supermajority => options.opaque_voters,
};

@@ -684,7 +700,13 @@ impl Schedule {
ObservationEvent::Opaque(payload) => {
let observation = ParsecObservation::OpaquePayload(payload);
let sampling = match env.network.consensus_mode() {
ConsensusMode::Single => Sampling::Constant(1),
ConsensusMode::Single => {
if options.vote_for_same {
Sampling::Constant(peers.all_peers().count())
} else {
Sampling::Constant(1)
}
}
ConsensusMode::Supermajority => options.opaque_voters,
};

@@ -40,7 +40,6 @@ use crate::{
hash::Hash,
mock::{PeerId, Transaction},
};
use fnv::FnvHashSet;
use itertools::Itertools;
#[cfg(any(test, feature = "testing"))]
use std::ops::{Deref, DerefMut};
@@ -1438,65 +1437,37 @@ impl<T: NetworkEvent, S: SecretId> Parsec<T, S> {
where
I: IntoIterator<Item = (PeerIndex, bool)>,
{
let mut payload_iters = decided_meta_votes
let payloads = decided_meta_votes
.into_iter()
.filter(|(_, decision)| *decision)
.filter_map(|(peer_index, _)| self.meta_election.interesting_content_by(peer_index))
.map(|payload_keys| payload_keys.iter())
.collect_vec();

let mut all_payloads_in_order = Vec::new();
let mut all_payloads_lookup: FnvHashSet<&ObservationKey> = FnvHashSet::default();

// First, process the first payloads from each decided meta events and append the ordered result to
// the end of `all_payloads_in_order`. Then, skipping any payload already in `all_payloads_in_order`/
// `all_payloads_lookup`, process the next payloads from each decided meta events similarly.
while let Some(payloads_counts) = self
.next_payload_batch_for_consensus_with_count(&mut payload_iters, &all_payloads_lookup)
{
let new_payloads = self.sort_payload_batch_for_consensus(&payloads_counts);
all_payloads_in_order.extend(new_payloads.iter().cloned());
all_payloads_lookup.extend(new_payloads.iter());
}

all_payloads_in_order
}

// Iterate the payload iterators skipping processed payloads and accumulate this batch of payload count.
fn next_payload_batch_for_consensus_with_count<'a>(
&self,
payload_iters: &mut [impl Iterator<Item = &'a ObservationKey>],
processed_payloads: &FnvHashSet<&ObservationKey>,
) -> Option<BTreeMap<&'a ObservationKey, i32>> {
let payloads_counts = payload_iters
.iter_mut()
.filter_map(|iter| iter.find(|key| !processed_payloads.contains(key)))
.fold(BTreeMap::new(), |mut map, payload_key| {
*map.entry(payload_key).or_insert(0) += 1;
.flat_map(|(peer_index, decision)| {
if decision {
match self.meta_election.interesting_content_by(peer_index) {
Some(content) => content.iter().enumerate().collect_vec(),
None => Vec::new(),
}
} else {
Vec::new()
}
})
.fold(BTreeMap::new(), |mut map, (idx, payload_key)| {
let (count, min_index) = map.entry(payload_key.clone()).or_insert((0, idx));
*count += 1;
*min_index = std::cmp::min(*min_index, idx);
map
});
if payloads_counts.is_empty() {
None
} else {
Some(payloads_counts)
}
}

// Sort the payload batch by count, and then a consistent tie breaker.
fn sort_payload_batch_for_consensus<'a>(
&self,
payloads_counts: &BTreeMap<&'a ObservationKey, i32>,
) -> Vec<&'a ObservationKey> {
payloads_counts
.iter()
.sorted_by(|(lhs_key, lhs_count), (rhs_key, rhs_count)| {
lhs_count
.cmp(rhs_count)
.reverse()
.then_with(|| lhs_key.consistent_cmp(rhs_key, &self.peer_list))
})
.map(|(&key, _)| key)
.collect_vec()
payloads
.into_iter()
.sorted_by(
|(lhs_key, (lhs_count, lhs_min_index)), (rhs_key, (rhs_count, rhs_min_index))| {
lhs_min_index
.cmp(rhs_min_index)
.then_with(|| lhs_count.cmp(rhs_count).reverse())
.then_with(|| lhs_key.consistent_cmp(rhs_key, &self.peer_list))
},
)
.map(|(key, _)| key)
.collect()
}

fn create_blocks(&self, payload_keys: &[ObservationKey]) -> Result<BlockGroup<T, S::PublicId>> {
@@ -641,3 +641,17 @@ proptest! {
assert!(result.is_ok(), "{:?}", result);
}
}

#[test]
fn consensus_mode_single_all_vote_same() {
let mut env = Environment::with_consensus_mode(SEED, ConsensusMode::Single);
let options = ScheduleOptions {
genesis_size: 8,
opaque_to_add: 2,
vote_for_same: true,
..Default::default()
};
let schedule = Schedule::new(&mut env, &options);

unwrap!(env.execute_schedule(schedule));
}
