Skip to content

Commit

Permalink
Merge pull request #420 from zhangliyong/cli-rq-worker
Browse files Browse the repository at this point in the history
Convert `rqworker` to `rq worker` subcommand
  • Loading branch information
nvie committed Sep 14, 2014
2 parents 45ac484 + 6621105 commit bc6d30e
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 192 deletions.
2 changes: 1 addition & 1 deletion rq/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@

# TODO: the following imports can be removed when we drop the `rqinfo` and
# `rqworkers` commands in favor of just shipping the `rq` command.
from .cli import info
from .cli import info, worker
93 changes: 79 additions & 14 deletions rq/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,28 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

import os
import sys

import click
from redis import StrictRedis
from redis.exceptions import ConnectionError

from rq import Connection, get_failed_queue, Queue
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute

from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)

from .helpers import refresh, show_both, show_queues, show_workers

url_option = click.option('--url', '-u', envvar='URL', default='redis://localhost:6379/0',
url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.')


def connect(url):
return StrictRedis.from_url(url)
return StrictRedis.from_url(url or 'redis://localhost:6379/0')


@click.group()
Expand All @@ -33,8 +39,7 @@ def main():
@url_option
@click.option('--all', '-a', is_flag=True, help='Empty all queues')
@click.argument('queues', nargs=-1)
@click.pass_context
def empty(ctx, url, all, queues):
def empty(url, all, queues):
"""Empty given queues."""
conn = connect(url)

Expand All @@ -55,8 +60,7 @@ def empty(ctx, url, all, queues):
@url_option
@click.option('--all', '-a', is_flag=True, help='Requeue all failed jobs')
@click.argument('job_ids', nargs=-1)
@click.pass_context
def requeue(ctx, url, all, job_ids):
def requeue(url, all, job_ids):
"""Requeue failed jobs."""
conn = connect(url)
failed_queue = get_failed_queue(connection=conn)
Expand Down Expand Up @@ -84,14 +88,13 @@ def requeue(ctx, url, all, job_ids):
@main.command()
@url_option
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--interval', '-i', default=None, help='Updates stats every N seconds (default: don\'t poll)') # noqa
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa
@click.option('--interval', '-i', type=float, help='Updates stats every N seconds (default: don\'t poll)')
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts')
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info')
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info')
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue')
@click.argument('queues', nargs=-1)
@click.pass_context
def info(ctx, url, path, interval, raw, only_queues, only_workers, by_queue, queues):
def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):
"""RQ command-line monitor."""

if path:
Expand All @@ -113,3 +116,65 @@ def info(ctx, url, path, interval, raw, only_queues, only_workers, by_queue, que
except KeyboardInterrupt:
click.echo()
sys.exit(0)


@main.command()
@url_option
@click.option('--config', '-c', help='Module containing RQ settings.')
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
@click.option('--job-class', '-j', default='rq.job.Job', help='RQ Job class to use')
@click.option('--queue-class', default='rq.Queue', help='RQ Queue class to use')
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--results-ttl', help='Default results timeout to be used')
@click.option('--worker-ttl', type=int, help='Default worker timeout to be used')
@click.option('--verbose', '-v', is_flag=True, help='Show more output')
@click.option('--quiet', '-q', is_flag=True, help='Show less output')
@click.option('--sentry-dsn', envvar='SENTRY_DSN', help='Report exceptions to this Sentry DSN')
@click.option('--pid', help='Write the process ID number to a file at the specified path')
@click.argument('queues', nargs=-1)
def worker(url, config, burst, name, worker_class, job_class, queue_class, path, results_ttl, worker_ttl,
verbose, quiet, sentry_dsn, pid, queues):
"""Starts an RQ worker."""

if path:
sys.path = path.split(':') + sys.path

settings = read_config_file(config) if config else {}
# Worker specific default arguments
url = url or settings.get('REDIS_URL')
queues = queues or settings.get('QUEUES', ['default'])
sentry_dsn = sentry_dsn or settings.get('SENTRY_DSN')

if pid:
with open(os.path.expanduser(pid), "w") as fp:
fp.write(str(os.getpid()))

setup_loghandlers_from_args(verbose, quiet)

conn = connect(url)
cleanup_ghosts(conn)
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)

try:
queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
connection=conn,
default_worker_ttl=worker_ttl,
default_result_ttl=results_ttl,
job_class=job_class)

# Should we configure Sentry?
if sentry_dsn:
from raven import Client
from rq.contrib.sentry import register_sentry
client = Client(sentry_dsn)
register_sentry(client, w)

w.work(burst=burst)
except ConnectionError as e:
print(e)
sys.exit(1)
23 changes: 23 additions & 0 deletions rq/cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

import importlib
import time
from functools import partial

import click
from rq import Queue, Worker
from rq.logutils import setup_loghandlers

red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
yellow = partial(click.style, fg='yellow')


def read_config_file(module):
"""Reads all UPPERCASE variables defined in the given module file."""
settings = importlib.import_module(module)
return dict([(k, v)
for k, v in settings.__dict__.items()
if k.upper() == k])


def pad(s, pad_to_length):
"""Pads the given string to the given length."""
return ('%-' + '%ds' % pad_to_length) % (s,)
Expand Down Expand Up @@ -141,3 +151,16 @@ def refresh(interval, func, *args):
time.sleep(interval)
else:
break


def setup_loghandlers_from_args(verbose, quiet):
if verbose and quiet:
raise RuntimeError("Flags --verbose and --quiet are mutually exclusive.")

if verbose:
level = 'DEBUG'
elif quiet:
level = 'WARNING'
else:
level = 'INFO'
setup_loghandlers(level)
6 changes: 3 additions & 3 deletions rq/contrib/legacy.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
logger = logging.getLogger(__name__)


def cleanup_ghosts():
def cleanup_ghosts(conn=None):
"""
RQ versions < 0.3.6 suffered from a race condition where workers, when
abruptly terminated, did not have a chance to clean up their worker
Expand All @@ -21,8 +21,8 @@ def cleanup_ghosts():
This function will clean up any of such legacy ghosted workers.
"""
conn = get_current_connection()
for worker in Worker.all():
conn = conn if conn else get_current_connection()
for worker in Worker.all(connection=conn):
if conn._ttl(worker.key) == -1:
ttl = worker.default_worker_ttl
conn.expire(worker.key, ttl)
Expand Down
1 change: 0 additions & 1 deletion rq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

import types
import inspect
import warnings
from functools import partial
Expand Down
65 changes: 0 additions & 65 deletions rq/scripts/__init__.py

This file was deleted.

102 changes: 0 additions & 102 deletions rq/scripts/rqworker.py

This file was deleted.

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def get_dependencies():
# NOTE: rqworker/rqinfo are kept for backward-compatibility,
# remove eventually (TODO)
'rqinfo = rq.cli:info',
'rqworker = rq.scripts.rqworker:main', # TODO convert to click subcommand
'rqworker = rq.cli:worker',
],
},
classifiers=[
Expand Down
Loading

0 comments on commit bc6d30e

Please sign in to comment.