Skip to content

Commit

Permalink
Merge 0b770e0 into 4fc032b
Browse files Browse the repository at this point in the history
  • Loading branch information
WeatherGod committed Jan 9, 2017
2 parents 4fc032b + 0b770e0 commit a4a937d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
2 changes: 2 additions & 0 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ def perform_job(self, job, queue):
try:
with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
rv = job.perform()
job.refresh()

job.ended_at = utcnow()

Expand All @@ -711,6 +712,7 @@ def perform_job(self, job, queue):
started_job_registry=started_job_registry
)
except Exception:
job.refresh()
self.handle_job_failure(
job=job,
started_job_registry=started_job_registry
Expand Down
13 changes: 13 additions & 0 deletions tests/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ def access_self():
assert get_current_job() is not None


def modify_self(meta):
j = get_current_job()
j.meta.update(meta)
j.save()


def modify_self_and_error(meta):
j = get_current_job()
j.meta.update(meta)
j.save()
return 1 / 0


def echo(*args, **kwargs):
return (args, kwargs)

Expand Down
49 changes: 48 additions & 1 deletion tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
from tests import RQTestCase, slow
from tests.fixtures import (create_file, create_file_after_timeout,
div_by_zero, do_nothing, say_hello, say_pid,
run_dummy_heroku_worker, access_self)
run_dummy_heroku_worker, access_self,
modify_self, modify_self_and_error)
from tests.helpers import strip_microseconds

from rq import (get_failed_queue, Queue, SimpleWorker, Worker,
Expand Down Expand Up @@ -606,6 +607,52 @@ def new_enqueue_dependents(self, job, *args, **kwargs):
# So before that fix the call count was 4 instead of 3
self.assertEqual(mocked.call_count, 3)

def test_self_modification_persistence(self):
"""Make sure that any meta modification done by
the job itself persists completely through the
queue/worker/job stack."""
q = Queue()
# Also make sure that previously existing metadata
# persists properly
job = q.enqueue(modify_self, meta={'foo': 'bar', 'baz': 42},
args=[{'baz': 10, 'newinfo': 'waka'}])

w = Worker([q])
w.work(burst=True)

job_check = Job.fetch(job.id)
self.assertEqual(set(job_check.meta.keys()),
set(['foo', 'baz', 'newinfo']))
self.assertEqual(job_check.meta['foo'], 'bar')
self.assertEqual(job_check.meta['baz'], 10)
self.assertEqual(job_check.meta['newinfo'], 'waka')

def test_self_modification_persistence_with_error(self):
"""Make sure that any meta modification done by
the job itself persists completely through the
queue/worker/job stack -- even if the job errored"""
q = Queue()
failed_q = get_failed_queue()
# Also make sure that previously existing metadata
# persists properly
job = q.enqueue(modify_self_and_error, meta={'foo': 'bar', 'baz': 42},
args=[{'baz': 10, 'newinfo': 'waka'}])

w = Worker([q])
w.work(burst=True)

# Postconditions
self.assertEqual(q.count, 0)
self.assertEqual(failed_q.count, 1)
self.assertEqual(w.get_current_job_id(), None)

job_check = Job.fetch(job.id)
self.assertEqual(set(job_check.meta.keys()),
set(['foo', 'baz', 'newinfo']))
self.assertEqual(job_check.meta['foo'], 'bar')
self.assertEqual(job_check.meta['baz'], 10)
self.assertEqual(job_check.meta['newinfo'], 'waka')


def kill_worker(pid, double_kill):
# wait for the worker to be started over on the main process
Expand Down

0 comments on commit a4a937d

Please sign in to comment.