Skip to content

Commit

Permalink
track elapsed_execution_time as part of worker
Browse files Browse the repository at this point in the history
  • Loading branch information
Omer Lachish committed Mar 16, 2021
1 parent 21fd4ea commit e9091bc
Showing 1 changed file with 20 additions and 7 deletions.
27 changes: 20 additions & 7 deletions rq/worker.py
Expand Up @@ -374,6 +374,16 @@ def _get_state(self):

state = property(_get_state, _set_state)

def set_elapsed_execution_time(self, elapsed_execution_time, pipeline=None):
    """Record the elapsed execution time locally and in the worker's hash.

    The value is cached on the instance as ``_elapsed_execution_time`` and
    written to the ``elapsed_execution_time`` field of the hash stored at
    ``self.key``.

    Args:
        elapsed_execution_time: The value to store (the monitoring loop in
            this file passes a number of seconds).
        pipeline: Optional pipeline-like object exposing ``hset``. When
            given, the write is issued on it instead of ``self.connection``.
    """
    self._elapsed_execution_time = elapsed_execution_time

    # Only fall back to the worker's own connection when no pipeline was
    # supplied by the caller.
    if pipeline is None:
        target = self.connection
    else:
        target = pipeline

    target.hset(self.key, 'elapsed_execution_time', elapsed_execution_time)

def get_elapsed_execution_time(self):
    """Return the cached elapsed execution time, or 0 when none is recorded.

    The value is read from the instance attribute ``_elapsed_execution_time``.
    ``getattr`` with a default is used so that reading the value before the
    setter has ever run (the attribute does not exist yet) yields 0 instead
    of raising ``AttributeError``. A stored ``None`` (or any other falsy
    value) is likewise reported as 0.
    """
    return getattr(self, '_elapsed_execution_time', None) or 0

# Expose the getter/setter pair as an attribute, so plain assignment
# (``obj.elapsed_execution_time = value``) goes through the setter with its
# default ``pipeline=None``.
elapsed_execution_time = property(
    fget=get_elapsed_execution_time,
    fset=set_elapsed_execution_time,
)

def set_current_job_id(self, job_id, pipeline=None):
connection = pipeline if pipeline is not None else self.connection

Expand Down Expand Up @@ -752,9 +762,9 @@ def fork_work_horse(self, job, queue):
self._horse_pid = child_pid
self.procline('Forked {0} at {1}'.format(child_pid, time.time()))

def get_heartbeat_ttl(self, job, elapsed_execution_time):
def get_heartbeat_ttl(self, job):
if job.timeout and job.timeout > 0:
remaining_execution_time = job.timeout - elapsed_execution_time
remaining_execution_time = job.timeout - self.elapsed_execution_time
return min(remaining_execution_time, self.job_monitoring_interval) + 60
else:
return self.job_monitoring_interval + 60
Expand All @@ -775,18 +785,18 @@ def monitor_work_horse(self, job, queue):
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
elapsed_execution_time = (utcnow() - job.started_at).total_seconds()
self.elapsed_execution_time = (utcnow() - job.started_at).total_seconds()

# Kill the job from this side if something is really wrong (interpreter lock/etc).
if job.timeout != -1 and elapsed_execution_time > (job.timeout + 60):
if job.timeout != -1 and self.elapsed_execution_time > (job.timeout + 60):
self.heartbeat(self.job_monitoring_interval + 60)
self.kill_horse()
self.wait_for_horse()
break

with self.connection.pipeline() as pipeline:
self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
ttl = self.get_heartbeat_ttl(job, elapsed_execution_time)
ttl = self.get_heartbeat_ttl(job)
job.heartbeat(utcnow(), ttl, pipeline=pipeline)
pipeline.execute()

Expand Down Expand Up @@ -874,13 +884,16 @@ def prepare_job_execution(self, job):
"""Performs misc bookkeeping like updating states prior to
job execution.
"""
heartbeat_ttl = self.get_heartbeat_ttl(job, 0)

with self.connection.pipeline() as pipeline:
self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
self.set_current_job_id(job.id, pipeline=pipeline)
self.set_elapsed_execution_time(0, pipeline=pipeline)

heartbeat_ttl = self.get_heartbeat_ttl(job)
self.heartbeat(heartbeat_ttl, pipeline=pipeline)
job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)

job.prepare_for_execution(self.name, pipeline=pipeline)
pipeline.execute()

Expand Down Expand Up @@ -1112,7 +1125,7 @@ def execute_job(self, job, queue):
"""Execute job in same thread/process, do not fork()"""
return self.perform_job(job, queue)

def get_heartbeat_ttl(self, job, _):
def get_heartbeat_ttl(self, job):
# "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. We should just stick to DEFAULT_WORKER_TTL.
if job.timeout == -1:
return DEFAULT_WORKER_TTL
Expand Down

0 comments on commit e9091bc

Please sign in to comment.