From 9d3e36542c5a4b179bd7394ec15fef6cfc90d689 Mon Sep 17 00:00:00 2001 From: yangxuan Date: Wed, 6 Mar 2024 16:43:59 +0800 Subject: [PATCH] fix: DC painc at dropvchannel when disable compaction Make EnableCompaction able to change dynamicly See also: #31059 Signed-off-by: yangxuan --- internal/datacoord/compaction.go | 95 +++++++++++++++--------- internal/datacoord/compaction_test.go | 4 +- internal/datacoord/compaction_trigger.go | 38 ++++++---- internal/datacoord/server.go | 46 ++++++------ internal/datacoord/services.go | 17 ++--- pkg/util/paramtable/component_param.go | 2 +- 6 files changed, 116 insertions(+), 86 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index 69c1384e3eb85..7648691ca28ea 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -50,8 +50,6 @@ type compactionPlanContext interface { execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error // getCompaction return compaction task. If planId does not exist, return nil. getCompaction(planID int64) *compactionTask - // updateCompaction set the compaction state to timeout or completed - updateCompaction(ts Timestamp) error // isFull return true if the task pool is full isFull() bool // get compaction tasks by signal id @@ -137,37 +135,6 @@ func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta C } } -func (c *compactionPlanHandler) checkResult() { - // deal results - ts, err := c.GetCurrentTS() - if err != nil { - log.Warn("fail to check result", zap.Error(err)) - return - } - _ = c.updateCompaction(ts) -} - -func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) { - interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second) - ctx, cancel := context.WithTimeout(context.Background(), interval) - defer cancel() - ts, err := c.allocator.allocTimestamp(ctx) - if err != nil { - log.Warn("unable to alloc timestamp", zap.Error(err)) - return 0, err - } - return ts, nil -} - -func (c *compactionPlanHandler) schedule() { - // schedule queuing tasks - tasks := c.scheduler.Schedule() - if len(tasks) > 0 { - c.notifyTasks(tasks) - c.scheduler.LogStatus() - } -} - func (c *compactionPlanHandler) start() { interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second) c.stopCh = make(chan struct{}) @@ -184,7 +151,9 @@ func (c *compactionPlanHandler) start() { log.Info("compaction handler check result loop quit") return case <-checkResultTicker.C: - c.checkResult() + if compactionEnabled() { + c.CheckResult() + } } } }() @@ -203,7 +172,9 @@ func (c *compactionPlanHandler) start() { return case <-scheduleTicker.C: - c.schedule() + if compactionEnabled() { + c.Schedule() + } } } }() @@ -218,12 +189,45 @@ func (c *compactionPlanHandler) start() { log.Info("Compaction handler quit clean") return case <-cleanTicker.C: - c.Clean() + if compactionEnabled() { + c.Clean() + } } } }() } +func (c *compactionPlanHandler) CheckResult() { + // deal results + ts, err := c.getCurrentTs() + if err != nil { + log.Warn("fail to check result", zap.Error(err)) + return + } + _ = c.updateCompaction(ts) +} + +func (c *compactionPlanHandler) getCurrentTs() (Timestamp, error) { + interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), interval) + defer cancel() + ts, err := c.allocator.allocTimestamp(ctx) + if err != nil { + log.Warn("unable to alloc timestamp", zap.Error(err)) + return 0, err + } + return ts, nil +} + +func (c *compactionPlanHandler) Schedule() { + // schedule queuing tasks + tasks := c.scheduler.Schedule() + if len(tasks) > 0 { + c.notifyTasks(tasks) + c.scheduler.LogStatus() + } +} + func (c *compactionPlanHandler) Clean() { current := tsoutil.GetCurrentTime() c.mu.Lock() @@ -248,6 +252,10 @@ func (c *compactionPlanHandler) stop() { } func (c *compactionPlanHandler) removeTasksByChannel(channel string) { + if !compactionEnabled() { + return + } + c.mu.Lock() defer c.mu.Unlock() for id, task := range c.plans { @@ -388,6 +396,9 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) { // execCompactionPlan start to execute plan and return immediately func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { + if !compactionEnabled() { + return nil + } return c.enqueuePlan(signal, plan) } @@ -497,12 +508,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact // getCompaction return compaction task. If planId does not exist, return nil. func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask { + if !compactionEnabled() { + return nil + } + c.mu.RLock() defer c.mu.RUnlock() return c.plans[planID] } +// updateCompaction is the inner func to check compaction task states from datanode func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { // Get executing executingTasks before GetCompactionState from DataNode to prevent false failure, // for DC might add new task while GetCompactionState. @@ -603,6 +619,9 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou // isFull return true if the task pool is full func (c *compactionPlanHandler) isFull() bool { + if !compactionEnabled() { + return false + } return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() } @@ -620,6 +639,10 @@ func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*co // get compaction tasks by signal id; if signalID == 0 return all tasks func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask { + if !compactionEnabled() { + return nil + } + c.mu.RLock() defer c.mu.RUnlock() diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index cbb3dd6dc8ddb..f3f7422e687a6 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -87,13 +87,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() { { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once() handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc) - handler.checkResult() + handler.CheckResult() } { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once() handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc) - handler.checkResult() + handler.CheckResult() } } diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 1a26f3d1cea37..f89a1589d11ed 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -49,11 +49,12 @@ type compactTime struct { type trigger interface { start() stop() - // triggerCompaction triggers a compaction if any compaction condition satisfy. - triggerCompaction() error + // triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment + // used by savebinlogpath triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error - // forceTriggerCompaction force to start a compaction + + // forceTriggerCompaction force to start a compaction, used by manualCompaction, highest priority forceTriggerCompaction(collectionID int64) (UniqueID, error) } @@ -148,11 +149,6 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { defer logutil.LogPanic() defer t.wg.Done() - // If AutoCompaction disabled, global loop will not start - if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { - return - } - for { select { case <-t.quit: @@ -160,9 +156,11 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { log.Info("global compaction loop exit") return case <-t.globalTrigger.C: - err := t.triggerCompaction() - if err != nil { - log.Warn("unable to triggerCompaction", zap.Error(err)) + if autoCompactionEnabled() { + err := t.triggerCompaction() + if err != nil { + log.Warn("unable to trigger global compaction", zap.Error(err)) + } } } } @@ -236,7 +234,7 @@ func (t *compactionTrigger) getCompactTime(ts Timestamp, coll *collectionInfo) ( return &compactTime{0, 0}, nil } -// triggerCompaction trigger a compaction if any compaction condition satisfy. +// triggerCompaction is an internal global compaction trigger func (t *compactionTrigger) triggerCompaction() error { id, err := t.allocSignalID() if err != nil { @@ -253,8 +251,8 @@ func (t *compactionTrigger) triggerCompaction() error { // triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error { - // If AutoCompaction disabled, flush request will not trigger compaction - if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { + // If AutoCompaction disabled, no task would be generated + if !autoCompactionEnabled() { return nil } @@ -287,6 +285,10 @@ func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, s // forceTriggerCompaction force to start a compaction // invoked by user `ManualCompaction` operation func (t *compactionTrigger) forceTriggerCompaction(collectionID int64) (UniqueID, error) { + if !compactionEnabled() { + return -1, merr.WrapErrServiceUnavailable("compaction disabled") + } + id, err := t.allocSignalID() if err != nil { return -1, err @@ -1008,3 +1010,11 @@ func fetchSegIDs(segBinLogs []*datapb.CompactionSegmentBinlogs) []int64 { } return segIDs } + +func compactionEnabled() bool { + return Params.DataCoordCfg.EnableCompaction.GetAsBool() +} + +func autoCompactionEnabled() bool { + return compactionEnabled() && Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 99b4c76ada2d9..f8d5e43acb24e 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -361,11 +361,8 @@ func (s *Server) initDataCoord() error { } log.Info("init service discovery done") - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.createCompactionHandler() - s.createCompactionTrigger() - log.Info("init compaction scheduler done") - } + s.initCompaction() + log.Info("init compaction done") if err = s.initSegmentManager(); err != nil { return err @@ -405,11 +402,6 @@ func (s *Server) Start() error { } func (s *Server) startDataCoord() { - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.compactionHandler.start() - s.compactionTrigger.start() - s.compactionViewManager.Start() - } s.startServerLoop() // http.Register(&http.Handler{ @@ -497,23 +489,32 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ s.indexNodeCreator = f } -func (s *Server) createCompactionHandler() { +func (s *Server) initCompaction() { s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator) + s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) + triggerv2 := NewCompactionTriggerManager(s.allocator, s.compactionHandler) s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator) } -func (s *Server) stopCompactionHandler() { - s.compactionHandler.stop() - s.compactionViewManager.Close() +func (s *Server) startCompaction() { + s.compactionHandler.start() + s.compactionTrigger.start() + s.compactionViewManager.Start() } -func (s *Server) createCompactionTrigger() { - s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) -} +func (s *Server) stopCompaction() { + if s.compactionHandler != nil { + s.compactionHandler.stop() + } + + if s.compactionViewManager != nil { + s.compactionViewManager.Close() + } -func (s *Server) stopCompactionTrigger() { - s.compactionTrigger.stop() + if s.compactionTrigger != nil { + s.compactionTrigger.stop() + } } func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) { @@ -656,6 +657,8 @@ func (s *Server) initIndexNodeManager() { } func (s *Server) startServerLoop() { + s.startCompaction() + s.serverLoopWg.Add(2) if !Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { s.serverLoopWg.Add(1) @@ -1114,10 +1117,7 @@ func (s *Server) Stop() error { s.importScheduler.Close() s.importChecker.Close() - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.stopCompactionTrigger() - s.stopCompactionHandler() - } + s.stopCompaction() logutil.Logger(s.ctx).Info("datacoord compaction stopped") s.indexBuilder.Stop() diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 8d1cb9189daf7..54355ac96f2a0 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -568,12 +568,12 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath // notify building index s.flushCh <- req.SegmentID - // notify compaction - if !req.Importing && paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() { + if !req.Importing { + // notify compaction err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(), req.GetSegmentID(), req.GetChannel(), false) if err != nil { - log.Warn("failed to trigger single compaction") + log.Warn("failed to trigger single compaction", zap.Error(err)) } } } @@ -639,6 +639,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual log.Warn("DropVChannel failed to ReleaseAndRemove", zap.String("channel", channel), zap.Error(err)) } s.segmentManager.DropSegmentsOfChannel(ctx, channel) + s.compactionHandler.removeTasksByChannel(channel) metrics.CleanupDataCoordNumStoredRows(collectionID) @@ -1104,11 +1105,6 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa }, nil } - if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { - resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) - return resp, nil - } - id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID) if err != nil { log.Error("failed to trigger manual compaction", zap.Error(err)) @@ -1145,7 +1141,7 @@ func (s *Server) GetCompactionState(ctx context.Context, req *milvuspb.GetCompac }, nil } - if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { + if !compactionEnabled() { resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) return resp, nil } @@ -1185,7 +1181,8 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb. resp := &milvuspb.GetCompactionPlansResponse{ Status: merr.Success(), } - if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { + + if !compactionEnabled() { resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) return resp, nil } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 49c8eb2c85ed9..369d65eda4718 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2503,7 +2503,7 @@ type dataCoordConfig struct { AutoUpgradeSegmentIndex ParamItem `refreshable:"true"` // compaction - EnableCompaction ParamItem `refreshable:"false"` + EnableCompaction ParamItem `refreshable:"true"` EnableAutoCompaction ParamItem `refreshable:"true"` IndexBasedCompaction ParamItem `refreshable:"true"`