Skip to content

Commit

Permalink
fix: [cherry-pick] Remove flushed segment in segment manager generate…
Browse files Browse the repository at this point in the history
…d through import (#34651)

issue: #34648 

master pr: #34649

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed Jul 12, 2024
1 parent 6a3a14a commit e1686d0
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 1 deletion.
24 changes: 23 additions & 1 deletion internal/datacoord/segment_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,19 +278,28 @@ func (s *SegmentManager) AllocSegment(ctx context.Context, collectionID UniqueID
defer s.mu.Unlock()

// filter segments
validSegments := make(map[UniqueID]struct{})
invalidSegments := make(map[UniqueID]struct{})
segments := make([]*SegmentInfo, 0)
for _, segmentID := range s.segments {
segment := s.meta.GetHealthySegment(segmentID)
if segment == nil {
log.Warn("Failed to get segment info from meta", zap.Int64("id", segmentID))
invalidSegments[segmentID] = struct{}{}
continue
}
validSegments[segmentID] = struct{}{}

if !satisfy(segment, collectionID, partitionID, channelName) || !isGrowing(segment) || segment.GetLevel() == datapb.SegmentLevel_L0 {
continue
}
segments = append(segments, segment)
}

if len(invalidSegments) > 0 {
log.Warn("Failed to get segments infos from meta, clear them", zap.Int64s("segmentIDs", lo.Keys(invalidSegments)))
}
s.segments = lo.Keys(validSegments)

// Apply allocation policy.
maxCountPerSegment, err := s.estimateMaxNumOfRows(collectionID)
if err != nil {
Expand Down Expand Up @@ -500,11 +509,24 @@ func (s *SegmentManager) FlushImportSegments(ctx context.Context, collectionID U
// We set the importing segment state directly to 'Flushed' rather than
// 'Sealed' because all data has been imported, and there is no data
// in the datanode flowgraph that needs to be synced.
candidatesMap := make(map[UniqueID]struct{})
for _, id := range candidates {
if err := s.meta.SetState(id, commonpb.SegmentState_Flushed); err != nil {
return err
}
candidatesMap[id] = struct{}{}
}

validSegments := make(map[UniqueID]struct{})
for _, id := range s.segments {
if _, ok := candidatesMap[id]; !ok {
validSegments[id] = struct{}{}
}
}

// it is necessary for v2.4.x, import segments were no longer assigned by the segmentManager.
s.segments = lo.Keys(validSegments)

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions internal/datacoord/segment_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,23 @@ func TestAllocSegment(t *testing.T) {
assert.Error(t, err)
assert.Nil(t, segmentManager)
})

t.Run("alloc clear unhealthy segment", func(t *testing.T) {
allocations1, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations1))
assert.EqualValues(t, 1, len(segmentManager.segments))

err = meta.SetState(allocations1[0].SegmentID, commonpb.SegmentState_Dropped)
assert.NoError(t, err)

allocations2, err := segmentManager.AllocSegment(ctx, collID, 100, "c1", 100)
assert.NoError(t, err)
assert.EqualValues(t, 1, len(allocations2))
// clear old healthy and alloc new
assert.EqualValues(t, 1, len(segmentManager.segments))
assert.NotEqual(t, allocations1[0].SegmentID, allocations2[0].SegmentID)
})
}

func TestLastExpireReset(t *testing.T) {
Expand Down

0 comments on commit e1686d0

Please sign in to comment.