/
viewsets.py
287 lines (236 loc) · 9.05 KB
/
viewsets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
from collections import defaultdict
from gettext import gettext as _
from packaging.version import parse
from django.contrib.postgres.search import SearchQuery
from django.db import IntegrityError
from django.db.models import fields as db_fields
from django.db.models.expressions import F, Func
from django_filters import filters
from drf_yasg.utils import swagger_auto_schema
from rest_framework import mixins, serializers, viewsets
from rest_framework.decorators import action
from rest_framework.parsers import FormParser, MultiPartParser
from pulpcore.plugin.exceptions import DigestValidationError
from pulpcore.plugin.models import Artifact
from pulpcore.plugin.serializers import (
AsyncOperationResponseSerializer,
RepositorySyncURLSerializer,
)
from pulpcore.plugin.tasking import enqueue_with_reservation
from pulpcore.plugin.viewsets import (
BaseDistributionViewSet,
ContentFilter,
ContentViewSet,
NamedModelViewSet,
OperationPostponedResponse,
RemoteViewSet,
)
from .models import (
AnsibleDistribution,
AnsibleRemote,
CollectionVersion,
CollectionRemote,
Role,
Tag,
)
from .serializers import (
AnsibleDistributionSerializer,
AnsibleRemoteSerializer,
CollectionVersionSerializer,
CollectionRemoteSerializer,
CollectionOneShotSerializer,
RoleSerializer,
TagSerializer,
)
from .tasks.collections import sync as collection_sync
from .tasks.collections import import_collection
from .tasks.synchronizing import synchronize as role_sync
class RoleFilter(ContentFilter):
    """
    Filter definitions for Role content.

    Declares ``name``, ``namespace`` and ``version`` as filterable fields of
    the Role model, on top of whatever ContentFilter already provides.
    """

    class Meta:
        fields = [
            "name",
            "namespace",
            "version",
        ]
        model = Role
class RoleViewSet(ContentViewSet):
    """
    Content viewset for Role objects.

    Registered under the ``roles`` endpoint name, serialized with
    RoleSerializer and filterable through RoleFilter.
    """

    queryset = Role.objects.all()
    serializer_class = RoleSerializer
    filterset_class = RoleFilter
    endpoint_name = "roles"
class CollectionVersionFilter(ContentFilter):
    """
    FilterSet for Ansible Collection versions.

    Besides plain ``namespace`` / ``name`` / ``version`` matching, it offers:

    * ``q``: full-text search against the ``search_vector`` column, with
      results ordered by descending ``ts_rank``.
    * ``latest``: when true, keep only the highest version of every
      (namespace, name) pair present in the queryset.
    """

    namespace = filters.CharFilter(field_name="namespace")
    name = filters.CharFilter(field_name="name")
    latest = filters.BooleanFilter(field_name="latest", method="filter_latest")
    q = filters.CharFilter(field_name="q", method="filter_by_q")

    def filter_by_q(self, queryset, name, value):
        """
        Full text search provided by the 'q' option.

        Args:
            queryset: The query to add the additional full-text search filtering onto
            name: The name of the option specified, i.e. 'q'
            value: The string to search on

        Returns:
            The Django queryset that was passed in, additionally filtered by full-text search
            and ordered so that the best-ranked matches come first.

        """
        search_query = SearchQuery(value)
        qs = queryset.filter(search_vector=search_query)
        # Rank each match with the database's ts_rank() so results can be ordered by relevance.
        ts_rank_fn = Func(
            F("search_vector"),
            search_query,
            32,  # RANK_NORMALIZATION = 32
            function="ts_rank",
            output_field=db_fields.FloatField(),
        )
        return qs.annotate(rank=ts_rank_fn).order_by("-rank")

    def filter_latest(self, queryset, name, value):
        """
        If the value of 'latest' is True, include only the latest Collection version in the results.

        "Latest" is decided per (namespace, name) pair by comparing versions parsed with
        ``packaging.version.parse``; ties on version fall back to comparing primary keys.

        Args:
            queryset: The already-formed queryset for modification
            name: The name of the parameter, 'latest'
            value: The value of the argument. This is checked if 'True' or not.

        Returns:
            The unmodified queryset when value is falsy, otherwise the queryset restricted to
            the latest version of each collection.

        """
        if not value:
            return queryset

        # Only four columns are needed to pick a winner, so avoid building full model
        # instances, and keep a running maximum instead of collecting and sorting lists.
        latest_by_collection = {}
        rows = queryset.values_list("pk", "namespace", "name", "version")
        for pk, collection_namespace, collection_name, version in rows:
            key = (collection_namespace, collection_name)
            candidate = (parse(version), pk)
            current = latest_by_collection.get(key)
            if current is None or candidate > current:
                latest_by_collection[key] = candidate

        latest_pks = [pk for _version, pk in latest_by_collection.values()]
        return queryset.filter(pk__in=latest_pks)

    class Meta:
        model = CollectionVersion
        fields = ["namespace", "name", "version"]
class CollectionVersionViewSet(ContentViewSet):
    """
    Content viewset for Ansible Collection versions.

    Registered under the ``collections`` endpoint name. The base queryset
    prefetches ``_artifacts`` for every CollectionVersion it returns.
    """

    queryset = CollectionVersion.objects.prefetch_related("_artifacts")
    serializer_class = CollectionVersionSerializer
    filterset_class = CollectionVersionFilter
    endpoint_name = "collections"
class AnsibleRemoteViewSet(RemoteViewSet):
    """
    Viewset for Ansible remotes, including a ``sync`` detail action.
    """

    queryset = AnsibleRemote.objects.all()
    serializer_class = AnsibleRemoteSerializer
    endpoint_name = "ansible"

    @swagger_auto_schema(
        operation_description="Trigger an asynchronous task to sync Ansible content.",
        responses={202: AsyncOperationResponseSerializer},
    )
    @action(detail=True, methods=["post"], serializer_class=RepositorySyncURLSerializer)
    def sync(self, request, pk):
        """
        Validate the request body and enqueue a role sync task for this remote.

        The remote is looked up first, then the payload is validated with
        RepositorySyncURLSerializer. The response wraps the enqueued task.
        """
        remote = self.get_object()

        sync_serializer = RepositorySyncURLSerializer(
            data=request.data,
            context={"request": request},
        )
        sync_serializer.is_valid(raise_exception=True)

        validated = sync_serializer.validated_data
        repository = validated.get("repository")
        task_kwargs = {
            "remote_pk": remote.pk,
            "repository_pk": repository.pk,
            "mirror": validated.get("mirror", False),
        }

        # The repository and remote are handed over as the task's reserved resources.
        task = enqueue_with_reservation(role_sync, [repository, remote], kwargs=task_kwargs)
        return OperationPostponedResponse(task, request)
class CollectionRemoteViewSet(RemoteViewSet):
    """
    Viewset for Collection remotes, including a ``sync`` detail action.
    """

    queryset = CollectionRemote.objects.all()
    serializer_class = CollectionRemoteSerializer
    endpoint_name = "collection"

    @swagger_auto_schema(
        operation_description="Trigger an asynchronous task to sync Collection content.",
        responses={202: AsyncOperationResponseSerializer},
    )
    @action(detail=True, methods=["post"], serializer_class=RepositorySyncURLSerializer)
    def sync(self, request, pk):
        """
        Validate the request body and enqueue a Collection sync task for this remote.

        The remote is looked up first, then the payload is validated with
        RepositorySyncURLSerializer. The response wraps the enqueued task.
        """
        remote = self.get_object()

        sync_serializer = RepositorySyncURLSerializer(
            data=request.data,
            context={"request": request},
        )
        sync_serializer.is_valid(raise_exception=True)
        validated = sync_serializer.validated_data

        target_repository = validated.get("repository")
        mirror_flag = validated.get("mirror", False)

        # The repository and remote are handed over as the task's reserved resources.
        task = enqueue_with_reservation(
            collection_sync,
            [target_repository, remote],
            kwargs={
                "remote_pk": remote.pk,
                "repository_pk": target_repository.pk,
                "mirror": mirror_flag,
            },
        )
        return OperationPostponedResponse(task, request)
class CollectionUploadViewSet(viewsets.ViewSet):
    """
    ViewSet for One Shot Collection Upload.

    Accepts multipart or form-encoded requests.

    Args:
        file@: package to upload
    """

    serializer_class = CollectionOneShotSerializer
    parser_classes = (MultiPartParser, FormParser)

    @swagger_auto_schema(
        operation_description="Create an artifact and trigger an asynchronous task to create "
        "Collection content from it.",
        operation_summary="Upload a collection",
        operation_id="upload_collection",
        request_body=CollectionOneShotSerializer,
        responses={202: AsyncOperationResponseSerializer},
    )
    def create(self, request):
        """
        Dispatch a Collection creation task.

        Validates the upload, stores it as an Artifact (optionally verifying the
        client-supplied sha256), then enqueues ``import_collection`` for that artifact.

        Args:
            request: The incoming request carrying the uploaded ``file`` and, optionally,
                its expected ``sha256``.

        Returns:
            An OperationPostponedResponse wrapping the enqueued import task.

        Raises:
            serializers.ValidationError: If the payload is invalid, the sha256 does not match
                the uploaded file, or saving the artifact hits an IntegrityError.

        """
        serializer = CollectionOneShotSerializer(data=request.data, context={"request": request})
        serializer.is_valid(raise_exception=True)
        validated_data = serializer.validated_data

        # The checksum is optional: use .get() so a request without it is handled as
        # "nothing to verify" instead of failing with a KeyError.
        expected_digests = {}
        sha256 = validated_data.get("sha256")
        if sha256:
            expected_digests["sha256"] = sha256

        try:
            artifact = Artifact.init_and_validate(
                validated_data["file"], expected_digests=expected_digests
            )
        except DigestValidationError as exc:
            raise serializers.ValidationError(
                _("The provided sha256 value does not match the sha256 of the uploaded file.")
            ) from exc

        try:
            artifact.save()
        except IntegrityError as exc:
            raise serializers.ValidationError(_("Artifact already exists.")) from exc

        async_result = enqueue_with_reservation(
            import_collection, [str(artifact.pk)], kwargs={"artifact_pk": artifact.pk}
        )
        return OperationPostponedResponse(async_result, request)
class AnsibleDistributionViewSet(BaseDistributionViewSet):
    """
    Viewset for Ansible distributions.

    Registered under the ``ansible`` endpoint name and serialized with
    AnsibleDistributionSerializer.
    """

    queryset = AnsibleDistribution.objects.all()
    serializer_class = AnsibleDistributionSerializer
    endpoint_name = "ansible"
class TagViewSet(NamedModelViewSet, mixins.ListModelMixin):
    """
    Viewset for Tag objects.

    Adds the list mixin to NamedModelViewSet and is registered under the
    ``pulp_ansible/tags`` endpoint name.
    """

    queryset = Tag.objects.all()
    serializer_class = TagSerializer
    endpoint_name = "pulp_ansible/tags"