/
physical_hashagg.go
186 lines (170 loc) · 5.78 KB
/
physical_hashagg.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
//
// This source code is a modified form of original source from the TiDB project, which has the following copyright header(s):
//
// Copyright 2016 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package planner
import (
"github.com/pingcap/parser/ast"
"github.com/squareup/pranadb/tidb/expression"
"github.com/squareup/pranadb/tidb/expression/aggregation"
"github.com/squareup/pranadb/tidb/planner/property"
"github.com/squareup/pranadb/tidb/sessionctx"
"math"
)
// aggFuncFactor maps an aggregate function name to the relative CPU cost
// weight that getAggFuncCostFactor sums per operator. Names that have no
// entry of their own are charged the "default" weight.
var aggFuncFactor = map[string]float64{
	// Lowest weight.
	ast.AggFuncFirstRow: 0.1,

	// Bitwise aggregates.
	ast.AggFuncBitAnd: 0.9,
	ast.AggFuncBitOr:  0.9,
	ast.AggFuncBitXor: 0.9,

	// Baseline weight.
	ast.AggFuncCount:       1.0,
	ast.AggFuncSum:         1.0,
	ast.AggFuncMax:         1.0,
	ast.AggFuncMin:         1.0,
	ast.AggFuncGroupConcat: 1.0,

	// Twice the baseline.
	ast.AggFuncAvg: 2.0,

	// Variance / standard-deviation family.
	ast.AggFuncVarPop:     3.0,
	ast.AggFuncVarSamp:    3.0,
	ast.AggFuncStddevPop:  3.0,
	ast.AggFuncStddevSamp: 3.0,

	// Fallback for any aggregate function not listed above.
	"default": 1.5,
}
// Compile-time check that *PhysicalHashAgg implements PhysicalPlan.
var _ PhysicalPlan = (*PhysicalHashAgg)(nil)

// PhysicalHashAgg is the hash-based physical operator for aggregation. All of
// its aggregation state (aggregate functions and group-by expressions) lives
// in the embedded basePhysicalAgg.
type PhysicalHashAgg struct {
	basePhysicalAgg
}
// NewPhysicalHashAgg creates a new PhysicalHashAgg from a LogicalAggregation.
//
// The returned node shares (does not copy) la's AggFuncs and GroupByItems
// slices. It is initialised with la's session context and block offset,
// carries newStats as its statistics, and records prop as the property it
// requires from its child.
func NewPhysicalHashAgg(la *LogicalAggregation, newStats *property.StatsInfo, prop *property.PhysicalProperty) *PhysicalHashAgg {
	base := basePhysicalAgg{
		AggFuncs:     la.AggFuncs,
		GroupByItems: la.GroupByItems,
	}
	return base.initForHash(la.ctx, newStats, la.blockOffset, prop)
}
// initForHash wraps base in a new PhysicalHashAgg and fills in its generic
// physical-plan state. It builds basePhysicalPlan from ctx, TypeHashAgg, the
// new node itself and offset; stores props as the properties required of its
// children; and attaches stats.
//
// NOTE(review): childrenReqProps and stats are presumably promoted fields of
// basePhysicalPlan, so basePhysicalPlan must be assigned first. Keep this
// order unless that is confirmed otherwise.
func (base basePhysicalAgg) initForHash(ctx sessionctx.Context, stats *property.StatsInfo, offset int, props ...*property.PhysicalProperty) *PhysicalHashAgg {
	agg := &PhysicalHashAgg{basePhysicalAgg: base}
	agg.basePhysicalPlan = newBasePhysicalPlan(ctx, TypeHashAgg, agg, offset)
	agg.childrenReqProps = props
	agg.stats = stats
	return agg
}
// cpuCostDivisor computes the concurrency over which the CPU cost of hash
// aggregation is amortized.
//
// It returns (divisor, total). A divisor of 0 means "do not amortize". That
// is the result when hasDistinct is true, or when both the final and partial
// hash-agg concurrency settings are 1. Otherwise divisor is the smaller of
// the two concurrencies and total is their sum.
func (p *PhysicalHashAgg) cpuCostDivisor(hasDistinct bool) (float64, float64) {
	if hasDistinct {
		return 0, 0
	}
	vars := p.ctx.GetSessionVars()
	finalWorkers := vars.HashAggFinalConcurrency()
	partialWorkers := vars.HashAggPartialConcurrency()
	// According to `ValidateSetSystemVar`, neither concurrency can be less
	// than or equal to 0.
	if finalWorkers == 1 && partialWorkers == 1 {
		return 0, 0
	}
	// Choosing which concurrency to amortize over is a heuristic. The cost of
	// hash aggregation tends to be under-estimated (as explained in
	// `attach2Task`), so the smaller concurrency is chosen to compensate.
	divisor := math.Min(float64(finalWorkers), float64(partialWorkers))
	total := float64(finalWorkers + partialWorkers)
	return divisor, total
}
// GetCost computes the cost of hash aggregation considering CPU/memory.
//
// inputRows is the estimated number of rows fed into the operator. isMPP only
// affects the fallback per-function factor (see getAggFuncCostFactor). isRoot
// is currently ignored by this implementation; it is presumably kept for
// signature compatibility with callers.
//
// The result is cpu + memory, where:
//   - cpu = inputRows * CPUFactor * funcFactor. When cpuCostDivisor reports a
//     positive divisor, cpu is first divided by it and then charged
//     (con+1) * ConcurrencyFactor for the additional goroutines.
//   - memory = outputRows * MemoryFactor * len(AggFuncs), plus
//     inputRows * distinctFactor * MemoryFactor for every DISTINCT function.
func (p *PhysicalHashAgg) GetCost(inputRows float64, isRoot bool, isMPP bool) float64 {
	outputRows := p.statsInfo().RowCount
	distinctFuncs := p.numDistinctFunc()
	// Named funcFactor so it does not shadow the package-level aggFuncFactor map.
	funcFactor := p.getAggFuncCostFactor(isMPP)
	vars := p.ctx.GetSessionVars()

	cpu := inputRows * vars.CPUFactor * funcFactor
	if divisor, con := p.cpuCostDivisor(distinctFuncs > 0); divisor > 0 {
		// Amortize across workers, then add the cost of additional goroutines.
		cpu = cpu/divisor + (con+1)*vars.ConcurrencyFactor
	}

	mem := outputRows * vars.MemoryFactor * float64(len(p.AggFuncs))
	// When aggregation has a distinct flag, a map is allocated for each group
	// to check for duplicates.
	mem += inputRows * distinctFactor * vars.MemoryFactor * float64(distinctFuncs)
	return cpu + mem
}
// basePhysicalAgg holds the state common to physical aggregation operators
// such as PhysicalHashAgg.
type basePhysicalAgg struct {
	physicalSchemaProducer
	// AggFuncs describes the aggregate functions the operator computes.
	AggFuncs []*aggregation.AggFuncDesc
	// GroupByItems are the GROUP BY expressions of the aggregation.
	GroupByItems []expression.Expression
}
// isFinalAgg reports whether the operator's first aggregate function runs in
// FinalMode or CompleteMode. An operator without aggregate functions is never
// final.
//
// Only AggFuncs[0] is inspected. Presumably all functions of one operator
// share the same mode; confirm before relying on this for mixed modes.
func (p *basePhysicalAgg) isFinalAgg() bool {
	if len(p.AggFuncs) == 0 {
		return false
	}
	switch p.AggFuncs[0].Mode {
	case aggregation.FinalMode, aggregation.CompleteMode:
		return true
	default:
		return false
	}
}
// numDistinctFunc returns how many of the operator's aggregate functions have
// HasDistinct set.
func (p *basePhysicalAgg) numDistinctFunc() (num int) {
	for i := range p.AggFuncs {
		if p.AggFuncs[i].HasDistinct {
			num++
		}
	}
	return num
}
// getAggFuncCostFactor returns the combined CPU cost weight of the operator's
// aggregate functions. Each function contributes its entry in aggFuncFactor,
// or the "default" entry when its name is not listed.
//
// When that sum is zero (e.g. there are no aggregate functions), a fallback
// is returned instead: the FIRST_ROW factor if isMPP is true, 1.0 otherwise.
func (p *basePhysicalAgg) getAggFuncCostFactor(isMPP bool) (factor float64) {
	total := 0.0
	for _, desc := range p.AggFuncs {
		weight, known := aggFuncFactor[desc.Name]
		if !known {
			weight = aggFuncFactor["default"]
		}
		total += weight
	}
	if total != 0 {
		return total
	}
	if !isMPP {
		return 1.0
	}
	// The default factor 1.0 will lead to 1-phase agg in pseudo stats settings,
	// but in MPP cases 2-phase is more usual, so the FIRST_ROW factor is used.
	// TODO: This is still a little tricky and might cause regression. We should
	// calibrate these factors and polish our cost model in the future.
	return aggFuncFactor[ast.AggFuncFirstRow]
}
// ResolveIndices implements Plan interface.
//
// It first resolves the operator's own schema through the embedded
// physicalSchemaProducer. It then re-binds, in place, the following against
// the schema of the first child (p.children[0]):
//   - every aggregate-function argument,
//   - every aggregate ORDER BY expression,
//   - every GROUP BY expression.
//
// It stops at and returns the first error. Expressions processed before the
// failure stay rewritten.
func (p *basePhysicalAgg) ResolveIndices() (err error) {
	if err = p.physicalSchemaProducer.ResolveIndices(); err != nil {
		return err
	}
	// The child schema is fetched on every call rather than once up front, so
	// p.children is never touched when there is nothing to resolve.
	bindToChild := func(e expression.Expression) (expression.Expression, error) {
		return e.ResolveIndices(p.children[0].Schema())
	}
	for _, desc := range p.AggFuncs {
		for idx := range desc.Args {
			if desc.Args[idx], err = bindToChild(desc.Args[idx]); err != nil {
				return err
			}
		}
		for _, item := range desc.OrderByItems {
			if item.Expr, err = bindToChild(item.Expr); err != nil {
				return err
			}
		}
	}
	for idx := range p.GroupByItems {
		if p.GroupByItems[idx], err = bindToChild(p.GroupByItems[idx]); err != nil {
			return err
		}
	}
	return nil
}