From 3d29907b6e728fe0699298812c0a6ffdb4d4ba37 Mon Sep 17 00:00:00 2001 From: jaime Date: Tue, 28 May 2024 19:17:43 +0800 Subject: [PATCH] enhance: decrease cpu overhead during filter segments on datacoord (#33130) issue: #33129 Signed-off-by: jaime --- internal/datacoord/compaction_trigger_test.go | 73 ++++++++---- internal/datacoord/garbage_collector_test.go | 9 +- internal/datacoord/index_service_test.go | 7 +- internal/datacoord/meta_test.go | 92 +++++++++++++++ internal/datacoord/segment_info.go | 108 +++++++++++------- internal/datacoord/segment_operator.go | 22 +++- 6 files changed, 242 insertions(+), 69 deletions(-) diff --git a/internal/datacoord/compaction_trigger_test.go b/internal/datacoord/compaction_trigger_test.go index 56710ed80efe..78166718148a 100644 --- a/internal/datacoord/compaction_trigger_test.go +++ b/internal/datacoord/compaction_trigger_test.go @@ -2197,34 +2197,63 @@ func (s *CompactionTriggerSuite) SetupTest() { catalog := mocks.NewDataCoordCatalog(s.T()) catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, s.channel, mock.Anything).Return(nil) + seg1 := &SegmentInfo{ + SegmentInfo: s.genSeg(1, 60), + lastFlushTime: time.Now().Add(-100 * time.Minute), + } + seg2 := &SegmentInfo{ + SegmentInfo: s.genSeg(2, 60), + lastFlushTime: time.Now(), + } + seg3 := &SegmentInfo{ + SegmentInfo: s.genSeg(3, 60), + lastFlushTime: time.Now(), + } + seg4 := &SegmentInfo{ + SegmentInfo: s.genSeg(4, 60), + lastFlushTime: time.Now(), + } + seg5 := &SegmentInfo{ + SegmentInfo: s.genSeg(5, 60), + lastFlushTime: time.Now(), + } + seg6 := &SegmentInfo{ + SegmentInfo: s.genSeg(6, 60), + lastFlushTime: time.Now(), + } + s.meta = &meta{ channelCPs: newChannelCps(), catalog: catalog, segments: &SegmentsInfo{ segments: map[int64]*SegmentInfo{ - 1: { - SegmentInfo: s.genSeg(1, 60), - lastFlushTime: time.Now().Add(-100 * time.Minute), - }, - 2: { - SegmentInfo: s.genSeg(2, 60), - lastFlushTime: time.Now(), - }, - 3: { - SegmentInfo: s.genSeg(3, 60), - lastFlushTime: time.Now(), - }, - 4: { - SegmentInfo: s.genSeg(4, 60), - lastFlushTime: time.Now(), - }, - 5: { - SegmentInfo: s.genSeg(5, 26), - lastFlushTime: time.Now(), + 1: seg1, + 2: seg2, + 3: seg3, + 4: seg4, + 5: seg5, + 6: seg6, + }, + secondaryIndexes: segmentInfoIndexes{ + coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{ + s.collectionID: { + 1: seg1, + 2: seg2, + 3: seg3, + 4: seg4, + 5: seg5, + 6: seg6, + }, }, - 6: { - SegmentInfo: s.genSeg(6, 26), - lastFlushTime: time.Now(), + channel2Segments: map[string]map[UniqueID]*SegmentInfo{ + s.channel: { + 1: seg1, + 2: seg2, + 3: seg3, + 4: seg4, + 5: seg5, + 6: seg6, + }, }, }, }, diff --git a/internal/datacoord/garbage_collector_test.go b/internal/datacoord/garbage_collector_test.go index 66f7873b4b81..93a96f7e3742 100644 --- a/internal/datacoord/garbage_collector_test.go +++ b/internal/datacoord/garbage_collector_test.go @@ -465,7 +465,14 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m }, }, segID + 1: { - SegmentInfo: nil, + SegmentInfo: &datapb.SegmentInfo{ + ID: segID + 1, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "", + NumOfRows: 1026, + State: commonpb.SegmentState_Dropped, + }, }, } meta := &meta{ diff --git a/internal/datacoord/index_service_test.go b/internal/datacoord/index_service_test.go index cda44b2558e8..d10c8d104f1b 100644 --- a/internal/datacoord/index_service_test.go +++ b/internal/datacoord/index_service_test.go @@ -938,7 +938,12 @@ func TestServer_GetSegmentIndexState(t *testing.T) { WriteHandoff: false, }) s.meta.segments.SetSegment(segID, &SegmentInfo{ - SegmentInfo: nil, + SegmentInfo: &datapb.SegmentInfo{ + ID: segID, + CollectionID: collID, + PartitionID: partID, + InsertChannel: "ch", + }, currRows: 0, allocations: nil, lastFlushTime: time.Time{}, diff --git a/internal/datacoord/meta_test.go b/internal/datacoord/meta_test.go index dd0471eebcdd..d90b1b015018 100644 --- a/internal/datacoord/meta_test.go +++ b/internal/datacoord/meta_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/errors" "github.com/golang/protobuf/proto" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" @@ -982,6 +983,97 @@ func Test_meta_GetSegmentsOfCollection(t *testing.T) { assert.True(t, ok) assert.Equal(t, expected, gotInfo.GetState()) } + + got = m.GetSegmentsOfCollection(-1) + assert.Equal(t, 3, len(got)) + + got = m.GetSegmentsOfCollection(10) + assert.Equal(t, 0, len(got)) +} + +func Test_meta_GetSegmentsWithChannel(t *testing.T) { + storedSegments := NewSegmentsInfo() + for segID, segment := range map[int64]*SegmentInfo{ + 1: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 1, + CollectionID: 1, + InsertChannel: "h1", + State: commonpb.SegmentState_Flushed, + }, + }, + 2: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 2, + CollectionID: 1, + InsertChannel: "h2", + State: commonpb.SegmentState_Growing, + }, + }, + 3: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 3, + CollectionID: 2, + State: commonpb.SegmentState_Flushed, + InsertChannel: "h1", + }, + }, + } { + storedSegments.SetSegment(segID, segment) + } + m := &meta{segments: storedSegments} + got := m.GetSegmentsByChannel("h1") + assert.Equal(t, 2, len(got)) + assert.ElementsMatch(t, []int64{1, 3}, lo.Map( + got, + func(s *SegmentInfo, i int) int64 { + return s.ID + }, + )) + + got = m.GetSegmentsByChannel("h3") + assert.Equal(t, 0, len(got)) + + got = m.SelectSegments(WithCollection(1), WithChannel("h1"), SegmentFilterFunc(func(segment *SegmentInfo) bool { + return segment != nil && segment.GetState() == commonpb.SegmentState_Flushed + })) + assert.Equal(t, 1, len(got)) + assert.ElementsMatch(t, []int64{1}, lo.Map( + got, + func(s *SegmentInfo, i int) int64 { + return s.ID + }, + )) + + m.segments.DropSegment(3) + _, ok := m.segments.secondaryIndexes.coll2Segments[2] + assert.False(t, ok) + assert.Equal(t, 1, len(m.segments.secondaryIndexes.coll2Segments)) + assert.Equal(t, 2, len(m.segments.secondaryIndexes.channel2Segments)) + + segments, ok := m.segments.secondaryIndexes.channel2Segments["h1"] + assert.True(t, ok) + assert.Equal(t, 1, len(segments)) + assert.Equal(t, int64(1), segments[1].ID) + segments, ok = m.segments.secondaryIndexes.channel2Segments["h2"] + assert.True(t, ok) + assert.Equal(t, 1, len(segments)) + assert.Equal(t, int64(2), segments[2].ID) + + m.segments.DropSegment(2) + segments, ok = m.segments.secondaryIndexes.coll2Segments[1] + assert.True(t, ok) + assert.Equal(t, 1, len(segments)) + assert.Equal(t, int64(1), segments[1].ID) + assert.Equal(t, 1, len(m.segments.secondaryIndexes.coll2Segments)) + assert.Equal(t, 1, len(m.segments.secondaryIndexes.channel2Segments)) + + segments, ok = m.segments.secondaryIndexes.channel2Segments["h1"] + assert.True(t, ok) + assert.Equal(t, 1, len(segments)) + assert.Equal(t, int64(1), segments[1].ID) + _, ok = m.segments.secondaryIndexes.channel2Segments["h2"] + assert.False(t, ok) } func TestMeta_HasSegments(t *testing.T) { diff --git a/internal/datacoord/segment_info.go b/internal/datacoord/segment_info.go index 40f0d46fe6cb..13e6f4aad1c3 100644 --- a/internal/datacoord/segment_info.go +++ b/internal/datacoord/segment_info.go @@ -32,12 +32,17 @@ import ( // SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation type SegmentsInfo struct { - segments map[UniqueID]*SegmentInfo - collSegments map[UniqueID]*CollectionSegments - compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key. + segments map[UniqueID]*SegmentInfo + secondaryIndexes segmentInfoIndexes + compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key. // A segment can be compacted to only one segment finally in meta. } +type segmentInfoIndexes struct { + coll2Segments map[UniqueID]map[UniqueID]*SegmentInfo + channel2Segments map[string]map[UniqueID]*SegmentInfo +} + // SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it type SegmentInfo struct { *datapb.SegmentInfo @@ -69,16 +74,15 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo { // note that no mutex is wrapped so external concurrent control is needed func NewSegmentsInfo() *SegmentsInfo { return &SegmentsInfo{ - segments: make(map[UniqueID]*SegmentInfo), - collSegments: make(map[UniqueID]*CollectionSegments), + segments: make(map[UniqueID]*SegmentInfo), + secondaryIndexes: segmentInfoIndexes{ + coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo), + channel2Segments: make(map[string]map[UniqueID]*SegmentInfo), + }, compactionTo: make(map[UniqueID]UniqueID), } } -type CollectionSegments struct { - segments map[int64]*SegmentInfo -} - // GetSegment returns SegmentInfo // the logPath in meta is empty func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo { @@ -96,24 +100,42 @@ func (s *SegmentsInfo) GetSegments() []*SegmentInfo { return lo.Values(s.segments) } +func (s *SegmentsInfo) getCandidates(criterion *segmentCriterion) map[UniqueID]*SegmentInfo { + if criterion.collectionID > 0 { + collSegments, ok := s.secondaryIndexes.coll2Segments[criterion.collectionID] + if !ok { + return nil + } + + // both collection id and channel are filters of criterion + if criterion.channel != "" { + return lo.OmitBy(collSegments, func(k UniqueID, v *SegmentInfo) bool { + return v.InsertChannel != criterion.channel + }) + } + return collSegments + } + + if criterion.channel != "" { + channelSegments, ok := s.secondaryIndexes.channel2Segments[criterion.channel] + if !ok { + return nil + } + return channelSegments + } + + return s.segments +} + func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*SegmentInfo { criterion := &segmentCriterion{} for _, filter := range filters { filter.AddFilter(criterion) } - var result []*SegmentInfo - var candidates map[int64]*SegmentInfo + // apply criterion - switch { - case criterion.collectionID > 0: - collSegments, ok := s.collSegments[criterion.collectionID] - if !ok { - return nil - } - candidates = collSegments.segments - default: - candidates = s.segments - } + candidates := s.getCandidates(criterion) + var result []*SegmentInfo for _, segment := range candidates { if criterion.Match(segment) { result = append(result, segment) @@ -144,7 +166,7 @@ func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool) func (s *SegmentsInfo) DropSegment(segmentID UniqueID) { if segment, ok := s.segments[segmentID]; ok { s.deleteCompactTo(segment) - s.delCollection(segment) + s.removeSecondaryIndex(segment) delete(s.segments, segmentID) } } @@ -156,10 +178,10 @@ func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) { if segment, ok := s.segments[segmentID]; ok { // Remove old segment compact to relation first. s.deleteCompactTo(segment) - s.delCollection(segment) + s.removeSecondaryIndex(segment) } s.segments[segmentID] = segment - s.addCollection(segment) + s.addSecondaryIndex(segment) s.addCompactTo(segment) } @@ -296,27 +318,35 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo { return cloned } -func (s *SegmentsInfo) addCollection(segment *SegmentInfo) { +func (s *SegmentsInfo) addSecondaryIndex(segment *SegmentInfo) { collID := segment.GetCollectionID() - collSegment, ok := s.collSegments[collID] - if !ok { - collSegment = &CollectionSegments{ - segments: make(map[UniqueID]*SegmentInfo), - } - s.collSegments[collID] = collSegment + channel := segment.GetInsertChannel() + if _, ok := s.secondaryIndexes.coll2Segments[collID]; !ok { + s.secondaryIndexes.coll2Segments[collID] = make(map[UniqueID]*SegmentInfo) + } + s.secondaryIndexes.coll2Segments[collID][segment.ID] = segment + + if _, ok := s.secondaryIndexes.channel2Segments[channel]; !ok { + s.secondaryIndexes.channel2Segments[channel] = make(map[UniqueID]*SegmentInfo) } - collSegment.segments[segment.GetID()] = segment + s.secondaryIndexes.channel2Segments[channel][segment.ID] = segment } -func (s *SegmentsInfo) delCollection(segment *SegmentInfo) { +func (s *SegmentsInfo) removeSecondaryIndex(segment *SegmentInfo) { collID := segment.GetCollectionID() - collSegment, ok := s.collSegments[collID] - if !ok { - return + channel := segment.GetInsertChannel() + if segments, ok := s.secondaryIndexes.coll2Segments[collID]; ok { + delete(segments, segment.ID) + if len(segments) == 0 { + delete(s.secondaryIndexes.coll2Segments, collID) + } } - delete(collSegment.segments, segment.GetID()) - if len(collSegment.segments) == 0 { - delete(s.collSegments, segment.GetCollectionID()) + + if segments, ok := s.secondaryIndexes.channel2Segments[channel]; ok { + delete(segments, segment.ID) + if len(segments) == 0 { + delete(s.secondaryIndexes.channel2Segments, channel) + } } } diff --git a/internal/datacoord/segment_operator.go b/internal/datacoord/segment_operator.go index 2d26f6d03d7d..d31d1a4c3d8e 100644 --- a/internal/datacoord/segment_operator.go +++ b/internal/datacoord/segment_operator.go @@ -31,6 +31,7 @@ func SetMaxRowCount(maxRow int64) SegmentOperator { type segmentCriterion struct { collectionID int64 + channel string others []SegmentFilter } @@ -62,6 +63,21 @@ func WithCollection(collectionID int64) SegmentFilter { return CollectionFilter(collectionID) } +type ChannelFilter string + +func (f ChannelFilter) Match(segment *SegmentInfo) bool { + return segment.GetInsertChannel() == string(f) +} + +func (f ChannelFilter) AddFilter(criterion *segmentCriterion) { + criterion.channel = string(f) +} + +// WithChannel WithCollection has a higher priority if both WithCollection and WithChannel are in condition together. +func WithChannel(channel string) SegmentFilter { + return ChannelFilter(channel) +} + type SegmentFilterFunc func(*SegmentInfo) bool func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool { @@ -71,9 +87,3 @@ func (f SegmentFilterFunc) Match(segment *SegmentInfo) bool { func (f SegmentFilterFunc) AddFilter(criterion *segmentCriterion) { criterion.others = append(criterion.others, f) } - -func WithChannel(channel string) SegmentFilter { - return SegmentFilterFunc(func(si *SegmentInfo) bool { - return si.GetInsertChannel() == channel - }) -}