Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Avoid acquire index meta's lock for each segment (#31723) #31787

Merged
merged 1 commit into from Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
51 changes: 25 additions & 26 deletions internal/datacoord/index_meta.go
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/metrics"
"github.com/milvus-io/milvus/pkg/util/timerecord"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type indexMeta struct {
Expand Down Expand Up @@ -383,42 +384,40 @@ func (m *indexMeta) GetSegmentIndexState(collID, segmentID UniqueID, indexID Uni
return state
}

func (m *indexMeta) GetSegmentIndexStateOnField(collID, segmentID, fieldID UniqueID) *indexpb.SegmentIndexState {
func (m *indexMeta) GetIndexedSegments(collectionID int64, fieldIDs []UniqueID) []int64 {
m.RLock()
defer m.RUnlock()

state := &indexpb.SegmentIndexState{
SegmentID: segmentID,
State: commonpb.IndexState_IndexStateNone,
FailReason: "",
}
fieldIndexes, ok := m.indexes[collID]
fieldIndexes, ok := m.indexes[collectionID]
if !ok {
state.FailReason = fmt.Sprintf("collection not exist with ID: %d", collID)
return state
return nil
}

indexes, ok := m.segmentIndexes[segmentID]
if !ok {
state.FailReason = fmt.Sprintf("segment index not exist with ID: %d", segmentID)
state.State = commonpb.IndexState_Unissued
return state
}
fieldIDSet := typeutil.NewUniqueSet(fieldIDs...)

for indexID, index := range fieldIndexes {
if index.FieldID == fieldID && !index.IsDeleted {
if segIdx, ok := indexes[indexID]; ok {
state.IndexName = index.IndexName
state.State = segIdx.IndexState
state.FailReason = segIdx.FailReason
return state
checkSegmentState := func(indexes map[int64]*model.SegmentIndex) bool {
indexedFields := 0
for indexID, index := range fieldIndexes {
if !fieldIDSet.Contain(index.FieldID) || index.IsDeleted {
continue
}

if segIdx, ok := indexes[indexID]; ok && segIdx.IndexState == commonpb.IndexState_Finished {
indexedFields += 1
}
state.State = commonpb.IndexState_Unissued
return state
}

return indexedFields == fieldIDSet.Len()
}
state.FailReason = fmt.Sprintf("there is no index on fieldID: %d", fieldID)
return state

ret := make([]int64, 0)
for sid, indexes := range m.segmentIndexes {
if checkSegmentState(indexes) {
ret = append(ret, sid)
}
}

return ret
}

// GetIndexesForCollection gets all indexes info with the specified collection.
Expand Down
19 changes: 7 additions & 12 deletions internal/datacoord/index_meta_test.go
Expand Up @@ -533,7 +533,7 @@ func TestMeta_GetSegmentIndexState(t *testing.T) {
})
}

func TestMeta_GetSegmentIndexStateOnField(t *testing.T) {
func TestMeta_GetIndexedSegment(t *testing.T) {
var (
collID = UniqueID(1)
partID = UniqueID(2)
Expand Down Expand Up @@ -614,23 +614,18 @@ func TestMeta_GetSegmentIndexStateOnField(t *testing.T) {
}

t.Run("success", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID)
assert.Equal(t, commonpb.IndexState_Finished, state.GetState())
segments := m.GetIndexedSegments(collID, []int64{fieldID})
assert.Len(t, segments, 1)
})

t.Run("no index on field", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID, fieldID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
segments := m.GetIndexedSegments(collID, []int64{fieldID + 1})
assert.Len(t, segments, 0)
})

t.Run("no index", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID+1, segID, fieldID+1)
assert.Equal(t, commonpb.IndexState_IndexStateNone, state.GetState())
})

t.Run("segment not exist", func(t *testing.T) {
state := m.GetSegmentIndexStateOnField(collID, segID+1, fieldID)
assert.Equal(t, commonpb.IndexState_Unissued, state.GetState())
segments := m.GetIndexedSegments(collID+1, []int64{fieldID})
assert.Len(t, segments, 0)
})
}

Expand Down
47 changes: 22 additions & 25 deletions internal/datacoord/util.go
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
Expand Down Expand Up @@ -70,49 +71,45 @@ func FilterInIndexedSegments(handler Handler, mt *meta, segments ...*SegmentInfo
return nil
}

segmentMap := make(map[int64]*SegmentInfo)
collectionSegments := make(map[int64][]int64)
collectionSegments := lo.GroupBy(segments, func(segment *SegmentInfo) int64 {
return segment.GetCollectionID()
})

vecFieldIDs := make(map[int64][]int64)
for _, segment := range segments {
collectionID := segment.GetCollectionID()
segmentMap[segment.GetID()] = segment
collectionSegments[collectionID] = append(collectionSegments[collectionID], segment.GetID())
}
for collection := range collectionSegments {
ret := make([]*SegmentInfo, 0)
for collection, segmentList := range collectionSegments {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
coll, err := handler.GetCollection(ctx, collection)
cancel()
if err != nil {
log.Warn("failed to get collection schema", zap.Error(err))
continue
}

// get vector field id
vecFieldIDs := make([]int64, 0)
for _, field := range coll.Schema.GetFields() {
if typeutil.IsVectorType(field.GetDataType()) {
vecFieldIDs[collection] = append(vecFieldIDs[collection], field.GetFieldID())
vecFieldIDs = append(vecFieldIDs, field.GetFieldID())
}
}
}

indexedSegments := make([]*SegmentInfo, 0)
for _, segment := range segments {
if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped {
continue
}
// get indexed segments which finish build index on all vector field
indexed := mt.indexMeta.GetIndexedSegments(collection, vecFieldIDs)
if len(indexed) > 0 {
indexedSet := typeutil.NewUniqueSet(indexed...)
for _, segment := range segmentList {
if !isFlushState(segment.GetState()) && segment.GetState() != commonpb.SegmentState_Dropped {
continue
}

hasUnindexedVecField := false
for _, fieldID := range vecFieldIDs[segment.GetCollectionID()] {
segmentIndexState := mt.indexMeta.GetSegmentIndexStateOnField(segment.GetCollectionID(), segment.GetID(), fieldID)
if segmentIndexState.State != commonpb.IndexState_Finished {
hasUnindexedVecField = true
if indexedSet.Contain(segment.GetID()) {
ret = append(ret, segment)
}
}
}
if !hasUnindexedVecField {
indexedSegments = append(indexedSegments, segment)
}
}

return indexedSegments
return ret
}

func getZeroTime() time.Time {
Expand Down