Skip to content

Commit

Permalink
coprocessor: fix wrong cop task range for tiflash. (pingcap#13292)
Browse files Browse the repository at this point in the history
  • Loading branch information
lzmhhh123 committed Jan 19, 2020
1 parent df87a51 commit 38e586d
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 16 deletions.
19 changes: 18 additions & 1 deletion store/tikv/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,15 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -255,6 +258,13 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
if req.Streaming {
cmdType = tikvrpc.CmdCopStream
}
var tableStart, tableEnd kv.Key
if req.StoreType == kv.TiFlash {
tableID := tablecodec.DecodeTableID(ranges.at(0).StartKey)
fullRange := ranger.FullIntRange(false)
keyRange := distsql.TableRangesToKVRanges(tableID, fullRange, nil)
tableStart, tableEnd = keyRange[0].StartKey, keyRange[0].EndKey
}

var tasks []*copTask
appendTask := func(regionWithRangeInfo *KeyLocation, ranges *copRanges) {
Expand All @@ -276,7 +286,14 @@ func buildCopTasks(bo *Backoffer, cache *RegionCache, ranges *copRanges, req *kv
i = nextI
}
} else if req.StoreType == kv.TiFlash {
fullRange := kv.KeyRange{StartKey: regionWithRangeInfo.StartKey, EndKey: regionWithRangeInfo.EndKey}
left, right := regionWithRangeInfo.StartKey, regionWithRangeInfo.EndKey
if bytes.Compare(tableStart, left) >= 0 {
left = tableStart
}
if bytes.Compare(tableEnd, right) <= 0 || len(right) == 0 {
right = tableEnd
}
fullRange := kv.KeyRange{StartKey: left, EndKey: right}
tasks = append(tasks, &copTask{
region: regionWithRangeInfo.Region,
// TiFlash only support full range scan for the region, ignore the real ranges
Expand Down
30 changes: 15 additions & 15 deletions store/tikv/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "c"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), req)
c.Assert(err, IsNil)
Expand All @@ -60,7 +60,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), req)
c.Assert(err, IsNil)
Expand All @@ -70,7 +70,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("m", "n"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), req)
c.Assert(err, IsNil)
Expand All @@ -81,8 +81,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "k"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), req)
c.Assert(err, IsNil)
Expand All @@ -95,10 +95,10 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "x"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 4)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t", "")
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")
s.taskEqual(c, tasks[1], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[2], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
s.taskEqual(c, tasks[3], regionIDs[3], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t\x80\x00\x00\x00\x00\x00\x00\x00_r\xff\xff\xff\xff\xff\xff\xff\xff\x00")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), req)
c.Assert(err, IsNil)
Expand All @@ -108,7 +108,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "b", "c"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), req)
c.Assert(err, IsNil)
Expand All @@ -118,7 +118,7 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("a", "b", "e", "f"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 1)
s.taskEqual(c, tasks[0], regionIDs[0], "", "g")
s.taskEqual(c, tasks[0], regionIDs[0], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "g")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), req)
c.Assert(err, IsNil)
Expand All @@ -129,8 +129,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("g", "n", "o", "p"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")

tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), req)
c.Assert(err, IsNil)
Expand All @@ -141,8 +141,8 @@ func (s *testCoprocessorSuite) TestBuildTasks(c *C) {
tasks, err = buildCopTasks(bo, cache, buildCopRanges("h", "k", "m", "p"), flashReq)
c.Assert(err, IsNil)
c.Assert(tasks, HasLen, 2)
s.taskEqual(c, tasks[0], regionIDs[1], "g", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "n", "t")
s.taskEqual(c, tasks[0], regionIDs[1], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "n")
s.taskEqual(c, tasks[1], regionIDs[2], "t\x80\x00\x00\x00\x00\x00\x00\x00_r\x00\x00\x00\x00\x00\x00\x00\x00", "t")
}

func (s *testCoprocessorSuite) TestSplitRegionRanges(c *C) {
Expand Down

0 comments on commit 38e586d

Please sign in to comment.