From b1dc35e23d942b84a4384f74fbd0ac9a3e7587dc Mon Sep 17 00:00:00 2001 From: wayblink Date: Tue, 25 Jun 2024 18:52:04 +0800 Subject: [PATCH] Add an option to enable/disable vector field clustering key (#34097) #30633 Signed-off-by: wayblink --- configs/milvus.yaml | 2 + internal/proxy/task.go | 4 ++ internal/proxy/task_test.go | 35 +++++++++++++ .../delegator/segment_pruner_test.go | 1 + internal/util/clustering/clustering.go | 9 +++- pkg/util/merr/errors.go | 15 +++--- pkg/util/merr/errors_test.go | 1 + pkg/util/merr/utils.go | 10 ++++ pkg/util/paramtable/component_param.go | 49 +++++++++++-------- pkg/util/paramtable/component_param_test.go | 2 + 10 files changed, 100 insertions(+), 28 deletions(-) diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 6c63cfb5ff29..991880ff6654 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -649,8 +649,10 @@ common: traceLogMode: 0 # trace request info bloomFilterSize: 100000 # bloom filter initial size maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter + # clustering key/compaction related usePartitionKeyAsClusteringKey: false useVectorAsClusteringKey: false + enableVectorClusteringKey: false # QuotaConfig, configurations of Milvus quota and limits. # By default, we enable: diff --git a/internal/proxy/task.go b/internal/proxy/task.go index 87fe4bd195a6..5824bce1037b 100644 --- a/internal/proxy/task.go +++ b/internal/proxy/task.go @@ -248,6 +248,10 @@ func (t *createCollectionTask) validateClusteringKey() error { idx := -1 for i, field := range t.schema.Fields { if field.GetIsClusteringKey() { + if typeutil.IsVectorType(field.GetDataType()) && + !paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() { + return merr.WrapErrCollectionVectorClusteringKeyNotAllowed(t.CollectionName) + } if idx != -1 { return merr.WrapErrCollectionIllegalSchema(t.CollectionName, fmt.Sprintf("there are more than one clustering key, field name = %s, %s", t.schema.Fields[idx].Name, field.Name)) diff --git a/internal/proxy/task_test.go b/internal/proxy/task_test.go index 8d80338417a1..17644e99d98d 100644 --- a/internal/proxy/task_test.go +++ b/internal/proxy/task_test.go @@ -3568,6 +3568,41 @@ func TestClusteringKey(t *testing.T) { err = createCollectionTask.PreExecute(ctx) assert.Error(t, err) }) + + t.Run("create collection with vector clustering key", func(t *testing.T) { + fieldName2Type := make(map[string]schemapb.DataType) + fieldName2Type["int64_field"] = schemapb.DataType_Int64 + fieldName2Type["varChar_field"] = schemapb.DataType_VarChar + schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false) + clusterKeyField := &schemapb.FieldSchema{ + Name: "vec_field", + DataType: schemapb.DataType_FloatVector, + IsClusteringKey: true, + } + schema.Fields = append(schema.Fields, clusterKeyField) + marshaledSchema, err := proto.Marshal(schema) + assert.NoError(t, err) + + createCollectionTask := &createCollectionTask{ + Condition: NewTaskCondition(ctx), + CreateCollectionRequest: &milvuspb.CreateCollectionRequest{ + Base: &commonpb.MsgBase{ + MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()), + Timestamp: Timestamp(time.Now().UnixNano()), + }, + DbName: "", + CollectionName: collectionName, + Schema: marshaledSchema, + ShardsNum: shardsNum, + }, + ctx: ctx, + rootCoord: rc, + result: nil, + schema: nil, + } + err = createCollectionTask.PreExecute(ctx) + assert.Error(t, err) + }) } func TestAlterCollectionCheckLoaded(t *testing.T) { diff --git a/internal/querynodev2/delegator/segment_pruner_test.go b/internal/querynodev2/delegator/segment_pruner_test.go index 555d80eefc6a..a6bb8d934d8d 100644 --- a/internal/querynodev2/delegator/segment_pruner_test.go +++ b/internal/querynodev2/delegator/segment_pruner_test.go @@ -380,6 +380,7 @@ func vector2Placeholder(vectors [][]float32) *commonpb.PlaceholderValue { func (sps *SegmentPrunerSuite) TestPruneSegmentsByVectorField() { paramtable.Init() + paramtable.Get().Save(paramtable.Get().CommonCfg.EnableVectorClusteringKey.Key, "true") sps.SetupForClustering("vec", schemapb.DataType_FloatVector) vector1 := []float32{0.8877872002188053, 0.6131822285635065, 0.8476814632326242, 0.6645877829359371, 0.9962627712600025, 0.8976183052440327, 0.41941169325798844, 0.7554387854258499} vector2 := []float32{0.8644394874390322, 0.023327886647378615, 0.08330118483461302, 0.7068040179963112, 0.6983994910799851, 0.5562075958994153, 0.3288536247938002, 0.07077341010237759} diff --git a/internal/util/clustering/clustering.go b/internal/util/clustering/clustering.go index b9859922332d..20b6636bca6a 100644 --- a/internal/util/clustering/clustering.go +++ b/internal/util/clustering/clustering.go @@ -8,6 +8,7 @@ import ( "github.com/milvus-io/milvus/pkg/util/distance" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/paramtable" + "github.com/milvus-io/milvus/pkg/util/typeutil" ) func CalcVectorDistance(dim int64, dataType schemapb.DataType, left []byte, right []float32, metric string) ([]float32, error) { @@ -70,10 +71,16 @@ func GetClusteringKeyField(collectionSchema *schemapb.CollectionSchema) *schemap // in some server mode, we regard partition key field or vector field as clustering key by default. // here is the priority: clusteringKey > partitionKey > vector field(only single vector) if clusteringKeyField != nil { + if typeutil.IsVectorType(clusteringKeyField.GetDataType()) && + !paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() { + return nil + } return clusteringKeyField } else if paramtable.Get().CommonCfg.UsePartitionKeyAsClusteringKey.GetAsBool() && partitionKeyField != nil { return partitionKeyField - } else if paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() && len(vectorFields) == 1 { + } else if paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() && + paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() && + len(vectorFields) == 1 { return vectorFields[0] } return nil diff --git a/pkg/util/merr/errors.go b/pkg/util/merr/errors.go index a40d041ea7e3..78034ca1ab3d 100644 --- a/pkg/util/merr/errors.go +++ b/pkg/util/merr/errors.go @@ -46,13 +46,14 @@ var ( ErrServiceResourceInsufficient = newMilvusError("service resource insufficient", 12, true) // Collection related - ErrCollectionNotFound = newMilvusError("collection not found", 100, false) - ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false) - ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) - ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) - ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false) - ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false) - ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true) + ErrCollectionNotFound = newMilvusError("collection not found", 100, false) + ErrCollectionNotLoaded = newMilvusError("collection not loaded", 101, false) + ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false) + ErrCollectionNotFullyLoaded = newMilvusError("collection not fully loaded", 103, true) + ErrCollectionLoaded = newMilvusError("collection already loaded", 104, false) + ErrCollectionIllegalSchema = newMilvusError("illegal collection schema", 105, false) + ErrCollectionOnRecovering = newMilvusError("collection on recovering", 106, true) + ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false) // Partition related ErrPartitionNotFound = newMilvusError("partition not found", 200, false) diff --git a/pkg/util/merr/errors_test.go b/pkg/util/merr/errors_test.go index 67782d1c507e..125a2e72f91a 100644 --- a/pkg/util/merr/errors_test.go +++ b/pkg/util/merr/errors_test.go @@ -87,6 +87,7 @@ func (s *ErrSuite) TestWrap() { s.ErrorIs(WrapErrCollectionNotFullyLoaded("test_collection", "failed to query"), ErrCollectionNotFullyLoaded) s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded) s.ErrorIs(WrapErrCollectionOnRecovering("test_collection", "channel lost %s", "dev"), ErrCollectionOnRecovering) + s.ErrorIs(WrapErrCollectionVectorClusteringKeyNotAllowed("test_collection", "field"), ErrCollectionVectorClusteringKeyNotAllowed) // Partition related s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound) diff --git a/pkg/util/merr/utils.go b/pkg/util/merr/utils.go index 1b2b21fdefbf..c61a4cc92f23 100644 --- a/pkg/util/merr/utils.go +++ b/pkg/util/merr/utils.go @@ -489,6 +489,16 @@ func WrapErrCollectionOnRecovering(collection any, msgAndArgs ...any) error { return err } +// WrapErrCollectionVectorClusteringKeyNotAllowed wraps ErrCollectionVectorClusteringKeyNotAllowed with collection +func WrapErrCollectionVectorClusteringKeyNotAllowed(collection any, msgAndArgs ...any) error { + err := wrapFields(ErrCollectionVectorClusteringKeyNotAllowed, value("collection", collection)) + if len(msgAndArgs) > 0 { + msg := msgAndArgs[0].(string) + err = errors.Wrapf(err, msg, msgAndArgs[1:]...) + } + return err +} + func WrapErrAliasNotFound(db any, alias any, msg ...string) error { err := wrapFields(ErrAliasNotFound, value("database", db), diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 739753901d22..2ccc38cd4253 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -249,6 +249,7 @@ type commonConfig struct { UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"` UseVectorAsClusteringKey ParamItem `refreshable:"true"` + EnableVectorClusteringKey ParamItem `refreshable:"true"` } func (p *commonConfig) init(base *BaseTable) { @@ -757,7 +758,7 @@ like the old password verification when updating the credential`, p.UsePartitionKeyAsClusteringKey = ParamItem{ Key: "common.usePartitionKeyAsClusteringKey", - Version: "2.4.2", + Version: "2.4.6", Doc: "if true, do clustering compaction and segment prune on partition key field", DefaultValue: "false", } @@ -765,11 +766,19 @@ like the old password verification when updating the credential`, p.UseVectorAsClusteringKey = ParamItem{ Key: "common.useVectorAsClusteringKey", - Version: "2.4.2", + Version: "2.4.6", Doc: "if true, do clustering compaction and segment prune on vector field", DefaultValue: "false", } p.UseVectorAsClusteringKey.Init(base.mgr) + + p.EnableVectorClusteringKey = ParamItem{ + Key: "common.enableVectorClusteringKey", + Version: "2.4.6", + Doc: "if true, enable vector clustering key and vector clustering compaction", + DefaultValue: "false", + } + p.EnableVectorClusteringKey.Init(base.mgr) } type gpuConfig struct { @@ -3260,7 +3269,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionEnable = ParamItem{ Key: "dataCoord.compaction.clustering.enable", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "false", Doc: "Enable clustering compaction", Export: true, @@ -3269,7 +3278,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionAutoEnable = ParamItem{ Key: "dataCoord.compaction.clustering.autoEnable", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "false", Doc: "Enable auto clustering compaction", Export: true, @@ -3278,28 +3287,28 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionTriggerInterval = ParamItem{ Key: "dataCoord.compaction.clustering.triggerInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "600", } p.ClusteringCompactionTriggerInterval.Init(base.mgr) p.ClusteringCompactionStateCheckInterval = ParamItem{ Key: "dataCoord.compaction.clustering.stateCheckInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10", } p.ClusteringCompactionStateCheckInterval.Init(base.mgr) p.ClusteringCompactionGCInterval = ParamItem{ Key: "dataCoord.compaction.clustering.gcInterval", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "600", } p.ClusteringCompactionGCInterval.Init(base.mgr) p.ClusteringCompactionMinInterval = ParamItem{ Key: "dataCoord.compaction.clustering.minInterval", - Version: "2.4.2", + Version: "2.4.6", Doc: "The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction", DefaultValue: "3600", } @@ -3307,7 +3316,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxInterval = ParamItem{ Key: "dataCoord.compaction.clustering.maxInterval", - Version: "2.4.2", + Version: "2.4.6", Doc: "If a collection haven't been clustering compacted for longer than maxInterval, force compact", DefaultValue: "86400", } @@ -3315,7 +3324,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionNewDataSizeThreshold = ParamItem{ Key: "dataCoord.compaction.clustering.newDataSizeThreshold", - Version: "2.4.2", + Version: "2.4.6", Doc: "If new data size is large than newDataSizeThreshold, execute clustering compaction", DefaultValue: "512m", } @@ -3323,14 +3332,14 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionTimeoutInSeconds = ParamItem{ Key: "dataCoord.compaction.clustering.timeout", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "3600", } p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr) p.ClusteringCompactionDropTolerance = ParamItem{ Key: "dataCoord.compaction.clustering.dropTolerance", - Version: "2.4.2", + Version: "2.4.6", Doc: "If clustering compaction job is finished for a long time, gc it", DefaultValue: "259200", } @@ -3338,7 +3347,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionPreferSegmentSize = ParamItem{ Key: "dataCoord.compaction.clustering.preferSegmentSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "512m", PanicIfEmpty: false, Export: true, @@ -3347,7 +3356,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxSegmentSize = ParamItem{ Key: "dataCoord.compaction.clustering.maxSegmentSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "1024m", PanicIfEmpty: false, Export: true, @@ -3356,7 +3365,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxTrainSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.maxTrainSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "0.8", Doc: "max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit", Export: true, @@ -3365,7 +3374,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxCentroidsNum = ParamItem{ Key: "dataCoord.compaction.clustering.maxCentroidsNum", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10240", Doc: "maximum centroids number in Kmeans train", Export: true, @@ -3374,7 +3383,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinCentroidsNum = ParamItem{ Key: "dataCoord.compaction.clustering.minCentroidsNum", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "16", Doc: "minimum centroids number in Kmeans train", Export: true, @@ -3383,7 +3392,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMinClusterSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.minClusterSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "0.01", Doc: "minimum cluster size / avg size in Kmeans train", Export: true, @@ -3392,7 +3401,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxClusterSizeRatio = ParamItem{ Key: "dataCoord.compaction.clustering.maxClusterSizeRatio", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "10", Doc: "maximum cluster size / avg size in Kmeans train", Export: true, @@ -3401,7 +3410,7 @@ During compaction, the size of segment # of rows is able to exceed segment max # p.ClusteringCompactionMaxClusterSize = ParamItem{ Key: "dataCoord.compaction.clustering.maxClusterSize", - Version: "2.4.2", + Version: "2.4.6", DefaultValue: "5g", Doc: "maximum cluster size in Kmeans train", Export: true, diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 21f2e0218989..8968d3ab3b52 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -542,6 +542,8 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, true, Params.UsePartitionKeyAsClusteringKey.GetAsBool()) params.Save("common.useVectorAsClusteringKey", "true") assert.Equal(t, true, Params.UseVectorAsClusteringKey.GetAsBool()) + params.Save("common.enableVectorClusteringKey", "true") + assert.Equal(t, true, Params.EnableVectorClusteringKey.GetAsBool()) }) }