Skip to content

Commit

Permalink
cacheutil: replace go-redis with rueidis client
Browse files Browse the repository at this point in the history
Switch client to rueidis because it is faster and supports client-side
caching. Tested it in production for a few months.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Dec 8, 2022
1 parent e6f0b6e commit 1ee7262
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 78 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5716](https://github.com/thanos-io/thanos/pull/5716) DNS: Fix miekgdns resolver LookupSRV to work with CNAME records.
- [#5846](https://github.com/thanos-io/thanos/pull/5846) Query Frontend: vertical query sharding supports subqueries.
- [#5909](https://github.com/thanos-io/thanos/pull/5909) Receive: compact tenant head after no appends have happened for 1.5 `tsdb.max-block-size`.
- [#5593](https://github.com/thanos-io/thanos/pull/5593) Cache: switch Redis client to [Rueidis](https://github.com/rueian/rueidis). Rueidis is [https://github.com/rueian/rueidis#benchmark-comparison-with-go-redis-v9](faster) and provides [client-side caching](https://redis.io/docs/manual/client-side-caching/). It is highly recommended to use it so that repeated requests for the same key would not be needed.

### Removed

Expand Down
2 changes: 2 additions & 0 deletions docs/components/store.md
Expand Up @@ -324,6 +324,7 @@ config:
username: ""
password: ""
db: 0
cache_size: 0
dial_timeout: 5s
read_timeout: 3s
write_timeout: 3s
Expand Down Expand Up @@ -356,6 +357,7 @@ While the remaining settings are **optional**:
- `dial_timeout`: the redis dial timeout.
- `read_timeout`: the redis read timeout.
- `write_timeout`: the redis write timeout.
- `cache_size` size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. It is highly recommended to enable this so that Thanos Store would not need to continuously retrieve data from Redis for repeated requests of the same key(-s).
- `pool_size`: maximum number of socket connections.
- `min_idle_conns`: specifies the minimum number of idle connections which is useful when establishing new connection is slow.
- `idle_timeout`: amount of time after which client closes idle connections. Should be less than server's timeout.
Expand Down
Binary file added docs/img/rueidis-client-side.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -228,6 +228,7 @@ require (
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/rueian/rueidis v0.0.88
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
Expand Down
4 changes: 3 additions & 1 deletion go.sum
Expand Up @@ -781,7 +781,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
Expand Down Expand Up @@ -891,6 +891,8 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE
github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rueian/rueidis v0.0.88 h1:pezUOKGPDuGW4sAnlFumBBlQ18BE1a2fGh+U8kw4ouk=
github.com/rueian/rueidis v0.0.88/go.mod h1:LiKWMM/QnILwRfDZIhSIXi4vQqZ/UZy4+/aNkSCt8XA=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
Expand Down
141 changes: 64 additions & 77 deletions pkg/cacheutil/redis_client.go
Expand Up @@ -5,21 +5,23 @@ package cacheutil

import (
"context"
"fmt"
"sync"
"crypto/tls"
"net"
"strings"
"time"
"unsafe"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rueian/rueidis"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
thanos_tls "github.com/thanos-io/thanos/pkg/tls"
)

Expand Down Expand Up @@ -117,6 +119,12 @@ type RedisClientConfig struct {

// TLSConfig to use to connect to the redis server.
TLSConfig TLSConfig `yaml:"tls_config"`

// If not zero then client-side caching is enabled.
// Client-side caching is when data is stored in memory
// instead of fetching data each time.
// See https://redis.io/docs/manual/client-side-caching/ for info.
CacheSize model.Bytes `yaml:"cache_size"`
}

func (c *RedisClientConfig) validate() error {
Expand All @@ -129,7 +137,8 @@ func (c *RedisClientConfig) validate() error {

// RedisClient is a wrap of redis.Client.
type RedisClient struct {
*redis.Client
client rueidis.Client

config RedisClientConfig

// getMultiGate used to enforce the max number of concurrent GetMulti() operations.
Expand Down Expand Up @@ -161,39 +170,43 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
return nil, err
}

opts := &redis.Options{
Addr: config.Addr,
Username: config.Username,
Password: config.Password,
DB: config.DB,
DialTimeout: config.DialTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
MinIdleConns: config.MinIdleConns,
MaxConnAge: config.MaxConnAge,
IdleTimeout: config.IdleTimeout,
}

var tlsConfig *tls.Config
if config.TLSEnabled {
tlsConfig := config.TLSConfig
userTLSConfig := config.TLSConfig

tlsClientConfig, err := thanos_tls.NewClientConfig(logger, tlsConfig.CertFile, tlsConfig.KeyFile,
tlsConfig.CAFile, tlsConfig.ServerName, tlsConfig.InsecureSkipVerify)
tlsClientConfig, err := thanos_tls.NewClientConfig(logger, userTLSConfig.CertFile, userTLSConfig.KeyFile,
userTLSConfig.CAFile, userTLSConfig.ServerName, userTLSConfig.InsecureSkipVerify)

if err != nil {
return nil, err
}

opts.TLSConfig = tlsClientConfig
tlsConfig = tlsClientConfig
}

redisClient := redis.NewClient(opts)
if reg != nil {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg)
var disableCaching = false
if config.CacheSize == 0 {
disableCaching = true
}

client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: strings.Split(config.Addr, ","),
ShuffleInit: true,
Username: config.Username,
Password: config.Password,
SelectDB: config.DB,
CacheSizeEachConn: int(config.CacheSize),
Dialer: net.Dialer{Timeout: config.DialTimeout},
ConnWriteTimeout: config.WriteTimeout,
DisableCache: disableCaching,
TLSConfig: tlsConfig,
})
if err != nil {
return nil, err
}

c := &RedisClient{
Client: redisClient,
client: client,
config: config,
logger: logger,
getMultiGate: gate.New(
Expand All @@ -219,9 +232,8 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
// SetAsync implement RemoteCacheClient.
func (c *RedisClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error {
start := time.Now()
if _, err := c.Set(ctx, key, value, ttl).Result(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key,
"value_size", len(value))
if err := c.client.Do(ctx, c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))
return nil
}
c.durationSet.Observe(time.Since(start).Seconds())
Expand All @@ -234,29 +246,16 @@ func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl
return
}
start := time.Now()
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
sets := make(rueidis.Commands, 0, len(data))
ittl := int64(ttl.Seconds())
for k, v := range data {
sets = append(sets, c.client.B().Setex().Key(k).Seconds(ittl).Value(rueidis.BinaryString(v)).Build())
}
err := doWithBatch(ctx, len(data), c.config.SetMultiBatchSize, c.setMultiGate, func(startIndex, endIndex int) error {
currentKeys := keys[startIndex:endIndex]
_, err := c.Pipelined(ctx, func(p redis.Pipeliner) error {
for _, key := range currentKeys {
p.SetEX(ctx, key, data[key], ttl)
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis",
"err", err, "items", len(data))
return nil
for _, resp := range c.client.DoMulti(ctx, sets...) {
if err := resp.Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err, "items", len(data))
return
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err,
"items", len(data))
return
}
c.durationSetMulti.Observe(time.Since(start).Seconds())
}
Expand All @@ -268,42 +267,30 @@ func (c *RedisClient) GetMulti(ctx context.Context, keys []string) map[string][]
}
start := time.Now()
results := make(map[string][]byte, len(keys))
var mu sync.Mutex
err := doWithBatch(ctx, len(keys), c.config.GetMultiBatchSize, c.getMultiGate, func(startIndex, endIndex int) error {
currentKeys := keys[startIndex:endIndex]
resp, err := c.MGet(ctx, currentKeys...).Result()
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resp))
return nil
}
mu.Lock()
defer mu.Unlock()
for i := 0; i < len(resp); i++ {
key := currentKeys[i]
switch val := resp[i].(type) {
case string:
results[key] = stringToBytes(val)
case nil: // miss
default:
level.Warn(c.logger).Log("msg",
fmt.Sprintf("unexpected redis mget result type:%T %v", resp[i], resp[i]))
}
}
return nil
})

if c.config.ReadTimeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, c.config.ReadTimeout)
defer cancel()
ctx = timeoutCtx
}

// NOTE(GiedriusS): TTL is the default one in case PTTL fails. 8 hours should be good enough IMHO.
resps, err := rueidis.MGetCache(c.client, ctx, 8*time.Hour, keys)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(keys))
return nil
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resps))
}
for key, resp := range resps {
if val, err := resp.ToString(); err == nil {
results[key] = stringToBytes(val)
}
}
c.durationGetMulti.Observe(time.Since(start).Seconds())
return results
}

// Stop implement RemoteCacheClient.
func (c *RedisClient) Stop() {
if err := c.Close(); err != nil {
level.Error(c.logger).Log("msg", "redis close err")
}
c.client.Close()
}

// stringToBytes converts string to byte slice (copied from vendor/github.com/go-redis/redis/v8/internal/util/unsafe.go).
Expand Down
11 changes: 11 additions & 0 deletions test/e2e/e2ethanos/services.go
Expand Up @@ -1213,3 +1213,14 @@ rule_files:

return config
}

func NewRedis(e e2e.Environment, name string) e2e.Runnable {
return e.Runnable(fmt.Sprintf("redis-%s", name)).WithPorts(map[string]int{"redis": 6379}).Init(
e2e.StartOptions{
Image: "docker.io/redis:7.0.4-alpine",
Command: e2e.NewCommand("redis-server", "*:6379"),
User: strconv.Itoa(os.Getuid()),
WaitReadyBackoff: &defaultBackoffConfig,
},
)
}
24 changes: 24 additions & 0 deletions test/e2e/store_gateway_test.go
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/testutil"
Expand Down Expand Up @@ -640,3 +641,26 @@ func TestStoreGatewayBytesLimit(t *testing.T) {
}))
})
}

func TestRedisClient_Rueidis(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("redis-client")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

r := e2ethanos.NewRedis(e, "redis")
testutil.Ok(t, r.Start())

redisClient, err := cacheutil.NewRedisClientWithConfig(log.NewLogfmtLogger(os.Stderr), "redis", cacheutil.RedisClientConfig{
Addr: r.Endpoint("redis"),
}, nil)
testutil.Ok(t, err)

err = redisClient.SetAsync(context.TODO(), "foo", []byte(`bar`), 1*time.Minute)
testutil.Ok(t, err)

returnedVals := redisClient.GetMulti(context.TODO(), []string{"foo"})
testutil.Equals(t, 1, len(returnedVals))
testutil.Equals(t, []byte("bar"), returnedVals["foo"])
}

0 comments on commit 1ee7262

Please sign in to comment.