diff --git a/core/reader/milvus_reader.go b/core/reader/milvus_reader.go index 7ae9178..185b51a 100644 --- a/core/reader/milvus_reader.go +++ b/core/reader/milvus_reader.go @@ -521,7 +521,6 @@ func (reader *MilvusCollectionReader) readMsg(collectionName string, collectionI if reader.filterMsgType(msgType) { continue } - // TODO fubang dropPartition no collection id if reader.filterMsg(collectionName, collectionID, msg) { continue } @@ -529,6 +528,9 @@ func (reader *MilvusCollectionReader) readMsg(collectionName string, collectionI Msg: msg, } if barrierManager.IsBarrierData(data) { + if dropPartitionMsg, ok := msg.(*msgstream.DropPartitionMsg); ok { + dropPartitionMsg.CollectionName = collectionName + } barrierManager.AddData(vchannelName, data) if _, ok := msg.(*msgstream.DropCollectionMsg); ok { return diff --git a/server/cdc_impl.go b/server/cdc_impl.go index 2ef9405..03567f8 100644 --- a/server/cdc_impl.go +++ b/server/cdc_impl.go @@ -183,7 +183,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon if err != nil { return nil, NewServerError(errors.WithMessage(err, "fail to get task list to check num")) } - if getResp.Count > int64(e.config.MaxTaskNum) { + if getResp.Count >= int64(e.config.MaxTaskNum) { return nil, NewServerError(errors.Newf("the task num has reach the limit, %d", e.config.MaxTaskNum)) }