Skip to content

Commit

Permalink
fix: Sync the sealed and flushed segments to datanode (milvus-io#34301)
Browse files Browse the repository at this point in the history
issue: milvus-io#33696

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 authored and yellow-shine committed Jul 2, 2024
1 parent d3ef5d7 commit 2f4b8f5
Show file tree
Hide file tree
Showing 4 changed files with 357 additions and 5 deletions.
4 changes: 4 additions & 0 deletions internal/datacoord/compaction_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,10 @@ func isFlush(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
}

// needSync reports whether a segment should be included in a SyncSegments
// request to the datanode. In addition to the flushed/flushing states covered
// by isFlush, sealed segments are included as well, because the datanode's
// view of the segment may already have transitioned ahead of datacoord's meta.
func needSync(segment *SegmentInfo) bool {
	// Reuse isFlush for the Flushed/Flushing check to keep the two
	// predicates consistent if the flush-state definition ever changes.
	return isFlush(segment) || segment.GetState() == commonpb.SegmentState_Sealed
}

// buckets will be updated inplace
func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) {
for i := len(small) - 1; i >= 0; i-- {
Expand Down
9 changes: 6 additions & 3 deletions internal/datacoord/sync_segments_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
continue
}
for _, partitionID := range collInfo.Partitions {
if err := sss.SyncFlushedSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
if err := sss.SyncSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
log.Warn("sync segment with channel failed, retry next ticker",
zap.Int64("collectionID", collID),
zap.Int64("partitionID", partitionID),
Expand All @@ -111,11 +111,14 @@ func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
}
}

func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
// Sync all healthy segments, but only check flushed segments on the datanode,
// because L0 growing segments may not be in datacoord's meta.
// Upon receiving the SyncSegments request, the datanode's segment state may have already
// transitioned from Growing/Flushing to Flushed, so the view must include such segments.
segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.GetPartitionID() == partitionID && isFlush(info)
return info.GetPartitionID() == partitionID && isSegmentHealthy(info)
}))
req := &datapb.SyncSegmentsRequest{
ChannelName: channelName,
Expand Down
6 changes: 4 additions & 2 deletions internal/datanode/metacache/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,10 @@ func (c *metaCacheImpl) UpdateSegmentView(partitionID int64,
}

for segID, info := range c.segmentInfos {
if info.partitionID != partitionID ||
(info.state != commonpb.SegmentState_Flushed && info.state != commonpb.SegmentState_Flushing) {
// only check flushed segments, because:
// 1. flushing segments may already have been compacted on datacoord
// 2. growing segments may not have stats logs, so they are not included in sync views
if info.partitionID != partitionID || info.state != commonpb.SegmentState_Flushed {
continue
}
if _, ok := allSegments[segID]; !ok {
Expand Down
343 changes: 343 additions & 0 deletions internal/datanode/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,349 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
s.True(exist)
s.NotNil(info)
})

s.Run("dc growing/flushing dn flushed", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
100: {
SegmentId: 100,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
101: {
SegmentId: 101,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)
})

s.Run("dc flushed dn growing/flushing", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
100: {
SegmentId: 100,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
101: {
SegmentId: 101,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)
})

s.Run("dc dropped dn growing/flushing", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 102,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
102: {
SegmentId: 102,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(102)
s.True(exist)
s.NotNil(info)
})

s.Run("dc dropped dn flushed", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
102: {
SegmentId: 102,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1025,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.False(exist)
s.Nil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(102)
s.True(exist)
s.NotNil(info)
})
}

func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
Expand Down

0 comments on commit 2f4b8f5

Please sign in to comment.