forked from pingcap/tidb
/
rule_max_min_eliminate.go
309 lines (291 loc) · 12.4 KB
/
rule_max_min_eliminate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package core
import (
"bytes"
"context"
"fmt"
"github.com/twotigers93/tidb/expression"
"github.com/twotigers93/tidb/expression/aggregation"
"github.com/twotigers93/tidb/parser/ast"
"github.com/twotigers93/tidb/parser/model"
"github.com/twotigers93/tidb/parser/mysql"
"github.com/twotigers93/tidb/planner/util"
"github.com/twotigers93/tidb/types"
"github.com/twotigers93/tidb/util/ranger"
)
// maxMinEliminator tries to eliminate max/min aggregate functions.
// For SQL like `select max(id) from t;`, we could optimize it to `select max(id) from (select id from t where id is not null order by id desc limit 1) t;`.
// For SQL like `select min(id) from t;`, we could optimize it to `select min(id) from (select id from t where id is not null order by id limit 1) t;`.
// For SQL like `select max(id), min(id) from t;`, we could optimize it to the cartesianJoin result of the two queries above if `id` has an index.
type maxMinEliminator struct {
}
// optimize implements the logical optimization rule entry point: it rewrites
// eligible max/min aggregations in the plan tree and never returns an error.
func (a *maxMinEliminator) optimize(_ context.Context, p LogicalPlan, opt *logicalOptimizeOp) (LogicalPlan, error) {
	rewritten := a.eliminateMaxMin(p, opt)
	return rewritten, nil
}
// composeAggsByInnerJoin composes the scalar aggregations by cartesianJoin:
// each single-row agg result is concatenated into one row via a left-deep
// chain of cartesian inner joins.
func (a *maxMinEliminator) composeAggsByInnerJoin(originAgg *LogicalAggregation, aggs []*LogicalAggregation, opt *logicalOptimizeOp) (plan LogicalPlan) {
	plan = aggs[0]
	sctx := plan.SCtx()
	joins := make([]*LogicalJoin, 0)
	// Fold every remaining agg into the chain; each join is a cartesian
	// product of the plan built so far and the next single-function agg.
	for _, nextAgg := range aggs[1:] {
		j := LogicalJoin{JoinType: InnerJoin}.Init(sctx, plan.SelectBlockOffset())
		j.SetChildren(plan, nextAgg)
		j.schema = buildLogicalJoinSchema(InnerJoin, j)
		j.cartesianJoin = true
		joins = append(joins, j)
		plan = j
	}
	appendEliminateMultiMinMaxTraceStep(originAgg, aggs, joins, opt)
	return plan
}
// checkColCanUseIndex checks whether there is an AccessPath satisfying the conditions:
// 1. all of the selection's condition can be pushed down as AccessConds of the path.
// 2. the path can keep order for `col` after pushing down the conditions.
// Only Selection and DataSource nodes are handled; any other operator makes the
// check fail (cloneSubPlans relies on this restriction).
func (a *maxMinEliminator) checkColCanUseIndex(plan LogicalPlan, col *expression.Column, conditions []expression.Expression) bool {
	switch p := plan.(type) {
	case *LogicalSelection:
		// Accumulate the selection's conditions and keep walking down the tree.
		conditions = append(conditions, p.Conditions...)
		return a.checkColCanUseIndex(p.children[0], col, conditions)
	case *DataSource:
		// Check whether there is an AccessPath can use index for col.
		for _, path := range p.possibleAccessPaths {
			if path.IsIntHandlePath {
				// Since table path can contain accessConds of at most one column,
				// we only need to check if all of the conditions can be pushed down as accessConds
				// and `col` is the handle column.
				if p.handleCols != nil && col.Equal(nil, p.handleCols.GetCol(0)) {
					// Any condition left over after detaching on `col` cannot be
					// pushed down, so the path does not qualify.
					if _, filterConds := ranger.DetachCondsForColumn(p.ctx, conditions, col); len(filterConds) != 0 {
						return false
					}
					return true
				}
			} else {
				indexCols, indexColLen := path.FullIdxCols, path.FullIdxColLens
				if path.IsCommonHandlePath {
					indexCols, indexColLen = p.commonHandleCols, p.commonHandleLens
				}
				// 1. whether all of the conditions can be pushed down as accessConds.
				// 2. whether the AccessPath can satisfy the order property of `col` with these accessConds.
				result, err := ranger.DetachCondAndBuildRangeForIndex(p.ctx, conditions, indexCols, indexColLen, p.ctx.GetSessionVars().RangeMaxSize)
				if err != nil || len(result.RemainedConds) != 0 {
					continue
				}
				// The index keeps order for `col` if `col` is one of the equal-condition
				// prefix columns, or the first index column right after that prefix.
				for i := 0; i <= result.EqCondCount; i++ {
					if i < len(indexCols) && col.Equal(nil, indexCols[i]) {
						return true
					}
				}
			}
		}
		return false
	default:
		return false
	}
}
// cloneSubPlans shallow clones the subPlan. We only consider `Selection` and `DataSource` here,
// because we have restricted the subPlan in `checkColCanUseIndex`.
// Returns nil for any other operator, which cannot happen after that check.
func (a *maxMinEliminator) cloneSubPlans(plan LogicalPlan) LogicalPlan {
	switch p := plan.(type) {
	case *LogicalSelection:
		// Copy the condition slice so later rewrites on the clone cannot
		// mutate the original selection's conditions.
		newConditions := make([]expression.Expression, len(p.Conditions))
		copy(newConditions, p.Conditions)
		sel := LogicalSelection{Conditions: newConditions}.Init(p.ctx, p.blockOffset)
		sel.SetChildren(a.cloneSubPlans(p.children[0]))
		return sel
	case *DataSource:
		// Quick clone a DataSource.
		// ReadOnly fields use a shallow copy, while the fields which will be overwritten must use a deep copy.
		newDs := *p
		// Re-init the base plan so the clone registers itself (&newDs) as the
		// self pointer instead of aliasing the original.
		newDs.baseLogicalPlan = newBaseLogicalPlan(p.ctx, p.tp, &newDs, p.blockOffset)
		newDs.schema = p.schema.Clone()
		newDs.Columns = make([]*model.ColumnInfo, len(p.Columns))
		copy(newDs.Columns, p.Columns)
		// Each AccessPath is copied by value — presumably later steps (e.g. column
		// pruning on the clone) modify them; verify against callers.
		newAccessPaths := make([]*util.AccessPath, 0, len(p.possibleAccessPaths))
		for _, path := range p.possibleAccessPaths {
			newPath := *path
			newAccessPaths = append(newAccessPaths, &newPath)
		}
		newDs.possibleAccessPaths = newAccessPaths
		return &newDs
	}
	// This won't happen, because we have checked the subtree.
	return nil
}
// splitAggFuncAndCheckIndices splits the agg to multiple aggs and check whether each agg needs a sort
// after the transformation. For example, we firstly split the sql: `select max(a), min(a), max(b) from t` ->
// `select max(a) from t` + `select min(a) from t` + `select max(b) from t`.
// Then we check whether `a` and `b` have indices. If any of the used column has no index, we cannot eliminate
// this aggregation.
func (a *maxMinEliminator) splitAggFuncAndCheckIndices(agg *LogicalAggregation, opt *logicalOptimizeOp) (aggs []*LogicalAggregation, canEliminate bool) {
	// First pass: every max/min argument must be a bare column that can
	// benefit from an index; otherwise abandon the whole rewrite.
	for _, fn := range agg.AggFuncs {
		arg, isColumn := fn.Args[0].(*expression.Column)
		if !isColumn {
			return nil, false
		}
		if !a.checkColCanUseIndex(agg.children[0], arg, make([]expression.Expression, 0)) {
			return nil, false
		}
	}
	// Second pass (runs only if every aggFunc passed the check): build one
	// single-function aggregation per aggFunc over its own cloned subtree.
	aggs = make([]*LogicalAggregation, 0, len(agg.AggFuncs))
	for idx, fn := range agg.AggFuncs {
		oneAgg := LogicalAggregation{AggFuncs: []*aggregation.AggFuncDesc{fn}}.Init(agg.ctx, agg.blockOffset)
		oneAgg.SetChildren(a.cloneSubPlans(agg.children[0]))
		oneAgg.schema = expression.NewSchema(agg.schema.Columns[idx])
		if err := oneAgg.PruneColumns([]*expression.Column{oneAgg.schema.Columns[0]}, opt); err != nil {
			return nil, false
		}
		aggs = append(aggs, oneAgg)
	}
	return aggs, true
}
// eliminateSingleMaxMin tries to convert a single max/min to Limit+Sort operators.
// The agg operator itself is kept on top so that an empty input still yields NULL.
func (a *maxMinEliminator) eliminateSingleMaxMin(agg *LogicalAggregation, opt *logicalOptimizeOp) *LogicalAggregation {
	f := agg.AggFuncs[0]
	child := agg.Children()[0]
	ctx := agg.SCtx()
	var sel *LogicalSelection
	var sort *LogicalSort
	// If there's no column in f.GetArgs()[0], we still need limit and read data from real table because the result should be NULL if the input is empty.
	if len(expression.ExtractColumns(f.Args[0])) > 0 {
		// If it can be NULL, we need to filter NULL out first.
		if !mysql.HasNotNullFlag(f.Args[0].GetType().GetFlag()) {
			sel = LogicalSelection{}.Init(ctx, agg.blockOffset)
			// Build `not(isnull(arg))` so NULL rows cannot become the sort winner.
			isNullFunc := expression.NewFunctionInternal(ctx, ast.IsNull, types.NewFieldType(mysql.TypeTiny), f.Args[0])
			notNullFunc := expression.NewFunctionInternal(ctx, ast.UnaryNot, types.NewFieldType(mysql.TypeTiny), isNullFunc)
			sel.Conditions = []expression.Expression{notNullFunc}
			sel.SetChildren(agg.Children()[0])
			child = sel
		}
		// Add Sort and Limit operators.
		// For max function, the sort order should be desc.
		desc := f.Name == ast.AggFuncMax
		// Compose Sort operator.
		sort = LogicalSort{}.Init(ctx, agg.blockOffset)
		sort.ByItems = append(sort.ByItems, &util.ByItems{Expr: f.Args[0], Desc: desc})
		sort.SetChildren(child)
		child = sort
	}
	// Compose Limit operator.
	li := LogicalLimit{Count: 1}.Init(ctx, agg.blockOffset)
	li.SetChildren(child)
	// If no data in the child, we need to return NULL instead of empty. This cannot be done by sort and limit themselves.
	// Since now there would be at most one row returned, the remained agg operator is not expensive anymore.
	agg.SetChildren(li)
	appendEliminateSingleMaxMinTrace(agg, sel, sort, li, opt)
	return agg
}
// eliminateMaxMin tries to convert max/min to Limit+Sort operators,
// recursing bottom-up through the whole plan tree.
func (a *maxMinEliminator) eliminateMaxMin(p LogicalPlan, opt *logicalOptimizeOp) LogicalPlan {
	// Rewrite the children first, then decide whether this node qualifies.
	children := p.Children()
	rewritten := make([]LogicalPlan, 0, len(children))
	for _, c := range children {
		rewritten = append(rewritten, a.eliminateMaxMin(c, opt))
	}
	p.SetChildren(rewritten...)
	agg, isAgg := p.(*LogicalAggregation)
	if !isAgg {
		return p
	}
	if len(agg.GroupByItems) != 0 {
		return agg
	}
	// Make sure that all of the aggFuncs are Max or Min.
	for _, fn := range agg.AggFuncs {
		switch fn.Name {
		case ast.AggFuncMax, ast.AggFuncMin:
		default:
			return agg
		}
	}
	// Limit+Sort operators are sorted by value, but ENUM/SET field types are sorted by name,
	// so the rewrite would change results for those columns.
	for _, col := range agg.GetUsedCols() {
		switch col.RetType.GetType() {
		case mysql.TypeEnum, mysql.TypeSet:
			return agg
		}
	}
	if len(agg.AggFuncs) == 1 {
		// If there is only one aggFunc, we don't need to guarantee that the child of it is a data
		// source, or whether the sort can be eliminated. This transformation won't be worse than previous.
		return a.eliminateSingleMaxMin(agg, opt)
	}
	// If we have more than one aggFunc, we can eliminate this agg only if all of the aggFuncs can benefit from
	// their column's index.
	splitAggs, canEliminate := a.splitAggFuncAndCheckIndices(agg, opt)
	if !canEliminate {
		return agg
	}
	for i := range splitAggs {
		splitAggs[i] = a.eliminateSingleMaxMin(splitAggs[i], opt)
	}
	return a.composeAggsByInnerJoin(agg, splitAggs, opt)
}
// name returns the rule's identifier used in optimizer trace output.
func (*maxMinEliminator) name() string {
	return "max_min_eliminate"
}
// appendEliminateSingleMaxMinTrace records in the optimize trace which
// Selection/Sort/Limit operators were added while rewriting a single max/min
// aggregation; sel and sort may be nil when those operators were not needed.
func appendEliminateSingleMaxMinTrace(agg *LogicalAggregation, sel *LogicalSelection, sort *LogicalSort, limit *LogicalLimit, opt *logicalOptimizeOp) {
	action := func() string {
		var buf bytes.Buffer
		if sel != nil {
			fmt.Fprintf(&buf, "add %v_%v,", sel.TP(), sel.ID())
		}
		if sort != nil {
			fmt.Fprintf(&buf, "add %v_%v,", sort.TP(), sort.ID())
		}
		fmt.Fprintf(&buf, "add %v_%v during eliminating %v_%v %s function", limit.TP(), limit.ID(), agg.TP(), agg.ID(), agg.AggFuncs[0].Name)
		return buf.String()
	}
	reason := func() string {
		var buf bytes.Buffer
		fmt.Fprintf(&buf, "%v_%v has only one function[%s] without group by", agg.TP(), agg.ID(), agg.AggFuncs[0].Name)
		if sel != nil {
			fmt.Fprintf(&buf, ", the columns in %v_%v shouldn't be NULL and needs NULL to be filtered out", agg.TP(), agg.ID())
		}
		if sort != nil {
			fmt.Fprintf(&buf, ", the columns in %v_%v should be sorted", agg.TP(), agg.ID())
		}
		return buf.String()
	}
	opt.appendStepToCurrent(agg.ID(), agg.TP(), reason, action)
}
// appendEliminateMultiMinMaxTraceStep records in the optimize trace how a
// multi-function max/min aggregation was split into single-function aggs and
// reconnected with cartesian joins.
// NOTE: the trace strings (including "splited") are reproduced verbatim since
// they are observable output.
func appendEliminateMultiMinMaxTraceStep(originAgg *LogicalAggregation, aggs []*LogicalAggregation, joins []*LogicalJoin, opt *logicalOptimizeOp) {
	action := func() string {
		var buf bytes.Buffer
		fmt.Fprintf(&buf, "%v_%v splited into [", originAgg.TP(), originAgg.ID())
		for i, agg := range aggs {
			if i != 0 {
				buf.WriteString(",")
			}
			fmt.Fprintf(&buf, "%v_%v", agg.TP(), agg.ID())
		}
		buf.WriteString("], and add [")
		for i, join := range joins {
			if i != 0 {
				buf.WriteString(",")
			}
			fmt.Fprintf(&buf, "%v_%v", join.TP(), join.ID())
		}
		fmt.Fprintf(&buf, "] to connect them during eliminating %v_%v multi min/max functions", originAgg.TP(), originAgg.ID())
		return buf.String()
	}
	reason := func() string {
		var buf bytes.Buffer
		buf.WriteString("each column is sorted and can benefit from index/primary key in [")
		for i, agg := range aggs {
			if i != 0 {
				buf.WriteString(",")
			}
			fmt.Fprintf(&buf, "%v_%v", agg.TP(), agg.ID())
		}
		buf.WriteString("] and none of them has group by clause")
		return buf.String()
	}
	opt.appendStepToCurrent(originAgg.ID(), originAgg.TP(), reason, action)
}