Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

coprocessor: let tiflash split range task consistent with tikv #14710

Merged
6 changes: 0 additions & 6 deletions planner/core/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tidb/util/set"
)

Expand Down Expand Up @@ -150,10 +148,6 @@ func (p *PhysicalTableScan) explainInfo(normalized bool) string {
} else if len(p.Ranges) > 0 {
if normalized {
fmt.Fprint(buffer, ", range:[?,?]")
} else if p.StoreType == kv.TiFlash {
// TiFlash table always use full range scan for each region,
// the ranges in p.Ranges is used to prune cop task
fmt.Fprintf(buffer, ", range:"+ranger.FullIntRange(false)[0].String())
} else {
fmt.Fprint(buffer, ", range:")
for i, idxRange := range p.Ranges {
Expand Down
6 changes: 0 additions & 6 deletions planner/core/find_best_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1082,12 +1082,6 @@ func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProper
filterCondition: path.TableFilters,
StoreType: path.StoreType,
}.Init(ds.ctx, ds.blockOffset)
if ts.StoreType == kv.TiFlash {
// Append the AccessCondition to filterCondition because TiFlash only support full range scan for each
// region, do not reset ts.Ranges as it will help prune regions during `buildCopTasks`
ts.filterCondition = append(ts.filterCondition, ts.AccessCondition...)
ts.AccessCondition = nil
}
ts.SetSchema(ds.schema.Clone())
if ts.Table.PKIsHandle {
if pkColInfo := ts.Table.GetPkColInfo(); pkColInfo != nil {
Expand Down
5 changes: 2 additions & 3 deletions planner/core/testdata/analyze_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,8 @@
"└─TableScan_5 2.00 cop[tikv] table:t, range:[1,1], [2,2], keep order:false, stats:pseudo"
],
[
"TableReader_7 2.00 root data:Selection_6",
"└─Selection_6 2.00 cop[tiflash] or(eq(test.t.a, 1), eq(test.t.a, 2))",
" └─TableScan_5 2.00 cop[tiflash] table:t, range:[-inf,+inf], keep order:false, stats:pseudo"
"TableReader_6 2.00 root data:TableScan_5",
"└─TableScan_5 2.00 cop[tiflash] table:t, range:[1,1], [2,2], keep order:false, stats:pseudo"
]
]
},
Expand Down
47 changes: 7 additions & 40 deletions store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,14 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -223,13 +220,6 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
if req.Streaming {
cmdType = tikvrpc.CmdCopStream
}
var tableStart, tableEnd kv.Key
if req.StoreType == kv.TiFlash {
tableID := tablecodec.DecodeTableID(ranges.at(0).StartKey)
fullRange := ranger.FullIntRange(false)
keyRange := distsql.TableRangesToKVRanges(tableID, fullRange, nil)
tableStart, tableEnd = keyRange[0].StartKey, keyRange[0].EndKey
}

if req.StoreType == kv.TiDB {
return buildTiDBMemCopTasks(ranges, req)
Expand All @@ -238,44 +228,21 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
rangesLen := ranges.len()
var tasks []*copTask
appendTask := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
if req.StoreType == kv.TiKV {
// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
// to make sure the message can be sent successfully.
rLen := ranges.len()
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
tasks = append(tasks, &copTask{
region: regionWithRangeInfo.Region,
ranges: ranges.slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
})
i = nextI
}
} else if req.StoreType == kv.TiFlash {
left, right := regionWithRangeInfo.StartKey, regionWithRangeInfo.EndKey
if bytes.Compare(tableStart, left) >= 0 {
left = tableStart
}
if bytes.Compare(tableEnd, right) <= 0 || len(right) == 0 {
right = tableEnd
}
fullRange := kv.KeyRange{StartKey: left, EndKey: right}
// TiKV will return gRPC error if the message is too large. So we need to limit the length of the ranges slice
// to make sure the message can be sent successfully.
rLen := ranges.len()
for i := 0; i < rLen; {
nextI := mathutil.Min(i+rangesPerTask, rLen)
tasks = append(tasks, &copTask{
region: regionWithRangeInfo.Region,
// TiFlash only support full range scan for the region, ignore the real ranges
// does not affect the correctness because we already merge the access range condition
// into filter condition in `getOriginalPhysicalTableScan`
ranges: &copRanges{mid: []kv.KeyRange{fullRange}},
ranges: ranges.slice(i, nextI),
// Channel buffer is 2 for handling region split.
// In a common case, two region split tasks will not be blocked.
respChan: make(chan *copResponse, 2),
cmdType: cmdType,
storeType: req.StoreType,
})
i = nextI
}
}

Expand Down
30 changes: 15 additions & 15 deletions store/tikv/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
c.Assert(err, IsNil)
Expand All @@ -60,7 +60,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
c.Assert(err, IsNil)
Expand All @@ -70,7 +70,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[0], regionIDs[1], "m", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
c.Assert(err, IsNil)
Expand All @@ -81,8 +81,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "k")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
c.Assert(err, IsNil)
Expand All @@ -95,10 +95,10 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t\x80\x00\x00\x00\x00\x00\x00\x00_r\xff\xff\xff\xff\xff\xff\xff\xff\x00")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "x")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
c.Assert(err, IsNil)
Expand All @@ -108,7 +108,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "b", "c")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
c.Assert(err, IsNil)
Expand All @@ -118,7 +118,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
s.taskEqual(c, tasks[0], regionIDs[0], "a", "b", "e", "f")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
c.Assert(err, IsNil)
Expand All @@ -129,8 +129,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "o", "p")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
c.Assert(err, IsNil)
Expand All @@ -141,8 +141,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
s.taskEqual(c, tasks[0], regionIDs[1], "h", "k", "m", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "p")
}

func (s *testCoprocessorSuite) TestSplitRegionRanges(c *C) {
Expand Down