Skip to content

Commit

Permalink
try to fix rq/rq#1507
Browse files Browse the repository at this point in the history
the idea is to port the changes from the current version of rq's monitor_work_horse to the version
introduced by redash. This is a classic example why overriding a method that you don't own is
usually a bad idea. You miss on upstream fixes. Another point is that I'm not even sure if this
custom method is even necessary with the version of rq we're using. Maybe we should investigate it
more
  • Loading branch information
igorcalabria committed Jul 22, 2022
1 parent 2ab57bc commit a3fda7b
Showing 1 changed file with 14 additions and 5 deletions.
19 changes: 14 additions & 5 deletions redash/tasks/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,20 +113,20 @@ def monitor_work_horse(self, job, queue):
with UnixSignalDeathPenalty(
self.job_monitoring_interval, HorseMonitorTimeoutException
):
retpid, ret_val = os.waitpid(self._horse_pid, 0)
retpid, ret_val = self.wait_for_horse()
break
except HorseMonitorTimeoutException:
# Horse has not exited yet and is still running.
# Send a heartbeat to keep the worker alive.
self.heartbeat(self.job_monitoring_interval + 5)

job.refresh()
self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())

if job.is_cancelled:
self.stop_executing_job(job)

if self.soft_limit_exceeded(job):
self.enforce_hard_limit(job)

self.maintain_heartbeats(job)
except OSError as e:
# In case we encountered an OSError due to EINTR (which is
# caused by a SIGINT or SIGTERM signal during
Expand All @@ -139,12 +139,21 @@ def monitor_work_horse(self, job, queue):
# Send a heartbeat to keep the worker alive.
self.heartbeat()

self.set_current_job_working_time(0)
self._horse_pid = 0 # Set horse PID to 0, horse has finished working
if ret_val == os.EX_OK: # The process exited normally.
return
job_status = job.get_status()
if job_status is None: # Job completed and its ttl has expired
return
if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
elif job_status == JobStatus.STOPPED:
# Work-horse killed deliberately
self.log.warning('Job stopped by user, moving job to FailedJobRegistry')
self.handle_job_failure(
job, queue=queue,
exc_string="Job stopped by user, work-horse terminated."
)
elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:

if not job.ended_at:
job.ended_at = utcnow()
Expand Down

0 comments on commit a3fda7b

Please sign in to comment.