Skip to content

Commit

Permalink
[FIXED] Ensure state changes are posted in order they occur
Browse files Browse the repository at this point in the history
Both state changes and errors were posted through the use of a go
routine. This did not guarantee that the changes would arrive in
the state change handler in the same order they were produced.

Resolves #11
  • Loading branch information
kozlovic committed Sep 14, 2016
1 parent 92e724b commit dd88364
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 3 deletions.
5 changes: 4 additions & 1 deletion handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ func wait(t *testing.T, ch chan StateChange) *StateChange {
select {
case sc := <-ch:
return &sc
case <-time.After(MAX_ELECTION_TIMEOUT):
// It could be that the random election time is the max. Inc
// that case, the node will still need a bit more time to
// transition.
case <-time.After(MAX_ELECTION_TIMEOUT + 50*time.Millisecond):
t.Fatal("Timeout waiting on state change")
}
return nil
Expand Down
26 changes: 24 additions & 2 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ type Node struct {
// Async handler
handler Handler

// Used to chain go routines posting events to async handler.
nextSCCh chan bool
nextErrCh chan bool

// Current leader
leader string

Expand Down Expand Up @@ -352,7 +356,9 @@ func (n *Node) runAsFollower() {

// Send the error to the async handler.
func (n *Node) handleError(err error) {
go n.handler.AsyncError(err)
n.serializeGoRoutine(&n.nextErrCh, func() {
n.handler.AsyncError(err)
})
}

// handleHeartBeat is called to process a heartbeat from a LEADER.
Expand Down Expand Up @@ -514,6 +520,20 @@ func (n *Node) switchToCandidate() {
n.switchState(CANDIDATE)
}

// Execute `f` in a separate go routine, but ensures that functions
// are executed in order.
func (n *Node) serializeGoRoutine(nextCh *(chan bool), f func()) {
prevCh := *nextCh // possibly nil
*nextCh = make(chan bool, 1)
go func(prev, next chan bool) {
if prev != nil {
<-prev
}
f()
next <- true
}(prevCh, *nextCh)
}

// Process a state transistion. Assume lock is held on entrance.
// Call the async handler in a separate Go routine.
func (n *Node) switchState(state State) {
Expand All @@ -522,7 +542,9 @@ func (n *Node) switchState(state State) {
}
old := n.state
n.state = state
go n.handler.StateChange(old, state)
n.serializeGoRoutine(&n.nextSCCh, func() {
n.handler.StateChange(old, state)
})
}

// Reset the election timeout with a random value.
Expand Down

0 comments on commit dd88364

Please sign in to comment.