
Getting the currently executing job from within the job function #125

Closed
nvie opened this Issue · 21 comments

6 participants

@nvie
Owner

There is a need by people implementing jobs to be able to get at the currently executing Job instance (or its ID) from within the job function itself. For example, to communicate status or other meta information that RQ itself does not govern.

Given that #91 is already implemented, it seems illogical that there's no primitive to get to the current job.

@nvie
Owner

I'm thinking this API, using stack locals:

from rq import current_job

def my_job():
    job = current_job()
    job.meta.progress = 0.3
    ...

def outside_of_worker():
    job = current_job()     # raises NoJobExecutionContext
@selwin
Collaborator

If the intent of this API is to support a job progress indicator, I think passing the job to the callable is a lot more explicit. If the user needs progress indication, they'd have to modify the callable anyway, so adding an extra argument shouldn't be too bad.

Example:

def my_job(x, y, job):
    job.meta.progress = 0.3
    ...

q.enqueue(my_job, args=(1, 2), accepts_job=True)  # Not too sure what to name accepts_job yet though

Or is this a bad idea? :)

@nvie
Owner

What I don't like about that particular implementation is that every job function needs to accept that new param (which is both tedious and backwards-incompatible). I feel that adding a flag to enqueue to enable this behaviour complicates RQ too much (it would require more documentation to explain how this works).

If you're familiar with Flask, you might be aware of the request or g context locals (which look and act like globals, but are actually thread bound, and thus safe). I like this concept very much, and it would fit the RQ use case very nicely, too.

@nvie nvie was assigned
@selwin
Collaborator

Well, in terms of documentation, new features, including current_job, will always need to be documented, so there's no avoiding that, right?

Doing job = current_job() feels a little bit too magic to me.

Django also uses threadlocals in a few places (localization being one of them), but in general the core developers try very hard to avoid them; I haven't seen any new features using threadlocals introduced in the past few years. I agree that there are circumstances where using thread-local storage is unavoidable (especially in frameworks like Flask and Django), but this doesn't feel like one of them.

Just my two cents though :)

@bulkan

+1 to the API proposed by @nvie

@rubik

Why not set the job as a function attribute? This is how Celery does it, IIRC.

@nvie
Owner

Another issue I have with the job-as-an-explicit-arg is that functions get tied to RQ use only. Whereas currently, you can put any function in the background using RQ. This has been one of the key features of RQ from the beginning, and I'd really like to keep it that way. It is what makes it so easy to get started with RQ, since you don't have to adapt your existing functions to it (in most cases).

For the few cases where you'd want access to the current job, the local benefits (explicitness) don't outweigh the disadvantages elsewhere (RQ-awareness of all job functions), IMO.

Yes, using get_current_job will turn your function into an RQ-specific one, but at least the rest of your functions aren't, and they can still be invoked regularly.

@rubik

And so why not set an attribute on the function?

@job(foo=bar)
def task(arg, opt):
    task.job.meta.progress = .3
    # continue

You can catch the AttributeError if you need to.

@selwin
Collaborator

Just to summarize, there are now three proposed APIs. And here's roughly how each of them would be used to turn a callable into a job, while still maintaining its capability to be used as a regular function.

Option 1

from rq import current_job

@job(foo=bar)
def my_job():
    job = current_job()
    if job is not None:
        job.meta.progress = 0.3
    ...

Option 2

@job(foo=bar, accepts_job=True)
def my_job(job=None):
    if job is not None:
        job.meta.progress = 0.3
    ...

Option 3

@job(foo=bar)
def my_job():
    if hasattr(my_job, 'job'):
        my_job.job.meta.progress = 0.3
    ...

Out of the three, I think I like @rubik's suggestion (Option 3) best because it doesn't require introducing an extra flag to enqueue to enable this feature. Its implementation should also be quite straightforward.

EDIT: added accepts_job argument to Option 2.

@nvie
Owner

@selwin The examples seem a little contrived. In cases where you want to access the job, you are RQ-aware anyway. You could indeed go to lengths to be safe against non-RQ invocations, but that's up to you. My objections against the RQ-awareness involve the functions where you are not — and for good reasons don't want to be.

That leaves us with the original proposal (option 1) and @rubik's (option 3). Using the function as the vehicle to carry the job instance (option 3) does the trick, although I don't particularly like the aesthetics.

What's plain dangerous about it though, is that you're effectively assigning a property (job) to a global object (the job function), which isn't thread safe. This is of no harm when there is a single thread of execution (as is now), but is doomed to give unpredictable results in a multithreaded/multiprocessing context in the future. (Yes, we're working on that at the moment.)

Option 1 remains safe, because we will implement the "global" as a context local, which is safe by design in concurrent environments.
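
The hazard can be demonstrated directly: a job stored as an attribute on the (shared) function object gets clobbered by concurrent threads, while a `threading.local` keeps each thread's value intact. A small standalone illustration (not RQ code):

```python
import threading

results = {}

class SharedCarrier:
    """Stands in for the job function: one object shared by all threads."""
    job = None

shared = SharedCarrier()        # unsafe: one attribute for everyone
local = threading.local()       # safe: one attribute per thread

barrier = threading.Barrier(2)  # make both threads write before reading

def run(job_id):
    shared.job = job_id
    local.job = job_id
    barrier.wait()
    results[job_id] = (shared.job, local.job)

threads = [threading.Thread(target=run, args=(i,)) for i in (1, 2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# local.job is always correct for each thread; shared.job holds whichever
# thread wrote last, so exactly one thread reads the other thread's job.
```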

@nvie nvie closed this in 372de4b
@nvie nvie reopened this
@nvie
Owner

I implemented my proposal (see changes). It's fully functional, and I encourage everybody (especially those who aren't convinced by my proposal yet) to play with it.

It isn't merged into master yet.

@bulkan

Tried the proposed changes just now. It's pretty much what I wanted!

@omarkhan

This is great, but doesn't appear to work with synchronous workers.

@nvie
Owner

What's a "synchronous worker"?

@omarkhan

Sorry, I should have been clearer. I meant a Queue with async=False. From looking through the code, it appears that the job is only saved to Redis after it has completed, so any call to get_current_job() will fail when attempting to retrieve the job from Redis.

@bulkan

@nvie is there an ETA on this change going into master? Is there anything I can do to help?

@nvie
Owner

@bulkan I'm planning on including this in 0.3.3, along with these features that still need implementations. If you can help with those, that would speed things up.

@perrygeo

I'm using the job-access-within-job branch and am not able to get it working.

I've got a simple job...

@job("default") 
def test_job(wait):
    for i in range(wait):
        time.sleep(1)

        cj = get_current_job()
        cj.meta.progress = float(i)/float(wait)

    return datetime.datetime.now()

which fails with the following:

Traceback (most recent call last):
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/worker.py", line 393, in perform_job
    rv = job.perform()
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/job.py", line 338, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/home/mperry/src/rq_test/swaps/swaps/jobs.py", line 15, in test_job
    cj = get_current_job()
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/job.py", line 63, in get_current_job
    return Job.fetch(job_id)
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/job.py", line 159, in fetch
    job = cls(id, connection=connection)
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/job.py", line 164, in __init__
    self.connection = resolve_connection(connection)
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/connections.py", line 64, in resolve_connection
    'Could not resolve a Redis connection.')
NoRedisConnectionException: Could not resolve a Redis connection.

If I comment out the cj lines, everything works as expected. Is there something I'm missing?
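
The traceback bottoms out in resolve_connection, which raises when no connection can be found. A simplified sketch of that resolution pattern (not RQ's actual implementation) shows why: without a connection pushed onto the stack, e.g. by RQ's Connection context manager or use_connection(), there is nothing for Job.fetch to use.

```python
class NoRedisConnectionException(Exception):
    pass

# Simplified stand-in for RQ's connection stack.
_connection_stack = []

def push_connection(conn):
    _connection_stack.append(conn)

def pop_connection():
    return _connection_stack.pop()

def resolve_connection(connection=None):
    # An explicitly passed connection always wins.
    if connection is not None:
        return connection
    # Otherwise fall back to the most recently pushed connection.
    if not _connection_stack:
        raise NoRedisConnectionException('Could not resolve a Redis connection.')
    return _connection_stack[-1]
```

In RQ itself, wrapping your code in `with Connection(Redis()):` pushes a connection onto this stack for the duration of the block, which lets fetches inside the job resolve it.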

@nvie
Owner

This went into 0.3.3.

@nvie nvie closed this