Skip to content

Commit

Permalink
Merge pull request #5 from saswatamcode/bloom-f-t
Browse files Browse the repository at this point in the history
Feature flag & add basic unit tests for Proxy/BlockStore UpdateLabelNamesBloom()
  • Loading branch information
fpetkovski committed Jul 20, 2023
2 parents b2221dd + c84fbe2 commit 032e64b
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 12 deletions.
3 changes: 3 additions & 0 deletions pkg/bloom/bloom.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package bloom

import (
Expand Down
3 changes: 3 additions & 0 deletions pkg/info/infopb/custom.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package infopb

import (
Expand Down
14 changes: 11 additions & 3 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,7 @@ func NewBucketStore(
enableSeriesResponseHints: enableSeriesResponseHints,
enableChunkHashCalculation: enableChunkHashCalculation,
seriesBatchSize: SeriesBatchSize,
labelNamesBloom: bloom.NewAlwaysTrueFilter(),
}

for _, option := range options {
Expand Down Expand Up @@ -1647,7 +1648,6 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

func (s *BucketStore) UpdateLabelNamesBloom(ctx context.Context) error {
g, _ := errgroup.WithContext(ctx)

var mtx sync.Mutex
names := make(map[string]struct{})

Expand All @@ -1658,15 +1658,23 @@ func (s *BucketStore) UpdateLabelNamesBloom(ctx context.Context) error {
g.Go(func() error {
defer runutil.CloseWithLogOnErr(b.logger, indexr, "label names")

var result []string
res, err := indexr.block.indexHeaderReader.LabelNames()
if err != nil {
return errors.Wrapf(err, "label names for block %s", b.meta.ULID)
}

extRes := make([]string, 0, len(b.extLset))
for _, l := range b.extLset {
extRes = append(extRes, l.Name)
}

if len(res) > 0 {
mtx.Lock()
for _, n := range result {
for _, n := range res {
names[n] = struct{}{}
}

for _, n := range extRes {
names[n] = struct{}{}
}
mtx.Unlock()
Expand Down
26 changes: 26 additions & 0 deletions pkg/store/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,32 @@ func TestBucketStore_LabelNames_e2e(t *testing.T) {
})
}

func TestBucketStore_LabelNamesBloom_e2e(t *testing.T) {
objtesting.ForeachStore(t, func(t *testing.T, bkt objstore.Bucket) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

dir := t.TempDir()

s := prepareStoreWithTestBlocks(t, dir, bkt, false, NewChunksLimiterFactory(0), NewSeriesLimiterFactory(0), NewBytesLimiterFactory(0), emptyRelabelConfig, allowAllFilterConf)
s.cache.SwapWith(noopCache{})

mint, maxt := s.store.TimeRange()
testutil.Equals(t, s.minTime, mint)
testutil.Equals(t, s.maxTime, maxt)

testutil.Ok(t, s.store.UpdateLabelNamesBloom(ctx))
for _, b := range s.store.blocks {
waitTimeout(t, &b.pendingReaders, 5*time.Second)
}

filter := s.store.LabelNamesBloom()
for _, n := range []string{"a", "b", "c", "ext1", "ext2"} {
testutil.Assert(t, filter.Test(n))
}
})
}

func TestBucketStore_LabelNames_SeriesLimiter_e2e(t *testing.T) {
cases := map[string]struct {
maxSeriesLimit uint64
Expand Down
15 changes: 9 additions & 6 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ type ProxyStore struct {
retrievalStrategy RetrievalStrategy
debugLogging bool

mtx sync.Mutex
labelNamesBloom bloom.Filter
mtx sync.Mutex
labelNamesBloom bloom.Filter
internalLabelResort bool
}

type proxyStoreMetrics struct {
Expand Down Expand Up @@ -150,9 +151,11 @@ func NewProxyStore(
b := make([]byte, 0, initialBufSize)
return &b
}},
responseTimeout: responseTimeout,
metrics: metrics,
retrievalStrategy: retrievalStrategy,
responseTimeout: responseTimeout,
metrics: metrics,
retrievalStrategy: retrievalStrategy,
labelNamesBloom: bloom.NewAlwaysTrueFilter(),
internalLabelResort: true,
}

for _, option := range options {
Expand Down Expand Up @@ -381,7 +384,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
storeDebugMsgs = append(storeDebugMsgs, fmt.Sprintf("store %s queried", st))
}

respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses)
respSet, err := newAsyncRespSet(srv.Context(), st, r, s.responseTimeout, s.retrievalStrategy, &s.buffers, r.ShardInfo, reqLogger, s.metrics.emptyStreamResponses, s.internalLabelResort)
if err != nil {
level.Error(reqLogger).Log("err", err)

Expand Down
9 changes: 7 additions & 2 deletions pkg/store/proxy_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,7 @@ func newAsyncRespSet(
shardInfo *storepb.ShardInfo,
logger log.Logger,
emptyStreamResponses prometheus.Counter,
internalLabelResort bool,
) (respSet, error) {

var span opentracing.Span
Expand Down Expand Up @@ -575,7 +576,7 @@ func newAsyncRespSet(
}

labelsToRemove := make(map[string]struct{})
dedupByInternalLabel := hasInternalReplicaLabels(st, req)
dedupByInternalLabel := hasInternalReplicaLabels(st, req, internalLabelResort)
if dedupByInternalLabel || !st.SupportsWithoutReplicaLabels() && len(req.WithoutReplicaLabels) > 0 {
level.Warn(logger).Log("msg", "detecting store that does not support without replica label setting. "+
"Falling back to eager retrieval with additional sort. Make sure your storeAPI supports it to speed up your queries", "store", st.String())
Expand Down Expand Up @@ -897,7 +898,11 @@ type respSet interface {

// hasInternalReplicaLabels returns true if any replica label in the series request is not an
// external label for the given Client.
func hasInternalReplicaLabels(st Client, req *storepb.SeriesRequest) bool {
func hasInternalReplicaLabels(st Client, req *storepb.SeriesRequest, internalLabelResort bool) bool {
if !internalLabelResort {
return true
}

bloom := st.LabelNamesBloom()
// Empty bloom filter capacity is 1. We fallback to eager retrieval if bloom filter
// is yet to be updated, as we cannot yet determine if the store has internal replica labels.
Expand Down
96 changes: 95 additions & 1 deletion pkg/store/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1604,7 +1604,7 @@ func TestProxyStore_LabelNamesBloom(t *testing.T) {
expectedNames []string
}{
{
title: "label_names partial response disabled",
title: "some common labels",
storeAPIs: []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
Expand All @@ -1623,6 +1623,100 @@ func TestProxyStore_LabelNamesBloom(t *testing.T) {
},
expectedNames: []string{"a", "b", "c", "d"},
},
{
title: "fully common labels",
storeAPIs: []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
},
},
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
},
},
},
},
expectedNames: []string{"a", "b"},
},
{
title: "no common labels",
storeAPIs: []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
},
},
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"c", "d"},
},
},
},
},
expectedNames: []string{"a", "b", "c", "d"},
},
{
title: "multiple stores",
storeAPIs: []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"a", "b"},
},
},
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"c", "d"},
},
},
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"e", "f"},
},
},
},
},
expectedNames: []string{"a", "b", "c", "d", "e", "f"},
},
{
title: "empty store",
storeAPIs: []Client{
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{},
},
},
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"c", "d"},
},
},
},
&storetestutil.TestClient{
StoreClient: &mockedStoreAPI{
RespLabelNames: &storepb.LabelNamesResponse{
Names: []string{"e", "f"},
},
},
},
},
expectedNames: []string{"c", "d", "e", "f"},
},
} {
if ok := t.Run(tc.title, func(t *testing.T) {
q := NewProxyStore(
Expand Down

0 comments on commit 032e64b

Please sign in to comment.