Skip to content

Commit

Permalink
Add external state support
Browse files Browse the repository at this point in the history
Add support for external state (log) to influence leader voting. This,
in effect, implements lastLogIndex and lastLogTerm sent on RequestVote
RPCs from the Raft paper. This works by exposing two callbacks: one that
calls into the user on RequestVote to get the candidate's state and one
that calls into the user upon receiving a RequestVote to determine if a
vote should be granted based on comparing the logs.

From Raft:
1. Reply false if term < currentTerm (§5.1)
2. If votedFor is null or candidateId, and candidate’s log is at
least as up-to-date as receiver’s log, grant vote (§5.2, §5.4)
  • Loading branch information
tylertreat committed Jul 7, 2017
1 parent 4629e3c commit 191f6be
Show file tree
Hide file tree
Showing 7 changed files with 319 additions and 124 deletions.
9 changes: 6 additions & 3 deletions chan_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package graft
// ChanHandler is a convenience handler when a user wants to simply use
// channels for the async handling of errors and state changes.
type ChanHandler struct {
LogPositionHandler

// Chan to receive state changes.
stateChangeChan chan<- StateChange
// Chan to receive errors.
Expand All @@ -20,10 +22,11 @@ type StateChange struct {
To State
}

func NewChanHandler(scCh chan<- StateChange, errCh chan<- error) *ChanHandler {
func NewChanHandler(logHandler LogPositionHandler, scCh chan<- StateChange, errCh chan<- error) *ChanHandler {
return &ChanHandler{
stateChangeChan: scCh,
errorChan: errCh,
LogPositionHandler: logHandler,
stateChangeChan: scCh,
errorChan: errCh,
}
}

Expand Down
26 changes: 22 additions & 4 deletions handler_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Copyright 2013-2017 Apcera Inc. All rights reserved.

package graft

import (
"encoding/binary"
"fmt"
"os"
"strconv"
Expand All @@ -12,6 +13,20 @@ import (
"github.com/nats-io/graft/pb"
)

type logPositionHandler struct {
logIndex uint32
}

func (l *logPositionHandler) CurrentLogPosition() []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, l.logIndex)
return buf
}

func (l *logPositionHandler) GrantVote(position []byte) bool {
return binary.BigEndian.Uint32(position) >= l.logIndex
}

// Dumb wait program to sync on callbacks, etc... Will timeout
func wait(t *testing.T, ch chan StateChange) *StateChange {
select {
Expand Down Expand Up @@ -43,7 +58,8 @@ func TestStateChangeHandler(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
chHand := NewChanHandler(scCh, errCh)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down Expand Up @@ -78,7 +94,8 @@ func TestErrorHandler(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
chHand := NewChanHandler(scCh, errCh)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down Expand Up @@ -111,7 +128,8 @@ func TestChandHandlerNotBlockingNode(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
chHand := NewChanHandler(scCh, errCh)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down
53 changes: 33 additions & 20 deletions node.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Copyright 2013-2017 Apcera Inc. All rights reserved.

//go:generate protoc -I=. -I=$GOPATH/src --gofast_out=. ./pb/protocol.proto

// Graft is a RAFT implementation.
// Currently only the election functionality is supported.
Expand Down Expand Up @@ -78,8 +80,24 @@ type ClusterInfo struct {
Size int
}

// LogPositionHandler is used to interrogate the state of the log.
type LogPositionHandler interface {
// CurrentLogPosition returns an opaque byte slice that represents the
// current position in the node's log.
CurrentLogPosition() []byte

// GrantVote is called when a candidate peer has requested a vote. The
// peer's log position is passed as an opaque byte slice as returned by
// CurrentLogPosition. The returned bool determines if the vote should be
// granted because the candidate's log is at least as up-to-date as the
// receiver's log.
GrantVote(position []byte) bool
}

// A Handler can process async callbacks from a Graft node.
type Handler interface {
LogPositionHandler

// Process async errors that are encountered by the node.
AsyncError(error)

Expand Down Expand Up @@ -247,8 +265,9 @@ func (n *Node) runAsCandidate() {

// Initiate an Election
vreq := &pb.VoteRequest{
Term: n.term,
Candidate: n.id,
Term: n.term,
Candidate: n.id,
LogPosition: n.handler.CurrentLogPosition(),
}
// Collect the votes.
// We will vote for ourselves, so start at 1.
Expand Down Expand Up @@ -439,15 +458,12 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {

deny := &pb.VoteResponse{Term: n.term, Granted: false}

// Old term, reject
if vreq.Term < n.term {
// Old term or candidate's log is behind, reject
if vreq.Term < n.term || !n.handler.GrantVote(vreq.LogPosition) {
n.rpc.SendVoteResponse(vreq.Candidate, deny)
return false
}

// Save state flag
saveState := false

// This will trigger a return from the current runAs loop.
stepDown := false

Expand All @@ -457,7 +473,6 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {
n.vote = NO_VOTE
n.leader = NO_LEADER
stepDown = true
saveState = true
}

// If we are the Leader, deny request unless we have seen
Expand All @@ -477,17 +492,15 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {

n.setVote(vreq.Candidate)

// Write our state if needed.
if saveState {
if err := n.writeState(); err != nil {
// We have failed to update our state. Process the error
// and deny the vote.
n.handleError(err)
n.setVote(NO_VOTE)
n.rpc.SendVoteResponse(vreq.Candidate, deny)
n.resetElectionTimeout()
return true
}
// Write our state.
if err := n.writeState(); err != nil {
// We have failed to update our state. Process the error
// and deny the vote.
n.handleError(err)
n.setVote(NO_VOTE)
n.rpc.SendVoteResponse(vreq.Candidate, deny)
n.resetElectionTimeout()
return true
}

// Send our acceptance.
Expand Down
Loading

0 comments on commit 191f6be

Please sign in to comment.