New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
ddl: support checkpoint for ingest mode #42769
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
Skipping CI for Draft Pull Request. |
/test all |
/retest |
ddl/ingest/checkpoint.go
Outdated
) | ||
|
||
// CheckpointManager is an interface to manage checkpoints. | ||
type CheckpointManager interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can remove this interface since it's not used in the distribution.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can add a no-op implementation for distributed reorg.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LTGM
ddl/backfilling_scheduler.go
Outdated
@@ -110,6 +111,8 @@ func (b *txnBackfillScheduler) setupWorkers() error { | |||
} | |||
|
|||
func (b *txnBackfillScheduler) sendTask(task *reorgBackfillTask) { | |||
b.taskMaxID++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why reallocate the task ID?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it needs to be unique during the lifetime of the DDL job, instead of a task batch.
ddl/backfilling_scheduler.go
Outdated
@@ -288,6 +299,12 @@ func (b *ingestBackfillScheduler) setupWorkers() error { | |||
return errors.Trace(errors.New("cannot get lightning backend")) | |||
} | |||
b.backendCtx = bc | |||
mgr, err := ingest.NewCheckpointManager(b.ctx, bc, b.sessPool, job.ID, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It shouldn't set the manager in distribute case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me resolve the conflict after #42753 is merged.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated.
/merge |
This pull request has been accepted and is ready to merge. Commit hash: e535400
|
/retest |
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 9ef599f
|
/retest |
@tangenta: The following tests failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. I understand the commands that are listed here. |
What problem does this PR solve?
Issue Number: close #42164
Background:
For creating index, most of the time are spent on reading table, writing index and importing. After code refactoring #42472 and #42668, the procedure is as follows:
flush
will be triggered to write the index KVs from the memory buffer to the disk.unsafe import
will be triggered to import the index KV of the disk to the TiKV storage.If we treat the overall process as a progress bar, the start point is the start key of the table, and the end point is the end key of the table. There are two keys that can represent the current progress:
unsafe import
.flush
.As long as the Global/Local Checkpoint is persisted, before reader starts to read, we can compare the end key and checkpoint of the task to determine whether the task can be skipped. Therefore, we need a component to manage checkpoints, including the addition, deletion and modification of checkpoints, called Checkpoint Manager.
What is changed and how it works?
According to the above reading and writing process, we can abstract the interface for Checkpoint Manager:
IsComplete()
is called before the reader reads the data and decides whether to skip the current task.UpdateTotal()
is called by the reader after reading the data to update the number of rows contained in the current chunk.UpdateCurrent()
is called by the writer after writing the local engine to update the current number of rows written.The checkpoint manager spawns a background goroutine, which is used to update the checkpoint info to the system table
mysql.tidb_ddl_reorg
periodically.Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.