Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

storage: add redis service pool configuration #230

Merged
merged 1 commit into from
May 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion config/example.config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
# cache:
# type: redis
# redis:
# address: localhost:6379
# addresses:
# - localhost:6379

#cluster:
# type: kv
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Masterminds/squirrel v1.1.0
github.com/bgentry/speakeasy v0.1.0
github.com/cespare/xxhash v1.1.0
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cockroachdb/errors v1.8.4
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKz
github.com/Joker/jade v1.0.1-0.20190614124447-d475f43051e7/go.mod h1:6E6s8o2AE4KhCrqr6GRJjdC/gNfTdxkIXvuGZZda2VM=
github.com/Masterminds/squirrel v1.1.0 h1:baP1qLdoQCeTw3ifCdOq2dkYc6vGcmRdaociKLbEJXs=
github.com/Masterminds/squirrel v1.1.0/go.mod h1:yaPeOnPG5ZRwL9oKdTsO/prlkPbXWZlRVMQ/gGlzIuA=
github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/Shopify/goreferrer v0.0.0-20181106222321-ec9c9a553398/go.mod h1:a1uqRtAwp2Xwc6WNPJEufxJ7fx3npB4UV/JOLmbu5I0=
github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY=
Expand All @@ -45,6 +46,7 @@ github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQ
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE=
Expand Down Expand Up @@ -371,6 +373,7 @@ github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrf
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/cached/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func New(cfg Config, rep repository.Repository, logger kitlog.Logger) (repositor
if cfg.Type != rediscache.Type {
return nil, fmt.Errorf("unrecognized repository cache type: %s", cfg.Type)
}
c := rediscache.New(cfg.Redis)
c := rediscache.New(cfg.Redis, logger)

return &CachedRepository{
User: &cachedUserRep{c: c, rep: rep, logger: logger},
Expand Down
52 changes: 30 additions & 22 deletions pkg/storage/cached/redis/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"errors"
"time"

"github.com/go-kit/log"

"github.com/go-redis/redis/v8"
)

Expand All @@ -27,7 +29,8 @@ const Type = "redis"

// Config contains Redis cache configuration.
type Config struct {
Address string `fig:"address"`
SRV string `fig:"srv"`
Addresses []string `fig:"addresses"`
Username string `fig:"username"`
Password string `fig:"password"`
DB int `fig:"db"`
Expand All @@ -40,21 +43,26 @@ type Config struct {
// Cache is Redis cache implementation.
type Cache struct {
cfg Config
client *redis.Client
sp servicePool
ttl time.Duration
logger log.Logger
}

// New creates and returns an initialized Redis Cache instance.
func New(cfg Config) *Cache {
return &Cache{cfg: cfg}
func New(cfg Config, logger log.Logger) *Cache {
return &Cache{
cfg: cfg,
logger: log.With(logger, "cache", "redis"),
}
}

// Type satisfies Cache interface.
func (c *Cache) Type() string { return Type }

// Get satisfies Cache interface.
func (c *Cache) Get(ctx context.Context, ns, key string) ([]byte, error) {
val, err := c.client.HGet(ctx, ns, key).Result()
cl := c.sp.getClient(ns)
val, err := cl.HGet(ctx, ns, key).Result()
if err != nil {
if errors.Is(err, redis.Nil) {
return nil, nil
Expand All @@ -66,25 +74,29 @@ func (c *Cache) Get(ctx context.Context, ns, key string) ([]byte, error) {

// Put satisfies Cache interface.
func (c *Cache) Put(ctx context.Context, ns, key string, val []byte) error {
if err := c.client.HSet(ctx, ns, key, val).Err(); err != nil {
cl := c.sp.getClient(ns)
if err := cl.HSet(ctx, ns, key, val).Err(); err != nil {
return err
}
return c.client.Expire(ctx, ns, c.ttl).Err()
return cl.Expire(ctx, ns, c.ttl).Err()
}

// Del satisfies Cache interface.
func (c *Cache) Del(ctx context.Context, ns string, keys ...string) error {
return c.client.HDel(ctx, ns, keys...).Err()
cl := c.sp.getClient(ns)
return cl.HDel(ctx, ns, keys...).Err()
}

// DelNS removes all keys contained under a given namespace from the cache store.
func (c *Cache) DelNS(ctx context.Context, ns string) error {
return c.client.Del(ctx, ns).Err()
cl := c.sp.getClient(ns)
return cl.Del(ctx, ns).Err()
}

// HasKey satisfies Cache interface.
func (c *Cache) HasKey(ctx context.Context, ns, key string) (bool, error) {
res := c.client.HExists(ctx, ns, key)
cl := c.sp.getClient(ns)
res := cl.HExists(ctx, ns, key)
if err := res.Err(); err != nil {
return false, err
}
Expand All @@ -93,19 +105,15 @@ func (c *Cache) HasKey(ctx context.Context, ns, key string) (bool, error) {

// Start satisfies Cache interface.
func (c *Cache) Start(ctx context.Context) error {
c.client = redis.NewClient(&redis.Options{
Addr: c.cfg.Address,
Username: c.cfg.Username,
Password: c.cfg.Password,
DB: c.cfg.DB,
DialTimeout: c.cfg.DialTimeout,
ReadTimeout: c.cfg.ReadTimeout,
WriteTimeout: c.cfg.WriteTimeout,
})
return c.client.Ping(ctx).Err()
if len(c.cfg.SRV) > 0 {
c.sp = newSRVServicePool(c.cfg, c.logger)
} else {
c.sp = newStaticServicePool(c.cfg)
}
return c.sp.start(ctx)
}

// Stop satisfies Cache interface.
func (c *Cache) Stop(_ context.Context) error {
return c.client.Close()
func (c *Cache) Stop(ctx context.Context) error {
return c.sp.stop(ctx)
}
33 changes: 33 additions & 0 deletions pkg/storage/cached/redis/jumphash.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2022 The jackal Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rediscache

// jumpHash consistently chooses a hash bucket number in the range
// [0, numBuckets) for the given key. numBuckets must be >= 1.
//
// Copied from github.com/dgryski/go-jump/blob/master/jump.go (MIT license).
func jumpHash(key uint64, numBuckets int) int32 {

var b int64 = -1
var j int64

for j < int64(numBuckets) {
b = j
key = key*2862933555777941757 + 1
j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
}

return int32(b)
}
Loading