Skip to content

Commit

Permalink
enhance: Refine compaction (#33982) (#34363)
Browse files Browse the repository at this point in the history
This PR cherry-picks the following commits related to data compaction:
- enhance: Refine compaction.
[#33982](#33982)
- fix: L0 compaction may miss some sealed segments.
[#33838](#33980)

issue: #32939, #33955

pr : #33982

Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
  • Loading branch information
czs007 committed Jul 3, 2024
1 parent fb88267 commit 760b3fa
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 312 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ run:
- scripts
- internal/core
- cmake_build
- mmap
- data
- ci
skip-files:
- partial_search_test.go

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ require (
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pkg/errors v0.9.1
github.com/valyala/fastjson v1.6.4
google.golang.org/protobuf v1.33.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down Expand Up @@ -234,7 +235,6 @@ require (
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230629202037-9506855d4529 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240401170217-c3f982113cda // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
Expand All @@ -248,6 +248,7 @@ replace (
github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd
github.com/expr-lang/expr => github.com/SimFG/expr v0.0.0-20231218130003-94d085776dc5
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
github.com/greatroar/blobloom => github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93
// github.com/milvus-io/milvus-storage/go => ../milvus-storage/go
github.com/milvus-io/milvus/pkg => ./pkg
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
Expand Down
90 changes: 53 additions & 37 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,37 @@ func (c *compactionPlanHandler) loadMeta() {
triggers := c.meta.(*meta).compactionTaskMeta.GetCompactionTasks()
for _, tasks := range triggers {
for _, task := range tasks {
if task.State != datapb.CompactionTaskState_completed && task.State != datapb.CompactionTaskState_cleaned {
c.enqueueCompaction(task)
state := task.GetState()
if state == datapb.CompactionTaskState_completed ||
state == datapb.CompactionTaskState_cleaned ||
state == datapb.CompactionTaskState_unknown {
log.Info("compactionPlanHandler loadMeta abandon compactionTask",
zap.Int64("planID", task.GetPlanID()),
zap.String("State", task.GetState().String()))
continue
} else {
t, err := c.createCompactTask(task)
if err != nil {
log.Warn("compactionPlanHandler loadMeta create compactionTask failed",
zap.Int64("planID", task.GetPlanID()),
zap.String("State", task.GetState().String()))
continue
}
if t.NeedReAssignNodeID() {
c.submitTask(t)
log.Info("compactionPlanHandler loadMeta submitTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.String("state", t.GetState().String()))
} else {
c.restoreTask(t)
log.Info("compactionPlanHandler loadMeta restoreTask",
zap.Int64("planID", t.GetPlanID()),
zap.Int64("triggerID", t.GetTriggerID()),
zap.Int64("collectionID", t.GetCollectionID()),
zap.String("state", t.GetState().String()))
}
}
}
}
Expand Down Expand Up @@ -467,6 +496,8 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
}

func (c *compactionPlanHandler) submitTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.mu.Lock()
c.queueTasks[t.GetPlanID()] = t
c.mu.Unlock()
Expand All @@ -475,6 +506,8 @@ func (c *compactionPlanHandler) submitTask(t CompactionTask) {

// restoreTask used to restore Task from etcd
func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.executingMu.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingMu.Unlock()
Expand Down Expand Up @@ -505,38 +538,23 @@ func (c *compactionPlanHandler) enqueueCompaction(task *datapb.CompactionTask) e
if c.isFull() {
return errCompactionBusy
}
// TODO change to set this on scheduling task
exist, succeed := c.checkAndSetSegmentsCompacting(task)
if !exist {
return merr.WrapErrIllegalCompactionPlan("segment not exist")
}
if !succeed {
return merr.WrapErrCompactionPlanConflict("segment is compacting")
}

// TODO change to set this on scheduling task
t := c.createCompactTask(task)
if t == nil {
return merr.WrapErrIllegalCompactionPlan("illegal compaction type")
t, err := c.createCompactTask(task)
if err != nil {
return err
}
if task.StartTime != 0 {
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err := t.SaveTaskMeta()
if err != nil {
return err
}
t.SetTask(t.ShadowClone(setStartTime(time.Now().Unix())))
err = t.SaveTaskMeta()
if err != nil {
c.meta.SetSegmentsCompacting(t.GetInputSegments(), false)
return err
}

_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", task.GetType()))
t.SetSpan(span)

c.submitTask(t)
log.Info("Compaction plan submitted")
return nil
}

// set segments compacting, one segment can only participate one compactionTask
func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) CompactionTask {
func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) (CompactionTask, error) {
var task CompactionTask
switch t.GetType() {
case datapb.CompactionType_MixCompaction:
Expand All @@ -559,19 +577,17 @@ func (c *compactionPlanHandler) createCompactTask(t *datapb.CompactionTask) Comp
handler: c.handler,
analyzeScheduler: c.analyzeScheduler,
}
default:
return nil, merr.WrapErrIllegalCompactionPlan("illegal compaction type")
}
return task
}

// set segments compacting, one segment can only participate one compactionTask
func (c *compactionPlanHandler) setSegmentsCompacting(task CompactionTask, compacting bool) {
for _, segmentID := range task.GetInputSegments() {
c.meta.SetSegmentCompacting(segmentID, compacting)
exist, succeed := c.meta.CheckAndSetSegmentsCompacting(t.GetInputSegments())
if !exist {
return nil, merr.WrapErrIllegalCompactionPlan("segment not exist")
}
}

func (c *compactionPlanHandler) checkAndSetSegmentsCompacting(task *datapb.CompactionTask) (bool, bool) {
return c.meta.CheckAndSetSegmentsCompacting(task.GetInputSegments())
if !succeed {
return nil, merr.WrapErrCompactionPlanConflict("segment is compacting")
}
return task, nil
}

func (c *compactionPlanHandler) assignNodeIDs(tasks []CompactionTask) {
Expand Down
12 changes: 7 additions & 5 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// todo reassign node ID
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return nil
}
return err
Expand All @@ -230,7 +230,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
return segment.GetSegmentID()
})

_, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetPlan(), t.result)
_, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
if err != nil {
return err
}
Expand Down Expand Up @@ -334,9 +334,11 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
}

func (t *clusteringCompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
var segmentIDs []UniqueID
for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
}
t.meta.SetSegmentsCompacting(segmentIDs, false)
}

func (t *clusteringCompactionTask) processFailedOrTimeout() error {
Expand Down Expand Up @@ -412,7 +414,7 @@ func (t *clusteringCompactionTask) doCompact() error {
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return err
}
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
Expand Down
14 changes: 8 additions & 6 deletions internal/datacoord/compaction_task_l0.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (t *l0CompactionTask) processPipelining() bool {
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false
}
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
Expand All @@ -85,7 +85,7 @@ func (t *l0CompactionTask) processExecuting() bool {
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
}
return false
}
Expand Down Expand Up @@ -186,7 +186,7 @@ func (t *l0CompactionTask) SetStartTime(startTime int64) {
}

func (t *l0CompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == NullNodeID
}

func (t *l0CompactionTask) SetResult(result *datapb.CompactionPlanResult) {
Expand Down Expand Up @@ -250,7 +250,7 @@ func (t *l0CompactionTask) BuildCompactionRequest() (*datapb.CompactionPlan, err
//!info.isCompacting &&
!info.GetIsImporting() &&
info.GetLevel() != datapb.SegmentLevel_L0 &&
info.GetDmlPosition().GetTimestamp() < t.GetPos().GetTimestamp()
info.GetStartPosition().GetTimestamp() < t.GetPos().GetTimestamp()
}))

if len(sealedSegments) == 0 {
Expand Down Expand Up @@ -307,9 +307,11 @@ func (t *l0CompactionTask) processCompleted() bool {
}

func (t *l0CompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
var segmentIDs []UniqueID
for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
}
t.meta.SetSegmentsCompacting(segmentIDs, false)
}

func (t *l0CompactionTask) processTimeout() bool {
Expand Down
36 changes: 23 additions & 13 deletions internal/datacoord/compaction_task_mix.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,15 @@ func (t *mixCompactionTask) processPipelining() bool {
}
var err error
t.plan, err = t.BuildCompactionRequest()
// Segment not found
if err != nil {
err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
return err2 == nil
}
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
log.Warn("Failed to notify compaction tasks to DataNode", zap.Error(err))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return false
}
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
Expand All @@ -59,7 +60,7 @@ func (t *mixCompactionTask) processExecuting() bool {
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(0))
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
}
return false
}
Expand All @@ -82,13 +83,20 @@ func (t *mixCompactionTask) processExecuting() bool {
}
return t.processFailed()
}
saveSuccess := t.saveSegmentMeta()
if !saveSuccess {
err2 := t.saveSegmentMeta()
if err2 != nil {
if errors.Is(err2, merr.ErrIllegalCompactionPlan) {
err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
if err3 != nil {
log.Warn("fail to updateAndSaveTaskMeta")
}
return true
}
return false
}
segments := []UniqueID{t.newSegment.GetID()}
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments))
if err == nil {
err3 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_meta_saved), setResultSegments(segments))
if err3 == nil {
return t.processMetaSaved()
}
return false
Expand All @@ -110,18 +118,18 @@ func (t *mixCompactionTask) SaveTaskMeta() error {
return t.saveTaskMeta(t.CompactionTask)
}

func (t *mixCompactionTask) saveSegmentMeta() bool {
func (t *mixCompactionTask) saveSegmentMeta() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
// Also prepare metric updates.
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.GetPlan(), t.result)
newSegments, metricMutation, err := t.meta.CompleteCompactionMutation(t.CompactionTask, t.result)
if err != nil {
return false
return err
}
// Apply metrics after successful meta update.
t.newSegment = newSegments[0]
metricMutation.commit()
log.Info("mixCompactionTask success to save segment meta")
return true
return nil
}

func (t *mixCompactionTask) Process() bool {
Expand Down Expand Up @@ -161,7 +169,7 @@ func (t *mixCompactionTask) GetLabel() string {
}

func (t *mixCompactionTask) NeedReAssignNodeID() bool {
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == 0
return t.GetState() == datapb.CompactionTaskState_pipelining && t.GetNodeID() == NullNodeID
}

func (t *mixCompactionTask) processCompleted() bool {
Expand All @@ -178,9 +186,11 @@ func (t *mixCompactionTask) processCompleted() bool {
}

func (t *mixCompactionTask) resetSegmentCompacting() {
for _, segmentBinlogs := range t.GetPlan().GetSegmentBinlogs() {
t.meta.SetSegmentCompacting(segmentBinlogs.GetSegmentID(), false)
var segmentIDs []UniqueID
for _, binLogs := range t.GetPlan().GetSegmentBinlogs() {
segmentIDs = append(segmentIDs, binLogs.GetSegmentID())
}
t.meta.SetSegmentsCompacting(segmentIDs, false)
}

func (t *mixCompactionTask) processTimeout() bool {
Expand Down
Loading

0 comments on commit 760b3fa

Please sign in to comment.