Skip to content

Commit

Permalink
planner, executor: add extraProj for indexMerge with orderBy + limit (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
Defined2014 committed May 9, 2023
1 parent 0d3ad53 commit ec03200
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 23 deletions.
2 changes: 1 addition & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -4064,7 +4064,7 @@ func buildNoRangeIndexMergeReader(b *executorBuilder, v *plannercore.PhysicalInd

if is, ok := v.PartialPlans[i][0].(*plannercore.PhysicalIndexScan); ok {
handleLen := ts.HandleCols.NumCols()
if len(is.ByItems) != 0 && is.Table.Partition != nil {
if len(is.ByItems) != 0 && is.Table.Partition != nil && b.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
handleLen += 1
}
tempReq, err = buildIndexReq(b.ctx, is.Index.Columns, handleLen, v.PartialPlans[i])
Expand Down
17 changes: 14 additions & 3 deletions executor/test/indexmergereadtest/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,11 @@ func TestOrderByWithLimit(t *testing.T) {
tk.MustExec(fmt.Sprintf("insert into tpkhash(a,b,c) values %s", valInserted))

for i := 0; i < 100; i++ {
if i%2 == 0 {
tk.MustExec("set tidb_partition_prune_mode = `static-only`")
} else {
tk.MustExec("set tidb_partition_prune_mode = `dynamic-only`")
}
a := rand.Intn(32)
b := rand.Intn(32)
limit := rand.Intn(10) + 1
Expand Down Expand Up @@ -1032,17 +1037,23 @@ func TestOrderByWithLimit(t *testing.T) {
queryHash := fmt.Sprintf("select /*+ use_index_merge(thash, idx_ac, idx_bc) */ * from thash where a = %v or b = %v order by c limit %v", a, b, limit)
resHash := tk.MustQuery(queryHash).Rows()
require.True(t, tk.HasPlan(queryHash, "IndexMerge"))
require.False(t, tk.HasPlan(queryHash, "TopN"))
if i%2 == 1 {
require.False(t, tk.HasPlan(queryHash, "TopN"))
}

queryCommonHash := fmt.Sprintf("select /*+ use_index_merge(tcommonhash, primary, idx_bc) */ * from tcommonhash where a = %v or b = %v order by c limit %v", a, b, limit)
resCommonHash := tk.MustQuery(queryCommonHash).Rows()
require.True(t, tk.HasPlan(queryCommonHash, "IndexMerge"))
require.False(t, tk.HasPlan(queryCommonHash, "TopN"))
if i%2 == 1 {
require.False(t, tk.HasPlan(queryCommonHash, "TopN"))
}

queryPKHash := fmt.Sprintf("select /*+ use_index_merge(tpkhash, idx_ac, idx_bc) */ * from tpkhash where a = %v or b = %v order by c limit %v", a, b, limit)
resPKHash := tk.MustQuery(queryPKHash).Rows()
require.True(t, tk.HasPlan(queryPKHash, "IndexMerge"))
require.False(t, tk.HasPlan(queryPKHash, "TopN"))
if i%2 == 1 {
require.False(t, tk.HasPlan(queryPKHash, "TopN"))
}

sliceRes := getResult(valueSlice, a, b, limit, false)

Expand Down
3 changes: 3 additions & 0 deletions planner/core/casetest/physicalplantest/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2577,12 +2577,15 @@ func TestIndexMergeOrderPushDown(t *testing.T) {
tk.MustExec("set tidb_cost_model_version=1")
tk.MustExec("create table t (a int, b int, c int, index idx(a, c), index idx2(b, c))")
tk.MustExec("create table tcommon (a int, b int, c int, primary key(a, c), index idx2(b, c))")
tk.MustExec("create table thash(a int, b int, c int, index idx_ac(a, c), index idx_bc(b, c)) PARTITION BY HASH (`a`) PARTITIONS 4")

for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows())
output[i].Warning = testdata.ConvertRowsToStrings(tk.MustQuery("show warnings").Rows())
})
tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery("show warnings").Check(testkit.Rows(output[i].Warning...))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"explain select /*+ shuffle_join(t1, t2) */ * from t t1, t t2 where t1.a=t2.a",
"explain select /*+ broadcast_join(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"
]
},
},
{
"name": "TestMPPBCJModel",
"cases": [
Expand Down Expand Up @@ -1333,7 +1333,8 @@
"select * from t where (a = 1 or b = 2) and c = 3 order by c limit 2",
"select * from t where (a = 1 or b = 2) and c in (1, 2, 3) order by c limit 2",
"select * from tcommon where a = 1 or b = 1 order by c limit 2",
"select * from tcommon where (a = 1 and c = 2) or (b = 1) order by c limit 2"
"select * from tcommon where (a = 1 and c = 2) or (b = 1) order by c limit 2",
"select * from thash use index(idx_ac, idx_bc) where a = 1 or b = 1 order by c limit 2"
]
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -8497,7 +8497,7 @@
{
"SQL": "select * from t where a = 1 or b = 1 order by c limit 2",
"Plan": [
"Projection 2.00 root test.t.a, test.t.b, test.t.c",
"Projection 19.99 root test.t.a, test.t.b, test.t.c",
"└─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ └─IndexRangeScan 2.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], keep order:true, stats:pseudo",
Expand Down Expand Up @@ -8546,7 +8546,7 @@
{
"SQL": "select * from t where (a = 1 and c = 2) or (b = 1) order by c limit 2",
"Plan": [
"Projection 2.00 root test.t.a, test.t.b, test.t.c",
"Projection 10.10 root test.t.a, test.t.b, test.t.c",
"└─IndexMerge 10.10 root type: union, limit embedded(offset:0, count:2)",
" ├─Limit(Build) 0.10 cop[tikv] offset:0, count:2",
" │ └─IndexRangeScan 0.10 cop[tikv] table:t, index:idx(a, c) range:[1 2,1 2], keep order:true, stats:pseudo",
Expand Down Expand Up @@ -8626,6 +8626,44 @@
"└─TableRowIDScan(Probe) 11.00 cop[tikv] table:tcommon keep order:false, stats:pseudo"
],
"Warning": null
},
{
"SQL": "select * from thash use index(idx_ac, idx_bc) where a = 1 or b = 1 order by c limit 2",
"Plan": [
"TopN 2.00 root test.thash.c, offset:0, count:2",
"└─PartitionUnion 8.00 root ",
" ├─Projection 19.99 root test.thash.a, test.thash.b, test.thash.c",
" │ └─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p0, index:idx_ac(a, c) range:[1,1], keep order:true, stats:pseudo",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p0, index:idx_bc(b, c) range:[1,1], keep order:true, stats:pseudo",
" │ └─TableRowIDScan(Probe) 19.99 cop[tikv] table:thash, partition:p0 keep order:false, stats:pseudo",
" ├─Projection 19.99 root test.thash.a, test.thash.b, test.thash.c",
" │ └─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p1, index:idx_ac(a, c) range:[1,1], keep order:true, stats:pseudo",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p1, index:idx_bc(b, c) range:[1,1], keep order:true, stats:pseudo",
" │ └─TableRowIDScan(Probe) 19.99 cop[tikv] table:thash, partition:p1 keep order:false, stats:pseudo",
" ├─Projection 19.99 root test.thash.a, test.thash.b, test.thash.c",
" │ └─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p2, index:idx_ac(a, c) range:[1,1], keep order:true, stats:pseudo",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p2, index:idx_bc(b, c) range:[1,1], keep order:true, stats:pseudo",
" │ └─TableRowIDScan(Probe) 19.99 cop[tikv] table:thash, partition:p2 keep order:false, stats:pseudo",
" └─Projection 19.99 root test.thash.a, test.thash.b, test.thash.c",
" └─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p3, index:idx_ac(a, c) range:[1,1], keep order:true, stats:pseudo",
" ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p3, index:idx_bc(b, c) range:[1,1], keep order:true, stats:pseudo",
" └─TableRowIDScan(Probe) 19.99 cop[tikv] table:thash, partition:p3 keep order:false, stats:pseudo"
],
"Warning": [
"Warning 1105 disable dynamic pruning due to thash has no global stats"
]
}
]
}
Expand Down
36 changes: 21 additions & 15 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1138,8 +1138,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
}
if plan, ok := finalScan.(*PhysicalTableScan); ok {
plan.ByItems = p.ByItems
tblInfo := plan.Table
if tblInfo.GetPartitionInfo() != nil {
if plan.Table.GetPartitionInfo() != nil && p.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
plan.Columns = append(plan.Columns, model.NewExtraPhysTblIDColInfo())
plan.schema.Append(&expression.Column{
RetType: types.NewFieldType(mysql.TypeLonglong),
Expand All @@ -1150,8 +1149,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
}
if plan, ok := finalScan.(*PhysicalIndexScan); ok {
plan.ByItems = p.ByItems
tblInfo := plan.Table
if tblInfo.GetPartitionInfo() != nil && !plan.Index.Global {
if plan.Table.GetPartitionInfo() != nil && p.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() && !plan.Index.Global {
plan.Columns = append(plan.Columns, model.NewExtraPhysTblIDColInfo())
plan.schema.Append(&expression.Column{
RetType: types.NewFieldType(mysql.TypeLonglong),
Expand All @@ -1164,10 +1162,6 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
}
}

newPartitionBy := make([]property.SortItem, 0, len(p.GetPartitionBy()))
for _, expr := range p.GetPartitionBy() {
newPartitionBy = append(newPartitionBy, expr.Clone())
}
if !copTsk.indexPlanFinished {
// If indexPlan side isn't finished, there's no selection on the table side.
if len(copTsk.idxMergePartPlans) > 0 {
Expand All @@ -1192,6 +1186,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
if !clonedTblScan.Schema().Contains(pkCol) {
clonedTblScan.Schema().Append(pkCol)
clonedTblScan.Columns = append(clonedTblScan.Columns, pk)
copTsk.needExtraProj = true
}
} else if tblInfo.IsCommonHandle {
idxInfo := tblInfo.GetPrimaryKey()
Expand All @@ -1200,30 +1195,41 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
if !clonedTblScan.Schema().Contains(c) {
clonedTblScan.Schema().Append(c)
clonedTblScan.Columns = append(clonedTblScan.Columns, c.ToInfo())
copTsk.needExtraProj = true
}
}
} else {
if !clonedTblScan.Schema().Contains(clonedTblScan.HandleCols.GetCol(0)) {
clonedTblScan.Schema().Append(clonedTblScan.HandleCols.GetCol(0))
clonedTblScan.Columns = append(clonedTblScan.Columns, model.NewExtraHandleColInfo())
copTsk.needExtraProj = true
}
}
clonedTblScan.HandleCols, err = clonedTblScan.HandleCols.ResolveIndices(clonedTblScan.Schema())
if err != nil {
return nil, false
}
if copTsk.needExtraProj {
copTsk.originSchema = copTsk.tablePlan.Schema()
}
copTsk.tablePlan = clonedTblScan
copTsk.indexPlanFinished = true
rootTask := copTsk.convertToRootTask(p.ctx)
if indexMerge, ok := rootTask.p.(*PhysicalIndexMergeReader); ok {
indexMerge.PushedLimit = &PushedDownLimit{
Offset: p.Offset,
Count: p.Count,
indexMerge, ok := rootTask.p.(*PhysicalIndexMergeReader)
if !ok {
// needExtraProj == true
indexMerge, ok = rootTask.p.Children()[0].(*PhysicalIndexMergeReader)
if !ok {
return nil, false
}
indexMerge.ByItems = p.ByItems
indexMerge.KeepOrder = true
return rootTask, true
}
indexMerge.PushedLimit = &PushedDownLimit{
Offset: p.Offset,
Count: p.Count,
}
indexMerge.ByItems = p.ByItems
indexMerge.KeepOrder = true
return rootTask, true
}
} else {
return nil, false
Expand Down

0 comments on commit ec03200

Please sign in to comment.