248 connection state #256

Merged
merged 6 commits on Jul 8, 2020
16 changes: 16 additions & 0 deletions MIGRATION.md
Expand Up @@ -39,3 +39,19 @@ go func(){
cancel()

```


## Offset bug in local storage
In 0.1.4 there was a bug that caused the table offset stored in the local cache to always be +1 compared to the actual offset stored in Kafka.
A second bug effectively cancelled the first one out, so it never became an issue.

From 0.9.x on, both bugs are fixed. However, if you upgrade goka and restart a processor using the same cache files that were maintained by the old version, you'll see a warning like this:
```
Error: local offset is higher than partition offset. topic some-topic, partition 0, hwm 1312, local offset 1314. This can have several reasons:
(1) The kafka topic storing the table is gone --> delete the local cache and restart!
(2) the processor crashed last time while writing to disk.
(3) You found a bug!
```
This is because goka sees an offset that it is not expecting.
You should see this error only once per partition and processor. The offset will be fixed automatically. If it appears on every start or regularly, it might actually be a bug or some other error and should be investigated further
(or reported to goka :)).
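The automatic fix amounts to trusting the broker's offset when the cached offset is ahead of the partition's high-water mark. A minimal sketch of that check, with hypothetical names (`reconcileOffset`, `localOffset`, `hwm`) that are not goka's internals:

```go
package main

import "log"

// reconcileOffset is a hypothetical sketch of the check behind the warning above:
// if the locally cached table offset is ahead of the partition's high-water mark,
// warn once and fall back to the broker's offset instead of failing.
func reconcileOffset(localOffset, hwm int64) int64 {
	if localOffset > hwm {
		log.Printf("local offset %d is higher than partition hwm %d; resetting to hwm", localOffset, hwm)
		return hwm
	}
	return localOffset
}

func main() {
	// With the pre-0.9.x off-by-one cache, a stored offset of 1314 against an
	// hwm of 1312 is reset to 1312 and the warning is logged exactly once.
	_ = reconcileOffset(1314, 1312)
}
```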
10 changes: 10 additions & 0 deletions examples/6-reconnecting-view/README.md
@@ -0,0 +1,10 @@
## Reconnecting View

This example shows a reconnecting view by observing the state changes.
Run a local Kafka cluster by calling `make start` in the `examples/` folder.

Then run this example (`go run 6-reconnecting-view/main.go`).
You should see the view's state changes logged after it starts.

Now kill the Kafka cluster with `make stop`; you should see some error messages and the view
trying to reconnect using a default backoff.
67 changes: 67 additions & 0 deletions examples/6-reconnecting-view/main.go
@@ -0,0 +1,67 @@
package main

import (
"context"
"log"
"os"
"os/signal"
"syscall"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

func main() {

view, err := goka.NewView(
// connect to example kafka cluster
[]string{"localhost:9092"},
// name does not matter, table will be empty
"restartable-view-test-table",
// codec doesn't matter, the table will be empty
new(codec.String),
// start the view with auto-reconnect enabled
goka.WithViewAutoReconnect(),
)
if err != nil {
log.Fatalf("Cannot create view: %v", err)
}
// context we'll use to run the view and the state change observer
ctx, cancel := context.WithCancel(context.Background())

// channel used to wait for the view to finish
done := make(chan struct{})
go func() {
defer close(done)
err := view.Run(ctx)
if err != nil {
log.Printf("View finished with error: %v", err)
}
}()

// Get a state change observer and log all state changes until the context is cancelled
go func() {
obs := view.ObserveStateChanges()
defer obs.Stop()
for {
select {
case state, ok := <-obs.C():
if !ok {
return
}
log.Printf("View is in state: %v", goka.ViewState(state))
case <-ctx.Done():
return
}
}
}()

go func() {
waiter := make(chan os.Signal, 1)
signal.Notify(waiter, syscall.SIGINT, syscall.SIGTERM)
<-waiter
cancel()
}()

<-done
}
10 changes: 8 additions & 2 deletions partition_processor.go
Expand Up @@ -182,10 +182,11 @@ func (pp *PartitionProcessor) Setup(ctx context.Context) error {
defer pp.state.SetState(PPStateRunning)

if pp.table != nil {
go pp.table.RunStatsLoop(runnerCtx)
setupErrg.Go(func() error {
pp.log.Debugf("catching up table")
defer pp.log.Debugf("catching up table done")
return pp.table.SetupAndRecover(setupCtx)
return pp.table.SetupAndRecover(setupCtx, false)
})
}

Expand All @@ -202,8 +203,9 @@ func (pp *PartitionProcessor) Setup(ctx context.Context) error {
)
pp.joins[join.Topic()] = table

go table.RunStatsLoop(runnerCtx)
setupErrg.Go(func() error {
return table.SetupAndRecover(setupCtx)
return table.SetupAndRecover(setupCtx, false)
})
}

Expand Down Expand Up @@ -358,6 +360,10 @@ func (pp *PartitionProcessor) enqueueStatsUpdate(ctx context.Context, updater fu
select {
case pp.updateStats <- updater:
case <-ctx.Done():
default:
// Hitting the default case indicates that the updateStats channel is not being read, so the stats
// loop is not actually running.
// We must not block here, so we skip the update.
}
}

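The new `default` branch is the standard non-blocking send idiom: if nobody drains the channel, the update is dropped instead of blocking the caller. A self-contained sketch of the pattern; the channel and function names are illustrative, not goka's API:

```go
package main

import "fmt"

// tryEnqueue sketches the non-blocking send used above: if the updates channel
// is full and nobody is reading it (the stats loop is not running), the update
// is dropped instead of blocking the caller.
func tryEnqueue(updates chan func(), update func()) bool {
	select {
	case updates <- update:
		return true
	default:
		return false
	}
}

func main() {
	updates := make(chan func(), 1)
	fmt.Println(tryEnqueue(updates, func() {})) // true: the buffer has room
	fmt.Println(tryEnqueue(updates, func() {})) // false: nothing drains the channel, so the update is skipped
}
```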
107 changes: 71 additions & 36 deletions partition_table.go
Expand Up @@ -42,11 +42,10 @@ type PartitionTable struct {
tmgr TopicManager
updateCallback UpdateCallback

stats *TableStats
cancelStatsLoop context.CancelFunc
requestStats chan bool
responseStats chan *TableStats
updateStats chan func()
stats *TableStats
requestStats chan bool
responseStats chan *TableStats
updateStats chan func()

offsetM sync.Mutex
// current offset
Expand All @@ -65,6 +64,7 @@ func newPartitionTableState() *Signal {
return NewSignal(
State(PartitionStopped),
State(PartitionInitializing),
State(PartitionConnecting),
State(PartitionRecovering),
State(PartitionPreparing),
State(PartitionRunning),
Expand All @@ -81,8 +81,6 @@ func newPartitionTable(topic string,
backoff Backoff,
backoffResetTimeout time.Duration) *PartitionTable {

statsLoopCtx, cancel := context.WithCancel(context.Background())

pt := &PartitionTable{
partition: partition,
state: newPartitionTableState(),
Expand All @@ -95,23 +93,21 @@ func newPartitionTable(topic string,
stallPeriod: defaultStallPeriod,
stalledTimeout: defaultStalledTimeout,

stats: newTableStats(),
requestStats: make(chan bool),
responseStats: make(chan *TableStats, 1),
updateStats: make(chan func(), 10),
cancelStatsLoop: cancel,
stats: newTableStats(),
requestStats: make(chan bool),
responseStats: make(chan *TableStats, 1),
updateStats: make(chan func(), 10),

backoff: backoff,
backoffResetTimeout: backoffResetTimeout,
}

go pt.runStatsLoop(statsLoopCtx)

return pt
}

// SetupAndRecover sets up the partition storage and recovers to HWM
func (p *PartitionTable) SetupAndRecover(ctx context.Context) error {
func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error {

err := p.setup(ctx)
if err != nil {
return err
Expand All @@ -125,34 +121,53 @@ func (p *PartitionTable) SetupAndRecover(ctx context.Context) error {
default:
}

if restartOnError {
return p.loadRestarting(ctx, true)
}
return p.load(ctx, true)
}

// CatchupForever starts catching up the partition table forever (until the context is cancelled).
// Option restartOnError allows the view to stay open/intact even in case of consumer errors.
func (p *PartitionTable) CatchupForever(ctx context.Context, restartOnError bool) error {
if restartOnError {
var resetTimer *time.Timer
for {
err := p.load(ctx, false)
if err != nil {
p.log.Printf("Error while catching up, but we'll try to keep it running: %v", err)
return p.loadRestarting(ctx, false)
}
return p.load(ctx, false)
}

if resetTimer != nil {
resetTimer.Stop()
}
resetTimer = time.AfterFunc(p.backoffResetTimeout, p.backoff.Reset)
}
func (p *PartitionTable) loadRestarting(ctx context.Context, stopAfterCatchup bool) error {
var (
resetTimer *time.Timer
retries int
)

select {
case <-ctx.Done():
return nil
for {
err := p.load(ctx, stopAfterCatchup)
if err != nil {
p.log.Printf("Error while starting up: %v", err)

case <-time.After(p.backoff.Duration()):
retries++
if resetTimer != nil {
resetTimer.Stop()
}
resetTimer = time.AfterFunc(p.backoffResetTimeout, func() {
p.backoff.Reset()
retries = 0
})
} else {
return nil
}

retryDuration := p.backoff.Duration()
p.log.Printf("Will retry in %.0f seconds (retried %d times so far)", retryDuration.Seconds(), retries)
select {
case <-ctx.Done():
return nil

case <-time.After(retryDuration):
}
}
return p.load(ctx, false)
}
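The `loadRestarting` loop pairs a backoff between retries with a timer that resets the backoff once an attempt has stayed healthy for `backoffResetTimeout`. A rough, self-contained sketch of that pattern, using stand-in types (`constantBackoff`, a `load` callback) rather than goka's own:

```go
package main

import (
	"context"
	"errors"
	"log"
	"time"
)

// constantBackoff is a stand-in for goka's Backoff interface.
type constantBackoff struct{ wait time.Duration }

func (b *constantBackoff) Duration() time.Duration { return b.wait }
func (b *constantBackoff) Reset()                  { log.Println("backoff reset") }

// retryWithReset keeps calling load until it succeeds or ctx is cancelled.
// After every failed attempt it waits b.Duration() and (re)arms a timer that
// resets the backoff once the next attempt has survived for resetTimeout,
// mirroring the structure of the loadRestarting loop above.
func retryWithReset(ctx context.Context, b *constantBackoff, resetTimeout time.Duration, load func(context.Context) error) error {
	var (
		resetTimer *time.Timer
		retries    int
	)
	for {
		err := load(ctx)
		if err == nil {
			return nil
		}
		retries++
		log.Printf("load failed (attempt %d): %v", retries, err)

		if resetTimer != nil {
			resetTimer.Stop()
		}
		resetTimer = time.AfterFunc(resetTimeout, b.Reset)

		select {
		case <-ctx.Done():
			return nil
		case <-time.After(b.Duration()):
		}
	}
}

func main() {
	attempts := 0
	load := func(context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("simulated consumer error")
		}
		return nil
	}
	_ = retryWithReset(context.Background(), &constantBackoff{wait: 100 * time.Millisecond}, time.Second, load)
}
```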

// Setup creates the storage for the partition table
Expand Down Expand Up @@ -202,7 +217,7 @@ WaitLoop:
case <-ticker.C:
p.log.Printf("creating storage for topic %s/%d for %.1f minutes ...", p.topic, p.partition, time.Since(start).Minutes())
case <-done:
p.log.Printf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
p.log.Debugf("finished building storage for topic %s/%d in %.1f minutes", p.topic, p.partition, time.Since(start).Minutes())
if err != nil {
return nil, fmt.Errorf("error building storage: %v", err)
}
Expand Down Expand Up @@ -267,6 +282,8 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
return
}()

p.state.SetState(State(PartitionConnecting))

// fetch local offset
storedOffset, err = p.st.GetOffset(offsetNotStored)
if err != nil {
Expand Down Expand Up @@ -318,10 +335,6 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr

defer p.log.Debugf("... Loading done")

if stopAfterCatchup {
p.state.SetState(State(PartitionRecovering))
}

partConsumer, err = p.consumer.ConsumePartition(p.topic, p.partition, loadOffset)
if err != nil {
errs.Collect(fmt.Errorf("Error creating partition consumer for topic %s, partition %d, offset %d: %v", p.topic, p.partition, storedOffset, err))
Expand All @@ -337,6 +350,12 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
p.drainConsumer(partConsumer, errs)
}()

if stopAfterCatchup {
p.state.SetState(State(PartitionRecovering))
} else {
p.state.SetState(State(PartitionRunning))
}

// load messages and stop when you're at HWM
loadErr := p.loadMessages(ctx, partConsumer, hwm, stopAfterCatchup)

Expand All @@ -354,6 +373,10 @@ func (p *PartitionTable) load(ctx context.Context, stopAfterCatchup bool) (rerr
return
}

func (p *PartitionTable) observeStateChanges() *StateChangeObserver {
return p.state.ObserveStateChange()
}

func (p *PartitionTable) markRecovered(ctx context.Context) error {
var (
start = time.Now()
Expand Down Expand Up @@ -520,10 +543,17 @@ func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func())
select {
case p.updateStats <- updater:
case <-ctx.Done():
default:
// Hitting the default case indicates that the updateStats channel is not being read, so the stats
// loop is not actually running.
// We must not block here, so we skip the update.
}
}

func (p *PartitionTable) runStatsLoop(ctx context.Context) {
// RunStatsLoop starts the handler for stats requests. This loop runs detached from the
// recover/catchup mechanism so clients can always request stats even if the partition table is not
// running (like a processor table after it's recovered).
func (p *PartitionTable) RunStatsLoop(ctx context.Context) {

updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
defer updateHwmStatsTicker.Stop()
Expand Down Expand Up @@ -617,6 +647,11 @@ func (p *PartitionTable) IsRecovered() bool {
return p.state.IsState(State(PartitionRunning))
}

// CurrentState returns the partition's current status
func (p *PartitionTable) CurrentState() PartitionStatus {
return PartitionStatus(p.state.State())
}

// WaitRecovered returns a channel that closes when the partition table enters state `PartitionRunning`
func (p *PartitionTable) WaitRecovered() chan struct{} {
return p.state.WaitForState(State(PartitionRunning))
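`WaitRecovered` exposes recovery as a channel that gets closed, which composes naturally with `select`. A small illustrative sketch of consuming such a channel with a timeout; `waitRecovered` here is a stand-in function, not the PartitionTable method itself:

```go
package main

import (
	"fmt"
	"time"
)

// waitRecovered stands in for PartitionTable.WaitRecovered: it returns a channel
// that is closed once the (simulated) recovery has finished.
func waitRecovered() chan struct{} {
	done := make(chan struct{})
	go func() {
		time.Sleep(50 * time.Millisecond) // simulated recovery work
		close(done)
	}()
	return done
}

func main() {
	select {
	case <-waitRecovered():
		fmt.Println("partition recovered")
	case <-time.After(time.Second):
		fmt.Println("still recovering after 1s")
	}
}
```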
8 changes: 4 additions & 4 deletions partition_table_test.go
Expand Up @@ -895,7 +895,7 @@ func TestPT_SetupAndCatchupToHwm(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

err := pt.SetupAndRecover(ctx)
err := pt.SetupAndRecover(ctx, false)
test.AssertNil(t, err)
test.AssertTrue(t, count == msgsToRecover)
})
Expand All @@ -920,7 +920,7 @@ func TestPT_SetupAndCatchupToHwm(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err := pt.SetupAndRecover(ctx)
err := pt.SetupAndRecover(ctx, false)
test.AssertNotNil(t, err)
})
}
Expand Down Expand Up @@ -973,7 +973,7 @@ func TestPT_SetupAndCatchupForever(t *testing.T) {
}
}()

err := pt.SetupAndRecover(ctx)
err := pt.SetupAndRecover(ctx, false)
test.AssertNil(t, err)
cancel()
})
Expand All @@ -998,7 +998,7 @@ func TestPT_SetupAndCatchupForever(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()

err := pt.SetupAndRecover(ctx)
err := pt.SetupAndRecover(ctx, false)
test.AssertNotNil(t, err)
cancel()
})