Merge pull request #256 from lovoo/248-connection-state
248 connection state
Jan Bickel committed Jul 8, 2020
2 parents f7ae456 + dc77c63 commit f79a6bf
Showing 11 changed files with 402 additions and 67 deletions.
16 changes: 16 additions & 0 deletions MIGRATION.md
@@ -39,3 +39,19 @@ go func(){
cancel()

```


## Offset bug in local storage
In 0.1.4 there was a bug that caused the table offset stored in the local cache to always be +1 compared to the actual offset stored in Kafka.
A second bug effectively cancelled it out, so it never became an issue.

From 0.9.x on, both bugs are fixed. However, if you upgrade goka and restart a processor using the same cache files that were maintained by the old version, you'll see a warning like this:
```
Error: local offset is higher than partition offset. topic some-topic, partition 0, hwm 1312, local offset 1314. This can have several reasons:
(1) The kafka topic storing the table is gone --> delete the local cache and restart!
(2) the processor crashed last time while writing to disk.
(3) You found a bug!
```
This is because goka sees an offset that it is not expecting.
You should see this error only once per partition and processor; the offset will be fixed automatically. If it appears on every start or regularly, it might actually be a bug or some other error and should be investigated further
(or reported to goka :)).
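For illustration, here is a minimal sketch of the kind of reconciliation this warning describes. The function name and the reset-to-hwm behaviour are assumptions made for this example, not goka's actual implementation:

```go
package main

import "log"

// reconcileOffset is an illustrative helper (not goka's API): if the locally
// cached offset is ahead of the partition's high water mark, the local value
// is considered stale and loading continues from the hwm instead.
func reconcileOffset(localOffset, hwm int64) int64 {
	if localOffset > hwm {
		log.Printf("local offset %d is higher than partition hwm %d, resetting to hwm", localOffset, hwm)
		return hwm
	}
	return localOffset
}

func main() {
	log.Println(reconcileOffset(1314, 1312)) // logs the warning once, then continues from 1312
}
```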
10 changes: 10 additions & 0 deletions examples/6-reconnecting-view/README.md
@@ -0,0 +1,10 @@
## Reconnecting View

This example shows a reconnecting view by observing its state changes.
Run a local Kafka cluster by calling `make start` in the `examples/` folder.

Then run this example (`go run 6-reconnecting-view/main.go`).
You should see the view's state changes as it starts up.

Now kill the Kafka cluster with `make stop`; you should see some error messages and the view
trying to reconnect using a default backoff.
67 changes: 67 additions & 0 deletions examples/6-reconnecting-view/main.go
@@ -0,0 +1,67 @@
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {

	view, err := goka.NewView(
		// connect to example kafka cluster
		[]string{"localhost:9092"},
		// name does not matter, table will be empty
		"restartable-view-test-table",
		// codec doesn't matter, the table will be empty
		new(codec.String),
		// start the view with auto-reconnect enabled
		goka.WithViewAutoReconnect(),
	)
	if err != nil {
		log.Fatalf("Cannot create view: %v", err)
	}
	// context we'll use to run the view and the state change observer
	ctx, cancel := context.WithCancel(context.Background())

	// channel used to wait for the view to finish
	done := make(chan struct{})
	go func() {
		defer close(done)
		err := view.Run(ctx)
		if err != nil {
			log.Printf("View finished with error: %v", err)
		}
	}()

	// get a state change observer and log state changes until the context is done
	go func() {
		obs := view.ObserveStateChanges()
		defer obs.Stop()
		for {
			select {
			case state, ok := <-obs.C():
				if !ok {
					return
				}
				log.Printf("View is in state: %v", goka.ViewState(state))
			case <-ctx.Done():
				return
			}
		}
	}()

	// cancel the context on SIGINT/SIGTERM to shut down the view
	go func() {
		waiter := make(chan os.Signal, 1)
		signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM)
		<-waiter
		cancel()
	}()

	<-done
}
10 changes: 8 additions & 2 deletions partition_processor.go
@@ -182,10 +182,11 @@ func (pp *PartitionProcessor) Setup(ctx context.Context) error {
defer pp.state.SetState(PPStateRunning)

if pp.table != nil {
go pp.table.RunStatsLoop(runnerCtx)
setupErrg.Go(func() error {
pp.log.Debugf("catching up table")
defer pp.log.Debugf("catching up table done")
return pp.table.SetupAndRecover(setupCtx)
return pp.table.SetupAndRecover(setupCtx, false)
})
}

@@ -202,8 +203,9 @@
)
pp.joins[join.Topic()] = table

go table.RunStatsLoop(runnerCtx)
setupErrg.Go(func() error {
return table.SetupAndRecover(setupCtx)
return table.SetupAndRecover(setupCtx, false)
})
}

@@ -358,6 +360,10 @@ func (pp *PartitionProcessor) enqueueStatsUpdate(ctx context.Context, updater fu
select {
case pp.updateStats <- updater:
case <-ctx.Done():
default:
// going to default indicates the updateStats channel is not read, so the stats
// loop is not actually running.
// We must not block here, so we'll skip the update
}
}

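The `default` branch added above is Go's standard non-blocking channel send. A self-contained sketch of the pattern (the names are illustrative, not goka's API):

```go
package main

import "fmt"

// tryEnqueue mirrors the pattern above: if no goroutine is reading from the
// channel and its buffer is full, the default case fires and the update is
// dropped instead of blocking the caller.
func tryEnqueue(updates chan func(), update func()) bool {
	select {
	case updates <- update:
		return true
	default:
		return false
	}
}

func main() {
	updates := make(chan func(), 1)
	fmt.Println(tryEnqueue(updates, func() {})) // true: buffer has room
	fmt.Println(tryEnqueue(updates, func() {})) // false: buffer full, nobody reading
}
```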
107 changes: 71 additions & 36 deletions partition_table.go
@@ -42,11 +42,10 @@ type PartitionTable struct {
tmgr TopicManager
updateCallback UpdateCallback

stats *TableStats
cancelStatsLoop context.CancelFunc
requestStats chan bool
responseStats chan *TableStats
updateStats chan func()
stats *TableStats
requestStats chan bool
responseStats chan *TableStats
updateStats chan func()

offsetM sync.Mutex
// current offset
@@ -65,6 +64,7 @@ func newPartitionTableState() *Signal {
return NewSignal(
State(PartitionStopped),
State(PartitionInitializing),
State(PartitionConnecting),
State(PartitionRecovering),
State(PartitionPreparing),
State(PartitionRunning),
@@ -81,8 +81,6 @@ func newPartitionTable(topic string,
backoff Backoff,
backoffResetTimeout time.Duration) *PartitionTable {

statsLoopCtx, cancel := context.WithCancel(context.Background())

pt := &PartitionTable{
partition: partition,
state: newPartitionTableState(),
@@ -95,23 +93,21 @@
stallPeriod: defaultStallPeriod,
stalledTimeout: defaultStalledTimeout,

stats: newTableStats(),
requestStats: make(chan bool),
responseStats: make(chan *TableStats, 1),
updateStats: make(chan func(), 10),
cancelStatsLoop: cancel,
stats: newTableStats(),
requestStats: make(chan bool),
responseStats: make(chan *TableStats, 1),
updateStats: make(chan func(), 10),

backoff: backoff,
backoffResetTimeout: backoffResetTimeout,
}

go pt.runStatsLoop(statsLoopCtx)

return pt
}

// SetupAndRecover sets up the partition storage and recovers to HWM
func (p *PartitionTable) SetupAndRecover(ctx context.Context) error {
func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error {

err := p.setup(ctx)
if err != nil {
return err
@@ -125,34 +121,53 @@ func (p *PartitionTable) SetupAndRecover(ctx context.Context) error {
default:
}

if restartOnError {
return p.loadRestarting(ctx, true)
}
return p.load(ctx, true)
}

// CatchupForever starts catching up the partition table forever (until the context is cancelled).
// The restartOnError option allows the view to stay open/intact even in case of consumer errors.
func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error {
if restartOnError {
var resetTimer *time.Timer
for {
err := p.load(ctx, false)
if err != nil {
p.log.Printf("Error while catching up, but we'll try to keep it running: %v", err)
return p.loadRestarting(ctx, false)
}
return p.load(ctx, false)
}

if resetTimer != nil {
resetTimer.Stop()
}
resetTimer = time.AfterFunc(p.backoffResetTimeout, p.backoff.Reset)
}
func (p *PartitionTable) loadRestarting(ctx context.Context, stopAfterCatchup bool) error {
var (
resetTimer *time.Timer
retries int
)

select {
case <-ctx.Done():
return nil
for {
err := p.load(ctx, stopAfterCatchup)
if err != nil {
p.log.Printf("Error while starting up: %v", err)

case <-time.After(p.backoff.Duration()):
retries++
if resetTimer != nil {
resetTimer.Stop()
}
resetTimer = time.AfterFunc(p.backoffResetTimeout, func() {
p.backoff.Reset()
retries = 0
})
} else {
return nil
}

retryDuration := p.backoff.Duration()
p.log.Printf("Will retry in %.0f seconds (retried %d times so far)", retryDuration.Seconds(), retries)
select {
case <-ctx.Done():
return nil

case <-time.After(retryDuration):
}
}
return p.load(ctx, false)
}

// Setup creates the storage for the partition table
@@ -202,7 +217,7 @@ WaitLoop:
case <-ticker.C:
p.log.Printf("creating storage for topic %s/%d for %.1f minutes ...", p.topic, p.partition, time.Since(start).Minutes())
case <-done:
p.log.Printf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
p.log.Debugf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
if err != nil {
return nil, fmt.Errorf("error building storage: %v", err)
}
@@ -267,6 +282,8 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
return
}()

p.state.SetState(State(PartitionConnecting))

// fetch local offset
storedOffset, err = p.st.GetOffset(offsetNotStored)
if err != nil {
@@ -318,10 +335,6 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr

defer p.log.Debugf("... Loading done")

if stopAfterCatchup {
p.state.SetState(State(PartitionRecovering))
}

partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
if err != nil {
errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
@@ -337,6 +350,12 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
p.drainConsumer(partConsumer, errs)
}()

if stopAfterCatchup {
p.state.SetState(State(PartitionRecovering))
} else {
p.state.SetState(State(PartitionRunning))
}

// load messages and stop when you're at HWM
loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)

@@ -354,6 +373,10 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
return
}

func (p *PartitionTable) observeStateChanges() *StateChangeObserver {
return p.state.ObserveStateChange()
}

func (p *PartitionTable) markRecovered(ctx context.Context) error {
var (
start = time.Now()
@@ -520,10 +543,17 @@ func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func())
select {
case p.updateStats <- updater:
case <-ctx.Done():
default:
// going to default indicates the updateStats channel is not read, so the stats
// loop is not actually running.
// We must not block here, so we'll skip the update
}
}

func (p *PartitionTable) runStatsLoop(ctx context.Context) {
// RunStatsLoop starts the handler for stats requests. This loop runs detached from the
// recover/catchup mechanism so clients can always request stats even if the partition table is not
// running (like a processor table after it's recovered).
func (p *PartitionTable) RunStatsLoop(ctx context.Context) {

updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
defer updateHwmStatsTicker.Stop()
@@ -617,6 +647,11 @@ func (p *PartitionTable) IsRecovered() bool {
return p.state.IsState(State(PartitionRunning))
}

// CurrentState returns the partition's current status
func (p *PartitionTable) CurrentState() PartitionStatus {
return PartitionStatus(p.state.State())
}

// WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`
func (p *PartitionTable) WaitRecovered() chan struct{} {
return p.state.WaitForState(State(PartitionRunning))
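The `loadRestarting` retry loop added to this file combines a backoff between attempts with a timer that resets the backoff once loading has stayed healthy for `backoffResetTimeout`. A self-contained sketch of that pattern (the `backoff` interface and helper names here are simplified stand-ins, not goka's actual types):

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// backoff is a simplified stand-in for a backoff strategy.
type backoff interface {
	Duration() time.Duration
	Reset()
}

// runRestarting keeps retrying fn until it succeeds or ctx is cancelled.
// After each failure it arms a timer that resets the backoff if no further
// failure occurs within resetTimeout, mirroring the resetTimer logic above.
func runRestarting(ctx context.Context, fn func(context.Context) error, b backoff, resetTimeout time.Duration) {
	var resetTimer *time.Timer
	for retries := 0; ; retries++ {
		err := fn(ctx)
		if err == nil {
			return
		}
		log.Printf("error while running, will retry: %v", err)

		if resetTimer != nil {
			resetTimer.Stop()
		}
		resetTimer = time.AfterFunc(resetTimeout, b.Reset)

		wait := b.Duration()
		log.Printf("retrying in %v (retried %d times so far)", wait, retries+1)
		select {
		case <-ctx.Done():
			return
		case <-time.After(wait):
		}
	}
}

// constantBackoff waits a fixed duration between retries.
type constantBackoff struct{ d time.Duration }

func (c constantBackoff) Duration() time.Duration { return c.d }
func (c constantBackoff) Reset()                  {}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	attempts := 0
	runRestarting(ctx, func(context.Context) error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("simulated consumer error")
		}
		return nil
	}, constantBackoff{100 * time.Millisecond}, time.Second)
}
```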
8 changes: 4 additions & 4 deletions partition_table_test.go
@@ -895,7 +895,7 @@ func TestPT_SetupAndCatchupToHwm(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

err := pt.SetupAndRecover(ctx)
err := pt.SetupAndRecover(ctx, false)
test.AssertNil(t, err)
test.AssertTrue(t, count == msgsToRecover)
})
@@ -920,7 +920,7 @@ func TestPT_SetupAndCatchupToHwm(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err := pt.SetupAndRecover(ctx)
err := pt.SetupAndRecover(ctx, false)
test.AssertNotNil(t, err)
})
}
@@ -973,7 +973,7 @@ func TestPT_SetupAndCatchupForever(t *testing.T) {
}
}()

err := pt.SetupAndRecover(ctx)
err := pt.SetupAndRecover(ctx, false)
test.AssertNil(t, err)
cancel()
})
@@ -998,7 +998,7 @@ func TestPT_SetupAndCatchupForever(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err := pt.SetupAndRecover(ctx)
err := pt.SetupAndRecover(ctx, false)
test.AssertNotNil(t, err)
cancel()
})
