Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: Enable setting the replica number and resource group during collection creation #34403

Merged
merged 3 commits into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -607,8 +607,6 @@ github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93 h1:xnIeuG1nuTEH
github.com/milvus-io/blobloom v0.0.0-20240603110411-471ae49f3b93/go.mod h1:mjMJ1hh1wjGVfr93QIHJ6FfDNVrA0IELv8OvMHJxHKs=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b h1:TfeY0NxYxZzUfIfYe5qYDBzt4ZYRqzUjTR6CvUzjat8=
github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b/go.mod h1:iwW+9cWfIzzDseEBCCeDSN5SD16Tidvy8cwQ7ZY8Qj4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016 h1:8WV4maXLeGEyJCCYIc1DmZ18H+VFAjMrwXJg5iI2nX4=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240430035521-259ae1d10016/go.mod h1:1OIl0v5PQeNxIJhCvY+K55CBUOYDZevw9g9380u1Wek=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53 h1:hLeTFOV/IXUoTbm4slVWFSnR296yALJ8Zo+YCMEvAy0=
github.com/milvus-io/milvus-proto/go-api/v2 v2.3.4-0.20240708102203-5e0455265c53/go.mod h1:/6UT4zZl6awVeXLeE7UGDWZvXj3IWkRsh3mqsn0DiAs=
github.com/milvus-io/milvus-storage/go v0.0.0-20231227072638-ebd0b8e56d70 h1:Z+sp64fmAOxAG7mU0dfVOXvAXlwRB0c8a96rIM5HevI=
Expand Down
41 changes: 33 additions & 8 deletions internal/querycoordv2/meta/coordinator_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,23 +107,48 @@

// try to get database level replica_num and resource groups, return (resource_groups, replica_num, error)
func (broker *CoordinatorBroker) GetCollectionLoadInfo(ctx context.Context, collectionID UniqueID) ([]string, int64, error) {
// to do by weiliu1031: querycoord should cache mappings: collectionID->dbName
collectionInfo, err := broker.DescribeCollection(ctx, collectionID)
if err != nil {
return nil, 0, err
}

dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
replicaNum, err := common.CollectionLevelReplicaNumber(collectionInfo.GetProperties())
if err != nil {
return nil, 0, err
log.Warn("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if replicaNum > 0 {
log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
replicaNum, err := common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())

rgs, err := common.CollectionLevelResourceGroups(collectionInfo.GetProperties())
if err != nil {
return nil, 0, err
log.Warn("failed to get collection level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if len(rgs) > 0 {
log.Info("get collection level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
rgs, err := common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if err != nil {
return nil, 0, err

if replicaNum <= 0 || len(rgs) == 0 {
dbInfo, err := broker.DescribeDatabase(ctx, collectionInfo.GetDbName())
if err != nil {
return nil, 0, err

Check warning on line 132 in internal/querycoordv2/meta/coordinator_broker.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/meta/coordinator_broker.go#L132

Added line #L132 was not covered by tests
}

if replicaNum <= 0 {
replicaNum, err = common.DatabaseLevelReplicaNumber(dbInfo.GetProperties())
if err != nil {
log.Warn("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if replicaNum > 0 {
log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Int64("replica_num", replicaNum))
}
}

if len(rgs) == 0 {
rgs, err = common.DatabaseLevelResourceGroups(dbInfo.GetProperties())
if err != nil {
log.Warn("failed to get database level load info", zap.Int64("collectionID", collectionID), zap.Error(err))
} else if len(rgs) > 0 {
log.Info("get database level load info", zap.Int64("collectionID", collectionID), zap.Strings("resource_groups", rgs))
}
}
}

return rgs, replicaNum, nil
Expand Down
2 changes: 1 addition & 1 deletion internal/querycoordv2/meta/coordinator_broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (s *CoordinatorBrokerRootCoordSuite) TestGetCollectionLoadInfo() {
Properties: []*commonpb.KeyValuePair{},
}, nil)
_, _, err := s.broker.GetCollectionLoadInfo(ctx, 1)
s.Error(err)
s.NoError(err)
s.resetMock()
})
}
Expand Down
20 changes: 9 additions & 11 deletions internal/querycoordv2/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,20 +217,18 @@
}

if req.GetReplicaNumber() <= 0 || len(req.GetResourceGroups()) == 0 {
// when replica number or resource groups is not set, use database level config
// when replica number or resource groups is not set, use pre-defined load config
rgs, replicas, err := s.broker.GetCollectionLoadInfo(ctx, req.GetCollectionID())
if err != nil {
log.Warn("failed to get data base level load info", zap.Error(err))
}

if req.GetReplicaNumber() <= 0 {
log.Info("load collection use database level replica number", zap.Int64("databaseLevelReplicaNum", replicas))
req.ReplicaNumber = int32(replicas)
}
log.Warn("failed to get pre-defined load info", zap.Error(err))

Check warning on line 223 in internal/querycoordv2/services.go

View check run for this annotation

Codecov / codecov/patch

internal/querycoordv2/services.go#L223

Added line #L223 was not covered by tests
} else {
if req.GetReplicaNumber() <= 0 && replicas > 0 {
req.ReplicaNumber = int32(replicas)
}

if len(req.GetResourceGroups()) == 0 {
log.Info("load collection use database level resource groups", zap.Strings("databaseLevelResourceGroups", rgs))
req.ResourceGroups = rgs
if len(req.GetResourceGroups()) == 0 && len(rgs) > 0 {
req.ResourceGroups = rgs
}
}
}

Expand Down
39 changes: 39 additions & 0 deletions pkg/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@
DatabaseDiskQuotaKey = "database.diskQuota.mb"
DatabaseMaxCollectionsKey = "database.max.collections"
DatabaseForceDenyWritingKey = "database.force.deny.writing"

// collection level load properties
CollectionReplicaNumber = "collection.replica.number"
CollectionResourceGroups = "collection.resource_groups"
weiliu1031 marked this conversation as resolved.
Show resolved Hide resolved
)

// common properties
Expand Down Expand Up @@ -259,3 +263,38 @@

return nil, fmt.Errorf("database property not found: %s", DatabaseResourceGroups)
}

func CollectionLevelReplicaNumber(kvs []*commonpb.KeyValuePair) (int64, error) {
weiliu1031 marked this conversation as resolved.
Show resolved Hide resolved
for _, kv := range kvs {
if kv.Key == CollectionReplicaNumber {
replicaNum, err := strconv.ParseInt(kv.Value, 10, 64)
if err != nil {
return 0, fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)

Check warning on line 272 in pkg/common/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/common.go#L272

Added line #L272 was not covered by tests
}

return replicaNum, nil
}
}

return 0, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber)
}

func CollectionLevelResourceGroups(kvs []*commonpb.KeyValuePair) ([]string, error) {
weiliu1031 marked this conversation as resolved.
Show resolved Hide resolved
for _, kv := range kvs {
if kv.Key == CollectionResourceGroups {
invalidPropValue := fmt.Errorf("invalid collection property: [key=%s] [value=%s]", kv.Key, kv.Value)
if len(kv.Value) == 0 {
return nil, invalidPropValue
}

rgs := strings.Split(kv.Value, ",")
if len(rgs) == 0 {
return nil, invalidPropValue

Check warning on line 292 in pkg/common/common.go

View check run for this annotation

Codecov / codecov/patch

pkg/common/common.go#L292

Added line #L292 was not covered by tests
}

return rgs, nil
}
}

return nil, fmt.Errorf("collection property not found: %s", CollectionReplicaNumber)
}
187 changes: 180 additions & 7 deletions tests/integration/replicas/load/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,29 +54,29 @@ func (s *LoadTestSuite) SetupSuite() {
s.Require().NoError(s.SetupEmbedEtcd())
}

func (s *LoadTestSuite) loadCollection(collectionName string, replica int, rgs []string) {
func (s *LoadTestSuite) loadCollection(collectionName string, db string, replica int, rgs []string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// load
loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
DbName: db,
CollectionName: collectionName,
ReplicaNumber: int32(replica),
ResourceGroups: rgs,
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.WaitForLoad(ctx, collectionName)
s.WaitForLoadWithDB(ctx, db, collectionName)
}

func (s *LoadTestSuite) releaseCollection(collectionName string) {
func (s *LoadTestSuite) releaseCollection(db, collectionName string) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// load
status, err := s.Cluster.Proxy.ReleaseCollection(ctx, &milvuspb.ReleaseCollectionRequest{
DbName: dbName,
DbName: db,
CollectionName: collectionName,
})
s.NoError(err)
Expand Down Expand Up @@ -171,15 +171,188 @@ func (s *LoadTestSuite) TestLoadWithDatabaseLevelConfig() {
s.Len(resp1.GetProperties(), 2)

// load collection without specified replica and rgs
s.loadCollection(collectionName, 0, nil)
s.loadCollection(collectionName, dbName, 0, nil)
resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.NoError(err)
s.True(merr.Ok(resp2.Status))
s.Len(resp2.GetReplicas(), 3)
s.releaseCollection(collectionName)
s.releaseCollection(dbName, collectionName)
}

func (s *LoadTestSuite) TestLoadWithPredefineCollectionLevelConfig() {
ctx := context.Background()

// prepare resource groups
rgNum := 3
rgs := make([]string, 0)
for i := 0; i < rgNum; i++ {
rgs = append(rgs, fmt.Sprintf("rg_%d", i))
s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgs[i],
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},

TransferFrom: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
TransferTo: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
},
})
}

resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.Len(resp.GetResourceGroups(), rgNum+1)

for i := 1; i < rgNum; i++ {
s.Cluster.AddQueryNode()
}

s.Eventually(func() bool {
matchCounter := 0
for _, rg := range rgs {
resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
ResourceGroup: rg,
})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
if len(resp1.ResourceGroup.Nodes) == 1 {
matchCounter += 1
}
}
return matchCounter == rgNum
}, 30*time.Second, time.Second)

s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
DBName: dbName,
Dim: dim,
CollectionName: collectionName,
ChannelNum: 1,
SegmentNum: 3,
RowNumPerSegment: 2000,
ReplicaNumber: 3,
ResourceGroups: rgs,
})

// load collection without specified replica and rgs
s.loadCollection(collectionName, dbName, 0, nil)
resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName,
CollectionName: collectionName,
})
s.NoError(err)
s.True(merr.Ok(resp2.Status))
s.Len(resp2.GetReplicas(), 3)
s.releaseCollection(dbName, collectionName)
}

func (s *LoadTestSuite) TestLoadWithPredefineDatabaseLevelConfig() {
ctx := context.Background()

// prepare resource groups
rgNum := 3
rgs := make([]string, 0)
for i := 0; i < rgNum; i++ {
rgs = append(rgs, fmt.Sprintf("rg_%d", i))
s.Cluster.QueryCoord.CreateResourceGroup(ctx, &milvuspb.CreateResourceGroupRequest{
ResourceGroup: rgs[i],
Config: &rgpb.ResourceGroupConfig{
Requests: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},
Limits: &rgpb.ResourceGroupLimit{
NodeNum: 1,
},

TransferFrom: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
TransferTo: []*rgpb.ResourceGroupTransfer{
{
ResourceGroup: meta.DefaultResourceGroupName,
},
},
},
})
}

resp, err := s.Cluster.QueryCoord.ListResourceGroups(ctx, &milvuspb.ListResourceGroupsRequest{})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
s.Len(resp.GetResourceGroups(), rgNum+1)

for i := 1; i < rgNum; i++ {
s.Cluster.AddQueryNode()
}

s.Eventually(func() bool {
matchCounter := 0
for _, rg := range rgs {
resp1, err := s.Cluster.QueryCoord.DescribeResourceGroup(ctx, &querypb.DescribeResourceGroupRequest{
ResourceGroup: rg,
})
s.NoError(err)
s.True(merr.Ok(resp.GetStatus()))
if len(resp1.ResourceGroup.Nodes) == 1 {
matchCounter += 1
}
}
return matchCounter == rgNum
}, 30*time.Second, time.Second)

newDbName := "db_load_test_with_db_level_config"
resp1, err := s.Cluster.Proxy.CreateDatabase(ctx, &milvuspb.CreateDatabaseRequest{
DbName: newDbName,
Properties: []*commonpb.KeyValuePair{
{
Key: common.DatabaseReplicaNumber,
Value: "3",
},
{
Key: common.DatabaseResourceGroups,
Value: strings.Join(rgs, ","),
},
},
})
s.NoError(err)
s.True(merr.Ok(resp1))

s.CreateCollectionWithConfiguration(ctx, &integration.CreateCollectionConfig{
DBName: newDbName,
Dim: dim,
CollectionName: collectionName,
ChannelNum: 1,
SegmentNum: 3,
RowNumPerSegment: 2000,
})

// load collection without specified replica and rgs
s.loadCollection(collectionName, newDbName, 0, nil)
resp2, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: newDbName,
CollectionName: collectionName,
})
s.NoError(err)
s.True(merr.Ok(resp2.Status))
s.Len(resp2.GetReplicas(), 3)
s.releaseCollection(newDbName, collectionName)
}

func TestReplicas(t *testing.T) {
Expand Down
Loading
Loading