Skip to content

Commit

Permalink
make index cache ttl configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed Oct 6, 2023
1 parent 56d8882 commit 57330d3
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 11 deletions.
12 changes: 10 additions & 2 deletions pkg/store/cache/factory.go
Expand Up @@ -6,6 +6,7 @@ package storecache
import (
"fmt"
"strings"
"time"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
Expand All @@ -31,6 +32,9 @@ type IndexCacheConfig struct {

// Available item types are Postings, Series and ExpandedPostings.
EnabledItems []string `yaml:"enabled_items"`
// TTL for storing items in remote cache. Not supported for inmemory cache.
// Default value is 24h.
TTL time.Duration `yaml:"ttl"`
}

// NewIndexCache initializes and returns new index cache.
Expand All @@ -47,6 +51,10 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
return nil, errors.Wrap(err, "marshal content of cache backend configuration")
}

if cacheConfig.TTL == 0 {
cacheConfig.TTL = memcachedDefaultTTL
}

var cache IndexCache
switch strings.ToUpper(string(cacheConfig.Type)) {
case string(INMEMORY):
Expand All @@ -55,13 +63,13 @@ func NewIndexCache(logger log.Logger, confContentYaml []byte, reg prometheus.Reg
var memcached cacheutil.RemoteCacheClient
memcached, err = cacheutil.NewMemcachedClient(logger, "index-cache", backendConfig, reg)
if err == nil {
cache, err = NewRemoteIndexCache(logger, memcached, cacheMetrics, reg)
cache, err = NewRemoteIndexCache(logger, memcached, cacheMetrics, reg, cacheConfig.TTL)
}
case string(REDIS):
var redisCache cacheutil.RemoteCacheClient
redisCache, err = cacheutil.NewRedisClient(logger, "index-cache", backendConfig, reg)
if err == nil {
cache, err = NewRemoteIndexCache(logger, redisCache, cacheMetrics, reg)
cache, err = NewRemoteIndexCache(logger, redisCache, cacheMetrics, reg, cacheConfig.TTL)
}
default:
return nil, errors.Errorf("index cache with type %s is not supported", cacheConfig.Type)
Expand Down
2 changes: 1 addition & 1 deletion pkg/store/cache/factory_test.go
Expand Up @@ -16,7 +16,7 @@ func TestIndexCacheMetrics(t *testing.T) {
commonMetrics := newCommonMetrics(reg)

memcached := newMockedMemcachedClient(nil)
_, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, commonMetrics, reg)
_, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, commonMetrics, reg, memcachedDefaultTTL)
testutil.Ok(t, err)
conf := []byte(`
max_size: 10MB
Expand Down
13 changes: 8 additions & 5 deletions pkg/store/cache/memcached.go
Expand Up @@ -33,6 +33,8 @@ type RemoteIndexCache struct {

compressionScheme string

ttl time.Duration

// Metrics.
requestTotal *prometheus.CounterVec
hitsTotal *prometheus.CounterVec
Expand All @@ -41,8 +43,9 @@ type RemoteIndexCache struct {
}

// NewRemoteIndexCache makes a new RemoteIndexCache.
func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *commonMetrics, reg prometheus.Registerer) (*RemoteIndexCache, error) {
func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheClient, commonMetrics *commonMetrics, reg prometheus.Registerer, ttl time.Duration) (*RemoteIndexCache, error) {
c := &RemoteIndexCache{
ttl: ttl,
logger: logger,
memcached: cacheClient,
compressionScheme: compressionSchemeStreamedSnappy, // Hardcode it for now. Expose it once we support different types of compressions.
Expand Down Expand Up @@ -81,7 +84,7 @@ func NewRemoteIndexCache(logger log.Logger, cacheClient cacheutil.RemoteCacheCli
func (c *RemoteIndexCache) StorePostings(blockID ulid.ULID, l labels.Label, v []byte, tenant string) {
c.dataSizeBytes.WithLabelValues(cacheTypePostings, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeyPostings(l), c.compressionScheme}.string()
if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache postings in memcached", "err", err)
}
}
Expand Down Expand Up @@ -134,7 +137,7 @@ func (c *RemoteIndexCache) StoreExpandedPostings(blockID ulid.ULID, keys []*labe
c.dataSizeBytes.WithLabelValues(cacheTypeExpandedPostings, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeyExpandedPostings(labelMatchersToString(keys)), c.compressionScheme}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache expanded postings in memcached", "err", err)
}
}
Expand Down Expand Up @@ -168,7 +171,7 @@ func (c *RemoteIndexCache) StoreSeries(blockID ulid.ULID, id storage.SeriesRef,
c.dataSizeBytes.WithLabelValues(cacheTypeSeries, tenant).Observe(float64(len(v)))
key := cacheKey{blockID.String(), cacheKeySeries(id), ""}.string()

if err := c.memcached.SetAsync(key, v, memcachedDefaultTTL); err != nil {
if err := c.memcached.SetAsync(key, v, c.ttl); err != nil {
level.Error(c.logger).Log("msg", "failed to cache series in memcached", "err", err)
}
}
Expand Down Expand Up @@ -215,5 +218,5 @@ func (c *RemoteIndexCache) FetchMultiSeries(ctx context.Context, blockID ulid.UL

// NewMemcachedIndexCache is alias NewRemoteIndexCache for compatible.
func NewMemcachedIndexCache(logger log.Logger, memcached cacheutil.RemoteCacheClient, reg prometheus.Registerer) (*RemoteIndexCache, error) {
return NewRemoteIndexCache(logger, memcached, nil, reg)
return NewRemoteIndexCache(logger, memcached, nil, reg, memcachedDefaultTTL)
}
6 changes: 3 additions & 3 deletions pkg/store/cache/memcached_test.go
Expand Up @@ -88,7 +88,7 @@ func TestMemcachedIndexCache_FetchMultiPostings(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the postings expected before running the test.
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestMemcachedIndexCache_FetchExpandedPostings(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the postings expected before running the test.
Expand Down Expand Up @@ -264,7 +264,7 @@ func TestMemcachedIndexCache_FetchMultiSeries(t *testing.T) {
for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
memcached := newMockedMemcachedClient(testData.mockedErr)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil)
c, err := NewRemoteIndexCache(log.NewNopLogger(), memcached, nil, nil, memcachedDefaultTTL)
testutil.Ok(t, err)

// Store the series expected before running the test.
Expand Down

0 comments on commit 57330d3

Please sign in to comment.