Skip to content

Commit

Permalink
AsyncIO Race Condition Fix (#2640)
Browse files Browse the repository at this point in the history
  • Loading branch information
chayim committed Mar 22, 2023
1 parent 8592cac commit b3c89ac
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 8 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/integration.yaml
Expand Up @@ -13,8 +13,8 @@ on:
branches:
- master
- '[0-9].[0-9]'
schedule:
- cron: '0 1 * * *' # nightly build
# schedule:
# - cron: '0 1 * * *' # nightly build

permissions:
contents: read # to fetch code (actions/checkout)
Expand Down
12 changes: 9 additions & 3 deletions redis/asyncio/client.py
Expand Up @@ -1374,10 +1374,16 @@ async def execute(self, raise_on_error: bool = True):
conn = cast(Connection, conn)

try:
return await conn.retry.call_with_retry(
lambda: execute(conn, stack, raise_on_error),
lambda error: self._disconnect_raise_reset(conn, error),
return await asyncio.shield(
conn.retry.call_with_retry(
lambda: execute(conn, stack, raise_on_error),
lambda error: self._disconnect_raise_reset(conn, error),
)
)
except asyncio.CancelledError:
# not supposed to be possible, yet here we are
await conn.disconnect(nowait=True)
raise
finally:
await self.reset()

Expand Down
12 changes: 10 additions & 2 deletions redis/asyncio/cluster.py
Expand Up @@ -1002,10 +1002,18 @@ async def execute_command(self, *args: Any, **kwargs: Any) -> Any:
await connection.send_packed_command(connection.pack_command(*args), False)

# Read response
return await asyncio.shield(
self._parse_and_release(connection, args[0], **kwargs)
)

async def _parse_and_release(self, connection, *args, **kwargs):
try:
return await self.parse_response(connection, args[0], **kwargs)
return await self.parse_response(connection, *args, **kwargs)
except asyncio.CancelledError:
# should not be possible
await connection.disconnect(nowait=True)
raise
finally:
# Release connection
self._free.append(connection)

async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Expand Up @@ -8,7 +8,7 @@
long_description_content_type="text/markdown",
keywords=["Redis", "key-value store", "database"],
license="MIT",
version="4.4.2",
version="4.4.3",
packages=find_packages(
include=[
"redis",
Expand Down
17 changes: 17 additions & 0 deletions tests/test_asyncio/test_cluster.py
Expand Up @@ -340,6 +340,23 @@ async def test_from_url(self, request: FixtureRequest) -> None:
rc = RedisCluster.from_url("rediss://localhost:16379")
assert rc.connection_kwargs["connection_class"] is SSLConnection

async def test_asynckills(self, r) -> None:

await r.set("foo", "foo")
await r.set("bar", "bar")

t = asyncio.create_task(r.get("foo"))
await asyncio.sleep(1)
t.cancel()
try:
await t
except asyncio.CancelledError:
pytest.fail("connection is left open with unread response")

assert await r.get("bar") == b"bar"
assert await r.ping()
assert await r.get("foo") == b"foo"

async def test_max_connections(
self, create_redis: Callable[..., RedisCluster]
) -> None:
Expand Down
22 changes: 22 additions & 0 deletions tests/test_asyncio/test_connection.py
Expand Up @@ -41,6 +41,28 @@ async def test_invalid_response(create_redis):
await r.connection.disconnect()


@pytest.mark.onlynoncluster
async def test_asynckills(create_redis):

for b in [True, False]:
r = await create_redis(single_connection_client=b)

await r.set("foo", "foo")
await r.set("bar", "bar")

t = asyncio.create_task(r.get("foo"))
await asyncio.sleep(1)
t.cancel()
try:
await t
except asyncio.CancelledError:
pytest.fail("connection left open with unread response")

assert await r.get("bar") == b"bar"
assert await r.ping()
assert await r.get("foo") == b"foo"


@skip_if_server_version_lt("4.0.0")
@pytest.mark.redismod
@pytest.mark.onlynoncluster
Expand Down

0 comments on commit b3c89ac

Please sign in to comment.