coprocessor: let tiflash split range task consistent with tikv (#14710)
lzmhhh123 committed Feb 14, 2020
1 parent 36d2dbb commit a5e0660
Showing 6 changed files with 28 additions and 76 deletions.
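In effect, coprocessor tasks for TiFlash are now built from the requested key ranges clipped to each region, exactly as they are for TiKV, instead of forcing a full-table key range per region. Below is a minimal, standalone Go sketch of that per-region clipping; clipRanges is a hypothetical helper written only for illustration, not an API from this commit (the real change simply removes the TiFlash special cases shown in the diff and reuses the TiKV path, which relies on the region cache to split ranges).

package main

import "fmt"

// keyRange is a simplified stand-in for kv.KeyRange.
type keyRange struct{ Start, End string }

// clipRanges keeps the parts of the requested ranges that fall inside one
// region [regionStart, regionEnd). An empty regionEnd means the region is
// unbounded on the right. Each resulting slice is what one cop task covers:
// the intersection of the query ranges and the region, for TiFlash just as
// for TiKV.
func clipRanges(reqRanges []keyRange, regionStart, regionEnd string) []keyRange {
	var out []keyRange
	for _, r := range reqRanges {
		s, e := r.Start, r.End
		if s < regionStart {
			s = regionStart
		}
		if regionEnd != "" && e > regionEnd {
			e = regionEnd
		}
		if s < e {
			out = append(out, keyRange{s, e})
		}
	}
	return out
}

func main() {
	// Ranges ["a","b") and ["e","f") against a region covering ["a","g"):
	// both survive unchanged, matching the updated expectation
	// taskEqual(..., "a", "b", "e", "f") rather than a full-table range.
	fmt.Println(clipRanges([]keyRange{{"a", "b"}, {"e", "f"}}, "a", "g"))
	// Range ["a","k") split across regions ["","g") and ["g","n"):
	// one task covers ["a","g"), the next covers ["g","k").
	fmt.Println(clipRanges([]keyRange{{"a", "k"}}, "", "g"))
	fmt.Println(clipRanges([]keyRange{{"a", "k"}}, "g", "n"))
}

Because the access conditions no longer have to be folded into the filter conditions, the expected EXPLAIN output in the test data loses the extra Selection operator and shows the pruned ranges (for example range:(1,20), [30,55)) instead of range:[-inf,+inf].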
6 changes: 0 additions & 6 deletions planner/core/explain.go
@@ -23,11 +23,9 @@ import (
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/expression/aggregation"
 	"github.com/pingcap/tidb/infoschema"
-	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/statistics"
 	"github.com/pingcap/tidb/util"
-	"github.com/pingcap/tidb/util/ranger"
 	"github.com/pingcap/tidb/util/set"
 )

@@ -150,10 +148,6 @@ func (p *PhysicalTableScan) explainInfo(normalized bool) string {
 	} else if len(p.Ranges) > 0 {
 		if normalized {
 			fmt.Fprint(buffer, ", range:[?,?]")
-		} else if p.StoreType == kv.TiFlash {
-			// TiFlash table always use full range scan for each region,
-			// the ranges in p.Ranges is used to prune cop task
-			fmt.Fprintf(buffer, ", range:"+ranger.FullIntRange(false)[0].String())
 		} else {
 			fmt.Fprint(buffer, ", range:")
 			for i, idxRange := range p.Ranges {
6 changes: 0 additions & 6 deletions planner/core/find_best_task.go
@@ -1082,12 +1082,6 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper
 		filterCondition: path.TableFilters,
 		StoreType: path.StoreType,
 	}.Init(ds.ctx, ds.blockOffset)
-	if ts.StoreType == kv.TiFlash {
-		// Append the AccessCondition to filterCondition because TiFlash only support full range scan for each
-		// region, do not reset ts.Ranges as it will help prune regions during `buildCopTasks`
-		ts.filterCondition = append(ts.filterCondition, ts.AccessCondition...)
-		ts.AccessCondition = nil
-	}
 	ts.SetSchema(ds.schema.Clone())
 	if ts.Table.PKIsHandle {
 		if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil {
5 changes: 2 additions & 3 deletions planner/core/testdata/analyze_suite_out.json
@@ -185,9 +185,8 @@
 "└─TableScan_5 2.00 cop[tikv] table:t, range:[1,1], [2,2], keep order:false, stats:pseudo"
 ],
 [
-"TableReader_7 2.00 root data:Selection_6",
-"└─Selection_6 2.00 cop[tiflash] or(eq(test.t.a, 1), eq(test.t.a, 2))",
-" └─TableScan_5 2.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
+"TableReader_6 2.00 root data:TableScan_5",
+"└─TableScan_5 2.00 cop[tiflash] table:t, range:[1,1], [2,2], keep order:false, stats:pseudo"
 ]
 ]
 },
10 changes: 4 additions & 6 deletions planner/core/testdata/integration_suite_out.json
@@ -262,18 +262,16 @@
 {
 "SQL": "desc select * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55)",
 "Plan": [
-"TableReader_9 44.00 root data:Selection_8",
-"└─Selection_8 44.00 cop[tiflash] or(and(gt(test.tt.a, 1), lt(test.tt.a, 20)), and(ge(test.tt.a, 30), lt(test.tt.a, 55)))",
-" └─TableScan_7 44.00 cop[tiflash] table:tt, range:[-inf,+inf], keep order:false, stats:pseudo"
+"TableReader_8 44.00 root data:TableScan_7",
+"└─TableScan_7 44.00 cop[tiflash] table:tt, range:(1,20), [30,55), keep order:false, stats:pseudo"
 ],
 "Warn": null
 },
 {
 "SQL": "desc select /*+ read_from_storage(tiflash[tt]) */ * from tt where (tt.a > 1 and tt.a < 20) or (tt.a >= 30 and tt.a < 55)",
 "Plan": [
-"TableReader_7 44.00 root data:Selection_6",
-"└─Selection_6 44.00 cop[tiflash] or(and(gt(test.tt.a, 1), lt(test.tt.a, 20)), and(ge(test.tt.a, 30), lt(test.tt.a, 55)))",
-" └─TableScan_5 44.00 cop[tiflash] table:tt, range:[-inf,+inf], keep order:false, stats:pseudo"
+"TableReader_6 44.00 root data:TableScan_5",
+"└─TableScan_5 44.00 cop[tiflash] table:tt, range:(1,20), [30,55), keep order:false, stats:pseudo"
 ],
 "Warn": null
 },
47 changes: 7 additions & 40 deletions store/tikv/coprocessor.go
@@ -31,16 +31,13 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/kvrpcpb"
-	"github.com/pingcap/tidb/distsql"
 	"github.com/pingcap/tidb/domain/infosync"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/metrics"
 	"github.com/pingcap/tidb/store/tikv/tikvrpc"
-	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/execdetails"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tidb/util/memory"
-	"github.com/pingcap/tidb/util/ranger"
 	"go.uber.org/zap"
 )

@@ -223,13 +220,6 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
 	if req.Streaming {
 		cmdType = tikvrpc.CmdCopStream
 	}
-	var tableStart, tableEnd kv.Key
-	if req.StoreType == kv.TiFlash {
-		tableID := tablecodec.DecodeTableID(ranges.at(0).StartKey)
-		fullRange := ranger.FullIntRange(false)
-		keyRange := distsql.TableRangesToKVRanges(tableID, fullRange, nil)
-		tableStart, tableEnd = keyRange[0].StartKey, keyRange[0].EndKey
-	}

 	if req.StoreType == kv.TiDB {
 		return buildTiDBMemCopTasks(ranges, req)
@@ -238,44 +228,21 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
 	rangesLen := ranges.len()
 	var tasks []*copTask
 	appendTask := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
-		if req.StoreType == kv.TiKV {
-			// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
-			// to make sure the message can be sent successfully.
-			rLen := ranges.len()
-			for i := 0; i < rLen; {
-				nextI := mathutil.Min(i+rangesPerTask, rLen)
-				tasks = append(tasks, &copTask{
-					region: regionWithRangeInfo.Region,
-					ranges: ranges.slice(i, nextI),
-					// Channel buffer is 2 for handling region split.
-					// In a common case, two region split tasks will not be blocked.
-					respChan: make(chan *copResponse, 2),
-					cmdType: cmdType,
-					storeType: req.StoreType,
-				})
-				i = nextI
-			}
-		} else if req.StoreType == kv.TiFlash {
-			left, right := regionWithRangeInfo.StartKey, regionWithRangeInfo.EndKey
-			if bytes.Compare(tableStart, left) >= 0 {
-				left = tableStart
-			}
-			if bytes.Compare(tableEnd, right) <= 0 || len(right) == 0 {
-				right = tableEnd
-			}
-			fullRange := kv.KeyRange{StartKey: left, EndKey: right}
+		// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
+		// to make sure the message can be sent successfully.
+		rLen := ranges.len()
+		for i := 0; i < rLen; {
+			nextI := mathutil.Min(i+rangesPerTask, rLen)
 			tasks = append(tasks, &copTask{
 				region: regionWithRangeInfo.Region,
-				// TiFlash only support full range scan for the region, ignore the real ranges
-				// does not affect the correctness because we already merge the access range condition
-				// into filter condition in `getOriginalPhysicalTableScan`
-				ranges: &copRanges{mid: []kv.KeyRange{fullRange}},
+				ranges: ranges.slice(i, nextI),
 				// Channel buffer is 2 for handling region split.
 				// In a common case, two region split tasks will not be blocked.
 				respChan: make(chan *copResponse, 2),
 				cmdType: cmdType,
 				storeType: req.StoreType,
 			})
+			i = nextI
 		}
 	}

30 changes: 15 additions & 15 deletions store/tikv/coprocessor_test.go
@@ -50,7 +50,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
-	s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
+	s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
 	c.Assert(err, IsNil)
@@ -60,7 +60,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
-	s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
+	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
 	c.Assert(err, IsNil)
@@ -70,7 +70,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
-	s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
+	s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
 	c.Assert(err, IsNil)
@@ -81,8 +81,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
-	s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
-	s.taskEqual(c, tasks[1], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
+	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
+	s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
 	c.Assert(err, IsNil)
@@ -95,10 +95,10 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 4)
-	s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
-	s.taskEqual(c, tasks[1], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
-	s.taskEqual(c, tasks[2], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
-	s.taskEqual(c, tasks[3], regionIDs[3], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t\x80\x00\x00\x00\x00\x00\x00\x00_r\xff\xff\xff\xff\xff\xff\xff\xff\x00")
+	s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
+	s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
+	s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
+	s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
 	c.Assert(err, IsNil)
@@ -108,7 +108,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
-	s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
+	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
 	c.Assert(err, IsNil)
@@ -118,7 +118,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 1)
-	s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
+	s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
 	c.Assert(err, IsNil)
@@ -129,8 +129,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
-	s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
-	s.taskEqual(c, tasks[1], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
+	s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
+	s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
 	c.Assert(err, IsNil)
@@ -141,8 +141,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
 	tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
 	c.Assert(err, IsNil)
 	c.Assert(tasks, HasLen, 2)
-	s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
-	s.taskEqual(c, tasks[1], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
+	s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
+	s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")
 }

 func (s *testCoprocessorSuite) TestSplitRegionRanges(c *C) {
