diff --git a/CHANGELOG.md b/CHANGELOG.md
index 712aee4ba6..b63212c5b1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,7 @@ We use *breaking* word for marking changes that are not backward compatible (rel
 - [#2665](https://github.com/thanos-io/thanos/pull/2665) Swift: fix issue with missing Content-Type HTTP headers.
 - [#2800](https://github.com/thanos-io/thanos/pull/2800) Query: Fix handling of `--web.external-prefix` and `--web.route-prefix`
 - [#2834](https://github.com/thanos-io/thanos/pull/2834) Query: Fix rendered JSON state value for rules and alerts should be in lowercase
+- [#2866](https://github.com/thanos-io/thanos/pull/2866) Receive, Querier: Fixed leaks on receive and querier Store API Series, which were leaking on errors.

 ### Changed

diff --git a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go
index 57ce6cb7b4..7708a6e9e4 100644
--- a/pkg/receive/multitsdb.go
+++ b/pkg/receive/multitsdb.go
@@ -23,6 +23,7 @@ import (
 	"github.com/thanos-io/thanos/pkg/objstore"
 	"github.com/thanos-io/thanos/pkg/shipper"
 	"github.com/thanos-io/thanos/pkg/store"
+	"github.com/thanos-io/thanos/pkg/store/storepb"
 	"golang.org/x/sync/errgroup"
 )

@@ -211,11 +212,11 @@ func (t *MultiTSDB) Sync(ctx context.Context) error {
 	return merr.Err()
 }

-func (t *MultiTSDB) TSDBStores() map[string]*store.TSDBStore {
+func (t *MultiTSDB) TSDBStores() map[string]storepb.StoreServer {
 	t.mtx.RLock()
 	defer t.mtx.RUnlock()

-	res := make(map[string]*store.TSDBStore, len(t.tenants))
+	res := make(map[string]storepb.StoreServer, len(t.tenants))
 	for k, tenant := range t.tenants {
 		s := tenant.store()
 		if s != nil {
diff --git a/pkg/store/multitsdb.go b/pkg/store/multitsdb.go
index c8dd3c05a8..4c6f43ce3a 100644
--- a/pkg/store/multitsdb.go
+++ b/pkg/store/multitsdb.go
@@ -24,14 +24,15 @@ import (
 )

 // MultiTSDBStore implements the Store interface backed by multiple TSDBStore instances.
+// TODO(bwplotka): Remove this and use Proxy instead. Details: https://github.com/thanos-io/thanos/issues/2864
 type MultiTSDBStore struct {
 	logger     log.Logger
 	component  component.SourceStoreAPI
-	tsdbStores func() map[string]*TSDBStore
+	tsdbStores func() map[string]storepb.StoreServer
 }

 // NewMultiTSDBStore creates a new MultiTSDBStore.
-func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]*TSDBStore) *MultiTSDBStore {
+func NewMultiTSDBStore(logger log.Logger, _ prometheus.Registerer, component component.SourceStoreAPI, tsdbStores func() map[string]storepb.StoreServer) *MultiTSDBStore {
 	if logger == nil {
 		logger = log.NewNopLogger()
 	}
@@ -89,59 +90,70 @@ type tenantSeriesSetServer struct {
 	ctx context.Context

-	warnCh warnSender
-	recv   chan *storepb.Series
-	cur    *storepb.Series
+	directCh directSender
+	recv     chan *storepb.Series
+	cur      *storepb.Series

 	err error

 	tenant string
 }

+// TODO(bwplotka): Remove tenant awareness; keep it simple with single functionality.
+// Details: https://github.com/thanos-io/thanos/issues/2864.
 func newTenantSeriesSetServer(
 	ctx context.Context,
 	tenant string,
-	warnCh warnSender,
+	directCh directSender,
 ) *tenantSeriesSetServer {
 	return &tenantSeriesSetServer{
-		ctx:    ctx,
-		tenant: tenant,
-		warnCh: warnCh,
-		recv:   make(chan *storepb.Series),
+		ctx:      ctx,
+		tenant:   tenant,
+		directCh: directCh,
+		recv:     make(chan *storepb.Series),
 	}
 }

-func (s *tenantSeriesSetServer) Context() context.Context {
-	return s.ctx
-}
+func (s *tenantSeriesSetServer) Context() context.Context { return s.ctx }

-func (s *tenantSeriesSetServer) Series(store *TSDBStore, r *storepb.SeriesRequest) {
+func (s *tenantSeriesSetServer) Series(store storepb.StoreServer, r *storepb.SeriesRequest) {
 	var err error
 	tracing.DoInSpan(s.ctx, "multitsdb_tenant_series", func(_ context.Context) {
 		err = store.Series(r, s)
 	})
-
 	if err != nil {
 		if r.PartialResponseDisabled || r.PartialResponseStrategy == storepb.PartialResponseStrategy_ABORT {
 			s.err = errors.Wrapf(err, "get series for tenant %s", s.tenant)
 		} else {
 			// Consistently prefix tenant specific warnings as done in various other places.
 			err = errors.New(prefixTenantWarning(s.tenant, err.Error()))
-			s.warnCh.send(storepb.NewWarnSeriesResponse(err))
+			s.directCh.send(storepb.NewWarnSeriesResponse(err))
 		}
 	}
-
 	close(s.recv)
 }

 func (s *tenantSeriesSetServer) Send(r *storepb.SeriesResponse) error {
 	series := r.GetSeries()
+	if series == nil {
+		// Proxy non-series responses directly to the client.
+		s.directCh.send(r)
+		return nil
+	}
+
+	// TODO(bwplotka): Consider avoiding the copy / learn why it has to be copied.
 	chunks := make([]storepb.AggrChunk, len(series.Chunks))
 	copy(chunks, series.Chunks)
-	s.recv <- &storepb.Series{
+
+	// For series, pass it to our AggChunkSeriesSet.
+	select {
+	case <-s.ctx.Done():
+		return s.ctx.Err()
+	case s.recv <- &storepb.Series{
 		Labels: series.Labels,
 		Chunks: chunks,
+	}:
+		return nil
 	}
-	return nil
 }

 func (s *tenantSeriesSetServer) Next() (ok bool) {
@@ -156,29 +168,31 @@ func (s *tenantSeriesSetServer) At() ([]storepb.Label, []storepb.AggrChunk) {
 	return s.cur.Labels, s.cur.Chunks
 }

-func (s *tenantSeriesSetServer) Err() error {
-	return s.err
-}
+func (s *tenantSeriesSetServer) Err() error { return s.err }

 // Series returns all series for a requested time range and label matcher. The
 // returned data may exceed the requested time bounds. The data returned may
 // have been read and merged from multiple underlying TSDBStore instances.
 func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
+	span, ctx := tracing.StartSpan(srv.Context(), "multitsdb_series")
+	defer span.Finish()
+
 	stores := s.tsdbStores()
 	if len(stores) == 0 {
 		return nil
 	}

-	var (
-		g, gctx   = errgroup.WithContext(srv.Context())
-		span, ctx = tracing.StartSpan(gctx, "multitsdb_series")
-		// Allow to buffer max 10 series response.
-		// Each might be quite large (multi chunk long series given by sidecar).
-		respSender, respRecv, closeFn = newRespCh(gctx, 10)
-	)
-	defer span.Finish()
+	g, gctx := errgroup.WithContext(ctx)
+
+	// Allow buffering of max 10 series responses.
+	// Each might be quite large (multi-chunk long series given by sidecar).
+	respSender, respCh := newCancelableRespChannel(gctx, 10)

 	g.Go(func() error {
+		// This goroutine is responsible for calling the stores' Series concurrently. Merged results
+		// are passed to respCh and sent concurrently to the client (if the buffer of 10 has room).
+		// When this goroutine finishes or is canceled, the respCh channel is closed.
+
 		var (
 			seriesSet []storepb.SeriesSet
 			wg        = &sync.WaitGroup{}
@@ -186,7 +200,7 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri

 		defer func() {
 			wg.Wait()
-			closeFn()
+			close(respCh)
 		}()

 		for tenant, store := range stores {
@@ -202,7 +216,6 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
 				defer wg.Done()
 				ss.Series(store, r)
 			}()
-
 			seriesSet = append(seriesSet, ss)
 		}

@@ -214,13 +227,16 @@ func (s *MultiTSDBStore) Series(r *storepb.SeriesRequest, srv storepb.Store_Seri
 		}
 		return mergedSet.Err()
 	})
-
-	for resp := range respRecv {
-		if err := srv.Send(resp); err != nil {
-			return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
+	g.Go(func() error {
+		// Goroutine for gathering merged responses and sending them over to the client. It stops when
+		// the respCh channel is closed OR on error from the client.
+		for resp := range respCh {
+			if err := srv.Send(resp); err != nil {
+				return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
+			}
 		}
-	}
-
+		return nil
+	})
 	return g.Wait()
 }
diff --git a/pkg/store/multitsdb_test.go b/pkg/store/multitsdb_test.go
index a22a3af50d..e0d18d06bc 100644
--- a/pkg/store/multitsdb_test.go
+++ b/pkg/store/multitsdb_test.go
@@ -4,6 +4,7 @@
 package store

 import (
+	"context"
 	"fmt"
 	"io/ioutil"
 	"math"
@@ -11,8 +12,11 @@ import (
 	"os"
 	"path/filepath"
 	"testing"
+	"time"

+	"github.com/fortytw2/leaktest"
 	"github.com/go-kit/kit/log"
+	"github.com/prometheus/prometheus/pkg/labels"
 	"github.com/prometheus/prometheus/tsdb"
 	"github.com/thanos-io/thanos/pkg/component"
 	"github.com/thanos-io/thanos/pkg/store/storepb"
@@ -21,6 +25,8 @@ import (
 )

 func TestMultiTSDBSeries(t *testing.T) {
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
+
 	tb := testutil.NewTB(t)
 	storetestutil.RunSeriesInterestingCases(tb, 200e3, 200e3, func(t testutil.TB, samplesPerSeries, series int) {
 		if ok := t.Run("headOnly", func(t testutil.TB) {
@@ -116,12 +122,12 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
 		dbs[j] = &mockedStartTimeDB{DBReadOnly: db, startTime: int64(j * samplesPerSeriesPerTSDB * seriesPerTSDB)}
 	}

-	tsdbs := map[string]*TSDBStore{}
+	tsdbs := map[string]storepb.StoreServer{}
 	for i, db := range dbs {
 		tsdbs[fmt.Sprintf("%v", i)] = &TSDBStore{db: db, logger: logger, maxSamplesPerChunk: 120} // On production we have math.MaxInt64
 	}

-	store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]*TSDBStore { return tsdbs })
+	store := NewMultiTSDBStore(logger, nil, component.Receive, func() map[string]storepb.StoreServer { return tsdbs })

 	var expected []storepb.Series
 	lastLabels := storepb.Series{}
@@ -154,3 +160,133 @@ func benchMultiTSDBSeries(t testutil.TB, totalSamples, totalSeries int, flushToB
 		},
 	)
 }
+
+type mockedStoreServer struct {
+	storepb.StoreServer
+
+	responses []*storepb.SeriesResponse
+}
+
+func (m *mockedStoreServer) Series(_ *storepb.SeriesRequest, server storepb.Store_SeriesServer) error {
+	for _, r := range m.responses {
+		if err := server.Send(r); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// Regression test against https://github.com/thanos-io/thanos/issues/2823.
+func TestTenantSeriesSetServer_NotLeakingIfNotExhausted(t *testing.T) {
+	t.Run("exhausted StoreSet", func(t *testing.T) {
+		defer leaktest.CheckTimeout(t, 10*time.Second)()
+
+		s := newTenantSeriesSetServer(context.Background(), "a", nil)
+
+		resps := []*storepb.SeriesResponse{
+			storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+			storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+			storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+		}
+
+		m := &mockedStoreServer{responses: resps}
+
+		go func() {
+			s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
+		}()
+
+		testutil.Ok(t, s.Err())
+		i := 0
+		for s.Next() {
+			l, c := s.At()
+
+			testutil.Equals(t, resps[i].GetSeries().Labels, l)
+			testutil.Equals(t, resps[i].GetSeries().Chunks, c)
+
+			i++
+		}
+		testutil.Ok(t, s.Err())
+		testutil.Equals(t, 3, i)
+	})
+
+	t.Run("canceled, not exhausted StoreSet", func(t *testing.T) {
+		defer leaktest.CheckTimeout(t, 10*time.Second)()
+
+		ctx, cancel := context.WithCancel(context.Background())
+		s := newTenantSeriesSetServer(ctx, "a", nil)
+
+		m := &mockedStoreServer{responses: []*storepb.SeriesResponse{
+			storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+			storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+			storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+		}}
+		go func() {
+			s.Series(m, &storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT})
+		}()
+
+		testutil.Ok(t, s.Err())
+		testutil.Equals(t, true, s.Next())
+		cancel()
+	})
+}
+
+type mockedSeriesServer struct {
+	storepb.Store_SeriesServer
+	ctx context.Context
+
+	send func(*storepb.SeriesResponse) error
+}
+
+func (s *mockedSeriesServer) Send(r *storepb.SeriesResponse) error {
+	return s.send(r)
+}
+func (s *mockedSeriesServer) Context() context.Context { return s.ctx }
+
+// Regression test against https://github.com/thanos-io/thanos/issues/2823.
+// This is a different leak than in TestTenantSeriesSetServer_NotLeakingIfNotExhausted.
+func TestMultiTSDBStore_NotLeakingOnPrematureFinish(t *testing.T) {
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
+
+	m := NewMultiTSDBStore(log.NewNopLogger(), nil, component.Receive, func() map[string]storepb.StoreServer {
+		return map[string]storepb.StoreServer{
+			// Ensure more than 10 responses (the internal respCh channel buffers only 10).
+ "a": &mockedStoreServer{responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }}, + "b": &mockedStoreServer{responses: []*storepb.SeriesResponse{ + storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}), + storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}), + }}, + } + }) + + t.Run("failing send", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + // We mimic failing series server, but practically context cancel will do the same. + testutil.NotOk(t, m.Series(&storepb.SeriesRequest{PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{ + ctx: ctx, + send: func(*storepb.SeriesResponse) error { + cancel() + return ctx.Err() + }, + })) + testutil.NotOk(t, ctx.Err()) + }) +} diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index a14db1d572..67ae5a908f 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -184,18 +184,23 @@ func mergeLabels(a []storepb.Label, b labels.Labels) []storepb.Label { return res } -type ctxRespSender struct { +// cancelableRespSender is a response channel that does need to be exhausted on cancel. +type cancelableRespSender struct { ctx context.Context ch chan<- *storepb.SeriesResponse } -func newRespCh(ctx context.Context, buffer int) (*ctxRespSender, <-chan *storepb.SeriesResponse, func()) { +func newCancelableRespChannel(ctx context.Context, buffer int) (*cancelableRespSender, chan *storepb.SeriesResponse) { respCh := make(chan *storepb.SeriesResponse, buffer) - return &ctxRespSender{ctx: ctx, ch: respCh}, respCh, func() { close(respCh) } + return &cancelableRespSender{ctx: ctx, ch: respCh}, respCh } -func (s ctxRespSender) send(r *storepb.SeriesResponse) { - s.ch <- r +// send or return on cancel. 
+func (s cancelableRespSender) send(r *storepb.SeriesResponse) {
+	select {
+	case <-s.ctx.Done():
+	case s.ch <- r:
+	}
 }

 // Series returns all series for a requested time range and label matcher. Requested series are taken from other
@@ -213,15 +218,17 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
 		return status.Error(codes.InvalidArgument, errors.New("no matchers specified (excluding external labels)").Error())
 	}

-	var (
-		g, gctx = errgroup.WithContext(srv.Context())
+	g, gctx := errgroup.WithContext(srv.Context())

-		// Allow to buffer max 10 series response.
-		// Each might be quite large (multi chunk long series given by sidecar).
-		respSender, respRecv, closeFn = newRespCh(gctx, 10)
-	)
+	// Allow buffering of max 10 series responses.
+	// Each might be quite large (multi-chunk long series given by sidecar).
+	respSender, respCh := newCancelableRespChannel(gctx, 10)

 	g.Go(func() error {
+		// This goroutine is responsible for calling the stores' Series concurrently. Merged results
+		// are passed to respCh and sent concurrently to the client (if the buffer of 10 has room).
+		// When this goroutine finishes or is canceled, the respCh channel is closed.
+
 		var (
 			seriesSet      []storepb.SeriesSet
 			storeDebugMsgs []string
@@ -239,7 +246,7 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe

 		defer func() {
 			wg.Wait()
-			closeFn()
+			close(respCh)
 		}()

 		for _, st := range s.stores() {
@@ -306,21 +313,25 @@ func (s *ProxyStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesSe
 		}
 		return mergedSet.Err()
 	})
-
-	for resp := range respRecv {
-		if err := srv.Send(resp); err != nil {
-			return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
+	g.Go(func() error {
+		// Goroutine for gathering merged responses and sending them over to the client. It stops when
+		// the respCh channel is closed OR on error from the client.
+		for resp := range respCh {
+			if err := srv.Send(resp); err != nil {
+				return status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
+			}
 		}
-	}
-
+		return nil
+	})
 	if err := g.Wait(); err != nil {
+		// TODO(bwplotka): Replace with request logger.
 		level.Error(s.logger).Log("err", err)
 		return err
 	}
 	return nil
 }

-type warnSender interface {
+type directSender interface {
 	send(*storepb.SeriesResponse)
 }

@@ -331,7 +342,7 @@ type streamSeriesSet struct {
 	logger log.Logger
 	stream storepb.Store_SeriesClient
-	warnCh warnSender
+	warnCh directSender

 	currSeries *storepb.Series
 	recvCh     chan *storepb.Series
@@ -367,7 +378,7 @@ func startStreamSeriesSet(
 	closeSeries context.CancelFunc,
 	wg *sync.WaitGroup,
 	stream storepb.Store_SeriesClient,
-	warnCh warnSender,
+	warnCh directSender,
 	name string,
 	partialResponse bool,
 	responseTimeout time.Duration,
diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go
index f3d85c27f4..50019ced37 100644
--- a/pkg/store/proxy_test.go
+++ b/pkg/store/proxy_test.go
@@ -1653,3 +1653,68 @@ func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) {
 		},
 	)
 }
+
+func TestProxyStore_NotLeakingOnPrematureFinish(t *testing.T) {
+	defer leaktest.CheckTimeout(t, 10*time.Second)()
+
+	clients := []Client{
+		&testClient{
+			StoreClient: &mockedStoreAPI{
+				RespSeries: []*storepb.SeriesResponse{
+					// Ensure more than 10 responses (the internal respCh channel buffers only 10).
+					storeSeriesResponse(t, labels.FromStrings("a", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("a", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+				},
+			},
+			minTime: math.MinInt64,
+			maxTime: math.MaxInt64,
+		},
+		&testClient{
+			StoreClient: &mockedStoreAPI{
+				RespSeries: []*storepb.SeriesResponse{
+					storeSeriesResponse(t, labels.FromStrings("b", "a"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "b"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "c"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "d"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "e"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "f"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "g"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "h"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "i"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+					storeSeriesResponse(t, labels.FromStrings("b", "j"), []sample{{0, 0}, {2, 1}, {3, 2}}),
+				},
+			},
+			minTime: math.MinInt64,
+			maxTime: math.MaxInt64,
+		},
+	}
+
+	logger := log.NewNopLogger()
+	p := &ProxyStore{
+		logger:          logger,
+		stores:          func() []Client { return clients },
+		metrics:         newProxyStoreMetrics(nil),
+		responseTimeout: 0,
+	}
+
+	t.Run("failing send", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.Background())
+		// We mimic a failing series server, but in practice a context cancel will do the same.
+		testutil.NotOk(t, p.Series(&storepb.SeriesRequest{Matchers: []storepb.LabelMatcher{{}}, PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT}, &mockedSeriesServer{
+			ctx: ctx,
+			send: func(*storepb.SeriesResponse) error {
+				cancel()
+				return ctx.Err()
+			},
+		}))
+		testutil.NotOk(t, ctx.Err())
+	})
+}
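Reviewer note (not part of the patch): the fix above combines two ideas. Every producer send now selects on `ctx.Done()` (`cancelableRespSender.send`, `tenantSeriesSetServer.Send`), and the client-facing send loop moves into a second `errgroup` goroutine, so an error there cancels the group context instead of abandoning the producer mid-send. Below is a minimal, self-contained sketch of that pattern; the names (`cancelableSender`, the fake `series-N` strings) are illustrative only and not Thanos code, though it reuses the same `golang.org/x/sync/errgroup` dependency the patch does.

```go
package main

import (
	"context"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// cancelableSender mirrors cancelableRespSender: send also waits on
// ctx.Done(), so a producer can never block forever on a full channel.
type cancelableSender struct {
	ctx context.Context
	ch  chan<- string
}

func (s cancelableSender) send(v string) {
	select {
	case <-s.ctx.Done(): // Unblocks the producer once the consumer gives up.
	case s.ch <- v:
	}
}

func main() {
	g, gctx := errgroup.WithContext(context.Background())

	respCh := make(chan string, 10) // Small buffer, as in the patch.
	sender := cancelableSender{ctx: gctx, ch: respCh}

	// Producer goroutine: closes respCh when done so the consumer's
	// range loop terminates in the success case.
	g.Go(func() error {
		defer close(respCh)
		for i := 0; i < 100; i++ {
			sender.send(fmt.Sprintf("series-%d", i))
		}
		return nil
	})

	// Consumer goroutine: an error here cancels gctx, which in turn
	// unblocks any pending producer send. Before the fix, this loop ran
	// on the caller's goroutine; returning on a failed send left the
	// producer blocked on an unbuffered/full respCh forever.
	g.Go(func() error {
		for resp := range respCh {
			if resp == "series-3" { // Simulate a failing srv.Send.
				return fmt.Errorf("send series response: %s", resp)
			}
		}
		return nil
	})

	// Wait returns the consumer's error; the producer also exits, so
	// leaktest-style checks would find no lingering goroutines.
	fmt.Println(g.Wait())
}
```

A design choice worth noting: `close(respCh)` stays with the producer (inside its `defer`, after `wg.Wait()` in the real code), preserving the invariant that only the sender closes a channel, while cancellation flows in the opposite direction through the shared group context.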