Skip to content

Commit

Permalink
WIP: Minimum leader election test
Browse files Browse the repository at this point in the history
  • Loading branch information
tailhook committed Oct 28, 2015
1 parent f8c3e09 commit 99f95e1
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 16 deletions.
102 changes: 89 additions & 13 deletions src/elect/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ type Epoch = u64;
#[derive(Clone, Debug)]
pub enum Machine {
Starting { leader_deadline: SteadyTime },
Electing { epoch: Epoch,
votes_for_me: HashSet<Id>, election_deadline: SteadyTime },
Electing { epoch: Epoch, votes_for_me: HashSet<Id>, deadline: SteadyTime },
Voted { epoch: Epoch, peer: Id, election_deadline: SteadyTime },
Leader { epoch: Epoch, ping_time: SteadyTime },
Leader { epoch: Epoch, next_ping_time: SteadyTime },
Follower { epoch: Epoch, leader_deadline: SteadyTime },
}

Expand All @@ -28,6 +27,30 @@ impl Machine {
leader_deadline: now + start_timeout(),
}
}

// methods generic over the all states
pub fn is_newer_than(&self, epoch: Epoch) -> bool {
use self::Machine::*;
let my_epoch = match *self {
Starting { .. } => 0, // real epochs start from 1
Electing { epoch, .. } => epoch,
Voted { epoch, ..} => epoch,
Leader { epoch, ..} => epoch,
Follower { epoch, ..} => epoch,
};
my_epoch > epoch
}
pub fn current_deadline(&self) -> SteadyTime {
use self::Machine::*;
match *self {
Starting { leader_deadline } => leader_deadline,
Electing { deadline, .. } => deadline,
Voted { election_deadline, ..} => election_deadline,
Leader { next_ping_time, ..} => next_ping_time,
Follower { leader_deadline, ..} => leader_deadline,
}
}

pub fn time_passed(self, info: &Info, now: SteadyTime)
-> (Machine, ActionList)
{
Expand All @@ -37,10 +60,7 @@ impl Machine {
info!("[{}] Time passed. Electing as a leader", info.id);
if info.all_hosts.len() == 0 {
// No other hosts. May safefully become a leader
let next_ping = now +
Duration::milliseconds(HEARTBEAT_INTERVAL);
(Leader { epoch: 1, ping_time: next_ping },
Action::PingAll.and_wait(next_ping))
become_leader(1, now)
} else {
let election_end = now +
Duration::milliseconds(HEARTBEAT_INTERVAL);
Expand All @@ -50,7 +70,7 @@ impl Machine {
let mut h = HashSet::new();
h.insert(info.id.clone());
h },
election_deadline: election_end },
deadline: election_end },
Action::Vote.and_wait(election_end))
}
}
Expand All @@ -65,13 +85,15 @@ impl Machine {
{
use self::Machine::*;
use super::Message::*;
let (msg_epoch, data) = msg;
let (src, msg_epoch, data) = msg;
if self.is_newer_than(msg_epoch) {
return pass(self);
}

let (machine, action) = match (self, data) {
(Starting { .. }, Ping) => {
let dline = now + election_ivl();
(Follower { epoch: msg_epoch, leader_deadline: dline },
Action::wait(dline))
// Got message that someone is a leader
follow(msg_epoch, now)
}
(Starting { leader_deadline: dline }, Pong) => {
// This probably means this node was a leader. But there is
Expand All @@ -85,11 +107,65 @@ impl Machine {
peer: id.clone(), election_deadline: dline},
Action::ConfirmVote(id).and_wait(leader_deadline))
}
(Electing { .. }, _) => unimplemented!(),
(Electing { .. }, Ping) => {
// Got message that someone is already (just became) a leader
follow(msg_epoch, now)
}
(me @ Electing { .. }, Pong) => {
// This is not expected when the network is okay and all nodes
// behave well because it means someone thinks that this node
// is a leader
pass(me)
}
(Electing { epoch, mut votes_for_me, deadline}, Vote(id)) => {
if id == info.id {
votes_for_me.insert(src);
let need = minimum_votes(info.all_hosts.len());
if votes_for_me.len() >= need {
become_leader(epoch, now)
} else {
(Electing { epoch: epoch, votes_for_me: votes_for_me,
deadline: deadline },
Action::wait(deadline))
}
} else {
// Peer voted for someone else
(Electing { epoch: epoch, votes_for_me: votes_for_me,
deadline: deadline },
Action::wait(deadline))
}
}
(Voted { .. }, _) => unimplemented!(),
(Leader { .. }, _) => unimplemented!(),
(Follower { .. }, _) => unimplemented!(),
};
return (machine, action)
}
}

fn follow(epoch: Epoch, now: SteadyTime) -> (Machine, ActionList) {
let dline = now + election_ivl();
(Machine::Follower { epoch: epoch, leader_deadline: dline },
Action::wait(dline))
}

fn pass(me: Machine) -> (Machine, ActionList) {
let deadline = me.current_deadline();
return (me, Action::wait(deadline));
}

fn minimum_votes(total_peers: usize) -> usize {
match total_peers + 1 { // peers don't include myself
0 => 0,
1 => 1,
2 => 2,
x => (x >> 1) + 1,
}
}

fn become_leader(epoch: Epoch, now: SteadyTime) -> (Machine, ActionList) {
let next_ping = now +
Duration::milliseconds(HEARTBEAT_INTERVAL);
(Machine::Leader { epoch: epoch, next_ping_time: next_ping },
Action::PingAll.and_wait(next_ping))
}
2 changes: 1 addition & 1 deletion src/elect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod info;
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Id(String);

type Capsule = (u64, Message);
type Capsule = (Id, u64, Message);

#[derive(Clone, Debug)]
enum Message {
Expand Down
5 changes: 3 additions & 2 deletions src/elect/test_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,15 @@ fn test_vote_approved() {
let id = info.id.clone();
assert!(matches!(node, Machine::Starting { .. }));

env.add_another_for(&mut info);
let two = env.add_another_for(&mut info);
env.sleep(10000); // Large timeout, should start_election
let (node, act) = node.time_passed(&info, env.now());
assert!(matches!(node, Machine::Electing { .. }));
assert!(act.action == Some(Action::Vote));

let (node, act) = node.message(&info,
(0, Message::Vote(id.clone())), env.now());
(two.clone(), 1, Message::Vote(id.clone())), env.now());
println!("Node {:?}", node);
assert!(matches!(node, Machine::Leader { .. }));
assert!(act.action == Some(Action::PingAll));
}

0 comments on commit 99f95e1

Please sign in to comment.