Skip to content

Commit

Permalink
fix: DoCache cancellation and exec failure
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed May 14, 2022
1 parent a22045b commit d932192
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 14 deletions.
26 changes: 24 additions & 2 deletions lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const (
type cache interface {
GetOrPrepare(key, cmd string, ttl time.Duration) (v RedisMessage, entry *entry)
Update(key, cmd string, value RedisMessage, pttl int64)
Cancel(key, cmd string, value RedisMessage, err error)
Delete(keys []RedisMessage)
FreeAndClose(notice RedisMessage)
}
Expand All @@ -30,12 +31,13 @@ type entry struct {
key string
cmd string
val RedisMessage
err error
size int
}

func (e *entry) Wait() RedisMessage {
func (e *entry) Wait() (RedisMessage, error) {
<-e.ch
return e.val
return e.val, e.err
}

type keyCache struct {
Expand Down Expand Up @@ -160,6 +162,26 @@ func (c *lru) Update(key, cmd string, value RedisMessage, pttl int64) {
}
}

func (c *lru) Cancel(key, cmd string, val RedisMessage, err error) {
var ch chan struct{}
c.mu.Lock()
if store, ok := c.store[key]; ok {
if ele, ok := store.cache[cmd]; ok {
if e := ele.Value.(*entry); e.val.typ == 0 {
e.val = val
e.err = err
ch = e.ch
delete(c.store[key].cache, cmd)
c.list.Remove(ele)
}
}
}
c.mu.Unlock()
if ch != nil {
close(ch)
}
}

func (c *lru) purge(store *keyCache) {
if store != nil {
for cmd, ele := range store.cache {
Expand Down
30 changes: 27 additions & 3 deletions lru_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rueidis

import (
"errors"
"strconv"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestLRU(t *testing.T) {

lru.FreeAndClose(RedisMessage{typ: '-', string: "closed"})

if resp := entry.Wait(); resp.typ != '-' || resp.string != "closed" {
if resp, _ := entry.Wait(); resp.typ != '-' || resp.string != "closed" {
t.Fatalf("got unexpected value after FreeAndClose: %v", resp)
}

Expand All @@ -153,6 +154,27 @@ func TestLRU(t *testing.T) {
}
}
})

t.Run("Cache Cancel", func(t *testing.T) {
lru := setup(t)
v, entry := lru.GetOrPrepare("1", "GET", TTL)
if v.typ != 0 || entry != nil {
t.Fatalf("got unexpected value from the first GetOrPrepare: %v %v", v, entry)
}
v, entry = lru.GetOrPrepare("1", "GET", TTL)
if v.typ != 0 || entry == nil { // entry should not be nil in second call
t.Fatalf("got unexpected value from the second GetOrPrepare: %v %v", v, entry)
}
err := errors.New("any")

go func() {
lru.Cancel("1", "GET", RedisMessage{typ: 1}, err)
}()

if v, err2 := entry.Wait(); v.typ != 1 || err2 != err {
t.Fatalf("got unexpected value from the entry.Wait(): %v %v", err, err2)
}
})
}

func BenchmarkLRU(b *testing.B) {
Expand All @@ -176,12 +198,14 @@ func BenchmarkLRU(b *testing.B) {
func TestEntry(t *testing.T) {
t.Run("Wait", func(t *testing.T) {
e := entry{ch: make(chan struct{}, 1)}
err := errors.New("any")
go func() {
e.val = RedisMessage{typ: 1}
e.err = err
close(e.ch)
}()
if v := e.Wait(); v.typ != 1 {
t.Fatalf("got unexpected value from the Wait: %v", v.typ)
if v, err2 := e.Wait(); v.typ != 1 || err2 != err {
t.Fatalf("got unexpected value from the Wait: %v %v", v.typ, err)
}
})
}
23 changes: 16 additions & 7 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,7 @@ func (p *pipe) _backgroundRead() {
if ff == 4 && len(multi) == 5 && multi[0].IsOptIn() {
cacheable := cmds.Cacheable(multi[3])
ck, cc := cacheable.CacheKey()
if len(msg.values) != 2 { // EXEC aborted
p.cache.Update(ck, cc, msg, 0)
} else {
if len(msg.values) == 2 {
cp := msg.values[1]
cp.attrs = cacheMark
p.cache.Update(ck, cc, cp, msg.values[0].integer)
Expand Down Expand Up @@ -610,18 +608,29 @@ func (p *pipe) DoCache(ctx context.Context, cmd cmds.Cacheable, ttl time.Duratio
if v, entry := p.cache.GetOrPrepare(ck, cc, ttl); v.typ != 0 {
return newResult(v, nil)
} else if entry != nil {
return newResult(entry.Wait(), nil)
return newResult(entry.Wait())
}
exec, err := p.DoMulti(
resp := p.DoMulti(
ctx,
cmds.OptInCmd,
cmds.MultiCmd,
cmds.NewCompleted([]string{"PTTL", ck}),
cmds.Completed(cmd),
cmds.ExecCmd,
)[4].ToArray()
)
exec, err := resp[4].ToArray()
if err != nil {
return newErrResult(err)
if _, ok := err.(*RedisError); !ok {
p.cache.Cancel(ck, cc, RedisMessage{}, err)
return newErrResult(err)
}
// EXEC aborted, return err of the input cmd in MULTI block
if resp[3].val.typ != '+' {
p.cache.Cancel(ck, cc, resp[3].val, nil)
return newResult(resp[3].val, nil)
}
p.cache.Cancel(ck, cc, resp[4].val, nil)
return newResult(resp[4].val, nil)
}
return newResult(exec[1], nil)
}
Expand Down
52 changes: 50 additions & 2 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,54 @@ func TestClientSideCachingExecAbort(t *testing.T) {
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.GetOrPrepare("a", "GET", time.Second); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
}

func TestClientSideCachingExecAbortWithMoved(t *testing.T) {
p, mock, cancel, _ := setup(t, ClientOption{})
defer cancel()

go func() {
mock.Expect("CLIENT", "CACHING", "YES").
Expect("MULTI").
Expect("PTTL", "a").
Expect("GET", "a").
Expect("EXEC").
ReplyString("OK").
ReplyString("OK").
ReplyString("OK").
Reply(RedisMessage{typ: '-', string: "MOVED 0 :0"}).
Reply(RedisMessage{typ: '-', string: "EXECABORT"})
}()

v, err := p.DoCache(context.Background(), cmds.Cacheable(cmds.NewCompleted([]string{"GET", "a"})), 10*time.Second).ToMessage()
if addr, ok := err.(*RedisError).IsMoved(); !ok || addr != ":0" {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.GetOrPrepare("a", "GET", time.Second); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
}

func TestClientSideCachingWithNonRedisError(t *testing.T) {
p, _, _, closeConn := setup(t, ClientOption{})
closeConn()

v, err := p.DoCache(context.Background(), cmds.Cacheable(cmds.NewCompleted([]string{"GET", "a"})), 10*time.Second).ToMessage()
if !strings.HasPrefix(err.Error(), "io:") {
t.Errorf("unexpected err, got %v", err)
}
if v.IsCacheHit() {
t.Errorf("unexpected cache hit")
}
if v, entry := p.cache.GetOrPrepare("a", "GET", time.Second); v.typ != 0 || entry != nil {
t.Errorf("unexpected cache value and entry %v %v", v, entry)
}
}

// https://github.com/redis/redis/issues/8935
Expand Down Expand Up @@ -1027,7 +1075,7 @@ func TestOngoingCancelContextInPipelineMode_Do(t *testing.T) {
}

func TestOngoingWriteTimeoutInPipelineMode_Do(t *testing.T) {
p, mock, _, closeConn := setup(t, ClientOption{ConnWriteTimeout: time.Second / 2})
p, mock, _, closeConn := setup(t, ClientOption{ConnReadWriteTimeout: time.Second / 2})
defer closeConn()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
Expand Down Expand Up @@ -1098,7 +1146,7 @@ func TestOngoingCancelContextInPipelineMode_DoMulti(t *testing.T) {
}

func TestOngoingWriteTimeoutInPipelineMode_DoMulti(t *testing.T) {
p, mock, _, closeConn := setup(t, ClientOption{ConnWriteTimeout: time.Second / 2})
p, mock, _, closeConn := setup(t, ClientOption{ConnReadWriteTimeout: time.Second / 2})
defer closeConn()

ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
Expand Down

0 comments on commit d932192

Please sign in to comment.