-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
raft: port pre-vote feature #1444
Conversation
PTAL @siddontang @BusyJay @ngaut |
PTAL @siddontang @BusyJay @ngaut |
if m.get_term() == 0 { | ||
// Pre-vote RPCs are sent at a term other than our actual term, so the code | ||
// that sends these messages is responsible for setting the term. | ||
panic!("term should be set when sending {:?}", m.get_msg_type()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add tag
@@ -624,6 +662,16 @@ impl<T: Storage> Raft<T> { | |||
info!("{} became candidate at term {}", self.tag, self.term); | |||
} | |||
|
|||
pub fn become_pre_candidate(&mut self) { | |||
assert!(self.state != StateRole::Leader, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if + panic
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The usage of assert!
is consistent with become_candidate()
.
fn ents(terms: Vec<u64>) -> Interface { | ||
// ents_with_config creates a raft state machine with a sequence of log entries at | ||
// the given terms. | ||
fn ents_with_config(terms: Vec<u64>, pre_vote: bool) -> Interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
where is the config?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pre_vote is the config.
self.raft_log.committed + 1, | ||
raft_log::NO_LIMIT) | ||
.expect("unexpected error getting unapplied entries"); | ||
let n = self.num_pending_conf(&ents[..]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seem that &ents
is enough?
if n != 0 && self.raft_log.committed > self.raft_log.applied { | ||
warn!("{} cannot campaign at term {} since there are still {} pending \ | ||
configuration changes to apply", | ||
self.id, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.tag
PTAL @BusyJay |
} | ||
let base = ltoa(raft_log); | ||
let l = ltoa(&nt.peers[&(1 + i as u64)].raft_log); | ||
if base != l { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not the same with etcd :
base := ltoa(tt.raftLog)
if sm, ok := nt.peers[1+uint64(i)].(*raft); ok {
l := ltoa(sm.raftLog)
if g := diffu(base, l); g != "" {
t.Errorf("#%d: diff:\n%s", i, g)
}
} else {
t.Logf("#%d: empty log", i)
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This piece of code is to ensure two logs have the same applied index, commit index, and log entries. In TiKV, it simply use String::ne
instead of diffu
in every test case.
@@ -1179,6 +1500,15 @@ fn test_msg_append_response_wait_reset() { | |||
|
|||
#[test] | |||
fn test_recv_msg_request_vote() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
etcd has only one Vote test here:
func TestRecvMsgVote(t *testing.T) {
testRecvMsgVote(t, pb.MsgVote)
}
func testRecvMsgVote(t *testing.T, msgType pb.MessageType) {
No prevote test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's deleted in etcd-io/etcd#6975
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
etcd-io/etcd#6975 will be taken in this PR later.
@@ -190,6 +202,7 @@ pub struct Raft<T: Storage> { | |||
heartbeat_elapsed: usize, | |||
|
|||
pub check_quorum: bool, | |||
pub pre_vote: bool, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why pub?
// proposals are a way to forward to the leader and | ||
// should be treated as local message. | ||
if m.get_msg_type() != MessageType::MsgPropose && | ||
m.get_msg_type() != MessageType::MsgReadIndex { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why check ReadIndex
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MsgReadIndex is also forwarded to leader.
if (self.vote == INVALID_ID || m.get_term() > self.term || | ||
self.vote == m.get_from()) && | ||
self.raft_log.is_up_to_date(m.get_index(), m.get_log_term()) { | ||
info!("{} [logterm: {}, index: {}, vote: {}] cast {:?} for {} [logterm: {}, \ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reuse the function log_vote_approve
and log_vote_reject
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The log message is used once. It's simple to log all arguments in-place.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function is too long, it's more readable to keep these two line consuming statements apart.
|
||
// three logs futher along than 0, but in the same term so rejection | ||
// are returned instead of the votes being ignored. | ||
(Network::new_with_config(vec![None, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing a case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The case of "log converge" is moved outside, see #1425.
fn ents(terms: Vec<u64>) -> Interface { | ||
// ents_with_config creates a raft state machine with a sequence of log entries at | ||
// the given terms. | ||
fn ents_with_config(terms: Vec<u64>, pre_vote: bool) -> Interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why change this?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add pre_vote.
nt.send(vec![new_message(3, 3, MessageType::MsgHup, 0)]); | ||
let wlog = new_raft_log(vec![empty_entry(1, 1)], 2, 1); | ||
let wlog2 = new_raft_log_with_storage(new_storage()); | ||
let tests = vec![ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's more clear to add the index to case suit than using enumerate.
(StateRole::Candidate, 3, 3, 1, true), | ||
]; | ||
|
||
for (j, (state, i, term, vote_for, w_reject)) in tests.drain(..).enumerate() { | ||
let raft_log = new_raft_log(vec![empty_entry(2, 1), empty_entry(2, 2)], 3, 0); | ||
let mut sm = new_test_raft(1, vec![1], 10, 1, new_storage()); | ||
if msg_type == MessageType::MsgRequestPreVote { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why set this?
sm.state = state; | ||
sm.vote = vote_for; | ||
sm.raft_log = raft_log; | ||
let mut m = new_message(2, 0, MessageType::MsgRequestVote, 0); | ||
let mut m = new_message(2, 0, msg_type, 0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it's 0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What value it should be?
@@ -529,41 +556,275 @@ fn test_progress_paused() { | |||
} | |||
|
|||
#[test] | |||
fn test_leader_election() { | |||
fn test_leader_election_no_pre_vote() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to keep the original name and rename test_leader_election
to test_leader_election_with_config
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
keep the same name with etcd TestLeaderElection
let mut r = new_test_raft(1, vec![1], 5, 1, new_storage()); | ||
r.pre_vote = pre_vote; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to add a new function new_raft
. Set it after initialization assumes that pre_vote
doesn't need any initialization work.
The modification for raft in etcd-io/etcd#6975 is merged. |
PTAL @siddontang @BusyJay |
LGTM |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest LGTM. By the way, it will be more clear to port a pr at a time.
let raft = &network.peers[&1]; | ||
if raft.state != state { | ||
panic!("#{}: state = {:?}, want {:?}", i, raft.state, state); | ||
let raft = network.peers.get(&1).unwrap(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use &network.peers[&1];
instead.
PTAL @BusyJay |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Hi,
This PR port the pre-vote feature for etcd/raft.
It merges #1330, #1425, and etcd-io/etcd#7060.