diff --git a/CHANGELOG.md b/CHANGELOG.md index f70fb4303a..9c954efa65 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,7 @@ We use *breaking :warning:* to mark changes that are not backward compatible (re - [#5716](https://github.com/thanos-io/thanos/pull/5716) DNS: Fix miekgdns resolver LookupSRV to work with CNAME records. - [#5846](https://github.com/thanos-io/thanos/pull/5846) Query Frontend: vertical query sharding supports subqueries. - [#5909](https://github.com/thanos-io/thanos/pull/5909) Receive: compact tenant head after no appends have happened for 1.5 `tsdb.max-block-size`. +- [#5593](https://github.com/thanos-io/thanos/pull/5593) Cache: switch Redis client to [Rueidis](https://github.com/rueian/rueidis). Rueidis is [faster](https://github.com/rueian/rueidis#benchmark-comparison-with-go-redis-v9) and provides [client-side caching](https://redis.io/docs/manual/client-side-caching/). Enabling client-side caching is highly recommended so that repeated requests for the same key can be served from memory instead of going to Redis again. - [#5896](https://github.com/thanos-io/thanos/pull/5896) *: Upgrade Prometheus to v0.40.7 without implementing native histogram support. 
*Querying native histograms will fail with `Error executing query: invalid chunk encoding ""` and native histograms in write requests are ignored.* ### Removed diff --git a/docs/components/query-frontend.md b/docs/components/query-frontend.md index b79bbcc5ff..4afe732063 100644 --- a/docs/components/query-frontend.md +++ b/docs/components/query-frontend.md @@ -132,6 +132,7 @@ config: key_file: "" server_name: "" insecure_skip_verify: false + cache_size: 0 expiration: 24h0m0s ``` diff --git a/docs/components/store.md b/docs/components/store.md index bfadcbeeba..19c94af2ef 100644 --- a/docs/components/store.md +++ b/docs/components/store.md @@ -342,6 +342,7 @@ config: key_file: "" server_name: "" insecure_skip_verify: false + cache_size: 0 ``` The **required** settings are: @@ -356,6 +357,12 @@ While the remaining settings are **optional**: - `dial_timeout`: the redis dial timeout. - `read_timeout`: the redis read timeout. - `write_timeout`: the redis write timeout. +- `cache_size`: size of the in-memory cache used for client-side caching. Client-side caching is enabled when this value is not zero. See [official documentation](https://redis.io/docs/manual/client-side-caching/) for more. It is highly recommended to enable this so that Thanos Store would not need to continuously retrieve data from Redis for repeated requests of the same key(-s). + +Here is an example of what effect client-side caching could have: + +![Example of client-side in action - reduced network usage by a lot](../img/rueidis-client-side.png) + - `pool_size`: maximum number of socket connections. - `min_idle_conns`: specifies the minimum number of idle connections which is useful when establishing new connection is slow. - `idle_timeout`: amount of time after which client closes idle connections. Should be less than server's timeout. 
diff --git a/docs/img/rueidis-client-side.png b/docs/img/rueidis-client-side.png new file mode 100644 index 0000000000..f28b54d221 Binary files /dev/null and b/docs/img/rueidis-client-side.png differ diff --git a/go.mod b/go.mod index 3aa05529f0..d7c22ef1ef 100644 --- a/go.mod +++ b/go.mod @@ -228,6 +228,7 @@ require ( github.com/prometheus/procfs v0.8.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect github.com/rs/xid v1.4.0 // indirect + github.com/rueian/rueidis v0.0.88 github.com/santhosh-tekuri/jsonschema v1.2.4 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/sercand/kuberesolver v2.4.0+incompatible // indirect diff --git a/go.sum b/go.sum index cd3c50f5e6..a5b8ca3f21 100644 --- a/go.sum +++ b/go.sum @@ -786,7 +786,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.18.1 h1:M1GfJqGRrBrrGGsbxzV5dqM2U2ApXefZCQpkukxYRLE= +github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw= github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/image-spec v1.0.2 h1:9yCKha/T5XdGtO0q9Q9a6T5NUCsTn/DrBg0D7ufOcFM= @@ -899,6 +899,8 @@ github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTE github.com/rs/cors v1.8.2/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/rs/xid v1.4.0 h1:qd7wPTDkN6KQx2VmMBLrpHkiyQwgFXRnkOLacUiaSNY= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rueian/rueidis v0.0.88 h1:pezUOKGPDuGW4sAnlFumBBlQ18BE1a2fGh+U8kw4ouk= 
+github.com/rueian/rueidis v0.0.88/go.mod h1:LiKWMM/QnILwRfDZIhSIXi4vQqZ/UZy4+/aNkSCt8XA= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= diff --git a/pkg/cacheutil/redis_client.go b/pkg/cacheutil/redis_client.go index cb5a7ee2aa..ba1b5eee2d 100644 --- a/pkg/cacheutil/redis_client.go +++ b/pkg/cacheutil/redis_client.go @@ -5,21 +5,23 @@ package cacheutil import ( "context" - "fmt" - "sync" + "crypto/tls" + "net" + "strings" "time" "unsafe" "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/go-redis/redis/v8" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/rueian/rueidis" "gopkg.in/yaml.v3" "github.com/thanos-io/thanos/pkg/extprom" "github.com/thanos-io/thanos/pkg/gate" + "github.com/thanos-io/thanos/pkg/model" thanos_tls "github.com/thanos-io/thanos/pkg/tls" ) @@ -117,6 +119,12 @@ type RedisClientConfig struct { // TLSConfig to use to connect to the redis server. TLSConfig TLSConfig `yaml:"tls_config"` + + // If not zero then client-side caching is enabled. + // Client-side caching is when data is stored in memory + // instead of fetching data each time. + // See https://redis.io/docs/manual/client-side-caching/ for info. + CacheSize model.Bytes `yaml:"cache_size"` } func (c *RedisClientConfig) validate() error { @@ -124,12 +132,18 @@ func (c *RedisClientConfig) validate() error { return errors.New("no redis addr provided") } + if c.TLSEnabled { + if (c.TLSConfig.CertFile != "") != (c.TLSConfig.KeyFile != "") { + return errors.New("both client key and certificate must be provided") + } + } + return nil } -// RedisClient is a wrap of redis.Client. 
type RedisClient struct { - *redis.Client + client rueidis.Client + config RedisClientConfig // getMultiGate used to enforce the max number of concurrent GetMulti() operations. @@ -161,39 +175,43 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient return nil, err } - opts := &redis.Options{ - Addr: config.Addr, - Username: config.Username, - Password: config.Password, - DB: config.DB, - DialTimeout: config.DialTimeout, - ReadTimeout: config.ReadTimeout, - WriteTimeout: config.WriteTimeout, - MinIdleConns: config.MinIdleConns, - MaxConnAge: config.MaxConnAge, - IdleTimeout: config.IdleTimeout, - } - + var tlsConfig *tls.Config if config.TLSEnabled { - tlsConfig := config.TLSConfig + userTLSConfig := config.TLSConfig - tlsClientConfig, err := thanos_tls.NewClientConfig(logger, tlsConfig.CertFile, tlsConfig.KeyFile, - tlsConfig.CAFile, tlsConfig.ServerName, tlsConfig.InsecureSkipVerify) + tlsClientConfig, err := thanos_tls.NewClientConfig(logger, userTLSConfig.CertFile, userTLSConfig.KeyFile, + userTLSConfig.CAFile, userTLSConfig.ServerName, userTLSConfig.InsecureSkipVerify) if err != nil { return nil, err } - opts.TLSConfig = tlsClientConfig + tlsConfig = tlsClientConfig } - redisClient := redis.NewClient(opts) - if reg != nil { - reg = prometheus.WrapRegistererWith(prometheus.Labels{"name": name}, reg) + clientSideCacheDisabled := false + if config.CacheSize == 0 { + clientSideCacheDisabled = true + } + + client, err := rueidis.NewClient(rueidis.ClientOption{ + InitAddress: strings.Split(config.Addr, ","), + ShuffleInit: true, + Username: config.Username, + Password: config.Password, + SelectDB: config.DB, + CacheSizeEachConn: int(config.CacheSize), + Dialer: net.Dialer{Timeout: config.DialTimeout}, + ConnWriteTimeout: config.WriteTimeout, + DisableCache: clientSideCacheDisabled, + TLSConfig: tlsConfig, + }) + if err != nil { + return nil, err } c := &RedisClient{ - Client: redisClient, + client: client, config: config, logger: 
logger, getMultiGate: gate.New( @@ -219,9 +237,8 @@ func NewRedisClientWithConfig(logger log.Logger, name string, config RedisClient // SetAsync implement RemoteCacheClient. func (c *RedisClient) SetAsync(ctx context.Context, key string, value []byte, ttl time.Duration) error { start := time.Now() - if _, err := c.Set(ctx, key, value, ttl).Result(); err != nil { - level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, - "value_size", len(value)) + if err := c.client.Do(ctx, c.client.B().Set().Key(key).Value(rueidis.BinaryString(value)).ExSeconds(int64(ttl.Seconds())).Build()).Error(); err != nil { + level.Warn(c.logger).Log("msg", "failed to set item into redis", "err", err, "key", key, "value_size", len(value)) return nil } c.durationSet.Observe(time.Since(start).Seconds()) @@ -234,29 +251,16 @@ func (c *RedisClient) SetMulti(ctx context.Context, data map[string][]byte, ttl return } start := time.Now() - keys := make([]string, 0, len(data)) - for k := range data { - keys = append(keys, k) + sets := make(rueidis.Commands, 0, len(data)) + ittl := int64(ttl.Seconds()) + for k, v := range data { + sets = append(sets, c.client.B().Setex().Key(k).Seconds(ittl).Value(rueidis.BinaryString(v)).Build()) } - err := doWithBatch(ctx, len(data), c.config.SetMultiBatchSize, c.setMultiGate, func(startIndex, endIndex int) error { - currentKeys := keys[startIndex:endIndex] - _, err := c.Pipelined(ctx, func(p redis.Pipeliner) error { - for _, key := range currentKeys { - p.SetEX(ctx, key, data[key], ttl) - } - return nil - }) - if err != nil { - level.Warn(c.logger).Log("msg", "failed to set multi items from redis", - "err", err, "items", len(data)) - return nil + for _, resp := range c.client.DoMulti(ctx, sets...) 
{ + if err := resp.Error(); err != nil { + level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err, "items", len(data)) + return } - return nil - }) - if err != nil { - level.Warn(c.logger).Log("msg", "failed to set multi items from redis", "err", err, - "items", len(data)) - return } c.durationSetMulti.Observe(time.Since(start).Seconds()) } @@ -268,32 +272,22 @@ func (c *RedisClient) GetMulti(ctx context.Context, keys []string) map[string][] } start := time.Now() results := make(map[string][]byte, len(keys)) - var mu sync.Mutex - err := doWithBatch(ctx, len(keys), c.config.GetMultiBatchSize, c.getMultiGate, func(startIndex, endIndex int) error { - currentKeys := keys[startIndex:endIndex] - resp, err := c.MGet(ctx, currentKeys...).Result() - if err != nil { - level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resp)) - return nil - } - mu.Lock() - defer mu.Unlock() - for i := 0; i < len(resp); i++ { - key := currentKeys[i] - switch val := resp[i].(type) { - case string: - results[key] = stringToBytes(val) - case nil: // miss - default: - level.Warn(c.logger).Log("msg", - fmt.Sprintf("unexpected redis mget result type:%T %v", resp[i], resp[i])) - } - } - return nil - }) + + if c.config.ReadTimeout > 0 { + timeoutCtx, cancel := context.WithTimeout(ctx, c.config.ReadTimeout) + defer cancel() + ctx = timeoutCtx + } + + // NOTE(GiedriusS): TTL is the default one in case PTTL fails. 8 hours should be good enough IMHO. 
+ resps, err := rueidis.MGetCache(c.client, ctx, 8*time.Hour, keys) if err != nil { - level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(keys)) - return nil + level.Warn(c.logger).Log("msg", "failed to mget items from redis", "err", err, "items", len(resps)) + } + for key, resp := range resps { + if val, err := resp.ToString(); err == nil { + results[key] = stringToBytes(val) + } } c.durationGetMulti.Observe(time.Since(start).Seconds()) return results @@ -301,9 +295,7 @@ func (c *RedisClient) GetMulti(ctx context.Context, keys []string) map[string][] // Stop implement RemoteCacheClient. func (c *RedisClient) Stop() { - if err := c.Close(); err != nil { - level.Error(c.logger).Log("msg", "redis close err") - } + c.client.Close() } // stringToBytes converts string to byte slice (copied from vendor/github.com/go-redis/redis/v8/internal/util/unsafe.go). diff --git a/pkg/cacheutil/redis_client_test.go b/pkg/cacheutil/redis_client_test.go index dfe4cf0e51..6b7ca51d6a 100644 --- a/pkg/cacheutil/redis_client_test.go +++ b/pkg/cacheutil/redis_client_test.go @@ -203,17 +203,10 @@ func TestValidateRedisConfig(t *testing.T) { t.Run(tt.name, func(t *testing.T) { cfg := tt.config() - logger := log.NewLogfmtLogger(os.Stderr) - reg := prometheus.NewRegistry() - val, err := NewRedisClientWithConfig(logger, tt.name, cfg, reg) - if val != nil { - defer val.Stop() - } - if tt.expect_err { - testutil.NotOk(t, err, val) + testutil.NotOk(t, cfg.validate()) } else { - testutil.Ok(t, err, val) + testutil.Ok(t, cfg.validate()) } }) } diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 258f8f21b4..57cccbda2d 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -1213,3 +1213,14 @@ rule_files: return config } + +func NewRedis(e e2e.Environment, name string) e2e.Runnable { + return e.Runnable(fmt.Sprintf("redis-%s", name)).WithPorts(map[string]int{"redis": 6379}).Init( + e2e.StartOptions{ + 
Image: "docker.io/redis:7.0.4-alpine", + Command: e2e.NewCommand("redis-server", "*:6379"), + User: strconv.Itoa(os.Getuid()), + WaitReadyBackoff: &defaultBackoffConfig, + }, + ) +} diff --git a/test/e2e/store_gateway_test.go b/test/e2e/store_gateway_test.go index 9f8eae671d..cfad2ba0f1 100644 --- a/test/e2e/store_gateway_test.go +++ b/test/e2e/store_gateway_test.go @@ -30,6 +30,7 @@ import ( "github.com/efficientgo/core/testutil" "github.com/thanos-io/thanos/pkg/block" "github.com/thanos-io/thanos/pkg/block/metadata" + "github.com/thanos-io/thanos/pkg/cacheutil" "github.com/thanos-io/thanos/pkg/promclient" "github.com/thanos-io/thanos/pkg/runutil" "github.com/thanos-io/thanos/pkg/testutil/e2eutil" @@ -640,3 +641,26 @@ func TestStoreGatewayBytesLimit(t *testing.T) { })) }) } + +func TestRedisClient_Rueidis(t *testing.T) { + t.Parallel() + + e, err := e2e.NewDockerEnvironment("redis-client") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + r := e2ethanos.NewRedis(e, "redis") + testutil.Ok(t, r.Start()) + + redisClient, err := cacheutil.NewRedisClientWithConfig(log.NewLogfmtLogger(os.Stderr), "redis", cacheutil.RedisClientConfig{ + Addr: r.Endpoint("redis"), + }, nil) + testutil.Ok(t, err) + + err = redisClient.SetAsync(context.TODO(), "foo", []byte(`bar`), 1*time.Minute) + testutil.Ok(t, err) + + returnedVals := redisClient.GetMulti(context.TODO(), []string{"foo"}) + testutil.Equals(t, 1, len(returnedVals)) + testutil.Equals(t, []byte("bar"), returnedVals["foo"]) +}