Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distsql: control the sending rate of copIteratorTaskSender if keepOrder is true #11578

Merged
merged 9 commits into from
Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/table_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"
"fmt"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
Expand Down
55 changes: 51 additions & 4 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable
// Make sure that there is at least one worker.
it.concurrency = 1
}
if !it.req.KeepOrder {
if it.req.KeepOrder {
it.sendRate = newRateLimit(2 * it.concurrency)
} else {
it.respChan = make(chan *copResponse, it.concurrency)
}
it.open(ctx)
Expand Down Expand Up @@ -222,9 +224,11 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, desc bo
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
tasks = append(tasks, &copTask{
region: region,
ranges: ranges.slice(i, nextI),
respChan: make(chan *copResponse, 1),
region: region,
ranges: ranges.slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
cmdType: cmdType,
})
i = nextI
Expand Down Expand Up @@ -334,6 +338,9 @@ type copIterator struct {
// If keepOrder, results are stored in copTask.respChan, read them out one by one.
tasks []*copTask
curr int
// sendRate controls the sending rate of copIteratorTaskSender, if keepOrder,
// to prevent all tasks being done (aka. all of the responses are buffered)
sendRate *rateLimit

// Otherwise, results are stored in respChan.
respChan chan *copResponse
Expand Down Expand Up @@ -364,6 +371,7 @@ type copIteratorTaskSender struct {
tasks []*copTask
finishCh <-chan struct{}
respChan chan<- *copResponse
sendRate *rateLimit
}

type copResponse struct {
Expand Down Expand Up @@ -464,6 +472,7 @@ func (it *copIterator) open(ctx context.Context) {
wg: &it.wg,
tasks: it.tasks,
finishCh: it.finishCh,
sendRate: it.sendRate,
}
taskSender.respChan = it.respChan
go taskSender.run()
Expand All @@ -472,6 +481,16 @@ func (it *copIterator) open(ctx context.Context) {
func (sender *copIteratorTaskSender) run() {
// Send tasks to feed the worker goroutines.
for _, t := range sender.tasks {
// If keepOrder, we must control the sending rate to prevent all tasks
// being done (aka. all of the responses are buffered) by copIteratorWorker.
// We keep the number of inflight tasks within the number of concurrency * 2.
// It sends one more task if a task has been finished in copIterator.Next.
if sender.sendRate != nil {
exit := sender.sendRate.getToken(sender.finishCh)
if exit {
break
}
}
exit := sender.sendToTaskCh(t)
if exit {
break
Expand Down Expand Up @@ -559,6 +578,7 @@ func (it *copIterator) Next(ctx context.Context) (kv.ResultSubset, error) {
// Switch to next task.
it.tasks[it.curr] = nil
it.curr++
it.sendRate.putToken()
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -832,6 +852,33 @@ func (it *copIterator) Close() error {
return nil
}

type rateLimit struct {
token chan struct{}
}

func newRateLimit(n int) *rateLimit {
return &rateLimit{
token: make(chan struct{}, n),
}
}

func (r *rateLimit) getToken(done <-chan struct{}) (exit bool) {
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-done:
return true
case r.token <- struct{}{}:
return false
}
}

func (r *rateLimit) putToken() {
select {
case <-r.token:
default:
panic("put a redundant token")
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
}
}

// copErrorResponse returns error when calling Next()
type copErrorResponse struct{ error }

Expand Down
27 changes: 27 additions & 0 deletions store/tikv/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package tikv

import (
"context"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/tidb/kv"
Expand Down Expand Up @@ -294,6 +295,32 @@ func (s *testCoprocessorSuite) TestCopRangeSplit(c *C) {
)
}

func (s *testCoprocessorSuite) TestRateLimit(c *C) {
done := make(chan struct{}, 1)
rl := newRateLimit(1)
c.Assert(rl.putToken, PanicMatches, "put a redundant token")
exit := rl.getToken(done)
c.Assert(exit, Equals, false)
rl.putToken()
c.Assert(rl.putToken, PanicMatches, "put a redundant token")

exit = rl.getToken(done)
c.Assert(exit, Equals, false)
done <- struct{}{}
exit = rl.getToken(done) // blocked but exit
c.Assert(exit, Equals, true)

sig := make(chan int, 1)
go func() {
exit = rl.getToken(done) // blocked
c.Assert(exit, Equals, false)
close(sig)
}()
time.Sleep(200 * time.Millisecond)
rl.putToken()
<-sig
}

type splitCase struct {
key string
*copRanges
Expand Down