Skip to content

Commit

Permalink
enhance: decrease cpu overhead during filter segments on datacoord (#33130)
Browse files Browse the repository at this point in the history

issue: #33129

Signed-off-by: jaime <yun.zhang@zilliz.com>
  • Loading branch information
jaime0815 committed May 28, 2024
1 parent 6b3e42f commit 3d29907
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 69 deletions.
73 changes: 51 additions & 22 deletions internal/datacoord/compaction_trigger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2197,34 +2197,63 @@ func (s *CompactionTriggerSuite) SetupTest() {
catalog := mocks.NewDataCoordCatalog(s.T())
catalog.EXPECT().SaveChannelCheckpoint(mock.Anything, s.channel, mock.Anything).Return(nil)

seg1 := &SegmentInfo{
SegmentInfo: s.genSeg(1, 60),
lastFlushTime: time.Now().Add(-100 * time.Minute),
}
seg2 := &SegmentInfo{
SegmentInfo: s.genSeg(2, 60),
lastFlushTime: time.Now(),
}
seg3 := &SegmentInfo{
SegmentInfo: s.genSeg(3, 60),
lastFlushTime: time.Now(),
}
seg4 := &SegmentInfo{
SegmentInfo: s.genSeg(4, 60),
lastFlushTime: time.Now(),
}
seg5 := &SegmentInfo{
SegmentInfo: s.genSeg(5, 60),
lastFlushTime: time.Now(),
}
seg6 := &SegmentInfo{
SegmentInfo: s.genSeg(6, 60),
lastFlushTime: time.Now(),
}

s.meta = &meta{
channelCPs: newChannelCps(),
catalog: catalog,
segments: &SegmentsInfo{
segments: map[int64]*SegmentInfo{
1: {
SegmentInfo: s.genSeg(1, 60),
lastFlushTime: time.Now().Add(-100 * time.Minute),
},
2: {
SegmentInfo: s.genSeg(2, 60),
lastFlushTime: time.Now(),
},
3: {
SegmentInfo: s.genSeg(3, 60),
lastFlushTime: time.Now(),
},
4: {
SegmentInfo: s.genSeg(4, 60),
lastFlushTime: time.Now(),
},
5: {
SegmentInfo: s.genSeg(5, 26),
lastFlushTime: time.Now(),
1: seg1,
2: seg2,
3: seg3,
4: seg4,
5: seg5,
6: seg6,
},
secondaryIndexes: segmentInfoIndexes{
coll2Segments: map[UniqueID]map[UniqueID]*SegmentInfo{
s.collectionID: {
1: seg1,
2: seg2,
3: seg3,
4: seg4,
5: seg5,
6: seg6,
},
},
6: {
SegmentInfo: s.genSeg(6, 26),
lastFlushTime: time.Now(),
channel2Segments: map[string]map[UniqueID]*SegmentInfo{
s.channel: {
1: seg1,
2: seg2,
3: seg3,
4: seg4,
5: seg5,
6: seg6,
},
},
},
},
Expand Down
9 changes: 8 additions & 1 deletion internal/datacoord/garbage_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,14 @@ func createMetaForRecycleUnusedSegIndexes(catalog metastore.DataCoordCatalog) *m
},
},
segID + 1: {
SegmentInfo: nil,
SegmentInfo: &datapb.SegmentInfo{
ID: segID + 1,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "",
NumOfRows: 1026,
State: commonpb.SegmentState_Dropped,
},
},
}
meta := &meta{
Expand Down
7 changes: 6 additions & 1 deletion internal/datacoord/index_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,12 @@ func TestServer_GetSegmentIndexState(t *testing.T) {
WriteHandoff: false,
})
s.meta.segments.SetSegment(segID, &SegmentInfo{
SegmentInfo: nil,
SegmentInfo: &datapb.SegmentInfo{
ID: segID,
CollectionID: collID,
PartitionID: partID,
InsertChannel: "ch",
},
currRows: 0,
allocations: nil,
lastFlushTime: time.Time{},
Expand Down
92 changes: 92 additions & 0 deletions internal/datacoord/meta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/cockroachdb/errors"
"github.com/golang/protobuf/proto"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -982,6 +983,97 @@ func Test_meta_GetSegmentsOfCollection(t *testing.T) {
assert.True(t, ok)
assert.Equal(t, expected, gotInfo.GetState())
}

got = m.GetSegmentsOfCollection(-1)
assert.Equal(t, 3, len(got))

got = m.GetSegmentsOfCollection(10)
assert.Equal(t, 0, len(got))
}

func Test_meta_GetSegmentsWithChannel(t *testing.T) {
storedSegments := NewSegmentsInfo()
for segID, segment := range map[int64]*SegmentInfo{
1: {
SegmentInfo: &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
InsertChannel: "h1",
State: commonpb.SegmentState_Flushed,
},
},
2: {
SegmentInfo: &datapb.SegmentInfo{
ID: 2,
CollectionID: 1,
InsertChannel: "h2",
State: commonpb.SegmentState_Growing,
},
},
3: {
SegmentInfo: &datapb.SegmentInfo{
ID: 3,
CollectionID: 2,
State: commonpb.SegmentState_Flushed,
InsertChannel: "h1",
},
},
} {
storedSegments.SetSegment(segID, segment)
}
m := &meta{segments: storedSegments}
got := m.GetSegmentsByChannel("h1")
assert.Equal(t, 2, len(got))
assert.ElementsMatch(t, []int64{1, 3}, lo.Map(
got,
func(s *SegmentInfo, i int) int64 {
return s.ID
},
))

got = m.GetSegmentsByChannel("h3")
assert.Equal(t, 0, len(got))

got = m.SelectSegments(WithCollection(1), WithChannel("h1"), SegmentFilterFunc(func(segment *SegmentInfo) bool {
return segment != nil && segment.GetState() == commonpb.SegmentState_Flushed
}))
assert.Equal(t, 1, len(got))
assert.ElementsMatch(t, []int64{1}, lo.Map(
got,
func(s *SegmentInfo, i int) int64 {
return s.ID
},
))

m.segments.DropSegment(3)
_, ok := m.segments.secondaryIndexes.coll2Segments[2]
assert.False(t, ok)
assert.Equal(t, 1, len(m.segments.secondaryIndexes.coll2Segments))
assert.Equal(t, 2, len(m.segments.secondaryIndexes.channel2Segments))

segments, ok := m.segments.secondaryIndexes.channel2Segments["h1"]
assert.True(t, ok)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(1), segments[1].ID)
segments, ok = m.segments.secondaryIndexes.channel2Segments["h2"]
assert.True(t, ok)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(2), segments[2].ID)

m.segments.DropSegment(2)
segments, ok = m.segments.secondaryIndexes.coll2Segments[1]
assert.True(t, ok)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(1), segments[1].ID)
assert.Equal(t, 1, len(m.segments.secondaryIndexes.coll2Segments))
assert.Equal(t, 1, len(m.segments.secondaryIndexes.channel2Segments))

segments, ok = m.segments.secondaryIndexes.channel2Segments["h1"]
assert.True(t, ok)
assert.Equal(t, 1, len(segments))
assert.Equal(t, int64(1), segments[1].ID)
_, ok = m.segments.secondaryIndexes.channel2Segments["h2"]
assert.False(t, ok)
}

func TestMeta_HasSegments(t *testing.T) {
Expand Down
108 changes: 69 additions & 39 deletions internal/datacoord/segment_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,17 @@ import (

// SegmentsInfo wraps a map, which maintains ID to SegmentInfo relation
type SegmentsInfo struct {
	segments         map[UniqueID]*SegmentInfo // primary index: segment ID -> segment info
	secondaryIndexes segmentInfoIndexes        // per-collection / per-channel lookup tables kept in sync with segments
	compactionTo     map[UniqueID]UniqueID     // map the compact relation, value is the segment which `CompactFrom` contains key.
	// A segment can be compacted to only one segment finally in meta.
}

// segmentInfoIndexes holds secondary lookup tables over the primary segments
// map: one keyed by collection ID and one by insert-channel name, so that
// collection- or channel-scoped queries need not scan every segment.
type segmentInfoIndexes struct {
	coll2Segments    map[UniqueID]map[UniqueID]*SegmentInfo
	channel2Segments map[string]map[UniqueID]*SegmentInfo
}

// SegmentInfo wraps datapb.SegmentInfo and patches some extra info on it
type SegmentInfo struct {
*datapb.SegmentInfo
Expand Down Expand Up @@ -69,16 +74,15 @@ func NewSegmentInfo(info *datapb.SegmentInfo) *SegmentInfo {
// NewSegmentsInfo creates a SegmentsInfo with all internal maps initialized;
// note that no mutex is wrapped so external concurrent control is needed
func NewSegmentsInfo() *SegmentsInfo {
	return &SegmentsInfo{
		segments: make(map[UniqueID]*SegmentInfo),
		secondaryIndexes: segmentInfoIndexes{
			coll2Segments:    make(map[UniqueID]map[UniqueID]*SegmentInfo),
			channel2Segments: make(map[string]map[UniqueID]*SegmentInfo),
		},
		compactionTo: make(map[UniqueID]UniqueID),
	}
}

// CollectionSegments groups the segments of a single collection, keyed by
// segment ID.
// NOTE(review): this looks like the legacy per-collection index replaced by
// segmentInfoIndexes — confirm it has no remaining users and remove.
type CollectionSegments struct {
	segments map[int64]*SegmentInfo
}

// GetSegment returns SegmentInfo
// the logPath in meta is empty
func (s *SegmentsInfo) GetSegment(segmentID UniqueID) *SegmentInfo {
Expand All @@ -96,24 +100,42 @@ func (s *SegmentsInfo) GetSegments() []*SegmentInfo {
return lo.Values(s.segments)
}

// getCandidates narrows the segment set using the secondary indexes before the
// generic criterion matching runs: a collection filter selects from
// coll2Segments (optionally intersected with a channel), a channel-only filter
// selects from channel2Segments, and with neither filter all segments are
// returned.
func (s *SegmentsInfo) getCandidates(criterion *segmentCriterion) map[UniqueID]*SegmentInfo {
	if criterion.collectionID > 0 {
		byColl, ok := s.secondaryIndexes.coll2Segments[criterion.collectionID]
		if !ok {
			return nil
		}
		if criterion.channel == "" {
			return byColl
		}
		// Both collection and channel filters: keep only segments on the
		// requested channel.
		narrowed := make(map[UniqueID]*SegmentInfo, len(byColl))
		for id, seg := range byColl {
			if seg.InsertChannel == criterion.channel {
				narrowed[id] = seg
			}
		}
		return narrowed
	}

	if criterion.channel != "" {
		byChannel, ok := s.secondaryIndexes.channel2Segments[criterion.channel]
		if !ok {
			return nil
		}
		return byChannel
	}

	return s.segments
}

func (s *SegmentsInfo) GetSegmentsBySelector(filters ...SegmentFilter) []*SegmentInfo {
criterion := &segmentCriterion{}
for _, filter := range filters {
filter.AddFilter(criterion)
}
var result []*SegmentInfo
var candidates map[int64]*SegmentInfo

// apply criterion
switch {
case criterion.collectionID > 0:
collSegments, ok := s.collSegments[criterion.collectionID]
if !ok {
return nil
}
candidates = collSegments.segments
default:
candidates = s.segments
}
candidates := s.getCandidates(criterion)
var result []*SegmentInfo
for _, segment := range candidates {
if criterion.Match(segment) {
result = append(result, segment)
Expand Down Expand Up @@ -144,7 +166,7 @@ func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool)
// DropSegment removes the segment with the given ID from the primary map,
// tearing down its compaction relation and secondary-index entries first.
// It is a no-op when the segment does not exist.
func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
	if segment, ok := s.segments[segmentID]; ok {
		s.deleteCompactTo(segment)
		s.removeSecondaryIndex(segment)
		delete(s.segments, segmentID)
	}
}
Expand All @@ -156,10 +178,10 @@ func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
if segment, ok := s.segments[segmentID]; ok {
// Remove old segment compact to relation first.
s.deleteCompactTo(segment)
s.delCollection(segment)
s.removeSecondaryIndex(segment)
}
s.segments[segmentID] = segment
s.addCollection(segment)
s.addSecondaryIndex(segment)
s.addCompactTo(segment)
}

Expand Down Expand Up @@ -296,27 +318,35 @@ func (s *SegmentInfo) ShadowClone(opts ...SegmentInfoOption) *SegmentInfo {
return cloned
}

func (s *SegmentsInfo) addCollection(segment *SegmentInfo) {
func (s *SegmentsInfo) addSecondaryIndex(segment *SegmentInfo) {
collID := segment.GetCollectionID()
collSegment, ok := s.collSegments[collID]
if !ok {
collSegment = &CollectionSegments{
segments: make(map[UniqueID]*SegmentInfo),
}
s.collSegments[collID] = collSegment
channel := segment.GetInsertChannel()
if _, ok := s.secondaryIndexes.coll2Segments[collID]; !ok {
s.secondaryIndexes.coll2Segments[collID] = make(map[UniqueID]*SegmentInfo)
}
s.secondaryIndexes.coll2Segments[collID][segment.ID] = segment

if _, ok := s.secondaryIndexes.channel2Segments[channel]; !ok {
s.secondaryIndexes.channel2Segments[channel] = make(map[UniqueID]*SegmentInfo)
}
collSegment.segments[segment.GetID()] = segment
s.secondaryIndexes.channel2Segments[channel][segment.ID] = segment
}

func (s *SegmentsInfo) delCollection(segment *SegmentInfo) {
func (s *SegmentsInfo) removeSecondaryIndex(segment *SegmentInfo) {
collID := segment.GetCollectionID()
collSegment, ok := s.collSegments[collID]
if !ok {
return
channel := segment.GetInsertChannel()
if segments, ok := s.secondaryIndexes.coll2Segments[collID]; ok {
delete(segments, segment.ID)
if len(segments) == 0 {
delete(s.secondaryIndexes.coll2Segments, collID)
}
}
delete(collSegment.segments, segment.GetID())
if len(collSegment.segments) == 0 {
delete(s.collSegments, segment.GetCollectionID())

if segments, ok := s.secondaryIndexes.channel2Segments[channel]; ok {
delete(segments, segment.ID)
if len(segments) == 0 {
delete(s.secondaryIndexes.channel2Segments, channel)
}
}
}

Expand Down
Loading

0 comments on commit 3d29907

Please sign in to comment.