-
Notifications
You must be signed in to change notification settings - Fork 2.6k
Description
Hi again!
I just updated redis-py from 5.2.1 to 5.3.0 and I noticed that while adding the Dispatchers
there was deadlock introduced.
Summary
When using a client with Retries
enabled and subscribing to a pattern using psubscribe
, a deadlock can occur under certain conditions:
- If a
punsubscribe
is attempted and a disconnection (such as a broken pipe) happens during this process, a reconnection is triggered (via thePubSub._execute
error handling). - During reconnection, the client tries to re-subscribe to the previously subscribed patterns, but this process can result in a deadlock.
Full explanation
There was a new lock introduced before each PubSub
command execution on commit 40e5fc1
, which is part of the 5.3.0 release:
Lines 874 to 875 in 7130e1a
with self._lock: | |
self._execute(connection, connection.send_command, *args, **kwargs) |
When a command execution fails, if Retries
are enabled, there will be a reconnection attempt:
Lines 895 to 916 in 7130e1a
def _reconnect(self, conn) -> None: | |
""" | |
The supported exceptions are already checked in the | |
retry object so we don't need to do it here. | |
In this error handler we are trying to reconnect to the server. | |
""" | |
conn.disconnect() | |
conn.connect() | |
def _execute(self, conn, command, *args, **kwargs): | |
""" | |
Connect manually upon disconnection. If the Redis server is down, | |
this will fail and raise a ConnectionError as desired. | |
After reconnection, the ``on_connect`` callback should have been | |
called by the # connection to resubscribe us to any channels and | |
patterns we were previously listening to | |
""" | |
return conn.retry.call_with_retry( | |
lambda: command(*args, **kwargs), | |
lambda _: self._reconnect(conn), | |
) |
However, when executing the connect
we get to the final lines where the resubscriptions callbacks are triggered for pubsub
:
Lines 406 to 413 in 7130e1a
# run any user callbacks. right now the only internal callback | |
# is for pubsub channel/pattern resubscription | |
# first, remove any dead weakrefs | |
self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()] | |
for ref in self._connect_callbacks: | |
callback = ref() | |
if callback: | |
callback(self) |
This will hit the same lock again a cause a deadlock.
Let me know if you need any other information. 🙂