bugfix goroutine-leak in statsloop in partition table
frairon committed May 1, 2020
1 parent 40b650b commit bd5bf97
Showing 4 changed files with 25 additions and 16 deletions.
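
In short: newPartitionTable used to spawn the stats goroutine itself with its own background context, so it could outlive its owner. After this commit the loop is exported as RunStatsLoop and started explicitly by whoever owns the table (PartitionProcessor.Setup for processor and join tables, View.Run for view partitions), using a context that owner cancels. Below is a minimal, self-contained sketch of that ownership pattern; the table type and helpers are simplified stand-ins for illustration, not goka's actual API.

package main

import (
	"context"
	"fmt"
	"time"
)

type table struct {
	updateStats chan func()
}

// newTable no longer starts any goroutine: the caller owns the stats loop.
func newTable() *table {
	return &table{updateStats: make(chan func(), 10)}
}

// RunStatsLoop drains queued stats updates until ctx is cancelled.
func (t *table) RunStatsLoop(ctx context.Context) {
	for {
		select {
		case update := <-t.updateStats:
			update()
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	tbl := newTable()

	// The owner starts the loop with a context it controls, mirroring what
	// PartitionProcessor.Setup and View.Run do in this commit.
	go tbl.RunStatsLoop(ctx)

	tbl.updateStats <- func() { fmt.Println("stats updated") }
	time.Sleep(50 * time.Millisecond)

	cancel() // stops the stats goroutine; nothing is left running
	time.Sleep(50 * time.Millisecond)
}
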
6 changes: 6 additions & 0 deletions partition_processor.go
@@ -182,6 +182,7 @@ func (pp *PartitionProcessor) Setup(ctx context.Context) error {
defer pp.state.SetState(PPStateRunning)

if pp.table != nil {
go pp.table.RunStatsLoop(runnerCtx)
setupErrg.Go(func() error {
pp.log.Debugf("catching up table")
defer pp.log.Debugf("catching up table done")
@@ -202,6 +203,7 @@ func (pp *PartitionProcessor) Setup(ctx context.Context) error {
)
pp.joins[join.Topic()] = table

go table.RunStatsLoop(runnerCtx)
setupErrg.Go(func() error {
return table.SetupAndRecover(setupCtx, false)
})
@@ -358,6 +360,10 @@ func (pp *PartitionProcessor) enqueueStatsUpdate(ctx context.Context, updater fu
select {
case pp.updateStats <- updater:
case <-ctx.Done():
default:
// going to default indicates the updateStats channel is not read, so the stats
// loop is not actually running.
// We must not block here, so we'll skip the update.
}
}

32 changes: 17 additions & 15 deletions partition_table.go
@@ -42,11 +42,10 @@ type PartitionTable struct {
tmgr TopicManager
updateCallback UpdateCallback

stats *TableStats
cancelStatsLoop context.CancelFunc
requestStats chan bool
responseStats chan *TableStats
updateStats chan func()
stats *TableStats
requestStats chan bool
responseStats chan *TableStats
updateStats chan func()

offsetM sync.Mutex
// current offset
@@ -82,8 +81,6 @@ func newPartitionTable(topic string,
backoff Backoff,
backoffResetTimeout time.Duration) *PartitionTable {

statsLoopCtx, cancel := context.WithCancel(context.Background())

pt := &PartitionTable{
partition: partition,
state: newPartitionTableState(),
@@ -96,23 +93,21 @@
stallPeriod: defaultStallPeriod,
stalledTimeout: defaultStalledTimeout,

stats: newTableStats(),
requestStats: make(chan bool),
responseStats: make(chan *TableStats, 1),
updateStats: make(chan func(), 10),
cancelStatsLoop: cancel,
stats: newTableStats(),
requestStats: make(chan bool),
responseStats: make(chan *TableStats, 1),
updateStats: make(chan func(), 10),

backoff: backoff,
backoffResetTimeout: backoffResetTimeout,
}

go pt.runStatsLoop(statsLoopCtx)

return pt
}

// SetupAndRecover sets up the partition storage and recovers to HWM
func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error {

err := p.setup(ctx)
if err != nil {
return err
@@ -538,10 +533,17 @@ func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func())
select {
case p.updateStats <- updater:
case <-ctx.Done():
default:
// going to default indicates the updateStats channel is not read, so the stats
// loop is not actually running.
// We must not block here, so we'll skip the update.
}
}

func (p *PartitionTable) runStatsLoop(ctx context.Context) {
// RunStatsLoop starts the handler for stats requests. This loop runs detached from the
// recover/catchup mechanism so clients can always request stats even if the partition table is not
// running (like a processor table after it's recovered).
func (p *PartitionTable) RunStatsLoop(ctx context.Context) {

updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
defer updateHwmStatsTicker.Stop()
1 change: 1 addition & 0 deletions view.go
@@ -237,6 +237,7 @@ func (v *View) Run(ctx context.Context) (rerr error) {

for _, partition := range v.partitions {
partition := partition
go partition.RunStatsLoop(ctx)
recoverErrg.Go(func() error {
return partition.SetupAndRecover(recoverCtx, v.opts.autoreconnect)
})
2 changes: 1 addition & 1 deletion view_test.go
@@ -697,7 +697,7 @@ func TestView_WaitRunning(t *testing.T) {
view, _, ctrl := createTestView(t, NewMockAutoConsumer(t, DefaultConfig()))
defer ctrl.Finish()

view.state = newViewSignal()
view.state = newViewSignal().SetState(State(ViewStateRunning))

var isRunning bool
select {

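The default branch added to enqueueStatsUpdate is the other half of the fix: if nothing is reading updateStats (i.e. RunStatsLoop was never started or has already stopped), the send would block forever, so the update is dropped instead. A minimal sketch of that non-blocking send, again with a simplified stand-in type rather than goka's own:

package main

import (
	"context"
	"fmt"
)

type table struct {
	updateStats chan func()
}

// enqueueStatsUpdate never blocks: if the stats loop is not draining the
// channel, the default case is taken and the update is simply skipped.
func (t *table) enqueueStatsUpdate(ctx context.Context, updater func()) {
	select {
	case t.updateStats <- updater:
	case <-ctx.Done():
	default:
	}
}

func main() {
	t := &table{updateStats: make(chan func(), 1)}
	ctx := context.Background()

	t.enqueueStatsUpdate(ctx, func() {}) // fills the one-slot buffer
	t.enqueueStatsUpdate(ctx, func() {}) // buffer full, no reader: dropped, does not block
	fmt.Println("queued updates:", len(t.updateStats))
}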