
plan: convert max/min to Limit + Sort operators #5105

Merged
merged 18 commits on Nov 16, 2017
12 changes: 6 additions & 6 deletions executor/index_lookup_join.go
@@ -276,9 +276,9 @@ func (e *IndexLookUpJoin) fetchSortedInners(requestRows [][]types.Datum) error {
break
}

matched, err := expression.EvalBool(e.innerFilter, innerRow, e.ctx)
if err != nil {
return errors.Trace(err)
matched, err1 := expression.EvalBool(e.innerFilter, innerRow, e.ctx)
if err1 != nil {
return errors.Trace(err1)
} else if matched {
e.innerOrderedRows.rows = append(e.innerOrderedRows.rows, innerRow)
}
@@ -288,9 +288,9 @@ func (e *IndexLookUpJoin) fetchSortedInners(requestRows [][]types.Datum) error {
innerJoinKey := e.buffer4JoinKey[:0]
for _, innerRow := range e.innerOrderedRows.rows {
for _, innerKey := range e.innerKeys {
innerDatum, err := innerKey.Eval(innerRow)
if err != nil {
return errors.Trace(err)
innerDatum, err1 := innerKey.Eval(innerRow)
if err1 != nil {
return errors.Trace(err1)
}
innerJoinKey = append(innerJoinKey, innerDatum)
}
6 changes: 3 additions & 3 deletions executor/join.go
@@ -348,9 +348,9 @@ func (e *HashJoinExec) joinOuterRow(workerID int, outerRow Row, resultBuffer *ex

innerRows := make([]Row, 0, len(values))
for _, value := range values {
innerRow, err := e.decodeRow(value)
if err != nil {
resultBuffer.err = errors.Trace(err)
innerRow, err1 := e.decodeRow(value)
if err1 != nil {
resultBuffer.err = errors.Trace(err1)
return false
}
innerRows = append(innerRows, innerRow)
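Both executor hunks above are a mechanical rename of the inner error variable from err to err1. A plausible reason (an assumption, not stated in the diff) is to avoid shadowing an err already declared in the enclosing function: a `:=` inside the loop body creates a new err that hides the outer one. A minimal, hypothetical Go sketch of the pattern:

package main

import (
	"errors"
	"fmt"
)

// process uses a named return value `err`; inside the loop we use `err1` so the
// short declaration does not shadow it. Hypothetical sketch, not TiDB code.
func process(items []int) (err error) {
	for _, it := range items {
		err1 := doWork(it)
		if err1 != nil {
			err = fmt.Errorf("item %d: %v", it, err1)
			return err
		}
	}
	return nil
}

func doWork(i int) error {
	if i < 0 {
		return errors.New("negative input")
	}
	return nil
}

func main() {
	fmt.Println(process([]int{1, 2, -3})) // prints: item -3: negative input
}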
76 changes: 76 additions & 0 deletions plan/aggregation_eliminate.go
@@ -0,0 +1,76 @@
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package plan

import (
"github.com/pingcap/tidb/ast"
"github.com/pingcap/tidb/context"
)

// aggEliminater tries to eliminate the max/min aggregate function.
// For SQL like `select max(c) from t;`, we could optimize it to `select c from t order by c desc limit 1;`.
// For SQL like `select min(c) from t;`, we could optimize it to `select c from t order by c limit 1;`.
type aggEliminater struct {
ctx context.Context
}

func (a *aggEliminater) optimize(p LogicalPlan, ctx context.Context) (LogicalPlan, error) {
a.ctx = ctx
return a.eliminateAgg(p), nil
}

// Try to convert max/min to Limit+Sort operators.
func (a *aggEliminater) eliminateAgg(p LogicalPlan) LogicalPlan {
if agg, ok := p.(*LogicalAggregation); ok {
// We only consider the case with a single max/min function.
if len(agg.AggFuncs) != 1 {
return p
}
f := agg.AggFuncs[0]
if f.GetName() != ast.AggFuncMax && f.GetName() != ast.AggFuncMin {
return p
}

// Add Sort and Limit operators.
// For max function, the sort order should be desc.
desc := f.GetName() == ast.AggFuncMax
// Compose Sort operator.
sort := Sort{}.init(a.ctx)
sort.ByItems = append(sort.ByItems, &ByItems{f.GetArgs()[0], desc})
sort.SetSchema(p.Children()[0].Schema().Clone())
setParentAndChildren(sort, p.Children()...)
// Compose Limit operator.
li := Limit{Count: 1}.init(a.ctx)
li.SetSchema(sort.Schema().Clone())
setParentAndChildren(li, sort)

// Add a projection operator here.
// During topn_push_down, the Sort/Limit operators will be converted into a TopN operator and their schema will be ignored,
// so the schema of the LogicalAggregation would be lost. We add this Projection operator to preserve it.
// For SQL like `select * from t where v=(select min(t1.v) from t t1, t t2, t t3 where t1.id=t2.id and t2.id=t3.id and t1.id=t.id);`,
// the min(t1.v) will be referred to in the outer selection, so we should keep the schema from the LogicalAggregation.
proj := Projection{}.init(a.ctx)
proj.Exprs = append(proj.Exprs, f.GetArgs()[0])
proj.SetSchema(p.Schema().Clone())
setParentAndChildren(proj, li)
return proj
}

newChildren := make([]Plan, 0, len(p.Children()))
for _, child := range p.Children() {
newChild := a.eliminateAgg(child.(LogicalPlan))
newChildren = append(newChildren, newChild)
}
setParentAndChildren(p, newChildren...)
return p
}
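The net effect of eliminateAgg is to rebuild the subtree as Projection -> Limit(1) -> Sort -> child. Below is a standalone, simplified sketch of that rewrite on a toy plan type (the plan, node, and render helpers are hypothetical; the real rule works on TiDB's LogicalPlan structs shown above):

package main

import "fmt"

type plan struct {
	name     string
	children []*plan
}

func node(name string, children ...*plan) *plan {
	return &plan{name: name, children: children}
}

// eliminateMaxMin mirrors the idea of aggEliminater.eliminateAgg: a single
// max/min aggregate over one child is replaced by Projection -> Limit -> Sort.
func eliminateMaxMin(p *plan) *plan {
	if (p.name == "max" || p.name == "min") && len(p.children) == 1 {
		desc := p.name == "max" // max sorts descending, min ascending
		sort := node(fmt.Sprintf("Sort(desc=%v)", desc), p.children[0])
		limit := node("Limit(1)", sort)
		return node("Projection", limit)
	}
	for i, c := range p.children {
		p.children[i] = eliminateMaxMin(c)
	}
	return p
}

// render prints the plan bottom-up, in the same style as the expected strings
// used by the planner tests.
func render(p *plan) string {
	if len(p.children) == 0 {
		return p.name
	}
	return render(p.children[0]) + "->" + p.name
}

func main() {
	// `select max(a) from t` as a toy plan tree.
	agg := node("max", node("DataScan(t)"))
	fmt.Println(render(eliminateMaxMin(agg)))
	// Output: DataScan(t)->Sort(desc=true)->Limit(1)->Projection
}

This prints the plan before TopN push-down; the tests below check the shape after push-down, where Sort+Limit has already been fused into a TopN.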
17 changes: 17 additions & 0 deletions plan/logical_plan_builder.go
@@ -63,14 +63,24 @@ func (p *LogicalAggregation) collectGroupByColumns() {
func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.AggregateFuncExpr, gbyItems []expression.Expression) (LogicalPlan, map[int]int) {
b.optFlag = b.optFlag | flagBuildKeyInfo
b.optFlag = b.optFlag | flagAggregationOptimize
eliminateAgg := len(gbyItems) == 0
Review comment (Member): This optimization is too complex. If we have an aggregate, we set the eliminateAgg flag. That's enough.


agg := LogicalAggregation{AggFuncs: make([]aggregation.Aggregation, 0, len(aggFuncList))}.init(b.ctx)
schema := expression.NewSchema(make([]*expression.Column, 0, len(aggFuncList)+p.Schema().Len())...)
// aggIndexMap maps the old index to the new index after applying common aggregation function elimination.
aggIndexMap := make(map[int]int)
for i, aggFunc := range aggFuncList {
if eliminateAgg && (aggFunc.F != ast.AggFuncMax) && (aggFunc.F != ast.AggFuncMin) {
eliminateAgg = false
}
var newArgList []expression.Expression
for _, arg := range aggFunc.Args {
if eliminateAgg {
_, ok := arg.(*ast.ColumnNameExpr)
if !ok {
eliminateAgg = false
}
}
newArg, np, err := b.rewrite(arg, p, nil, true)
if err != nil {
b.err = errors.Trace(err)
@@ -109,6 +119,10 @@ func (b *planBuilder) buildAggregation(p LogicalPlan, aggFuncList []*ast.Aggrega
agg.GroupByItems = gbyItems
agg.SetSchema(schema)
agg.collectGroupByColumns()
if eliminateAgg {
b.optFlag = b.optFlag | flagAggEliminate
b.optFlag = b.optFlag | flagPushDownTopN
}
return agg, aggIndexMap
}

@@ -1491,6 +1505,9 @@ func (b *planBuilder) buildSelect(sel *ast.SelectStmt) LogicalPlan {
if b.err != nil {
return nil
}
if b.optFlag&flagAggEliminate == flagAggEliminate {
Review comment (Member): Converting to TopN will be no worse than the original plan; it's OK not to do this.

b.optFlag = b.optFlag &^ flagAggregationOptimize
}
}
var oldLen int
p, oldLen = b.buildProjection(p, sel.Fields.Fields, totalMap)
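The builder drives these rules with plain bit operations on optFlag: `|` sets a flag, `&` tests it, and `&^` (AND NOT) clears it, which is how flagAggregationOptimize is switched off once elimination is chosen. A small self-contained sketch (the flag values below are illustrative, not the real constants):

package main

import "fmt"

const (
	flagAggregationOptimize uint64 = 1 << iota
	flagAggEliminate
	flagPushDownTopN
)

func main() {
	var optFlag uint64

	// Setting flags, as buildAggregation does.
	optFlag |= flagAggregationOptimize
	optFlag |= flagAggEliminate | flagPushDownTopN

	// Testing a flag, as buildSelect does.
	if optFlag&flagAggEliminate == flagAggEliminate {
		// Clearing a flag: keep every bit except flagAggregationOptimize.
		optFlag = optFlag &^ flagAggregationOptimize
	}

	fmt.Printf("aggregation optimize still set: %v\n",
		optFlag&flagAggregationOptimize != 0) // false
}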
51 changes: 51 additions & 0 deletions plan/logical_plan_test.go
@@ -1675,3 +1675,54 @@ func (s *testPlanSuite) TestNameResolver(c *C) {
}
}
}

func (s *testPlanSuite) TestAggEliminater(c *C) {
defer func() {
testleak.AfterTest(c)()
}()
tests := []struct {
sql string
best string
}{
// Max to Limit + Sort-Desc.
{
sql: "select max(a) from t;",
best: "DataScan(t)->TopN([test.t.a true],0,1)->Projection->Projection",
},
// Min to Limit + Sort.
{
sql: "select min(a) from t;",
best: "DataScan(t)->TopN([test.t.a],0,1)->Projection->Projection",
},
// Do nothing to max+min.
{
sql: "select max(a), min(a) from t;",
best: "DataScan(t)->Aggr(max(test.t.a),min(test.t.a))->Projection",
},
// Do nothing to max with groupby.
{
sql: "select max(a) from t group by b;",
best: "DataScan(t)->Aggr(max(test.t.a))->Projection",
},
}

for _, tt := range tests {
comment := Commentf("for %s", tt.sql)
stmt, err := s.ParseOneStmt(tt.sql, "", "")
c.Assert(err, IsNil, comment)

is, err := MockPreprocess(stmt, false)
c.Assert(err, IsNil)

builder := &planBuilder{
ctx: mockContext(),
is: is,
colMapper: make(map[*ast.ColumnNameExpr]int),
}
p := builder.build(stmt).(LogicalPlan)
c.Assert(builder.err, IsNil)
p, err = logicalOptimize(builder.optFlag, p.(LogicalPlan), builder.ctx)
c.Assert(err, IsNil)
c.Assert(ToString(p), Equals, tt.best, comment)
}
}
2 changes: 2 additions & 0 deletions plan/optimizer.go
@@ -36,6 +36,7 @@ const (
flagDecorrelate
flagPredicatePushDown
flagAggregationOptimize
flagAggEliminate // Keep aggEliminater before pushDownTopN: it adds Sort+Limit operators that the TopN push-down then fuses into a TopN.
flagPushDownTopN
)

@@ -46,6 +47,7 @@ var optRuleList = []logicalOptRule{
&decorrelateSolver{},
&ppdSolver{},
&aggregationOptimizer{},
&aggEliminater{},
&pushDownTopNOptimizer{},
}

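Registration order matters here: aggEliminater must run before pushDownTopNOptimizer so that the Sort+Limit it introduces can be fused into a TopN. The sketch below shows one plausible way a flag-gated rule list like this could be walked (the real logicalOptimize signature and rule types differ; the names here are illustrative):

package main

import "fmt"

// logicalOptRule is a stand-in for the real interface; only a name is needed here.
type logicalOptRule interface {
	name() string
}

type aggEliminater struct{}
type pushDownTopNOptimizer struct{}

func (aggEliminater) name() string         { return "agg_eliminate" }
func (pushDownTopNOptimizer) name() string { return "topn_push_down" }

// optRuleList mirrors the registration order in plan/optimizer.go: the
// eliminater comes first so its Sort+Limit output exists before push-down runs.
var optRuleList = []logicalOptRule{
	aggEliminater{},
	pushDownTopNOptimizer{},
}

func main() {
	var flag uint64 = 3 // both rule bits set by the plan builder
	for i, rule := range optRuleList {
		if flag&(1<<uint(i)) == 0 {
			continue // skip rules the builder did not request
		}
		fmt.Println("applying rule:", rule.name())
	}
}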