Skip to content

Commit

Permalink
#248 exposing partition table's connection state to view
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon committed Apr 27, 2020
1 parent f7ae456 commit 404bc24
Show file tree
Hide file tree
Showing 5 changed files with 173 additions and 19 deletions.
20 changes: 17 additions & 3 deletions partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func newPartitionTableState() *Signal {
return NewSignal(
State(PartitionStopped),
State(PartitionInitializing),
State(PartitionConnecting),
State(PartitionRecovering),
State(PartitionPreparing),
State(PartitionRunning),
Expand Down Expand Up @@ -318,9 +319,7 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr

defer p.log.Debugf("... Loading done")

if stopAfterCatchup {
p.state.SetState(State(PartitionRecovering))
}
p.state.SetState(State(PartitionConnecting))

partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
if err != nil {
Expand All @@ -337,6 +336,12 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
p.drainConsumer(partConsumer, errs)
}()

if stopAfterCatchup {
p.state.SetState(State(PartitionRecovering))
} else {
p.state.SetState(State(PartitionRunning))
}

// load messages and stop when you're at HWM
loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)

Expand All @@ -354,6 +359,10 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
return
}

// observeStateChanges registers a new state change observer on the
// partition table's state signal and hands it back to the caller.
func (p *PartitionTable) observeStateChanges() *StateChangeObserver {
	observer := p.state.ObserveStateChange()
	return observer
}

func (p *PartitionTable) markRecovered(ctx context.Context) error {
var (
start = time.Now()
Expand Down Expand Up @@ -617,6 +626,11 @@ func (p *PartitionTable) IsRecovered() bool {
return p.state.IsState(State(PartitionRunning))
}

// CurrentState reports the state the partition table's signal is in right
// now, converted to a PartitionStatus.
func (p *PartitionTable) CurrentState() PartitionStatus {
	current := p.state.State()
	return PartitionStatus(current)
}

// WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`
func (p *PartitionTable) WaitRecovered() chan struct{} {
return p.state.WaitForState(State(PartitionRunning))
Expand Down
69 changes: 65 additions & 4 deletions signal.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ type waiter struct {
// Signal allows synchronization on a state, waiting for that state and checking
// the current state
type Signal struct {
m sync.Mutex
state State
waiters []*waiter
allowedStates map[State]bool
m sync.Mutex
state State
waiters []*waiter
stateChangeObservers []*StateChangeObserver
allowedStates map[State]bool
}

// NewSignal creates a new Signal based on the states
Expand Down Expand Up @@ -56,6 +57,11 @@ func (s *Signal) SetState(state State) *Signal {
}
s.waiters = newWaiters

// notify the state change observers
for _, obs := range s.stateChangeObservers {
obs.c <- struct{}{}
}

return s
}

Expand Down Expand Up @@ -110,3 +116,58 @@ func (s *Signal) waitForWaiter(state State, w *waiter) chan struct{} {

return w.done
}

// StateChangeObserver couples a notification channel with the Signal it
// observes and the function used to unregister it again.
type StateChangeObserver struct {
	// c is the notification channel exposed through C.
	c chan struct{}
	// stop is invoked by Stop; it is assigned by Signal.ObserveStateChange.
	stop func()
	// s is the Signal whose state is reported by State.
	s *Signal
}

// Stop stops the observer by invoking its stop function. For observers
// created via Signal.ObserveStateChange this removes the observer from the
// Signal and closes its update channel.
func (o *StateChangeObserver) Stop() {
	o.stop()
}

// C returns the receive-only channel used to observe state changes.
func (o *StateChangeObserver) C() <-chan struct{} {
	return o.c
}

// State returns the current state of the observed Signal.
func (o *StateChangeObserver) State() State {
	return o.s.State()
}

// ObserveStateChange registers a new StateChangeObserver on the Signal and
// returns it. The observer's channel has a buffer of one element.
// Note that the caller must take care of consuming that channel, otherwise the Signal
// will block upon state changes.
//
// Stopping the returned observer removes it from the Signal's list of
// observers and closes its channel. Stopping it more than once is a no-op.
func (s *Signal) ObserveStateChange() *StateChangeObserver {
	s.m.Lock()
	defer s.m.Unlock()

	observer := &StateChangeObserver{
		c: make(chan struct{}, 1),
		s: s,
	}

	// stopped is only read and written while holding s.m. It makes the stop
	// function idempotent: closing an already closed channel would panic.
	var stopped bool

	// the stop function stops the observer by closing its channel
	// and removing it from the list of observers
	observer.stop = func() {
		s.m.Lock()
		defer s.m.Unlock()

		if stopped {
			return
		}
		stopped = true

		// find *this* observer, remove it in place and stop searching:
		// the slice has been modified, so iterating further is pointless.
		for idx, obs := range s.stateChangeObservers {
			if obs != observer {
				continue
			}
			last := len(s.stateChangeObservers) - 1
			copy(s.stateChangeObservers[idx:], s.stateChangeObservers[idx+1:])
			// clear the vacated tail slot so the observer can be collected
			s.stateChangeObservers[last] = nil
			s.stateChangeObservers = s.stateChangeObservers[:last]
			break
		}
		close(observer.c)
	}

	s.stateChangeObservers = append(s.stateChangeObservers, observer)
	return observer
}
2 changes: 2 additions & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ const (
// PartitionInitializing indicates that the underlying storage is initializing (e.g. opening leveldb files),
// and has not actually started working yet.
PartitionInitializing
// PartitionConnecting indicates the partition is trying to (re-)connect to Kafka
PartitionConnecting
// PartitionRecovering indicates the partition is recovering and the storage
// is writing updates in bulk-mode (if the storage implementation supports it).
PartitionRecovering
Expand Down
95 changes: 86 additions & 9 deletions view.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,30 @@ import (
"github.com/lovoo/goka/storage"
)

// ViewState represents the state of the view
type ViewState int

const (
// ViewStateIdle - the view is not started yet
ViewStateIdle State = iota
// ViewStateCatchUp - the view is still catching up
ViewStateIdle ViewState = iota
// ViewStateInitializing - the view (i.e. at least one partition) is initializing
ViewStateInitializing
// ViewStateConnecting - the view (i.e. at least one partition) is (re-)connecting
ViewStateConnecting
// ViewStateCatchUp - the view (i.e. at least one partition) is still catching up
ViewStateCatchUp
// ViewStateRunning - the view has caught up and is running
// ViewStateRunning - the view (i.e. all partitions) has caught up and is running
ViewStateRunning
)

// newViewSignal builds the Signal used to track a view's state. It allows
// all view states and starts out in ViewStateIdle.
func newViewSignal() *Signal {
	sig := NewSignal(
		State(ViewStateIdle),
		State(ViewStateInitializing),
		State(ViewStateConnecting),
		State(ViewStateCatchUp),
		State(ViewStateRunning),
	)
	return sig.SetState(State(ViewStateIdle))
}

// Getter functions return a value for a key or an error. If no value exists for the key, nil is returned without errors.
type Getter func(string) (interface{}, error)

Expand Down Expand Up @@ -75,7 +90,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption)
log: opts.log.Prefix(fmt.Sprintf("View %s", topic)),
consumer: consumer,
tmgr: tmgr,
state: NewSignal(ViewStateIdle, ViewStateCatchUp, ViewStateRunning).SetState(ViewStateIdle),
state: newViewSignal(),
}

if err = v.createPartitions(brokers); err != nil {
Expand All @@ -87,7 +102,7 @@ func NewView(brokers []string, topic Table, codec Codec, options ...ViewOption)

// WaitRunning returns a channel that will be closed when the view enters the running state
func (v *View) WaitRunning() <-chan struct{} {
return v.state.WaitForState(ViewStateRunning)
return v.state.WaitForState(State(ViewStateRunning))
}

func (v *View) createPartitions(brokers []string) (rerr error) {
Expand Down Expand Up @@ -134,6 +149,62 @@ func (v *View) createPartitions(brokers []string) (rerr error) {
return nil
}

// runStateMerger derives the view's state from the states of its partitions.
// It registers a state change observer on every partition and starts one
// goroutine per observer. Each goroutine recomputes the view state whenever
// its observer fires, and ends when the observer's channel is closed or when
// ctx is done (in which case it stops the observer first).
func (v *View) runStateMerger(ctx context.Context) {

	// mergeStates picks the *lowest* state among all partitions, i.e. the
	// partition that is furthest from running, and maps it to a view state.
	mergeStates := func() {
		lowest := PartitionStatus(-1)

		for _, pt := range v.partitions {
			if current := pt.CurrentState(); lowest == -1 || current < lowest {
				lowest = current
			}
		}

		var next State
		switch lowest {
		case PartitionStopped:
			next = State(ViewStateIdle)
		case PartitionInitializing:
			next = State(ViewStateInitializing)
		case PartitionConnecting:
			next = State(ViewStateConnecting)
		case PartitionRecovering, PartitionPreparing:
			// both count as "still catching up" from the view's perspective
			next = State(ViewStateCatchUp)
		case PartitionRunning:
			next = State(ViewStateRunning)
		default:
			v.log.Printf("State merger received unknown partition state: %v", lowest)
			return
		}
		v.state.SetState(next)
	}

	// one observer and one watching goroutine per partition
	for _, pt := range v.partitions {
		go func(obs *StateChangeObserver) {
			for {
				select {
				case _, open := <-obs.C():
					if !open {
						// the observer was stopped elsewhere
						return
					}
					// some partition changed, so refresh the view state
					mergeStates()
				case <-ctx.Done():
					obs.Stop()
					return
				}
			}
		}(pt.observeStateChanges())
	}
}

// Run starts consuming the view's topic and saving updates in the local persistent cache.
//
// The view will shutdown in case of errors or when the context is closed.
Expand All @@ -145,8 +216,10 @@ func (v *View) Run(ctx context.Context) (rerr error) {
v.log.Debugf("starting")
defer v.log.Debugf("stopped")

v.state.SetState(ViewStateCatchUp)
defer v.state.SetState(ViewStateIdle)
// update the view state asynchronously by observing
// the partition's state and translating that to the view
v.runStateMerger(ctx)
defer v.state.SetState(State(ViewStateIdle))

// close the view after running
defer func() {
Expand Down Expand Up @@ -177,8 +250,6 @@ func (v *View) Run(ctx context.Context) (rerr error) {
default:
}

v.state.SetState(ViewStateRunning)

catchupErrg, catchupCtx := multierr.NewErrGroup(ctx)

for _, partition := range v.partitions {
Expand Down Expand Up @@ -348,6 +419,12 @@ func (v *View) Recovered() bool {
return true
}

// CurrentState reports the ViewState the view is in right now.
// It is meant for polling, e.g. from health checks or metrics collection.
func (v *View) CurrentState() ViewState {
	current := v.state.State()
	return ViewState(current)
}

// Stats returns a set of performance metrics of the view.
func (v *View) Stats(ctx context.Context) *ViewStats {
return v.statsWithContext(ctx)
Expand Down
6 changes: 3 additions & 3 deletions view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ func TestView_Run(t *testing.T) {

pt.consumer = consumer
view.partitions = []*PartitionTable{pt}
view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateIdle))
view.state = newViewSignal()

bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetOldest).Return(oldest, nil).AnyTimes()
bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetNewest).Return(newest, nil).AnyTimes()
Expand Down Expand Up @@ -647,7 +647,7 @@ func TestView_Run(t *testing.T) {

pt.consumer = consumer
view.partitions = []*PartitionTable{pt}
view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateIdle))
view.state = newViewSignal()

bm.mst.EXPECT().GetOffset(gomock.Any()).Return(int64(0), retErr).AnyTimes()
bm.tmgr.EXPECT().GetOffset(pt.topic, pt.partition, sarama.OffsetNewest).Return(sarama.OffsetNewest, retErr).AnyTimes()
Expand Down Expand Up @@ -697,7 +697,7 @@ func TestView_WaitRunning(t *testing.T) {
view, _, ctrl := createTestView(t, NewMockAutoConsumer(t, DefaultConfig()))
defer ctrl.Finish()

view.state = NewSignal(State(ViewStateCatchUp), State(ViewStateRunning), State(ViewStateIdle)).SetState(State(ViewStateRunning))
view.state = newViewSignal()

var isRunning bool
select {
Expand Down

0 comments on commit 404bc24

Please sign in to comment.