ddl: change interface of lightning package wrappers (#53233)
close #53165
lance6716 committed May 16, 2024
1 parent 6612270 commit cc127c1
Showing 16 changed files with 301 additions and 282 deletions.
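The heart of the commit is a change to the ingest backend wrapper's surface: per-index Register/Flush calls become batch calls over all indexes of a DDL job. The Go sketch below reconstructs the before/after surface purely from the call sites in this diff; the stub types, the parameter names, and the meaning of Flush's two boolean returns are assumptions, not the verbatim pkg/ddl/ingest declarations.

// Sketch only: the wrapper surface before and after this commit, inferred
// from the call sites in this diff; not the real pkg/ddl/ingest code.
package sketch

// Engine and FlushMode are stand-ins for the real ingest types.
type Engine interface{}

type FlushMode int

const (
	FlushModeAuto FlushMode = iota
	FlushModeForceFlushAndImport
)

// Surface before this commit (per-index calls), as inferred:
//
//	Register(jobID, indexID int64, schemaName, tableName string) (Engine, error)
//	Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error)
//	ResetWorkers(jobID int64)
//
// Surface after this commit (batch calls over all indexes of the job):
type BackendCtx interface {
	// Register creates one engine per index ID in a single call; the result
	// slice is assumed to be aligned with indexIDs.
	Register(indexIDs []int64, tableName string) ([]Engine, error)
	// Flush flushes every registered engine; errIdxID reports which index
	// failed, so duplicate-key errors can be attributed to one index.
	Flush(mode FlushMode) (flushed, imported bool, errIdxID int64, err error)
	// UnregisterEngines releases the engines created by Register.
	UnregisterEngines()
	// FinishedWritingNeedImport reports that a previous checkpointed run has
	// written all data and only the import step remains.
	FinishedWritingNeedImport() bool
}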
6 changes: 6 additions & 0 deletions pkg/ddl/backfilling.go
@@ -612,6 +612,12 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 		return errors.Trace(err)
 	}
 	defer scheduler.close(true)
+	if lit, ok := scheduler.(*ingestBackfillScheduler); ok {
+		if lit.finishedWritingNeedImport() {
+			return nil
+		}
+	}
+
 	err = scheduler.setupWorkers()
 	if err != nil {
 		return errors.Trace(err)
44 changes: 31 additions & 13 deletions pkg/ddl/backfilling_operators.go
@@ -656,7 +656,7 @@ func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(Index
 	}()
 	w.indexIngestBaseWorker.HandleTask(rs, send)
 	// needs to flush and import to avoid too much use of disk.
-	_, _, _, err := ingest.TryFlushAllIndexes(w.backendCtx, ingest.FlushModeAuto, w.indexIDs)
+	_, _, _, err := w.backendCtx.Flush(ingest.FlushModeAuto)
 	if err != nil {
 		w.ctx.onError(err)
 		return
@@ -726,8 +726,14 @@ func (w *indexIngestBaseWorker) initSessCtx() {
 }

 func (w *indexIngestBaseWorker) Close() {
+	// TODO(lance6716): unify the real write action for engineInfo and external
+	// writer.
 	for _, writer := range w.writers {
-		err := writer.Close(w.ctx)
+		ew, ok := writer.(*external.Writer)
+		if !ok {
+			break
+		}
+		err := ew.Close(w.ctx)
 		if err != nil {
 			w.ctx.onError(err)
 		}
@@ -827,24 +833,36 @@ func (s *indexWriteResultSink) flush() error {
 	failpoint.Inject("mockFlushError", func(_ failpoint.Value) {
 		failpoint.Return(errors.New("mock flush error"))
 	})
-	for _, index := range s.indexes {
-		idxInfo := index.Meta()
-		_, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceFlushAndImport)
-		if err != nil {
-			if common.ErrFoundDuplicateKeys.Equal(err) {
-				err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta())
-				return err
+	_, _, errIdxID, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport)
+	if err != nil {
+		if common.ErrFoundDuplicateKeys.Equal(err) {
+			var idxInfo table.Index
+			for _, idx := range s.indexes {
+				if idx.Meta().ID == errIdxID {
+					idxInfo = idx
+					break
+				}
 			}
-			logutil.Logger(s.ctx).Error("flush error",
-				zap.String("category", "ddl"), zap.Error(err))
-			return err
+			if idxInfo == nil {
+				logutil.Logger(s.ctx).Error("index not found", zap.Int64("indexID", errIdxID))
+				return kv.ErrKeyExists
+			}
+			return convertToKeyExistsErr(err, idxInfo.Meta(), s.tbl.Meta())
 		}
+		logutil.Logger(s.ctx).Error("flush error",
+			zap.String("category", "ddl"), zap.Error(err))
+		return err
 	}
 	return nil
 }

 func (s *indexWriteResultSink) Close() error {
-	return s.errGroup.Wait()
+	err := s.errGroup.Wait()
+	// for local pipeline
+	if bc := s.backendCtx; bc != nil {
+		bc.UnregisterEngines()
+	}
+	return err
 }

 func (*indexWriteResultSink) String() string {
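Taken together, the flush() and Close() changes above give the result sink a simple lifecycle: register engines once, flush them all in a single call, and unregister on close. A caller-side sketch of that flow, continuing the assumed sketch package above (runIngest is a hypothetical helper, not TiDB code):

// Caller-side sketch of the new lifecycle; names are illustrative.
func runIngest(bc BackendCtx, indexIDs []int64, tableName string) error {
	engines, err := bc.Register(indexIDs, tableName) // one batch registration
	if err != nil {
		return err
	}
	// Cleanup replaces the removed per-job ResetWorkers calls.
	defer bc.UnregisterEngines()

	_ = engines // here the engines would be handed to the write workers

	_, _, errIdxID, err := bc.Flush(FlushModeForceFlushAndImport)
	if err != nil {
		// errIdxID lets the caller map a duplicate-key error back to a
		// user-visible index, as flush() above does.
		_ = errIdxID
		return err
	}
	return nil
}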
25 changes: 11 additions & 14 deletions pkg/ddl/backfilling_read_index.go
@@ -133,12 +133,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
 	if opCtx.OperatorErr() != nil {
 		return opCtx.OperatorErr()
 	}
-	if err != nil {
-		return err
-	}
-
-	r.bc.ResetWorkers(r.job.ID)
-	return nil
+	return err
 }

 func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary {
@@ -226,15 +221,17 @@ func (r *readIndexExecutor) buildLocalStorePipeline(
 		return nil, err
 	}
 	d := r.d
-	engines := make([]ingest.Engine, 0, len(r.indexes))
+	indexIDs := make([]int64, 0, len(r.indexes))
 	for _, index := range r.indexes {
-		ei, err := r.bc.Register(r.job.ID, index.ID, r.job.SchemaName, r.job.TableName)
-		if err != nil {
-			tidblogutil.Logger(opCtx).Warn("cannot register new engine", zap.Error(err),
-				zap.Int64("job ID", r.job.ID), zap.Int64("index ID", index.ID))
-			return nil, err
-		}
-		engines = append(engines, ei)
+		indexIDs = append(indexIDs, index.ID)
 	}
+	engines, err := r.bc.Register(indexIDs, r.job.TableName)
+	if err != nil {
+		tidblogutil.Logger(opCtx).Error("cannot register new engine",
+			zap.Error(err),
+			zap.Int64("job ID", r.job.ID),
+			zap.Int64s("index IDs", indexIDs))
+		return nil, err
+	}
 	counter := metrics.BackfillTotalCounter.WithLabelValues(
 		metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O))
58 changes: 32 additions & 26 deletions pkg/ddl/backfilling_scheduler.go
@@ -354,6 +354,15 @@ func newIngestBackfillScheduler(
 	}, nil
 }

+func (b *ingestBackfillScheduler) finishedWritingNeedImport() bool {
+	job := b.reorgInfo.Job
+	bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
+	if !ok {
+		return false
+	}
+	return bc.FinishedWritingNeedImport()
+}
+
 func (b *ingestBackfillScheduler) setupWorkers() error {
 	job := b.reorgInfo.Job
 	bc, ok := ingest.LitBackCtxMgr.Load(job.ID)
@@ -371,10 +380,26 @@ func (b *ingestBackfillScheduler) setupWorkers() error {
 	if err != nil {
 		return errors.Trace(err)
 	}
+
+	indexIDs := make([]int64, 0, len(b.reorgInfo.elements))
+	for _, e := range b.reorgInfo.elements {
+		indexIDs = append(indexIDs, e.ID)
+	}
+	engines, err := b.backendCtx.Register(indexIDs, job.TableName)
+	if err != nil {
+		return errors.Trace(err)
+	}
+
 	b.copReqSenderPool = copReqSenderPool
 	readerCnt, writerCnt := b.expectedWorkerSize()
-	writerPool := workerpool.NewWorkerPool[IndexRecordChunk]("ingest_writer",
-		poolutil.DDL, writerCnt, b.createWorker)
+	writerPool := workerpool.NewWorkerPool[IndexRecordChunk](
+		"ingest_writer",
+		poolutil.DDL,
+		writerCnt,
+		func() workerpool.Worker[IndexRecordChunk, workerpool.None] {
+			return b.createWorker(indexIDs, engines)
+		},
+	)
 	writerPool.Start(b.ctx)
 	b.writerPool = writerPool
 	b.copReqSenderPool.chunkSender = writerPool
@@ -406,13 +431,9 @@ func (b *ingestBackfillScheduler) close(force bool) {
 		})
 	}
 	close(b.resultCh)
-	if intest.InTest && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() {
+	if intest.InTest && b.copReqSenderPool != nil && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() {
 		panic(fmt.Sprintf("unexpected chunk size %d", len(b.copReqSenderPool.srcChkPool)))
 	}
-	if !force {
-		jobID := b.reorgInfo.ID
-		b.backendCtx.ResetWorkers(jobID)
-	}
 }

func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) error {
Expand Down Expand Up @@ -446,32 +467,17 @@ func (b *ingestBackfillScheduler) adjustWorkerSize() error {
return nil
}

func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordChunk, workerpool.None] {
func (b *ingestBackfillScheduler) createWorker(
indexIDs []int64,
engines []ingest.Engine,
) workerpool.Worker[IndexRecordChunk, workerpool.None] {
reorgInfo := b.reorgInfo
job := reorgInfo.Job
sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName)
if err != nil {
b.sendResult(&backfillResult{err: err})
return nil
}
bcCtx := b.backendCtx
indexIDs := make([]int64, 0, len(reorgInfo.elements))
engines := make([]ingest.Engine, 0, len(reorgInfo.elements))
for _, elem := range reorgInfo.elements {
ei, err := bcCtx.Register(job.ID, elem.ID, job.SchemaName, job.TableName)
if err != nil {
// Return an error only if it is the first worker.
if b.writerMaxID == 0 {
b.sendResult(&backfillResult{err: err})
return nil
}
logutil.Logger(b.ctx).Warn("cannot create new writer", zap.Error(err),
zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", elem.ID))
return nil
}
indexIDs = append(indexIDs, elem.ID)
engines = append(engines, ei)
}

worker, err := newAddIndexIngestWorker(
b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID,
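Note the design shift in setupWorkers/createWorker above: engines are now registered once, before the worker pool is built, and the factory closure hands the shared engines to every worker. That removes the old first-worker special case ("Return an error only if it is the first worker"), since a Register failure now surfaces exactly once. A compact sketch of the pattern, continuing the assumed sketch package above (the workerFactory type is illustrative, not the real workerpool API):

// Sketch of the factory-closure pattern; names are illustrative.
type ingestWorker struct {
	indexIDs []int64
	engines  []Engine
}

type workerFactory func() *ingestWorker

func buildWorkerFactory(bc BackendCtx, indexIDs []int64, tableName string) (workerFactory, error) {
	// Register once, up front: a failure is returned a single time here,
	// replacing the old per-worker Register inside createWorker.
	engines, err := bc.Register(indexIDs, tableName)
	if err != nil {
		return nil, err
	}
	// Every worker created by the factory shares the same engines through
	// the captured variables, mirroring the closure passed to NewWorkerPool.
	return func() *ingestWorker {
		return &ingestWorker{indexIDs: indexIDs, engines: engines}
	}, nil
}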
1 change: 0 additions & 1 deletion pkg/ddl/ingest/BUILD.bazel
@@ -11,7 +11,6 @@ go_library(
         "engine.go",
         "engine_mgr.go",
         "env.go",
-        "flush.go",
         "mem_root.go",
         "message.go",
         "mock.go",
