cache: switch to Rueidis client for Redis #5593

Merged
merged 3 commits into from Dec 20, 2022
Changes from 2 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -33,6 +33,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re
- [#5716](https://github.com/thanos-io/thanos/pull/5716) DNS: Fix miekgdns resolver LookupSRV to work with CNAME records.
- [#5846](https://github.com/thanos-io/thanos/pull/5846) Query Frontend: vertical query sharding supports subqueries.
- [#5909](https://github.com/thanos-io/thanos/pull/5909) Receive: compact tenant head after no appends have happened for 1.5 `tsdb.max-block-size`.
- [#5593](https://github.com/thanos-io/thanos/pull/5593) Cache: switch Redis client to [Rueidis](https://github.com/rueian/rueidis). Rueidis is [faster](https://github.com/rueian/rueidis#benchmark-comparison-with-go-redis-v9) and provides [client-side caching](https://redis.io/docs/manual/client-side-caching/). Enabling client-side caching is highly recommended so that repeated requests for the same key do not need a round trip to Redis.

### Removed

1 change: 1 addition & 0 deletions docs/components/query-frontend.md
Expand Up @@ -132,6 +132,7 @@ config:
key_file: ""
server_name: ""
insecure_skip_verify: false
cache_size: 0
expiration: 24h0m0s
```

7 changes: 7 additions & 0 deletions docs/components/store.md
Expand Up @@ -342,6 +342,7 @@ config:
key_file: ""
server_name: ""
insecure_skip_verify: false
cache_size: 0
```

The **required** settings are:
Expand All @@ -356,6 +357,12 @@ While the remaining settings are **optional**:
- `dial_timeout`: the redis dial timeout.
- `read_timeout`: the redis read timeout.
- `write_timeout`: the redis write timeout.
- `cache_size`: size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See the [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. Enabling it is highly recommended so that Thanos Store does not need to retrieve data from Redis again for repeated requests for the same key(s).

Here is an example of what effect client-side caching could have:

<img src="../img/rueidis-client-side.png" class="img-fluid" alt="Example of client-side caching in action: network usage is greatly reduced"/>

- `pool_size`: maximum number of socket connections.
- `min_idle_conns`: specifies the minimum number of idle connections, which is useful when establishing a new connection is slow.
- `idle_timeout`: amount of time after which client closes idle connections. Should be less than server's timeout.
Binary file added docs/img/rueidis-client-side.png
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -228,6 +228,7 @@ require (
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/rueian/rueidis v0.0.88
github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
4 changes: 3 additions & 1 deletion go.sum
Expand Up @@ -781,7 +781,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM=
Expand Down Expand Up @@ -891,6 +891,8 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE
github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY=
github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
github.com/rueian/rueidis v0.0.88 h1:pezUOKGPDuGW4sAnlFumBBlQ18BE1a2fGh+U8kw4ouk=
github.com/rueian/rueidis v0.0.88/go.mod h1:LiKWMM/QnILwRfDZIhSIXi4vQqZ/UZy4+/aNkSCt8XA=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
148 changes: 70 additions & 78 deletions pkg/cacheutil/redis_client.go
Expand Up @@ -5,21 +5,23 @@ package cacheutil

import (
"context"
"fmt"
"sync"
"crypto/tls"
"net"
"strings"
"time"
"unsafe"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/rueian/rueidis"
"gopkg.in/yaml.v3"

"github.com/thanos-io/thanos/pkg/extprom"
"github.com/thanos-io/thanos/pkg/gate"
"github.com/thanos-io/thanos/pkg/model"
thanos_tls "github.com/thanos-io/thanos/pkg/tls"
)

Expand Down Expand Up @@ -117,19 +119,31 @@ type RedisClientConfig struct {

// TLSConfig to use to connect to the redis server.
TLSConfig TLSConfig `yaml:"tls_config"`

// If not zero then client-side caching is enabled.
// Client-side caching is when data is stored in memory
// instead of fetching data each time.
// See https://redis.io/docs/manual/client-side-caching/ for info.
CacheSize model.Bytes `yaml:"cache_size"`
}

func (c *RedisClientConfig) validate() error {
if c.Addr == "" {
return errors.New("no redis addr provided")
}

if c.TLSEnabled {
if (c.TLSConfig.CertFile != "") != (c.TLSConfig.KeyFile != "") {
return errors.New("both client key and certificate must be provided")
}
}

return nil
}
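
The certificate check in `validate()` above compares two booleans with `!=`, which behaves as an exclusive OR: it fails only when exactly one of the two files is set. A minimal standalone sketch of that rule follows. `validateTLSPair` is a hypothetical helper name used for illustration and is not part of Thanos.

```go
package main

import (
	"errors"
	"fmt"
)

// validateTLSPair is a hypothetical helper mirroring the check in validate():
// a client certificate and key must be supplied together or not at all.
func validateTLSPair(certFile, keyFile string) error {
	// (a != "") != (b != "") is true only when exactly one value is set.
	if (certFile != "") != (keyFile != "") {
		return errors.New("both client key and certificate must be provided")
	}
	return nil
}

func main() {
	fmt.Println(validateTLSPair("", ""))                // neither set: accepted
	fmt.Println(validateTLSPair("cert.pem", "key.pem")) // both set: accepted
	fmt.Println(validateTLSPair("cert.pem", ""))        // only one set: rejected
}
```

Writing the rule as a single comparison avoids listing the two failing combinations separately.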

// RedisClient is a wrapper around rueidis.Client.
type RedisClient struct {
*redis.Client
client rueidis.Client

config RedisClientConfig

// getMultiGate used to enforce the max number of concurrent GetMulti() operations.
Expand Down Expand Up @@ -161,39 +175,43 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
return nil, err
}

opts := &redis.Options{
Addr: config.Addr,
Username: config.Username,
Password: config.Password,
DB: config.DB,
DialTimeout: config.DialTimeout,
ReadTimeout: config.ReadTimeout,
WriteTimeout: config.WriteTimeout,
MinIdleConns: config.MinIdleConns,
MaxConnAge: config.MaxConnAge,
IdleTimeout: config.IdleTimeout,
}

var tlsConfig *tls.Config
if config.TLSEnabled {
tlsConfig := config.TLSConfig
userTLSConfig := config.TLSConfig

tlsClientConfig, err := thanos_tls.NewClientConfig(logger, tlsConfig.CertFile, tlsConfig.KeyFile,
tlsConfig.CAFile, tlsConfig.ServerName, tlsConfig.InsecureSkipVerify)
tlsClientConfig, err := thanos_tls.NewClientConfig(logger, userTLSConfig.CertFile, userTLSConfig.KeyFile,
userTLSConfig.CAFile, userTLSConfig.ServerName, userTLSConfig.InsecureSkipVerify)

if err != nil {
return nil, err
}

opts.TLSConfig = tlsClientConfig
tlsConfig = tlsClientConfig
}

redisClient := redis.NewClient(opts)
if reg != nil {
reg = prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg)
clientSideCacheDisabled := config.CacheSize == 0

client, err := rueidis.NewClient(rueidis.ClientOption{
InitAddress: strings.Split(config.Addr, ","),
ShuffleInit: true,
Username: config.Username,
Password: config.Password,
SelectDB: config.DB,
CacheSizeEachConn: int(config.CacheSize),
Dialer: net.Dialer{Timeout: config.DialTimeout},
ConnWriteTimeout: config.WriteTimeout,
DisableCache: clientSideCacheDisabled,
TLSConfig: tlsConfig,
})
if err != nil {
return nil, err
}

c := &RedisClient{
Client: redisClient,
client: client,
config: config,
logger: logger,
getMultiGate: gate.New(
Expand All @@ -219,9 +237,8 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient
// SetAsync implement RemoteCacheClient.
func (c *RedisClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error {
start := time.Now()
if _, err := c.Set(ctx, key, value, ttl).Result(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key,
"value_size", len(value))
if err := c.client.Do(ctx, c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil {
Contributor

Why do we use Set with ExSeconds here instead of Setex, as in the SetMulti function? It would be good to be consistent.

Contributor

Also, wouldn't it be wise to use DoCache here to populate the client side cache too? I don't see DoCache or DoMultiCache being used anymore and from what I understand from Rueidis' README those are functions that will also populate client side cache.

Contributor

I noticed there is MGetCache for getting through client side cache, but I fail to see where we write to the client side cache.

Contributor

@rueian rueian Dec 9, 2022

Hi @douglascamata,

The population of client side cache is done by rueidis when you get successful responses from DoCache and DoMultiCache with supported commands, such as GET and MGET.

SET is not supported by DoCache by design, because the client must be tracked by redis server via cacheable requests to receive related invalidation notifications later.

Contributor

@rueian gotcha, thanks for the clarification!

I thought it would be useful for us to populate the client side cache whenever we SET something, but if the cost is another GET, as a consequence of the design, it might not be a good idea.
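
To make the point of this thread concrete, here is a toy model of the behaviour described: the local cache is filled only by cacheable reads, and a write merely invalidates a tracked key. It does not use rueidis or Redis at all. `toyServer`, `toyClient`, and their methods are hypothetical names, and the invalidation is modelled as a direct call instead of a server push.

```go
package main

import "fmt"

// toyServer stands in for Redis: it stores values and remembers which keys
// the client has read, so the client can be told when they change.
type toyServer struct {
	data    map[string]string
	tracked map[string]bool
}

// toyClient keeps local copies of values it fetched through the cached path.
type toyClient struct {
	srv   *toyServer
	local map[string]string
	trips int // number of round trips to the server
}

func newToyClient() *toyClient {
	return &toyClient{
		srv:   &toyServer{data: map[string]string{}, tracked: map[string]bool{}},
		local: map[string]string{},
	}
}

// Set writes to the server only. It never populates the local cache; if the
// key was tracked, the modelled invalidation drops the stale local copy.
func (c *toyClient) Set(key, value string) {
	c.trips++
	c.srv.data[key] = value
	if c.srv.tracked[key] {
		delete(c.local, key)
		delete(c.srv.tracked, key)
	}
}

// GetCached serves from the local cache when possible. On a miss it asks the
// server, which starts tracking the key, and stores the reply locally.
func (c *toyClient) GetCached(key string) (string, bool) {
	if v, ok := c.local[key]; ok {
		return v, true
	}
	c.trips++
	v, ok := c.srv.data[key]
	if ok {
		c.srv.tracked[key] = true
		c.local[key] = v
	}
	return v, ok
}

func main() {
	c := newToyClient()
	c.Set("foo", "bar")        // round trip; local cache stays empty
	c.GetCached("foo")         // miss: round trip, fills the local cache
	c.GetCached("foo")         // hit: served locally
	c.Set("foo", "baz")        // round trip; invalidates the local copy
	v, _ := c.GetCached("foo") // miss again: round trip
	fmt.Println(v, c.trips)
}
```

In this model, populating the cache on SET would require the client to be tracked for that key first, which is why the extra GET mentioned above would be needed.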

level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value))
return nil
}
c.durationSet.Observe(time.Since(start).Seconds())
Expand All @@ -234,29 +251,16 @@ func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl
return
}
start := time.Now()
keys := make([]string, 0, len(data))
for k := range data {
keys = append(keys, k)
sets := make(rueidis.Commands, 0, len(data))
ittl := int64(ttl.Seconds())
for k, v := range data {
sets = append(sets, c.client.B().Setex().Key(k).Seconds(ittl).Value(rueidis.BinaryString(v)).Build())
}
err := doWithBatch(ctx, len(data), c.config.SetMultiBatchSize, c.setMultiGate, func(startIndex, endIndex int) error {
currentKeys := keys[startIndex:endIndex]
_, err := c.Pipelined(ctx, func(p redis.Pipeliner) error {
for _, key := range currentKeys {
p.SetEX(ctx, key, data[key], ttl)
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis",
"err", err, "items", len(data))
return nil
for _, resp := range c.client.DoMulti(ctx, sets...) {
if err := resp.Error(); err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err, "items", len(data))
return
}
return nil
})
if err != nil {
level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err,
"items", len(data))
return
}
c.durationSetMulti.Observe(time.Since(start).Seconds())
}
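
Both `SetAsync` and `SetMulti` above convert the TTL with `int64(ttl.Seconds())`, which truncates toward zero, so a TTL below one second becomes `0`. To my knowledge Redis rejects a zero expiry for `SET ... EX` and `SETEX`, so callers passing sub-second TTLs may want to round up instead. The sketch below shows one way to do that. `ttlSeconds` is a hypothetical helper and is not part of this PR.

```go
package main

import (
	"fmt"
	"time"
)

// ttlSeconds is a hypothetical helper that converts a TTL to whole seconds,
// rounding up so that any positive TTL maps to at least one second.
// Non-positive TTLs return 0 and are left for the caller to handle.
func ttlSeconds(ttl time.Duration) int64 {
	if ttl <= 0 {
		return 0
	}
	return int64((ttl + time.Second - 1) / time.Second)
}

func main() {
	ttls := []time.Duration{500 * time.Millisecond, time.Second, 1500 * time.Millisecond, time.Minute}
	for _, ttl := range ttls {
		// Compare plain truncation, as in int64(ttl.Seconds()), with rounding up.
		fmt.Printf("ttl=%v truncated=%d rounded_up=%d\n", ttl, int64(ttl.Seconds()), ttlSeconds(ttl))
	}
}
```

Whether rounding up is acceptable depends on how strictly callers rely on the expiry; for cache entries, living up to one second longer is usually harmless.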
Expand All @@ -268,42 +272,30 @@ func (c *RedisClient) GetMulti(ctx context.Context, keys []string) map[string][]
}
start := time.Now()
results := make(map[string][]byte, len(keys))
var mu sync.Mutex
err := doWithBatch(ctx, len(keys), c.config.GetMultiBatchSize, c.getMultiGate, func(startIndex, endIndex int) error {
currentKeys := keys[startIndex:endIndex]
resp, err := c.MGet(ctx, currentKeys...).Result()
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resp))
return nil
}
mu.Lock()
defer mu.Unlock()
for i := 0; i < len(resp); i++ {
key := currentKeys[i]
switch val := resp[i].(type) {
case string:
results[key] = stringToBytes(val)
case nil: // miss
default:
level.Warn(c.logger).Log("msg",
fmt.Sprintf("unexpected redis mget result type:%T %v", resp[i], resp[i]))
}
}
return nil
})

if c.config.ReadTimeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, c.config.ReadTimeout)
defer cancel()
ctx = timeoutCtx
}

// NOTE(GiedriusS): TTL is the default one in case PTTL fails. 8 hours should be good enough IMHO.
resps, err := rueidis.MGetCache(c.client, ctx, 8*time.Hour, keys)
if err != nil {
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(keys))
return nil
level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resps))
}
for key, resp := range resps {
if val, err := resp.ToString(); err == nil {
results[key] = stringToBytes(val)
}
}
c.durationGetMulti.Observe(time.Since(start).Seconds())
return results
}

// Stop implement RemoteCacheClient.
func (c *RedisClient) Stop() {
if err := c.Close(); err != nil {
level.Error(c.logger).Log("msg", "redis close err")
}
c.client.Close()
}

// stringToBytes converts string to byte slice (copied from vendor/github.com/go-redis/redis/v8/internal/util/unsafe.go).
11 changes: 2 additions & 9 deletions pkg/cacheutil/redis_client_test.go
Expand Up @@ -203,17 +203,10 @@ func TestValidateRedisConfig(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
cfg := tt.config()

logger := log.NewLogfmtLogger(os.Stderr)
reg := prometheus.NewRegistry()
val, err := NewRedisClientWithConfig(logger, tt.name, cfg, reg)
if val != nil {
defer val.Stop()
}

if tt.expect_err {
testutil.NotOk(t, err, val)
testutil.NotOk(t, cfg.validate())
} else {
testutil.Ok(t, err, val)
testutil.Ok(t, cfg.validate())
}
})
}
11 changes: 11 additions & 0 deletions test/e2e/e2ethanos/services.go
Expand Up @@ -1213,3 +1213,14 @@ rule_files:

return config
}

func NewRedis(e e2e.Environment, name string) e2e.Runnable {
return e.Runnable(fmt.Sprintf("redis-%s", name)).WithPorts(map[string]int{"redis": 6379}).Init(
e2e.StartOptions{
Image: "docker.io/redis:7.0.4-alpine",
Command: e2e.NewCommand("redis-server", "*:6379"),
User: strconv.Itoa(os.Getuid()),
WaitReadyBackoff: &defaultBackoffConfig,
},
)
}
24 changes: 24 additions & 0 deletions test/e2e/store_gateway_test.go
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/thanos-io/thanos/pkg/block"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/cacheutil"
"github.com/thanos-io/thanos/pkg/promclient"
"github.com/thanos-io/thanos/pkg/runutil"
"github.com/thanos-io/thanos/pkg/testutil"
Expand Down Expand Up @@ -640,3 +641,26 @@ func TestStoreGatewayBytesLimit(t *testing.T) {
}))
})
}

func TestRedisClient_Rueidis(t *testing.T) {
t.Parallel()

e, err := e2e.NewDockerEnvironment("redis-client")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

r := e2ethanos.NewRedis(e, "redis")
testutil.Ok(t, r.Start())

redisClient, err := cacheutil.NewRedisClientWithConfig(log.NewLogfmtLogger(os.Stderr), "redis", cacheutil.RedisClientConfig{
Addr: r.Endpoint("redis"),
}, nil)
testutil.Ok(t, err)

err = redisClient.SetAsync(context.TODO(), "foo", []byte(`bar`), 1*time.Minute)
testutil.Ok(t, err)

returnedVals := redisClient.GetMulti(context.TODO(), []string{"foo"})
testutil.Equals(t, 1, len(returnedVals))
testutil.Equals(t, []byte("bar"), returnedVals["foo"])
}