feat: added job heartbeat to track whether job is actually executing #1349
A heartbeat might be needed in cases where the worker was hard-killed or the whole VM/Docker container was forcibly rebooted.
Codecov Report
@@ Coverage Diff @@
## master #1349 +/- ##
==========================================
- Coverage 94.98% 94.90% -0.08%
==========================================
Files 41 43 +2
Lines 5599 5728 +129
==========================================
+ Hits 5318 5436 +118
- Misses 281 292 +11
rq/worker.py
Outdated
@@ -729,6 +730,8 @@ def monitor_work_horse(self, job, queue):
self.wait_for_horse()
break

job.set_heartbeat(utcnow())
The call to job and worker heartbeat should be pipelined.
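One way to read this suggestion, as a minimal sketch: queue both heartbeat writes on a single pipeline and send them in one round trip. `RecordingPipeline`, the key names, and the two helper functions below are hypothetical stand-ins so the example runs without a Redis server; with redis-py the pipeline object would instead come from `connection.pipeline()`.

```python
from datetime import datetime, timezone


class RecordingPipeline:
    """Hypothetical in-memory stand-in for a Redis pipeline.

    Commands are buffered and only applied when execute() is called,
    which mimics sending them in a single round trip.
    """

    def __init__(self, store):
        self.store = store
        self.buffered = []
        self.round_trips = 0

    def hset(self, key, field, value):
        self.buffered.append((key, field, value))
        return self

    def execute(self):
        for key, field, value in self.buffered:
            self.store.setdefault(key, {})[field] = value
        self.buffered = []
        self.round_trips += 1


def job_heartbeat(job_key, timestamp, pipeline):
    # Queue the job heartbeat; nothing is sent yet.
    pipeline.hset(job_key, 'last_heartbeat', timestamp.isoformat())


def worker_heartbeat(worker_key, timestamp, pipeline):
    # Queue the worker heartbeat on the same pipeline.
    pipeline.hset(worker_key, 'last_heartbeat', timestamp.isoformat())


store = {}
pipe = RecordingPipeline(store)
now = datetime(2020, 1, 1, 12, 0, 0, tzinfo=timezone.utc)

job_heartbeat('job:example', now, pipe)
worker_heartbeat('worker:example', now, pipe)
pipe.execute()  # both writes are applied together
```

Passing the pipeline in as an argument lets the caller decide when to flush, so further writes can be batched with the heartbeats.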
rq/job.py
Outdated
@@ -351,6 +351,7 @@ def __init__(self, id=None, connection=None, serializer=None):
# retry_intervals is a list of int e.g [60, 120, 240]
self.retry_intervals = None
self.redis_server_version = None
self.heartbeat = None
This should be renamed to last_heartbeat
job.last_heartbeat should also be added to the docs here: https://github.com/rq/rq/blob/master/docs/docs/jobs.md
rq/job.py
Outdated
@@ -384,6 +385,21 @@ def set_id(self, value):
raise TypeError('id must be a string, not {0}'.format(type(value)))
self._id = value

def set_heartbeat(self, heartbeat, pipeline=None):
Can we also rename this to heartbeat() so that it's consistent with worker.heartbeat()?
rq/job.py
Outdated
def get_heartbeat(self, refresh=True):
    if refresh:
        raw = self.connection.hget(self.key, 'heartbeat')
        if raw:
            self.last_heartbeat = str_to_date(raw)
        else:
            self.last_heartbeat = None

    return self.last_heartbeat
I think we can remove this method; it's unnecessary. When we fetch a job from Redis, you can already read job.last_heartbeat. If you want to refresh the job metadata from Redis, call job.refresh().
rq/job.py
Outdated
@@ -530,6 +547,7 @@ def to_dict(self, include_meta=True):
'data': zlib.compress(self.data),
'started_at': utcformat(self.started_at) if self.started_at else '',
'ended_at': utcformat(self.ended_at) if self.ended_at else '',
'heartbeat': utcformat(self.last_heartbeat) if self.last_heartbeat else '',
Mind changing this to last_heartbeat to keep things consistent?
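To illustrate why the key name matters, here is a rough sketch of the round trip between the attribute and the stored hash field. The functions are simplified, hypothetical versions of the serialization step, and `HEARTBEAT_FORMAT` is an assumed timestamp format; the project's real `utcformat` and `str_to_date` helpers may behave differently.

```python
from datetime import datetime

# Assumed timestamp format; the real utcformat/str_to_date helpers may differ.
HEARTBEAT_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'


def format_heartbeat(last_heartbeat):
    """Serialize the timestamp, using '' when no heartbeat was recorded."""
    return last_heartbeat.strftime(HEARTBEAT_FORMAT) if last_heartbeat else ''


def parse_heartbeat(raw):
    """Inverse of format_heartbeat: '' or None maps back to None."""
    return datetime.strptime(raw, HEARTBEAT_FORMAT) if raw else None


def to_dict(last_heartbeat):
    # Only the field relevant here; a real job hash has many more.
    return {'last_heartbeat': format_heartbeat(last_heartbeat)}


def restore(stored):
    # Read the same key that to_dict() wrote.
    return parse_heartbeat(stored.get('last_heartbeat'))


beat = datetime(2020, 1, 1, 12, 0, 0, 250000)
stored = to_dict(beat)
```

If the writer and the reader use the same key as the attribute name, a fetched job exposes the value directly and no separate getter is required.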
rq/worker.py
Outdated
@@ -713,6 +713,7 @@ def monitor_work_horse(self, job, queue):

ret_val = None
job.started_at = utcnow()
job.heartbeat(job.started_at)
The first heartbeat should be located in prepare_job_execution() so it can be pipelined with job status changes etc.
This is no longer needed since the first heartbeat is already set in prepare_job_execution().
Don't forget to remove this line
thx, done, sorry for being messy
done
docs/docs/jobs.md
Outdated
@@ -121,6 +121,7 @@ Some interesting job attributes include:
* `job.started_at`
* `job.ended_at`
* `job.exc_info` stores exception information if job doesn't finish successfully.
* `job.get_heartbeat()` returns the last heartbeat of the job, indicating the last time the job was executing. Can be used to determine if a worker was killed forcibly and to mark the job as `failed`.
This needs to be updated to job.last_heartbeat - the latest timestamp that's periodically updated when the job is executing. Can be used to determine if the job is still active.
done
Thanks!
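As a rough illustration of how a caller might use the last heartbeat to decide whether a job is still active, the sketch below compares the timestamp against a silence threshold. The function name `is_job_active` and the 90-second threshold are assumptions made for this example, not part of the PR; treating a job with no heartbeat as inactive is also a choice made here.

```python
from datetime import datetime, timedelta


def is_job_active(last_heartbeat, now, max_silence):
    """Return True if the job reported a heartbeat within max_silence of now."""
    if last_heartbeat is None:
        # No heartbeat recorded at all: treated as inactive in this sketch.
        return False
    return now - last_heartbeat <= max_silence


now = datetime(2020, 1, 1, 12, 0, 0)
max_silence = timedelta(seconds=90)  # assumed threshold

fresh = now - timedelta(seconds=30)   # heartbeat seen recently
stale = now - timedelta(minutes=10)   # worker may have been hard-killed

print(is_job_active(fresh, now, max_silence))
print(is_job_active(stale, now, max_silence))
```

In practice the threshold would need to be comfortably larger than the interval at which the worker refreshes the heartbeat, otherwise healthy jobs could be flagged as dead.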