From 404bc24720b1f8536803c7a1a819a89392399aa8 Mon Sep 17 00:00:00 2001
From: frairon
Date: Mon, 27 Apr 2020 21:34:17 +0200
Subject: [PATCH] #248 exposing partition table's connection state to view

---
 partition_table.go | 20 ++++++++--
 signal.go          | 69 +++++++++++++++++++++++++++++++--
 stats.go           |  2 +
 view.go            | 95 +++++++++++++++++++++++++++++++++++++++++-----
 view_test.go       |  6 +--
 5 files changed, 173 insertions(+), 19 deletions(-)

diff --git a/partition_table.go b/partition_table.go
index 2b04043c..04f62acc 100644
--- a/partition_table.go
+++ b/partition_table.go
@@ -65,6 +65,7 @@ func newPartitionTableState() *Signal {
     return NewSignal(
         State(PartitionStopped),
         State(PartitionInitializing),
+        State(PartitionConnecting),
         State(PartitionRecovering),
         State(PartitionPreparing),
         State(PartitionRunning),
@@ -318,9 +319,7 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr

     defer p.log.Debugf("... Loading done")

-    if stopAfterCatchup {
-        p.state.SetState(State(PartitionRecovering))
-    }
+    p.state.SetState(State(PartitionConnecting))

     partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
     if err != nil {
@@ -337,6 +336,12 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
         p.drainConsumer(partConsumer, errs)
     }()

+    if stopAfterCatchup {
+        p.state.SetState(State(PartitionRecovering))
+    } else {
+        p.state.SetState(State(PartitionRunning))
+    }
+
     // load messages and stop when you're at HWM
     loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)

@@ -354,6 +359,10 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
     return
 }

+func (p *PartitionTable) observeStateChanges() *StateChangeObserver {
+    return p.state.ObserveStateChange()
+}
+
 func (p *PartitionTable) markRecovered(ctx context.Context) error {
     var (
         start = time.Now()
@@ -617,6 +626,11 @@ func (p *PartitionTable) IsRecovered() bool {
     return p.state.IsState(State(PartitionRunning))
 }

+// CurrentState returns the partition's current status
+func (p *PartitionTable) CurrentState() PartitionStatus {
+    return PartitionStatus(p.state.State())
+}
+
 // WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`
 func (p *PartitionTable) WaitRecovered() chan struct{} {
     return p.state.WaitForState(State(PartitionRunning))
diff --git a/signal.go b/signal.go
index 81eead81..eeddeb69 100644
--- a/signal.go
+++ b/signal.go
@@ -17,10 +17,11 @@ type waiter struct {
 // Signal allows synchronization on a state, waiting for that state and checking
 // the current state
 type Signal struct {
-    m             sync.Mutex
-    state         State
-    waiters       []*waiter
-    allowedStates map[State]bool
+    m                    sync.Mutex
+    state                State
+    waiters              []*waiter
+    stateChangeObservers []*StateChangeObserver
+    allowedStates        map[State]bool
 }

 // NewSignal creates a new Signal based on the states
@@ -56,6 +57,11 @@ func (s *Signal) SetState(state State) *Signal {
     }
     s.waiters = newWaiters

+    // notify the state change observers
+    for _, obs := range s.stateChangeObservers {
+        obs.c <- struct{}{}
+    }
+
     return s
 }

@@ -110,3 +116,58 @@ func (s *Signal) waitForWaiter(state State, w *waiter) chan struct{} {

     return w.done
 }
+
+// StateChangeObserver wraps a channel that is notified whenever the signal's state changes
+type StateChangeObserver struct {
+    c    chan struct{}
+    stop func()
+    s    *Signal
+}
+
+// Stop stops the observer. Its update channel will be closed and no further state changes will be delivered.
+func (s *StateChangeObserver) Stop() {
+    s.stop()
+}
+
+// C returns the channel to observe state changes
+func (s *StateChangeObserver) C() <-chan struct{} {
+    return s.c
+}
+
+// State returns the current state of the Signal
+func (s *StateChangeObserver) State() State {
+    return s.s.State()
+}
+
+// ObserveStateChange returns a StateChangeObserver that is notified on state changes.
+// Note that the caller must take care of consuming the observer's channel, otherwise the Signal
+// will block upon state changes
+func (s *Signal) ObserveStateChange() *StateChangeObserver {
+    s.m.Lock()
+    defer s.m.Unlock()
+
+    observer := &StateChangeObserver{
+        c: make(chan struct{}, 1),
+        s: s,
+    }
+
+    // the stop function stops the observer by closing its channel
+    // and removing it from the list of observers
+    observer.stop = func() {
+        s.m.Lock()
+        defer s.m.Unlock()
+
+        // iterate over all observers and remove *this* one
+        for idx, obs := range s.stateChangeObservers {
+            if obs == observer {
+                copy(s.stateChangeObservers[idx:], s.stateChangeObservers[idx+1:])
+                s.stateChangeObservers[len(s.stateChangeObservers)-1] = nil
+                s.stateChangeObservers = s.stateChangeObservers[:len(s.stateChangeObservers)-1]
+            }
+        }
+        close(observer.c)
+    }
+
+    s.stateChangeObservers = append(s.stateChangeObservers, observer)
+    return observer
+}
diff --git a/stats.go b/stats.go
index 37ff8dce..709c62d2 100644
--- a/stats.go
+++ b/stats.go
@@ -14,6 +14,8 @@ const (
     // PartitionInitializing indicates that the underlying storage is initializing (e.g. opening leveldb files),
     // and has not actually started working yet.
     PartitionInitializing
+    // PartitionConnecting indicates the partition is trying to (re-)connect to Kafka
+    PartitionConnecting
     // PartitionRecovering indicates the partition is recovering and the storage
     // is writing updates in bulk-mode (if the storage implementation supports it).
     PartitionRecovering
diff --git a/view.go b/view.go
index 28ab5d13..7b4f35a9 100644
--- a/view.go
+++ b/view.go
@@ -12,15 +12,30 @@ import (
     "github.com/lovoo/goka/storage"
 )

+// ViewState represents the state of the view
+type ViewState int
+
 const (
     // ViewStateIdle - the view is not started yet
-    ViewStateIdle State = iota
-    // ViewStateCatchUp - the view is still catching up
+    ViewStateIdle ViewState = iota
+    // ViewStateInitializing - the view (i.e. at least one partition) is initializing
+    ViewStateInitializing
+    // ViewStateConnecting - the view (i.e. at least one partition) is (re-)connecting
+    ViewStateConnecting
+    // ViewStateCatchUp - the view (i.e. at least one partition) is still catching up
     ViewStateCatchUp
-    // ViewStateRunning - the view has caught up and is running
+    // ViewStateRunning - the view (i.e. all partitions) has caught up and is running
     ViewStateRunning
 )

+func newViewSignal() *Signal {
+    return NewSignal(State(ViewStateIdle),
+        State(ViewStateInitializing),
+        State(ViewStateConnecting),
+        State(ViewStateCatchUp),
+        State(ViewStateRunning)).SetState(State(ViewStateIdle))
+}
+
 // Getter functions return a value for a key or an error. If no value exists
 // for the key, nil is returned without errors.
 type Getter func(string) (interface{}, error)
@@ -75,7 +90,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption)
         log:      opts.log.Prefix(fmt.Sprintf("View %s", topic)),
         consumer: consumer,
         tmgr:     tmgr,
-        state:    NewSignal(ViewStateIdle, ViewStateCatchUp, ViewStateRunning).SetState(ViewStateIdle),
+        state:    newViewSignal(),
     }

     if err = v.createPartitions(brokers); err != nil {
@@ -87,7 +102,7 @@

 // WaitRunning returns a channel that will be closed when the view enters the running state
 func (v *View) WaitRunning() <-chan struct{} {
-    return v.state.WaitForState(ViewStateRunning)
+    return v.state.WaitForState(State(ViewStateRunning))
 }

 func (v *View) createPartitions(brokers []string) (rerr error) {
@@ -134,6 +149,62 @@
     return nil
 }

+func (v *View) runStateMerger(ctx context.Context) {
+
+    // when the state of any partition changes, find the *lowest* state
+    // across all partitions (i.e. the least-advanced partition)
+    // and translate that into a view state.
+    updateViewState := func() {
+        var lowestState = PartitionStatus(-1)
+
+        for _, part := range v.partitions {
+            state := part.CurrentState()
+            if lowestState == -1 || state < lowestState {
+                lowestState = state
+            }
+        }
+
+        switch lowestState {
+        case PartitionStopped:
+            v.state.SetState(State(ViewStateIdle))
+        case PartitionInitializing:
+            v.state.SetState(State(ViewStateInitializing))
+        case PartitionConnecting:
+            v.state.SetState(State(ViewStateConnecting))
+        case PartitionRecovering:
+            v.state.SetState(State(ViewStateCatchUp))
+        case PartitionPreparing:
+            v.state.SetState(State(ViewStateCatchUp))
+        case PartitionRunning:
+            v.state.SetState(State(ViewStateRunning))
+        default:
+            v.log.Printf("State merger received unknown partition state: %v", lowestState)
+        }
+    }
+
+    // get a state change observer for all partitions
+    for _, partition := range v.partitions {
+        partition := partition
+        observer := partition.observeStateChanges()
+        // create a goroutine that updates the view state based on all partition states
+        go func() {
+            for {
+                select {
+                case _, ok := <-observer.C():
+                    if !ok {
+                        return
+                    }
+                    // something has changed, so update the state
+                    updateViewState()
+                case <-ctx.Done():
+                    observer.Stop()
+                    return
+                }
+            }
+        }()
+    }
+}
+
 // Run starts consuming the view's topic and saving updates in the local persistent cache.
 //
 // The view will shutdown in case of errors or when the context is closed.
@@ -145,8 +216,10 @@ func (v *View) Run(ctx context.Context) (rerr error) {
     v.log.Debugf("starting")
     defer v.log.Debugf("stopped")

-    v.state.SetState(ViewStateCatchUp)
-    defer v.state.SetState(ViewStateIdle)
+    // update the view state asynchronously by observing
+    // the partitions' states and translating them into a view state
+    v.runStateMerger(ctx)
+    defer v.state.SetState(State(ViewStateIdle))

     // close the view after running
     defer func() {
@@ -177,8 +250,6 @@
     default:
     }

-    v.state.SetState(ViewStateRunning)
-
     catchupErrg, catchupCtx := multierr.NewErrGroup(ctx)

     for _, partition := range v.partitions {
@@ -348,6 +419,12 @@ func (v *View) Recovered() bool {
     return true
 }

+// CurrentState returns the current ViewState of the view.
+// This is useful for polling, e.g. when implementing health checks or metrics.
+func (v *View) CurrentState() ViewState {
+    return ViewState(v.state.State())
+}
+
 // Stats returns a set of performance metrics of the view.
 func (v *View) Stats(ctx context.Context) *ViewStats {
     return v.statsWithContext(ctx)
diff --git a/view_test.go b/view_test.go
index 3c29e5bf..2a270eda 100644
--- a/view_test.go
+++ b/view_test.go
@@ -592,7 +592,7 @@ func TestView_Run(t *testing.T) {

         pt.consumer = consumer
         view.partitions = []*PartitionTable{pt}
-        view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateIdle))
+        view.state = newViewSignal()

         bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetOldest).Return(oldest, nil).AnyTimes()
         bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetNewest).Return(newest, nil).AnyTimes()
@@ -647,7 +647,7 @@ func TestView_Run(t *testing.T) {

         pt.consumer = consumer
         view.partitions = []*PartitionTable{pt}
-        view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateIdle))
+        view.state = newViewSignal()

         bm.mst.EXPECT().GetOffset(gomock.Any()).Return(int64(0), retErr).AnyTimes()
         bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetNewest).Return(sarama.OffsetNewest, retErr).AnyTimes()
@@ -697,7 +697,7 @@ func TestView_WaitRunning(t *testing.T) {
     view, _, ctrl := createTestView(t, NewMockAutoConsumer(t, DefaultConfig()))
     defer ctrl.Finish()

-    view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateRunning))
+    view.state = newViewSignal()

     var isRunning bool
     select {
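
Usage sketch for the new View.CurrentState() API (a minimal example, assuming this patch is applied; the broker address, table name, and codec.String are placeholder values, not part of this change):

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/lovoo/goka"
        "github.com/lovoo/goka/codec"
    )

    func main() {
        // Placeholder brokers and table name; adjust to your environment.
        view, err := goka.NewView(
            []string{"localhost:9092"},
            goka.Table("example-table"),
            new(codec.String),
        )
        if err != nil {
            panic(err)
        }

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        // Run the view in the background; its state is now merged from all partitions.
        go func() {
            if err := view.Run(ctx); err != nil {
                fmt.Println("view terminated with error:", err)
            }
        }()

        // Poll the merged view state, e.g. from a health-check or metrics endpoint.
        for i := 0; i < 10; i++ {
            switch view.CurrentState() {
            case goka.ViewStateConnecting:
                fmt.Println("view is (re-)connecting to Kafka")
            case goka.ViewStateCatchUp:
                fmt.Println("view is catching up")
            case goka.ViewStateRunning:
                fmt.Println("view is running")
            default:
                fmt.Println("view state:", view.CurrentState())
            }
            time.Sleep(time.Second)
        }
    }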