Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve throughput of the Do and DoMulti when -cpu=1 #291

Merged
merged 2 commits into from
Jul 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 24 additions & 3 deletions mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package rueidis

import (
"context"
"math/rand"
"net"
"runtime"
"sync"
Expand Down Expand Up @@ -223,7 +224,7 @@ func (m *mux) blockingMulti(ctx context.Context, cmd []Completed) (resp *redisre
}

func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
slot := cmd.Slot() & uint16(len(m.wire)-1)
slot := slotfn(cmd.Slot(), len(m.wire))
wire := m.pipe(slot)
if resp = wire.Do(ctx, cmd); isBroken(resp.NonRedisError(), wire) {
m.wire[slot].CompareAndSwap(wire, m.init)
Expand All @@ -232,7 +233,7 @@ func (m *mux) pipeline(ctx context.Context, cmd Completed) (resp RedisResult) {
}

func (m *mux) pipelineMulti(ctx context.Context, cmd []Completed) (resp *redisresults) {
slot := cmd[0].Slot() & uint16(len(m.wire)-1)
slot := slotfn(cmd[0].Slot(), len(m.wire))
wire := m.pipe(slot)
resp = wire.DoMulti(ctx, cmd...)
for _, r := range resp.s {
Expand Down Expand Up @@ -311,7 +312,7 @@ func (m *mux) doMultiCache(ctx context.Context, slot uint16, multi []CacheableTT
}

func (m *mux) Receive(ctx context.Context, subscribe Completed, fn func(message PubSubMessage)) error {
slot := subscribe.Slot() & uint16(len(m.wire)-1)
slot := slotfn(subscribe.Slot(), len(m.wire))
wire := m.pipe(slot)
err := wire.Receive(ctx, subscribe, fn)
if isBroken(err, wire) {
Expand Down Expand Up @@ -346,3 +347,23 @@ func (m *mux) Addr() string {
func isBroken(err error, w wire) bool {
return err != nil && err != ErrClosing && w.Error() != nil
}

var rngPool = sync.Pool{
New: func() any {
return rand.New(rand.NewSource(time.Now().UnixNano()))
},
}

func fastrand(n int) (r int) {
s := rngPool.Get().(*rand.Rand)
r = s.Intn(n)
rngPool.Put(s)
return
}

func slotfn(ks uint16, n int) uint16 {
if n == 1 || ks == cmds.NoSlot {
return 0
}
return uint16(fastrand(n))
}
55 changes: 28 additions & 27 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ func (p *pipe) _background() {
}

var (
ones = make([]Completed, 1)
multi []Completed
resps []RedisResult
ch chan RedisResult
cond *sync.Cond
)
Expand All @@ -332,13 +331,12 @@ func (p *pipe) _background() {
_, _, _ = p.queue.NextWriteCmd()
default:
}
if ones[0], multi, ch, cond = p.queue.NextResultCh(); ch != nil {
if multi == nil {
multi = ones
}
for range multi {
ch <- newErrResult(p.Error())
if _, _, ch, resps, cond = p.queue.NextResultCh(); ch != nil {
err := newErrResult(p.Error())
for i := range resps {
resps[i] = err
}
ch <- err
cond.L.Unlock()
cond.Signal()
} else {
Expand Down Expand Up @@ -405,6 +403,7 @@ func (p *pipe) _backgroundRead() (err error) {
cond *sync.Cond
ones = make([]Completed, 1)
multi []Completed
resps []RedisResult
ch chan RedisResult
ff int // fulfilled count
skip int // skip rest push messages
Expand All @@ -415,10 +414,12 @@ func (p *pipe) _backgroundRead() (err error) {
)

defer func() {
resp := newErrResult(err)
if err != nil && ff < len(multi) {
for ; ff < len(multi); ff++ {
ch <- newErrResult(err)
for ; ff < len(resps); ff++ {
resps[ff] = resp
}
ch <- resp
cond.L.Unlock()
cond.Signal()
}
Expand Down Expand Up @@ -462,7 +463,7 @@ func (p *pipe) _backgroundRead() (err error) {
}
if ff == len(multi) {
ff = 0
ones[0], multi, ch, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug
ones[0], multi, ch, resps, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug
if ch == nil {
cond.L.Unlock()
// Redis will send sunsubscribe notification proactively in the event of slot migration.
Expand Down Expand Up @@ -523,8 +524,12 @@ func (p *pipe) _backgroundRead() (err error) {
} else if multi[ff].NoReply() && msg.string == "QUEUED" {
panic(multiexecsub)
}
ch <- newResult(msg, err)
resp := newResult(msg, err)
if resps != nil {
resps[ff] = resp
}
if ff++; ff == len(multi) {
ch <- resp
cond.L.Unlock()
cond.Signal()
}
Expand Down Expand Up @@ -911,32 +916,28 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
return resp

queue:
ch := p.queue.PutMulti(multi)
ch := p.queue.PutMulti(multi, resp.s)
var i int
if ctxCh := ctx.Done(); ctxCh == nil {
for ; i < len(resp.s); i++ {
resp.s[i] = <-ch
}
<-ch
} else {
for ; i < len(resp.s); i++ {
select {
case resp.s[i] = <-ch:
case <-ctxCh:
goto abort
}
select {
case <-ch:
case <-ctxCh:
goto abort
}
}
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
return resp
abort:
go func(i int) {
for ; i < len(resp.s); i++ {
<-ch
}
go func(i int, resp *redisresults) {
<-ch
resultsp.Put(resp)
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
}(i)
}(i, resp)
resp = resultsp.Get(len(multi), len(multi))
err := newErrResult(ctx.Err())
for ; i < len(resp.s); i++ {
resp.s[i] = err
Expand Down
23 changes: 16 additions & 7 deletions redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ func parallel(p int) (chan func(), func()) {
wg.Add(p)
for i := 0; i < p; i++ {
go func() {
defer func() {
recover()
wg.Done()
}()
for fn := range ch {
fn()
}
wg.Done()
}()
}
return ch, func() {
Expand Down Expand Up @@ -120,7 +123,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Set().Key(key).Value(kvs[key]).Build()).ToString()
if err != nil || val != "OK" {
t.Fatalf("unexpected set response %v %v", val, err)
t.Errorf("unexpected set response %v %v", val, err)
}
}
}
Expand All @@ -133,7 +136,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Get().Key(key).Build()).ToString()
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
t.Fatalf("unexpected get response %v %v %v", val, err, ok)
t.Errorf("unexpected get response %v %v %v", val, err, ok)
}
}
}
Expand All @@ -148,7 +151,7 @@ func testSETGET(t *testing.T, client Client, csc bool) {
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
val, err := resp.ToString()
if v, ok := kvs[key]; !((ok && val == v) || (!ok && IsRedisNil(err))) {
t.Fatalf("unexpected csc get response %v %v %v", val, err, ok)
t.Errorf("unexpected csc get response %v %v %v", val, err, ok)
}
if resp.IsCacheHit() {
atomic.AddInt64(&hits, 1)
Expand All @@ -175,23 +178,25 @@ func testSETGET(t *testing.T, client Client, csc bool) {
jobs <- func() {
val, err := client.Do(ctx, client.B().Del().Key(key).Build()).AsInt64()
if _, ok := kvs[key]; !((val == 1 && ok) || (val == 0 && !ok)) {
t.Fatalf("unexpected del response %v %v %v", val, err, ok)
t.Errorf("unexpected del response %v %v %v", val, err, ok)
}
}
}
wait()

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
jobs, wait = parallel(para)
for i := 0; i < keys/100; i++ {
key := strconv.Itoa(i)
jobs <- func() {
resp := client.DoCache(ctx, client.B().Get().Key(key).Cache(), time.Minute)
if !IsRedisNil(resp.Error()) {
t.Fatalf("unexpected csc get response after delete %v", resp)
t.Errorf("unexpected csc get response after delete %v", resp)
}
if resp.IsCacheHit() {
t.Fatalf("unexpected csc cache hit after delete")
t.Errorf("unexpected csc cache hit after delete")
}
}
}
Expand Down Expand Up @@ -320,6 +325,8 @@ func testMultiSETGET(t *testing.T, client Client, csc bool) {
}
wait()

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
jobs, wait = parallel(para)
for i := 0; i < keys/100; i += batch {
Expand Down Expand Up @@ -415,7 +422,9 @@ func testMultiSETGETHelpers(t *testing.T, client Client, csc bool) {
t.Fatalf("unexpecetd err %v\n", err)
}
}

time.Sleep(time.Second)

t.Logf("testing client side caching after delete\n")
resp, err = MGetCache(client, ctx, time.Minute, cmdKeys)
if err != nil {
Expand Down
13 changes: 8 additions & 5 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (

type queue interface {
PutOne(m Completed) chan RedisResult
PutMulti(m []Completed) chan RedisResult
PutMulti(m []Completed, resps []RedisResult) chan RedisResult
NextWriteCmd() (Completed, []Completed, chan RedisResult)
WaitForWrite() (Completed, []Completed, chan RedisResult)
NextResultCh() (Completed, []Completed, chan RedisResult, *sync.Cond)
NextResultCh() (Completed, []Completed, chan RedisResult, []RedisResult, *sync.Cond)
}

var _ queue = (*ring)(nil)
Expand Down Expand Up @@ -46,6 +46,7 @@ type node struct {
ch chan RedisResult
one Completed
multi []Completed
resps []RedisResult
mark uint32
slept bool
}
Expand All @@ -58,6 +59,7 @@ func (r *ring) PutOne(m Completed) chan RedisResult {
}
n.one = m
n.multi = nil
n.resps = nil
n.mark = 1
s := n.slept
n.c1.L.Unlock()
Expand All @@ -67,14 +69,15 @@ func (r *ring) PutOne(m Completed) chan RedisResult {
return n.ch
}

func (r *ring) PutMulti(m []Completed) chan RedisResult {
func (r *ring) PutMulti(m []Completed, resps []RedisResult) chan RedisResult {
n := &r.store[atomic.AddUint64(&r.write, 1)&r.mask]
n.c1.L.Lock()
for n.mark != 0 {
n.c1.Wait()
}
n.one = Completed{}
n.multi = m
n.resps = resps
n.mark = 1
s := n.slept
n.c1.L.Unlock()
Expand Down Expand Up @@ -118,14 +121,14 @@ func (r *ring) WaitForWrite() (one Completed, multi []Completed, ch chan RedisRe
}

// NextResultCh should be only called by one dedicated thread
func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, cond *sync.Cond) {
func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, resps []RedisResult, cond *sync.Cond) {
r.read2++
p := r.read2 & r.mask
n := &r.store[p]
cond = n.c1
n.c1.L.Lock()
if n.mark == 2 {
one, multi, ch = n.one, n.multi, n.ch
one, multi, ch, resps = n.one, n.multi, n.ch, n.resps
n.mark = 0
} else {
r.read2--
Expand Down
16 changes: 8 additions & 8 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRing(t *testing.T) {
runtime.Gosched()
continue
}
cmd2, _, ch, cond := ring.NextResultCh()
cmd2, _, ch, _, cond := ring.NextResultCh()
cond.L.Unlock()
cond.Signal()
if cmd1.Commands()[0] != cmd2.Commands()[0] {
Expand All @@ -53,7 +53,7 @@ func TestRing(t *testing.T) {

base := [][]string{{"a"}, {"b"}, {"c"}, {"d"}}
for cmd := range fixture {
go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)))
go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)), nil)
}

for len(fixture) != 0 {
Expand All @@ -62,7 +62,7 @@ func TestRing(t *testing.T) {
runtime.Gosched()
continue
}
_, cmd2, ch, cond := ring.NextResultCh()
_, cmd2, ch, _, cond := ring.NextResultCh()
cond.L.Unlock()
cond.Signal()
for j := 0; j < len(cmd1); j++ {
Expand All @@ -82,7 +82,7 @@ func TestRing(t *testing.T) {
if one, multi, _ := ring.NextWriteCmd(); !one.IsEmpty() || multi != nil {
t.Fatalf("NextWriteCmd should returns nil if empty")
}
if one, multi, ch, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil {
if one, multi, ch, _, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil {
t.Fatalf("NextResultCh should returns nil if not NextWriteCmd yet")
} else {
cond.L.Unlock()
Expand All @@ -93,18 +93,18 @@ func TestRing(t *testing.T) {
if one, _, _ := ring.NextWriteCmd(); len(one.Commands()) == 0 || one.Commands()[0] != "0" {
t.Fatalf("NextWriteCmd should returns next cmd")
}
if one, _, ch, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil {
if one, _, ch, _, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil {
t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd")
} else {
cond.L.Unlock()
cond.Signal()
}

ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}))
ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}), nil)
if _, multi, _ := ring.NextWriteCmd(); len(multi) == 0 || multi[0].Commands()[0] != "0" {
t.Fatalf("NextWriteCmd should returns next cmd")
}
if _, multi, ch, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil {
if _, multi, ch, _, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil {
t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd")
} else {
cond.L.Unlock()
Expand All @@ -131,7 +131,7 @@ func TestRing(t *testing.T) {
if _, multi, ch := ring.NextWriteCmd(); ch == nil {
go func() {
time.Sleep(time.Millisecond * 100)
ring.PutMulti([]Completed{cmds.QuitCmd})
ring.PutMulti([]Completed{cmds.QuitCmd}, nil)
}()
if _, multi, ch = ring.WaitForWrite(); ch != nil && multi[0].Commands()[0] == cmds.QuitCmd.Commands()[0] {
return
Expand Down
Loading