-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
distinct.go
296 lines (260 loc) · 8.83 KB
/
distinct.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
/*
Copyright 2020 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package engine
import (
"context"
"fmt"
"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
)
// Compile-time assertion that *Distinct satisfies the Primitive interface.
var _ Primitive = (*Distinct)(nil)
type (
// Distinct Primitive is used to uniqueify results: it forwards the rows
// produced by Source, dropping rows it has already seen.
Distinct struct {
// Source is the primitive whose rows are de-duplicated.
Source Primitive
// CheckCols lists the columns that are hashed and compared to decide
// whether two rows are duplicates.
CheckCols []CheckCol
// Truncate, when greater than zero, is the number of columns TryExecute
// keeps in the result it returns.
Truncate int
}
// CheckCol describes one column that takes part in the DISTINCT check.
CheckCol struct {
// Col is the offset of the column inside a row.
Col int
// WsCol, when non-nil, is the offset of the column that
// SwitchToWeightString falls back to when Col cannot be hashed or
// compared with Collation.
WsCol *int
// Type is the SQL type of the column; String uses it to decide whether
// the collation name is printed.
Type sqltypes.Type
// Collation is the collation handed to the evalengine hash and compare
// functions for this column.
Collation collations.ID
}
// probeTable remembers the rows seen so far, bucketed by hash code, so that
// duplicates can be detected.
probeTable struct {
// seenRows maps a row hash code to every distinct row with that code.
seenRows map[evalengine.HashCode][]sqltypes.Row
// checkCols is the probe table's own list of columns to hash and compare;
// entries are overwritten when a column is switched to its weight string.
checkCols []CheckCol
}
)
// exists reports whether a row equal to inputRow has already been recorded in
// the probe table. When the row is new it is stored under its hash code, so a
// later call with an equal row returns true.
//
// An error from hashing or comparing the row is returned as-is, together with
// false.
func (pt *probeTable) exists(inputRow sqltypes.Row) (bool, error) {
	code, err := pt.hashCodeForRow(inputRow)
	if err != nil {
		return false, err
	}

	// A matching hash code is not proof of equality: every row in the bucket
	// is compared value by value so a hash collision is not mistaken for a
	// duplicate. A missing bucket is a nil slice, which makes this loop a no-op.
	bucket := pt.seenRows[code]
	for _, candidate := range bucket {
		same, err := pt.equal(candidate, inputRow)
		if err != nil {
			return false, err
		}
		if same {
			return true, nil
		}
	}

	// Not seen before: remember it. Appending to the nil slice of a missing
	// bucket creates the bucket.
	pt.seenRows[code] = append(bucket, inputRow)
	return false, nil
}
// hashCodeForRow folds the hash codes of the columns listed in pt.checkCols
// into a single hash code for inputRow.
//
// If a column cannot be hashed with its collation
// (evalengine.UnsupportedCollationHashError) and the CheckCol has a weight
// string column, the CheckCol is switched to that column. The switch is stored
// in pt.checkCols so later rows use the weight string directly.
//
// A VT13001 error is returned, instead of panicking, when a column offset does
// not exist in inputRow. Any other hashing error is returned unchanged.
func (pt *probeTable) hashCodeForRow(inputRow sqltypes.Row) (evalengine.HashCode, error) {
	// Why use 17 and 31 in this method?
	// Copied from an old usenet discussion on the topic:
	// https://groups.google.com/g/comp.programming/c/HSurZEyrZ1E?pli=1#d887b5bdb2dac99d
	// > It's a mixture of superstition and good sense.
	// > Suppose the multiplier were 26, and consider
	// > hashing a hundred-character string. How much influence does
	// > the string's first character have on the final value of `h',
	// > just before the mod operation? The first character's value
	// > will have been multiplied by MULT 99 times, so if the arithmetic
	// > were done in infinite precision the value would consist of some
	// > jumble of bits followed by 99 low-order zero bits -- each time
	// > you multiply by MULT you introduce another low-order zero, right?
	// > The computer's finite arithmetic just chops away all the excess
	// > high-order bits, so the first character's actual contribution to
	// > `h' is ... precisely zero! The `h' value depends only on the
	// > rightmost 32 string characters (assuming a 32-bit int), and even
	// > then things are not wonderful: the first of those final 32 bytes
	// > influences only the leftmost bit of `h' and has no effect on
	// > the remaining 31. Clearly, an even-valued MULT is a poor idea.
	// >
	// > Need MULT be prime? Not as far as I know (I don't know
	// > everything); any odd value ought to suffice. 31 may be attractive
	// > because it is close to a power of two, and it may be easier for
	// > the compiler to replace a possibly slow multiply instruction with
	// > a shift and subtract (31*x == (x << 5) - x) on machines where it
	// > makes a difference. Setting MULT one greater than a power of two
	// > (e.g., 33) would also be easy to optimize, but might produce too
	// > "simple" an arrangement: mostly a juxtaposition of two copies
	// > of the original set of bits, with a little mixing in the middle.
	// > So you want an odd MULT that has plenty of one-bits.
	code := evalengine.HashCode(17)
	for i, checkCol := range pt.checkCols {
		// The row is indexed by checkCol.Col, so that offset is what has to be
		// in range. The check on the loop index is kept so rows with fewer
		// columns than check columns are still rejected as before.
		if i >= len(inputRow) || checkCol.Col < 0 || checkCol.Col >= len(inputRow) {
			return 0, vterrors.VT13001("index out of range in row when creating the DISTINCT hash code")
		}
		col := inputRow[checkCol.Col]
		hashcode, err := evalengine.NullsafeHashcode(col, checkCol.Collation, col.Type())
		if err != nil {
			if err != evalengine.UnsupportedCollationHashError || checkCol.WsCol == nil {
				return 0, err
			}
			// The collation cannot be hashed: fall back to the weight string column.
			checkCol = checkCol.SwitchToWeightString()
			if checkCol.Col < 0 || checkCol.Col >= len(inputRow) {
				return 0, vterrors.VT13001("index out of range in row when creating the DISTINCT hash code")
			}
			// Remember the switch so later rows hash the weight string directly.
			pt.checkCols[i] = checkCol
			hashcode, err = evalengine.NullsafeHashcode(inputRow[checkCol.Col], checkCol.Collation, col.Type())
			if err != nil {
				return 0, err
			}
		}
		code = code*31 + hashcode
	}
	return code, nil
}
// equal reports whether rows a and b hold equal values in every column listed
// in pt.checkCols.
//
// Each column is read at checkCol.Col, the same offset hashCodeForRow hashes,
// so hashing and equality always look at the same value.
//
// If the comparison is not supported (evalengine.UnsupportedComparisonError)
// and the CheckCol has a weight string column, the CheckCol is switched to
// that column and the weight strings are compared. The switch is stored in
// pt.checkCols so later comparisons use it directly.
//
// A VT13001 error is returned, instead of panicking, when a column offset does
// not exist in both rows. Any other comparison error is returned unchanged.
func (pt *probeTable) equal(a, b sqltypes.Row) (bool, error) {
	// inRange reports whether offset col exists in both rows.
	inRange := func(col int) bool {
		return col >= 0 && col < len(a) && col < len(b)
	}

	for i, checkCol := range pt.checkCols {
		if !inRange(checkCol.Col) {
			return false, vterrors.VT13001("index out of range in row when comparing rows for DISTINCT")
		}
		cmp, err := evalengine.NullsafeCompare(a[checkCol.Col], b[checkCol.Col], checkCol.Collation)
		if err != nil {
			_, isComparisonErr := err.(evalengine.UnsupportedComparisonError)
			if !isComparisonErr || checkCol.WsCol == nil {
				return false, err
			}
			// The values cannot be compared directly: fall back to the weight
			// string column and compare that.
			checkCol = checkCol.SwitchToWeightString()
			if !inRange(checkCol.Col) {
				return false, vterrors.VT13001("index out of range in row when comparing rows for DISTINCT")
			}
			pt.checkCols[i] = checkCol
			cmp, err = evalengine.NullsafeCompare(a[checkCol.Col], b[checkCol.Col], checkCol.Collation)
			if err != nil {
				return false, err
			}
		}
		if cmp != 0 {
			return false, nil
		}
	}
	return true, nil
}
// newProbeTable returns an empty probe table for the given check columns.
//
// The slice is copied because the probe table overwrites entries of its
// checkCols when a column is switched to its weight string; the caller's slice
// must not be affected by that.
func newProbeTable(checkCols []CheckCol) *probeTable {
	pt := &probeTable{
		seenRows:  make(map[evalengine.HashCode][]sqltypes.Row),
		checkCols: make([]CheckCol, len(checkCols)),
	}
	copy(pt.checkCols, checkCols)
	return pt
}
// TryExecute implements the Primitive interface.
//
// It executes Source, keeps the first occurrence of every distinct row (as
// decided by CheckCols) in input order, and returns a new result carrying the
// input's Fields and InsertID. When Truncate is greater than zero the result
// is truncated to that many columns.
//
// An error from executing Source or from the duplicate check is returned with
// a nil result.
func (d *Distinct) TryExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool) (*sqltypes.Result, error) {
	input, err := vcursor.ExecutePrimitive(ctx, d.Source, bindVars, wantfields)
	if err != nil {
		return nil, err
	}

	result := &sqltypes.Result{
		Fields:   input.Fields,
		InsertID: input.InsertID,
	}

	pt := newProbeTable(d.CheckCols)
	for _, row := range input.Rows {
		seen, existsErr := pt.exists(row)
		if existsErr != nil {
			return nil, existsErr
		}
		if !seen {
			result.Rows = append(result.Rows, row)
		}
	}

	if d.Truncate > 0 {
		return result.Truncate(d.Truncate), nil
	}
	// Every error path has returned above, so success is reported explicitly.
	return result, nil
}
// TryStreamExecute implements the Primitive interface.
//
// It streams Source and, for every batch received, forwards to callback a new
// result holding only the rows not seen in this or any earlier batch. A single
// probe table is shared across all batches of the stream. Each forwarded
// result carries the batch's Fields and InsertID.
//
// NOTE(review): each batch is truncated to len(d.CheckCols) columns, whereas
// TryExecute truncates to d.Truncate (and only when it is > 0) — confirm the
// difference is intended.
func (d *Distinct) TryStreamExecute(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable, wantfields bool, callback func(*sqltypes.Result) error) error {
	pt := newProbeTable(d.CheckCols)

	// dedupe filters one incoming batch and hands the survivors to callback.
	dedupe := func(input *sqltypes.Result) error {
		out := &sqltypes.Result{
			Fields:   input.Fields,
			InsertID: input.InsertID,
		}
		for _, row := range input.Rows {
			seen, err := pt.exists(row)
			if err != nil {
				return err
			}
			if seen {
				continue
			}
			out.Rows = append(out.Rows, row)
		}
		return callback(out.Truncate(len(d.CheckCols)))
	}

	return vcursor.StreamExecutePrimitive(ctx, d.Source, bindVars, wantfields, dedupe)
}
// RouteType implements the Primitive interface.
// Distinct has no route type of its own; it returns the one reported by Source.
func (d *Distinct) RouteType() string {
return d.Source.RouteType()
}
// GetKeyspaceName implements the Primitive interface.
// It returns the keyspace name reported by Source.
func (d *Distinct) GetKeyspaceName() string {
return d.Source.GetKeyspaceName()
}
// GetTableName implements the Primitive interface.
// It returns the table name reported by Source.
func (d *Distinct) GetTableName() string {
return d.Source.GetTableName()
}
// GetFields implements the Primitive interface.
// It forwards the call to Source and returns its result and error unchanged;
// no truncation is applied here.
func (d *Distinct) GetFields(ctx context.Context, vcursor VCursor, bindVars map[string]*querypb.BindVariable) (*sqltypes.Result, error) {
return d.Source.GetFields(ctx, vcursor, bindVars)
}
// NeedsTransaction implements the Primitive interface.
// It returns whatever Source reports.
func (d *Distinct) NeedsTransaction() bool {
return d.Source.NeedsTransaction()
}
// Inputs implements the Primitive interface.
// Distinct has exactly one input: Source.
func (d *Distinct) Inputs() []Primitive {
return []Primitive{d.Source}
}
// description builds the PrimitiveDescription for this Distinct.
//
// The operator type is "Distinct". Other holds:
//   - "Collations": the String() form of every CheckCol, present only when
//     there is at least one CheckCol;
//   - "ResultColumns": the Truncate value, present only when it is > 0.
func (d *Distinct) description() PrimitiveDescription {
	other := make(map[string]any)

	if len(d.CheckCols) > 0 {
		var colls []string
		for i := range d.CheckCols {
			colls = append(colls, d.CheckCols[i].String())
		}
		other["Collations"] = colls
	}

	if d.Truncate > 0 {
		other["ResultColumns"] = d.Truncate
	}

	return PrimitiveDescription{
		OperatorType: "Distinct",
		Other:        other,
	}
}
// SwitchToWeightString returns a new CheckCol that works on the weight string
// column instead: its Col is the receiver's weight string offset, it has no
// weight string column of its own, and it is treated as VarBinary with the
// binary collation.
//
// The receiver must have a non-nil WsCol; callers check this before switching.
func (cc CheckCol) SwitchToWeightString() CheckCol {
	var ws CheckCol // WsCol stays nil: there is no further fallback
	ws.Col = *cc.WsCol
	ws.Type = sqltypes.VarBinary
	ws.Collation = collations.CollationBinaryID
	return ws
}
// String renders the CheckCol for plan descriptions.
//
// The column part is "<Col>" or, when a weight string column is set,
// "(<Col>:<WsCol>)". For text types with a known collation, ": <collation
// name>" is appended.
func (cc CheckCol) String() string {
	var suffix string
	if cc.Collation != collations.Unknown && sqltypes.IsText(cc.Type) {
		suffix = ": " + cc.Collation.Get().Name()
	}

	if cc.WsCol != nil {
		return fmt.Sprintf("(%d:%d)", cc.Col, *cc.WsCol) + suffix
	}
	return fmt.Sprintf("%d", cc.Col) + suffix
}