Skip to content

Commit

Permalink
Merge pull request #15 from nats-io/other_take_on_state_order
Browse files Browse the repository at this point in the history
Ensure StateChange are posted in order to Handler
  • Loading branch information
derekcollison committed Sep 16, 2016
2 parents 26a3418 + 420e435 commit 0515657
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 23 deletions.
91 changes: 91 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
package graft

import (
"fmt"
"os"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -101,3 +103,92 @@ func TestErrorHandler(t *testing.T) {
t.Fatalf("Expected the logPath, got %s \n", perr.Path)
}
}

func TestChandHandlerNotBlockingNode(t *testing.T) {
ci := ClusterInfo{Name: "foo", Size: 1}
_, rpc, log := genNodeArgs(t)

// Use ChanHandler
scCh := make(chan StateChange)
errCh := make(chan error)
chHand := NewChanHandler(scCh, errCh)

node, err := New(ci, chHand, rpc, log)
if err != nil {
t.Fatalf("Expected no error, got: %v", err)
}
defer node.Close()

// Reset the node election timer to a very high number so
// it does not interfere with the test
node.mu.Lock()
node.electTimer.Reset(time.Hour)
node.mu.Unlock()
// Wait in case election was happening
time.Sleep(MAX_ELECTION_TIMEOUT + 50*time.Millisecond)
// Drain the state changes
drained := false
for !drained {
select {
case <-scCh:
default:
drained = true
}
}

// Make sure that if no one is dequeuing state changes or errors
// the node is not blocked.
total := 10
to := FOLLOWER
step := 0
for i := 0; i < total; i++ {
switch step {
case 0:
to = CANDIDATE
step = 1
case 1:
to = LEADER
step = 2
case 2:
to = FOLLOWER
step = 0
}
// This call expects lock to be held on entry
node.mu.Lock()
node.switchState(to)
node.mu.Unlock()
// This call does not
node.handleError(fmt.Errorf("%d", i))
}

// Now dequeue from channels and verify order
step = 0
for i := 0; i < total; i++ {
sc := <-scCh
switch step {
case 0:
if sc.From != FOLLOWER || sc.To != CANDIDATE {
t.Fatalf("i=%d Expected state to be from Follower to Candidate, got %v to %v", i, sc.From.String(), sc.To.String())
}
step = 1
case 1:
if sc.From != CANDIDATE || sc.To != LEADER {
t.Fatalf("i=%d Expected state to be from Candidate to Leader, got %v to %v", i, sc.From.String(), sc.To.String())
}
step = 2
case 2:
if sc.From != LEADER || sc.To != FOLLOWER {
t.Fatalf("i=%d Expected state to be from Leader to Follower, got %v to %v", i, sc.From.String(), sc.To.String())
}
step = 0
}
err := <-errCh
errAsInt, convErr := strconv.Atoi(err.Error())
if convErr != nil {
t.Fatalf("Error converting error content: %v", convErr)
}
if errAsInt != i {
t.Fatalf("Expected error to be %d, got %d", i, errAsInt)
}
}
}
72 changes: 50 additions & 22 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"crypto/rand"
"encoding/hex"
"io"
mrand "math/rand"
"sync"
"time"

"github.com/nats-io/graft/pb"

mrand "math/rand"
)

type Node struct {
Expand All @@ -38,9 +37,11 @@ type Node struct {
// Async handler
handler Handler

// Used to chain go routines posting events to async handler.
stateChain chan bool // For state changes
errChain chan bool // For errors
// Pending StateChange events
stateChg []*StateChange

// Pending Error events
errors []error

// Current leader
leader string
Expand Down Expand Up @@ -356,12 +357,32 @@ func (n *Node) runAsFollower() {
}
}

// postError invokes handler.AsyncError() in a go routine.
// When the handler call returns, and if there are still pending errors,
// this function will recursively call itself with the first element in
// the list.
func (n *Node) postError(err error) {
go func() {
n.handler.AsyncError(err)
n.mu.Lock()
n.errors = n.errors[1:]
if len(n.errors) > 0 {
err := n.errors[0]
n.postError(err)
}
n.mu.Unlock()
}()
}

// Send the error to the async handler.
func (n *Node) handleError(err error) {
n.mu.Lock()
n.serializeGoRoutine(&n.errChain, func() {
n.handler.AsyncError(err)
})
n.errors = append(n.errors, err)
// Call postError only for the first error added.
// Check postError for details.
if len(n.errors) == 1 {
n.postError(err)
}
n.mu.Unlock()
}

Expand Down Expand Up @@ -524,18 +545,21 @@ func (n *Node) switchToCandidate() {
n.switchState(CANDIDATE)
}

// Execute `f` in a separate go routine, but ensures that functions
// are executed in order.
func (n *Node) serializeGoRoutine(nextCh *(chan bool), f func()) {
prevCh := *nextCh // possibly nil
*nextCh = make(chan bool, 1)
go func(prev, next chan bool) {
if prev != nil {
<-prev
// postStateChange invokes handler.StateChange() in a go routine.
// When the handler call returns, and if there are still pending state
// changes, this function will recursively call itself with the first
// element in the list.
func (n *Node) postStateChange(sc *StateChange) {
go func() {
n.handler.StateChange(sc.From, sc.To)
n.mu.Lock()
n.stateChg = n.stateChg[1:]
if len(n.stateChg) > 0 {
sc := n.stateChg[0]
n.postStateChange(sc)
}
f()
next <- true
}(prevCh, *nextCh)
n.mu.Unlock()
}()
}

// Process a state transistion. Assume lock is held on entrance.
Expand All @@ -546,9 +570,13 @@ func (n *Node) switchState(state State) {
}
old := n.state
n.state = state
n.serializeGoRoutine(&n.stateChain, func() {
n.handler.StateChange(old, state)
})
sc := &StateChange{From: old, To: state}
n.stateChg = append(n.stateChg, sc)
// Invoke postStateChange only for the first state change added.
// Check postStateChange for details.
if len(n.stateChg) == 1 {
n.postStateChange(sc)
}
}

// Reset the election timeout with a random value.
Expand Down
2 changes: 1 addition & 1 deletion node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCandidateState(t *testing.T) {

// Should move to candidate state within MAX_ELECTION_TIMEOUT
time.Sleep(MAX_ELECTION_TIMEOUT)
if state := node.State(); state != CANDIDATE {
if state := waitForState(node, CANDIDATE); state != CANDIDATE {
t.Fatalf("Expected node to move to Candidate state, got: %s", state)
}
if stateStr := node.State().String(); stateStr != "Candidate" {
Expand Down

0 comments on commit 0515657

Please sign in to comment.