Skip to content
This repository
  • 9 commits
  • 6 files changed
  • 0 comments
  • 2 contributors
6 CHANGES.md
Source Rendered
... ... @@ -1,6 +1,12 @@
1 1 ### 0.3.3
2 2 (not released yet)
3 3
  4 +- Jobs can now access the current `Job` instance from within. Relevant
  5 + documentation [here](http://python-rq.org/docs/jobs/).
  6 +
  7 +- Custom properties can be set by modifying the `job.meta` dict. Relevant
  8 + documentation [here](http://python-rq.org/docs/jobs/).
  9 +
4 10 - Custom properties can be set by modifying the `job.meta` dict. Relevant
5 11 documentation [here](http://python-rq.org/docs/jobs/).
6 12
3  rq/__init__.py
@@ -3,6 +3,7 @@
3 3 from .connections import Connection
4 4 from .queue import Queue, get_failed_queue
5 5 from .job import cancel_job, requeue_job
  6 +from .job import get_current_job
6 7 from .worker import Worker
7 8 from .version import VERSION
8 9
@@ -11,5 +12,5 @@
11 12 'use_connection', 'get_current_connection',
12 13 'push_connection', 'pop_connection', 'Connection',
13 14 'Queue', 'get_failed_queue', 'Worker',
14   - 'cancel_job', 'requeue_job']
  15 + 'cancel_job', 'requeue_job', 'get_current_job']
15 16 __version__ = VERSION
24 rq/job.py
@@ -49,6 +49,16 @@ def requeue_job(job_id, connection=None):
49 49 fq.requeue(job_id)
50 50
51 51
  52 +def get_current_job():
  53 + """Returns the Job instance that is currently being executed. If this
  54 + function is invoked from outside a job context, None is returned.
  55 + """
  56 + job_id = _job_stack.top
  57 + if job_id is None:
  58 + return None
  59 + return Job.fetch(job_id)
  60 +
  61 +
52 62 class Job(object):
53 63 """A Job is just a convenient datastructure to pass around job (meta) data.
54 64 """
@@ -310,9 +320,12 @@ def delete(self):
310 320
311 321 # Job execution
312 322 def perform(self): # noqa
313   - """Invokes the job function with the job arguments.
314   - """
315   - self._result = self.func(*self.args, **self.kwargs)
  323 + """Invokes the job function with the job arguments."""
  324 + _job_stack.push(self.id)
  325 + try:
  326 + self._result = self.func(*self.args, **self.kwargs)
  327 + finally:
  328 + assert self.id == _job_stack.pop()
316 329 return self._result
317 330
318 331
@@ -342,7 +355,7 @@ def __hash__(self):
342 355
343 356
344 357 # Backwards compatibility for custom properties
345   - def __getattr__(self, name):
  358 + def __getattr__(self, name): # noqa
346 359 import warnings
347 360 warnings.warn(
348 361 "Getting custom properties from the job instance directly "
@@ -379,3 +392,6 @@ def __setattr__(self, name, value):
379 392 SyntaxWarning)
380 393
381 394 self.__dict__['meta'][name] = value
  395 +
  396 +
  397 +_job_stack = LocalStack()
2  rq/queue.py
@@ -172,9 +172,9 @@ def enqueue_job(self, job, timeout=None, set_meta_data=True):
172 172 job.timeout = timeout # _timeout_in_seconds(timeout)
173 173 else:
174 174 job.timeout = 180 # default
  175 + job.save()
175 176
176 177 if self._async:
177   - job.save()
178 178 self.push_job_id(job.id)
179 179 else:
180 180 job.perform()
6 tests/fixtures.py
@@ -5,6 +5,7 @@
5 5 import time
6 6 from rq import Connection
7 7 from rq.decorators import job
  8 +from rq import get_current_job
8 9
9 10
10 11 def say_hello(name=None):
@@ -43,6 +44,11 @@ def create_file_after_timeout(path, timeout):
43 44 create_file(path)
44 45
45 46
  47 +def access_self():
  48 + job = get_current_job()
  49 + return job.id
  50 +
  51 +
46 52 class Calculator(object):
47 53 """Test instance methods."""
48 54 def __init__(self, denominator):
25 tests/test_job.py
... ... @@ -1,11 +1,12 @@
1 1 import times
2 2 from datetime import datetime
3 3 from tests import RQTestCase
4   -from tests.fixtures import Calculator, some_calculation, say_hello
  4 +from tests.fixtures import Calculator, some_calculation, say_hello, access_self
5 5 from tests.helpers import strip_milliseconds
6 6 from cPickle import loads
7   -from rq.job import Job
  7 +from rq.job import Job, get_current_job
8 8 from rq.exceptions import NoSuchJobError, UnpickleError
  9 +from rq.queue import Queue
9 10
10 11
11 12 class TestJob(RQTestCase):
@@ -198,3 +199,23 @@ def test_result_ttl_is_persisted(self):
198 199 job.save()
199 200 job_from_queue = Job.fetch(job.id, connection=self.testconn)
200 201 self.assertEqual(job.result_ttl, None)
  202 +
  203 + def test_job_access_within_job_function(self):
  204 + """The current job is accessible within the job function."""
  205 + # Executing the job function from outside of RQ throws an exception
  206 + self.assertIsNone(get_current_job())
  207 +
  208 + # Executing the job function from within the job works (and in
  209 + # this case leads to the job ID being returned)
  210 + job = Job.create(func=access_self)
  211 + job.save()
  212 + id = job.perform()
  213 + self.assertEqual(job.id, id)
  214 + self.assertEqual(job.func, access_self)
  215 +
  216 + # Ensure that get_current_job also works from within synchronous jobs
  217 + queue = Queue(async=False)
  218 + job = queue.enqueue(access_self)
  219 + id = job.perform()
  220 + self.assertEqual(job.id, id)
  221 + self.assertEqual(job.func, access_self)

No commit comments for this range

Something went wrong with that request. Please try again.