-
Notifications
You must be signed in to change notification settings - Fork 5.7k
/
index_merge_tmp.go
287 lines (255 loc) · 9.41 KB
/
index_merge_tmp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"bytes"
"context"
"time"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
// IsEnableFastReorg checks whether the fast-reorg feature is enabled,
// as controlled by the tidb_ddl_enable_fast_reorg system variable.
func IsEnableFastReorg() bool {
	return variable.EnableFastReorg.Load()
}
// batchCheckTemporaryUniqueKey checks the current batch of temp-index records
// against the origin index for uniqueness violations. For records whose key is
// already present in the origin index, checkTempIndexKey decides whether the
// record is a duplicate error or can simply be skipped. Non-unique indexes are
// exempt: their entries can be overwritten unconditionally.
func (w *mergeIndexWorker) batchCheckTemporaryUniqueKey(txn kv.Transaction, idxRecords []*temporaryIndexRecord) error {
	if !w.index.Meta().Unique {
		// Non-unique keys need no check and can just be overwritten,
		// because in most cases the backfilled index entries do not exist yet.
		return nil
	}
	existingVals, err := txn.BatchGet(context.Background(), w.originIdxKeys)
	if err != nil {
		return errors.Trace(err)
	}
	for i, origKey := range w.originIdxKeys {
		rec := idxRecords[i]
		val, found := existingVals[string(origKey)]
		switch {
		case found:
			// The key already exists in the origin index; resolve the conflict.
			if err := checkTempIndexKey(txn, rec, val, w.table); err != nil {
				return errors.Trace(err)
			}
		case rec.distinct:
			// Keys within one batch may repeat, so record this not-found key's
			// value in the map to check later duplicates in the same batch.
			existingVals[string(origKey)] = rec.vals
		}
	}
	return nil
}
// checkTempIndexKey resolves a conflict between one temp-index record and an
// existing origin-index entry with the same key. It may set tmpRec.skip to
// drop the record from the merge, or return kv.ErrKeyExists on a genuine
// unique-key violation. originIdxVal is the value found in the origin index.
func checkTempIndexKey(txn kv.Transaction, tmpRec *temporaryIndexRecord, originIdxVal []byte, tblInfo table.Table) error {
	// Case 1: the record is an insert/update.
	if !tmpRec.delete {
		if tmpRec.distinct && !bytes.Equal(originIdxVal, tmpRec.vals) {
			// A different distinct value already occupies this unique key.
			return kv.ErrKeyExists
		}
		// The same entry is already present in the origin index; skip merging it.
		tmpRec.skip = true
		return nil
	}

	// Case 2: the record is a delete.
	if !tablecodec.IndexKVIsUnique(originIdxVal) {
		// A non-distinct key consists of a null value plus the handle.
		// As with non-unique indexes, simply replay the delete on it.
		return nil
	}

	// For distinct index values, guard against deleting an unexpected index KV
	// in the origin index: decode the handle stored in the existing value.
	originHandle, err := tablecodec.DecodeHandleInUniqueIndexValue(originIdxVal, tblInfo.Meta().IsCommonHandle)
	if err != nil {
		return errors.Trace(err)
	}
	if tmpRec.handle.Equal(originHandle) {
		// Handles match; the delete targets exactly this entry.
		return nil
	}

	// Handles differ, which means the key was modified multiple times.
	// Use the handle from the origin index value to check whether that row
	// still exists before deciding.
	rowKey := tablecodec.EncodeRecordKey(tblInfo.RecordPrefix(), originHandle)
	if _, err := txn.Get(context.Background(), rowKey); err != nil {
		if kv.IsErrNotFound(err) {
			// The row is gone, so the delete can be merged into the origin index.
			tmpRec.skip = false
			return nil
		}
		// Unexpected errors.
		return errors.Trace(err)
	}
	// The row still exists; do not delete its index entry.
	tmpRec.skip = true
	return nil
}
// temporaryIndexRecord is the record information of an index entry decoded
// from the temporary index, pending merge into the origin index.
type temporaryIndexRecord struct {
	vals     []byte    // encoded index value to write into the origin index; empty for deletion markers
	skip     bool      // skip indicates that the index key already exists; we should not add it again.
	delete   bool      // true when this record replays a deletion on the origin index
	unique   bool      // uniqueness flag decoded from the temp index value
	distinct bool      // result of tablecodec.IndexKVIsUnique on vals (set only for non-delete records)
	handle   kv.Handle // handle of the row this index entry points to
	rowKey   kv.Key    // NOTE(review): appears unused within this file — confirm before removing.
}
// mergeIndexWorker merges entries from a temporary index back into the origin
// index during an add-index reorganization.
type mergeIndexWorker struct {
	*backfillWorker
	index         table.Index              // the origin index being merged into
	tmpIdxRecords []*temporaryIndexRecord  // per-batch decoded records, refilled by fetchTempIndexVals
	originIdxKeys []kv.Key                 // origin index keys, parallel to tmpIdxRecords
	tmpIdxKeys    []kv.Key                 // temporary index keys, parallel to tmpIdxRecords
	jobContext    *JobContext
}
// newMergeTempIndexWorker creates a mergeIndexWorker for the index identified
// by reorgInfo.currElement.ID on the given physical table.
func newMergeTempIndexWorker(sessCtx sessionctx.Context, id int, t table.PhysicalTable, reorgInfo *reorgInfo, jc *JobContext) *mergeIndexWorker {
	tblMeta := t.Meta()
	idxInfo := model.FindIndexInfoByID(tblMeta.Indices, reorgInfo.currElement.ID)
	mergingIdx := tables.NewIndex(t.GetPhysicalID(), tblMeta, idxInfo)
	worker := &mergeIndexWorker{
		backfillWorker: newBackfillWorker(jc.ddlJobCtx, sessCtx, id, t, reorgInfo, typeAddIndexMergeTmpWorker),
		index:          mergingIdx,
		jobContext:     jc,
	}
	return worker
}
// BackfillDataInTxn merges one task range of temporary index data into the
// origin index inside a single transaction. It returns the per-task context
// (scan/added counts, next key, done flag) and any transaction error.
func (w *mergeIndexWorker) BackfillDataInTxn(taskRange reorgBackfillTask) (taskCtx backfillTaskContext, errInTxn error) {
	oprStartTime := time.Now()
	ctx := kv.WithInternalSourceType(context.Background(), w.jobContext.ddlJobSourceType())
	errInTxn = kv.RunInNewTxn(ctx, w.sessCtx.GetStore(), true, func(ctx context.Context, txn kv.Transaction) error {
		taskCtx.addedCount = 0
		taskCtx.scanCount = 0
		txn.SetOption(kv.Priority, w.priority)
		if tagger := w.reorgInfo.d.getResourceGroupTaggerForTopSQL(w.reorgInfo.Job); tagger != nil {
			txn.SetOption(kv.ResourceGroupTagger, tagger)
		}
		// Fetch up to one batch of temp index records within the task range.
		tmpIdxRecords, nextKey, taskDone, err := w.fetchTempIndexVals(txn, taskRange)
		if err != nil {
			return errors.Trace(err)
		}
		taskCtx.nextKey = nextKey
		taskCtx.done = taskDone
		// Mark records that must be skipped (e.g. duplicates on unique indexes).
		err = w.batchCheckTemporaryUniqueKey(txn, tmpIdxRecords)
		if err != nil {
			return errors.Trace(err)
		}
		for i, idxRecord := range tmpIdxRecords {
			taskCtx.scanCount++
			// The index entry already exists; no need to backfill it.
			// Later updates, deletes, and inserts on these rows are handled by TiDB correctly.
			// NOTE(review): an earlier comment claimed the first index key is updated when
			// the whole batch is skipped, but no such write is visible here — confirm.
			if idxRecord.skip {
				continue
			}
			// Lock the corresponding row key so that a concurrent pessimistic
			// transaction cannot modify the index KVs being merged.
			rowKey := tablecodec.EncodeRecordKey(w.table.RecordPrefix(), idxRecord.handle)
			err := txn.LockKeys(context.Background(), new(kv.LockCtx), rowKey)
			if err != nil {
				return errors.Trace(err)
			}
			if idxRecord.delete {
				if idxRecord.unique {
					// Unique deletes must carry the need-locked flag.
					err = txn.GetMemBuffer().DeleteWithFlags(w.originIdxKeys[i], kv.SetNeedLocked)
				} else {
					err = txn.GetMemBuffer().Delete(w.originIdxKeys[i])
				}
			} else {
				err = txn.GetMemBuffer().Set(w.originIdxKeys[i], idxRecord.vals)
			}
			if err != nil {
				return err
			}
			taskCtx.addedCount++
		}
		return nil
	})
	logSlowOperations(time.Since(oprStartTime), "AddIndexMergeDataInTxn", 3000)
	return
}
// AddMetricInfo is intentionally a no-op: the merge worker records no metrics.
// (Presumably present to satisfy the backfill worker interface — confirm against callers.)
func (w *mergeIndexWorker) AddMetricInfo(cnt float64) {
}
// fetchTempIndexVals scans the temporary index within taskRange at the txn's
// start timestamp and decodes up to w.batchCnt records into the worker's
// scratch buffers. It returns the fetched records, the key to resume the next
// batch from, and whether the task range has been fully consumed.
func (w *mergeIndexWorker) fetchTempIndexVals(txn kv.Transaction, taskRange reorgBackfillTask) ([]*temporaryIndexRecord, kv.Key, bool, error) {
	startTime := time.Now()
	// Reset the scratch buffers, keeping their capacity across batches.
	w.tmpIdxRecords = w.tmpIdxRecords[:0]
	w.tmpIdxKeys = w.tmpIdxKeys[:0]
	w.originIdxKeys = w.originIdxKeys[:0]
	// taskDone means that the merged handle is out of taskRange.endHandle.
	taskDone := false
	oprStartTime := startTime
	idxPrefix := w.table.IndexPrefix()
	var lastKey kv.Key
	isCommonHandle := w.table.Meta().IsCommonHandle
	err := iterateSnapshotKeys(w.reorgInfo.d.jobContext(w.reorgInfo.Job), w.sessCtx.GetStore(), w.priority, idxPrefix, txn.StartTS(),
		taskRange.startKey, taskRange.endKey, func(_ kv.Handle, indexKey kv.Key, rawValue []byte) (more bool, err error) {
			oprEndTime := time.Now()
			logSlowOperations(oprEndTime.Sub(oprStartTime), "iterate temporary index in merge process", 0)
			oprStartTime = oprEndTime
			// Stop scanning once the key passes the end of the task range,
			// or once the batch is full.
			if taskRange.endInclude {
				taskDone = indexKey.Cmp(taskRange.endKey) > 0
			} else {
				taskDone = indexKey.Cmp(taskRange.endKey) >= 0
			}
			if taskDone || len(w.tmpIdxRecords) >= w.batchCnt {
				return false, nil
			}
			originVal, handle, isDelete, unique, keyVer := tablecodec.DecodeTempIndexValue(rawValue, isCommonHandle)
			if keyVer == tables.TempIndexKeyTypeMerge || keyVer == tables.TempIndexKeyTypeDelete {
				// For 'm' version kvs, they are double-written.
				// For 'd' version kvs, they are written in the delete-only state and can be dropped safely.
				return true, nil
			}
			if handle == nil {
				// If the handle is not found in the value of the temp index, it means
				// 1) This is not a deletion marker: the handle is in the key or the origin value.
				// 2) This is a deletion marker, but the handle is in the key of the temp index.
				handle, err = tablecodec.DecodeIndexHandle(indexKey, originVal, len(w.index.Meta().Columns))
				if err != nil {
					return false, err
				}
			}
			// Convert the temp index key into the corresponding origin index key.
			originIdxKey := make([]byte, len(indexKey))
			copy(originIdxKey, indexKey)
			tablecodec.TempIndexKey2IndexKey(w.index.Meta().ID, originIdxKey)
			idxRecord := &temporaryIndexRecord{
				handle: handle,
				delete: isDelete,
				unique: unique,
				skip:   false,
			}
			if !isDelete {
				// Only non-delete records carry a value to write into the origin index.
				idxRecord.vals = originVal
				idxRecord.distinct = tablecodec.IndexKVIsUnique(originVal)
			}
			w.tmpIdxRecords = append(w.tmpIdxRecords, idxRecord)
			w.originIdxKeys = append(w.originIdxKeys, originIdxKey)
			w.tmpIdxKeys = append(w.tmpIdxKeys, indexKey)
			lastKey = indexKey
			return true, nil
		})

	if len(w.tmpIdxRecords) == 0 {
		// An empty batch means nothing remains in the range.
		taskDone = true
	}
	// Resume after the last fetched key, or jump to the range end when done.
	var nextKey kv.Key
	if taskDone {
		nextKey = taskRange.endKey
	} else {
		nextKey = lastKey
	}
	logutil.BgLogger().Debug("[ddl] merge temp index txn fetches handle info", zap.Uint64("txnStartTS", txn.StartTS()),
		zap.String("taskRange", taskRange.String()), zap.Duration("takeTime", time.Since(startTime)))
	return w.tmpIdxRecords, nextKey.Next(), taskDone, errors.Trace(err)
}