Skip to content

Commit

Permalink
Add the task num limit
Browse files Browse the repository at this point in the history
  • Loading branch information
SimFG committed Mar 25, 2023
1 parent 457e216 commit 365f83d
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 7 deletions.
8 changes: 8 additions & 0 deletions server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
e.collectionNames.data[milvusAddress] = lo.Without(e.collectionNames.data[milvusAddress], newCollectionNames...)
}

getResp, err := util.EtcdGet(e.etcdCli, getTaskInfoPrefix(e.rootPath), clientv3.WithPrefix(), clientv3.WithCountOnly())
if err != nil {
return nil, NewServerError(errors.WithMessage(err, "fail to get task list to check num"))
}
if getResp.Count > int64(e.config.MaxTaskNum) {
return nil, NewServerError(errors.WithMessagef(err, "the task num has reach the limit, %d", e.config.MaxTaskNum))
}

info := &meta.TaskInfo{
TaskID: e.getUuid(),
MilvusConnectParam: req.MilvusConnectParam,
Expand Down
3 changes: 2 additions & 1 deletion server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package server
import "github.com/zilliztech/milvus-cdc/core/config"

type CDCServerConfig struct {
Address string // like: "localhost:8080"
Address string // like: "localhost:8080"
MaxTaskNum int
EtcdConfig CDCEtcdConfig // cdc meta data save
SourceConfig MilvusSourceConfig // cdc source
MaxNameLength int
Expand Down
2 changes: 2 additions & 0 deletions server/configs/cdc.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
---
address: localhost
port: 8444
task.maxNum: 100
name:maxLength: 256
etcd.endpoints: [localhost:2379] # etcd endpoints
etcd.rootpath: cdc
source:
Expand Down
16 changes: 10 additions & 6 deletions server/main/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@ import (
)

type config struct {
Address string
Port int
Endpoints []string `yaml:"etcd.endpoints"`
RootPath string `yaml:"etcd.rootpath"`
Source struct {
Address string
Port int
Endpoints []string `yaml:"etcd.endpoints"`
RootPath string `yaml:"etcd.rootpath"`
MaxTaskNum int `yaml:"task.maxNum"`
MaxNameLength int `yaml:"name.maxLength"`
Source struct {
Endpoints []string `yaml:"etcd.endpoints"`
RootPath string `yaml:"etcd.rootpath"`
MetaPath string `yaml:"etcd.meta.path"`
Expand Down Expand Up @@ -81,7 +83,9 @@ func main() {
}

s.Run(&server.CDCServerConfig{
Address: fmt.Sprintf("%s:%d", conf.Address, conf.Port),
Address: fmt.Sprintf("%s:%d", conf.Address, conf.Port),
MaxTaskNum: conf.MaxTaskNum,
MaxNameLength: conf.MaxNameLength,
EtcdConfig: server.CDCEtcdConfig{
Endpoints: conf.Endpoints,
RootPath: conf.RootPath,
Expand Down

0 comments on commit 365f83d

Please sign in to comment.