diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go index 430ad0f93b0d..49ee00480343 100644 --- a/internal/datacoord/compaction_trigger.go +++ b/internal/datacoord/compaction_trigger.go @@ -863,6 +863,10 @@ func isFlush(segment *SegmentInfo) bool { return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing } +func needSync(segment *SegmentInfo) bool { + return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing || segment.GetState() == commonpb.SegmentState_Sealed +} + // buckets will be updated inplace func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) { for i := len(small) - 1; i >= 0; i-- { diff --git a/internal/datacoord/sync_segments_scheduler.go b/internal/datacoord/sync_segments_scheduler.go index 8b3a694fea92..19046812ed18 100644 --- a/internal/datacoord/sync_segments_scheduler.go +++ b/internal/datacoord/sync_segments_scheduler.go @@ -115,7 +115,7 @@ func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.String("channelName", channelName), zap.Int64("nodeID", nodeID)) segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool { - return info.GetPartitionID() == partitionID && isFlush(info) + return info.GetPartitionID() == partitionID && needSync(info) })) req := &datapb.SyncSegmentsRequest{ ChannelName: channelName,