Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace custom task cancellation with RQ 1.7 functionality #1040

Merged
merged 1 commit into from Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/7902.misc
@@ -0,0 +1 @@
Replaced some custom code for task cancellation with RQ native functionality.
2 changes: 0 additions & 2 deletions pulpcore/tasking/constants.py
Expand Up @@ -7,6 +7,4 @@
WORKER_TTL=30,
# The amount of time (in seconds) between checks
JOB_MONITORING_INTERVAL=5,
# The Redis key used to force-kill a job
KILL_KEY="rq:jobs:kill",
)
18 changes: 10 additions & 8 deletions pulpcore/tasking/util.py
Expand Up @@ -4,6 +4,8 @@

from django.db import transaction
from django.urls import reverse
from rq.command import send_stop_job_command
from rq.exceptions import InvalidJobOperation, NoSuchJobError
from rq.job import Job, get_current_job
from rq.worker import Worker

Expand All @@ -12,7 +14,6 @@
from pulpcore.constants import TASK_FINAL_STATES, TASK_STATES
from pulpcore.exceptions import MissingResource
from pulpcore.tasking import connection
from pulpcore.tasking.constants import TASKING_CONSTANTS

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -50,18 +51,19 @@ def cancel(task_id):
task_status.state = TASK_STATES.CANCELED
task_status.save()

if job.is_started:
redis_conn.sadd(TASKING_CONSTANTS.KILL_KEY, job.get_id())
try:
send_stop_job_command(redis_conn, job.get_id())
send_stop_job_command(redis_conn, resource_job.get_id())
except (InvalidJobOperation, NoSuchJobError):
# We don't care if the job isn't currently running when we try to cancel
pass

if resource_job.is_started:
redis_conn.sadd(TASKING_CONSTANTS.KILL_KEY, resource_job.get_id())
# A hack to ensure that we aren't deleting resources still being used by the workhorse
time.sleep(0.5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


resource_job.delete()
job.delete()

# A hack to ensure that we aren't deleting resources still being used by the workhorse
time.sleep(1.5)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should keep this but reduce it to ~0.1 seconds?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I'm not entirely sure what to do. I'm trying to think through it; so the resources delete while the task is in use, and it receives a database write error in the task at some point?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe that's ok?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's an inherent delay in between when the cancellation is triggered and when it actually happens. Before that delay was due to polling, now it's message passing so it should be faster but technically still present.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The main thing is that we should avoid cancellations being marked as failures.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bmbouter I'm thinking I should just err towards caution and add it back for now.


with transaction.atomic():
for report in task_status.progress_reports.all():
if report.state not in TASK_FINAL_STATES:
Expand Down
22 changes: 0 additions & 22 deletions pulpcore/tasking/worker.py
@@ -1,12 +1,7 @@
import logging
import os
import signal
import socket
import sys
import threading
import time

from gettext import gettext as _

from rq import Queue
from rq.worker import Worker
Expand Down Expand Up @@ -104,23 +99,6 @@ def perform_job(self, job, queue):
user = get_users_with_perms(task).first()
_set_current_user(user)

def check_kill(conn, job_id, interval=1):
while True:
res = conn.srem(TASKING_CONSTANTS.KILL_KEY, job_id)
if res > 0:
_logger.info(
_(
"Received notice of task cancellation, killing workhorse."
" (PID {})".format(os.getpid())
)
)

os.kill(os.getpid(), signal.SIGKILL)
time.sleep(interval)

t = threading.Thread(target=check_kill, args=(self.connection, job.get_id()))
t.start()

return super().perform_job(job, queue)

def handle_job_failure(self, job, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Expand Up @@ -21,6 +21,6 @@ psycopg2>=2.7,<2.9
PyYAML>=5.1.1,<5.4.0
python-gnupg~=0.4.6
redis>=3.4.0
rq>=1.1,<1.7
rq>=1.7,<1.8
setuptools>=39.2.0
whitenoise>=5.0.0,<5.3.0