From dd8836462001b2fc4b71e771e875ecd725574ed0 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Wed, 14 Sep 2016 13:40:12 -0600 Subject: [PATCH] [FIXED] Ensure state changes are posted in order they occur Both state changes and errors were posted through the use of a go routine. This did not guarantee that the changes would arrive in the state change handler in the same order they were produced. Resolves #11 --- handler_test.go | 5 ++++- node.go | 26 ++++++++++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/handler_test.go b/handler_test.go index 2e43c13..8d50c33 100644 --- a/handler_test.go +++ b/handler_test.go @@ -13,7 +13,10 @@ func wait(t *testing.T, ch chan StateChange) *StateChange { select { case sc := <-ch: return &sc - case <-time.After(MAX_ELECTION_TIMEOUT): + // It could be that the random election time is the max. Inc + // that case, the node will still need a bit more time to + // transition. + case <-time.After(MAX_ELECTION_TIMEOUT + 50*time.Millisecond): t.Fatal("Timeout waiting on state change") } return nil diff --git a/node.go b/node.go index db7deb2..87c2d4f 100644 --- a/node.go +++ b/node.go @@ -36,6 +36,10 @@ type Node struct { // Async handler handler Handler + // Used to chain go routines posting events to async handler. + nextSCCh chan bool + nextErrCh chan bool + // Current leader leader string @@ -352,7 +356,9 @@ func (n *Node) runAsFollower() { // Send the error to the async handler. func (n *Node) handleError(err error) { - go n.handler.AsyncError(err) + n.serializeGoRoutine(&n.nextErrCh, func() { + n.handler.AsyncError(err) + }) } // handleHeartBeat is called to process a heartbeat from a LEADER. @@ -514,6 +520,20 @@ func (n *Node) switchToCandidate() { n.switchState(CANDIDATE) } +// Execute `f` in a separate go routine, but ensures that functions +// are executed in order. +func (n *Node) serializeGoRoutine(nextCh *(chan bool), f func()) { + prevCh := *nextCh // possibly nil + *nextCh = make(chan bool, 1) + go func(prev, next chan bool) { + if prev != nil { + <-prev + } + f() + next <- true + }(prevCh, *nextCh) +} + // Process a state transistion. Assume lock is held on entrance. // Call the async handler in a separate Go routine. func (n *Node) switchState(state State) { @@ -522,7 +542,9 @@ func (n *Node) switchState(state State) { } old := n.state n.state = state - go n.handler.StateChange(old, state) + n.serializeGoRoutine(&n.nextSCCh, func() { + n.handler.StateChange(old, state) + }) } // Reset the election timeout with a random value.