Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add more checks when pushing TopN down #41370

Merged
merged 12 commits into from
Feb 15, 2023
10 changes: 10 additions & 0 deletions planner/core/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8431,3 +8431,13 @@ func TestIssue40285(t *testing.T) {
tk.MustExec("CREATE TABLE t(col1 enum('p5', '9a33x') NOT NULL DEFAULT 'p5',col2 tinyblob DEFAULT NULL) ENGINE = InnoDB DEFAULT CHARSET = latin1 COLLATE = latin1_bin;")
tk.MustQuery("(select last_value(col1) over () as r0 from t) union all (select col2 as r0 from t);")
}

// https://github.com/pingcap/tidb/issues/41355
func TestIssue41355(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t1; CREATE TABLE `t1` (`c1` varchar(100) DEFAULT NULL, `c2` varchar(100) GENERATED ALWAYS AS (lower(`c1`)) VIRTUAL);")
tk.MustExec("insert into t1(c1) values('a'), ('e'), ('b'), ('c'), ('d'), ('e'), ('x'), ('y'), ('a'), ('b');")
tk.MustQuery("select * from t1 order by c2 limit 2;").Check(testkit.Rows("a a", "a a"))
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
}
60 changes: 49 additions & 11 deletions planner/core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -895,15 +895,6 @@ func (p *PhysicalLimit) sinkIntoIndexLookUp(t task) bool {
return true
}

// canPushDown checks if this topN can be pushed down. If each of the expression can be converted to pb, it can be pushed.
func (p *PhysicalTopN) canPushDown(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

func (p *PhysicalSort) attach2Task(tasks ...task) task {
t := tasks[0].copy()
t = attachPlan2Task(p, t)
Expand Down Expand Up @@ -955,14 +946,61 @@ func (p *PhysicalTopN) canPushToIndexPlan(indexPlan PhysicalPlan, byItemCols []*
return true
}

// canExpressionConvertedToPB checks whether each of the the expression in TopN can be converted to pb.
func (p *PhysicalTopN) canExpressionConvertedToPB(storeTp kv.StoreType) bool {
exprs := make([]expression.Expression, 0, len(p.ByItems))
for _, item := range p.ByItems {
exprs = append(exprs, item.Expr)
}
return expression.CanExprsPushDown(p.ctx.GetSessionVars().StmtCtx, exprs, p.ctx.GetClient(), storeTp)
}

// containVirtualColumn checks whether TopN.ByItems contains virtual generated columns.
func (p *PhysicalTopN) containVirtualColumn(copTask *copTask) bool {
tCols := copTask.tablePlan.Schema().Columns
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
for _, by := range p.ByItems {
cols := expression.ExtractColumns(by.Expr)
for _, col := range cols {
if col.VirtualExpr != nil {
for _, tCol := range tCols {
// A column with ID > 0 indicates that the column can be resolved by data source.
if tCol.ID > 0 && tCol.ID == col.ID && tCol.VirtualExpr != nil {
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
return true
}
}
}
}
}
return false
}

// canPushDownToTiKV checks whether this topN can be pushed down to TiKV.
func (p *PhysicalTopN) canPushDownToTiKV(copTask *copTask) bool {
if !p.canExpressionConvertedToPB(kv.TiKV) {
return false
}
if len(copTask.rootTaskConds) != 0 {
return false
}
if p.containVirtualColumn(copTask) {
return false
}
return true
}

// canPushDownToTiFlash checks whether this topN can be pushed down to TiFlash.
func (p *PhysicalTopN) canPushDownToTiFlash() bool {
return !p.canExpressionConvertedToPB(kv.TiFlash)
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *PhysicalTopN) attach2Task(tasks ...task) task {
Dousir9 marked this conversation as resolved.
Show resolved Hide resolved
t := tasks[0].copy()
cols := make([]*expression.Column, 0, len(p.ByItems))
for _, item := range p.ByItems {
cols = append(cols, expression.ExtractColumns(item.Expr)...)
}
needPushDown := len(cols) > 0
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDown(copTask.getStoreType()) && len(copTask.rootTaskConds) == 0 {
if copTask, ok := t.(*copTask); ok && needPushDown && p.canPushDownToTiKV(copTask) {
newTask, changed := p.pushTopNDownToDynamicPartition(copTask)
if changed {
return newTask
Expand All @@ -978,7 +1016,7 @@ func (p *PhysicalTopN) attach2Task(tasks ...task) task {
pushedDownTopN = p.getPushedDownTopN(copTask.tablePlan)
copTask.tablePlan = pushedDownTopN
}
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDown(kv.TiFlash) {
} else if mppTask, ok := t.(*mppTask); ok && needPushDown && p.canPushDownToTiFlash() {
pushedDownTopN := p.getPushedDownTopN(mppTask.p)
mppTask.p = pushedDownTopN
}
Expand Down