
asyncio error: read() called while another coroutine is already waiting for incoming data #945

@prrao87

Description

Bug Report

I'm able to use the async driver to load data sequentially (awaiting each write in turn) quite easily, using the following code.

Method 1

import asyncio
from neo4j import AsyncGraphDatabase


async def create_node_tx(tx, record):
    query = """
        MERGE (p:Product {id: $record.id})
          SET p.item = $record.item, p.price = $record.price
        RETURN p.id as node_id
    """
    result = await tx.run(query, record=record)
    node_id = await result.single()
    print(f"Loaded node with id: {node_id['node_id']}")


async def main(data):
    async with AsyncGraphDatabase.driver(URI, auth=("neo4j", "password123")) as driver:
        async with driver.session(database="neo4j") as session:
            for item in data:
                await session.execute_write(create_node_tx, item)


if __name__ == "__main__":
    URI = "bolt://localhost:7687"

    data = [
        {"id": 1, "item": "book", "price": 10.0},
        {"id": 2, "item": "pencil", "price": 5.0},
        {"id": 3, "item": "eraser", "price": 2.0},
        {"id": 4, "item": "pen", "price": 6.0},
        {"id": 5, "item": "ruler", "price": 4.0},
    ]
    # Run the main function asynchronously
    asyncio.run(main(data))

Output

Loaded node with id: 1
Loaded node with id: 2
Loaded node with id: 3
Loaded node with id: 4
Loaded node with id: 5

Because I iterate through the list of records in a for loop and await each node creation before starting the next, the writes execute sequentially, one at a time.

My understanding is that, given the async driver's capabilities, I should be able to ingest the same data concurrently by creating a list of async tasks and then gathering them to run on the existing event loop, as follows:

Method 2

import asyncio
from neo4j import AsyncGraphDatabase


async def create_node_tx(tx, record):
    query = """
        MERGE (p:Product {id: $record.id})
          SET p.item = $record.item, p.price = $record.price
        RETURN p.id as node_id
    """
    result = await tx.run(query, record=record)
    node_id = await result.single()
    print(f"Loaded node with id: {node_id['node_id']}")


async def main(data):
    async with AsyncGraphDatabase.driver(URI, auth=("neo4j", "password123")) as driver:
        async with driver.session(database="neo4j") as session:
            tasks = [
                asyncio.create_task(session.execute_write(create_node_tx, item))
                for item in data
            ]
            await asyncio.gather(*tasks)


if __name__ == "__main__":
    URI = "bolt://localhost:7687"

    data = [
        {"id": 1, "item": "book", "price": 10.0},
        {"id": 2, "item": "pencil", "price": 5.0},
        {"id": 3, "item": "eraser", "price": 2.0},
        {"id": 4, "item": "pen", "price": 6.0},
        {"id": 5, "item": "ruler", "price": 4.0},
    ]
    # Run the main function asynchronously
    asyncio.run(main(data))

Output 2

However, this code fails with the following stack trace:

Failed to read from defunct connection IPv4Address(('localhost', 7687)) (ResolvedIPv4Address(('127.0.0.1', 7687)))
Transaction failed and will be retried in 0.95423874187965s (Failed to read from defunct connection IPv4Address(('localhost', 7687)) (ResolvedIPv4Address(('127.0.0.1', 7687))))
Failed to read from defunct connection IPv4Address(('localhost', 7687)) (ResolvedIPv4Address(('127.0.0.1', 7687)))
Transaction failed and will be retried in 0.84179296081801s (Failed to read from defunct connection IPv4Address(('localhost', 7687)) (ResolvedIPv4Address(('127.0.0.1', 7687))))
Traceback (most recent call last):
  File "/Users/prrao/code/neo4j-python-fastapi/src/ingest/test2.py", line 37, in <module>
    asyncio.run(main(data))
  File "/Users/prrao/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/prrao/.pyenv/versions/3.11.2/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prrao/.pyenv/versions/3.11.2/lib/python3.11/asyncio/base_events.py", line 653, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/prrao/code/neo4j-python-fastapi/src/ingest/test2.py", line 23, in main
    await asyncio.gather(*tasks)
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/work/session.py", line 697, in execute_write
    return await self._run_transaction(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/work/session.py", line 512, in _run_transaction
    result = await transaction_function(tx, *args, **kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prrao/code/neo4j-python-fastapi/src/ingest/test2.py", line 11, in create_node_tx
    result = await tx.run(query, record=record)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/work/transaction.py", line 159, in run
    await result._tx_ready_run(query, parameters)
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/work/result.py", line 116, in _tx_ready_run
    await self._run(query, parameters, None, None, None, None, None, None)
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/work/result.py", line 166, in _run
    await self._attach()
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/work/result.py", line 274, in _attach
    await self._connection.fetch_message()
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/io/_common.py", line 190, in inner
    await coroutine_func(*args, **kwargs)
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/io/_bolt.py", line 805, in fetch_message
    tag, fields = await self.inbox.pop(
                  ^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/io/_common.py", line 74, in pop
    await self._buffer_one_chunk()
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/io/_common.py", line 53, in _buffer_one_chunk
    await receive_into_buffer(self._socket, self._buffer, 2)
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async/io/_common.py", line 308, in receive_into_buffer
    n = await sock.recv_into(view[buffer.used:end], end - buffer.used)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 156, in recv_into
    res = await self._wait_for_io(io_fut)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 115, in _wait_for_io
    return await wait_for(io_fut, timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/prrao/code/neo4j-python-fastapi/.venv/lib/python3.11/site-packages/neo4j/_async_compat/shims/__init__.py", line 121, in wait_for
    return fut.result()
           ^^^^^^^^^^^^
  File "/Users/prrao/.pyenv/versions/3.11.2/lib/python3.11/asyncio/streams.py", line 689, in read
    await self._wait_for_data('read')
  File "/Users/prrao/.pyenv/versions/3.11.2/lib/python3.11/asyncio/streams.py", line 508, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data

Question

I understand why Method 1 works as intended, and I also understand how to use that approach to ingest large batches of data into Neo4j and then unwind them in Cypher, speeding up ingestion greatly.
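For reference, the batched-UNWIND approach mentioned above can be sketched as follows. The `BATCH_QUERY` text and the `chunks` helper are illustrative (not taken from this issue); each chunk would be sent as a single map-list parameter and written in one transaction.

```python
from typing import Any

# Illustrative batched-ingest query: one transaction merges a whole chunk of
# records instead of one MERGE round-trip per record.
BATCH_QUERY = """
    UNWIND $records AS record
    MERGE (p:Product {id: record.id})
      SET p.item = record.item, p.price = record.price
    RETURN count(p) AS n
"""


def chunks(records: list[dict[str, Any]], size: int) -> list[list[dict[str, Any]]]:
    """Split records into batches of at most `size` items, preserving order."""
    return [records[i:i + size] for i in range(0, len(records), size)]


# Each batch would then be written in one transaction, e.g. via
# session.execute_write with a transaction function that runs BATCH_QUERY
# and passes records=batch as the parameter.
data = [{"id": i, "item": "x", "price": 1.0} for i in range(1, 11)]
batches = chunks(data, 4)
print([len(b) for b in batches])  # → [4, 4, 2]
```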

But my question is about Method 2: why doesn't it work? I can understand that locks would be imposed if I ran create/merge queries together with edge queries, because the engine can't merge an edge onto a node that doesn't yet exist. However, that's not what I'm doing here. All I'm doing is creating nodes that are completely independent of each other, which, in theory, should be possible in a fully non-blocking fashion with the syntax shown, correct? I'm able to index data into document databases like Elasticsearch with this asyncio.gather approach, so I'm wondering whether I'm doing something wrong, or whether this is inherent to the way Neo4j takes node locks prior to async loading.
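For what it's worth, the RuntimeError at the bottom of the traceback is raised by asyncio's own StreamReader whenever two coroutines wait on the same stream at once; it can be reproduced without Neo4j at all (a minimal sketch, no driver involved):

```python
import asyncio


async def main() -> str:
    reader = asyncio.StreamReader()              # bare stream; no data will ever arrive
    first = asyncio.create_task(reader.read(1))  # first coroutine starts waiting for data
    await asyncio.sleep(0)                       # let the first read reach its wait point
    try:
        await reader.read(1)                     # second concurrent read on the same stream
    except RuntimeError as exc:
        message = str(exc)                       # "read() called while another coroutine ..."
    first.cancel()                               # clean up the still-pending first read
    return message


error = asyncio.run(main())
print(error)
```

Since a driver session wraps a single Bolt connection (one underlying stream), the gathered `execute_write` calls in Method 2 would end up racing on that one stream, which would explain the error independently of any Neo4j node locking.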

My Environment

Python Version: Python 3.11.2 (main, Feb 25 2023, 22:38:31) [Clang 13.1.6 (clang-1316.0.21.2.5)]
Driver Version: 5.10.0
Server Version and Edition: 5.9.0 (enterprise)
Operating System: macOS Ventura 13.4.1
