live_migration.go
/*
Copyright (c) YugabyteDB, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
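// live_migration.go implements the streaming (CDC) phase of data import: change events are
// read from the local event queue written by the export side, hashed onto NUM_EVENT_CHANNELS
// channels, and applied to the target database in batches by per-channel worker goroutines.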
import (
"context"
"database/sql"
"errors"
"fmt"
"hash/fnv"
"os"
"path/filepath"
"sort"
"time"
"github.com/google/uuid"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
"github.com/yugabyte/yb-voyager/yb-voyager/src/metadb"
"github.com/yugabyte/yb-voyager/yb-voyager/src/namereg"
reporter "github.com/yugabyte/yb-voyager/yb-voyager/src/reporter/stats"
"github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb"
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
"github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname"
)
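// Streaming tunables; defaults are set in init() and can be overridden via environment
// variables of the same name.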
var NUM_EVENT_CHANNELS int
var EVENT_CHANNEL_SIZE int // has to be > MAX_EVENTS_PER_BATCH
var MAX_EVENTS_PER_BATCH int
var MAX_INTERVAL_BETWEEN_BATCHES int //ms
var END_OF_QUEUE_SEGMENT_EVENT = &tgtdb.Event{Op: "end_of_source_queue_segment"}
var FLUSH_BATCH_EVENT = &tgtdb.Event{Op: "flush_batch"}
var eventQueue *EventQueue
var statsReporter *reporter.StreamImportStatsReporter
func init() {
NUM_EVENT_CHANNELS = utils.GetEnvAsInt("NUM_EVENT_CHANNELS", 100)
EVENT_CHANNEL_SIZE = utils.GetEnvAsInt("EVENT_CHANNEL_SIZE", 500)
MAX_EVENTS_PER_BATCH = utils.GetEnvAsInt("MAX_EVENTS_PER_BATCH", 500)
MAX_INTERVAL_BETWEEN_BATCHES = utils.GetEnvAsInt("MAX_INTERVAL_BETWEEN_BATCHES", 2000)
}
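// streamChanges continuously fetches event queue segments from the export directory and
// streams each one to the target until a cutover event relevant to this importer marks the
// end of the queue.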
func streamChanges(state *ImportDataState, tableNames []sqlname.NameTuple) error {
log.Infof("NUM_EVENT_CHANNELS: %d, EVENT_CHANNEL_SIZE: %d, MAX_EVENTS_PER_BATCH: %d, MAX_INTERVAL_BETWEEN_BATCHES: %d",
NUM_EVENT_CHANNELS, EVENT_CHANNEL_SIZE, MAX_EVENTS_PER_BATCH, MAX_INTERVAL_BETWEEN_BATCHES)
// Re-initialize the name registry in case it hasn't yet picked up the names registered on source/target/source-replica.
err := namereg.NameReg.Init()
if err != nil {
return fmt.Errorf("init name registry again: %v", err)
}
tdb.PrepareForStreaming()
err = state.InitLiveMigrationState(migrationUUID, NUM_EVENT_CHANNELS, bool(startClean), tableNames)
if err != nil {
utils.ErrExit("Failed to init event channels metadata table on target DB: %s", err)
}
eventChannelsMetaInfo, err := state.GetEventChannelsMetaInfo(migrationUUID)
if err != nil {
return fmt.Errorf("failed to fetch event channel meta info from target : %w", err)
}
numInserts, numUpdates, numDeletes, err := state.GetTotalNumOfEventsImportedByType(migrationUUID)
if err != nil {
return fmt.Errorf("failed to fetch import stats meta by type: %w", err)
}
statsReporter = reporter.NewStreamImportStatsReporter(importerRole)
err = statsReporter.Init(migrationUUID, metaDB, numInserts, numUpdates, numDeletes)
if err != nil {
return fmt.Errorf("failed to initialize stats reporter: %w", err)
}
if !disablePb {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go statsReporter.ReportStats(ctx)
defer statsReporter.Finalize()
}
eventQueue = NewEventQueue(exportDir)
// setup target event channels
var evChans []chan *tgtdb.Event
var processingDoneChans []chan bool
for i := 0; i < NUM_EVENT_CHANNELS; i++ {
evChans = append(evChans, make(chan *tgtdb.Event, EVENT_CHANNEL_SIZE))
processingDoneChans = append(processingDoneChans, make(chan bool, 1))
}
log.Infof("streaming changes from %s", eventQueue.QueueDirPath)
for !eventQueue.EndOfQueue { // continuously get next segments to stream
segment, err := eventQueue.GetNextSegment()
if err != nil {
if segment == nil && (errors.Is(err, os.ErrNotExist) || errors.Is(err, sql.ErrNoRows)) {
time.Sleep(2 * time.Second)
continue
}
return fmt.Errorf("error getting next segment to stream: %v", err)
}
log.Infof("got next segment to stream: %v", segment)
err = streamChangesFromSegment(segment, evChans, processingDoneChans, eventChannelsMetaInfo, statsReporter, state)
if err != nil {
return fmt.Errorf("error streaming changes for segment %s: %v", segment.FilePath, err)
}
}
return nil
}
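// prevExporterRole remembers the exporter role of the last event seen so that the conflict
// detection cache is (re)initialized whenever the exporting side changes, e.g. after cutover.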
var prevExporterRole = ""
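// streamChangesFromSegment opens a single queue segment, starts one processEvents worker per
// event channel, and routes every event in the segment to its channel until the segment is
// fully processed or a cutover event for this importer ends streaming.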
func streamChangesFromSegment(
segment *EventQueueSegment,
evChans []chan *tgtdb.Event,
processingDoneChans []chan bool,
eventChannelsMetaInfo map[int]EventChannelMetaInfo,
statsReporter *reporter.StreamImportStatsReporter,
state *ImportDataState) error {
err := segment.Open()
if err != nil {
return err
}
defer segment.Close()
// start target event channel processors
for i := 0; i < NUM_EVENT_CHANNELS; i++ {
var chanLastAppliedVsn int64
chanMetaInfo, exists := eventChannelsMetaInfo[i]
if exists {
chanLastAppliedVsn = chanMetaInfo.LastAppliedVsn
} else {
return fmt.Errorf("unable to find channel meta info for channel - %v", i)
}
go processEvents(i, evChans[i], chanLastAppliedVsn, processingDoneChans[i], statsReporter, state)
}
log.Infof("streaming changes for segment %s", segment.FilePath)
for !segment.IsProcessed() {
event, err := segment.NextEvent()
if err != nil {
return err
}
if event == nil && segment.IsProcessed() {
break
}
// Exporter role changed: segment switch or cutover (for example, the source changed from PG to YB).
if event != nil && prevExporterRole != event.ExporterRole {
/*
Note: `sourceDBType` is a global variable that always holds the initial source DB type,
which does not change even after cutover to target. For the conflict detection cache,
however, we need the DB type that is actually exporting at this moment, because information
such as TableToUniqueKeyColumns is saved during export (from source/target) and reused during import.
*/
sourceDBTypeForConflictCache := lo.Ternary(isTargetDBExporter(event.ExporterRole), "yugabytedb", sourceDBType)
err = initializeConflictDetectionCache(evChans, event.ExporterRole, sourceDBTypeForConflictCache)
if err != nil {
return fmt.Errorf("error initializing conflict detection cache: %w", err)
}
prevExporterRole = event.ExporterRole
}
if event.IsCutoverToTarget() && importerRole == TARGET_DB_IMPORTER_ROLE ||
event.IsCutoverToSourceReplica() && importerRole == SOURCE_REPLICA_DB_IMPORTER_ROLE ||
event.IsCutoverToSource() && importerRole == SOURCE_DB_IMPORTER_ROLE { // cutover or fall-forward command
eventQueue.EndOfQueue = true
segment.MarkProcessed()
break
}
err = handleEvent(event, evChans)
if err != nil {
return fmt.Errorf("error handling event: %v", err)
}
}
for i := 0; i < NUM_EVENT_CHANNELS; i++ {
evChans[i] <- END_OF_QUEUE_SEGMENT_EVENT
}
for i := 0; i < NUM_EVENT_CHANNELS; i++ {
<-processingDoneChans[i]
}
err = metaDB.MarkEventQueueSegmentAsProcessed(segment.SegmentNum, importerRole)
if err != nil {
return fmt.Errorf("error marking segment %s as processed: %v", segment.FilePath, err)
}
log.Infof("finished streaming changes from segment %s\n", filepath.Base(segment.FilePath))
return nil
}
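// shouldFormatValues reports whether the event's values need formatting for the target DB:
// only updates for YugabyteDB/PostgreSQL, all operations for Oracle.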
func shouldFormatValues(event *tgtdb.Event) bool {
switch tconf.TargetDBType {
case YUGABYTEDB, POSTGRESQL:
return event.Op == "u"
case ORACLE:
return true
}
return false
}
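// handleEvent records the event in the conflict detection cache when needed, converts its
// key/values for the target, and dispatches it to the event channel selected by hashEvent.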
func handleEvent(event *tgtdb.Event, evChans []chan *tgtdb.Event) error {
if event.IsCutoverEvent() {
// Return nil for cutover or fall_forward events that do not concern this importer.
return nil
}
log.Debugf("handling event: %v", event)
// Hash the event before running its keys/values through the value converter:
// the converter may produce different representations (formatted vs. unformatted) for the
// same key, which would change the hash value.
h := hashEvent(event)
/*
Checking for all possible conflicts among events.
For more details about ConflictDetectionCache, see the comment on line 11 in conflictDetectionCache.go.
*/
uniqueKeyCols, _ := conflictDetectionCache.tableToUniqueKeyColumns.Get(event.TableNameTup)
if len(uniqueKeyCols) > 0 {
if event.Op == "d" {
conflictDetectionCache.Put(event)
} else { // "i" or "u"
conflictDetectionCache.WaitUntilNoConflict(event)
if event.IsUniqueKeyChanged(uniqueKeyCols) {
conflictDetectionCache.Put(event)
}
}
}
// Convert the event's keys/values for the target using the streaming-mode value converter.
err := valueConverter.ConvertEvent(event, event.TableNameTup, shouldFormatValues(event))
if err != nil {
return fmt.Errorf("error transforming event key fields: %v", err)
}
evChans[h] <- event
log.Tracef("inserted event %v into channel %v", event.Vsn, h)
return nil
}
// hashEvent returns a channel index in the range [0, NUM_EVENT_CHANNELS).
func hashEvent(e *tgtdb.Event) int {
hash := fnv.New64a()
hash.Write([]byte(e.TableNameTup.ForKey()))
keyColumns := make([]string, 0)
for k := range e.Key {
keyColumns = append(keyColumns, k)
}
// sort to ensure input to hash is consistent.
sort.Strings(keyColumns)
for _, k := range keyColumns {
hash.Write([]byte(*e.Key[k]))
}
return int(hash.Sum64() % (uint64(NUM_EVENT_CHANNELS)))
}
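// processEvents is the per-channel worker: it accumulates events into batches bounded by
// MAX_EVENTS_PER_BATCH and MAX_INTERVAL_BETWEEN_BATCHES, skips events at or below the
// channel's last applied VSN, and executes each batch on the target with retries.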
func processEvents(chanNo int, evChan chan *tgtdb.Event, lastAppliedVsn int64, done chan bool, statsReporter *reporter.StreamImportStatsReporter, state *ImportDataState) {
endOfProcessing := false
for !endOfProcessing {
batch := []*tgtdb.Event{}
timer := time.NewTimer(time.Duration(MAX_INTERVAL_BETWEEN_BATCHES) * time.Millisecond)
Batching:
for {
// read from channel until MAX_EVENTS_PER_BATCH or MAX_INTERVAL_BETWEEN_BATCHES
select {
case event := <-evChan:
if event == END_OF_QUEUE_SEGMENT_EVENT {
endOfProcessing = true
break Batching
}
if event == FLUSH_BATCH_EVENT {
break Batching
}
if event.Vsn <= lastAppliedVsn {
log.Tracef("ignoring event %v because event vsn <= %v", event, lastAppliedVsn)
continue
}
if importerRole == SOURCE_DB_IMPORTER_ROLE && event.ExporterRole != TARGET_DB_EXPORTER_FB_ROLE {
log.Tracef("ignoring event %v because importer role is FB_DB_IMPORTER_ROLE and event exporter role is not TARGET_DB_EXPORTER_FB_ROLE.", event)
continue
}
batch = append(batch, event)
if len(batch) >= MAX_EVENTS_PER_BATCH {
break Batching
}
case <-timer.C:
break Batching
}
}
timer.Stop()
if len(batch) == 0 {
continue
}
start := time.Now()
eventBatch := tgtdb.NewEventBatch(batch, chanNo)
var err error
sleepIntervalSec := 0
for attempt := 0; attempt < EVENT_BATCH_MAX_RETRY_COUNT; attempt++ {
err = tdb.ExecuteBatch(migrationUUID, eventBatch)
if err == nil {
conflictDetectionCache.RemoveEvents(eventBatch)
break
} else if tdb.IsNonRetryableCopyError(err) {
break
}
log.Warnf("retriable error executing batch(%s) on channel %v (last VSN: %d): %v", eventBatch.ID(), chanNo, eventBatch.GetLastVsn(), err)
sleepIntervalSec += 10
if sleepIntervalSec > MAX_SLEEP_SECOND {
sleepIntervalSec = MAX_SLEEP_SECOND
}
log.Infof("sleep for %d seconds before retrying the batch on channel %v (attempt %d)",
sleepIntervalSec, chanNo, attempt)
time.Sleep(time.Duration(sleepIntervalSec) * time.Second)
// In certain situations, we get an error from `tdb.ExecuteBatch`, but the transaction is eventually committed.
// For example, on YugabyteDB a commit can fail with an `rpc timeout` and still eventually succeed on the YB server.
// Retrying an already executed batch has consequences:
// - It can fail with duplicate / unique key constraint errors.
// - Stats will double-count the events.
// Therefore, we check whether the batch has already been imported before retrying.
alreadyImported, aerr := checkifEventBatchAlreadyImported(state, eventBatch, migrationUUID)
if aerr != nil {
utils.ErrExit("error checking if event batch channel %d (last VSN: %d) already imported: %v", chanNo, eventBatch.GetLastVsn(), aerr)
}
if alreadyImported {
log.Infof("batch on channel %d (last VSN: %d) already imported", chanNo, eventBatch.GetLastVsn())
err = nil
break
}
}
if err != nil {
utils.ErrExit("error executing batch on channel %v: %v", chanNo, err)
}
statsReporter.BatchImported(eventBatch.EventCounts.NumInserts, eventBatch.EventCounts.NumUpdates, eventBatch.EventCounts.NumDeletes)
log.Debugf("processEvents from channel %v: Executed Batch of size - %d successfully in time %s",
chanNo, len(batch), time.Since(start).String())
}
done <- true
}
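// initializeConflictDetectionCache rebuilds the conflict detection cache from the
// table-to-unique-key-columns map stored in metaDB for the given exporter role.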
func initializeConflictDetectionCache(evChans []chan *tgtdb.Event, exporterRole string, sourceDBTypeForConflictCache string) error {
tableToUniqueKeyColumns, err := getTableToUniqueKeyColumnsMapFromMetaDB(exporterRole)
if err != nil {
return fmt.Errorf("get table unique key columns map: %w", err)
}
log.Infof("initializing conflict detection cache")
conflictDetectionCache = NewConflictDetectionCache(tableToUniqueKeyColumns, evChans, sourceDBTypeForConflictCache)
return nil
}
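// getTableToUniqueKeyColumnsMapFromMetaDB loads the per-table unique key column lists stored
// during export and resolves each raw table name through the name registry.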
func getTableToUniqueKeyColumnsMapFromMetaDB(exporterRole string) (*utils.StructMap[sqlname.NameTuple, []string], error) {
log.Infof("fetching table to unique key columns map from metaDB")
var metaDbData map[string][]string
res := utils.NewStructMap[sqlname.NameTuple, []string]()
key := fmt.Sprintf("%s_%s", metadb.TABLE_TO_UNIQUE_KEY_COLUMNS_KEY, exporterRole)
found, err := metaDB.GetJsonObject(nil, key, &metaDbData)
if err != nil {
return nil, err
}
if !found {
return nil, fmt.Errorf("table to unique key columns map not found in metaDB")
}
log.Infof("fetched table to unique key columns map: %v", metaDbData)
for tableNameRaw, columns := range metaDbData {
tableName, err := namereg.NameReg.LookupTableName(tableNameRaw)
if err != nil {
return nil, fmt.Errorf("lookup table %s in name registry: %v", tableNameRaw, err)
}
res.Put(tableName, columns)
}
return res, nil
}
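// checkifEventBatchAlreadyImported checks, using the same retry/backoff policy as batch
// execution, whether the given event batch has already been recorded as imported for this migration.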
func checkifEventBatchAlreadyImported(state *ImportDataState, eventBatch *tgtdb.EventBatch, migrationUUID uuid.UUID) (bool, error) {
var res bool
var err error
sleepIntervalSec := 0
for attempt := 0; attempt < EVENT_BATCH_MAX_RETRY_COUNT; attempt++ {
res, err = state.IsEventBatchAlreadyImported(eventBatch, migrationUUID)
if err == nil {
break
} else if tdb.IsNonRetryableCopyError(err) {
break
}
sleepIntervalSec += 10
if sleepIntervalSec > MAX_SLEEP_SECOND {
sleepIntervalSec = MAX_SLEEP_SECOND
}
log.Infof("sleep for %d seconds before retrying to check if event batch (last vsn: %d) already imported (attempt %d)",
sleepIntervalSec, eventBatch.GetLastVsn(), attempt)
time.Sleep(time.Duration(sleepIntervalSec) * time.Second)
}
return res, err
}