diff --git a/go.mod b/go.mod index 97570a30fc96d..b92cbb83202c6 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/aliyun/credentials-go v1.2.7 github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e github.com/apache/arrow/go/v12 v12.0.1 - github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7 + github.com/apache/pulsar-client-go v0.6.1-0.20210728062540-29414db801a7 // indirect github.com/bits-and-blooms/bloom/v3 v3.0.1 github.com/blang/semver/v4 v4.0.0 github.com/casbin/casbin/v2 v2.44.2 @@ -39,7 +39,7 @@ require ( github.com/spf13/cast v1.3.1 github.com/spf13/viper v1.8.1 github.com/stretchr/testify v1.8.4 - github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c + github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/common v1.0.865 github.com/tidwall/gjson v1.14.4 github.com/tikv/client-go/v2 v2.0.4 @@ -69,7 +69,6 @@ require ( github.com/jolestar/go-commons-pool/v2 v2.1.2 github.com/milvus-io/milvus/pkg v0.0.0-00010101000000-000000000000 github.com/pkg/errors v0.9.1 - github.com/remeh/sizedwaitgroup v1.0.0 github.com/valyala/fastjson v1.6.4 github.com/zeebo/xxh3 v1.0.2 google.golang.org/protobuf v1.33.0 @@ -114,9 +113,6 @@ require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/dvsekhvalnov/jose2go v1.6.0 // indirect github.com/expr-lang/expr v1.15.7 // indirect - github.com/facebookgo/ensure v0.0.0-20200202191622-63f1cf65ac4c // indirect - github.com/facebookgo/stack v0.0.0-20160209184415-751773369052 // indirect - github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 // indirect github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect github.com/fsnotify/fsnotify v1.4.9 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect @@ -186,6 +182,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/power-devops/perfstat 
v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/procfs v0.9.0 // indirect + github.com/remeh/sizedwaitgroup v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect github.com/rs/xid v1.5.0 // indirect @@ -261,3 +258,5 @@ replace ( ) exclude github.com/apache/pulsar-client-go/oauth2 v0.0.0-20211108044248-fe3b7c4e445b + +replace github.com/milvus-io/milvus-proto/go-api/v2 => github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20240703112147-a191ca8d29d0 diff --git a/go.sum b/go.sum index f52d95b12cdc3..7d72ecffe4633 100644 --- a/go.sum +++ b/go.sum @@ -607,8 +607,6 @@ github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 h1:xnIeuG1nuTEH github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8= github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016 h1:8WV4maXLeGEyJCCYIc1DmZ18H+VFAjMrwXJg5iI2nX4= -github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI= github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70/go.mod h1:GPETMcTZq1gLY1WA6Na5kiNAKnq8SEMMiVKUZrM3sho= github.com/milvus-io/pulsar-client-go v0.6.10 h1:eqpJjU+/QX0iIhEo3nhOqMNXL+TyInAs1IAHZCrCM/A= @@ -900,6 +898,8 @@ github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLr github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1/go.mod 
h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20240703112147-a191ca8d29d0 h1:1UjAmnQlfo0fS8woE2R7Sbzq3G+i/z2lQjYv+wxyc+w= +github.com/weiliu1031/milvus-proto/go-api/v2 v2.0.0-20240703112147-a191ca8d29d0/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek= github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go index 2df54688affb0..8b454e00938db 100644 --- a/internal/querycoordv2/meta/coordinator_broker.go +++ b/internal/querycoordv2/meta/coordinator_broker.go @@ -107,23 +107,43 @@ func (broker *CoordinatorBroker) DescribeDatabase(ctx context.Context, dbName st // try to get database level replica_num and resource groups, return (resource_groups, replica_num, error) func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) { - // to do by weiliu1031: querycoord should cache mappings: collectionID->dbName collectionInfo, err := broker.DescribeCollection(ctx, collectionID) if err != nil { return nil, 0, err } - dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName()) - if err != nil { - return nil, 0, err + log.Info("collection props", zap.Any("xxx", collectionInfo.GetProperties())) + replicaNum, err := common.CollectionLevelReplicaNumber(collectionInfo.GetProperties()) + if replicaNum > 0 { + log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum)) } - replicaNum, err := 
common.DatabaseLevelReplicaNumber(dbInfo.GetProperties()) - if err != nil { - return nil, 0, err + + rgs, err := common.CollectionLevelResourceGroups(collectionInfo.GetProperties()) + if len(rgs) > 0 { + log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs)) } - rgs, err := common.DatabaseLevelResourceGroups(dbInfo.GetProperties()) - if err != nil { - return nil, 0, err + + if replicaNum <= 0 || len(rgs) == 0 { + dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName()) + if err != nil { + return nil, 0, err + } + log.Info("database props", zap.Any("xxx", dbInfo.GetProperties())) + log.Info("xxx", zap.Int64("replicaNumber", replicaNum)) + log.Info("xxx", zap.Strings("rgs", rgs)) + if replicaNum <= 0 { + replicaNum, err = common.DatabaseLevelReplicaNumber(dbInfo.GetProperties()) + if replicaNum > 0 { + log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum)) + } + } + + if len(rgs) == 0 { + rgs, err = common.DatabaseLevelResourceGroups(dbInfo.GetProperties()) + if len(rgs) > 0 { + log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs)) + } + } } return rgs, replicaNum, nil diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 75dcaf8f716c5..6b3f4c43d1539 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -217,20 +217,18 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection } if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 { - // when replica number or resource groups is not set, use database level config + // when replica number or resource groups is not set, use pre-defined load config rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID()) if err != nil { - log.Warn("failed to get data base level load info", 
zap.Error(err)) - } - - if req.GetReplicaNumber() <= 0 { - log.Info("load collection use database level replica number", zap.Int64("databaseLevelReplicaNum", replicas)) - req.ReplicaNumber = int32(replicas) - } + log.Warn("failed to get pre-defined load info", zap.Error(err)) + } else { + if req.GetReplicaNumber() <= 0 && replicas > 0 { + req.ReplicaNumber = int32(replicas) + } - if len(req.GetResourceGroups()) == 0 { - log.Info("load collection use database level resource groups", zap.Strings("databaseLevelResourceGroups", rgs)) - req.ResourceGroups = rgs + if len(req.GetResourceGroups()) == 0 && len(rgs) > 0 { + req.ResourceGroups = rgs + } } } diff --git a/internal/rootcoord/create_db_task.go b/internal/rootcoord/create_db_task.go index 089c25c68fad3..5db02b470515e 100644 --- a/internal/rootcoord/create_db_task.go +++ b/internal/rootcoord/create_db_task.go @@ -51,5 +51,6 @@ func (t *createDatabaseTask) Prepare(ctx context.Context) error { func (t *createDatabaseTask) Execute(ctx context.Context) error { db := model.NewDatabase(t.dbID, t.Req.GetDbName(), etcdpb.DatabaseState_DatabaseCreated) + db.Properties = t.Req.GetProperties() return t.core.meta.CreateDatabase(ctx, db, t.GetTs()) } diff --git a/pkg/common/common.go b/pkg/common/common.go index 8b881c14a2de9..6f000822c7c03 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -146,7 +146,11 @@ const ( PartitionDiskQuotaKey = "partition.diskProtection.diskQuota.mb" - // database level properties + // collection level load properties + CollectionReplicaNumber = "collection.replica.number" + CollectionResourceGroups = "collection.resource_groups" + + // database level load properties DatabaseReplicaNumber = "database.replica.number" DatabaseResourceGroups = "database.resource_groups" ) @@ -255,3 +259,38 @@ func DatabaseLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error) return nil, fmt.Errorf("database property not found: %s", DatabaseResourceGroups) } + +func 
CollectionLevelReplicaNumber(kvs []*commonpb.KeyValuePair) (int64, error) { + for _, kv := range kvs { + if kv.Key == CollectionReplicaNumber { + replicaNum, err := strconv.ParseInt(kv.Value, 10, 64) + if err != nil { + return 0, fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value) + } + + return replicaNum, nil + } + } + + return 0, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber) +} + +func CollectionLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error) { + for _, kv := range kvs { + if kv.Key == CollectionResourceGroups { + invalidPropValue := fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value) + if len(kv.Value) == 0 { + return nil, invalidPropValue + } + + rgs := strings.Split(kv.Value, ",") + if len(rgs) == 0 { + return nil, invalidPropValue + } + + return rgs, nil + } + } + + return nil, fmt.Errorf("collection property not found: %s", CollectionResourceGroups) +} diff --git a/tests/integration/replicas/load/load_test.go b/tests/integration/replicas/load/load_test.go index 837a634c53799..89c8c9fbf4889 100644 --- a/tests/integration/replicas/load/load_test.go +++ b/tests/integration/replicas/load/load_test.go @@ -54,29 +54,29 @@ func (s *LoadTestSuite) SetupSuite() { s.Require().NoError(s.SetupEmbedEtcd()) } -func (s *LoadTestSuite) loadCollection(collectionName string, replica int, rgs []string) { +func (s *LoadTestSuite) loadCollection(collectionName string, db string, replica int, rgs []string) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // load loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{ - DbName: dbName, + DbName: db, CollectionName: collectionName, ReplicaNumber: int32(replica), ResourceGroups: rgs, }) s.NoError(err) s.True(merr.Ok(loadStatus)) - s.WaitForLoad(ctx, collectionName) + s.WaitForLoadWithDB(ctx, db, collectionName) } -func (s *LoadTestSuite) releaseCollection(collectionName
string) { +func (s *LoadTestSuite) releaseCollection(db, collectionName string) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // load status, err := s.Cluster.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{ - DbName: dbName, + DbName: db, CollectionName: collectionName, }) s.NoError(err) @@ -171,7 +171,7 @@ func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() { s.Len(resp1.GetProperties(), 2) // load collection without specified replica and rgs - s.loadCollection(collectionName, 0, nil) + s.loadCollection(collectionName, dbName, 0, nil) resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ DbName: dbName, CollectionName: collectionName, @@ -179,7 +179,180 @@ func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() { s.NoError(err) s.True(merr.Ok(resp2.Status)) s.Len(resp2.GetReplicas(), 3) - s.releaseCollection(collectionName) + s.releaseCollection(dbName, collectionName) +} + +func (s *LoadTestSuite) TestLoadWithPredefineCollectionLevelConfig() { + ctx := context.Background() + + // prepare resource groups + rgNum := 3 + rgs := make([]string, 0) + for i := 0; i < rgNum; i++ { + rgs = append(rgs, fmt.Sprintf("rg_%d", i)) + s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{ + ResourceGroup: rgs[i], + Config: &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + + TransferFrom: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: meta.DefaultResourceGroupName, + }, + }, + TransferTo: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: meta.DefaultResourceGroupName, + }, + }, + }, + }) + } + + resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{}) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.Len(resp.GetResourceGroups(), rgNum+1) + + for i := 1; i < rgNum; i++ { + s.Cluster.AddQueryNode() + } + + s.Eventually(func() bool { + 
matchCounter := 0 + for _, rg := range rgs { + resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{ + ResourceGroup: rg, + }) + s.NoError(err) + s.True(merr.Ok(resp1.GetStatus())) + if len(resp1.ResourceGroup.Nodes) == 1 { + matchCounter += 1 + } + } + return matchCounter == rgNum + }, 30*time.Second, time.Second) + + s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{ + DBName: dbName, + Dim: dim, + CollectionName: collectionName, + ChannelNum: 1, + SegmentNum: 3, + RowNumPerSegment: 2000, + ReplicaNumber: 3, + ResourceGroups: rgs, + }) + + // load collection without specified replica and rgs + s.loadCollection(collectionName, dbName, 0, nil) + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: dbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + s.Len(resp2.GetReplicas(), 3) + s.releaseCollection(dbName, collectionName) +} + +func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() { + ctx := context.Background() + + // prepare resource groups + rgNum := 3 + rgs := make([]string, 0) + for i := 0; i < rgNum; i++ { + rgs = append(rgs, fmt.Sprintf("rg_%d", i)) + s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{ + ResourceGroup: rgs[i], + Config: &rgpb.ResourceGroupConfig{ + Requests: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + Limits: &rgpb.ResourceGroupLimit{ + NodeNum: 1, + }, + + TransferFrom: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: meta.DefaultResourceGroupName, + }, + }, + TransferTo: []*rgpb.ResourceGroupTransfer{ + { + ResourceGroup: meta.DefaultResourceGroupName, + }, + }, + }, + }) + } + + resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{}) + s.NoError(err) + s.True(merr.Ok(resp.GetStatus())) + s.Len(resp.GetResourceGroups(), rgNum+1) + + for i := 1; i < rgNum; i++ { + s.Cluster.AddQueryNode() + } + +
s.Eventually(func() bool { + matchCounter := 0 + for _, rg := range rgs { + resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{ + ResourceGroup: rg, + }) + s.NoError(err) + s.True(merr.Ok(resp1.GetStatus())) + if len(resp1.ResourceGroup.Nodes) == 1 { + matchCounter += 1 + } + } + return matchCounter == rgNum + }, 30*time.Second, time.Second) + + newDbName := "db_load_test_with_db_level_config" + resp1, err := s.Cluster.Proxy.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{ + DbName: newDbName, + Properties: []*commonpb.KeyValuePair{ + { + Key: common.DatabaseReplicaNumber, + Value: "3", + }, + { + Key: common.DatabaseResourceGroups, + Value: strings.Join(rgs, ","), + }, + }, + }) + s.NoError(err) + s.True(merr.Ok(resp1)) + + s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{ + DBName: newDbName, + Dim: dim, + CollectionName: collectionName, + ChannelNum: 1, + SegmentNum: 3, + RowNumPerSegment: 2000, + }) + + // load collection without specified replica and rgs + s.loadCollection(collectionName, newDbName, 0, nil) + resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ + DbName: newDbName, + CollectionName: collectionName, + }) + s.NoError(err) + s.True(merr.Ok(resp2.Status)) + s.Len(resp2.GetReplicas(), 3) + s.releaseCollection(newDbName, collectionName) } func TestReplicas(t *testing.T) { diff --git a/tests/integration/util_collection.go b/tests/integration/util_collection.go index 5f22f78099eb7..bd8fdc0db2fc8 100644 --- a/tests/integration/util_collection.go +++ b/tests/integration/util_collection.go @@ -2,12 +2,16 @@ package integration import ( "context" + "strconv" + "strings" "github.com/golang/protobuf/proto" "go.uber.org/zap" + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" "github.com/milvus-io/milvus-proto/go-api/v2/milvuspb" "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metric" @@ -20,6 +24,8 @@ type CreateCollectionConfig struct { SegmentNum int RowNumPerSegment int Dim int + ReplicaNumber int32 + ResourceGroups []string } func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context, cfg *CreateCollectionConfig) { @@ -33,12 +39,22 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context CollectionName: cfg.CollectionName, Schema: marshaledSchema, ShardsNum: int32(cfg.ChannelNum), + Properties: []*commonpb.KeyValuePair{ + { + Key: common.CollectionReplicaNumber, + Value: strconv.FormatInt(int64(cfg.ReplicaNumber), 10), + }, + { + Key: common.CollectionResourceGroups, + Value: strings.Join(cfg.ResourceGroups, ","), + }, + }, }) s.NoError(err) s.True(merr.Ok(createCollectionStatus)) log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus)) - showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{}) + showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{DbName: cfg.DBName}) s.NoError(err) s.True(merr.Ok(showCollectionsResp.Status)) log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp)) @@ -80,5 +96,5 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context }) s.NoError(err) s.True(merr.Ok(createIndexStatus)) - s.WaitForIndexBuilt(ctx, cfg.CollectionName, FloatVecField) + s.WaitForIndexBuiltWithDB(ctx, cfg.DBName, cfg.CollectionName, FloatVecField) }