
Commit

refactored worker code
Moved code into a new handle_job_success() method and reduced the scope of the pipelines used.
th3hamm0r committed Sep 22, 2016
1 parent 44f9869 commit a0cee2d
Showing 2 changed files with 89 additions and 86 deletions.
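
The commit message describes two changes: the success path of perform_job() moves into its own handle_job_success() method, and each handler now opens a Redis pipeline only around the statements that need it, instead of one pipeline wrapping the whole of perform_job(). A minimal sketch of that shape (illustrative stand-ins only, not rq's actual Worker, Job, or pipeline classes):

    # Illustrative sketch only -- FakePipeline/Worker are stand-ins, not rq's API.
    class FakePipeline:
        def __enter__(self):
            self.ops = []
            return self

        def __exit__(self, *exc):
            pass

        def execute(self):
            print('EXEC', self.ops)


    class Worker:
        def handle_job_success(self, job):
            # the pipeline is scoped to the success path only
            with FakePipeline() as pipeline:
                pipeline.ops.append(('finish', job))
                pipeline.execute()

        def handle_job_failure(self, job):
            # the failure path owns its own, equally short-lived pipeline
            with FakePipeline() as pipeline:
                pipeline.ops.append(('fail', job))
                try:
                    pipeline.execute()
                except Exception:
                    pass  # still run exception handlers if Redis is down

        def perform_job(self, job):
            # perform_job no longer keeps a pipeline open around its whole body
            try:
                result = job()
            except Exception:
                self.handle_job_failure(job)
                return False
            self.handle_job_success(job)
            return result


    Worker().perform_job(lambda: 'done')  # prints EXEC [('finish', <function ...>)]
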
173 changes: 88 additions & 85 deletions rq/worker.py
@@ -536,29 +536,23 @@ def monitor_work_horse(self, job):
                     # Job completed and its ttl has expired
                     break
                 if job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
-                    with self.connection._pipeline() as pipeline:
-                        self.handle_job_failure(
-                            job=job,
-                            pipeline=pipeline
-                        )
-
-                        #Unhandled failure: move the job to the failed queue
-                        self.log.warning(
-                            'Moving job to {0!r} queue'.format(
-                                self.failed_queue.name
-                            )
-                        )
-                        try:
-                            pipeline.execute()
-                        except Exception:
-                            pass
-                        self.failed_queue.quarantine(
-                            job,
-                            exc_info=(
-                                "Work-horse proccess "
-                                "was terminated unexpectedly"
-                            )
-                        )
+                    self.handle_job_failure(
+                        job=job
+                    )
+
+                    #Unhandled failure: move the job to the failed queue
+                    self.log.warning(
+                        'Moving job to {0!r} queue'.format(
+                            self.failed_queue.name
+                        )
+                    )
+                    self.failed_queue.quarantine(
+                        job,
+                        exc_info=(
+                            "Work-horse proccess "
+                            "was terminated unexpectedly"
+                        )
+                    )
                     break
             except OSError as e:
                 # In case we encountered an OSError due to EINTR (which is
@@ -631,98 +625,107 @@ def prepare_job_execution(self, job):
     def handle_job_failure(
         self,
         job,
-        started_job_registry=None,
-        pipeline=None
+        started_job_registry=None
     ):
         """Handles the failure or an executing job by:
             1. Setting the job status to failed
             2. Removing the job from the started_job_registry
             3. Setting the workers current job to None
         """

-        if started_job_registry is None:
-            started_job_registry = StartedJobRegistry(
-                job.origin,
-                self.connection
-            )
-        job.set_status(JobStatus.FAILED, pipeline=pipeline)
-        started_job_registry.remove(job, pipeline=pipeline)
-        self.set_current_job_id(None, pipeline=pipeline)
+        with self.connection._pipeline() as pipeline:
+            if started_job_registry is None:
+                started_job_registry = StartedJobRegistry(
+                    job.origin,
+                    self.connection
+                )
+            job.set_status(JobStatus.FAILED, pipeline=pipeline)
+            started_job_registry.remove(job, pipeline=pipeline)
+            self.set_current_job_id(None, pipeline=pipeline)
+            try:
+                pipeline.execute()
+            except Exception:
+                # Ensure that custom exception handlers are called
+                # even if Redis is down
+                pass
+
+    def handle_job_success(
+        self,
+        job,
+        queue,
+        started_job_registry
+    ):
+        with self.connection._pipeline() as pipeline:
+            while True:
+                try:
+                    # if dependencies are inserted after enqueue_dependents
+                    # a WatchError is thrown by execute()
+                    pipeline.watch(job.dependents_key)
+                    # enqueue_dependents calls multi() on the pipeline!
+                    queue.enqueue_dependents(job, pipeline=pipeline)
+
+                    self.set_current_job_id(None, pipeline=pipeline)
+
+                    result_ttl = job.get_result_ttl(self.default_result_ttl)
+                    if result_ttl != 0:
+                        job.set_status(JobStatus.FINISHED, pipeline=pipeline)
+                        job.save(pipeline=pipeline)
+
+                        finished_job_registry = FinishedJobRegistry(job.origin,
+                                                                    self.connection)
+                        finished_job_registry.add(job, result_ttl, pipeline)
+
+                    job.cleanup(result_ttl, pipeline=pipeline,
+                                remove_from_queue=False)
+                    started_job_registry.remove(job, pipeline=pipeline)
+
+                    pipeline.execute()
+                    break
+                except WatchError:
+                    continue

     def perform_job(self, job, queue):
         """Performs the actual work of a job. Will/should only be called
         inside the work horse's process.
         """
         self.prepare_job_execution(job)
-        with self.connection._pipeline() as pipeline:
-            push_connection(self.connection)
-
-            started_job_registry = StartedJobRegistry(job.origin, self.connection)
-
-            try:
-                with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
-                    rv = job.perform()
-
-                # Pickle the result in the same try-except block since we need
-                # to use the same exc handling when pickling fails
-                job._result = rv
-
-                result_ttl = job.get_result_ttl(self.default_result_ttl)
-                if result_ttl != 0:
-                    job.ended_at = utcnow()
-
-                while True:
-                    try:
-                        # if dependencies are inserted after enqueue_dependents
-                        # a WatchError is thrown by execute()
-                        pipeline.watch(job.dependents_key)
-                        # enqueue_dependents calls multi() on the pipeline!
-                        queue.enqueue_dependents(job, pipeline=pipeline)
-
-                        self.set_current_job_id(None, pipeline=pipeline)
-
-                        if result_ttl != 0:
-                            job.set_status(JobStatus.FINISHED, pipeline=pipeline)
-                            job.save(pipeline=pipeline)
-
-                            finished_job_registry = FinishedJobRegistry(job.origin,
-                                                                        self.connection)
-                            finished_job_registry.add(job, result_ttl, pipeline)
-
-                        job.cleanup(result_ttl, pipeline=pipeline,
-                                    remove_from_queue=False)
-                        started_job_registry.remove(job, pipeline=pipeline)
-
-                        pipeline.execute()
-                        break
-                    except WatchError:
-                        continue
-
-            except Exception:
-                self.handle_job_failure(
-                    job=job,
-                    started_job_registry=started_job_registry,
-                    pipeline=pipeline
-                )
-                try:
-                    pipeline.execute()
-                except Exception:
-                    # Ensure that custom exception handlers are called
-                    # even if Redis is down
-                    pass
-                self.handle_exception(job, *sys.exc_info())
-                return False
-
-            finally:
-                pop_connection()
+
+        push_connection(self.connection)
+
+        started_job_registry = StartedJobRegistry(job.origin, self.connection)
+
+        try:
+            with self.death_penalty_class(job.timeout or self.queue_class.DEFAULT_TIMEOUT):
+                rv = job.perform()
+
+            job.ended_at = utcnow()
+
+            # Pickle the result in the same try-except block since we need
+            # to use the same exc handling when pickling fails
+            job._result = rv
+
+            self.handle_job_success(
+                job=job,
+                queue=queue,
+                started_job_registry=started_job_registry
+            )
+        except Exception:
+            self.handle_job_failure(
+                job=job,
+                started_job_registry=started_job_registry
+            )
+            self.handle_exception(job, *sys.exc_info())
+            return False
+
+        finally:
+            pop_connection()

         self.log.info('{0}: {1} ({2})'.format(green(job.origin), blue('Job OK'), job.id))
         if rv is not None:
             log_result = "{0!r}".format(as_text(text_type(rv)))
             self.log.debug('Result: {0}'.format(yellow(log_result)))

         result_ttl = job.get_result_ttl(self.default_result_ttl)
         if result_ttl == 0:
             self.log.info('Result discarded immediately')
         elif result_ttl > 0:
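
The watch/multi retry loop that ends up in handle_job_success() is redis-py's standard optimistic-locking pattern: WATCH the job's dependents key, queue the writes with MULTI, and retry from the top if execute() raises WatchError because another client modified the key first. Roughly the same pattern in plain redis-py (a hedged sketch; the key names and the lpush-based "enqueue" are placeholders, not rq's actual data layout):

    import redis
    from redis import WatchError

    r = redis.Redis()
    dependents_key = 'rq:job:example:dependents'  # placeholder key name

    with r.pipeline() as pipe:
        while True:
            try:
                # WATCH marks the key and leaves the pipeline in
                # immediate-execution mode, so reads return real values.
                pipe.watch(dependents_key)
                dependents = pipe.smembers(dependents_key)

                # MULTI starts buffering; the writes below are queued
                # and only sent atomically by execute().
                pipe.multi()
                for dependent_id in dependents:
                    pipe.lpush('rq:queue:default', dependent_id)  # placeholder enqueue
                pipe.delete(dependents_key)

                pipe.execute()  # raises WatchError if dependents_key was touched
                break
            except WatchError:
                # another client changed the watched key; start over
                continue
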
2 changes: 1 addition & 1 deletion tests/test_worker.py
@@ -578,7 +578,7 @@ def test_job_dependency_race_condition(self):

         def new_enqueue_dependents(self, job, *args, **kwargs):
             orig_enqueue_dependents(self, job, *args, **kwargs)
-            if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue.id == job.id:
+            if hasattr(Queue, '_add_enqueue') and Queue._add_enqueue is not None and Queue._add_enqueue.id == job.id:
                 Queue._add_enqueue = None
                 Queue().enqueue_call(say_hello, depends_on=job)

