diff --git a/CHANGELOG.md b/CHANGELOG.md index 020c9b16c7..c76f879362 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -25,6 +25,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#6441](https://github.com/thanos-io/thanos/pull/6441) Compact: Compactor will set `index_stats` in `meta.json` file with max series and chunk size information. - [#6466](https://github.com/thanos-io/thanos/pull/6466) Mixin (Receive): add limits alerting for configuration reload and meta-monitoring. - [#6467](https://github.com/thanos-io/thanos/pull/6467) Mixin (Receive): add alert for tenant reaching head series limit. +- [#6528](https://github.com/thanos-io/thanos/pull/6528) Index Cache: Add histogram metric `thanos_store_index_cache_stored_data_size_bytes` for item size. ### Fixed - [#6503](https://github.com/thanos-io/thanos/pull/6503) *: Change the engine behind `ContentPathReloader` to be completely independent of any filesystem concept. This effectively fixes this configuration reload when used with Kubernetes ConfigMaps, Secrets, or other volume mounts. diff --git a/pkg/store/cache/cache.go b/pkg/store/cache/cache.go index bcbb727b30..ad6ecac30e 100644 --- a/pkg/store/cache/cache.go +++ b/pkg/store/cache/cache.go @@ -58,8 +58,9 @@ type IndexCache interface { // Common metrics that should be used by all cache implementations. type commonMetrics struct { - requestTotal *prometheus.CounterVec - hitsTotal *prometheus.CounterVec + requestTotal *prometheus.CounterVec + hitsTotal *prometheus.CounterVec + dataSizeBytes *prometheus.HistogramVec } func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { @@ -72,6 +73,13 @@ func newCommonMetrics(reg prometheus.Registerer) *commonMetrics { Name: "thanos_store_index_cache_hits_total", Help: "Total number of items requests to the cache that were a hit.", }, []string{"item_type"}), + dataSizeBytes: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_store_index_cache_stored_data_size_bytes", + Help: "Histogram to track item data size stored in index cache", + Buckets: []float64{ + 32, 256, 512, 1024, 32 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024, 32 * 1024 * 1024, 256 * 1024 * 1024, 512 * 1024 * 1024, + }, + }, []string{"item_type"}), } } diff --git a/pkg/store/cache/inmemory.go b/pkg/store/cache/inmemory.go index d5227285a2..747199b414 100644 --- a/pkg/store/cache/inmemory.go +++ b/pkg/store/cache/inmemory.go @@ -295,6 +295,7 @@ func copyToKey(l labels.Label) cacheKeyPostings { // StorePostings sets the postings identified by the ulid and label to the value v, // if the postings already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings).Observe(float64(len(v))) c.set(cacheTypePostings, cacheKey{block: blockID.String(), key: copyToKey(l)}, v) } @@ -318,6 +319,7 @@ func (c *InMemoryIndexCache) FetchMultiPostings(_ context.Context, blockID ulid. // StoreExpandedPostings stores expanded postings for a set of label matchers. func (c *InMemoryIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings).Observe(float64(len(v))) c.set(cacheTypeExpandedPostings, cacheKey{block: blockID.String(), key: cacheKeyExpandedPostings(labelMatchersToString(matchers))}, v) } @@ -332,6 +334,7 @@ func (c *InMemoryIndexCache) FetchExpandedPostings(_ context.Context, blockID ul // StoreSeries sets the series identified by the ulid and id to the value v, // if the series already exists in the cache it is not mutated. func (c *InMemoryIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { + c.commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries).Observe(float64(len(v))) c.set(cacheTypeSeries, cacheKey{blockID.String(), cacheKeySeries(id), ""}, v) } diff --git a/pkg/store/cache/memcached.go b/pkg/store/cache/memcached.go index f5ab1c4b02..9292f3ed59 100644 --- a/pkg/store/cache/memcached.go +++ b/pkg/store/cache/memcached.go @@ -33,12 +33,15 @@ type RemoteIndexCache struct { compressionScheme string // Metrics. - postingRequests prometheus.Counter - seriesRequests prometheus.Counter - expandedPostingRequests prometheus.Counter - postingHits prometheus.Counter - seriesHits prometheus.Counter - expandedPostingHits prometheus.Counter + postingRequests prometheus.Counter + seriesRequests prometheus.Counter + expandedPostingRequests prometheus.Counter + postingHits prometheus.Counter + seriesHits prometheus.Counter + expandedPostingHits prometheus.Counter + postingDataSizeBytes prometheus.Observer + expandedPostingDataSizeBytes prometheus.Observer + seriesDataSizeBytes prometheus.Observer } // NewRemoteIndexCache makes a new RemoteIndexCache. @@ -61,6 +64,10 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli c.seriesHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeSeries) c.expandedPostingHits = commonMetrics.hitsTotal.WithLabelValues(cacheTypeExpandedPostings) + c.postingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypePostings) + c.seriesDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeSeries) + c.expandedPostingDataSizeBytes = commonMetrics.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings) + level.Info(logger).Log("msg", "created index cache") return c, nil @@ -70,6 +77,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte) { + c.postingDataSizeBytes.Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err) @@ -118,6 +126,7 @@ func (c *RemoteIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid. // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labels.Matcher, v []byte) { + c.expandedPostingDataSizeBytes.Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil { @@ -148,6 +157,7 @@ func (c *RemoteIndexCache) FetchExpandedPostings(ctx context.Context, blockID ul // The function enqueues the request and returns immediately: the entry will be // asynchronously stored in the cache. func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte) { + c.seriesDataSizeBytes.Observe(float64(len(v))) key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string() if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {