Skip to content

Commit

Permalink
Fix drop partition failure and wrong max task num (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
SimFG committed Apr 12, 2023
1 parent 2cc4ff0 commit bd19d94
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
4 changes: 3 additions & 1 deletion core/reader/milvus_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,14 +521,16 @@ func (reader *MilvusCollectionReader) readMsg(collectionName string, collectionI
if reader.filterMsgType(msgType) {
continue
}
// TODO fubang dropPartition no collection id
if reader.filterMsg(collectionName, collectionID, msg) {
continue
}
data := &model.CDCData{
Msg: msg,
}
if barrierManager.IsBarrierData(data) {
if dropPartitionMsg, ok := msg.(*msgstream.DropPartitionMsg); ok {
dropPartitionMsg.CollectionName = collectionName
}
barrierManager.AddData(vchannelName, data)
if _, ok := msg.(*msgstream.DropCollectionMsg); ok {
return
Expand Down
2 changes: 1 addition & 1 deletion server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
if err != nil {
return nil, NewServerError(errors.WithMessage(err, "fail to get task list to check num"))
}
if getResp.Count > int64(e.config.MaxTaskNum) {
if getResp.Count >= int64(e.config.MaxTaskNum) {
return nil, NewServerError(errors.Newf("the task num has reach the limit, %d", e.config.MaxTaskNum))
}

Expand Down

0 comments on commit bd19d94

Please sign in to comment.