Skip to content

Commit

Permalink
Refine clustering_compaction_task retry mechanism (milvus-io#34194)
Browse files Browse the repository at this point in the history
milvus-io#32939

Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Jul 3, 2024
1 parent b3c5eb2 commit f7377c9
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 16 deletions.
53 changes: 37 additions & 16 deletions internal/datacoord/compaction_task_clustering.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ func (t *clusteringCompactionTask) Process() bool {
Observe(float64(elapse))
}
}
// todo debug
log.Info("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
log.Debug("process clustering task", zap.String("lastState", lastState), zap.String("currentState", currentState))
return t.State == datapb.CompactionTaskState_completed || t.State == datapb.CompactionTaskState_cleaned
}

Expand Down Expand Up @@ -186,7 +185,7 @@ func (t *clusteringCompactionTask) processPipelining() error {
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("fail to set segment level to L2", zap.Error(err))
return err
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo before compaction executing", err)
}

if typeutil.IsVectorType(t.GetClusteringKeyField().DataType) {
Expand All @@ -211,7 +210,7 @@ func (t *clusteringCompactionTask) processExecuting() error {
if err != nil || result == nil {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// todo reassign node ID
// setNodeID(NullNodeID) to trigger reassign node ID
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return nil
}
Expand All @@ -223,9 +222,9 @@ func (t *clusteringCompactionTask) processExecuting() error {
t.result = result
result := t.result
if len(result.GetSegments()) == 0 {
log.Info("illegal compaction results")
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
return err
log.Warn("illegal compaction results, this should not happen")
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed))
return merr.WrapErrCompactionResult("compaction result is empty")
}

resultSegmentIDs := lo.Map(result.Segments, func(segment *datapb.CompactionSegment, _ int) int64 {
Expand All @@ -247,6 +246,8 @@ func (t *clusteringCompactionTask) processExecuting() error {
err := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_timeout))
if err == nil {
return t.processFailedOrTimeout()
} else {
return err
}
}
return nil
Expand Down Expand Up @@ -294,19 +295,19 @@ func (t *clusteringCompactionTask) completeTask() error {
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("SavePartitionStatsInfo", err)
}

var operators []UpdateOperator
for _, segID := range t.GetResultSegments() {
operators = append(operators, UpdateSegmentPartitionStatsVersionOperator(segID, t.GetPlanID()))
}

err = t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentPartitionStatsVersion", err)
}

err = t.meta.GetPartitionStatsMeta().SaveCurrentPartitionStatsVersion(t.GetCollectionID(), t.GetPartitionID(), t.GetChannel(), t.GetPlanID())
if err != nil {
return err
return merr.WrapErrClusteringCompactionMetaError("SaveCurrentPartitionStatsVersion", err)
}
return t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_completed))
}
Expand All @@ -315,13 +316,13 @@ func (t *clusteringCompactionTask) processAnalyzing() error {
analyzeTask := t.meta.GetAnalyzeMeta().GetTask(t.GetAnalyzeTaskID())
if analyzeTask == nil {
log.Warn("analyzeTask not found", zap.Int64("id", t.GetAnalyzeTaskID()))
return errors.New("analyzeTask not found")
return merr.WrapErrAnalyzeTaskNotFound(t.GetAnalyzeTaskID()) // retryable
}
log.Info("check analyze task state", zap.Int64("id", t.GetAnalyzeTaskID()), zap.Int64("version", analyzeTask.GetVersion()), zap.String("state", analyzeTask.State.String()))
switch analyzeTask.State {
case indexpb.JobState_JobStateFinished:
if analyzeTask.GetCentroidsFile() == "" {
// fake finished vector clustering is not supported in opensource
// not retryable, fake finished vector clustering is not supported in opensource
return merr.WrapErrClusteringCompactionNotSupportVector()
} else {
t.AnalyzeVersion = analyzeTask.GetVersion()
Expand Down Expand Up @@ -354,7 +355,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
err := t.meta.UpdateSegmentsInfo(operators...)
if err != nil {
log.Warn("UpdateSegmentsInfo fail", zap.Error(err))
return err
return merr.WrapErrClusteringCompactionMetaError("UpdateSegmentsInfo", err)
}
t.resetSegmentCompacting()

Expand All @@ -369,6 +370,7 @@ func (t *clusteringCompactionTask) processFailedOrTimeout() error {
err = t.meta.CleanPartitionStatsInfo(partitionStatsInfo)
if err != nil {
log.Warn("gcPartitionStatsInfo fail", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("CleanPartitionStatsInfo", err)
}

t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_cleaned))
Expand Down Expand Up @@ -404,14 +406,32 @@ func (t *clusteringCompactionTask) doAnalyze() error {
}

func (t *clusteringCompactionTask) doCompact() error {
log := log.With(zap.Int64("planID", t.GetPlanID()), zap.String("type", t.GetType().String()))
if t.NeedReAssignNodeID() {
return errors.New("not assign nodeID")
}
var err error
// check whether the compaction plan is already submitted considering
// datacoord may crash between call sessions.Compaction and updateTaskState to executing
result, err := t.sessions.GetCompactionPlanResult(t.GetNodeID(), t.GetPlanID())
if err != nil {
if errors.Is(err, merr.ErrNodeNotFound) {
log.Warn("GetCompactionPlanResult fail", zap.Error(err))
// setNodeID(NullNodeID) to trigger reassign node ID
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_pipelining), setNodeID(NullNodeID))
return nil
}
return merr.WrapErrGetCompactionPlanResultFail(err)
}
if result != nil {
log.Info("compaction already submitted")
t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_executing))
return nil
}

t.plan, err = t.BuildCompactionRequest()
if err != nil {
err2 := t.updateAndSaveTaskMeta(setState(datapb.CompactionTaskState_failed), setFailReason(err.Error()))
return err2
log.Warn("Failed to BuildCompactionRequest", zap.Error(err))
return merr.WrapErrBuildCompactionRequestFail(err) // retryable
}
err = t.sessions.Compaction(context.Background(), t.GetNodeID(), t.GetPlan())
if err != nil {
Expand Down Expand Up @@ -460,7 +480,8 @@ func (t *clusteringCompactionTask) updateAndSaveTaskMeta(opts ...compactionTaskO
task := t.ShadowClone(opts...)
err := t.saveTaskMeta(task)
if err != nil {
return err
log.Warn("Failed to saveTaskMeta", zap.Error(err))
return merr.WrapErrClusteringCompactionMetaError("updateAndSaveTaskMeta", err) // retryable
}
t.CompactionTask = task
return nil
Expand Down
4 changes: 4 additions & 0 deletions pkg/util/merr/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ var (
ErrClusteringCompactionMetaError = newMilvusError("fail to update meta in clustering compaction", 2308, true)
ErrClusteringCompactionGetCollectionFail = newMilvusError("fail to get collection in compaction", 2309, true)
ErrCompactionResultNotFound = newMilvusError("compaction result not found", 2310, false)
ErrAnalyzeTaskNotFound = newMilvusError("analyze task not found", 2311, true)
ErrBuildCompactionRequestFail = newMilvusError("fail to build CompactionRequest", 2312, true)
ErrGetCompactionPlanResultFail = newMilvusError("fail to get compaction plan", 2313, true)
ErrCompactionResult = newMilvusError("illegal compaction results", 2314, false)

// General
ErrOperationNotSupported = newMilvusError("unsupported operation", 3000, false)
Expand Down
20 changes: 20 additions & 0 deletions pkg/util/merr/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -1117,3 +1117,23 @@ func WrapErrClusteringCompactionSubmitTaskFail(taskType string, err error) error
func WrapErrClusteringCompactionMetaError(operation string, err error) error {
return wrapFieldsWithDesc(ErrClusteringCompactionMetaError, err.Error(), value("operation", operation))
}

func WrapErrAnalyzeTaskNotFound(id int64) error {
return wrapFields(ErrAnalyzeTaskNotFound, value("analyzeId", id))
}

// WrapErrBuildCompactionRequestFail builds an error based on
// ErrBuildCompactionRequestFail, using the message of err as the description.
//
// err must be non-nil: err.Error() is invoked unconditionally.
func WrapErrBuildCompactionRequestFail(err error) error {
	desc := err.Error()
	return wrapFieldsWithDesc(ErrBuildCompactionRequestFail, desc)
}

// WrapErrGetCompactionPlanResultFail builds an error based on
// ErrGetCompactionPlanResultFail, using the message of err as the description.
//
// err must be non-nil: err.Error() is invoked unconditionally.
func WrapErrGetCompactionPlanResultFail(err error) error {
	desc := err.Error()
	return wrapFieldsWithDesc(ErrGetCompactionPlanResultFail, desc)
}

// WrapErrCompactionResult returns ErrCompactionResult, optionally wrapped with
// extra context. When one or more msg values are supplied they are joined with
// "->" and used as the wrapping message; with no msg the bare
// ErrCompactionResult is returned.
func WrapErrCompactionResult(msg ...string) error {
	// No extra context supplied: hand back the sentinel unchanged.
	if len(msg) == 0 {
		return error(ErrCompactionResult)
	}
	detail := strings.Join(msg, "->")
	return errors.Wrap(error(ErrCompactionResult), detail)
}

0 comments on commit f7377c9

Please sign in to comment.