Getting the currently executing job from within the job function #125

Closed
nvie opened this Issue Sep 2, 2012 · 21 comments

Collaborator

nvie commented Sep 2, 2012

People implementing jobs need a way to get at the currently executing Job instance (or its ID) from within the job function itself, for example to communicate status or other meta information that RQ itself does not govern.

Given that #91 is already implemented, it seems illogical that there's no primitive to get to the current job.

Collaborator

nvie commented Sep 2, 2012

I'm thinking of this API, using stack locals:

from rq import current_job

def my_job():
    job = current_job()
    job.meta.progress = 0.3
    ...

def outside_of_worker():
    job = current_job()     # raises NoJobExecutionContext
Collaborator

selwin commented Sep 2, 2012

If the intent of this API is to support a job progress indicator, I think passing the job in to the callable is a lot more explicit. If the user needs to support a progress indicator, he'd have to modify the callable anyway, so adding an extra argument shouldn't be too bad.

Example:

def my_job(x, y, job):
    job.meta.progress = 0.3
    ...

q.enqueue(my_job, args=(1,2), accepts_job=True) #Not too sure about what to name accepts_job yet though

Or is this a bad idea? :)

Collaborator

nvie commented Sep 2, 2012

What I don't like about that particular implementation is that every job function needs to accept that new param (which is both tedious and backwards-incompatible). I feel that adding a flag to enqueue to enable this behaviour complicates RQ too much (it would require more documentation to explain how this works).

If you're familiar with Flask, you might be aware of the request or g context locals (which look and act like globals, but are actually thread-bound, and thus safe). I like this concept very much, and it would fit the RQ use case very nicely, too.
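
For readers unfamiliar with the pattern, here's a minimal sketch of the idea (not RQ's actual implementation; the names _job_context and set_current_job are made up for illustration, and a plain RuntimeError stands in for the proposed NoJobExecutionContext): storage that looks global but is bound to the executing thread.

import threading

# Thread-bound storage: each worker thread only ever sees its own value.
_job_context = threading.local()

def set_current_job(job):
    # The worker would call this right before invoking the job function.
    _job_context.job = job

def current_job():
    # Called from inside a job function; complains when no job is executing.
    job = getattr(_job_context, 'job', None)
    if job is None:
        raise RuntimeError('No job is currently being executed')
    return job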

@ghost ghost assigned nvie Sep 2, 2012

Collaborator

selwin commented Sep 2, 2012

Well, in terms of documentation, new features, including current_job, will always need to be documented, so there's no avoiding that, right?

Doing job = current_job() feels a little bit too magic to me.

Django also uses threadlocals in a few places (localization being one of them), but in general the core developers try very hard to avoid them; I haven't seen any new features using threadlocals introduced in the past few years. I agree that there are circumstances where using thread-local storage is unavoidable (especially in frameworks like Flask and Django), but this doesn't feel like one of them.

Just my two cents though :)

bulkan commented Sep 2, 2012

+1 to the API proposed by @nvie

rubik commented Sep 2, 2012

Why not set the job as a function attribute? This is how Celery does it, IIRC.

Collaborator

nvie commented Sep 2, 2012

Another issue I have with the job-as-an-explicit-arg is that functions get tied to RQ use only. Whereas currently, you can put any function in the background using RQ. This has been one of the key features of RQ from the beginning, and I'd really like to keep it that way. It is what makes it so easy to get started with RQ, since you don't have to adapt your existing functions to it (in most cases).

For the few cases where you'd want access to the current job, the local benefits (explicitness) don't outweigh the disadvantages elsewhere (RQ-awareness of all job functions), IMO.

Yes, using get_current_job will turn your function into an RQ-specific one, but at least the rest of your functions aren't, and they can still be invoked regularly.

rubik commented Sep 2, 2012

And so why not set an attribute on the function?

@job(foo=bar)
def task(arg, opt):
    task.job.meta.progress = .3
    # continue

You can catch the AttributeError if you need to.
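
To make the mechanics of this option concrete, here's a rough sketch of how worker-side wiring could attach the job to the function object before invoking it (hypothetical code, not how RQ actually calls job functions):

def perform(func, job, args, kwargs):
    # Hypothetical worker-side wiring: expose the job as an attribute on the
    # function object for the duration of the call, then clean it up again.
    func.job = job
    try:
        return func(*args, **kwargs)
    finally:
        del func.job  # outside a worker, the attribute simply never exists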

Collaborator

selwin commented Sep 2, 2012

Just to summarize, there are now three proposed APIs. And here's roughly how each of them would be used to turn a callable into a job, while still maintaining its capability to be used as a regular function.

Option 1

from rq import current_job

@job(foo=bar)
def my_job():
    job = current_job()
    if job is not None:
        job.meta.progress = 0.3
    ...

Option 2

@job(foo=bar, accepts_job=True)
def my_job(job=None):
    if job is not None:
        job.meta.progress = 0.3
    ...

Option 3

@job(foo=bar)
def my_job():
    if hasattr(my_job, 'job'):
        my_job.job.meta.progress = 0.3
    ...

Out of the three, I think I like @rubik's suggestion (Option 3) best because it doesn't require introducing an extra flag to enqueue to enable this feature. Its implementation should also be quite straightforward.

EDIT: added accepts_job argument to Option 2.

Collaborator

nvie commented Sep 2, 2012

@selwin The examples seem a little contrived. In cases where you want to access the job, you are RQ-aware anyway. You could indeed go to lengths to be safe against non-RQ invocations, but that's up to you. My objections against RQ-awareness concern the functions where you are not RQ-aware, and for good reasons don't want to be.

That leaves us with the original proposal (option 1) and @rubik's (option 3). Using the function as the vehicle to carry the job instance (option 3) does the trick, although I don't particularly like the aesthetics.

What's plain dangerous about it, though, is that you're effectively assigning a property (job) to a global object (the job function), which isn't thread-safe. This is of no harm while there is a single thread of execution (as is the case now), but it is doomed to give unpredictable results in a multithreaded/multiprocessing context in the future. (Yes, we're working on that at the moment.)
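
A toy demonstration of that danger (plain Python, no RQ involved): once multiple threads run jobs concurrently, the shared attribute is overwritten by whichever thread assigned last.

import threading
import time

results = []

def my_job():
    time.sleep(0.01)       # give the other threads a chance to interleave
    return my_job.job      # reads whatever was assigned last, by any thread

def run(job_id):
    my_job.job = job_id    # one shared attribute on one shared function object
    results.append((job_id, my_job()))

threads = [threading.Thread(target=run, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(results)             # the pairs frequently disagree, e.g. (0, 3)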

Option 1 remains safe, because we will implement the "global" as a context local, which is safe by design in concurrent environments.

@nvie nvie closed this in 372de4b Sep 2, 2012

@nvie nvie reopened this Sep 2, 2012

Collaborator

nvie commented Sep 2, 2012

I implemented my proposal (see changes). It's fully functional and I encourage everybody (especially those who aren't convinced of my proposal yet) to play with it.

It isn't merged into master yet.

bulkan commented Sep 3, 2012

Tried the proposed changes now. It's pretty much what I wanted!

Contributor

omarkhan commented Sep 12, 2012

+1

Contributor

omarkhan commented Sep 12, 2012

This is great, but doesn't appear to work with synchronous workers.

Collaborator

nvie commented Sep 12, 2012

What's a "synchronous worker"?

Contributor

omarkhan commented Sep 13, 2012

Sorry, I should have been clearer. I meant a Queue with async=False. From looking through the code, it appears that the job is only saved to Redis after it has completed, so any call to get_current_job() will fail when attempting to retrieve the job from Redis.
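
A minimal reproduction of what is being described, using the Queue API as it was spelled at the time (later RQ releases renamed the async flag):

from redis import Redis
from rq import Queue, get_current_job

def my_job():
    # With a synchronous queue the job runs inline at enqueue time, before it
    # has been saved to Redis, so looking it up from Redis is expected to fail.
    return get_current_job()

q = Queue(async=False, connection=Redis())
q.enqueue(my_job)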

bulkan commented Sep 16, 2012

@nvie is there an ETA on this change going into master? Is there anything that I can do to help?

Collaborator

nvie commented Sep 19, 2012

@bulkan I'm planning on including this in 0.3.3, along with these features that still need implementations. If you can help with those, that will speed things up.

I'm using the job-access-within-job branch and am not able to get it working.

I've got a simple job...

@job("default") 
def test_job(wait):
    for i in range(wait):
        time.sleep(1)

        cj = get_current_job()
        cj.meta.progress = float(i)/float(wait)

    return datetime.datetime.now()

which fails with the following:

Traceback (most recent call last):
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/worker.py", line 393, in perform_job
    rv = job.perform()
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/job.py", line 338, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/home/mperry/src/rq_test/swaps/swaps/jobs.py", line 15, in test_job
    cj = get_current_job()
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/job.py", line 63, in get_current_job
    return Job.fetch(job_id)
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/job.py", line 159, in fetch
    job = cls(id, connection=connection)
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/job.py", line 164, in __init__
    self.connection = resolve_connection(connection)
  File "/home/mperry/src/rq_test/env/local/lib/python2.7/site-packages/rq/connections.py", line 64, in resolve_connection
    'Could not resolve a Redis connection.')
NoRedisConnectionException: Could not resolve a Redis connection.

If I comment out the cj lines, everything works as expected. Is there something I'm missing?

Collaborator

nvie commented Oct 23, 2012

Can you try passing an explicit connection argument to the @job decorator? I think this shouldn't even be allowed, in the same way that a Queue can't be constructed without a connection.
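
For reference, this is roughly what that suggestion looks like when applied to the example above (assuming the decorator accepts a connection argument, and without claiming it resolves the error by itself):

import datetime
import time

from redis import Redis
from rq import get_current_job
from rq.decorators import job

redis_conn = Redis()

@job("default", connection=redis_conn)  # explicit connection, per the suggestion above
def test_job(wait):
    for i in range(wait):
        time.sleep(1)
        cj = get_current_job()
        cj.meta.progress = float(i) / float(wait)
    return datetime.datetime.now()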

Collaborator

nvie commented Jan 21, 2013

This went into 0.3.3.

@nvie nvie closed this Jan 21, 2013
