-
Notifications
You must be signed in to change notification settings - Fork 5.7k
/
index_merge_tmp.go
400 lines (355 loc) · 12.4 KB
/
index_merge_tmp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"bytes"
"context"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl/logutil"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/parser/model"
driver "github.com/pingcap/tidb/pkg/store/driver/txn"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
kvutil "github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)
func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(
txn kv.Transaction,
idxRecords []*temporaryIndexRecord,
) error {
if !w.currentIndex.Unique {
// non-unique key need no check, just overwrite it,
// because in most case, backfilling indices is not exists.
return nil
}
batchVals, err := txn.BatchGet(context.Background(), w.originIdxKeys)
if err != nil {
return errors.Trace(err)
}
for i, key := range w.originIdxKeys {
if val, found := batchVals[string(key)]; found {
// Found a value in the original index key.
err := checkTempIndexKey(txn, idxRecords[i], val, w.table)
if err != nil {
if kv.ErrKeyExists.Equal(err) {
return driver.ExtractKeyExistsErrFromIndex(key, val, w.table.Meta(), w.currentIndex.ID)
}
return errors.Trace(err)
}
} else if idxRecords[i].distinct {
// The keys in w.batchCheckKeys also maybe duplicate,
// so we need to backfill the not found key into `batchVals` map.
batchVals[string(key)] = idxRecords[i].vals
}
}
return nil
}
func checkTempIndexKey(txn kv.Transaction, tmpRec *temporaryIndexRecord, originIdxVal []byte, tblInfo table.Table) error {
if !tmpRec.delete {
if tmpRec.distinct && !bytes.Equal(originIdxVal, tmpRec.vals) {
return kv.ErrKeyExists
}
// The key has been found in the original index, skip merging it.
tmpRec.skip = true
return nil
}
// Delete operation.
distinct := tablecodec.IndexKVIsUnique(originIdxVal)
if !distinct {
// For non-distinct key, it is consist of a null value and the handle.
// Same as the non-unique indexes, replay the delete operation on non-distinct keys.
return nil
}
// For distinct index key values, prevent deleting an unexpected index KV in original index.
hdInVal, err := tablecodec.DecodeHandleInUniqueIndexValue(originIdxVal, tblInfo.Meta().IsCommonHandle)
if err != nil {
return errors.Trace(err)
}
if !tmpRec.handle.Equal(hdInVal) {
// The inequality means multiple modifications happened in the same key.
// We use the handle in origin index value to check if the row exists.
rowKey := tablecodec.EncodeRecordKey(tblInfo.RecordPrefix(), hdInVal)
_, err := txn.Get(context.Background(), rowKey)
if err != nil {
if kv.IsErrNotFound(err) {
// The row is deleted, so we can merge the delete operation to the origin index.
tmpRec.skip = false
return nil
}
// Unexpected errors.
return errors.Trace(err)
}
// Don't delete the index key if the row exists.
tmpRec.skip = true
return nil
}
return nil
}
// temporaryIndexRecord is the record information of an index.
type temporaryIndexRecord struct {
vals []byte
skip bool // skip indicates that the index key is already exists, we should not add it.
delete bool
unique bool
distinct bool
handle kv.Handle
}
type mergeIndexWorker struct {
*backfillCtx
indexes []table.Index
tmpIdxRecords []*temporaryIndexRecord
originIdxKeys []kv.Key
tmpIdxKeys []kv.Key
needValidateKey bool
currentTempIndexPrefix []byte
currentIndex *model.IndexInfo
}
func newMergeTempIndexWorker(bfCtx *backfillCtx, t table.PhysicalTable, elements []*meta.Element) *mergeIndexWorker {
allIndexes := make([]table.Index, 0, len(elements))
for _, elem := range elements {
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, elem.ID)
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
allIndexes = append(allIndexes, index)
}
return &mergeIndexWorker{
backfillCtx: bfCtx,
indexes: allIndexes,
}
}
func (w *mergeIndexWorker) validateTaskRange(taskRange *reorgBackfillTask) (skip bool, err error) {
tmpID, err := tablecodec.DecodeIndexID(taskRange.startKey)
if err != nil {
return false, err
}
startIndexID := tmpID & tablecodec.IndexIDMask
tmpID, err = tablecodec.DecodeIndexID(taskRange.endKey)
if err != nil {
return false, err
}
endIndexID := tmpID & tablecodec.IndexIDMask
w.needValidateKey = startIndexID != endIndexID
containsTargetID := false
for _, idx := range w.indexes {
idxInfo := idx.Meta()
if idxInfo.ID == startIndexID {
containsTargetID = true
w.currentIndex = idxInfo
break
}
if idxInfo.ID == endIndexID {
containsTargetID = true
}
}
return !containsTargetID, nil
}
// BackfillData merge temp index data in txn.
func (w *mergeIndexWorker) BackfillData(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
skip, err := w.validateTaskRange(&taskRange)
if skip || err != nil {
return taskCtx, err
}
oprStartTime := time.Now()
ctx := kv.WithInternalSourceAndTaskType(context.Background(), w.jobContext.ddlJobSourceType(), kvutil.ExplicitTypeDDL)
errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(_ context.Context, txn kv.Transaction) error {
taskCtx.addedCount = 0
taskCtx.scanCount = 0
updateTxnEntrySizeLimitIfNeeded(txn)
txn.SetOption(kv.Priority, taskRange.priority)
if tagger := w.GetCtx().getResourceGroupTaggerForTopSQL(taskRange.getJobID()); tagger != nil {
txn.SetOption(kv.ResourceGroupTagger, tagger)
}
txn.SetOption(kv.ResourceGroupName, w.jobContext.resourceGroupName)
tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange)
if err != nil {
return errors.Trace(err)
}
taskCtx.nextKey = nextKey
taskCtx.done = taskDone
err = w.batchCheckTemporaryUniqueKey(txn, tmpIdxRecords)
if err != nil {
return errors.Trace(err)
}
for i, idxRecord := range tmpIdxRecords {
taskCtx.scanCount++
// The index is already exists, we skip it, no needs to backfill it.
// The following update, delete, insert on these rows, TiDB can handle it correctly.
// If all batch are skipped, update first index key to make txn commit to release lock.
if idxRecord.skip {
continue
}
// Lock the corresponding row keys so that it doesn't modify the index KVs
// that are changing by a pessimistic transaction.
rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
if err != nil {
return errors.Trace(err)
}
if idxRecord.delete {
if idxRecord.unique {
err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
} else {
err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
}
} else {
err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
}
if err != nil {
return err
}
taskCtx.addedCount++
}
return nil
})
failpoint.Inject("mockDMLExecutionMerging", func(val failpoint.Value) {
//nolint:forcetypeassert
if val.(bool) && MockDMLExecutionMerging != nil {
MockDMLExecutionMerging()
}
})
logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000)
return
}
func (*mergeIndexWorker) AddMetricInfo(float64) {
}
func (*mergeIndexWorker) String() string {
return typeAddIndexMergeTmpWorker.String()
}
func (w *mergeIndexWorker) GetCtx() *backfillCtx {
return w.backfillCtx
}
func (w *mergeIndexWorker) prefixIsChanged(newKey kv.Key) bool {
return len(w.currentTempIndexPrefix) == 0 || !bytes.HasPrefix(newKey, w.currentTempIndexPrefix)
}
func (w *mergeIndexWorker) updateCurrentIndexInfo(newIndexKey kv.Key) (skip bool, err error) {
tempIdxID, err := tablecodec.DecodeIndexID(newIndexKey)
if err != nil {
return false, err
}
idxID := tablecodec.IndexIDMask & tempIdxID
var curIdx *model.IndexInfo
for _, idx := range w.indexes {
if idx.Meta().ID == idxID {
curIdx = idx.Meta()
}
}
if curIdx == nil {
// Index IDs are always increasing, but not always continuous:
// if DDL adds another index between these indexes, it is possible that:
// multi-schema add index IDs = [1, 2, 4, 5]
// another index ID = [3]
// If the new index get rollback, temp index 0xFFxxx03 may have dirty records.
// We should skip these dirty records.
return true, nil
}
pfx := tablecodec.CutIndexPrefix(newIndexKey)
w.currentTempIndexPrefix = kv.Key(pfx).Clone()
w.currentIndex = curIdx
return false, nil
}
func (w *mergeIndexWorker) fetchTempIndexVals(
txn kv.Transaction,
taskRange reorgBackfillTask,
) ([]*temporaryIndexRecord, kv.Key, bool, error) {
startTime := time.Now()
w.tmpIdxRecords = w.tmpIdxRecords[:0]
w.tmpIdxKeys = w.tmpIdxKeys[:0]
w.originIdxKeys = w.originIdxKeys[:0]
// taskDone means that the merged handle is out of taskRange.endHandle.
taskDone := false
oprStartTime := startTime
idxPrefix := w.table.IndexPrefix()
var lastKey kv.Key
err := iterateSnapshotKeys(w.jobContext, w.sessCtx.GetStore(), taskRange.priority, idxPrefix, txn.StartTS(),
taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) {
oprEndTime := time.Now()
logSlowOperations(oprEndTime.Sub(oprStartTime), "iterate temporary index in merge process", 0)
oprStartTime = oprEndTime
taskDone = indexKey.Cmp(taskRange.endKey) >= 0
if taskDone || len(w.tmpIdxRecords) >= w.batchCnt {
return false, nil
}
if w.needValidateKey && w.prefixIsChanged(indexKey) {
skip, err := w.updateCurrentIndexInfo(indexKey)
if err != nil || skip {
return skip, err
}
}
tempIdxVal, err := tablecodec.DecodeTempIndexValue(rawValue)
if err != nil {
return false, err
}
tempIdxVal, err = decodeTempIndexHandleFromIndexKV(indexKey, tempIdxVal, len(w.currentIndex.Columns))
if err != nil {
return false, err
}
tempIdxVal = tempIdxVal.FilterOverwritten()
// Extract the operations on the original index and replay them later.
for _, elem := range tempIdxVal {
if elem.KeyVer == tables.TempIndexKeyTypeMerge || elem.KeyVer == tables.TempIndexKeyTypeDelete {
// For 'm' version kvs, they are double-written.
// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
continue
}
originIdxKey := make([]byte, len(indexKey))
copy(originIdxKey, indexKey)
tablecodec.TempIndexKey2IndexKey(originIdxKey)
idxRecord := &temporaryIndexRecord{
handle: elem.Handle,
delete: elem.Delete,
unique: elem.Distinct,
skip: false,
}
if !elem.Delete {
idxRecord.vals = elem.Value
idxRecord.distinct = tablecodec.IndexKVIsUnique(elem.Value)
}
w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey)
}
lastKey = indexKey
return true, nil
})
if len(w.tmpIdxRecords) == 0 {
taskDone = true
}
var nextKey kv.Key
if taskDone {
nextKey = taskRange.endKey
} else {
nextKey = lastKey
}
logutil.DDLLogger().Debug("merge temp index txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()),
zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
return w.tmpIdxRecords, nextKey.Next(), taskDone, errors.Trace(err)
}
func decodeTempIndexHandleFromIndexKV(indexKey kv.Key, tmpVal tablecodec.TempIndexValue, idxColLen int) (ret tablecodec.TempIndexValue, err error) {
for _, elem := range tmpVal {
if elem.Handle == nil {
// If the handle is not found in the value of the temp index, it means
// 1) This is not a deletion marker, the handle is in the key or the origin value.
// 2) This is a deletion marker, but the handle is in the key of temp index.
elem.Handle, err = tablecodec.DecodeIndexHandle(indexKey, elem.Value, idxColLen)
if err != nil {
return nil, err
}
}
}
return tmpVal, nil
}