fix: Sync the sealed and flushed segments to datanode #34301

Merged
4 changes: 4 additions & 0 deletions internal/datacoord/compaction_trigger.go
@@ -863,6 +863,10 @@ func isFlush(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
}

func needSync(segment *SegmentInfo) bool {
return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing || segment.GetState() == commonpb.SegmentState_Sealed
}

// buckets will be updated inplace
func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) {
for i := len(small) - 1; i >= 0; i-- {
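For illustration, here is a minimal, self-contained sketch (with simplified stand-in types, not the actual Milvus `SegmentInfo` or `commonpb` states) of how the new needSync predicate widens the existing isFlush check to also treat Sealed segments as requiring a sync:

```go
// Minimal sketch with hypothetical simplified types; the real code uses
// commonpb.SegmentState and *SegmentInfo from the Milvus datacoord package.
package main

import "fmt"

type SegmentState int

const (
	Growing SegmentState = iota
	Sealed
	Flushing
	Flushed
	Dropped
)

type SegmentInfo struct {
	ID    int64
	State SegmentState
}

// isFlush mirrors the existing check: Flushed or Flushing.
func isFlush(s *SegmentInfo) bool {
	return s.State == Flushed || s.State == Flushing
}

// needSync mirrors the new helper: Flushed, Flushing, or Sealed.
func needSync(s *SegmentInfo) bool {
	return isFlush(s) || s.State == Sealed
}

func main() {
	segments := []*SegmentInfo{
		{ID: 1, State: Growing},
		{ID: 2, State: Sealed},
		{ID: 3, State: Flushing},
		{ID: 4, State: Flushed},
	}
	for _, seg := range segments {
		fmt.Printf("segment %d: isFlush=%v needSync=%v\n", seg.ID, isFlush(seg), needSync(seg))
	}
}
```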
9 changes: 6 additions & 3 deletions internal/datacoord/sync_segments_scheduler.go
@@ -98,7 +98,7 @@ func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
continue
}
for _, partitionID := range collInfo.Partitions {
-if err := sss.SyncFlushedSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
+if err := sss.SyncSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
log.Warn("sync segment with channel failed, retry next ticker",
zap.Int64("collectionID", collID),
zap.Int64("partitionID", partitionID),
@@ -111,11 +111,14 @@
}
}

-func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
+func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
+// sync all healthy segments, but only check flushed segments on the datanode, because L0 growing segments may not be in datacoord's meta.
+// upon receiving the SyncSegments request, the datanode's segment state may have already transitioned from Growing/Flushing
+// to Flushed, so the view must include such segments.
segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
-return info.GetPartitionID() == partitionID && isFlush(info)
+return info.GetPartitionID() == partitionID && isSegmentHealthy(info)
}))
req := &datapb.SyncSegmentsRequest{
ChannelName: channelName,
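The new comment in this hunk describes the changed selection rule: datacoord now builds the sync view from every healthy segment on the channel and partition, instead of only flushed ones, so a segment the datanode has already marked Flushed still appears in the request. Below is a minimal sketch of that selection using hypothetical simplified types, not the real `meta.SelectSegments` API:

```go
// Minimal sketch with stand-in types; isSegmentHealthy is simplified here and
// the real datacoord filter also checks channel and partition via SegmentFilters.
package main

import "fmt"

type SegmentState int

const (
	Growing SegmentState = iota
	Sealed
	Flushing
	Flushed
	Dropped
)

type SegmentInfo struct {
	ID          int64
	PartitionID int64
	Channel     string
	State       SegmentState
}

// isSegmentHealthy: simplified stand-in; anything not dropped counts as healthy here.
func isSegmentHealthy(s *SegmentInfo) bool {
	return s.State != Dropped
}

// selectSyncView picks the segments that would go into a SyncSegments request.
func selectSyncView(all []*SegmentInfo, channel string, partitionID int64) []*SegmentInfo {
	var view []*SegmentInfo
	for _, s := range all {
		if s.Channel == channel && s.PartitionID == partitionID && isSegmentHealthy(s) {
			view = append(view, s)
		}
	}
	return view
}

func main() {
	all := []*SegmentInfo{
		{ID: 100, PartitionID: 2, Channel: "ch1", State: Sealed},  // kept: healthy even though not yet flushed
		{ID: 101, PartitionID: 2, Channel: "ch1", State: Flushed}, // kept
		{ID: 102, PartitionID: 2, Channel: "ch1", State: Dropped}, // skipped: not healthy
		{ID: 103, PartitionID: 3, Channel: "ch1", State: Flushed}, // skipped: other partition
	}
	for _, s := range selectSyncView(all, "ch1", 2) {
		fmt.Println("include segment", s.ID)
	}
}
```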
6 changes: 4 additions & 2 deletions internal/datanode/metacache/meta_cache.go
@@ -291,8 +291,10 @@ func (c *metaCacheImpl) UpdateSegmentView(partitionID int64,
}

for segID, info := range c.segmentInfos {
-if info.partitionID != partitionID ||
-	(info.state != commonpb.SegmentState_Flushed && info.state != commonpb.SegmentState_Flushing) {
+// only check flushed segments:
+// 1. flushing segments may already have been compacted on datacoord
+// 2. growing segments may not have a stats log, so they are not included in sync views
+if info.partitionID != partitionID || info.state != commonpb.SegmentState_Flushed {
continue
}
if _, ok := allSegments[segID]; !ok {
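On the datanode side, UpdateSegmentView now reconciles only segments that are Flushed in the local metacache against the incoming view, for the reasons stated in the new comment. A minimal, self-contained sketch of that reconciliation loop, with hypothetical stand-in types in place of the real metacache:

```go
// Minimal sketch: only locally Flushed segments are compared against the view.
// Flushing segments are left alone (datacoord may have compacted them already),
// and growing segments are left alone (they may lack a stats log and are not
// part of the sync view). Types here are simplified stand-ins.
package main

import "fmt"

type SegmentState int

const (
	Growing SegmentState = iota
	Flushing
	Flushed
)

type cachedSegment struct {
	partitionID int64
	state       SegmentState
}

// applySyncView drops flushed segments that are absent from the incoming view.
func applySyncView(cache map[int64]*cachedSegment, partitionID int64, view map[int64]struct{}) {
	for segID, info := range cache {
		if info.partitionID != partitionID || info.state != Flushed {
			continue // only flushed segments are checked against the view
		}
		if _, ok := view[segID]; !ok {
			delete(cache, segID) // flushed locally but no longer known to datacoord
		}
	}
}

func main() {
	cache := map[int64]*cachedSegment{
		100: {partitionID: 2, state: Flushed},  // not in view -> dropped
		101: {partitionID: 2, state: Flushing}, // not flushed -> kept
		102: {partitionID: 2, state: Flushed},  // in view -> kept
	}
	view := map[int64]struct{}{102: {}}
	applySyncView(cache, 2, view)
	for id := range cache {
		fmt.Println("still cached:", id)
	}
}
```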
343 changes: 343 additions & 0 deletions internal/datanode/services_test.go
@@ -684,6 +684,349 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
s.True(exist)
s.NotNil(info)
})

s.Run("dc growing/flushing dn flushed", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
100: {
SegmentId: 100,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
101: {
SegmentId: 101,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)
})

s.Run("dc flushed dn growing/flushing", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
100: {
SegmentId: 100,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
101: {
SegmentId: 101,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)
})

s.Run("dc dropped dn growing/flushing", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 102,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
102: {
SegmentId: 102,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(102)
s.True(exist)
s.NotNil(info)
})

s.Run("dc dropped dn flushed", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 100,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L0,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
cache.AddSegment(&datapb.SegmentInfo{
ID: 101,
CollectionID: 1,
PartitionID: 2,
InsertChannel: "111",
NumOfRows: 0,
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
102: {
SegmentId: 102,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushed,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1025,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.False(exist)
s.Nil(info)

info, exist = cache.GetSegmentByID(101)
s.True(exist)
s.NotNil(info)

info, exist = cache.GetSegmentByID(102)
s.True(exist)
s.NotNil(info)
})
}

func (s *DataNodeServicesSuite) TestDropCompactionPlan() {