Skip to content

Commit

Permalink
perf: reduce locks on DedicatedClient of clusterClient
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed May 19, 2022
1 parent c7a7bd6 commit 926933c
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 23 deletions.
9 changes: 9 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,12 @@ func allReadOnly(multi []cmds.Completed) bool {
}
return true
}

func allSameSlot(multi []cmds.Completed) bool {
for i := 1; i < len(multi); i++ {
if multi[0].Slot() != multi[i].Slot() {
return false
}
}
return true
}
27 changes: 10 additions & 17 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,25 +363,20 @@ type dedicatedClusterClient struct {
slot uint16
}

func (c *dedicatedClusterClient) check(slot uint16) {
func (c *dedicatedClusterClient) acquire(slot uint16) (wire wire, err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.slot == cmds.NoSlot {
if slot == cmds.NoSlot {
panic(panicMsgNoSlot)
}
c.slot = slot
} else if c.slot != slot {
} else if c.slot != slot && slot != cmds.NoSlot {
panic(panicMsgCxSlot)
}
}

func (c *dedicatedClusterClient) acquire() (wire wire, err error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.wire != nil {
return c.wire, nil
}
if c.slot == cmds.NoSlot {
panic(panicMsgNoSlot)
}
if c.conn, err = c.client.pick(c.slot); err != nil {
return nil, err
}
Expand All @@ -402,9 +397,8 @@ func (c *dedicatedClusterClient) B() cmds.Builder {
}

func (c *dedicatedClusterClient) Do(ctx context.Context, cmd cmds.Completed) (resp RedisResult) {
c.check(cmd.Slot())
retry:
if w, err := c.acquire(); err != nil {
if w, err := c.acquire(cmd.Slot()); err != nil {
resp = newErrResult(err)
} else {
resp = w.Do(ctx, cmd)
Expand All @@ -420,12 +414,12 @@ func (c *dedicatedClusterClient) DoMulti(ctx context.Context, multi ...cmds.Comp
if len(multi) == 0 {
return nil
}
for _, cmd := range multi {
c.check(cmd.Slot())
if !allSameSlot(multi) {
panic(panicMsgCxSlot)
}
readonly := allReadOnly(multi)
retry:
if w, err := c.acquire(); err == nil {
if w, err := c.acquire(multi[0].Slot()); err == nil {
resp = w.DoMulti(ctx, multi...)
for _, resp := range resp {
if c.client.shouldRefreshRetry(resp.NonRedisError(), ctx) && readonly && w.Error() == nil {
Expand All @@ -445,10 +439,9 @@ retry:
}

func (c *dedicatedClusterClient) Receive(ctx context.Context, subscribe cmds.Completed, fn func(msg PubSubMessage)) (err error) {
c.check(subscribe.Slot())
var w wire
retry:
if w, err = c.acquire(); err == nil {
if w, err = c.acquire(subscribe.Slot()); err == nil {
if err = w.Receive(ctx, subscribe, fn); c.client.shouldRefreshRetry(err, ctx) && w.Error() == nil {
goto retry
}
Expand Down
45 changes: 39 additions & 6 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -274,13 +275,34 @@ func TestClusterClient(t *testing.T) {
})
})

t.Run("Dedicated Multi Cross Slot Err", func(t *testing.T) {
m.AcquireFn = func() wire { return &mockWire{} }
err := client.Dedicated(func(c DedicatedClient) (err error) {
defer func() {
err = errors.New(recover().(string))
}()
c.DoMulti(
context.Background(),
c.B().Get().Key("a").Build(),
c.B().Get().Key("b").Build(),
)
return nil
})
if err == nil || err.Error() != panicMsgCxSlot {
t.Errorf("Multi should panic if cross slots is used")
}
})

t.Run("Dedicated Delegate", func(t *testing.T) {
w := &mockWire{
DoFn: func(cmd cmds.Completed) RedisResult {
return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
},
DoMultiFn: func(cmd ...cmds.Completed) []RedisResult {
return []RedisResult{newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)}
return []RedisResult{
newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil),
newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil),
}
},
ReceiveFn: func(ctx context.Context, subscribe cmds.Completed, fn func(message PubSubMessage)) error {
return ErrClosing
Expand All @@ -306,8 +328,12 @@ func TestClusterClient(t *testing.T) {
if v := c.DoMulti(context.Background()); len(v) != 0 {
t.Fatalf("received unexpected response %v", v)
}
for _, resp := range c.DoMulti(context.Background(), c.B().Get().Key("a").Build()) {
if v, err := resp.ToString(); err != nil || v != "Delegate" {
for i, resp := range c.DoMulti(
context.Background(),
c.B().Get().Key("a").Build(),
c.B().Get().Key("a").Build(),
) {
if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
t.Fatalf("unexpected response %v %v", v, err)
}
}
Expand All @@ -329,7 +355,10 @@ func TestClusterClient(t *testing.T) {
return newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)
},
DoMultiFn: func(cmd ...cmds.Completed) []RedisResult {
return []RedisResult{newResult(RedisMessage{typ: '+', string: "Delegate"}, nil)}
return []RedisResult{
newResult(RedisMessage{typ: '+', string: "Delegate0"}, nil),
newResult(RedisMessage{typ: '+', string: "Delegate1"}, nil),
}
},
ReceiveFn: func(ctx context.Context, subscribe cmds.Completed, fn func(message PubSubMessage)) error {
return ErrClosing
Expand All @@ -355,8 +384,12 @@ func TestClusterClient(t *testing.T) {
if v := c.DoMulti(context.Background()); len(v) != 0 {
t.Fatalf("received unexpected response %v", v)
}
for _, resp := range c.DoMulti(context.Background(), c.B().Get().Key("a").Build()) {
if v, err := resp.ToString(); err != nil || v != "Delegate" {
for i, resp := range c.DoMulti(
context.Background(),
c.B().Get().Key("a").Build(),
c.B().Get().Key("a").Build(),
) {
if v, err := resp.ToString(); err != nil || v != "Delegate"+strconv.Itoa(i) {
t.Fatalf("unexpected response %v %v", v, err)
}
}
Expand Down

0 comments on commit 926933c

Please sign in to comment.