calculate heartbeat_ttl in an overrideable function + override it in SimpleWorker + move storing StartedJobRegistry scores to job.heartbeat()

Omer Lachish committed Dec 28, 2020
1 parent 1dac2df commit 365ef93
Showing 3 changed files with 29 additions and 19 deletions.
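
In short: instead of a fixed `job_monitoring_interval + 60` TTL, the worker now derives the heartbeat TTL from the job's remaining execution time via the new, overridable Worker.get_heartbeat_ttl(), and Job.heartbeat() takes that TTL and refreshes the job's score in the StartedJobRegistry itself. A standalone sketch of the new arithmetic, with made-up example numbers (not taken from this commit):

# Restatement of the get_heartbeat_ttl() calculation from the worker.py diff
# below, as a free function so the numbers are easy to follow.
def get_heartbeat_ttl(job_timeout, elapsed_execution_time, job_monitoring_interval):
    remaining_execution_time = job_timeout - elapsed_execution_time
    return min(remaining_execution_time, job_monitoring_interval) + 60

# A job with a 180s timeout that has already run for 100s, monitored every 30s:
# min(180 - 100, 30) + 60 == 90 seconds until the heartbeat must be renewed.
assert get_heartbeat_ttl(180, 100, 30) == 90
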
9 changes: 8 additions & 1 deletion rq/job.py
@@ -387,10 +387,11 @@ def set_id(self, value):
             raise TypeError('id must be a string, not {0}'.format(type(value)))
         self._id = value

-    def heartbeat(self, heartbeat, pipeline=None):
+    def heartbeat(self, heartbeat, ttl, pipeline=None):
         self.last_heartbeat = heartbeat
         connection = pipeline if pipeline is not None else self.connection
         connection.hset(self.key, 'last_heartbeat', utcformat(self.last_heartbeat))
+        self.started_job_registry.add(self, ttl, pipeline=pipeline)

     id = property(get_id, set_id)

@@ -768,6 +769,12 @@ def cleanup(self, ttl=None, pipeline=None, remove_from_queue=True):
             connection.expire(self.dependents_key, ttl)
             connection.expire(self.dependencies_key, ttl)

+    @property
+    def started_job_registry(self):
+        from .registry import StartedJobRegistry
+        return StartedJobRegistry(self.origin, connection=self.connection,
+                                  job_class=self.__class__)
+
     @property
     def failed_job_registry(self):
         from .registry import FailedJobRegistry
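
After this change Job.heartbeat() requires a TTL next to the timestamp: it writes last_heartbeat into the job hash and, through the new started_job_registry property, refreshes the job's score in the StartedJobRegistry. A minimal usage sketch; normally the worker calls this while monitoring the work horse, and the queue name and 90-second TTL here are made up for illustration:

from redis import Redis
from rq import Queue
from rq.utils import utcnow

queue = Queue('default', connection=Redis())
job = queue.enqueue(print, 'hello')

# Record a heartbeat that is valid for another 90 seconds: this updates the
# job hash's last_heartbeat field and the job's StartedJobRegistry score.
job.heartbeat(utcnow(), 90)
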
37 changes: 20 additions & 17 deletions rq/worker.py
@@ -737,6 +737,10 @@ def fork_work_horse(self, job, queue):
         self._horse_pid = child_pid
         self.procline('Forked {0} at {1}'.format(child_pid, time.time()))

+    def get_heartbeat_ttl(self, job, elapsed_execution_time):
+        remaining_execution_time = job.timeout - elapsed_execution_time
+        return min(remaining_execution_time, self.job_monitoring_interval) + 60
+
     def monitor_work_horse(self, job, queue):
         """The worker will monitor the work horse and make sure that it
         either executes successfully or the status of the job is set to
@@ -745,7 +749,6 @@ def monitor_work_horse(self, job, queue):

         ret_val = None
         job.started_at = utcnow()
-        heartbeat_ttl = self.job_monitoring_interval + 60
         while True:
             try:
                 with UnixSignalDeathPenalty(self.job_monitoring_interval, HorseMonitorTimeoutException):
@@ -754,18 +757,19 @@ def monitor_work_horse(self, job, queue):
             except HorseMonitorTimeoutException:
                 # Horse has not exited yet and is still running.
                 # Send a heartbeat to keep the worker alive.
+                elapsed_execution_time = (utcnow() - job.started_at).total_seconds()

                 # Kill the job from this side if something is really wrong (interpreter lock/etc).
-                if job.timeout != -1 and (utcnow() - job.started_at).total_seconds() > (job.timeout + 60):
-                    self.heartbeat(heartbeat_ttl)
+                if job.timeout != -1 and elapsed_execution_time > (job.timeout + 60):
+                    self.heartbeat(self.job_monitoring_interval + 60)
                     self.kill_horse()
                     self.wait_for_horse()
                     break

                 with self.connection.pipeline() as pipeline:
-                    self.heartbeat(heartbeat_ttl, pipeline=pipeline)
-                    queue.started_job_registry.add(job, heartbeat_ttl, pipeline=pipeline)
-                    job.heartbeat(utcnow(), pipeline=pipeline)
+                    self.heartbeat(self.job_monitoring_interval + 60, pipeline=pipeline)
+                    ttl = self.get_heartbeat_ttl(job, elapsed_execution_time)
+                    job.heartbeat(utcnow(), ttl, pipeline=pipeline)
                     pipeline.execute()

             except OSError as e:
@@ -842,20 +846,17 @@ def setup_work_horse_signals(self):
         signal.signal(signal.SIGINT, signal.SIG_IGN)
         signal.signal(signal.SIGTERM, signal.SIG_DFL)

-    def prepare_job_execution(self, job, heartbeat_ttl=None):
+    def prepare_job_execution(self, job):
         """Performs misc bookkeeping like updating states prior to
         job execution.
         """
-        if heartbeat_ttl is None:
-            heartbeat_ttl = self.job_monitoring_interval + 60
+        heartbeat_ttl = self.get_heartbeat_ttl(job, 0)

         with self.connection.pipeline() as pipeline:
             self.set_state(WorkerStatus.BUSY, pipeline=pipeline)
             self.set_current_job_id(job.id, pipeline=pipeline)
             self.heartbeat(heartbeat_ttl, pipeline=pipeline)
-            registry = StartedJobRegistry(job.origin, self.connection,
-                                          job_class=self.job_class)
-            registry.add(job, heartbeat_ttl, pipeline=pipeline)
+            job.heartbeat(utcnow(), heartbeat_ttl, pipeline=pipeline)
             job.prepare_for_execution(self.name, pipeline=pipeline)
             pipeline.execute()

@@ -954,7 +955,7 @@ def handle_job_success(self, job, queue, started_job_registry):
                 except WatchError:
                     continue

-    def perform_job(self, job, queue, heartbeat_ttl=None):
+    def perform_job(self, job, queue):
         """Performs the actual work of a job. Will/should only be called
         inside the work horse's process.
         """
@@ -963,7 +964,7 @@ def perform_job(self, job, queue, heartbeat_ttl=None):
         started_job_registry = queue.started_job_registry

         try:
-            self.prepare_job_execution(job, heartbeat_ttl)
+            self.prepare_job_execution(job)

             job.started_at = utcnow()
             timeout = job.timeout or self.queue_class.DEFAULT_TIMEOUT
@@ -1078,12 +1079,14 @@ class SimpleWorker(Worker):

     def execute_job(self, job, queue):
         """Execute job in same thread/process, do not fork()"""
+        return self.perform_job(job, queue)
+
+    def get_heartbeat_ttl(self, job, _):
+        # "-1" means that jobs never timeout. In this case, we should _not_ do -1 + 60 = 59. We should just stick to DEFAULT_WORKER_TTL.
         if job.timeout == -1:
-            timeout = DEFAULT_WORKER_TTL
+            return DEFAULT_WORKER_TTL
         else:
-            timeout = (job.timeout or DEFAULT_WORKER_TTL) + 60
-        return self.perform_job(job, queue, heartbeat_ttl=timeout)
+            return (job.timeout or DEFAULT_WORKER_TTL) + 60


 class HerokuWorker(Worker):
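
Because get_heartbeat_ttl() is now an ordinary method on Worker, custom workers can tune the heartbeat TTL the same way SimpleWorker does above. A hypothetical sketch; the FixedTTLWorker class and its 120-second TTL are invented for illustration and are not part of this commit:

from rq.worker import Worker

class FixedTTLWorker(Worker):
    """Hypothetical worker that always reports a fixed heartbeat TTL."""

    def get_heartbeat_ttl(self, job, elapsed_execution_time):
        # Keep the worker heartbeat and the StartedJobRegistry score
        # alive for a flat 120 seconds, regardless of job.timeout.
        return 120
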
2 changes: 1 addition & 1 deletion tests/test_job.py
@@ -219,7 +219,7 @@ def test_persistence_of_typical_jobs(self):
         self.assertEqual(job.last_heartbeat, None)

         ts = utcnow()
-        job.heartbeat(ts)
+        job.heartbeat(ts, 0)
         self.assertEqual(job.last_heartbeat, ts)

     def test_persistence_of_retry_data(self):
