diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index dff05b00aaf53..40ab9466aa908 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -51,8 +51,6 @@ type compactionPlanContext interface { execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error // getCompaction return compaction task. If planId does not exist, return nil. getCompaction(planID int64) *compactionTask - // updateCompaction set the compaction state to timeout or completed - updateCompaction(ts Timestamp) error // isFull return true if the task pool is full isFull() bool // get compaction tasks by signal id @@ -138,41 +136,6 @@ func newCompactionPlanHandler(sessions SessionManager, cm ChannelManager, meta C } } -func (c *compactionPlanHandler) checkResult() { - // deal results - ts, err := c.GetCurrentTS() - if err != nil { - log.Warn("fail to check result", zap.Error(err)) - return - } - err = c.updateCompaction(ts) - if err != nil { - log.Warn("fail to update compaction", zap.Error(err)) - return - } -} - -func (c *compactionPlanHandler) GetCurrentTS() (Timestamp, error) { - interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second) - ctx, cancel := context.WithTimeout(context.Background(), interval) - defer cancel() - ts, err := c.allocator.allocTimestamp(ctx) - if err != nil { - log.Warn("unable to alloc timestamp", zap.Error(err)) - return 0, err - } - return ts, nil -} - -func (c *compactionPlanHandler) schedule() { - // schedule queuing tasks - tasks := c.scheduler.Schedule() - if len(tasks) > 0 { - c.notifyTasks(tasks) - c.scheduler.LogStatus() - } -} - func (c *compactionPlanHandler) start() { interval := Params.DataCoordCfg.CompactionCheckIntervalInSeconds.GetAsDuration(time.Second) c.stopCh = make(chan struct{}) @@ -189,7 +152,9 @@ func (c *compactionPlanHandler) start() { log.Info("compaction handler check result loop quit") return case <-checkResultTicker.C: - c.checkResult() + if compactionEnabled() { + c.CheckResult() + } } } }() @@ -208,7 +173,9 @@ func (c *compactionPlanHandler) start() { return case <-scheduleTicker.C: - c.schedule() + if compactionEnabled() { + c.Schedule() + } } } }() @@ -223,12 +190,49 @@ func (c *compactionPlanHandler) start() { log.Info("Compaction handler quit clean") return case <-cleanTicker.C: - c.Clean() + if compactionEnabled() { + c.Clean() + } } } }() } +func (c *compactionPlanHandler) CheckResult() { + // deal results + ts, err := c.getCurrentTs() + if err != nil { + log.Warn("fail to check result", zap.Error(err)) + return + } + err = c.updateCompaction(ts) + if err != nil { + log.Warn("fail to update compaction", zap.Error(err)) + return + } +} + +func (c *compactionPlanHandler) getCurrentTs() (Timestamp, error) { + interval := Params.DataCoordCfg.CompactionRPCTimeout.GetAsDuration(time.Second) + ctx, cancel := context.WithTimeout(context.Background(), interval) + defer cancel() + ts, err := c.allocator.allocTimestamp(ctx) + if err != nil { + log.Warn("unable to alloc timestamp", zap.Error(err)) + return 0, err + } + return ts, nil +} + +func (c *compactionPlanHandler) Schedule() { + // schedule queuing tasks + tasks := c.scheduler.Schedule() + if len(tasks) > 0 { + c.notifyTasks(tasks) + c.scheduler.LogStatus() + } +} + func (c *compactionPlanHandler) Clean() { current := tsoutil.GetCurrentTime() c.mu.Lock() @@ -253,6 +257,10 @@ func (c *compactionPlanHandler) stop() { } func (c *compactionPlanHandler) removeTasksByChannel(channel string) { + if 
!compactionEnabled() { + return + } + c.mu.Lock() defer c.mu.Unlock() for id, task := range c.plans { @@ -408,6 +416,9 @@ func (c *compactionPlanHandler) notifyTasks(tasks []*compactionTask) { // execCompactionPlan start to execute plan and return immediately func (c *compactionPlanHandler) execCompactionPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { + if !compactionEnabled() { + return nil + } return c.enqueuePlan(signal, plan) } @@ -517,12 +528,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact // getCompaction return compaction task. If planId does not exist, return nil. func (c *compactionPlanHandler) getCompaction(planID int64) *compactionTask { + if !compactionEnabled() { + return nil + } + c.mu.RLock() defer c.mu.RUnlock() return c.plans[planID] } +// updateCompaction is the inner func to check compaction task states from datanode func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { // Get executing executingTasks before GetCompactionState from DataNode to prevent false failure, // for DC might add new task while GetCompactionState. @@ -653,6 +669,9 @@ func (c *compactionPlanHandler) isTimeout(now Timestamp, start Timestamp, timeou // isFull return true if the task pool is full func (c *compactionPlanHandler) isFull() bool { + if !compactionEnabled() { + return true + } return c.scheduler.GetTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() } @@ -670,6 +689,10 @@ func (c *compactionPlanHandler) getTasksByState(state compactionTaskState) []*co // get compaction tasks by signal id; if signalID == 0 return all tasks func (c *compactionPlanHandler) getCompactionTasksBySignalID(signalID int64) []*compactionTask { + if !compactionEnabled() { + return nil + } + c.mu.RLock() defer c.mu.RUnlock() diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index e21f06f29f626..7a9cbe1b6f3ff 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -29,6 +29,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metautil" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -74,6 +75,13 @@ func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() { handler.mu.Lock() s.Equal(0, len(handler.plans)) handler.mu.Unlock() + + s.Run("disable compaction", func() { + paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) + + handler.removeTasksByChannel(ch) + }) } func (s *CompactionPlanHandlerSuite) TestCheckResult() { @@ -88,13 +96,13 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() { { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once() handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc) - handler.checkResult() + handler.CheckResult() } { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once() handler := newCompactionPlanHandler(s.mockSessMgr, nil, nil, s.mockAlloc) - handler.checkResult() + handler.CheckResult() } } @@ -468,6 +476,14 @@ func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() { } }) } + + s.Run("disable compaction", func() { + paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") + defer 
paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) + + err := handler.execCompactionPlan(&compactionSignal{id: 333}, &datapb.CompactionPlan{PlanID: 333, Channel: "ch-2"}) + s.NoError(err) + }) } func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { @@ -679,6 +695,17 @@ func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() { task = handler.getCompaction(19530) s.Nil(task) + + s.Run("disable compaction", func() { + paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) + + got := handler.getCompactionTasksBySignalID(1) + s.Nil(got) + + task := handler.getCompaction(1) + s.Nil(task) + }) } func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() { diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index df7ebc65d9783..48592612af68a 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -34,6 +34,7 @@ import ( "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/indexparamcheck" "github.com/milvus-io/milvus/pkg/util/logutil" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" "github.com/milvus-io/milvus/pkg/util/typeutil" @@ -47,11 +48,12 @@ type compactTime struct { type trigger interface { start() stop() - // triggerCompaction triggers a compaction if any compaction condition satisfy. - triggerCompaction() error - // triggerSingleCompaction triggers a compaction bundled with collection-partition-channel-segment + + // triggerSingleCompaction triggers a compaction on given (collection, partition, channel, segment), + // used when a segment gets flushed triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error - // forceTriggerCompaction force to start a compaction + + // forceTriggerCompaction forces to trigger a compaction, used by ManualCompaction, highest priority forceTriggerCompaction(collectionID int64) (UniqueID, error) } @@ -121,6 +123,7 @@ func (t *compactionTrigger) start() { log.Info("compaction trigger quit") return case signal := <-t.signals: + // When compaction is disabled, there should be no signals to handle. switch { case signal.isGlobal: // ManualCompaction also use use handleGlobalSignal @@ -146,11 +149,7 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { defer logutil.LogPanic() defer t.wg.Done() - // If AutoCompaction disabled, global loop will not start - if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { - return - } - + log.Info("start global compaction loop") for { select { case <-t.quit: @@ -158,9 +157,11 @@ func (t *compactionTrigger) startGlobalCompactionLoop() { log.Info("global compaction loop exit") return case <-t.globalTrigger.C: - err := t.triggerCompaction() - if err != nil { - log.Warn("unable to triggerCompaction", zap.Error(err)) + if autoCompactionEnabled() { + err := t.triggerCompaction() + if err != nil { + log.Warn("unable to trigger global compaction", zap.Error(err)) + } } } } @@ -234,7 +235,7 @@ func (t *compactionTrigger) getCompactTime(ts Timestamp, coll *collectionInfo) ( return &compactTime{0, 0}, nil } -// triggerCompaction trigger a compaction if any compaction condition satisfy. 
+// triggerCompaction is an internal global compaction trigger func (t *compactionTrigger) triggerCompaction() error { id, err := t.allocSignalID() if err != nil { @@ -251,8 +252,8 @@ func (t *compactionTrigger) triggerCompaction() error { // triggerSingleCompaction triger a compaction bundled with collection-partition-channel-segment func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, segmentID int64, channel string, blockToSendSignal bool) error { - // If AutoCompaction disabled, flush request will not trigger compaction - if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { + // If AutoCompaction disabled, no task would be generated + if !autoCompactionEnabled() { return nil } @@ -285,6 +286,10 @@ func (t *compactionTrigger) triggerSingleCompaction(collectionID, partitionID, s // forceTriggerCompaction force to start a compaction // invoked by user `ManualCompaction` operation func (t *compactionTrigger) forceTriggerCompaction(collectionID int64) (UniqueID, error) { + if !compactionEnabled() { + return -1, merr.WrapErrServiceUnavailable("compaction disabled") + } + id, err := t.allocSignalID() if err != nil { return -1, err @@ -311,16 +316,8 @@ func (t *compactionTrigger) allocSignalID() (UniqueID, error) { return t.allocator.allocID(ctx) } -func (t *compactionTrigger) getExpectedSegmentSize(collectionID int64) int64 { - indexInfos := t.meta.indexMeta.GetIndexesForCollection(collectionID, "") - - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - collMeta, err := t.handler.GetCollection(ctx, collectionID) - if err != nil { - log.Warn("failed to get collection", zap.Int64("collectionID", collectionID), zap.Error(err)) - return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 - } +func (t *compactionTrigger) getExpectedSegmentSize(collMeta *collectionInfo) int64 { + indexInfos := t.meta.indexMeta.GetIndexesForCollection(collMeta.ID, "") vectorFields := typeutil.GetVectorFieldSchemas(collMeta.Schema) fieldIndexTypes := lo.SliceToMap(indexInfos, func(t *model.Index) (int64, indexparamcheck.IndexType) { @@ -342,14 +339,42 @@ func (t *compactionTrigger) getExpectedSegmentSize(collectionID int64) int64 { return Params.DataCoordCfg.SegmentMaxSize.GetAsInt64() * 1024 * 1024 } +func (t *compactionTrigger) SelectIndexedSegments(coll *collectionInfo, segments []*SegmentInfo) []*SegmentInfo { + // Get all vector fields from collection schema + vecFields := lo.FilterMap(coll.Schema.GetFields(), func(f *schemapb.FieldSchema, _ int) (int64, bool) { + if typeutil.IsVectorType(f.GetDataType()) { + return f.GetFieldID(), true + } + return 0, false + }) + segmentIDs := lo.Map(segments, func(info *SegmentInfo, _ int) int64 { + return info.GetID() + }) + + // get indexed segments that finished index on all vector field + indexed := t.meta.indexMeta.GetIndexedSegments(coll.ID, segmentIDs, vecFields) + if len(indexed) == 0 { + return nil + } + + indexedSet := typeutil.NewUniqueSet(indexed...) 
+ return lo.Filter(segments, func(segInfo *SegmentInfo, _ int) bool { + return indexedSet.Contain(segInfo.GetID()) + }) +} + func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { t.forceMu.Lock() defer t.forceMu.Unlock() + start := time.Now() log := log.With(zap.Int64("compactionID", signal.id), zap.Int64("signal.collectionID", signal.collectionID), zap.Int64("signal.partitionID", signal.partitionID), - zap.Int64("signal.segmentID", signal.segmentID)) + zap.Int64("signal.segmentID", signal.segmentID), + zap.Bool("signal.force", signal.isForce), + ) + m := t.meta.GetSegmentsChanPart(func(segment *SegmentInfo) bool { return (signal.collectionID == 0 || segment.CollectionID == signal.collectionID) && isSegmentHealthy(segment) && @@ -358,213 +383,245 @@ func (t *compactionTrigger) handleGlobalSignal(signal *compactionSignal) error { !segment.GetIsImporting() && // not importing now segment.GetLevel() != datapb.SegmentLevel_L0 // ignore level zero segments }) // m is list of chanPartSegments, which is channel-partition organized segments - if len(m) == 0 { log.Info("the length of SegmentsChanPart is 0, skip to handle compaction") return nil } - ts, err := t.allocTs() - if err != nil { - log.Warn("allocate ts failed, skip to handle compaction") - return err - } - channelCheckpointOK := make(map[string]bool) isChannelCPOK := func(channelName string) bool { cached, ok := channelCheckpointOK[channelName] if ok { return cached } - return t.isChannelCheckpointHealthy(channelName) + + isHealthy := t.isChannelCheckpointHealthy(channelName) + return isHealthy + } + + collCache := make(map[int64]*collectionInfo) + getColl := func(collectionID int64) (*collectionInfo, error) { + if cached, ok := collCache[collectionID]; ok { + return cached, nil + } + + coll, err := t.getCollection(collectionID) + if err != nil { + return nil, err + } + + collCache[collectionID] = coll + return coll, nil + } + + ts, err := t.allocTs() + if err != nil { + log.Warn("allocate ts failed, skip to handle compaction") + return err } for _, group := range m { - log := log.With(zap.Int64("collectionID", group.collectionID), + log := log.With( + zap.Int64("collectionID", group.collectionID), zap.Int64("partitionID", group.partitionID), - zap.String("channel", group.channelName)) - if !signal.isForce && t.compactionHandler.isFull() { - log.Warn("compaction plan skipped due to handler full") - break - } - if !isChannelCPOK(group.channelName) && !signal.isForce { - log.Warn("compaction plan skipped due to channel checkpoint lag", zap.String("channel", signal.channel)) + zap.String("channel", group.channelName), + ) + + coll, err := getColl(group.collectionID) + if err != nil { + log.Warn("failed to get collection info, skip handling compaction", zap.Error(err)) continue } - if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { - group.segments = FilterInIndexedSegments(t.handler, t.meta, group.segments...) 
+ ct, err := t.getCompactTime(ts, coll) + if err != nil { + log.Warn("get compact time failed, skip to handle compaction", zap.Error(err)) + continue } - coll, err := t.getCollection(group.collectionID) - if err != nil { - log.Warn("get collection info failed, skip handling compaction", zap.Error(err)) - return err + if !signal.isForce { + if !t.isCollectionAutoCompactionEnabled(coll) { + log.RatedInfo(20, "collection auto compaction disabled") + continue + } + + if t.compactionHandler.isFull() { + log.Warn("compaction plan skipped due to handler full") + continue + } + + if !isChannelCPOK(group.channelName) { + continue + } } - if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) { - log.RatedInfo(20, "collection auto compaction disabled", - zap.Int64("collectionID", group.collectionID), - ) - return nil + if paramtable.Get().DataCoordCfg.IndexBasedCompaction.GetAsBool() { + group.segments = t.SelectIndexedSegments(coll, group.segments) } - ct, err := t.getCompactTime(ts, coll) - if err != nil { - log.Warn("get compact time failed, skip to handle compaction", - zap.Int64("collectionID", group.collectionID), - zap.Int64("partitionID", group.partitionID), - zap.String("channel", group.channelName)) - return err + if len(group.segments) == 0 { + continue } - plans := t.generatePlans(group.segments, signal.isForce, ct) + expectedSize := t.getExpectedSegmentSize(coll) + plans := t.generatePlans(group.segments, signal.isForce, ct, expectedSize) for _, plan := range plans { - segIDs := fetchSegIDs(plan.GetSegmentBinlogs()) - + log := log.With(zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs()))) if !signal.isForce && t.compactionHandler.isFull() { - log.Warn("compaction plan skipped due to handler full", - zap.Int64("collectionID", signal.collectionID), - zap.Int64s("segmentIDs", segIDs)) + log.Warn("compaction plan skipped due to handler full") break } - start := time.Now() - if err := fillOriginPlan(t.allocator, plan); err != nil { - log.Warn("failed to fill plan", - zap.Int64("collectionID", signal.collectionID), - zap.Int64s("segmentIDs", segIDs), - zap.Error(err)) - continue - } - err := t.compactionHandler.execCompactionPlan(signal, plan) - if err != nil { - log.Warn("failed to execute compaction plan", - zap.Int64("collectionID", signal.collectionID), - zap.Int64("planID", plan.PlanID), - zap.Int64s("segmentIDs", segIDs), - zap.Error(err)) + + if err := t.submitPlan(signal, plan); err != nil { + log.Warn("failed to submit compaction plan", zap.Int64("planID", plan.GetPlanID()), zap.Error(err)) continue } log.Info("time cost of generating global compaction", - zap.Int64("planID", plan.PlanID), - zap.Int64("time cost", time.Since(start).Milliseconds()), - zap.Int64("collectionID", signal.collectionID), - zap.String("channel", group.channelName), - zap.Int64("partitionID", group.partitionID), - zap.Int64s("segmentIDs", segIDs)) + zap.Int64("planID", plan.GetPlanID()), + zap.Duration("time cost", time.Since(start)), + ) } } return nil } // handleSignal processes segment flush caused partition-chan level compaction signal +// non-force signal handler, ignore isForce in signal func (t *compactionTrigger) handleSignal(signal *compactionSignal) { t.forceMu.Lock() defer t.forceMu.Unlock() - // 1. 
check whether segment's binlogs should be compacted or not - if t.compactionHandler.isFull() { - log.Warn("compaction plan skipped due to handler full") - return - } - - if !t.isChannelCheckpointHealthy(signal.channel) { - log.Warn("compaction plan skipped due to channel checkpoint lag", zap.String("channel", signal.channel)) - return - } - + start := time.Now() segment := t.meta.GetHealthySegment(signal.segmentID) if segment == nil { log.Warn("segment in compaction signal not found in meta", zap.Int64("segmentID", signal.segmentID)) return } - channel := segment.GetInsertChannel() - partitionID := segment.GetPartitionID() - collectionID := segment.GetCollectionID() - segments := t.getCandidateSegments(channel, partitionID) + log := log.With( + zap.Int64("collectionID", segment.GetCollectionID()), + zap.Int64("partitionID", segment.GetPartitionID()), + zap.String("channel", segment.GetInsertChannel()), + ) - if len(segments) == 0 { - log.Info("the number of candidate segments is 0, skip to handle compaction") + coll, err := t.getCollection(segment.GetCollectionID()) + if err != nil { + log.Warn("failed to get collection schema", zap.Error(err)) return } - ts, err := t.allocTs() - if err != nil { - log.Warn("allocate ts failed, skip to handle compaction", zap.Int64("collectionID", signal.collectionID), - zap.Int64("partitionID", signal.partitionID), zap.Int64("segmentID", signal.segmentID)) + // 1. check can do compaction + if !t.ifChannelCanDoCompaction(signal.channel, coll) { return } - coll, err := t.getCollection(collectionID) - if err != nil { - log.Warn("get collection info failed, skip handling compaction", - zap.Int64("collectionID", collectionID), - zap.Int64("partitionID", partitionID), - zap.String("channel", channel), - zap.Error(err), - ) + // 2. get grouped segments + segments := t.meta.SelectSegments(func(segInfo *SegmentInfo) bool { + return isSegmentHealthy(segInfo) && + // choose the segments from the same collection, channel, partition + segInfo.GetInsertChannel() == segment.GetInsertChannel() && + segInfo.GetCollectionID() == segment.GetCollectionID() && + segInfo.GetPartitionID() == segment.GetPartitionID() && + + // choose flushed/flushing segments, + // ignore compacting, importing, and, Level Zero segments , + isFlush(segInfo) && + !segInfo.isCompacting && + !segInfo.GetIsImporting() && + segInfo.GetLevel() != datapb.SegmentLevel_L0 + }) + if Params.DataCoordCfg.IndexBasedCompaction.GetAsBool() { + segments = t.SelectIndexedSegments(coll, segments) + } + + if len(segments) == 0 { return } - if !signal.isForce && !t.isCollectionAutoCompactionEnabled(coll) { - log.RatedInfo(20, "collection auto compaction disabled", - zap.Int64("collectionID", collectionID), - ) + ts, err := t.allocTs() + if err != nil { + log.Warn("allocate ts failed, skip to handle compaction", zap.Error(err)) return } ct, err := t.getCompactTime(ts, coll) if err != nil { - log.Warn("get compact time failed, skip to handle compaction", zap.Int64("collectionID", segment.GetCollectionID()), - zap.Int64("partitionID", partitionID), zap.String("channel", channel)) + log.Warn("get compact time failed, skip to handle compaction", zap.Error(err)) return } - plans := t.generatePlans(segments, signal.isForce, ct) + // 3. 
generate and submit plans + expectedSize := t.getExpectedSegmentSize(coll) + plans := t.generatePlans(segments, false, ct, expectedSize) for _, plan := range plans { if t.compactionHandler.isFull() { - log.Warn("compaction plan skipped due to handler full", zap.Int64("collection", signal.collectionID), zap.Int64("planID", plan.PlanID)) + log.Warn("compaction plan skipped due to handler full", zap.Int64("planID", plan.PlanID)) break } - start := time.Now() - if err := fillOriginPlan(t.allocator, plan); err != nil { - log.Warn("failed to fill plan", zap.Error(err)) - continue - } - if err := t.compactionHandler.execCompactionPlan(signal, plan); err != nil { - log.Warn("failed to execute compaction plan", - zap.Int64("collection", signal.collectionID), - zap.Int64("planID", plan.PlanID), - zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs())), - zap.Error(err)) + + segmentIDs := fetchSegIDs(plan.GetSegmentBinlogs()) + + if err := t.submitPlan(signal, plan); err != nil { + log.Warn("failed to submit compaction plan", + zap.Int64("planID", plan.GetPlanID()), + zap.Int64s("segmentIDs", segmentIDs), + zap.Error(err), + ) continue } + log.Info("time cost of generating compaction", zap.Int64("planID", plan.PlanID), zap.Int64("time cost", time.Since(start).Milliseconds()), - zap.Int64("collectionID", signal.collectionID), - zap.String("channel", channel), - zap.Int64("partitionID", partitionID), zap.Int64s("segmentIDs", fetchSegIDs(plan.GetSegmentBinlogs()))) } } -func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, compactTime *compactTime) []*datapb.CompactionPlan { +func (t *compactionTrigger) submitPlan(signal *compactionSignal, plan *datapb.CompactionPlan) error { + if err := fillOriginPlan(t.allocator, plan); err != nil { + return err + } + + if err := t.compactionHandler.execCompactionPlan(signal, plan); err != nil { + return err + } + + return nil +} + +func (t *compactionTrigger) ifChannelCanDoCompaction(channel string, coll *collectionInfo) bool { + if !t.isCollectionAutoCompactionEnabled(coll) { + log.RatedInfo(20, "collection auto compaction disabled", + zap.Int64("collectionID", coll.ID), + ) + return false + } + + if t.compactionHandler.isFull() { + log.Warn("compaction plan skipped due to handler full") + return false + } + + if !t.isChannelCheckpointHealthy(channel) { + log.Warn("compaction plan skipped due to channel checkpoint lag", zap.String("channel", channel)) + return false + } + + return true +} + +func (t *compactionTrigger) generatePlans(segments []*SegmentInfo, force bool, compactTime *compactTime, expectedSize int64) []*datapb.CompactionPlan { if len(segments) == 0 { log.Warn("the number of candidate segments is 0, skip to generate compaction plan") return []*datapb.CompactionPlan{} } - // find segments need internal compaction // TODO add low priority candidates, for example if the segment is smaller than full 0.9 * max segment size but larger than small segment boundary, we only execute compaction when there are no compaction running actively var prioritizedCandidates []*SegmentInfo var smallCandidates []*SegmentInfo var nonPlannedSegments []*SegmentInfo - expectedSize := t.getExpectedSegmentSize(segments[0].CollectionID) - // TODO, currently we lack of the measurement of data distribution, there should be another compaction help on redistributing segment based on scalar/vector field distribution for _, segment := range segments { segment := segment.ShadowClone() @@ -899,3 +956,11 @@ func (t *compactionTrigger) 
squeezeSmallSegmentsToBuckets(small []*SegmentInfo, return small } + +func compactionEnabled() bool { + return paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() +} + +func autoCompactionEnabled() bool { + return compactionEnabled() && paramtable.Get().DataCoordCfg.EnableAutoCompaction.GetAsBool() +} diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index a99c10d810ea6..f2153fa330a06 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -18,6 +18,7 @@ package datacoord import ( "context" + "fmt" "sort" satomic "sync/atomic" "testing" @@ -38,6 +39,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/tsoutil" ) @@ -2053,11 +2055,8 @@ func Test_compactionTrigger_getCompactTime(t *testing.T) { } func Test_triggerSingleCompaction(t *testing.T) { - originValue := Params.DataCoordCfg.EnableAutoCompaction.GetValue() - Params.Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "true") - defer func() { - Params.Save(Params.DataCoordCfg.EnableAutoCompaction.Key, originValue) - }() + paramtable.Get().Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "true") + defer paramtable.Get().Reset(Params.DataCoordCfg.EnableAutoCompaction.Key) m := &meta{ channelCPs: newChannelCps(), segments: NewSegmentsInfo(), collections: make(map[UniqueID]*collectionInfo), @@ -2133,6 +2132,7 @@ type CompactionTriggerSuite struct { collectionID int64 partitionID int64 channel string + schema *schemapb.CollectionSchema indexID int64 vecFieldID int64 @@ -2146,14 +2146,13 @@ type CompactionTriggerSuite struct { } func (s *CompactionTriggerSuite) SetupSuite() { - paramtable.Init() } -func (s *CompactionTriggerSuite) genSeg(segID, numRows int64) *datapb.SegmentInfo { +func (s *CompactionTriggerSuite) genSeg(segID, numRows, partitionID int64) *datapb.SegmentInfo { return &datapb.SegmentInfo{ ID: segID, CollectionID: s.collectionID, - PartitionID: s.partitionID, + PartitionID: partitionID, LastExpireTime: 100, NumOfRows: numRows, MaxRowNum: 110, @@ -2191,36 +2190,46 @@ func (s *CompactionTriggerSuite) SetupTest() { s.indexID = 300 s.vecFieldID = 400 s.channel = "dml_0_100v0" + s.schema = &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: s.vecFieldID, + DataType: schemapb.DataType_FloatVector, + }, + }, + } +} + +func (s *CompactionTriggerSuite) SetupSubTest() { catalog := mocks.NewDataCoordCatalog(s.T()) catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, s.channel, mock.Anything).Return(nil) - s.meta = &meta{ channelCPs: newChannelCps(), catalog: catalog, segments: &SegmentsInfo{ segments: map[int64]*SegmentInfo{ 1: { - SegmentInfo: s.genSeg(1, 60), + SegmentInfo: s.genSeg(1, 60, s.partitionID), lastFlushTime: time.Now().Add(-100 * time.Minute), }, 2: { - SegmentInfo: s.genSeg(2, 60), + SegmentInfo: s.genSeg(2, 60, s.partitionID), lastFlushTime: time.Now(), }, 3: { - SegmentInfo: s.genSeg(3, 60), + SegmentInfo: s.genSeg(3, 60, s.partitionID), lastFlushTime: time.Now(), }, 4: { - SegmentInfo: s.genSeg(4, 60), + SegmentInfo: s.genSeg(4, 60, s.partitionID), lastFlushTime: time.Now(), }, 5: { - SegmentInfo: s.genSeg(5, 26), + SegmentInfo: s.genSeg(5, 26, s.partitionID), lastFlushTime: time.Now(), }, 6: { - SegmentInfo: s.genSeg(6, 26), + SegmentInfo: s.genSeg(6, 
26, s.partitionID), lastFlushTime: time.Now(), }, }, @@ -2259,15 +2268,8 @@ func (s *CompactionTriggerSuite) SetupTest() { }, collections: map[int64]*collectionInfo{ s.collectionID: { - ID: s.collectionID, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: s.vecFieldID, - DataType: schemapb.DataType_FloatVector, - }, - }, - }, + ID: s.collectionID, + Schema: s.schema, }, }, } @@ -2288,75 +2290,189 @@ func (s *CompactionTriggerSuite) SetupTest() { s.versionManager, ) s.tr.testingOnly = true + log.Info("===========================================init subtest========================================") } -func (s *CompactionTriggerSuite) TestHandleSignal() { - s.Run("getCompaction_failed", func() { - defer s.SetupTest() - tr := s.tr - s.compactionHandler.EXPECT().isFull().Return(false) - // s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) - tr.handleSignal(&compactionSignal{ - segmentID: 1, - collectionID: s.collectionID, - partitionID: s.partitionID, - channel: s.channel, - isForce: false, +func (s *CompactionTriggerSuite) TestEnableCompactionConfigDynamicly() { + tests := []struct { + description string + enableCompaction bool + enableAutoCompaction bool + + triggered bool + }{ + {"both enabled triggered signal", true, true, true}, + {"auto compaction disabled, no signal", true, false, false}, + {"compaction disabled no signal", false, true, false}, + {"all disabled no signal", false, false, false}, + } + enableCompKey := paramtable.Get().DataCoordCfg.EnableCompaction.Key + enableAutoCompKey := paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key + defer paramtable.Get().Reset(enableCompKey) + defer paramtable.Get().Reset(enableAutoCompKey) + + for _, test := range tests { + enableCompV, enableAutoCompV := "false", "false" + if test.enableCompaction { + enableCompV = "true" + } + if test.enableAutoCompaction { + enableAutoCompV = "true" + } + paramtable.Get().Save(enableCompKey, enableCompV) + paramtable.Get().Save(enableAutoCompKey, enableAutoCompV) + + desp := fmt.Sprintf("forceTriggerSingalCompaction %s", test.description) + s.Run(desp, func() { + if test.enableCompaction { + s.Require().True(compactionEnabled()) + + s.allocator.EXPECT().allocID(mock.Anything).Return(19530, nil) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once() + s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{ + ID: s.collectionID, + Schema: s.schema, + }, nil).Once() + s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil).Times(1) + + compactionID, err := s.tr.forceTriggerCompaction(s.collectionID) + s.NoError(err) + s.True(compactionID > 0) + s.EqualValues(19530, compactionID) + } else { + s.Require().False(compactionEnabled()) + + compactionID, err := s.tr.forceTriggerCompaction(s.collectionID) + s.Error(err) + s.ErrorIs(merr.ErrServiceUnavailable, err) + s.EqualValues(-1, compactionID) + + s.Require().False(autoCompactionEnabled()) + err = s.tr.triggerSingleCompaction(s.collectionID, s.partitionID, 1, s.channel, false) + s.NoError(err) + s.Equal(0, len(s.tr.signals)) + } + }) + + desp = fmt.Sprintf("triggerSingalCompaction %s", test.description) + s.Run(desp, func() { + if test.triggered { + s.Require().True(autoCompactionEnabled()) + s.allocator.EXPECT().allocID(mock.Anything).Return(19530, nil).Once() + err := s.tr.triggerSingleCompaction(s.collectionID, 
s.partitionID, 1, s.channel, false) + s.NoError(err) + + sig := <-s.tr.signals + s.EqualValues(s.collectionID, sig.collectionID) + s.EqualValues(s.partitionID, sig.partitionID) + s.EqualValues(1, sig.segmentID) + s.EqualValues(s.channel, sig.channel) + s.EqualValues(19530, sig.id) + s.False(sig.isForce) + s.False(sig.isGlobal) + } else { + s.Require().False(autoCompactionEnabled()) + err := s.tr.triggerSingleCompaction(s.collectionID, s.partitionID, 1, s.channel, false) + s.NoError(err) + s.Equal(0, len(s.tr.signals)) + } + }) + + desp = fmt.Sprintf("auto Trigger %s", test.description) + s.Run(desp, func() { + triggerInterval := 1 * time.Millisecond + s.tr.globalTrigger = time.NewTicker(triggerInterval) + s.tr.quit = make(chan struct{}) + s.tr.wg.Add(1) + go s.tr.startGlobalCompactionLoop() + + if test.triggered { + s.allocator.EXPECT().allocID(mock.Anything).Return(19530, nil) + s.Require().True(autoCompactionEnabled()) + sig := <-s.tr.signals + s.True(sig.isGlobal) + s.False(sig.isForce) + } else { + s.Require().False(autoCompactionEnabled()) + time.Sleep(2 * triggerInterval) + s.Equal(0, len(s.tr.signals)) + } + s.tr.stop() }) + } + s.Run("enable then disable compaction", func() { + enableCompKey := paramtable.Get().DataCoordCfg.EnableCompaction.Key + enableAutoCompKey := paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key + paramtable.Get().Save(enableCompKey, "true") + paramtable.Get().Save(enableAutoCompKey, "true") + + triggerInterval := 1 * time.Millisecond + s.tr.globalTrigger = time.NewTicker(triggerInterval) + s.tr.quit = make(chan struct{}) + s.tr.wg.Add(1) + go s.tr.startGlobalCompactionLoop() + + s.allocator.EXPECT().allocID(mock.Anything).Return(10000, nil) + + sig := <-s.tr.signals + s.True(sig.isGlobal) + s.False(sig.isForce) + + paramtable.Get().Save(enableCompKey, "false") + if len(s.tr.signals) > 0 { + // drain all signals from channel + log.Info("drain all signals from channel") + for sig := range s.tr.signals { + s.True(sig.isGlobal) + s.False(sig.isForce) + } + } - // suite shall check compactionHandler.execCompactionPlan never called + // wait for 2 times of the globalTrigger interval + <-time.After(2 * triggerInterval) + s.Equal(0, len(s.tr.signals)) + + s.tr.stop() }) +} - s.Run("collectionAutoCompactionConfigError", func() { - defer s.SetupTest() - tr := s.tr - s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ +func (s *CompactionTriggerSuite) TestCollectionAutoCompaction() { + s.Run("collection autoCompaction config error", func() { + s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{ + ID: s.collectionID, Properties: map[string]string{ common.CollectionAutoCompactionKey: "bad_value", }, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: s.vecFieldID, - DataType: schemapb.DataType_FloatVector, - }, - }, - }, - }, nil) - tr.handleSignal(&compactionSignal{ + Schema: s.schema, + }, nil).Times(2) + + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil).Times(1) + + err := s.tr.handleGlobalSignal(&compactionSignal{ segmentID: 1, collectionID: s.collectionID, partitionID: s.partitionID, channel: s.channel, isForce: false, }) + s.NoError(err) - // suite shall check compactionHandler.execCompactionPlan never called + s.tr.handleSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: 
s.partitionID, + channel: s.channel, + isForce: false, + }) }) + s.Run("collection autoCompaction disabled", func() { + s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{ + ID: s.collectionID, + Schema: s.schema, + Properties: map[string]string{common.CollectionAutoCompactionKey: "false"}, + }, nil).Times(2) - s.Run("collectionAutoCompactionDisabled", func() { - defer s.SetupTest() - tr := s.tr - s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ - Properties: map[string]string{ - common.CollectionAutoCompactionKey: "false", - }, - ID: s.collectionID, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: s.vecFieldID, - DataType: schemapb.DataType_FloatVector, - }, - }, - }, - }, nil) - tr.handleSignal(&compactionSignal{ + s.tr.handleSignal(&compactionSignal{ segmentID: 1, collectionID: s.collectionID, partitionID: s.partitionID, @@ -2364,30 +2480,37 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { isForce: false, }) - // suite shall check compactionHandler.execCompactionPlan never called + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil).Times(1) + + err := s.tr.handleGlobalSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: false, + }) + s.NoError(err) }) + s.Run("force/collection autoCompaction disabled", func() { + s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{ + ID: s.collectionID, + Properties: map[string]string{common.CollectionAutoCompactionKey: "false"}, + Schema: s.schema, + }, nil).Times(2) + + // handleSignal is an non force signal handler, will ignore forced signal + s.tr.handleSignal(&compactionSignal{ + segmentID: 1, + collectionID: s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: true, + }) - s.Run("collectionAutoCompactionDisabled_force", func() { - defer s.SetupTest() - tr := s.tr - s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil) - s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ - Properties: map[string]string{ - common.CollectionAutoCompactionKey: "false", - }, - Schema: &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: s.vecFieldID, - DataType: schemapb.DataType_FloatVector, - }, - }, - }, - }, nil) - s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil) - tr.handleSignal(&compactionSignal{ + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil).Times(1) + s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil).Times(1) + s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil).Times(1) + s.tr.handleGlobalSignal(&compactionSignal{ segmentID: 1, collectionID: s.collectionID, partitionID: s.partitionID, @@ -2395,13 +2518,29 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { isForce: true, }) }) +} + +func (s *CompactionTriggerSuite) TestHandleSignal() { + s.Run("getCompaction_failed", func() { + s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")).Once() + s.tr.handleSignal(&compactionSignal{ + segmentID: 1, + collectionID: 
s.collectionID, + partitionID: s.partitionID, + channel: s.channel, + isForce: false, + }) + }) s.Run("channel_cp_lag_too_large", func() { - defer s.SetupTest() ptKey := paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.Key paramtable.Get().Save(ptKey, "900") defer paramtable.Get().Reset(ptKey) - s.compactionHandler.EXPECT().isFull().Return(false) + + s.compactionHandler.EXPECT().isFull().Return(false).Times(1) + s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{ + ID: s.collectionID, + }, nil).Times(1) s.meta.channelCPs.checkpoints[s.channel] = &msgpb.MsgPosition{ ChannelName: s.channel, @@ -2420,136 +2559,107 @@ func (s *CompactionTriggerSuite) TestHandleSignal() { } func (s *CompactionTriggerSuite) TestHandleGlobalSignal() { - schema := &schemapb.CollectionSchema{ - Fields: []*schemapb.FieldSchema{ - { - FieldID: common.StartOfUserFieldID, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", + paramtable.Get().Save(paramtable.Get().DataCoordCfg.AutoUpgradeSegmentIndex.Key, "false") + defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.AutoUpgradeSegmentIndex.Key) + s.Run("2 partitions", func() { + s.meta = &meta{ + channelCPs: newChannelCps(), + catalog: s.meta.catalog, + segments: &SegmentsInfo{ + segments: map[int64]*SegmentInfo{ + 1: { + SegmentInfo: s.genSeg(1, 60, s.partitionID), + lastFlushTime: time.Now(), + }, + 2: { + SegmentInfo: s.genSeg(2, 60, 19530), + lastFlushTime: time.Now(), }, }, }, - { - FieldID: common.StartOfUserFieldID + 1, - DataType: schemapb.DataType_FloatVector, - TypeParams: []*commonpb.KeyValuePair{ - { - Key: common.DimKey, - Value: "128", + indexMeta: &indexMeta{ + segmentIndexes: map[UniqueID]map[UniqueID]*model.SegmentIndex{ + 1: s.genSegIndex(1, indexID, 60), + 2: s.genSegIndex(2, indexID, 60), + }, + indexes: map[UniqueID]map[UniqueID]*model.Index{ + s.collectionID: { + s.indexID: { + CollectionID: s.collectionID, + FieldID: s.vecFieldID, + IndexID: s.indexID, + IndexName: "_default_idx", + IndexParams: []*commonpb.KeyValuePair{ + { + Key: common.IndexTypeKey, + Value: "HNSW", + }, + }, + }, }, }, }, - }, - } - s.Run("getCompaction_failed", func() { - defer s.SetupTest() - tr := s.tr - s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(nil, errors.New("mocked")) - tr.handleGlobalSignal(&compactionSignal{ - segmentID: 1, - collectionID: s.collectionID, - partitionID: s.partitionID, - channel: s.channel, - isForce: false, - }) - - // suite shall check compactionHandler.execCompactionPlan never called - }) - - s.Run("collectionAutoCompactionConfigError", func() { - defer s.SetupTest() - tr := s.tr - s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ - Schema: schema, - Properties: map[string]string{ - common.CollectionAutoCompactionKey: "bad_value", + collections: map[int64]*collectionInfo{ + s.collectionID: { + ID: s.collectionID, + Schema: s.schema, + }, }, - }, nil) - tr.handleGlobalSignal(&compactionSignal{ - segmentID: 1, - collectionID: s.collectionID, - partitionID: s.partitionID, - channel: s.channel, - isForce: false, + } + s.meta.UpdateChannelCheckpoint(s.channel, &msgpb.MsgPosition{ + ChannelName: s.channel, + Timestamp: 
tsoutil.ComposeTSByTime(time.Now(), 0), + MsgID: []byte{1, 2, 3, 4}, }) - // suite shall check compactionHandler.execCompactionPlan never called - }) - - s.Run("collectionAutoCompactionDisabled", func() { - defer s.SetupTest() - tr := s.tr - s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ - Schema: schema, - Properties: map[string]string{ - common.CollectionAutoCompactionKey: "false", - }, - }, nil) - tr.handleGlobalSignal(&compactionSignal{ - segmentID: 1, + s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{ + ID: s.collectionID, + Schema: s.schema, + }, nil).Once() + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil).Once() + s.compactionHandler.EXPECT().isFull().Return(false).Twice() + s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil).Once() + s.allocator.EXPECT().allocID(mock.Anything).Return(19530, nil).Once() + err := s.tr.handleGlobalSignal(&compactionSignal{ collectionID: s.collectionID, - partitionID: s.partitionID, - channel: s.channel, - isForce: false, + isGlobal: true, }) - - // suite shall check compactionHandler.execCompactionPlan never called + s.NoError(err) }) - - s.Run("collectionAutoCompactionDisabled_force", func() { - defer s.SetupTest() - tr := s.tr - s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil) - s.handler.EXPECT().GetCollection(mock.Anything, int64(100)).Return(&collectionInfo{ - Schema: schema, - Properties: map[string]string{ - common.CollectionAutoCompactionKey: "false", - }, - }, nil) - s.compactionHandler.EXPECT().execCompactionPlan(mock.Anything, mock.Anything).Return(nil) - tr.handleGlobalSignal(&compactionSignal{ + s.Run("getCompaction_failed", func() { + s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(nil, errors.New("mocked")) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil).Once() + s.tr.handleGlobalSignal(&compactionSignal{ segmentID: 1, collectionID: s.collectionID, partitionID: s.partitionID, channel: s.channel, - isForce: true, + isForce: false, }) }) s.Run("channel_cp_lag_too_large", func() { - defer s.SetupTest() ptKey := paramtable.Get().DataCoordCfg.ChannelCheckpointMaxLag.Key paramtable.Get().Save(ptKey, "900") defer paramtable.Get().Reset(ptKey) s.compactionHandler.EXPECT().isFull().Return(false) - s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil) - s.allocator.EXPECT().allocID(mock.Anything).Return(20000, nil) + s.handler.EXPECT().GetCollection(mock.Anything, s.collectionID).Return(&collectionInfo{ + ID: s.collectionID, + }, nil).Times(1) + s.allocator.EXPECT().allocTimestamp(mock.Anything).Return(10000, nil).Once() s.meta.channelCPs.checkpoints[s.channel] = &msgpb.MsgPosition{ ChannelName: s.channel, Timestamp: tsoutil.ComposeTSByTime(time.Now().Add(time.Second*-901), 0), MsgID: []byte{1, 2, 3, 4}, } - tr := s.tr - tr.handleGlobalSignal(&compactionSignal{ + s.tr.handleGlobalSignal(&compactionSignal{ segmentID: 1, collectionID: s.collectionID, partitionID: s.partitionID, channel: s.channel, - isForce: false, }) }) } diff --git a/internal/datacoord/compaction_trigger_v2.go b/internal/datacoord/compaction_trigger_v2.go index 1ba9c1d9ef4aa..55b0890c23c2e 100644 --- 
a/internal/datacoord/compaction_trigger_v2.go +++ b/internal/datacoord/compaction_trigger_v2.go @@ -68,10 +68,10 @@ func (m *CompactionTriggerManager) Notify(taskID UniqueID, eventType CompactionT } case TriggerTypeLevelZeroViewIDLE: - log.Debug("Start to trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") + log.Debug("Start to trigger a level zero compaction by TriggerTypeLevelZeroViewIDLE") outView, reason := view.Trigger() if outView == nil { - log.Info("Start to force trigger a level zero compaction by TriggerTypLevelZeroViewIDLE") + log.Info("Start to force trigger a level zero compaction by TriggerTypeLevelZeroViewIDLE") outView, reason = view.ForceTrigger() } diff --git a/internal/datacoord/compaction_view_manager.go b/internal/datacoord/compaction_view_manager.go index 373044f77e230..bffea7fc6ca16 100644 --- a/internal/datacoord/compaction_view_manager.go +++ b/internal/datacoord/compaction_view_manager.go @@ -12,6 +12,7 @@ import ( "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/logutil" + "github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/typeutil" ) @@ -53,16 +54,12 @@ func (m *CompactionViewManager) checkLoop() { defer logutil.LogPanic() defer m.closeWg.Done() - if !Params.DataCoordCfg.EnableAutoCompaction.GetAsBool() { - return - } - - // TODO: Only process L0 compaction now, so just return if its not enabled - if !Params.DataCoordCfg.EnableLevelZeroSegment.GetAsBool() { - return + // TODO: Only process L0 compaction now + L0SegmentEnabled := func() bool { + return autoCompactionEnabled() && paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.GetAsBool() } - interval := Params.DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second) + interval := paramtable.Get().DataCoordCfg.GlobalCompactionInterval.GetAsDuration(time.Second) checkTicker := time.NewTicker(interval) defer checkTicker.Stop() @@ -88,16 +85,19 @@ func (m *CompactionViewManager) checkLoop() { log.Info("Compaction View checkLoop quit") return case <-checkTicker.C: - refreshViewsAndTrigger(context.Background()) + if L0SegmentEnabled() { + refreshViewsAndTrigger(context.Background()) + } case <-idleTicker.C: - // idelTicker will be reset everytime when Check's able to - // generates compaction events - - // if no views are freshed, try to get cached views and trigger a - // TriggerTypeViewIDLE event - if !refreshViewsAndTrigger(context.Background()) { - m.triggerEventForIDLEView() + if L0SegmentEnabled() { + // idelTicker will be reset everytime when Check valid generates compaction events + // + // if no views are freshed, try to get cached views and trigger a + // TriggerTypeViewIDLE event + if !refreshViewsAndTrigger(context.Background()) { + m.triggerEventForIDLEView() + } } } } diff --git a/internal/datacoord/compaction_view_manager_test.go b/internal/datacoord/compaction_view_manager_test.go index d1712e6d9ea7d..49c0ef28519a5 100644 --- a/internal/datacoord/compaction_view_manager_test.go +++ b/internal/datacoord/compaction_view_manager_test.go @@ -71,6 +71,10 @@ func genSegmentsForMeta(label *CompactionGroupLabel) map[int64]*SegmentInfo { } func (s *CompactionViewManagerSuite) SetupTest() { + s.SetupSubTest() +} + +func (s *CompactionViewManagerSuite) SetupSubTest() { s.mockAlloc = NewNMockAllocator(s.T()) s.mockTriggerManager = NewMockTriggerManager(s.T()) @@ -95,20 +99,36 @@ func (s *CompactionViewManagerSuite) TestCheckLoop() { s.m.Close() }) - s.Run("Test not enable 
auto compaction", func() { - paramtable.Get().Save(Params.DataCoordCfg.EnableAutoCompaction.Key, "false") - defer paramtable.Get().Reset(Params.DataCoordCfg.EnableAutoCompaction.Key) - + s.Run("Test not enable compaction", func() { + var ( + globalIntervalK = paramtable.Get().DataCoordCfg.GlobalCompactionInterval.Key + enableCompK = paramtable.Get().DataCoordCfg.EnableCompaction.Key + enableAutoCompK = paramtable.Get().DataCoordCfg.EnableAutoCompaction.Key + enableL0K = paramtable.Get().DataCoordCfg.EnableLevelZeroSegment.Key + ) + defer paramtable.Get().Reset(globalIntervalK) + defer paramtable.Get().Reset(enableCompK) + defer paramtable.Get().Reset(enableAutoCompK) + defer paramtable.Get().Reset(enableL0K) + + paramtable.Get().Save(globalIntervalK, "0.01") + paramtable.Get().Save(enableCompK, "false") + paramtable.Get().Save(enableAutoCompK, "true") + + // no mocks needed for no events will be triggerred s.m.Start() - s.m.closeWg.Wait() - }) - s.Run("Test not enable levelZero segment", func() { - paramtable.Get().Save(Params.DataCoordCfg.EnableLevelZeroSegment.Key, "false") - defer paramtable.Get().Reset(Params.DataCoordCfg.EnableLevelZeroSegment.Key) + waitDur := 100 * time.Millisecond + <-time.After(waitDur) + paramtable.Get().Save(enableAutoCompK, "false") + paramtable.Get().Save(enableCompK, "true") - s.m.Start() - s.m.closeWg.Wait() + <-time.After(waitDur) + paramtable.Get().Save(enableL0K, "false") + paramtable.Get().Save(enableAutoCompK, "true") + + <-time.After(waitDur) + s.m.Close() }) } diff --git a/internal/datacoord/index_service.go b/internal/datacoord/index_service.go index e4816ae3341c1..6d9efa325c513 100644 --- a/internal/datacoord/index_service.go +++ b/internal/datacoord/index_service.go @@ -49,8 +49,6 @@ func (s *Server) serverID() int64 { func (s *Server) startIndexService(ctx context.Context) { s.indexBuilder.Start() - - s.serverLoopWg.Add(1) go s.createIndexForSegmentLoop(ctx) } diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index 91d4664077e90..c1f4dc1ab6664 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -230,19 +230,17 @@ func (m *meta) GetSegmentsChanPart(selector SegmentInfoSelector) []*chanPartSegm continue } - cloned := segmentInfo.Clone() - - dim := fmt.Sprintf("%d-%s", cloned.PartitionID, cloned.InsertChannel) + dim := fmt.Sprintf("%d-%s", segmentInfo.PartitionID, segmentInfo.InsertChannel) entry, ok := mDimEntry[dim] if !ok { entry = &chanPartSegments{ - collectionID: cloned.CollectionID, - partitionID: cloned.PartitionID, - channelName: cloned.InsertChannel, + collectionID: segmentInfo.CollectionID, + partitionID: segmentInfo.PartitionID, + channelName: segmentInfo.InsertChannel, } mDimEntry[dim] = entry } - entry.segments = append(entry.segments, cloned) + entry.segments = append(entry.segments, segmentInfo) } result := make([]*chanPartSegments, 0, len(mDimEntry)) diff --git a/internal/datacoord/mock_compaction_plan_context.go b/internal/datacoord/mock_compaction_plan_context.go index b22041fb7f169..8238f3cb58d78 100644 --- a/internal/datacoord/mock_compaction_plan_context.go +++ b/internal/datacoord/mock_compaction_plan_context.go @@ -289,48 +289,6 @@ func (_c *MockCompactionPlanContext_stop_Call) RunAndReturn(run func()) *MockCom return _c } -// updateCompaction provides a mock function with given fields: ts -func (_m *MockCompactionPlanContext) updateCompaction(ts uint64) error { - ret := _m.Called(ts) - - var r0 error - if rf, ok := ret.Get(0).(func(uint64) error); ok { - r0 = rf(ts) - } else { - r0 = 
ret.Error(0) - } - - return r0 -} - -// MockCompactionPlanContext_updateCompaction_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'updateCompaction' -type MockCompactionPlanContext_updateCompaction_Call struct { - *mock.Call -} - -// updateCompaction is a helper method to define mock.On call -// - ts uint64 -func (_e *MockCompactionPlanContext_Expecter) updateCompaction(ts interface{}) *MockCompactionPlanContext_updateCompaction_Call { - return &MockCompactionPlanContext_updateCompaction_Call{Call: _e.mock.On("updateCompaction", ts)} -} - -func (_c *MockCompactionPlanContext_updateCompaction_Call) Run(run func(ts uint64)) *MockCompactionPlanContext_updateCompaction_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(uint64)) - }) - return _c -} - -func (_c *MockCompactionPlanContext_updateCompaction_Call) Return(_a0 error) *MockCompactionPlanContext_updateCompaction_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockCompactionPlanContext_updateCompaction_Call) RunAndReturn(run func(uint64) error) *MockCompactionPlanContext_updateCompaction_Call { - _c.Call.Return(run) - return _c -} - // NewMockCompactionPlanContext creates a new instance of MockCompactionPlanContext. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewMockCompactionPlanContext(t interface { diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index abe346ceee8c2..eac2a971766c8 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -358,7 +358,7 @@ func (s *Server) initDataCoord() error { s.handler = newServerHandler(s) // check whether old node exist, if yes suspend auto balance until all old nodes down - s.updateBalanceConfigLoop(s.ctx) + s.updateBalanceConfigLoop(s.serverLoopCtx) if err = s.initCluster(); err != nil { return err @@ -372,11 +372,8 @@ func (s *Server) initDataCoord() error { } log.Info("init service discovery done") - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.createCompactionHandler() - s.createCompactionTrigger() - log.Info("init compaction scheduler done") - } + s.initCompaction() + log.Info("init compaction done") if err = s.initSegmentManager(); err != nil { return err @@ -416,11 +413,6 @@ func (s *Server) Start() error { } func (s *Server) startDataCoord() { - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.compactionHandler.start() - s.compactionTrigger.start() - s.compactionViewManager.Start() - } s.startServerLoop() // http.Register(&http.Handler{ @@ -522,23 +514,32 @@ func (s *Server) SetIndexNodeCreator(f func(context.Context, string, int64) (typ s.indexNodeCreator = f } -func (s *Server) createCompactionHandler() { +func (s *Server) initCompaction() { s.compactionHandler = newCompactionPlanHandler(s.sessionManager, s.channelManager, s.meta, s.allocator) + s.compactionTrigger = newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) + triggerv2 := NewCompactionTriggerManager(s.allocator, s.compactionHandler) s.compactionViewManager = NewCompactionViewManager(s.meta, triggerv2, s.allocator) } -func (s *Server) stopCompactionHandler() { - s.compactionHandler.stop() - s.compactionViewManager.Close() +func (s *Server) startCompaction() { + s.compactionHandler.start() + s.compactionTrigger.start() + s.compactionViewManager.Start() } -func (s *Server) createCompactionTrigger() { - s.compactionTrigger = 
newCompactionTrigger(s.meta, s.compactionHandler, s.allocator, s.handler, s.indexEngineVersionManager) -} +func (s *Server) stopCompaction() { + if s.compactionHandler != nil { + s.compactionHandler.stop() + } -func (s *Server) stopCompactionTrigger() { - s.compactionTrigger.stop() + if s.compactionViewManager != nil { + s.compactionViewManager.Close() + } + + if s.compactionTrigger != nil { + s.compactionTrigger.stop() + } } func (s *Server) newChunkManagerFactory() (storage.ChunkManager, error) { @@ -692,15 +693,18 @@ func (s *Server) initIndexNodeManager() { } func (s *Server) startServerLoop() { + s.startCompaction() + if !Params.DataNodeCfg.DataNodeTimeTickByRPC.GetAsBool() { s.serverLoopWg.Add(1) s.startDataNodeTtLoop(s.serverLoopCtx) } - s.serverLoopWg.Add(2) + s.serverLoopWg.Add(3) s.startWatchService(s.serverLoopCtx) s.startFlushLoop(s.serverLoopCtx) s.startIndexService(s.serverLoopCtx) + go s.importScheduler.Start() go s.importChecker.Start() s.garbageCollector.start() @@ -1090,15 +1094,10 @@ func (s *Server) Stop() error { s.garbageCollector.close() logutil.Logger(s.ctx).Info("datacoord garbage collector stopped") - s.stopServerLoop() - s.importScheduler.Close() s.importChecker.Close() - if Params.DataCoordCfg.EnableCompaction.GetAsBool() { - s.stopCompactionTrigger() - s.stopCompactionHandler() - } + s.stopCompaction() logutil.Logger(s.ctx).Info("datacoord compaction stopped") s.indexBuilder.Stop() diff --git a/internal/datacoord/server_test.go b/internal/datacoord/server_test.go index 349c49861775e..e2056a95bdec1 100644 --- a/internal/datacoord/server_test.go +++ b/internal/datacoord/server_test.go @@ -3080,7 +3080,7 @@ func newTestServer(t *testing.T, opts ...Option) *Server { <-signal err = svr.Start() assert.NoError(t, err) - assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode)) + require.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode)) return svr } diff --git a/internal/datacoord/services.go b/internal/datacoord/services.go index 22e4a2d585c3c..8be09c01bbdcf 100644 --- a/internal/datacoord/services.go +++ b/internal/datacoord/services.go @@ -547,12 +547,12 @@ func (s *Server) SaveBinlogPaths(ctx context.Context, req *datapb.SaveBinlogPath // notify building index s.flushCh <- req.SegmentID - // notify compaction - if paramtable.Get().DataCoordCfg.EnableCompaction.GetAsBool() { + if !req.Importing { + // notify compaction err := s.compactionTrigger.triggerSingleCompaction(req.GetCollectionID(), req.GetPartitionID(), req.GetSegmentID(), req.GetChannel(), false) if err != nil { - log.Warn("failed to trigger single compaction") + log.Warn("failed to trigger single compaction", zap.Error(err)) } } } @@ -618,6 +618,7 @@ func (s *Server) DropVirtualChannel(ctx context.Context, req *datapb.DropVirtual log.Warn("DropVChannel failed to ReleaseAndRemove", zap.String("channel", channel), zap.Error(err)) } s.segmentManager.DropSegmentsOfChannel(ctx, channel) + s.compactionHandler.removeTasksByChannel(channel) metrics.CleanupDataCoordNumStoredRows(collectionID) @@ -1084,11 +1085,6 @@ func (s *Server) ManualCompaction(ctx context.Context, req *milvuspb.ManualCompa }, nil } - if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { - resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) - return resp, nil - } - id, err := s.compactionTrigger.forceTriggerCompaction(req.CollectionID) if err != nil { log.Error("failed to trigger manual compaction", zap.Error(err)) @@ -1125,7 +1121,7 @@ func (s *Server) 
GetCompactionState(ctx context.Context, req *milvuspb.GetCompac }, nil } - if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { + if !compactionEnabled() { resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) return resp, nil } @@ -1165,7 +1161,8 @@ func (s *Server) GetCompactionStateWithPlans(ctx context.Context, req *milvuspb. resp := &milvuspb.GetCompactionPlansResponse{ Status: merr.Success(), } - if !Params.DataCoordCfg.EnableCompaction.GetAsBool() { + + if !compactionEnabled() { resp.Status = merr.Status(merr.WrapErrServiceUnavailable("compaction disabled")) return resp, nil } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 3a743694ccffb..25432f77759ea 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2670,7 +2670,7 @@ type dataCoordConfig struct { AutoUpgradeSegmentIndex ParamItem `refreshable:"true"` // compaction - EnableCompaction ParamItem `refreshable:"false"` + EnableCompaction ParamItem `refreshable:"true"` EnableAutoCompaction ParamItem `refreshable:"true"` IndexBasedCompaction ParamItem `refreshable:"true"`
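
Editor's note on the pattern this patch introduces: `EnableCompaction` becomes `refreshable:"true"`, and the start-time checks are replaced by per-tick `compactionEnabled()` / `autoCompactionEnabled()` lookups, so the check/schedule/clean loops keep running and simply skip work while the flag is off. The sketch below is a minimal, self-contained illustration of that gating shape, not Milvus code: an `atomic.Bool` stands in for the refreshable paramtable item, and the names `checkLoop` and `compactionEnabled` are illustrative only.

```go
// Sketch only: mirrors the shape of compactionPlanHandler.start() after this
// patch -- the ticker loop always runs, and the enable flag is re-evaluated on
// every tick instead of once at startup.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// enableCompaction stands in for the refreshable config item
// (paramtable DataCoordCfg.EnableCompaction in the real code).
var enableCompaction atomic.Bool

// compactionEnabled plays the role of the helper added in compaction_trigger.go.
func compactionEnabled() bool {
	return enableCompaction.Load()
}

// checkLoop runs check() on every tick, but only while compaction is enabled,
// so flipping the flag at runtime takes effect without restarting the loop.
func checkLoop(stop <-chan struct{}, interval time.Duration, check func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			if compactionEnabled() {
				check()
			}
		}
	}
}

func main() {
	stop := make(chan struct{})
	go checkLoop(stop, 10*time.Millisecond, func() { fmt.Println("check compaction results") })

	enableCompaction.Store(true) // enable at runtime
	time.Sleep(30 * time.Millisecond)
	enableCompaction.Store(false) // disable again; the loop stays alive but idles
	time.Sleep(30 * time.Millisecond)
	close(stop)
}
```

The design choice this mirrors: the compaction goroutines are never torn down, so toggling the config takes effect on the next tick, while explicit entry points (for example `forceTriggerCompaction`) report `ErrServiceUnavailable` when compaction is disabled.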