Skip to content

Commit

Permalink
Fix race condition in handling of reserved resources
Browse files Browse the repository at this point in the history
  • Loading branch information
dralley committed Apr 8, 2021
1 parent 6689f70 commit 69bc192
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 36 deletions.
1 change: 1 addition & 0 deletions CHANGES/8352.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed a race condition that sometimes surfaced during handling of reserved resources.
50 changes: 29 additions & 21 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@

def _acquire_worker(resources):
"""
Attempts to acquire a worker for a set of resource urls. If no worker has any of those resources
reserved, then the first available worker is returned
Attempts to acquire a worker for a set of resource urls. If no worker has any of those
resources reserved, then the first available worker is returned.
Must be done in a transaction, and locks the worker in question until the end of the
transaction.
Arguments:
resources (list): a list of resource urls
Expand All @@ -45,25 +48,33 @@ def _acquire_worker(resources):
"""
# Find a worker who already has one of the reservations, it is safe to send this work to them
try:
worker = Worker.objects.filter(reservations__resource__in=resources).distinct().get()
return (
Worker.objects.select_for_update()
.filter(pk__in=Worker.objects.filter(reservations__resource__in=resources))
.get()
)
except Worker.MultipleObjectsReturned:
raise Worker.DoesNotExist
except Worker.DoesNotExist:
pass
else:
return worker

# Otherwise, return any available worker at random
workers_qs = Worker.objects.online_workers().exclude(
name=constants.TASKING_CONSTANTS.RESOURCE_MANAGER_WORKER_NAME
)
workers_qs_with_counts = workers_qs.annotate(models.Count("reservations"))
try:
# A randomly-selected Worker instance that has zero ReservedResource entries associated with it.
return workers_qs_with_counts.filter(reservations__count=0).order_by("?")[0]
except IndexError:
workers_without_res = workers_qs.annotate(models.Count("reservations")).filter(
reservations__count=0
)
# A randomly-selected Worker instance that has zero ReservedResource entries.
worker = (
Worker.objects.select_for_update().filter(pk__in=workers_without_res).order_by("?").first()
)

if worker is None:
# If all Workers have at least one ReservedResource entry.
raise Worker.model.DoesNotExist()
raise Worker.DoesNotExist()

return worker


def _queue_reserved_task(func, inner_task_id, resources, inner_args, inner_kwargs, options):
Expand Down Expand Up @@ -123,15 +134,11 @@ def _queue_reserved_task(func, inner_task_id, resources, inner_args, inner_kwarg
return

try:
worker = _acquire_worker(resources)
except Worker.DoesNotExist:
# no worker is ready so we need to wait
time.sleep(0.25)
continue

try:
# Attempt to lock all resources by their urls. Must be atomic to prevent deadlocks.
with transaction.atomic():
# lock the worker - there is a similar lock in mark_worker_offline()
worker = _acquire_worker(resources)

# Attempt to lock all resources by their urls. Must be atomic to prevent deadlocks.
for resource in resources:
if worker.reservations.filter(resource=resource).exists():
reservation = worker.reservations.get(resource=resource)
Expand All @@ -140,9 +147,10 @@ def _queue_reserved_task(func, inner_task_id, resources, inner_args, inner_kwarg
worker=worker, resource=resource
)
TaskReservedResource.objects.create(resource=reservation, task=task_status)
except IntegrityError:
# we have a worker but we can't create the reservations so wait
except (Worker.DoesNotExist, IntegrityError):
# if no worker is ready, or we have a worker but we can't create the reservations, wait
time.sleep(0.25)
continue
else:
# we have a worker with the locks
break
Expand Down
34 changes: 19 additions & 15 deletions pulpcore/tasking/worker_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from rq.exceptions import NoSuchJobError
from rq.job import Job

from django.db import transaction


_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -176,20 +178,22 @@ def mark_worker_offline(worker_name, normal_shutdown=False):
_logger.info(_("Cleaning up shutdown worker '{name}'.".format(name=worker_name)))

try:
worker = Worker.objects.get(name=worker_name, gracefully_stopped=False, cleaned_up=False)
with transaction.atomic():
worker = Worker.objects.select_for_update().get(
name=worker_name, gracefully_stopped=False, cleaned_up=False
)
# Cancel all of the tasks that were assigned to this worker's queue
for task in worker.tasks.filter(state__in=TASK_INCOMPLETE_STATES):
cancel(task.pk)

# Ensure all locks are released for those tasks that are in final states also
for task in worker.tasks.exclude(state__in=TASK_INCOMPLETE_STATES):
task.release_resources()

if normal_shutdown:
worker.gracefully_stopped = True

worker.cleaned_up = True
worker.save()
except Worker.DoesNotExist:
pass
else:
# Cancel all of the tasks that were assigned to this worker's queue
for task in worker.tasks.filter(state__in=TASK_INCOMPLETE_STATES):
cancel(task.pk)

# Ensure all locks are released for those tasks that are in final states also
for task in worker.tasks.exclude(state__in=TASK_INCOMPLETE_STATES):
task.release_resources()

if normal_shutdown:
worker.gracefully_stopped = True

worker.cleaned_up = True
worker.save()

0 comments on commit 69bc192

Please sign in to comment.