/
content_stages.py
293 lines (241 loc) · 13.3 KB
/
content_stages.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
from collections import defaultdict
from django.core.exceptions import ObjectDoesNotExist
from django.db import IntegrityError, transaction
from django.db.models import Q
from pulpcore.plugin.models import Content, ContentArtifact, ProgressReport
from .api import Stage
class QueryExistingContents(Stage):
    """
    A Stages API stage that replaces "unsaved" :attr:`DeclarativeContent.content` objects with
    matching Content units that already exist in the database.

    This stage expects :class:`~pulpcore.plugin.stages.DeclarativeContent` units from `self._in_q`
    and inspects their associated :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
    :class:`~pulpcore.plugin.stages.DeclarativeArtifact` object stores one
    :class:`~pulpcore.plugin.models.Artifact`.

    This stage inspects any "unsaved" Content unit objects and searches for existing saved Content
    units inside Pulp with the same unit key. Any existing Content objects found replace their
    "unsaved" counterpart in the :class:`~pulpcore.plugin.stages.DeclarativeContent` object. A
    match is only accepted when it is of the same content type as the unsaved unit.

    Each :class:`~pulpcore.plugin.stages.DeclarativeContent` is sent to `self._out_q` after it has
    been handled.

    This stage drains all available items from `self._in_q` and batches everything into one large
    call to the db for efficiency.
    """

    async def run(self):
        """
        The coroutine for this stage.

        Returns:
            The coroutine for this stage.
        """
        async for batch in self.batches():
            # One OR-ed lookup per content model; `Q(pk__in=[])` is a seed that matches nothing.
            content_q_by_type = defaultdict(lambda: Q(pk__in=[]))
            # Keyed by (model type, natural key) rather than natural key alone: different content
            # types may share identical natural-key values, and a saved unit of one type must
            # never replace an unsaved unit of another type.
            d_content_by_key = defaultdict(list)
            for d_content in batch:
                if d_content.content._state.adding:
                    model_type = type(d_content.content)
                    unit_q = d_content.content.q()
                    content_q_by_type[model_type] = content_q_by_type[model_type] | unit_q
                    key = (model_type, d_content.content.natural_key())
                    d_content_by_key[key].append(d_content)

            for model_type, content_q in content_q_by_type.items():
                existing = model_type.objects.filter(content_q)
                # `touch()` comes from pulpcore's ContentManager. Only a missing method triggers
                # the deprecation path; errors raised *inside* touch() are not swallowed.
                touch = getattr(existing, "touch", None)
                if touch is not None:
                    touch()
                else:
                    from pulpcore.app.loggers import deprecation_logger
                    from gettext import gettext as _

                    deprecation_logger.warning(
                        _(
                            "As of pulpcore 3.14.5, plugins which declare custom ORM managers on "
                            "their content classes should have those managers inherit from "
                            "pulpcore.plugin.models.ContentManager. This will become a hard error "
                            "in the future."
                        )
                    )

                for result in model_type.objects.filter(content_q).iterator():
                    matches = d_content_by_key.get((model_type, result.natural_key()), ())
                    for d_content in matches:
                        d_content.content = result

            for d_content in batch:
                await self.put(d_content)
class ContentSaver(Stage):
    """
    A Stages API stage that saves :attr:`DeclarativeContent.content` objects and saves its related
    :class:`~pulpcore.plugin.models.ContentArtifact` objects too.

    This stage expects :class:`~pulpcore.plugin.stages.DeclarativeContent` units from `self._in_q`
    and inspects their associated :class:`~pulpcore.plugin.stages.DeclarativeArtifact` objects. Each
    :class:`~pulpcore.plugin.stages.DeclarativeArtifact` object stores one
    :class:`~pulpcore.plugin.models.Artifact`.

    Each "unsaved" Content object is saved, together with one
    :class:`~pulpcore.plugin.models.ContentArtifact` per declared artifact. For Content that
    already exists, the `artifact` of matching ContentArtifacts is updated to point at artifacts
    that are already saved.

    Each :class:`~pulpcore.plugin.stages.DeclarativeContent` is sent to `self._out_q` after it has
    been handled.

    This stage drains all available items from `self._in_q` and batches everything into one large
    call to the db for efficiency.
    """

    async def run(self):
        """
        The coroutine for this stage.

        Returns:
            The coroutine for this stage.
        """
        async for batch in self.batches():
            # New ContentArtifacts to create for freshly saved Content.
            content_artifact_bulk = []
            # OR of one (content, relative_path) condition per existing ContentArtifact whose
            # artifact may need updating; `Q(pk__in=[])` is a seed that matches nothing.
            to_update_ca_q = Q(pk__in=[])
            to_update_ca_bulk = []
            # (content pk, relative_path) -> saved Artifact to assign.
            to_update_ca_artifact = {}

            with transaction.atomic():
                await self._pre_save(batch)

                # Process the batch in dc.content.natural_keys order.
                # This prevents deadlocks when we're processing the same/similar content
                # in concurrent workers. A tuple of strings is compared (rather than one
                # concatenated string) so distinct keys like ("ab", "c") and ("a", "bc") do not
                # tie and fall back to per-worker input order.
                batch.sort(key=lambda x: tuple(map(str, x.content.natural_key())))

                for d_content in batch:
                    # Are we saving to the database for the first time?
                    content_already_saved = not d_content.content._state.adding
                    if not content_already_saved:
                        try:
                            with transaction.atomic():
                                d_content.content.save()
                        except IntegrityError as e:
                            # The save hit a constraint; presumably an equal unit already exists
                            # (e.g. saved concurrently). Reuse it, or re-raise if it can't be found.
                            try:
                                d_content.content = d_content.content.__class__.objects.get(
                                    d_content.content.q()
                                )
                            except ObjectDoesNotExist:
                                raise e
                        else:
                            for d_artifact in d_content.d_artifacts:
                                # set to None for on-demand synced artifacts
                                artifact = (
                                    None if d_artifact.artifact._state.adding else d_artifact.artifact
                                )
                                content_artifact_bulk.append(
                                    ContentArtifact(
                                        content=d_content.content,
                                        artifact=artifact,
                                        relative_path=d_artifact.relative_path,
                                    )
                                )
                            continue

                    # When the Content already exists, check if ContentArtifacts need to be updated
                    for d_artifact in d_content.d_artifacts:
                        if not d_artifact.artifact._state.adding:
                            # the artifact is already present in the database; update references
                            to_update_ca_q |= Q(
                                content=d_content.content, relative_path=d_artifact.relative_path
                            )
                            key = (d_content.content.pk, d_artifact.relative_path)
                            to_update_ca_artifact[key] = d_artifact.artifact

                # Query db once and update each object in memory for bulk_update call
                if to_update_ca_artifact:
                    to_update_cas = ContentArtifact.objects.filter(to_update_ca_q)
                    for content_artifact in to_update_cas.iterator():
                        key = (content_artifact.content_id, content_artifact.relative_path)
                        content_artifact.artifact = to_update_ca_artifact[key]
                        to_update_ca_bulk.append(content_artifact)

                # Sort the lists we're about to do bulk updates/creates on.
                # We know to_update_ca_bulk entries already are in the DB, so we can enforce
                # order just using pulp_id.
                to_update_ca_bulk.sort(key=lambda x: x.pulp_id)
                content_artifact_bulk.sort(key=lambda x: ContentArtifact.sort_key(x))

                ContentArtifact.objects.bulk_update(to_update_ca_bulk, ["artifact"])
                ContentArtifact.objects.bulk_get_or_create(content_artifact_bulk)

                await self._post_save(batch)

            for declarative_content in batch:
                await self.put(declarative_content)

    async def _pre_save(self, batch):
        """
        A hook plugin-writers can override to save related objects prior to content unit saving.

        This is run within the same transaction as the content unit saving.

        Args:
            batch (list of :class:`~pulpcore.plugin.stages.DeclarativeContent`): The batch of
                :class:`~pulpcore.plugin.stages.DeclarativeContent` objects to be saved.

        """
        pass

    async def _post_save(self, batch):
        """
        A hook plugin-writers can override to save related objects after content unit saving.

        This is run within the same transaction as the content unit saving.

        Args:
            batch (list of :class:`~pulpcore.plugin.stages.DeclarativeContent`): The batch of
                :class:`~pulpcore.plugin.stages.DeclarativeContent` objects to be saved.

        """
        pass
class ResolveContentFutures(Stage):
    """
    Resolve the future attached to each :class:`~pulpcore.plugin.stages.DeclarativeContent`.

    Each future's result becomes the :class:`~pulpcore.plugin.models.Content` unit that was found
    or created for it earlier in the pipeline.

    This lets FirstStage parse data downloaded through the plugin API and emit further
    :class:`~pulpcore.plugin.stages.DeclarativeContent` objects in response. For example, when
    content type `Foo` refers to instances of another content type `Bar`, FirstStage can do::

        # Create d_content and d_artifact for a `foo_a`
        foo_a = DeclarativeContent(...)
        # Send it in the pipeline
        await self.put(foo_a)
        ...
        foo_a_content = await foo_a.resolution()  # awaits until the foo_a reaches this stage

    Content finishing at the end of the pipeline can thus feed new to-be-downloaded content back
    into its beginning, forming a loop of sorts. The price is that it can considerably reduce how
    well the earlier stages batch content.

    To drop a declarative content from the pipeline early, call its `resolve()` so coroutines
    awaiting the attached future are unblocked, and do not hand the content to the next stage.

    Rule of thumb: send many items into the pipeline first, and await their resolution later.
    """

    async def run(self):
        """
        Resolve every incoming item, then forward it unchanged to the next stage.

        Returns:
            The coroutine for this stage.
        """
        async for declarative_content in self.items():
            declarative_content.resolve()
            await self.put(declarative_content)
class ContentAssociation(Stage):
    """
    A Stages API stage that associates content units with `new_version`.

    Before consuming the stream, the primary keys of all content already in `new_version` are
    loaded into memory. Keys seen in the stream are struck from that set; whatever remains at the
    end is content that is associated but was not received from `self._in_q`.

    A ProgressReport named 'Associating Content' counts the units newly associated. Because the
    input is a stream, its total is only known once the stream ends.

    When `mirror` is enabled, the remaining (unreceived) units are un-associated (removed) from
    `new_version`, and a ProgressReport named 'Un-Associating Content' counts them.

    Args:
        new_version (:class:`~pulpcore.plugin.models.RepositoryVersion`): The repo version this
            stage associates content with.
        mirror (bool): Whether or not to "mirror" the stream of DeclarativeContent - whether content
            not in the stream should be removed from the repository.
        args: unused positional arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
        kwargs: unused keyword arguments passed along to :class:`~pulpcore.plugin.stages.Stage`.
    """

    def __init__(self, new_version, mirror, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.new_version = new_version
        self.allow_delete = mirror

    async def run(self):
        """
        Associate streamed content with `new_version`, optionally removing unstreamed content.

        Returns:
            The coroutine for this stage.
        """
        with ProgressReport(message="Associating Content", code="associating.content") as pb:
            # Starts as everything in the version; pks seen in the stream are struck off.
            unseen_pks = set(self.new_version.content.values_list("pk", flat=True))

            async for batch in self.batches():
                fresh_pks = set()
                for d_content in batch:
                    pk = d_content.content.pk
                    if pk in unseen_pks:
                        unseen_pks.remove(pk)
                    else:
                        fresh_pks.add(pk)
                    await self.put(d_content)

                if not fresh_pks:
                    continue
                self.new_version.add_content(Content.objects.filter(pk__in=fresh_pks))
                pb.increase_by(len(fresh_pks))

            if not self.allow_delete:
                return

            with ProgressReport(
                message="Un-Associating Content", code="unassociating.content"
            ) as removal_pb:
                if unseen_pks:
                    self.new_version.remove_content(Content.objects.filter(pk__in=unseen_pks))
                    removal_pb.increase_by(len(unseen_pks))