fix: [Cherry-pick] Pick major compaction fixes and optimizations (#34360)
This PR cherry-picks the following commits:

- fix: sync partition stats blocking balance task #33742
- fix: Fix meta prefix overlap bug #33830
- fix: Small fixes of major compaction #33929
- fix: Fix memory buffer error & some renaming #33850
- fix: sync part stats task cannot be finished #34027 
- Add an option to enable/disable vector field clustering key #34097
- fix: fix error ignore in compactor #34169
- fix: load major compaction partial result #34052
- Use new stream segment reader in clustering compaction #34232

issue: #30633
pr: #33742 #33830 #33929 #33850 #34027 #34097 #34169 #34052 #34232

---------

Signed-off-by: MrPresent-Han <chun.han@zilliz.com>
Signed-off-by: wayblink <anyang.wang@zilliz.com>
Signed-off-by: MrPresent-Han <chun.han@gmail.com>
Co-authored-by: Chun Han <116052805+MrPresent-Han@users.noreply.github.com>
Co-authored-by: MrPresent-Han <chun.han@gmail.com>
3 people committed Jul 3, 2024
1 parent 760b3fa commit c62bf8a
Showing 28 changed files with 327 additions and 146 deletions.
2 changes: 2 additions & 0 deletions configs/milvus.yaml
@@ -649,8 +649,10 @@ common:
traceLogMode: 0 # trace request info
bloomFilterSize: 100000 # bloom filter initial size
maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter
# clustering key/compaction related
usePartitionKeyAsClusteringKey: false
useVectorAsClusteringKey: false
enableVectorClusteringKey: false

# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
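The hunk above adds enableVectorClusteringKey next to the existing usePartitionKeyAsClusteringKey and useVectorAsClusteringKey options. As a minimal sketch of how such a flag can gate clustering-key validation (all types and names below are hypothetical illustrations, not Milvus code):

package main

import (
	"errors"
	"fmt"
)

// clusteringConfig mirrors the three YAML options shown above.
type clusteringConfig struct {
	UsePartitionKeyAsClusteringKey bool
	UseVectorAsClusteringKey       bool
	EnableVectorClusteringKey      bool
}

// validateClusteringKey rejects a vector clustering key unless the enable flag is set.
func validateClusteringKey(cfg clusteringConfig, fieldIsVector bool) error {
	if fieldIsVector && !cfg.EnableVectorClusteringKey {
		return errors.New("vector field cannot be used as clustering key: enableVectorClusteringKey is false")
	}
	return nil
}

func main() {
	cfg := clusteringConfig{EnableVectorClusteringKey: false}
	if err := validateClusteringKey(cfg, true); err != nil {
		fmt.Println(err) // rejected because the enable flag is off
	}
}

With the option left at its default of false, declaring a vector field as the clustering key would be rejected up front.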
2 changes: 2 additions & 0 deletions internal/datacoord/compaction_task_clustering.go
@@ -177,6 +177,8 @@ func (t *clusteringCompactionTask) BuildCompactionRequest() (*datapb.CompactionP

func (t *clusteringCompactionTask) processPipelining() error {
log := log.With(zap.Int64("triggerID", t.TriggerID), zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("planID", t.GetPlanID()))
ts := time.Now().UnixMilli()
t.updateAndSaveTaskMeta(setStartTime(ts))
var operators []UpdateOperator
for _, segID := range t.InputSegments {
operators = append(operators, UpdateSegmentLevelOperator(segID, datapb.SegmentLevel_L2))
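The hunk above records the task start time via updateAndSaveTaskMeta(setStartTime(ts)) before queuing the segment-level update operators. A minimal sketch of that functional-option pattern, with hypothetical types standing in for the datacoord task metadata:

package main

import (
	"fmt"
	"time"
)

// compactionTaskMeta is a stand-in for the persisted task state.
type compactionTaskMeta struct {
	PlanID    int64
	StartTime int64 // unix milliseconds
}

// taskOption mutates task metadata; setStartTime is one such option.
type taskOption func(*compactionTaskMeta)

func setStartTime(ts int64) taskOption {
	return func(m *compactionTaskMeta) { m.StartTime = ts }
}

// updateAndSaveTaskMeta applies the options and would then persist the result
// (persistence is stubbed out in this sketch).
func updateAndSaveTaskMeta(m *compactionTaskMeta, opts ...taskOption) {
	for _, opt := range opts {
		opt(m)
	}
}

func main() {
	meta := &compactionTaskMeta{PlanID: 1}
	updateAndSaveTaskMeta(meta, setStartTime(time.Now().UnixMilli()))
	fmt.Println(meta.StartTime)
}

Presumably, recording the start time at the pipelining stage lets later stages measure how long the clustering compaction has been running, for example for timeout checks.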
15 changes: 13 additions & 2 deletions internal/datacoord/handler.go
@@ -134,6 +134,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
zap.String("channel", channel.GetName()),
zap.Int("numOfSegments", len(segments)),
zap.Int("indexed segment", len(indexedSegments)),
zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion),
)
unIndexedIDs := make(typeutil.UniqueSet)

@@ -145,13 +146,23 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs .
// Skip bulk insert segments.
continue
}
if s.GetLevel() == datapb.SegmentLevel_L2 && s.PartitionStatsVersion > currentPartitionStatsVersion {
// skip major compaction not fully completed.
if s.GetLevel() == datapb.SegmentLevel_L2 && s.PartitionStatsVersion != currentPartitionStatsVersion {
// during L2 compaction, a newly generated segment may become visible before the whole L2 compaction plan
// is finished; we have to skip these fast-finished segments because all segments in one L2 batch must be
// seen atomically, otherwise users would see intermediate results
continue
}
segmentInfos[s.GetID()] = s
switch {
case s.GetState() == commonpb.SegmentState_Dropped:
if s.GetLevel() == datapb.SegmentLevel_L2 && s.GetPartitionStatsVersion() == currentPartitionStatsVersion {
// if segment.partStatsVersion equals currentPartitionStatsVersion,
// it must have been indexed; this is guaranteed by the clustering compaction process.
// this keeps the output of the currently valid L2 compaction available to search/query,
// so queries do not see insufficient data
indexedIDs.Insert(s.GetID())
continue
}
droppedIDs.Insert(s.GetID())
case !isFlushState(s.GetState()):
growingIDs.Insert(s.GetID())
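The hunk above hides L2 segments whose partition-stats version differs from the committed one and keeps serving dropped L2 segments that match it, so each clustering-compaction batch becomes visible atomically. A minimal, self-contained sketch of that visibility rule (hypothetical types, not the datacoord API):

package main

import "fmt"

type segmentLevel int

const (
	levelL1 segmentLevel = 1
	levelL2 segmentLevel = 2
)

type segment struct {
	ID                    int64
	Level                 segmentLevel
	Dropped               bool
	PartitionStatsVersion int64
}

// visibleForQuery reports whether a segment should be exposed to search/query,
// given the currently committed partition-stats version.
func visibleForQuery(s segment, currentVersion int64) bool {
	if s.Level == levelL2 && s.PartitionStatsVersion != currentVersion {
		// Part of an in-flight or superseded clustering compaction: hide it so
		// the whole batch appears (or disappears) atomically.
		return false
	}
	if s.Dropped {
		// A dropped L2 segment at the current version is still served so the
		// committed compaction result stays complete; other dropped segments are not.
		return s.Level == levelL2
	}
	return true
}

func main() {
	// Newly produced L2 segment from an unfinished compaction plan: hidden.
	fmt.Println(visibleForQuery(segment{ID: 1, Level: levelL2, PartitionStatsVersion: 5}, 4))
	// Dropped L2 segment belonging to the committed version: still served.
	fmt.Println(visibleForQuery(segment{ID: 2, Level: levelL2, Dropped: true, PartitionStatsVersion: 4}, 4))
}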
