
Asyncio errors when cleaning up async drivers #1044

@williamhakim10

Description


Bug Report

We've been experiencing event loop errors when calling close() on an AsyncNeo4jDriver.

This is the error we've been seeing:

Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/flask/app.py", line 1463, in wsgi_app
    response = self.full_dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/flask/app.py", line 872, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/flask/app.py", line 870, in full_dispatch_request
    rv = self.dispatch_request()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/flask/app.py", line 855, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)  # type: ignore[no-any-return]
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/functions_framework/__init__.py", line 178, in view_func
    function(event)
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/functions_framework/__init__.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/workspace/main.py", line 66, in schedule_collection
    asyncio.run(graph_db_template.dispose())
  File "/layers/google.python.runtime/python/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/layers/google.python.runtime/python/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/workspace/libs/database_support/graph_database_template.py", line 114, in dispose
    await self.__async_driver.close()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async/driver.py", line 602, in close
    await self._pool.close()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async/io/_pool.py", line 483, in close
    await self._close_connections(connections)
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async/io/_pool.py", line 417, in _close_connections
    await connection.close()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async/io/_bolt.py", line 953, in close
    await self.socket.close()
  File "/layers/google.python.pip/pip/lib/python3.10/site-packages/neo4j/_async_compat/network/_bolt_socket.py", line 165, in close
    await self._writer.wait_closed()
  File "/layers/google.python.runtime/python/lib/python3.10/asyncio/streams.py", line 343, in wait_closed
    await self._protocol._get_close_waiter(self)
RuntimeError: Task <Task pending name='Task-364892' coro=<GraphDatabaseTemplate.dispose() running at /workspace/libs/database_support/graph_database_template.py:114> cb=[_run_until_complete_cb() at /layers/google.python.runtime/python/lib/python3.10/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop
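The error means that the task running dispose() awaited a future (per the traceback, the stream's close waiter) that belongs to a different event loop, presumably the loop on which the connection was opened. Plain asyncio reproduces the same message with no driver involved. In this minimal sketch, `create_future` and `await_future` are hypothetical helpers, and the first `asyncio.run()` stands in for the call that opened the connections:

```python
import asyncio


async def create_future() -> "asyncio.Future[None]":
    # Hypothetical helper: the new future is bound to whichever loop is running now.
    return asyncio.get_running_loop().create_future()


async def await_future(fut: "asyncio.Future[None]") -> None:
    # Hypothetical helper: await a future that may belong to another loop.
    await fut


# asyncio.run() creates a loop, runs the coroutine, then closes that loop.
orphan = asyncio.run(create_future())

error_message = ""
try:
    # A second asyncio.run() runs on a brand-new loop, so awaiting the
    # still-pending future from the first loop raises RuntimeError.
    asyncio.run(await_future(orphan))
except RuntimeError as exc:
    error_message = str(exc)
```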

Our code for handling graph DB drivers looks like this. I've included the handlers for dead connections for completeness, but I've tried removing them and refactoring the code to use execute_query or other APIs instead, and the error persists.

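import asyncio
from contextlib import asynccontextmanager
from typing import Any, AsyncIterator, Optional

from neo4j import AsyncDriver, AsyncGraphDatabase, AsyncSession, Driver, GraphDatabase, Record
from neo4j.exceptions import SessionExpired
from typing_extensions import LiteralString  # typing.LiteralString requires Python 3.11+

class GraphDatabaseTemplate: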
    def __init__(
        self, *, db_url: str, db_user: str, db_password: str, database: str
    ) -> None:
        ...
        self.__driver = self.__create_driver()
        self.__async_driver = self.__create_async_driver()

    def __create_driver(self) -> Driver:
        return GraphDatabase.driver(
            self.__db_url,
            auth=self.__db_auth,
            database=self.__default_db,
            max_connection_lifetime=40,
            keep_alive=True,
        )

    def __create_async_driver(self) -> AsyncDriver:
        return AsyncGraphDatabase.driver(
            self.__db_url,
            auth=self.__db_auth,
            database=self.__default_db,
            max_connection_lifetime=40,
            keep_alive=True,
        )

    @asynccontextmanager
    async def begin_async(self) -> AsyncIterator[AsyncSession]:
        async with self.__async_driver.session() as session:
            yield session

    async def query_async(
        self,
        *,
        statement: LiteralString,
        session: Optional[AsyncSession] = None,
        parameters: Optional[dict[str, Any]] = None,
        retries_remaining: int = _MAX_RETRIES,
    ) -> list[Record]:
        # AuraDB has a connection timeout of 60 minutes; this is the recommended way of handling SessionExpired errors
        # https://support.neo4j.com/s/article/1500001173021-How-to-handle-Session-Expired-Errors-while-connecting-to-Neo4j-Aura
        try:
            if session is None:
                async with self.begin_async() as session:
                    return [r async for r in await session.run(statement, parameters)]
            else:
                return [r async for r in await session.run(statement, parameters)]
        except SessionExpired:
            if retries_remaining:
                self.__async_driver = self.__create_async_driver()
                # if the connection expired, the session will correspond to the dead driver instance; retry without one
                return await self.query_async(
                    statement=statement,
                    session=None,
                    parameters=parameters,
                    retries_remaining=retries_remaining - 1,
                )
            raise

    def dispose(self) -> None:
        self.__driver.close()
        asyncio.run(self.__async_driver.close())

This code is called from a Cloud Function that looks like this:

@functions_framework.cloud_event
def schedule_collection(event: CloudEvent) -> None:
    ...
    graph_db_template = GraphDatabaseTemplate(
        db_url=env.neo4j_url,
        db_user="neo4j",
        db_password=env.neo4j_password,
        database=env.neo4j_database,
    )
    try:
        # ... do some stuff here that involves `asyncio.run(...)` and uses the async driver ...
        ...
    except Exception:
        logger.exception("unable to handle event %s", event)
    finally:
        graph_db_template.dispose()
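In the Cloud Function above, the async driver presumably opens its connections inside one of the `asyncio.run(...)` calls in the `try` block, and `dispose()` then calls `asyncio.run()` again to close it on a brand-new loop. The minimal sketch below mimics that split with a hypothetical `LoopBoundResource` class (a stand-in for a pooled connection, not the neo4j API) whose `close()` awaits a future created on the loop where the resource was first used, and contrasts it with a single `asyncio.run()` that both uses and closes the resource:

```python
import asyncio
from typing import Optional


class LoopBoundResource:
    """Hypothetical stand-in for a pooled connection; not the neo4j API."""

    def __init__(self) -> None:
        self._waiter: Optional["asyncio.Future[None]"] = None

    async def use(self) -> None:
        # "Open" the connection: bind a future to the loop running right now,
        # much as an asyncio stream is tied to the loop that created it.
        if self._waiter is None:
            self._waiter = asyncio.get_running_loop().create_future()

    async def close(self) -> None:
        # "Close" the connection: schedule the waiter to resolve, then await it,
        # loosely like StreamWriter.wait_closed() awaiting its close waiter.
        if self._waiter is None:
            return
        waiter, self._waiter = self._waiter, None
        asyncio.get_running_loop().call_soon(waiter.set_result, None)
        await waiter


async def use_then_close(resource: LoopBoundResource) -> None:
    # Use and close the resource on one and the same loop.
    try:
        await resource.use()
    finally:
        await resource.close()


# Pattern from the report: work in one asyncio.run(), close in another.
split = LoopBoundResource()
asyncio.run(split.use())
split_error = ""
try:
    asyncio.run(split.close())
except RuntimeError as exc:
    split_error = str(exc)

# Alternative: a single asyncio.run() that both uses and closes the resource.
single = LoopBoundResource()
asyncio.run(use_then_close(single))
```

Under this assumption, only the split pattern reproduces the error, which suggests keeping everything from the driver's first use through `close()` inside one event loop.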

I have tried various permutations: keeping track of the event loop (e.g. by storing it in the template) and then closing the driver with a lower-level asyncio primitive that accepts a loop, and also letting the driver clean itself up without calling close() at all. In both cases I get RuntimeError('Event loop is closed').
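The `RuntimeError('Event loop is closed')` variant is consistent with how `asyncio.run()` works: it closes its loop before returning, so a loop object saved during one call cannot run anything afterwards. A minimal sketch, assuming the lower-level primitive is `loop.run_until_complete()` (the report does not say which one was used); `capture_loop` and `close_driver` are hypothetical helpers:

```python
import asyncio


async def capture_loop() -> asyncio.AbstractEventLoop:
    # Hypothetical helper: hand the running loop back to the caller,
    # roughly like storing it on the template for later use.
    return asyncio.get_running_loop()


async def close_driver() -> None:
    # Hypothetical placeholder for `await driver.close()`.
    pass


stored_loop = asyncio.run(capture_loop())  # asyncio.run() closes this loop on exit
print(stored_loop.is_closed())  # prints True

error_message = ""
pending = close_driver()
try:
    stored_loop.run_until_complete(pending)
except RuntimeError as exc:
    error_message = str(exc)
finally:
    pending.close()  # never started, so close it to avoid a "never awaited" warning
```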

I can also reproduce this behavior with a minimal unittest test case like the following, as long as the test actually queries the graph:

from unittest import IsolatedAsyncioTestCase

class TestCase(IsolatedAsyncioTestCase):
    def setUp(self) -> None:
        super().setUp()
        self.graph = test_graph_template()

    def tearDown(self) -> None:
        super().tearDown()
        self.graph.dispose()

    async def test_fails(self) -> None:
        await self.graph.query_async(statement="MATCH (n) RETURN n")

    async def test_passes(self) -> None:
        pass
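In the test case above, the query runs on the event loop that `IsolatedAsyncioTestCase` sets up for each test, while the synchronous `tearDown()` calls `dispose()`, which calls `asyncio.run()` on a new loop. `IsolatedAsyncioTestCase` also provides an `asyncTearDown()` hook that runs on the test's own loop. A minimal sketch using that hook, with a hypothetical `LoopBoundResource` standing in for the driver (not the neo4j API):

```python
import asyncio
import unittest
from typing import Optional


class LoopBoundResource:
    """Hypothetical stand-in for the async driver; not the neo4j API."""

    def __init__(self) -> None:
        self.loop: Optional[asyncio.AbstractEventLoop] = None

    async def query(self) -> None:
        # Remember the loop on which the first "connection" was opened.
        if self.loop is None:
            self.loop = asyncio.get_running_loop()

    async def close(self) -> None:
        # Refuse to close from any other loop, mirroring the reported RuntimeError.
        if self.loop is not None and self.loop is not asyncio.get_running_loop():
            raise RuntimeError("closed from a different event loop")
        self.loop = None


class ResourceTestCase(unittest.IsolatedAsyncioTestCase):
    def setUp(self) -> None:
        super().setUp()
        self.resource = LoopBoundResource()

    async def asyncTearDown(self) -> None:
        # Runs on the same loop as the test, unlike asyncio.run() inside tearDown().
        await self.resource.close()
        await super().asyncTearDown()

    async def test_query(self) -> None:
        await self.resource.query()
        self.assertIs(self.resource.loop, asyncio.get_running_loop())
```

Saved as a module, this runs with the standard `unittest` runner; the teardown succeeds because `close()` is awaited on the loop that `query()` used.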

I see that this came up previously in #950, but we're on Python 3.10, so that particular issue hasn't gone away for us.

Thanks for taking a look and appreciate the support!

My Environment

Python Version: Python 3.10.13 (main, Apr 8 2024, 10:10:13) [Clang 15.0.0 (clang-1500.3.9.4)]
Driver Version: 5.19.0
Server Version and Edition: Neo4j EE 5.18.0 (locally, in tests) and AuraDB
Operating System: macOS Sonoma 14.3, plus ubuntu:latest in GitHub CI and whichever Linux version Cloud Run uses.
