Add an option to enable/disable vector field clustering key (milvus-io#34097)

milvus-io#30633

Signed-off-by: wayblink <anyang.wang@zilliz.com>
wayblink committed Jul 2, 2024
1 parent 7dff037 commit b1dc35e
Showing 10 changed files with 100 additions and 28 deletions.
2 changes: 2 additions & 0 deletions configs/milvus.yaml
@@ -649,8 +649,10 @@ common:
traceLogMode: 0 # trace request info
bloomFilterSize: 100000 # bloom filter initial size
maxBloomFalsePositive: 0.001 # max false positive rate for bloom filter
+# clustering key/compaction related
usePartitionKeyAsClusteringKey: false
useVectorAsClusteringKey: false
+enableVectorClusteringKey: false

# QuotaConfig, configurations of Milvus quota and limits.
# By default, we enable:
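All three flags are declared refreshable, so besides editing milvus.yaml they can be flipped at runtime through paramtable; that is how the tests in this commit enable the feature. A minimal Go sketch (assumes paramtable has been initialized, mirroring segment_pruner_test.go below):

    paramtable.Init()
    params := paramtable.Get()
    // enable vector clustering keys for this process; the item is declared
    // refreshable:"true", so no restart is needed
    params.Save(params.CommonCfg.EnableVectorClusteringKey.Key, "true")
    enabled := params.CommonCfg.EnableVectorClusteringKey.GetAsBool() // now true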
4 changes: 4 additions & 0 deletions internal/proxy/task.go
@@ -248,6 +248,10 @@ func (t *createCollectionTask) validateClusteringKey() error {
idx := -1
for i, field := range t.schema.Fields {
if field.GetIsClusteringKey() {
+if typeutil.IsVectorType(field.GetDataType()) &&
+!paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() {
+return merr.WrapErrCollectionVectorClusteringKeyNotAllowed(t.CollectionName)
+}
if idx != -1 {
return merr.WrapErrCollectionIllegalSchema(t.CollectionName,
fmt.Sprintf("there are more than one clustering key, field name = %s, %s", t.schema.Fields[idx].Name, field.Name))
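The guard surfaces as a dedicated, matchable error rather than a generic schema failure. A hedged sketch of how a caller might branch on it (only the merr identifiers come from this diff; the surrounding handler is illustrative):

    if err := createCollectionTask.PreExecute(ctx); err != nil {
        if errors.Is(err, merr.ErrCollectionVectorClusteringKeyNotAllowed) {
            // vector clustering keys are disabled on this deployment: either
            // drop IsClusteringKey from the vector field, or set
            // common.enableVectorClusteringKey to true and retry
        }
        return err
    }

The wrapped error stays matchable with errors.Is, as the ErrorIs assertion added to errors_test.go exercises. The test below walks exactly this rejection path.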
35 changes: 35 additions & 0 deletions internal/proxy/task_test.go
@@ -3568,6 +3568,41 @@ func TestClusteringKey(t *testing.T) {
err = createCollectionTask.PreExecute(ctx)
assert.Error(t, err)
})

t.Run("create collection with vector clustering key", func(t *testing.T) {
fieldName2Type := make(map[string]schemapb.DataType)
fieldName2Type["int64_field"] = schemapb.DataType_Int64
fieldName2Type["varChar_field"] = schemapb.DataType_VarChar
schema := constructCollectionSchemaByDataType(collectionName, fieldName2Type, "int64_field", false)
clusterKeyField := &schemapb.FieldSchema{
Name: "vec_field",
DataType: schemapb.DataType_FloatVector,
IsClusteringKey: true,
}
schema.Fields = append(schema.Fields, clusterKeyField)
marshaledSchema, err := proto.Marshal(schema)
assert.NoError(t, err)

createCollectionTask := &createCollectionTask{
Condition: NewTaskCondition(ctx),
CreateCollectionRequest: &milvuspb.CreateCollectionRequest{
Base: &commonpb.MsgBase{
MsgID: UniqueID(uniquegenerator.GetUniqueIntGeneratorIns().GetInt()),
Timestamp: Timestamp(time.Now().UnixNano()),
},
DbName: "",
CollectionName: collectionName,
Schema: marshaledSchema,
ShardsNum: shardsNum,
},
ctx: ctx,
rootCoord: rc,
result: nil,
schema: nil,
}
err = createCollectionTask.PreExecute(ctx)
assert.Error(t, err)
})
}

func TestAlterCollectionCheckLoaded(t *testing.T) {
1 change: 1 addition & 0 deletions internal/querynodev2/delegator/segment_pruner_test.go
@@ -380,6 +380,7 @@ func vector2Placeholder(vectors [][]float32) *commonpb.PlaceholderValue {

func (sps *SegmentPrunerSuite) TestPruneSegmentsByVectorField() {
paramtable.Init()
+paramtable.Get().Save(paramtable.Get().CommonCfg.EnableVectorClusteringKey.Key, "true")
sps.SetupForClustering("vec", schemapb.DataType_FloatVector)
vector1 := []float32{0.8877872002188053, 0.6131822285635065, 0.8476814632326242, 0.6645877829359371, 0.9962627712600025, 0.8976183052440327, 0.41941169325798844, 0.7554387854258499}
vector2 := []float32{0.8644394874390322, 0.023327886647378615, 0.08330118483461302, 0.7068040179963112, 0.6983994910799851, 0.5562075958994153, 0.3288536247938002, 0.07077341010237759}
9 changes: 8 additions & 1 deletion internal/util/clustering/clustering.go
@@ -8,6 +8,7 @@ import (
"github.com/milvus-io/milvus/pkg/util/distance"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func CalcVectorDistance(dim int64, dataType schemapb.DataType, left []byte, right []float32, metric string) ([]float32, error) {
@@ -70,10 +71,16 @@ func GetClusteringKeyField(collectionSchema *schemapb.CollectionSchema) *schemap
// in some server mode, we regard partition key field or vector field as clustering key by default.
// here is the priority: clusteringKey > partitionKey > vector field(only single vector)
if clusteringKeyField != nil {
+if typeutil.IsVectorType(clusteringKeyField.GetDataType()) &&
+!paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() {
+return nil
+}
return clusteringKeyField
} else if paramtable.Get().CommonCfg.UsePartitionKeyAsClusteringKey.GetAsBool() && partitionKeyField != nil {
return partitionKeyField
-} else if paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() && len(vectorFields) == 1 {
+} else if paramtable.Get().CommonCfg.EnableVectorClusteringKey.GetAsBool() &&
+paramtable.Get().CommonCfg.UseVectorAsClusteringKey.GetAsBool() &&
+len(vectorFields) == 1 {
return vectorFields[0]
}
return nil
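After this change, GetClusteringKeyField resolves clustering keys as sketched below in Go (the decision order is inferred from the function above; the caller code is illustrative):

    // assumed resolution order after this diff:
    //   1. explicit scalar clustering key -> returned as-is
    //   2. explicit vector clustering key -> nil unless
    //      common.enableVectorClusteringKey is true
    //   3. usePartitionKeyAsClusteringKey=true -> the partition key field
    //   4. useVectorAsClusteringKey=true with exactly one vector field ->
    //      that field, and only when enableVectorClusteringKey is also true
    field := clustering.GetClusteringKeyField(collectionSchema)
    if field == nil {
        // no usable clustering key: clustering compaction and
        // segment pruning are skipped for this collection
    }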
15 changes: 8 additions & 7 deletions pkg/util/merr/errors.go
@@ -46,13 +46,14 @@ var (
ErrServiceResourceInsufficient = newMilvusError("service resource insufficient", 12, true)

// Collection related
-ErrCollectionNotFound         = newMilvusError("collection not found", 100, false)
-ErrCollectionNotLoaded        = newMilvusError("collection not loaded", 101, false)
-ErrCollectionNumLimitExceeded = newMilvusError("exceeded the limit number of collections", 102, false)
-ErrCollectionNotFullyLoaded   = newMilvusError("collection not fully loaded", 103, true)
-ErrCollectionLoaded           = newMilvusError("collection already loaded", 104, false)
-ErrCollectionIllegalSchema    = newMilvusError("illegal collection schema", 105, false)
-ErrCollectionOnRecovering     = newMilvusError("collection on recovering", 106, true)
+ErrCollectionNotFound                      = newMilvusError("collection not found", 100, false)
+ErrCollectionNotLoaded                     = newMilvusError("collection not loaded", 101, false)
+ErrCollectionNumLimitExceeded              = newMilvusError("exceeded the limit number of collections", 102, false)
+ErrCollectionNotFullyLoaded                = newMilvusError("collection not fully loaded", 103, true)
+ErrCollectionLoaded                        = newMilvusError("collection already loaded", 104, false)
+ErrCollectionIllegalSchema                 = newMilvusError("illegal collection schema", 105, false)
+ErrCollectionOnRecovering                  = newMilvusError("collection on recovering", 106, true)
+ErrCollectionVectorClusteringKeyNotAllowed = newMilvusError("vector clustering key not allowed", 107, false)

// Partition related
ErrPartitionNotFound = newMilvusError("partition not found", 200, false)
1 change: 1 addition & 0 deletions pkg/util/merr/errors_test.go
@@ -87,6 +87,7 @@ func (s *ErrSuite) TestWrap() {
s.ErrorIs(WrapErrCollectionNotFullyLoaded("test_collection", "failed to query"), ErrCollectionNotFullyLoaded)
s.ErrorIs(WrapErrCollectionNotLoaded("test_collection", "failed to alter index %s", "hnsw"), ErrCollectionNotLoaded)
s.ErrorIs(WrapErrCollectionOnRecovering("test_collection", "channel lost %s", "dev"), ErrCollectionOnRecovering)
+s.ErrorIs(WrapErrCollectionVectorClusteringKeyNotAllowed("test_collection", "field"), ErrCollectionVectorClusteringKeyNotAllowed)

// Partition related
s.ErrorIs(WrapErrPartitionNotFound("test_partition", "failed to get partition"), ErrPartitionNotFound)
10 changes: 10 additions & 0 deletions pkg/util/merr/utils.go
@@ -489,6 +489,16 @@ func WrapErrCollectionOnRecovering(collection any, msgAndArgs ...any) error {
return err
}

+// WrapErrCollectionVectorClusteringKeyNotAllowed wraps ErrCollectionVectorClusteringKeyNotAllowed with collection
+func WrapErrCollectionVectorClusteringKeyNotAllowed(collection any, msgAndArgs ...any) error {
+err := wrapFields(ErrCollectionVectorClusteringKeyNotAllowed, value("collection", collection))
+if len(msgAndArgs) > 0 {
+msg := msgAndArgs[0].(string)
+err = errors.Wrapf(err, msg, msgAndArgs[1:]...)
+}
+return err
+}

func WrapErrAliasNotFound(db any, alias any, msg ...string) error {
err := wrapFields(ErrAliasNotFound,
value("database", db),
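A short usage sketch of the new wrapper (argument values are illustrative; as the implementation above shows, the first variadic element is treated as a format string for the rest):

    err := merr.WrapErrCollectionVectorClusteringKeyNotAllowed(
        "my_collection", "clustering key field = %s", "vec_field")
    // the wrapped error remains matchable:
    // errors.Is(err, merr.ErrCollectionVectorClusteringKeyNotAllowed) == true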
49 changes: 29 additions & 20 deletions pkg/util/paramtable/component_param.go
@@ -249,6 +249,7 @@ type commonConfig struct {

UsePartitionKeyAsClusteringKey ParamItem `refreshable:"true"`
UseVectorAsClusteringKey ParamItem `refreshable:"true"`
+EnableVectorClusteringKey ParamItem `refreshable:"true"`
}

func (p *commonConfig) init(base *BaseTable) {
@@ -757,19 +758,27 @@ like the old password verification when updating the credential`,

p.UsePartitionKeyAsClusteringKey = ParamItem{
Key: "common.usePartitionKeyAsClusteringKey",
-Version: "2.4.2",
+Version: "2.4.6",
Doc: "if true, do clustering compaction and segment prune on partition key field",
DefaultValue: "false",
}
p.UsePartitionKeyAsClusteringKey.Init(base.mgr)

p.UseVectorAsClusteringKey = ParamItem{
Key: "common.useVectorAsClusteringKey",
-Version: "2.4.2",
+Version: "2.4.6",
Doc: "if true, do clustering compaction and segment prune on vector field",
DefaultValue: "false",
}
p.UseVectorAsClusteringKey.Init(base.mgr)

+p.EnableVectorClusteringKey = ParamItem{
+Key: "common.enableVectorClusteringKey",
+Version: "2.4.6",
+Doc: "if true, enable vector clustering key and vector clustering compaction",
+DefaultValue: "false",
+}
+p.EnableVectorClusteringKey.Init(base.mgr)
}

type gpuConfig struct {
@@ -3260,7 +3269,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionEnable = ParamItem{
Key: "dataCoord.compaction.clustering.enable",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "false",
Doc: "Enable clustering compaction",
Export: true,
@@ -3269,7 +3278,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionAutoEnable = ParamItem{
Key: "dataCoord.compaction.clustering.autoEnable",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "false",
Doc: "Enable auto clustering compaction",
Export: true,
@@ -3278,67 +3287,67 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionTriggerInterval = ParamItem{
Key: "dataCoord.compaction.clustering.triggerInterval",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "600",
}
p.ClusteringCompactionTriggerInterval.Init(base.mgr)

p.ClusteringCompactionStateCheckInterval = ParamItem{
Key: "dataCoord.compaction.clustering.stateCheckInterval",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "10",
}
p.ClusteringCompactionStateCheckInterval.Init(base.mgr)

p.ClusteringCompactionGCInterval = ParamItem{
Key: "dataCoord.compaction.clustering.gcInterval",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "600",
}
p.ClusteringCompactionGCInterval.Init(base.mgr)

p.ClusteringCompactionMinInterval = ParamItem{
Key: "dataCoord.compaction.clustering.minInterval",
-Version: "2.4.2",
+Version: "2.4.6",
Doc: "The minimum interval between clustering compaction executions of one collection, to avoid redundant compaction",
DefaultValue: "3600",
}
p.ClusteringCompactionMinInterval.Init(base.mgr)

p.ClusteringCompactionMaxInterval = ParamItem{
Key: "dataCoord.compaction.clustering.maxInterval",
-Version: "2.4.2",
+Version: "2.4.6",
Doc: "If a collection haven't been clustering compacted for longer than maxInterval, force compact",
DefaultValue: "86400",
}
p.ClusteringCompactionMaxInterval.Init(base.mgr)

p.ClusteringCompactionNewDataSizeThreshold = ParamItem{
Key: "dataCoord.compaction.clustering.newDataSizeThreshold",
-Version: "2.4.2",
+Version: "2.4.6",
Doc: "If new data size is large than newDataSizeThreshold, execute clustering compaction",
DefaultValue: "512m",
}
p.ClusteringCompactionNewDataSizeThreshold.Init(base.mgr)

p.ClusteringCompactionTimeoutInSeconds = ParamItem{
Key: "dataCoord.compaction.clustering.timeout",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "3600",
}
p.ClusteringCompactionTimeoutInSeconds.Init(base.mgr)

p.ClusteringCompactionDropTolerance = ParamItem{
Key: "dataCoord.compaction.clustering.dropTolerance",
-Version: "2.4.2",
+Version: "2.4.6",
Doc: "If clustering compaction job is finished for a long time, gc it",
DefaultValue: "259200",
}
p.ClusteringCompactionDropTolerance.Init(base.mgr)

p.ClusteringCompactionPreferSegmentSize = ParamItem{
Key: "dataCoord.compaction.clustering.preferSegmentSize",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "512m",
PanicIfEmpty: false,
Export: true,
@@ -3347,7 +3356,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionMaxSegmentSize = ParamItem{
Key: "dataCoord.compaction.clustering.maxSegmentSize",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "1024m",
PanicIfEmpty: false,
Export: true,
@@ -3356,7 +3365,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionMaxTrainSizeRatio = ParamItem{
Key: "dataCoord.compaction.clustering.maxTrainSizeRatio",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "0.8",
Doc: "max data size ratio in Kmeans train, if larger than it, will down sampling to meet this limit",
Export: true,
@@ -3365,7 +3374,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionMaxCentroidsNum = ParamItem{
Key: "dataCoord.compaction.clustering.maxCentroidsNum",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "10240",
Doc: "maximum centroids number in Kmeans train",
Export: true,
@@ -3374,7 +3383,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionMinCentroidsNum = ParamItem{
Key: "dataCoord.compaction.clustering.minCentroidsNum",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "16",
Doc: "minimum centroids number in Kmeans train",
Export: true,
@@ -3383,7 +3392,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionMinClusterSizeRatio = ParamItem{
Key: "dataCoord.compaction.clustering.minClusterSizeRatio",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "0.01",
Doc: "minimum cluster size / avg size in Kmeans train",
Export: true,
@@ -3392,7 +3401,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionMaxClusterSizeRatio = ParamItem{
Key: "dataCoord.compaction.clustering.maxClusterSizeRatio",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "10",
Doc: "maximum cluster size / avg size in Kmeans train",
Export: true,
@@ -3401,7 +3410,7 @@ During compaction, the size of segment # of rows is able to exceed segment max #

p.ClusteringCompactionMaxClusterSize = ParamItem{
Key: "dataCoord.compaction.clustering.maxClusterSize",
-Version: "2.4.2",
+Version: "2.4.6",
DefaultValue: "5g",
Doc: "maximum cluster size in Kmeans train",
Export: true,
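For reference, downstream code gates clustering compaction on these knobs roughly as follows; a hedged sketch (the DataCoordCfg field path is an assumption based on the dataCoord.* keys, not shown in this diff):

    cfg := paramtable.Get().DataCoordCfg
    if cfg.ClusteringCompactionEnable.GetAsBool() {
        // trigger interval in seconds, default 600 per the ParamItem above
        interval := cfg.ClusteringCompactionTriggerInterval.GetAsInt()
        _ = interval // schedule the trigger loop with this period
    }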
2 changes: 2 additions & 0 deletions pkg/util/paramtable/component_param_test.go
@@ -542,6 +542,8 @@ func TestComponentParam(t *testing.T) {
assert.Equal(t, true, Params.UsePartitionKeyAsClusteringKey.GetAsBool())
params.Save("common.useVectorAsClusteringKey", "true")
assert.Equal(t, true, Params.UseVectorAsClusteringKey.GetAsBool())
params.Save("common.enableVectorClusteringKey", "true")
assert.Equal(t, true, Params.EnableVectorClusteringKey.GetAsBool())
})
}
