From 8fd6b301b457856069d83f2b541cb05160551c26 Mon Sep 17 00:00:00 2001 From: martin bendsoe Date: Wed, 1 Jul 2020 14:37:17 +0200 Subject: [PATCH 1/2] can handle noop messages --- neo4j/io/_courier.py | 6 +++++ neo4j/packstream.py | 1 - tests/integration/conftest.py | 4 ++-- tests/stub/conftest.py | 13 ++++++----- .../v4x1/return_1_noop_port_9001.script | 15 +++++++++++++ tests/stub/test_directdriver.py | 22 +++++++++++++++++-- 6 files changed, 50 insertions(+), 11 deletions(-) create mode 100644 tests/stub/scripts/v4x1/return_1_noop_port_9001.script diff --git a/neo4j/io/_courier.py b/neo4j/io/_courier.py index f20e20ccc..cf2d23f55 100644 --- a/neo4j/io/_courier.py +++ b/neo4j/io/_courier.py @@ -24,6 +24,9 @@ Unpacker, ) +import logging +log = logging.getLogger("neo4j") + class MessageInbox: @@ -53,6 +56,9 @@ def _yield_messages(self, sock): while chunk_size != 0: chunk_size = next(chunk_loader) size, tag = unpacker.unpack_structure_header() + if tag is None: + log.debug("[#%04X] S: ", sock.getsockname()[1]) + continue fields = [unpacker.unpack() for _ in range(size)] yield tag, fields except OSError as error: diff --git a/neo4j/packstream.py b/neo4j/packstream.py index 42d6afcc6..7e25d7137 100644 --- a/neo4j/packstream.py +++ b/neo4j/packstream.py @@ -23,7 +23,6 @@ from io import BytesIO from struct import pack as struct_pack, unpack as struct_unpack - PACKED_UINT_8 = [struct_pack(">B", value) for value in range(0x100)] PACKED_UINT_16 = [struct_pack(">H", value) for value in range(0x10000)] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 706123ccd..e36cf9d90 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -35,10 +35,10 @@ from neo4j.io import Bolt # import logging +# log = logging.getLogger("neo4j") +# # from neo4j.debug import watch # watch("neo4j") -# -# log = logging.getLogger("neo4j") NEO4J_RELEASES = getenv("NEO4J_RELEASES", "snapshot-enterprise 3.5-enterprise").split() NEO4J_HOST = "localhost" diff --git a/tests/stub/conftest.py b/tests/stub/conftest.py index 44a891db3..e0298db21 100644 --- a/tests/stub/conftest.py +++ b/tests/stub/conftest.py @@ -28,11 +28,11 @@ from boltkit.server.stub import BoltStubService from pytest import fixture -# import logging +import logging +log = logging.getLogger("neo4j") + # from neo4j.debug import watch # watch("neo4j") -# -# log = logging.getLogger("neo4j") class StubServer: @@ -44,16 +44,17 @@ def __init__(self, port, script): def run(self): self._process = subprocess.Popen(["python", "-m", "boltkit", "stub", "-v", "-l", ":{}".format(str(self.port)), "-t", "10", self.script], stdout=subprocess.PIPE) # Need verbose for this to work - line =self._process.stdout.readline() + line = self._process.stdout.readline() def wait(self): try: returncode = self._process.wait(2) if returncode != 0: - print("Stubserver failed with error") + log.debug("stubserver return code {}".format(returncode)) + log.debug("check for miss match in script") return returncode == 0 except subprocess.TimeoutExpired: - print("Stubserver timeout!") + log.debug("stubserver timeout!") return False def kill(self): diff --git a/tests/stub/scripts/v4x1/return_1_noop_port_9001.script b/tests/stub/scripts/v4x1/return_1_noop_port_9001.script new file mode 100644 index 000000000..41e5f1710 --- /dev/null +++ b/tests/stub/scripts/v4x1/return_1_noop_port_9001.script @@ -0,0 +1,15 @@ +!: BOLT 4.1 +!: AUTO GOODBYE +!: AUTO RESET +!: PORT 9001 + +C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"} +S: SUCCESS {"server": "Neo4j/4.1.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"} +C: RUN "RETURN 1 AS x" {} {"mode": "r"} + PULL {"n": 2} +S: SUCCESS {"fields": ["x"]} + + + RECORD [1] + + SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "r", "t_last": 5, "db": "neo4j"} diff --git a/tests/stub/test_directdriver.py b/tests/stub/test_directdriver.py index 4504eb287..4661f2c14 100644 --- a/tests/stub/test_directdriver.py +++ b/tests/stub/test_directdriver.py @@ -543,7 +543,7 @@ def unwind(transaction): @pytest.mark.parametrize( "test_script, database", [ - ("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), # TODO: Fix correct new behaviour with qid + ("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), ] ) def test_bolt_driver_explicit_transaction_consume_result_case_a(driver_info, test_script, database): @@ -561,7 +561,7 @@ def test_bolt_driver_explicit_transaction_consume_result_case_a(driver_info, tes @pytest.mark.parametrize( "test_script, database", [ - ("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), # TODO: Fix correct new behaviour with qid + ("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), ] ) def test_bolt_driver_explicit_transaction_consume_result_case_b(driver_info, test_script, database): @@ -575,3 +575,21 @@ def test_bolt_driver_explicit_transaction_consume_result_case_b(driver_info, tes result = transaction.run("UNWIND [1,2,3,4] AS x RETURN x") result.consume() transaction.commit() + + +@pytest.mark.parametrize( + "test_script", + [ + "v4x1/return_1_noop_port_9001.script", + ] +) +def test_direct_can_handle_noop(driver_info, test_script): + # python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_can_handle_noop + with StubCluster(test_script): + uri = "bolt://127.0.0.1:9001" + with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver: + assert isinstance(driver, BoltDriver) + with driver.session(fetch_size=2, default_access_mode=READ_ACCESS) as session: + result = session.run("RETURN 1 AS x") + value = result.single().value() + assert value == 1 From d5a126765e7940fbd766244c80282647c035c23b Mon Sep 17 00:00:00 2001 From: martin bendsoe Date: Thu, 2 Jul 2020 13:01:28 +0200 Subject: [PATCH 2/2] changed the looping for getting a message --- neo4j/io/_courier.py | 42 ++++++++++++++++++++---------------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/neo4j/io/_courier.py b/neo4j/io/_courier.py index cf2d23f55..d994c72d5 100644 --- a/neo4j/io/_courier.py +++ b/neo4j/io/_courier.py @@ -34,33 +34,31 @@ def __init__(self, s, on_error): self.on_error = on_error self._messages = self._yield_messages(s) - @classmethod - def _load_chunks(cls, sock, buffer): - chunk_size = 0 - while True: - if chunk_size == 0: - buffer.receive(sock, 2) - chunk_size = buffer.pop_u16() - if chunk_size > 0: - buffer.receive(sock, chunk_size + 2) - yield chunk_size - def _yield_messages(self, sock): try: buffer = UnpackableBuffer() - chunk_loader = self._load_chunks(sock, buffer) unpacker = Unpacker(buffer) + chunk_size = 0 while True: - unpacker.reset() - chunk_size = -1 - while chunk_size != 0: - chunk_size = next(chunk_loader) - size, tag = unpacker.unpack_structure_header() - if tag is None: - log.debug("[#%04X] S: ", sock.getsockname()[1]) - continue - fields = [unpacker.unpack() for _ in range(size)] - yield tag, fields + + while chunk_size == 0: + # Determine the chunk size and skip noop + buffer.receive(sock, 2) + chunk_size = buffer.pop_u16() + if chunk_size == 0: + log.debug("[#%04X] S: ", sock.getsockname()[1]) + + buffer.receive(sock, chunk_size + 2) + chunk_size = buffer.pop_u16() + + if chunk_size == 0: + # chunk_size was the end marker for the message + size, tag = unpacker.unpack_structure_header() + fields = [unpacker.unpack() for _ in range(size)] + yield tag, fields + # Reset for new message + unpacker.reset() + except OSError as error: self.on_error(error)