Permalink
Browse files

Allow passing backend classes (job, queue, worker, connection) from C…

…LI and other APIs

This includes:

- a partial refactor of the CLI to organize the shared options
- extends the tests in areas where passing custom backend classes makes sense
- allow setting the core CLI options as env vars
- minor cosmetic changes here and there
  • Loading branch information...
jezdez committed Jan 23, 2017
1 parent 27e4f3a commit c01966243086336b2d934b7ca65401db8d317107
Showing with 492 additions and 207 deletions.
  1. +1 −1 .gitignore
  2. +101 −76 rq/cli/cli.py
  3. +55 −13 rq/cli/helpers.py
  4. +8 −2 rq/decorators.py
  5. +4 −0 rq/defaults.py
  6. +31 −23 rq/job.py
  7. +36 −21 rq/queue.py
  8. +14 −6 rq/registry.py
  9. +10 −0 rq/utils.py
  10. +45 −49 rq/worker.py
  11. +71 −9 tests/test_cli.py
  12. +37 −0 tests/test_decorator.py
  13. +29 −5 tests/test_job.py
  14. +38 −1 tests/test_queue.py
  15. +11 −0 tests/test_registry.py
  16. +1 −1 tests/test_worker.py
View
@@ -11,5 +11,5 @@
.vagrant
Vagrantfile
.idea/
-.coverage.*
+.coverage*
/.cache
View
@@ -5,81 +5,113 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)
+from functools import update_wrapper
import os
import sys
import click
-from redis import StrictRedis
from redis.exceptions import ConnectionError
-from rq import Connection, get_failed_queue, Queue
+from rq import Connection, get_failed_queue, __version__ as version
+from rq.cli.helpers import (read_config_file, refresh,
+ setup_loghandlers_from_args,
+ show_both, show_queues, show_workers, CliConfig)
from rq.contrib.legacy import cleanup_ghosts
+from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
+ DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from rq.suspension import (suspend as connection_suspend,
resume as connection_resume, is_suspended)
-from .helpers import (get_redis_from_config, read_config_file, refresh,
- setup_loghandlers_from_args, show_both, show_queues,
- show_workers)
-
# Disable the warning that Click displays (as of Click version 5.0) when users
# use unicode_literals in Python 2.
# See http://click.pocoo.org/dev/python3/#unicode-literals for more details.
click.disable_unicode_literals_warning = True
-url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
- help='URL describing Redis connection details.')
-
-config_option = click.option('--config', '-c',
- help='Module containing RQ settings.')
-
-
-def connect(url, config=None, connection_class=StrictRedis):
- if url:
- return connection_class.from_url(url)
-
- settings = read_config_file(config) if config else {}
- return get_redis_from_config(settings, connection_class)
+shared_options = [
+ click.option('--url', '-u',
+ envvar='RQ_REDIS_URL',
+ help='URL describing Redis connection details.'),
+ click.option('--config', '-c',
+ envvar='RQ_CONFIG',
+ help='Module containing RQ settings.'),
+ click.option('--worker-class', '-w',
+ envvar='RQ_WORKER_CLASS',
+ default=DEFAULT_WORKER_CLASS,
+ help='RQ Worker class to use'),
+ click.option('--job-class', '-j',
+ envvar='RQ_JOB_CLASS',
+ default=DEFAULT_JOB_CLASS,
+ help='RQ Job class to use'),
+ click.option('--queue-class',
+ envvar='RQ_QUEUE_CLASS',
+ default=DEFAULT_QUEUE_CLASS,
+ help='RQ Queue class to use'),
+ click.option('--connection-class',
+ envvar='RQ_CONNECTION_CLASS',
+ default=DEFAULT_CONNECTION_CLASS,
+ help='Redis client class to use'),
+]
+
+
+def pass_cli_config(func):
+ # add all the shared options to the command
+ for option in shared_options:
+ func = option(func)
+
+ # pass the cli config object into the command
+ def wrapper(*args, **kwargs):
+ ctx = click.get_current_context()
+ cli_config = CliConfig(**kwargs)
+ return ctx.invoke(func, cli_config, *args[1:], **kwargs)
+
+ return update_wrapper(wrapper, func)
@click.group()
+@click.version_option(version)
def main():
"""RQ command line tool."""
pass
@main.command()
-@url_option
@click.option('--all', '-a', is_flag=True, help='Empty all queues')
@click.argument('queues', nargs=-1)
-def empty(url, all, queues):
+@pass_cli_config
+def empty(cli_config, all, queues, **options):
"""Empty given queues."""
- conn = connect(url)
if all:
- queues = Queue.all(connection=conn)
+ queues = cli_config.queue_class.all(connection=cli_config.connection,
+ job_class=cli_config.job_class)
else:
- queues = [Queue(queue, connection=conn) for queue in queues]
+ queues = [cli_config.queue_class(queue,
+ connection=cli_config.connection,
+ job_class=cli_config.job_class)
+ for queue in queues]
if not queues:
click.echo('Nothing to do')
+ sys.exit(0)
for queue in queues:
num_jobs = queue.empty()
click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name))
@main.command()
-@url_option
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.argument('job_ids', nargs=-1)
-def requeue(url, all, job_ids):
+@pass_cli_config
+def requeue(cli_config, all, job_class, job_ids, **options):
"""Requeue failed jobs."""
- conn = connect(url)
- failed_queue = get_failed_queue(connection=conn)
+
+ failed_queue = get_failed_queue(connection=cli_config.connection,
+ job_class=cli_config.job_class)
if all:
job_ids = failed_queue.job_ids
@@ -102,16 +134,16 @@ def requeue(url, all, job_ids):
@main.command()
-@url_option
-@config_option
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)')
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info')
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
@click.argument('queues', nargs=-1)
-def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, queues):
+@pass_cli_config
+def info(cli_config, path, interval, raw, only_queues, only_workers, by_queue, queues,
+ **options):
"""RQ command-line monitor."""
if path:
@@ -125,8 +157,9 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
func = show_both
try:
- with Connection(connect(url, config)):
- refresh(interval, func, queues, raw, by_queue)
+ with Connection(cli_config.connection):
+ refresh(interval, func, queues, raw, by_queue,
+ cli_config.queue_class, cli_config.worker_class)
except ConnectionError as e:
click.echo(e)
sys.exit(1)
@@ -136,14 +169,8 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
@main.command()
-@url_option
-@config_option
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
-@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
-@click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use')
-@click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use')
-@click.option('--connection-class', default='redis.StrictRedis', help='Redis client class to use')
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--results-ttl', type=int, help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
@@ -153,14 +180,16 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.argument('queues', nargs=-1)
-def worker(url, config, burst, name, worker_class, job_class, queue_class, connection_class, path, results_ttl,
- worker_ttl, verbose, quiet, sentry_dsn, exception_handler, pid, queues):
+@pass_cli_config
+def worker(cli_config, burst, name, path, results_ttl,
+ worker_ttl, verbose, quiet, sentry_dsn, exception_handler,
+ pid, queues, **options):
"""Starts an RQ worker."""
if path:
sys.path = path.split(':') + sys.path
- settings = read_config_file(config) if config else {}
+ settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
queues = queues or settings.get('QUEUES', ['default'])
sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')
@@ -171,57 +200,55 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, conne
setup_loghandlers_from_args(verbose, quiet)
- connection_class = import_attribute(connection_class)
- conn = connect(url, config, connection_class)
- cleanup_ghosts(conn)
- worker_class = import_attribute(worker_class)
- queue_class = import_attribute(queue_class)
- exception_handlers = []
- for h in exception_handler:
- exception_handlers.append(import_attribute(h))
-
- if is_suspended(conn):
- click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
- sys.exit(1)
-
try:
- queues = [queue_class(queue, connection=conn) for queue in queues]
- w = worker_class(queues,
- name=name,
- connection=conn,
- default_worker_ttl=worker_ttl,
- default_result_ttl=results_ttl,
- job_class=job_class,
- queue_class=queue_class,
- exception_handlers=exception_handlers or None)
+ cleanup_ghosts(cli_config.connection)
+ exception_handlers = []
+ for h in exception_handler:
+ exception_handlers.append(import_attribute(h))
+
+ if is_suspended(cli_config.connection):
+ click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
+ sys.exit(1)
+
+ queues = [cli_config.queue_class(queue,
+ connection=cli_config.connection,
+ job_class=cli_config.job_class)
+ for queue in queues]
+ worker = cli_config.worker_class(queues,
+ name=name,
+ connection=cli_config.connection,
+ default_worker_ttl=worker_ttl,
+ default_result_ttl=results_ttl,
+ job_class=cli_config.job_class,
+ queue_class=cli_config.queue_class,
+ exception_handlers=exception_handlers or None)
# Should we configure Sentry?
if sentry_dsn:
from raven import Client
from raven.transport.http import HTTPTransport
from rq.contrib.sentry import register_sentry
client = Client(sentry_dsn, transport=HTTPTransport)
- register_sentry(client, w)
+ register_sentry(client, worker)
- w.work(burst=burst)
+ worker.work(burst=burst)
except ConnectionError as e:
print(e)
sys.exit(1)
@main.command()
-@url_option
-@config_option
@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
-def suspend(url, config, duration):
+@pass_cli_config
+def suspend(cli_config, duration, **options):
"""Suspends all workers, to resume run `rq resume`"""
+
if duration is not None and duration < 1:
click.echo("Duration must be an integer greater than 1")
sys.exit(1)
- connection = connect(url, config)
- connection_suspend(connection, duration)
+ connection_suspend(cli_config.connection, duration)
if duration:
msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
@@ -232,10 +259,8 @@ def suspend(url, config, duration):
@main.command()
-@url_option
-@config_option
-def resume(url, config):
+@pass_cli_config
+def resume(cli_config, **options):
"""Resumes processing of queues, that where suspended with `rq suspend`"""
- connection = connect(url, config)
- connection_resume(connection)
+ connection_resume(cli_config.connection)
click.echo("Resuming workers.")
Oops, something went wrong.

0 comments on commit c019662

Please sign in to comment.