Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 23 additions & 19 deletions neo4j/io/_courier.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,37 +24,41 @@
Unpacker,
)

import logging
log = logging.getLogger("neo4j")


class MessageInbox:

def __init__(self, s, on_error):
self.on_error = on_error
self._messages = self._yield_messages(s)

@classmethod
def _load_chunks(cls, sock, buffer):
chunk_size = 0
while True:
if chunk_size == 0:
buffer.receive(sock, 2)
chunk_size = buffer.pop_u16()
if chunk_size > 0:
buffer.receive(sock, chunk_size + 2)
yield chunk_size

def _yield_messages(self, sock):
try:
buffer = UnpackableBuffer()
chunk_loader = self._load_chunks(sock, buffer)
unpacker = Unpacker(buffer)
chunk_size = 0
while True:
unpacker.reset()
chunk_size = -1
while chunk_size != 0:
chunk_size = next(chunk_loader)
size, tag = unpacker.unpack_structure_header()
fields = [unpacker.unpack() for _ in range(size)]
yield tag, fields

while chunk_size == 0:
# Determine the chunk size and skip noop
buffer.receive(sock, 2)
chunk_size = buffer.pop_u16()
if chunk_size == 0:
log.debug("[#%04X] S: <NOOP>", sock.getsockname()[1])

buffer.receive(sock, chunk_size + 2)
chunk_size = buffer.pop_u16()

if chunk_size == 0:
# chunk_size was the end marker for the message
size, tag = unpacker.unpack_structure_header()
fields = [unpacker.unpack() for _ in range(size)]
yield tag, fields
# Reset for new message
unpacker.reset()

except OSError as error:
self.on_error(error)

Expand Down
1 change: 0 additions & 1 deletion neo4j/packstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
from io import BytesIO
from struct import pack as struct_pack, unpack as struct_unpack


PACKED_UINT_8 = [struct_pack(">B", value) for value in range(0x100)]
PACKED_UINT_16 = [struct_pack(">H", value) for value in range(0x10000)]

Expand Down
4 changes: 2 additions & 2 deletions tests/integration/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@
from neo4j.io import Bolt

# import logging
# log = logging.getLogger("neo4j")
#
# from neo4j.debug import watch
# watch("neo4j")
#
# log = logging.getLogger("neo4j")

NEO4J_RELEASES = getenv("NEO4J_RELEASES", "snapshot-enterprise 3.5-enterprise").split()
NEO4J_HOST = "localhost"
Expand Down
13 changes: 7 additions & 6 deletions tests/stub/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
from boltkit.server.stub import BoltStubService
from pytest import fixture

# import logging
import logging
log = logging.getLogger("neo4j")

# from neo4j.debug import watch
# watch("neo4j")
#
# log = logging.getLogger("neo4j")


class StubServer:
Expand All @@ -44,16 +44,17 @@ def __init__(self, port, script):
def run(self):
self._process = subprocess.Popen(["python", "-m", "boltkit", "stub", "-v", "-l", ":{}".format(str(self.port)), "-t", "10", self.script], stdout=subprocess.PIPE)
# Need verbose for this to work
line =self._process.stdout.readline()
line = self._process.stdout.readline()

def wait(self):
try:
returncode = self._process.wait(2)
if returncode != 0:
print("Stubserver failed with error")
log.debug("stubserver return code {}".format(returncode))
log.debug("check for miss match in script")
return returncode == 0
except subprocess.TimeoutExpired:
print("Stubserver timeout!")
log.debug("stubserver timeout!")
return False

def kill(self):
Expand Down
15 changes: 15 additions & 0 deletions tests/stub/scripts/v4x1/return_1_noop_port_9001.script
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
!: BOLT 4.1
!: AUTO GOODBYE
!: AUTO RESET
!: PORT 9001

C: HELLO {"user_agent": "test", "scheme": "basic", "principal": "test", "credentials": "test"}
S: SUCCESS {"server": "Neo4j/4.1.0", "connection_id": "123e4567-e89b-12d3-a456-426655440000"}
C: RUN "RETURN 1 AS x" {} {"mode": "r"}
PULL {"n": 2}
S: SUCCESS {"fields": ["x"]}
<NOOP>
<NOOP>
RECORD [1]
<NOOP>
SUCCESS {"bookmark": "neo4j:bookmark-test-1", "type": "r", "t_last": 5, "db": "neo4j"}
22 changes: 20 additions & 2 deletions tests/stub/test_directdriver.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ def unwind(transaction):
@pytest.mark.parametrize(
"test_script, database",
[
("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), # TODO: Fix correct new behaviour with qid
("v4x0/tx_pull_2_discard_all_port_9001.script", "test"),
]
)
def test_bolt_driver_explicit_transaction_consume_result_case_a(driver_info, test_script, database):
Expand All @@ -561,7 +561,7 @@ def test_bolt_driver_explicit_transaction_consume_result_case_a(driver_info, tes
@pytest.mark.parametrize(
"test_script, database",
[
("v4x0/tx_pull_2_discard_all_port_9001.script", "test"), # TODO: Fix correct new behaviour with qid
("v4x0/tx_pull_2_discard_all_port_9001.script", "test"),
]
)
def test_bolt_driver_explicit_transaction_consume_result_case_b(driver_info, test_script, database):
Expand All @@ -575,3 +575,21 @@ def test_bolt_driver_explicit_transaction_consume_result_case_b(driver_info, tes
result = transaction.run("UNWIND [1,2,3,4] AS x RETURN x")
result.consume()
transaction.commit()


@pytest.mark.parametrize(
"test_script",
[
"v4x1/return_1_noop_port_9001.script",
]
)
def test_direct_can_handle_noop(driver_info, test_script):
# python -m pytest tests/stub/test_directdriver.py -s -v -k test_direct_can_handle_noop
with StubCluster(test_script):
uri = "bolt://127.0.0.1:9001"
with GraphDatabase.driver(uri, auth=driver_info["auth_token"], **driver_config) as driver:
assert isinstance(driver, BoltDriver)
with driver.session(fetch_size=2, default_access_mode=READ_ACCESS) as session:
result = session.run("RETURN 1 AS x")
value = result.single().value()
assert value == 1