Skip to content

Commit

Permalink
#248: refactored state merger, bugfix race condition when shutting do…
Browse files Browse the repository at this point in the history
…wn the signal/observers, created example
  • Loading branch information
frairon committed May 7, 2020
1 parent bd5bf97 commit f47ae9c
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 25 deletions.
10 changes: 10 additions & 0 deletions examples/6-reconnecting-view/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
## Reconnecting View

This example shows a reconnecting view by observing the state changes.
Run a local Kafka cluster by calling `make start` in folder `examples/`.

Then run this example (`go run 6-reconnecting-views/main.go`).
You should see the view state changes upon starting.

Now kill the kafka cluster `make stop`, you should see some error messages and the view
trying to reconnect using a default backoff
67 changes: 67 additions & 0 deletions examples/6-reconnecting-view/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

func main() {

view, err := goka.NewView(
// connect to example kafka cluster
[]string{"localhost:9092"},
// name does not matter, table will be empty
"restartable-view-test-table",
// codec doesn't matter, the table will be empty
new(codec.String),
// start the view autoconnecting
goka.WithViewAutoReconnect(),
)
if err != nil {
log.Fatalf("Cannot create view: %v", err)
}
// context we'll use to run the view and the state change observer
ctx, cancel := context.WithCancel(context.Background())

// channel used to wait for the view to finish
done := make(chan struct{})
go func() {
defer close(done)
err := view.Run(ctx)
if err != nil {
log.Printf("View finished with error: %v", err)
}
}()

// Get a state change observer and
go func() {
obs := view.ObserveStateChanges()
defer obs.Stop()
for {
select {
case state, ok := <-obs.C():
if !ok {
return
}
log.Printf("View is in state: %v", goka.ViewState(state))
case <-ctx.Done():
return
}
}
}()

go func() {
waiter := make(chan os.Signal, 1)
signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM)
<-waiter
cancel()
}()

<-done
}
18 changes: 14 additions & 4 deletions partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,25 +137,35 @@ func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool
}

func (p *PartitionTable) loadRestarting(ctx context.Context, stopAfterCatchup bool) error {
var resetTimer *time.Timer
var (
resetTimer *time.Timer
retries int
)

for {
err := p.load(ctx, stopAfterCatchup)
if err != nil {
p.log.Printf("Error while catching up, but we'll try to keep it running: %v", err)
p.log.Printf("Error while starting up: %v", err)

retries++
if resetTimer != nil {
resetTimer.Stop()
}
resetTimer = time.AfterFunc(p.backoffResetTimeout, p.backoff.Reset)
resetTimer = time.AfterFunc(p.backoffResetTimeout, func() {
p.backoff.Reset()
retries = 0
})
} else {
return nil
}

retryDuration := p.backoff.Duration()
p.log.Printf("Will retry in %.0f seconds (retried %d times so far)", retryDuration.Seconds(), retries)
select {
case <-ctx.Done():
return nil

case <-time.After(p.backoff.Duration()):
case <-time.After(retryDuration):
}
}
}
Expand Down
29 changes: 19 additions & 10 deletions signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (s *Signal) SetState(state State) *Signal {

// notify the state change observers
for _, obs := range s.stateChangeObservers {
obs.c <- struct{}{}
obs.notify(state)
}

return s
Expand Down Expand Up @@ -119,9 +119,12 @@ func (s *Signal) waitForWaiter(state State, w *waiter) chan struct{} {

// StateChangeObserver wraps a channel that triggers when the signal's state changes
type StateChangeObserver struct {
c chan struct{}
// state notifier channel
c chan State
// closed is closed when the observer is closed to avoid sending to a closed channel
closed chan struct{}
// stop is a callback to stop the observer
stop func()
s *Signal
}

// Stop stops the observer. Its update channel will be closed and
Expand All @@ -130,30 +133,36 @@ func (s *StateChangeObserver) Stop() {
}

// C returns the channel to observer state changes
func (s *StateChangeObserver) C() <-chan struct{} {
func (s *StateChangeObserver) C() <-chan State {
return s.c
}

// State returns the current state of the Signal
func (s *StateChangeObserver) State() State {
return s.s.State()
func (s *StateChangeObserver) notify(state State) {
select {
case <-s.closed:
case s.c <- state:
}
}

// ObserveStateChange returns a channel that receives state changes.
// Note that the caller must take care of consuming that channel, otherwise the Signal
// will block upon state changes
// will block upon state changes.
func (s *Signal) ObserveStateChange() *StateChangeObserver {
s.m.Lock()
defer s.m.Unlock()

observer := &StateChangeObserver{
c: make(chan struct{}, 1),
s: s,
c: make(chan State, 1),
closed: make(chan struct{}),
}

// initialize the observer with the current state
observer.notify(s.State())

// the stop funtion stops the observer by closing its channel
// and removing it from the list of observers
observer.stop = func() {
close(observer.closed)
s.m.Lock()
defer s.m.Unlock()

Expand Down
59 changes: 48 additions & 11 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,25 @@ func (v *View) createPartitions(brokers []string) (rerr error) {

func (v *View) runStateMerger(ctx context.Context) {

// when the state of any partition updates,
// get the *lowest* state, which is the partition least running.
// Then translate that state to a view state.
updateViewState := func() {
var (
states = make(map[int]PartitionStatus)
m sync.Mutex
)

// internal callback that will be called when the state of any
// partition changes.
// Then the "lowest" state of all partitions will be selected and
// translated into the respective ViewState
updateViewState := func(idx int, state State) {
m.Lock()
defer m.Unlock()
states[idx] = PartitionStatus(state)

var lowestState = PartitionStatus(-1)

for _, part := range v.partitions {
state := part.CurrentState()
if lowestState == -1 || state < lowestState {
lowestState = state
for _, partitionState := range states {
if lowestState == -1 || partitionState < lowestState {
lowestState = partitionState
}
}
var newState = ViewState(-1)
Expand All @@ -187,19 +196,21 @@ func (v *View) runStateMerger(ctx context.Context) {
}

// get a state change observer for all partitions
for _, partition := range v.partitions {
for idx, partition := range v.partitions {
idx := idx
partition := partition

observer := partition.observeStateChanges()
// create a goroutine that updates the view state based all partition states
go func() {
for {
select {
case _, ok := <-observer.C():
case newState, ok := <-observer.C():
if !ok {
return
}
// something has changed, so update the state
updateViewState()
updateViewState(idx, newState)
case <-ctx.Done():
observer.Stop()
return
Expand Down Expand Up @@ -430,6 +441,32 @@ func (v *View) CurrentState() ViewState {
return ViewState(v.state.State())
}

// ObserveStateChanges returns a StateChangeObserver that allows to handle state changes of the view
// by reading from a channel.
// It is crucial to continuously read from that channel, otherwise the View might deadlock upon
// state changes.
// If the observer is not needed, the caller must call observer.Stop()
//
// Example
//
// view := goka.NewView(...)
// go view.Run(ctx)
//
// go func(){
// obs := view.ObserveStateChanges()
// defer obs.Stop()
// for {
// select{
// case state, ok := <-obs.C:
// // handle state (or closed channel)
// case <-ctx.Done():
// }
// }
// }()
func (v *View) ObserveStateChanges() *StateChangeObserver {
return v.state.ObserveStateChange()
}

// Stats returns a set of performance metrics of the view.
func (v *View) Stats(ctx context.Context) *ViewStats {
return v.statsWithContext(ctx)
Expand Down

0 comments on commit f47ae9c

Please sign in to comment.