Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure StateChange are posted in order to Handler #15

Merged
merged 2 commits into from
Sep 16, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
package graft

import (
"fmt"
"os"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -101,3 +103,92 @@ func TestErrorHandler(t *testing.T) {
t.Fatalf("Expected the logPath, got %s \n", perr.Path)
}
}

// TestChandHandlerNotBlockingNode verifies that a node wired to a ChanHandler
// is not blocked when nobody reads from the handler's channels, and that the
// queued state changes and errors are later received in posting order.
func TestChandHandlerNotBlockingNode(t *testing.T) {
	ci := ClusterInfo{Name: "foo", Size: 1}
	_, rpc, log := genNodeArgs(t)

	// Unbuffered channels: every post needs a reader to complete.
	stateCh := make(chan StateChange)
	errorCh := make(chan error)
	handler := NewChanHandler(stateCh, errorCh)

	node, err := New(ci, handler, rpc, log)
	if err != nil {
		t.Fatalf("Expected no error, got: %v", err)
	}
	defer node.Close()

	// Push the election timer far into the future so that it does not
	// interfere with the state switches done below.
	node.mu.Lock()
	node.electTimer.Reset(time.Hour)
	node.mu.Unlock()
	// Give an election that may already be in progress time to finish.
	time.Sleep(MAX_ELECTION_TIMEOUT + 50*time.Millisecond)

	// Discard any state change that is ready to be received right now.
drain:
	for {
		select {
		case <-stateCh:
		default:
			break drain
		}
	}

	// Post state changes and errors while no one is dequeuing them.
	// The loop only completes if these calls return without a reader.
	const total = 10
	cycle := [...]State{CANDIDATE, LEADER, FOLLOWER}
	for i := 0; i < total; i++ {
		next := cycle[i%len(cycle)]
		// switchState expects the lock to be held on entry...
		node.mu.Lock()
		node.switchState(next)
		node.mu.Unlock()
		// ...whereas handleError does not.
		node.handleError(fmt.Errorf("%d", i))
	}

	// Now dequeue from both channels and verify the order.
	for i := 0; i < total; i++ {
		sc := <-stateCh
		switch i % len(cycle) {
		case 0:
			if sc.From != FOLLOWER || sc.To != CANDIDATE {
				t.Fatalf("i=%d Expected state to be from Follower to Candidate, got %v to %v", i, sc.From.String(), sc.To.String())
			}
		case 1:
			if sc.From != CANDIDATE || sc.To != LEADER {
				t.Fatalf("i=%d Expected state to be from Candidate to Leader, got %v to %v", i, sc.From.String(), sc.To.String())
			}
		case 2:
			if sc.From != LEADER || sc.To != FOLLOWER {
				t.Fatalf("i=%d Expected state to be from Leader to Follower, got %v to %v", i, sc.From.String(), sc.To.String())
			}
		}

		// Each posted error carries its loop index as its message.
		posted := <-errorCh
		errAsInt, convErr := strconv.Atoi(posted.Error())
		if convErr != nil {
			t.Fatalf("Error converting error content: %v", convErr)
		}
		if errAsInt != i {
			t.Fatalf("Expected error to be %d, got %d", i, errAsInt)
		}
	}
}
72 changes: 50 additions & 22 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@ import (
"crypto/rand"
"encoding/hex"
"io"
mrand "math/rand"
"sync"
"time"

"github.com/nats-io/graft/pb"

mrand "math/rand"
)

type Node struct {
Expand All @@ -38,9 +37,11 @@ type Node struct {
// Async handler
handler Handler

// Used to chain go routines posting events to async handler.
stateChain chan bool // For state changes
errChain chan bool // For errors
// Pending StateChange events
stateChg []*StateChange

// Pending Error events
errors []error

// Current leader
leader string
Expand Down Expand Up @@ -356,12 +357,32 @@ func (n *Node) runAsFollower() {
}
}

// postError invokes handler.AsyncError() with err in a separate go routine,
// so the caller is not blocked by the handler.
//
// err is expected to be the first element of n.errors. When the handler
// call returns, that element is removed from the list under the node's lock
// and, if there are still pending errors, this function calls itself with
// the new first element. Only one error is therefore being delivered at any
// time, and errors reach the handler in the order they were added.
func (n *Node) postError(err error) {
	go func() {
		n.handler.AsyncError(err)

		n.mu.Lock()
		defer n.mu.Unlock()

		// Clear the slot before re-slicing so the backing array does not
		// keep the already delivered error reachable.
		n.errors[0] = nil
		n.errors = n.errors[1:]
		if len(n.errors) > 0 {
			n.postError(n.errors[0])
		}
	}()
}

// Send the error to the async handler.
func (n *Node) handleError(err error) {
n.mu.Lock()
n.serializeGoRoutine(&n.errChain, func() {
n.handler.AsyncError(err)
})
n.errors = append(n.errors, err)
// Call postError only for the first error added.
// Check postError for details.
if len(n.errors) == 1 {
n.postError(err)
}
n.mu.Unlock()
}

Expand Down Expand Up @@ -524,18 +545,21 @@ func (n *Node) switchToCandidate() {
n.switchState(CANDIDATE)
}

// Execute `f` in a separate go routine, but ensures that functions
// are executed in order.
func (n *Node) serializeGoRoutine(nextCh *(chan bool), f func()) {
prevCh := *nextCh // possibly nil
*nextCh = make(chan bool, 1)
go func(prev, next chan bool) {
if prev != nil {
<-prev
// postStateChange invokes handler.StateChange() in a go routine.
// When the handler call returns, and if there are still pending state
// changes, this function will recursively call itself with the first
// element in the list.
func (n *Node) postStateChange(sc *StateChange) {
go func() {
n.handler.StateChange(sc.From, sc.To)
n.mu.Lock()
n.stateChg = n.stateChg[1:]
if len(n.stateChg) > 0 {
sc := n.stateChg[0]
n.postStateChange(sc)
}
f()
next <- true
}(prevCh, *nextCh)
n.mu.Unlock()
}()
}

// Process a state transition. Assume lock is held on entrance.
Expand All @@ -546,9 +570,13 @@ func (n *Node) switchState(state State) {
}
old := n.state
n.state = state
n.serializeGoRoutine(&n.stateChain, func() {
n.handler.StateChange(old, state)
})
sc := &StateChange{From: old, To: state}
n.stateChg = append(n.stateChg, sc)
// Invoke postStateChange only for the first state change added.
// Check postStateChange for details.
if len(n.stateChg) == 1 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this change again before you check the length?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it can't; we are under the node's lock when adding, checking and removing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok!

n.postStateChange(sc)
}
}

// Reset the election timeout with a random value.
Expand Down
2 changes: 1 addition & 1 deletion node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCandidateState(t *testing.T) {

// Should move to candidate state within MAX_ELECTION_TIMEOUT
time.Sleep(MAX_ELECTION_TIMEOUT)
if state := node.State(); state != CANDIDATE {
if state := waitForState(node, CANDIDATE); state != CANDIDATE {
t.Fatalf("Expected node to move to Candidate state, got: %s", state)
}
if stateStr := node.State().String(); stateStr != "Candidate" {
Expand Down