Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add posibility to use RedisCluster as connection Class #2030

Open
wants to merge 19 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion rq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Tuple, Type, Union

from redis import WatchError
from redis.exceptions import RedisClusterException

from .timeouts import BaseDeathPenalty, UnixSignalDeathPenalty

Expand Down Expand Up @@ -1371,7 +1372,11 @@ def dequeue_any(
while True:
queue_keys = [q.key for q in queues]
if len(queue_keys) == 1 and get_version(connection) >= (6, 2, 0):
result = cls.lmove(connection, queue_keys[0], timeout)
try:
result = cls.lmove(connection, queue_keys[0], timeout)
# If you using redis Cluster you can only use lpop
except RedisClusterException:
result = cls.lpop(queue_keys, timeout, connection=connection)
else:
result = cls.lpop(queue_keys, timeout, connection=connection)
if result is None:
Expand Down
20 changes: 15 additions & 5 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,21 @@ def _set_connection(self, connection: 'Redis') -> 'Redis':
Args:
connection (Optional[Redis]): The Redis Connection.
"""
current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
if current_socket_timeout is None:
timeout_config = {"socket_timeout": self.connection_timeout}
connection.connection_pool.connection_kwargs.update(timeout_config)
return connection
try:
current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
if current_socket_timeout is None:
timeout_config = {"socket_timeout": self.connection_timeout}
connection.connection_pool.connection_kwargs.update(timeout_config)
return connection
Comment on lines +431 to +435
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd also need to test this so test coverage doesn't go down. An easy way to do this would be to move this logic to a separate setup_connection() and pass both Redis and RedisCluster instances into it so we can easily test this.

# If you are using RedisCluster you needs to pars all cluster nodes.
except AttributeError:
nodes = connection.get_nodes()
for node in nodes:
current_socket_timeout = node.redis_connection.connection_pool.connection_kwargs.get("socket_timeout")
if current_socket_timeout is None:
timeout_config = {"socket_timeout": self.connection_timeout}
node.redis_connection.connection_pool.connection_kwargs.update(timeout_config)
return connection

@property
def dequeue_timeout(self) -> int:
Expand Down
2 changes: 0 additions & 2 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,9 @@ class RQTestCase(unittest.TestCase):
def setUpClass(cls):
# Set up connection to Redis
testconn = find_empty_redis_database()

# Store the connection (for sanity checking)
cls.testconn = testconn
cls.connection = testconn

# Shut up logging
logging.disable(logging.ERROR)

Expand Down
Loading