Skip to content

Commit

Permalink
Problem: async update and delete code is duplicated
Browse files Browse the repository at this point in the history
Solution: add general update and delete tasks and viewsets

This patch creates two new tasks that are used by both the publisher viewset and the importer viewset.
This patch also introduces a new base viewset. Both the importer and publisher viewset inherit from it.

closes #3038
https://pulp.plan.io/issues/3038
  • Loading branch information
dkliban committed Dec 13, 2017
1 parent b0a6f3f commit 95574fb
Show file tree
Hide file tree
Showing 8 changed files with 124 additions and 137 deletions.
6 changes: 4 additions & 2 deletions pulpcore/pulpcore/app/apps.py
Expand Up @@ -94,11 +94,13 @@ def import_serializers(self):
def import_viewsets(self):
# circular import avoidance
from pulpcore.app.viewsets import (GenericNamedModelViewSet, NamedModelViewSet,
CreateDestroyReadNamedModelViewSet)
CreateDestroyReadNamedModelViewSet,
CreateReadAsyncUpdateDestroyNamedModelViewset)
# These viewsets are used as base classes for actual model viewsets and
# should not be registered
base_viewsets = [GenericNamedModelViewSet, NamedModelViewSet,
CreateDestroyReadNamedModelViewSet]
CreateDestroyReadNamedModelViewSet,
CreateReadAsyncUpdateDestroyNamedModelViewset]
self.named_viewsets = {}
if module_has_submodule(self.module, VIEWSETS_MODULE_NAME):
# import the viewsets module and track any interesting viewsets
Expand Down
2 changes: 1 addition & 1 deletion pulpcore/pulpcore/app/tasks/__init__.py
@@ -1 +1 @@
from pulpcore.app.tasks import importer, orphan, publisher, repository # noqa
from pulpcore.app.tasks import base, importer, orphan, publisher, repository # noqa
56 changes: 56 additions & 0 deletions pulpcore/pulpcore/app/tasks/base.py
@@ -0,0 +1,56 @@
from celery import shared_task
from django.http import QueryDict

from pulpcore.tasking.tasks import UserFacingTask
from pulpcore.app.apps import get_plugin_config


@shared_task(base=UserFacingTask)
def general_update(instance_id, app_label, serializer_name, *args, **kwargs):
"""
Update a model
The model instance is identified using the app_label, id, and serializer name. The serializer is
used to perform validation.
Args:
id (str): the id of the model
app_label (str): the Django app label of the plugin that provides the model
serializer_name (str): name of the serializer class for the model
data (dict): dictionary whose keys represent the fields of the model and their corresponding
values.
partial (bool): When true, only the fields specified in the data dictionary are updated.
When false, any fields missing from the data dictionary are assumed to be None and
their values are updated as such.
Raises:
:class:`rest_framework.exceptions.ValidationError`: When serializer instance can't be saved
due to validation error. This theoretically should never occur since validation is
performed before the task is dispatched.
"""
data = kwargs.pop('data', None)
partial = kwargs.pop('partial', False)
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
instance = serializer_class.Meta.model.objects.get(id=instance_id).cast()
data_querydict = QueryDict('', mutable=True)
data_querydict.update(data)
serializer = serializer_class(instance, data=data_querydict, partial=partial)
serializer.is_valid(raise_exception=True)
serializer.save()


@shared_task(base=UserFacingTask)
def general_delete(instance_id, app_label, serializer_name):
"""
Delete a model
The model instance is identified using the app_label, id, and serializer name.
Args:
id (str): the id of the model
app_label (str): the Django app label of the plugin that provides the model
serializer_name (str): name of the serializer class for the model
"""
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
instance = serializer_class.Meta.model.objects.get(id=instance_id).cast()
instance.delete()
42 changes: 0 additions & 42 deletions pulpcore/pulpcore/app/tasks/importer.py
Expand Up @@ -2,57 +2,15 @@
import logging

from celery import shared_task
from django.http import QueryDict

from pulpcore.app import models
from pulpcore.app.apps import get_plugin_config
from pulpcore.tasking.services import storage
from pulpcore.tasking.tasks import UserFacingTask


log = logging.getLogger(__name__)


@shared_task(base=UserFacingTask)
def update(importer_pk, app_label, serializer_name, data=None, partial=False):
"""
Update an :class:`~pulp.app.models.Importer`
Args:
importer_pk (str): the PK of the importer
app_label (str): the Django app label of the plugin that provides the importer
serializer_name (str): name of the serializer class for the importer
data (dict): dictionary whose keys represent the fields of the importer that need to be
updated with the corresponding values.
partial (bool): When true, only the fields specified in the data dictionary are updated.
When false, any fields missing from the data dictionary are assumed to be None and
their values are updated as such.
Raises:
:class:`rest_framework.exceptions.ValidationError`: When serializer instance can't be saved
due to validation error. This theoretically should never occur since validation is
performed before the task is dispatched.
"""
importer = models.Importer.objects.get(pk=importer_pk).cast()
data_querydict = QueryDict('', mutable=True)
data_querydict.update(data or {})
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
serializer = serializer_class(importer, data=data_querydict, partial=partial)
serializer.is_valid(raise_exception=True)
serializer.save()


@shared_task(base=UserFacingTask)
def delete(importer_pk):
"""
Delete an :class:`~pulpcore.app.models.Importer`
Args:
importer_pk (str): the PK of the importer
"""
models.Importer.objects.filter(pk=importer_pk).delete()


@shared_task(base=UserFacingTask)
def sync(importer_pk):
"""
Expand Down
41 changes: 0 additions & 41 deletions pulpcore/pulpcore/app/tasks/publisher.py
Expand Up @@ -4,56 +4,15 @@

from celery import shared_task
from django.db import transaction
from django.http import QueryDict

from pulpcore.app import models
from pulpcore.app.apps import get_plugin_config
from pulpcore.tasking.services import storage
from pulpcore.tasking.tasks import UserFacingTask


log = getLogger(__name__)


@shared_task(base=UserFacingTask)
def update(publisher_pk, app_label, serializer_name, data=None, partial=False):
"""
Update an instance of a :class:`~pulpcore.app.models.Publisher`
Args:
publisher_pk (str): The publisher PK.
app_label (str): the Django app label of the plugin that provides the publisher
serializer_name (str): name of the serializer class for this publisher
data (dict): Data to update on the publisher. keys are field names, values are new values.
partial (bool): When true, update only the specified fields. When false, omitted fields
are set to None.
Raises:
:class:`rest_framework.exceptions.ValidationError`: When serializer instance can't be saved
due to validation error. This theoretically should never occur since validation is
performed before the task is dispatched.
"""
publisher = models.Publisher.objects.get(pk=publisher_pk).cast()
data_querydict = QueryDict("", mutable=True)
data_querydict.update(data or {})
# The publisher serializer class is different for each plugin
serializer_class = get_plugin_config(app_label).named_serializers[serializer_name]
serializer = serializer_class(publisher, data=data_querydict, partial=partial)
serializer.is_valid(raise_exception=True)
serializer.save()


@shared_task(base=UserFacingTask)
def delete(publisher_pk):
"""
Delete a :class:`~pulpcore.app.models.Publisher`
Args:
publisher_pk (str): The publisher PK.
"""
models.Publisher.objects.filter(pk=publisher_pk).delete()


@shared_task(base=UserFacingTask)
def publish(publisher_pk):
"""
Expand Down
3 changes: 2 additions & 1 deletion pulpcore/pulpcore/app/viewsets/__init__.py
@@ -1,5 +1,6 @@
from pulpcore.app.viewsets.base import (GenericNamedModelViewSet, NamedModelViewSet, # noqa
CreateDestroyReadNamedModelViewSet) # noqa
CreateDestroyReadNamedModelViewSet,
CreateReadAsyncUpdateDestroyNamedModelViewset) # noqa
from pulpcore.app.viewsets.content import ArtifactViewSet, ContentViewSet # noqa
from pulpcore.app.viewsets.repository import (DistributionViewSet, # noqa
ImporterViewSet,
Expand Down
58 changes: 58 additions & 0 deletions pulpcore/pulpcore/app/viewsets/base.py
@@ -1,6 +1,9 @@
import warnings

from pulpcore.app import tasks
from pulpcore.app.models import MasterModel
from pulpcore.app.response import OperationPostponedResponse

from rest_framework import viewsets, mixins
from rest_framework.generics import get_object_or_404

Expand Down Expand Up @@ -158,3 +161,58 @@ class CreateDestroyReadNamedModelViewSet(mixins.CreateModelMixin,
"""
pass


class AsyncUpdateMixin(object):
"""
Provides an update method that dispatches a task with reservation for a repository
"""
def update(self, request, pk, **kwargs):
partial = kwargs.pop('partial', False)
instance = self.get_object()
serializer = self.get_serializer(instance, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
app_label = instance._meta.app_label
async_result = tasks.base.general_update.apply_async_with_reservation(
instance._meta.db_table, pk,
args=(pk, app_label, serializer.__class__.__name__),
kwargs={'data': request.data, 'partial': partial}
)
return OperationPostponedResponse([async_result], request)

def partial_update(self, request, *args, **kwargs):
kwargs['partial'] = True
return self.update(request, *args, **kwargs)


class AsyncRemoveMixin(object):
"""
Provides a delete method that dispatches a task with reservation for a repository
"""
def destroy(self, request, pk, **kwargs):
"""
Delete a model instance
"""
instance = self.get_object()
serializer = self.get_serializer(instance)
app_label = instance._meta.app_label
async_result = tasks.base.general_delete.apply_async_with_reservation(
instance._meta.db_table, pk,
args=(pk, app_label, serializer.__class__.__name__)
)
return OperationPostponedResponse([async_result], request)


class CreateReadAsyncUpdateDestroyNamedModelViewset(mixins.CreateModelMixin,
mixins.RetrieveModelMixin,
mixins.ListModelMixin,
AsyncUpdateMixin,
AsyncRemoveMixin,
GenericNamedModelViewSet):
"""
A viewset that performs asynchronous update and remove operations
This viewset should be used with resources that require making a reservation for a repository
during an update or delete.
"""
pass
53 changes: 3 additions & 50 deletions pulpcore/pulpcore/app/viewsets/repository.py
Expand Up @@ -18,7 +18,7 @@
PublisherSerializer,
RepositorySerializer,
RepositoryContentSerializer)
from pulpcore.app.viewsets import NamedModelViewSet
from pulpcore.app.viewsets import (NamedModelViewSet, CreateReadAsyncUpdateDestroyNamedModelViewset)
from pulpcore.app.viewsets.custom_filters import CharInFilter
from pulpcore.common import tags

Expand Down Expand Up @@ -122,35 +122,12 @@ class Meta:
fields = ContentAdaptorFilter.Meta.fields


class ImporterViewSet(NamedModelViewSet):
class ImporterViewSet(CreateReadAsyncUpdateDestroyNamedModelViewset):
endpoint_name = 'importers'
serializer_class = ImporterSerializer
queryset = Importer.objects.all()
filter_class = ImporterFilter

def update(self, request, pk, partial=False):
importer = self.get_object()
serializer = self.get_serializer(importer, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
app_label = importer._meta.app_label
async_result = tasks.importer.update.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, str(importer.repository.pk),
kwargs={'importer_pk': importer.pk,
'app_label': app_label,
'serializer_name': serializer.__class__.__name__,
'data': request.data,
'partial': partial}
)
return OperationPostponedResponse([async_result], request)

def destroy(self, request, pk):
importer = self.get_object()
async_result = tasks.importer.delete.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, str(importer.repository.pk),
kwargs={'importer_pk': importer.pk}
)
return OperationPostponedResponse([async_result], request)

@decorators.detail_route(methods=('post',))
def sync(self, request, pk):
importer = self.get_object()
Expand All @@ -161,36 +138,12 @@ def sync(self, request, pk):
return OperationPostponedResponse([async_result], request)


class PublisherViewSet(NamedModelViewSet):
class PublisherViewSet(CreateReadAsyncUpdateDestroyNamedModelViewset):
endpoint_name = 'publishers'
serializer_class = PublisherSerializer
queryset = Publisher.objects.all()
filter_class = PublisherFilter

def update(self, request, pk, partial=False):
publisher = self.get_object()
serializer = self.get_serializer(publisher, data=request.data, partial=partial)
serializer.is_valid(raise_exception=True)
app_label = publisher._meta.app_label
async_result = tasks.publisher.update.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, str(publisher.repository.pk),
kwargs={'publisher_pk': publisher.pk,
'app_label': app_label,
'serializer_name': serializer.__class__.__name__,
'data': request.data,
'partial': partial}
)
return OperationPostponedResponse([async_result], request)

def destroy(self, request, pk):
publisher = self.get_object()
async_result = tasks.publisher.delete.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, str(publisher.repository.pk),
kwargs={'publisher_pk': publisher.pk}
)

return OperationPostponedResponse([async_result], request)


class PublicationViewSet(NamedModelViewSet):
endpoint_name = 'publications'
Expand Down

0 comments on commit 95574fb

Please sign in to comment.