Skip to content

Commit

Permalink
Use a single go routine to dispatch async events
Browse files Browse the repository at this point in the history
  • Loading branch information
kozlovic committed Sep 14, 2016
1 parent 7e99caa commit 11b9137
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 27 deletions.
18 changes: 18 additions & 0 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package graft

import (
"os"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -86,6 +87,21 @@ func TestErrorHandler(t *testing.T) {
os.Chmod(node.logPath, 0400)
defer os.Chmod(node.logPath, 0660)

// Need to dequeue state changes, otherwise we won't get the error
wg := &sync.WaitGroup{}
wg.Add(1)
chQuit := make(chan bool)
go func() {
defer wg.Done()
for {
select {
case <-chQuit:
return
case <-scCh:
}
}
}()

err = errWait(t, errCh)

perr, ok := err.(*os.PathError)
Expand All @@ -98,4 +114,6 @@ func TestErrorHandler(t *testing.T) {
if perr.Path != node.LogPath() {
t.Fatalf("Expected the logPath, got %s \n", perr.Path)
}
chQuit <- true
wg.Wait()
}
75 changes: 49 additions & 26 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ type Node struct {
// Async handler
handler Handler

// Used to chain go routines posting events to async handler.
stateChain chan bool // For state changes
errChain chan bool // For errors
// The node passes StateChange and Error events to the async handler.
// Since the handler implementation may cause SateChange() and AsyncError()
// to block, we need to make sure that we invoke those from a go routine.
// Still we need to preserve the order. We use those objects to do so.
asyncFuncs []func()
asyncDispReady chan bool
asyncDispQuit chan bool

// Current leader
leader string
Expand Down Expand Up @@ -94,18 +98,24 @@ func New(info ClusterInfo, handler Handler, rpc RPCDriver, logPath string) (*Nod

// Assign an Id() and start us as a FOLLOWER with no known LEADER.
node := &Node{
id: genUUID(),
info: info,
state: FOLLOWER,
rpc: rpc,
handler: handler,
leader: NO_LEADER,
quit: make(chan chan struct{}),
VoteRequests: make(chan *VoteRequest),
VoteResponses: make(chan *VoteResponse),
HeartBeats: make(chan *Heartbeat),
id: genUUID(),
info: info,
state: FOLLOWER,
rpc: rpc,
handler: handler,
leader: NO_LEADER,
quit: make(chan chan struct{}),
VoteRequests: make(chan *VoteRequest),
VoteResponses: make(chan *VoteResponse),
HeartBeats: make(chan *Heartbeat),
asyncFuncs: make([]func(), 0),
asyncDispReady: make(chan bool, 1),
asyncDispQuit: make(chan bool, 1),
}

// Start async dispatcher
go node.asyncDispatcher()

// Init the log file and update our state.
if err := node.initLog(logPath); err != nil {
return nil, err
Expand Down Expand Up @@ -357,7 +367,7 @@ func (n *Node) runAsFollower() {
// Send the error to the async handler.
func (n *Node) handleError(err error) {
n.mu.Lock()
n.serializeGoRoutine(&n.errChain, func() {
n.postAsyncFunc(func() {
n.handler.AsyncError(err)
})
n.mu.Unlock()
Expand Down Expand Up @@ -522,18 +532,30 @@ func (n *Node) switchToCandidate() {
n.switchState(CANDIDATE)
}

// Execute `f` in a separate go routine, but ensures that functions
// are executed in order.
func (n *Node) serializeGoRoutine(nextCh *(chan bool), f func()) {
prevCh := *nextCh // possibly nil
*nextCh = make(chan bool, 1)
go func(prev, next chan bool) {
if prev != nil {
<-prev
// This go-routine is popping functions that pass events
// to the async handler. This ensures that the Node is never blocked.
func (n *Node) asyncDispatcher() {
for {
select {
case <-n.asyncDispQuit:
return
case <-n.asyncDispReady:
n.mu.Lock()
f := n.asyncFuncs[0]
n.asyncFuncs = n.asyncFuncs[1:]
n.mu.Unlock()
// This may block
f()
}
f()
next <- true
}(prevCh, *nextCh)
}
}

// Add a function to the array and notify the async dispatcher
func (n *Node) postAsyncFunc(f func()) {
n.asyncFuncs = append(n.asyncFuncs, f)
if len(n.asyncDispReady) == 0 {
n.asyncDispReady <- true
}
}

// Process a state transistion. Assume lock is held on entrance.
Expand All @@ -544,7 +566,7 @@ func (n *Node) switchState(state State) {
}
old := n.state
n.state = state
n.serializeGoRoutine(&n.stateChain, func() {
n.postAsyncFunc(func() {
n.handler.StateChange(old, state)
})
}
Expand Down Expand Up @@ -598,6 +620,7 @@ func (n *Node) Close() {
n.waitOnLoopFinish()
n.clearTimers()
n.closeLog()
n.asyncDispQuit <- true
}

// Return the current state.
Expand Down
5 changes: 4 additions & 1 deletion node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ func TestClose(t *testing.T) {

// Check for dangling go routines
delta := (runtime.NumGoroutine() - base)
if delta > 0 {
// It is possible that the asyncDispatcher is still running because
// it may be blocked on handler.StateChange() or handler.AsyncError
// if the user is not actively dequeuing.
if delta > 1 {
t.Fatalf("[%d] Go routines still exist post Close()", delta)
}
}
Expand Down

0 comments on commit 11b9137

Please sign in to comment.