Support filtered index cache (#6765)
* support filtered index cache

Signed-off-by: Ben Ye <benye@amazon.com>

* changelog

Signed-off-by: Ben Ye <benye@amazon.com>

* fix doc

Signed-off-by: Ben Ye <benye@amazon.com>

* fix unit test failure

Signed-off-by: Ben Ye <benye@amazon.com>

* add item type validation

Signed-off-by: Ben Ye <benye@amazon.com>

* lint

Signed-off-by: Ben Ye <benye@amazon.com>

* change enabled_items to []string type

Signed-off-by: Ben Ye <benye@amazon.com>

* generate docs

Signed-off-by: Ben Ye <benye@amazon.com>

* separate validation code

Signed-off-by: Ben Ye <benye@amazon.com>

* fix lint

Signed-off-by: Ben Ye <benye@amazon.com>

* update doc

Signed-off-by: Ben Ye <benye@amazon.com>

* fix interface

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
yeya24 committed Oct 6, 2023
1 parent 62d2753 commit 79bbf34
Showing 5 changed files with 271 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -17,9 +17,10 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#6605](https://github.com/thanos-io/thanos/pull/6605) Query Frontend: Support vertical sharding binary expression with metric name when no matching labels specified.
- [#6308](https://github.com/thanos-io/thanos/pull/6308) Ruler: Support configuration flag that allows customizing template for alert message.
- [#6760](https://github.com/thanos-io/thanos/pull/6760) Query Frontend: Added TLS support in `--query-frontend.downstream-tripper-config` and `--query-frontend.downstream-tripper-config-file`
- [#6749](https://github.com/thanos-io/thanos/pull/6308) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache.
- [#6749](https://github.com/thanos-io/thanos/pull/6749) Store Gateway: Added `thanos_store_index_cache_fetch_duration_seconds` histogram for tracking latency of fetching data from index cache.
- [#6690](https://github.com/thanos-io/thanos/pull/6690) Store: *breaking :warning:* Add tenant label to relevant exported metrics. Note that this change may cause some pre-existing dashboard queries to be incorrect due to the added label.
- [#6530](https://github.com/thanos-io/thanos/pull/6530) / [#6690](https://github.com/thanos-io/thanos/pull/6690) Query: Add command line arguments for configuring tenants and forward tenant information to Store Gateway.
- [#6765](https://github.com/thanos-io/thanos/pull/6765) Index Cache: Add `enabled_items` to index cache config to selectively cache configured items. Available item types are `Postings`, `Series` and `ExpandedPostings`.

### Changed

6 changes: 6 additions & 0 deletions docs/components/store.md
@@ -291,12 +291,14 @@ type: IN-MEMORY
config:
  max_size: 0
  max_item_size: 0
enabled_items: []
```

All the settings are **optional**:

- `max_size`: overall maximum number of bytes the cache can contain. The value should be specified with a bytes unit (e.g. `250MB`).
- `max_item_size`: maximum size of a single item, in bytes. The value should be specified with a bytes unit (e.g. `125MB`).
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.

### Memcached index cache

@@ -315,6 +317,7 @@ config:
  max_get_multi_batch_size: 0
  dns_provider_update_interval: 0s
  auto_discovery: false
enabled_items: []
```

The **required** settings are:
@@ -332,6 +335,7 @@ While the remaining settings are **optional**:
- `max_item_size`: maximum size of an item to be stored in memcached. This option should be set to the same value as the memcached `-I` flag (defaults to 1MB) to avoid wasting network round trips on storing items larger than the maximum item size allowed in memcached. If set to `0`, the item size is unlimited.
- `dns_provider_update_interval`: the DNS discovery update interval.
- `auto_discovery`: whether to use the auto-discovery mechanism for memcached.
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.

### Redis index cache

@@ -362,6 +366,7 @@ config:
  master_name: ""
  max_async_buffer_size: 10000
  max_async_concurrency: 20
enabled_items: []
```

The **required** settings are:
@@ -377,6 +382,7 @@ While the remaining settings are **optional**:
- `read_timeout`: the redis read timeout.
- `write_timeout`: the redis write timeout.
- `cache_size`: size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See the [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. Enabling it is highly recommended so that Thanos Store does not need to repeatedly retrieve data from Redis for requests of the same key(s).
- `enabled_items`: selectively choose what types of items to cache. Supported values are `Postings`, `Series` and `ExpandedPostings`. By default, all items are cached.

Here is an example of what effect client-side caching could have:

11 changes: 11 additions & 0 deletions pkg/store/cache/factory.go
@@ -28,6 +28,9 @@ const (
type IndexCacheConfig struct {
	Type   IndexCacheProvider `yaml:"type"`
	Config interface{}        `yaml:"config"`

	// Available item types are Postings, Series and ExpandedPostings.
	EnabledItems []string `yaml:"enabled_items"`
}

// NewIndexCache initializes and returns new index cache.
@@ -66,5 +69,13 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
	if err != nil {
		return nil, errors.Wrap(err, fmt.Sprintf("create %s index cache", cacheConfig.Type))
	}

	if len(cacheConfig.EnabledItems) > 0 {
		if err = ValidateEnabledItems(cacheConfig.EnabledItems); err != nil {
			return nil, err
		}
		cache = NewFilteredIndexCache(cache, cacheConfig.EnabledItems)
	}

	return cache, nil
}
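
The branch added to `NewIndexCache` can be read as "wrap only when a non-empty, valid list is configured". The following is a minimal standalone sketch of that decision, not the Thanos code itself: `validateItems` and `shouldWrap` are hypothetical names introduced for illustration, and the item type strings are taken from the changelog entry rather than from the package constants.

```go
package main

import "fmt"

// supportedItems lists the item type names from the changelog entry.
// Assumption: the real package constants hold these same strings.
var supportedItems = map[string]bool{
	"Postings":         true,
	"Series":           true,
	"ExpandedPostings": true,
}

// validateItems is a hypothetical stand-in for ValidateEnabledItems:
// it rejects the first item that is not a supported type.
func validateItems(items []string) error {
	for _, item := range items {
		if !supportedItems[item] {
			return fmt.Errorf("unsupported item type %s", item)
		}
	}
	return nil
}

// shouldWrap is a hypothetical helper mirroring the factory branch:
// an empty list leaves the cache unwrapped, a non-empty list is
// validated before the cache would be wrapped.
func shouldWrap(items []string) (bool, error) {
	if len(items) == 0 {
		return false, nil
	}
	if err := validateItems(items); err != nil {
		return false, err
	}
	return true, nil
}

func main() {
	for _, items := range [][]string{nil, {"Postings"}, {"Series", "foo"}} {
		wrap, err := shouldWrap(items)
		fmt.Println(items, wrap, err)
	}
}
```

One consequence of this ordering is that an unset `enabled_items` never reaches validation, so existing configurations keep their previous behavior.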
88 changes: 88 additions & 0 deletions pkg/store/cache/filter_cache.go
@@ -0,0 +1,88 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache

import (
	"context"
	"fmt"

	"github.com/oklog/ulid"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"
	"golang.org/x/exp/slices"
)

type FilteredIndexCache struct {
	cache        IndexCache
	enabledItems []string
}

// NewFilteredIndexCache creates a filtered index cache based on enabled items.
func NewFilteredIndexCache(cache IndexCache, enabledItems []string) *FilteredIndexCache {
	return &FilteredIndexCache{
		cache:        cache,
		enabledItems: enabledItems,
	}
}

// StorePostings sets the postings identified by the ulid and label to the value v,
// if the postings already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
		c.cache.StorePostings(blockID, l, v, tenant)
	}
}

// FetchMultiPostings fetches multiple postings - each identified by a label -
// and returns a map containing cache hits, along with a list of missing keys.
func (c *FilteredIndexCache) FetchMultiPostings(ctx context.Context, blockID ulid.ULID, keys []labels.Label, tenant string) (hits map[labels.Label][]byte, misses []labels.Label) {
	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypePostings) {
		return c.cache.FetchMultiPostings(ctx, blockID, keys, tenant)
	}
	return nil, keys
}

// StoreExpandedPostings stores expanded postings for a set of label matchers.
func (c *FilteredIndexCache) StoreExpandedPostings(blockID ulid.ULID, matchers []*labels.Matcher, v []byte, tenant string) {
	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
		c.cache.StoreExpandedPostings(blockID, matchers, v, tenant)
	}
}

// FetchExpandedPostings fetches expanded postings and returns cached data and a boolean value representing whether it is a cache hit or not.
func (c *FilteredIndexCache) FetchExpandedPostings(ctx context.Context, blockID ulid.ULID, matchers []*labels.Matcher, tenant string) ([]byte, bool) {
	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeExpandedPostings) {
		return c.cache.FetchExpandedPostings(ctx, blockID, matchers, tenant)
	}
	return nil, false
}

// StoreSeries sets the series identified by the ulid and id to the value v,
// if the series already exists in the cache it is not mutated.
func (c *FilteredIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef, v []byte, tenant string) {
	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
		c.cache.StoreSeries(blockID, id, v, tenant)
	}
}

// FetchMultiSeries fetches multiple series - each identified by ID - from the cache
// and returns a map containing cache hits, along with a list of missing IDs.
func (c *FilteredIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.ULID, ids []storage.SeriesRef, tenant string) (hits map[storage.SeriesRef][]byte, misses []storage.SeriesRef) {
	if len(c.enabledItems) == 0 || slices.Contains(c.enabledItems, cacheTypeSeries) {
		return c.cache.FetchMultiSeries(ctx, blockID, ids, tenant)
	}
	return nil, ids
}

func ValidateEnabledItems(enabledItems []string) error {
	for _, item := range enabledItems {
		switch item {
		// valid
		case cacheTypePostings, cacheTypeExpandedPostings, cacheTypeSeries:
		default:
			return fmt.Errorf("unsupported item type %s", item)
		}
	}
	return nil
}
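
The wrapper above follows a decorator pattern: stores for a disabled item type are dropped, and fetches for a disabled type report a miss without touching the underlying cache. Below is a simplified, self-contained sketch of that idea. The `toyCache` interface, `mapCache`, `filtered`, and the `contains` helper are all hypothetical and far smaller than the real `IndexCache` interface, and the type names are assumed to match the strings listed in the changelog.

```go
package main

import "fmt"

// Item type names as listed in the changelog entry (assumed values).
const (
	typePostings = "Postings"
	typeSeries   = "Series"
)

// toyCache is a hypothetical, much smaller stand-in for IndexCache.
type toyCache interface {
	Store(itemType, key string, v []byte)
	Fetch(itemType, key string) ([]byte, bool)
}

// mapCache is a trivial in-memory toyCache keyed by "type/key".
type mapCache map[string][]byte

func (m mapCache) Store(itemType, key string, v []byte) { m[itemType+"/"+key] = v }

func (m mapCache) Fetch(itemType, key string) ([]byte, bool) {
	v, ok := m[itemType+"/"+key]
	return v, ok
}

// filtered forwards calls to inner only for enabled item types.
type filtered struct {
	inner   toyCache
	enabled []string
}

// contains is a plain loop used here instead of slices.Contains
// so the sketch has no dependencies beyond fmt.
func contains(items []string, want string) bool {
	for _, item := range items {
		if item == want {
			return true
		}
	}
	return false
}

// allowed treats an empty list as "everything enabled".
func (f filtered) allowed(itemType string) bool {
	return len(f.enabled) == 0 || contains(f.enabled, itemType)
}

func (f filtered) Store(itemType, key string, v []byte) {
	if f.allowed(itemType) {
		f.inner.Store(itemType, key, v)
	}
}

func (f filtered) Fetch(itemType, key string) ([]byte, bool) {
	if f.allowed(itemType) {
		return f.inner.Fetch(itemType, key)
	}
	return nil, false // disabled types always report a miss
}

func main() {
	c := filtered{inner: mapCache{}, enabled: []string{typePostings}}
	c.Store(typePostings, "a", []byte("p"))
	c.Store(typeSeries, "b", []byte("s"))

	_, postingsHit := c.Fetch(typePostings, "a")
	_, seriesHit := c.Fetch(typeSeries, "b")
	fmt.Println(postingsHit, seriesHit) // prints "true false"
}
```

Filtering on both the store and the fetch path means a disabled item type costs neither memory nor a round trip to a remote cache backend.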
164 changes: 164 additions & 0 deletions pkg/store/cache/filter_cache_test.go
@@ -0,0 +1,164 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package storecache

import (
	"context"
	"testing"

	"github.com/efficientgo/core/testutil"
	"github.com/go-kit/log"
	"github.com/oklog/ulid"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/prometheus/prometheus/storage"

	"github.com/thanos-io/thanos/pkg/tenancy"
)

func TestFilterCache(t *testing.T) {
	blockID := ulid.MustNew(ulid.Now(), nil)
	postingKeys := []labels.Label{
		{Name: "foo", Value: "bar"},
	}
	expandedPostingsMatchers := []*labels.Matcher{
		labels.MustNewMatcher(labels.MatchEqual, "foo", "bar"),
	}
	testPostingData := []byte("postings")
	testExpandedPostingsData := []byte("expandedPostings")
	testSeriesData := []byte("series")
	ctx := context.TODO()
	for _, tc := range []struct {
		name          string
		enabledItems  []string
		expectedError string
		verifyFunc    func(t *testing.T, c IndexCache)
	}{
		{
			name:          "invalid item type",
			expectedError: "unsupported item type foo",
			enabledItems:  []string{"foo"},
		},
		{
			name:          "invalid item type with 1 valid cache type",
			expectedError: "unsupported item type foo",
			enabledItems:  []string{cacheTypeExpandedPostings, "foo"},
		},
		{
			name: "empty enabled items",
			verifyFunc: func(t *testing.T, c IndexCache) {
				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
				testutil.Equals(t, 0, len(missed))
				testutil.Equals(t, testPostingData, hits[postingKeys[0]])

				ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
				testutil.Equals(t, true, hit)
				testutil.Equals(t, testExpandedPostingsData, ep)

				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
				testutil.Equals(t, 0, len(misses))
				testutil.Equals(t, testSeriesData, seriesHit[1])
			},
		},
		{
			name:         "all enabled items",
			enabledItems: []string{cacheTypeSeries, cacheTypePostings, cacheTypeExpandedPostings},
			verifyFunc: func(t *testing.T, c IndexCache) {
				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
				testutil.Equals(t, 0, len(missed))
				testutil.Equals(t, testPostingData, hits[postingKeys[0]])

				ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
				testutil.Equals(t, true, hit)
				testutil.Equals(t, testExpandedPostingsData, ep)

				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
				testutil.Equals(t, 0, len(misses))
				testutil.Equals(t, testSeriesData, seriesHit[1])
			},
		},
		{
			name:         "only enable postings",
			enabledItems: []string{cacheTypePostings},
			verifyFunc: func(t *testing.T, c IndexCache) {
				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
				testutil.Equals(t, 0, len(missed))
				testutil.Equals(t, testPostingData, hits[postingKeys[0]])

				_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
				testutil.Equals(t, false, hit)

				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
				testutil.Equals(t, 1, len(misses))
				testutil.Equals(t, 0, len(seriesHit))
			},
		},
		{
			name:         "only enable expanded postings",
			enabledItems: []string{cacheTypeExpandedPostings},
			verifyFunc: func(t *testing.T, c IndexCache) {
				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
				testutil.Equals(t, 1, len(missed))
				testutil.Equals(t, 0, len(hits))

				ep, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
				testutil.Equals(t, true, hit)
				testutil.Equals(t, testExpandedPostingsData, ep)

				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
				testutil.Equals(t, 1, len(misses))
				testutil.Equals(t, 0, len(seriesHit))
			},
		},
		{
			name:         "only enable series",
			enabledItems: []string{cacheTypeSeries},
			verifyFunc: func(t *testing.T, c IndexCache) {
				c.StorePostings(blockID, postingKeys[0], testPostingData, tenancy.DefaultTenant)
				c.StoreExpandedPostings(blockID, expandedPostingsMatchers, testExpandedPostingsData, tenancy.DefaultTenant)
				c.StoreSeries(blockID, 1, testSeriesData, tenancy.DefaultTenant)

				hits, missed := c.FetchMultiPostings(ctx, blockID, postingKeys, tenancy.DefaultTenant)
				testutil.Equals(t, 1, len(missed))
				testutil.Equals(t, 0, len(hits))

				_, hit := c.FetchExpandedPostings(ctx, blockID, expandedPostingsMatchers, tenancy.DefaultTenant)
				testutil.Equals(t, false, hit)

				seriesHit, misses := c.FetchMultiSeries(ctx, blockID, []storage.SeriesRef{1}, tenancy.DefaultTenant)
				testutil.Equals(t, 0, len(misses))
				testutil.Equals(t, testSeriesData, seriesHit[1])
			},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			inMemoryCache, err := NewInMemoryIndexCacheWithConfig(log.NewNopLogger(), nil, prometheus.NewRegistry(), DefaultInMemoryIndexCacheConfig)
			testutil.Ok(t, err)
			err = ValidateEnabledItems(tc.enabledItems)
			if tc.expectedError != "" {
				testutil.Equals(t, tc.expectedError, err.Error())
			} else {
				testutil.Ok(t, err)
				c := NewFilteredIndexCache(inMemoryCache, tc.enabledItems)
				tc.verifyFunc(t, c)
			}
		})
	}
}
