Skip to content

Commit

Permalink
Merge 23bf473 into 4629e3c
Browse files Browse the repository at this point in the history
  • Loading branch information
tylertreat committed Jul 7, 2017
2 parents 4629e3c + 23bf473 commit f2ec2d2
Show file tree
Hide file tree
Showing 7 changed files with 321 additions and 126 deletions.
11 changes: 7 additions & 4 deletions chan_handler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Copyright 2013-2017 Apcera Inc. All rights reserved.

package graft

// ChanHandler is a convenience handler when a user wants to simply use
// channels for the async handling of errors and state changes.
type ChanHandler struct {
LogPositionHandler

// Chan to receive state changes.
stateChangeChan chan<- StateChange
// Chan to receive errors.
Expand All @@ -20,10 +22,11 @@ type StateChange struct {
To State
}

func NewChanHandler(scCh chan<- StateChange, errCh chan<- error) *ChanHandler {
func NewChanHandler(logHandler LogPositionHandler, scCh chan<- StateChange, errCh chan<- error) *ChanHandler {
return &ChanHandler{
stateChangeChan: scCh,
errorChan: errCh,
LogPositionHandler: logHandler,
stateChangeChan: scCh,
errorChan: errCh,
}
}

Expand Down
26 changes: 22 additions & 4 deletions handler_test.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Copyright 2013-2017 Apcera Inc. All rights reserved.

package graft

import (
"encoding/binary"
"fmt"
"os"
"strconv"
Expand All @@ -12,6 +13,20 @@ import (
"github.com/nats-io/graft/pb"
)

type logPositionHandler struct {
logIndex uint32
}

func (l *logPositionHandler) CurrentLogPosition() []byte {
buf := make([]byte, 4)
binary.BigEndian.PutUint32(buf, l.logIndex)
return buf
}

func (l *logPositionHandler) GrantVote(position []byte) bool {
return binary.BigEndian.Uint32(position) >= l.logIndex
}

// Dumb wait program to sync on callbacks, etc... Will timeout
func wait(t *testing.T, ch chan StateChange) *StateChange {
select {
Expand Down Expand Up @@ -43,7 +58,8 @@ func TestStateChangeHandler(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
chHand := NewChanHandler(scCh, errCh)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down Expand Up @@ -78,7 +94,8 @@ func TestErrorHandler(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
chHand := NewChanHandler(scCh, errCh)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down Expand Up @@ -111,7 +128,8 @@ func TestChandHandlerNotBlockingNode(t *testing.T) {
// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
chHand := NewChanHandler(scCh, errCh)
lpHandler := &logPositionHandler{}
chHand := NewChanHandler(lpHandler, scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
Expand Down
53 changes: 33 additions & 20 deletions node.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Copyright 2013-2017 Apcera Inc. All rights reserved.

//go:generate protoc -I=. -I=$GOPATH/src --gofast_out=. ./pb/protocol.proto

// Graft is a RAFT implementation.
// Currently only the election functionality is supported.
Expand Down Expand Up @@ -78,8 +80,24 @@ type ClusterInfo struct {
Size int
}

// LogPositionHandler is used to interrogate the state of the log.
type LogPositionHandler interface {
// CurrentLogPosition returns an opaque byte slice that represents the
// current position in the node's log.
CurrentLogPosition() []byte

// GrantVote is called when a candidate peer has requested a vote. The
// peer's log position is passed as an opaque byte slice as returned by
// CurrentLogPosition. The returned bool determines if the vote should be
// granted because the candidate's log is at least as up-to-date as the
// receiver's log.
GrantVote(position []byte) bool
}

// A Handler can process async callbacks from a Graft node.
type Handler interface {
LogPositionHandler

// Process async errors that are encountered by the node.
AsyncError(error)

Expand Down Expand Up @@ -247,8 +265,9 @@ func (n *Node) runAsCandidate() {

// Initiate an Election
vreq := &pb.VoteRequest{
Term: n.term,
Candidate: n.id,
Term: n.term,
Candidate: n.id,
LogPosition: n.handler.CurrentLogPosition(),
}
// Collect the votes.
// We will vote for ourselves, so start at 1.
Expand Down Expand Up @@ -439,15 +458,12 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {

deny := &pb.VoteResponse{Term: n.term, Granted: false}

// Old term, reject
if vreq.Term < n.term {
// Old term or candidate's log is behind, reject
if vreq.Term < n.term || !n.handler.GrantVote(vreq.LogPosition) {
n.rpc.SendVoteResponse(vreq.Candidate, deny)
return false
}

// Save state flag
saveState := false

// This will trigger a return from the current runAs loop.
stepDown := false

Expand All @@ -457,7 +473,6 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {
n.vote = NO_VOTE
n.leader = NO_LEADER
stepDown = true
saveState = true
}

// If we are the Leader, deny request unless we have seen
Expand All @@ -477,17 +492,15 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {

n.setVote(vreq.Candidate)

// Write our state if needed.
if saveState {
if err := n.writeState(); err != nil {
// We have failed to update our state. Process the error
// and deny the vote.
n.handleError(err)
n.setVote(NO_VOTE)
n.rpc.SendVoteResponse(vreq.Candidate, deny)
n.resetElectionTimeout()
return true
}
// Write our state.
if err := n.writeState(); err != nil {
// We have failed to update our state. Process the error
// and deny the vote.
n.handleError(err)
n.setVote(NO_VOTE)
n.rpc.SendVoteResponse(vreq.Candidate, deny)
n.resetElectionTimeout()
return true
}

// Send our acceptance.
Expand Down
Loading

0 comments on commit f2ec2d2

Please sign in to comment.