Skip to content
Permalink
Browse files

Stabilized causal graph assembly main logic

  • Loading branch information...
octavonce committed Mar 31, 2019
1 parent ff9c3d0 commit 011240a2adbc38a3c7b82d96c31cd563b2ea1bfa
@@ -1,4 +1,5 @@
/target/
**/*.rs.bk

.DS_Store
.DS_Store
.vscode
@@ -20,30 +20,41 @@ use events::Event;
use crypto::Hash;
use graphlib::{Graph, VertexId};
use std::sync::Arc;
use hashbrown::HashMap;
use std::collections::VecDeque;
use hashbrown::{HashMap, HashSet};

#[derive(Clone, Debug)]
pub struct CausalGraph {
/// Graph structure holding the causal graph
pub graph: Graph<Arc<Event>>,

/// Events that do not yet directly follow
/// other events from the causal graph and
/// the number of events that have been
/// added and follow another event since
/// the pending event has been pushed.
pending: HashMap<Arc<Event>, usize>,
/// other events from the causal graph.
pending: HashSet<VertexId>,

/// A set of events that follow other events
/// but do not have any follower.
ends: HashSet<VertexId>,

/// Mapping between event hashes and vertex ids.
lookup_table: HashMap<Hash, VertexId>,
}

impl CausalGraph {
pub fn new() -> CausalGraph {
pub fn new(root_event: Arc<Event>) -> CausalGraph {
let mut graph = Graph::new();
let mut lookup_table = HashMap::new();
let mut ends = HashSet::new();
let id = graph.add_vertex(root_event.clone());

lookup_table.insert(root_event.hash().unwrap(), id.clone());
ends.insert(id);

CausalGraph {
graph: Graph::new(),
pending: HashMap::new(),
lookup_table: HashMap::new()
graph,
ends,
lookup_table,
pending: HashSet::new(),
}
}

@@ -62,11 +73,63 @@ impl CausalGraph {
false
}

pub fn add_vertex(&mut self, event: Arc<Event>) -> VertexId {
let id = self.graph.add_vertex(event.clone());
self.lookup_table.insert(event.hash().unwrap(), id.clone());
pub fn contains(&self, event: Arc<Event>) -> bool {
self.lookup_table.get(&event.hash().unwrap()).is_some()
}

pub fn push(&mut self, event: Arc<Event>) {
if event.parent_hash().is_none() {
panic!("Pushing an event without a parent hash is illegal!");
}

id
if !self.contains(event.clone()) {
let id = self.graph.add_vertex(event.clone());
self.lookup_table.insert(event.hash().unwrap(), id.clone());
self.pending.insert(id);

let mut ends: VecDeque<VertexId> = self.ends.iter().cloned().collect();

// Loop graph ends and for each one, try to
// attach a pending event until either the
// pending set is empty or until we have
// traversed each end vertex.
loop {
if self.pending.is_empty() {
return;
}

if let Some(current_end_id) = ends.pop_back() {
let current_end = self.graph.fetch(&current_end_id).unwrap();
let mut to_remove = Vec::with_capacity(self.pending.len());
let mut to_add = Vec::with_capacity(self.pending.len());

for e in self.pending.iter() {
let current = self.graph.fetch(e).unwrap();

// Add edge if matching child is found
if current.parent_hash() == current_end.hash() {
to_remove.push(e.clone());
self.ends.insert(e.clone());
self.ends.remove(&current_end_id);
to_add.push((current_end_id, e.clone()));
ends.push_front(*e);
}
}

for e in to_remove.iter() {
self.pending.remove(e);
}

for e in to_add.iter() {
self.graph.add_edge(&e.0, &e.1).unwrap();
}
} else {
return;
}
}
} else {
panic!("Cannot push an already contained event!");
}
}

/// Returns true if the second event happened exactly after the first event.
@@ -89,37 +152,48 @@ impl CausalGraph {

#[cfg(test)]
mod tests {
#[macro_use] use quickcheck::*;
use super::*;
use crypto::{Identity, Hash};
use causality::Stamp;
use rand::*;
use network::NodeId;

#[test]
fn is_direct_follower() {
let i = Identity::new();
let n = NodeId(*i.pkey());
let A_hash = Hash::random();
let B_hash = Hash::random();
let C_hash = Hash::random();
let A = Arc::new(Event::Dummy(n.clone(), A_hash.clone(), None, Stamp::seed()));
let B = Arc::new(Event::Dummy(n.clone(), B_hash.clone(), Some(A_hash), Stamp::seed()));
let C = Arc::new(Event::Dummy(n.clone(), C_hash.clone(), Some(B_hash), Stamp::seed()));
let D = Arc::new(Event::Dummy(n.clone(), Hash::random(), Some(C_hash), Stamp::seed()));
let mut cg = CausalGraph::new();

let A_id = cg.add_vertex(A.clone());
let B_id = cg.add_vertex(B.clone());
let C_id = cg.add_vertex(C.clone());
let D_id = cg.add_vertex(D.clone());

cg.graph.add_edge(&A_id, &B_id).unwrap();
cg.graph.add_edge(&B_id, &C_id).unwrap();

assert!(cg.is_direct_follower(B.clone(), A.clone()));
assert!(cg.is_direct_follower(C.clone(), B.clone()));
assert!(!cg.is_direct_follower(A.clone(), B.clone()));
assert!(!cg.is_direct_follower(A.clone(), C.clone()));
assert!(!cg.is_direct_follower(D, A.clone()));
assert!(!cg.is_direct_follower(C, A));
quickcheck! {
fn is_direct_follower() -> bool {
let i = Identity::new();
let n = NodeId(*i.pkey());
let A_hash = Hash::random();
let B_hash = Hash::random();
let C_hash = Hash::random();
let A = Arc::new(Event::Dummy(n.clone(), A_hash.clone(), None, Stamp::seed()));
let B = Arc::new(Event::Dummy(n.clone(), B_hash.clone(), Some(A_hash), Stamp::seed()));
let C = Arc::new(Event::Dummy(n.clone(), C_hash.clone(), Some(B_hash), Stamp::seed()));
let D = Arc::new(Event::Dummy(n.clone(), Hash::random(), Some(C_hash), Stamp::seed()));
let mut cg = CausalGraph::new(A.clone());

let mut events = vec![B.clone(), C.clone(), D.clone()];

// The causal graph should be the same regardless
// of the order in which the events are pushed.
thread_rng().shuffle(&mut events);

for e in events {
println!("DEBUG PUSHED: {:?}", e);
cg.push(e);
}

assert!(cg.is_direct_follower(B.clone(), A.clone()));
assert!(cg.is_direct_follower(C.clone(), B.clone()));
assert!(cg.is_direct_follower(D.clone(), C.clone()));
assert!(!cg.is_direct_follower(A.clone(), B.clone()));
assert!(!cg.is_direct_follower(A.clone(), C.clone()));
assert!(!cg.is_direct_follower(D, A.clone()));
assert!(!cg.is_direct_follower(C, A));

println!("SUCCESS!");

true
}
}
}
@@ -50,15 +50,10 @@ pub struct ConsensusMachine {
validators: Vec<Arc<Mutex<ValidatorState>>>,
}

enum Direction {
Incoming,
Outgoing
}

impl ConsensusMachine {
pub fn new() -> ConsensusMachine {
pub fn new(root_event: Arc<Event>) -> ConsensusMachine {
ConsensusMachine {
causal_graph: Arc::new(RwLock::new(CausalGraph::new())),
causal_graph: Arc::new(RwLock::new(CausalGraph::new(root_event))),
candidate_sets: Vec::new(),
validators: Vec::new(),
}
@@ -119,7 +114,7 @@ impl ConsensusMachine {
// event, we just add an edge between pushed and the current.
let start_events: VecDeque<&VertexId> = g.graph.roots().collect();

let (edges_to_add, edges_to_remove, _) = tail_recurse((vec![], HashSet::new(), start_events), |(mut edges_to_add, mut edges_to_remove, mut events): (Vec<(VertexId, VertexId)>, HashSet<(VertexId, VertexId)>, VecDeque<&VertexId>)| {
let (edges_to_add, edges_to_remove, _) = tail_recurse((vec![], HashSet::new(), start_events), |(mut edges_to_add, mut edges_to_remove, mut events): (Vec<(VertexId, VertexId)>, HashSet<(VertexId, VertexId)>, VecDeque<&VertexId>)| {
let edge_count = edges_to_add.len() + g.graph.edge_count();
let front = events.pop_front();

@@ -427,7 +422,6 @@ mod tests {
let F = Event::Dummy(n2.clone(), F_hash, Some(E_hash), s_b.clone());

let events = vec![
A,
B,
C,
D,
@@ -454,7 +448,7 @@ mod tests {
// of the order in which the events are pushed.
thread_rng().shuffle(&mut events);

let mut machine = ConsensusMachine::new();
let mut machine = ConsensusMachine::new(A.clone());

for e in events {
machine.push(e).unwrap();
@@ -529,7 +523,6 @@ mod tests {
let F = Event::Dummy(n2.clone(), F_hash, Some(E_hash), s_b.clone());

let events = vec![
A,
B,
C,
D,
@@ -547,13 +540,14 @@ mod tests {
.map(|e| Arc::new(e.clone()))
.collect();

let A = events[0].clone();
let F = events[5].clone();

// The causal graph should be the same regardless
// of the order in which the events are pushed.
thread_rng().shuffle(&mut events);

let mut machine = ConsensusMachine::new();
let mut machine = ConsensusMachine::new(A.clone());

for e in events {
machine.push(e).unwrap();
@@ -638,7 +632,6 @@ mod tests {
assert!(B_second.stamp().concurrent(D_prime.stamp()));

let events = vec![
A,
B,
C,
D,
@@ -688,7 +681,7 @@ mod tests {
// of the order in which the events are pushed.
thread_rng().shuffle(&mut events);

let mut machine = ConsensusMachine::new();
let mut machine = ConsensusMachine::new(A.clone());

for e in events {
unsafe {
@@ -17,12 +17,12 @@
*/

use account::NormalAddress;
use bitvec::Bits;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use causality::Stamp;
use crypto::{Hash, PublicKey, Signature};
use network::NodeId;
use std::io::Cursor;
use bitvec::Bits;

#[derive(Serialize, Deserialize, Clone, Debug, PartialEq)]
pub struct Join {
@@ -35,7 +35,7 @@ pub struct Leave {

#[serde(skip_serializing_if = "Option::is_none")]
pub hash: Option<Hash>,

#[serde(skip_serializing_if = "Option::is_none")]
pub signature: Option<Signature>,
}
@@ -24,6 +24,7 @@ extern crate quickcheck;
extern crate serde_derive;

extern crate account;
extern crate bitvec;
extern crate byteorder;
extern crate causality;
extern crate crypto;
@@ -35,7 +36,6 @@ extern crate persistence;
extern crate rayon;
extern crate rlp;
extern crate serde;
extern crate bitvec;
extern crate transactions;

#[macro_use]
@@ -67,14 +67,14 @@ pub enum Event {
impl PartialEq for Event {
fn eq(&self, other: &Event) -> bool {
// This only makes sense when the event is received
// when the node is a server i.e. when the event is
// when the node is a server i.e. when the event is
// guaranteed to have a hash because it already passed
// the parsing stage.
self.hash().unwrap() == other.hash().unwrap()
}
}

impl Eq for Event { }
impl Eq for Event {}

impl HashTrait for Event {
fn hash<H: Hasher>(&self, state: &mut H) {
@@ -91,7 +91,7 @@ impl Event {
Event::Dummy(_, _, _, ref stamp) => stamp.clone(),
}
}

pub fn node_id(&self) -> NodeId {
match *self {
Event::Heartbeat(ref event) => event.node_id.clone(),

0 comments on commit 011240a

Please sign in to comment.
You can’t perform that action at this time.