Skip to content

Commit

Permalink
fix: [cherry-pick] Sync the sealed and flushed segments to datanode (#…
Browse files Browse the repository at this point in the history
…34301) (#34318)

issue: #33696

master pr: #34301

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed Jul 2, 2024
1 parent ad545b6 commit 6cb0f1f
Show file tree
Hide file tree
Showing 3 changed files with 358 additions and 6 deletions.
11 changes: 7 additions & 4 deletions internal/datacoord/sync_segments_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
continue
}
for _, partitionID := range collInfo.Partitions {
if err := sss.SyncFlushedSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
if err := sss.SyncSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
log.Warn("sync segment with channel failed, retry next ticker",
zap.Int64("collectionID", collID),
zap.Int64("partitionID", partitionID),
Expand All @@ -111,11 +111,14 @@ func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
}
}

func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
segments := sss.meta.SelectSegments(WithCollection(collectionID), WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.GetPartitionID() == partitionID && isFlush(info)
// sync all healthy segments, but only check flushed segments on the datanode, because L0 growing segments may not be in datacoord's meta.
// upon receiving the SyncSegments request, the datanode's segment state may have already transitioned from Growing/Flushing
// to Flushed, so the view must include this segment.
segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.GetPartitionID() == partitionID && isSegmentHealthy(info)
}))
req := &datapb.SyncSegmentsRequest{
ChannelName: channelName,
Expand Down
6 changes: 4 additions & 2 deletions internal/datanode/metacache/meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,10 @@ func (c *metaCacheImpl) UpdateSegmentView(partitionID int64,
}

for segID, info := range c.segmentInfos {
if info.partitionID != partitionID ||
(info.state != commonpb.SegmentState_Flushed && info.state != commonpb.SegmentState_Flushing) {
// only check flushed segments:
// 1. flushing segments may be compacted on datacoord
// 2. growing segments may not have stats logs, so they won't be included in sync views
if info.partitionID != partitionID || info.state != commonpb.SegmentState_Flushed {
continue
}
if _, ok := allSegments[segID]; !ok {
Expand Down
347 changes: 347 additions & 0 deletions internal/datanode/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,6 +675,353 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
s.True(exist)
s.NotNil(info)
})

s.Run("dc growing/flushing dn flushed", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).Return(&dataSyncService{
metacache: cache,
}, true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
100: {
SegmentId: 100,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
101: {
SegmentId: 101,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)
})

s.Run("dc flushed dn growing/flushing", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).Return(&dataSyncService{
metacache: cache,
}, true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
100: {
SegmentId: 100,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
101: {
SegmentId: 101,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)
})

s.Run("dc dropped dn growing/flushing", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 102,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).Return(&dataSyncService{
metacache: cache,
}, true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
102: {
SegmentId: 102,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(102)
s.True(exist)
s.NotNil(info)
})

s.Run("dc dropped dn flushed", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).Return(&dataSyncService{
metacache: cache,
}, true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
102: {
SegmentId: 102,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1025,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.False(exist)
s.Nil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(102)
s.True(exist)
s.NotNil(info)
})
}

func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
Expand Down

0 comments on commit 6cb0f1f

Please sign in to comment.