Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
Support for storing units in multiple collections
Browse files Browse the repository at this point in the history
The following are added:
 - load of auxiliary models
 - post-delete method during orphan removal
 - serialization in profiler to have all the data
   in a proper shape for applicability calculation

A migration progress logging utility is also added.

re #2681
https://pulp.plan.io/issues/2681
  • Loading branch information
goosemania committed Jun 10, 2017
1 parent 764fe65 commit 630193f
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 7 deletions.
2 changes: 1 addition & 1 deletion common/pulp/common/error_codes.py
Expand Up @@ -101,7 +101,7 @@
"PLP0037",
_("Content import of %(path)s failed - must be an existing file."),
['path'])
PLP0038 = Error("PLP0038", _("The unit model with id %(model_id)s and class "
PLP0038 = Error("PLP0038", _("The model with id %(model_id)s and class "
"%(model_class)s failed to register. Another model has already "
"been registered with the same id."), ['model_id', 'model_class'])
PLP0040 = Error("PLP0040", _("Database 'seeds' config must include at least one hostname:port "
Expand Down
3 changes: 3 additions & 0 deletions server/pulp/plugins/conduits/profiler.py
Expand Up @@ -56,6 +56,7 @@ def get_repo_units(self, repo_id, content_type_id, additional_unit_fields=None):
additional_unit_fields = additional_unit_fields or []
try:
unit_key_fields = units_controller.get_unit_key_fields_for_type(content_type_id)
serializer = units_controller.get_model_serializer_for_type(content_type_id)

# Query repo association manager to get all units of given type
# associated with given repo. Limit data by requesting only the fields
Expand All @@ -69,6 +70,8 @@ def get_repo_units(self, repo_id, content_type_id, additional_unit_fields=None):
# Convert units to plugin units with unit_key and required metadata values for each unit
all_units = []
for unit in units:
if serializer:
serializer.serialize(unit['metadata'])
unit_key = {}
metadata = {}
for k in unit_key_fields:
Expand Down
28 changes: 26 additions & 2 deletions server/pulp/plugins/loader/manager.py
Expand Up @@ -13,6 +13,7 @@
_logger = logging.getLogger(__name__)

ENTRY_POINT_UNIT_MODELS = 'pulp.unit_models'
ENTRY_POINT_AUXILIARY_MODELS = 'pulp.auxiliary_models'


class PluginManager(object):
Expand All @@ -29,9 +30,11 @@ def __init__(self):
self.profilers = _PluginMap()
self.catalogers = _PluginMap()
self.unit_models = dict()
self.auxiliary_models = dict()

# Load the unit models
self._load_unit_models()
self._load_auxiliary_models()

def _load_unit_models(self):
""""
Expand All @@ -40,8 +43,8 @@ def _load_unit_models(self):
Attach the signals to the models here since the mongoengine signals will not be
sent correctly if they are attached to the base class.
:raises: TypeError if a model is not a subclass of ContentUnit
:raises: PLP0038 if two models are defined with the same id
:raises TypeError: if a model is not a subclass of ContentUnit
:raises PulpCodedException: PLP0038 if two models are defined with the same id
"""
_logger.debug(_("Loading Unit Models"))
for entry_point in pkg_resources.iter_entry_points(ENTRY_POINT_UNIT_MODELS):
Expand All @@ -66,6 +69,27 @@ def _load_unit_models(self):

_logger.debug(_("Unit Model Loading Completed"))

def _load_auxiliary_models(self):
    """
    Load all auxiliary models from the ENTRY_POINT_AUXILIARY_MODELS.

    Auxiliary models are plugin specific but they are not unit models themselves.
    Each loaded model is stored in self.auxiliary_models under its entry point name.

    :raises PulpCodedException: PLP0038 if two models are defined with the same id
    """
    _logger.debug(_("Loading Auxiliary Models"))
    for entry_point in pkg_resources.iter_entry_points(ENTRY_POINT_AUXILIARY_MODELS):
        # Interpolate only after the gettext lookup: the untranslated template has
        # to be the message id, otherwise no translation can ever match it.
        _logger.info(_('Loading auxiliary model: %s') % str(entry_point))
        model_id = entry_point.name
        model_class = entry_point.load()
        if model_id in self.auxiliary_models:
            # Name the model itself in the error. For a class object, __class__ is
            # its metaclass, which would make the error message misleading.
            model_type = model_class if isinstance(model_class, type) else type(model_class)
            class_name = model_type.__module__ + "." + model_type.__name__
            raise PulpCodedException(error_code=error_codes.PLP0038,
                                     model_id=model_id,
                                     model_class=class_name)
        self.auxiliary_models[model_id] = model_class
    _logger.debug(_("Auxiliary Model Loading Completed"))


class _PluginMap(object):
"""
Expand Down
5 changes: 5 additions & 0 deletions server/pulp/server/db/manage.py
Expand Up @@ -184,6 +184,11 @@ def ensure_database_indexes():
model_class._build_index_specs(model_class._meta['indexes'])
model_class.ensure_indexes()

for model_type, model_class in plugin_manager.auxiliary_models.items():
model_class._meta['index_specs'] = \
model_class._build_index_specs(model_class._meta['indexes'])
model_class.ensure_indexes()


def main():
"""
Expand Down
61 changes: 61 additions & 0 deletions server/pulp/server/db/migrations/lib/utils.py
@@ -0,0 +1,61 @@
"""
Utils for migrations.
"""

import logging

_logger = logging.getLogger(__name__)


# Log message templates; they are filled in with the % operator before being logged.
MIGRATION_HEADER_MSG = '* Migrating %s content...'
MIGRATION_PROGRESS_MSG = '* Migrated units: %s of %s'
# Delimiter line logged around the messages of one content type.
STARS = '*' * 79


class MigrationProgressLog(object):
    """
    Context manager that logs every 10% of migration completion.

    On enter a delimiter and a header are logged, on exit a closing delimiter is
    logged. Nothing is logged on enter or exit when there are no units to migrate.

    :ivar migrated_units: number of already migrated units
    :type migrated_units: int
    :ivar content_type: name of the content to be migrated
    :type content_type: str
    :ivar total_units: total number of units to be migrated
    :type total_units: int
    :ivar msg: message printed at the beginning of migration
    :type msg: str
    """
    # Class level default; the first call to progress() creates the per-instance counter.
    migrated_units = 0

    def __init__(self, content_type, total_units, msg=MIGRATION_HEADER_MSG):
        """
        :param content_type: name of the content to be migrated
        :type content_type: str
        :param total_units: total number of units to be migrated
        :type total_units: int
        :param msg: header template, formatted with content_type on enter
        :type msg: str
        """
        self.content_type = content_type
        self.total_units = total_units
        self.msg = msg

    def __enter__(self):
        """
        Log a message indicating a start of migration process for a specific content.

        :return: this progress log, so that progress() can be called inside the with block
        :rtype: MigrationProgressLog
        """
        if self.total_units:
            _logger.info(STARS)
            _logger.info(self.msg % self.content_type)

        return self

    def progress(self, msg=MIGRATION_PROGRESS_MSG):
        """
        Count migrated units and log progress every 10%.

        Expected to be called for every migrated unit. For fewer than 10 units only
        the completion is logged.

        :param msg: progress template, formatted with (migrated_units, total_units)
        :type msg: str
        """
        already_migrated = self.migrated_units
        self.migrated_units = already_migrated + 1
        total = self.total_units
        # Compare the completed tenths before and after this unit. This logs exactly
        # once per 10% step even when the total is not a multiple of 10.
        another_ten_percent_completed = (
            total >= 10 and
            self.migrated_units * 10 // total > already_migrated * 10 // total
        )
        all_units_migrated = self.migrated_units == total
        if another_ten_percent_completed or all_units_migrated:
            _logger.info(msg % (self.migrated_units, total))

    def __exit__(self, *exc):
        """
        Print footer (or delimiter) to indicate the end of the content unit migration.

        Exceptions raised inside the with block are not suppressed.
        """
        # Mirror __enter__: without a header there is nothing to close.
        if self.total_units:
            _logger.info(STARS)
27 changes: 23 additions & 4 deletions server/pulp/server/managers/content/orphan.py
Expand Up @@ -224,10 +224,16 @@ def delete_orphans_by_type(content_type_id, content_unit_ids=None):
"""

content_units_collection = content_types_db.type_units_collection(content_type_id)
content_model = plugin_api.get_unit_model_by_id(content_type_id)

try:
unit_key_fields = units_controller.get_unit_key_fields_for_type(content_type_id)
except ValueError:
raise MissingResource(content_type_id=content_type_id)

fields = ('_id', '_storage_path') + unit_key_fields
count = 0
for content_unit in OrphanManager.generate_orphans_by_type(content_type_id,
fields=['_id', '_storage_path']):
for content_unit in OrphanManager.generate_orphans_by_type(content_type_id, fields=fields):

if content_unit_ids is not None and content_unit['_id'] not in content_unit_ids:
continue
Expand All @@ -238,6 +244,9 @@ def delete_orphans_by_type(content_type_id, content_unit_ids=None):
).delete()
content_units_collection.remove(content_unit['_id'])

if hasattr(content_model, 'do_post_delete_actions'):
content_model.do_post_delete_actions(content_unit)

storage_path = content_unit.get('_storage_path', None)
if storage_path is not None:
OrphanManager.delete_orphaned_file(storage_path)
Expand All @@ -261,14 +270,20 @@ def delete_orphan_content_units_by_type(type_id, content_unit_ids=None):
"""
# get the model matching the type
content_model = plugin_api.get_unit_model_by_id(type_id)
try:
unit_key_fields = units_controller.get_unit_key_fields_for_type(type_id)
except ValueError:
raise MissingResource(content_type_id=type_id)

fields = ('id', '_storage_path') + unit_key_fields
if content_unit_ids:
query_sets = []
for page in plugin_misc.paginate(content_unit_ids):
qs = content_model.objects(id__in=page).only('id', '_storage_path')
qs = content_model.objects(id__in=page).only(*fields)
query_sets.append(qs)
content_units = itertools.chain(*query_sets)
else:
content_units = content_model.objects.only('id', '_storage_path')
content_units = content_model.objects.only(*fields)

count = 0

Expand All @@ -294,6 +309,10 @@ def delete_orphan_content_units_by_type(type_id, content_unit_ids=None):
unit_type_id=str(type_id)
).delete()
unit_to_delete.delete()

if hasattr(content_model, 'do_post_delete_actions'):
content_model.do_post_delete_actions(unit_to_delete)

if unit_to_delete._storage_path:
OrphanManager.delete_orphaned_file(unit_to_delete._storage_path)
count += 1
Expand Down

0 comments on commit 630193f

Please sign in to comment.