diff --git a/go.sum b/go.sum
index 94c259ddcb22..c90f3ebfa950 100644
--- a/go.sum
+++ b/go.sum
@@ -607,8 +607,6 @@ github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 h1:xnIeuG1nuTEH
 github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
 github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
 github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
-github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016 h1:8WV4maXLeGEyJCCYIc1DmZ18H+VFAjMrwXJg5iI2nX4=
-github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
 github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53 h1:hLeTFOV/IXUoTbm4slVWFSnR296yALJ8Zo+YCMEvAy0=
 github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
 github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
diff --git a/internal/querycoordv2/meta/coordinator_broker.go b/internal/querycoordv2/meta/coordinator_broker.go
index 2df54688affb..b5b606b81697 100644
--- a/internal/querycoordv2/meta/coordinator_broker.go
+++ b/internal/querycoordv2/meta/coordinator_broker.go
@@ -107,23 +107,48 @@ func (broker *CoordinatorBroker) DescribeDatabase(ctx context.Context, dbName st
 
 // try to get database level replica_num and resource groups, return (resource_groups, replica_num, error)
 func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) {
-	// to do by weiliu1031: querycoord should cache mappings: collectionID->dbName
 	collectionInfo, err := broker.DescribeCollection(ctx, collectionID)
 	if err != nil {
 		return nil, 0, err
 	}
 
-	dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
+	replicaNum, err := common.CollectionLevelReplicaNumber(collectionInfo.GetProperties())
 	if err != nil {
-		return nil, 0, err
+		log.Warn("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
+	} else if replicaNum > 0 {
+		log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
 	}
-	replicaNum, err := common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
+
+	rgs, err := common.CollectionLevelResourceGroups(collectionInfo.GetProperties())
 	if err != nil {
-		return nil, 0, err
+		log.Warn("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
+	} else if len(rgs) > 0 {
+		log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
 	}
-	rgs, err := common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
-	if err != nil {
-		return nil, 0, err
+
+	if replicaNum <= 0 || len(rgs) == 0 {
+		dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
+		if err != nil {
+			return nil, 0, err
+		}
+
+		if replicaNum <= 0 {
+			replicaNum, err = common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
+			if err != nil {
+				log.Warn("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
+			} else if replicaNum > 0 {
+				log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
+			}
+		}
+
+		if len(rgs) == 0 {
+			rgs, err = common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
+			if err != nil {
+				log.Warn("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
+			} else if len(rgs) > 0 {
+				log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
+			}
+		}
 	}
 
 	return rgs, replicaNum, nil
diff --git a/internal/querycoordv2/meta/coordinator_broker_test.go b/internal/querycoordv2/meta/coordinator_broker_test.go
index 778268f7ce66..dbecfc20a26c 100644
--- a/internal/querycoordv2/meta/coordinator_broker_test.go
+++ b/internal/querycoordv2/meta/coordinator_broker_test.go
@@ -572,7 +572,7 @@ func (s *CoordinatorBrokerRootCoordSuite) TestGetCollectionLoadInfo() {
 			Properties: []*commonpb.KeyValuePair{},
 		}, nil)
 		_, _, err := s.broker.GetCollectionLoadInfo(ctx, 1)
-		s.Error(err)
+		s.NoError(err)
 		s.resetMock()
 	})
 }
diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go
index 75dcaf8f716c..6b3f4c43d153 100644
--- a/internal/querycoordv2/services.go
+++ b/internal/querycoordv2/services.go
@@ -217,20 +217,18 @@ func (s *Server) LoadCollection(ctx context.Context, req *querypb.LoadCollection
 	}
 
 	if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
-		// when replica number or resource groups is not set, use database level config
+		// when replica number or resource groups is not set, use pre-defined load config
 		rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
 		if err != nil {
-			log.Warn("failed to get data base level load info", zap.Error(err))
-		}
-
-		if req.GetReplicaNumber() <= 0 {
-			log.Info("load collection use database level replica number", zap.Int64("databaseLevelReplicaNum", replicas))
-			req.ReplicaNumber = int32(replicas)
-		}
+			log.Warn("failed to get pre-defined load info", zap.Error(err))
+		} else {
+			if req.GetReplicaNumber() <= 0 && replicas > 0 {
+				req.ReplicaNumber = int32(replicas)
+			}
 
-		if len(req.GetResourceGroups()) == 0 {
-			log.Info("load collection use database level resource groups", zap.Strings("databaseLevelResourceGroups", rgs))
-			req.ResourceGroups = rgs
+			if len(req.GetResourceGroups()) == 0 && len(rgs) > 0 {
+				req.ResourceGroups = rgs
+			}
 		}
 	}
 
diff --git a/pkg/common/common.go b/pkg/common/common.go
index b0fbe73043ad..bd5b600b3ade 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -153,6 +153,10 @@ const (
 	DatabaseDiskQuotaKey        = "database.diskQuota.mb"
 	DatabaseMaxCollectionsKey   = "database.max.collections"
 	DatabaseForceDenyWritingKey = "database.force.deny.writing"
+
+	// collection level load properties
+	CollectionReplicaNumber  = "collection.replica.number"
+	CollectionResourceGroups = "collection.resource_groups"
 )
 
 // common properties
@@ -259,3 +263,44 @@ func DatabaseLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error)
 
 	return nil, fmt.Errorf("database property not found: %s", DatabaseResourceGroups)
 }
+
+// CollectionLevelReplicaNumber returns the replica number stored under the
+// CollectionReplicaNumber key in kvs. It returns an error when the key is
+// absent or its value is not a base-10 integer.
+func CollectionLevelReplicaNumber(kvs []*commonpb.KeyValuePair) (int64, error) {
+	for _, kv := range kvs {
+		if kv.Key == CollectionReplicaNumber {
+			replicaNum, err := strconv.ParseInt(kv.Value, 10, 64)
+			if err != nil {
+				return 0, fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)
+			}
+
+			return replicaNum, nil
+		}
+	}
+
+	return 0, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber)
+}
+
+// CollectionLevelResourceGroups returns the resource groups stored under the
+// CollectionResourceGroups key in kvs, split on commas. It returns an error
+// when the key is absent or its value is empty.
+func CollectionLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error) {
+	for _, kv := range kvs {
+		if kv.Key == CollectionResourceGroups {
+			invalidPropValue := fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)
+			if len(kv.Value) == 0 {
+				return nil, invalidPropValue
+			}
+
+			rgs := strings.Split(kv.Value, ",")
+			if len(rgs) == 0 {
+				return nil, invalidPropValue
+			}
+
+			return rgs, nil
+		}
+	}
+
+	return nil, fmt.Errorf("collection property not found: %s", CollectionResourceGroups)
+}
diff --git a/tests/integration/replicas/load/load_test.go b/tests/integration/replicas/load/load_test.go
index 837a634c5379..89c8c9fbf488 100644
--- a/tests/integration/replicas/load/load_test.go
+++ b/tests/integration/replicas/load/load_test.go
@@ -54,29 +54,29 @@ func (s *LoadTestSuite) SetupSuite() {
 	s.Require().NoError(s.SetupEmbedEtcd())
 }
 
-func (s *LoadTestSuite) loadCollection(collectionName string, replica int, rgs []string) {
+func (s *LoadTestSuite) loadCollection(collectionName string, db string, replica int, rgs []string) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	// load
 	loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
-		DbName:         dbName,
+		DbName:         db,
 		CollectionName: collectionName,
 		ReplicaNumber:  int32(replica),
 		ResourceGroups: rgs,
 	})
 	s.NoError(err)
 	s.True(merr.Ok(loadStatus))
-	s.WaitForLoad(ctx, collectionName)
+	s.WaitForLoadWithDB(ctx, db, collectionName)
 }
 
-func (s *LoadTestSuite) releaseCollection(collectionName string) {
+func (s *LoadTestSuite) releaseCollection(db, collectionName string) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
 	// load
 	status, err := s.Cluster.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
-		DbName:         dbName,
+		DbName:         db,
 		CollectionName: collectionName,
 	})
 	s.NoError(err)
@@ -171,7 +171,7 @@ func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() {
 	s.Len(resp1.GetProperties(), 2)
 
 	// load collection without specified replica and rgs
-	s.loadCollection(collectionName, 0, nil)
+	s.loadCollection(collectionName, dbName, 0, nil)
 	resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
 		DbName:         dbName,
 		CollectionName: collectionName,
@@ -179,7 +179,180 @@ func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() {
 	s.NoError(err)
 	s.True(merr.Ok(resp2.Status))
 	s.Len(resp2.GetReplicas(), 3)
-	s.releaseCollection(collectionName)
+	s.releaseCollection(dbName, collectionName)
+}
+
+func (s *LoadTestSuite) TestLoadWithPredefineCollectionLevelConfig() {
+	ctx := context.Background()
+
+	// prepare resource groups
+	rgNum := 3
+	rgs := make([]string, 0)
+	for i := 0; i < rgNum; i++ {
+		rgs = append(rgs, fmt.Sprintf("rg_%d", i))
+		s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
+			ResourceGroup: rgs[i],
+			Config: &rgpb.ResourceGroupConfig{
+				Requests: &rgpb.ResourceGroupLimit{
+					NodeNum: 1,
+				},
+				Limits: &rgpb.ResourceGroupLimit{
+					NodeNum: 1,
+				},
+
+				TransferFrom: []*rgpb.ResourceGroupTransfer{
+					{
+						ResourceGroup: meta.DefaultResourceGroupName,
+					},
+				},
+				TransferTo: []*rgpb.ResourceGroupTransfer{
+					{
+						ResourceGroup: meta.DefaultResourceGroupName,
+					},
+				},
+			},
+		})
+	}
+
+	resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
+	s.NoError(err)
+	s.True(merr.Ok(resp.GetStatus()))
+	s.Len(resp.GetResourceGroups(), rgNum+1)
+
+	for i := 1; i < rgNum; i++ {
+		s.Cluster.AddQueryNode()
+	}
+
+	s.Eventually(func() bool {
+		matchCounter := 0
+		for _, rg := range rgs {
+			resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
+				ResourceGroup: rg,
+			})
+			s.NoError(err)
+			s.True(merr.Ok(resp1.GetStatus()))
+			if len(resp1.ResourceGroup.Nodes) == 1 {
+				matchCounter += 1
+			}
+		}
+		return matchCounter == rgNum
+	}, 30*time.Second, time.Second)
+
+	s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
+		DBName:           dbName,
+		Dim:              dim,
+		CollectionName:   collectionName,
+		ChannelNum:       1,
+		SegmentNum:       3,
+		RowNumPerSegment: 2000,
+		ReplicaNumber:    3,
+		ResourceGroups:   rgs,
+	})
+
+	// load collection without specified replica and rgs
+	s.loadCollection(collectionName, dbName, 0, nil)
+	resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
+		DbName:         dbName,
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	s.True(merr.Ok(resp2.Status))
+	s.Len(resp2.GetReplicas(), 3)
+	s.releaseCollection(dbName, collectionName)
+}
+
+func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() {
+	ctx := context.Background()
+
+	// prepare resource groups
+	rgNum := 3
+	rgs := make([]string, 0)
+	for i := 0; i < rgNum; i++ {
+		rgs = append(rgs, fmt.Sprintf("rg_%d", i))
+		s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
+			ResourceGroup: rgs[i],
+			Config: &rgpb.ResourceGroupConfig{
+				Requests: &rgpb.ResourceGroupLimit{
+					NodeNum: 1,
+				},
+				Limits: &rgpb.ResourceGroupLimit{
+					NodeNum: 1,
+				},
+
+				TransferFrom: []*rgpb.ResourceGroupTransfer{
+					{
+						ResourceGroup: meta.DefaultResourceGroupName,
+					},
+				},
+				TransferTo: []*rgpb.ResourceGroupTransfer{
+					{
+						ResourceGroup: meta.DefaultResourceGroupName,
+					},
+				},
+			},
+		})
+	}
+
+	resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
+	s.NoError(err)
+	s.True(merr.Ok(resp.GetStatus()))
+	s.Len(resp.GetResourceGroups(), rgNum+1)
+
+	for i := 1; i < rgNum; i++ {
+		s.Cluster.AddQueryNode()
+	}
+
+	s.Eventually(func() bool {
+		matchCounter := 0
+		for _, rg := range rgs {
+			resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
+				ResourceGroup: rg,
+			})
+			s.NoError(err)
+			s.True(merr.Ok(resp1.GetStatus()))
+			if len(resp1.ResourceGroup.Nodes) == 1 {
+				matchCounter += 1
+			}
+		}
+		return matchCounter == rgNum
+	}, 30*time.Second, time.Second)
+
+	newDbName := "db_load_test_with_db_level_config"
+	resp1, err := s.Cluster.Proxy.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{
+		DbName: newDbName,
+		Properties: []*commonpb.KeyValuePair{
+			{
+				Key:   common.DatabaseReplicaNumber,
+				Value: "3",
+			},
+			{
+				Key:   common.DatabaseResourceGroups,
+				Value: strings.Join(rgs, ","),
+			},
+		},
+	})
+	s.NoError(err)
+	s.True(merr.Ok(resp1))
+
+	s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
+		DBName:           newDbName,
+		Dim:              dim,
+		CollectionName:   collectionName,
+		ChannelNum:       1,
+		SegmentNum:       3,
+		RowNumPerSegment: 2000,
+	})
+
+	// load collection without specified replica and rgs
+	s.loadCollection(collectionName, newDbName, 0, nil)
+	resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
+		DbName:         newDbName,
+		CollectionName: collectionName,
+	})
+	s.NoError(err)
+	s.True(merr.Ok(resp2.Status))
+	s.Len(resp2.GetReplicas(), 3)
+	s.releaseCollection(newDbName, collectionName)
 }
 
 func TestReplicas(t *testing.T) {
diff --git a/tests/integration/util_collection.go b/tests/integration/util_collection.go
index 5f22f78099eb..bd8fdc0db2fc 100644
--- a/tests/integration/util_collection.go
+++ b/tests/integration/util_collection.go
@@ -2,12 +2,16 @@ package integration
 
 import (
 	"context"
+	"strconv"
+	"strings"
 
 	"github.com/golang/protobuf/proto"
 	"go.uber.org/zap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
+	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/log"
 	"github.com/milvus-io/milvus/pkg/util/merr"
 	"github.com/milvus-io/milvus/pkg/util/metric"
@@ -20,6 +24,8 @@ type CreateCollectionConfig struct {
 	SegmentNum       int
 	RowNumPerSegment int
 	Dim              int
+	ReplicaNumber    int32
+	ResourceGroups   []string
 }
 
 func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context, cfg *CreateCollectionConfig) {
@@ -33,12 +39,22 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context
 		CollectionName: cfg.CollectionName,
 		Schema:         marshaledSchema,
 		ShardsNum:      int32(cfg.ChannelNum),
+		Properties: []*commonpb.KeyValuePair{
+			{
+				Key:   common.CollectionReplicaNumber,
+				Value: strconv.FormatInt(int64(cfg.ReplicaNumber), 10),
+			},
+			{
+				Key:   common.CollectionResourceGroups,
+				Value: strings.Join(cfg.ResourceGroups, ","),
+			},
+		},
 	})
 	s.NoError(err)
 	s.True(merr.Ok(createCollectionStatus))
 	log.Info("CreateCollection result", zap.Any("createCollectionStatus", createCollectionStatus))
 
-	showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{})
+	showCollectionsResp, err := s.Cluster.Proxy.ShowCollections(ctx, &milvuspb.ShowCollectionsRequest{DbName: cfg.DBName})
 	s.NoError(err)
 	s.True(merr.Ok(showCollectionsResp.Status))
 	log.Info("ShowCollections result", zap.Any("showCollectionsResp", showCollectionsResp))
@@ -80,5 +96,5 @@ func (s *MiniClusterSuite) CreateCollectionWithConfiguration(ctx context.Context
 	})
 	s.NoError(err)
 	s.True(merr.Ok(createIndexStatus))
-	s.WaitForIndexBuilt(ctx, cfg.CollectionName, FloatVecField)
+	s.WaitForIndexBuiltWithDB(ctx, cfg.DBName, cfg.CollectionName, FloatVecField)
 }