Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop calling deprecated Connection functions push_connection, pop_connection, resolve_connection, parse_connection #1949

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/docs/connections.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ backend, even when outside of the connection context.

### Pushing/popping connections

deprecated, call `connection` explicitly instead.

If your code does not allow you to use a `with` statement, for example, if you
want to use this to set up a unit test, you can use the `push_connection()` and
`pop_connection()` methods instead of using the context manager.
Expand Down
6 changes: 3 additions & 3 deletions rq/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,11 @@ def Connection(connection: Optional['Redis'] = None): # noqa
)
if connection is None:
connection = Redis()
push_connection(connection)
connection or LocalStack().push(connection) or push_connection(connection)
try:
yield
finally:
popped = pop_connection()
popped = LocalStack().pop() or pop_connection()
assert (
popped == connection
), 'Unexpected Redis connection was popped off the stack. Check your Redis connection setup.'
Expand Down Expand Up @@ -112,7 +112,7 @@ def resolve_connection(connection: Optional['Redis'] = None) -> 'Redis':
if connection is not None:
return connection

connection = get_current_connection()
connection = LocalStack().top() or get_current_connection()
if connection is None:
raise NoRedisConnectionException('Could not resolve a Redis connection')
return connection
Expand Down
3 changes: 2 additions & 1 deletion rq/contrib/legacy.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from rq import Worker, get_current_connection
from rq.local import LocalStack

logger = logging.getLogger(__name__)

Expand All @@ -15,7 +16,7 @@ def cleanup_ghosts(conn=None):

This function will clean up any of such legacy ghosted workers.
"""
conn = conn if conn else get_current_connection()
conn = conn if conn else LocalStack().top() or get_current_connection()
for worker in Worker.all(connection=conn):
if conn.ttl(worker.key) == -1:
ttl = worker.worker_ttl
Expand Down
12 changes: 7 additions & 5 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Type, Union
from uuid import uuid4

from .local import LocalStack

if TYPE_CHECKING:
try:
from resource import struct_rusage
Expand Down Expand Up @@ -240,7 +242,7 @@ def all(
if queue:
connection = queue.connection
elif connection is None:
connection = get_current_connection()
connection = LocalStack().top() or get_current_connection()

worker_keys = worker_registration.get_keys(queue=queue, connection=connection)
workers = [
Expand Down Expand Up @@ -295,7 +297,7 @@ def _set_connection(self, connection: Optional['Redis']) -> 'Redis':
connection (Optional[Redis]): The Redis Connection.
"""
if connection is None:
connection = get_current_connection()
connection = LocalStack().top() or get_current_connection()
current_socket_timeout = connection.connection_pool.connection_kwargs.get("socket_timeout")
if current_socket_timeout is None:
timeout_config = {"socket_timeout": self.connection_timeout}
Expand Down Expand Up @@ -744,7 +746,7 @@ def find_by_key(
raise ValueError('Not a valid RQ worker key: %s' % worker_key)

if connection is None:
connection = get_current_connection()
connection = LocalStack().top() or get_current_connection()
if not connection.exists(worker_key):
connection.srem(cls.redis_workers_keys, worker_key)
return None
Expand Down Expand Up @@ -1413,7 +1415,7 @@ def perform_job(self, job: 'Job', queue: 'Queue') -> bool:
Returns:
bool: True after finished.
"""
push_connection(self.connection)
self.connection or LocalStack().push(self.connection) or push_connection(self.connection)
started_job_registry = queue.started_job_registry
self.log.debug('Started Job Registry set.')

Expand Down Expand Up @@ -1458,7 +1460,7 @@ def perform_job(self, job: 'Job', queue: 'Queue') -> bool:
return False

finally:
pop_connection()
LocalStack().pop() or pop_connection()

self.log.info('%s: %s (%s)', green(job.origin), blue('Job OK'), job.id)
if rv is not None:
Expand Down
5 changes: 3 additions & 2 deletions tests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from redis import Redis

from rq import pop_connection, push_connection
from rq.local import LocalStack


def find_empty_redis_database(ssl=False):
Expand Down Expand Up @@ -77,7 +78,7 @@ class RQTestCase(unittest.TestCase):
def setUpClass(cls):
# Set up connection to Redis
testconn = find_empty_redis_database()
push_connection(testconn)
testconn or LocalStack().push(testconn) or push_connection(testconn)

# Store the connection (for sanity checking)
cls.testconn = testconn
Expand Down Expand Up @@ -105,7 +106,7 @@ def tearDownClass(cls):
logging.disable(logging.NOTSET)

# Pop the connection to Redis
testconn = pop_connection()
testconn = LocalStack().pop() or pop_connection()
assert (
testconn == cls.testconn
), 'Wow, something really nasty happened to the Redis connection stack. Check your setup.'
7 changes: 4 additions & 3 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from rq.command import send_kill_horse_command, send_shutdown_command
from rq.decorators import job
from rq.job import Job
from rq.local import LocalStack
from rq.worker import HerokuWorker, Worker


Expand Down Expand Up @@ -78,7 +79,7 @@ def rpush(key, value, append_worker_name=False, sleep=0):
time.sleep(sleep)
if append_worker_name:
value += ':' + get_current_job().worker_name
redis = get_current_connection()
redis = LocalStack().top() or get_current_connection()
redis.rpush(key, value)


Expand Down Expand Up @@ -111,7 +112,7 @@ def launch_process_within_worker_and_store_pid(path, timeout):


def access_self():
assert get_current_connection() is not None
assert LocalStack().top() or get_current_connection() is not None
assert get_current_job() is not None


Expand Down Expand Up @@ -253,7 +254,7 @@ def start_worker_process(queue_name, connection=None, worker_name=None, burst=Fa
"""
Use multiprocessing to start a new worker in a separate process.
"""
connection = connection or get_current_connection()
connection = connection or LocalStack().top() or get_current_connection()
conn_kwargs = connection.connection_pool.connection_kwargs
p = Process(target=start_worker, args=(queue_name, conn_kwargs, worker_name, burst))
p.start()
Expand Down
3 changes: 2 additions & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from rq import Queue, SimpleWorker, Worker, get_current_connection
from rq.defaults import DEFAULT_MAINTENANCE_TASK_INTERVAL
from rq.job import Job, JobStatus, Retry
from rq.local import LocalStack
from rq.registry import FailedJobRegistry, FinishedJobRegistry, StartedJobRegistry
from rq.results import Result
from rq.serializers import JSONSerializer
Expand Down Expand Up @@ -1360,7 +1361,7 @@ def test_work_horse_force_death(self):


def schedule_access_self():
q = Queue('default', connection=get_current_connection())
q = Queue('default', connection=LocalStack().top() or get_current_connection())
q.enqueue(access_self)


Expand Down