Skip to content

Commit

Permalink
raft: implemented leader lease when quorum check is on
Browse files Browse the repository at this point in the history
port from etcd etcd-io/etcd#5468
  • Loading branch information
tiancaiamao committed Jun 10, 2016
1 parent 3928b41 commit 21b48ce
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 18 deletions.
73 changes: 57 additions & 16 deletions src/raft/raft.rs
Expand Up @@ -305,6 +305,14 @@ impl<T: Storage> Raft<T> {
self.election_timeout
}

/// Overrides the randomized election timeout with `v`.
///
/// `past_election_timeout` compares `election_elapsed` against this value, so
/// raising it delays the point at which this node reports a timeout.
pub fn set_randomized_election_timeout(&mut self, v: usize) {
    self.randomized_election_timeout = v;
}

/// Returns the randomized election timeout currently in effect.
///
/// This reads the `randomized_election_timeout` field (the value written by
/// `set_randomized_election_timeout` and checked by `past_election_timeout`),
/// not the base `election_timeout`.
pub fn get_randomized_election_timeout(&self) -> usize {
    self.randomized_election_timeout
}

/// Returns the value of the `heartbeat_timeout` field.
pub fn get_heartbeat_timeout(&self) -> usize {
self.heartbeat_timeout
}
Expand Down Expand Up @@ -528,12 +536,8 @@ impl<T: Storage> Raft<T> {
// tick_election is run by followers and candidates after self.election_timeout.
// TODO: revoke pub when there is a better way to test.
pub fn tick_election(&mut self) {
if !self.promotable() {
self.election_elapsed = 0;
return;
}
self.election_elapsed += 1;
if self.pass_election_timeout() {
if self.promotable() && self.past_election_timeout() {
self.election_elapsed = 0;
let m = new_message(INVALID_ID, MessageType::MsgHup, Some(self.id));
self.step(m).is_ok();
Expand Down Expand Up @@ -692,6 +696,25 @@ impl<T: Storage> Raft<T> {
// local message
} else if m.get_term() > self.term {
let leader_id = if m.get_msg_type() == MessageType::MsgRequestVote {
if self.check_quorum && self.state != StateRole::Candidate &&
self.election_elapsed < self.election_timeout {
// If a server receives a RequestVote request within the minimum
// election timeout of hearing from a current leader, it does not
// update its term or grant its vote
info!("{} {} [logterm: {}, index: {}, vote: {}] ignored from {} [logterm: \
{}, index: {}] at term {}: lease is not expired (remaining ticks: {})",
self.tag,
self.id,
self.raft_log.last_term(),
self.raft_log.last_index(),
self.vote,
m.get_from(),
m.get_log_term(),
m.get_index(),
self.term,
self.election_timeout - self.election_elapsed);
return Ok(());
}
INVALID_ID
} else {
m.get_from()
Expand All @@ -705,14 +728,32 @@ impl<T: Storage> Raft<T> {
m.get_term());
self.become_follower(m.get_term(), leader_id);
} else if m.get_term() < self.term {
// ignore
info!("{} {} [term: {}] ignored a {:?} message with lower term from {} [term: {}]",
self.tag,
self.id,
self.term,
m.get_msg_type(),
m.get_from(),
m.get_term());
if self.check_quorum &&
(m.get_msg_type() == MessageType::MsgHeartbeat ||
m.get_msg_type() == MessageType::MsgAppend) {
// We have received messages from a leader at a lower term. It is possible
// that these messages were simply delayed in the network, but this
// could also mean that this node has advanced its term number during a
// network partition, and it is now unable to either win an election or
// to rejoin the majority on the old term. If check_quorum is false, this
// will be handled by incrementing term numbers in response to MsgRequestVote
// with a higher term, but if check_quorum is true we may not advance the
// term on MsgRequestVote and must generate other messages to advance the term.
// The net result of these two features is to minimize the disruption caused
// by nodes that have been removed from the cluster's configuration: a
// removed node will send MsgRequestVote messages which will be ignored, but it
// will not receive MsgAppend or MsgHeartbeat, so it will not create disruptive
// term increases.
self.send(new_message(m.get_from(), MessageType::MsgAppendResponse, None));
} else {
// ignore other cases
info!("{} {} [term: {}] ignored a {:?} message with lower term from {} [term: {}]",
self.tag,
self.id,
self.term,
m.get_msg_type(),
m.get_from(),
m.get_term());
}
return Ok(());
}

Expand Down Expand Up @@ -1323,7 +1364,7 @@ impl<T: Storage> Raft<T> {
self.prs.insert(id, p);
}

fn del_progress(&mut self, id: u64) {
/// Removes the entry keyed by `id` from `prs`; whatever `remove` returns is discarded.
pub fn del_progress(&mut self, id: u64) {
    let _ = self.prs.remove(&id);
}

Expand All @@ -1343,10 +1384,10 @@ impl<T: Storage> Raft<T> {
self.vote = hs.get_vote();
}

/// `pass_election_timeout` returns true iff `election_elapsed` is greater
/// `past_election_timeout` returns true iff `election_elapsed` is greater
/// than or equal to the randomized election timeout in
/// [`election_timeout`, 2 * `election_timeout` - 1].
pub fn pass_election_timeout(&self) -> bool {
pub fn past_election_timeout(&self) -> bool {
    // Timed out once the elapsed count has caught up with the randomized timeout.
    self.randomized_election_timeout <= self.election_elapsed
}

Expand Down
172 changes: 170 additions & 2 deletions tests/test_raft.rs
Expand Up @@ -913,7 +913,7 @@ fn test_commit() {
}

#[test]
fn test_pass_election_timeout() {
fn test_past_election_timeout() {
let tests = vec![
(5, 0f64, false),
(10, 0.1, true),
Expand All @@ -929,7 +929,7 @@ fn test_pass_election_timeout() {
let mut c = 0;
for _ in 0..10000 {
sm.reset_randomized_election_timeout();
if sm.pass_election_timeout() {
if sm.past_election_timeout() {
c += 1;
}
}
Expand Down Expand Up @@ -1347,6 +1347,174 @@ fn test_leader_stepdown_when_quorum_lost() {
assert_eq!(sm.state, StateRole::Follower);
}

// test_leader_superseding_with_check_quorum expects that, with check_quorum on,
// peer 3 cannot take over from the freshly elected peer 1 until peer 2 has been
// ticked for a full election timeout.
#[test]
fn test_leader_superseding_with_check_quorum() {
    let mut peers = Vec::new();
    for id in 1..4 {
        let mut raft = new_test_raft(id, vec![1, 2, 3], 5, 1, new_storage());
        raft.check_quorum = true;
        peers.push(Some(raft));
    }
    let mut nt = Network::new(peers);

    // Tick peer 2 for one election timeout, with its randomized timeout pushed
    // one tick further out so that the ticking does not make it campaign.
    {
        let peer2 = nt.peers.get_mut(&2).unwrap();
        let ticks = peer2.get_election_timeout();
        peer2.set_randomized_election_timeout(ticks + 1);
        for _tick in 0..ticks {
            peer2.tick();
        }
    }

    // Peer 1 campaigns and is expected to win.
    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

    assert_eq!(nt.peers[&1].state, StateRole::Leader);
    assert_eq!(nt.peers[&3].state, StateRole::Follower);

    // Peer 3 campaigns straight away.
    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

    // Peer 3 is expected to stay a candidate: peer 2's election_elapsed has not
    // yet reached election_timeout, so it should not grant the vote.
    assert_eq!(nt.peers[&3].state, StateRole::Candidate);

    // Let peer 2's election_elapsed reach election_timeout.
    {
        let peer2 = nt.peers.get_mut(&2).unwrap();
        let ticks = peer2.get_election_timeout();
        for _tick in 0..ticks {
            peer2.tick();
        }
    }

    // A second campaign from peer 3 is now expected to succeed.
    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

    assert_eq!(nt.peers[&3].state, StateRole::Leader);
}

// test_leader_election_with_check_quorum expects that, with check_quorum on,
// peer 3 can replace leader 1 once peers 1 and 2 have each been ticked for a
// full election timeout.
#[test]
fn test_leader_election_with_check_quorum() {
    let mut peers = Vec::new();
    for id in 1..4 {
        let mut raft = new_test_raft(id, vec![1, 2, 3], 10, 1, new_storage());
        raft.check_quorum = true;
        peers.push(Some(raft));
    }
    let mut nt = Network::new(peers);

    // Tick peer 2 up to its election timeout so that it is able to vote for peer 1.
    {
        let peer2 = nt.peers.get_mut(&2).unwrap();
        let ticks = peer2.get_election_timeout();
        for _tick in 0..ticks {
            peer2.tick();
        }
    }

    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

    assert_eq!(nt.peers[&1].state, StateRole::Leader);
    assert_eq!(nt.peers[&3].state, StateRole::Follower);

    // Tick peer 1 and then peer 2 for a full election timeout each before
    // peer 3 campaigns.
    {
        let peer1 = nt.peers.get_mut(&1).unwrap();
        let ticks = peer1.get_election_timeout();
        for _tick in 0..ticks {
            peer1.tick();
        }
    }
    {
        let peer2 = nt.peers.get_mut(&2).unwrap();
        let ticks = peer2.get_election_timeout();
        for _tick in 0..ticks {
            peer2.tick();
        }
    }

    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);
    assert_eq!(nt.peers[&1].state, StateRole::Follower);
    assert_eq!(nt.peers[&3].state, StateRole::Leader);
}

// test_free_stuck_candidate_with_check_quorum ensures that a candidate with a higher
// term can disrupt the leader even if the leader still "officially" holds the lease.
// The leader is expected to step down and adopt the candidate's term.
#[test]
fn test_free_stuck_candidate_with_check_quorum() {
    let mut peers = Vec::new();
    for id in 1..4 {
        let mut raft = new_test_raft(id, vec![1, 2, 3], 10, 1, new_storage());
        raft.check_quorum = true;
        peers.push(Some(raft));
    }
    let mut nt = Network::new(peers);

    // Tick peer 2 for a full election timeout before peer 1 campaigns.
    {
        let peer2 = nt.peers.get_mut(&2).unwrap();
        let ticks = peer2.get_election_timeout();
        for _tick in 0..ticks {
            peer2.tick();
        }
    }
    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);

    // Cut peer 1 off, then let peer 3 campaign.
    nt.isolate(1);
    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

    // Peer 3 is expected to be stuck as a candidate one term ahead of peer 2.
    assert_eq!(nt.peers[&2].state, StateRole::Follower);
    assert_eq!(nt.peers[&3].state, StateRole::Candidate);
    assert_eq!(nt.peers[&3].term, nt.peers[&2].term + 1);

    // Campaign once more; the gap is expected to grow to two terms.
    nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]);

    assert_eq!(nt.peers[&2].state, StateRole::Follower);
    assert_eq!(nt.peers[&3].state, StateRole::Candidate);
    assert_eq!(nt.peers[&3].term, nt.peers[&2].term + 2);

    // Reconnect everyone and deliver a heartbeat from peer 1 to peer 3 that
    // carries peer 1's (now lower) term.
    nt.recover();
    let leader_term = nt.peers[&1].term;
    let mut heartbeat = new_message(1, 3, MessageType::MsgHeartbeat, 0);
    heartbeat.set_term(leader_term);
    nt.send(vec![heartbeat]);

    // The old leader is expected to be disrupted, which frees the stuck peer.
    assert_eq!(nt.peers[&1].state, StateRole::Follower);
    assert_eq!(nt.peers[&3].term, nt.peers[&1].term);
}

// test_non_promotable_voter_with_check_quorum expects that a peer which has no
// progress entry for itself (and so should not be promotable) still ends up
// following the leader elected by the other peer when check_quorum is enabled.
#[test]
fn test_non_promotable_voter_with_check_quorum() {
    let mut a = new_test_raft(1, vec![1, 2], 10, 1, new_storage());
    let mut b = new_test_raft(2, vec![1], 10, 1, new_storage());

    a.check_quorum = true;
    b.check_quorum = true;

    let mut nt = Network::new(vec![Some(a), Some(b)]);
    // Need to remove 2 again to make it a non-promotable node,
    // since Network::new overwrites some internal state.
    nt.peers.get_mut(&2).unwrap().del_progress(2);
    // Peer 2 must NOT be promotable here. The previous
    // `assert!(promotable(), false)` used `false` as the panic message and
    // therefore asserted the opposite condition.
    assert!(!nt.peers[&2].promotable());

    // Tick peer 2 for a full election timeout before peer 1 campaigns.
    {
        let election_timeout = nt.peers[&2].get_election_timeout();
        let b = nt.peers.get_mut(&2).unwrap();
        for _ in 0..election_timeout {
            b.tick()
        }
    }

    // Peer 1 campaigns; peer 2 is expected to follow it.
    nt.send(vec![new_message(1, 1, MessageType::MsgHup, 0)]);
    assert_eq!(nt.peers[&1].state, StateRole::Leader);
    assert_eq!(nt.peers[&2].state, StateRole::Follower);
    assert_eq!(nt.peers[&2].leader_id, 1);
}

#[test]
fn test_leader_append_response() {
// initial progress: match = 0; next = 3
Expand Down

0 comments on commit 21b48ce

Please sign in to comment.