Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support querying the worker for the currently executing job. #269

Closed
wants to merge 4 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 33 additions & 2 deletions rq/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ def find_by_key(cls, worker_key, connection=None):
worker = cls([], name, connection=connection)
queues = as_text(connection.hget(worker.key, 'queues'))
worker._state = connection.hget(worker.key, 'state') or '?'
worker._job_id = connection.hget(worker.key, 'current_job') or None
if queues:
worker.queues = [Queue(queue, connection=connection)
for queue in queues.split(',')]
Expand Down Expand Up @@ -215,15 +216,41 @@ def register_death(self):
p.expire(self.key, 60)
p.execute()

def set_state(self, new_state):
def set_state(self, new_state, pipeline=None):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having the pipeline kwarg in these setters is a Good Thing™, but they aren't used when accessed through the .state property. Actually, I don't think it was good design (I'm fully to blame here!) to have implicit IO when setting a property, because both property access and assignment should have side effects.

I'm fine adding this .set_state(..., pipeline=None) change when we stop using it through the .state property, and I think we should remove it (via proper deprecation, that is, for those who rely on it now). Would you feel comfortable adding that, perhaps?

Same goes for the job_id, which I think is a fine addition. So thanks for that!

self._state = new_state
self.connection.hset(self.key, 'state', new_state)

connection = pipeline if pipeline is not None else self.connection
connection.hset(self.key, 'state', new_state)

def get_state(self):
return self._state

state = property(get_state, set_state)

def set_job_id(self, new_job_id, pipeline=None):
self._job_id = new_job_id

connection = pipeline if pipeline is not None else self.connection

if new_job_id is None:
connection.hdel(self.key, 'current_job')
else:
connection.hset(self.key, 'current_job', new_job_id)

def get_job_id(self):
return self._job_id

job_id = property(get_job_id, set_job_id)

def get_current_job(self):
"""Returns the job id of the currently executing job."""
job_id = self.get_job_id()

if job_id is None:
return None

return Job.safe_fetch(job_id)

@property
def stopped(self):
return self._stopped
Expand Down Expand Up @@ -318,6 +345,8 @@ def work(self, burst=False): # noqa
self.state = 'busy'

job, queue = result
self.job_id = job.id

# Use the public setter here, to immediately update Redis
job.status = Status.STARTED
self.log.info('%s: %s (%s)' % (green(queue.name),
Expand All @@ -326,6 +355,8 @@ def work(self, burst=False): # noqa
self.connection.expire(self.key, (job.timeout or Queue.DEFAULT_TIMEOUT) + 60)
self.fork_and_perform_job(job)
self.connection.expire(self.key, self.default_worker_ttl)
self.job_id = None

if job.status == 'finished':
queue.enqueue_dependents(job)

Expand Down