Skip to content

Commit

Permalink
Rename LogPositionHandler to StateMachineHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
tylertreat committed Jul 10, 2017
1 parent 23bf473 commit 6692c9d
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 73 deletions.
23 changes: 18 additions & 5 deletions chan_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package graft
// ChanHandler is a convenience handler when a user wants to simply use
// channels for the async handling of errors and state changes.
type ChanHandler struct {
LogPositionHandler
StateMachineHandler

// Chan to receive state changes.
stateChangeChan chan<- StateChange
Expand All @@ -22,11 +22,24 @@ type StateChange struct {
To State
}

func NewChanHandler(logHandler LogPositionHandler, scCh chan<- StateChange, errCh chan<- error) *ChanHandler {
// NewChanHandler returns a Handler implementation which uses channels for
// handling errors and state changes.
func NewChanHandler(scCh chan<- StateChange, errCh chan<- error) *ChanHandler {
return NewChanHandlerWithStateMachine(new(defaultStateMachineHandler), scCh, errCh)
}

// NewChanHandlerWithStateMachine returns a Handler implementation which uses
// channels for handling errors and state changes and a StateMachineHandler for
// hooking into external state. The external state machine influences leader
// election votes.
func NewChanHandlerWithStateMachine(
stateHandler StateMachineHandler,
scCh chan<- StateChange,
errCh chan<- error) *ChanHandler {
return &ChanHandler{
LogPositionHandler: logHandler,
stateChangeChan: scCh,
errorChan: errCh,
StateMachineHandler: stateHandler,
stateChangeChan: scCh,
errorChan: errCh,
}
}

Expand Down
24 changes: 3 additions & 21 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package graft

import (
"encoding/binary"
"fmt"
"os"
"strconv"
Expand All @@ -13,20 +12,6 @@ import (
"github.com/nats-io/graft/pb"
)

type logPositionHandler struct {
logIndex uint32
}

func (l *logPositionHandler) CurrentLogPosition() []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, l.logIndex)
return buf
}

func (l *logPositionHandler) GrantVote(position []byte) bool {
return binary.BigEndian.Uint32(position) >= l.logIndex
}

// Dumb wait program to sync on callbacks, etc... Will timeout
func wait(t *testing.T, ch chan StateChange) *StateChange {
select {
Expand Down Expand Up @@ -58,8 +43,7 @@ func TestStateChangeHandler(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)
chHand := NewChanHandler(scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down Expand Up @@ -94,8 +78,7 @@ func TestErrorHandler(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)
chHand := NewChanHandler(scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down Expand Up @@ -128,8 +111,7 @@ func TestChandHandlerNotBlockingNode(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)
chHand := NewChanHandler(scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down
42 changes: 28 additions & 14 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,23 +80,37 @@ type ClusterInfo struct {
Size int
}

// LogPositionHandler is used to interrogate the state of the log.
type LogPositionHandler interface {
// CurrentLogPosition returns an opaque byte slice that represents the
// current position in the node's log.
CurrentLogPosition() []byte
// StateMachineHandler is used to interrogate an external state machine.
type StateMachineHandler interface {
// CurrentState returns an opaque byte slice that represents the current
// state of the state machine.
CurrentState() []byte

// GrantVote is called when a candidate peer has requested a vote. The
// peer's log position is passed as an opaque byte slice as returned by
// CurrentLogPosition. The returned bool determines if the vote should be
// granted because the candidate's log is at least as up-to-date as the
// receiver's log.
// peer's state machine position is passed as an opaque byte slice as
// returned by CurrentState. The returned bool determines if the vote
// should be granted because the candidate's state machine is at least as
// up-to-date as the receiver's state machine.
GrantVote(position []byte) bool
}

// defaultStateMachineHandler implements the StateMachineHandler interface by
// always granting a vote.
type defaultStateMachineHandler struct{}

// CurrentState returns nil for default behavior.
func (d *defaultStateMachineHandler) CurrentState() []byte {
return nil
}

// GrantVote always returns true for default behavior.
func (d *defaultStateMachineHandler) GrantVote(position []byte) bool {
return true
}

// A Handler can process async callbacks from a Graft node.
type Handler interface {
LogPositionHandler
StateMachineHandler

// Process async errors that are encountered by the node.
AsyncError(error)
Expand Down Expand Up @@ -265,9 +279,9 @@ func (n *Node) runAsCandidate() {

// Initiate an Election
vreq := &pb.VoteRequest{
Term: n.term,
Candidate: n.id,
LogPosition: n.handler.CurrentLogPosition(),
Term: n.term,
Candidate: n.id,
CurrentState: n.handler.CurrentState(),
}
// Collect the votes.
// We will vote for ourselves, so start at 1.
Expand Down Expand Up @@ -459,7 +473,7 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {
deny := &pb.VoteResponse{Term: n.term, Granted: false}

// Old term or candidate's log is behind, reject
if vreq.Term < n.term || !n.handler.GrantVote(vreq.LogPosition) {
if vreq.Term < n.term || !n.handler.GrantVote(vreq.CurrentState) {
n.rpc.SendVoteResponse(vreq.Candidate, deny)
return false
}
Expand Down
42 changes: 21 additions & 21 deletions pb/protocol.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions pb/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ option (gogoproto.goproto_getters_all) = false;

// VoteRequest
message VoteRequest {
uint64 Term = 1; // Term for the candidate.
string Candidate = 2; // The candidate for the election.
bytes LogPosition = 3; // Candidate's opaque position in the log.
uint64 Term = 1; // Term for the candidate.
string Candidate = 2; // The candidate for the election.
bytes CurrentState = 3; // Candidate's opaque position in the state machine.
}

// VoteResponse
Expand Down
8 changes: 4 additions & 4 deletions test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ const (
type dummyHandler struct {
}

func (*dummyHandler) AsyncError(err error) {}
func (*dummyHandler) StateChange(from, to State) {}
func (*dummyHandler) CurrentLogPosition() []byte { return nil }
func (*dummyHandler) GrantVote(position []byte) bool { return true }
func (*dummyHandler) AsyncError(err error) {}
func (*dummyHandler) StateChange(from, to State) {}
func (*dummyHandler) CurrentState() []byte { return nil }
func (*dummyHandler) GrantVote(state []byte) bool { return true }

func stackFatalf(t *testing.T, f string, args ...interface{}) {
lines := make([]string, 0, 32)
Expand Down
24 changes: 19 additions & 5 deletions vote_req_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,20 @@ import (

// Test VoteRequests RPC in different states.

type stateMachineHandler struct {
logIndex uint32
}

func (s *stateMachineHandler) CurrentState() []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, s.logIndex)
return buf
}

func (s *stateMachineHandler) GrantVote(position []byte) bool {
return binary.BigEndian.Uint32(position) >= s.logIndex
}

func vreqNode(t *testing.T, expected int) *Node {
ci := ClusterInfo{Name: "vreq", Size: expected}
hand, rpc, log := genNodeArgs(t)
Expand Down Expand Up @@ -121,10 +135,10 @@ func TestVoteRequestAsFollower(t *testing.T) {
func TestVoteRequestAsFollowerLogBehind(t *testing.T) {
ci := ClusterInfo{Name: "vreq", Size: 3}
_, rpc, log := genNodeArgs(t)
lpHandler := &logPositionHandler{}
stateHandler := new(stateMachineHandler)
scCh := make(chan StateChange)
errCh := make(chan error)
handler := NewChanHandler(lpHandler, scCh, errCh)
handler := NewChanHandlerWithStateMachine(stateHandler, scCh, errCh)
node, err := New(ci, handler, rpc, log)
if err != nil {
t.Fatalf("Expected no error, got %v", err)
Expand All @@ -134,7 +148,7 @@ func TestVoteRequestAsFollowerLogBehind(t *testing.T) {
// Set log position to artificial higher value.
newPosition := uint32(8)
term := uint64(1)
lpHandler.logIndex = newPosition
stateHandler.logIndex = newPosition
node.setTerm(term)

// Force write of state
Expand All @@ -150,7 +164,7 @@ func TestVoteRequestAsFollowerLogBehind(t *testing.T) {
// a VoteRequest with a log that is behind should be ignored
pos := make([]byte, 4)
binary.BigEndian.PutUint32(pos, 1)
node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, LogPosition: pos}
node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, CurrentState: pos}
vresp := <-fake.VoteResponses
if vresp.Term != term {
t.Fatalf("Expected the VoteResponse to have term=%d, got %d\n",
Expand All @@ -176,7 +190,7 @@ func TestVoteRequestAsFollowerLogBehind(t *testing.T) {

// a VoteRequest with a log that is ahead should reset follower
binary.BigEndian.PutUint32(pos, newPosition+1)
node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, LogPosition: pos}
node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, CurrentState: pos}
vresp = <-fake.VoteResponses
if vresp.Term != term {
t.Fatalf("Expected the VoteResponse to have term=%d, got %d\n",
Expand Down

0 comments on commit 6692c9d

Please sign in to comment.