Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker automatically cleans job registries every hour #534

Merged
merged 3 commits into from
May 28, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions rq/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@

class BaseRegistry(object):
"""
Base implementation of job registry, implemented in Redis sorted set. Each job
is stored as a key in the registry, scored by expiration time (unix timestamp).
Base implementation of a job registry, implemented in Redis sorted set.
Each job is stored as a key in the registry, scored by expiration time
(unix timestamp).
"""

def __init__(self, name='default', connection=None):
Expand Down Expand Up @@ -134,3 +135,11 @@ def cleanup(self):
automatically called by `count()` and `get_job_ids()` methods
implemented in BaseRegistry."""
pass


def clean_registries(queue):
    """Run ``cleanup()`` on the finished and started job registries of a queue.

    A registry of each kind is built from ``queue.name`` and
    ``queue.connection`` and its ``cleanup()`` method is called.
    """
    # FinishedJobRegistry is handled first, then StartedJobRegistry.
    for registry_class in (FinishedJobRegistry, StartedJobRegistry):
        registry_class(name=queue.name, connection=queue.connection).cleanup()
24 changes: 22 additions & 2 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import (absolute_import, division, print_function,
unicode_literals)

from datetime import timedelta
import errno
import logging
import os
Expand All @@ -20,7 +21,7 @@
from .job import Job, JobStatus
from .logutils import setup_loghandlers
from .queue import get_failed_queue, Queue
from .registry import FinishedJobRegistry, StartedJobRegistry
from .registry import clean_registries, FinishedJobRegistry, StartedJobRegistry
from .suspension import is_suspended
from .timeouts import UnixSignalDeathPenalty
from .utils import enum, import_attribute, make_colorizer, utcformat, utcnow, utcparse
Expand Down Expand Up @@ -146,6 +147,7 @@ def __init__(self, queues, name=None,
self._stopped = False
self.log = logger
self.failed_queue = get_failed_queue(connection=self.connection)
self.maintenance_date = None

# By default, push the "move-to-failed-queue" exception handler onto
# the stack
Expand Down Expand Up @@ -402,6 +404,9 @@ def work(self, burst=False):
try:
self.check_for_suspension(burst)

if self.should_run_maintenance_tasks:
self.clean_registries()

if self.stopped:
self.log.info('Stopping on request.')
break
Expand Down Expand Up @@ -608,7 +613,7 @@ def handle_exception(self, job, *exc_info):
'arguments': job.args,
'kwargs': job.kwargs,
'queue': job.origin,
})
})

for handler in reversed(self._exc_handlers):
self.log.debug('Invoking exception handler %s' % (handler,))
Expand Down Expand Up @@ -646,6 +651,21 @@ def __hash__(self):
"""The hash does not take the database/connection into account"""
return hash(self.name)

def clean_registries(self):
    """Clean the registries of every queue this worker holds.

    Calls the module-level ``clean_registries`` helper once per entry in
    ``self.queues``, then records the current UTC time in
    ``self.maintenance_date``.
    """
    for worker_queue in self.queues:
        clean_registries(worker_queue)

    # Stamp the run only after every queue has been processed.
    self.maintenance_date = utcnow()

@property
def should_run_maintenance_tasks(self):
    """Tell whether maintenance tasks are due.

    Returns True when maintenance has never run (``maintenance_date`` is
    None) or when more than 3600 seconds have passed since
    ``maintenance_date``; otherwise returns False.
    """
    last_run = self.maintenance_date
    if last_run is None:
        # Never ran before: run on first startup.
        return True
    return (utcnow() - last_run) > timedelta(seconds=3600)


class SimpleWorker(Worker):
def main_work_horse(self, *args, **kwargs):
Expand Down
19 changes: 17 additions & 2 deletions tests/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import (DeferredJobRegistry, FinishedJobRegistry,
StartedJobRegistry)
from rq.registry import (clean_registries, DeferredJobRegistry,
FinishedJobRegistry, StartedJobRegistry)

from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello
Expand Down Expand Up @@ -107,6 +107,21 @@ def test_get_job_count(self):
self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2)

def test_clean_registries(self):
    """clean_registries() cleans Started and Finished job registries."""
    conn = self.testconn
    queue = Queue(connection=conn)

    # Seed each registry with one entry scored 1.
    seeded = []
    for registry_class in (FinishedJobRegistry, StartedJobRegistry):
        registry = registry_class(connection=conn)
        conn.zadd(registry.key, 1, 'foo')
        seeded.append(registry)

    clean_registries(queue)

    # Both registries must be empty after the cleanup.
    for registry in seeded:
        self.assertEqual(conn.zcard(registry.key), 0)


class TestFinishedJobRegistry(RQTestCase):

Expand Down
42 changes: 42 additions & 0 deletions tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
unicode_literals)

import os
from datetime import timedelta
from time import sleep

from tests import RQTestCase, slow
Expand All @@ -15,6 +16,7 @@
from rq.job import Job, JobStatus
from rq.registry import StartedJobRegistry
from rq.suspension import resume, suspend
from rq.utils import utcnow


class CustomJob(Job):
Expand Down Expand Up @@ -401,3 +403,43 @@ def test_worker_sets_death(self):
death_date = w.death_date
self.assertIsNotNone(death_date)
self.assertEquals(type(death_date).__name__, 'datetime')

def test_clean_queue_registries(self):
    """worker.clean_registries sets maintenance_date and cleans registries."""
    conn = self.testconn
    queues = []
    registries = []

    # For each queue name, seed its StartedJobRegistry with one entry
    # scored 1 and confirm the entry is present.
    for queue_name in ('foo', 'bar'):
        queues.append(Queue(queue_name, connection=conn))
        registry = StartedJobRegistry(queue_name, connection=conn)
        conn.zadd(registry.key, 1, queue_name)
        self.assertEqual(conn.zcard(registry.key), 1)
        registries.append(registry)

    worker = Worker(queues)
    self.assertEqual(worker.maintenance_date, None)

    worker.clean_registries()

    self.assertNotEqual(worker.maintenance_date, None)
    for registry in registries:
        self.assertEqual(conn.zcard(registry.key), 0)

def test_should_run_maintenance_tasks(self):
    """Workers should run maintenance tasks on startup and every hour."""
    worker = Worker(Queue(connection=self.testconn))

    # No maintenance recorded yet: due immediately.
    self.assertTrue(worker.should_run_maintenance_tasks)

    # Maintenance recorded just now: not due.
    worker.maintenance_date = utcnow()
    self.assertFalse(worker.should_run_maintenance_tasks)

    # Maintenance recorded 3700 seconds ago: due again.
    worker.maintenance_date = utcnow() - timedelta(seconds=3700)
    self.assertTrue(worker.should_run_maintenance_tasks)

def test_worker_calls_clean_registries(self):
    """Worker calls clean_registries when run."""
    conn = self.testconn
    queue = Queue(connection=conn)

    # Seed the started-job registry with one entry scored 1.
    started_registry = StartedJobRegistry(connection=conn)
    conn.zadd(started_registry.key, 1, 'foo')

    Worker(queue, connection=conn).work(burst=True)

    # The seeded entry must be gone after the worker has run.
    self.assertEqual(conn.zcard(started_registry.key), 0)