Skip to content

Commit

Permalink
Revisit Heartbeat and VoteRequest RPC algorithm
Browse files Browse the repository at this point in the history
More in line with the Raft spec found in Appendix B of
http://ramcloud.stanford.edu/~ongaro/thesis.pdf
Note that Heartbeat should be replaced with an AppendEntries request
with empty log entries.

-Fixed fileStore.Close() bug that was not closing the entries log.
-Fixed some tests
  • Loading branch information
kozlovic committed Sep 29, 2016
1 parent 6f5e239 commit 34e7292
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 98 deletions.
2 changes: 1 addition & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ func (fs *fileStore) Close() error {
closeFile(&err, fs.stateLog)
}
if fs.entriesLog != nil {
closeFile(&err, fs.stateLog)
closeFile(&err, fs.entriesLog)
}
return err
}
7 changes: 6 additions & 1 deletion log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func TestFileStateStore(t *testing.T) {
if err != nil {
t.Fatalf("Error opening state log: %v", err)
}
defer fs.(*fileStore).Close()
gotState, err = fs.GetState()
if err != nil {
t.Fatalf("Error getting state: %v", err)
Expand Down Expand Up @@ -252,6 +253,7 @@ func TestFileStateAndLogStore(t *testing.T) {
if err != nil {
t.Fatalf("Error opening state log: %v", err)
}
defer fs.(*fileStore).Close()
// Check the first and last Index
if val, err := fs.FirstIndex(); err != nil || val != 1 {
t.Fatalf("Unexpected result for FirstIndex, got index=%v err=%v", val, err)
Expand Down Expand Up @@ -489,7 +491,10 @@ func TestLogPresenceOnNew(t *testing.T) {
node.setTerm(10)
node.setVote("fake")
// Force writing the state
if err := node.writeState(); err != nil {
node.mu.Lock()
err = node.writeState()
node.mu.Unlock()
if err != nil {
t.Fatalf("Unexpected error writing state: %v", err)
}

Expand Down
192 changes: 101 additions & 91 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,22 @@ type Node struct {
// Last index in the log
lastLogIndex uint64

// index of highest log entry known to be committed (initialized to 0,
// increases monotonically)
commitIndex uint64

// index of highest log entry applied to state machine (initialized to 0,
// increases monotonically)
lastApplied uint64

// On leader: for each server, index of the next log entry to send to that
// server (initialized to leader last log index + 1)
nextIndex []uint64

// On leader: for each server, index of highest log entry known to be
// replicated on server (initialized to 0, increases monotonically)
matchIndex []uint64

// quit channel for shutdown on Close().
quit chan chan struct{}

Expand Down Expand Up @@ -216,21 +232,18 @@ func (n *Node) initLog(path string, opts *Options) error {
lastLogTerm = lastEntry.Term
}
}
n.mu.Lock()
n.id = ps.ID
n.term = ps.CurrentTerm
n.vote = ps.VotedFor
n.lastLogIndex = lastLogIndex
n.lastLogTerm = lastLogTerm
n.mu.Unlock()
return nil
}

// writeState persists the state.
// Lock is held on entry.
func (n *Node) writeState() error {
n.mu.RLock()
ps := pb.State{ID: n.id, CurrentTerm: n.term, VotedFor: n.vote}
n.mu.RUnlock()

return n.log.SetState(&ps)
}

Expand Down Expand Up @@ -307,6 +320,22 @@ func (n *Node) isRunning() bool {
return n.state != CLOSED
}

// updateTerm advances the node's term when an incoming RPC carries a
// higher one than ours.
//
// If rpcTerm is greater than n.term, the node adopts it, clears both its
// vote and its known leader, switches to FOLLOWER, and the function
// returns true so the caller knows it may have to step down from its
// current role. If rpcTerm is equal or lower, nothing is modified and
// false is returned.
//
// The updated state is not persisted here.
// Lock is held on entry.
func (n *Node) updateTerm(rpcTerm uint64) bool {
	newer := rpcTerm > n.term
	if newer {
		// Adopt the newer term and forget anything tied to the old one.
		n.term, n.vote, n.leader = rpcTerm, NO_VOTE, NO_LEADER
		n.switchState(FOLLOWER)
	}
	return newer
}

// Process loop for a LEADER.
func (n *Node) runAsLeader() {
// Setup our heartbeat ticker
Expand Down Expand Up @@ -352,24 +381,30 @@ func (n *Node) runAsCandidate() {
// Drain any previous responses.
n.drainPreviousVoteResponses()

n.mu.Lock()
nterm := n.term
// Initiate an Election
vreq := &pb.VoteRequest{
Term: n.term,
Candidate: n.id,
Term: nterm,
Candidate: n.id,
LastLogTerm: n.lastLogTerm,
LastLogIndex: n.lastLogIndex,
}
// Collect the votes.
// We will vote for ourselves, so start at 1.
votes := 1

// Vote for ourself.
n.setVote(n.id)
n.vote = n.id

// Save our state.
if err := n.writeState(); err != nil {
n.handleError(err)
n.mu.Unlock()
n.switchToFollower(NO_LEADER)
return
}
n.mu.Unlock()

// Send the vote request to other members
n.rpc.RequestVote(vreq)
Expand Down Expand Up @@ -399,7 +434,7 @@ func (n *Node) runAsCandidate() {
case vresp := <-n.VoteResponses:
// We have a VoteResponse. Only process if
// it is for our term and Granted is true.
if vresp.Granted && vresp.Term == n.term {
if vresp.Granted && vresp.Term == nterm {
votes++
if n.wonElection(votes) {
// Become LEADER if we have won.
Expand Down Expand Up @@ -482,134 +517,98 @@ func (n *Node) postError(err error) {
}

// Send the error to the async handler.
// Lock held on entry.
func (n *Node) handleError(err error) {
n.mu.Lock()
n.errors = append(n.errors, err)
// Call postError only for the first error added.
// Check postError for details.
if len(n.errors) == 1 {
n.postError(err)
}
n.mu.Unlock()
}

// handleHeartBeat is called to process a heartbeat from a LEADER.
// We will indicate to the controlling process loop if we should
// "stepdown" from our current role.
func (n *Node) handleHeartBeat(hb *pb.Heartbeat) bool {
n.mu.Lock()
// Don't reset for older terms.
resetTimer := n.term >= hb.Term

// Ignore old term
if hb.Term < n.term {
return false
}

// Save state flag
saveState := false

// This will trigger a return from the current runAs loop.
stepDown := false
// Check if incoming Term is greater than ours. If so, update term
// and switch to Follower.
stepDown := n.updateTerm(hb.Term)

// Newer term
if hb.Term > n.term {
// If we are candidate and someone asserts they are leader for an equal term,
// step down (note, updateTerm makes sure that n.term <= hb.Term).
if n.state == CANDIDATE && hb.Term == n.term {
n.term = hb.Term
n.vote = NO_VOTE
stepDown = true
saveState = true
}

// If we are candidate and someone asserts they are leader for an equal or
// higher term, step down.
if n.State() == CANDIDATE && hb.Term >= n.term {
n.term = hb.Term
n.vote = NO_VOTE
stepDown = true
saveState = true
}

// Reset the election timer.
n.resetElectionTimeout()

// Write our state if needed.
if saveState {
if stepDown {
if err := n.writeState(); err != nil {
n.handleError(err)
stepDown = true
}
}
n.mu.Unlock()

// Reset the election timer.
if resetTimer {
n.resetElectionTimeout()
}

return stepDown
}

// handleVoteRequest will process a vote request and either
// deny or grant our own vote to the caller.
func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {
n.mu.Lock()
// Any RPC with a newer term causes the recipient to advance its term.
// This call may change term but also state.
stepDown := n.updateTerm(vreq.Term)

deny := &pb.VoteResponse{Term: n.term, Granted: false}

// Old term, reject
if vreq.Term < n.term {
n.rpc.SendVoteResponse(vreq.Candidate, deny)
return false
}

// Save state flag
saveState := false

// This will trigger a return from the current runAs loop.
stepDown := false

// Newer term
if vreq.Term > n.term {
n.term = vreq.Term
n.vote = NO_VOTE
n.leader = NO_LEADER
stepDown = true
saveState = true
}

// If we are the Leader, deny request unless we have seen
// a newer term and must step down.
if n.State() == LEADER && !stepDown {
n.rpc.SendVoteResponse(vreq.Candidate, deny)
return stepDown
}
// Make sure candidate's log is at least as up-to-date as ours.
logOk := vreq.LastLogTerm > n.lastLogTerm ||
(vreq.LastLogTerm == n.lastLogTerm && vreq.LastLogIndex >= n.lastLogIndex)

// If we have already cast a vote for this term, reject.
if n.vote != NO_VOTE && n.vote != vreq.Candidate {
n.rpc.SendVoteResponse(vreq.Candidate, deny)
return stepDown
}
// Grant vote if we are on the same term, log is OK and we either did not vote or
// already voted for that candidate.
grant := vreq.Term == n.term && logOk && (n.vote == NO_VOTE || n.vote == vreq.Candidate)

// Don't vote for this candidate if its log is not as up-to-date than ours.
if vreq.LastLogTerm < n.lastLogTerm ||
(vreq.LastLogTerm == n.lastLogTerm && vreq.LastLogIndex < n.lastLogIndex) {
n.rpc.SendVoteResponse(vreq.Candidate, deny)
return stepDown
// If we grant, keep track who we voted for.
if grant {
n.vote = vreq.Candidate
}

// We will vote for this candidate.

n.setVote(vreq.Candidate)
// Create the response
response := &pb.VoteResponse{Term: n.term, Granted: grant}

// Write our state if needed.
if saveState {
if stepDown {
if err := n.writeState(); err != nil {
// We have failed to update our state. Process the error
// and deny the vote.
n.handleError(err)
n.setVote(NO_VOTE)
n.rpc.SendVoteResponse(vreq.Candidate, deny)
n.resetElectionTimeout()
return true
n.vote = NO_VOTE
// Change the response to deny the request.
response.Granted = false
// Ask to step down
stepDown = true
}
}
n.mu.Unlock()

// Send our acceptance.
accept := &pb.VoteResponse{Term: n.term, Granted: true}
n.rpc.SendVoteResponse(vreq.Candidate, accept)
// Send our response.
n.rpc.SendVoteResponse(vreq.Candidate, response)

// Reset ElectionTimeout
n.resetElectionTimeout()
if grant {
n.resetElectionTimeout()
}

return stepDown
}
Expand Down Expand Up @@ -644,19 +643,28 @@ func (n *Node) switchToLeader() {
n.mu.Lock()
defer n.mu.Unlock()
n.leader = n.id
// These need to be (re)initialized after an election.
n.nextIndex = make([]uint64, n.info.Size)
n.matchIndex = make([]uint64, n.info.Size)
// Set the nextIndex to lastLogIndex+1
for i := 0; i < len(n.nextIndex); i++ {
n.nextIndex[i] = n.lastLogIndex + 1
}
n.switchState(LEADER)
}

// Switch to a CANDIDATE.
func (n *Node) switchToCandidate() {
n.mu.Lock()
defer n.mu.Unlock()
// Increment the term.
n.term++
// Clear current Leader.
n.leader = NO_LEADER
n.resetElectionTimeout()
// Clear the vote.
n.vote = NO_VOTE
n.switchState(CANDIDATE)
n.mu.Unlock()
n.resetElectionTimeout()
}

// postStateChange invokes handler.StateChange() in a go routine.
Expand Down Expand Up @@ -695,7 +703,9 @@ func (n *Node) switchState(state State) {

// resetElectionTimeout re-arms the election timer with a freshly
// generated random timeout. It acquires n.mu itself for the duration of
// the reset.
// NOTE(review): callers presumably must not already hold n.mu when
// calling this — confirm at all call sites.
func (n *Node) resetElectionTimeout() {
	n.mu.Lock()
	defer n.mu.Unlock()

	timeout := randElectionTimeout()
	n.electTimer.Reset(timeout)
}

// Generate a random timeout between MIN and MAX Election timeouts.
Expand Down
Loading

0 comments on commit 34e7292

Please sign in to comment.