Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

store: Add Groupcache as a cache backend #4818

Merged
merged 26 commits into from Jan 6, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
0ae3021
cache: add groupcache
GiedriusS Jun 18, 2021
a99bd9e
implement Cache interface on groupcache
akanshat Oct 23, 2021
2bf5358
move BucketCacheKey to a new package
akanshat Oct 30, 2021
87e8f64
add copyright to new pkg cachekey
akanshat Oct 30, 2021
d139fef
add a fix to return partial results if Get fails
akanshat Nov 6, 2021
0b7ca17
migrate from groupcache to galaxycache
akanshat Nov 10, 2021
6bca1ff
instrument metrics for galaxyCache
akanshat Nov 14, 2021
8f8c261
add e2e test for store with groupcache
akanshat Nov 14, 2021
8621c68
fix collector interface on CacheStatsCollector
akanshat Nov 17, 2021
a6f1945
cache: fix / clean up tests
GiedriusS Nov 17, 2021
63b212b
groupcache: support IterVerb
GiedriusS Nov 17, 2021
992e2e1
groupcache: changes according to comments
GiedriusS Nov 17, 2021
2bf5594
Merge branch 'main' into groupcache_caching_bucket
akanshat Dec 8, 2021
3c12c2e
store: fix groupcache test
akanshat Dec 8, 2021
088b9d3
cache: add TTL support
GiedriusS Nov 22, 2021
b15440e
move caching_bucket_config to pkg cache
akanshat Dec 22, 2021
c7c1460
modify groupcacheCfg
akanshat Jan 3, 2022
57b1ab5
remove duplicate CachingBucketConfig
akanshat Jan 4, 2022
13eaa61
remove duplicate calls to NewCachingBucketConfig
akanshat Jan 4, 2022
1e59562
refactor cache configuration
akanshat Jan 4, 2022
5f2f7cf
implement suggestions from comments
akanshat Jan 6, 2022
d0ef6f2
Merge pull request #2 from akanshat/add_ttl
GiedriusS Jan 6, 2022
a8ba37f
Merge pull request #1 from GiedriusS/add_ttl
akanshat Jan 6, 2022
e6c3d73
Merge remote-tracking branch 'origin/main' into groupcache_caching_bu…
akanshat Jan 6, 2022
d61e97c
*: formatting changes
GiedriusS Jan 6, 2022
16ac6fd
*: linter fixes
GiedriusS Jan 6, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 4 additions & 2 deletions cmd/thanos/store.go
Expand Up @@ -252,8 +252,11 @@ func runStore(
if err != nil {
return errors.Wrap(err, "get caching bucket configuration")
}

r := route.New()

if len(cachingBucketConfigYaml) > 0 {
bkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg)
bkt, err = storecache.NewCachingBucketFromYaml(cachingBucketConfigYaml, bkt, logger, reg, r)
if err != nil {
return errors.Wrap(err, "create caching bucket")
}
Expand Down Expand Up @@ -407,7 +410,6 @@ func runStore(
}
// Add bucket UI for loaded blocks.
{
r := route.New()
ins := extpromhttp.NewInstrumentationMiddleware(reg, nil)

compactorView := ui.NewBucketUI(logger, "", conf.webConfig.externalPrefix, conf.webConfig.prefixHeaderName, "/loaded", conf.component)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -43,6 +43,7 @@ require (
github.com/leanovate/gopter v0.2.4
github.com/lightstep/lightstep-tracer-go v0.18.1
github.com/lovoo/gcloud-opentracing v0.3.0
github.com/mailgun/groupcache/v2 v2.2.1
github.com/miekg/dns v1.1.43
github.com/minio/minio-go/v7 v7.0.10
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Expand Up @@ -1166,6 +1166,8 @@ github.com/lufia/iostat v1.1.0/go.mod h1:rEPNA0xXgjHQjuI5Cy05sLlS2oRcSlWHRLrvh/A
github.com/lyft/protoc-gen-star v0.5.1/go.mod h1:9toiA3cC7z5uVbODF7kEQ91Xn7XNFkVUl+SrEe+ZORU=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailgun/groupcache/v2 v2.2.1 h1:OalhvLqdhiHd76CU8gn+w6UqeEa1m60ChrVwwKkmqh4=
github.com/mailgun/groupcache/v2 v2.2.1/go.mod h1:fgFJNRQar4yVloM0SzqWhOuTF83HCO5DDXVnZQVVJ58=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
Expand Down Expand Up @@ -1535,6 +1537,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/sercand/kuberesolver v2.4.0+incompatible h1:WE2OlRf6wjLxHwNkkFLQGaZcVLEXjMjBPjjEU5vksH8=
github.com/sercand/kuberesolver v2.4.0+incompatible/go.mod h1:lWF3GL0xptCB/vCiJPl/ZshwPsX/n4Y7u0CW9E7aQIQ=
Expand Down Expand Up @@ -2154,6 +2158,7 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
222 changes: 222 additions & 0 deletions pkg/cache/groupcache.go
@@ -0,0 +1,222 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cache

import (
"context"
"encoding/json"
"io/ioutil"
"strconv"
"time"

"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/mailgun/groupcache/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/route"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/model"
"github.com/thanos-io/thanos/pkg/objstore"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/store/cache/cachekey"
"gopkg.in/yaml.v2"
)

type Groupcache struct {
dns *dns.Provider
Pool *groupcache.HTTPPool
group *groupcache.Group
logger log.Logger
}

// GroupcacheConfig holds the in-memory cache config.
type GroupcacheConfig struct {
// Addresses of statically configured peers (repeatable). The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect store API servers through respective DNS lookups.
// Typically, you'd want something like `dns+http://thanos-store:42/`.
Peers []string `yaml:"peers"`

// Address of ourselves in the peer list. This needs to be set to `http://external-ip:HTTP_PORT`
// of the current instance.
SelfURL string `yaml:"self_url"`

// Maximum size of the hot in-memory cache.
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved
MaxSize model.Bytes `yaml:"max_size"`
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

// Group's name. All of the instances need to be using the same group and point to the same bucket.
GroupcacheGroup string `yaml:"groupcache_group"`

// DNS SD resolver to use.
DNSSDResolver dns.ResolverType `yaml:"dns_sd_resolver"`

// How often we should resolve the addresses.
DNSInterval time.Duration `yaml:"dns_interval"`
}

var (
DefaultGroupcacheConfig = GroupcacheConfig{
MaxSize: 250 * 1024 * 1024,
DNSSDResolver: dns.GolangResolverType,
DNSInterval: 1 * time.Minute,
}
)

// parseGroupcacheConfig unmarshals a buffer into a GroupcacheConfig with default values.
func parseGroupcacheConfig(conf []byte) (GroupcacheConfig, error) {
config := DefaultGroupcacheConfig
if err := yaml.Unmarshal(conf, &config); err != nil {
return GroupcacheConfig{}, err
}

if len(config.Peers) == 0 {
config.Peers = append(config.Peers, config.SelfURL)
}

return config, nil
}

// NewGroupcache creates a new Groupcache instance.
func NewGroupcache(name string, logger log.Logger, reg prometheus.Registerer, conf []byte, groupname, basepath string, r *route.Router, bucket objstore.Bucket) (*Groupcache, error) {
config, err := parseGroupcacheConfig(conf)
if err != nil {
return nil, err
}

return NewGroupcacheWithConfig(name, logger, reg, config, groupname, basepath, r, bucket)
}

// NewGroupcacheWithConfig creates a new Groupcache instance with the given config.
func NewGroupcacheWithConfig(name string, logger log.Logger, reg prometheus.Registerer, conf GroupcacheConfig, groupname, basepath string, r *route.Router, bucket objstore.Bucket) (*Groupcache, error) {
dnsGroupcacheProvider := dns.NewProvider(
logger,
extprom.WrapRegistererWithPrefix("thanos_store_groupcache_", reg),
dns.ResolverType(conf.DNSSDResolver),
)

pool := groupcache.NewHTTPPoolOpts(conf.SelfURL, &groupcache.HTTPPoolOptions{
BasePath: basepath,
})

ticker := time.NewTicker(conf.DNSInterval)

go func() {
for {
if err := dnsGroupcacheProvider.Resolve(context.Background(), conf.Peers); err != nil {
level.Error(logger).Log("msg", "failed to resolve addresses for groupcache", "err", err)
} else {
pool.Set(dnsGroupcacheProvider.Addresses()...)
}

<-ticker.C
}
}()

r.Get(basepath, pool.ServeHTTP)

group := groupcache.NewGroup(conf.GroupcacheGroup, int64(conf.MaxSize), groupcache.GetterFunc(
func(ctx context.Context, id string, dest groupcache.Sink) error {
parsedData, err := cachekey.ParseBucketCacheKey(id)
if err != nil {
return err
}

switch parsedData.Verb {
case cachekey.AttributesVerb:
attrs, err := bucket.Attributes(ctx, parsedData.Name)
if err != nil {
return err
}

finalAttrs, err := json.Marshal(attrs)
if err != nil {
return err
}
err = dest.SetString(string(finalAttrs), time.Now().Add(5*time.Minute))
if err != nil {
return err
}
case cachekey.IterVerb:
// Not supported.
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

return nil
case cachekey.ContentVerb:
rc, err := bucket.Get(ctx, parsedData.Name)
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, rc, "closing get")

b, err := ioutil.ReadAll(rc)
if err != nil {
return err
}

err = dest.SetBytes(b, time.Now().Add(5*time.Minute))
if err != nil {
return err
}
case cachekey.ExistsVerb:
exists, err := bucket.Exists(ctx, parsedData.Name)
if err != nil {
return err
}

err = dest.SetString(strconv.FormatBool(exists), time.Now().Add(5*time.Minute))
if err != nil {
return err
}
case cachekey.SubrangeVerb:
rc, err := bucket.GetRange(ctx, parsedData.Name, parsedData.Start, parsedData.End-parsedData.Start)
if err != nil {
return err
}
defer runutil.CloseWithLogOnErr(logger, rc, "closing get_range")

b, err := ioutil.ReadAll(rc)
if err != nil {
return err
}

err = dest.SetBytes(b, time.Now().Add(5*time.Minute))
if err != nil {
return err
}
}

return nil
},
))

return &Groupcache{
dns: dnsGroupcacheProvider,
Pool: pool,
group: group,
logger: logger,
}, nil
}

func (c *Groupcache) Store(ctx context.Context, data map[string][]byte, ttl time.Duration) {
// Noop since cache is already filled during fetching.
}

func (c *Groupcache) Fetch(ctx context.Context, keys []string) map[string][]byte {
data := map[string][]byte{}
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

for _, k := range keys {
var keyData []byte

if err := c.group.Get(ctx, k, groupcache.AllocatingByteSliceSink(&keyData)); err != nil {
level.Error(c.logger).Log("msg", "failed fetching data from groupcache", "err", err, "key", k)
return nil
onprem marked this conversation as resolved.
Show resolved Hide resolved
}

data[k] = keyData
}

return data
}

func (c *Groupcache) Name() string {
return c.group.Name()
}
100 changes: 100 additions & 0 deletions pkg/store/cache/cachekey/cachekey.go
@@ -0,0 +1,100 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.

package cachekey

import (
"fmt"
"strconv"
"strings"

"github.com/pkg/errors"
)

var (
ErrInvalidBucketCacheKeyFormat = errors.New("key has invalid format")
ErrInvalidBucketCacheKeyVerb = errors.New("key has invalid verb")
ErrParseKeyInt = errors.New("failed to parse integer in key")
)

// VerbType is the type of operation whose result has been stored in the caching bucket's cache.
type VerbType string

const (
ExistsVerb VerbType = "exists"
ContentVerb VerbType = "content"
IterVerb VerbType = "iter"
AttributesVerb VerbType = "attrs"
SubrangeVerb VerbType = "subrange"
)

type BucketCacheKey struct {
Verb VerbType
Name string
Start int64
End int64
}

// String returns the string representation of BucketCacheKey.
func (ck BucketCacheKey) String() string {
if ck.Start == 0 && ck.End == 0 {
return fmt.Sprintf("%s:%s", ck.Verb, ck.Name)
}

return fmt.Sprintf("%s:%s:%d:%d", ck.Verb, ck.Name, ck.Start, ck.End)
}

// IsValidVerb checks if the VerbType matches the predefined verbs.
func IsValidVerb(v VerbType) bool {
switch v {
case
ExistsVerb,
ContentVerb,
IterVerb,
AttributesVerb,
SubrangeVerb:
return true
}
return false
}

// ParseBucketCacheKey parses a string and returns BucketCacheKey.
func ParseBucketCacheKey(key string) (BucketCacheKey, error) {
ck := BucketCacheKey{}
slice := strings.Split(key, ":")
if len(slice) < 2 {
return ck, ErrInvalidBucketCacheKeyFormat
}

verb := VerbType(slice[0])
if !IsValidVerb(verb) {
return BucketCacheKey{}, ErrInvalidBucketCacheKeyVerb
}

if verb == SubrangeVerb {
if len(slice) != 4 {
return BucketCacheKey{}, ErrInvalidBucketCacheKeyFormat
}

start, err := strconv.ParseInt(slice[2], 10, 64)
if err != nil {
return BucketCacheKey{}, ErrParseKeyInt
}

end, err := strconv.ParseInt(slice[3], 10, 64)
if err != nil {
return BucketCacheKey{}, ErrParseKeyInt
}

ck.Start = start
ck.End = end
} else {
if len(slice) != 2 {
return BucketCacheKey{}, ErrInvalidBucketCacheKeyFormat
}
}

ck.Verb = verb
ck.Name = slice[1]
return ck, nil
}