enhance: Add back compactionTaskNum metrics (milvus-io#34583)
Fix L0 compaction task recovery failing to clear the segments' isCompacting flag

See also: milvus-io#34460

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
XuanYang-cn committed Jul 11, 2024
1 parent fd3da90 commit d7a3697
Showing 9 changed files with 114 additions and 108 deletions.
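The diff below re-adds DataCoordCompactionTaskNum bookkeeping at each point where a task changes stage: submitTask increments a Pending cell under the placeholder node id, doSchedule moves the task to Executing, assignNodeIDs re-labels the Executing cell from the placeholder to the real node, and checkCompaction moves finished tasks to Done. As a minimal sketch of that lifecycle, assuming the metric behaves like a Prometheus GaugeVec labelled by node id, compaction type, and stage (the metric name, label names, and NullNodeID value here are illustrative, not Milvus's exact definitions):

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

// Hypothetical stand-in for metrics.DataCoordCompactionTaskNum: a gauge
// vector keyed by node id, compaction type, and task stage.
var compactionTaskNum = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "datacoord_compaction_task_num",
		Help: "Number of compaction tasks per node, type, and stage",
	},
	[]string{"node_id", "compaction_type", "stage"},
)

const nullNodeID = int64(-1) // placeholder used before a task is assigned to a node

func main() {
	prometheus.MustRegister(compactionTaskNum)

	taskType := "MixCompaction"

	// submitTask: the task enters the queue, not yet scheduled.
	compactionTaskNum.WithLabelValues(fmt.Sprintf("%d", nullNodeID), taskType, "pending").Inc()

	// doSchedule: the task moves from the queue to the executing set.
	compactionTaskNum.WithLabelValues(fmt.Sprintf("%d", nullNodeID), taskType, "pending").Dec()
	compactionTaskNum.WithLabelValues(fmt.Sprintf("%d", nullNodeID), taskType, "executing").Inc()

	// assignNodeIDs: the executing task is re-labelled from the null node to a real node.
	nodeID := int64(7)
	compactionTaskNum.WithLabelValues(fmt.Sprintf("%d", nullNodeID), taskType, "executing").Dec()
	compactionTaskNum.WithLabelValues(fmt.Sprintf("%d", nodeID), taskType, "executing").Inc()

	// checkCompaction: the task finishes.
	compactionTaskNum.WithLabelValues(fmt.Sprintf("%d", nodeID), taskType, "executing").Dec()
	compactionTaskNum.WithLabelValues(fmt.Sprintf("%d", nodeID), taskType, "done").Inc()
}
```

Because every Inc is paired with a Dec on the previous cell, each (node, type, stage) cell tracks the current task count rather than a running total.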
111 changes: 65 additions & 46 deletions internal/datacoord/compaction.go
@@ -33,6 +33,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/lock"
"github.com/milvus-io/milvus/pkg/util/merr"
@@ -70,10 +71,10 @@ type compactionInfo struct {
}

type compactionPlanHandler struct {
mu lock.RWMutex
queueGuard lock.RWMutex
queueTasks map[int64]CompactionTask // planID -> task

executingMu lock.RWMutex
executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task

meta CompactionMeta
@@ -157,21 +158,21 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo {

func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int {
cnt := 0
c.mu.RLock()
c.queueGuard.RLock()
for _, t := range c.queueTasks {
if t.GetTriggerID() == triggerID {
cnt += 1
}
// if t.GetPlanID()
}
c.mu.RUnlock()
c.executingMu.RLock()
c.queueGuard.RUnlock()
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if t.GetTriggerID() == triggerID {
cnt += 1
}
}
c.executingMu.RUnlock()
c.executingGuard.RUnlock()
return cnt
}

@@ -193,20 +194,20 @@ func newCompactionPlanHandler(cluster Cluster, sessions SessionManager, cm Chann
}

func (c *compactionPlanHandler) schedule() []CompactionTask {
c.mu.RLock()
c.queueGuard.RLock()
if len(c.queueTasks) == 0 {
c.mu.RUnlock()
c.queueGuard.RUnlock()
return nil
}
c.mu.RUnlock()
c.queueGuard.RUnlock()

l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()
clusterChannelExcludes := typeutil.NewSet[string]()
mixLabelExcludes := typeutil.NewSet[string]()
clusterLabelExcludes := typeutil.NewSet[string]()

c.executingMu.RLock()
c.executingGuard.RLock()
for _, t := range c.executingTasks {
switch t.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
@@ -219,11 +220,11 @@ func (c *compactionPlanHandler) schedule() {
clusterLabelExcludes.Insert(t.GetLabel())
}
}
c.executingMu.RUnlock()
c.executingGuard.RUnlock()

var picked []CompactionTask
c.mu.RLock()
defer c.mu.RUnlock()
c.queueGuard.RLock()
defer c.queueGuard.RUnlock()
keys := lo.Keys(c.queueTasks)
sort.SliceStable(keys, func(i, j int) bool {
return keys[i] < keys[j]
@@ -268,8 +269,8 @@ func (c *compactionPlanHandler) start() {
}

func (c *compactionPlanHandler) loadMeta() {
// todo: make it compatible to all types of compaction with persist meta
triggers := c.meta.(*meta).compactionTaskMeta.GetCompactionTasks()
// TODO: make it compatible to all types of compaction with persist meta
triggers := c.meta.GetCompactionTasks()
for _, tasks := range triggers {
for _, task := range tasks {
state := task.GetState()
@@ -278,14 +279,19 @@
state == datapb.CompactionTaskState_unknown {
log.Info("compactionPlanHandler loadMeta abandon compactionTask",
zap.Int64("planID", task.GetPlanID()),
zap.String("State", task.GetState().String()))
zap.String("type", task.GetType().String()),
zap.String("state", task.GetState().String()))
continue
} else {
// TODO: how to deal with the create failed tasks, leave it in meta forever?
t, err := c.createCompactTask(task)
if err != nil {
log.Warn("compactionPlanHandler loadMeta create compactionTask failed",
zap.Int64("planID", task.GetPlanID()),
zap.String("State", task.GetState().String()))
zap.String("type", task.GetType().String()),
zap.String("state", task.GetState().String()),
zap.Error(err),
)
continue
}
if t.NeedReAssignNodeID() {
@@ -294,13 +300,15 @@
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.String("type", task.GetType().String()),
zap.String("state", t.GetState().String()))
} else {
c.restoreTask(t)
log.Info("compactionPlanHandler loadMeta restoreTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.String("type", task.GetType().String()),
zap.String("state", t.GetState().String()))
}
}
@@ -311,17 +319,20 @@
func (c *compactionPlanHandler) doSchedule() {
picked := c.schedule()
if len(picked) > 0 {
c.executingMu.Lock()
c.executingGuard.Lock()
for _, t := range picked {
c.executingTasks[t.GetPlanID()] = t
}
c.executingMu.Unlock()
c.executingGuard.Unlock()

c.mu.Lock()
c.queueGuard.Lock()
for _, t := range picked {
delete(c.queueTasks, t.GetPlanID())
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc()
}
c.mu.Unlock()
c.queueGuard.Unlock()

}
}

@@ -394,7 +405,7 @@ func (c *compactionPlanHandler) cleanCompactionTaskMeta() {
// try best to delete meta
err := c.meta.DropCompactionTask(task)
if err != nil {
log.Warn("fail to drop task", zap.Int64("taskPlanID", task.PlanID), zap.Error(err))
log.Warn("fail to drop task", zap.Int64("planID", task.PlanID), zap.Error(err))
}
}
}
@@ -460,7 +471,7 @@ func (c *compactionPlanHandler) stop() {
}

func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.mu.Lock()
c.queueGuard.Lock()
for id, task := range c.queueTasks {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel), zap.Any("id", id), zap.Any("task_channel", task.GetChannel()))
@@ -472,13 +483,14 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
)
delete(c.queueTasks, id)
c.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Pending).Dec()
}
}
c.mu.Unlock()
c.executingMu.Lock()
c.queueGuard.Unlock()
c.executingGuard.Lock()
for id, task := range c.executingTasks {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel), zap.Any("id", id), zap.Any("task_channel", task.GetChannel()))
zap.String("channel", channel), zap.Int64("planID", id), zap.Any("task_channel", task.GetChannel()))
if task.GetChannel() == channel {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel),
@@ -487,46 +499,49 @@
)
delete(c.executingTasks, id)
c.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Executing).Dec()
}
}
c.executingMu.Unlock()
c.executingGuard.Unlock()
}

func (c *compactionPlanHandler) submitTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.mu.Lock()
c.queueGuard.Lock()
c.queueTasks[t.GetPlanID()] = t
c.mu.Unlock()
c.queueGuard.Unlock()
c.taskNumber.Add(1)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc()
}

// restoreTask used to restore Task from etcd
func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.executingMu.Lock()
c.executingGuard.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingMu.Unlock()
c.executingGuard.Unlock()
c.taskNumber.Add(1)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
}

// getCompactionTask return compaction
func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask {
c.mu.RLock()
c.queueGuard.RLock()
t, ok := c.queueTasks[planID]
if ok {
c.mu.RUnlock()
c.queueGuard.RUnlock()
return t
}
c.mu.RUnlock()
c.executingMu.RLock()
c.queueGuard.RUnlock()
c.executingGuard.RLock()
t, ok = c.executingTasks[planID]
if ok {
c.executingMu.RUnlock()
c.executingGuard.RUnlock()
return t
}
c.executingMu.RUnlock()
c.executingGuard.RUnlock()
return t
}

@@ -604,6 +619,8 @@ func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
} else {
log.Info("compactionHandler assignNodeID success",
zap.Int64("planID", t.GetPlanID()), zap.String("vchannel", t.GetChannel()), zap.Any("nodeID", nodeID))
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
}
}
}
@@ -613,34 +630,36 @@ func (c *compactionPlanHandler) checkCompaction() error {
// for DC might add new task while GetCompactionState.

var needAssignIDTasks []CompactionTask
c.executingMu.RLock()
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if t.NeedReAssignNodeID() {
needAssignIDTasks = append(needAssignIDTasks, t)
}
}
c.executingMu.RUnlock()
c.executingGuard.RUnlock()
if len(needAssignIDTasks) > 0 {
c.assignNodeIDs(needAssignIDTasks)
}

var finishedTasks []CompactionTask
c.executingMu.RLock()
c.executingGuard.RLock()
for _, t := range c.executingTasks {
finished := t.Process()
if finished {
finishedTasks = append(finishedTasks, t)
}
}
c.executingMu.RUnlock()
c.executingGuard.RUnlock()

// delete all finished
c.executingMu.Lock()
c.executingGuard.Lock()
for _, t := range finishedTasks {
delete(c.executingTasks, t.GetPlanID())
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc()
}
c.executingMu.Unlock()
c.taskNumber.Add(-int32(len(finishedTasks)))
c.executingGuard.Unlock()
c.taskNumber.Sub(int32(len(finishedTasks)))
return nil
}

@@ -681,8 +700,8 @@ func (c *compactionPlanHandler) getTaskCount() int {
}

func (c *compactionPlanHandler) getTasksByState(state datapb.CompactionTaskState) []CompactionTask {
c.mu.RLock()
defer c.mu.RUnlock()
c.queueGuard.RLock()
defer c.queueGuard.RUnlock()
tasks := make([]CompactionTask, 0, len(c.queueTasks))
for _, t := range c.queueTasks {
if t.GetState() == state {
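If one wanted to sanity-check the Inc/Dec pairing added throughout compaction.go above, prometheus's testutil helper can read back a single gauge cell. This is a hedged usage sketch against an illustrative gauge, not Milvus's actual metrics registry or test code:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	// Illustrative gauge; not the real DataCoordCompactionTaskNum definition.
	g := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "compaction_task_num", Help: "tasks by stage"},
		[]string{"node_id", "compaction_type", "stage"},
	)

	// Simulate submit followed by schedule.
	g.WithLabelValues("-1", "Level0DeleteCompaction", "pending").Inc()
	g.WithLabelValues("-1", "Level0DeleteCompaction", "pending").Dec()
	g.WithLabelValues("-1", "Level0DeleteCompaction", "executing").Inc()

	// Each stage transition should leave the previous cell at zero.
	fmt.Println(testutil.ToFloat64(g.WithLabelValues("-1", "Level0DeleteCompaction", "pending")))   // 0
	fmt.Println(testutil.ToFloat64(g.WithLabelValues("-1", "Level0DeleteCompaction", "executing"))) // 1
}
```

After scheduling, the pending cell should read zero and the executing cell one; any drift between the two suggests an unpaired Inc or Dec.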
18 changes: 8 additions & 10 deletions internal/datacoord/compaction_policy_l0.go
@@ -16,10 +16,11 @@ type l0CompactionPolicy struct {
emptyLoopCount *atomic.Int64
}

func newL0CompactionPolicy(meta *meta, view *FullViews) *l0CompactionPolicy {
func newL0CompactionPolicy(meta *meta) *l0CompactionPolicy {
return &l0CompactionPolicy{
meta: meta,
view: view,
meta: meta,
// donot share views with other compaction policy
view: &FullViews{collections: make(map[int64][]*SegmentView)},
emptyLoopCount: atomic.NewInt64(0),
}
}
@@ -39,13 +40,11 @@ func (policy *l0CompactionPolicy) Trigger() (map[CompactionTriggerType][]Compact
policy.emptyLoopCount.Inc()

if policy.emptyLoopCount.Load() >= 3 {
idleEvents := policy.generateEventForLevelZeroViewIDLE()
if len(idleEvents) > 0 {
policy.emptyLoopCount.Store(0)
}
return idleEvents, nil
policy.emptyLoopCount.Store(0)
return policy.generateEventForLevelZeroViewIDLE(), nil
}
return make(map[CompactionTriggerType][]CompactionView, 0), nil

return make(map[CompactionTriggerType][]CompactionView), nil
}

func (policy *l0CompactionPolicy) generateEventForLevelZeroViewChange() (events map[CompactionTriggerType][]CompactionView) {
@@ -73,7 +72,6 @@ func (policy *l0CompactionPolicy) RefreshLevelZeroViews(latestCollSegs map[int64
levelZeroSegments := lo.Filter(segments, func(info *SegmentInfo, _ int) bool {
return info.GetLevel() == datapb.SegmentLevel_L0
})

latestL0Segments := GetViewsByInfo(levelZeroSegments...)
needRefresh, collRefreshedViews := policy.getChangedLevelZeroViews(collID, latestL0Segments)
if needRefresh {
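The Trigger change above simplifies the idle path: when three consecutive loops see no view change, the counter is reset unconditionally and the IDLE events are returned, rather than resetting only when the IDLE batch turned out non-empty. A minimal sketch of that counter pattern, with hypothetical types standing in for CompactionTriggerType and CompactionView (an illustration of the control flow, not the real policy):

```go
package main

import (
	"fmt"

	"go.uber.org/atomic"
)

type triggerType string
type view string

const triggerIDLE triggerType = "LevelZeroViewIDLE"

type l0Policy struct {
	emptyLoopCount *atomic.Int64
}

// trigger mirrors the simplified control flow: changed views short-circuit,
// otherwise three empty loops in a row produce one IDLE batch and reset the counter.
func (p *l0Policy) trigger(changed map[triggerType][]view, idle func() map[triggerType][]view) map[triggerType][]view {
	if len(changed) > 0 {
		p.emptyLoopCount.Store(0)
		return changed
	}

	p.emptyLoopCount.Inc()
	if p.emptyLoopCount.Load() >= 3 {
		p.emptyLoopCount.Store(0) // reset even if idle() ends up returning nothing
		return idle()
	}
	return map[triggerType][]view{}
}

func main() {
	p := &l0Policy{emptyLoopCount: atomic.NewInt64(0)}
	idle := func() map[triggerType][]view {
		return map[triggerType][]view{triggerIDLE: {"collection-100"}}
	}

	for i := 1; i <= 4; i++ {
		events := p.trigger(nil, idle)
		fmt.Printf("loop %d -> %d event groups\n", i, len(events))
	}
}
```

Under the previous code, an idle collection whose IDLE pass produced no views left the counter untouched and retried the IDLE pass on every subsequent loop; resetting unconditionally restores the every-third-loop cadence.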
6 changes: 1 addition & 5 deletions internal/datacoord/compaction_policy_l0_test.go
@@ -193,11 +193,7 @@ func (s *L0CompactionPolicySuite) SetupTest() {
meta.segments.SetSegment(id, segment)
}

views := &FullViews{
collections: make(map[int64][]*SegmentView),
}

s.l0_policy = newL0CompactionPolicy(meta, views)
s.l0_policy = newL0CompactionPolicy(meta)
}

func genTestSegmentInfo(label *CompactionGroupLabel, ID UniqueID, level datapb.SegmentLevel, state commonpb.SegmentState) *SegmentInfo {
6 changes: 1 addition & 5 deletions internal/datacoord/compaction_task_clustering.go
@@ -333,11 +333,7 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
}

func (t *clusteringCompactionTask) resetSegmentCompacting() {
var segmentIDs []UniqueID
for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
}
t.meta.SetSegmentsCompacting(segmentIDs, false)
t.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
}

func (t *clusteringCompactionTask) processFailedOrTimeout() error {
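The resetSegmentCompacting change above ties into the recovery fix in the commit message: the task now clears the isCompacting flag using its persisted input-segment list instead of walking the plan's segment binlogs, which may not exist yet for a task restored from meta. A minimal, hypothetical sketch of the difference (types and method names below are illustrative, not Milvus's actual interfaces):

```go
package main

import "fmt"

// Hypothetical stand-ins for the datacoord types involved; names are illustrative only.
type segmentBinlogs struct{ segmentID int64 }

type compactionPlan struct{ segmentBinlogs []segmentBinlogs }

type compactionMeta struct{ compacting map[int64]bool }

func (m *compactionMeta) SetSegmentsCompacting(ids []int64, compacting bool) {
	for _, id := range ids {
		m.compacting[id] = compacting
	}
}

type clusteringTask struct {
	plan          *compactionPlan // may be nil for a task restored from meta
	inputSegments []int64         // persisted with the task, always available
	meta          *compactionMeta
}

// Old approach: derive segment IDs from the plan. If the plan was never built
// (e.g. the task was just recovered), nothing gets un-marked.
func (t *clusteringTask) resetFromPlan() {
	var ids []int64
	if t.plan != nil {
		for _, b := range t.plan.segmentBinlogs {
			ids = append(ids, b.segmentID)
		}
	}
	t.meta.SetSegmentsCompacting(ids, false)
}

// New approach: use the persisted input segments directly.
func (t *clusteringTask) resetFromInputSegments() {
	t.meta.SetSegmentsCompacting(t.inputSegments, false)
}

func main() {
	meta := &compactionMeta{compacting: map[int64]bool{101: true, 102: true}}
	t := &clusteringTask{plan: nil, inputSegments: []int64{101, 102}, meta: meta}

	t.resetFromPlan()
	fmt.Println("after plan-based reset:", meta.compacting) // segments still marked compacting

	t.resetFromInputSegments()
	fmt.Println("after input-based reset:", meta.compacting) // flags cleared
}
```

The persisted input segments are available as soon as the task is loaded from meta, so the reset works even before a plan is rebuilt.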
(The remaining changed files in this commit are not rendered here.)
