Skip to content

Commit

Permalink
Deprecate resources argument of dispatch
Browse files Browse the repository at this point in the history
fixes #9257
  • Loading branch information
mdellweg committed Aug 19, 2021
1 parent 6674440 commit 85281bb
Show file tree
Hide file tree
Showing 15 changed files with 64 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGES/plugin_api/9257.deprecation
@@ -0,0 +1 @@
Deprecate the ``resources`` argument of ``dispatch`` in favor of ``exclusive_resources`` and ``shared_resources``.
3 changes: 2 additions & 1 deletion docs/plugins/plugin-writer/concepts/subclassing/viewsets.rst
Expand Up @@ -94,7 +94,8 @@ See :class:`~pulpcore.plugin.tasking.dispatch` for more details.
# This is how tasks are kicked off.
result = dispatch(
tasks.synchronize,
[repository, remote],
exclusive_resources=[repository],
shared_resources=[remote],
kwargs={
'remote_pk': remote.pk,
'repository_pk': repository.pk,
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/app/tasks/importer.py
Expand Up @@ -427,7 +427,7 @@ def validate_and_assemble(toc_filename):

dispatch(
import_repository_version,
[dest_repo],
exclusive_resources=[dest_repo],
args=[importer.pk, dest_repo.pk, src_repo["name"], path],
task_group=task_group,
)
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/app/views/orphans.py
Expand Up @@ -25,6 +25,6 @@ def delete(self, request, format=None):
"`POST /pulp/api/v3/orphans/cleanup/` instead."
)

task = dispatch(orphan_cleanup, [])
task = dispatch(orphan_cleanup)

return OperationPostponedResponse(task, request)
2 changes: 1 addition & 1 deletion pulpcore/app/views/repair.py
Expand Up @@ -26,6 +26,6 @@ def post(self, request):

verify_checksums = serializer.validated_data["verify_checksums"]

task = dispatch(repair_all_artifacts, [], args=[verify_checksums])
task = dispatch(repair_all_artifacts, args=[verify_checksums])

return OperationPostponedResponse(task, request)
6 changes: 3 additions & 3 deletions pulpcore/app/viewsets/base.py
Expand Up @@ -431,7 +431,7 @@ def create(self, request, *args, **kwargs):
app_label = self.queryset.model._meta.app_label
task = dispatch(
tasks.base.general_create,
self.async_reserved_resources(None),
exclusive_resources=self.async_reserved_resources(None),
args=(app_label, serializer.__class__.__name__),
kwargs={"data": request.data},
)
Expand All @@ -455,7 +455,7 @@ def update(self, request, pk, **kwargs):
app_label = instance._meta.app_label
task = dispatch(
tasks.base.general_update,
self.async_reserved_resources(instance),
exclusive_resources=self.async_reserved_resources(instance),
args=(pk, app_label, serializer.__class__.__name__),
kwargs={"data": request.data, "partial": partial},
)
Expand Down Expand Up @@ -488,7 +488,7 @@ def destroy(self, request, pk, **kwargs):
app_label = instance._meta.app_label
task = dispatch(
tasks.base.general_delete,
self.async_reserved_resources(instance),
exclusive_resources=self.async_reserved_resources(instance),
args=(pk, app_label, serializer.__class__.__name__),
)
return OperationPostponedResponse(task, request)
Expand Down
6 changes: 3 additions & 3 deletions pulpcore/app/viewsets/exporter.py
Expand Up @@ -145,7 +145,7 @@ def create(self, request, exporter_pk):
# Invoke the export
task = dispatch(
pulp_export,
[exporter],
exclusive_resources=[exporter],
kwargs={"exporter_pk": str(exporter.pk), "params": request.data},
)

Expand Down Expand Up @@ -183,15 +183,15 @@ def create(self, request, exporter_pk):

task = dispatch(
fs_publication_export,
[exporter],
exclusive_resources=[exporter],
kwargs={"exporter_pk": exporter.pk, "publication_pk": publication.pk},
)
else:
repo_version = self.get_resource(request.data["repository_version"], RepositoryVersion)

task = dispatch(
fs_repo_version_export,
[exporter],
exclusive_resources=[exporter],
kwargs={"exporter_pk": str(exporter.pk), "repo_version_pk": repo_version.pk},
)

Expand Down
4 changes: 3 additions & 1 deletion pulpcore/app/viewsets/importer.py
Expand Up @@ -105,6 +105,8 @@ def create(self, request, importer_pk):
path = serializer.validated_data.get("path")
toc = serializer.validated_data.get("toc")
task = dispatch(
pulp_import, [importer], kwargs={"importer_pk": importer.pk, "path": path, "toc": toc}
pulp_import,
exclusive_resources=[importer],
kwargs={"importer_pk": importer.pk, "path": path, "toc": toc},
)
return OperationPostponedResponse(task, request)
1 change: 0 additions & 1 deletion pulpcore/app/viewsets/orphans.py
Expand Up @@ -26,7 +26,6 @@ def cleanup(self, request):

task = dispatch(
orphan_cleanup,
[],
kwargs={"content_pks": content_pks, "orphan_protection_time": orphan_protection_time},
)

Expand Down
2 changes: 1 addition & 1 deletion pulpcore/app/viewsets/reclaim.py
Expand Up @@ -37,7 +37,7 @@ def reclaim(self, request):

task = dispatch(
reclaim_space,
repos,
shared_resources=repos,
kwargs={
"repo_pks": reclaim_repo_pks,
"keeplist_rv_pks": keeplist_rv_pks,
Expand Down
6 changes: 4 additions & 2 deletions pulpcore/app/viewsets/repository.py
Expand Up @@ -174,7 +174,9 @@ def destroy(self, request, repository_pk, number):
version = self.get_object()

task = dispatch(
tasks.repository.delete_version, [version.repository], kwargs={"pk": version.pk}
tasks.repository.delete_version,
exclusive_resources=[version.repository],
kwargs={"pk": version.pk},
)
return OperationPostponedResponse(task, request)

Expand All @@ -195,7 +197,7 @@ def repair(self, request, repository_pk, number):

task = dispatch(
tasks.repository.repair_version,
[version.repository],
shared_resources=[version.repository],
args=[version.pk, verify_checksums],
)
return OperationPostponedResponse(task, request)
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/app/viewsets/upload.py
Expand Up @@ -95,5 +95,5 @@ def commit(self, request, pk):
sha256 = serializer.validated_data["sha256"]

upload = self.get_object()
task = dispatch(tasks.upload.commit, [upload], args=(upload.pk, sha256))
task = dispatch(tasks.upload.commit, exclusive_resources=[upload], args=(upload.pk, sha256))
return OperationPostponedResponse(task, request)
2 changes: 1 addition & 1 deletion pulpcore/plugin/actions.py
Expand Up @@ -58,7 +58,7 @@ def modify(self, request, pk):

task = dispatch(
tasks.repository.add_and_remove,
[repository],
exclusive_resources=[repository],
kwargs={
"repository_pk": pk,
"base_version_pk": base_version_pk,
Expand Down
17 changes: 6 additions & 11 deletions pulpcore/plugin/viewsets/content.py
@@ -1,5 +1,3 @@
from collections import namedtuple

from drf_spectacular.utils import extend_schema

from django.db import DatabaseError
Expand All @@ -17,8 +15,6 @@
OperationPostponedResponse,
)

ContentUploadData = namedtuple("ContentUploadData", ["shared_resources", "task_payload"])


class DefaultDeferredContextMixin:
"""A mixin that provides a method for retrieving the default deferred context."""
Expand Down Expand Up @@ -52,15 +48,15 @@ def create(self, request):
temp_file = PulpTemporaryFile.init_and_validate(file_content)
temp_file.save()

shared_resources = []
resources = []
repository = serializer.validated_data.get("repository")
if repository:
shared_resources.append(repository)
resources.append(repository)

app_label = self.queryset.model._meta.app_label
task = dispatch(
tasks.base.general_create_from_temp_file,
shared_resources,
exclusive_resources=resources,
args=(app_label, serializer.__class__.__name__, str(temp_file.pk)),
kwargs={"data": task_payload, "context": self.get_deferred_context(request)},
)
Expand All @@ -84,15 +80,14 @@ def create(self, request):

repository = serializer.validated_data.get("repository")
if repository:
content_data.shared_resources.append(repository)
content_data.append(repository)

app_label = self.queryset.model._meta.app_label
task = dispatch(
tasks.base.general_create,
content_data.shared_resources,
args=(app_label, serializer.__class__.__name__),
kwargs={
"data": content_data.task_payload,
"data": content_data,
"context": self.get_deferred_context(request),
},
)
Expand Down Expand Up @@ -121,4 +116,4 @@ def init_content_data(self, serializer, request):
artifact, context={"request": request}
).data["pulp_href"]

return ContentUploadData([artifact], task_payload)
return task_payload
41 changes: 36 additions & 5 deletions pulpcore/tasking/tasks.py
Expand Up @@ -230,7 +230,15 @@ def _enqueue_with_reservation(
return Job(id=inner_task_id, connection=redis_conn)


def dispatch(func, resources, args=None, kwargs=None, task_group=None, shared_resources=None):
def dispatch(
func,
resources=None,
args=None,
kwargs=None,
task_group=None,
exclusive_resources=None,
shared_resources=None,
):
"""
Enqueue a message to Pulp workers with a reservation.
Expand All @@ -246,10 +254,13 @@ def dispatch(func, resources, args=None, kwargs=None, task_group=None, shared_re
Args:
func (callable): The function to be run by RQ when the necessary locks are acquired.
resources (list): A list of resources this task needs exclusive access to while running.
Each resource can be either a `str` or a `django.models.Model` instance.
Each resource can be either a `str` or a `django.models.Model` instance. This parameter
is deprecated.
args (tuple): The positional arguments to pass on to the task.
kwargs (dict): The keyword arguments to pass on to the task.
task_group (pulpcore.app.models.TaskGroup): A TaskGroup to add the created Task to.
exclusive_resources (list): A list of resources this task needs exclusive access to while
running. Each resource can be either a `str` or a `django.models.Model` instance.
shared_resources (list): A list of resources this task needs non-exclusive access to while
running. Each resource can be either a `str` or a `django.models.Model` instance.
Expand All @@ -258,10 +269,30 @@ def dispatch(func, resources, args=None, kwargs=None, task_group=None, shared_re
Raises:
ValueError: When `resources` is an unsupported type.
"""
if resources is not None:
if exclusive_resources is not None:
raise RuntimeError(
_(
"Only one of 'exclusive_resources' and 'resources' can be specified for dispatch."
)
)
deprecation_logger.warning(
_(
"The use of the 'resources' argument to 'dispatch' has been deprecated and may be"
" removed as soon as pulpcore==3.16. Please use 'exclusive_resources' and"
" 'shared_resources' instead."
)
)
exclusive_resources = resources
resources = None

if exclusive_resources is None:
exclusive_resources = []

if settings.USE_NEW_WORKER_TYPE:
args_as_json = json.dumps(args, cls=UUIDEncoder)
kwargs_as_json = json.dumps(kwargs, cls=UUIDEncoder)
resources = _validate_and_get_resources(resources)
resources = _validate_and_get_resources(exclusive_resources)
if shared_resources:
resources.extend(
(f"shared:{resource}" for resource in _validate_and_get_resources(shared_resources))
Expand Down Expand Up @@ -290,8 +321,8 @@ def dispatch(func, resources, args=None, kwargs=None, task_group=None, shared_re
)
# There is only exclusive use in the legacy tasking system
if shared_resources:
resources = resources + shared_resources
exclusive_resources = exclusive_resources + shared_resources
RQ_job_id = _enqueue_with_reservation(
func, resources=resources, args=args, kwargs=kwargs, task_group=task_group
func, resources=exclusive_resources, args=args, kwargs=kwargs, task_group=task_group
)
return Task.objects.get(pk=RQ_job_id.id)

0 comments on commit 85281bb

Please sign in to comment.