Skip to content

Commit

Permalink
perf: improve DoMulti by reducing chansends
Browse files Browse the repository at this point in the history
  • Loading branch information
rueian committed Jul 1, 2023
1 parent 47c3d7c commit 2911b22
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 40 deletions.
55 changes: 28 additions & 27 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,7 @@ func (p *pipe) _background() {
}

var (
ones = make([]Completed, 1)
multi []Completed
resps []RedisResult
ch chan RedisResult
cond *sync.Cond
)
Expand All @@ -332,13 +331,12 @@ func (p *pipe) _background() {
_, _, _ = p.queue.NextWriteCmd()
default:
}
if ones[0], multi, ch, cond = p.queue.NextResultCh(); ch != nil {
if multi == nil {
multi = ones
}
for range multi {
ch <- newErrResult(p.Error())
if _, _, ch, resps, cond = p.queue.NextResultCh(); ch != nil {
err := newErrResult(p.Error())
for i := range resps {
resps[i] = err
}
ch <- err
cond.L.Unlock()
cond.Signal()
} else {
Expand Down Expand Up @@ -405,6 +403,7 @@ func (p *pipe) _backgroundRead() (err error) {
cond *sync.Cond
ones = make([]Completed, 1)
multi []Completed
resps []RedisResult
ch chan RedisResult
ff int // fulfilled count
skip int // skip rest push messages
Expand All @@ -415,10 +414,12 @@ func (p *pipe) _backgroundRead() (err error) {
)

defer func() {
resp := newErrResult(err)
if err != nil && ff < len(multi) {
for ; ff < len(multi); ff++ {
ch <- newErrResult(err)
for ; ff < len(resps); ff++ {
resps[ff] = resp
}
ch <- resp
cond.L.Unlock()
cond.Signal()
}
Expand Down Expand Up @@ -462,7 +463,7 @@ func (p *pipe) _backgroundRead() (err error) {
}
if ff == len(multi) {
ff = 0
ones[0], multi, ch, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug
ones[0], multi, ch, resps, cond = p.queue.NextResultCh() // ch should not be nil, otherwise it must be a protocol bug
if ch == nil {
cond.L.Unlock()
// Redis will send sunsubscribe notification proactively in the event of slot migration.
Expand Down Expand Up @@ -523,8 +524,12 @@ func (p *pipe) _backgroundRead() (err error) {
} else if multi[ff].NoReply() && msg.string == "QUEUED" {
panic(multiexecsub)
}
ch <- newResult(msg, err)
resp := newResult(msg, err)
if resps != nil {
resps[ff] = resp
}
if ff++; ff == len(multi) {
ch <- resp
cond.L.Unlock()
cond.Signal()
}
Expand Down Expand Up @@ -911,32 +916,28 @@ func (p *pipe) DoMulti(ctx context.Context, multi ...Completed) *redisresults {
return resp

queue:
ch := p.queue.PutMulti(multi)
ch := p.queue.PutMulti(multi, resp.s)
var i int
if ctxCh := ctx.Done(); ctxCh == nil {
for ; i < len(resp.s); i++ {
resp.s[i] = <-ch
}
<-ch
} else {
for ; i < len(resp.s); i++ {
select {
case resp.s[i] = <-ch:
case <-ctxCh:
goto abort
}
select {
case <-ch:
case <-ctxCh:
goto abort
}
}
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
return resp
abort:
go func(i int) {
for ; i < len(resp.s); i++ {
<-ch
}
go func(i int, resp *redisresults) {
<-ch
resultsp.Put(resp)
atomic.AddInt32(&p.waits, -1)
atomic.AddInt32(&p.recvs, 1)
}(i)
}(i, resp)
resp = resultsp.Get(len(multi), len(multi))
err := newErrResult(ctx.Err())
for ; i < len(resp.s); i++ {
resp.s[i] = err
Expand Down
13 changes: 8 additions & 5 deletions ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (

type queue interface {
PutOne(m Completed) chan RedisResult
PutMulti(m []Completed) chan RedisResult
PutMulti(m []Completed, resps []RedisResult) chan RedisResult
NextWriteCmd() (Completed, []Completed, chan RedisResult)
WaitForWrite() (Completed, []Completed, chan RedisResult)
NextResultCh() (Completed, []Completed, chan RedisResult, *sync.Cond)
NextResultCh() (Completed, []Completed, chan RedisResult, []RedisResult, *sync.Cond)
}

var _ queue = (*ring)(nil)
Expand Down Expand Up @@ -46,6 +46,7 @@ type node struct {
ch chan RedisResult
one Completed
multi []Completed
resps []RedisResult
mark uint32
slept bool
}
Expand All @@ -58,6 +59,7 @@ func (r *ring) PutOne(m Completed) chan RedisResult {
}
n.one = m
n.multi = nil
n.resps = nil
n.mark = 1
s := n.slept
n.c1.L.Unlock()
Expand All @@ -67,14 +69,15 @@ func (r *ring) PutOne(m Completed) chan RedisResult {
return n.ch
}

func (r *ring) PutMulti(m []Completed) chan RedisResult {
func (r *ring) PutMulti(m []Completed, resps []RedisResult) chan RedisResult {
n := &r.store[atomic.AddUint64(&r.write, 1)&r.mask]
n.c1.L.Lock()
for n.mark != 0 {
n.c1.Wait()
}
n.one = Completed{}
n.multi = m
n.resps = resps
n.mark = 1
s := n.slept
n.c1.L.Unlock()
Expand Down Expand Up @@ -118,14 +121,14 @@ func (r *ring) WaitForWrite() (one Completed, multi []Completed, ch chan RedisRe
}

// NextResultCh should be only called by one dedicated thread
func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, cond *sync.Cond) {
func (r *ring) NextResultCh() (one Completed, multi []Completed, ch chan RedisResult, resps []RedisResult, cond *sync.Cond) {
r.read2++
p := r.read2 & r.mask
n := &r.store[p]
cond = n.c1
n.c1.L.Lock()
if n.mark == 2 {
one, multi, ch = n.one, n.multi, n.ch
one, multi, ch, resps = n.one, n.multi, n.ch, n.resps
n.mark = 0
} else {
r.read2--
Expand Down
16 changes: 8 additions & 8 deletions ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRing(t *testing.T) {
runtime.Gosched()
continue
}
cmd2, _, ch, cond := ring.NextResultCh()
cmd2, _, ch, _, cond := ring.NextResultCh()
cond.L.Unlock()
cond.Signal()
if cmd1.Commands()[0] != cmd2.Commands()[0] {
Expand All @@ -53,7 +53,7 @@ func TestRing(t *testing.T) {

base := [][]string{{"a"}, {"b"}, {"c"}, {"d"}}
for cmd := range fixture {
go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)))
go ring.PutMulti(cmds.NewMultiCompleted(append([][]string{{cmd}}, base...)), nil)
}

for len(fixture) != 0 {
Expand All @@ -62,7 +62,7 @@ func TestRing(t *testing.T) {
runtime.Gosched()
continue
}
_, cmd2, ch, cond := ring.NextResultCh()
_, cmd2, ch, _, cond := ring.NextResultCh()
cond.L.Unlock()
cond.Signal()
for j := 0; j < len(cmd1); j++ {
Expand All @@ -82,7 +82,7 @@ func TestRing(t *testing.T) {
if one, multi, _ := ring.NextWriteCmd(); !one.IsEmpty() || multi != nil {
t.Fatalf("NextWriteCmd should returns nil if empty")
}
if one, multi, ch, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil {
if one, multi, ch, _, cond := ring.NextResultCh(); !one.IsEmpty() || multi != nil || ch != nil {
t.Fatalf("NextResultCh should returns nil if not NextWriteCmd yet")
} else {
cond.L.Unlock()
Expand All @@ -93,18 +93,18 @@ func TestRing(t *testing.T) {
if one, _, _ := ring.NextWriteCmd(); len(one.Commands()) == 0 || one.Commands()[0] != "0" {
t.Fatalf("NextWriteCmd should returns next cmd")
}
if one, _, ch, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil {
if one, _, ch, _, cond := ring.NextResultCh(); len(one.Commands()) == 0 || one.Commands()[0] != "0" || ch == nil {
t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd")
} else {
cond.L.Unlock()
cond.Signal()
}

ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}))
ring.PutMulti(cmds.NewMultiCompleted([][]string{{"0"}}), nil)
if _, multi, _ := ring.NextWriteCmd(); len(multi) == 0 || multi[0].Commands()[0] != "0" {
t.Fatalf("NextWriteCmd should returns next cmd")
}
if _, multi, ch, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil {
if _, multi, ch, _, cond := ring.NextResultCh(); len(multi) == 0 || multi[0].Commands()[0] != "0" || ch == nil {
t.Fatalf("NextResultCh should returns next cmd after NextWriteCmd")
} else {
cond.L.Unlock()
Expand All @@ -131,7 +131,7 @@ func TestRing(t *testing.T) {
if _, multi, ch := ring.NextWriteCmd(); ch == nil {
go func() {
time.Sleep(time.Millisecond * 100)
ring.PutMulti([]Completed{cmds.QuitCmd})
ring.PutMulti([]Completed{cmds.QuitCmd}, nil)
}()
if _, multi, ch = ring.WaitForWrite(); ch != nil && multi[0].Commands()[0] == cmds.QuitCmd.Commands()[0] {
return
Expand Down

0 comments on commit 2911b22

Please sign in to comment.