Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -803,10 +803,6 @@ func (w *indexIngestWorker) HandleTask(ck IndexRecordChunk, send func(IndexWrite
return err
}
scannedCount := ck.tableScanRowCount
if scannedCount == 0 {
logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", ck.ID))
return nil
}
if w.totalCount != nil {
w.totalCount.Add(scannedCount)
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/ddl/ingest/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,12 @@ type CheckpointManager struct {

// taskCheckpoint is the checkpoint for a single task.
type taskCheckpoint struct {
totalKeys int
writtenKeys int
endKey kv.Key
lastBatchRead bool
totalKeys int
writtenKeys int
endKey kv.Key
lastBatchRead bool
chunksTotal int
chunksFinished int
}

// newCheckpointManagerWithStorage is the common constructor
Expand Down Expand Up @@ -348,14 +350,24 @@ func (s *CheckpointManager) UpdateChunk(taskID int, delta int, last bool) {
cp := s.checkpoints[taskID]
cp.totalKeys += delta
cp.lastBatchRead = last
cp.chunksTotal++
}

// FinishChunk updates the written keys of the task.
// This is called by the writer after writing the local engine to update the current number of rows written.
func (s *CheckpointManager) FinishChunk(taskID int, delta int) {
s.mu.Lock()
cp := s.checkpoints[taskID]
cp, ok := s.checkpoints[taskID]
if !ok {
s.mu.Unlock()
s.logger.Warn("finish chunk for unknown task", zap.Int("taskID", taskID))
return
}
cp.writtenKeys += delta
cp.chunksFinished++
if cp.chunksFinished == cp.chunksTotal {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check may no longer be correct once we support OnDuplicateKeyRemove.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OnDuplicateKeyRemove should not affect chunk count, so this is not a problem

s.logger.Info("finish a index ingest task", zap.Int("id", taskID), zap.Int("totalKeys", cp.totalKeys), zap.Int("writtenKeys", cp.writtenKeys))
}
s.mu.Unlock()
}

Expand Down Expand Up @@ -396,7 +408,7 @@ func (s *CheckpointManager) afterFlush() {
defer s.mu.Unlock()
for {
cp := s.checkpoints[s.minTaskIDFinished]
if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys || cp.chunksFinished < cp.chunksTotal {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If !(cp.writtenKeys < cp.totalKeys) holds, then chunksFinished must equal chunksTotal, right?
So the added condition (cp.chunksFinished < cp.chunksTotal) may always be false here?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition handles the situation where cp.writtenKeys == cp.totalKeys but cp.chunksFinished < cp.chunksTotal, which will be fixed in #64655.

break
}
delete(s.checkpoints, s.minTaskIDFinished)
Expand Down
18 changes: 18 additions & 0 deletions pkg/ddl/ingest/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,24 @@ func TestCheckpointManager(t *testing.T) {
require.NoError(t, mgr.AdvanceWatermark(true))
require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'}))

// Finish the remaining chunk so we can continue to new tasks.
mgr.FinishChunk(2, 100)
require.NoError(t, mgr.AdvanceWatermark(true))
require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
require.True(t, mgr.IsKeyProcessed([]byte{'3', '9'}))
require.True(t, mgr.IsKeyProcessed([]byte{'4', '9'}))

// Chunk completion counter: progress only after all chunks finish.
mgr.AddChunk(5, []byte{'5', '9'})
// mock wrong row count
mgr.UpdateChunk(5, 100, false)
mgr.UpdateChunk(5, 0, true)
mgr.FinishChunk(5, 100)
require.NoError(t, mgr.AdvanceWatermark(true))
mgr.FinishChunk(5, 0)
require.NoError(t, mgr.AdvanceWatermark(true))
require.True(t, mgr.IsKeyProcessed([]byte{'5', '9'}))
}

func TestCheckpointManagerUpdateReorg(t *testing.T) {
Expand Down
4 changes: 0 additions & 4 deletions tests/realtikvtest/addindextest4/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,10 +451,6 @@ func TestAddIndexResumesFromCheckpointAfterPartialImport(t *testing.T) {
}
ingest.ForceSyncFlagForTest.Store(true)

tk.Session().Close()
tk = testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("drop table if exists t")
tk.MustExec("create table t (a bigint primary key, b bigint)")
for i := 0; i < 2000; i++ {
Expand Down