Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract Network and Interface from utils #127

Merged
merged 18 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ target
tmp
/bin

harness/target

Cargo.lock
*.rs.bk
*.rs.fmt
5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,23 @@ categories = ["algorithms", "database-implementations"]

[features]
default = []
# Enable failpoints
failpoint = ["fail"]

# Make sure to synchronize updates with Harness.
[dependencies]
log = ">0.2"
protobuf = "2.0.4"
quick-error = "1.2.2"
rand = "0.5.4"
fxhash = "0.2.1"
fail = { version = "0.2", optional = true }
env_logger = "0.5"

[dev-dependencies]
env_logger = "0.5"
criterion = ">0.2.4"
lazy_static = "1.0"
harness = { path = "harness" }

[[bench]]
name = "benches"
Expand Down
16 changes: 16 additions & 0 deletions harness/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "harness"
version = "0.1.0"
authors = ["The TiKV Project Developers"]
license = "Apache-2.0"
keywords = []
repository = "https://github.com/pingcap/raft-rs/harness"
readme = "README.md"
homepage = "https://github.com/pingcap/raft-rs/harness"
description = "A testing harness for Raft."
categories = []

# Make sure to synchronize updates with Raft.
[dependencies]
raft = { path = ".." }
rand = "0.5.4"
115 changes: 115 additions & 0 deletions harness/src/interface.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2018 PingCAP, Inc.
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use raft::{eraftpb::Message, storage::MemStorage, Progress, ProgressSet, Raft, Result};
use std::ops::{Deref, DerefMut};

/// A simulated Raft façade for testing.
///
/// If the contained value is a `Some` operations happen. If they are a `None` operations are
/// a no-op.
///
// Compare to upstream, we use struct instead of trait here.
// Because to be able to cast Interface later, we have to make
// Raft derive Any, which will require a lot of dependencies to derive Any.
// That's not worthy for just testing purpose.
pub struct Interface {
/// The raft peer.
pub raft: Option<Raft<MemStorage>>,
}

impl Interface {
/// Create a new interface to a new raft.
pub fn new(r: Raft<MemStorage>) -> Interface {
Interface { raft: Some(r) }
}

/// Step the raft, if it exists.
pub fn step(&mut self, m: Message) -> Result<()> {
match self.raft {
Some(_) => Raft::step(self, m),
None => Ok(()),
}
}

/// Read messages out of the raft.
pub fn read_messages(&mut self) -> Vec<Message> {
match self.raft {
Some(_) => self.msgs.drain(..).collect(),
None => vec![],
}
}

/// Initialize a raft with the given ID and peer set.
pub fn initial(&mut self, id: u64, ids: &[u64]) {
if self.raft.is_some() {
self.id = id;
let prs = self.take_prs();
self.set_prs(ProgressSet::with_capacity(
ids.len(),
prs.learner_ids().len(),
));
for id in ids {
let progress = Progress::new(0, 256);
if prs.learner_ids().contains(id) {
if let Err(e) = self.mut_prs().insert_learner(*id, progress) {
panic!("{}", e);
}
} else if let Err(e) = self.mut_prs().insert_voter(*id, progress) {
panic!("{}", e);
}
}
let term = self.term;
self.reset(term);
}
}
}

impl From<Option<Raft<MemStorage>>> for Interface {
fn from(raft: Option<Raft<MemStorage>>) -> Self {
Self { raft }
}
}

impl From<Raft<MemStorage>> for Interface {
fn from(raft: Raft<MemStorage>) -> Self {
Self { raft: Some(raft) }
}
}

impl Deref for Interface {
type Target = Raft<MemStorage>;
fn deref(&self) -> &Raft<MemStorage> {
self.raft.as_ref().unwrap()
}
}

impl DerefMut for Interface {
fn deref_mut(&mut self) -> &mut Raft<MemStorage> {
self.raft.as_mut().unwrap()
}
}
42 changes: 42 additions & 0 deletions harness/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*!

This module contains various testing harness utilities for Raft.

> If you want to build Raft without this, disable the `harness` feature.

*/

extern crate raft;
extern crate rand;

mod interface;
mod network;

pub use self::{interface::Interface, network::Network};
189 changes: 189 additions & 0 deletions harness/src/network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

// Copyright 2015 CoreOS, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::interface::Interface;
use raft::{
eraftpb::{Message, MessageType},
storage::MemStorage,
Config, Raft, NO_LIMIT,
};
use rand;
use std::collections::HashMap;

#[derive(Default, Debug, PartialEq, Eq, Hash)]
struct Connem {
Hoverbear marked this conversation as resolved.
Show resolved Hide resolved
from: u64,
to: u64,
}

/// A simulated network for testing.
///
/// You can use this to create a test network of Raft nodes.
///
/// *Please note:* no actual network calls are made.
#[derive(Default)]
pub struct Network {
/// The set of raft peers.
pub peers: HashMap<u64, Interface>,
/// The storage of the raft peers.
pub storage: HashMap<u64, MemStorage>,
dropm: HashMap<Connem, f64>,
ignorem: HashMap<MessageType, bool>,
}

impl Network {
/// Initializes a network from peers.
///
/// Nodes will recieve their ID based on their index in the vector, starting with 1.
///
/// A `None` node will be replaced with a new Raft node.
pub fn new(peers: Vec<Option<Interface>>) -> Network {
Network::new_with_config(peers, false)
}

/// Explicitly set the pre_vote option on newly created rafts.
///
/// **TODO:** Make this accept any config.
pub fn new_with_config(mut peers: Vec<Option<Interface>>, pre_vote: bool) -> Network {
let size = peers.len();
let peer_addrs: Vec<u64> = (1..=size as u64).collect();
let mut nstorage = HashMap::new();
let mut npeers = HashMap::new();
for (p, id) in peers.drain(..).zip(peer_addrs.clone()) {
match p {
None => {
nstorage.insert(id, MemStorage::default());
let r = Interface::new(
Raft::new(
&Config {
id,
peers: peer_addrs.clone(),
election_tick: 10,
heartbeat_tick: 1,
max_size_per_msg: NO_LIMIT,
max_inflight_msgs: 256,
pre_vote,
..Default::default()
},
nstorage[&id].clone(),
)
.unwrap(),
);
npeers.insert(id, r);
}
Some(mut p) => {
p.initial(id, &peer_addrs);
npeers.insert(id, p);
}
}
}
Network {
peers: npeers,
storage: nstorage,
..Default::default()
}
}

/// Ignore a given `MessageType`.
pub fn ignore(&mut self, t: MessageType) {
self.ignorem.insert(t, true);
}

/// Filter out messages that should be dropped according to rules set by `ignore` or `drop`.
pub fn filter(&self, mut msgs: Vec<Message>) -> Vec<Message> {
msgs.drain(..)
.filter(|m| {
if self
.ignorem
.get(&m.get_msg_type())
.cloned()
.unwrap_or(false)
{
return false;
}
// hups never go over the network, so don't drop them but panic
assert_ne!(m.get_msg_type(), MessageType::MsgHup, "unexpected msgHup");
let perc = self
.dropm
.get(&Connem {
from: m.get_from(),
to: m.get_to(),
})
.cloned()
.unwrap_or(0f64);
rand::random::<f64>() >= perc
})
.collect()
}

/// Instruct the cluster to `step` through the given messages.
pub fn send(&mut self, msgs: Vec<Message>) {
let mut msgs = msgs;
while !msgs.is_empty() {
let mut new_msgs = vec![];
for m in msgs.drain(..) {
let resp = {
let p = self.peers.get_mut(&m.get_to()).unwrap();
let _ = p.step(m);
p.read_messages()
};
new_msgs.append(&mut self.filter(resp));
}
msgs.append(&mut new_msgs);
}
}

/// Ignore messages from `from` to `to` at `perc` percent chance.
///
/// `perc` set to `1f64` is a 100% chance, `0f64` is a 0% chance.
pub fn drop(&mut self, from: u64, to: u64, perc: f64) {
self.dropm.insert(Connem { from, to }, perc);
}

/// Cut the communication between the two given nodes.
pub fn cut(&mut self, one: u64, other: u64) {
self.drop(one, other, 1f64);
self.drop(other, one, 1f64);
}

/// Isolate the given raft to and from all other raft in the cluster.
pub fn isolate(&mut self, id: u64) {
for i in 0..self.peers.len() as u64 {
let nid = i + 1;
if nid != id {
self.drop(id, nid, 1.0);
self.drop(nid, id, 1.0);
}
}
}

/// Recover the cluster conditions applied with `drop` and `ignore`.
pub fn recover(&mut self) {
self.dropm = HashMap::new();
self.ignorem = HashMap::new();
}
}
Loading