Skip to content

Commit

Permalink
fix: Only load or release Flushed segment in datanode meta (#34390)
Browse files Browse the repository at this point in the history
issue: #34376 , #34379 , #34375

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed Jul 3, 2024
1 parent 94fb580 commit 2e434e4
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 18 deletions.
45 changes: 27 additions & 18 deletions internal/datanode/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,26 +311,35 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
futures := make([]*conc.Future[any], 0, len(missingSegments))

for _, segID := range missingSegments {
segID := segID
newSeg := req.GetSegmentInfos()[segID]
newSegments = append(newSegments, newSeg)
future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
var val *metacache.BloomFilterSet
var err error
err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
if err != nil {
log.Warn("failed to DecompressBinLog", zap.Error(err))
return val, err
switch newSeg.GetLevel() {
case datapb.SegmentLevel_L0:
log.Warn("segment level is L0, may be the channel has not been successfully watched yet", zap.Int64("segmentID", segID))
case datapb.SegmentLevel_Legacy:
log.Warn("segment level is legacy, please check", zap.Int64("segmentID", segID))
default:
if newSeg.GetState() == commonpb.SegmentState_Flushed {
log.Info("segment loading PKs", zap.Int64("segmentID", segID))
newSegments = append(newSegments, newSeg)
future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
var val *metacache.BloomFilterSet
var err error
err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
if err != nil {
log.Warn("failed to DecompressBinLog", zap.Error(err))
return val, err
}
pks, err := compaction.LoadStats(ctx, node.chunkManager, ds.GetMetaCache().Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
if err != nil {
log.Warn("failed to load segment stats log", zap.Error(err))
return val, err
}
val = metacache.NewBloomFilterSet(pks...)
return val, nil
})
futures = append(futures, future)
}
pks, err := compaction.LoadStats(ctx, node.chunkManager, ds.GetMetaCache().Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
if err != nil {
log.Warn("failed to load segment stats log", zap.Error(err))
return val, err
}
val = metacache.NewBloomFilterSet(pks...)
return val, nil
})
futures = append(futures, future)
}
}

err := conc.AwaitAll(futures...)
Expand Down
64 changes: 64 additions & 0 deletions internal/datanode/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,70 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
s.True(exist)
s.NotNil(info)
})

s.Run("dc growing/flushing dn dropped", func() {
s.SetupTest()
cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
Schema: &schemapb.CollectionSchema{
Fields: []*schemapb.FieldSchema{
{
FieldID: 100,
Name: "pk",
IsPrimaryKey: true,
Description: "",
DataType: schemapb.DataType_Int64,
},
},
},
Vchan: &datapb.VchannelInfo{},
}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
return metacache.NewBloomFilterSet()
})
mockFlowgraphManager := pipeline.NewMockFlowgraphManager(s.T())
mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).
Return(pipeline.NewDataSyncServiceWithMetaCache(cache), true)
s.node.flowgraphManager = mockFlowgraphManager
ctx := context.Background()
req := &datapb.SyncSegmentsRequest{
ChannelName: "channel1",
PartitionId: 2,
CollectionId: 1,
SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
100: {
SegmentId: 100,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Growing,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
101: {
SegmentId: 101,
PkStatsLog: &datapb.FieldBinlog{
FieldID: 100,
Binlogs: nil,
},
State: commonpb.SegmentState_Flushing,
Level: datapb.SegmentLevel_L1,
NumOfRows: 1024,
},
},
}

status, err := s.node.SyncSegments(ctx, req)
s.NoError(err)
s.True(merr.Ok(status))

info, exist := cache.GetSegmentByID(100)
s.False(exist)
s.Nil(info)

info, exist = cache.GetSegmentByID(101)
s.False(exist)
s.Nil(info)
})
}

func (s *DataNodeServicesSuite) TestDropCompactionPlan() {
Expand Down

0 comments on commit 2e434e4

Please sign in to comment.