Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

triggering shutdown by setting a redis flag #434

Merged
merged 4 commits into from Jan 12, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions .gitignore
Expand Up @@ -10,6 +10,3 @@
.tox
.vagrant
Vagrantfile

# PyCharm
.idea
4 changes: 2 additions & 2 deletions requirements.txt
@@ -1,2 +1,2 @@
redis
click
redis==2.7.0
click>=3.0.0
45 changes: 43 additions & 2 deletions rq/cli/cli.py
Expand Up @@ -16,6 +16,7 @@
from rq.contrib.legacy import cleanup_ghosts
from rq.exceptions import InvalidJobOperationError
from rq.utils import import_attribute
from rq.suspension import suspend as connection_suspend, resume as connection_resume, is_suspended

from .helpers import (read_config_file, refresh, setup_loghandlers_from_args,
show_both, show_queues, show_workers)
Expand All @@ -24,8 +25,12 @@
url_option = click.option('--url', '-u', envvar='RQ_REDIS_URL',
help='URL describing Redis connection details.')

config_option = click.option('--config', '-c', help='Module containing RQ settings.')

def connect(url):

def connect(url, config=None):
settings = read_config_file(config) if config else {}
url = url or settings.get('REDIS_URL')
return StrictRedis.from_url(url or 'redis://localhost:6379/0')


Expand Down Expand Up @@ -120,7 +125,7 @@ def info(url, path, interval, raw, only_queues, only_workers, by_queue, queues):

@main.command()
@url_option
@click.option('--config', '-c', help='Module containing RQ settings.')
@config_option
@click.option('--burst', '-b', is_flag=True, help='Run in burst mode (quit after all work is done)')
@click.option('--name', '-n', help='Specify a different name')
@click.option('--worker-class', '-w', default='rq.Worker', help='RQ Worker class to use')
Expand Down Expand Up @@ -158,7 +163,12 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
worker_class = import_attribute(worker_class)
queue_class = import_attribute(queue_class)

if is_suspended(conn):
click.secho('RQ is currently suspended, to resume job execution run "rq resume"', fg='red')
sys.exit(1)

try:

queues = [queue_class(queue, connection=conn) for queue in queues]
w = worker_class(queues,
name=name,
Expand All @@ -178,3 +188,34 @@ def worker(url, config, burst, name, worker_class, job_class, queue_class, path,
except ConnectionError as e:
print(e)
sys.exit(1)


@main.command()
@url_option
@config_option
@click.option('--duration', help='Seconds you want the workers to be suspended. Default is forever.', type=int)
def suspend(url, config, duration):
"""Suspends all workers, to resume run `rq resume`"""
if duration is not None and duration < 1:
click.echo("Duration must be an integer greater than 1")
sys.exit(1)

connection = connect(url, config)
connection_suspend(connection, duration)

if duration:
msg = """Suspending workers for {0} seconds. No new jobs will be started during that time, but then will
automatically resume""".format(duration)
click.echo(msg)
else:
click.echo("Suspending workers. No new jobs will be started. But current jobs will be completed")


@main.command()
@url_option
@config_option
def resume(url, config):
"""Resumes processing of queues, that where suspended with `rq suspend`"""
connection = connect(url, config)
connection_resume(connection)
click.echo("Resuming workers.")
7 changes: 5 additions & 2 deletions rq/cli/helpers.py
Expand Up @@ -8,7 +8,9 @@

import click
from rq import Queue, Worker
from rq.worker import WorkerStatus
from rq.logutils import setup_loghandlers
from rq.suspension import is_suspended

red = partial(click.style, fg='red')
green = partial(click.style, fg='green')
Expand Down Expand Up @@ -39,8 +41,9 @@ def get_scale(x):

def state_symbol(state):
symbols = {
'busy': red('busy'),
'idle': green('idle'),
WorkerStatus.BUSY: red('busy'),
WorkerStatus.IDLE: green('idle'),
WorkerStatus.SUSPENDED: yellow('suspended'),
}
try:
return symbols[state]
Expand Down
21 changes: 6 additions & 15 deletions rq/job.py
Expand Up @@ -12,7 +12,7 @@
from .connections import resolve_connection
from .exceptions import NoSuchJobError, UnpickleError
from .local import LocalStack
from .utils import import_attribute, utcformat, utcnow, utcparse
from .utils import import_attribute, utcformat, utcnow, utcparse, enum

try:
import cPickle as pickle
Expand All @@ -25,16 +25,7 @@
loads = pickle.loads


def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)

# NOTE: Yes, we *really* want to cast using str() here.
# On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op.
return type(str(name), (), values)

Status = enum('Status',
JobStatus = enum('JobStatus',
QUEUED='queued', FINISHED='finished', FAILED='failed',
STARTED='started')

Expand Down Expand Up @@ -167,19 +158,19 @@ def _set_status(self, status):

@property
def is_finished(self):
return self.get_status() == Status.FINISHED
return self.get_status() == JobStatus.FINISHED

@property
def is_queued(self):
return self.get_status() == Status.QUEUED
return self.get_status() == JobStatus.QUEUED

@property
def is_failed(self):
return self.get_status() == Status.FAILED
return self.get_status() == JobStatus.FAILED

@property
def is_started(self):
return self.get_status() == Status.STARTED
return self.get_status() == JobStatus.STARTED

@property
def dependency(self):
Expand Down
10 changes: 5 additions & 5 deletions rq/queue.py
Expand Up @@ -5,7 +5,7 @@
import uuid

from .connections import resolve_connection
from .job import Job, Status
from .job import Job, JobStatus
from .utils import import_attribute, utcnow

from .exceptions import (DequeueTimeout, InvalidJobOperationError,
Expand Down Expand Up @@ -180,7 +180,7 @@ def enqueue_call(self, func, args=None, kwargs=None, timeout=None,

# TODO: job with dependency shouldn't have "queued" as status
job = self.job_class.create(func, args, kwargs, connection=self.connection,
result_ttl=result_ttl, ttl=ttl, status=Status.QUEUED,
result_ttl=result_ttl, status=JobStatus.QUEUED,
description=description, depends_on=depends_on, timeout=timeout,
id=job_id)

Expand All @@ -195,7 +195,7 @@ def enqueue_call(self, func, args=None, kwargs=None, timeout=None,
while True:
try:
pipe.watch(depends_on.key)
if depends_on.get_status() != Status.FINISHED:
if depends_on.get_status() != JobStatus.FINISHED:
job.register_dependency(pipeline=pipe)
job.save(pipeline=pipe)
pipe.execute()
Expand Down Expand Up @@ -391,7 +391,7 @@ def __str__(self):

class FailedQueue(Queue):
def __init__(self, connection=None):
super(FailedQueue, self).__init__(Status.FAILED, connection=connection)
super(FailedQueue, self).__init__(JobStatus.FAILED, connection=connection)

def quarantine(self, job, exc_info):
"""Puts the given Job in quarantine (i.e. put it on the failed
Expand All @@ -418,7 +418,7 @@ def requeue(self, job_id):
if self.remove(job) == 0:
raise InvalidJobOperationError('Cannot requeue non-failed jobs.')

job.set_status(Status.QUEUED)
job.set_status(JobStatus.QUEUED)
job.exc_info = None
q = Queue(job.origin, connection=self.connection)
q.enqueue_job(job)
18 changes: 18 additions & 0 deletions rq/suspension.py
@@ -0,0 +1,18 @@
WORKERS_SUSPENDED = 'rq:suspended'


def is_suspended(connection):
return connection.exists(WORKERS_SUSPENDED)


def suspend(connection, ttl=None):
"""ttl = time to live in seconds. Default is no expiration
Note: If you pass in 0 it will invalidate right away
"""
connection.set(WORKERS_SUSPENDED, 1)
if ttl is not None:
connection.expire(WORKERS_SUSPENDED, ttl)


def resume(connection):
return connection.delete(WORKERS_SUSPENDED)
10 changes: 10 additions & 0 deletions rq/utils.py
Expand Up @@ -208,3 +208,13 @@ def first(iterable, default=None, key=None):
def current_timestamp():
"""Returns current UTC timestamp"""
return calendar.timegm(datetime.datetime.utcnow().utctimetuple())


def enum(name, *sequential, **named):
values = dict(zip(sequential, range(len(sequential))), **named)

# NOTE: Yes, we *really* want to cast using str() here.
# On Python 2 type() requires a byte string (which is str() on Python 2).
# On Python 3 it does not matter, so we'll use str(), which acts as
# a no-op.
return type(str(name), (), values)