From 86db3e1b88783ce811dc0205d46b5e423866b16c Mon Sep 17 00:00:00 2001
From: Cai Zhang
Date: Mon, 1 Jul 2024 17:33:27 +0800
Subject: [PATCH] Sync the sealed and flushed segments to datanode

Signed-off-by: Cai Zhang
---
 internal/datacoord/compaction_trigger.go      |   4 +
 internal/datacoord/sync_segments_scheduler.go |   9 +-
 internal/datanode/metacache/meta_cache.go     |   6 +-
 internal/datanode/services_test.go            | 343 ++++++++++++++++++
 4 files changed, 357 insertions(+), 5 deletions(-)

diff --git a/internal/datacoord/compaction_trigger.go b/internal/datacoord/compaction_trigger.go
index 430ad0f93b0d..49ee00480343 100644
--- a/internal/datacoord/compaction_trigger.go
+++ b/internal/datacoord/compaction_trigger.go
@@ -863,6 +863,10 @@ func isFlush(segment *SegmentInfo) bool {
 	return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
 }
 
+func needSync(segment *SegmentInfo) bool {
+	return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing || segment.GetState() == commonpb.SegmentState_Sealed
+}
+
 // buckets will be updated inplace
 func (t *compactionTrigger) squeezeSmallSegmentsToBuckets(small []*SegmentInfo, buckets [][]*SegmentInfo, expectedSize int64) (remaining []*SegmentInfo) {
 	for i := len(small) - 1; i >= 0; i-- {
diff --git a/internal/datacoord/sync_segments_scheduler.go b/internal/datacoord/sync_segments_scheduler.go
index 8b3a694fea92..be121b0df96d 100644
--- a/internal/datacoord/sync_segments_scheduler.go
+++ b/internal/datacoord/sync_segments_scheduler.go
@@ -98,7 +98,7 @@ func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
 				continue
 			}
 			for _, partitionID := range collInfo.Partitions {
-				if err := sss.SyncFlushedSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
+				if err := sss.SyncSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
 					log.Warn("sync segment with channel failed, retry next ticker",
 						zap.Int64("collectionID", collID),
 						zap.Int64("partitionID", partitionID),
@@ -111,11 +111,14 @@
 	}
 }
 
-func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
+func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
 	log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
 		zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
+	// Sync all healthy segments, but only check flushed segments on the datanode, because growing L0 segments may not be in datacoord's meta yet.
+	// By the time the datanode receives the SyncSegments request, a segment's state may have already transitioned from
+	// Growing/Flushing to Flushed, so the view must include such segments.
 	segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
-		return info.GetPartitionID() == partitionID && isFlush(info)
+		return info.GetPartitionID() == partitionID && isSegmentHealthy(info)
 	}))
 	req := &datapb.SyncSegmentsRequest{
 		ChannelName: channelName,
diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go
index 1c1228cc61b4..1c070e824bbd 100644
--- a/internal/datanode/metacache/meta_cache.go
+++ b/internal/datanode/metacache/meta_cache.go
@@ -291,8 +291,10 @@ func (c *metaCacheImpl) UpdateSegmentView(partitionID int64,
 	}
 
 	for segID, info := range c.segmentInfos {
-		if info.partitionID != partitionID ||
-			(info.state != commonpb.SegmentState_Flushed && info.state != commonpb.SegmentState_Flushing) {
+		// Only check flushed segments:
+		// 1. a flushing segment may already have been compacted on datacoord
+		// 2. a growing segment may not have a stats log yet, so it is not included in the sync views
+		if info.partitionID != partitionID || info.state != commonpb.SegmentState_Flushed {
 			continue
 		}
 		if _, ok := allSegments[segID]; !ok {
diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go
index 7342eed06858..9bb70dff4939 100644
--- a/internal/datanode/services_test.go
+++ b/internal/datanode/services_test.go
@@ -684,6 +684,349 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 		s.True(exist)
 		s.NotNil(info)
 	})
+
+	s.Run("dc growing/flushing dn flushed", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            101,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				100: {
+					SegmentId: 100,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Growing,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+				101: {
+					SegmentId: 101,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushing,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.True(exist)
+		s.NotNil(info)
+	})
+
+	s.Run("dc flushed dn growing/flushing", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Growing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            101,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				100: {
+					SegmentId: 100,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushed,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+				101: {
+					SegmentId: 101,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushed,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.True(exist)
+		s.NotNil(info)
+	})
+
+	s.Run("dc dropped dn growing/flushing", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Growing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            101,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            102,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				102: {
+					SegmentId: 102,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushed,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(102)
+		s.True(exist)
+		s.NotNil(info)
+	})
+
+	s.Run("dc dropped dn flushed", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            100,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushed,
+			Level:         datapb.SegmentLevel_L0,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		cache.AddSegment(&datapb.SegmentInfo{
+			ID:            101,
+			CollectionID:  1,
+			PartitionID:   2,
+			InsertChannel: "111",
+			NumOfRows:     0,
+			State:         commonpb.SegmentState_Flushing,
+			Level:         datapb.SegmentLevel_L1,
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
+			Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
+		s.node.flowgraphManager = mockFlowgraphManager
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				102: {
+					SegmentId: 102,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushed,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1025,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.False(exist)
+		s.Nil(info)
+
+		info, exist = cache.GetSegmentByID(101)
+		s.True(exist)
+		s.NotNil(info)
+
+		info, exist = cache.GetSegmentByID(102)
+		s.True(exist)
+		s.NotNil(info)
+	})
 }
 
 func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
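The net effect of the two filters changed in this patch can be illustrated with a minimal, self-contained Go sketch. Everything below is a simplified stand-in for illustration only: SegmentState, isSegmentHealthy, and shouldReconcile are hypothetical reductions of the real commonpb/datapb types and of the datacoord/datanode logic, not the actual Milvus definitions. Datacoord now places every healthy segment into the SyncSegments view, while the datanode reconciles only segments it already considers Flushed.

package main

import "fmt"

// SegmentState is a hypothetical stand-in for the commonpb.SegmentState enum.
type SegmentState int

const (
	Growing SegmentState = iota
	Sealed
	Flushing
	Flushed
	Dropped
)

// isSegmentHealthy sketches the datacoord-side view filter: every non-dropped
// segment goes into the view, because by the time the datanode handles the
// request, a segment datacoord saw as Growing/Flushing may already be Flushed
// locally and would otherwise be dropped by mistake.
func isSegmentHealthy(state SegmentState) bool {
	return state != Dropped
}

// shouldReconcile sketches the datanode-side check in UpdateSegmentView: only
// Flushed segments are compared against the view, since Flushing segments may
// have been compacted on datacoord and Growing segments may lack a stats log
// and therefore never appear in the view.
func shouldReconcile(state SegmentState) bool {
	return state == Flushed
}

func main() {
	// view is what datacoord sends; local is the datanode's metacache.
	view := map[int64]SegmentState{100: Growing, 101: Flushing, 102: Flushed}
	local := map[int64]SegmentState{100: Flushed, 101: Flushed, 103: Flushed, 104: Growing}

	for id, state := range local {
		if !shouldReconcile(state) {
			continue // Growing/Flushing segments are never dropped here
		}
		if _, inView := view[id]; !inView {
			fmt.Printf("segment %d: flushed locally but absent from view, dropping\n", id)
		}
	}
}

Run as-is, the sketch drops only segment 103 (flushed locally, absent from the view), while the growing segment 104 survives even though the view does not mention it; this is the behavior exercised by the "dc dropped dn growing/flushing" and "dc dropped dn flushed" cases in the tests above.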