Skip to content

Commit

Permalink
Merge 162a02f into 2091c60
Browse files Browse the repository at this point in the history
  • Loading branch information
jtushman committed Dec 2, 2014
2 parents 2091c60 + 162a02f commit 885d18d
Show file tree
Hide file tree
Showing 12 changed files with 237 additions and 60 deletions.
Binary file added redis-2.7.0-py2.7.egg
Binary file not shown.
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
redis
click
redis==2.7.0
click>=3.0.0
45 changes: 43 additions & 2 deletions rq/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended

from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)
Expand All @@ -24,8 +25,12 @@
url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.')

config_option = click.option('--config', '-c', help='Module containing RQ settings.')

def connect(url):

def connect(url, config=None):
settings = read_config_file(config) if config else {}
url = url or settings.get('REDIS_URL')
return StrictRedis.from_url(url or 'redis://localhost:6379/0')


Expand Down Expand Up @@ -120,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):

@main.command()
@url_option
@click.option('--config', '-c', help='Module containing RQ settings.')
@config_option
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
Expand Down Expand Up @@ -158,7 +163,12 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)

if is_suspended(conn):
click.secho("The worker has been paused, run reset_paused", fg='red')
sys.exit(1)

try:

queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
Expand All @@ -178,3 +188,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
except ConnectionError as e:
print(e)
sys.exit(1)


@main.command()
@url_option
@config_option
@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
def suspend(url, config, duration):
"""Suspends all workers, to resume run `rq resume`"""
if duration is not None and duration < 1:
click.echo("Duration must be an integer greater than 1")
sys.exit(1)

connection = connect(url, config)
connection_suspend(connection, duration)

if duration:
msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
automatically resume""".format(duration)
click.echo(msg)
else:
click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed")


@main.command()
@url_option
@config_option
def resume(url, config):
"""Resumes processing of queues, that where suspended with `rq suspend`"""
connection = connect(url, config)
connection_resume(connection)
click.echo("Resuming workers.")
7 changes: 5 additions & 2 deletions rq/cli/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@

import click
from rq import Queue, Worker
from rq.worker import WorkerStatus
from rq.logutils import setup_loghandlers
from rq.suspension import is_suspended

red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
Expand Down Expand Up @@ -39,8 +41,9 @@ def get_scale(x):

def state_symbol(state):
symbols = {
'busy': red('busy'),
'idle': green('idle'),
WorkerStatus.BUSY: red('busy'),
WorkerStatus.IDLE: green('idle'),
WorkerStatus.SUSPENDED: yellow('suspended'),
}
try:
return symbols[state]
Expand Down
21 changes: 6 additions & 15 deletions rq/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack
from .utils import import_attribute, utcformat, utcnow, utcparse
from .utils import import_attribute, utcformat, utcnow, utcparse, enum

try:
import cPickle as pickle
Expand All @@ -25,16 +25,7 @@
loads = pickle.loads


def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)

# NOTE: Yes, we *really* want to cast using str() here.
# On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op.
return type(str(name), (), values)

Status = enum('Status',
JobStatus = enum('JobStatus',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')

Expand Down Expand Up @@ -166,19 +157,19 @@ def _set_status(self, status):

@property
def is_finished(self):
return self.get_status() == Status.FINISHED
return self.get_status() == JobStatus.FINISHED

@property
def is_queued(self):
return self.get_status() == Status.QUEUED
return self.get_status() == JobStatus.QUEUED

@property
def is_failed(self):
return self.get_status() == Status.FAILED
return self.get_status() == JobStatus.FAILED

@property
def is_started(self):
return self.get_status() == Status.STARTED
return self.get_status() == JobStatus.STARTED

@property
def dependency(self):
Expand Down
10 changes: 5 additions & 5 deletions rq/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import uuid

from .connections import resolve_connection
from .job import Job, Status
from .job import Job, JobStatus
from .utils import import_attribute, utcnow

from .exceptions import (DequeueTimeout, InvalidJobOperationError,
Expand Down Expand Up @@ -180,7 +180,7 @@ def enqueue_call(self, func, args=None, kwargs=None, timeout=None,

# TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, status=Status.QUEUED,
result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on, timeout=timeout,
id=job_id)

Expand All @@ -195,7 +195,7 @@ def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
while True:
try:
pipe.watch(depends_on.key)
if depends_on.get_status() != Status.FINISHED:
if depends_on.get_status() != JobStatus.FINISHED:
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
pipe.execute()
Expand Down Expand Up @@ -390,7 +390,7 @@ def __str__(self):

class FailedQueue(Queue):
def __init__(self, connection=None):
super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)

def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
Expand All @@ -417,7 +417,7 @@ def requeue(self, job_id):
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')

job.set_status(Status.QUEUED)
job.set_status(JobStatus.QUEUED)
job.exc_info = None
q = Queue(job.origin, connection=self.connection)
q.enqueue_job(job)
18 changes: 18 additions & 0 deletions rq/suspension.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
WORKERS_SUSPENDED = 'rq:suspended'


def is_suspended(connection):
return connection.exists(WORKERS_SUSPENDED)


def suspend(connection, ttl=None):
"""ttl = time to live in seconds. Default is no expiration
Note: If you pass in 0 it will invalidate right away
"""
connection.set(WORKERS_SUSPENDED, 1)
if ttl is not None:
connection.expire(WORKERS_SUSPENDED, ttl)


def resume(connection):
return connection.delete(WORKERS_SUSPENDED)
10 changes: 10 additions & 0 deletions rq/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,3 +208,13 @@ def first(iterable, default=None, key=None):
def current_timestamp():
"""Returns current UTC timestamp"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())


def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)

# NOTE: Yes, we *really* want to cast using str() here.
# On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op.
return type(str(name), (), values)

0 comments on commit 885d18d

Please sign in to comment.