diff --git a/rq/worker.py b/rq/worker.py index 20554ee99..8b472361d 100644 --- a/rq/worker.py +++ b/rq/worker.py @@ -536,29 +536,23 @@ def monitor_work_horse(self, job): # Job completed and its ttl has expired break if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]: - with self.connection._pipeline() as pipeline: - self.handle_job_failure( - job=job, - pipeline=pipeline + self.handle_job_failure( + job=job + ) + + #Unhandled failure: move the job to the failed queue + self.log.warning( + 'Moving job to {0!r} queue'.format( + self.failed_queue.name ) - try: - pipeline.execute() - except Exception: - pass - - #Unhandled failure: move the job to the failed queue - self.log.warning( - 'Moving job to {0!r} queue'.format( - self.failed_queue.name - ) - ) - self.failed_queue.quarantine( - job, - exc_info=( - "Work-horse proccess " - "was terminated unexpectedly" - ) + ) + self.failed_queue.quarantine( + job, + exc_info=( + "Work-horse proccess " + "was terminated unexpectedly" ) + ) break except OSError as e: # In case we encountered an OSError due to EINTR (which is @@ -631,8 +625,7 @@ def prepare_job_execution(self, job): def handle_job_failure( self, job, - started_job_registry=None, - pipeline=None + started_job_registry=None ): """Handles the failure or an executing job by: 1. Setting the job status to failed @@ -640,89 +633,99 @@ def handle_job_failure( 3. Setting the workers current job to None """ - if started_job_registry is None: - started_job_registry = StartedJobRegistry( - job.origin, - self.connection - ) - job.set_status(JobStatus.FAILED, pipeline=pipeline) - started_job_registry.remove(job, pipeline=pipeline) - self.set_current_job_id(None, pipeline=pipeline) - - def perform_job(self, job, queue): - """Performs the actual work of a job. Will/should only be called - inside the work horse's process. - """ - self.prepare_job_execution(job) + with self.connection._pipeline() as pipeline: + if started_job_registry is None: + started_job_registry = StartedJobRegistry( + job.origin, + self.connection + ) + job.set_status(JobStatus.FAILED, pipeline=pipeline) + started_job_registry.remove(job, pipeline=pipeline) + self.set_current_job_id(None, pipeline=pipeline) + try: + pipeline.execute() + except Exception: + # Ensure that custom exception handlers are called + # even if Redis is down + pass + def handle_job_success( + self, + job, + queue, + started_job_registry + ): with self.connection._pipeline() as pipeline: + while True: + try: + # if dependencies are inserted after enqueue_dependents + # a WatchError is thrown by execute() + pipeline.watch(job.dependents_key) + # enqueue_dependents calls multi() on the pipeline! + queue.enqueue_dependents(job, pipeline=pipeline) - push_connection(self.connection) + self.set_current_job_id(None, pipeline=pipeline) - started_job_registry = StartedJobRegistry(job.origin, self.connection) + result_ttl = job.get_result_ttl(self.default_result_ttl) + if result_ttl != 0: + job.set_status(JobStatus.FINISHED, pipeline=pipeline) + job.save(pipeline=pipeline) - try: - with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): - rv = job.perform() + finished_job_registry = FinishedJobRegistry(job.origin, + self.connection) + finished_job_registry.add(job, result_ttl, pipeline) - # Pickle the result in the same try-except block since we need - # to use the same exc handling when pickling fails - job._result = rv + job.cleanup(result_ttl, pipeline=pipeline, + remove_from_queue=False) + started_job_registry.remove(job, pipeline=pipeline) - result_ttl = job.get_result_ttl(self.default_result_ttl) - if result_ttl != 0: - job.ended_at = utcnow() + pipeline.execute() + break + except WatchError: + continue - while True: - try: - # if dependencies are inserted after enqueue_dependents - # a WatchError is thrown by execute() - pipeline.watch(job.dependents_key) - # enqueue_dependents calls multi() on the pipeline! - queue.enqueue_dependents(job, pipeline=pipeline) + def perform_job(self, job, queue): + """Performs the actual work of a job. Will/should only be called + inside the work horse's process. + """ + self.prepare_job_execution(job) - self.set_current_job_id(None, pipeline=pipeline) + push_connection(self.connection) - if result_ttl != 0: - job.set_status(JobStatus.FINISHED, pipeline=pipeline) - job.save(pipeline=pipeline) + started_job_registry = StartedJobRegistry(job.origin, self.connection) - finished_job_registry = FinishedJobRegistry(job.origin, - self.connection) - finished_job_registry.add(job, result_ttl, pipeline) + try: + with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT): + rv = job.perform() - job.cleanup(result_ttl, pipeline=pipeline, - remove_from_queue=False) - started_job_registry.remove(job, pipeline=pipeline) + job.ended_at = utcnow() - pipeline.execute() - break - except WatchError: - continue + # Pickle the result in the same try-except block since we need + # to use the same exc handling when pickling fails + job._result = rv - except Exception: - self.handle_job_failure( - job=job, - started_job_registry=started_job_registry, - pipeline=pipeline - ) - try: - pipeline.execute() - except Exception: - # Ensure that custom exception handlers are called - # even if Redis is down - pass - self.handle_exception(job, *sys.exc_info()) - return False + self.handle_job_success( + job=job, + queue=queue, + started_job_registry=started_job_registry + ) + except Exception: + self.handle_job_failure( + job=job, + started_job_registry=started_job_registry + ) + self.handle_exception(job, *sys.exc_info()) + return False - finally: - pop_connection() + finally: + pop_connection() self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id)) if rv is not None: log_result = "{0!r}".format(as_text(text_type(rv))) self.log.debug('Result: {0}'.format(yellow(log_result))) + result_ttl = job.get_result_ttl(self.default_result_ttl) if result_ttl == 0: self.log.info('Result discarded immediately') elif result_ttl > 0: diff --git a/tests/test_worker.py b/tests/test_worker.py index f09a2265d..dd4d0fea2 100644 --- a/tests/test_worker.py +++ b/tests/test_worker.py @@ -578,7 +578,7 @@ def test_job_dependency_race_condition(self): def new_enqueue_dependents(self, job, *args, **kwargs): orig_enqueue_dependents(self, job, *args, **kwargs) - if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue.id == job.id: + if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue is not None and Queue._add_enqueue.id == job.id: Queue._add_enqueue = None Queue().enqueue_call(say_hello, depends_on=job)