Skip to content

Commit

Permalink
receive: Fixed leak on receive and querier proxying Store API Series,…
Browse files Browse the repository at this point in the history
… which was leaking on errors. (#2866)

* receive: Fixed leak on receive and querier proxying Store API Series, which was leaking on errors.

Fixes: #2823

TestTenantSeriesSetServert_NotLeakingIfNotExhausted was showing leaks:

```
    TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:132: leaktest: timed out checking goroutines
    TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]:
        github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Send(0xc000708360, 0xc0003104c0, 0x0, 0x0)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:141 +0x13e
        github.com/thanos-io/thanos/pkg/store.(*mockedStoreServer).Series(0xc0004e6330, 0xc0007083c0, 0x20ac2c0, 0xc000708360, 0x5116a0, 0x0)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:173 +0x76
        github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series.func1(0x2097760, 0xc00003c940)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:121 +0x56
        github.com/thanos-io/thanos/pkg/tracing.DoInSpan(0x2097760, 0xc00003c940, 0x1c8bace, 0x17, 0xc000173760, 0x0, 0x0, 0x0)
        	/home/bwplotka/Repos/thanos/pkg/tracing/tracing.go:72 +0xcc
        github.com/thanos-io/thanos/pkg/store.(*tenantSeriesSetServer).Series(0xc000708360, 0x20983e0, 0xc0004e6330, 0xc0007083c0)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:120 +0xfa
        github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2.1(0xc000708360, 0xc0004e6330)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:225 +0x62
        created by github.com/thanos-io/thanos/pkg/store.TestTenantSeriesSetServert_NotLeakingIfNotExhausted.func2
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb_test.go:224 +0x618
    --- FAIL: TestTenantSeriesSetServert_NotLeakingIfNotExhausted/cancelled,_not_exhausted_StoreSet (10.03s)
FAIL

Process finished with exit code 1

```

TestMultiTSDBStore_NotLeakingOnPrematureFinish was showing:

```
TestMultiTSDBStore_NotLeakingOnSendError: leaktest.go:150: leaktest: leaked goroutine: goroutine 84 [chan send]:
        github.com/thanos-io/thanos/pkg/store.ctxRespSender.send(...)
        	/home/bwplotka/Repos/thanos/pkg/store/proxy.go:198
        github.com/thanos-io/thanos/pkg/store.(*MultiTSDBStore).Series.func1(0x0, 0x0)
        	/home/bwplotka/Repos/thanos/pkg/store/multitsdb.go:214 +0x5cf
        golang.org/x/sync/errgroup.(*Group).Go.func1(0xc0002708d0, 0xc000416380)
        	/home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/sync@v0.0.0-20200317015054-43a5402ce75a/errgroup/errgroup.go:57 +0x59
        created by golang.org/x/sync/errgroup.(*Group).Go
        	/home/bwplotka/Repos/thanosgopath/pkg/mod/golang.org/x/sync@v0.0.0-20200317015054-43a5402ce75a/errgroup/errgroup.go:54 +0x66
--- FAIL: TestMultiTSDBStore_NotLeakingOnSendError (10.02s)
```

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Quick fix for leaks.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Fixed issues found by lint.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Get back copying.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

* Lint

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed Jul 9, 2020
1 parent cdac8a1 commit 60ede4c
Show file tree
Hide file tree
Showing 6 changed files with 293 additions and 63 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
- [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
- [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
- [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive, Querier: Fixed leaks on receive and querier Store API Series, which were leaking on errors.

### Changed

Expand Down
5 changes: 3 additions & 2 deletions pkg/receive/multitsdb.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/shipper"
"github.com/thanos-io/thanos/pkg/store"
"github.com/thanos-io/thanos/pkg/store/storepb"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -211,11 +212,11 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
return merr.Err()
}

func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {
t.mtx.RLock()
defer t.mtx.RUnlock()

res := make(map[string]*store.TSDBStore, len(t.tenants))
res := make(map[string]storepb.StoreServer, len(t.tenants))
for k, tenant := range t.tenants {
s := tenant.store()
if s != nil {
Expand Down
92 changes: 54 additions & 38 deletions pkg/store/multitsdb.go
Expand Up @@ -24,14 +24,15 @@ import (
)

// MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances.
// TODO(bwplotka): Remove this and use Proxy instead. Details: https://github.com/thanos-io/thanos/issues/2864
type MultiTSDBStore struct {
logger log.Logger
component component.SourceStoreAPI
tsdbStores func() map[string]*TSDBStore
tsdbStores func() map[string]storepb.StoreServer
}

// NewMultiTSDBStore creates a new MultiTSDBStore.
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore {
func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]storepb.StoreServer) *MultiTSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down Expand Up @@ -89,59 +90,70 @@ type tenantSeriesSetServer struct {

ctx context.Context

warnCh warnSender
recv chan *storepb.Series
cur *storepb.Series
directCh directSender
recv chan *storepb.Series
cur *storepb.Series

err error
tenant string
}

// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
// Details https://github.com/thanos-io/thanos/issues/2864.
func newTenantSeriesSetServer(
ctx context.Context,
tenant string,
warnCh warnSender,
directCh directSender,
) *tenantSeriesSetServer {
return &tenantSeriesSetServer{
ctx: ctx,
tenant: tenant,
warnCh: warnCh,
recv: make(chan *storepb.Series),
ctx: ctx,
tenant: tenant,
directCh: directCh,
recv: make(chan *storepb.Series),
}
}

func (s *tenantSeriesSetServer) Context() context.Context {
return s.ctx
}
func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx }

func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.SeriesRequest) {
var err error
tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
err = store.Series(r, s)
})

if err != nil {
if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
s.err = errors.Wrapf(err, "get series for tenant %s", s.tenant)
} else {
// Consistently prefix tenant specific warnings as done in various other places.
err = errors.New(prefixTenantWarning(s.tenant, err.Error()))
s.warnCh.send(storepb.NewWarnSeriesResponse(err))
s.directCh.send(storepb.NewWarnSeriesResponse(err))
}
}

close(s.recv)
}

func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
series := r.GetSeries()
if series == nil {
// Proxy non series responses directly to client
s.directCh.send(r)
return nil
}

// TODO(bwplotka): Consider avoiding the copy / learn why it has to be copied.
chunks := make([]storepb.AggrChunk, len(series.Chunks))
copy(chunks, series.Chunks)
s.recv <- &storepb.Series{

// For series, pass it to our AggChunkSeriesSet.
select {
case <-s.ctx.Done():
return s.ctx.Err()
case s.recv <- &storepb.Series{
Labels: series.Labels,
Chunks: chunks,
}:
return nil
}
return nil
}

func (s *tenantSeriesSetServer) Next() (ok bool) {
Expand All @@ -156,37 +168,39 @@ func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
return s.cur.Labels, s.cur.Chunks
}

func (s *tenantSeriesSetServer) Err() error {
return s.err
}
func (s *tenantSeriesSetServer) Err() error { return s.err }

// Series returns all series for a requested time range and label matcher. The
// returned data may exceed the requested time bounds. The data returned may
// have been read and merged from multiple underlying TSDBStore instances.
func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
span, ctx := tracing.StartSpan(srv.Context(), "multitsdb_series")
defer span.Finish()

stores := s.tsdbStores()
if len(stores) == 0 {
return nil
}

var (
g, gctx = errgroup.WithContext(srv.Context())
span, ctx = tracing.StartSpan(gctx, "multitsdb_series")
// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respRecv, closeFn = newRespCh(gctx, 10)
)
defer span.Finish()
g, gctx := errgroup.WithContext(ctx)

// Allow to buffer max 10 series response.
// Each might be quite large (multi chunk long series given by sidecar).
respSender, respCh := newCancelableRespChannel(gctx, 10)

g.Go(func() error {
// This goroutine is responsible for calling each store's Series concurrently. Merged results
// are passed to respCh and sent concurrently to the client (if the buffer of 10 has room).
// When this goroutine finishes or is canceled, the respCh channel is closed.

var (
seriesSet []storepb.SeriesSet
wg = &sync.WaitGroup{}
)

defer func() {
wg.Wait()
closeFn()
close(respCh)
}()

for tenant, store := range stores {
Expand All @@ -202,7 +216,6 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
defer wg.Done()
ss.Series(store, r)
}()

seriesSet = append(seriesSet, ss)
}

Expand All @@ -214,13 +227,16 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
}
return mergedSet.Err()
})

for resp := range respRecv {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
g.Go(func() error {
// Go routine for gathering merged responses and sending them over to client. It stops when
// respCh channel is closed OR on error from client.
for resp := range respCh {
if err := srv.Send(resp); err != nil {
return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
}
}
}

return nil
})
return g.Wait()
}

Expand Down
140 changes: 138 additions & 2 deletions pkg/store/multitsdb_test.go
Expand Up @@ -4,15 +4,19 @@
package store

import (
"context"
"fmt"
"io/ioutil"
"math"
"math/rand"
"os"
"path/filepath"
"testing"
"time"

"github.com/fortytw2/leaktest"
"github.com/go-kit/kit/log"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/store/storepb"
Expand All @@ -21,6 +25,8 @@ import (
)

func TestMultiTSDBSeries(t *testing.T) {
defer leaktest.CheckTimeout(t, 10*time.Second)()

tb := testutil.NewTB(t)
storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
if ok := t.Run("headOnly", func(t testutil.TB) {
Expand Down Expand Up @@ -116,12 +122,12 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)}
}

tsdbs := map[string]*TSDBStore{}
tsdbs := map[string]storepb.StoreServer{}
for i, db := range dbs {
tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger, maxSamplesPerChunk: 120} // On production we have math.MaxInt64
}

store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs })
store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })

var expected []storepb.Series
lastLabels := storepb.Series{}
Expand Down Expand Up @@ -154,3 +160,133 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
},
)
}

// mockedStoreServer is a test double for storepb.StoreServer that answers
// Series calls with a fixed, pre-built list of responses.
type mockedStoreServer struct {
// Embedded so the mock satisfies the whole interface. The tests in this file
// never set it, so it is only safe to call methods defined on the mock itself.
storepb.StoreServer

// responses are replayed by Series, in order.
responses []*storepb.SeriesResponse
}

// Series replays the canned responses to server in order. The request is
// ignored. It stops at, and returns, the first error reported by server.Send;
// it returns nil once every response has been sent.
func (m *mockedStoreServer) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
	for i := 0; i < len(m.responses); i++ {
		sendErr := server.Send(m.responses[i])
		if sendErr != nil {
			return sendErr
		}
	}
	return nil
}

// Regression test against https://github.com/thanos-io/thanos/issues/2823.
//
// It checks that the goroutine running tenantSeriesSetServer.Series terminates
// in two situations: when the consumer drains the whole set, and when the
// consumer stops early and the context is canceled.
func TestTenantSeriesSetServert_NotLeakingIfNotExhausted(t *testing.T) {
	t.Run("exhausted StoreSet", func(t *testing.T) {
		defer leaktest.CheckTimeout(t, 10*time.Second)()

		// Deliberately not cancelable: draining the set alone must be enough
		// for the producer goroutine to finish.
		s := newTenantSeriesSetServer(context.Background(), "a", nil)

		resps := []*storepb.SeriesResponse{
			storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
			storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
			storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
		}

		m := &mockedStoreServer{responses: resps}

		go func() {
			s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
		}()

		testutil.Ok(t, s.Err())
		i := 0
		for s.Next() {
			l, c := s.At()

			testutil.Equals(t, resps[i].GetSeries().Labels, l)
			testutil.Equals(t, resps[i].GetSeries().Chunks, c)

			i++
		}
		testutil.Ok(t, s.Err())
		testutil.Equals(t, 3, i)
	})

	t.Run("canceled, not exhausted StoreSet", func(t *testing.T) {
		defer leaktest.CheckTimeout(t, 10*time.Second)()

		ctx, cancel := context.WithCancel(context.Background())
		// Deferred after the leak check, so it runs before it. If an assertion
		// below aborts the subtest, the producer is still released instead of
		// staying blocked on its send until the leak check times out.
		defer cancel()

		s := newTenantSeriesSetServer(ctx, "a", nil)

		m := &mockedStoreServer{responses: []*storepb.SeriesResponse{
			storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
			storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
			storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
		}}
		go func() {
			s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
		}()

		testutil.Ok(t, s.Err())
		// Consume only the first series, then cancel while the rest are pending.
		testutil.Equals(t, true, s.Next())
		cancel()
	})
}

// mockedSeriesServer is a test double for storepb.Store_SeriesServer whose
// context and Send behavior are injected by the test.
type mockedSeriesServer struct {
// Embedded so the mock satisfies the whole interface; Send and Context are
// overridden below.
storepb.Store_SeriesServer
// ctx is the value returned by Context.
ctx context.Context

// send is called by Send for every response.
send func(*storepb.SeriesResponse) error
}

// Send hands r to the injected send callback and returns whatever it returns.
func (s *mockedSeriesServer) Send(r *storepb.SeriesResponse) error {
return s.send(r)
}
// Context returns the context stored in the mock's ctx field.
func (s *mockedSeriesServer) Context() context.Context { return s.ctx }

// Regression test against https://github.com/thanos-io/thanos/issues/2823.
// This is different leak than in TestTenantSeriesSetServert_NotLeakingIfNotExhausted.
//
// It checks that MultiTSDBStore.Series leaves no goroutines behind when the
// client-side Send fails while the tenant stores still have series to deliver.
func TestMultiTSDBStore_NotLeakingOnPrematureFinish(t *testing.T) {
	defer leaktest.CheckTimeout(t, 10*time.Second)()

	// tenantResponses builds ten series responses for one tenant. Each series
	// carries a single label named lname, with values "a" through "j".
	tenantResponses := func(lname string) []*storepb.SeriesResponse {
		resps := make([]*storepb.SeriesResponse, 0, 10)
		for v := 'a'; v <= 'j'; v++ {
			resps = append(resps, storeSeriesResponse(t, labels.FromStrings(lname, string(v)), []sample{{0, 0}, {2, 1}, {3, 2}}))
		}
		return resps
	}

	m := NewMultiTSDBStore(log.NewNopLogger(), nil, component.Receive, func() map[string]storepb.StoreServer {
		return map[string]storepb.StoreServer{
			// Ensure more than 10 (internal respCh channel): 10 series per tenant, 20 in total.
			"a": &mockedStoreServer{responses: tenantResponses("a")},
			"b": &mockedStoreServer{responses: tenantResponses("b")},
		}
	})

	t.Run("failing send", func(t *testing.T) {
		ctx, cancel := context.WithCancel(context.Background())
		// Release the context even if Series never reaches the send callback.
		// This runs after the ctx.Err() assertion below, so that assertion
		// still proves the callback was the one that canceled.
		defer cancel()

		// We mimic failing series server, but practically context cancel will do the same.
		testutil.NotOk(t, m.Series(&storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
			ctx: ctx,
			send: func(*storepb.SeriesResponse) error {
				cancel()
				return ctx.Err()
			},
		}))
		testutil.NotOk(t, ctx.Err())
	})
}

0 comments on commit 60ede4c

Please sign in to comment.