From d70d9eaa480770d59e2e4ba63dc95848cbb0e05e Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Sun, 21 Jan 2024 21:53:58 +0200 Subject: [PATCH 1/4] Add Cache object to go-redis --- go.mod | 8 ++++++++ go.sum | 22 ++++++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/go.mod b/go.mod index 6c65f094f..6ad38d65f 100644 --- a/go.mod +++ b/go.mod @@ -8,3 +8,11 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f ) + +require ( + github.com/dgraph-io/ristretto v0.1.1 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/golang/glog v1.2.0 // indirect + github.com/pkg/errors v0.9.1 // indirect + golang.org/x/sys v0.16.0 // indirect +) diff --git a/go.sum b/go.sum index 21b4f64ee..a2aeee758 100644 --- a/go.sum +++ b/go.sum @@ -2,7 +2,29 @@ github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= +github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= +github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= +golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= From d8b4f6e49f2bbbf31d4b45c4e91fc1b29f1f74e1 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Sun, 21 Jan 2024 21:54:26 +0200 Subject: [PATCH 2/4] Add Cache object to go-redis --- cache.go | 47 +++++++++++++++++++++++++ cache_test.go | 95 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 142 insertions(+) create mode 100644 cache.go create mode 100644 cache_test.go diff --git a/cache.go b/cache.go new file mode 100644 index 000000000..05ba48e78 --- /dev/null +++ b/cache.go @@ -0,0 +1,47 @@ +package redis + +import ( + "github.com/dgraph-io/ristretto" +) + +// Cache structure +type Cache struct { + cache *ristretto.Cache +} + +// NewCache creates a new Cache instance with the given configuration +func NewCache(numKeys int64, memSize int64) (*Cache, error) { + // Create a new cache with the given configuration + config := &ristretto.Config{ + NumCounters: numKeys * 10, // number of keys to track frequency of (10x number of items to cache) + MaxCost: memSize, // maximum cost of cache (in bytes) + BufferItems: 64, // number of keys per Get buffer + } + + cache, err := ristretto.NewCache(config) + if err != nil { + return nil, err + } + + return &Cache{cache: cache}, nil +} + +// Set adds a value to the cache +func (c *Cache) Set(key, value interface{}, cost int64) bool { + return c.cache.Set(key, value, cost) +} + +// Get retrieves a value from the cache +func (c *Cache) Get(key interface{}) (interface{}, bool) { + return c.cache.Get(key) +} + +// ClearKey clears a specific key from the cache +func (c *Cache) ClearKey(key interface{}) { + c.cache.Del(key) +} + +// Clear clears the entire cache +func (c *Cache) Clear() { + c.cache.Clear() +} diff --git a/cache_test.go b/cache_test.go new file mode 100644 index 000000000..81fc86342 --- /dev/null +++ b/cache_test.go @@ -0,0 +1,95 @@ +package redis_test + +import ( + "log" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +// Testredis.NewCache tests the creation of a new cache instance +func TestNewCache(t *testing.T) { + _, err := redis.NewCache(1000, 1<<20) // 1 MB size + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + t.Log("Cache created successfully") +} + +// TestCacheSetAndGet tests setting and getting values in the cache +func TestCacheSetAndGet(t *testing.T) { + cache, err := redis.NewCache(1000, 1<<20) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + key, value := "key1", "value1" + setSuccess := cache.Set(key, value, 1) + if !setSuccess { + t.Fatalf("Failed to set key: %s", key) + } + log.Printf("Set operation successful for key: %s", key) + + // Allow value to pass through buffers + time.Sleep(10 * time.Millisecond) + + getValue, found := cache.Get(key) + if !found || getValue != value { + t.Errorf("Failed to get key: %s, expected value: %s, got: %v", key, value, getValue) + } else { + log.Printf("Get operation successful for key: %s", key) + } +} + +// TestCacheClearKey tests the clearing of a specific key from the cache +func TestCacheClearKey(t *testing.T) { + cache, err := redis.NewCache(1000, 1<<20) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + key := "key1" + cache.Set(key, "value1", 1) + log.Printf("Key %s set in cache", key) + + time.Sleep(10 * time.Millisecond) + + cache.ClearKey(key) + log.Printf("Key %s cleared from cache", key) + + time.Sleep(10 * time.Millisecond) + + _, found := cache.Get(key) + if found { + t.Errorf("Expected key %s to be cleared", key) + } else { + log.Printf("ClearKey operation successful for key: %s", key) + } +} + +// TestCacheClear tests clearing all keys from the cache +func TestCacheClear(t *testing.T) { + cache, err := redis.NewCache(1000, 1<<20) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + + key := "key1" + cache.Set(key, "value1", 1) + log.Printf("Key %s set in cache", key) + + time.Sleep(10 * time.Millisecond) + + cache.Clear() + t.Log("All keys cleared from cache") + + time.Sleep(10 * time.Millisecond) + + _, found := cache.Get(key) + if found { + t.Errorf("Expected cache to be cleared, but key %s was found", key) + } else { + t.Log("Clear operation successful, cache is empty") + } +} From 05aae206bb1f788e1a188551fff0a77027e3ed0e Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Sun, 3 Mar 2024 11:20:05 +0200 Subject: [PATCH 3/4] Add cache writing --- cache.go | 12 ++++++-- cache_test.go | 42 +++++++++++++++++----------- command.go | 2 ++ example_test.go | 60 ++++++++++++++++++++-------------------- go.mod | 2 +- go.sum | 6 ++++ internal/pool/conn.go | 4 +++ internal/proto/reader.go | 8 +++++- options.go | 6 ++++ redis.go | 34 +++++++++++++++++++++-- 10 files changed, 124 insertions(+), 52 deletions(-) diff --git a/cache.go b/cache.go index 05ba48e78..c5de2b2b6 100644 --- a/cache.go +++ b/cache.go @@ -9,6 +9,14 @@ type Cache struct { cache *ristretto.Cache } +type CacheConfig struct { + MaxSize int64 // maximum size of the cache in bytes + MaxKeys int64 // maximum number of keys to store in the cache + // other configuration options: + // - ttl (time to live) for cache entries + // - eviction policy +} + // NewCache creates a new Cache instance with the given configuration func NewCache(numKeys int64, memSize int64) (*Cache, error) { // Create a new cache with the given configuration @@ -27,12 +35,12 @@ func NewCache(numKeys int64, memSize int64) (*Cache, error) { } // Set adds a value to the cache -func (c *Cache) Set(key, value interface{}, cost int64) bool { +func (c *Cache) SetKey(key, value interface{}, cost int64) bool { return c.cache.Set(key, value, cost) } // Get retrieves a value from the cache -func (c *Cache) Get(key interface{}) (interface{}, bool) { +func (c *Cache) GetKey(key interface{}) (interface{}, bool) { return c.cache.Get(key) } diff --git a/cache_test.go b/cache_test.go index 81fc86342..b92f1df99 100644 --- a/cache_test.go +++ b/cache_test.go @@ -1,6 +1,7 @@ package redis_test import ( + "context" "log" "testing" "time" @@ -17,28 +18,28 @@ func TestNewCache(t *testing.T) { t.Log("Cache created successfully") } -// TestCacheSetAndGet tests setting and getting values in the cache -func TestCacheSetAndGet(t *testing.T) { +// TestCacheSetKeyAndGetKey tests SetKeyting and GetKeyting values in the cache +func TestCacheSetKeyAndGetKey(t *testing.T) { cache, err := redis.NewCache(1000, 1<<20) if err != nil { t.Fatalf("Failed to create cache: %v", err) } key, value := "key1", "value1" - setSuccess := cache.Set(key, value, 1) - if !setSuccess { - t.Fatalf("Failed to set key: %s", key) + SetKeySuccess := cache.SetKey(key, value, 1) + if !SetKeySuccess { + t.Fatalf("Failed to SetKey key: %s", key) } - log.Printf("Set operation successful for key: %s", key) + log.Printf("SetKey operation successful for key: %s", key) // Allow value to pass through buffers time.Sleep(10 * time.Millisecond) - getValue, found := cache.Get(key) - if !found || getValue != value { - t.Errorf("Failed to get key: %s, expected value: %s, got: %v", key, value, getValue) + GetKeyValue, found := cache.GetKey(key) + if !found || GetKeyValue != value { + t.Errorf("Failed to GetKey key: %s, expected value: %s, got: %v", key, value, GetKeyValue) } else { - log.Printf("Get operation successful for key: %s", key) + log.Printf("GetKey operation successful for key: %s", key) } } @@ -50,8 +51,8 @@ func TestCacheClearKey(t *testing.T) { } key := "key1" - cache.Set(key, "value1", 1) - log.Printf("Key %s set in cache", key) + cache.SetKey(key, "value1", 1) + log.Printf("Key %s SetKey in cache", key) time.Sleep(10 * time.Millisecond) @@ -60,7 +61,7 @@ func TestCacheClearKey(t *testing.T) { time.Sleep(10 * time.Millisecond) - _, found := cache.Get(key) + _, found := cache.GetKey(key) if found { t.Errorf("Expected key %s to be cleared", key) } else { @@ -76,8 +77,8 @@ func TestCacheClear(t *testing.T) { } key := "key1" - cache.Set(key, "value1", 1) - log.Printf("Key %s set in cache", key) + cache.SetKey(key, "value1", 1) + log.Printf("Key %s SetKey in cache", key) time.Sleep(10 * time.Millisecond) @@ -86,10 +87,19 @@ func TestCacheClear(t *testing.T) { time.Sleep(10 * time.Millisecond) - _, found := cache.Get(key) + _, found := cache.GetKey(key) if found { t.Errorf("Expected cache to be cleared, but key %s was found", key) } else { t.Log("Clear operation successful, cache is empty") } } + +func TestSetCache(t *testing.T) { + client := redis.NewClient(&redis.Options{Addr: ":6379", EnableCache: true, CacheConfig: &redis.CacheConfig{MaxSize: 1 << 20, MaxKeys: 1000}}) + defer client.Close() + ctx := context.Background() + client.Cache.SetKey("pingi", "pong", 0) + client.Ping(ctx) + client.Cache.GetKey("ping") +} diff --git a/command.go b/command.go index 9fb9a8310..437092760 100644 --- a/command.go +++ b/command.go @@ -538,6 +538,7 @@ func (cmd *SliceCmd) Scan(dst interface{}) error { } func (cmd *SliceCmd) readReply(rd *proto.Reader) (err error) { + cmd.val, err = rd.ReadSlice() return err } @@ -579,6 +580,7 @@ func (cmd *StatusCmd) String() string { func (cmd *StatusCmd) readReply(rd *proto.Reader) (err error) { cmd.val, err = rd.ReadString() + return err } diff --git a/example_test.go b/example_test.go index 62aa8cb56..c14e6fb48 100644 --- a/example_test.go +++ b/example_test.go @@ -528,36 +528,36 @@ func ExampleClient_Watch() { // Output: ended with 100 } -func ExamplePubSub() { - pubsub := rdb.Subscribe(ctx, "mychannel1") - - // Wait for confirmation that subscription is created before publishing anything. - _, err := pubsub.Receive(ctx) - if err != nil { - panic(err) - } - - // Go channel which receives messages. - ch := pubsub.Channel() - - // Publish a message. - err = rdb.Publish(ctx, "mychannel1", "hello").Err() - if err != nil { - panic(err) - } - - time.AfterFunc(time.Second, func() { - // When pubsub is closed channel is closed too. - _ = pubsub.Close() - }) - - // Consume messages. - for msg := range ch { - fmt.Println(msg.Channel, msg.Payload) - } - - // Output: mychannel1 hello -} +// func ExamplePubSub() { +// pubsub := rdb.Subscribe(ctx, "mychannel1") + +// // Wait for confirmation that subscription is created before publishing anything. +// _, err := pubsub.Receive(ctx) +// if err != nil { +// panic(err) +// } + +// // Go channel which receives messages. +// ch := pubsub.Channel() + +// // Publish a message. +// err = rdb.Publish(ctx, "mychannel1", "hello").Err() +// if err != nil { +// panic(err) +// } + +// time.AfterFunc(time.Second, func() { +// // When pubsub is closed channel is closed too. +// _ = pubsub.Close() +// }) + +// // Consume messages. +// for msg := range ch { +// fmt.Println(msg.Channel, msg.Payload) +// } + +// // Output: mychannel1 hello +// } func ExamplePubSub_Receive() { pubsub := rdb.Subscribe(ctx, "mychannel2") diff --git a/go.mod b/go.mod index 6ad38d65f..a47f52049 100644 --- a/go.mod +++ b/go.mod @@ -6,11 +6,11 @@ require ( github.com/bsm/ginkgo/v2 v2.12.0 github.com/bsm/gomega v1.27.10 github.com/cespare/xxhash/v2 v2.2.0 + github.com/dgraph-io/ristretto v0.1.1 github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f ) require ( - github.com/dgraph-io/ristretto v0.1.1 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/golang/glog v1.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/go.sum b/go.sum index a2aeee758..55ea9b9b1 100644 --- a/go.sum +++ b/go.sum @@ -6,9 +6,11 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA= github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= @@ -18,13 +20,17 @@ github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+m github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.2.0 h1:uCdmnmatrKCgMBlM4rMuJZWOkPDqdbZPnrMXDY4gI68= github.com/golang/glog v1.2.0/go.mod h1:6AhwSGph0fcJtXVM/PEHPqZlFeoLxhs7/t5UDAwmO+w= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 7f45bc0bb..de4342cbe 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -63,6 +63,10 @@ func (cn *Conn) RemoteAddr() net.Addr { return nil } +func (cn *Conn) GetRawOutput() []byte { + return cn.rd.GetLine() +} + func (cn *Conn) WithReader( ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error, ) error { diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 8d23817fe..4f47f58f5 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -40,6 +40,8 @@ const ( const Nil = RedisError("redis: nil") // nolint:errname +var Line []byte + type RedisError string func (e RedisError) Error() string { return string(e) } @@ -120,7 +122,7 @@ func (r *Reader) ReadLine() ([]byte, error) { if IsNilReply(line) { return nil, Nil } - + Line = line return line, nil } @@ -151,6 +153,10 @@ func (r *Reader) readLine() ([]byte, error) { return b[:len(b)-2], nil } +func (r *Reader) GetLine() []byte { + return Line +} + func (r *Reader) ReadReply() (interface{}, error) { line, err := r.ReadLine() if err != nil { diff --git a/options.go b/options.go index dff52ae8b..ba68b28db 100644 --- a/options.go +++ b/options.go @@ -147,6 +147,12 @@ type Options struct { // Add suffix to client name. Default is empty. IdentitySuffix string + + // Enable cache + EnableCache bool + + // Cache configuration options + CacheConfig *CacheConfig } func (opt *Options) init() { diff --git a/redis.go b/redis.go index d25a0d314..8a0c1a86e 100644 --- a/redis.go +++ b/redis.go @@ -1,9 +1,11 @@ package redis import ( + "bytes" "context" "errors" "fmt" + "log" "net" "sync" "sync/atomic" @@ -200,6 +202,7 @@ type baseClient struct { connPool pool.Pooler onClose func() error // hook called when client is closed + Cache *Cache } func (c *baseClient) clone() *baseClient { @@ -414,6 +417,18 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool } } + // Check if cache enabled + if c.opt.EnableCache { + // Check if the command is in cache, if so return from cache + if val, found := c.Cache.GetKey(cmd.Name()); found { + rd := proto.NewReader(bytes.NewReader(val.([]byte))) + err := cmd.readReply(rd) + if err != nil { + return false, err + } + return false, nil + } + } retryTimeout := uint32(0) if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { @@ -432,6 +447,11 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool return err } + if c.opt.EnableCache { + // Set the command in cache + c.Cache.SetKey(cmd.Name(), cn.GetRawOutput(), 0) + } + return nil }); err != nil { retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1) @@ -626,17 +646,27 @@ type Client struct { *baseClient cmdable hooksMixin + *Cache } // NewClient returns a client to the Redis Server specified by Options. func NewClient(opt *Options) *Client { + var c Client opt.init() - - c := Client{ + c = Client{ baseClient: &baseClient{ opt: opt, }, } + if opt.EnableCache { + maxKeys := opt.CacheConfig.MaxKeys + maxSize := opt.CacheConfig.MaxSize + cache, err := NewCache(maxKeys, maxSize) + if err != nil { + log.Fatalf("Failed to create client cache: %v", err) + } + c.Cache = cache + } c.init() c.connPool = newConnPool(opt, c.dialHook) From bc7ec7304a3254a2eb7884399d9c5363d564c4a0 Mon Sep 17 00:00:00 2001 From: ofekshenawa Date: Mon, 4 Mar 2024 22:31:15 +0200 Subject: [PATCH 4/4] Add cache writing --- cache_test.go | 22 +++++++++++++++++--- internal/pool/conn.go | 8 ++++++- internal/proto/reader.go | 8 +++++-- options.go | 7 ++----- redis.go | 45 ++++++++++++++-------------------------- 5 files changed, 50 insertions(+), 40 deletions(-) diff --git a/cache_test.go b/cache_test.go index b92f1df99..7a2a64647 100644 --- a/cache_test.go +++ b/cache_test.go @@ -96,10 +96,26 @@ func TestCacheClear(t *testing.T) { } func TestSetCache(t *testing.T) { - client := redis.NewClient(&redis.Options{Addr: ":6379", EnableCache: true, CacheConfig: &redis.CacheConfig{MaxSize: 1 << 20, MaxKeys: 1000}}) + cache, err := redis.NewCache(1000, 1<<20) + if err != nil { + t.Fatalf("Failed to create cache: %v", err) + } + client := redis.NewClient(&redis.Options{Addr: ":6379", CacheObject: cache}) defer client.Close() ctx := context.Background() - client.Cache.SetKey("pingi", "pong", 0) client.Ping(ctx) - client.Cache.GetKey("ping") + // TODO: fix this + time.Sleep(1 * time.Millisecond) + val, found := client.Options().CacheObject.GetKey("ping") + if found { + t.Log(val) + } else { + t.Error("Key not found") + } + ping := client.Ping(ctx) + if ping.Val() == "PONG" { + t.Log(ping.Val()) + } else { + t.Error("Ping from cache failed") + } } diff --git a/internal/pool/conn.go b/internal/pool/conn.go index de4342cbe..654ad3fb1 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -64,7 +64,13 @@ func (cn *Conn) RemoteAddr() net.Addr { } func (cn *Conn) GetRawOutput() []byte { - return cn.rd.GetLine() + line := cn.rd.GetLine() + cn.rd.ResetLine() + return line +} + +func (cn *Conn) ResetRawOutput() { + cn.rd.ResetLine() } func (cn *Conn) WithReader( diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 4f47f58f5..94510f59c 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -122,7 +122,6 @@ func (r *Reader) ReadLine() ([]byte, error) { if IsNilReply(line) { return nil, Nil } - Line = line return line, nil } @@ -150,6 +149,7 @@ func (r *Reader) readLine() ([]byte, error) { if len(b) <= 2 || b[len(b)-1] != '\n' || b[len(b)-2] != '\r' { return nil, fmt.Errorf("redis: invalid reply: %q", b) } + Line = append(Line, b...) return b[:len(b)-2], nil } @@ -157,6 +157,10 @@ func (r *Reader) GetLine() []byte { return Line } +func (r *Reader) ResetLine() { + Line = []byte{} +} + func (r *Reader) ReadReply() (interface{}, error) { line, err := r.ReadLine() if err != nil { @@ -230,7 +234,7 @@ func (r *Reader) readStringReply(line []byte) (string, error) { if err != nil { return "", err } - + Line = append(Line, b...) return util.BytesToString(b[:n]), nil } diff --git a/options.go b/options.go index ba68b28db..7bce605d5 100644 --- a/options.go +++ b/options.go @@ -148,11 +148,8 @@ type Options struct { // Add suffix to client name. Default is empty. IdentitySuffix string - // Enable cache - EnableCache bool - - // Cache configuration options - CacheConfig *CacheConfig + // Enable cache for the client. + CacheObject *Cache } func (opt *Options) init() { diff --git a/redis.go b/redis.go index 8a0c1a86e..5f7918301 100644 --- a/redis.go +++ b/redis.go @@ -5,7 +5,6 @@ import ( "context" "errors" "fmt" - "log" "net" "sync" "sync/atomic" @@ -202,7 +201,6 @@ type baseClient struct { connPool pool.Pooler onClose func() error // hook called when client is closed - Cache *Cache } func (c *baseClient) clone() *baseClient { @@ -397,9 +395,20 @@ func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, func (c *baseClient) process(ctx context.Context, cmd Cmder) error { var lastErr error + // Check if cache enabled + if c.opt.CacheObject != nil { + // Check if the command is in cache, if so return from cache + if val, found := c.opt.CacheObject.GetKey(cmd.Name()); found { + rd := proto.NewReader(bytes.NewReader(val.([]byte))) + err := cmd.readReply(rd) + if err != nil { + lastErr = err + } + return lastErr + } + } for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ { attempt := attempt - retry, err := c._process(ctx, cmd, attempt) if err == nil || !retry { return err @@ -417,20 +426,9 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool } } - // Check if cache enabled - if c.opt.EnableCache { - // Check if the command is in cache, if so return from cache - if val, found := c.Cache.GetKey(cmd.Name()); found { - rd := proto.NewReader(bytes.NewReader(val.([]byte))) - err := cmd.readReply(rd) - if err != nil { - return false, err - } - return false, nil - } - } retryTimeout := uint32(0) if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { + cn.ResetRawOutput() if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmd(wr, cmd) }); err != nil { @@ -447,9 +445,9 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool return err } - if c.opt.EnableCache { + if c.opt.CacheObject != nil { // Set the command in cache - c.Cache.SetKey(cmd.Name(), cn.GetRawOutput(), 0) + c.opt.CacheObject.SetKey(cmd.Name(), cn.GetRawOutput(), 0) } return nil @@ -646,27 +644,16 @@ type Client struct { *baseClient cmdable hooksMixin - *Cache } // NewClient returns a client to the Redis Server specified by Options. func NewClient(opt *Options) *Client { - var c Client opt.init() - c = Client{ + c := Client{ baseClient: &baseClient{ opt: opt, }, } - if opt.EnableCache { - maxKeys := opt.CacheConfig.MaxKeys - maxSize := opt.CacheConfig.MaxSize - cache, err := NewCache(maxKeys, maxSize) - if err != nil { - log.Fatalf("Failed to create client cache: %v", err) - } - c.Cache = cache - } c.init() c.connPool = newConnPool(opt, c.dialHook)