Cleanup zombie worker leftovers as part of StartedJobRegistry's cleanup() #1372
Conversation
Codecov Report

```
@@            Coverage Diff             @@
##           master    #1372      +/-   ##
==========================================
- Coverage   95.08%   94.98%    -0.10%
==========================================
  Files          44       44
  Lines        6039     5939      -100
==========================================
- Hits         5742     5641      -101
- Misses        297      298        +1
```

Continue to review full report at Codecov.
Hey there, this is an interesting addition, but I think this needs a little bit of tweaking. This operation is potentially expensive to run on systems with a large number of workers running at the same time (hundreds or thousands), since it would verify that all workers of jobs in StartedJobRegistry are still alive. There are a few ways we can approach this.

Approach 1

…

Approach 2

This PR adds … Note that I've also been working on adding …
Thanks @selwin. Approach 2 (+ …
No, "as part of the heartbeat process" is correct, it's this line here. We store the timeout in a sorted set, and the cleanup process periodically check whether any job has "expired". After we implement this, the cleanup process can rely on this sorted set as part of it's cleaning process (this is better than checking StartedJobRegistry for expired jobs). In most cases we'll be able to detect dead jobs sooner.
I think …
Ok, we are talking about the same thing. I meant "look for out-of-date heartbeats as part of the cleanup process". This leads to my second question: how can we determine the right interval when running in the scope of the cleanup process? Passing the monitoring interval down from the worker to the cleanup method seems to be the easiest way, but it makes the registry APIs uneven in terms of arguments.
The expiry is set during the heartbeat process, which is now + 60 seconds. The cleanup process simply looks for heartbeats whose value is lower than the current timestamp and moves those jobs to FailedJobRegistry.
Makes sense @selwin. Now using a sorted set for heartbeats per registry.
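To make the mechanics concrete, here is a minimal sketch of the sorted-set bookkeeping described above, using plain redis-py; the key name, TTL, and function names are illustrative, not this PR's actual code:

```python
import time

import redis

r = redis.Redis()
# Illustrative key; the PR keeps one such sorted set per registry.
HEARTBEATS = 'rq:heartbeats:default'

def record_heartbeat(job_id, ttl=60):
    # Score = expiry timestamp (now + ttl). Refreshing a heartbeat
    # simply overwrites the job's score with a later expiry.
    r.zadd(HEARTBEATS, {job_id: time.time() + ttl})

def expired_job_ids():
    # Any job whose expiry is in the past has missed its heartbeat
    # window and is presumed dead.
    return r.zrangebyscore(HEARTBEATS, 0, time.time())
```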
We can actually simplify this PR by quite a bit. Instead of keeping a separate sorted set for heartbeats, the heartbeat expiration can be stored directly as the job's score in StartedJobRegistry.

I think those are pretty much the changes we need to make.
Oh, that makes total sense. I was working under the false assumption that the old non-heartbeat scores had to live side by side with heartbeats, but heartbeats make a better indicator of a job's WIP status. Can you have another look?
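As a rough sketch of that simplification (hypothetical function, not the merged code): once heartbeat expiries replace the old scores, the registry's own sorted set doubles as the heartbeat store and no second structure is needed:

```python
import time

def heartbeat(connection, registry_key, job_id, ttl):
    # The job's score in the registry IS its heartbeat expiry, so a
    # heartbeat is just a score update on the registry's sorted set.
    connection.zadd(registry_key, {job_id: time.time() + ttl})
```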
rq/worker.py (outdated)

```diff
             self.kill_horse()
             self.wait_for_horse()
             break

         with self.connection.pipeline() as pipeline:
-            self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
+            self.heartbeat(heartbeat_ttl, pipeline=pipeline)
             queue.started_job_registry.add(job, heartbeat_ttl, pipeline=pipeline)
```
Can we move this to job.heartbeat()? I think it makes more sense that way.
Two more things that we need to take into account:

…

To solve the two issues, I think we should add a …
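To illustrate the suggestion of moving the registry update into a job-level heartbeat helper, here is a hedged sketch written as a standalone function; the function name and the last_heartbeat field are assumptions, loosely modeled on rq's Job and registry APIs:

```python
from datetime import datetime, timezone

def job_heartbeat(job, registry, ttl, pipeline=None):
    # Record the heartbeat on the job hash and refresh the job's score
    # in StartedJobRegistry in one place (and, when a pipeline is given,
    # in one atomic round trip).
    connection = pipeline if pipeline is not None else job.connection
    connection.hset(job.key, 'last_heartbeat',
                    datetime.now(timezone.utc).isoformat())
    registry.add(job, ttl, pipeline=pipeline)
```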
rq/worker.py (outdated)

```diff
@@ -737,6 +737,10 @@ def fork_work_horse(self, job, queue):
         self._horse_pid = child_pid
         self.procline('Forked {0} at {1}'.format(child_pid, time.time()))

+    def get_heartbeat_ttl(self, job, elapsed_execution_time):
```
Instead of passing elapsed_execution_time around, I think it makes more sense for the worker to keep track of when job execution started. Can we make this change?
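For context, a hedged sketch of what a get_heartbeat_ttl helper could compute; the 60-second grace period and the handling of infinite timeouts here are assumptions, not the PR's exact logic:

```python
def get_heartbeat_ttl(worker, job, elapsed_execution_time):
    # Cap the TTL at the job's remaining execution time so a job that is
    # about to hit its timeout does not keep a long-lived heartbeat.
    if job.timeout and job.timeout > 0:
        remaining = max(0, job.timeout - elapsed_execution_time)
        return int(min(remaining, worker.job_monitoring_interval)) + 60
    # Jobs with no (or infinite) timeout fall back to the monitoring interval.
    return worker.job_monitoring_interval + 60
```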
…SimpleWorker + move storing StartedJobRegistry scores to job.heartbeat()
```diff
         with self.connection.pipeline() as pipeline:
             self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
             self.set_current_job_id(job.id, pipeline=pipeline)
+            self.set_current_job_working_time(0, pipeline=pipeline)
```
I think this needs to be moved to monitor_work_horse(), after the job has finished, so it gets set to zero once job execution finishes.
Good point. I added it to monitor_work_horse, but I also kept it here in case a work horse dies (due to an OSError, for example) without resetting it. WDYT?
```diff
@@ -374,6 +375,11 @@ def _get_state(self):

     state = property(_get_state, _set_state)

+    def set_current_job_working_time(self, current_job_working_time, pipeline=None):
```
I initially thought this could be a simple variable tracked by the worker itself. Thanks for persisting it in Redis; this is also useful information to have.
👍
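For concreteness, a hedged sketch of the persistence discussed above, written as a standalone function; the attribute and hash-field names are assumptions rather than rq's actual implementation:

```python
def set_current_job_working_time(worker, working_time, pipeline=None):
    # Keep the value on the worker object for local use, and mirror it to
    # the worker's Redis hash so other processes (e.g. dashboards) see it.
    worker.current_job_working_time = working_time
    connection = pipeline if pipeline is not None else worker.connection
    connection.hset(worker.key, 'current_job_working_time', working_time)
```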
@selwin <GentlePing />
Sorry I missed this. Thanks!
Turns out this change broke the public interface for Workers when it removed the heartbeat_ttl argument from Worker.prepare_job_execution. See michaelbrooks/rq-win#11.
When workers die ungracefully, jobs without a timeout will remain in the StartedJobRegistry forever. This PR cleans those up as part of the routine cleanup() call in StartedJobRegistry. This should solve #1164.
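For readers landing here from #1164, a minimal sketch of the cleanup idea in plain redis-py; the key names and the retention score are illustrative, and the real logic lives in StartedJobRegistry.cleanup():

```python
import time

import redis

def cleanup_started_jobs(r: redis.Redis, started_key: str, failed_key: str):
    # Jobs whose score (heartbeat/timeout expiry) is in the past missed
    # their heartbeat window: move them from "started" to "failed".
    now = time.time()
    expired = r.zrangebyscore(started_key, 0, now)
    if not expired:
        return []
    with r.pipeline() as pipe:
        for job_id in expired:
            # Illustrative retention: keep failed entries for one year.
            pipe.zadd(failed_key, {job_id: now + 31536000})
        pipe.zremrangebyscore(started_key, 0, now)
        pipe.execute()
    return expired
```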