Skip to content

Commit

Permalink
Sync the sealed and flushed segments to datanode
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed Jul 1, 2024
1 parent ef3ced8 commit 2bac2f5
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 1 deletion.
4 changes: 4 additions & 0 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,10 @@ func isFlush(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
}

func needSync(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing || segment.GetState() == commonpb.SegmentState_Sealed
}

// buckets will be updated inplace
func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) {
for i := len(small) - 1; i >= 0; i-- {
Expand Down
2 changes: 1 addition & 1 deletion internal/datacoord/sync_segments_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID
log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.GetPartitionID() == partitionID && isFlush(info)
return info.GetPartitionID() == partitionID && needSync(info)
}))
req := &datapb.SyncSegmentsRequest{
ChannelName: channelName,
Expand Down

0 comments on commit 2bac2f5

Please sign in to comment.