diff --git a/rq/worker.py b/rq/worker.py index 0744a8da5..c52c21cc9 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -544,28 +544,7 @@ def monitor_work_horse(self, job): """ while True: try: - _, ret_val = os.waitpid(self._horse_pid, 0) - if ret_val != os.EX_OK: - job_status = job.get_status() - if job_status is None: - # Job completed and its ttl has expired - break - if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: - self.handle_job_failure(job=job) - - # Unhandled failure: move the job to the failed queue - self.log.warning( - 'Moving job to {0!r} queue'.format( - self.failed_queue.name - ) - ) - self.failed_queue.quarantine( - job, - exc_info=( - "Work-horse process " - "was terminated unexpectedly" - ) - ) + self._monitor_work_horse_tick(job) break except OSError as e: # In case we encountered an OSError due to EINTR (which is @@ -577,6 +556,29 @@ def monitor_work_horse(self, job): if e.errno != errno.EINTR: raise + def _monitor_work_horse_tick(self, job): + _, ret_val = os.waitpid(self._horse_pid, 0) + if ret_val == os.EX_OK: # The process exited normally. + return + job_status = job.get_status() + if job_status is None: # Job completed and its ttl has expired + return + if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: + self.handle_job_failure(job=job) + + # Unhandled failure: move the job to the failed queue + self.log.warning(( + 'Moving job to {0!r} queue ' + '(work-horse terminated unexpectedly; waitpid returned {1})' + ).format(self.failed_queue.name, ret_val)) + self.failed_queue.quarantine( + job, + exc_info=( + "Work-horse process was terminated unexpectedly " + "(waitpid returned {0})" + ).format(ret_val) + ) + def execute_job(self, job, queue): """Spawns a work horse to perform the actual work and passes it a job. The worker will wait for the work horse and make sure it executes