Skip to content

Inconsistent async cause failure #216

@DragonRollDev

Description

@DragonRollDev

I'm not 100% sure this is related to Valkey itself, but it is a consistent behavior in which it stops happening when I switch Valkey from async to sync, all other async libraries don't suffer from the same problem and that's why I think the problems might be within Valkey itself...

I'm getting inconsistent async behavior, sometimes valkey operations fails and its hard to pinpoint why, I tried debugging into the internals of valkeypy itself to no luck.

I have a suit of tests that is using a simple valkey set/get to write a session token to the db, those are all async implementation of the valkey connector, and each test case executes ok on their own. The problem starts when I try to execute all tests at the same time, then, I get an inconsistent behaviour where sometimes the valkey command get just halts the test, as if the problem was within the async loop itself. The problem is fixed if I switch by to non async valkey. Same endpoints are also doing postgresql+asyncpg operations without a problem.

Here are the things I've tried:

Guaranteeing the same event loop through all test suit(session scoped), tried function scoped, tried flushing db between tests, tried reconnecting+disconnecting in the function scope, tried calling different set methods, like setnx instead.

Also tried connecting through URL, through ConnectionPool, tried also surrounding the problematic get functio into a pipeline(multi/exec), in which case, the problem started happening on the exec instead of the get(might be pointing to an IO bound problem as the actual download is happening on the execute instead).

And then finally tried reverting valkey to synchronous and everything is back working. I'm not really sure how to reproduce the problem because it is completely inconsistent, sometimes, after a while, its a different set of tests that fail.

I extracted the problematic flow the API in order to execute it apart from the API app and got this error output:

self = <valkey.asyncio.connection.Connection(host=localhost,port=6379,db=0)>
command = [b'*1\r\n$5\r\nMULTI\r\n*5\r\n$3\r\nSET\r\n$15\r\nrefresh_token:1\r\n$164\r\neyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiaWF0IjoxNzU2MDA2MzE5LCJleHAiOjE3NTY2MTExMTksInR5cGUiOiJyZWZyZXNoIn0.6cxYwgbKK_Y1p_3c7OrR5T1bhZ-QRi0WDEQVioqgHa0\r\n$2\r\nEX\r\n$6\r\n604800\r\n*1\r\n$4\r\nEXEC\r\n']
check_health = True

    async def send_packed_command(
        self, command: Union[bytes, str, Iterable[bytes]], check_health: bool = True
    ) -> None:
        if not self.is_connected:
            await self.connect()
        elif check_health:
            await self.check_health()
    
        try:
            if isinstance(command, str):
                command = command.encode()
            if isinstance(command, bytes):
                command = [command]
            if self.socket_timeout:
>               await asyncio.wait_for(
                    self._send_packed_command(command), self.socket_timeout
                )

.venv\Lib\site-packages\valkey\asyncio\connection.py:514: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\tasks.py:520: in wait_for
    return await fut
           ^^^^^^^^^
.venv\Lib\site-packages\valkey\asyncio\connection.py:497: in _send_packed_command
    self._writer.writelines(command)
..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\streams.py:349: in writelines
    self._transport.writelines(data)
..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\transports.py:123: in writelines
    self.write(data)
..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\proactor_events.py:366: in write
    self._loop_writing(data=bytes(data))
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <_ProactorSocketTransport closing fd=1380 read=<_OverlappedFuture cancelled>>
f = None
data = b'*1\r\n$5\r\nMULTI\r\n*5\r\n$3\r\nSET\r\n$15\r\nrefresh_token:1\r\n$164\r\neyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiaWF0IjoxNzU2MDA2MzE5LCJleHAiOjE3NTY2MTExMTksInR5cGUiOiJyZWZyZXNoIn0.6cxYwgbKK_Y1p_3c7OrR5T1bhZ-QRi0WDEQVioqgHa0\r\n$2\r\nEX\r\n$6\r\n604800\r\n*1\r\n$4\r\nEXEC\r\n'

    def _loop_writing(self, f=None, data=None):
        try:
            if f is not None and self._write_fut is None and self._closing:
                # XXX most likely self._force_close() has been called, and
                # it has set self._write_fut to None.
                return
            assert f is self._write_fut
            self._write_fut = None
            self._pending_write = 0
            if f:
                f.result()
            if data is None:
                data = self._buffer
                self._buffer = None
            if not data:
                if self._closing:
                    self._loop.call_soon(self._call_connection_lost, None)
                if self._eof_written:
                    self._sock.shutdown(socket.SHUT_WR)
                # Now that we've reduced the buffer size, tell the
                # protocol to resume writing if it was paused.  Note that
                # we do this last since the callback is called immediately
                # and it may add more data to the buffer (even causing the
                # protocol to be paused again).
                self._maybe_resume_protocol()
            else:
>               self._write_fut = self._loop._proactor.send(self._sock, data)
                                  ^^^^^^^^^^^^^^^^^^^^^^^^^
E               AttributeError: 'NoneType' object has no attribute 'send'

..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\proactor_events.py:402: AttributeError

During handling of the above exception, another exception occurred:

http_client = <sanic_testing.testing.SanicASGITestClient object at 0x000002417FEB2E70>

    @pytest.mark.asyncio
    async def test_refresh_with_expired_refresh_token(http_client):
        # This test would require creating an expired token which is complex
        # But we can verify the endpoint structure works with valid tokens
    
        # Register a user
        from social_backend.users.service import _register
    
>       reg_resp = await _register("bob", "bob@hotmail.com", "secret")
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

social-tests\src\test_users.py:87: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
social-backend\social_backend\users\service.py:96: in _register
    await valkey_factory.execute_pipeline()
valkey_engine\core.py:26: in execute_pipeline
    resp = await self.pipeline.execute()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
.venv\Lib\site-packages\valkey\asyncio\client.py:1574: in execute
    return await conn.retry.call_with_retry(
.venv\Lib\site-packages\valkey\asyncio\retry.py:59: in call_with_retry
    return await do()
           ^^^^^^^^^^
.venv\Lib\site-packages\valkey\asyncio\client.py:1409: in _execute_transaction
    await connection.send_packed_command(all_cmds)
.venv\Lib\site-packages\valkey\asyncio\connection.py:538: in send_packed_command
    await self.disconnect(nowait=True)
.venv\Lib\site-packages\valkey\asyncio\connection.py:460: in disconnect
    self._writer.close()  # type: ignore[union-attr]
    ^^^^^^^^^^^^^^^^^^^^
..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\streams.py:358: in close
    return self._transport.close()
           ^^^^^^^^^^^^^^^^^^^^^^^
..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\proactor_events.py:109: in close
    self._loop.call_soon(self._call_connection_lost, None)
..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\base_events.py:799: in call_soon
    self._check_closed()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <ProactorEventLoop running=False closed=True debug=False>

    def _check_closed(self):
        if self._closed:
>           raise RuntimeError('Event loop is closed')
E           RuntimeError: Event loop is closed

..\..\..\AppData\Roaming\uv\python\cpython-3.12.9-windows-x86_64-none\Lib\asyncio\base_events.py:545: RuntimeError

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions