Skip to content

Commit

Permalink
Adds new dispatch call and deprecate the old one
Browse files Browse the repository at this point in the history
This adds a new `pulpcore.plugin.tasking.dispatch` interface which will
replace the `pulpcore.plugin.tasking.enqueue_with_reservation`
interface. This also deprecates the
`pulpcore.plugin.tasking.enqueue_with_reservation` and causes it to
emit warnings if used.

Additionally the `pulpcore.plugin.viewsets.OperationPostponedResponse`
has been ported to support both the `dispatch` and
`enqueue_with_reservation` interfaces.

closes #8496
  • Loading branch information
bmbouter committed Apr 1, 2021
1 parent d55f1c2 commit a502591
Show file tree
Hide file tree
Showing 16 changed files with 102 additions and 42 deletions.
2 changes: 2 additions & 0 deletions CHANGES/plugin_api/8496.deprecation
@@ -0,0 +1,2 @@
Deprecated the ``pulpcore.plugin.tasking.enqueue_with_reservation``. Instead use the
``pulpcore.plugin.tasking.dispatch`` interface.
7 changes: 7 additions & 0 deletions CHANGES/plugin_api/8496.feature
@@ -0,0 +1,7 @@
Adds the ``pulpcore.plugin.tasking.dispatch`` interface which replcaes the
``pulpcore.plugin.tasking.enqueue_with_reservation`` interface. It is the same except:
* It returns a ``pulpcore.plugin.models.Task`` instead of an RQ object
* It does not support the ``options`` keyword argument

Additionally the ``pulpcore.plugin.viewsets.OperationPostponedResponse`` was updated to support both
the ``dispatch`` and ``enqueue_with_reservation`` interfaces.
4 changes: 2 additions & 2 deletions docs/plugins/plugin-writer/concepts/subclassing/viewsets.rst
Expand Up @@ -62,7 +62,7 @@ Kick off Tasks
Some endpoints may need to deploy tasks to the tasking system. The following is an example of how
this is accomplished.

See :class:`~pulpcore.plugin.tasking.enqueue_with_reservation` for more details.
See :class:`~pulpcore.plugin.tasking.dispatch` for more details.

.. code-block:: python
Expand All @@ -82,7 +82,7 @@ See :class:`~pulpcore.plugin.tasking.enqueue_with_reservation` for more details.
mirror = serializer.validated_data.get('mirror', False)
# This is how tasks are kicked off.
result = enqueue_with_reservation(
result = dispatch(
tasks.synchronize,
[repository, remote],
kwargs={
Expand Down
12 changes: 9 additions & 3 deletions pulpcore/app/response.py
Expand Up @@ -15,11 +15,17 @@ class OperationPostponedResponse(Response):
}
"""

def __init__(self, result, request):
def __init__(self, task, request):
"""
Args:
result (rq.job.Job): A :class:`rq.job.Job` object used to generate the response.
task (pulpcore.plugin.models.Task or rq.job.Job): A
:class:`~pulpcore.plugin.models.Task` or :class:`rq.job.Job` object used to generate
the response.
request (rest_framework.request.Request): Request used to generate the pulp_href urls
"""
resp = {"task": reverse("tasks-detail", args=[result.id], request=None)}
try:
pk = task.id
except AttributeError:
pk = task.pk
resp = {"task": reverse("tasks-detail", args=[pk], request=None)}
super().__init__(data=resp, status=202)
4 changes: 2 additions & 2 deletions pulpcore/app/tasks/importer.py
Expand Up @@ -34,7 +34,7 @@
ContentArtifactResource,
)
from pulpcore.constants import TASK_STATES
from pulpcore.tasking.tasks import enqueue_with_reservation
from pulpcore.tasking.tasks import dispatch

log = getLogger(__name__)

Expand Down Expand Up @@ -403,7 +403,7 @@ def validate_and_assemble(toc_filename):
)
continue

enqueue_with_reservation(
dispatch(
import_repository_version,
[dest_repo],
args=[importer.pk, dest_repo.pk, src_repo["name"], path],
Expand Down
6 changes: 3 additions & 3 deletions pulpcore/app/views/orphans.py
Expand Up @@ -4,7 +4,7 @@
from pulpcore.app.response import OperationPostponedResponse
from pulpcore.app.serializers import AsyncOperationResponseSerializer
from pulpcore.app.tasks import orphan_cleanup
from pulpcore.tasking.tasks import enqueue_with_reservation
from pulpcore.tasking.tasks import dispatch


class OrphansView(APIView):
Expand All @@ -18,6 +18,6 @@ def delete(self, request, format=None):
"""
Cleans up all the Content and Artifact orphans in the system
"""
async_result = enqueue_with_reservation(orphan_cleanup, [])
task = dispatch(orphan_cleanup, [])

return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)
6 changes: 3 additions & 3 deletions pulpcore/app/views/repair.py
Expand Up @@ -4,7 +4,7 @@
from pulpcore.app.response import OperationPostponedResponse
from pulpcore.app.serializers import AsyncOperationResponseSerializer, RepairSerializer
from pulpcore.app.tasks import repair_all_artifacts
from pulpcore.tasking.tasks import enqueue_with_reservation
from pulpcore.tasking.tasks import dispatch


class RepairView(APIView):
Expand All @@ -25,6 +25,6 @@ def post(self, request):

verify_checksums = serializer.validated_data["verify_checksums"]

async_result = enqueue_with_reservation(repair_all_artifacts, [], args=[verify_checksums])
task = dispatch(repair_all_artifacts, [], args=[verify_checksums])

return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)
14 changes: 7 additions & 7 deletions pulpcore/app/viewsets/base.py
Expand Up @@ -18,7 +18,7 @@
from pulpcore.app.models import MasterModel
from pulpcore.app.response import OperationPostponedResponse
from pulpcore.app.serializers import AsyncOperationResponseSerializer
from pulpcore.tasking.tasks import enqueue_with_reservation
from pulpcore.tasking.tasks import dispatch

# These should be used to prevent duplication and keep things consistent
NAME_FILTER_OPTIONS = ["exact", "in", "icontains", "contains", "startswith"]
Expand Down Expand Up @@ -424,13 +424,13 @@ def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
app_label = self.queryset.model._meta.app_label
async_result = enqueue_with_reservation(
task = dispatch(
tasks.base.general_create,
self.async_reserved_resources(None),
args=(app_label, serializer.__class__.__name__),
kwargs={"data": request.data},
)
return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)


class AsyncUpdateMixin(AsyncReservedObjectMixin):
Expand All @@ -448,13 +448,13 @@ def update(self, request, pk, **kwargs):
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
app_label = instance._meta.app_label
async_result = enqueue_with_reservation(
task = dispatch(
tasks.base.general_update,
self.async_reserved_resources(instance),
args=(pk, app_label, serializer.__class__.__name__),
kwargs={"data": request.data, "partial": partial},
)
return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)

@extend_schema(
description="Trigger an asynchronous partial update task",
Expand All @@ -481,12 +481,12 @@ def destroy(self, request, pk, **kwargs):
instance = self.get_object()
serializer = self.get_serializer(instance)
app_label = instance._meta.app_label
async_result = enqueue_with_reservation(
task = dispatch(
tasks.base.general_delete,
self.async_reserved_resources(instance),
args=(pk, app_label, serializer.__class__.__name__),
)
return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)


class BaseFilterSet(filterset.FilterSet):
Expand Down
6 changes: 3 additions & 3 deletions pulpcore/app/viewsets/exporter.py
Expand Up @@ -27,7 +27,7 @@
NamedModelViewSet,
)
from pulpcore.app.viewsets.base import NAME_FILTER_OPTIONS
from pulpcore.plugin.tasking import enqueue_with_reservation
from pulpcore.plugin.tasking import dispatch
from pulpcore.app.response import OperationPostponedResponse


Expand Down Expand Up @@ -132,6 +132,6 @@ def create(self, request, exporter_pk):
export.validated_start_versions = serializer.validated_data.get("start_versions", None)
export.validated_chunk_size = serializer.validated_data.get("chunk_size", None)

result = enqueue_with_reservation(pulp_export, [exporter], kwargs={"the_export": export})
task = dispatch(pulp_export, [exporter], kwargs={"the_export": export})

return OperationPostponedResponse(result, request)
return OperationPostponedResponse(task, request)
6 changes: 3 additions & 3 deletions pulpcore/app/viewsets/importer.py
Expand Up @@ -23,7 +23,7 @@
NamedModelViewSet,
)
from pulpcore.app.viewsets.base import NAME_FILTER_OPTIONS
from pulpcore.tasking.tasks import enqueue_with_reservation
from pulpcore.tasking.tasks import dispatch


class ImporterFilter(BaseFilterSet):
Expand Down Expand Up @@ -104,7 +104,7 @@ def create(self, request, importer_pk):
serializer.is_valid(raise_exception=True)
path = serializer.validated_data.get("path")
toc = serializer.validated_data.get("toc")
result = enqueue_with_reservation(
task = dispatch(
pulp_import, [importer], kwargs={"importer_pk": importer.pk, "path": path, "toc": toc}
)
return OperationPostponedResponse(result, request)
return OperationPostponedResponse(task, request)
10 changes: 5 additions & 5 deletions pulpcore/app/viewsets/repository.py
Expand Up @@ -32,7 +32,7 @@
)
from pulpcore.app.viewsets.base import DATETIME_FILTER_OPTIONS, NAME_FILTER_OPTIONS
from pulpcore.app.viewsets.custom_filters import IsoDateTimeFilter, LabelSelectFilter
from pulpcore.tasking.tasks import enqueue_with_reservation
from pulpcore.tasking.tasks import dispatch


class RepositoryFilter(BaseFilterSet):
Expand Down Expand Up @@ -222,10 +222,10 @@ def destroy(self, request, repository_pk, number):
if version.number == 0:
raise serializers.ValidationError(detail=_("Cannot delete repository version 0."))

async_result = enqueue_with_reservation(
task = dispatch(
tasks.repository.delete_version, [version.repository], kwargs={"pk": version.pk}
)
return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)

@extend_schema(
description="Trigger an asynchronous task to repair a repository version.",
Expand All @@ -242,12 +242,12 @@ def repair(self, request, repository_pk, number):

verify_checksums = serializer.validated_data["verify_checksums"]

async_result = enqueue_with_reservation(
task = dispatch(
tasks.repository.repair_version,
[version.repository],
args=[version.pk, verify_checksums],
)
return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)


class RemoteFilter(BaseFilterSet):
Expand Down
6 changes: 3 additions & 3 deletions pulpcore/app/viewsets/upload.py
Expand Up @@ -15,7 +15,7 @@
UploadDetailSerializer,
)
from pulpcore.app.viewsets.base import NamedModelViewSet
from pulpcore.tasking.tasks import enqueue_with_reservation
from pulpcore.tasking.tasks import dispatch


class UploadViewSet(
Expand Down Expand Up @@ -95,7 +95,7 @@ def commit(self, request, pk):
sha256 = serializer.validated_data["sha256"]

upload = self.get_object()
async_result = enqueue_with_reservation(
task = dispatch(
tasks.upload.commit, [upload], args=(upload.pk, sha256)
)
return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)
6 changes: 3 additions & 3 deletions pulpcore/plugin/actions.py
Expand Up @@ -8,7 +8,7 @@
AsyncOperationResponseSerializer,
RepositoryAddRemoveContentSerializer,
)
from pulpcore.tasking.tasks import enqueue_with_reservation
from pulpcore.tasking.tasks import dispatch


__all__ = ["ModifyRepositoryActionMixin"]
Expand Down Expand Up @@ -50,7 +50,7 @@ def modify(self, request, pk):
content = self.get_resource(url, Content)
remove_content_units.append(content.pk)

result = enqueue_with_reservation(
task = dispatch(
tasks.repository.add_and_remove,
[repository],
kwargs={
Expand All @@ -60,4 +60,4 @@ def modify(self, request, pk):
"remove_content_units": remove_content_units,
},
)
return OperationPostponedResponse(result, request)
return OperationPostponedResponse(task, request)
1 change: 1 addition & 0 deletions pulpcore/plugin/tasking.py
@@ -1,5 +1,6 @@
# Support plugins dispatching tasks
from pulpcore.tasking.tasks import enqueue_with_reservation # noqa
from pulpcore.tasking.tasks import dispatch # noqa

# Support plugins working with the working directory.
from pulpcore.tasking.storage import WorkingDirectory # noqa
Expand Down
10 changes: 5 additions & 5 deletions pulpcore/plugin/viewsets/content.py
Expand Up @@ -10,7 +10,7 @@
AsyncOperationResponseSerializer,
)
from pulpcore.plugin.models import Artifact, PulpTemporaryFile
from pulpcore.plugin.tasking import enqueue_with_reservation
from pulpcore.plugin.tasking import dispatch
from pulpcore.plugin.viewsets import (
ContentViewSet,
OperationPostponedResponse,
Expand Down Expand Up @@ -57,13 +57,13 @@ def create(self, request):
shared_resources.append(repository)

app_label = self.queryset.model._meta.app_label
async_result = enqueue_with_reservation(
task = dispatch(
tasks.base.general_create_from_temp_file,
shared_resources,
args=(app_label, serializer.__class__.__name__, str(temp_file.pk)),
kwargs={"data": task_payload, "context": self.get_deferred_context(request)},
)
return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)


class SingleArtifactContentUploadViewSet(DefaultDeferredContextMixin, ContentViewSet):
Expand All @@ -86,7 +86,7 @@ def create(self, request):
content_data.shared_resources.append(repository)

app_label = self.queryset.model._meta.app_label
async_result = enqueue_with_reservation(
task = dispatch(
tasks.base.general_create,
content_data.shared_resources,
args=(app_label, serializer.__class__.__name__),
Expand All @@ -95,7 +95,7 @@ def create(self, request):
"context": self.get_deferred_context(request),
},
)
return OperationPostponedResponse(async_result, request)
return OperationPostponedResponse(task, request)

def init_content_data(self, serializer, request):
"""Initialize the reference to an Artifact along with relevant task's payload data."""
Expand Down
44 changes: 44 additions & 0 deletions pulpcore/tasking/tasks.py
@@ -1,6 +1,7 @@
import logging
import time
import uuid
import warnings
from gettext import gettext as _

from django.db import IntegrityError, transaction
Expand Down Expand Up @@ -176,6 +177,21 @@ def enqueue_with_reservation(
Raises:
ValueError: When `resources` is an unsupported type.
"""
logging.warning(
_(
"`enqueue_with_reservation` is deprecated and could be removed as early as "
"pulpcore==3.13; use pulpcore.plugin.tasking.dispatch instead."
),
DeprecationWarning,
)
return _enqueue_with_reservation(
func, resources, args=args, kwargs=kwargs, options=options, task_group=task_group
)


def _enqueue_with_reservation(
func, resources, args=None, kwargs=None, options=None, task_group=None
):
if not args:
args = tuple()
if not kwargs:
Expand Down Expand Up @@ -227,3 +243,31 @@ def as_url(r):
task.set_failed(e, None)

return Job(id=inner_task_id, connection=redis_conn)

def dispatch(func, resources, args=None, kwargs=None, task_group=None):
"""
Enqueue a message to Pulp workers with a reservation.
This method provides normal enqueue functionality, while also requesting necessary locks for
serialized urls. No two tasks that claim the same resource can execute concurrently. It
accepts resources which it transforms into a list of urls (one for each resource).
This method creates a :class:`pulpcore.app.models.Task` object and returns it.
Args:
func (callable): The function to be run by RQ when the necessary locks are acquired.
resources (list): A list of resources to this task needs exclusive access to while running.
Each resource can be either a `str` or a `django.models.Model` instance.
args (tuple): The positional arguments to pass on to the task.
kwargs (dict): The keyword arguments to pass on to the task.
task_group (pulpcore.app.models.TaskGroup): A TaskGroup to add the created Task to.
Returns (pulpcore.app.models.Task): The Pulp Task that was created.
Raises:
ValueError: When `resources` is an unsupported type.
"""
RQ_job_id = _enqueue_with_reservation(
func, resources=resources, args=args, kwargs=kwargs, task_group=task_group
)
return Task.objects.get(pk=RQ_job_id.id)

0 comments on commit a502591

Please sign in to comment.