Skip to content

Commit

Permalink
Remove references to bloom
Browse files Browse the repository at this point in the history
Signed-off-by: Filip Petkovski <filip.petkovsky@gmail.com>
  • Loading branch information
fpetkovski committed Jul 27, 2023
1 parent 639e681 commit 341e874
Show file tree
Hide file tree
Showing 16 changed files with 97 additions and 100 deletions.
11 changes: 5 additions & 6 deletions cmd/thanos/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,14 +796,13 @@ func runQuery(
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
mint, maxt := proxy.TimeRange()
labelNamesBloom := proxy.LabelNamesBloom()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SupportsWithoutReplicaLabels: true,
TsdbInfos: proxy.TSDBInfos(),
LabelNamesSet: infopb.NewCuckooFilter(labelNamesBloom),
LabelNamesIndex: infopb.NewCuckooSet(proxy.LabelNamesSet()),
}
}
return nil
Expand Down Expand Up @@ -844,16 +843,16 @@ func runQuery(
// Start bloom name filter updater.
{
ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic label names bloom filter update")
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names bloom filter update")
level.Debug(logger).Log("msg", "Starting label names update")

if err := proxy.UpdateLabelNamesBloom(ctx); err != nil {
if err := proxy.UpdateLabelNames(ctx); err != nil {
return err
}

level.Debug(logger).Log("msg", "Finished label names bloom filter update")
level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
Expand Down
11 changes: 5 additions & 6 deletions cmd/thanos/receive.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,13 @@ func runReceive(
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
minTime, maxTime := proxy.TimeRange()
labelNamesBloom := proxy.LabelNamesBloom()
return &infopb.StoreInfo{
MinTime: minTime,
MaxTime: maxTime,
SupportsSharding: true,
SupportsWithoutReplicaLabels: true,
TsdbInfos: proxy.TSDBInfos(),
LabelNamesSet: infopb.NewCuckooFilter(labelNamesBloom),
LabelNamesIndex: infopb.NewCuckooSet(proxy.LabelNamesSet()),
}
}
return nil
Expand All @@ -368,16 +367,16 @@ func runReceive(
)

ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic label names bloom filter update")
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names bloom filter update")
level.Debug(logger).Log("msg", "Starting label names update")

if err := proxy.UpdateLabelNamesBloom(ctx); err != nil {
if err := proxy.UpdateLabelNames(ctx); err != nil {
return err
}

level.Debug(logger).Log("msg", "Finished label names bloom filter update")
level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
Expand Down
3 changes: 1 addition & 2 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,14 +630,13 @@ func runRule(
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
mint, maxt := tsdbStore.TimeRange()
labelNamesBloom := tsdbStore.LabelNamesBloom()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SupportsWithoutReplicaLabels: true,
TsdbInfos: tsdbStore.TSDBInfos(),
LabelNamesSet: infopb.NewCuckooFilter(labelNamesBloom),
LabelNamesIndex: infopb.NewCuckooSet(tsdbStore.LabelNamesSet()),
}
}
return nil
Expand Down
11 changes: 5 additions & 6 deletions cmd/thanos/sidecar.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,11 @@ func runSidecar(

g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names bloom filter update")
level.Debug(logger).Log("msg", "Starting label names update")

m.UpdateLabelNamesBloom(context.Background())
m.UpdateLabelNames(context.Background())

level.Debug(logger).Log("msg", "Finished label names bloom filter update")
level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
Expand Down Expand Up @@ -285,14 +285,13 @@ func runSidecar(
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
mint, maxt := promStore.Timestamps()
labelNamesBloom := promStore.LabelNamesBloom()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SupportsWithoutReplicaLabels: true,
TsdbInfos: promStore.TSDBInfos(),
LabelNamesSet: infopb.NewCuckooFilter(labelNamesBloom),
LabelNamesIndex: infopb.NewCuckooSet(promStore.LabelNamesSet()),
}
}
return nil
Expand Down Expand Up @@ -465,7 +464,7 @@ func (s *promMetadata) UpdateTimestamps(mint, maxt int64) {
s.maxt = maxt
}

func (s *promMetadata) UpdateLabelNamesBloom(ctx context.Context) {
func (s *promMetadata) UpdateLabelNames(ctx context.Context) {
mint, _ := s.Timestamps()
labelNames, err := s.client.LabelNamesInGRPC(ctx, s.promURL, nil, mint, time.Now().UnixMilli())
if err != nil {
Expand Down
11 changes: 5 additions & 6 deletions cmd/thanos/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,14 +457,13 @@ func runStore(
info.WithStoreInfoFunc(func() *infopb.StoreInfo {
if httpProbe.IsReady() {
mint, maxt := bs.TimeRange()
labelNamesBloom := bs.LabelNamesBloom()
return &infopb.StoreInfo{
MinTime: mint,
MaxTime: maxt,
SupportsSharding: true,
SupportsWithoutReplicaLabels: true,
TsdbInfos: bs.TSDBInfos(),
LabelNamesSet: infopb.NewCuckooFilter(labelNamesBloom),
LabelNamesIndex: infopb.NewCuckooSet(bs.LabelNamesSet()),
}
}
return nil
Expand Down Expand Up @@ -501,16 +500,16 @@ func runStore(
// Start bloom name filter updater.
{
ctx, cancel := context.WithCancel(context.Background())
level.Debug(logger).Log("msg", "setting up periodic label names bloom filter update")
level.Debug(logger).Log("msg", "setting up periodic update for label names")
g.Add(func() error {
return runutil.Repeat(10*time.Second, ctx.Done(), func() error {
level.Debug(logger).Log("msg", "Starting label names bloom filter update")
level.Debug(logger).Log("msg", "Starting label names update")

if err := bs.UpdateLabelNamesBloom(ctx); err != nil {
if err := bs.UpdateLabelNames(ctx); err != nil {
return err
}

level.Debug(logger).Log("msg", "Finished label names bloom filter update")
level.Debug(logger).Log("msg", "Finished label names update")
return nil
})
}, func(err error) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/info/infopb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/thanos-io/thanos/pkg/stringset"
)

func NewCuckooFilter(filter stringset.Set) *StringSet {
func NewCuckooSet(filter stringset.Set) *StringSet {
return &StringSet{
CuckooFilterBytes: filter.Bytes(),
}
Expand Down
105 changes: 53 additions & 52 deletions pkg/info/infopb/rpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/info/infopb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ message StoreInfo {
// TSDBInfos holds metadata for all TSDBs exposed by the store.
repeated TSDBInfo tsdb_infos = 6 [(gogoproto.nullable) = false];

StringSet label_names_set = 7;
StringSet label_names_index = 7;
}

message StringSet {
Expand Down
4 changes: 2 additions & 2 deletions pkg/query/endpointset.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,11 +826,11 @@ func (er *endpointRef) LabelNamesSet() stringset.Set {
er.mtx.RLock()
defer er.mtx.RUnlock()

if er.metadata == nil || er.metadata.Store == nil || er.metadata.Store.LabelNamesSet == nil {
if er.metadata == nil || er.metadata.Store == nil || er.metadata.Store.LabelNamesIndex == nil {
return stringset.AllStringsSet()
}

return stringset.Decode(er.metadata.Store.LabelNamesSet.CuckooFilterBytes)
return stringset.Decode(er.metadata.Store.LabelNamesIndex.CuckooFilterBytes)
}

func (er *endpointRef) SupportsSharding() bool {
Expand Down
4 changes: 2 additions & 2 deletions pkg/store/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1655,7 +1655,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}, nil
}

func (s *BucketStore) UpdateLabelNamesBloom(ctx context.Context) error {
func (s *BucketStore) UpdateLabelNames(ctx context.Context) error {
g, _ := errgroup.WithContext(ctx)
var mtx sync.Mutex
names := make(map[string]struct{})
Expand Down Expand Up @@ -1704,7 +1704,7 @@ func (s *BucketStore) UpdateLabelNamesBloom(ctx context.Context) error {
return nil
}

func (b *BucketStore) LabelNamesBloom() stringset.Set {
func (b *BucketStore) LabelNamesSet() stringset.Set {
b.bmtx.Lock()
defer b.bmtx.Unlock()

Expand Down

0 comments on commit 341e874

Please sign in to comment.