From c672962ee7e30901ad69dc5e3bf7417d7289d8b5 Mon Sep 17 00:00:00 2001
From: Daniel Alley
Date: Sun, 20 Dec 2020 23:48:08 -0500
Subject: [PATCH] Fix another scenario where canceled tasks could be marked failed

closes: #7980
https://pulp.plan.io/issues/7980
---
 CHANGES/7980.bugfix        |  1 +
 pulpcore/tasking/util.py   |  6 +++---
 pulpcore/tasking/worker.py | 15 +++++++++------
 requirements.txt           |  2 +-
 4 files changed, 14 insertions(+), 10 deletions(-)
 create mode 100644 CHANGES/7980.bugfix

diff --git a/CHANGES/7980.bugfix b/CHANGES/7980.bugfix
new file mode 100644
index 0000000000..d9cdc67c7b
--- /dev/null
+++ b/CHANGES/7980.bugfix
@@ -0,0 +1 @@
+Fixed a scenario where canceled tasks could be marked failed.
\ No newline at end of file
diff --git a/pulpcore/tasking/util.py b/pulpcore/tasking/util.py
index 3337151c35..9e4a46a2e7 100644
--- a/pulpcore/tasking/util.py
+++ b/pulpcore/tasking/util.py
@@ -51,6 +51,9 @@ def cancel(task_id):
     task_status.state = TASK_STATES.CANCELED
     task_status.save()
 
+    resource_job.cancel()
+    job.cancel()
+
     try:
         send_stop_job_command(redis_conn, job.get_id())
         send_stop_job_command(redis_conn, resource_job.get_id())
@@ -61,9 +64,6 @@ def cancel(task_id):
     # A hack to ensure that we aren't deleting resources still being used by the workhorse
     time.sleep(0.5)
 
-    resource_job.delete()
-    job.delete()
-
     with transaction.atomic():
         for report in task_status.progress_reports.all():
             if report.state not in TASK_FINAL_STATES:
diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py
index 026bf5747a..d966ecbc2b 100644
--- a/pulpcore/tasking/worker.py
+++ b/pulpcore/tasking/worker.py
@@ -122,13 +122,16 @@ def handle_job_failure(self, job, **kwargs):
         try:
             task = Task.objects.get(pk=job.get_id())
             task.release_resources()
-        except Task.DoesNotExist:
-            pass
-        else:
             exc_type, exc, tb = sys.exc_info()
-            task.set_failed(exc, tb)
-
-        return super().handle_job_failure(job, **kwargs)
+            res = super().handle_job_failure(job, **kwargs)
+            # job "is_stopped" state determined during super().handle_job_failure()
+            if not job.is_stopped:
+                # stopped jobs go onto the failed queue in RQ, so we need to ignore those
+                # to avoid overwriting the task status
+                task.set_failed(exc, tb)
+            return res
+        except Task.DoesNotExist:
+            return super().handle_job_failure(job, **kwargs)
 
     def handle_job_success(self, job, queue, started_job_registry):
         """
diff --git a/requirements.txt b/requirements.txt
index 550eb30c0d..bdb7e9e8e9 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,6 +23,6 @@ psycopg2>=2.7,<2.9
 PyYAML>=5.1.1,<5.5.0
 python-gnupg~=0.4.7
 redis>=3.4.0
-rq>=1.7,<1.8
+rq~=1.8.0
 setuptools>=39.2.0
 whitenoise>=5.0.0,<5.3.0