forked from vitessio/vitess
-
Notifications
You must be signed in to change notification settings - Fork 2
/
chunk.go
197 lines (174 loc) · 7.31 KB
/
chunk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package worker
import (
"fmt"
"github.com/xsec-lab/go/vt/vterrors"
"golang.org/x/net/context"
"github.com/xsec-lab/go/sqlescape"
"github.com/xsec-lab/go/sqltypes"
"github.com/xsec-lab/go/vt/topo/topoproto"
"github.com/xsec-lab/go/vt/wrangler"
tabletmanagerdatapb "github.com/xsec-lab/go/vt/proto/tabletmanagerdata"
topodatapb "github.com/xsec-lab/go/vt/proto/topodata"
)
var (
	// completeChunk is chunk 1 of 1 with neither a start nor an end bound,
	// i.e. it stands for the whole table.
	completeChunk = chunk{start: sqltypes.NULL, end: sqltypes.NULL, number: 1, total: 1}
	// singleCompleteChunk is the chunk list handed out by generateChunks
	// whenever a table is not split up.
	singleCompleteChunk = []chunk{completeChunk}
)
// chunk describes one slice of a table that is processed as a unit.
// A chunk covers the rows in the half-open range [start, end), where both
// boundaries are values of the first primary key column.
// For a non-numeric column both boundaries are sqltypes.NULL.
type chunk struct {
	// start is the inclusive lower bound and end the exclusive upper bound
	// of the chunk.
	start, end sqltypes.Value
	// number is the 1-based position of this chunk among all "total" chunks.
	number int
	// total is the overall number of chunks in the set this chunk is part of.
	total int
}
// String formats the chunk as "<number>/<total>" for use in log messages.
func (c chunk) String() string {
	// The chunk number is left-padded to the width of "total" so that log
	// lines of all chunks line up.
	return fmt.Sprintf("%*d/%d", digits(c.total), c.number, c.total)
}
// digits returns the number of decimal digits of i. A sign is not counted
// and 0 has one digit.
func digits(i int) int {
	count := 1
	// Every division by ten that leaves a non-zero remainder of the number
	// accounts for one more digit.
	for i /= 10; i != 0; i /= 10 {
		count++
	}
	return count
}
// generateChunks returns an array of chunks to use for splitting up a table
// into multiple data chunks. It only works for tables with a primary key
// whose first column is a numeric type.
//
// The MIN and MAX of the first primary key column are fetched from "tablet"
// and the range between them is divided into "chunkCount" equally sized
// intervals. "chunkCount" is reduced when necessary so that the average chunk
// holds at least "minRowsPerChunk" rows (based on td.RowCount).
//
// A single chunk covering the complete table is returned (without an error)
// when the table cannot or should not be split: it has no primary key, it has
// too few rows, chunkCount is 1 or less, MIN or MAX is NULL or cannot be
// converted into a native Go value, the column is not numeric, or the
// computed interval is zero.
//
// An error is returned if the MIN/MAX query fails, if it does not return
// exactly one row, or if a computed chunk boundary cannot be converted into a
// sqltypes.Value.
func generateChunks(ctx context.Context, wr *wrangler.Wrangler, tablet *topodatapb.Tablet, td *tabletmanagerdatapb.TableDefinition, chunkCount, minRowsPerChunk int) ([]chunk, error) {
	if len(td.PrimaryKeyColumns) == 0 {
		// No explicit primary key. Cannot chunk the rows then.
		wr.Logger().Infof("table=%v: Not splitting the table into multiple chunks because it has no primary key columns. This will reduce the performance of the clone.", td.Name)
		return singleCompleteChunk, nil
	}
	if td.RowCount < 2*uint64(minRowsPerChunk) {
		// The automatic adjustment of "chunkCount" based on "minRowsPerChunk"
		// below would set "chunkCount" to less than 2 i.e. 1 or 0 chunks.
		// In practice in this case there should be exactly one chunk.
		// Return early in this case and notify the user about this.
		wr.Logger().Infof("table=%v: Not splitting the table into multiple chunks because it has only %d rows.", td.Name, td.RowCount)
		return singleCompleteChunk, nil
	}
	if chunkCount <= 1 {
		// Nothing to split. This also rejects zero and negative counts which
		// would otherwise lead to a division by zero or to an invalid slice
		// length further down.
		return singleCompleteChunk, nil
	}

	// Get the MIN and MAX of the leading column of the primary key.
	query := fmt.Sprintf("SELECT MIN(%v), MAX(%v) FROM %v.%v", sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(td.PrimaryKeyColumns[0]), sqlescape.EscapeID(topoproto.TabletDbName(tablet)), sqlescape.EscapeID(td.Name))
	shortCtx, cancel := context.WithTimeout(ctx, *remoteActionsTimeout)
	qr, err := wr.TabletManagerClient().ExecuteFetchAsApp(shortCtx, tablet, true, []byte(query), 1)
	cancel()
	if err != nil {
		return nil, vterrors.Wrapf(err, "tablet: %v, table: %v: cannot determine MIN and MAX of the first primary key column. ExecuteFetchAsApp: %v", topoproto.TabletAliasString(tablet.Alias), td.Name, err)
	}
	if len(qr.Rows) != 1 {
		return nil, fmt.Errorf("tablet: %v, table: %v: cannot determine MIN and MAX of the first primary key column. Zero rows were returned", topoproto.TabletAliasString(tablet.Alias), td.Name)
	}

	result := sqltypes.Proto3ToResult(qr)
	minValue, err := sqltypes.ToNative(result.Rows[0][0])
	if err != nil {
		// Chunking is only an optimization: fall back to a single chunk
		// instead of failing, but do not hide the conversion problem.
		wr.Logger().Infof("table=%v: Not splitting the table into multiple chunks, cannot convert min to a native value: %v", td.Name, err)
		return singleCompleteChunk, nil
	}
	maxValue, err := sqltypes.ToNative(result.Rows[0][1])
	if err != nil {
		wr.Logger().Infof("table=%v: Not splitting the table into multiple chunks, cannot convert max to a native value: %v", td.Name, err)
		return singleCompleteChunk, nil
	}
	if minValue == nil || maxValue == nil {
		wr.Logger().Infof("table=%v: Not splitting the table into multiple chunks, min or max is NULL: %v", td.Name, qr.Rows[0])
		return singleCompleteChunk, nil
	}

	// Determine the average number of rows per chunk for the given chunkCount.
	avgRowsPerChunk := td.RowCount / uint64(chunkCount)
	if avgRowsPerChunk < uint64(minRowsPerChunk) {
		// Reduce the chunkCount to fulfill minRowsPerChunk.
		newChunkCount := td.RowCount / uint64(minRowsPerChunk)
		wr.Logger().Infof("table=%v: Reducing the number of chunks from the default %d to %d to make sure that each chunk has at least %d rows.", td.Name, chunkCount, newChunkCount, minRowsPerChunk)
		chunkCount = int(newChunkCount)
	}

	// TODO(mberlin): Write a unit test for this part of the function.
	//
	// Compute the width of one chunk. "interval" stays nil when that width is
	// zero. The zero check must be done on the typed value: comparing the
	// interface{} against the untyped constant 0 would compare against an
	// "int" and therefore never match an int64, uint64 or float64 zero.
	//
	// NOTE(review): the assertions on maxValue assume that MIN and MAX of the
	// same column are converted to the same Go type - confirm.
	var interval interface{}
	switch lower := minValue.(type) {
	case int64:
		if step := (maxValue.(int64) - lower) / int64(chunkCount); step != 0 {
			interval = step
		}
	case uint64:
		if step := (maxValue.(uint64) - lower) / uint64(chunkCount); step != 0 {
			interval = step
		}
	case float64:
		if step := (maxValue.(float64) - lower) / float64(chunkCount); step != 0 {
			interval = step
		}
	default:
		wr.Logger().Infof("table=%v: Not splitting the table into multiple chunks, primary key not numeric.", td.Name)
		return singleCompleteChunk, nil
	}
	if interval == nil {
		wr.Logger().Infof("table=%v: Not splitting the table into multiple chunks, interval=0: %v to %v", td.Name, minValue, maxValue)
		return singleCompleteChunk, nil
	}

	// Create chunks. Each chunk starts where the previous one ended.
	chunks := make([]chunk, chunkCount)
	start := minValue
	for i := 0; i < chunkCount; i++ {
		end := add(start, interval)
		c, err := toChunk(start, end, i+1, chunkCount)
		if err != nil {
			return nil, vterrors.Wrapf(err, "tablet: %v, table: %v", topoproto.TabletAliasString(tablet.Alias), td.Name)
		}
		chunks[i] = c
		start = end
	}

	// Clear out the MIN and MAX on the first and last chunk respectively
	// because other shards might have smaller or higher values than the one we
	// looked at.
	chunks[0].start = sqltypes.NULL
	chunks[chunkCount-1].end = sqltypes.NULL
	return chunks, nil
}
// add returns start+interval. Both arguments must hold the same dynamic
// type, which has to be one of int64, uint64 or float64; the result has that
// type as well.
// It panics if start holds any other type, or if interval does not hold the
// same type as start.
func add(start, interval interface{}) interface{} {
	if base, ok := start.(int64); ok {
		return base + interval.(int64)
	}
	if base, ok := start.(uint64); ok {
		return base + interval.(uint64)
	}
	if base, ok := start.(float64); ok {
		return base + interval.(float64)
	}
	panic(fmt.Sprintf("unsupported type %T for interval start: %v", start, start))
}
// toChunk builds the chunk "number" out of "total" from the given boundaries.
// Both boundaries are converted with sqltypes.InterfaceToValue; if either
// conversion fails, the zero chunk and the wrapped error are returned.
func toChunk(start, end interface{}, number, total int) (chunk, error) {
	var (
		result chunk
		err    error
	)
	if result.start, err = sqltypes.InterfaceToValue(start); err != nil {
		return chunk{}, vterrors.Wrapf(err, "failed to convert calculated start value (%v) into internal sqltypes.Value", start)
	}
	if result.end, err = sqltypes.InterfaceToValue(end); err != nil {
		return chunk{}, vterrors.Wrapf(err, "failed to convert calculated end value (%v) into internal sqltypes.Value", end)
	}
	result.number, result.total = number, total
	return result, nil
}