Skip to content

Commit

Permalink
planner, executor: add extraProj for indexMerge with orderBy + limit (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed May 10, 2023
1 parent f5b2fd2 commit 21853ad
Show file tree
Hide file tree
Showing 5 changed files with 131 additions and 70 deletions.
17 changes: 14 additions & 3 deletions executor/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,11 @@ func TestOrderByWithLimit(t *testing.T) {
tk.MustExec("analyze table tpkhash")

for i := 0; i < 100; i++ {
if i%2 == 0 {
tk.MustExec("set tidb_partition_prune_mode = `static-only`")
} else {
tk.MustExec("set tidb_partition_prune_mode = `dynamic-only`")
}
a := rand.Intn(32)
b := rand.Intn(32)
limit := rand.Intn(10) + 1
Expand Down Expand Up @@ -1027,17 +1032,23 @@ func TestOrderByWithLimit(t *testing.T) {
queryHash := fmt.Sprintf("select /*+ use_index_merge(thash, idx_ac, idx_bc) */ * from thash where a = %v or b = %v order by c limit %v", a, b, limit)
resHash := tk.MustQuery(queryHash).Rows()
require.True(t, tk.HasPlan(queryHash, "IndexMerge"))
require.False(t, tk.HasPlan(queryHash, "TopN"))
if i%2 == 1 {
require.False(t, tk.HasPlan(queryHash, "TopN"))
}

queryCommonHash := fmt.Sprintf("select /*+ use_index_merge(tcommonhash, primary, idx_bc) */ * from tcommonhash where a = %v or b = %v order by c limit %v", a, b, limit)
resCommonHash := tk.MustQuery(queryCommonHash).Rows()
require.True(t, tk.HasPlan(queryCommonHash, "IndexMerge"))
require.False(t, tk.HasPlan(queryCommonHash, "TopN"))
if i%2 == 1 {
require.False(t, tk.HasPlan(queryCommonHash, "TopN"))
}

queryPKHash := fmt.Sprintf("select /*+ use_index_merge(tpkhash, idx_ac, idx_bc) */ * from tpkhash where a = %v or b = %v order by c limit %v", a, b, limit)
resPKHash := tk.MustQuery(queryPKHash).Rows()
require.True(t, tk.HasPlan(queryPKHash, "IndexMerge"))
require.False(t, tk.HasPlan(queryPKHash, "TopN"))
if i%2 == 1 {
require.False(t, tk.HasPlan(queryPKHash, "TopN"))
}

sliceRes := getResult(valueSlice, a, b, limit, false)

Expand Down
3 changes: 3 additions & 0 deletions planner/core/casetest/physical_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2521,12 +2521,15 @@ func TestIndexMergeOrderPushDown(t *testing.T) {
tk.MustExec("set tidb_cost_model_version=1")
tk.MustExec("create table t (a int, b int, c int, index idx(a, c), index idx2(b, c))")
tk.MustExec("create table tcommon (a int, b int, c int, primary key(a, c), index idx2(b, c))")
tk.MustExec("create table thash(a int, b int, c int, index idx_ac(a, c), index idx_bc(b, c)) PARTITION BY HASH (`a`) PARTITIONS 4")

for i, ts := range input {
testdata.OnRecord(func() {
output[i].SQL = ts
output[i].Plan = testdata.ConvertRowsToStrings(tk.MustQuery("explain format = 'brief' " + ts).Rows())
output[i].Warning = testdata.ConvertRowsToStrings(tk.MustQuery("show warnings").Rows())
})
tk.MustQuery("explain format = 'brief' " + ts).Check(testkit.Rows(output[i].Plan...))
tk.MustQuery("show warnings").Check(testkit.Rows(output[i].Warning...))
}
}
5 changes: 3 additions & 2 deletions planner/core/casetest/testdata/plan_suite_in.json
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@
"explain select /*+ shuffle_join(t1, t2) */ * from t t1, t t2 where t1.a=t2.a",
"explain select /*+ broadcast_join(t1, t2) */ * from t t1, t t2 where t1.a=t2.a"
]
},
},
{
"name": "TestMPPBCJModel",
"cases": [
Expand Down Expand Up @@ -1316,7 +1316,8 @@
"select * from t where (a = 1 or b = 2) and c = 3 order by c limit 2",
"select * from t where (a = 1 or b = 2) and c in (1, 2, 3) order by c limit 2",
"select * from tcommon where a = 1 or b = 1 order by c limit 2",
"select * from tcommon where (a = 1 and c = 2) or (b = 1) order by c limit 2"
"select * from tcommon where (a = 1 and c = 2) or (b = 1) order by c limit 2",
"select * from thash use index(idx_ac, idx_bc) where a = 1 or b = 1 order by c limit 2"
]
}
]
42 changes: 40 additions & 2 deletions planner/core/casetest/testdata/plan_suite_out.json
Original file line number Diff line number Diff line change
Expand Up @@ -8442,7 +8442,7 @@
{
"SQL": "select * from t where a = 1 or b = 1 order by c limit 2",
"Plan": [
"Projection 2.00 root test.t.a, test.t.b, test.t.c",
"Projection 19.99 root test.t.a, test.t.b, test.t.c",
"└─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ └─IndexRangeScan 2.00 cop[tikv] table:t, index:idx(a, c) range:[1,1], keep order:true, stats:pseudo",
Expand Down Expand Up @@ -8491,7 +8491,7 @@
{
"SQL": "select * from t where (a = 1 and c = 2) or (b = 1) order by c limit 2",
"Plan": [
"Projection 2.00 root test.t.a, test.t.b, test.t.c",
"Projection 10.10 root test.t.a, test.t.b, test.t.c",
"└─IndexMerge 10.10 root type: union, limit embedded(offset:0, count:2)",
" ├─Limit(Build) 0.10 cop[tikv] offset:0, count:2",
" │ └─IndexRangeScan 0.10 cop[tikv] table:t, index:idx(a, c) range:[1 2,1 2], keep order:true, stats:pseudo",
Expand Down Expand Up @@ -8571,6 +8571,44 @@
"└─TableRowIDScan(Probe) 11.00 cop[tikv] table:tcommon keep order:false, stats:pseudo"
],
"Warning": null
},
{
"SQL": "select * from thash use index(idx_ac, idx_bc) where a = 1 or b = 1 order by c limit 2",
"Plan": [
"TopN 2.00 root test.thash.c, offset:0, count:2",
"└─PartitionUnion 8.00 root ",
" ├─Projection 19.99 root test.thash.a, test.thash.b, test.thash.c",
" │ └─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p0, index:idx_ac(a, c) range:[1,1], keep order:true, stats:pseudo",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p0, index:idx_bc(b, c) range:[1,1], keep order:true, stats:pseudo",
" │ └─TableRowIDScan(Probe) 19.99 cop[tikv] table:thash, partition:p0 keep order:false, stats:pseudo",
" ├─Projection 19.99 root test.thash.a, test.thash.b, test.thash.c",
" │ └─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p1, index:idx_ac(a, c) range:[1,1], keep order:true, stats:pseudo",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p1, index:idx_bc(b, c) range:[1,1], keep order:true, stats:pseudo",
" │ └─TableRowIDScan(Probe) 19.99 cop[tikv] table:thash, partition:p1 keep order:false, stats:pseudo",
" ├─Projection 19.99 root test.thash.a, test.thash.b, test.thash.c",
" │ └─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p2, index:idx_ac(a, c) range:[1,1], keep order:true, stats:pseudo",
" │ ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p2, index:idx_bc(b, c) range:[1,1], keep order:true, stats:pseudo",
" │ └─TableRowIDScan(Probe) 19.99 cop[tikv] table:thash, partition:p2 keep order:false, stats:pseudo",
" └─Projection 19.99 root test.thash.a, test.thash.b, test.thash.c",
" └─IndexMerge 19.99 root type: union, limit embedded(offset:0, count:2)",
" ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p3, index:idx_ac(a, c) range:[1,1], keep order:true, stats:pseudo",
" ├─Limit(Build) 2.00 cop[tikv] offset:0, count:2",
" │ └─IndexRangeScan 2.00 cop[tikv] table:thash, partition:p3, index:idx_bc(b, c) range:[1,1], keep order:true, stats:pseudo",
" └─TableRowIDScan(Probe) 19.99 cop[tikv] table:thash, partition:p3 keep order:false, stats:pseudo"
],
"Warning": [
"Warning 1105 disable dynamic pruning due to thash has no global stats"
]
}
]
}
Expand Down
134 changes: 71 additions & 63 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,7 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
if !clonedTblScan.Schema().Contains(pkCol) {
clonedTblScan.Schema().Append(pkCol)
clonedTblScan.(*PhysicalTableScan).Columns = append(clonedTblScan.(*PhysicalTableScan).Columns, pk)
copTsk.needExtraProj = true
}
} else if tblInfo.IsCommonHandle {
idxInfo := tblInfo.GetPrimaryKey()
Expand All @@ -1222,85 +1223,95 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
if !clonedTblScan.Schema().Contains(c) {
clonedTblScan.Schema().Append(c)
clonedTblScan.(*PhysicalTableScan).Columns = append(clonedTblScan.(*PhysicalTableScan).Columns, c.ToInfo())
copTsk.needExtraProj = true
}
}
} else {
if !clonedTblScan.Schema().Contains(tblScan.HandleCols.GetCol(0)) {
clonedTblScan.Schema().Append(tblScan.HandleCols.GetCol(0))
clonedTblScan.(*PhysicalTableScan).Columns = append(clonedTblScan.(*PhysicalTableScan).Columns, model.NewExtraHandleColInfo())
copTsk.needExtraProj = true
}
}
clonedTblScan.(*PhysicalTableScan).HandleCols, err = tblScan.HandleCols.ResolveIndices(clonedTblScan.Schema())
if err != nil {
return nil, false
}
if copTsk.needExtraProj {
copTsk.originSchema = copTsk.tablePlan.Schema()
}
copTsk.tablePlan = clonedTblScan
copTsk.indexPlanFinished = true
rootTask := copTsk.convertToRootTask(p.ctx)
if indexMerge, ok := rootTask.p.(*PhysicalIndexMergeReader); ok {
indexMerge.PushedLimit = &PushedDownLimit{
Offset: p.Offset,
Count: p.Count,
indexMerge, ok := rootTask.p.(*PhysicalIndexMergeReader)
if !ok {
// needExtraProj == true
indexMerge, ok = rootTask.p.Children()[0].(*PhysicalIndexMergeReader)
if !ok {
return nil, false
}
indexMerge.ByItems = p.ByItems
indexMerge.KeepOrder = true
return rootTask, true
}
} else {
// The normal index scan cases.(single read and double read)
propMatched := p.checkOrderPropForSubIndexScan(idxScan.IdxCols, idxScan.IdxColLens, idxScan.constColsByCond, colsProp)
if !propMatched {
return nil, false
}
idxScan.Desc = isDesc
idxScan.KeepOrder = true
idxScan.ByItems = p.ByItems
childProfile := copTsk.plan().statsInfo()
newCount := p.Offset + p.Count
stats := deriveLimitStats(childProfile, float64(newCount))
pushedLimit := PhysicalLimit{
Count: newCount,
}.Init(p.SCtx(), stats, p.SelectBlockOffset())
pushedLimit.SetSchema(copTsk.indexPlan.Schema())
copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask)

// A similar but simplified logic compared the ExpectedCnt handling logic in getOriginalPhysicalIndexScan.
child := pushedLimit.Children()[0]
// The row count of the direct child of Limit should be adjusted to be no larger than the Limit.Count.
child.SetStats(child.statsInfo().ScaleByExpectCnt(float64(newCount)))
// The Limit->Selection->IndexScan case:
// adjust the row count of IndexScan according to the selectivity of the Selection.
if selSelectivity > 0 && selSelectivity < 1 {
scaledRowCount := child.Stats().RowCount / selSelectivity
idxScan.SetStats(idxScan.Stats().ScaleByExpectCnt(scaledRowCount))
indexMerge.PushedLimit = &PushedDownLimit{
Offset: p.Offset,
Count: p.Count,
}
indexMerge.ByItems = p.ByItems
indexMerge.KeepOrder = true
return rootTask, true
}
// The normal index scan cases.(single read and double read)
propMatched := p.checkOrderPropForSubIndexScan(idxScan.IdxCols, idxScan.IdxColLens, idxScan.constColsByCond, colsProp)
if !propMatched {
return nil, false
}
idxScan.Desc = isDesc
idxScan.KeepOrder = true
idxScan.ByItems = p.ByItems
childProfile := copTsk.plan().statsInfo()
newCount := p.Offset + p.Count
stats := deriveLimitStats(childProfile, float64(newCount))
pushedLimit := PhysicalLimit{
Count: newCount,
}.Init(p.SCtx(), stats, p.SelectBlockOffset())
pushedLimit.SetSchema(copTsk.indexPlan.Schema())
copTsk = attachPlan2Task(pushedLimit, copTsk).(*copTask)

rootTask := copTsk.convertToRootTask(p.ctx)
// embedded limit in indexLookUp, no more limit needed.
if idxLookup, ok := rootTask.p.(*PhysicalIndexLookUpReader); ok {
idxLookup.PushedLimit = &PushedDownLimit{
Offset: p.Offset,
Count: p.Count,
}
extraInfo, extraCol, hasExtraCol := tryGetPkExtraColumn(p.ctx.GetSessionVars(), tblInfo)
// TODO: sometimes it will add a duplicate `_tidb_rowid` column in ts.schema()
if hasExtraCol {
idxLookup.ExtraHandleCol = extraCol
ts := idxLookup.TablePlans[0].(*PhysicalTableScan)
ts.Columns = append(ts.Columns, extraInfo)
ts.schema.Append(extraCol)
ts.HandleIdx = []int{len(ts.Columns) - 1}
}
return rootTask, true
}
rootLimit := PhysicalLimit{
Count: p.Count,
Offset: p.Offset,
PartitionBy: newPartitionBy,
}.Init(p.SCtx(), stats, p.SelectBlockOffset())
rootLimit.SetSchema(rootTask.plan().Schema())
return attachPlan2Task(rootLimit, rootTask), true
// A similar but simplified logic compared the ExpectedCnt handling logic in getOriginalPhysicalIndexScan.
child := pushedLimit.Children()[0]
// The row count of the direct child of Limit should be adjusted to be no larger than the Limit.Count.
child.SetStats(child.statsInfo().ScaleByExpectCnt(float64(newCount)))
// The Limit->Selection->IndexScan case:
// adjust the row count of IndexScan according to the selectivity of the Selection.
if selSelectivity > 0 && selSelectivity < 1 {
scaledRowCount := child.Stats().RowCount / selSelectivity
idxScan.SetStats(idxScan.Stats().ScaleByExpectCnt(scaledRowCount))
}

rootTask := copTsk.convertToRootTask(p.ctx)
// embedded limit in indexLookUp, no more limit needed.
if idxLookup, ok := rootTask.p.(*PhysicalIndexLookUpReader); ok {
idxLookup.PushedLimit = &PushedDownLimit{
Offset: p.Offset,
Count: p.Count,
}
extraInfo, extraCol, hasExtraCol := tryGetPkExtraColumn(p.ctx.GetSessionVars(), tblInfo)
// TODO: sometimes it will add a duplicate `_tidb_rowid` column in ts.schema()
if hasExtraCol {
idxLookup.ExtraHandleCol = extraCol
ts := idxLookup.TablePlans[0].(*PhysicalTableScan)
ts.Columns = append(ts.Columns, extraInfo)
ts.schema.Append(extraCol)
ts.HandleIdx = []int{len(ts.Columns) - 1}
}
return rootTask, true
}
rootLimit := PhysicalLimit{
Count: p.Count,
Offset: p.Offset,
PartitionBy: newPartitionBy,
}.Init(p.SCtx(), stats, p.SelectBlockOffset())
rootLimit.SetSchema(rootTask.plan().Schema())
return attachPlan2Task(rootLimit, rootTask), true
} else if copTsk.indexPlan == nil {
if tblScan.HandleCols == nil {
return nil, false
Expand Down Expand Up @@ -1351,12 +1362,9 @@ func (p *PhysicalTopN) pushPartialTopNDownToCop(copTsk *copTask) (task, bool) {
}.Init(p.SCtx(), stats, p.SelectBlockOffset())
rootLimit.SetSchema(rootTask.plan().Schema())
return attachPlan2Task(rootLimit, rootTask), true
} else {
return nil, false
}

rootTask := copTsk.convertToRootTask(p.ctx)
return attachPlan2Task(p, rootTask), true
return nil, false
}

// checkOrderPropForSubIndexScan checks whether these index columns can meet the specified order property.
Expand Down

0 comments on commit 21853ad

Please sign in to comment.