Skip to content

Commit

Permalink
Replace custom task cancellation with RQ 1.7 functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
dralley committed Dec 1, 2020
1 parent af7787b commit 5f7dc90
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGES/7902.misc
@@ -0,0 +1 @@
Replaced some custom code for task cancellation with RQ native functionality.
2 changes: 0 additions & 2 deletions pulpcore/tasking/constants.py
Expand Up @@ -7,6 +7,4 @@
WORKER_TTL=30,
# The amount of time (in seconds) between checks
JOB_MONITORING_INTERVAL=5,
# The Redis key used to force-kill a job
KILL_KEY="rq:jobs:kill",
)
18 changes: 10 additions & 8 deletions pulpcore/tasking/util.py
Expand Up @@ -4,6 +4,8 @@

from django.db import transaction
from django.urls import reverse
from rq.command import send_stop_job_command
from rq.exceptions import InvalidJobOperation, NoSuchJobError
from rq.job import Job, get_current_job
from rq.worker import Worker

Expand All @@ -12,7 +14,6 @@
from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES
from pulpcore.exceptions import MissingResource
from pulpcore.tasking import connection
from pulpcore.tasking.constants import TASKING_CONSTANTS

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,18 +51,19 @@ def cancel(task_id):
task_status.state = TASK_STATES.CANCELED
task_status.save()

if job.is_started:
redis_conn.sadd(TASKING_CONSTANTS.KILL_KEY, job.get_id())
try:
send_stop_job_command(redis_conn, job.get_id())
send_stop_job_command(redis_conn, resource_job.get_id())
except (InvalidJobOperation, NoSuchJobError):
# We don't care if the job isn't currently running when we try to cancel
pass

if resource_job.is_started:
redis_conn.sadd(TASKING_CONSTANTS.KILL_KEY, resource_job.get_id())
# A hack to ensure that we aren't deleting resources still being used by the workhorse
time.sleep(0.5)

resource_job.delete()
job.delete()

# A hack to ensure that we aren't deleting resources still being used by the workhorse
time.sleep(1.5)

with transaction.atomic():
for report in task_status.progress_reports.all():
if report.state not in TASK_FINAL_STATES:
Expand Down
22 changes: 0 additions & 22 deletions pulpcore/tasking/worker.py
@@ -1,12 +1,7 @@
import logging
import os
import signal
import socket
import sys
import threading
import time

from gettext import gettext as _

from rq import Queue
from rq.worker import Worker
Expand Down Expand Up @@ -104,23 +99,6 @@ def perform_job(self, job, queue):
user = get_users_with_perms(task).first()
_set_current_user(user)

def check_kill(conn, job_id, interval=1):
while True:
res = conn.srem(TASKING_CONSTANTS.KILL_KEY, job_id)
if res > 0:
_logger.info(
_(
"Received notice of task cancellation, killing workhorse."
" (PID {})".format(os.getpid())
)
)

os.kill(os.getpid(), signal.SIGKILL)
time.sleep(interval)

t = threading.Thread(target=check_kill, args=(self.connection, job.get_id()))
t.start()

return super().perform_job(job, queue)

def handle_job_failure(self, job, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Expand Up @@ -21,6 +21,6 @@ psycopg2>=2.7,<2.9
PyYAML>=5.1.1,<5.4.0
python-gnupg~=0.4.6
redis>=3.4.0
rq>=1.1,<1.7
rq>=1.7<1.8
setuptools>=39.2.0
whitenoise>=5.0.0,<5.3.0

0 comments on commit 5f7dc90

Please sign in to comment.