diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go index d18506d4aee85..2968c5efe0fcb 100644 --- a/pkg/ddl/backfilling.go +++ b/pkg/ddl/backfilling.go @@ -612,6 +612,12 @@ func (dc *ddlCtx) writePhysicalTableRecord( return errors.Trace(err) } defer scheduler.close(true) + if lit, ok := scheduler.(*ingestBackfillScheduler); ok { + if lit.finishedWritingNeedImport() { + return nil + } + } + err = scheduler.setupWorkers() if err != nil { return errors.Trace(err) diff --git a/pkg/ddl/backfilling_operators.go b/pkg/ddl/backfilling_operators.go index 75f7b9610ed2a..1b29c1c4ae336 100644 --- a/pkg/ddl/backfilling_operators.go +++ b/pkg/ddl/backfilling_operators.go @@ -656,7 +656,7 @@ func (w *indexIngestLocalWorker) HandleTask(rs IndexRecordChunk, send func(Index }() w.indexIngestBaseWorker.HandleTask(rs, send) // needs to flush and import to avoid too much use of disk. - _, _, _, err := ingest.TryFlushAllIndexes(w.backendCtx, ingest.FlushModeAuto, w.indexIDs) + _, _, _, err := w.backendCtx.Flush(ingest.FlushModeAuto) if err != nil { w.ctx.onError(err) return @@ -726,8 +726,14 @@ func (w *indexIngestBaseWorker) initSessCtx() { } func (w *indexIngestBaseWorker) Close() { + // TODO(lance6716): unify the real write action for engineInfo and external + // writer. for _, writer := range w.writers { - err := writer.Close(w.ctx) + ew, ok := writer.(*external.Writer) + if !ok { + break + } + err := ew.Close(w.ctx) if err != nil { w.ctx.onError(err) } @@ -827,24 +833,36 @@ func (s *indexWriteResultSink) flush() error { failpoint.Inject("mockFlushError", func(_ failpoint.Value) { failpoint.Return(errors.New("mock flush error")) }) - for _, index := range s.indexes { - idxInfo := index.Meta() - _, _, err := s.backendCtx.Flush(idxInfo.ID, ingest.FlushModeForceFlushAndImport) - if err != nil { - if common.ErrFoundDuplicateKeys.Equal(err) { - err = convertToKeyExistsErr(err, idxInfo, s.tbl.Meta()) - return err + _, _, errIdxID, err := s.backendCtx.Flush(ingest.FlushModeForceFlushAndImport) + if err != nil { + if common.ErrFoundDuplicateKeys.Equal(err) { + var idxInfo table.Index + for _, idx := range s.indexes { + if idx.Meta().ID == errIdxID { + idxInfo = idx + break + } } - logutil.Logger(s.ctx).Error("flush error", - zap.String("category", "ddl"), zap.Error(err)) - return err + if idxInfo == nil { + logutil.Logger(s.ctx).Error("index not found", zap.Int64("indexID", errIdxID)) + return kv.ErrKeyExists + } + return convertToKeyExistsErr(err, idxInfo.Meta(), s.tbl.Meta()) } + logutil.Logger(s.ctx).Error("flush error", + zap.String("category", "ddl"), zap.Error(err)) + return err } return nil } func (s *indexWriteResultSink) Close() error { - return s.errGroup.Wait() + err := s.errGroup.Wait() + // for local pipeline + if bc := s.backendCtx; bc != nil { + bc.UnregisterEngines() + } + return err } func (*indexWriteResultSink) String() string { diff --git a/pkg/ddl/backfilling_read_index.go b/pkg/ddl/backfilling_read_index.go index fddfd946c229d..aee305cfaab8f 100644 --- a/pkg/ddl/backfilling_read_index.go +++ b/pkg/ddl/backfilling_read_index.go @@ -133,12 +133,7 @@ func (r *readIndexExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta if opCtx.OperatorErr() != nil { return opCtx.OperatorErr() } - if err != nil { - return err - } - - r.bc.ResetWorkers(r.job.ID) - return nil + return err } func (r *readIndexExecutor) RealtimeSummary() *execute.SubtaskSummary { @@ -226,15 +221,17 @@ func (r *readIndexExecutor) buildLocalStorePipeline( return nil, err } d := r.d - engines := make([]ingest.Engine, 
0, len(r.indexes)) + indexIDs := make([]int64, 0, len(r.indexes)) for _, index := range r.indexes { - ei, err := r.bc.Register(r.job.ID, index.ID, r.job.SchemaName, r.job.TableName) - if err != nil { - tidblogutil.Logger(opCtx).Warn("cannot register new engine", zap.Error(err), - zap.Int64("job ID", r.job.ID), zap.Int64("index ID", index.ID)) - return nil, err - } - engines = append(engines, ei) + indexIDs = append(indexIDs, index.ID) + } + engines, err := r.bc.Register(indexIDs, r.job.TableName) + if err != nil { + tidblogutil.Logger(opCtx).Error("cannot register new engine", + zap.Error(err), + zap.Int64("job ID", r.job.ID), + zap.Int64s("index IDs", indexIDs)) + return nil, err } counter := metrics.BackfillTotalCounter.WithLabelValues( metrics.GenerateReorgLabel("add_idx_rate", r.job.SchemaName, tbl.Meta().Name.O)) diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go index 05b522ee6630e..9b173170234e3 100644 --- a/pkg/ddl/backfilling_scheduler.go +++ b/pkg/ddl/backfilling_scheduler.go @@ -354,6 +354,15 @@ func newIngestBackfillScheduler( }, nil } +func (b *ingestBackfillScheduler) finishedWritingNeedImport() bool { + job := b.reorgInfo.Job + bc, ok := ingest.LitBackCtxMgr.Load(job.ID) + if !ok { + return false + } + return bc.FinishedWritingNeedImport() +} + func (b *ingestBackfillScheduler) setupWorkers() error { job := b.reorgInfo.Job bc, ok := ingest.LitBackCtxMgr.Load(job.ID) @@ -371,10 +380,26 @@ func (b *ingestBackfillScheduler) setupWorkers() error { if err != nil { return errors.Trace(err) } + + indexIDs := make([]int64, 0, len(b.reorgInfo.elements)) + for _, e := range b.reorgInfo.elements { + indexIDs = append(indexIDs, e.ID) + } + engines, err := b.backendCtx.Register(indexIDs, job.TableName) + if err != nil { + return errors.Trace(err) + } + b.copReqSenderPool = copReqSenderPool readerCnt, writerCnt := b.expectedWorkerSize() - writerPool := workerpool.NewWorkerPool[IndexRecordChunk]("ingest_writer", - poolutil.DDL, writerCnt, b.createWorker) + writerPool := workerpool.NewWorkerPool[IndexRecordChunk]( + "ingest_writer", + poolutil.DDL, + writerCnt, + func() workerpool.Worker[IndexRecordChunk, workerpool.None] { + return b.createWorker(indexIDs, engines) + }, + ) writerPool.Start(b.ctx) b.writerPool = writerPool b.copReqSenderPool.chunkSender = writerPool @@ -406,13 +431,9 @@ func (b *ingestBackfillScheduler) close(force bool) { }) } close(b.resultCh) - if intest.InTest && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() { + if intest.InTest && b.copReqSenderPool != nil && len(b.copReqSenderPool.srcChkPool) != copReadChunkPoolSize() { panic(fmt.Sprintf("unexpected chunk size %d", len(b.copReqSenderPool.srcChkPool))) } - if !force { - jobID := b.reorgInfo.ID - b.backendCtx.ResetWorkers(jobID) - } } func (b *ingestBackfillScheduler) sendTask(task *reorgBackfillTask) error { @@ -446,7 +467,10 @@ func (b *ingestBackfillScheduler) adjustWorkerSize() error { return nil } -func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordChunk, workerpool.None] { +func (b *ingestBackfillScheduler) createWorker( + indexIDs []int64, + engines []ingest.Engine, +) workerpool.Worker[IndexRecordChunk, workerpool.None] { reorgInfo := b.reorgInfo job := reorgInfo.Job sessCtx, err := newSessCtx(reorgInfo.d.store, reorgInfo.ReorgMeta.SQLMode, reorgInfo.ReorgMeta.Location, reorgInfo.ReorgMeta.ResourceGroupName) @@ -454,24 +478,6 @@ func (b *ingestBackfillScheduler) createWorker() workerpool.Worker[IndexRecordCh 
b.sendResult(&backfillResult{err: err}) return nil } - bcCtx := b.backendCtx - indexIDs := make([]int64, 0, len(reorgInfo.elements)) - engines := make([]ingest.Engine, 0, len(reorgInfo.elements)) - for _, elem := range reorgInfo.elements { - ei, err := bcCtx.Register(job.ID, elem.ID, job.SchemaName, job.TableName) - if err != nil { - // Return an error only if it is the first worker. - if b.writerMaxID == 0 { - b.sendResult(&backfillResult{err: err}) - return nil - } - logutil.Logger(b.ctx).Warn("cannot create new writer", zap.Error(err), - zap.Int64("job ID", reorgInfo.ID), zap.Int64("index ID", elem.ID)) - return nil - } - indexIDs = append(indexIDs, elem.ID) - engines = append(engines, ei) - } worker, err := newAddIndexIngestWorker( b.ctx, b.tbl, reorgInfo.d, engines, b.resultCh, job.ID, diff --git a/pkg/ddl/ingest/BUILD.bazel b/pkg/ddl/ingest/BUILD.bazel index 635c16c217fd5..5f772cfb5c1ae 100644 --- a/pkg/ddl/ingest/BUILD.bazel +++ b/pkg/ddl/ingest/BUILD.bazel @@ -11,7 +11,6 @@ go_library( "engine.go", "engine_mgr.go", "env.go", - "flush.go", "mem_root.go", "message.go", "mock.go", diff --git a/pkg/ddl/ingest/backend.go b/pkg/ddl/ingest/backend.go index 1d32df1054175..de9c11f504ff4 100644 --- a/pkg/ddl/ingest/backend.go +++ b/pkg/ddl/ingest/backend.go @@ -17,6 +17,7 @@ package ingest import ( "context" "fmt" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -33,7 +34,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util/dbterror" - "github.com/pingcap/tidb/pkg/util/generic" "github.com/pingcap/tidb/pkg/util/logutil" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" @@ -41,15 +41,29 @@ import ( "go.uber.org/zap" ) -// BackendCtx is the backend context for add index reorg task. +// BackendCtx is the backend context for one add index reorg task. type BackendCtx interface { - Register(jobID, indexID int64, schemaName, tableName string) (Engine, error) - Unregister(jobID, indexID int64) + // Register create a new engineInfo for each index ID and register it to the + // backend context. If the index ID is already registered, it will return the + // associated engines. Only one group of index ID is allowed to register for a + // BackendCtx. + Register(indexIDs []int64, tableName string) ([]Engine, error) + UnregisterEngines() + // FinishImport imports the engine of given index ID into the storage, collects + // the duplicate errors if the `unique` is true. Caller should make sure the + // first call of FinishImport means no further data will be wrote to all engines. + // + // TODO(lance6716): refine the interface to let caller don't need to pass the + // indexID, and unify with CollectRemoteDuplicateRows. + FinishImport(indexID int64, unique bool, tbl table.Table) error + // FinishedWritingNeedImport returns true only when all the engines are finished + // writing and only need import. Considering the calling usage of FinishImport, + // it will return true after a successful call of FinishImport and may return + // true after a failed call of FinishImport. + FinishedWritingNeedImport() bool CollectRemoteDuplicateRows(indexID int64, tbl table.Table) error - FinishImport(indexID int64, unique bool, tbl table.Table) error - ResetWorkers(jobID int64) - Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) + FlushController Done() bool SetDone() @@ -73,19 +87,19 @@ const ( FlushModeForceFlushAndImport ) -// litBackendCtx store a backend info for add index reorg task. 
+// litBackendCtx implements BackendCtx. type litBackendCtx struct { - generic.SyncMap[int64, *engineInfo] - MemRoot MemRoot - DiskRoot DiskRoot + engines map[int64]*engineInfo + memRoot MemRoot + diskRoot DiskRoot jobID int64 backend *local.Backend ctx context.Context cfg *lightning.Config sysVars map[string]string - diskRoot DiskRoot done bool + flushing atomic.Bool timeOfLastFlush atomicutil.Time updateInterval time.Duration checkpointMgr *CheckpointManager @@ -135,7 +149,7 @@ func (bc *litBackendCtx) CollectRemoteDuplicateRows(indexID int64, tbl table.Tab // FinishImport imports all the key-values in engine into the storage, collects the duplicate errors if any, and // removes the engine from the backend context. func (bc *litBackendCtx) FinishImport(indexID int64, unique bool, tbl table.Table) error { - ei, exist := bc.Load(indexID) + ei, exist := bc.engines[indexID] if !exist { return dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found") } @@ -175,42 +189,39 @@ func acquireLock(ctx context.Context, se *concurrency.Session, key string) (*con return mu, nil } -// Flush checks the disk quota and imports the current key-values in engine to the storage. -func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) { - ei, exist := bc.Load(indexID) - if !exist { - logutil.Logger(bc.ctx).Error(LitErrGetEngineFail, zap.Int64("index ID", indexID)) - return false, false, dbterror.ErrIngestFailed.FastGenByArgs("ingest engine not found") - } - +// Flush implements FlushController. +func (bc *litBackendCtx) Flush(mode FlushMode) (flushed, imported bool, errIdxID int64, err error) { shouldFlush, shouldImport := bc.checkFlush(mode) if !shouldFlush { - return false, false, nil + return false, false, 0, nil } - if !ei.flushing.CompareAndSwap(false, true) { - return false, false, nil + if !bc.flushing.CompareAndSwap(false, true) { + return false, false, 0, nil } - defer ei.flushing.Store(false) - ei.flushLock.Lock() - defer ei.flushLock.Unlock() + defer bc.flushing.Store(false) - err = ei.Flush() - if err != nil { - return false, false, err + for indexID, ei := range bc.engines { + ei.flushLock.Lock() + //nolint: all_revive,revive + defer ei.flushLock.Unlock() + + if err = ei.Flush(); err != nil { + return false, false, indexID, err + } } bc.timeOfLastFlush.Store(time.Now()) if !shouldImport { - return true, false, nil + return true, false, 0, nil } // Use distributed lock if run in distributed mode). 
if bc.etcdClient != nil { - distLockKey := fmt.Sprintf("/tidb/distributeLock/%d/%d", bc.jobID, indexID) + distLockKey := fmt.Sprintf("/tidb/distributeLock/%d", bc.jobID) se, _ := concurrency.NewSession(bc.etcdClient) mu, err := acquireLock(bc.ctx, se, distLockKey) if err != nil { - return true, false, errors.Trace(err) + return true, false, 0, errors.Trace(err) } logutil.Logger(bc.ctx).Info("acquire distributed flush lock success", zap.Int64("jobID", bc.jobID)) defer func() { @@ -226,25 +237,29 @@ func (bc *litBackendCtx) Flush(indexID int64, mode FlushMode) (flushed, imported } }() } - err = bc.unsafeImportAndReset(ei) - if err != nil { - return true, false, err + + for indexID, ei := range bc.engines { + if err = bc.unsafeImportAndReset(ei); err != nil { + return true, false, indexID, err + } } - return true, true, nil + + return true, true, 0, nil } func (bc *litBackendCtx) unsafeImportAndReset(ei *engineInfo) error { - logutil.Logger(bc.ctx).Info(LitInfoUnsafeImport, zap.Int64("index ID", ei.indexID), - zap.String("usage info", bc.diskRoot.UsageInfo())) logger := log.FromContext(bc.ctx).With( zap.Stringer("engineUUID", ei.uuid), ) + logger.Info(LitInfoUnsafeImport, + zap.Int64("index ID", ei.indexID), + zap.String("usage info", bc.diskRoot.UsageInfo())) - ei.closedEngine = backend.NewClosedEngine(bc.backend, logger, ei.uuid, 0) + closedEngine := backend.NewClosedEngine(bc.backend, logger, ei.uuid, 0) regionSplitSize := int64(lightning.SplitRegionSize) * int64(lightning.MaxSplitRegionSizeRatio) regionSplitKeys := int64(lightning.SplitRegionKeys) - if err := ei.closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys); err != nil { + if err := closedEngine.Import(bc.ctx, regionSplitSize, regionSplitKeys); err != nil { logutil.Logger(bc.ctx).Error(LitErrIngestDataErr, zap.Int64("index ID", ei.indexID), zap.String("usage info", bc.diskRoot.UsageInfo())) return err diff --git a/pkg/ddl/ingest/backend_mgr.go b/pkg/ddl/ingest/backend_mgr.go index 5276184a4913e..33c7a81a05ee2 100644 --- a/pkg/ddl/ingest/backend_mgr.go +++ b/pkg/ddl/ingest/backend_mgr.go @@ -28,7 +28,6 @@ import ( ddllogutil "github.com/pingcap/tidb/pkg/ddl/logutil" "github.com/pingcap/tidb/pkg/lightning/backend/local" "github.com/pingcap/tidb/pkg/lightning/config" - "github.com/pingcap/tidb/pkg/util/generic" "github.com/pingcap/tidb/pkg/util/logutil" kvutil "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" @@ -130,7 +129,7 @@ func (m *litBackendCtxMgr) Register( } m.memRoot.RefreshConsumption() - ok := m.memRoot.CheckConsume(StructSizeBackendCtx) + ok := m.memRoot.CheckConsume(structSizeBackendCtx) if !ok { return nil, genBackendAllocMemFailedErr(ctx, m.memRoot, jobID) } @@ -155,7 +154,7 @@ func (m *litBackendCtxMgr) Register( bcCtx := newBackendContext(ctx, jobID, bd, cfg.lightning, defaultImportantVariables, m.memRoot, m.diskRoot, etcdClient) m.backends.m[jobID] = bcCtx - m.memRoot.Consume(StructSizeBackendCtx) + m.memRoot.Consume(structSizeBackendCtx) m.backends.mu.Unlock() logutil.Logger(ctx).Info(LitInfoCreateBackend, zap.Int64("job ID", jobID), @@ -268,15 +267,14 @@ func newBackendContext( etcdClient *clientv3.Client, ) *litBackendCtx { bCtx := &litBackendCtx{ - SyncMap: generic.NewSyncMap[int64, *engineInfo](10), - MemRoot: memRoot, - DiskRoot: diskRoot, + engines: make(map[int64]*engineInfo, 10), + memRoot: memRoot, + diskRoot: diskRoot, jobID: jobID, backend: be, ctx: ctx, cfg: cfg, sysVars: vars, - diskRoot: diskRoot, updateInterval: checkpointUpdateInterval, etcdClient: etcdClient, } @@ -299,12 
+297,12 @@ func (m *litBackendCtxMgr) Unregister(jobID int64) { if !exist { return } - bc.unregisterAll(jobID) + bc.UnregisterEngines() bc.backend.Close() if bc.checkpointMgr != nil { bc.checkpointMgr.Close() } - m.memRoot.Release(StructSizeBackendCtx) + m.memRoot.Release(structSizeBackendCtx) m.memRoot.ReleaseWithTag(encodeBackendTag(jobID)) logutil.Logger(bc.ctx).Info(LitInfoCloseBackend, zap.Int64("job ID", jobID), zap.Int64("current memory usage", m.memRoot.CurrentUsage()), diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go index 1b6a7d6aee916..49fe71f2dfa6e 100644 --- a/pkg/ddl/ingest/checkpoint.go +++ b/pkg/ddl/ingest/checkpoint.go @@ -92,9 +92,12 @@ type taskCheckpoint struct { lastBatchRead bool } -// FlushController is an interface to control the flush of the checkpoint. +// FlushController is an interface to control the flush of data so after it +// returns caller can save checkpoint. type FlushController interface { - Flush(indexID int64, mode FlushMode) (flushed, imported bool, err error) + // Flush checks if al engines need to be flushed and imported based on given + // FlushMode. It's concurrent safe. + Flush(mode FlushMode) (flushed, imported bool, errIdxID int64, err error) } // NewCheckpointManager creates a new checkpoint manager. @@ -203,7 +206,7 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error { cp.writtenKeys += delta s.mu.Unlock() - flushed, imported, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeAuto, s.indexIDs) + flushed, imported, _, err := s.flushCtrl.Flush(FlushModeAuto) if !flushed || err != nil { return err } @@ -258,7 +261,7 @@ func (s *CheckpointManager) Close() { // Flush flushed the data and updates checkpoint. func (s *CheckpointManager) Flush() { // use FlushModeForceFlushNoImport to finish the flush process timely. 
- _, _, _, err := TryFlushAllIndexes(s.flushCtrl, FlushModeForceFlushNoImport, s.indexIDs) + _, _, _, err := s.flushCtrl.Flush(FlushModeForceFlushNoImport) if err != nil { s.logger.Warn("flush local engine failed", zap.Error(err)) } diff --git a/pkg/ddl/ingest/checkpoint_test.go b/pkg/ddl/ingest/checkpoint_test.go index d8ccec06f305c..64a9cf5543e29 100644 --- a/pkg/ddl/ingest/checkpoint_test.go +++ b/pkg/ddl/ingest/checkpoint_test.go @@ -198,6 +198,6 @@ type dummyFlushCtrl struct { imported bool } -func (d *dummyFlushCtrl) Flush(_ int64, _ ingest.FlushMode) (bool, bool, error) { - return true, d.imported, nil +func (d *dummyFlushCtrl) Flush(_ ingest.FlushMode) (bool, bool, int64, error) { + return true, d.imported, 0, nil } diff --git a/pkg/ddl/ingest/config.go b/pkg/ddl/ingest/config.go index 2b17c413d8c8a..c6b59a531c5cd 100644 --- a/pkg/ddl/ingest/config.go +++ b/pkg/ddl/ingest/config.go @@ -109,7 +109,7 @@ var ( compactConcurrency = 4 ) -func generateLocalEngineConfig(id int64, dbName, tbName string, ts uint64) *backend.EngineConfig { +func generateLocalEngineConfig(ts uint64) *backend.EngineConfig { return &backend.EngineConfig{ Local: backend.LocalEngineConfig{ Compact: true, @@ -117,11 +117,7 @@ func generateLocalEngineConfig(id int64, dbName, tbName string, ts uint64) *back CompactConcurrency: compactConcurrency, BlockSize: 16 * 1024, // using default for DDL }, - TableInfo: &checkpoints.TidbTableInfo{ - ID: id, - DB: dbName, - Name: tbName, - }, + TableInfo: &checkpoints.TidbTableInfo{}, KeepSortDir: true, TS: ts, } diff --git a/pkg/ddl/ingest/engine.go b/pkg/ddl/ingest/engine.go index 1a66d9ef0f4b1..1d3181f9b9359 100644 --- a/pkg/ddl/ingest/engine.go +++ b/pkg/ddl/ingest/engine.go @@ -18,7 +18,6 @@ import ( "context" "strconv" "sync" - "sync/atomic" "github.com/google/uuid" tidbkv "github.com/pingcap/tidb/pkg/kv" @@ -45,7 +44,6 @@ type Writer interface { // To enable uniqueness check, the handle should be non-empty. WriteRow(ctx context.Context, idxKey, idxVal []byte, handle tidbkv.Handle) error LockForWrite() (unlock func()) - Close(ctx context.Context) error } // engineInfo is the engine for one index reorg task, each task will create several new writers under the @@ -55,28 +53,36 @@ type engineInfo struct { jobID int64 indexID int64 openedEngine *backend.OpenedEngine + // closedEngine is set only when all data is finished written and all writers are + // closed. closedEngine *backend.ClosedEngine uuid uuid.UUID cfg *backend.EngineConfig - writerCount int + litCfg *config.Config writerCache generic.SyncMap[int, backend.EngineWriter] memRoot MemRoot flushLock *sync.RWMutex - flushing atomic.Bool } // newEngineInfo create a new engineInfo struct. -func newEngineInfo(ctx context.Context, jobID, indexID int64, cfg *backend.EngineConfig, - en *backend.OpenedEngine, uuid uuid.UUID, wCnt int, memRoot MemRoot) *engineInfo { +func newEngineInfo( + ctx context.Context, + jobID, indexID int64, + cfg *backend.EngineConfig, + litCfg *config.Config, + en *backend.OpenedEngine, + uuid uuid.UUID, + memRoot MemRoot, +) *engineInfo { return &engineInfo{ ctx: ctx, jobID: jobID, indexID: indexID, cfg: cfg, + litCfg: litCfg, openedEngine: en, uuid: uuid, - writerCount: wCnt, - writerCache: generic.NewSyncMap[int, backend.EngineWriter](wCnt), + writerCache: generic.NewSyncMap[int, backend.EngineWriter](4), memRoot: memRoot, flushLock: &sync.RWMutex{}, } @@ -181,9 +187,9 @@ type writerContext struct { // CreateWriter creates a new writerContext. 
func (ei *engineInfo) CreateWriter(id int) (Writer, error) { ei.memRoot.RefreshConsumption() - ok := ei.memRoot.CheckConsume(StructSizeWriterCtx) + ok := ei.memRoot.CheckConsume(structSizeWriterCtx) if !ok { - return nil, genEngineAllocMemFailedErr(ei.ctx, ei.memRoot, ei.jobID, ei.indexID) + return nil, genWriterAllocMemFailedErr(ei.ctx, ei.memRoot, ei.jobID, ei.indexID) } wCtx, err := ei.newWriterContext(id) @@ -194,10 +200,10 @@ func (ei *engineInfo) CreateWriter(id int) (Writer, error) { return nil, err } - ei.memRoot.Consume(StructSizeWriterCtx) + ei.memRoot.Consume(structSizeWriterCtx) logutil.Logger(ei.ctx).Info(LitInfoCreateWrite, zap.Int64("job ID", ei.jobID), zap.Int64("index ID", ei.indexID), zap.Int("worker ID", id), - zap.Int64("allocate memory", StructSizeWriterCtx), + zap.Int64("allocate memory", structSizeWriterCtx), zap.Int64("current memory usage", ei.memRoot.CurrentUsage()), zap.Int64("max memory quota", ei.memRoot.MaxMemoryQuota())) return wCtx, err @@ -210,6 +216,10 @@ func (ei *engineInfo) CreateWriter(id int) (Writer, error) { func (ei *engineInfo) newWriterContext(workerID int) (*writerContext, error) { lWrite, exist := ei.writerCache.Load(workerID) if !exist { + ok := ei.memRoot.CheckConsume(int64(ei.litCfg.TikvImporter.LocalWriterMemCacheSize)) + if !ok { + return nil, genWriterAllocMemFailedErr(ei.ctx, ei.memRoot, ei.jobID, ei.indexID) + } var err error lWrite, err = ei.openedEngine.LocalWriter(ei.ctx, &backend.LocalWriterConfig{}) if err != nil { @@ -217,6 +227,7 @@ func (ei *engineInfo) newWriterContext(workerID int) (*writerContext, error) { } // Cache the local writer. ei.writerCache.Store(workerID, lWrite) + ei.memRoot.Consume(int64(ei.litCfg.TikvImporter.LocalWriterMemCacheSize)) } wc := &writerContext{ ctx: ei.ctx, @@ -236,8 +247,10 @@ func (ei *engineInfo) closeWriters() error { firstErr = err } } + ei.memRoot.Release(int64(ei.litCfg.TikvImporter.LocalWriterMemCacheSize)) } ei.writerCache.Delete(wid) + ei.memRoot.Release(structSizeWriterCtx) } return firstErr } @@ -261,8 +274,3 @@ func (wCtx *writerContext) LockForWrite() (unlock func()) { wCtx.fLock.RUnlock() } } - -// Close implements ingest.Writer interface. -func (*writerContext) Close(_ context.Context) error { - return nil -} diff --git a/pkg/ddl/ingest/engine_mgr.go b/pkg/ddl/ingest/engine_mgr.go index b4daddaff7df4..8d1449658c1c8 100644 --- a/pkg/ddl/ingest/engine_mgr.go +++ b/pkg/ddl/ingest/engine_mgr.go @@ -15,118 +15,111 @@ package ingest import ( - "fmt" - "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/lightning/backend" - "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" ) -// maxWriterCount is the max number of writers that can be created for a single engine. -const maxWriterCount = 16 +// Register implements BackendCtx. +func (bc *litBackendCtx) Register(indexIDs []int64, tableName string) ([]Engine, error) { + ret := make([]Engine, 0, len(indexIDs)) + + for _, indexID := range indexIDs { + en, ok := bc.engines[indexID] + if !ok { + continue + } + ret = append(ret, en) + } + if l := len(ret); l > 0 { + if l != len(indexIDs) { + return nil, errors.Errorf( + "engines index ID number mismatch: job ID %d, required number of index IDs: %d, actual number of engines: %d", + bc.jobID, len(indexIDs), l, + ) + } + return ret, nil + } -// Register create a new engineInfo and register it to the backend context. 
-func (bc *litBackendCtx) Register(jobID, indexID int64, schemaName, tableName string) (Engine, error) { - // Calculate lightning concurrency degree and set memory usage - // and pre-allocate memory usage for worker. - bc.MemRoot.RefreshConsumption() - ok := bc.MemRoot.CheckConsume(int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)) + bc.memRoot.RefreshConsumption() + numIdx := int64(len(indexIDs)) + engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize) + ok := bc.memRoot.CheckConsume(numIdx * (structSizeEngineInfo + engineCacheSize)) if !ok { - return nil, genEngineAllocMemFailedErr(bc.ctx, bc.MemRoot, bc.jobID, indexID) + return nil, genEngineAllocMemFailedErr(bc.ctx, bc.memRoot, bc.jobID, indexIDs) } - var info string - en, exist := bc.Load(indexID) - if !exist || en.openedEngine == nil { - if exist && en.closedEngine != nil { - // Import failed before, try to import again. - err := en.ImportAndClean() - if err != nil { - return nil, errors.Trace(err) - } - } - engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize) - ok := bc.MemRoot.CheckConsume(StructSizeEngineInfo + engineCacheSize) - if !ok { - return nil, genEngineAllocMemFailedErr(bc.ctx, bc.MemRoot, bc.jobID, indexID) - } + mgr := backend.MakeEngineManager(bc.backend) + ts := uint64(0) + if c := bc.checkpointMgr; c != nil { + ts = c.GetTS() + } + cfg := generateLocalEngineConfig(ts) - mgr := backend.MakeEngineManager(bc.backend) - ts := uint64(0) - if c := bc.checkpointMgr; c != nil { - ts = c.GetTS() - } - cfg := generateLocalEngineConfig(jobID, schemaName, tableName, ts) - openedEn, err := mgr.OpenEngine(bc.ctx, cfg, tableName, int32(indexID)) + openedEngines := make(map[int64]*engineInfo, numIdx) + + for _, indexID := range indexIDs { + openedEngine, err := mgr.OpenEngine(bc.ctx, cfg, tableName, int32(indexID)) if err != nil { - logutil.Logger(bc.ctx).Warn(LitErrCreateEngineFail, zap.Int64("job ID", jobID), - zap.Int64("index ID", indexID), zap.Error(err)) - return nil, errors.Trace(err) - } - id := openedEn.GetEngineUUID() - en = newEngineInfo(bc.ctx, jobID, indexID, cfg, openedEn, id, 1, bc.MemRoot) - bc.Store(indexID, en) - bc.MemRoot.Consume(StructSizeEngineInfo) - bc.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize) - info = LitInfoOpenEngine - } else { - if en.writerCount+1 > maxWriterCount { - logutil.Logger(bc.ctx).Warn(LitErrExceedConcurrency, zap.Int64("job ID", jobID), + logutil.Logger(bc.ctx).Warn(LitErrCreateEngineFail, + zap.Int64("job ID", bc.jobID), zap.Int64("index ID", indexID), - zap.Int("concurrency", bc.cfg.TikvImporter.RangeConcurrency)) - return nil, dbterror.ErrIngestFailed.FastGenByArgs("concurrency quota exceeded") + zap.Error(err)) + + for _, e := range openedEngines { + e.Clean() + } + return nil, errors.Trace(err) } - en.writerCount++ - info = LitInfoAddWriter + + openedEngines[indexID] = newEngineInfo( + bc.ctx, + bc.jobID, + indexID, + cfg, + bc.cfg, + openedEngine, + openedEngine.GetEngineUUID(), + bc.memRoot, + ) } - bc.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), int64(bc.cfg.TikvImporter.LocalWriterMemCacheSize)) - logutil.Logger(bc.ctx).Info(info, zap.Int64("job ID", jobID), - zap.Int64("index ID", indexID), - zap.Int64("current memory usage", bc.MemRoot.CurrentUsage()), - zap.Int64("memory limitation", bc.MemRoot.MaxMemoryQuota()), - zap.Int("current writer count", en.writerCount)) - return en, nil -} -// Unregister delete the engineInfo from the engineManager. 
-func (bc *litBackendCtx) Unregister(jobID, indexID int64) { - ei, exist := bc.Load(indexID) - if !exist { - return + for _, indexID := range indexIDs { + ei := openedEngines[indexID] + ret = append(ret, ei) + bc.engines[indexID] = ei } + bc.memRoot.Consume(numIdx * (structSizeEngineInfo + engineCacheSize)) - ei.Clean() - bc.Delete(indexID) - bc.MemRoot.ReleaseWithTag(encodeEngineTag(jobID, indexID)) - bc.MemRoot.Release(StructSizeWriterCtx * int64(ei.writerCount)) - bc.MemRoot.Release(StructSizeEngineInfo) + logutil.Logger(bc.ctx).Info(LitInfoOpenEngine, zap.Int64("job ID", bc.jobID), + zap.Int64s("index IDs", indexIDs), + zap.Int64("current memory usage", bc.memRoot.CurrentUsage()), + zap.Int64("memory limitation", bc.memRoot.MaxMemoryQuota())) + return ret, nil } -// ResetWorkers reset the writer count of the engineInfo because -// the goroutines of backfill workers have been terminated. -func (bc *litBackendCtx) ResetWorkers(jobID int64) { - for _, indexID := range bc.Keys() { - ei, exist := bc.Load(indexID) - if !exist { - continue - } - bc.MemRoot.Release(StructSizeWriterCtx * int64(ei.writerCount)) - bc.MemRoot.ReleaseWithTag(encodeEngineTag(jobID, indexID)) - engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize) - bc.MemRoot.ConsumeWithTag(encodeEngineTag(jobID, indexID), engineCacheSize) - ei.writerCount = 0 +// UnregisterEngines implements BackendCtx. +func (bc *litBackendCtx) UnregisterEngines() { + numIdx := int64(len(bc.engines)) + for _, ei := range bc.engines { + ei.Clean() } -} + bc.engines = make(map[int64]*engineInfo, 10) -// unregisterAll delete all engineInfo from the engineManager. -func (bc *litBackendCtx) unregisterAll(jobID int64) { - for _, idxID := range bc.Keys() { - bc.Unregister(jobID, idxID) - } + engineCacheSize := int64(bc.cfg.TikvImporter.EngineMemCacheSize) + bc.memRoot.Release(numIdx * (structSizeEngineInfo + engineCacheSize)) } -func encodeEngineTag(jobID, indexID int64) string { - return fmt.Sprintf("%d-%d", jobID, indexID) +// FinishedWritingNeedImport implements BackendCtx. +func (bc *litBackendCtx) FinishedWritingNeedImport() bool { + if len(bc.engines) == 0 { + return false + } + for _, ei := range bc.engines { + if ei.closedEngine != nil { + return true + } + } + return false } diff --git a/pkg/ddl/ingest/flush.go b/pkg/ddl/ingest/flush.go deleted file mode 100644 index a3cb89a3886f1..0000000000000 --- a/pkg/ddl/ingest/flush.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package ingest - -// TryFlushAllIndexes tries to flush and import all indexes. 
-func TryFlushAllIndexes(flushCtrl FlushController, mode FlushMode, indexIDs []int64) (flushed, imported bool, failedIdxID int64, err error) { - allFlushed := true - allImported := true - for _, idxID := range indexIDs { - // TODO(lance6716): use flushCtrl.Flush(indexIDs, mode) - flushed, imported, err := flushCtrl.Flush(idxID, mode) - if err != nil { - return false, false, idxID, err - } - allFlushed = allFlushed && flushed - allImported = allImported && imported - } - return allFlushed, allImported, -1, nil -} diff --git a/pkg/ddl/ingest/mem_root.go b/pkg/ddl/ingest/mem_root.go index 0d37b2ef6096c..e254620a69704 100644 --- a/pkg/ddl/ingest/mem_root.go +++ b/pkg/ddl/ingest/mem_root.go @@ -20,6 +20,7 @@ import ( ) // MemRoot is used to track the memory usage for the lightning backfill process. +// TODO(lance6716): change API to prevent TOCTOU. type MemRoot interface { Consume(size int64) Release(size int64) @@ -35,18 +36,18 @@ type MemRoot interface { } var ( - // StructSizeBackendCtx is the size of litBackendCtx. - StructSizeBackendCtx int64 - // StructSizeEngineInfo is the size of engineInfo. - StructSizeEngineInfo int64 - // StructSizeWriterCtx is the size of writerContext. - StructSizeWriterCtx int64 + // structSizeBackendCtx is the size of litBackendCtx. + structSizeBackendCtx int64 + // structSizeEngineInfo is the size of engineInfo. + structSizeEngineInfo int64 + // structSizeWriterCtx is the size of writerContext. + structSizeWriterCtx int64 ) func init() { - StructSizeBackendCtx = int64(unsafe.Sizeof(litBackendCtx{})) - StructSizeEngineInfo = int64(unsafe.Sizeof(engineInfo{})) - StructSizeWriterCtx = int64(unsafe.Sizeof(writerContext{})) + structSizeBackendCtx = int64(unsafe.Sizeof(litBackendCtx{})) + structSizeEngineInfo = int64(unsafe.Sizeof(engineInfo{})) + structSizeWriterCtx = int64(unsafe.Sizeof(writerContext{})) } // memRootImpl is an implementation of MemRoot. diff --git a/pkg/ddl/ingest/message.go b/pkg/ddl/ingest/message.go index 1217244c6f2ac..374575d3a4b4e 100644 --- a/pkg/ddl/ingest/message.go +++ b/pkg/ddl/ingest/message.go @@ -67,8 +67,18 @@ func genBackendAllocMemFailedErr(ctx context.Context, memRoot MemRoot, jobID int return dbterror.ErrIngestFailed.FastGenByArgs("memory used up") } -func genEngineAllocMemFailedErr(ctx context.Context, memRoot MemRoot, jobID, idxID int64) error { - logutil.Logger(ctx).Warn(LitErrAllocMemFail, zap.Int64("job ID", jobID), +func genEngineAllocMemFailedErr(ctx context.Context, memRoot MemRoot, jobID int64, idxIDs []int64) error { + logutil.Logger(ctx).Warn(LitErrAllocMemFail, + zap.Int64("job ID", jobID), + zap.Int64s("index IDs", idxIDs), + zap.Int64("current memory usage", memRoot.CurrentUsage()), + zap.Int64("max memory quota", memRoot.MaxMemoryQuota())) + return dbterror.ErrIngestFailed.FastGenByArgs("memory used up") +} + +func genWriterAllocMemFailedErr(ctx context.Context, memRoot MemRoot, jobID int64, idxID int64) error { + logutil.Logger(ctx).Warn(LitErrAllocMemFail, + zap.Int64("job ID", jobID), zap.Int64("index ID", idxID), zap.Int64("current memory usage", memRoot.CurrentUsage()), zap.Int64("max memory quota", memRoot.MaxMemoryQuota())) diff --git a/pkg/ddl/ingest/mock.go b/pkg/ddl/ingest/mock.go index 4393861003759..06207ae39b7fe 100644 --- a/pkg/ddl/ingest/mock.go +++ b/pkg/ddl/ingest/mock.go @@ -105,14 +105,23 @@ type MockBackendCtx struct { } // Register implements BackendCtx.Register interface. 
-func (m *MockBackendCtx) Register(jobID, indexID int64, _, _ string) (Engine, error) { - logutil.DDLIngestLogger().Info("mock backend ctx register", zap.Int64("jobID", jobID), zap.Int64("indexID", indexID)) - return &MockEngineInfo{sessCtx: m.sessCtx, mu: &m.mu}, nil +func (m *MockBackendCtx) Register(indexIDs []int64, _ string) ([]Engine, error) { + logutil.DDLIngestLogger().Info("mock backend ctx register", zap.Int64("jobID", m.jobID), zap.Int64s("indexIDs", indexIDs)) + ret := make([]Engine, 0, len(indexIDs)) + for range indexIDs { + ret = append(ret, &MockEngineInfo{sessCtx: m.sessCtx, mu: &m.mu}) + } + return ret, nil } -// Unregister implements BackendCtx.Unregister interface. -func (*MockBackendCtx) Unregister(jobID, indexID int64) { - logutil.DDLIngestLogger().Info("mock backend ctx unregister", zap.Int64("jobID", jobID), zap.Int64("indexID", indexID)) +// UnregisterEngines implements BackendCtx.UnregisterEngines interface. +func (*MockBackendCtx) UnregisterEngines() { + logutil.DDLIngestLogger().Info("mock backend ctx unregister") +} + +// FinishedWritingNeedImport implements BackendCtx interface. +func (*MockBackendCtx) FinishedWritingNeedImport() bool { + return false } // CollectRemoteDuplicateRows implements BackendCtx.CollectRemoteDuplicateRows interface. @@ -127,13 +136,9 @@ func (*MockBackendCtx) FinishImport(indexID int64, _ bool, _ table.Table) error return nil } -// ResetWorkers implements BackendCtx.ResetWorkers interface. -func (*MockBackendCtx) ResetWorkers(_ int64) { -} - // Flush implements BackendCtx.Flush interface. -func (*MockBackendCtx) Flush(_ int64, _ FlushMode) (flushed bool, imported bool, err error) { - return false, false, nil +func (*MockBackendCtx) Flush(_ FlushMode) (flushed bool, imported bool, errIdxID int64, err error) { + return false, false, 0, nil } // Done implements BackendCtx.Done interface. @@ -243,10 +248,5 @@ func (*MockWriter) LockForWrite() func() { return func() {} } -// Close implements Writer.Close interface. -func (*MockWriter) Close(_ context.Context) error { - return nil -} - // MockExecAfterWriteRow is only used for test. var MockExecAfterWriteRow func()
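Taken together, the hunks above replace the per-index Register/Unregister/Flush calls with group-level ones on BackendCtx. The following is a minimal sketch of the intended write-phase call order, assuming only the post-patch interface shown in backend.go; the function names writePhaseSketch and writeRows are placeholders for the real call sites (the ingest backfill scheduler and the add-index operator pipeline), not code from this patch.

package sketch

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/ddl/ingest"
)

// writePhaseSketch drives one BackendCtx through the write phase:
// one Register call for the whole group of index IDs, periodic
// backend-wide Flush calls keyed only by FlushMode, and a single
// UnregisterEngines on teardown.
func writePhaseSketch(bc ingest.BackendCtx, tableName string, indexIDs []int64) error {
	// Retry fast path: every engine already holds its data and only needs the
	// final import, so the writers do not need to be rebuilt.
	if bc.FinishedWritingNeedImport() {
		return nil
	}

	// Register now covers the whole group of index IDs and returns one engine
	// per index; calling it again with the same group returns the engines that
	// were already registered.
	engines, err := bc.Register(indexIDs, tableName)
	if err != nil {
		return err
	}
	// Engines are cleaned up in one shot; in the patch this happens when the
	// pipeline result sink closes.
	defer bc.UnregisterEngines()

	if err := writeRows(engines, bc); err != nil {
		return err
	}

	// Final flush-and-import; the failing index ID (if any) comes back as the
	// third return value so the caller can turn a duplicate-key error into a
	// user-visible message, as indexWriteResultSink.flush does.
	_, _, errIdxID, err := bc.Flush(ingest.FlushModeForceFlushAndImport)
	if err != nil {
		return fmt.Errorf("flush and import failed for index %d: %w", errIdxID, err)
	}
	return nil
}

// writeRows stands in for the operator pipeline: each worker obtains a Writer
// via Engine.CreateWriter, streams index KVs with Writer.WriteRow, and calls
// Flush(FlushModeAuto) between chunks to bound local disk usage.
func writeRows(engines []ingest.Engine, bc ingest.BackendCtx) error {
	_ = engines
	_, _, _, err := bc.Flush(ingest.FlushModeAuto)
	return err
}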
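The memory accounting is reshaped in the same way: the engine-side quota is charged once per Register call (numIdx * (structSizeEngineInfo + EngineMemCacheSize)) and released once in UnregisterEngines, while the writer-side quota (structSizeWriterCtx plus LocalWriterMemCacheSize) is charged lazily on the first CreateWriter per worker and released in closeWriters. Below is a minimal sketch of the engine-side pairing written against the exported MemRoot interface only; the helper name and the per-engine size parameter are made up for illustration and are not part of the patch.

package sketch

import "github.com/pingcap/tidb/pkg/ddl/ingest"

// accountEngines charges the engine-side quota for a group of indexes and
// hands back the matching release, which the caller keeps until the engines
// are unregistered (mirroring Register / UnregisterEngines in engine_mgr.go).
func accountEngines(memRoot ingest.MemRoot, numIdx, perEngineBytes int64) (release func(), ok bool) {
	memRoot.RefreshConsumption()
	if !memRoot.CheckConsume(numIdx * perEngineBytes) {
		// The caller surfaces this as a "memory used up" ingest error.
		return nil, false
	}
	memRoot.Consume(numIdx * perEngineBytes)
	return func() { memRoot.Release(numIdx * perEngineBytes) }, true
}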