ddl: support checkpoint for ingest mode #42769

Merged (21 commits, Apr 12, 2023)

Commits
2b185f6  ddl: support checkpoint for ingest mode (tangenta, Apr 3, 2023)
73ddb5d  ddl: fix data inconsistency issue (tangenta, Apr 4, 2023)
31a774a  fix test for build and refine code (tangenta, Apr 4, 2023)
e0f7dbb  make job.RowCount more accurate (tangenta, Apr 4, 2023)
6a9b52e  Merge remote-tracking branch 'upstream/master' into add-index-checkpo… (tangenta, Apr 4, 2023)
d85eb4b  Merge remote-tracking branch 'upstream/master' into add-index-checkpo… (tangenta, Apr 4, 2023)
d3241b7  move checkpoint to ingest package (tangenta, Apr 5, 2023)
0b5b1ab  add part of test for checkpoint manager (tangenta, Apr 6, 2023)
a8d490f  add more test for checkpoint manager (tangenta, Apr 6, 2023)
c14674e  fix linter (tangenta, Apr 6, 2023)
a378b11  Merge remote-tracking branch 'upstream/master' into add-index-checkpo… (tangenta, Apr 6, 2023)
1abad96  update bazel (tangenta, Apr 7, 2023)
b4b454d  add comments (tangenta, Apr 10, 2023)
9a3ca75  remove redundant file (tangenta, Apr 11, 2023)
87987c6  refine code (tangenta, Apr 11, 2023)
92dc8b5  Merge remote-tracking branch 'upstream/master' into add-index-checkpo… (tangenta, Apr 11, 2023)
b08d9b9  Merge remote-tracking branch 'upstream/master' into add-index-checkpo… (tangenta, Apr 12, 2023)
a562d83  use checkpoint manager only if distributed exec is disabled (tangenta, Apr 12, 2023)
f3f2b55  update the checkpoint flush interval to 10 min (tangenta, Apr 12, 2023)
e535400  add task ID allocator (tangenta, Apr 12, 2023)
9ef599f  Merge remote-tracking branch 'upstream/master' into add-index-checkpo… (tangenta, Apr 12, 2023)
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/backend.go
@@ -84,6 +84,9 @@ type EngineConfig struct {
TableInfo *checkpoints.TidbTableInfo
// local backend specified configuration
Local LocalEngineConfig
// KeepSortDir indicates whether to keep the temporary sort directory
// when opening the engine, instead of removing it.
KeepSortDir bool
}

// LocalEngineConfig is the configuration used for local backend in OpenEngine.
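Note on the new flag: KeepSortDir lets the caller keep whatever was already sorted into the engine's temporary directory instead of wiping it when the engine is opened, which is part of what allows an add-index job to resume from a checkpoint after a restart. A minimal sketch of a call site, assuming it sits next to the lightning backend packages; the helper name and variables are illustrative, not code from this PR:

```go
package example

import (
	"context"

	"github.com/google/uuid"
	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
)

// openEngineForResume opens a local-backend engine while preserving its sort
// directory, so an add-index job restarted after a crash can pick up the data
// it had already sorted. Sketch only; not part of this PR.
func openEngineForResume(ctx context.Context, bk *local.Backend, engineUUID uuid.UUID,
	tblInfo *checkpoints.TidbTableInfo) error {
	cfg := &backend.EngineConfig{
		TableInfo:   tblInfo,
		KeepSortDir: true, // skip the os.RemoveAll(sstDir) shown in the local.go hunk below
	}
	return bk.OpenEngine(ctx, cfg, engineUUID)
}
```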
6 changes: 4 additions & 2 deletions br/pkg/lightning/backend/local/local.go
@@ -854,8 +854,10 @@ func (local *Backend) OpenEngine(ctx context.Context, cfg *backend.EngineConfig,
}

sstDir := engineSSTDir(local.LocalStoreDir, engineUUID)
if err := os.RemoveAll(sstDir); err != nil {
return errors.Trace(err)
if !cfg.KeepSortDir {
if err := os.RemoveAll(sstDir); err != nil {
return errors.Trace(err)
}
}
if !common.IsDirExists(sstDir) {
if err := os.Mkdir(sstDir, 0o750); err != nil {
20 changes: 14 additions & 6 deletions ddl/backfilling.go
@@ -185,6 +185,7 @@ type backfillResult struct {
taskID int
addedCount int
scanCount int
totalCount int
nextKey kv.Key
err error
}
@@ -598,7 +599,11 @@ func handleOneResult(result *backfillResult, scheduler backfillScheduler, consum
scheduler.drainTasks() // Make it quit early.
return result.err
}
*totalAddedCount += int64(result.addedCount)
if result.totalCount > 0 {
*totalAddedCount = int64(result.totalCount)
} else {
*totalAddedCount += int64(result.addedCount)
}
if !consumer.distribute {
reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID)
reorgCtx.setRowCount(*totalAddedCount)
@@ -635,7 +640,8 @@ func handleOneResult(result *backfillResult, scheduler backfillScheduler, consum
return nil
}

func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange) []*reorgBackfillTask {
func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange,
taskIDAlloc *taskIDAllocator) []*reorgBackfillTask {
batchTasks := make([]*reorgBackfillTask, 0, len(kvRanges))
var prefix kv.Key
if reorgInfo.mergingTmpIdx {
@@ -667,7 +673,7 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange)
}

task := &reorgBackfillTask{
id: i,
id: taskIDAlloc.alloc(),
jobID: reorgInfo.Job.ID,
physicalTable: phyTbl,
priority: reorgInfo.Priority,
@@ -681,8 +687,9 @@ func getBatchTasks(t table.Table, reorgInfo *reorgInfo, kvRanges []kv.KeyRange)
}

// sendTasks sends tasks to workers, and returns the remaining kvRanges that are not handled.
func sendTasks(scheduler backfillScheduler, consumer *resultConsumer, t table.PhysicalTable, kvRanges []kv.KeyRange, reorgInfo *reorgInfo) {
batchTasks := getBatchTasks(t, reorgInfo, kvRanges)
func sendTasks(scheduler backfillScheduler, consumer *resultConsumer,
t table.PhysicalTable, kvRanges []kv.KeyRange, reorgInfo *reorgInfo, taskIDAlloc *taskIDAllocator) {
batchTasks := getBatchTasks(t, reorgInfo, kvRanges, taskIDAlloc)
for _, task := range batchTasks {
if consumer.shouldAbort() {
return
@@ -802,6 +809,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
return errors.Trace(err)
}

taskIDAlloc := newTaskIDAllocator()
for {
kvRanges, err := splitTableRanges(t, reorgInfo.d.store, startKey, endKey, backfillTaskChanSize)
if err != nil {
@@ -818,7 +826,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(sessPool *sess.Pool, t table.Physical
zap.String("startKey", hex.EncodeToString(startKey)),
zap.String("endKey", hex.EncodeToString(endKey)))

sendTasks(scheduler, consumer, t, kvRanges, reorgInfo)
sendTasks(scheduler, consumer, t, kvRanges, reorgInfo, taskIDAlloc)
if consumer.shouldAbort() {
break
}
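About the handleOneResult change above: a result that carries totalCount overwrites the running total instead of adding to it. With a checkpoint manager, the worker reports a cumulative count (which may already include rows handled before a restart), so accumulating per-task deltas would overstate job.RowCount. A rough sketch of how the two paths feed the counter, with illustrative names only:

```go
// Worker side (HandleTask in backfilling_scheduler.go): with a checkpoint
// manager, report the cumulative count from Status(); otherwise report the
// per-task delta as before.
cnt, nextKey := checkpointMgr.Status()
result := &backfillResult{totalCount: cnt, nextKey: nextKey}

// Consumer side (handleOneResult): a cumulative figure replaces the sum.
if result.totalCount > 0 {
	*totalAddedCount = int64(result.totalCount)
} else {
	*totalAddedCount += int64(result.addedCount)
}
```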
91 changes: 67 additions & 24 deletions ddl/backfilling_scheduler.go
@@ -79,7 +79,7 @@ func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sess.P
tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context,
jobCtx *JobContext) (backfillScheduler, error) {
if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
return newIngestBackfillScheduler(ctx, info, tbl, false), nil
return newIngestBackfillScheduler(ctx, info, sessPool, tbl, false), nil
}
return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx)
}
@@ -252,9 +252,11 @@ func (b *txnBackfillScheduler) close(force bool) {
}

type ingestBackfillScheduler struct {
ctx context.Context
reorgInfo *reorgInfo
tbl table.PhysicalTable
ctx context.Context
reorgInfo *reorgInfo
sessPool *sess.Pool
tbl table.PhysicalTable
distribute bool

closed bool

@@ -263,17 +265,19 @@ type ingestBackfillScheduler struct {

copReqSenderPool *copReqSenderPool

writerPool *workerpool.WorkerPool[idxRecResult]
writerMaxID int
poolErr chan error
backendCtx *ingest.BackendContext
distribute bool
writerPool *workerpool.WorkerPool[idxRecResult]
writerMaxID int
poolErr chan error
backendCtx *ingest.BackendContext
checkpointMgr *ingest.CheckpointManager
}

func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.PhysicalTable, distribute bool) *ingestBackfillScheduler {
func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo,
sessPool *sess.Pool, tbl table.PhysicalTable, distribute bool) *ingestBackfillScheduler {
return &ingestBackfillScheduler{
ctx: ctx,
reorgInfo: info,
sessPool: sessPool,
tbl: tbl,
taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultCh: make(chan *backfillResult, backfillTaskChanSize),
@@ -282,6 +286,8 @@ func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.
}
}

const checkpointUpdateInterval = 10 * time.Minute

func (b *ingestBackfillScheduler) setupWorkers() error {
job := b.reorgInfo.Job
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
@@ -290,6 +296,14 @@ func (b *ingestBackfillScheduler) setupWorkers() error {
return errors.Trace(errors.New("cannot get lightning backend"))
}
b.backendCtx = bc
if !b.distribute {
mgr, err := ingest.NewCheckpointManager(b.ctx, bc, b.sessPool, job.ID,
b.reorgInfo.currElement.ID, checkpointUpdateInterval)
if err != nil {
return errors.Trace(err)
}
b.checkpointMgr = mgr
}
copReqSenderPool, err := b.createCopReqSenderPool()
if err != nil {
return errors.Trace(err)
@@ -313,8 +327,21 @@ func (b *ingestBackfillScheduler) close(force bool) {
return
}
close(b.taskCh)
b.copReqSenderPool.close(force)
b.writerPool.ReleaseAndWait()
if b.copReqSenderPool != nil {
b.copReqSenderPool.close(force)
}
if b.writerPool != nil {
b.writerPool.ReleaseAndWait()
}
if b.checkpointMgr != nil {
b.checkpointMgr.Close()
// Get the latest status after all workers are closed so that the result is more accurate.
cnt, nextKey := b.checkpointMgr.Status()
b.resultCh <- &backfillResult{
totalCount: cnt,
nextKey: nextKey,
}
}
close(b.resultCh)
if !force {
jobID := b.reorgInfo.ID
@@ -376,7 +403,9 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[idxRecResult]
zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", b.reorgInfo.currElement.ID))
return nil
}
worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID, reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID, b.copReqSenderPool, sessCtx, b.distribute)
worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID,
reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID,
b.copReqSenderPool, sessCtx, b.checkpointMgr, b.distribute)
if err != nil {
// Return an error only if it is the first worker.
if b.writerMaxID == 0 {
@@ -408,7 +437,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return nil, err
}
return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh), nil
return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh, b.checkpointMgr), nil
}

func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
Expand Down Expand Up @@ -450,18 +479,19 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
}
if count == 0 {
logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task", zap.Int("id", rs.id))
if bc, ok := ingest.LitBackCtxMgr.Load(w.jobID); ok {
err := bc.Flush(w.index.Meta().ID)
if err != nil {
result.err = err
w.resultCh <- result
}
}
return
}
result.scanCount = count
result.addedCount = count
result.nextKey = nextKey
if w.checkpointMgr != nil {
cnt, nextKey := w.checkpointMgr.Status()
result.totalCount = cnt
result.nextKey = nextKey
result.err = w.checkpointMgr.UpdateCurrent(rs.id, count)
count = cnt
} else {
result.addedCount = count
result.scanCount = count
result.nextKey = nextKey
}
w.metricCounter.Add(float64(count))
if ResultCounterForTest != nil && result.err == nil {
ResultCounterForTest.Add(1)
@@ -470,3 +500,16 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
}

func (w *addIndexIngestWorker) Close() {}

type taskIDAllocator struct {
id int
}

func newTaskIDAllocator() *taskIDAllocator {
return &taskIDAllocator{}
}

func (a *taskIDAllocator) alloc() int {
a.id++
return a.id
}
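The allocator above replaces the old per-batch loop index (id: i) in getBatchTasks, so task IDs keep increasing across successive splitTableRanges batches instead of restarting at zero, and the per-task progress recorded through the checkpoint manager stays unambiguous. A tiny illustration with placeholder variables, not code from this PR:

```go
alloc := newTaskIDAllocator()
firstBatch := getBatchTasks(tbl, reorgInfo, firstRanges, alloc)   // task IDs 1..len(firstRanges)
secondBatch := getBatchTasks(tbl, reorgInfo, secondRanges, alloc) // IDs continue; none are reused
_, _ = firstBatch, secondBatch
```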
2 changes: 1 addition & 1 deletion ddl/dist_backfilling.go
@@ -270,7 +270,7 @@ func (bwm *backfilWorkerManager) waitFinalResult(resultCh <-chan *backfillResult
}

if ingestBackendCtx != nil && i%workerCnt == 0 {
err := ingestBackendCtx.Flush(eleID)
_, err := ingestBackendCtx.Flush(eleID)
if err != nil {
bwm.unsyncErr = err
return
3 changes: 2 additions & 1 deletion ddl/dist_owner.go
@@ -326,12 +326,13 @@ func (dc *ddlCtx) splitTableToBackfillJobs(se *sess.Session, reorgInfo *reorgInf
batchSize := sJobCtx.batchSize
startKey, endKey := kv.Key(pTblMeta.StartKey), kv.Key(pTblMeta.EndKey)
bJobs := make([]*BackfillJob, 0, batchSize)
taskIDAlloc := newTaskIDAllocator()
for {
kvRanges, err := splitTableRanges(pTblMeta.PhyTbl, reorgInfo.d.store, startKey, endKey, batchSize)
if err != nil {
return errors.Trace(err)
}
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges)
batchTasks := getBatchTasks(pTblMeta.PhyTbl, reorgInfo, kvRanges, taskIDAlloc)
if len(batchTasks) == 0 {
break
}
2 changes: 1 addition & 1 deletion ddl/export_test.go
@@ -51,7 +51,7 @@ func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endK
}
taskCh := make(chan *reorgBackfillTask, 5)
resultCh := make(chan idxRecResult, 5)
pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh)
pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh, nil)
pool.chunkSender = &resultChanForTest{ch: resultCh}
pool.adjustSize(1)
pool.tasksCh <- task
49 changes: 42 additions & 7 deletions ddl/index.go
@@ -19,6 +19,8 @@ import (
"context"
"encoding/hex"
"fmt"
"os"
"path/filepath"
"strings"
"sync/atomic"
"time"
@@ -721,6 +723,7 @@ func pickBackfillType(job *model.Job) model.ReorgType {
if ingest.LitInitialized {
useIngest = canUseIngest()
if useIngest {
cleanupSortPath(job.ID)
job.ReorgMeta.ReorgTp = model.ReorgTypeLitMerge
return model.ReorgTypeLitMerge
}
@@ -736,6 +739,39 @@ func pickBackfillType(job *model.Job) model.ReorgType {
return model.ReorgTypeTxn
}

// cleanupSortPath is used to clean up the temp data of the previous jobs.
// Because we no longer remove all the files now that checkpoints are supported,
// there may be some stale files in the sort path if TiDB is killed during the backfill process.
func cleanupSortPath(currentJobID int64) {
sortPath := ingest.ConfigSortPath()
entries, err := os.ReadDir(sortPath)
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
return
}
for _, entry := range entries {
if !entry.IsDir() {
continue
}
jobID, err := ingest.DecodeBackendTag(entry.Name())
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
continue
}
// For now, there is only one task using ingest at the same time,
// so we can remove all the temp data of the previous jobs.
if jobID < currentJobID {
logutil.BgLogger().Info("[ddl-ingest] remove stale temp index data",
zap.Int64("jobID", jobID), zap.Int64("currentJobID", currentJobID))
err := os.RemoveAll(filepath.Join(sortPath, entry.Name()))
if err != nil {
logutil.BgLogger().Warn("[ddl-ingest] cannot cleanup sort path", zap.Error(err))
return
}
}
}
}

// canUseIngest indicates whether the ingest way can be used to backfill the index.
func canUseIngest() bool {
// We only allow one task to use ingest at the same time, in order to limit the CPU usage.
@@ -855,12 +891,6 @@ func runIngestReorgJob(w *worker, d *ddlCtx, t *meta.Meta, job *model.Job,
if ok && bc.Done() {
return true, 0, nil
}
if !ok && job.SnapshotVer != 0 {
// The owner is crashed or changed, we need to restart the backfill.
job.SnapshotVer = 0
job.RowCount = 0
return false, ver, nil
}
bc, err = ingest.LitBackCtxMgr.Register(w.ctx, indexInfo.Unique, job.ID, job.ReorgMeta.SQLMode)
if err != nil {
err = tryFallbackToTxnMerge(job, err)
@@ -1649,13 +1679,17 @@ type addIndexIngestWorker struct {
index table.Index
writerCtx *ingest.WriterContext
copReqSenderPool *copReqSenderPool
checkpointMgr *ingest.CheckpointManager

resultCh chan *backfillResult
jobID int64
distribute bool
}

func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.EngineInfo, resultCh chan *backfillResult, jobID int64, schemaName string, indexID int64, writerID int, copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context, distribute bool) (*addIndexIngestWorker, error) {
func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.EngineInfo,
resultCh chan *backfillResult, jobID int64, schemaName string, indexID int64, writerID int,
copReqSenderPool *copReqSenderPool, sessCtx sessionctx.Context,
checkpointMgr *ingest.CheckpointManager, distribute bool) (*addIndexIngestWorker, error) {
indexInfo := model.FindIndexInfoByID(t.Meta().Indices, indexID)
index := tables.NewIndex(t.GetPhysicalID(), t.Meta(), indexInfo)
lwCtx, err := ei.NewWriterCtx(writerID, indexInfo.Unique)
@@ -1674,6 +1708,7 @@ func newAddIndexIngestWorker(t table.PhysicalTable, d *ddlCtx, ei *ingest.Engine
copReqSenderPool: copReqSenderPool,
resultCh: resultCh,
jobID: jobID,
checkpointMgr: checkpointMgr,
distribute: distribute,
}, nil
}
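Putting the pieces together: the scheduler creates a CheckpointManager only when distributed execution is disabled, each worker feeds it per-task row counts, and its Status() is turned into a final backfillResult when the scheduler closes. Below is a condensed, illustrative sketch of that lifecycle, assuming it lives in the ddl package where the ingest, sess, and errors imports used above are available; it only calls the methods that appear in this diff:

```go
// Sketch only, not PR code.
func checkpointLifecycle(ctx context.Context, bc *ingest.BackendContext,
	pool *sess.Pool, jobID, indexID int64) error {
	// Created in setupWorkers when distributed execution is disabled.
	mgr, err := ingest.NewCheckpointManager(ctx, bc, pool, jobID, indexID, checkpointUpdateInterval)
	if err != nil {
		return errors.Trace(err)
	}

	// In HandleTask, each finished cop-request task records its row count under
	// its task ID, so progress can be flushed periodically and resumed later.
	if err := mgr.UpdateCurrent(1 /* task ID */, 1024 /* rows handled */); err != nil {
		return errors.Trace(err)
	}

	// On close, the scheduler reads the overall progress after the workers have
	// stopped and emits it as backfillResult{totalCount: cnt, nextKey: nextKey}.
	mgr.Close()
	cnt, nextKey := mgr.Status()
	_, _ = cnt, nextKey
	return nil
}
```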