diff --git a/mux.go b/mux.go index 42e5a586..9bae0fc7 100644 --- a/mux.go +++ b/mux.go @@ -2,6 +2,7 @@ package rueidis import ( "context" + "math/rand" "net" "runtime" "sync" @@ -223,7 +224,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre } func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) { - slot := cmd.Slot() & uint16(len(m.wire)-1) + slot := uint16(fastrand(len(m.wire))) wire := m.pipe(slot) if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) { m.wire[slot].CompareAndSwap(wire, m.init) @@ -232,7 +233,7 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) { } func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *redisresults) { - slot := cmd[0].Slot() & uint16(len(m.wire)-1) + slot := uint16(fastrand(len(m.wire))) wire := m.pipe(slot) resp = wire.DoMulti(ctx, cmd...) for _, r := range resp.s { @@ -346,3 +347,16 @@ func (m *mux) Addr() string { func isBroken(err error, w wire) bool { return err != nil && err != ErrClosing && w.Error() != nil } + +var rngPool = sync.Pool{ + New: func() any { + return rand.New(rand.NewSource(time.Now().UnixNano())) + }, +} + +func fastrand(n int) (r int) { + s := rngPool.Get().(*rand.Rand) + r = s.Intn(n) + rngPool.Put(s) + return +} diff --git a/redis_test.go b/redis_test.go index 576e4a57..0df59187 100644 --- a/redis_test.go +++ b/redis_test.go @@ -18,10 +18,13 @@ func parallel(p int) (chan func(), func()) { wg.Add(p) for i := 0; i < p; i++ { go func() { + defer func() { + recover() + wg.Done() + }() for fn := range ch { fn() } - wg.Done() }() } return ch, func() { @@ -120,7 +123,7 @@ func testSETGET(t *testing.T, client Client, csc bool) { jobs <- func() { val, err := client.Do(ctx, client.B().Set().Key(key).Value(kvs[key]).Build()).ToString() if err != nil || val != "OK" { - t.Fatalf("unexpected set response %v %v", val, err) + t.Errorf("unexpected set response %v %v", val, err) } } } @@ -133,7 +136,7 @@ func testSETGET(t *testing.T, client Client, csc bool) { jobs <- func() { val, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString() if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) { - t.Fatalf("unexpected get response %v %v %v", val, err, ok) + t.Errorf("unexpected get response %v %v %v", val, err, ok) } } } @@ -148,7 +151,7 @@ func testSETGET(t *testing.T, client Client, csc bool) { resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute) val, err := resp.ToString() if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) { - t.Fatalf("unexpected csc get response %v %v %v", val, err, ok) + t.Errorf("unexpected csc get response %v %v %v", val, err, ok) } if resp.IsCacheHit() { atomic.AddInt64(&hits, 1) @@ -175,12 +178,14 @@ func testSETGET(t *testing.T, client Client, csc bool) { jobs <- func() { val, err := client.Do(ctx, client.B().Del().Key(key).Build()).AsInt64() if _, ok := kvs[key]; !((val == 1 && ok) || (val == 0 && !ok)) { - t.Fatalf("unexpected del response %v %v %v", val, err, ok) + t.Errorf("unexpected del response %v %v %v", val, err, ok) } } } wait() + time.Sleep(time.Second) + t.Logf("testing client side caching after delete\n") jobs, wait = parallel(para) for i := 0; i < keys/100; i++ { @@ -188,10 +193,10 @@ func testSETGET(t *testing.T, client Client, csc bool) { jobs <- func() { resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute) if !IsRedisNil(resp.Error()) { - t.Fatalf("unexpected csc get response after delete %v", resp) + t.Errorf("unexpected csc get response after delete %v", resp) } if resp.IsCacheHit() { - t.Fatalf("unexpected csc cache hit after delete") + t.Errorf("unexpected csc cache hit after delete") } } } @@ -320,6 +325,8 @@ func testMultiSETGET(t *testing.T, client Client, csc bool) { } wait() + time.Sleep(time.Second) + t.Logf("testing client side caching after delete\n") jobs, wait = parallel(para) for i := 0; i < keys/100; i += batch { @@ -415,7 +422,9 @@ func testMultiSETGETHelpers(t *testing.T, client Client, csc bool) { t.Fatalf("unexpecetd err %v\n", err) } } + time.Sleep(time.Second) + t.Logf("testing client side caching after delete\n") resp, err = MGetCache(client, ctx, time.Minute, cmdKeys) if err != nil { diff --git a/rueidis.go b/rueidis.go index e5c29f05..7cb1d3c6 100644 --- a/rueidis.go +++ b/rueidis.go @@ -5,8 +5,10 @@ import ( "context" "crypto/tls" "errors" + "math" "math/rand" "net" + "runtime" "strings" "time" @@ -319,7 +321,9 @@ func NewClient(option ClientOption) (client Client, err error) { func singleClientMultiplex(multiplex int) int { if multiplex == 0 { - multiplex = 2 + if multiplex = int(math.Log2(float64(runtime.GOMAXPROCS(0)))); multiplex >= 2 { + multiplex = 2 + } } if multiplex < 0 { multiplex = 0