Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add external state support #18

Merged
merged 4 commits into from
Jul 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions chan_handler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Copyright 2013-2017 Apcera Inc. All rights reserved.

package graft

// ChanHandler is a convenience handler when a user wants to simply use
// channels for the async handling of errors and state changes.
type ChanHandler struct {
StateMachineHandler

// Chan to receive state changes.
stateChangeChan chan<- StateChange
// Chan to receive errors.
Expand All @@ -20,10 +22,24 @@ type StateChange struct {
To State
}

// NewChanHandler returns a Handler implementation which uses channels for
// handling errors and state changes.
func NewChanHandler(scCh chan<- StateChange, errCh chan<- error) *ChanHandler {
return NewChanHandlerWithStateMachine(new(defaultStateMachineHandler), scCh, errCh)
}

// NewChanHandlerWithStateMachine returns a Handler implementation which uses
// channels for handling errors and state changes and a StateMachineHandler for
// hooking into external state. The external state machine influences leader
// election votes.
func NewChanHandlerWithStateMachine(
stateHandler StateMachineHandler,
scCh chan<- StateChange,
errCh chan<- error) *ChanHandler {
return &ChanHandler{
stateChangeChan: scCh,
errorChan: errCh,
StateMachineHandler: stateHandler,
stateChangeChan: scCh,
errorChan: errCh,
}
}

Expand Down
2 changes: 1 addition & 1 deletion handler_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Copyright 2013-2017 Apcera Inc. All rights reserved.

package graft

Expand Down
67 changes: 47 additions & 20 deletions node.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Copyright 2013-2016 Apcera Inc. All rights reserved.
// Copyright 2013-2017 Apcera Inc. All rights reserved.

//go:generate protoc -I=. -I=$GOPATH/src --gofast_out=. ./pb/protocol.proto

// Graft is a RAFT implementation.
// Currently only the election functionality is supported.
Expand Down Expand Up @@ -78,8 +80,38 @@ type ClusterInfo struct {
Size int
}

// StateMachineHandler is used to interrogate an external state machine.
type StateMachineHandler interface {
// CurrentState returns an opaque byte slice that represents the current
// state of the state machine.
CurrentState() []byte

// GrantVote is called when a candidate peer has requested a vote. The
// peer's state machine position is passed as an opaque byte slice as
// returned by CurrentState. The returned bool determines if the vote
// should be granted because the candidate's state machine is at least as
// up-to-date as the receiver's state machine.
GrantVote(position []byte) bool
}

// defaultStateMachineHandler implements the StateMachineHandler interface by
// always granting a vote.
type defaultStateMachineHandler struct{}

// CurrentState returns nil for default behavior.
func (d *defaultStateMachineHandler) CurrentState() []byte {
return nil
}

// GrantVote always returns true for default behavior.
func (d *defaultStateMachineHandler) GrantVote(position []byte) bool {
return true
}

// A Handler can process async callbacks from a Graft node.
type Handler interface {
StateMachineHandler

// Process async errors that are encountered by the node.
AsyncError(error)

Expand Down Expand Up @@ -247,8 +279,9 @@ func (n *Node) runAsCandidate() {

// Initiate an Election
vreq := &pb.VoteRequest{
Term: n.term,
Candidate: n.id,
Term: n.term,
Candidate: n.id,
CurrentState: n.handler.CurrentState(),
}
// Collect the votes.
// We will vote for ourselves, so start at 1.
Expand Down Expand Up @@ -439,15 +472,12 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {

deny := &pb.VoteResponse{Term: n.term, Granted: false}

// Old term, reject
if vreq.Term < n.term {
// Old term or candidate's log is behind, reject
if vreq.Term < n.term || !n.handler.GrantVote(vreq.CurrentState) {
n.rpc.SendVoteResponse(vreq.Candidate, deny)
return false
}

// Save state flag
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the test above is vreq.Term < n.term and saveState was previously set to true if vreq.Term > n.term, which means that if they were equal, we would not save the state. You are changing that. Is it intentional?

Copy link
Contributor Author

@tylertreat tylertreat Jul 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we send a vote response, the node's state changes. Before, this only happened if the term was higher, but now it also happens if term is equal but "log" is higher. My test was failing because the node state wasn't being persisted in this case. I guess we can still avoid saving if term is equal and the "log" is equal?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure. The point was to make sure that the change was intentional, which I understand it is.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If term is equal and log is equal, we can avoid saving, but IMO just keep it simple and safe. How often are we expecting this to happen?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ColinSullivan1 I agree, I don't think the optimization is worth the added complexity.

saveState := false

// This will trigger a return from the current runAs loop.
stepDown := false

Expand All @@ -457,7 +487,6 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {
n.vote = NO_VOTE
n.leader = NO_LEADER
stepDown = true
saveState = true
}

// If we are the Leader, deny request unless we have seen
Expand All @@ -477,17 +506,15 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool {

n.setVote(vreq.Candidate)

// Write our state if needed.
if saveState {
if err := n.writeState(); err != nil {
// We have failed to update our state. Process the error
// and deny the vote.
n.handleError(err)
n.setVote(NO_VOTE)
n.rpc.SendVoteResponse(vreq.Candidate, deny)
n.resetElectionTimeout()
return true
}
// Write our state.
if err := n.writeState(); err != nil {
Copy link
Member

@ColinSullivan1 ColinSullivan1 Jul 12, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know a lot has gone into this. I wonder if writing the state should be the responsibility of the state machine interface implementor. wdyt?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is writing state needed for leadership election, so seems like it should be graft's responsibility?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do see your point, but was thinking that users may want a way to persist state themselves.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could provide a hook that calls out to user code (via the StateMachineHandler) to signal them to persist. I think that could be added later though.

// We have failed to update our state. Process the error
// and deny the vote.
n.handleError(err)
n.setVote(NO_VOTE)
n.rpc.SendVoteResponse(vreq.Candidate, deny)
n.resetElectionTimeout()
return true
}

// Send our acceptance.
Expand Down
Loading