Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
Support the locking of multiple resources
Browse files Browse the repository at this point in the history
  • Loading branch information
David Davis authored and daviddavis committed Feb 19, 2018
1 parent b4a0041 commit ff0739e
Show file tree
Hide file tree
Showing 6 changed files with 131 additions and 48 deletions.
56 changes: 52 additions & 4 deletions pulpcore/pulpcore/app/models/task.py
Expand Up @@ -7,7 +7,7 @@

import celery

from django.db import models
from django.db import models, transaction
from django.utils import timezone

from pulpcore.app.models import Model, GenericRelationModel
Expand All @@ -26,16 +26,16 @@ class ReservedResource(Model):
Fields:
resource (models.TextField): The name of the resource reserved for the task.
resource (models.TextField): The url of the resource reserved for the task.
Relations:
task (models.ForeignKey): The task associated with this reservation
worker (models.ForeignKey): The worker associated with this reservation
"""
resource = models.TextField()
resource = models.TextField(unique=True, blank=False)

task = models.OneToOneField("Task")
tasks = models.ManyToManyField("Task", related_name="reserved_resources")
worker = models.ForeignKey("Worker", on_delete=models.CASCADE, related_name="reservations")


Expand Down Expand Up @@ -83,6 +83,25 @@ def online_workers(self):
last_heartbeat__gte=age_threshold,
online=True)

def with_reservations(self, resources):
    """
    Find the single worker that holds a reservation on at least one of the given resources.

    A worker matches when ANY of the resource urls is reserved by it; it does not have to
    hold all of them. That is enough when looking for a worker to queue work against,
    because reservations for the remaining resources can still be attempted afterwards.

    Arguments:
        resources (list): a list of resource urls

    Returns:
        :class:`pulpcore.app.models.Worker`: The worker holding a reservation on one or more
            of the resources

    Raises:
        Worker.DoesNotExist: If no worker holds a reservation on any of the resources
        Worker.MultipleObjectsReturned: If more than one worker holds reservations on them
    """
    holders = self.filter(reservations__resource__in=resources)
    # distinct() is applied before get(); presumably so that one worker matching several of
    # the resources is counted once instead of tripping MultipleObjectsReturned.
    unique_holders = holders.distinct()
    return unique_holders.get()


class Worker(Model):
"""
Expand Down Expand Up @@ -115,6 +134,25 @@ def save_heartbeat(self):
"""
self.save(update_fields=['last_heartbeat'])

def lock_resources(self, task, resource_urls):
    """
    Attempt to lock all resources by their urls on behalf of a task.

    For each url, the reservation this worker already holds is reused; when the worker holds
    none, a new one is created. The task is then attached to the reservation. Everything
    runs inside a single ``transaction.atomic()`` block, so an error on any resource rolls
    back the reservations made earlier in the same call. Must be atomic to prevent deadlocks.

    Arguments:
        task (pulpcore.app.models.Task): task to lock the resource for
        resource_urls (List): a list of resource urls to be locked

    Raises:
        django.db.IntegrityError: If a reservation for one of the resources already exists
            but is not held by this worker (``ReservedResource.resource`` is unique)
    """
    with transaction.atomic():
        for resource in resource_urls:
            try:
                # A single query: fetch this worker's reservation if it has one.
                reservation = self.reservations.get(resource=resource)
            except ReservedResource.DoesNotExist:
                # This worker does not hold the resource yet. create() raises
                # IntegrityError if another reservation row already uses this url.
                reservation = ReservedResource.objects.create(worker=self, resource=resource)
            reservation.tasks.add(task)


class TaskLock(Model):
"""
Expand Down Expand Up @@ -252,6 +290,16 @@ def set_failed(self, exc, einfo):
self.error = exception_to_dict(exc, einfo.traceback)
self.save()

def release_resources(self):
    """
    Detach this task from every reservation it holds.

    After the task is removed from a reservation, the reservation itself is deleted if no
    other task is still attached to it.
    """
    for held in self.reserved_resources.all():
        held.tasks.remove(self.id)
        if held.tasks.exists():
            # Other tasks still reserve this resource, so the reservation must stay.
            continue
        held.delete()


class TaskTag(Model):
"""
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/pulpcore/app/serializers/__init__.py
Expand Up @@ -3,7 +3,7 @@
# - all can import directly from base and fields if needed
from pulpcore.app.serializers.base import (DetailRelatedField, GenericKeyValueRelatedField, # noqa
ModelSerializer, MasterModelSerializer, DetailIdentityField, DetailRelatedField,
viewset_for_model)
view_name_for_model, viewset_for_model)
from pulpcore.app.serializers.fields import (BaseURLField, ContentRelatedField, FileField, # noqa
LatestVersionField)
from pulpcore.app.serializers.content import ContentSerializer, ArtifactSerializer # noqa
Expand Down
5 changes: 2 additions & 3 deletions pulpcore/pulpcore/app/viewsets/base.py
Expand Up @@ -222,8 +222,7 @@ def update(self, request, pk, **kwargs):
serializer.is_valid(raise_exception=True)
app_label = instance._meta.app_label
async_result = tasks.base.general_update.apply_async_with_reservation(
instance._meta.db_table, pk,
args=(pk, app_label, serializer.__class__.__name__),
[instance], args=(pk, app_label, serializer.__class__.__name__),
kwargs={'data': request.data, 'partial': partial}
)
return OperationPostponedResponse([async_result], request)
Expand All @@ -245,7 +244,7 @@ def destroy(self, request, pk, **kwargs):
serializer = self.get_serializer(instance)
app_label = instance._meta.app_label
async_result = tasks.base.general_delete.apply_async_with_reservation(
instance._meta.db_table, pk,
[instance],
args=(pk, app_label, serializer.__class__.__name__)
)
return OperationPostponedResponse([async_result], request)
Expand Down
11 changes: 5 additions & 6 deletions pulpcore/pulpcore/app/viewsets/repository.py
Expand Up @@ -34,7 +34,6 @@
CreateReadAsyncUpdateDestroyNamedModelViewset,
)
from pulpcore.app.viewsets.custom_filters import CharInFilter
from pulpcore.common import tags


class RepositoryFilter(filterset.FilterSet):
Expand All @@ -61,7 +60,7 @@ def update(self, request, pk, partial=False):
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
async_result = tasks.repository.update.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, str(instance.id),
[instance],
args=(instance.id, ),
kwargs={'data': request.data,
'partial': partial}
Expand All @@ -74,7 +73,7 @@ def destroy(self, request, pk):
"""
repo = self.get_object()
async_result = tasks.repository.delete.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, str(repo.id), kwargs={'repo_id': repo.id})
[repo], kwargs={'repo_id': repo.id})
return OperationPostponedResponse([async_result], request)


Expand Down Expand Up @@ -253,8 +252,7 @@ def destroy(self, request, repository_pk, number):
"""
version = self.get_object()
async_result = tasks.repository.delete_version.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, repository_pk,
kwargs={'pk': version.pk}
[version.repository], kwargs={'pk': version.pk}
)
return OperationPostponedResponse([async_result], request)

Expand All @@ -264,6 +262,7 @@ def create(self, request, repository_pk):
"""
add_content_units = []
remove_content_units = []
version = self.get_object()

if 'add_content_units' in request.data:
for url in request.data['add_content_units'].split(','):
Expand All @@ -276,7 +275,7 @@ def create(self, request, repository_pk):
remove_content_units.append(content.pk)

result = tasks.repository.add_and_remove.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, repository_pk,
[version.repository],
kwargs={
'repository_pk': repository_pk,
'add_content_units': add_content_units,
Expand Down
89 changes: 55 additions & 34 deletions pulpcore/pulpcore/tasking/tasks.py
Expand Up @@ -14,7 +14,7 @@
from django.db import IntegrityError, transaction

from pulpcore.app.models import Task as TaskStatus
from pulpcore.app.models import ReservedResource, Worker
from pulpcore.app.models import Worker
from pulpcore.common import TASK_STATES
from pulpcore.exceptions import PulpException
from pulpcore.tasking import util
Expand All @@ -41,8 +41,36 @@ class PulpTask(CeleryTask):
pass


def _acquire_worker(resources):
    """
    Pick the worker that work needing the given resource urls should be queued for.

    A worker that already holds a reservation on any of the resources is preferred. When no
    worker holds any of them, whatever ``Worker.objects.get_unreserved_worker()`` returns is
    used instead.

    Arguments:
        resources (list): a list of resource urls

    Returns:
        :class:`pulpcore.app.models.Worker`: A worker to queue work for

    Raises:
        Worker.DoesNotExist: If no worker is found, or if the reservations on the resources
            are spread across more than one worker
    """
    try:
        # It is safe to send the work to a worker who already has one of these reservations.
        return Worker.objects.with_reservations(resources)
    except Worker.MultipleObjectsReturned:
        # Several workers hold these resources; this is reported to the caller the same way
        # as "no worker available".
        raise Worker.DoesNotExist
    except Worker.DoesNotExist:
        # Nobody holds any of the resources; fall through to any available worker.
        pass

    return Worker.objects.get_unreserved_worker()


@task(base=PulpTask, acks_late=True)
def _queue_reserved_task(name, inner_task_id, resource_id, inner_args, inner_kwargs, options):
def _queue_reserved_task(name, inner_task_id, resources, inner_args, inner_kwargs, options):
"""
A task that encapsulates another task to be dispatched later. This task being encapsulated is
called the "inner" task, and a task name, UUID, and accepts a list of positional args
Expand All @@ -59,7 +87,7 @@ def _queue_reserved_task(name, inner_task_id, resource_id, inner_args, inner_kwa
inner_task_id (basestring): The UUID to be set on the task being called. By providing
the UUID, the caller can have an asynchronous reference to the inner task
that will be dispatched.
resource_id (basestring): The name of the resource you wish to reserve for your task.
resources (list): The urls of the resources you wish to reserve for your task.
The system will ensure that no other tasks that want that same reservation will run
concurrently with yours.
inner_args (tuple): The positional arguments to pass on to the task.
Expand All @@ -69,47 +97,43 @@ def _queue_reserved_task(name, inner_task_id, resource_id, inner_args, inner_kwa
Returns:
An AsyncResult instance as returned by Celery's apply_async
"""
task_status = TaskStatus.objects.get(pk=inner_task_id)
while True:
# Find a worker who already has this reservation, it is safe to send this work to them
try:
worker = ReservedResource.objects.filter(resource=resource_id)[0].worker
except IndexError:
pass
else:
break
worker = _acquire_worker(resources)
except Worker.DoesNotExist:
# no worker is ready so we need to wait
time.sleep(0.25)
continue

try:
worker = Worker.objects.get_unreserved_worker()
except Worker.DoesNotExist:
pass
worker.lock_resources(task_status, resources)
except IntegrityError:
# we have a worker but we can't create the reservations so wait
time.sleep(0.25)
else:
# we have a worker with the locks
break

# No worker is ready for this work, so we need to wait
time.sleep(0.25)

task_status = TaskStatus.objects.get(pk=inner_task_id)
task_status.worker = worker
task_status.save()
ReservedResource.objects.create(task=task_status, worker=worker, resource=resource_id)

try:
celery.tasks[name].apply_async(args=inner_args, task_id=inner_task_id,
routing_key=worker.name, exchange=DEDICATED_QUEUE_EXCHANGE,
kwargs=inner_kwargs, **options)
finally:
_release_resource.apply_async(args=(inner_task_id, ), routing_key=worker.name,
exchange=DEDICATED_QUEUE_EXCHANGE)
_release_resources.apply_async(args=(inner_task_id, ), routing_key=worker.name,
exchange=DEDICATED_QUEUE_EXCHANGE)


@task(base=PulpTask)
def _release_resource(task_id):
def _release_resources(task_id):
"""
Do not queue this task yourself. It will be used automatically when your task is dispatched by
the _queue_reserved_task task.
When a resource-reserving task is complete, this method releases the resource by removing the
ReservedResource object by UUID.
When a resource-reserving task is complete, this method releases the task's resource(s)
Args:
task_id (basestring): The UUID of the task that requested the reservation
Expand All @@ -130,7 +154,7 @@ class MyEinfo(object):

new_task.on_failure(runtime_exception, task_id, (), {}, MyEinfo)

ReservedResource.objects.filter(task__pk=task_id).delete()
TaskStatus.objects.get(pk=task_id).release_resources()


class UserFacingTask(PulpTask):
Expand All @@ -152,12 +176,12 @@ class UserFacingTask(PulpTask):
# this tells celery to not automatically log tracebacks for these exceptions
throws = (PulpException,)

def apply_async_with_reservation(self, resource_type, resource_id, tags=[], group_id=None,
args=None, kwargs=None, **options):
def apply_async_with_reservation(self, resources, tags=[], group_id=None, args=None,
kwargs=None, **options):
"""
This method provides normal apply_async functionality, while also serializing tasks by
resource name. No two tasks that claim the same resource name can execute concurrently. It
accepts resource_type and resource_id and combines them to form a reservation key.
resource urls. No two tasks that claim the same resource can execute concurrently. It
accepts resources which it transforms into a list of urls (one for each resource).
This does not dispatch the task directly, but instead promises to dispatch it later by
encapsulating the desired task through a call to a :func:`_queue_reserved_task` task. See
Expand All @@ -168,9 +192,8 @@ def apply_async_with_reservation(self, resource_type, resource_id, tags=[], grou
before it returns.
Args:
resource_type (basestring): A string that identifies type of a resource
resource_id (basestring): A string that identifies some named resource, guaranteeing
that only one task reserving this same string can happen at a time.
resources (list): A list of resources to reserve, guaranteeing that no two tasks
reserving the same resource run concurrently
tags (list): A list of tags (strings) to place onto the task, used for searching
for tasks by tag. This is an optional argument which is pulled out of kwargs.
group_id (uuid.UUID): The id to identify which group of tasks a task belongs to.
Expand All @@ -182,9 +205,7 @@ def apply_async_with_reservation(self, resource_type, resource_id, tags=[], grou
Returns (celery.result.AsyncResult):
An AsyncResult instance as returned by Celery's apply_async
"""
# Form a resource_id for reservation by combining given resource type and id. This way,
# two different resources having the same id will not block each other.
resource_id = ":".join((resource_type, resource_id))
resources = {util.get_url(resource) for resource in resources}
inner_task_id = str(uuid.uuid4())
task_name = self.name

Expand All @@ -199,7 +220,7 @@ def apply_async_with_reservation(self, resource_type, resource_id, tags=[], grou
task_status.tags.create(name=tag)

# Call the outer task which is a promise to call the real task when it can.
_queue_reserved_task.apply_async(args=(task_name, inner_task_id, resource_id, args,
_queue_reserved_task.apply_async(args=(task_name, inner_task_id, list(resources), args,
kwargs, options),
queue=RESOURCE_MANAGER_QUEUE)
return AsyncResult(inner_task_id)
Expand Down
16 changes: 16 additions & 0 deletions pulpcore/pulpcore/tasking/util.py
Expand Up @@ -6,8 +6,10 @@
from celery.app import control

from django.db import transaction
from django.urls import reverse

from pulpcore.app.models import Task
from pulpcore.app.serializers import view_name_for_model
from pulpcore.common import TASK_FINAL_STATES
from pulpcore.exceptions import MissingResource

Expand Down Expand Up @@ -87,3 +89,17 @@ def get_current_task_id():
"""
with suppress(AttributeError):
return current_task.request.id


def get_url(model):
    """
    Build the resource url for the given model object.

    Only the path component of the resource URI is returned. The resource
    locking/reservation code uses this value to identify resources.

    Args:
        model (django.db.models.Model): A model object.

    Returns:
        str: The path component of the resource url
    """
    # Resolve the name of the model's 'detail' view, then reverse it with the object's pk.
    detail_view_name = view_name_for_model(model, 'detail')
    return reverse(detail_view_name, args=[model.pk])

0 comments on commit ff0739e

Please sign in to comment.