From f47ae9caef23642b383d17bfe9bbe321b2219207 Mon Sep 17 00:00:00 2001 From: frairon Date: Thu, 7 May 2020 22:27:55 +0200 Subject: [PATCH] #248: refactored state merger, bugfix race condition when shutting down the signal/observers, created example --- examples/6-reconnecting-view/README.md | 10 ++++ examples/6-reconnecting-view/main.go | 67 ++++++++++++++++++++++++++ partition_table.go | 18 +++++-- signal.go | 29 +++++++---- view.go | 59 ++++++++++++++++++----- 5 files changed, 158 insertions(+), 25 deletions(-) create mode 100644 examples/6-reconnecting-view/README.md create mode 100644 examples/6-reconnecting-view/main.go diff --git a/examples/6-reconnecting-view/README.md b/examples/6-reconnecting-view/README.md new file mode 100644 index 00000000..8f1df471 --- /dev/null +++ b/examples/6-reconnecting-view/README.md @@ -0,0 +1,10 @@ +## Reconnecting View + +This example shows a reconnecting view by observing the state changes. +Run a local Kafka cluster by calling `make start` in folder `examples/`. + +Then run this example (`go run 6-reconnecting-view/main.go`). +You should see the view state changes upon starting. 
+ +Now kill the kafka cluster `make stop`, you should see some error messages and the view +trying to reconnect using a default backoff \ No newline at end of file diff --git a/examples/6-reconnecting-view/main.go b/examples/6-reconnecting-view/main.go new file mode 100644 index 00000000..65e62db8 --- /dev/null +++ b/examples/6-reconnecting-view/main.go @@ -0,0 +1,67 @@ +package main + +import ( + "context" + "log" + "os" + "os/signal" + "syscall" + + "github.com/lovoo/goka" + "github.com/lovoo/goka/codec" +) + +func main() { + + view, err := goka.NewView( + // connect to example kafka cluster + []string{"localhost:9092"}, + // name does not matter, table will be empty + "restartable-view-test-table", + // codec doesn't matter, the table will be empty + new(codec.String), + // start the view autoconnecting + goka.WithViewAutoReconnect(), + ) + if err != nil { + log.Fatalf("Cannot create view: %v", err) + } + // context we'll use to run the view and the state change observer + ctx, cancel := context.WithCancel(context.Background()) + + // channel used to wait for the view to finish + done := make(chan struct{}) + go func() { + defer close(done) + err := view.Run(ctx) + if err != nil { + log.Printf("View finished with error: %v", err) + } + }() + + // Get a state change observer and + go func() { + obs := view.ObserveStateChanges() + defer obs.Stop() + for { + select { + case state, ok := <-obs.C(): + if !ok { + return + } + log.Printf("View is in state: %v", goka.ViewState(state)) + case <-ctx.Done(): + return + } + } + }() + + go func() { + waiter := make(chan os.Signal, 1) + signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM) + <-waiter + cancel() + }() + + <-done +} diff --git a/partition_table.go b/partition_table.go index 5e898c60..550adb64 100644 --- a/partition_table.go +++ b/partition_table.go @@ -137,25 +137,35 @@ func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool } func (p *PartitionTable) loadRestarting(ctx 
context.Context, stopAfterCatchup bool) error { - var resetTimer *time.Timer + var ( + resetTimer *time.Timer + retries int + ) + for { err := p.load(ctx, stopAfterCatchup) if err != nil { - p.log.Printf("Error while catching up, but we'll try to keep it running: %v", err) + p.log.Printf("Error while starting up: %v", err) + retries++ if resetTimer != nil { resetTimer.Stop() } - resetTimer = time.AfterFunc(p.backoffResetTimeout, p.backoff.Reset) + resetTimer = time.AfterFunc(p.backoffResetTimeout, func() { + p.backoff.Reset() + retries = 0 + }) } else { return nil } + retryDuration := p.backoff.Duration() + p.log.Printf("Will retry in %.0f seconds (retried %d times so far)", retryDuration.Seconds(), retries) select { case <-ctx.Done(): return nil - case <-time.After(p.backoff.Duration()): + case <-time.After(retryDuration): } } } diff --git a/signal.go b/signal.go index eeddeb69..7414af7c 100644 --- a/signal.go +++ b/signal.go @@ -59,7 +59,7 @@ func (s *Signal) SetState(state State) *Signal { // notify the state change observers for _, obs := range s.stateChangeObservers { - obs.c <- struct{}{} + obs.notify(state) } return s @@ -119,9 +119,12 @@ func (s *Signal) waitForWaiter(state State, w *waiter) chan struct{} { // StateChangeObserver wraps a channel that triggers when the signal's state changes type StateChangeObserver struct { - c chan struct{} + // state notifier channel + c chan State + // closed is closed when the observer is closed to avoid sending to a closed channel + closed chan struct{} + // stop is a callback to stop the observer stop func() - s *Signal } // Stop stops the observer. 
Its update channel will be closed and @@ -130,30 +133,36 @@ func (s *StateChangeObserver) Stop() { } // C returns the channel to observer state changes -func (s *StateChangeObserver) C() <-chan struct{} { +func (s *StateChangeObserver) C() <-chan State { return s.c } -// State returns the current state of the Signal -func (s *StateChangeObserver) State() State { - return s.s.State() +func (s *StateChangeObserver) notify(state State) { + select { + case <-s.closed: + case s.c <- state: + } } // ObserveStateChange returns a channel that receives state changes. // Note that the caller must take care of consuming that channel, otherwise the Signal -// will block upon state changes +// will block upon state changes. func (s *Signal) ObserveStateChange() *StateChangeObserver { s.m.Lock() defer s.m.Unlock() observer := &StateChangeObserver{ - c: make(chan struct{}, 1), - s: s, + c: make(chan State, 1), + closed: make(chan struct{}), } + // initialize the observer with the current state + observer.notify(s.State()) + // the stop function stops the observer by closing its channel + // and removing it from the list of observers observer.stop = func() { + close(observer.closed) s.m.Lock() defer s.m.Unlock() diff --git a/view.go b/view.go index 8049d515..cf0694aa 100644 --- a/view.go +++ b/view.go @@ -151,16 +151,25 @@ func (v *View) createPartitions(brokers []string) (rerr error) { func (v *View) runStateMerger(ctx context.Context) { - // when the state of any partition updates, - // get the *lowest* state, which is the partition least running. - // Then translate that state to a view state. - updateViewState := func() { + var ( + states = make(map[int]PartitionStatus) + m sync.Mutex + ) + + // internal callback that will be called when the state of any + // partition changes. 
+ // Then the "lowest" state of all partitions will be selected and + // translated into the respective ViewState + updateViewState := func(idx int, state State) { + m.Lock() + defer m.Unlock() + states[idx] = PartitionStatus(state) + var lowestState = PartitionStatus(-1) - for _, part := range v.partitions { - state := part.CurrentState() - if lowestState == -1 || state < lowestState { - lowestState = state + for _, partitionState := range states { + if lowestState == -1 || partitionState < lowestState { + lowestState = partitionState } } var newState = ViewState(-1) @@ -187,19 +196,21 @@ func (v *View) runStateMerger(ctx context.Context) { } // get a state change observer for all partitions - for _, partition := range v.partitions { + for idx, partition := range v.partitions { + idx := idx partition := partition + observer := partition.observeStateChanges() // create a goroutine that updates the view state based all partition states go func() { for { select { - case _, ok := <-observer.C(): + case newState, ok := <-observer.C(): if !ok { return } // something has changed, so update the state - updateViewState() + updateViewState(idx, newState) case <-ctx.Done(): observer.Stop() return @@ -430,6 +441,32 @@ func (v *View) CurrentState() ViewState { return ViewState(v.state.State()) } +// ObserveStateChanges returns a StateChangeObserver that allows to handle state changes of the view +// by reading from a channel. +// It is crucial to continuously read from that channel, otherwise the View might deadlock upon +// state changes. +// If the observer is not needed, the caller must call observer.Stop() +// +// Example +// +// view := goka.NewView(...) 
+// go view.Run(ctx) +// +// go func(){ +// obs := view.ObserveStateChanges() +// defer obs.Stop() +// for { +// select{ +// case state, ok := <-obs.C: +// // handle state (or closed channel) +// case <-ctx.Done(): +// } +// } +// }() +func (v *View) ObserveStateChanges() *StateChangeObserver { + return v.state.ObserveStateChange() +} + // Stats returns a set of performance metrics of the view. func (v *View) Stats(ctx context.Context) *ViewStats { return v.statsWithContext(ctx)