Skip to content

Commit

Permalink
enhance: Enable setting the replica number and resource group during …
Browse files Browse the repository at this point in the history
…database creation

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Jul 4, 2024
1 parent d51d095 commit a8acb7f
Show file tree
Hide file tree
Showing 8 changed files with 285 additions and 39 deletions.
11 changes: 5 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/aliyun/credentials-go v1.2.7
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e
github.com/apache/arrow/go/v12 v12.0.1
github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7
github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7 // indirect
github.com/bits-and-blooms/bloom/v3 v3.0.1
github.com/blang/semver/v4 v4.0.0
github.com/casbin/casbin/v2 v2.44.2
Expand Down Expand Up @@ -39,7 +39,7 @@ require (
github.com/spf13/cast v1.3.1
github.com/spf13/viper v1.8.1
github.com/stretchr/testify v1.8.4
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect
github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865
github.com/tidwall/gjson v1.14.4
github.com/tikv/client-go/v2 v2.0.4
Expand Down Expand Up @@ -69,7 +69,6 @@ require (
github.com/jolestar/go-commons-pool/v2 v2.1.2
github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000
github.com/pkg/errors v0.9.1
github.com/remeh/sizedwaitgroup v1.0.0
github.com/valyala/fastjson v1.6.4
github.com/zeebo/xxh3 v1.0.2
google.golang.org/protobuf v1.33.0
Expand Down Expand Up @@ -114,9 +113,6 @@ require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/expr-lang/expr v1.15.7 // indirect
github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect
github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect
github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect
github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
Expand Down Expand Up @@ -186,6 +182,7 @@ require (
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/procfs v0.9.0 // indirect
github.com/remeh/sizedwaitgroup v1.0.0 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/rs/xid v1.5.0 // indirect
Expand Down Expand Up @@ -261,3 +258,5 @@ replace (
)

exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b

replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20240703112147-a191ca8d29d0
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,6 @@ github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 h1:xnIeuG1nuTEH
github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016 h1:8WV4maXLeGEyJCCYIc1DmZ18H+VFAjMrwXJg5iI2nX4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho=
github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A=
Expand Down Expand Up @@ -900,6 +898,8 @@ github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLr
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20240703112147-a191ca8d29d0 h1:1UjAmnQlfo0fS8woE2R7Sbzq3G+i/z2lQjYv+wxyc+w=
github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20240703112147-a191ca8d29d0/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
Expand Down
40 changes: 30 additions & 10 deletions internal/querycoordv2/meta/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,43 @@ func (broker *CoordinatorBroker) DescribeDatabase(ctx context.Context, dbName st

// try to get database level replica_num and resource groups, return (resource_groups, replica_num, error)
func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) {
// to do by weiliu1031: querycoord should cache mappings: collectionID->dbName
collectionInfo, err := broker.DescribeCollection(ctx, collectionID)
if err != nil {
return nil, 0, err
}

dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
if err != nil {
return nil, 0, err
log.Info("collection props", zap.Any("xxx", collectionInfo.GetProperties()))
replicaNum, err := common.CollectionLevelReplicaNumber(collectionInfo.GetProperties())
if replicaNum > 0 {
log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
replicaNum, err := common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
if err != nil {
return nil, 0, err

rgs, err := common.CollectionLevelResourceGroups(collectionInfo.GetProperties())
if len(rgs) > 0 {
log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
rgs, err := common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if err != nil {
return nil, 0, err

if replicaNum <= 0 || len(rgs) == 0 {
dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
if err != nil {
return nil, 0, err
}
log.Info("database props", zap.Any("xxx", dbInfo.GetProperties()))
log.Info("xxx", zap.Int64("replicaNumber", replicaNum))
log.Info("xxx", zap.Strings("rgs", rgs))
if replicaNum <= 0 {
replicaNum, err = common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
if replicaNum > 0 {
log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
}

if len(rgs) == 0 {
rgs, err = common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if len(rgs) > 0 {
log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
}
}

return rgs, replicaNum, nil
Expand Down
20 changes: 9 additions & 11 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,20 +217,18 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
}

if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
// when replica number or resource groups is not set, use database level config
// when replica number or resource groups is not set, use pre-defined load config
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to get data base level load info", zap.Error(err))
}

if req.GetReplicaNumber() <= 0 {
log.Info("load collection use database level replica number", zap.Int64("databaseLevelReplicaNum", replicas))
req.ReplicaNumber = int32(replicas)
}
log.Warn("failed to get pre-defined load info", zap.Error(err))
} else {
if req.GetReplicaNumber() <= 0 && replicas > 0 {
req.ReplicaNumber = int32(replicas)
}

if len(req.GetResourceGroups()) == 0 {
log.Info("load collection use database level resource groups", zap.Strings("databaseLevelResourceGroups", rgs))
req.ResourceGroups = rgs
if len(req.GetResourceGroups()) == 0 && len(rgs) > 0 {
req.ResourceGroups = rgs
}
}
}

Expand Down
1 change: 1 addition & 0 deletions internal/rootcoord/create_db_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ func (t *createDatabaseTask) Prepare(ctx context.Context) error {

func (t *createDatabaseTask) Execute(ctx context.Context) error {
db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated)
db.Properties = t.Req.GetProperties()
return t.core.meta.CreateDatabase(ctx, db, t.GetTs())
}
41 changes: 40 additions & 1 deletion pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,11 @@ const (

PartitionDiskQuotaKey = "partition.diskProtection.diskQuota.mb"

// database level properties
// collection level load properties
CollectionReplicaNumber = "collection.replica.number"
CollectionResourceGroups = "collection.resource_groups"

// database level load properties
DatabaseReplicaNumber = "database.replica.number"
DatabaseResourceGroups = "database.resource_groups"
)
Expand Down Expand Up @@ -255,3 +259,38 @@ func DatabaseLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error)

return nil, fmt.Errorf("database property not found: %s", DatabaseResourceGroups)
}

func CollectionLevelReplicaNumber(kvs []*commonpb.KeyValuePair) (int64, error) {
for _, kv := range kvs {
if kv.Key == CollectionReplicaNumber {
replicaNum, err := strconv.ParseInt(kv.Value, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)
}

return replicaNum, nil
}
}

return 0, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber)
}

func CollectionLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error) {
for _, kv := range kvs {
if kv.Key == CollectionResourceGroups {
invalidPropValue := fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)
if len(kv.Value) == 0 {
return nil, invalidPropValue
}

rgs := strings.Split(kv.Value, ",")
if len(rgs) == 0 {
return nil, invalidPropValue
}

return rgs, nil
}
}

return nil, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber)
}
Loading

0 comments on commit a8acb7f

Please sign in to comment.