diff --git a/tests/base/client_base.py b/tests/base/client_base.py index 8ad48f9..5155a85 100644 --- a/tests/base/client_base.py +++ b/tests/base/client_base.py @@ -1,7 +1,20 @@ from pymilvus import connections +from api.milvus_cdc import MilvusCdcClient + +client = MilvusCdcClient('http://localhost:8444') class TestBase: + + def setup_method(self, method): + res, result = client.list_tasks() + # delete the tasks + for task in res["tasks"]: + task_id = task["task_id"] + rsp, result = client.delete_task(task_id) + assert result + assert rsp == {} + def teardown_method(self, method): if len(connections.list_connections()) > 0: for conn in connections.list_connections(): diff --git a/tests/testcases/test_cdc_create.py b/tests/testcases/test_cdc_create.py index e426dde..ff90b2e 100644 --- a/tests/testcases/test_cdc_create.py +++ b/tests/testcases/test_cdc_create.py @@ -312,19 +312,23 @@ def test_cdc_for_partition_insert_after_cdc_task(self, upstream_host, upstream_p assert num_entities_upstream == num_entities_downstream,\ f"num_entities_upstream {num_entities_upstream} != num_entities_downstream {num_entities_downstream}" - - @pytest.mark.skip(reason="https://github.com/zilliztech/milvus-cdc/issues/5") - def test_cdc_for_cdc_task_by_max(self, upstream_host, upstream_port, downstream_host, downstream_port): + def test_cdc_for_cdc_task_large_than_max_num(self, upstream_host, upstream_port, downstream_host, downstream_port): max_task = 100 + # delete the tasks + res, result = client.list_tasks() + for task in res["tasks"]: + task_id = task["task_id"] + rsp, result = client.delete_task(task_id) + log.info(f"delete task response: {rsp}") + assert result res, result = client.list_tasks() assert result log.info(f"list tasks response: {res}") num_tasks = len(res["tasks"]) log.info(f"num_tasks: {num_tasks}") assert num_tasks <= max_task - available_task = max_task - num_tasks - for i in range(available_task): + for i in range(available_task+3): time.sleep(0.01) c_name = prefix + datetime.now().strftime('%Y_%m_%d_%H_%M_%S_%f') c_infos = [ @@ -343,13 +347,32 @@ def test_cdc_for_cdc_task_by_max(self, upstream_host, upstream_port, downstream_ "collection_infos": c_infos } rsp, result = client.create_task(request_data) - assert result - log.info(f"create task response: {rsp}") - task_id = rsp['task_id'] - log.info(f"task_id: {task_id}") + if i < available_task: + assert result + log.info(f"create task response: {rsp}") + task_id = rsp['task_id'] + log.info(f"task_id: {task_id}") + else: + log.info(f"create task response: {rsp}") + # assert not result + # check the number of tasks res, result = client.list_tasks() assert result log.info(f"list tasks response: {res}") num_tasks = len(res["tasks"]) log.info(f"num_tasks: {num_tasks}") + assert num_tasks == max_task + # delete the tasks + for task in res["tasks"]: + task_id = task["task_id"] + rsp, result = client.delete_task(task_id) + log.info(f"delete task response: {rsp}") + assert result + + # check the number of tasks + res, result = client.list_tasks() + assert result + log.info(f"list tasks response: {res}") + num_tasks = len(res["tasks"]) + assert num_tasks == 0