Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add more checks when pushing TopN down #41370

Merged
merged 12 commits into from
Feb 15, 2023
35 changes: 35 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8452,3 +8452,38 @@ func TestIssue41273(t *testing.T) {
// For now tidb doesn't support push set type to TiKV, and column a is a set type, so we shouldn't generate a IndexMerge path.
require.False(t, tk.HasPlanForLastExecution("IndexMerge"))
}

// https://github.com/pingcap/tidb/issues/41355
func TestIssue41355(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1")
tk.MustExec("CREATE TABLE `t1` (`c1` varchar(100) DEFAULT NULL, `c2` varchar(100) GENERATED ALWAYS AS (lower(`c1`)) VIRTUAL);")
tk.MustExec("insert into t1(c1) values('a'), ('e'), ('b'), ('c'), ('d'), ('e'), ('x'), ('y'), ('a'), ('b');")

// tikv
tk.MustExec("set @@tidb_isolation_read_engines = 'tikv'")
tk.MustQuery("select * from t1 order by c2 limit 2;").Check(testkit.Rows("a a", "a a"))
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved

// tiflash
tk.MustExec("set @@tidb_allow_mpp=1; set @@tidb_enforce_mpp=1")
tk.MustExec("set @@tidb_isolation_read_engines = 'tiflash'")
is := dom.InfoSchema()
db, exists := is.SchemaByName(model.NewCIStr("test"))
require.True(t, exists)
for _, tblInfo := range db.Tables {
if tblInfo.Name.L == "t1" {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: 1,
Available: true,
}
}
}
rows := [][]interface{}{
{"TopN_7", "root", "test.t1.c2, offset:0, count:2"},
{"└─TableReader_15", "root", "data:TableFullScan_14"},
{" └─TableFullScan_14", "cop[tiflash]", "keep order:false, stats:pseudo"},
}
tk.MustQuery("explain select * from t1 order by c2 limit 2;").CheckAt([]int{0, 2, 4}, rows)
}
63 changes: 52 additions & 11 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,15 +895,6 @@ func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
return true
}

// canPushDown checks if this topN can be pushed down. If each of the expression can be converted to pb, it can be pushed.
func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

func (p *PhysicalSort) attach2Task(tasks ...task) task {
t := tasks[0].copy()
t = attachPlan2Task(p, t)
Expand Down Expand Up @@ -955,14 +946,64 @@ func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*
return true
}

// canExpressionConvertedToPB checks whether each of the the expression in TopN can be converted to pb.
func (p *PhysicalTopN) canExpressionConvertedToPB(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns.
func (p *PhysicalTopN) containVirtualColumn(tCols []*expression.Column) bool {
for _, by := range p.ByItems {
cols := expression.ExtractColumns(by.Expr)
for _, col := range cols {
for _, tCol := range tCols {
// A column with ID > 0 indicates that the column can be resolved by data source.
if tCol.ID > 0 && tCol.ID == col.ID && tCol.VirtualExpr != nil {
return true
}
}
}
}
return false
}

// canPushDownToTiKV checks whether this topN can be pushed down to TiKV.
func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool {
if !p.canExpressionConvertedToPB(kv.TiKV) {
return false
}
if len(copTask.rootTaskConds) != 0 {
return false
}
if p.containVirtualColumn(copTask.plan().Schema().Columns) {
return false
}
return true
}

// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash.
func (p *PhysicalTopN) canPushDownToTiFlash(mppTask *mppTask) bool {
if !p.canExpressionConvertedToPB(kv.TiFlash) {
return false
}
if p.containVirtualColumn(mppTask.plan().Schema().Columns) {
return false
}
return true
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *PhysicalTopN) attach2Task(tasks ...task) task {
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
t := tasks[0].copy()
cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
needPushDown := len(cols) > 0
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) {
newTask, changed := p.pushTopNDownToDynamicPartition(copTask)
if changed {
return newTask
Expand All @@ -978,7 +1019,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
pushedDownTopN = p.getPushedDownTopN(copTask.tablePlan)
copTask.tablePlan = pushedDownTopN
}
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) {
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash(mppTask) {
pushedDownTopN := p.getPushedDownTopN(mppTask.p)
mppTask.p = pushedDownTopN
}
Expand Down