Skip to content

Commit

Permalink
Merge 37748e5 into 5331dc4
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangliyong committed Aug 22, 2014
2 parents 5331dc4 + 37748e5 commit 9e5c83b
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 81 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
redis
click
83 changes: 83 additions & 0 deletions rq/scripts/rq_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
rq command line tool
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)

import click
import redis
from rq import get_failed_queue, Queue
from rq.exceptions import InvalidJobOperationError

from .rqinfo import info


@click.group()
@click.option('--url', '-u', envvar='URL',
help='URL describing Redis connection details.')
@click.pass_context
def main(ctx, url):
"""Entrance of RQ CLI"""
if url is None:
url = "redis://localhost:6379/0"
redis_conn = redis.from_url(url)

ctx.obj = {}
ctx.obj['connection'] = redis_conn


@main.command()
@click.option('--yes', '-y', is_flag=True,
help='Empty failed queue by default')
@click.argument('queues', nargs=-1)
@click.pass_context
def empty(ctx, yes, queues):
"""[QUEUES]: queues to empty
\b
$ rq empty
Do you want to empty failed queue? [y/N]: y
2 jobs removed from failed queue
\b
$ rq empty -y
2 jobs removed from failed queue
\b
$ rq empty default high
10 jobs removed from default queue
2 jobs removed from high queue
"""
conn = ctx.obj['connection']
queues = [Queue(queue, connection=conn) for queue in queues]
if not queues:
if yes or click.confirm('Do you want to empty failed queue?',
abort=True):
queues = (get_failed_queue(connection=conn),)
for queue in queues:
num_jobs = queue.empty()
click.echo('{0} jobs removed from {1} queue'.format(
num_jobs, queue.name))


@main.command()
@click.pass_context
def requeue(ctx):
"""Requeue all failed jobs in failed queue"""
conn = ctx.obj['connection']
failed_queue = get_failed_queue(connection=conn)
job_ids = failed_queue.job_ids
click.echo('Requeue failed jobs: {0}'.format(len(job_ids)))
requeue_failed_num = 0
with click.progressbar(job_ids) as job_bar:
for job_id in job_bar:
try:
failed_queue.requeue(job_id)
except InvalidJobOperationError:
requeue_failed_num += 1

click.secho('Requeue failed: {0}'.format(
requeue_failed_num), fg='red')


main.add_command(info)
141 changes: 62 additions & 79 deletions rq/scripts/rqinfo.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

import argparse
import os
import sys
import time
import click
from functools import partial

from redis.exceptions import ConnectionError
from rq import get_failed_queue, Queue, Worker
from rq.scripts import (add_standard_arguments, read_config_file,
setup_default_arguments, setup_redis)
from rq.utils import gettermsize, make_colorizer
from rq import Queue, Worker, Connection

red = make_colorizer('darkred')
green = make_colorizer('darkgreen')
yellow = make_colorizer('darkyellow')

red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
yellow = partial(click.style, fg='yellow')


def pad(s, pad_to_length):
Expand Down Expand Up @@ -44,14 +42,14 @@ def state_symbol(state):
return state


def show_queues(args):
if len(args.queues):
qs = list(map(Queue, args.queues))
def show_queues(queues, raw, by_queue):
if len(queues):
qs = list(map(Queue, queues))
else:
qs = Queue.all()

num_jobs = 0
termwidth, _ = gettermsize()
termwidth, _ = click.get_terminal_size()
chartwidth = min(20, termwidth - 20)

max_count = 0
Expand All @@ -65,23 +63,23 @@ def show_queues(args):

for q in qs:
count = counts[q]
if not args.raw:
if not raw:
chart = green('|' + '█' * int(ratio * count))
line = '%-12s %s %d' % (q.name, chart, count)
else:
line = 'queue %s %d' % (q.name, count)
print(line)
click.echo(line)

num_jobs += count

# Print summary when not in raw mode
if not args.raw:
print('%d queues, %d jobs total' % (len(qs), num_jobs))
# print summary when not in raw mode
if not raw:
click.echo('%d queues, %d jobs total' % (len(qs), num_jobs))


def show_workers(args):
if len(args.queues):
qs = list(map(Queue, args.queues))
def show_workers(queues, raw, by_queue):
if len(queues):
qs = list(map(Queue, queues))

def any_matching_queue(worker):
def queue_matches(q):
Expand All @@ -99,13 +97,13 @@ def filter_queues(queue_names):
ws = Worker.all()
filter_queues = lambda x: x

if not args.by_queue:
if not by_queue:
for w in ws:
worker_queues = filter_queues(w.queue_names())
if not args.raw:
print('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues)))
if not raw:
click.echo('%s %s: %s' % (w.name, state_symbol(w.get_state()), ', '.join(worker_queues)))
else:
print('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues)))
click.echo('worker %s %s %s' % (w.name, w.get_state(), ','.join(worker_queues)))
else:
# Create reverse lookup table
queues = dict([(q, []) for q in qs])
Expand All @@ -121,78 +119,63 @@ def filter_queues(queue_names):
queues_str = ", ".join(sorted(map(lambda w: '%s (%s)' % (w.name, state_symbol(w.get_state())), queues[q]))) # noqa
else:
queues_str = '–'
print('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str))
click.echo('%s %s' % (pad(q.name + ':', max_qname + 1), queues_str))

if not args.raw:
print('%d workers, %d queues' % (len(ws), len(qs)))
if not raw:
click.echo('%d workers, %d queues' % (len(ws), len(qs)))


def show_both(args):
show_queues(args)
if not args.raw:
print('')
show_workers(args)
if not args.raw:
print('')
def show_both(queues, raw, by_queue):
show_queues(queues, raw, by_queue)
if not raw:
click.echo('')
show_workers(queues, raw, by_queue)
if not raw:
click.echo('')
import datetime
print('Updated: %s' % datetime.datetime.now())

click.echo('Updated: %s' % datetime.datetime.now())

def parse_args():
parser = argparse.ArgumentParser(description='RQ command-line monitor.')
add_standard_arguments(parser)
parser.add_argument('--path', '-P', default='.', help='Specify the import path.')
parser.add_argument('--interval', '-i', metavar='N', type=float, default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa
parser.add_argument('--raw', '-r', action='store_true', default=False, help='Print only the raw numbers, no bar charts') # noqa
parser.add_argument('--only-queues', '-Q', dest='only_queues', default=False, action='store_true', help='Show only queue info') # noqa
parser.add_argument('--only-workers', '-W', dest='only_workers', default=False, action='store_true', help='Show only worker info') # noqa
parser.add_argument('--by-queue', '-R', dest='by_queue', default=False, action='store_true', help='Shows workers by queue') # noqa
parser.add_argument('--empty-failed-queue', '-X', dest='empty_failed_queue', default=False, action='store_true', help='Empties the failed queue, then quits') # noqa
parser.add_argument('queues', nargs='*', help='The queues to poll')
return parser.parse_args()


def interval(val, func, args):
def refresh(val, func, *args):
while True:
if val and sys.stdout.isatty():
os.system('clear')
func(args)
if val and sys.stdout.isatty():
if val:
click.clear()
func(*args)
if val:
time.sleep(val)
else:
break


def main():
args = parse_args()

if args.path:
sys.path = args.path.split(':') + sys.path
@click.command()
@click.option('--path', '-P', default='.', help='Specify the import path.')
@click.option('--interval', '-i', default=2.5, help='Updates stats every N seconds (default: don\'t poll)') # noqa
@click.option('--raw', '-r', is_flag=True, help='Print only the raw numbers, no bar charts') # noqa
@click.option('--only-queues', '-Q', is_flag=True, help='Show only queue info') # noqa
@click.option('--only-workers', '-W', is_flag=True, help='Show only worker info') # noqa
@click.option('--by-queue', '-R', is_flag=True, help='Shows workers by queue') # noqa
@click.argument('queues', nargs=-1)
@click.pass_context
def info(ctx, path, interval, raw, only_queues, only_workers, by_queue, queues):
"""RQ command-line monitor."""

settings = {}
if args.config:
settings = read_config_file(args.config)

setup_default_arguments(args, settings)

setup_redis(args)
if path:
sys.path = path.split(':') + sys.path

conn = ctx.obj['connection']
try:
if args.empty_failed_queue:
num_jobs = get_failed_queue().empty()
print('{} jobs removed from failed queue'.format(num_jobs))
if only_queues:
func = show_queues
elif only_workers:
func = show_workers
else:
if args.only_queues:
func = show_queues
elif args.only_workers:
func = show_workers
else:
func = show_both
func = show_both

interval(args.interval, func, args)
with Connection(conn):
refresh(interval, func, queues, raw, by_queue)
except ConnectionError as e:
print(e)
click.echo(e)
sys.exit(1)
except KeyboardInterrupt:
print()
click.echo()
sys.exit(0)
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def get_version():


def get_dependencies():
deps = ['redis >= 2.7.0']
deps = ['redis >= 2.7.0', 'click > 3.0']
if sys.version_info < (2, 7) or \
(sys.version_info >= (3, 0) and sys.version_info < (3, 1)):
deps += ['importlib']
Expand All @@ -43,6 +43,7 @@ def get_dependencies():
install_requires=get_dependencies(),
entry_points='''\
[console_scripts]
rq = rq.scripts.rq_cli:main
rqworker = rq.scripts.rqworker:main
rqinfo = rq.scripts.rqinfo:main
''',
Expand Down
45 changes: 44 additions & 1 deletion tests/test_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,14 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

from click.testing import CliRunner
from rq import get_failed_queue
from rq.compat import is_python_version
from rq.scripts import read_config_file
from rq.job import Job
from rq.scripts import read_config_file, rq_cli

from tests import RQTestCase
from tests.fixtures import div_by_zero

if is_python_version((2, 7), (3, 2)):
from unittest import TestCase
Expand All @@ -16,3 +22,40 @@ def test_config_file(self):
settings = read_config_file("tests.dummy_settings")
self.assertIn("REDIS_HOST", settings)
self.assertEqual(settings['REDIS_HOST'], "testhost.example.com")


class TestRQCli(RQTestCase):
"""Test rq_cli script"""
def setUp(self):
super(TestRQCli, self).setUp()
db_num = self.testconn.connection_pool.connection_kwargs['db']
self.redis_url = 'redis://127.0.0.1:6379/%d' % db_num

job = Job.create(func=div_by_zero, args=(1, 2, 3))
job.origin = 'fake'
job.save()
get_failed_queue().quarantine(job, Exception('Some fake error')) # noqa

def test_empty(self):
"""rq -u <url> empty -y"""
runner = CliRunner()
result = runner.invoke(rq_cli.main,
['-u', self.redis_url, 'empty', "-y"])
self.assertEqual(result.exit_code, 0)
self.assertEqual(result.output, '1 jobs removed from failed queue\n')

def test_requeue(self):
"""rq -u <url> requeue"""
runner = CliRunner()
result = runner.invoke(rq_cli.main, ['-u', self.redis_url, 'requeue'])
self.assertEqual(result.exit_code, 0)
self.assertIn('Requeue failed jobs: 1', result.output)
self.assertIn('Requeue failed: 0', result.output)

def test_info(self):
"""rq -u <url> info -i 0"""
runner = CliRunner()
result = runner.invoke(rq_cli.main,
['-u', self.redis_url, 'info', '-i 0'])
self.assertEqual(result.exit_code, 0)
self.assertIn('1 queues, 1 jobs total', result.output)

0 comments on commit 9e5c83b

Please sign in to comment.