from gettext import gettext as _
from itertools import chain, izip_longest
import copy
import logging
import os
import socket
import sys
import time
from urlparse import urlunsplit
import uuid
from bson.objectid import ObjectId, InvalidId
import celery
from mongoengine import NotUniqueError, OperationError, ValidationError, DoesNotExist
from nectar.config import DownloaderConfig
from nectar.request import DownloadRequest
from nectar.downloaders.threaded import HTTPThreadedDownloader
from nectar.listener import DownloadEventListener
from pulp.common import dateutils, error_codes, tags
from pulp.common.config import parse_bool, Unparsable
from pulp.common.plugins import reporting_constants, importer_constants
from pulp.common.tags import resource_tag, RESOURCE_REPOSITORY_TYPE, action_tag
from pulp.plugins.conduits.repo_sync import RepoSyncConduit
from pulp.plugins.conduits.repo_publish import RepoPublishConduit
from pulp.plugins.config import PluginCallConfiguration
from pulp.plugins.loader import api as plugin_api
from pulp.plugins.loader import exceptions as plugin_exceptions
from pulp.plugins.model import SyncReport
from pulp.plugins.util.misc import paginate, mkdir
from pulp.plugins.util.verification import VerificationException, verify_checksum
from pulp.server import exceptions as pulp_exceptions
from pulp.server.async.tasks import (PulpTask, register_sigterm_handler, Task, TaskResult,
get_current_task_id)
from pulp.server.config import config as pulp_conf
from pulp.server.constants import PULP_STREAM_REQUEST_HEADER
from pulp.server.content.sources.constants import MAX_CONCURRENT, HEADERS, SSL_VALIDATION
from pulp.server.controllers import consumer as consumer_controller
from pulp.server.controllers import distributor as dist_controller
from pulp.server.controllers import importer as importer_controller
from pulp.server.db import connection, model
from pulp.server.db.model.repository import (
RepoContentUnit, RepoSyncResult, RepoPublishResult)
from pulp.server.exceptions import PulpCodedTaskException
from pulp.server.lazy import URL, Key
from pulp.server.managers import factory as manager_factory
from pulp.server.managers.repo import _common as common_utils
from pulp.server.util import InvalidChecksumType
_logger = logging.getLogger(__name__)
PATH_DOWNLOADED = 'downloaded'
CATALOG_ENTRY = 'catalog_entry'
UNIT_ID = 'unit_id'
TYPE_ID = 'type_id'
UNIT_FILES = 'unit_files'
REQUEST = 'request'
def get_associated_unit_ids(repo_id, unit_type, repo_content_unit_q=None):
"""
Return a generator of unit IDs within the given repo that match the type and query
:param repo_id: ID of the repo whose units should be queried
:type repo_id: str
:param unit_type: ID of the unit type which should be retrieved
:type unit_type: str
:param repo_content_unit_q: any additional filters that should be applied to the
RepositoryContentUnit search
:type repo_content_unit_q: mongoengine.Q
:return: generator of unit IDs
:rtype: generator
"""
qs = model.RepositoryContentUnit.objects(q_obj=repo_content_unit_q,
repo_id=repo_id,
unit_type_id=unit_type)
for assoc in qs.only('unit_id').as_pymongo():
yield assoc['unit_id']
def get_unit_model_querysets(repo_id, model_class, repo_content_unit_q=None):
"""
Return a generator of mongoengine.queryset.QuerySet objects that collectively represent the
units in the specified repo that are of the type corresponding to the model_class and that
match the optional query.
Results are broken up into multiple QuerySet objects, because units are requested by their ID,
and we do not want to exceed the maximum size for a single query by stuffing too many IDs in one
QuerySet object.
You are welcome and encouraged to convert the return value into one generator of ContentUnit
objects by using itertools.chain()
:param repo_id: ID of the repo whose units should be queried
:type repo_id: str
:param model_class: a subclass of ContentUnit that defines a unit model
:type model_class: pulp.server.db.model.ContentUnit
:param repo_content_unit_q: any additional filters that should be applied to the
RepositoryContentUnit search
:type repo_content_unit_q: mongoengine.Q
:return: generator of mongoengine.queryset.QuerySet
:rtype: generator
"""
for chunk in paginate(get_associated_unit_ids(repo_id,
model_class._content_type_id.default,
repo_content_unit_q)):
yield model_class.objects(id__in=chunk)
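# Usage sketch (hypothetical helper, not part of the original module): as the
# docstring above suggests, the per-chunk querysets can be flattened into one
# generator of ContentUnit objects with itertools.chain().
def _example_iter_units_of_type(repo_id, model_class):
    """Yield every unit of model_class in the given repo, chunk by chunk."""
    return chain(*get_unit_model_querysets(repo_id, model_class))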
def get_repo_unit_type_ids(repo_id):
"""
Retrieve all the content unit type ids associated with a given repository.
:param repo_id: ID of the repo whose unit models should be retrieved.
:type repo_id: str
:return: A list of content unit type ids
:rtype: list of str
"""
unit_type_ids = model.RepositoryContentUnit.objects(
repo_id=repo_id).distinct('unit_type_id')
return unit_type_ids
def get_repo_unit_models(repo_id):
"""
Retrieve all the MongoEngine models for units in a given repository. If a unit
type is in the repository and does not have a MongoEngine model, that unit type
is excluded from the returned list.
:param repo_id: ID of the repo whose unit models should be retrieved.
:type repo_id: str
:return: A list of sub-classes of ContentUnit that define a unit model.
:rtype: list of pulp.server.db.model.ContentUnit
"""
unit_types = get_repo_unit_type_ids(repo_id)
unit_models = [plugin_api.get_unit_model_by_id(type_id) for type_id in unit_types]
# Filter any non-MongoEngine content types.
return filter(None, unit_models)
def get_mongoengine_unit_querysets(repo_id, repo_content_unit_q=None, file_units=False):
"""
Retrieve an iterable of QuerySets for all the units in a repository that have
MongoEngine models. If a unit type is in the repository and does not have a
MongoEngine model, that unit type is excluded from the iterable.
:param repo_id: The ID of the repo whose units should be queried
:type repo_id: str
:param repo_content_unit_q: Any additional filters that should be applied to the
RepositoryContentUnit search
:type repo_content_unit_q: mongoengine.Q
:param file_units: Retrieve QuerySets exclusively for units inheriting
from pulp.server.db.model.FileContentUnit.
:type file_units: bool
:return: A generator of query sets.
:rtype: generator of mongoengine.queryset.QuerySet
"""
unit_models = get_repo_unit_models(repo_id)
if file_units:
unit_models = filter(lambda m: issubclass(m, model.FileContentUnit), unit_models)
for unit_model in unit_models:
query_sets = get_unit_model_querysets(repo_id, unit_model, repo_content_unit_q)
for query_set in query_sets:
yield query_set
def find_repo_content_units(
repository, repo_content_unit_q=None,
units_q=None, unit_fields=None, limit=None, skip=None,
yield_content_unit=False):
"""
Search content units associated with a given repository.
If yield_content_unit is not specified, or is set to false, then the RepositoryContentUnit
representing the association will be returned with an attribute "unit" set to the actual
ContentUnit. If yield_content_unit is set to true then the ContentUnit will be yielded instead
of the RepoContentUnit.
:param repository: The repository to search.
:type repository: pulp.server.db.model.Repository
:param repo_content_unit_q: Any query filters to apply to the RepoContentUnits.
:type repo_content_unit_q: mongoengine.Q
:param units_q: Any query filters to apply to the ContentUnits.
:type units_q: mongoengine.Q
:param unit_fields: List of fields to fetch for the unit objects, defaults to all fields.
:type unit_fields: List of str
:param limit: The maximum number of items to return for the given query.
:type limit: int
:param skip: The starting offset.
:type skip: int
:param yield_content_unit: Whether we should yield a ContentUnit or RepositoryContentUnit.
If True then a ContentUnit will be yielded. Defaults to False
:type yield_content_unit: bool
    :return: Content unit associations matching the query.
:rtype: generator of pulp.server.db.model.ContentUnit or
pulp.server.db.model.RepositoryContentUnit
"""
qs = model.RepositoryContentUnit.objects(q_obj=repo_content_unit_q,
repo_id=repository.repo_id)
type_map = {}
content_units = {}
yield_count = 1
skip_count = 0
for repo_content_unit in qs:
id_set = type_map.setdefault(repo_content_unit.unit_type_id, set())
id_set.add(repo_content_unit.unit_id)
content_unit_set = content_units.setdefault(repo_content_unit.unit_type_id, dict())
content_unit_set[repo_content_unit.unit_id] = repo_content_unit
for unit_type, unit_ids in type_map.iteritems():
_model = plugin_api.get_unit_model_by_id(unit_type)
        # Chunk the IDs with izip_longest, which zips its arguments so that from
        # x1, x2, ..., xn it yields [x1[0], x2[0], ..., xn[0]] on each iteration
        # until every argument is exhausted. Passing the same iterator object
        # 1000 times makes every column draw from one shared source, so the net
        # effect is that unit_ids is split into chunks of size 1000, with the
        # final chunk padded by None values that are filtered out below.
for ids_chunk in izip_longest(*[iter(unit_ids)] * 1000):
ids_chunk = filter(lambda x: x, ids_chunk)
qs = _model.objects(q_obj=units_q,
__raw__={'_id': {'$in': list(ids_chunk)}})
if unit_fields:
qs = qs.only(*unit_fields)
            # For performance reasons, do a server-side count of the units that
            # would be returned before running the actual query; if the expected
            # count for this chunk is zero, skip to the next chunk of unit_ids.
if qs.count() == 0:
continue
for unit in qs:
if skip and skip_count < skip:
skip_count += 1
continue
if yield_content_unit:
yield unit
else:
cu = content_units[unit_type][unit.id]
cu.unit = unit
yield cu
if limit:
if yield_count >= limit:
return
yield_count += 1
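# Usage sketch (hypothetical helper): materialize the first ten units of a
# repository as plain ContentUnit objects; the 'name' field is illustrative.
def _example_first_ten_units(repository):
    return list(find_repo_content_units(
        repository, unit_fields=['name'], limit=10, yield_content_unit=True))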
def find_units_not_downloaded(repo_id):
"""
Find content units that have not been fully downloaded.
:param repo_id: ID of the repo whose units should be retrieved.
:type repo_id: str
:return: The requested units.
:rtype: generator
"""
query_sets = get_mongoengine_unit_querysets(repo_id, file_units=True)
query_sets = [q(downloaded=False) for q in query_sets]
return chain(*query_sets)
def missing_unit_count(repo_id):
"""
Retrieve the number of units that have not been downloaded.
:param repo_id: ID of the repo to retrieve the missing unit count for.
:type repo_id: str
:return: Number of units that have a ``downloaded`` flag set to false.
:rtype: int
"""
query_sets = get_mongoengine_unit_querysets(repo_id, file_units=True)
return sum(query_set(downloaded=False).count() for query_set in query_sets)
def has_all_units_downloaded(repo_id):
"""
Get whether a repository contains units that have all been downloaded.
:param repo_id: ID of the repo to retrieve the missing unit count for.
:type repo_id: str
:return: True if no unit in the repository has the ``downloaded`` flag set
to False.
:rtype: bool
"""
for qs in get_mongoengine_unit_querysets(repo_id, file_units=True):
if qs(downloaded=False).count():
return False
return True
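# Usage sketch (hypothetical helper): the download-state helpers above combine
# into a one-line status summary for a repository.
def _example_download_state(repo_id):
    if has_all_units_downloaded(repo_id):
        return 'complete'
    return '%d units not downloaded' % missing_unit_count(repo_id)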
def rebuild_content_unit_counts(repository):
"""
Update the content_unit_counts field on a Repository.
:param repository: The repository to update
:type repository: pulp.server.db.model.Repository
"""
db = connection.get_database()
pipeline = [
{'$match': {'repo_id': repository.repo_id}},
{'$group': {'_id': '$unit_type_id', 'sum': {'$sum': 1}}}]
q = db.repo_content_units.aggregate(pipeline=pipeline)
# Flip this into the form that we need
counts = {}
for result in q:
counts[result['_id']] = result['sum']
repository.content_unit_counts = counts
repository.save()
def associate_single_unit(repository, unit):
"""
Associate a single unit to a repository.
:param repository: The repository to update.
:type repository: pulp.server.db.model.Repository
:param unit: The unit to associate to the repository.
:type unit: pulp.server.db.model.ContentUnit
"""
current_timestamp = dateutils.now_utc_timestamp()
formatted_datetime = dateutils.format_iso8601_utc_timestamp(current_timestamp)
qs = model.RepositoryContentUnit.objects(
repo_id=repository.repo_id,
unit_id=unit.id,
unit_type_id=unit._content_type_id)
qs.update_one(
set_on_insert__created=formatted_datetime,
set__updated=formatted_datetime,
upsert=True)
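# Usage sketch (hypothetical helper): callers that associate a unit outside of
# a sync typically also bump the repo's last_unit_added timestamp (defined
# later in this module) so that skipped publishes notice the new content.
def _example_add_unit(repository, unit):
    associate_single_unit(repository, unit)
    update_last_unit_added(repository.repo_id)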
def disassociate_units(repository, unit_iterable):
"""
Disassociate all units in the iterable from the repository.
Update `last_unit_removed` timestamp for the repository if needed.
:param repository: The repository to update.
:type repository: pulp.server.db.model.Repository
:param unit_iterable: The units to disassociate from the repository.
:type unit_iterable: iterable of pulp.server.db.model.ContentUnit
"""
# track if units are removed so last_unit_removed is only updated when units are removed
units_removed = 0
for unit_group in paginate(unit_iterable):
unit_id_list = [unit.id for unit in unit_group]
qs = model.RepositoryContentUnit.objects(
repo_id=repository.repo_id, unit_id__in=unit_id_list)
# queryset delete returns the number of records deleted
units_removed += qs.delete()
if units_removed:
update_last_unit_removed(repository.repo_id)
def create_repo(repo_id, display_name=None, description=None, notes=None, importer_type_id=None,
importer_repo_plugin_config=None, distributor_list=None):
"""
Create a repository and add importers and distributors if they are specified. If there are any
issues adding any of the importers or distributors, the repo will be deleted and the exceptions
will be reraised.
Multiple distributors can be created in this call. Each distributor is specified as a dict with
the following keys:
    distributor_type_id - ID of the type of distributor being added
distributor_config - values sent to the distributor when used by this repository
auto_publish - boolean indicating if the distributor should automatically publish with every
sync; defaults to False
distributor_id - used to refer to the distributor later; if omitted, one will be generated
:param repo_id: unique identifier for the repo
:type repo_id: str
:param display_name: user-friendly name for the repo
:type display_name: str
:param description: user-friendly text describing the repo's contents
:type description: str
:param notes: key-value pairs to programmatically tag the repo
:type notes: dict
:param importer_type_id: if specified, an importer with this type ID will be added to the repo
:type importer_type_id: str
:param importer_repo_plugin_config: configuration values for the importer, may be None
:type importer_repo_plugin_config: dict
:param distributor_list: iterable of distributor dicts to add; more details above
:type distributor_list: list or tuple
:raises DuplicateResource: if there is already a repo with the requested ID
:raises InvalidValue: if any of the fields are invalid
:return: created repository object
:rtype: pulp.server.db.model.Repository
"""
# Prevalidation.
if not isinstance(distributor_list, (list, tuple, type(None))):
raise pulp_exceptions.InvalidValue(['distributor_list'])
if not all(isinstance(distributor, dict) for distributor in distributor_list or []):
raise pulp_exceptions.InvalidValue(['distributor_list'])
# Note: the repo must be saved before the importer and distributor controllers can be called
# because the first thing that they do is validate that the repo exists.
repo = model.Repository(repo_id=repo_id, display_name=display_name, description=description,
notes=notes)
try:
repo.save()
except NotUniqueError:
raise pulp_exceptions.DuplicateResource(repo_id)
except ValidationError, e:
raise pulp_exceptions.InvalidValue(e.to_dict().keys())
# Add the importer. Delete the repository if this fails.
if importer_type_id is not None:
try:
importer_controller.set_importer(repo_id, importer_type_id, importer_repo_plugin_config)
except Exception:
_logger.exception(
'Exception adding importer to repo [%s]; the repo will be deleted' % repo_id)
repo.delete()
raise
# Add the distributors. Delete the repository if this fails.
for distributor in distributor_list or []:
type_id = distributor.get('distributor_type_id')
plugin_config = distributor.get('distributor_config')
auto_publish = distributor.get('auto_publish', False)
dist_id = distributor.get('distributor_id')
try:
dist_controller.add_distributor(repo_id, type_id, plugin_config, auto_publish, dist_id)
except Exception:
_logger.exception('Exception adding distributor to repo [%s]; the repo will be '
'deleted' % repo_id)
repo.delete()
raise
return repo
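# Usage sketch (hypothetical IDs and configs): create a repo with an importer
# and one auto-publishing distributor. Note that each distributor dict is read
# with the 'distributor_type_id' key, as the loop above shows.
def _example_create_repo():
    return create_repo(
        'example-repo',
        display_name='Example Repo',
        importer_type_id='example_importer',
        importer_repo_plugin_config={'feed': 'https://example.com/repo/'},
        distributor_list=[{'distributor_type_id': 'example_distributor',
                           'distributor_config': {},
                           'auto_publish': True,
                           'distributor_id': 'dist-1'}])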
def queue_delete(repo_id):
"""
Dispatch the task to delete the specified repository.
:param repo_id: id of the repository to delete
:type repo_id: str
:return: A TaskResult with the details of any errors or spawned tasks
:rtype: pulp.server.async.tasks.TaskResult
"""
task_tags = [
tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo_id),
tags.action_tag('delete')
]
async_result = delete.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, repo_id,
[repo_id], tags=task_tags)
return async_result
def get_importer_by_id(object_id):
"""
Get a plugin, call configuration, and Importer document object using the document ID
of the repository-importer association document.
:param object_id: The document ID.
:type object_id: str
:return: A tuple of:
(pulp.plugins.importer.Importer, pulp.plugins.config.PluginCallConfiguration,
pulp.server.db.model.Importer)
:rtype: tuple
:raise pulp.plugins.loader.exceptions.PluginNotFound: not found.
"""
try:
object_id = ObjectId(object_id)
except InvalidId:
raise plugin_exceptions.PluginNotFound()
try:
document = model.Importer.objects.get(id=object_id)
except DoesNotExist:
raise plugin_exceptions.PluginNotFound()
plugin, cfg = plugin_api.get_importer_by_id(document.importer_type_id)
call_conf = PluginCallConfiguration(cfg, document.config)
return plugin, call_conf, document
@celery.task(base=Task, name='pulp.server.tasks.repository.delete')
def delete(repo_id):
"""
Delete a repository and inform other affected collections.
:param repo_id: id of the repository to delete.
:type repo_id: str
:raise pulp_exceptions.PulpExecutionException: if any part of the process fails; the exception
will contain information on which sections failed
:return: A TaskResult object with the details of any errors or spawned tasks
:rtype: pulp.server.async.tasks.TaskResult
"""
    # With so much going on during a delete, it's possible that a few things could go wrong
    # while others are successful. We track lesser errors that shouldn't abort the entire
    # process until the end, then raise an exception describing the incompleteness of the
    # delete; the user will have to look at the server logs for more information.
    error_tuples = []  # exceptions from non-fatal steps, re-raised together at the end
# Inform the importer
repo_importer = model.Importer.objects(repo_id=repo_id).first()
if repo_importer is not None:
try:
importer_controller.remove_importer(repo_id)
except Exception, e:
_logger.exception('Error received removing importer [%s] from repo [%s]' % (
repo_importer.importer_type_id, repo_id))
error_tuples.append(e)
# Inform all distributors
for distributor in model.Distributor.objects(repo_id=repo_id):
try:
dist_controller.delete(distributor.repo_id, distributor.distributor_id)
except Exception, e:
_logger.exception('Error received removing distributor [%s] from repo [%s]' % (
distributor.id, repo_id))
error_tuples.append(e)
# Database Updates
repo = model.Repository.objects.get_repo_or_missing_resource(repo_id)
repo.delete()
try:
# Remove all importers and distributors from the repo. This is likely already done by the
# calls to other methods in this manager, but in case those failed we still want to attempt
# to keep the database clean.
model.Distributor.objects(repo_id=repo_id).delete()
model.Importer.objects(repo_id=repo_id).delete()
RepoSyncResult.get_collection().remove({'repo_id': repo_id})
RepoPublishResult.get_collection().remove({'repo_id': repo_id})
RepoContentUnit.get_collection().remove({'repo_id': repo_id})
except Exception, e:
msg = _('Error updating one or more database collections while removing repo [%(r)s]')
msg = msg % {'r': repo_id}
_logger.exception(msg)
error_tuples.append(e)
# remove the repo from any groups it was a member of
group_manager = manager_factory.repo_group_manager()
group_manager.remove_repo_from_groups(repo_id)
if len(error_tuples) > 0:
pe = pulp_exceptions.PulpExecutionException()
pe.child_exceptions = error_tuples
raise pe
    # Queue an unbind task for each bound consumer
options = {}
consumer_bind_manager = manager_factory.consumer_bind_manager()
additional_tasks = []
errors = []
for bind in consumer_bind_manager.find_by_repo(repo_id):
try:
report = consumer_controller.unbind(bind['consumer_id'], bind['repo_id'],
bind['distributor_id'], options)
if report:
additional_tasks.extend(report.spawned_tasks)
except Exception, e:
errors.append(e)
error = None
if len(errors) > 0:
error = pulp_exceptions.PulpCodedException(error_codes.PLP0007, repo_id=repo_id)
error.child_exceptions = errors
return TaskResult(error=error, spawned_tasks=additional_tasks)
def update_repo_and_plugins(repo, repo_delta, importer_config, distributor_configs):
"""
Update a repository and its related collections.
    Not all details need to be specified; if a piece is omitted, its configuration is not
    touched, nor is it removed from the repository. The same holds true for the
    distributor_configs dict: not every distributor must be represented.
This call will attempt to update the repository object, then the importer, then the
distributors. If an exception occurs during any of these steps, the updates stop and the
exception is immediately raised. Any updates that have already taken place are not rolled back.
Distributor updates are asynchronous as there could be a very large number of consumers to
update. Repository and importer updates are done synchronously.
:param repo: repository object
:type repo: pulp.server.db.model.Repository
:param repo_delta: list of attributes to change and their new values; if None, no attempt to
update the repository object will be made
:type repo_delta: dict, None
:param importer_config: new configuration to use for the repo's importer; if None, no attempt
will be made to update the importer
:type importer_config: dict, None
:param distributor_configs: mapping of distributor ID to the new configuration to set for it
:type distributor_configs: dict, None
:return: Task result that contains the updated repository object and additional spawned tasks
:rtype: pulp.server.async.tasks.TaskResult
:raises pulp_exceptions.InvalidValue: if repo_delta is not a dictionary
"""
if repo_delta:
if isinstance(repo_delta, dict):
repo.update_from_delta(repo_delta)
repo.save()
else:
raise pulp_exceptions.PulpCodedValidationException(
error_code=error_codes.PLP1010, field='delta', field_type='dict', value=repo_delta)
if importer_config is not None:
importer_controller.update_importer_config(repo.repo_id, importer_config)
additional_tasks = []
if distributor_configs is not None:
for dist_id, dist_config in distributor_configs.items():
task_tags = [
tags.resource_tag(tags.RESOURCE_REPOSITORY_TYPE, repo.repo_id),
tags.resource_tag(tags.RESOURCE_REPOSITORY_DISTRIBUTOR_TYPE,
dist_id),
tags.action_tag(tags.ACTION_UPDATE_DISTRIBUTOR)
]
async_result = dist_controller.update.apply_async_with_reservation(
tags.RESOURCE_REPOSITORY_TYPE, repo.repo_id,
[repo.repo_id, dist_id, dist_config, None], tags=task_tags)
additional_tasks.append(async_result)
return TaskResult(repo, None, additional_tasks)
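# Usage sketch (hypothetical delta): rename a repository without touching its
# importer or distributors by passing None for both plugin arguments.
def _example_rename_repo(repo):
    return update_repo_and_plugins(repo, {'display_name': 'New Name'}, None, None)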
def update_unit_count(repo_id, unit_type_id, delta):
"""
Updates the total count of units associated with the repo. Each repo has an attribute
'content_unit_counts' which is a dict where keys are content type IDs and values are the
number of content units of that type in the repository.
example: {'rpm': 12, 'srpm': 3}
:param repo_id: identifies the repo
:type repo_id: str
:param unit_type_id: identifies the unit type to update
:type unit_type_id: str
:param delta: amount by which to increment the total count
:type delta: int
:raises pulp_exceptions.PulpExecutionException: if there is an error in the update
"""
atomic_inc_key = 'inc__content_unit_counts__{unit_type_id}'.format(unit_type_id=unit_type_id)
if delta:
try:
model.Repository.objects(repo_id=repo_id).update_one(**{atomic_inc_key: delta})
except OperationError:
message = 'There was a problem updating repository %s' % repo_id
raise pulp_exceptions.PulpExecutionException(message), None, sys.exc_info()[2]
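# Usage sketch (hypothetical counts): the counters are adjusted atomically per
# unit type, so adds and removes can be applied independently.
def _example_adjust_counts(repo_id):
    update_unit_count(repo_id, 'rpm', 3)    # three rpm units were added
    update_unit_count(repo_id, 'srpm', -1)  # one srpm unit was removed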
def update_last_unit_added(repo_id):
"""
Updates the UTC date record on the repository for the time the last unit was added.
:param repo_id: identifies the repo
:type repo_id: str
"""
repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id)
repo_obj.last_unit_added = dateutils.now_utc_datetime_with_tzinfo()
repo_obj.save()
def update_last_unit_removed(repo_id):
"""
Updates the UTC date record on the repository for the time the last unit was removed.
:param repo_id: identifies the repo
:type repo_id: str
"""
repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id)
repo_obj.last_unit_removed = dateutils.now_utc_datetime_with_tzinfo()
repo_obj.save()
def update_last_unit_added_for_unit(unit_id, unit_type_id):
"""
Updates the UTC date record for the time the last unit was added on all
repositories containing a given unit.
This method is intended for use when a unit has been mutated in such
a way that the unit should be considered as re-added to all containing
repos (e.g. publish of repos should not be skipped).
It's safe to call the method without holding a lock on the repos.
:param unit_id: ID of a unit
:type unit_id: str
:param unit_type_id: type ID of a unit
:type unit_type_id: str
"""
now = dateutils.now_utc_datetime_with_tzinfo()
repo_units = model.RepositoryContentUnit.objects(
unit_id=unit_id,
unit_type_id=unit_type_id)
repo_ids = [assoc.repo_id for assoc in repo_units]
model.Repository.objects(repo_id__in=repo_ids).update(last_unit_added=now)
@celery.task(base=PulpTask, name='pulp.server.tasks.repository.sync_with_auto_publish')
def queue_sync_with_auto_publish(repo_id, overrides=None, scheduled_call_id=None):
"""
Sync a repository and upon successful completion, publish any distributors that are configured
for auto publish.
:param repo_id: id of the repository to create a sync call request list for
:type repo_id: str
:param overrides: dictionary of configuration overrides for this sync
:type overrides: dict or None
:param scheduled_call_id: id of scheduled call that dispatched this task
:type scheduled_call_id: str
:return: result containing the details of the task executed and any spawned tasks
:rtype: pulp.server.async.tasks.TaskResult
"""
kwargs = {'repo_id': repo_id, 'sync_config_override': overrides,
'scheduled_call_id': scheduled_call_id}
tags = [resource_tag(RESOURCE_REPOSITORY_TYPE, repo_id), action_tag('sync')]
result = sync.apply_async_with_reservation(RESOURCE_REPOSITORY_TYPE, repo_id, tags=tags,
kwargs=kwargs)
return result
@celery.task(base=Task, name='pulp.server.managers.repo.sync.sync')
def sync(repo_id, sync_config_override=None, scheduled_call_id=None):
"""
Performs a synchronize operation on the given repository and triggers publishes for
distributors with auto-publish enabled.
    The given repo must have an importer configured. This task is intentionally limited to
    synchronizing a single repo. Performing multiple repository syncs concurrently will require a
    more global view of the server and must be handled outside the scope of this task.
:param repo_id: identifies the repo to sync
:type repo_id: str
:param sync_config_override: optional config containing values to use for this sync only
:type sync_config_override: dict
:param scheduled_call_id: id of scheduled call that dispatched this task
:type scheduled_call_id: str
:return: TaskResult containing sync results and a list of spawned tasks
:rtype: pulp.server.async.tasks.TaskResult
:raise pulp_exceptions.MissingResource: if specified repo does not exist, or it does not have
an importer and associated plugin
:raise pulp_exceptions.PulpExecutionException: if the task fails.
"""
repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id)
transfer_repo = repo_obj.to_transfer_repo()
repo_importer = model.Importer.objects.get_or_404(repo_id=repo_id)
try:
importer, imp_config = plugin_api.get_importer_by_id(repo_importer.importer_type_id)
except plugin_exceptions.PluginNotFound:
raise pulp_exceptions.MissingResource(repository=repo_id)
importer_config = importer_controller.clean_config_dict(copy.deepcopy(repo_importer.config))
call_config = PluginCallConfiguration(imp_config, importer_config, sync_config_override)
transfer_repo.working_dir = common_utils.get_working_directory()
conduit = RepoSyncConduit(repo_id, repo_importer.importer_type_id, repo_importer.id)
sync_result_collection = RepoSyncResult.get_collection()
    # Fire events around the sync call
fire_manager = manager_factory.event_fire_manager()
fire_manager.fire_repo_sync_started(repo_id)
before_sync_unit_count = model.RepositoryContentUnit.objects(repo_id=repo_id).count()
# Perform the sync
sync_start_timestamp = _now_timestamp()
sync_result = None
try:
        # Wrap the importer's sync_repo() with register_sigterm_handler so that
        # cancel_sync_repo() is invoked if a SIGTERM arrives during the sync
sync_repo = register_sigterm_handler(importer.sync_repo, importer.cancel_sync_repo)
sync_report = sync_repo(transfer_repo, conduit, call_config)
except Exception, e:
sync_end_timestamp = _now_timestamp()
sync_result = RepoSyncResult.error_result(
repo_obj.repo_id, repo_importer['id'], repo_importer['importer_type_id'],
sync_start_timestamp, sync_end_timestamp, e, sys.exc_info()[2])
raise
else:
# Need to be safe here in case the plugin is incorrect in its return
if isinstance(sync_report, SyncReport):
summary = sync_report.summary
details = sync_report.details
if sync_report.canceled_flag:
# need to leave this in case cancel_sync_repo() was not called from parent
result_code = RepoSyncResult.RESULT_CANCELED
elif sync_report.success_flag:
result_code = RepoSyncResult.RESULT_SUCCESS
else:
result_code = RepoSyncResult.RESULT_FAILED
else:
msg = _('Plugin type [%s] on repo [%s] did not return a valid sync report')
_logger.warn(msg % (repo_importer['importer_type_id'], repo_obj.repo_id))
summary = details = msg
result_code = RepoSyncResult.RESULT_ERROR # RESULT_UNKNOWN?
sync_result, sync_end_timestamp = _reposync_result(repo_obj, repo_importer,
sync_start_timestamp, summary, details,
result_code, before_sync_unit_count)
finally:
if sync_result is None:
msg = _('Sync was cancelled')
summary = details = msg
result_code = RepoSyncResult.RESULT_CANCELED
sync_result, sync_end_timestamp = _reposync_result(repo_obj, repo_importer,
sync_start_timestamp, summary,
details, result_code,
before_sync_unit_count)
# Update the override config if it has changed
if check_override_config_change(repo_id, call_config):
model.Importer.objects(repo_id=repo_id).\
update(set__last_override_config=call_config.override_config)
# Do an update instead of a save in case the importer has changed the scratchpad
model.Importer.objects(repo_id=repo_obj.repo_id).update(set__last_sync=sync_end_timestamp)
# Add a sync history entry for this run
sync_result_collection.save(sync_result)
# Ensure counts are updated
rebuild_content_unit_counts(repo_obj)
if sync_result['added_count'] > 0:
update_last_unit_added(repo_obj.repo_id)
if sync_result['removed_count'] > 0:
update_last_unit_removed(repo_obj.repo_id)
fire_manager.fire_repo_sync_finished(sync_result)
if sync_result.result == RepoSyncResult.RESULT_FAILED:
raise pulp_exceptions.PulpExecutionException(_('Importer indicated a failed response'))
spawned_tasks = _queue_auto_publish_tasks(repo_obj.repo_id, scheduled_call_id=scheduled_call_id)
download_policy = call_config.get(importer_constants.DOWNLOAD_POLICY)
if download_policy == importer_constants.DOWNLOAD_BACKGROUND:
spawned_tasks.append(queue_download_repo(repo_obj.repo_id).task_id)
return TaskResult(sync_result, spawned_tasks=spawned_tasks)
def check_unit_removed_since_last_sync(conduit, repo_id):
"""
Checks whether a content unit has been removed since the last_sync timestamp.
:param conduit: allows the plugin to interact with core pulp
:type conduit: pulp.plugins.conduits.repo_sync.RepoSyncConduit
:param repo_id: identifies the repo to sync
:type repo_id: str
:return: Whether a content unit has been removed since the last_sync timestamp
:rtype: bool
"""
last_sync = conduit.last_sync()
if last_sync is None:
return False
# convert the iso8601 datetime string to a python datetime object
last_sync = dateutils.parse_iso8601_datetime(last_sync)
repo_obj = model.Repository.objects.get_repo_or_missing_resource(repo_id=repo_id)
last_removed = repo_obj.last_unit_removed
# check if a unit has been removed since the past sync
if last_removed is not None:
if last_removed > last_sync:
return True
return False
def check_config_updated_since_last_sync(conduit, repo_id):
"""
Checks whether the config has been changed since the last sync occurred.
:param conduit: allows the plugin to interact with core pulp
:type conduit: pulp.plugins.conduits.repo_sync.RepoSyncConduit
:param repo_id: identifies the repo to sync
:type repo_id: str
:return: Whether the config has been changed since the last sync occurred.
:rtype: bool
"""
last_sync = conduit.last_sync()
if last_sync is None:
return False
# convert the iso8601 datetime string to a python datetime object
last_sync = dateutils.parse_iso8601_datetime(last_sync)
repo_importer = model.Importer.objects.get_or_404(repo_id=repo_id)
# the timestamp of the last configuration change
last_updated = repo_importer.last_updated
# check if a configuration change occurred after the most recent sync
if last_updated is not None:
if last_sync < last_updated:
return True
return False
def check_override_config_change(repo_id, call_config):
"""
Checks whether the override config is different from the override config on
the previous sync.
:param repo_id: identifies the repo to sync
:type repo_id: str
:param call_config: Plugin Call Configuration
:type call_config: pulp.plugins.config.PluginCallConfiguration
:return: Whether the override config is different from the override config on
the previous sync.
:rtype: bool
"""
repo_importer = model.Importer.objects.get_or_404(repo_id=repo_id)
    # Compare the override config with the last override config, excluding the
    # 'force_full' key; otherwise, using the --force-full flag would always
    # trigger a full sync on the next sync too.
prev_config = repo_importer.last_override_config.copy()
current_config = call_config.override_config.copy()
prev_config.pop('force_full', False)
current_config.pop('force_full', False)
return prev_config != current_config
def check_perform_full_sync(repo_id, conduit, call_config):
"""
Performs generic checks to determine if the sync should be a "full" sync.
Checks if the "force full" flag has been set, if content has been removed
since the last sync, and whether the configuration has changed since the
last sync (including override configs).
Plugins may want to perform additional checks beyond the ones appearing here.
:param repo_id: identifies the repo to sync
:type repo_id: str
:param conduit: allows the plugin to interact with core pulp
:type conduit: pulp.plugins.conduits.repo_sync.RepoSyncConduit
:param call_config: Plugin Call Configuration
:type call_config: pulp.plugins.config.PluginCallConfiguration
:return: Whether a full sync needs to be performed
:rtype: bool
"""
force_full = call_config.get('force_full', False)
first_sync = conduit.last_sync() is None
content_removed = check_unit_removed_since_last_sync(conduit, repo_id)
config_changed = check_config_updated_since_last_sync(conduit, repo_id)
override_config_changed = check_override_config_change(repo_id, call_config)
if force_full:
_logger.info(_("Fully resyncing due to use of force_full in config"))
return force_full or first_sync or content_removed or config_changed or override_config_changed
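# Usage sketch (hypothetical plugin code): an importer's sync_repo() can
# consult this helper to choose between an incremental and a full pass.
def _example_sync_mode(repo_id, conduit, call_config):
    if check_perform_full_sync(repo_id, conduit, call_config):
        return 'full'
    return 'incremental'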
def _reposync_result(repo, importer, sync_start, summary, details, result_code, initial_unit_count):
"""
Creates repo sync result.
    :param repo: repository object