Skip to content

Commit

Permalink
fix: not recycle cmds if err for later reference in pipe, such as con…
Browse files Browse the repository at this point in the history
…text.DeadlineExceeded cases
  • Loading branch information
rueian committed Jul 30, 2022
1 parent d07669f commit 2e7d5bf
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 36 deletions.
38 changes: 27 additions & 11 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ retry:
if cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) {
goto retry
}
cmds.Put(cmd.CommandSlice())
if resp.NonRedisError() == nil { // not recycle cmds if error, since cmds may be used later in pipe. consider recycle them by pipe
cmds.Put(cmd.CommandSlice())
}
return resp
}

Expand All @@ -52,8 +54,10 @@ retry:
}
}
}
for _, cmd := range multi {
cmds.Put(cmd.CommandSlice())
for i, cmd := range multi {
if resps[i].NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
}
return resps
}
Expand All @@ -66,8 +70,10 @@ retry:
goto retry
}
}
for _, cmd := range multi {
cmds.Put(cmd.Cmd.CommandSlice())
for i, cmd := range multi {
if resps[i].NonRedisError() == nil {
cmds.Put(cmd.Cmd.CommandSlice())
}
}
return resps
}
Expand All @@ -78,7 +84,9 @@ retry:
if c.isRetryable(resp.NonRedisError(), ctx) {
goto retry
}
cmds.Put(cmd.CommandSlice())
if resp.NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
return resp
}

Expand All @@ -88,7 +96,9 @@ retry:
if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) {
goto retry
}
cmds.Put(subscribe.CommandSlice())
if err == nil {
cmds.Put(subscribe.CommandSlice())
}
return err
}

Expand Down Expand Up @@ -128,7 +138,9 @@ retry:
if cmd.IsReadOnly() && isRetryable(resp.NonRedisError(), c.wire, ctx) {
goto retry
}
cmds.Put(cmd.CommandSlice())
if resp.NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
return resp
}

Expand All @@ -142,8 +154,10 @@ retry:
if readonly && anyRetryable(resp, c.wire, ctx) {
goto retry
}
for _, cmd := range multi {
cmds.Put(cmd.CommandSlice())
for i, cmd := range multi {
if resp[i].NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
}
return resp
}
Expand All @@ -154,7 +168,9 @@ retry:
if _, ok := err.(*RedisError); !ok && isRetryable(err, c.wire, ctx) {
goto retry
}
cmds.Put(subscribe.CommandSlice())
if err == nil {
cmds.Put(subscribe.CommandSlice())
}
return err
}

Expand Down
50 changes: 35 additions & 15 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ process:
}
}
ret:
cmds.Put(cmd.CommandSlice())
if resp.NonRedisError() == nil { // not recycle cmds if error, since cmds may be used later in pipe. consider recycle them by pipe
cmds.Put(cmd.CommandSlice())
}
return resp
}

Expand Down Expand Up @@ -305,8 +307,10 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...cmds.Completed) (r
cIndexes[i] = i
}
c.doMulti(ctx, slot, commands, cIndexes, results)
for _, cmd := range multi {
cmds.Put(cmd.CommandSlice())
for i, cmd := range multi {
if results[i].NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
}
return results
}
Expand Down Expand Up @@ -352,8 +356,10 @@ func (c *clusterClient) DoMulti(ctx context.Context, multi ...cmds.Completed) (r
}()
}
wg.Wait()
for _, cmd := range multi {
cmds.Put(cmd.CommandSlice())
for i, cmd := range multi {
if results[i].NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
}
return results
}
Expand Down Expand Up @@ -421,7 +427,9 @@ process:

func (c *clusterClient) DoCache(ctx context.Context, cmd cmds.Cacheable, ttl time.Duration) (resp RedisResult) {
resp = c.doCache(ctx, cmd, ttl)
cmds.Put(cmd.CommandSlice())
if resp.NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
return resp
}

Expand Down Expand Up @@ -481,8 +489,10 @@ func (c *clusterClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
cIndexes[i] = i
}
c.doMultiCache(ctx, multi[0].Cmd.Slot(), multi, cIndexes, results)
for _, cmd := range multi {
cmds.Put(cmd.Cmd.CommandSlice())
for i, cmd := range multi {
if results[i].NonRedisError() == nil {
cmds.Put(cmd.Cmd.CommandSlice())
}
}
return results
}
Expand Down Expand Up @@ -522,8 +532,10 @@ func (c *clusterClient) DoMultiCache(ctx context.Context, multi ...CacheableTTL)
}()
}
wg.Wait()
for _, cmd := range multi {
cmds.Put(cmd.Cmd.CommandSlice())
for i, cmd := range multi {
if results[i].NonRedisError() == nil {
cmds.Put(cmd.Cmd.CommandSlice())
}
}
return results
}
Expand All @@ -540,7 +552,9 @@ retry:
goto retry
}
ret:
cmds.Put(subscribe.CommandSlice())
if err == nil {
cmds.Put(subscribe.CommandSlice())
}
return err
}

Expand Down Expand Up @@ -669,7 +683,9 @@ retry:
}
}
}
cmds.Put(cmd.CommandSlice())
if resp.NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
return resp
}

Expand Down Expand Up @@ -700,8 +716,10 @@ retry:
resp[i] = newErrResult(err)
}
}
for _, cmd := range multi {
cmds.Put(cmd.CommandSlice())
for i, cmd := range multi {
if resp[i].NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
}
return resp
}
Expand All @@ -716,7 +734,9 @@ retry:
goto retry
}
}
cmds.Put(subscribe.CommandSlice())
if err == nil {
cmds.Put(subscribe.CommandSlice())
}
return err
}

Expand Down
4 changes: 3 additions & 1 deletion helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ func clusterMGetCache(cc *clusterClient, ctx context.Context, ttl time.Duration,
}
mu.Unlock()
wg.Done()
cmds.Put(cmd.CommandSlice())
}
}()
}
wg.Wait()
if err != nil {
return nil, err
}
for _, mget := range mgets { // not recycle cmds if error, since cmds may be used later in pipe. consider recycle them by pipe
cmds.Put(mget.CommandSlice())
}
return ret, nil
}

Expand Down
30 changes: 21 additions & 9 deletions sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ retry:
if cmd.IsReadOnly() && c.isRetryable(resp.NonRedisError(), ctx) {
goto retry
}
cmds.Put(cmd.CommandSlice())
if resp.NonRedisError() == nil { // not recycle cmds if error, since cmds may be used later in pipe. consider recycle them by pipe
cmds.Put(cmd.CommandSlice())
}
return resp
}

Expand All @@ -73,8 +75,10 @@ retry:
}
}
}
for _, cmd := range multi {
cmds.Put(cmd.CommandSlice())
for i, cmd := range multi {
if resps[i].NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
}
return resps
}
Expand All @@ -85,7 +89,9 @@ retry:
if c.isRetryable(resp.NonRedisError(), ctx) {
goto retry
}
cmds.Put(cmd.CommandSlice())
if resp.NonRedisError() == nil {
cmds.Put(cmd.CommandSlice())
}
return resp
}

Expand All @@ -97,8 +103,10 @@ retry:
goto retry
}
}
for _, cmd := range multi {
cmds.Put(cmd.Cmd.CommandSlice())
for i, cmd := range multi {
if resps[i].NonRedisError() == nil {
cmds.Put(cmd.Cmd.CommandSlice())
}
}
return resps
}
Expand All @@ -109,7 +117,9 @@ retry:
if _, ok := err.(*RedisError); !ok && c.isRetryable(err, ctx) {
goto retry
}
cmds.Put(subscribe.CommandSlice())
if err == nil {
cmds.Put(subscribe.CommandSlice())
}
return err
}

Expand Down Expand Up @@ -267,8 +277,10 @@ func (c *sentinelClient) listWatch(cc conn) (master string, sentinels []string,
sentinelsCMD := c.cmd.SentinelSentinels().Master(c.mOpt.Sentinel.MasterSet).Build()
getMasterCMD := c.cmd.SentinelGetMasterAddrByName().Master(c.mOpt.Sentinel.MasterSet).Build()
defer func() {
cmds.Put(sentinelsCMD.CommandSlice())
cmds.Put(getMasterCMD.CommandSlice())
if err == nil { // not recycle cmds if error, since cmds may be used later in pipe. consider recycle them by pipe
cmds.Put(sentinelsCMD.CommandSlice())
cmds.Put(getMasterCMD.CommandSlice())
}
}()

go func(cc conn) {
Expand Down

0 comments on commit 2e7d5bf

Please sign in to comment.