
in memory cache for caching bucket #3579

Merged
merged 2 commits on Feb 12, 2021
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -24,6 +24,7 @@ We use _breaking :warning:_ to mark changes that are not backward compatible (re

### Added

- [#3579](https://github.com/thanos-io/thanos/pull/3579) Cache: Added in-memory cache for the caching bucket.
- [#3469](https://github.com/thanos-io/thanos/pull/3469) StoreAPI: Added `hints` field to `LabelNamesRequest` and `LabelValuesRequest`. Hints is an opaque data structure that can be used to carry additional information from the store, and its content is implementation-specific.
- [#3421](https://github.com/thanos-io/thanos/pull/3421) Tools: Added `thanos tools bucket rewrite` command that allows deleting series from a given block.
- [#3509](https://github.com/thanos-io/thanos/pull/3509) Store: Added touch series limit
6 changes: 4 additions & 2 deletions docs/components/store.md
@@ -295,7 +295,7 @@ While the remaining settings are **optional**:

Thanos Store Gateway supports a "caching bucket" with [chunks](../design.md/#chunk) and metadata caching to speed up loading of [chunks](../design.md/#chunk) from TSDB blocks. To configure caching, one needs to use `--store.caching-bucket.config=<yaml content>` or `--store.caching-bucket.config-file=<file.yaml>`.
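For example, a Store Gateway could be launched with a caching-bucket configuration file; the paths and the other flags below are illustrative, not prescriptive:

```shell
thanos store \
  --data-dir=/var/thanos/store \
  --objstore.config-file=/etc/thanos/bucket.yaml \
  --store.caching-bucket.config-file=/etc/thanos/caching-bucket.yaml
```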

Currently only memcached "backend" is supported:
Both memcached and in-memory cache backends are supported:

```yaml
type: MEMCACHED # Case-insensitive
@@ -331,7 +331,9 @@ Following options are used for metadata caching (meta.json files, deletion mark
- `metafile_content_ttl`: how long to cache content of meta.json and deletion mark files.
- `metafile_max_size`: maximum size of cached meta.json and deletion mark file. Larger files are not cached.

Note that [chunks](../design.md/#chunk) and metadata cache is an experimental feature, and these fields may be renamed or removed completely in the future.
The YAML structure for configuring the in-memory cache for the caching bucket is the same as for the [in-memory index cache](https://thanos.io/tip/components/store.md/#in-memory-index-cache), and all the caching bucket options mentioned above can be used.
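As a sketch, an in-memory caching-bucket configuration could look like the following; the exact `type` string and the defaults should be checked against the release you run, and the `max_size`/`max_item_size` fields mirror the in-memory index cache options:

```yaml
type: IN-MEMORY          # selects the in-memory backend instead of MEMCACHED
config:
  max_size: 250MB        # overall cache size limit
  max_item_size: 125MB   # single items larger than this are not cached
```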

Note that the chunks and metadata cache is an experimental feature, and these fields may be renamed or removed completely in the future.

## Index Header

5 changes: 5 additions & 0 deletions go.sum
@@ -684,6 +684,7 @@ github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmK
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/flux v0.65.1/go.mod h1:J754/zds0vvpfwuq7Gc2wRdVwEodfpCFM7mYlOw2LqY=
github.com/influxdata/influxdb v1.8.3/go.mod h1:JugdFhsvvI8gadxOI6noqNeeBHvWNTbfYGtiAn+2jhI=
@@ -742,6 +743,7 @@ github.com/karrick/godirwalk v1.8.0/go.mod h1:H5KPZjojv4lE+QYImBI8xVtrBRgYrIVsaR
github.com/karrick/godirwalk v1.10.3/go.mod h1:RoGL9dQei4vP9ilrpETWE8CLOZ1kiN0LhBygSwrAsHA=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
@@ -960,6 +962,7 @@ github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0Mw
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE=
github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k=
github.com/pelletier/go-toml v1.4.0 h1:u3Z1r+oOXJIkxqw34zVhyPgjBsm6X2wn21NWs/HfSeg=
github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac=
@@ -1103,7 +1106,9 @@ github.com/soundcloud/go-runit v0.0.0-20150630195641-06ad41a06c4a/go.mod h1:LeFC
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.2.2 h1:5jhuqJyZCZf2JRofRvN/nIFgIWNzPa3/Vz8mYylgbWc=
github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v0.0.0-20170130214245-9ff6c6923cff/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
305 changes: 305 additions & 0 deletions pkg/cache/inmemory.go
@@ -0,0 +1,305 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"sync"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
lru "github.com/hashicorp/golang-lru/simplelru"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/thanos-io/thanos/pkg/model"
"gopkg.in/yaml.v2"
)

var (
DefaultInMemoryCacheConfig = InMemoryCacheConfig{
MaxSize: 250 * 1024 * 1024,
MaxItemSize: 125 * 1024 * 1024,
}
)

const (
maxInt = int(^uint(0) >> 1)
)

// InMemoryCacheConfig holds the in-memory cache config.
type InMemoryCacheConfig struct {
// MaxSize represents overall maximum number of bytes cache can contain.
MaxSize model.Bytes `yaml:"max_size"`
// MaxItemSize represents maximum size of single item.
MaxItemSize model.Bytes `yaml:"max_item_size"`
}

type InMemoryCache struct {
logger log.Logger
maxSizeBytes uint64
maxItemSizeBytes uint64

mtx sync.Mutex
curSize uint64
lru *lru.LRU
evicted prometheus.Counter
requests prometheus.Counter
hits prometheus.Counter
hitsExpired prometheus.Counter
added prometheus.Counter
current prometheus.Gauge
currentSize prometheus.Gauge
totalCurrentSize prometheus.Gauge
overflow prometheus.Counter
}

type cacheDataWithTTLWrapper struct {
data []byte
// Objects over their TTL are not destroyed eagerly.
// When there is a hit for an item that is past its TTL, the object is removed from the cache
// and nil is returned.
// There is ongoing effort to integrate TTL within the Hashicorp golang cache itself.
// This https://github.com/hashicorp/golang-lru/pull/41 can be used here once complete.
expiryTime time.Time
}

// parseInMemoryCacheConfig unmarshals a buffer into a InMemoryCacheConfig with default values.
func parseInMemoryCacheConfig(conf []byte) (InMemoryCacheConfig, error) {
config := DefaultInMemoryCacheConfig
if err := yaml.Unmarshal(conf, &config); err != nil {
return InMemoryCacheConfig{}, err
}

return config, nil
}

// NewInMemoryCache creates a new thread-safe LRU cache and ensures the total cache
// size approximately does not exceed maxBytes.
func NewInMemoryCache(name string, logger log.Logger, reg prometheus.Registerer, conf []byte) (*InMemoryCache, error) {
config, err := parseInMemoryCacheConfig(conf)
if err != nil {
return nil, err
}

return NewInMemoryCacheWithConfig(name, logger, reg, config)
}

// NewInMemoryCacheWithConfig creates a new thread-safe LRU cache and ensures the total cache
// size approximately does not exceed maxBytes.
func NewInMemoryCacheWithConfig(name string, logger log.Logger, reg prometheus.Registerer, config InMemoryCacheConfig) (*InMemoryCache, error) {
if config.MaxItemSize > config.MaxSize {
return nil, errors.Errorf("max item size (%v) cannot be bigger than overall cache size (%v)", config.MaxItemSize, config.MaxSize)
}

c := &InMemoryCache{
logger: logger,
maxSizeBytes: uint64(config.MaxSize),
maxItemSizeBytes: uint64(config.MaxItemSize),
}

c.evicted = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_items_evicted_total",
Help: "Total number of items that were evicted from the inmemory cache.",
ConstLabels: prometheus.Labels{"name": name},
})

c.added = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_items_added_total",
Help: "Total number of items that were added to the inmemory cache.",
ConstLabels: prometheus.Labels{"name": name},
})

c.requests = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_requests_total",
Help: "Total number of requests to the inmemory cache.",
ConstLabels: prometheus.Labels{"name": name},
})

c.hitsExpired = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_hits_on_expired_data_total",
Help: "Total number of requests to the inmemory cache that were a hit but needed to be evicted due to TTL.",
ConstLabels: prometheus.Labels{"name": name},
})

c.overflow = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_items_overflowed_total",
Help: "Total number of items that could not be added to the inmemory cache due to being too big.",
ConstLabels: prometheus.Labels{"name": name},
})

c.hits = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "thanos_cache_inmemory_hits_total",
Help: "Total number of requests to the inmemory cache that were a hit.",
ConstLabels: prometheus.Labels{"name": name},
})

c.current = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_cache_inmemory_items",
Help: "Current number of items in the inmemory cache.",
ConstLabels: prometheus.Labels{"name": name},
})

c.currentSize = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_cache_inmemory_items_size_bytes",
Help: "Current byte size of items in the inmemory cache.",
ConstLabels: prometheus.Labels{"name": name},
})

c.totalCurrentSize = promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "thanos_cache_inmemory_total_size_bytes",
Help: "Current byte size of items (both value and key) in the inmemory cache.",
ConstLabels: prometheus.Labels{"name": name},
})

_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_cache_inmemory_max_size_bytes",
Help: "Maximum number of bytes to be held in the inmemory cache.",
ConstLabels: prometheus.Labels{"name": name},
}, func() float64 {
return float64(c.maxSizeBytes)
})
_ = promauto.With(reg).NewGaugeFunc(prometheus.GaugeOpts{
Name: "thanos_cache_inmemory_max_item_size_bytes",
Help: "Maximum number of bytes for single entry to be held in the inmemory cache.",
ConstLabels: prometheus.Labels{"name": name},
}, func() float64 {
return float64(c.maxItemSizeBytes)
})

// Initialize LRU cache with a high size limit since we will manage evictions ourselves
// based on stored size using `RemoveOldest` method.
l, err := lru.NewLRU(maxInt, c.onEvict)
if err != nil {
return nil, err
}
c.lru = l

level.Info(logger).Log(
"msg", "created in-memory cache",
"maxItemSizeBytes", c.maxItemSizeBytes,
"maxSizeBytes", c.maxSizeBytes,
"maxItems", maxInt,
)
return c, nil
}

func (c *InMemoryCache) onEvict(key, val interface{}) {
keySize := uint64(len(key.(string)))
entrySize := uint64(len(val.(cacheDataWithTTLWrapper).data))

c.evicted.Inc()
c.current.Dec()
c.currentSize.Sub(float64(entrySize))
c.totalCurrentSize.Sub(float64(keySize + entrySize))

c.curSize -= entrySize
}

func (c *InMemoryCache) get(key string) ([]byte, bool) {
c.requests.Inc()
c.mtx.Lock()
defer c.mtx.Unlock()

v, ok := c.lru.Get(key)
if !ok {
return nil, false
}
// If the item has outlived its TTL, remove it from the cache and return nil.
if time.Now().After(v.(cacheDataWithTTLWrapper).expiryTime) {
c.hitsExpired.Inc()
c.lru.Remove(key)
return nil, false
}
c.hits.Inc()
return v.(cacheDataWithTTLWrapper).data, true
}

func (c *InMemoryCache) set(key string, val []byte, ttl time.Duration) {
var size = uint64(len(val))
keySize := uint64(len(key))

c.mtx.Lock()
defer c.mtx.Unlock()

if _, ok := c.lru.Get(key); ok {
return
}

if !c.ensureFits(size) {
c.overflow.Inc()
return
}

// The caller may be passing in a sub-slice of a huge array. Copy the data
// to ensure we don't waste huge amounts of space for something small.
v := make([]byte, len(val))
copy(v, val)
c.lru.Add(key, cacheDataWithTTLWrapper{data: v, expiryTime: time.Now().Add(ttl)})

c.added.Inc()
c.currentSize.Add(float64(size))
c.totalCurrentSize.Add(float64(keySize + size))
c.current.Inc()
c.curSize += size
}

// ensureFits tries to make sure that the passed slice will fit into the LRU cache.
// Returns true if it will fit.
func (c *InMemoryCache) ensureFits(size uint64) bool {
if size > c.maxItemSizeBytes {
level.Debug(c.logger).Log(
"msg", "item bigger than maxItemSizeBytes. Ignoring..",
"maxItemSizeBytes", c.maxItemSizeBytes,
"maxSizeBytes", c.maxSizeBytes,
"curSize", c.curSize,
"itemSize", size,
)
return false
}

for c.curSize+size > c.maxSizeBytes {
if _, _, ok := c.lru.RemoveOldest(); !ok {
level.Error(c.logger).Log(
"msg", "LRU has nothing more to evict, but we still cannot allocate the item. Resetting cache.",
"maxItemSizeBytes", c.maxItemSizeBytes,
"maxSizeBytes", c.maxSizeBytes,
"curSize", c.curSize,
"itemSize", size,
)
c.reset()
}
}
return true
}

func (c *InMemoryCache) reset() {
c.lru.Purge()
c.current.Set(0)
c.currentSize.Set(0)
c.totalCurrentSize.Set(0)
c.curSize = 0
}

func (c *InMemoryCache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
for key, val := range data {
c.set(key, val, ttl)
}
}

// Fetch fetches multiple keys and returns a map containing cache hits.
// In case of error, it logs and returns an empty cache hits map.
func (c *InMemoryCache) Fetch(ctx context.Context, keys []string) map[string][]byte {
results := make(map[string][]byte)
for _, key := range keys {
if b, ok := c.get(key); ok {
results[key] = b
}
}
return results
}