Skip to content

Commit

Permalink
Merge pull request #26 from mikemill/cli_list_jobs
Browse files Browse the repository at this point in the history
Add a CLI command to list all the jobs
  • Loading branch information
mikemill committed Jul 31, 2017
2 parents 04199af + 719e28d commit a5c0989
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 5 deletions.
34 changes: 34 additions & 0 deletions rq_retry_scheduler/cli/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from __future__ import print_function

import click
from datetime import datetime
from functools import partial
import logging
from rq.cli import cli as rqcli
from rq.cli import helpers
Expand All @@ -27,6 +29,7 @@ def main():
type=click.Choice([
'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']))
def run(url, config, burst, interval, loglevel):
"""Run the RQ Retry Scheduler"""
conn = rqcli.connect(url, config)
setup_logging(loglevel)
scheduler = Scheduler(connection=conn, interval=interval)
Expand All @@ -39,6 +42,7 @@ def run(url, config, burst, interval, loglevel):
@click.option('--rq/--no-rq', default=True, help="Show RQ info data")
@click.pass_context
def info(ctx, url, config, rq):
"""Get information about the RQ Schedule"""
conn = rqcli.connect(url, config)

if rq:
Expand All @@ -64,6 +68,36 @@ def info(ctx, url, config, rq):
click.echo("Next job to be queued at: {:s}".format(str(next_job)))


@main.command()
@rqcli.url_option
@rqcli.config_option
def list(url, config):
"""List out all the scheduled jobs"""
conn = rqcli.connect(url, config)
scheduler = Scheduler(connection=conn)

now = datetime.utcnow()

jobs = scheduler.schedule(fetch_jobs=True)

queue_name_length = max(len(job.origin) for job, _ in jobs)

cyan = partial(click.style, fg='cyan')

width = queue_name_length + len(cyan(''))

for job, time in jobs:
if time <= now:
color = helpers.red
else:
color = helpers.green

line = '{:s} {:s} {: <{width}}\t{:s}'.format(
color(str(time)), job.id,
cyan(job.origin), job.description, width=width)
click.echo(line)


main.add_command(rqcli.main, name='rq')


Expand Down
13 changes: 9 additions & 4 deletions rq_retry_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,19 @@ def enqueue_jobs(self):
queue.enqueue_job(job)
self.remove_job(job.id)

def schedule(self):
def schedule(self, fetch_jobs=False):
"""Returns the job ids and when they are scheduled to be queued"""
data = self.connection.zrange(
self.scheduler_jobs_key, 0, -1, withscores=True)

return [
(job_id, datetime.utcfromtimestamp(ts))
for job_id, ts in data]
if fetch_jobs:
return [
(self.get_job(job_id), datetime.utcfromtimestamp(ts))
for job_id, ts in data]
else:
return [
(job_id, datetime.utcfromtimestamp(ts))
for job_id, ts in data]

def run(self, burst=False):
self.log.info('Starting RQ Retry Scheduler..')
Expand Down
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def queue2(connection):

@pytest.yield_fixture
def scheduler(connection):
s = Scheduler('unittest', connection=connection)
s = Scheduler(connection=connection)
try:
yield s
finally:
Expand Down
19 changes: 19 additions & 0 deletions tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,22 @@ def test_schedule(scheduler, mock, connection):
expected = [(job_id, dt)]

assert scheduler.schedule() == expected


def test_schedule_fetch_job(scheduler, mock, connection):
assert scheduler.schedule() == []

dt = datetime.utcnow().replace(microsecond=0)
scheduler.current_time = lambda: dt

job_id = b'unittest'

ret = [(job_id, to_unix(dt))]
mock.patch.object(connection, 'zrange', return_value=ret)

job = Job(connection=connection)
mock.patch.object(Job, 'fetch', return_value=job)

expected = [(job, dt)]

assert scheduler.schedule(fetch_jobs=True) == expected

0 comments on commit a5c0989

Please sign in to comment.