From f33c894c3acae8648e05377370c0554f134cb212 Mon Sep 17 00:00:00 2001 From: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> Date: Mon, 20 May 2024 11:45:47 +0200 Subject: [PATCH] Receive: fix serverAsClient.Series goroutines leak (#6948) * fix serverAsClient goroutines leak Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * fix lint Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * update changelog Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * delete invalid comment Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * remove temp dev test Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> * remove timer channel drain Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> --------- Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com> --- CHANGELOG.md | 20 +++++ pkg/store/bucket.go | 2 - pkg/store/proxy_heap.go | 139 +++++++++++++-------------------- pkg/store/storepb/inprocess.go | 10 +-- 4 files changed, 81 insertions(+), 90 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a43c4e9829..9f60f4820f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,26 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Fixed +- [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: wait for prometheus on startup +- [#7326](https://github.com/thanos-io/thanos/pull/7326) Query: fixing exemplars proxy when querying stores with multiple tenants. +- [#7335](https://github.com/thanos-io/thanos/pull/7335) Dependency: Update minio-go to v7.0.70 which includes support for EKS Pod Identity. +- [#6948](https://github.com/thanos-io/thanos/pull/6948) Receive: fix goroutines leak during series requests to thanos store api. + +### Added + +- [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration. +- [#7367](https://github.com/thanos-io/thanos/pull/7367) Store Gateway: log request ID in request logs. + +### Changed + +- [#7334](https://github.com/thanos-io/thanos/pull/7334) Compactor: do not vertically compact downsampled blocks. Such cases are now marked with `no-compact-mark.json`. Fixes panic `panic: unexpected seriesToChunkEncoder lack of iterations`. + +### Removed + +## [v0.35.0](https://github.com/thanos-io/thanos/tree/release-0.35) - 02.05.2024 + +### Fixed + - [#7083](https://github.com/thanos-io/thanos/pull/7083) Store Gateway: Fix lazy expanded postings with 0 length failed to be cached. - [#7082](https://github.com/thanos-io/thanos/pull/7082) Stores: fix label values edge case when requesting external label values with matchers - [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early diff --git a/pkg/store/bucket.go b/pkg/store/bucket.go index 7bd1647f66..3573431270 100644 --- a/pkg/store/bucket.go +++ b/pkg/store/bucket.go @@ -1538,7 +1538,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store var resp respSet if s.sortingStrategy == sortingStrategyStore { resp = newEagerRespSet( - srv.Context(), span, 10*time.Minute, blk.meta.ULID.String(), @@ -1552,7 +1551,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store ) } else { resp = newLazyRespSet( - srv.Context(), span, 10*time.Minute, blk.meta.ULID.String(), diff --git a/pkg/store/proxy_heap.go b/pkg/store/proxy_heap.go index 51631b388a..376e7ca448 100644 --- a/pkg/store/proxy_heap.go +++ b/pkg/store/proxy_heap.go @@ -273,13 +273,11 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} { type lazyRespSet struct { // Generic parameters. span opentracing.Span - cl storepb.Store_SeriesClient closeSeries context.CancelFunc storeName string storeLabelSets []labels.Labels storeLabels map[string]struct{} frameTimeout time.Duration - ctx context.Context // Internal bookkeeping. dataOrFinishEvent *sync.Cond @@ -356,7 +354,6 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse { } func newLazyRespSet( - ctx context.Context, span opentracing.Span, frameTimeout time.Duration, storeName string, @@ -373,12 +370,10 @@ func newLazyRespSet( respSet := &lazyRespSet{ frameTimeout: frameTimeout, - cl: cl, storeName: storeName, storeLabelSets: storeLabelSets, closeSeries: closeSeries, span: span, - ctx: ctx, dataOrFinishEvent: dataAvailable, bufferedResponsesMtx: bufferedResponsesMtx, bufferedResponses: bufferedResponses, @@ -415,19 +410,9 @@ func newLazyRespSet( defer t.Reset(frameTimeout) } - select { - case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st) - l.span.SetTag("err", err.Error()) + resp, err := cl.Recv() - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() - return false - default: - resp, err := cl.Recv() + if err != nil { if err == io.EOF { l.bufferedResponsesMtx.Lock() l.noMoreData = true @@ -436,45 +421,43 @@ func newLazyRespSet( return false } - if err != nil { - // TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled. - var rerr error - if t != nil && !t.Stop() && errors.Is(err, context.Canceled) { - // Most likely the per-Recv timeout has been reached. - // There's a small race between canceling and the Recv() - // but this is most likely true. + var rerr error + // If timer is already stopped + if t != nil && !t.Stop() { + if errors.Is(err, context.Canceled) { + // The per-Recv timeout has been reached. rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st) - } else { - rerr = errors.Wrapf(err, "receive series from %s", st) } - - l.span.SetTag("err", rerr.Error()) - - l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.noMoreData = true - l.dataOrFinishEvent.Signal() - l.bufferedResponsesMtx.Unlock() - return false - } - - numResponses++ - bytesProcessed += resp.Size() - - if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { - return true + } else { + rerr = errors.Wrapf(err, "receive series from %s", st) } - if resp.GetSeries() != nil { - seriesStats.Count(resp.GetSeries()) - } + l.span.SetTag("err", rerr.Error()) l.bufferedResponsesMtx.Lock() - l.bufferedResponses = append(l.bufferedResponses, resp) + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) + l.noMoreData = true l.dataOrFinishEvent.Signal() l.bufferedResponsesMtx.Unlock() + return false + } + + numResponses++ + bytesProcessed += resp.Size() + + if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { return true } + + if resp.GetSeries() != nil { + seriesStats.Count(resp.GetSeries()) + } + + l.bufferedResponsesMtx.Lock() + l.bufferedResponses = append(l.bufferedResponses, resp) + l.dataOrFinishEvent.Signal() + l.bufferedResponsesMtx.Unlock() + return true } var t *time.Timer @@ -571,7 +554,6 @@ func newAsyncRespSet( switch retrievalStrategy { case LazyRetrieval: return newLazyRespSet( - seriesCtx, span, frameTimeout, st.String(), @@ -584,7 +566,6 @@ func newAsyncRespSet( ), nil case EagerRetrieval: return newEagerRespSet( - seriesCtx, span, frameTimeout, st.String(), @@ -618,8 +599,6 @@ func (l *lazyRespSet) Close() { type eagerRespSet struct { // Generic parameters. span opentracing.Span - cl storepb.Store_SeriesClient - ctx context.Context closeSeries context.CancelFunc frameTimeout time.Duration @@ -638,7 +617,6 @@ type eagerRespSet struct { } func newEagerRespSet( - ctx context.Context, span opentracing.Span, frameTimeout time.Duration, storeName string, @@ -653,9 +631,7 @@ func newEagerRespSet( ret := &eagerRespSet{ span: span, closeSeries: closeSeries, - cl: cl, frameTimeout: frameTimeout, - ctx: ctx, bufferedResponses: []*storepb.SeriesResponse{}, wg: &sync.WaitGroup{}, shardMatcher: shardMatcher, @@ -700,48 +676,45 @@ func newEagerRespSet( defer t.Reset(frameTimeout) } - select { - case <-l.ctx.Done(): - err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName) - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err)) - l.span.SetTag("err", err.Error()) - return false - default: - resp, err := cl.Recv() + resp, err := cl.Recv() + + if err != nil { if err == io.EOF { return false } - if err != nil { - // TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled. - var rerr error - if t != nil && !t.Stop() && errors.Is(err, context.Canceled) { - // Most likely the per-Recv timeout has been reached. - // There's a small race between canceling and the Recv() - // but this is most likely true. + + var rerr error + // If timer is already stopped + if t != nil && !t.Stop() { + <-t.C // Drain the channel if it was already stopped. + if errors.Is(err, context.Canceled) { + // The per-Recv timeout has been reached. rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName) - } else { - rerr = errors.Wrapf(err, "receive series from %s", storeName) } - l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) - l.span.SetTag("err", rerr.Error()) - return false + } else { + rerr = errors.Wrapf(err, "receive series from %s", storeName) } - numResponses++ - bytesProcessed += resp.Size() - - if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { - return true - } + l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr)) + l.span.SetTag("err", rerr.Error()) + return false + } - if resp.GetSeries() != nil { - seriesStats.Count(resp.GetSeries()) - } + numResponses++ + bytesProcessed += resp.Size() - l.bufferedResponses = append(l.bufferedResponses, resp) + if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) { return true } + + if resp.GetSeries() != nil { + seriesStats.Count(resp.GetSeries()) + } + + l.bufferedResponses = append(l.bufferedResponses, resp) + return true } + var t *time.Timer if frameTimeout > 0 { t = time.AfterFunc(frameTimeout, closeSeries) diff --git a/pkg/store/storepb/inprocess.go b/pkg/store/storepb/inprocess.go index 7d2b8c7f61..e8ba1df461 100644 --- a/pkg/store/storepb/inprocess.go +++ b/pkg/store/storepb/inprocess.go @@ -37,7 +37,9 @@ func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc inSrv := &inProcessStream{recv: make(chan *SeriesResponse, s.clientReceiveBufferSize), err: make(chan error)} inSrv.ctx, inSrv.cancel = context.WithCancel(ctx) go func() { - inSrv.err <- s.srv.Series(in, inSrv) + if err := s.srv.Series(in, inSrv); err != nil { + inSrv.err <- err + } close(inSrv.err) close(inSrv.recv) }() @@ -89,15 +91,13 @@ func (s *inProcessClientStream) CloseSend() error { func (s *inProcessClientStream) Recv() (*SeriesResponse, error) { select { - case <-s.srv.ctx.Done(): - return nil, s.srv.ctx.Err() case r, ok := <-s.srv.recv: if !ok { return nil, io.EOF } return r, nil - case err := <-s.srv.err: - if err == nil { + case err, ok := <-s.srv.err: + if !ok { return nil, io.EOF } return nil, err