Receive: fix serverAsClient.Series goroutines leak (thanos-io#6948)
* fix serverAsClient goroutines leak

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* fix lint

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* update changelog

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* delete invalid comment

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* remove temp dev test

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

* remove timer channel drain

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>

---------

Signed-off-by: Thibault Mange <22740367+thibaultmg@users.noreply.github.com>
thibaultmg authored and GiedriusS committed May 22, 2024
1 parent 99a2436 commit f33c894
Showing 4 changed files with 81 additions and 90 deletions.
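
The heart of the leak is visible in the `pkg/store/storepb/inprocess.go` hunk below: the server goroutine unconditionally sent its final result — usually a nil error — on an unbuffered channel, so a client that had already stopped receiving left that goroutine parked on the send forever. A minimal sketch of the before/after pattern, with illustrative names (`run`, `leaky`, `fixed`) rather than the Thanos API:

```go
package main

import "fmt"

// run stands in for the server-side Series call; it usually succeeds.
func run() error { return nil }

// leaky mirrors the old pattern: the goroutine always sends on errCh,
// even when the error is nil. If the consumer has already returned,
// nothing ever receives the send and the goroutine blocks forever.
func leaky() <-chan error {
	errCh := make(chan error) // unbuffered
	go func() {
		errCh <- run() // leaks if nobody is left to read, even for nil
		close(errCh)
	}()
	return errCh
}

// fixed mirrors the new pattern: only non-nil errors are sent, and a
// clean finish is signalled by closing the channel, which never blocks.
func fixed() <-chan error {
	errCh := make(chan error)
	go func() {
		if err := run(); err != nil {
			errCh <- err
		}
		close(errCh)
	}()
	return errCh
}

func main() {
	if err, ok := <-fixed(); ok {
		fmt.Println("stream error:", err)
	} else {
		fmt.Println("stream finished cleanly") // channel closed, no value
	}
}
```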
20 changes: 20 additions & 0 deletions CHANGELOG.md
@@ -12,6 +12,26 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re

### Fixed

- [#7323](https://github.com/thanos-io/thanos/pull/7323) Sidecar: wait for prometheus on startup
- [#7326](https://github.com/thanos-io/thanos/pull/7326) Query: fixing exemplars proxy when querying stores with multiple tenants.
- [#7335](https://github.com/thanos-io/thanos/pull/7335) Dependency: Update minio-go to v7.0.70 which includes support for EKS Pod Identity.
- [#6948](https://github.com/thanos-io/thanos/pull/6948) Receive: fix goroutines leak during series requests to thanos store api.

### Added

- [#7317](https://github.com/thanos-io/thanos/pull/7317) Tracing: allow specifying resource attributes for the OTLP configuration.
- [#7367](https://github.com/thanos-io/thanos/pull/7367) Store Gateway: log request ID in request logs.

### Changed

- [#7334](https://github.com/thanos-io/thanos/pull/7334) Compactor: do not vertically compact downsampled blocks. Such cases are now marked with `no-compact-mark.json`. Fixes panic `panic: unexpected seriesToChunkEncoder lack of iterations`.

### Removed

## [v0.35.0](https://github.com/thanos-io/thanos/tree/release-0.35) - 02.05.2024

### Fixed

- [#7083](https://github.com/thanos-io/thanos/pull/7083) Store Gateway: Fix lazy expanded postings with 0 length failed to be cached.
- [#7082](https://github.com/thanos-io/thanos/pull/7082) Stores: fix label values edge case when requesting external label values with matchers
- [#7080](https://github.com/thanos-io/thanos/pull/7080) Receive: race condition in handler Close() when stopped early
2 changes: 0 additions & 2 deletions pkg/store/bucket.go
@@ -1538,7 +1538,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
var resp respSet
if s.sortingStrategy == sortingStrategyStore {
resp = newEagerRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
@@ -1552,7 +1551,6 @@ func (s *BucketStore) Series(req *storepb.SeriesRequest, seriesSrv storepb.Store
)
} else {
resp = newLazyRespSet(
srv.Context(),
span,
10*time.Minute,
blk.meta.ULID.String(),
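
With the context argument gone from `newEagerRespSet` and `newLazyRespSet`, the receive loops no longer watch `ctx.Done()` themselves; cancellation is expected to surface as an error from the stream's `Recv`. A rough sketch of that assumption (`fakeRecv` is illustrative, not the real gRPC client):

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// fakeRecv stands in for a stream's Recv: once the stream's context is
// canceled, Recv itself returns the cancellation as its error.
func fakeRecv(ctx context.Context) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err
	}
	return "series response", nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the caller has gone away

	// No separate ctx.Done() branch is needed around the receive loop:
	// the error path of Recv already carries the cancellation.
	if _, err := fakeRecv(ctx); errors.Is(err, context.Canceled) {
		fmt.Println("recv surfaced cancellation:", err)
	}
}
```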
139 changes: 56 additions & 83 deletions pkg/store/proxy_heap.go
@@ -273,13 +273,11 @@ func (l *lazyRespSet) StoreLabels() map[string]struct{} {
type lazyRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
closeSeries context.CancelFunc
storeName string
storeLabelSets []labels.Labels
storeLabels map[string]struct{}
frameTimeout time.Duration
ctx context.Context

// Internal bookkeeping.
dataOrFinishEvent *sync.Cond
@@ -356,7 +354,6 @@ func (l *lazyRespSet) At() *storepb.SeriesResponse {
}

func newLazyRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
storeName string,
@@ -373,12 +370,10 @@

respSet := &lazyRespSet{
frameTimeout: frameTimeout,
cl: cl,
storeName: storeName,
storeLabelSets: storeLabelSets,
closeSeries: closeSeries,
span: span,
ctx: ctx,
dataOrFinishEvent: dataAvailable,
bufferedResponsesMtx: bufferedResponsesMtx,
bufferedResponses: bufferedResponses,
@@ -415,19 +410,9 @@ func newLazyRespSet(
defer t.Reset(frameTimeout)
}

select {
case <-l.ctx.Done():
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", st)
l.span.SetTag("err", err.Error())
resp, err := cl.Recv()

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
default:
resp, err := cl.Recv()
if err != nil {
if err == io.EOF {
l.bufferedResponsesMtx.Lock()
l.noMoreData = true
@@ -436,45 +421,43 @@ func newLazyRespSet(
return false
}

if err != nil {
// TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled.
var rerr error
if t != nil && !t.Stop() && errors.Is(err, context.Canceled) {
// Most likely the per-Recv timeout has been reached.
// There's a small race between canceling and the Recv()
// but this is most likely true.
var rerr error
// If timer is already stopped
if t != nil && !t.Stop() {
if errors.Is(err, context.Canceled) {
// The per-Recv timeout has been reached.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, st)
} else {
rerr = errors.Wrapf(err, "receive series from %s", st)
}

l.span.SetTag("err", rerr.Error())

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
} else {
rerr = errors.Wrapf(err, "receive series from %s", st)
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}
l.span.SetTag("err", rerr.Error())

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, resp)
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.noMoreData = true
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return false
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}

l.bufferedResponsesMtx.Lock()
l.bufferedResponses = append(l.bufferedResponses, resp)
l.dataOrFinishEvent.Signal()
l.bufferedResponsesMtx.Unlock()
return true
}

var t *time.Timer
@@ -571,7 +554,6 @@ func newAsyncRespSet(
switch retrievalStrategy {
case LazyRetrieval:
return newLazyRespSet(
seriesCtx,
span,
frameTimeout,
st.String(),
@@ -584,7 +566,6 @@
), nil
case EagerRetrieval:
return newEagerRespSet(
seriesCtx,
span,
frameTimeout,
st.String(),
@@ -618,8 +599,6 @@ func (l *lazyRespSet) Close() {
type eagerRespSet struct {
// Generic parameters.
span opentracing.Span
cl storepb.Store_SeriesClient
ctx context.Context

closeSeries context.CancelFunc
frameTimeout time.Duration
@@ -638,7 +617,6 @@ type eagerRespSet struct {
}

func newEagerRespSet(
ctx context.Context,
span opentracing.Span,
frameTimeout time.Duration,
storeName string,
@@ -653,9 +631,7 @@
ret := &eagerRespSet{
span: span,
closeSeries: closeSeries,
cl: cl,
frameTimeout: frameTimeout,
ctx: ctx,
bufferedResponses: []*storepb.SeriesResponse{},
wg: &sync.WaitGroup{},
shardMatcher: shardMatcher,
@@ -700,48 +676,45 @@ func newEagerRespSet(
defer t.Reset(frameTimeout)
}

select {
case <-l.ctx.Done():
err := errors.Wrapf(l.ctx.Err(), "failed to receive any data from %s", storeName)
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(err))
l.span.SetTag("err", err.Error())
return false
default:
resp, err := cl.Recv()
resp, err := cl.Recv()

if err != nil {
if err == io.EOF {
return false
}
if err != nil {
// TODO(bwplotka): Return early on error. Don't wait of dedup, merge and sort if partial response is disabled.
var rerr error
if t != nil && !t.Stop() && errors.Is(err, context.Canceled) {
// Most likely the per-Recv timeout has been reached.
// There's a small race between canceling and the Recv()
// but this is most likely true.

var rerr error
// If timer is already stopped
if t != nil && !t.Stop() {
<-t.C // Drain the channel if it was already stopped.
if errors.Is(err, context.Canceled) {
// The per-Recv timeout has been reached.
rerr = errors.Wrapf(err, "failed to receive any data in %s from %s", l.frameTimeout, storeName)
} else {
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.span.SetTag("err", rerr.Error())
return false
} else {
rerr = errors.Wrapf(err, "receive series from %s", storeName)
}

numResponses++
bytesProcessed += resp.Size()

if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}
l.bufferedResponses = append(l.bufferedResponses, storepb.NewWarnSeriesResponse(rerr))
l.span.SetTag("err", rerr.Error())
return false
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}
numResponses++
bytesProcessed += resp.Size()

l.bufferedResponses = append(l.bufferedResponses, resp)
if resp.GetSeries() != nil && applySharding && !shardMatcher.MatchesZLabels(resp.GetSeries().Labels) {
return true
}

if resp.GetSeries() != nil {
seriesStats.Count(resp.GetSeries())
}

l.bufferedResponses = append(l.bufferedResponses, resp)
return true
}

var t *time.Timer
if frameTimeout > 0 {
t = time.AfterFunc(frameTimeout, closeSeries)
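
One subtlety in the timer handling above: the per-frame watchdog is created with `time.AfterFunc` (`t = time.AfterFunc(frameTimeout, closeSeries)`), and a timer created that way has a nil `C` channel, so a drain such as `<-t.C` can never receive and would block the goroutine if reached — presumably why one of the squashed commits is "remove timer channel drain". A small demonstration of those semantics:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	fired := make(chan struct{})
	t := time.AfterFunc(10*time.Millisecond, func() { close(fired) })
	<-fired // wait until the "frame timeout" callback has run

	// Stop reports false once the callback has already started; this is
	// the condition the receive loops test with `t != nil && !t.Stop()`.
	fmt.Println("stopped before firing?", t.Stop()) // false

	// AfterFunc timers do not use the channel: C is nil, and a receive
	// from a nil channel blocks forever, so `<-t.C` cannot "drain" it.
	fmt.Println("t.C == nil:", t.C == nil) // true
}
```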
10 changes: 5 additions & 5 deletions pkg/store/storepb/inprocess.go
@@ -37,7 +37,9 @@ func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc
inSrv := &inProcessStream{recv: make(chan *SeriesResponse, s.clientReceiveBufferSize), err: make(chan error)}
inSrv.ctx, inSrv.cancel = context.WithCancel(ctx)
go func() {
inSrv.err <- s.srv.Series(in, inSrv)
if err := s.srv.Series(in, inSrv); err != nil {
inSrv.err <- err
}
close(inSrv.err)
close(inSrv.recv)
}()
@@ -89,15 +91,13 @@ func (s *inProcessClientStream) CloseSend() error {

func (s *inProcessClientStream) Recv() (*SeriesResponse, error) {
select {
case <-s.srv.ctx.Done():
return nil, s.srv.ctx.Err()
case r, ok := <-s.srv.recv:
if !ok {
return nil, io.EOF
}
return r, nil
case err := <-s.srv.err:
if err == nil {
case err, ok := <-s.srv.err:
if !ok {
return nil, io.EOF
}
return nil, err
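
The comma-ok receive is what keeps the new protocol unambiguous: since nil errors are no longer sent, a value received from `err` is always a real error, and a closed channel means clean termination (`io.EOF`). The distinction in isolation — a sent nil and a closed channel both read back as nil, and only `ok` tells them apart:

```go
package main

import "fmt"

func main() {
	errCh := make(chan error, 1)

	errCh <- nil // the old code could send a literal nil error
	v, ok := <-errCh
	fmt.Printf("value=%v ok=%v\n", v, ok) // value=<nil> ok=true

	close(errCh) // the new code only closes on clean shutdown
	v, ok = <-errCh
	fmt.Printf("value=%v ok=%v\n", v, ok) // value=<nil> ok=false
}
```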
