Skip to content

Commit

Permalink
test idle jobs using stale heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
Omer Lachish committed Nov 9, 2020
1 parent 7a45df5 commit 8d2ff55
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
5 changes: 4 additions & 1 deletion rq/worker.py
Expand Up @@ -764,7 +764,10 @@ def monitor_work_horse(self, job, queue):
with self.connection.pipeline() as pipeline:
self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
job.heartbeat(utcnow(), pipeline=pipeline)
pipeline.zadd(queue.started_job_registry.heartbeats_key, {job.id: current_timestamp()})
pipeline.zadd(
queue.started_job_registry.heartbeats_key,
{job.id: current_timestamp()}
)
pipeline.execute()

except OSError as e:
Expand Down
10 changes: 3 additions & 7 deletions tests/test_registry.py
Expand Up @@ -139,10 +139,7 @@ def test_cleanup_moves_expired_jobs_to_failed_job_registry(self):
queue = Queue(connection=self.testconn)
failed_job_registry = FailedJobRegistry(connection=self.testconn)
job = queue.enqueue(say_hello)
job.worker_key = 'dummy-worker-key'
job.save()

self.testconn.set('dummy-worker-key', 1)
self.testconn.zadd(self.registry.key, {job.id: 2})

# Job has not been moved to FailedJobRegistry
Expand All @@ -157,15 +154,14 @@ def test_cleanup_moves_expired_jobs_to_failed_job_registry(self):
self.assertEqual(job.get_status(), JobStatus.FAILED)
self.assertTrue(job.exc_info) # explanation is written to exc_info

def test_cleanup_moves_zombie_jobs_to_failed_job_registry(self):
"""Moving zombie jobs (non-expired jobs that no living working is working on) to FailedJobRegistry."""
def test_cleanup_moves_idle_jobs_to_failed_job_registry(self):
"""Moving jobs that haven't had a heartbeat lately to FailedJobRegistry."""
queue = Queue(connection=self.testconn)
failed_job_registry = FailedJobRegistry(connection=self.testconn)
job = queue.enqueue(say_hello)
job.worker_key = 'dummy-worker-key'
job.save()

self.testconn.zadd(self.registry.key, {job.id: float('inf')})
self.testconn.zadd(self.registry.heartbeats_key, {job.id: current_timestamp() - 10})

self.registry.cleanup()
self.assertIn(job.id, failed_job_registry)
Expand Down

0 comments on commit 8d2ff55

Please sign in to comment.