Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow passing backend classes from CLI and other APIs #786

Merged
merged 2 commits into from
Feb 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@
.vagrant
Vagrantfile
.idea/
.coverage.*
.coverage*
/.cache
177 changes: 101 additions & 76 deletions rq/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,81 +5,113 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

from functools import update_wrapper
import os
import sys

import click
from redis import StrictRedis
from redis.exceptions import ConnectionError

from rq import Connection, get_failed_queue, Queue
from rq import Connection, get_failed_queue, __version__ as version
from rq.cli.helpers import (read_config_file, refresh,
setup_loghandlers_from_args,
show_both, show_queues, show_workers, CliConfig)
from rq.contrib.legacy import cleanup_ghosts
from rq.defaults import (DEFAULT_CONNECTION_CLASS, DEFAULT_JOB_CLASS,
DEFAULT_QUEUE_CLASS, DEFAULT_WORKER_CLASS)
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from rq.suspension import (suspend as connection_suspend,
resume as connection_resume, is_suspended)

from .helpers import (get_redis_from_config, read_config_file, refresh,
setup_loghandlers_from_args, show_both, show_queues,
show_workers)


# Disable the warning that Click displays (as of Click version 5.0) when users
# use unicode_literals in Python 2.
# See http://click.pocoo.org/dev/python3/#unicode-literals for more details.
click.disable_unicode_literals_warning = True


url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.')

config_option = click.option('--config', '-c',
help='Module containing RQ settings.')


def connect(url, config=None, connection_class=StrictRedis):
if url:
return connection_class.from_url(url)

settings = read_config_file(config) if config else {}
return get_redis_from_config(settings, connection_class)
shared_options = [
click.option('--url', '-u',
envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.'),
click.option('--config', '-c',
envvar='RQ_CONFIG',
help='Module containing RQ settings.'),
click.option('--worker-class', '-w',
envvar='RQ_WORKER_CLASS',
default=DEFAULT_WORKER_CLASS,
help='RQ Worker class to use'),
click.option('--job-class', '-j',
envvar='RQ_JOB_CLASS',
default=DEFAULT_JOB_CLASS,
help='RQ Job class to use'),
click.option('--queue-class',
envvar='RQ_QUEUE_CLASS',
default=DEFAULT_QUEUE_CLASS,
help='RQ Queue class to use'),
click.option('--connection-class',
envvar='RQ_CONNECTION_CLASS',
default=DEFAULT_CONNECTION_CLASS,
help='Redis client class to use'),
]


def pass_cli_config(func):
# add all the shared options to the command
for option in shared_options:
func = option(func)

# pass the cli config object into the command
def wrapper(*args, **kwargs):
ctx = click.get_current_context()
cli_config = CliConfig(**kwargs)
return ctx.invoke(func, cli_config, *args[1:], **kwargs)

return update_wrapper(wrapper, func)


@click.group()
@click.version_option(version)
def main():
"""RQ command line tool."""
pass


@main.command()
@url_option
@click.option('--all', '-a', is_flag=True, help='Empty all queues')
@click.argument('queues', nargs=-1)
def empty(url, all, queues):
@pass_cli_config
def empty(cli_config, all, queues, **options):
"""Empty given queues."""
conn = connect(url)

if all:
queues = Queue.all(connection=conn)
queues = cli_config.queue_class.all(connection=cli_config.connection,
job_class=cli_config.job_class)
else:
queues = [Queue(queue, connection=conn) for queue in queues]
queues = [cli_config.queue_class(queue,
connection=cli_config.connection,
job_class=cli_config.job_class)
for queue in queues]

if not queues:
click.echo('Nothing to do')
sys.exit(0)

for queue in queues:
num_jobs = queue.empty()
click.echo('{0} jobs removed from {1} queue'.format(num_jobs, queue.name))


@main.command()
@url_option
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.argument('job_ids', nargs=-1)
def requeue(url, all, job_ids):
@pass_cli_config
def requeue(cli_config, all, job_class, job_ids, **options):
"""Requeue failed jobs."""
conn = connect(url)
failed_queue = get_failed_queue(connection=conn)

failed_queue = get_failed_queue(connection=cli_config.connection,
job_class=cli_config.job_class)

if all:
job_ids = failed_queue.job_ids
Expand All @@ -102,16 +134,16 @@ def requeue(url, all, job_ids):


@main.command()
@url_option
@config_option
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)')
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info')
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
@click.argument('queues', nargs=-1)
def info(url, config, path, interval, raw, only_queues, only_workers, by_queue, queues):
@pass_cli_config
def info(cli_config, path, interval, raw, only_queues, only_workers, by_queue, queues,
**options):
"""RQ command-line monitor."""

if path:
Expand All @@ -125,8 +157,9 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
func = show_both

try:
with Connection(connect(url, config)):
refresh(interval, func, queues, raw, by_queue)
with Connection(cli_config.connection):
refresh(interval, func, queues, raw, by_queue,
cli_config.queue_class, cli_config.worker_class)
except ConnectionError as e:
click.echo(e)
sys.exit(1)
Expand All @@ -136,14 +169,8 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,


@main.command()
@url_option
@config_option
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use')
@click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use')
@click.option('--connection-class', default='redis.StrictRedis', help='Redis client class to use')
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--results-ttl', type=int, help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
Expand All @@ -153,14 +180,16 @@ def info(url, config, path, interval, raw, only_queues, only_workers, by_queue,
@click.option('--exception-handler', help='Exception handler(s) to use', multiple=True)
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.argument('queues', nargs=-1)
def worker(url, config, burst, name, worker_class, job_class, queue_class, connection_class, path, results_ttl,
worker_ttl, verbose, quiet, sentry_dsn, exception_handler, pid, queues):
@pass_cli_config
def worker(cli_config, burst, name, path, results_ttl,
worker_ttl, verbose, quiet, sentry_dsn, exception_handler,
pid, queues, **options):
"""Starts an RQ worker."""

if path:
sys.path = path.split(':') + sys.path

settings = read_config_file(config) if config else {}
settings = read_config_file(cli_config.config) if cli_config.config else {}
# Worker specific default arguments
queues = queues or settings.get('QUEUES', ['default'])
sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')
Expand All @@ -171,57 +200,55 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, conne

setup_loghandlers_from_args(verbose, quiet)

connection_class = import_attribute(connection_class)
conn = connect(url, config, connection_class)
cleanup_ghosts(conn)
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)
exception_handlers = []
for h in exception_handler:
exception_handlers.append(import_attribute(h))

if is_suspended(conn):
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
sys.exit(1)

try:

queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
connection=conn,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
job_class=job_class,
queue_class=queue_class,
exception_handlers=exception_handlers or None)
cleanup_ghosts(cli_config.connection)
exception_handlers = []
for h in exception_handler:
exception_handlers.append(import_attribute(h))

if is_suspended(cli_config.connection):
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
sys.exit(1)

queues = [cli_config.queue_class(queue,
connection=cli_config.connection,
job_class=cli_config.job_class)
for queue in queues]
worker = cli_config.worker_class(queues,
name=name,
connection=cli_config.connection,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
job_class=cli_config.job_class,
queue_class=cli_config.queue_class,
exception_handlers=exception_handlers or None)

# Should we configure Sentry?
if sentry_dsn:
from raven import Client
from raven.transport.http import HTTPTransport
from rq.contrib.sentry import register_sentry
client = Client(sentry_dsn, transport=HTTPTransport)
register_sentry(client, w)
register_sentry(client, worker)

w.work(burst=burst)
worker.work(burst=burst)
except ConnectionError as e:
print(e)
sys.exit(1)


@main.command()
@url_option
@config_option
@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
def suspend(url, config, duration):
@pass_cli_config
def suspend(cli_config, duration, **options):
"""Suspends all workers, to resume run `rq resume`"""

if duration is not None and duration < 1:
click.echo("Duration must be an integer greater than 1")
sys.exit(1)

connection = connect(url, config)
connection_suspend(connection, duration)
connection_suspend(cli_config.connection, duration)

if duration:
msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
Expand All @@ -232,10 +259,8 @@ def suspend(url, config, duration):


@main.command()
@url_option
@config_option
def resume(url, config):
@pass_cli_config
def resume(cli_config, **options):
"""Resumes processing of queues, that where suspended with `rq suspend`"""
connection = connect(url, config)
connection_resume(connection)
connection_resume(cli_config.connection)
click.echo("Resuming workers.")
Loading