
ddl: support checkpoint for ingest mode #42769

Merged
merged 21 commits into from Apr 12, 2023
Changes from 13 commits

Commits (21)
2b185f6
ddl: support checkpoint for ingest mode
tangenta Apr 3, 2023
73ddb5d
ddl: fix data inconsistency issue
tangenta Apr 4, 2023
31a774a
fix test for build and refine code
tangenta Apr 4, 2023
e0f7dbb
make job.RowCount more accurate
tangenta Apr 4, 2023
6a9b52e
Merge remote-tracking branch 'upstream/master' into add-index-checkpo…
tangenta Apr 4, 2023
d85eb4b
Merge remote-tracking branch 'upstream/master' into add-index-checkpo…
tangenta Apr 4, 2023
d3241b7
move checkpoint to ingest package
tangenta Apr 5, 2023
0b5b1ab
add part of test for checkpoint manager
tangenta Apr 6, 2023
a8d490f
add more test for checkpoint manager
tangenta Apr 6, 2023
c14674e
fix linter
tangenta Apr 6, 2023
a378b11
Merge remote-tracking branch 'upstream/master' into add-index-checkpo…
tangenta Apr 6, 2023
1abad96
update bazel
tangenta Apr 7, 2023
b4b454d
add comments
tangenta Apr 10, 2023
9a3ca75
remove redundant file
tangenta Apr 11, 2023
87987c6
refine code
tangenta Apr 11, 2023
92dc8b5
Merge remote-tracking branch 'upstream/master' into add-index-checkpo…
tangenta Apr 11, 2023
b08d9b9
Merge remote-tracking branch 'upstream/master' into add-index-checkpo…
tangenta Apr 12, 2023
a562d83
use checkpoint manager only if distributed exec is disabled
tangenta Apr 12, 2023
f3f2b55
update the checkpoint flush interval to 10 min
tangenta Apr 12, 2023
e535400
add task ID allocator
tangenta Apr 12, 2023
9ef599f
Merge remote-tracking branch 'upstream/master' into add-index-checkpo…
tangenta Apr 12, 2023
3 changes: 3 additions & 0 deletions br/pkg/lightning/backend/backend.go
@@ -108,6 +108,9 @@ type EngineConfig struct {
TableInfo *checkpoints.TidbTableInfo
// local backend specified configuration
Local LocalEngineConfig
// KeepSortDir indicates whether to keep the temporary sort directory
// when opening the engine, instead of removing it.
KeepSortDir bool
}

// LocalEngineConfig is the configuration used for local backend in OpenEngine.
6 changes: 4 additions & 2 deletions br/pkg/lightning/backend/local/local.go
@@ -851,8 +851,10 @@ func (local *Local) OpenEngine(ctx context.Context, cfg *backend.EngineConfig, e
}

sstDir := engineSSTDir(local.LocalStoreDir, engineUUID)
if err := os.RemoveAll(sstDir); err != nil {
return errors.Trace(err)
if !cfg.KeepSortDir {
if err := os.RemoveAll(sstDir); err != nil {
return errors.Trace(err)
}
}
if !common.IsDirExists(sstDir) {
if err := os.Mkdir(sstDir, 0o750); err != nil {
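The KeepSortDir flag added above lets the caller keep whatever data is already in the engine's temporary sort directory instead of wiping it when the engine is opened, which is what a checkpoint-based resume of an add-index job needs. Below is a minimal sketch of such a caller; the helper name, the resuming flag, and the import paths are illustrative assumptions, the OpenEngine parameter list is truncated in the hunk header so the engineUUID parameter is a guess, and only the EngineConfig field and the skipped os.RemoveAll behavior come from the diff itself.

import (
	"context"

	"github.com/google/uuid"
	"github.com/pingcap/tidb/br/pkg/lightning/backend"
	"github.com/pingcap/tidb/br/pkg/lightning/backend/local"
	"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
)

// openEngineKeepingSortDir is an illustrative helper, not code from this PR.
// With KeepSortDir set, OpenEngine skips os.RemoveAll(sstDir), so SST data
// sorted before a restart survives and can be reused after resume.
func openEngineKeepingSortDir(ctx context.Context, be *local.Local,
	tblInfo *checkpoints.TidbTableInfo, engineUUID uuid.UUID, resuming bool) error {
	cfg := &backend.EngineConfig{
		TableInfo:   tblInfo,
		KeepSortDir: resuming, // keep the temporary sort directory on resume
	}
	return be.OpenEngine(ctx, cfg, engineUUID)
}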
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -161,6 +161,7 @@ go_test(
"attributes_sql_test.go",
"backfilling_test.go",
"cancel_test.go",
"checkpoint_test.go",
"cluster_test.go",
"column_change_test.go",
"column_modify_test.go",
7 changes: 6 additions & 1 deletion ddl/backfilling.go
@@ -185,6 +185,7 @@ type backfillResult struct {
taskID int
addedCount int
scanCount int
totalCount int
nextKey kv.Key
err error
}
@@ -596,7 +597,11 @@ func handleOneResult(result *backfillResult, scheduler backfillScheduler, consum
scheduler.drainTasks() // Make it quit early.
return result.err
}
*totalAddedCount += int64(result.addedCount)
if result.totalCount > 0 {
*totalAddedCount = int64(result.totalCount)
} else {
*totalAddedCount += int64(result.addedCount)
}
reorgCtx := consumer.dc.getReorgCtx(reorgInfo.Job.ID)
reorgCtx.setRowCount(*totalAddedCount)
keeper.updateNextKey(result.taskID, result.nextKey)
67 changes: 47 additions & 20 deletions ddl/backfilling_scheduler.go
@@ -70,16 +70,17 @@ type txnBackfillScheduler struct {
workers []*backfillWorker
wg sync.WaitGroup

taskCh chan *reorgBackfillTask
resultCh chan *backfillResult
closed bool
taskCh chan *reorgBackfillTask
resultCh chan *backfillResult
taskMaxID int
closed bool
}

func newBackfillScheduler(ctx context.Context, info *reorgInfo, sessPool *sess.Pool,
tp backfillerType, tbl table.PhysicalTable, sessCtx sessionctx.Context,
jobCtx *JobContext) (backfillScheduler, error) {
if tp == typeAddIndexWorker && info.ReorgMeta.ReorgTp == model.ReorgTypeLitMerge {
return newIngestBackfillScheduler(ctx, info, tbl), nil
return newIngestBackfillScheduler(ctx, info, sessPool, tbl), nil
}
return newTxnBackfillScheduler(ctx, info, sessPool, tp, tbl, sessCtx, jobCtx)
}
@@ -110,6 +111,8 @@ func (b *txnBackfillScheduler) setupWorkers() error {
}

func (b *txnBackfillScheduler) sendTask(task *reorgBackfillTask) {
b.taskMaxID++
Review comment (Member): Why reallocate the task ID?

Reply (tangenta, author): Because it needs to be unique during the lifetime of the DDL job, instead of a task batch.

task.id = b.taskMaxID
b.taskCh <- task
}
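The review exchange above explains why the scheduler now reallocates task IDs itself: the ID has to stay unique for the whole lifetime of the DDL job, not just within one batch of tasks, so both schedulers keep a taskMaxID counter that is bumped in sendTask. The sketch below only illustrates that idea; a later commit in this PR adds a real task ID allocator, and this is not that code.

// taskIDAllocator sketches the intent behind taskMaxID: a counter owned by the
// scheduler (and therefore by the DDL job), so task IDs never repeat across batches.
type taskIDAllocator struct {
	next int
}

// alloc returns the next job-wide unique task ID.
func (a *taskIDAllocator) alloc() int {
	a.next++
	return a.next
}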

@@ -254,6 +257,7 @@ type ingestBackfillScheduler struct {
type ingestBackfillScheduler struct {
ctx context.Context
reorgInfo *reorgInfo
sessPool *sess.Pool
tbl table.PhysicalTable

closed bool
@@ -262,24 +266,31 @@ type ingestBackfillScheduler struct {
resultCh chan *backfillResult

copReqSenderPool *copReqSenderPool
taskMaxID int

writerPool *workerpool.WorkerPool[idxRecResult]
writerMaxID int
poolErr chan error
backendCtx *ingest.BackendContext

checkpointMgr ingest.CheckpointManager
}

func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo, tbl table.PhysicalTable) *ingestBackfillScheduler {
func newIngestBackfillScheduler(ctx context.Context, info *reorgInfo,
sessPool *sess.Pool, tbl table.PhysicalTable) *ingestBackfillScheduler {
return &ingestBackfillScheduler{
ctx: ctx,
reorgInfo: info,
sessPool: sessPool,
tbl: tbl,
taskCh: make(chan *reorgBackfillTask, backfillTaskChanSize),
resultCh: make(chan *backfillResult, backfillTaskChanSize),
poolErr: make(chan error),
}
}

const checkpointUpdateInterval = 5 * time.Second

func (b *ingestBackfillScheduler) setupWorkers() error {
job := b.reorgInfo.Job
bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
@@ -288,6 +299,12 @@ func (b *ingestBackfillScheduler) setupWorkers() error {
return errors.Trace(errors.New("cannot get lightning backend"))
}
b.backendCtx = bc
mgr, err := ingest.NewCentralizedCheckpointManager(b.ctx, bc, b.sessPool, job.ID,
b.reorgInfo.currElement.ID, checkpointUpdateInterval)
if err != nil {
return errors.Trace(err)
}
b.checkpointMgr = mgr
copReqSenderPool, err := b.createCopReqSenderPool()
if err != nil {
return errors.Trace(err)
@@ -311,8 +328,21 @@ func (b *ingestBackfillScheduler) close(force bool) {
return
}
close(b.taskCh)
b.copReqSenderPool.close(force)
b.writerPool.ReleaseAndWait()
if b.copReqSenderPool != nil {
b.copReqSenderPool.close(force)
}
if b.writerPool != nil {
b.writerPool.ReleaseAndWait()
}
if b.checkpointMgr != nil {
b.checkpointMgr.Close()
}
// Get the latest status after all workers are closed so that the result is more accurate.
cnt, nextKey := b.checkpointMgr.Status()
b.resultCh <- &backfillResult{
totalCount: cnt,
nextKey: nextKey,
}
close(b.resultCh)
if !force {
jobID := b.reorgInfo.ID
@@ -325,6 +355,8 @@ func (b *ingestBackfillScheduler) close(force bool) {
}

func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) {
b.taskMaxID++
task.id = b.taskMaxID
b.taskCh <- task
}

@@ -375,7 +407,8 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[idxRecResult]
return nil
}
worker, err := newAddIndexIngestWorker(b.tbl, reorgInfo.d, ei, b.resultCh, job.ID,
reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID, b.copReqSenderPool, sessCtx)
reorgInfo.SchemaName, b.reorgInfo.currElement.ID, b.writerMaxID,
b.copReqSenderPool, sessCtx, b.checkpointMgr)
if err != nil {
// Return an error only if it is the first worker.
if b.writerMaxID == 0 {
@@ -407,7 +440,7 @@ func (b *ingestBackfillScheduler) createCopReqSenderPool() (*copReqSenderPool, e
logutil.BgLogger().Warn("[ddl-ingest] cannot init cop request sender", zap.Error(err))
return nil, err
}
return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh), nil
return newCopReqSenderPool(b.ctx, copCtx, sessCtx.GetStore(), b.taskCh, b.checkpointMgr), nil
}

func (b *ingestBackfillScheduler) expectedWorkerSize() (readerSize int, writerSize int) {
Expand Down Expand Up @@ -439,30 +472,24 @@ func (w *addIndexIngestWorker) HandleTask(rs idxRecResult) {
w.resultCh <- result
return
}
count, nextKey, err := w.WriteLocal(&rs)
count, err := w.WriteLocal(&rs)
if err != nil {
result.err = err
w.resultCh <- result
return
}
if count == 0 {
logutil.BgLogger().Info("[ddl-ingest] finish a cop-request task", zap.Int("id", rs.id))
if bc, ok := ingest.LitBackCtxMgr.Load(w.jobID); ok {
err := bc.Flush(w.index.Meta().ID)
if err != nil {
result.err = err
w.resultCh <- result
}
}
return
}
result.scanCount = count
result.addedCount = count
cnt, nextKey := w.checkpointMgr.Status()
result.totalCount = cnt
result.nextKey = nextKey
w.metricCounter.Add(float64(count))
w.metricCounter.Add(float64(cnt))
if ResultCounterForTest != nil && result.err == nil {
ResultCounterForTest.Add(1)
}
result.err = w.checkpointMgr.UpdateCurrent(rs.id, count)
w.resultCh <- result
}

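Taken together, the scheduler changes above wire the checkpoint manager into three spots: it is created in setupWorkers via ingest.NewCentralizedCheckpointManager, each ingest writer reports per-task progress with UpdateCurrent and reads the job-wide Status when building its result, and close reads Status one final time after all workers have stopped so the reported row count and next key are as accurate as possible. The walk-through below condenses that flow for reference; only the method names come from the diff, everything else is a placeholder.

// ingestFlowSketch is illustrative only; the real wiring is in
// ingestBackfillScheduler above.
func ingestFlowSketch(mgr ingest.CheckpointManager, taskIDs []int, rowsPerTask int) (int, kv.Key, error) {
	for _, id := range taskIDs {
		// ... rows for the task are read and written to the local backend here ...
		if err := mgr.UpdateCurrent(id, rowsPerTask); err != nil { // record per-task progress
			return 0, nil, err
		}
	}
	mgr.Close()                    // flush any pending checkpoint state
	total, nextKey := mgr.Status() // final, most accurate progress for the job
	return total, nextKey, nil
}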
21 changes: 21 additions & 0 deletions ddl/checkpoint_test.go
@@ -0,0 +1,21 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ddl

import "testing"

func TestCheckpointManager(t *testing.T) {

}
2 changes: 1 addition & 1 deletion ddl/dist_backfilling.go
@@ -270,7 +270,7 @@ func (bwm *backfilWorkerManager) waitFinalResult(resultCh <-chan *backfillResult
}

if ingestBackendCtx != nil && i%workerCnt == 0 {
err := ingestBackendCtx.Flush(eleID)
_, err := ingestBackendCtx.Flush(eleID)
if err != nil {
bwm.unsyncErr = err
return
22 changes: 21 additions & 1 deletion ddl/export_test.go
@@ -51,7 +51,7 @@ func FetchChunk4Test(copCtx *copContext, tbl table.PhysicalTable, startKey, endK
}
taskCh := make(chan *reorgBackfillTask, 5)
resultCh := make(chan idxRecResult, 5)
pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh)
pool := newCopReqSenderPool(context.Background(), copCtx, store, taskCh, &dummyCheckpointMgr{})
pool.chunkSender = &resultChanForTest{ch: resultCh}
pool.adjustSize(1)
pool.tasksCh <- task
@@ -67,3 +67,23 @@ func ConvertRowToHandleAndIndexDatum(row chunk.Row, copCtx *copContext) (kv.Hand
handle, err := buildHandle(handleData, copCtx.tblInfo, copCtx.pkInfo, &stmtctx.StatementContext{TimeZone: time.Local})
return handle, idxData, err
}

type dummyCheckpointMgr struct{}

func (d *dummyCheckpointMgr) Status() (_ int, _ kv.Key) {
return 0, nil
}

func (d *dummyCheckpointMgr) IsComplete(_ int, _, _ kv.Key) bool {
return false
}

func (d *dummyCheckpointMgr) Register(_ int, _, _ kv.Key) {}

func (d *dummyCheckpointMgr) UpdateTotal(_ int, _ int, _ bool) {}

func (d *dummyCheckpointMgr) UpdateCurrent(_ int, _ int) error {
return nil
}

func (d *dummyCheckpointMgr) Close() {}
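
The dummyCheckpointMgr stub above implies the shape of the ingest.CheckpointManager interface used throughout this PR. The declaration below is reconstructed from the stub's method set and from how Status and UpdateCurrent are called in the scheduler hunks; it is an inference for reference, not a copy of the real declaration in the ingest package, and the method comments describe presumed semantics only.

// CheckpointManager as inferred from dummyCheckpointMgr; the real interface in
// the ingest package may differ in comments or carry additional methods.
type CheckpointManager interface {
	// Status reports rows accounted for so far and the next key to scan from.
	Status() (int, kv.Key)
	// IsComplete reports whether a task's key range is already covered by the checkpoint.
	IsComplete(taskID int, start, end kv.Key) bool
	// Register records a task's key range with the manager.
	Register(taskID int, start, end kv.Key)
	// UpdateTotal updates the expected row count for a task; the bool marks the final update.
	UpdateTotal(taskID int, count int, last bool)
	// UpdateCurrent records rows processed for a task since the last update.
	UpdateCurrent(taskID int, count int) error
	// Close flushes pending state and releases the manager.
	Close()
}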