Skip to content

Commit

Permalink
Merge pull request #421 from selwin/finished-registry
Browse files Browse the repository at this point in the history
Added FinishedJobRegistry to monitor/view completed jobs
  • Loading branch information
nvie committed Sep 14, 2014
2 parents 18c6ea6 + 2307bc8 commit 45ac484
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 24 deletions.
53 changes: 41 additions & 12 deletions rq/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,17 @@
from .utils import current_timestamp


class StartedJobRegistry:
class BaseRegistry(object):
"""
Registry of currently executing jobs. Each queue maintains a StartedJobRegistry.
StartedJobRegistry contains job keys that are currently being executed.
Each key is scored by job's expiration time (datetime started + timeout).
Base implementation of job registry, implemented in Redis sorted set. Each job
is stored as a key in the registry, scored by expiration time (unix timestamp).
Jobs are added to registry right before they are executed and removed
right after completion (success or failure).
Jobs whose score are lower than current time is considered "expired".
Jobs with scores are lower than current time is considered "expired" and
should be cleaned up.
"""

def __init__(self, name='default', connection=None):
self.name = name
self.key = 'rq:wip:%s' % name
self.connection = resolve_connection(connection)

def __len__(self):
Expand All @@ -28,7 +24,7 @@ def __len__(self):
@property
def count(self):
"""Returns the number of jobs in this registry"""
self.move_expired_jobs_to_failed_queue()
self.cleanup()
return self.connection.zcard(self.key)

def add(self, job, timeout, pipeline=None):
Expand All @@ -50,11 +46,28 @@ def get_expired_job_ids(self):

def get_job_ids(self, start=0, end=-1):
"""Returns list of all job ids."""
self.move_expired_jobs_to_failed_queue()
self.cleanup()
return [as_text(job_id) for job_id in
self.connection.zrange(self.key, start, end)]

def move_expired_jobs_to_failed_queue(self):

class StartedJobRegistry(BaseRegistry):
"""
Registry of currently executing jobs. Each queue maintains a
StartedJobRegistry. Jobs in this registry are ones that are currently
being executed.
Jobs are added to registry right before they are executed and removed
right after completion (success or failure).
Jobs whose score are lower than current time is considered "expired".
"""

def __init__(self, name='default', connection=None):
super(StartedJobRegistry, self).__init__(name, connection)
self.key = 'rq:wip:%s' % name

def cleanup(self):
"""Remove expired jobs from registry and add them to FailedQueue."""
job_ids = self.get_expired_job_ids()

Expand All @@ -63,6 +76,22 @@ def move_expired_jobs_to_failed_queue(self):
with self.connection.pipeline() as pipeline:
for job_id in job_ids:
failed_queue.push_job_id(job_id, pipeline=pipeline)
pipeline.zremrangebyscore(self.key, 0, current_timestamp())
pipeline.execute()

return job_ids


class FinishedJobRegistry(BaseRegistry):
"""
Registry of jobs that have been completed. Jobs are added to this
registry after they have successfully completed for monitoring purposes.
"""

def __init__(self, name='default', connection=None):
super(FinishedJobRegistry, self).__init__(name, connection)
self.key = 'rq:finished:%s' % name

def cleanup(self):
"""Remove expired jobs from registry."""
self.connection.zremrangebyscore(self.key, 0, current_timestamp())
12 changes: 8 additions & 4 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from .timeouts import UnixSignalDeathPenalty
from .utils import import_attribute, make_colorizer, utcformat, utcnow
from .version import VERSION
from .registry import StartedJobRegistry
from .registry import FinishedJobRegistry, StartedJobRegistry

try:
from procname import setprocname
Expand Down Expand Up @@ -496,7 +496,7 @@ def perform_job(self, job):
self.prepare_job_execution(job)

with self.connection._pipeline() as pipeline:
registry = StartedJobRegistry(job.origin, self.connection)
started_job_registry = StartedJobRegistry(job.origin)

try:
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
Expand All @@ -513,14 +513,18 @@ def perform_job(self, job):
job.ended_at = utcnow()
job._status = Status.FINISHED
job.save(pipeline=pipeline)

finished_job_registry = FinishedJobRegistry(job.origin)
finished_job_registry.add(job, result_ttl, pipeline)

job.cleanup(result_ttl, pipeline=pipeline)
registry.remove(job, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)

pipeline.execute()

except Exception:
job.set_status(Status.FAILED, pipeline=pipeline)
registry.remove(job, pipeline=pipeline)
started_job_registry.remove(job, pipeline=pipeline)
pipeline.execute()
self.handle_exception(job, *sys.exc_info())
return False
Expand Down
51 changes: 43 additions & 8 deletions tests/test_job_started_registry.py → tests/test_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@
from rq.queue import FailedQueue, Queue
from rq.utils import current_timestamp
from rq.worker import Worker
from rq.registry import StartedJobRegistry
from rq.registry import FinishedJobRegistry, StartedJobRegistry

from tests import RQTestCase
from tests.fixtures import div_by_zero, say_hello


class TestQueue(RQTestCase):
class TestRegistry(RQTestCase):

def setUp(self):
super(TestQueue, self).setUp()
super(TestRegistry, self).setUp()
self.registry = StartedJobRegistry(connection=self.testconn)

def test_add_and_remove(self):
Expand All @@ -33,8 +33,9 @@ def test_add_and_remove(self):

def test_get_job_ids(self):
"""Getting job ids from StartedJobRegistry."""
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, 10, 'bar')
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, timestamp + 10, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 20, 'bar')
self.assertEqual(self.registry.get_job_ids(), ['foo', 'bar'])

def test_get_expired_job_ids(self):
Expand All @@ -51,8 +52,9 @@ def test_cleanup(self):
failed_queue = FailedQueue(connection=self.testconn)
self.assertTrue(failed_queue.is_empty())
self.testconn.zadd(self.registry.key, 1, 'foo')
self.registry.move_expired_jobs_to_failed_queue()
self.registry.cleanup()
self.assertIn('foo', failed_queue.job_ids)
self.assertEqual(self.testconn.zscore(self.registry.key, 'foo'), None)

def test_job_execution(self):
"""Job is removed from StartedJobRegistry after execution."""
Expand All @@ -79,7 +81,40 @@ def test_job_execution(self):

def test_get_job_count(self):
"""StartedJobRegistry returns the right number of job count."""
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, 10, 'bar')
timestamp = current_timestamp() + 10
self.testconn.zadd(self.registry.key, timestamp, 'foo')
self.testconn.zadd(self.registry.key, timestamp, 'bar')
self.assertEqual(self.registry.count, 2)
self.assertEqual(len(self.registry), 2)


class TestFinishedJobRegistry(RQTestCase):

def setUp(self):
super(TestFinishedJobRegistry, self).setUp()
self.registry = FinishedJobRegistry(connection=self.testconn)

def test_cleanup(self):
"""Finished job registry removes expired jobs."""
timestamp = current_timestamp()
self.testconn.zadd(self.registry.key, 1, 'foo')
self.testconn.zadd(self.registry.key, timestamp + 10, 'bar')

self.registry.cleanup()
self.assertEqual(self.registry.get_job_ids(), ['bar'])

def test_jobs_are_put_in_registry(self):
"""Completed jobs are added to FinishedJobRegistry."""
self.assertEqual(self.registry.get_job_ids(), [])
queue = Queue(connection=self.testconn)
worker = Worker([queue])

# Completed jobs are put in FinishedJobRegistry
job = queue.enqueue(say_hello)
worker.perform_job(job)
self.assertEqual(self.registry.get_job_ids(), [job.id])

# Failed jobs are not put in FinishedJobRegistry
failed_job = queue.enqueue(div_by_zero)
worker.perform_job(failed_job)
self.assertEqual(self.registry.get_job_ids(), [job.id])

0 comments on commit 45ac484

Please sign in to comment.