[Bug]: Collection in downstream is not dropped within 120s after it is dropped in upstream #3

Closed
zhuwenxing opened this issue Apr 7, 2023 · 3 comments · Fixed by #9 or #12

Comments

@zhuwenxing
Collaborator

### Current Behavior

pytest_log.log

### Expected Behavior

No response

### Steps To Reproduce

# imports assumed for this snippet; `prefix`, `client`, `log`, and
# InsertEntitiesCollectionChecker come from the milvus-cdc test framework
import time
from datetime import datetime
from pymilvus import connections, Collection, list_collections

def test_milvus_cdc_collection(self, upstream_host, upstream_port, downstream_host, downstream_port):
        """
        target: test cdc default
        method: create task with default params
        expected: create successfully
        """
        collection_name = prefix + datetime.now().strftime("%Y_%m_%d_%H_%M_%S_%f")
        request_data = {
            "milvus_connect_param": {
                "host": downstream_host,
                "port": int(downstream_port),
                "username": "",
                "password": "",
                "enable_tls": False,
                "ignore_partition": False,
                "connect_timeout": 10
            },
            "collection_infos": [
                {
                    "name": collection_name
                }
            ]
        }
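        # NOTE: client.create_task presumably sends this payload to the CDC
        # server's task-creation endpoint; the wrapper itself is part of the
        # test framework and not shown here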
        # create a cdc task
        rsp, result = client.create_task(request_data)
        assert result
        log.info(f"create task response: {rsp}")
        task_id = rsp['task_id']
        # get the cdc task
        rsp, result = client.get_task(task_id)
        assert result
        log.info(f"get task {task_id} response: {rsp}")
        # create a collection in the upstream and insert entities into it
        connections.connect(host=upstream_host, port=upstream_port)
        checker = InsertEntitiesCollectionChecker(host=upstream_host, port=upstream_port, c_name=collection_name)
        checker.run()
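        # the checker keeps inserting in the background; wait so the CDC task
        # has time to replicate the collection downstream (this 120s window is
        # the same one the issue title refers to)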
        time.sleep(120)
        all_collections = list_collections()
        # pause the insert task
        log.info(f"start to pause the insert task")
        checker.pause()
        log.info(f"pause the insert task successfully")
        # check the collection in upstream
        num_entities_upstream = checker.get_num_entities()
        log.info(f"num_entities_upstream: {num_entities_upstream}")
        count_by_query_upstream = checker.get_count_by_query()
        log.info(f"count_by_query_upstream: {count_by_query_upstream}")
        # check the collection in downstream
        connections.disconnect("default")
        log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
        connections.connect(host=downstream_host, port=downstream_port)
        all_collections = list_collections()
        collection = Collection(name=collection_name)
        collection.create_index(field_name="float_vector", index_params={"index_type": "IVF_FLAT", "metric_type": "L2", "params": {"nlist": 128}})
        collection.load()
        # wait for the collection to be synced
        timeout = 120
        t0 = time.time()
        while time.time() - t0 < timeout:
            count_by_query_downstream = len(collection.query(expr=checker.query_expr, output_fields=checker.output_fields))
            if count_by_query_downstream == count_by_query_upstream:
                break
            time.sleep(1)
            if time.time() - t0 > timeout:
                raise Exception(f"Timeout waiting for collection {collection_name} to be synced")
        log.info(f"count_by_query_downstream: {count_by_query_downstream}")
        assert count_by_query_upstream == count_by_query_downstream
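        # a query-based count only sees entities visible to search, while
        # num_entities is derived from flushed-segment stats, which is
        # presumably why the test flushes before the second comparison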
        # wait for the collection to be flushed
        time.sleep(20)
        collection.flush()
        num_entities_downstream = collection.num_entities
        log.info(f"num_entities_downstream: {num_entities_downstream}")
        assert num_entities_upstream == num_entities_downstream, f"num_entities_upstream {num_entities_upstream} != num_entities_downstream {num_entities_downstream}"

        # delete the entities in upstream
        connections.disconnect("default")
        log.info(f"start to connect to upstream {upstream_host} {upstream_port}")
        connections.connect(host=upstream_host, port=upstream_port)
        log.info(f"start to delete the entities in upstream")
        delete_expr = f"int64 in {[i for i in range(0, 3000)]}"
        checker.collection.delete(delete_expr)
        t0 = time.time()  # reset the timer; the previous loop's t0 would otherwise be reused
        while time.time() - t0 < timeout:
            res = checker.collection.query(expr=delete_expr, output_fields=checker.output_fields)
            if len(res) == 0:
                break
            else:
                log.info(f"res: {len(res)}")
            time.sleep(1)
            if time.time() - t0 > timeout:
                raise Exception(f"Timeout waiting for delete entities in upstream")
        log.info(f"res: {res}")
        count_by_query_upstream = len(res)
        assert count_by_query_upstream == 0
        log.info(f"delete the entities in upstream successfully")
        # check the collection in downstream
        connections.disconnect("default")
        log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
        connections.connect(host=downstream_host, port=downstream_port)
        collection = Collection(name=collection_name)
        collection.load()
        # wait for the collection to be synced
        timeout = 120
        t0 = time.time()
        while time.time() - t0 < timeout:
            count_by_query_downstream = len(collection.query(expr=delete_expr, output_fields=checker.output_fields))
            if count_by_query_downstream == count_by_query_upstream:
                log.info(f"cost time: {time.time() - t0} to sync the delete entities")
                break
            else:
                log.info(f"count_by_query_downstream: {count_by_query_downstream}")
            time.sleep(1)
            if time.time() - t0 > timeout:
                raise Exception(f"Timeout waiting for collection {collection_name} to be synced")
        log.info(f"count_by_query_downstream: {count_by_query_downstream}")
        assert count_by_query_upstream == count_by_query_downstream

        # drop the collection in upstream
        connections.disconnect("default")
        log.info(f"start to connect to upstream {upstream_host} {upstream_port}")
        connections.connect(host=upstream_host, port=upstream_port)
        log.info(f"start to drop the collection in upstream")
        checker.collection.drop()
        t0 = time.time()
        while time.time() - t0 < timeout:
            if collection_name not in list_collections():
                break
            time.sleep(1)
            log.info(f"collection: {collection_name} still exists")
            if time.time() - t0 > timeout:
                log.error(f"Timeout waiting for collection {collection_name} to be dropped")
        log.info(f"drop the collection in upstream successfully")
        # check the collection in downstream
        connections.disconnect("default")
        log.info(f"start to connect to downstream {downstream_host} {downstream_port}")
        connections.connect(host=downstream_host, port=downstream_port)
        t0 = time.time()
        while time.time() - t0 < timeout:
            log.info(f"all collections in downstream: {list_collections()}")
            if collection_name not in list_collections():
                log.info(f"cost time: {time.time() - t0} to drop the collection")
                break
            time.sleep(1)
            log.info(f"collection: {collection_name} still exists")
            if time.time() - t0 > timeout:
                log.error(f"Timeout waiting for collection {collection_name} to be dropped")
        assert collection_name not in list_collections()
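
For reference, the four wait loops in the test all implement the same poll-until-timeout pattern. A minimal generic helper (an illustrative sketch, not part of the test suite) could replace them:

```python
import time

def wait_for(predicate, timeout=120.0, interval=1.0, desc="condition"):
    """Poll `predicate` until it returns True or `timeout` seconds elapse."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    raise TimeoutError(f"timed out after {timeout}s waiting for {desc}")
```

The failing check would then read `wait_for(lambda: collection_name not in list_collections(), desc="collection to be dropped downstream")`.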


### Environment

_No response_

### Anything else?

_No response_
@zhuwenxing
Collaborator Author

The same issue occurs when a partition is dropped.

@zhuwenxing
Collaborator Author

zhuwenxing commented Apr 10, 2023

Steps:

  1. create cdc task
  2. create a collection and insert entities in upstream
  3. check downstream
  4. delete entities in upstream
  5. check downstream
  6. drop collection in upstream
  7. check downstream

The test failed at step 7.
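
Step 7 boils down to the following check against the downstream (a minimal sketch using the pymilvus ORM; the address is a placeholder and the collection name is taken from the log below):

```python
import time
from pymilvus import connections, list_collections

connections.connect(host="downstream-host", port=19530)  # placeholder address
deadline = time.time() + 120  # the 120s window from the issue title
while time.time() < deadline:
    if "cdc_e2e_2023_04_10_10_40_19_330521" not in list_collections():
        break  # the drop was replicated downstream
    time.sleep(1)
else:
    print("collection still exists downstream after 120s")  # the observed failure
```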

cdc log:

❯ ./cdc
[2023/04/10 10:36:58.307 +08:00] [INFO] [server/server.go:46] ["start server..."]
[2023/04/10 10:40:20.361 +08:00] [INFO] [server/monitor.go:56] ["success to get a collection info"] [task_id=592858e5-9a2a-4be9-9851-1d77cbc64cf0] [id=440598435665473071] [name=cdc_e2e_2023_04_10_10_40_19_330521]
INFO[0202] Connecting to broker                          remote_addr="pulsar://10.101.93.105:6650"
INFO[0202] TCP connection established                    local_addr="172.16.20.40:50241" remote_addr="pulsar://10.101.93.105:6650"
INFO[0202] Connection is ready                           local_addr="172.16.20.40:50241" remote_addr="pulsar://10.101.93.105:6650"
INFO[0202] Connecting to broker                          remote_addr="pulsar://10.101.93.105:6650"
INFO[0202] TCP connection established                    local_addr="172.16.20.40:50242" remote_addr="pulsar://10.101.93.105:6650"
INFO[0202] Connection is ready                           local_addr="172.16.20.40:50242" remote_addr="pulsar://10.101.93.105:6650"
INFO[0202] Connected consumer                            consumerID=1 name=jiemq subscription="cdc-test-source-rootcoord-dml_220_440598435665473071v0" topic="persistent://public/default/cdc-test-source-rootcoord-dml_220"
INFO[0202] Created consumer                              consumerID=1 name=jiemq subscription="cdc-test-source-rootcoord-dml_220_440598435665473071v0" topic="persistent://public/default/cdc-test-source-rootcoord-dml_220"
[2023/04/10 10:40:20.577 +08:00] [INFO] [msgstream/mq_msgstream.go:168] ["Successfully create consumer"] [channel=cdc-test-source-rootcoord-dml_220] [subname=cdc-test-source-rootcoord-dml_220_440598435665473071v0]
[2023/04/10 10:40:20.580 +08:00] [INFO] [msgstream/mq_msgstream.go:432] ["MsgStream seek begin"] [channel=cdc-test-source-rootcoord-dml_220] [MessageID="CO6FAhCFfBgAIAA="]
INFO[0202] Broker notification of Closed consumer: 1     local_addr="172.16.20.40:50242" remote_addr="pulsar://10.101.93.105:6650"
INFO[0202] Reconnecting to broker in 108.975847ms        consumerID=1 name=jiemq subscription="cdc-test-source-rootcoord-dml_220_440598435665473071v0" topic="persistent://public/default/cdc-test-source-rootcoord-dml_220"
INFO[0203] Connected consumer                            consumerID=1 name=jiemq subscription="cdc-test-source-rootcoord-dml_220_440598435665473071v0" topic="persistent://public/default/cdc-test-source-rootcoord-dml_220"
INFO[0203] Reconnected consumer to broker                consumerID=1 name=jiemq subscription="cdc-test-source-rootcoord-dml_220_440598435665473071v0" topic="persistent://public/default/cdc-test-source-rootcoord-dml_220"
[2023/04/10 10:40:20.732 +08:00] [INFO] [msgstream/mq_msgstream.go:438] ["MsgStream seek finished"] [channel=cdc-test-source-rootcoord-dml_220]
INFO[0203] Connected consumer                            consumerID=2 name=yansf subscription="cdc-test-source-rootcoord-dml_221_440598435665473071v1" topic="persistent://public/default/cdc-test-source-rootcoord-dml_221"
INFO[0203] Created consumer                              consumerID=2 name=yansf subscription="cdc-test-source-rootcoord-dml_221_440598435665473071v1" topic="persistent://public/default/cdc-test-source-rootcoord-dml_221"
[2023/04/10 10:40:20.784 +08:00] [INFO] [msgstream/mq_msgstream.go:168] ["Successfully create consumer"] [channel=cdc-test-source-rootcoord-dml_221] [subname=cdc-test-source-rootcoord-dml_221_440598435665473071v1]
[2023/04/10 10:40:20.784 +08:00] [INFO] [msgstream/mq_msgstream.go:432] ["MsgStream seek begin"] [channel=cdc-test-source-rootcoord-dml_221] [MessageID="CO2FAhCGfBgAIAA="]
INFO[0203] Broker notification of Closed consumer: 2     local_addr="172.16.20.40:50242" remote_addr="pulsar://10.101.93.105:6650"
INFO[0203] Reconnecting to broker in 115.920411ms        consumerID=2 name=yansf subscription="cdc-test-source-rootcoord-dml_221_440598435665473071v1" topic="persistent://public/default/cdc-test-source-rootcoord-dml_221"
INFO[0203] Connected consumer                            consumerID=2 name=yansf subscription="cdc-test-source-rootcoord-dml_221_440598435665473071v1" topic="persistent://public/default/cdc-test-source-rootcoord-dml_221"
INFO[0203] Reconnected consumer to broker                consumerID=2 name=yansf subscription="cdc-test-source-rootcoord-dml_221_440598435665473071v1" topic="persistent://public/default/cdc-test-source-rootcoord-dml_221"
[2023/04/10 10:40:20.941 +08:00] [INFO] [msgstream/mq_msgstream.go:438] ["MsgStream seek finished"] [channel=cdc-test-source-rootcoord-dml_221]

ci_test_log.log

@zhuwenxing
Collaborator Author

Dropping a partition still fails.
