Skip to content

Commit

Permalink
Using the cache in proxy and tsdb stores (only receiver)
Browse files Browse the repository at this point in the history
Signed-off-by: Pedro Tanaka <pedro.tanaka@shopify.com>
  • Loading branch information
pedro-stanaka committed May 13, 2024
1 parent 501efe7 commit d56e024
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pkg/store/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (s *LocalStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.I
// Series returns all series for a requested time range and label matcher. The returned data may
// exceed the requested time bounds.
func (s *LocalStore) Series(r *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error {
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels)
match, matchers, err := matchesExternalLabels(r.Matchers, s.extLabels, nil)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
18 changes: 13 additions & 5 deletions pkg/store/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *PrometheusStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Sto

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -524,8 +524,16 @@ func (p *PrometheusStore) startPromRemoteRead(ctx context.Context, q *prompb.Que

// matchesExternalLabels returns false if given matchers are not matching external labels.
// If true, matchesExternalLabels also returns Prometheus matchers without those matching external labels.
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels) (bool, []*labels.Matcher, error) {
tms, err := storepb.MatchersToPromMatchers(ms...)
func matchesExternalLabels(ms []storepb.LabelMatcher, externalLabels labels.Labels, cache *storepb.MatchersCache) (bool, []*labels.Matcher, error) {
var (
tms []*labels.Matcher
err error
)
if cache != nil {
tms, err = storepb.MatchersToPromMatchersCached(cache, ms...)
} else {
tms, err = storepb.MatchersToPromMatchers(ms...)
}
if err != nil {
return false, nil, err
}
Expand Down Expand Up @@ -573,7 +581,7 @@ func (p *PrometheusStore) encodeChunk(ss []prompb.Sample) (storepb.Chunk_Encodin
func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -636,7 +644,7 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue

extLset := p.externalLabelsFn()

match, matchers, err := matchesExternalLabels(r.Matchers, extLset)
match, matchers, err := matchesExternalLabels(r.Matchers, extLset, nil)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
13 changes: 11 additions & 2 deletions pkg/store/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ type ProxyStore struct {
retrievalStrategy RetrievalStrategy
debugLogging bool
tsdbSelector *TSDBSelector
matcherCache *storepb.MatchersCache
}

type proxyStoreMetrics struct {
Expand All @@ -109,7 +110,7 @@ func RegisterStoreServer(storeSrv storepb.StoreServer, logger log.Logger) func(*
}
}

// BucketStoreOption are functions that configure BucketStore.
// ProxyStoreOption are functions that configure the ProxyStore.
type ProxyStoreOption func(s *ProxyStore)

// WithProxyStoreDebugLogging toggles debug logging.
Expand All @@ -126,6 +127,12 @@ func WithTSDBSelector(selector *TSDBSelector) ProxyStoreOption {
}
}

func WithMatcherCache(cache *storepb.MatchersCache) ProxyStoreOption {
return func(s *ProxyStore) {
s.matcherCache = cache
}
}

// NewProxyStore returns a new ProxyStore that uses the given clients that implements storeAPI to fan-in all series to the client.
// Note that there is no deduplication support. Deduplication should be done on the highest level (just before PromQL).
func NewProxyStore(
Expand Down Expand Up @@ -156,6 +163,7 @@ func NewProxyStore(
metrics: metrics,
retrievalStrategy: retrievalStrategy,
tsdbSelector: DefaultSelector,
matcherCache: storepb.NewMatchersCache(storepb.WithPromRegistry(reg)),
}

for _, option := range options {
Expand All @@ -170,6 +178,7 @@ func (s *ProxyStore) Info(_ context.Context, _ *storepb.InfoRequest) (*storepb.I
res := &storepb.InfoResponse{
StoreType: s.component.ToProto(),
Labels: labelpb.ZLabelsFromPromLabels(s.selectorLabels),
LabelSets: labelpb.ZLabelSetsFromPromLabels(s.selectorLabels),
}

minTime := int64(math.MaxInt64)
Expand Down Expand Up @@ -292,7 +301,7 @@ func (s *ProxyStore) Series(originalRequest *storepb.SeriesRequest, srv storepb.
reqLogger = log.With(reqLogger, "request", originalRequest.String())
}

match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels)
match, matchers, err := matchesExternalLabels(originalRequest.Matchers, s.selectorLabels, s.matcherCache)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/store/storepb/custom.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,21 @@ func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
return res, nil
}

// MatchersToPromMatchersCached returns Prometheus matchers from proto matchers.
// Works analogously to MatchersToPromMatchers but uses cache to avoid unnecessary allocations and conversions.
// NOTE: It allocates memory.
func MatchersToPromMatchersCached(cache *MatchersCache, ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
pm, err := cache.GetOrSet(m, MatcherToPromMatcher)
if err != nil {
return nil, err
}
res = append(res, pm)
}
return res, nil
}

func MatcherToPromMatcher(m LabelMatcher) (*labels.Matcher, error) {
var t labels.MatchType

Expand Down
40 changes: 33 additions & 7 deletions pkg/store/tsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
"google.golang.org/grpc"
Expand Down Expand Up @@ -43,6 +44,7 @@ type TSDBStore struct {
component component.StoreAPI
buffers sync.Pool
maxBytesPerFrame int
matcherCache *storepb.MatchersCache

extLset labels.Labels
mtx sync.RWMutex
Expand All @@ -60,12 +62,35 @@ type ReadWriteTSDBStore struct {
storepb.WriteableStoreServer
}

type tsdbStoreOpts struct {
cacheSize int
registry prometheus.Registerer
}

var defaultTsdbStoreOpts = tsdbStoreOpts{
cacheSize: 200,
}

type TSDBStoreOption func(*tsdbStoreOpts)

func WithCacheSize(size int) TSDBStoreOption {
return func(o *tsdbStoreOpts) {
o.cacheSize = size
}
}

// NewTSDBStore creates a new TSDBStore.
// NOTE: Given lset has to be sorted.
func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels) *TSDBStore {
func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI, extLset labels.Labels, opts ...TSDBStoreOption) *TSDBStore {
if logger == nil {
logger = log.NewNopLogger()
}

opt := defaultTsdbStoreOpts
for _, o := range opts {
o(&opt)
}

return &TSDBStore{
logger: logger,
db: db,
Expand All @@ -76,6 +101,7 @@ func NewTSDBStore(logger log.Logger, db TSDBReader, component component.StoreAPI
b := make([]byte, 0, initialBufSize)
return &b
}},
matcherCache: storepb.NewMatchersCache(storepb.WithPromRegistry(opt.registry), storepb.WithSize(opt.cacheSize)),
}
}

Expand Down Expand Up @@ -130,13 +156,13 @@ func (s *TSDBStore) LabelSet() []labelpb.ZLabelSet {
return labelSets
}

func (p *TSDBStore) TSDBInfos() []infopb.TSDBInfo {
labels := p.LabelSet()
func (s *TSDBStore) TSDBInfos() []infopb.TSDBInfo {
labels := s.LabelSet()
if len(labels) == 0 {
return []infopb.TSDBInfo{}
}

mint, maxt := p.TimeRange()
mint, maxt := s.TimeRange()
return []infopb.TSDBInfo{
{
Labels: labelpb.ZLabelSet{
Expand Down Expand Up @@ -171,7 +197,7 @@ type CloseDelegator interface {
func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_SeriesServer) error {
srv := newFlushableServer(seriesSrv, sortingStrategyStore)

match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset())
match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), s.matcherCache)
if err != nil {
return status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -288,7 +314,7 @@ func (s *TSDBStore) Series(r *storepb.SeriesRequest, seriesSrv storepb.Store_Ser
func (s *TSDBStore) LabelNames(ctx context.Context, r *storepb.LabelNamesRequest) (
*storepb.LabelNamesResponse, error,
) {
match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset())
match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), nil)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down Expand Up @@ -347,7 +373,7 @@ func (s *TSDBStore) LabelValues(ctx context.Context, r *storepb.LabelValuesReque
}
}

match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset())
match, matchers, err := matchesExternalLabels(r.Matchers, s.getExtLset(), nil)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
Expand Down

0 comments on commit d56e024

Please sign in to comment.