From 191f6bef5ad4f2c22f61abe57652022d0fc88c91 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Fri, 7 Jul 2017 16:33:54 -0500 Subject: [PATCH 1/4] Add external state support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add support for external state (log) to influence leader voting. This, in effect, implements lastLogIndex and lastLogTerm sent on RequestVote RPCs from the Raft paper. This works by exposing two callbacks: one that calls into the user on RequestVote to get the candidate's state and one that calls into the user upon receiving a RequestVote to determine if a vote should be granted based on comparing the logs. From Raft: 1. Reply false if term < currentTerm (§5.1) 2. If votedFor is null or candidateId, and candidate’s log is at least as up-to-date as receiver’s log, grant vote (§5.2, §5.4) --- chan_handler.go | 9 +- handler_test.go | 26 ++++- node.go | 53 ++++++---- pb/protocol.pb.go | 251 +++++++++++++++++++++++++++++----------------- pb/protocol.proto | 9 +- test_helper.go | 6 +- vote_req_test.go | 89 +++++++++++++++- 7 files changed, 319 insertions(+), 124 deletions(-) diff --git a/chan_handler.go b/chan_handler.go index 31c22b6..5de9cfb 100644 --- a/chan_handler.go +++ b/chan_handler.go @@ -5,6 +5,8 @@ package graft // ChanHandler is a convenience handler when a user wants to simply use // channels for the async handling of errors and state changes. type ChanHandler struct { + LogPositionHandler + // Chan to receive state changes. stateChangeChan chan<- StateChange // Chan to receive errors. @@ -20,10 +22,11 @@ type StateChange struct { To State } -func NewChanHandler(scCh chan<- StateChange, errCh chan<- error) *ChanHandler { +func NewChanHandler(logHandler LogPositionHandler, scCh chan<- StateChange, errCh chan<- error) *ChanHandler { return &ChanHandler{ - stateChangeChan: scCh, - errorChan: errCh, + LogPositionHandler: logHandler, + stateChangeChan: scCh, + errorChan: errCh, } } diff --git a/handler_test.go b/handler_test.go index 2879c4f..cea8165 100644 --- a/handler_test.go +++ b/handler_test.go @@ -1,8 +1,9 @@ -// Copyright 2013-2016 Apcera Inc. All rights reserved. +// Copyright 2013-2017 Apcera Inc. All rights reserved. package graft import ( + "encoding/binary" "fmt" "os" "strconv" @@ -12,6 +13,20 @@ import ( "github.com/nats-io/graft/pb" ) +type logPositionHandler struct { + logIndex uint32 +} + +func (l *logPositionHandler) CurrentLogPosition() []byte { + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, l.logIndex) + return buf +} + +func (l *logPositionHandler) GrantVote(position []byte) bool { + return binary.BigEndian.Uint32(position) >= l.logIndex +} + // Dumb wait program to sync on callbacks, etc... Will timeout func wait(t *testing.T, ch chan StateChange) *StateChange { select { @@ -43,7 +58,8 @@ func TestStateChangeHandler(t *testing.T) { // Use ChanHandler scCh := make(chan StateChange) errCh := make(chan error) - chHand := NewChanHandler(scCh, errCh) + lpHandler := &logPositionHandler{} + chHand := NewChanHandler(lpHandler, scCh, errCh) node, err := New(ci, chHand, rpc, log) if err != nil { @@ -78,7 +94,8 @@ func TestErrorHandler(t *testing.T) { // Use ChanHandler scCh := make(chan StateChange) errCh := make(chan error) - chHand := NewChanHandler(scCh, errCh) + lpHandler := &logPositionHandler{} + chHand := NewChanHandler(lpHandler, scCh, errCh) node, err := New(ci, chHand, rpc, log) if err != nil { @@ -111,7 +128,8 @@ func TestChandHandlerNotBlockingNode(t *testing.T) { // Use ChanHandler scCh := make(chan StateChange) errCh := make(chan error) - chHand := NewChanHandler(scCh, errCh) + lpHandler := &logPositionHandler{} + chHand := NewChanHandler(lpHandler, scCh, errCh) node, err := New(ci, chHand, rpc, log) if err != nil { diff --git a/node.go b/node.go index 45417f5..b2e0beb 100644 --- a/node.go +++ b/node.go @@ -1,4 +1,6 @@ -// Copyright 2013-2016 Apcera Inc. All rights reserved. +// Copyright 2013-2017 Apcera Inc. All rights reserved. + +//go:generate protoc -I=. -I=$GOPATH/src --gofast_out=. ./pb/protocol.proto // Graft is a RAFT implementation. // Currently only the election functionality is supported. @@ -78,8 +80,24 @@ type ClusterInfo struct { Size int } +// LogPositionHandler is used to interrogate the state of the log. +type LogPositionHandler interface { + // CurrentLogPosition returns an opaque byte slice that represents the + // current position in the node's log. + CurrentLogPosition() []byte + + // GrantVote is called when a candidate peer has requested a vote. The + // peer's log position is passed as an opaque byte slice as returned by + // CurrentLogPosition. The returned bool determines if the vote should be + // granted because the candidate's log is at least as up-to-date as the + // receiver's log. + GrantVote(position []byte) bool +} + // A Handler can process async callbacks from a Graft node. type Handler interface { + LogPositionHandler + // Process async errors that are encountered by the node. AsyncError(error) @@ -247,8 +265,9 @@ func (n *Node) runAsCandidate() { // Initiate an Election vreq := &pb.VoteRequest{ - Term: n.term, - Candidate: n.id, + Term: n.term, + Candidate: n.id, + LogPosition: n.handler.CurrentLogPosition(), } // Collect the votes. // We will vote for ourselves, so start at 1. @@ -439,15 +458,12 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool { deny := &pb.VoteResponse{Term: n.term, Granted: false} - // Old term, reject - if vreq.Term < n.term { + // Old term or candidate's log is behind, reject + if vreq.Term < n.term || !n.handler.GrantVote(vreq.LogPosition) { n.rpc.SendVoteResponse(vreq.Candidate, deny) return false } - // Save state flag - saveState := false - // This will trigger a return from the current runAs loop. stepDown := false @@ -457,7 +473,6 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool { n.vote = NO_VOTE n.leader = NO_LEADER stepDown = true - saveState = true } // If we are the Leader, deny request unless we have seen @@ -477,17 +492,15 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool { n.setVote(vreq.Candidate) - // Write our state if needed. - if saveState { - if err := n.writeState(); err != nil { - // We have failed to update our state. Process the error - // and deny the vote. - n.handleError(err) - n.setVote(NO_VOTE) - n.rpc.SendVoteResponse(vreq.Candidate, deny) - n.resetElectionTimeout() - return true - } + // Write our state. + if err := n.writeState(); err != nil { + // We have failed to update our state. Process the error + // and deny the vote. + n.handleError(err) + n.setVote(NO_VOTE) + n.rpc.SendVoteResponse(vreq.Candidate, deny) + n.resetElectionTimeout() + return true } // Send our acceptance. diff --git a/pb/protocol.pb.go b/pb/protocol.pb.go index cd94ccc..8a415ba 100644 --- a/pb/protocol.pb.go +++ b/pb/protocol.pb.go @@ -1,12 +1,11 @@ -// Code generated by protoc-gen-gogo. -// source: protocol.proto -// DO NOT EDIT! +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: pb/protocol.proto /* Package pb is a generated protocol buffer package. It is generated from these files: - protocol.proto + pb/protocol.proto It has these top-level messages: VoteRequest @@ -15,7 +14,7 @@ */ package pb -import proto "github.com/gogo/protobuf/proto" +import proto "github.com/golang/protobuf/proto" import fmt "fmt" import math "math" import _ "github.com/gogo/protobuf/gogoproto" @@ -27,15 +26,23 @@ var _ = proto.Marshal var _ = fmt.Errorf var _ = math.Inf +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + // VoteRequest type VoteRequest struct { - Term uint64 `protobuf:"varint,1,opt,name=Term,proto3" json:"Term,omitempty"` - Candidate string `protobuf:"bytes,2,opt,name=Candidate,proto3" json:"Candidate,omitempty"` + Term uint64 `protobuf:"varint,1,opt,name=Term,proto3" json:"Term,omitempty"` + Candidate string `protobuf:"bytes,2,opt,name=Candidate,proto3" json:"Candidate,omitempty"` + LogPosition []byte `protobuf:"bytes,3,opt,name=LogPosition,proto3" json:"LogPosition,omitempty"` } -func (m *VoteRequest) Reset() { *m = VoteRequest{} } -func (m *VoteRequest) String() string { return proto.CompactTextString(m) } -func (*VoteRequest) ProtoMessage() {} +func (m *VoteRequest) Reset() { *m = VoteRequest{} } +func (m *VoteRequest) String() string { return proto.CompactTextString(m) } +func (*VoteRequest) ProtoMessage() {} +func (*VoteRequest) Descriptor() ([]byte, []int) { return fileDescriptorProtocol, []int{0} } // VoteResponse type VoteResponse struct { @@ -43,9 +50,10 @@ type VoteResponse struct { Granted bool `protobuf:"varint,2,opt,name=Granted,proto3" json:"Granted,omitempty"` } -func (m *VoteResponse) Reset() { *m = VoteResponse{} } -func (m *VoteResponse) String() string { return proto.CompactTextString(m) } -func (*VoteResponse) ProtoMessage() {} +func (m *VoteResponse) Reset() { *m = VoteResponse{} } +func (m *VoteResponse) String() string { return proto.CompactTextString(m) } +func (*VoteResponse) ProtoMessage() {} +func (*VoteResponse) Descriptor() ([]byte, []int) { return fileDescriptorProtocol, []int{1} } // Heartbeat type Heartbeat struct { @@ -53,131 +61,138 @@ type Heartbeat struct { Leader string `protobuf:"bytes,2,opt,name=Leader,proto3" json:"Leader,omitempty"` } -func (m *Heartbeat) Reset() { *m = Heartbeat{} } -func (m *Heartbeat) String() string { return proto.CompactTextString(m) } -func (*Heartbeat) ProtoMessage() {} +func (m *Heartbeat) Reset() { *m = Heartbeat{} } +func (m *Heartbeat) String() string { return proto.CompactTextString(m) } +func (*Heartbeat) ProtoMessage() {} +func (*Heartbeat) Descriptor() ([]byte, []int) { return fileDescriptorProtocol, []int{2} } func init() { proto.RegisterType((*VoteRequest)(nil), "pb.VoteRequest") proto.RegisterType((*VoteResponse)(nil), "pb.VoteResponse") proto.RegisterType((*Heartbeat)(nil), "pb.Heartbeat") } -func (m *VoteRequest) Marshal() (data []byte, err error) { +func (m *VoteRequest) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *VoteRequest) MarshalTo(data []byte) (int, error) { +func (m *VoteRequest) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.Term != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintProtocol(data, i, uint64(m.Term)) + i = encodeVarintProtocol(dAtA, i, uint64(m.Term)) } if len(m.Candidate) > 0 { - data[i] = 0x12 + dAtA[i] = 0x12 + i++ + i = encodeVarintProtocol(dAtA, i, uint64(len(m.Candidate))) + i += copy(dAtA[i:], m.Candidate) + } + if len(m.LogPosition) > 0 { + dAtA[i] = 0x1a i++ - i = encodeVarintProtocol(data, i, uint64(len(m.Candidate))) - i += copy(data[i:], m.Candidate) + i = encodeVarintProtocol(dAtA, i, uint64(len(m.LogPosition))) + i += copy(dAtA[i:], m.LogPosition) } return i, nil } -func (m *VoteResponse) Marshal() (data []byte, err error) { +func (m *VoteResponse) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *VoteResponse) MarshalTo(data []byte) (int, error) { +func (m *VoteResponse) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.Term != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintProtocol(data, i, uint64(m.Term)) + i = encodeVarintProtocol(dAtA, i, uint64(m.Term)) } if m.Granted { - data[i] = 0x10 + dAtA[i] = 0x10 i++ if m.Granted { - data[i] = 1 + dAtA[i] = 1 } else { - data[i] = 0 + dAtA[i] = 0 } i++ } return i, nil } -func (m *Heartbeat) Marshal() (data []byte, err error) { +func (m *Heartbeat) Marshal() (dAtA []byte, err error) { size := m.Size() - data = make([]byte, size) - n, err := m.MarshalTo(data) + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) if err != nil { return nil, err } - return data[:n], nil + return dAtA[:n], nil } -func (m *Heartbeat) MarshalTo(data []byte) (int, error) { +func (m *Heartbeat) MarshalTo(dAtA []byte) (int, error) { var i int _ = i var l int _ = l if m.Term != 0 { - data[i] = 0x8 + dAtA[i] = 0x8 i++ - i = encodeVarintProtocol(data, i, uint64(m.Term)) + i = encodeVarintProtocol(dAtA, i, uint64(m.Term)) } if len(m.Leader) > 0 { - data[i] = 0x12 + dAtA[i] = 0x12 i++ - i = encodeVarintProtocol(data, i, uint64(len(m.Leader))) - i += copy(data[i:], m.Leader) + i = encodeVarintProtocol(dAtA, i, uint64(len(m.Leader))) + i += copy(dAtA[i:], m.Leader) } return i, nil } -func encodeFixed64Protocol(data []byte, offset int, v uint64) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) - data[offset+4] = uint8(v >> 32) - data[offset+5] = uint8(v >> 40) - data[offset+6] = uint8(v >> 48) - data[offset+7] = uint8(v >> 56) +func encodeFixed64Protocol(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) return offset + 8 } -func encodeFixed32Protocol(data []byte, offset int, v uint32) int { - data[offset] = uint8(v) - data[offset+1] = uint8(v >> 8) - data[offset+2] = uint8(v >> 16) - data[offset+3] = uint8(v >> 24) +func encodeFixed32Protocol(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) return offset + 4 } -func encodeVarintProtocol(data []byte, offset int, v uint64) int { +func encodeVarintProtocol(dAtA []byte, offset int, v uint64) int { for v >= 1<<7 { - data[offset] = uint8(v&0x7f | 0x80) + dAtA[offset] = uint8(v&0x7f | 0x80) v >>= 7 offset++ } - data[offset] = uint8(v) + dAtA[offset] = uint8(v) return offset + 1 } func (m *VoteRequest) Size() (n int) { @@ -190,6 +205,10 @@ func (m *VoteRequest) Size() (n int) { if l > 0 { n += 1 + l + sovProtocol(uint64(l)) } + l = len(m.LogPosition) + if l > 0 { + n += 1 + l + sovProtocol(uint64(l)) + } return n } @@ -231,8 +250,8 @@ func sovProtocol(x uint64) (n int) { func sozProtocol(x uint64) (n int) { return sovProtocol(uint64((x << 1) ^ uint64((int64(x) >> 63)))) } -func (m *VoteRequest) Unmarshal(data []byte) error { - l := len(data) +func (m *VoteRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -244,7 +263,7 @@ func (m *VoteRequest) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -272,7 +291,7 @@ func (m *VoteRequest) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.Term |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -291,7 +310,7 @@ func (m *VoteRequest) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -306,11 +325,42 @@ func (m *VoteRequest) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Candidate = string(data[iNdEx:postIndex]) + m.Candidate = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LogPosition", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowProtocol + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthProtocol + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.LogPosition = append(m.LogPosition[:0], dAtA[iNdEx:postIndex]...) + if m.LogPosition == nil { + m.LogPosition = []byte{} + } iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipProtocol(data[iNdEx:]) + skippy, err := skipProtocol(dAtA[iNdEx:]) if err != nil { return err } @@ -329,8 +379,8 @@ func (m *VoteRequest) Unmarshal(data []byte) error { } return nil } -func (m *VoteResponse) Unmarshal(data []byte) error { - l := len(data) +func (m *VoteResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -342,7 +392,7 @@ func (m *VoteResponse) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -370,7 +420,7 @@ func (m *VoteResponse) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.Term |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -389,7 +439,7 @@ func (m *VoteResponse) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ v |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -399,7 +449,7 @@ func (m *VoteResponse) Unmarshal(data []byte) error { m.Granted = bool(v != 0) default: iNdEx = preIndex - skippy, err := skipProtocol(data[iNdEx:]) + skippy, err := skipProtocol(dAtA[iNdEx:]) if err != nil { return err } @@ -418,8 +468,8 @@ func (m *VoteResponse) Unmarshal(data []byte) error { } return nil } -func (m *Heartbeat) Unmarshal(data []byte) error { - l := len(data) +func (m *Heartbeat) Unmarshal(dAtA []byte) error { + l := len(dAtA) iNdEx := 0 for iNdEx < l { preIndex := iNdEx @@ -431,7 +481,7 @@ func (m *Heartbeat) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -459,7 +509,7 @@ func (m *Heartbeat) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ m.Term |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -478,7 +528,7 @@ func (m *Heartbeat) Unmarshal(data []byte) error { if iNdEx >= l { return io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ stringLen |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -493,11 +543,11 @@ func (m *Heartbeat) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Leader = string(data[iNdEx:postIndex]) + m.Leader = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex - skippy, err := skipProtocol(data[iNdEx:]) + skippy, err := skipProtocol(dAtA[iNdEx:]) if err != nil { return err } @@ -516,8 +566,8 @@ func (m *Heartbeat) Unmarshal(data []byte) error { } return nil } -func skipProtocol(data []byte) (n int, err error) { - l := len(data) +func skipProtocol(dAtA []byte) (n int, err error) { + l := len(dAtA) iNdEx := 0 for iNdEx < l { var wire uint64 @@ -528,7 +578,7 @@ func skipProtocol(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ wire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -546,7 +596,7 @@ func skipProtocol(data []byte) (n int, err error) { return 0, io.ErrUnexpectedEOF } iNdEx++ - if data[iNdEx-1] < 0x80 { + if dAtA[iNdEx-1] < 0x80 { break } } @@ -563,7 +613,7 @@ func skipProtocol(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ length |= (int(b) & 0x7F) << shift if b < 0x80 { @@ -586,7 +636,7 @@ func skipProtocol(data []byte) (n int, err error) { if iNdEx >= l { return 0, io.ErrUnexpectedEOF } - b := data[iNdEx] + b := dAtA[iNdEx] iNdEx++ innerWire |= (uint64(b) & 0x7F) << shift if b < 0x80 { @@ -597,7 +647,7 @@ func skipProtocol(data []byte) (n int, err error) { if innerWireType == 4 { break } - next, err := skipProtocol(data[start:]) + next, err := skipProtocol(dAtA[start:]) if err != nil { return 0, err } @@ -620,3 +670,24 @@ var ( ErrInvalidLengthProtocol = fmt.Errorf("proto: negative length found during unmarshaling") ErrIntOverflowProtocol = fmt.Errorf("proto: integer overflow") ) + +func init() { proto.RegisterFile("pb/protocol.proto", fileDescriptorProtocol) } + +var fileDescriptorProtocol = []byte{ + // 237 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2c, 0x48, 0xd2, 0x2f, + 0x28, 0xca, 0x2f, 0xc9, 0x4f, 0xce, 0xcf, 0xd1, 0x03, 0x33, 0x84, 0x98, 0x0a, 0x92, 0xa4, 0x74, + 0xd3, 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0x21, + 0x6a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0x68, 0x51, 0x4a, 0xe4, 0xe2, 0x0e, + 0xcb, 0x2f, 0x49, 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x12, 0xe2, 0x62, 0x09, 0x49, + 0x2d, 0xca, 0x95, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x09, 0x02, 0xb3, 0x85, 0x64, 0xb8, 0x38, 0x9d, + 0x13, 0xf3, 0x52, 0x32, 0x53, 0x12, 0x4b, 0x52, 0x25, 0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x10, + 0x02, 0x42, 0x0a, 0x5c, 0xdc, 0x3e, 0xf9, 0xe9, 0x01, 0xf9, 0xc5, 0x99, 0x25, 0x99, 0xf9, 0x79, + 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0xc8, 0x42, 0x4a, 0x36, 0x5c, 0x3c, 0x10, 0x2b, 0x8a, + 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0xb1, 0xda, 0x21, 0xc1, 0xc5, 0xee, 0x5e, 0x94, 0x98, 0x57, 0x92, + 0x9a, 0x02, 0xb6, 0x81, 0x23, 0x08, 0xc6, 0x55, 0x32, 0xe7, 0xe2, 0xf4, 0x48, 0x4d, 0x2c, 0x2a, + 0x49, 0x4a, 0x4d, 0xc4, 0xee, 0x3c, 0x31, 0x2e, 0x36, 0x9f, 0xd4, 0xc4, 0x94, 0xd4, 0x22, 0xa8, + 0xdb, 0xa0, 0x3c, 0x27, 0x91, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, + 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0xde, 0x36, 0x06, + 0x04, 0x00, 0x00, 0xff, 0xff, 0x8b, 0xe6, 0x17, 0xcb, 0x3e, 0x01, 0x00, 0x00, +} diff --git a/pb/protocol.proto b/pb/protocol.proto index 3d59590..23e97fc 100644 --- a/pb/protocol.proto +++ b/pb/protocol.proto @@ -1,7 +1,7 @@ -// Copyright 2016 Apcera Inc. All rights reserved. +// Copyright 2017 Apcera Inc. All rights reserved. // // Uses https://github.com/gogo/protobuf -// compiled via `protoc -I=. -I=$GOPATH/src --gogofaster_out=. protocol.proto` +// compiled via `protoc -I=. -I=$GOPATH/src --gofast_out=. protocol.proto` syntax = "proto3"; package pb; @@ -15,8 +15,9 @@ option (gogoproto.goproto_getters_all) = false; // VoteRequest message VoteRequest { - uint64 Term = 1; // Term for the candidate. - string Candidate = 2; // The candidate for the election. + uint64 Term = 1; // Term for the candidate. + string Candidate = 2; // The candidate for the election. + bytes LogPosition = 3; // Candidate's opaque position in the log. } // VoteResponse diff --git a/test_helper.go b/test_helper.go index 9a9c059..83df79f 100644 --- a/test_helper.go +++ b/test_helper.go @@ -18,8 +18,10 @@ const ( type dummyHandler struct { } -func (*dummyHandler) AsyncError(err error) {} -func (*dummyHandler) StateChange(from, to State) {} +func (*dummyHandler) AsyncError(err error) {} +func (*dummyHandler) StateChange(from, to State) {} +func (*dummyHandler) CurrentLogPosition() []byte { return nil } +func (*dummyHandler) GrantVote(position []byte) bool { return true } func stackFatalf(t *testing.T, f string, args ...interface{}) { lines := make([]string, 0, 32) diff --git a/vote_req_test.go b/vote_req_test.go index bcf2cbe..4ddf295 100644 --- a/vote_req_test.go +++ b/vote_req_test.go @@ -1,8 +1,9 @@ -// Copyright 2013-2016 Apcera Inc. All rights reserved. +// Copyright 2013-2017 Apcera Inc. All rights reserved. package graft import ( + "encoding/binary" "os" "testing" "time" @@ -117,6 +118,92 @@ func TestVoteRequestAsFollower(t *testing.T) { testStateOfNode(t, node) } +func TestVoteRequestAsFollowerLogBehind(t *testing.T) { + ci := ClusterInfo{Name: "vreq", Size: 3} + _, rpc, log := genNodeArgs(t) + lpHandler := &logPositionHandler{} + scCh := make(chan StateChange) + errCh := make(chan error) + handler := NewChanHandler(lpHandler, scCh, errCh) + node, err := New(ci, handler, rpc, log) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + defer node.Close() + + // Set log position to artificial higher value. + newPosition := uint32(8) + term := uint64(1) + lpHandler.logIndex = newPosition + node.setTerm(term) + + // Force write of state + node.writeState() + + // Create fake node to watch VoteResponses. + fake := fakeNode("fake") + + // Hook up to MockRPC layer + mockRegisterPeer(fake) + defer mockUnregisterPeer(fake.id) + + // a VoteRequest with a log that is behind should be ignored + pos := make([]byte, 4) + binary.BigEndian.PutUint32(pos, 1) + node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, LogPosition: pos} + vresp := <-fake.VoteResponses + if vresp.Term != term { + t.Fatalf("Expected the VoteResponse to have term=%d, got %d\n", + term, vresp.Term) + } + if vresp.Granted != false { + t.Fatal("Expected the VoteResponse to have Granted of false") + } + // Make sure no changes to node + if state := node.State(); state != FOLLOWER { + t.Fatalf("Expected Node to be in Follower state, got: %s", state) + } + if node.Leader() != NO_LEADER { + t.Fatalf("Expected no leader, got: %s\n", node.Leader()) + } + if node.CurrentTerm() != term { + t.Fatalf("Expected CurrentTerm of %d, got: %d\n", + term, node.CurrentTerm()) + } + + // Test persistent state + testStateOfNode(t, node) + + // a VoteRequest with a higher term should reset follower + binary.BigEndian.PutUint32(pos, newPosition+1) + node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, LogPosition: pos} + vresp = <-fake.VoteResponses + if vresp.Term != term { + t.Fatalf("Expected the VoteResponse to have term=%d, got %d\n", + term, vresp.Term) + } + if vresp.Granted == false { + t.Fatal("Expected the VoteResponse to have been Granted") + } + // Verify new state + if state := node.State(); state != FOLLOWER { + t.Fatalf("Expected Node to be in Follower state, got: %s", state) + } + if node.Leader() != NO_LEADER { + t.Fatalf("Expected no leader, got: %s\n", node.Leader()) + } + if node.CurrentTerm() != term { + t.Fatalf("Expected CurrentTerm of %d, got: %d\n", + term, node.CurrentTerm()) + } + if node.vote == NO_VOTE { + t.Fatal("Expected to have a cast vote at this point") + } + + // Test persistent state + testStateOfNode(t, node) +} + func TestVoteRequestAsCandidate(t *testing.T) { node := vreqNode(t, 3) defer node.Close() From 433585afcc67a5543cdec53811bede05df955615 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Fri, 7 Jul 2017 16:41:32 -0500 Subject: [PATCH 2/4] Update copyrights --- chan_handler.go | 2 +- test_helper.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/chan_handler.go b/chan_handler.go index 5de9cfb..b2a94ab 100644 --- a/chan_handler.go +++ b/chan_handler.go @@ -1,4 +1,4 @@ -// Copyright 2013-2016 Apcera Inc. All rights reserved. +// Copyright 2013-2017 Apcera Inc. All rights reserved. package graft diff --git a/test_helper.go b/test_helper.go index 83df79f..7422396 100644 --- a/test_helper.go +++ b/test_helper.go @@ -1,4 +1,4 @@ -// Copyright 2013-2016 Apcera Inc. All rights reserved. +// Copyright 2013-2017 Apcera Inc. All rights reserved. package graft From 23bf4734e4a46aa35d123c08e083174d4d834dfc Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Fri, 7 Jul 2017 16:52:26 -0500 Subject: [PATCH 3/4] Fix comment in test --- vote_req_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vote_req_test.go b/vote_req_test.go index 4ddf295..6b5895c 100644 --- a/vote_req_test.go +++ b/vote_req_test.go @@ -174,7 +174,7 @@ func TestVoteRequestAsFollowerLogBehind(t *testing.T) { // Test persistent state testStateOfNode(t, node) - // a VoteRequest with a higher term should reset follower + // a VoteRequest with a log that is ahead should reset follower binary.BigEndian.PutUint32(pos, newPosition+1) node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, LogPosition: pos} vresp = <-fake.VoteResponses From 6692c9db19ff337b1b45b7a25fecfc785a6f4e00 Mon Sep 17 00:00:00 2001 From: Tyler Treat Date: Mon, 10 Jul 2017 12:55:16 -0500 Subject: [PATCH 4/4] Rename LogPositionHandler to StateMachineHandler --- chan_handler.go | 23 ++++++++++++++++++----- handler_test.go | 24 +++--------------------- node.go | 42 ++++++++++++++++++++++++++++-------------- pb/protocol.pb.go | 42 +++++++++++++++++++++--------------------- pb/protocol.proto | 6 +++--- test_helper.go | 8 ++++---- vote_req_test.go | 24 +++++++++++++++++++----- 7 files changed, 96 insertions(+), 73 deletions(-) diff --git a/chan_handler.go b/chan_handler.go index b2a94ab..df3dcdd 100644 --- a/chan_handler.go +++ b/chan_handler.go @@ -5,7 +5,7 @@ package graft // ChanHandler is a convenience handler when a user wants to simply use // channels for the async handling of errors and state changes. type ChanHandler struct { - LogPositionHandler + StateMachineHandler // Chan to receive state changes. stateChangeChan chan<- StateChange @@ -22,11 +22,24 @@ type StateChange struct { To State } -func NewChanHandler(logHandler LogPositionHandler, scCh chan<- StateChange, errCh chan<- error) *ChanHandler { +// NewChanHandler returns a Handler implementation which uses channels for +// handling errors and state changes. +func NewChanHandler(scCh chan<- StateChange, errCh chan<- error) *ChanHandler { + return NewChanHandlerWithStateMachine(new(defaultStateMachineHandler), scCh, errCh) +} + +// NewChanHandlerWithStateMachine returns a Handler implementation which uses +// channels for handling errors and state changes and a StateMachineHandler for +// hooking into external state. The external state machine influences leader +// election votes. +func NewChanHandlerWithStateMachine( + stateHandler StateMachineHandler, + scCh chan<- StateChange, + errCh chan<- error) *ChanHandler { return &ChanHandler{ - LogPositionHandler: logHandler, - stateChangeChan: scCh, - errorChan: errCh, + StateMachineHandler: stateHandler, + stateChangeChan: scCh, + errorChan: errCh, } } diff --git a/handler_test.go b/handler_test.go index cea8165..9bc1a3a 100644 --- a/handler_test.go +++ b/handler_test.go @@ -3,7 +3,6 @@ package graft import ( - "encoding/binary" "fmt" "os" "strconv" @@ -13,20 +12,6 @@ import ( "github.com/nats-io/graft/pb" ) -type logPositionHandler struct { - logIndex uint32 -} - -func (l *logPositionHandler) CurrentLogPosition() []byte { - buf := make([]byte, 4) - binary.BigEndian.PutUint32(buf, l.logIndex) - return buf -} - -func (l *logPositionHandler) GrantVote(position []byte) bool { - return binary.BigEndian.Uint32(position) >= l.logIndex -} - // Dumb wait program to sync on callbacks, etc... Will timeout func wait(t *testing.T, ch chan StateChange) *StateChange { select { @@ -58,8 +43,7 @@ func TestStateChangeHandler(t *testing.T) { // Use ChanHandler scCh := make(chan StateChange) errCh := make(chan error) - lpHandler := &logPositionHandler{} - chHand := NewChanHandler(lpHandler, scCh, errCh) + chHand := NewChanHandler(scCh, errCh) node, err := New(ci, chHand, rpc, log) if err != nil { @@ -94,8 +78,7 @@ func TestErrorHandler(t *testing.T) { // Use ChanHandler scCh := make(chan StateChange) errCh := make(chan error) - lpHandler := &logPositionHandler{} - chHand := NewChanHandler(lpHandler, scCh, errCh) + chHand := NewChanHandler(scCh, errCh) node, err := New(ci, chHand, rpc, log) if err != nil { @@ -128,8 +111,7 @@ func TestChandHandlerNotBlockingNode(t *testing.T) { // Use ChanHandler scCh := make(chan StateChange) errCh := make(chan error) - lpHandler := &logPositionHandler{} - chHand := NewChanHandler(lpHandler, scCh, errCh) + chHand := NewChanHandler(scCh, errCh) node, err := New(ci, chHand, rpc, log) if err != nil { diff --git a/node.go b/node.go index b2e0beb..133969f 100644 --- a/node.go +++ b/node.go @@ -80,23 +80,37 @@ type ClusterInfo struct { Size int } -// LogPositionHandler is used to interrogate the state of the log. -type LogPositionHandler interface { - // CurrentLogPosition returns an opaque byte slice that represents the - // current position in the node's log. - CurrentLogPosition() []byte +// StateMachineHandler is used to interrogate an external state machine. +type StateMachineHandler interface { + // CurrentState returns an opaque byte slice that represents the current + // state of the state machine. + CurrentState() []byte // GrantVote is called when a candidate peer has requested a vote. The - // peer's log position is passed as an opaque byte slice as returned by - // CurrentLogPosition. The returned bool determines if the vote should be - // granted because the candidate's log is at least as up-to-date as the - // receiver's log. + // peer's state machine position is passed as an opaque byte slice as + // returned by CurrentState. The returned bool determines if the vote + // should be granted because the candidate's state machine is at least as + // up-to-date as the receiver's state machine. GrantVote(position []byte) bool } +// defaultStateMachineHandler implements the StateMachineHandler interface by +// always granting a vote. +type defaultStateMachineHandler struct{} + +// CurrentState returns nil for default behavior. +func (d *defaultStateMachineHandler) CurrentState() []byte { + return nil +} + +// GrantVote always returns true for default behavior. +func (d *defaultStateMachineHandler) GrantVote(position []byte) bool { + return true +} + // A Handler can process async callbacks from a Graft node. type Handler interface { - LogPositionHandler + StateMachineHandler // Process async errors that are encountered by the node. AsyncError(error) @@ -265,9 +279,9 @@ func (n *Node) runAsCandidate() { // Initiate an Election vreq := &pb.VoteRequest{ - Term: n.term, - Candidate: n.id, - LogPosition: n.handler.CurrentLogPosition(), + Term: n.term, + Candidate: n.id, + CurrentState: n.handler.CurrentState(), } // Collect the votes. // We will vote for ourselves, so start at 1. @@ -459,7 +473,7 @@ func (n *Node) handleVoteRequest(vreq *pb.VoteRequest) bool { deny := &pb.VoteResponse{Term: n.term, Granted: false} // Old term or candidate's log is behind, reject - if vreq.Term < n.term || !n.handler.GrantVote(vreq.LogPosition) { + if vreq.Term < n.term || !n.handler.GrantVote(vreq.CurrentState) { n.rpc.SendVoteResponse(vreq.Candidate, deny) return false } diff --git a/pb/protocol.pb.go b/pb/protocol.pb.go index 8a415ba..ca81a6d 100644 --- a/pb/protocol.pb.go +++ b/pb/protocol.pb.go @@ -34,9 +34,9 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package // VoteRequest type VoteRequest struct { - Term uint64 `protobuf:"varint,1,opt,name=Term,proto3" json:"Term,omitempty"` - Candidate string `protobuf:"bytes,2,opt,name=Candidate,proto3" json:"Candidate,omitempty"` - LogPosition []byte `protobuf:"bytes,3,opt,name=LogPosition,proto3" json:"LogPosition,omitempty"` + Term uint64 `protobuf:"varint,1,opt,name=Term,proto3" json:"Term,omitempty"` + Candidate string `protobuf:"bytes,2,opt,name=Candidate,proto3" json:"Candidate,omitempty"` + CurrentState []byte `protobuf:"bytes,3,opt,name=CurrentState,proto3" json:"CurrentState,omitempty"` } func (m *VoteRequest) Reset() { *m = VoteRequest{} } @@ -97,11 +97,11 @@ func (m *VoteRequest) MarshalTo(dAtA []byte) (int, error) { i = encodeVarintProtocol(dAtA, i, uint64(len(m.Candidate))) i += copy(dAtA[i:], m.Candidate) } - if len(m.LogPosition) > 0 { + if len(m.CurrentState) > 0 { dAtA[i] = 0x1a i++ - i = encodeVarintProtocol(dAtA, i, uint64(len(m.LogPosition))) - i += copy(dAtA[i:], m.LogPosition) + i = encodeVarintProtocol(dAtA, i, uint64(len(m.CurrentState))) + i += copy(dAtA[i:], m.CurrentState) } return i, nil } @@ -205,7 +205,7 @@ func (m *VoteRequest) Size() (n int) { if l > 0 { n += 1 + l + sovProtocol(uint64(l)) } - l = len(m.LogPosition) + l = len(m.CurrentState) if l > 0 { n += 1 + l + sovProtocol(uint64(l)) } @@ -329,7 +329,7 @@ func (m *VoteRequest) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 3: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field LogPosition", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field CurrentState", wireType) } var byteLen int for shift := uint(0); ; shift += 7 { @@ -353,9 +353,9 @@ func (m *VoteRequest) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.LogPosition = append(m.LogPosition[:0], dAtA[iNdEx:postIndex]...) - if m.LogPosition == nil { - m.LogPosition = []byte{} + m.CurrentState = append(m.CurrentState[:0], dAtA[iNdEx:postIndex]...) + if m.CurrentState == nil { + m.CurrentState = []byte{} } iNdEx = postIndex default: @@ -674,20 +674,20 @@ var ( func init() { proto.RegisterFile("pb/protocol.proto", fileDescriptorProtocol) } var fileDescriptorProtocol = []byte{ - // 237 bytes of a gzipped FileDescriptorProto + // 236 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x2c, 0x48, 0xd2, 0x2f, 0x28, 0xca, 0x2f, 0xc9, 0x4f, 0xce, 0xcf, 0xd1, 0x03, 0x33, 0x84, 0x98, 0x0a, 0x92, 0xa4, 0x74, 0xd3, 0x33, 0x4b, 0x32, 0x4a, 0x93, 0xf4, 0x92, 0xf3, 0x73, 0xf5, 0xd3, 0xf3, 0xd3, 0xf3, 0x21, - 0x6a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0x68, 0x51, 0x4a, 0xe4, 0xe2, 0x0e, + 0x6a, 0x92, 0x4a, 0xd3, 0xc0, 0x3c, 0x30, 0x07, 0xcc, 0x82, 0x68, 0x51, 0x4a, 0xe6, 0xe2, 0x0e, 0xcb, 0x2f, 0x49, 0x0d, 0x4a, 0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x11, 0x12, 0xe2, 0x62, 0x09, 0x49, 0x2d, 0xca, 0x95, 0x60, 0x54, 0x60, 0xd4, 0x60, 0x09, 0x02, 0xb3, 0x85, 0x64, 0xb8, 0x38, 0x9d, 0x13, 0xf3, 0x52, 0x32, 0x53, 0x12, 0x4b, 0x52, 0x25, 0x98, 0x14, 0x18, 0x35, 0x38, 0x83, 0x10, - 0x02, 0x42, 0x0a, 0x5c, 0xdc, 0x3e, 0xf9, 0xe9, 0x01, 0xf9, 0xc5, 0x99, 0x25, 0x99, 0xf9, 0x79, - 0x12, 0xcc, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0xc8, 0x42, 0x4a, 0x36, 0x5c, 0x3c, 0x10, 0x2b, 0x8a, - 0x0b, 0xf2, 0xf3, 0x8a, 0x53, 0xb1, 0xda, 0x21, 0xc1, 0xc5, 0xee, 0x5e, 0x94, 0x98, 0x57, 0x92, - 0x9a, 0x02, 0xb6, 0x81, 0x23, 0x08, 0xc6, 0x55, 0x32, 0xe7, 0xe2, 0xf4, 0x48, 0x4d, 0x2c, 0x2a, - 0x49, 0x4a, 0x4d, 0xc4, 0xee, 0x3c, 0x31, 0x2e, 0x36, 0x9f, 0xd4, 0xc4, 0x94, 0xd4, 0x22, 0xa8, - 0xdb, 0xa0, 0x3c, 0x27, 0x91, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, - 0x8e, 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0xde, 0x36, 0x06, - 0x04, 0x00, 0x00, 0xff, 0xff, 0x8b, 0xe6, 0x17, 0xcb, 0x3e, 0x01, 0x00, 0x00, + 0x02, 0x42, 0x4a, 0x5c, 0x3c, 0xce, 0xa5, 0x45, 0x45, 0xa9, 0x79, 0x25, 0xc1, 0x25, 0x20, 0x05, + 0xcc, 0x0a, 0x8c, 0x1a, 0x3c, 0x41, 0x28, 0x62, 0x4a, 0x36, 0x5c, 0x3c, 0x10, 0x4b, 0x8a, 0x0b, + 0xf2, 0xf3, 0x8a, 0x53, 0xb1, 0xda, 0x22, 0xc1, 0xc5, 0xee, 0x5e, 0x94, 0x98, 0x57, 0x92, 0x9a, + 0x02, 0xb6, 0x83, 0x23, 0x08, 0xc6, 0x55, 0x32, 0xe7, 0xe2, 0xf4, 0x48, 0x4d, 0x2c, 0x2a, 0x49, + 0x4a, 0x4d, 0xc4, 0xee, 0x40, 0x31, 0x2e, 0x36, 0x9f, 0xd4, 0xc4, 0x94, 0xd4, 0x22, 0xa8, 0xeb, + 0xa0, 0x3c, 0x27, 0x91, 0x13, 0x0f, 0xe5, 0x18, 0x4e, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, + 0xf1, 0xc1, 0x23, 0x39, 0xc6, 0x19, 0x8f, 0xe5, 0x18, 0x92, 0xd8, 0xc0, 0x1e, 0x37, 0x06, 0x04, + 0x00, 0x00, 0xff, 0xff, 0x4c, 0xbd, 0x60, 0x28, 0x40, 0x01, 0x00, 0x00, } diff --git a/pb/protocol.proto b/pb/protocol.proto index 23e97fc..58c0def 100644 --- a/pb/protocol.proto +++ b/pb/protocol.proto @@ -15,9 +15,9 @@ option (gogoproto.goproto_getters_all) = false; // VoteRequest message VoteRequest { - uint64 Term = 1; // Term for the candidate. - string Candidate = 2; // The candidate for the election. - bytes LogPosition = 3; // Candidate's opaque position in the log. + uint64 Term = 1; // Term for the candidate. + string Candidate = 2; // The candidate for the election. + bytes CurrentState = 3; // Candidate's opaque position in the state machine. } // VoteResponse diff --git a/test_helper.go b/test_helper.go index 7422396..81c0e91 100644 --- a/test_helper.go +++ b/test_helper.go @@ -18,10 +18,10 @@ const ( type dummyHandler struct { } -func (*dummyHandler) AsyncError(err error) {} -func (*dummyHandler) StateChange(from, to State) {} -func (*dummyHandler) CurrentLogPosition() []byte { return nil } -func (*dummyHandler) GrantVote(position []byte) bool { return true } +func (*dummyHandler) AsyncError(err error) {} +func (*dummyHandler) StateChange(from, to State) {} +func (*dummyHandler) CurrentState() []byte { return nil } +func (*dummyHandler) GrantVote(state []byte) bool { return true } func stackFatalf(t *testing.T, f string, args ...interface{}) { lines := make([]string, 0, 32) diff --git a/vote_req_test.go b/vote_req_test.go index 6b5895c..d9ceaa3 100644 --- a/vote_req_test.go +++ b/vote_req_test.go @@ -13,6 +13,20 @@ import ( // Test VoteRequests RPC in different states. +type stateMachineHandler struct { + logIndex uint32 +} + +func (s *stateMachineHandler) CurrentState() []byte { + buf := make([]byte, 4) + binary.BigEndian.PutUint32(buf, s.logIndex) + return buf +} + +func (s *stateMachineHandler) GrantVote(position []byte) bool { + return binary.BigEndian.Uint32(position) >= s.logIndex +} + func vreqNode(t *testing.T, expected int) *Node { ci := ClusterInfo{Name: "vreq", Size: expected} hand, rpc, log := genNodeArgs(t) @@ -121,10 +135,10 @@ func TestVoteRequestAsFollower(t *testing.T) { func TestVoteRequestAsFollowerLogBehind(t *testing.T) { ci := ClusterInfo{Name: "vreq", Size: 3} _, rpc, log := genNodeArgs(t) - lpHandler := &logPositionHandler{} + stateHandler := new(stateMachineHandler) scCh := make(chan StateChange) errCh := make(chan error) - handler := NewChanHandler(lpHandler, scCh, errCh) + handler := NewChanHandlerWithStateMachine(stateHandler, scCh, errCh) node, err := New(ci, handler, rpc, log) if err != nil { t.Fatalf("Expected no error, got %v", err) @@ -134,7 +148,7 @@ func TestVoteRequestAsFollowerLogBehind(t *testing.T) { // Set log position to artificial higher value. newPosition := uint32(8) term := uint64(1) - lpHandler.logIndex = newPosition + stateHandler.logIndex = newPosition node.setTerm(term) // Force write of state @@ -150,7 +164,7 @@ func TestVoteRequestAsFollowerLogBehind(t *testing.T) { // a VoteRequest with a log that is behind should be ignored pos := make([]byte, 4) binary.BigEndian.PutUint32(pos, 1) - node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, LogPosition: pos} + node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, CurrentState: pos} vresp := <-fake.VoteResponses if vresp.Term != term { t.Fatalf("Expected the VoteResponse to have term=%d, got %d\n", @@ -176,7 +190,7 @@ func TestVoteRequestAsFollowerLogBehind(t *testing.T) { // a VoteRequest with a log that is ahead should reset follower binary.BigEndian.PutUint32(pos, newPosition+1) - node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, LogPosition: pos} + node.VoteRequests <- &pb.VoteRequest{Term: term, Candidate: fake.id, CurrentState: pos} vresp = <-fake.VoteResponses if vresp.Term != term { t.Fatalf("Expected the VoteResponse to have term=%d, got %d\n",