Skip to content

Commit

Permalink
perf: evenly dispatch requests to conns and limit the default conns c…
Browse files Browse the repository at this point in the history
…ount in single client
  • Loading branch information
rueian committed Jul 2, 2023
1 parent 2911b22 commit 1aba42b
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 11 deletions.
27 changes: 24 additions & 3 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rueidis

import (
"context"
"math/rand"
"net"
"runtime"
"sync"
Expand Down Expand Up @@ -223,7 +224,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre
}

func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
slot := cmd.Slot() & uint16(len(m.wire)-1)
slot := slotfn(cmd.Slot(), len(m.wire))
wire := m.pipe(slot)
if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) {
m.wire[slot].CompareAndSwap(wire, m.init)
Expand All @@ -232,7 +233,7 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
slot := cmd[0].Slot() & uint16(len(m.wire)-1)
slot := slotfn(cmd[0].Slot(), len(m.wire))
wire := m.pipe(slot)
resp = wire.DoMulti(ctx, cmd...)
for _, r := range resp.s {
Expand Down Expand Up @@ -311,7 +312,7 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT
}

func (m *mux) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
slot := subscribe.Slot() & uint16(len(m.wire)-1)
slot := slotfn(subscribe.Slot(), len(m.wire))
wire := m.pipe(slot)
err := wire.Receive(ctx, subscribe, fn)
if isBroken(err, wire) {
Expand Down Expand Up @@ -346,3 +347,23 @@ func (m *mux) Addr() string {
func isBroken(err error, w wire) bool {
return err != nil && err != ErrClosing && w.Error() != nil
}

var rngPool = sync.Pool{
New: func() any {
return rand.New(rand.NewSource(time.Now().UnixNano()))
},
}

func fastrand(n int) (r int) {
s := rngPool.Get().(*rand.Rand)
r = s.Intn(n)
rngPool.Put(s)
return
}

func slotfn(ks uint16, n int) uint16 {
if n == 1 || ks == cmds.NoSlot {
return 0
}
return uint16(fastrand(n))
}
23 changes: 16 additions & 7 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ func parallel(p int) (chan func(), func()) {
wg.Add(p)
for i := 0; i < p; i++ {
go func() {
defer func() {
recover()
wg.Done()
}()
for fn := range ch {
fn()
}
wg.Done()
}()
}
return ch, func() {
Expand Down Expand Up @@ -120,7 +123,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Set().Key(key).Value(kvs[key]).Build()).ToString()
if err != nil || val != "OK" {
t.Fatalf("unexpected set response %v %v", val, err)
t.Errorf("unexpected set response %v %v", val, err)
}
}
}
Expand All @@ -133,7 +136,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
t.Fatalf("unexpected get response %v %v %v", val, err, ok)
t.Errorf("unexpected get response %v %v %v", val, err, ok)
}
}
}
Expand All @@ -148,7 +151,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
val, err := resp.ToString()
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
t.Fatalf("unexpected csc get response %v %v %v", val, err, ok)
t.Errorf("unexpected csc get response %v %v %v", val, err, ok)
}
if resp.IsCacheHit() {
atomic.AddInt64(&hits, 1)
Expand All @@ -175,23 +178,25 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Del().Key(key).Build()).AsInt64()
if _, ok := kvs[key]; !((val == 1 && ok) || (val == 0 && !ok)) {
t.Fatalf("unexpected del response %v %v %v", val, err, ok)
t.Errorf("unexpected del response %v %v %v", val, err, ok)
}
}
}
wait()

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
jobs, wait = parallel(para)
for i := 0; i < keys/100; i++ {
key := strconv.Itoa(i)
jobs <- func() {
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
if !IsRedisNil(resp.Error()) {
t.Fatalf("unexpected csc get response after delete %v", resp)
t.Errorf("unexpected csc get response after delete %v", resp)
}
if resp.IsCacheHit() {
t.Fatalf("unexpected csc cache hit after delete")
t.Errorf("unexpected csc cache hit after delete")
}
}
}
Expand Down Expand Up @@ -320,6 +325,8 @@ func testMultiSETGET(t *testing.T, client Client, csc bool) {
}
wait()

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
jobs, wait = parallel(para)
for i := 0; i < keys/100; i += batch {
Expand Down Expand Up @@ -415,7 +422,9 @@ func testMultiSETGETHelpers(t *testing.T, client Client, csc bool) {
t.Fatalf("unexpecetd err %v\n", err)
}
}

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
resp, err = MGetCache(client, ctx, time.Minute, cmdKeys)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion rueidis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"context"
"crypto/tls"
"errors"
"math"
"math/rand"
"net"
"runtime"
"strings"
"time"

Expand Down Expand Up @@ -319,7 +321,9 @@ func NewClient(option ClientOption) (client Client, err error) {

func singleClientMultiplex(multiplex int) int {
if multiplex == 0 {
multiplex = 2
if multiplex = int(math.Log2(float64(runtime.GOMAXPROCS(0)))); multiplex >= 2 {
multiplex = 2
}
}
if multiplex < 0 {
multiplex = 0
Expand Down

0 comments on commit 1aba42b

Please sign in to comment.