Skip to content
This repository has been archived by the owner on Jun 25, 2021. It is now read-only.

Commit

Permalink
Merge 16ccf8f into 06a703e
Browse files Browse the repository at this point in the history
  • Loading branch information
davidrusu committed Apr 21, 2021
2 parents 06a703e + 16ccf8f commit a425897
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 279 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Expand Up @@ -15,7 +15,7 @@ bincode = "1.2.1"
multibase = "~0.8.0"
hex = "~0.3.2"
rand = "~0.7.3"
crdts = "4.3.0"
crdts = "6.3.2"
threshold_crypto = "~0.4.0"
xor_name = "1.1.9"
signature = "1.1.0"
Expand All @@ -34,6 +34,7 @@ rand_core = "~0.5.1"
features = [ "sha3" ]

[dev-dependencies]
num = "*"
anyhow = "1.0.36"
rand_xorshift = "~0.2.0"
proptest = "0.10.1"
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Expand Up @@ -75,7 +75,7 @@ use xor_name::XorName;

/// Object storing a data variant.
#[allow(clippy::large_enum_variant)]
#[derive(Clone, Eq, PartialEq, PartialOrd, Hash, Serialize, Deserialize, Debug)]
#[derive(Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Debug)]
pub enum Data {
/// Blob.
Immutable(Blob),
Expand Down
256 changes: 4 additions & 252 deletions src/sequence/mod.rs
Expand Up @@ -18,10 +18,7 @@ pub use metadata::{
use seq_crdt::{CrdtOperation, SequenceCrdt};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::{
fmt::{self, Debug, Formatter},
hash::Hash,
};
use std::{fmt::Debug, hash::Hash};
use xor_name::XorName;

// Type of data used for the 'Actor' in CRDT vector clocks
Expand All @@ -35,20 +32,8 @@ pub type PublicSeqData = SequenceCrdt<ActorType, PublicPolicy>;
/// Private Sequence.
pub type PrivateSeqData = SequenceCrdt<ActorType, PrivatePolicy>;

impl Debug for PublicSeqData {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter, "PubSequence {:?}", self.address().name())
}
}

impl Debug for PrivateSeqData {
fn fmt(&self, formatter: &mut Formatter) -> fmt::Result {
write!(formatter, "PrivSequence {:?}", self.address().name())
}
}

/// Object storing a Sequence variant.
#[derive(Clone, Eq, PartialEq, PartialOrd, Hash, Serialize, Deserialize, Debug)]
#[derive(Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Debug)]
enum SeqData {
/// Public Sequence Data.
Public(PublicSeqData),
Expand All @@ -57,7 +42,7 @@ enum SeqData {
}

/// Object storing the Sequence
#[derive(Clone, Eq, PartialEq, PartialOrd, Hash, Serialize, Deserialize, Debug)]
#[derive(Clone, Eq, PartialEq, Hash, Serialize, Deserialize, Debug)]
pub struct Data {
authority: PublicKey,
data: SeqData,
Expand Down Expand Up @@ -295,7 +280,7 @@ mod tests {
};
use anyhow::anyhow;
use proptest::prelude::*;
use rand::{rngs::OsRng, seq::SliceRandom};
use rand::rngs::OsRng;
use std::{collections::BTreeMap, sync::Arc};
use xor_name::XorName;

Expand Down Expand Up @@ -913,14 +898,6 @@ mod tests {
prop::collection::vec(generate_seq_entry(), 1..max_quantity + 1)
}

// Generates a vec of Sequence entries each with a value suggesting
// the delivery chance of the op that gets created with the entry
fn generate_dataset_and_probability(
max_quantity: usize,
) -> impl Strategy<Value = Vec<(Vec<u8>, u8)>> {
prop::collection::vec((generate_seq_entry(), any::<u8>()), 1..max_quantity + 1)
}

proptest! {
#[test]
fn proptest_seq_doesnt_crash_with_random_data(
Expand Down Expand Up @@ -1010,231 +987,6 @@ mod tests {
}

verify_data_convergence(replicas, dataset_length)?;

}

#[test]
fn proptest_converge_with_shuffled_op_set_across_arbitrary_number_of_replicas(
dataset in generate_dataset(100),
res in generate_replicas(500)
) {
let (mut replicas, owner_keypair) = res?;
let dataset_length = dataset.len() as u64;

// generate an ops set from one replica
let mut ops = vec![];

for data in dataset {
let op = sign_sequence_op(replicas[0].create_unsigned_append_op(data)?, &owner_keypair)?;
replicas[0].apply_op(op.clone())?;
ops.push(op);
}

// now we randomly shuffle ops and apply at each replica
for replica in &mut replicas {
let mut ops = ops.clone();
ops.shuffle(&mut OsRng);

for op in ops {
replica.apply_op(op)?;
}
}

verify_data_convergence(replicas, dataset_length)?;
}

#[test]
fn proptest_converge_with_shuffled_ops_from_many_replicas_across_arbitrary_number_of_replicas(
dataset in generate_dataset(1000),
res in generate_replicas(100)
) {
let (mut replicas, owner_keypair) = res?;
let dataset_length = dataset.len() as u64;

// generate an ops set using random replica for each data
let mut ops = vec![];
for data in dataset {
if let Some(replica) = replicas.choose_mut(&mut OsRng)
{
let op = sign_sequence_op(replica.create_unsigned_append_op(data)?, &owner_keypair)?;
replica.apply_op(op.clone())?;

ops.push(op);
}
}

let opslen = ops.len() as u64;
prop_assert_eq!(dataset_length, opslen);

// now we randomly shuffle ops and apply at each replica
for replica in &mut replicas {
let mut ops = ops.clone();
ops.shuffle(&mut OsRng);

for op in ops {
replica.apply_op(op)?;
}
}

verify_data_convergence(replicas, dataset_length)?;
}

#[test]
fn proptest_dropped_data_can_be_reapplied_and_we_converge(
dataset in generate_dataset_and_probability(1000),
) {
// Instantiate the same Sequence on two replicas
let sequence_name = XorName::random();
let sequence_tag = 43_001u64;
let owner_keypair = Keypair::new_ed25519(&mut OsRng);
let policy = SequencePublicPolicy {
owner: owner_keypair.public_key(),
permissions: BTreeMap::default(),
};

// Instantiate the same Sequence on two replicas
let mut replicas = gen_pub_seq_replicas(
Some(owner_keypair.clone()),
sequence_name,
sequence_tag,
Some(policy),
2);
let (_, mut replica1) = replicas.remove(0);
let (_, mut replica2) = replicas.remove(0);

let dataset_length = dataset.len() as u64;

let mut ops = vec![];
for (data, delivery_chance) in dataset {
let op = sign_sequence_op(replica1.create_unsigned_append_op(data)?, &owner_keypair)?;
replica1.apply_op(op.clone())?;

ops.push((op, delivery_chance));
}

for (op, delivery_chance) in ops.clone() {
if delivery_chance < u8::MAX / 3 {
replica2.apply_op(op)?;
}
}

// here we statistically should have dropped some messages
if dataset_length > 50 {
assert_ne!(replica2.len(None), replica1.len(None));
}

// reapply all ops
for (op, _) in ops {
replica2.apply_op(op)?;
}

// now we converge
verify_data_convergence(vec![replica1, replica2], dataset_length)?;
}

#[test]
fn proptest_converge_with_shuffled_ops_from_many_while_dropping_some_at_random(
dataset in generate_dataset_and_probability(1000),
res in generate_replicas(100),
) {
let (mut replicas, owner_keypair) = res?;
let dataset_length = dataset.len() as u64;

// generate an ops set using random replica for each data
let mut ops = vec![];
for (data, delivery_chance) in dataset {

// a random index within the replicas range
let index: usize = OsRng.gen_range( 0, replicas.len());
let replica = &mut replicas[index];

let op = sign_sequence_op(replica.create_unsigned_append_op(data)?, &owner_keypair)?;
replica.apply_op(op.clone())?;
ops.push((op, delivery_chance));
}

let opslen = ops.len() as u64;
prop_assert_eq!(dataset_length, opslen);

// now we randomly shuffle ops and apply at each replica
for replica in &mut replicas {
let mut ops = ops.clone();
ops.shuffle(&mut OsRng);

for (op, delivery_chance) in ops.clone() {
if delivery_chance > u8::MAX / 3 {
replica.apply_op(op)?;
}
}

// reapply all ops, simulating lazy messaging filling in the gaps
for (op, _) in ops {
replica.apply_op(op)?;
}
}

verify_data_convergence(replicas, dataset_length)?;
}

#[test]
fn proptest_converge_with_shuffled_ops_including_bad_ops_which_error_and_are_not_applied(
dataset in generate_dataset(10),
bogus_dataset in generate_dataset(10), // should be same number as dataset
gen_replicas_result in generate_replicas(10),

) {
let (mut replicas, owner_keypair) = gen_replicas_result?;
let dataset_length = dataset.len();
let bogus_dataset_length = bogus_dataset.len();
let number_replicas = replicas.len();

// generate the real ops set using random replica for each data
let mut ops = vec![];
for data in dataset {
if let Some(replica) = replicas.choose_mut(&mut OsRng)
{
let op = sign_sequence_op(replica.create_unsigned_append_op(data)?, &owner_keypair)?;

replica.apply_op(op.clone())?;
ops.push(op);
}
}

// set up a replica that has nothing to do with the rest, random xor... different owner...
let xorname = XorName::random();
let tag = 45_000u64;
let random_owner_keypair = Keypair::new_ed25519(&mut OsRng);
let mut bogus_replica = Sequence::new_public(random_owner_keypair.public_key(), "authority".to_string(), xorname, tag, None);

// add bogus ops from bogus replica + bogus data
for data in bogus_dataset {
let bogus_op = sign_sequence_op( bogus_replica.create_unsigned_append_op(data)?, &random_owner_keypair)?;
bogus_replica.apply_op(bogus_op.clone())?;
ops.push(bogus_op);
}

let opslen = ops.len();
prop_assert_eq!(dataset_length + bogus_dataset_length, opslen);

let mut err_count = vec![];
// now we randomly shuffle ops and apply at each replica
for replica in &mut replicas {
let mut ops = ops.clone();
ops.shuffle(&mut OsRng);

for op in ops {
match replica.apply_op(op) {
Ok(_) => {},
// record all errors to check this matches bogus data
Err(error) => {err_count.push(error)},
}
}
}

// check we get an error per bogus datum per replica
assert_eq!(err_count.len(), bogus_dataset_length * number_replicas);

verify_data_convergence(replicas, dataset_length as u64)?;
}
}
}

0 comments on commit a425897

Please sign in to comment.