diff --git a/CHANGELOG.md b/CHANGELOG.md index 5ef910c64f..d05e115954 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,8 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re ### Changed +- [#7353](https://github.com/thanos-io/thanos/pull/7353) Receivers|Store: cache matchers for series calls. + ### Removed ## [v0.35.0](https://github.com/thanos-io/thanos/tree/release-0.35) - 02.05.2024 diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 5398820780..fb6e3e74dd 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -50,6 +50,7 @@ import ( httpserver "github.com/thanos-io/thanos/pkg/server/http" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" "github.com/thanos-io/thanos/pkg/tenancy" "github.com/thanos-io/thanos/pkg/tls" ) @@ -204,6 +205,14 @@ func runReceive( return errors.Wrap(err, "parse relabel configuration") } + var cache *storepb.MatchersCache + if conf.matcherCacheSize > 0 { + cache, err = storepb.NewMatchersCache(storepb.WithSize(conf.matcherCacheSize), storepb.WithPromRegistry(reg)) + if err != nil { + return errors.Wrap(err, "create matchers cache") + } + } + dbs := receive.NewMultiTSDB( conf.dataDir, logger, @@ -214,6 +223,7 @@ func runReceive( bkt, conf.allowOutOfOrderUpload, hashFunc, + cache, ) writer := receive.NewWriter(log.With(logger, "component", "receive-writer"), dbs, &receive.WriterOptions{ Intern: conf.writerInterning, @@ -322,6 +332,7 @@ func runReceive( options := []store.ProxyStoreOption{ store.WithProxyStoreDebugLogging(debugLogging), + store.WithMatcherCache(cache), } proxy := store.NewProxyStore( @@ -838,6 +849,8 @@ type receiveConfig struct { limitsConfigReloadTimer time.Duration asyncForwardWorkerCount uint + + matcherCacheSize int } func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { @@ -973,6 +986,8 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { 
"about order."). Default("false").Hidden().BoolVar(&rc.allowOutOfOrderUpload) + cmd.Flag("matcher-cache-size", "The size of the cache used for matching against external labels. Using 0 disables caching.").Default("0").IntVar(&rc.matcherCacheSize) + rc.reqLogConfig = extkingpin.RegisterRequestLoggingFlags(cmd) rc.writeLimitsConfig = extflag.RegisterPathOrContent(cmd, "receive.limits-config", "YAML file that contains limit configuration.", extflag.WithEnvSubstitution(), extflag.WithHidden()) diff --git a/pkg/query/query_test.go b/pkg/query/query_test.go index b490156e82..08ef448f4a 100644 --- a/pkg/query/query_test.go +++ b/pkg/query/query_test.go @@ -15,6 +15,7 @@ import ( "github.com/efficientgo/core/testutil" "github.com/go-kit/log" "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/storepb" @@ -55,6 +56,8 @@ func TestQuerier_Proxy(t *testing.T) { files, err := filepath.Glob("testdata/promql/**/*.test") testutil.Ok(t, err) testutil.Equals(t, 10, len(files), "%v", files) + cache, err := storepb.NewMatchersCache() + testutil.Ok(t, err) logger := log.NewLogfmtLogger(os.Stderr) t.Run("proxy", func(t *testing.T) { @@ -63,7 +66,7 @@ func TestQuerier_Proxy(t *testing.T) { logger, nil, store.NewProxyStore(logger, nil, func() []store.Client { return sc.get() }, - component.Debug, nil, 5*time.Minute, store.EagerRetrieval), + component.Debug, nil, 5*time.Minute, store.EagerRetrieval, store.WithMatcherCache(cache)), 1000000, 5*time.Minute, ) diff --git a/pkg/receive/handler_test.go b/pkg/receive/handler_test.go index 4f81a3d1ca..c360b4f259 100644 --- a/pkg/receive/handler_test.go +++ b/pkg/receive/handler_test.go @@ -999,6 +999,7 @@ func benchmarkHandlerMultiTSDBReceiveRemoteWrite(b testutil.TB) { nil, false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(b, m.Close()) }() handler.writer = NewWriter(logger, m, &WriterOptions{}) diff --git 
a/pkg/receive/multitsdb.go b/pkg/receive/multitsdb.go index 5ea7bfcc5b..210630c5f8 100644 --- a/pkg/receive/multitsdb.go +++ b/pkg/receive/multitsdb.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/api/status" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" @@ -59,6 +60,7 @@ type MultiTSDB struct { allowOutOfOrderUpload bool hashFunc metadata.HashFunc hashringConfigs []HashringConfig + cache *storepb.MatchersCache } // NewMultiTSDB creates new MultiTSDB. @@ -73,6 +75,7 @@ func NewMultiTSDB( bucket objstore.Bucket, allowOutOfOrderUpload bool, hashFunc metadata.HashFunc, + cache *storepb.MatchersCache, ) *MultiTSDB { if l == nil { l = log.NewNopLogger() @@ -90,6 +93,7 @@ func NewMultiTSDB( bucket: bucket, allowOutOfOrderUpload: allowOutOfOrderUpload, hashFunc: hashFunc, + cache: cache, } } @@ -654,7 +658,7 @@ func (t *MultiTSDB) startTSDB(logger log.Logger, tenantID string, tenant *tenant shipper.DefaultMetaFilename, ) } - tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset), s, ship, exemplars.NewTSDB(s, lset)) + tenant.set(store.NewTSDBStore(logger, s, component.Receive, lset, store.WithMatcherCacheInstance(t.cache)), s, ship, exemplars.NewTSDB(s, lset)) level.Info(logger).Log("msg", "TSDB is now ready") return nil } diff --git a/pkg/receive/multitsdb_test.go b/pkg/receive/multitsdb_test.go index 53d99cabfa..b6d57af9a0 100644 --- a/pkg/receive/multitsdb_test.go +++ b/pkg/receive/multitsdb_test.go @@ -43,21 +43,14 @@ func TestMultiTSDB(t *testing.T) { logger := log.NewLogfmtLogger(os.Stderr) t.Run("run fresh", func(t *testing.T) { - m := NewMultiTSDB( - dir, logger, prometheus.NewRegistry(), &tsdb.Options{ - MinBlockDuration: (2 * time.Hour).Milliseconds(), - MaxBlockDuration: (2 * time.Hour).Milliseconds(), - RetentionDuration: (6 * time.Hour).Milliseconds(), - NoLockfile: true, - MaxExemplars: 100, - 
EnableExemplarStorage: true, - }, - labels.FromStrings("replica", "01"), - "tenant_id", - nil, - false, - metadata.NoneFunc, - ) + m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + NoLockfile: true, + MaxExemplars: 100, + EnableExemplarStorage: true, + }, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc, nil) defer func() { testutil.Ok(t, m.Close()) }() testutil.Ok(t, m.Flush()) @@ -141,6 +134,7 @@ func TestMultiTSDB(t *testing.T) { nil, false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -172,19 +166,12 @@ func TestMultiTSDB(t *testing.T) { t.Run("flush with one sample produces a block", func(t *testing.T) { const testTenant = "test_tenant" - m := NewMultiTSDB( - dir, logger, prometheus.NewRegistry(), &tsdb.Options{ - MinBlockDuration: (2 * time.Hour).Milliseconds(), - MaxBlockDuration: (2 * time.Hour).Milliseconds(), - RetentionDuration: (6 * time.Hour).Milliseconds(), - NoLockfile: true, - }, - labels.FromStrings("replica", "01"), - "tenant_id", - nil, - false, - metadata.NoneFunc, - ) + m := NewMultiTSDB(dir, logger, prometheus.NewRegistry(), &tsdb.Options{ + MinBlockDuration: (2 * time.Hour).Milliseconds(), + MaxBlockDuration: (2 * time.Hour).Milliseconds(), + RetentionDuration: (6 * time.Hour).Milliseconds(), + NoLockfile: true, + }, labels.FromStrings("replica", "01"), "tenant_id", nil, false, metadata.NoneFunc, nil) defer func() { testutil.Ok(t, m.Close()) }() testutil.Ok(t, m.Flush()) @@ -451,6 +438,7 @@ func TestMultiTSDBPrune(t *testing.T) { test.bucket, false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -520,6 +508,7 @@ func TestMultiTSDBRecreatePrunedTenant(t *testing.T) { objstore.NewInMemBucket(), false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(t, m.Close()) }() @@ 
-581,6 +570,7 @@ func TestAlignedHeadFlush(t *testing.T) { test.bucket, false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -655,6 +645,7 @@ func TestMultiTSDBStats(t *testing.T) { nil, false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -684,6 +675,7 @@ func TestMultiTSDBWithNilStore(t *testing.T) { nil, false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -725,6 +717,7 @@ func TestProxyLabelValues(t *testing.T) { nil, false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(t, m.Close()) }() @@ -815,6 +808,7 @@ func BenchmarkMultiTSDB(b *testing.B) { nil, false, metadata.NoneFunc, + nil, ) defer func() { testutil.Ok(b, m.Close()) }() diff --git a/pkg/receive/receive_test.go b/pkg/receive/receive_test.go index ea1b6d81fd..d2de6e59dd 100644 --- a/pkg/receive/receive_test.go +++ b/pkg/receive/receive_test.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/tsdb" "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/store" "github.com/thanos-io/thanos/pkg/store/labelpb" @@ -810,6 +811,7 @@ func initializeMultiTSDB(dir string) *MultiTSDB { bucket, false, metadata.NoneFunc, + nil, ) return m diff --git a/pkg/receive/writer_test.go b/pkg/receive/writer_test.go index 34613794b8..dd210f75da 100644 --- a/pkg/receive/writer_test.go +++ b/pkg/receive/writer_test.go @@ -344,6 +344,7 @@ func TestWriter(t *testing.T) { nil, false, metadata.NoneFunc, + nil, ) t.Cleanup(func() { testutil.Ok(t, m.Close()) }) @@ -436,6 +437,7 @@ func benchmarkWriter(b *testing.B, labelsNum int, seriesNum int, generateHistogr nil, false, metadata.NoneFunc, + nil, ) b.Cleanup(func() { testutil.Ok(b, m.Close()) }) diff --git a/pkg/store/local.go b/pkg/store/local.go index 4e88c0a7e3..01c6ee336a 100644 --- a/pkg/store/local.go +++ b/pkg/store/local.go @@ -151,7 +151,7 @@ func (s *LocalStore) Info(_ context.Context, _ 
*storepb.InfoRequest) (*storepb.I // Series returns all series for a requested time range and label matcher. The returned data may // exceed the requested time bounds. func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { - match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels) + match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, nil) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/prometheus.go b/pkg/store/prometheus.go index d52fcb07d9..46405559db 100644 --- a/pkg/store/prometheus.go +++ b/pkg/store/prometheus.go @@ -149,7 +149,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -524,8 +524,16 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que // matchesExternalLabels returns false if given matchers are not matching external labels. // If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels. -func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) { - tms, err := storepb.MatchersToPromMatchers(ms...) +func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache *storepb.MatchersCache) (bool, []*labels.Matcher, error) { + var ( + tms []*labels.Matcher + err error + ) + if cache != nil { + tms, err = storepb.MatchersToPromMatchersCached(cache, ms...) + } else { + tms, err = storepb.MatchersToPromMatchers(ms...) 
+ } if err != nil { return false, nil, err } @@ -573,7 +581,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -636,7 +644,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue extLset := p.externalLabelsFn() - match, matchers, err := matchesExternalLabels(r.Matchers, extLset) + match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy.go b/pkg/store/proxy.go index 6dbe5df7a3..614fb056f4 100644 --- a/pkg/store/proxy.go +++ b/pkg/store/proxy.go @@ -86,6 +86,7 @@ type ProxyStore struct { retrievalStrategy RetrievalStrategy debugLogging bool tsdbSelector *TSDBSelector + matcherCache *storepb.MatchersCache } type proxyStoreMetrics struct { @@ -109,7 +110,7 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(* } } -// BucketStoreOption are functions that configure BucketStore. +// ProxyStoreOption are functions that configure the ProxyStore. type ProxyStoreOption func(s *ProxyStore) // WithProxyStoreDebugLogging toggles debug logging. @@ -126,6 +127,13 @@ func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption { } } +// WithMatcherCache sets the matcher cache instance for the proxy. +func WithMatcherCache(cache *storepb.MatchersCache) ProxyStoreOption { + return func(s *ProxyStore) { + s.matcherCache = cache + } +} + // NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client. 
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL). func NewProxyStore( @@ -292,7 +300,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb. reqLogger = log.With(reqLogger, "request", originalRequest.String()) } - match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels) + match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } diff --git a/pkg/store/proxy_test.go b/pkg/store/proxy_test.go index c4571de976..e2c95c07fd 100644 --- a/pkg/store/proxy_test.go +++ b/pkg/store/proxy_test.go @@ -6,11 +6,12 @@ package store import ( "context" "fmt" "math" "math/rand" "os" "path/filepath" + "strings" "sync" "testing" "time" ) @@ -1974,6 +1974,47 @@ func BenchmarkProxySeries(b *testing.B) { }) } +func BenchmarkProxySeriesRegex(b *testing.B) { + tb := testutil.NewTB(b) + + cache, err := storepb.NewMatchersCache(storepb.WithSize(200)) + testutil.Ok(b, err) + + q := NewProxyStore(nil, + nil, + func() []Client { return nil }, + component.Query, + labels.EmptyLabels(), 0*time.Second, EagerRetrieval, + WithMatcherCache(cache), + ) + + words := []string{"foo", "bar", "baz", "qux", "quux", "corge", "grault", "garply", "waldo", "fred", "plugh", "xyzzy", "thud"} + bigRegex := strings.Builder{} + for i := 0; i < 200; i++ { + bigRegex.WriteString(words[rand.Intn(len(words))]) + bigRegex.WriteString("|") + } + + matchers := []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "foo", Value: ".*"}, + {Type: storepb.LabelMatcher_RE, Name: "bar", Value: bigRegex.String()}, + } + + // Create a regex that matches all series. 
+ req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: math.MaxInt64, + Matchers: matchers, + } + s := newStoreSeriesServer(context.Background()) + + tb.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + testutil.Ok(b, q.Series(req, s)) + } +} + func benchProxySeries(t testutil.TB, totalSamples, totalSeries int) { tmpDir := t.TempDir() @@ -2293,5 +2334,4 @@ func TestDedupRespHeap_Deduplication(t *testing.T) { tcase.testFn(tcase.responses, h) }) } - } diff --git a/pkg/store/storepb/custom.go b/pkg/store/storepb/custom.go index faed79bc7b..b3754bafa9 100644 --- a/pkg/store/storepb/custom.go +++ b/pkg/store/storepb/custom.go @@ -379,29 +379,49 @@ func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) { func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) { res := make([]*labels.Matcher, 0, len(ms)) for _, m := range ms { - var t labels.MatchType - - switch m.Type { - case LabelMatcher_EQ: - t = labels.MatchEqual - case LabelMatcher_NEQ: - t = labels.MatchNotEqual - case LabelMatcher_RE: - t = labels.MatchRegexp - case LabelMatcher_NRE: - t = labels.MatchNotRegexp - default: - return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) + pm, err := MatcherToPromMatcher(m) + if err != nil { + return nil, err } - m, err := labels.NewMatcher(t, m.Name, m.Value) + res = append(res, pm) + } + return res, nil +} + +// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers. +// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions. +// NOTE: It allocates memory. 
+func MatchersToPromMatchersCached(cache *MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) { + res := make([]*labels.Matcher, 0, len(ms)) + for _, m := range ms { + pm, err := cache.GetOrSet(m, MatcherToPromMatcher) if err != nil { return nil, err } - res = append(res, m) + res = append(res, pm) } return res, nil } +// MatcherToPromMatcher converts a Thanos label matcher to Prometheus label matcher. +func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) { + var t labels.MatchType + + switch m.Type { + case LabelMatcher_EQ: + t = labels.MatchEqual + case LabelMatcher_NEQ: + t = labels.MatchNotEqual + case LabelMatcher_RE: + t = labels.MatchRegexp + case LabelMatcher_NRE: + t = labels.MatchNotRegexp + default: + return nil, errors.Errorf("unrecognized label matcher type %d", m.Type) + } + return labels.NewMatcher(t, m.Name, m.Value) +} + // MatchersToString converts label matchers to string format. // String should be parsable as a valid PromQL query metric selector. func MatchersToString(ms ...LabelMatcher) string { diff --git a/pkg/store/storepb/matcher_cache.go b/pkg/store/storepb/matcher_cache.go new file mode 100644 index 0000000000..d3e66d1d1d --- /dev/null +++ b/pkg/store/storepb/matcher_cache.go @@ -0,0 +1,96 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. 
+ +package storepb + +import ( + lru "github.com/hashicorp/golang-lru/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" +) + +const DefaultCacheSize = 200 + +type NewItemFunc func(matcher LabelMatcher) (*labels.Matcher, error) + +type MatchersCache struct { + reg prometheus.Registerer + cache *lru.TwoQueueCache[LabelMatcher, *labels.Matcher] + metrics *matcherCacheMetrics + size int +} + +type MatcherCacheOption func(*MatchersCache) + +func WithPromRegistry(reg prometheus.Registerer) MatcherCacheOption { + return func(c *MatchersCache) { + c.reg = reg + } +} + +func WithSize(size int) MatcherCacheOption { + return func(c *MatchersCache) { + c.size = size + } +} + +func NewMatchersCache(opts ...MatcherCacheOption) (*MatchersCache, error) { + cache := &MatchersCache{ + reg: prometheus.NewRegistry(), + size: DefaultCacheSize, + } + + for _, opt := range opts { + opt(cache) + } + cache.metrics = newMatcherCacheMetrics(cache.reg) + + lruCache, err := lru.New2Q[LabelMatcher, *labels.Matcher](cache.size) + if err != nil { + return nil, err + } + cache.cache = lruCache + + return cache, nil +} + +func (c *MatchersCache) GetOrSet(key LabelMatcher, newItem NewItemFunc) (*labels.Matcher, error) { + c.metrics.requestsTotal.Inc() + if item, ok := c.cache.Get(key); ok { + c.metrics.hitsTotal.Inc() + return item, nil + } + + item, err := newItem(key) + if err != nil { + return nil, err + } + c.cache.Add(key, item) + c.metrics.numItems.Set(float64(c.cache.Len())) + + return item, nil +} + +type matcherCacheMetrics struct { + requestsTotal prometheus.Counter + hitsTotal prometheus.Counter + numItems prometheus.Gauge +} + +func newMatcherCacheMetrics(reg prometheus.Registerer) *matcherCacheMetrics { + return &matcherCacheMetrics{ + requestsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_requests_total", + Help: "Total number 
of cache requests for series matchers", + }), + hitsTotal: promauto.With(reg).NewCounter(prometheus.CounterOpts{ + Name: "thanos_matchers_cache_hits_total", + Help: "Total number of cache hits for series matchers", + }), + numItems: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "thanos_matchers_cache_items", + Help: "Total number of cached items", + }), + } +} diff --git a/pkg/store/storepb/matcher_cache_test.go b/pkg/store/storepb/matcher_cache_test.go new file mode 100644 index 0000000000..754379474c --- /dev/null +++ b/pkg/store/storepb/matcher_cache_test.go @@ -0,0 +1,86 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package storepb_test + +import ( + "testing" + + "github.com/efficientgo/core/testutil" + "github.com/prometheus/prometheus/model/labels" + + "github.com/thanos-io/thanos/pkg/store/storepb" +) + +func TestMatchersCache(t *testing.T) { + cache, err := storepb.NewMatchersCache(storepb.WithSize(2)) + testutil.Ok(t, err) + + matcher := storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key", + Value: "val", + } + + matcher2 := storepb.LabelMatcher{ + Type: storepb.LabelMatcher_RE, + Name: "key2", + Value: "val2|val3", + } + + matcher3 := storepb.LabelMatcher{ + Type: storepb.LabelMatcher_EQ, + Name: "key3", + Value: "val3", + } + + var cacheHit bool + newItem := func(matcher storepb.LabelMatcher) (*labels.Matcher, error) { + cacheHit = false + return storepb.MatcherToPromMatcher(matcher) + } + expected := labels.MustNewMatcher(labels.MatchEqual, "key", "val") + expected2 := labels.MustNewMatcher(labels.MatchRegexp, "key2", "val2|val3") + expected3 := labels.MustNewMatcher(labels.MatchEqual, "key3", "val3") + + item, err := cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, 
cacheHit) + testutil.Equals(t, expected.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) + + cacheHit = true + item, err = cache.GetOrSet(matcher, newItem) + testutil.Ok(t, err) + testutil.Equals(t, true, cacheHit) + testutil.Equals(t, expected, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher3, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected3, item) + + cacheHit = true + item, err = cache.GetOrSet(matcher2, newItem) + testutil.Ok(t, err) + testutil.Equals(t, false, cacheHit) + testutil.Equals(t, expected2.String(), item.String()) +} diff --git a/pkg/store/tsdb.go b/pkg/store/tsdb.go index 6985c716fa..0d8c60ea11 100644 --- a/pkg/store/tsdb.go +++ b/pkg/store/tsdb.go @@ -43,6 +43,7 @@ type TSDBStore struct { component component.StoreAPI buffers sync.Pool maxBytesPerFrame int + matcherCache *storepb.MatchersCache extLset labels.Labels mtx sync.RWMutex @@ -60,12 +61,33 @@ type ReadWriteTSDBStore struct { storepb.WriteableStoreServer } +type tsdbStoreOpts struct { + cache *storepb.MatchersCache +} + +var defaultTsdbStoreOpts = tsdbStoreOpts{} + +type TSDBStoreOption func(*tsdbStoreOpts) + +func WithMatcherCacheInstance(cache *storepb.MatchersCache) TSDBStoreOption { + return func(o *tsdbStoreOpts) { + o.cache = cache + } +} + // NewTSDBStore creates a new TSDBStore. // NOTE: Given lset has to be sorted. 
-func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore { +func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels, opts ...TSDBStoreOption) *TSDBStore { if logger == nil { logger = log.NewNopLogger() } + + opt := defaultTsdbStoreOpts + for _, o := range opts { + o(&opt) + } + return &TSDBStore{ + matcherCache: opt.cache, logger: logger, db: db, @@ -130,13 +151,13 @@ func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet { return labelSets } -func (p *TSDBStore) TSDBInfos() []infopb.TSDBInfo { - labels := p.LabelSet() +func (s *TSDBStore) TSDBInfos() []infopb.TSDBInfo { + labels := s.LabelSet() if len(labels) == 0 { return []infopb.TSDBInfo{} } - mint, maxt := p.TimeRange() + mint, maxt := s.TimeRange() return []infopb.TSDBInfo{ { Labels: labelpb.ZLabelSet{ @@ -171,7 +192,7 @@ type CloseDelegator interface { func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error { srv := newFlushableServer(seriesSrv, sortingStrategyStore) - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache) if err != nil { return status.Error(codes.InvalidArgument, err.Error()) } @@ -288,7 +309,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) ( *storepb.LabelNamesResponse, error, ) { - match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset()) + match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), nil) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) } @@ -347,7 +368,7 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque } } - match, matchers, err := matchesExternalLabels(r.Matchers, 
s.getExtLset(), nil) if err != nil { return nil, status.Error(codes.InvalidArgument, err.Error()) }