Skip to content
This repository has been archived by the owner on Dec 7, 2022. It is now read-only.

Commit

Permalink
Flatten queries for content unit removal
Browse files Browse the repository at this point in the history
https://pulp.plan.io/issues/4549

Replace the recursive pattern with a fixed number of larger queries.

Additionally, reorder the content removal to "top down". This will fail
more gracefully; failure leaves orphans (safe) rather than user-facing
unlinked content (unsafe). This requires the additional plugin step of
removing the explicitly given content, which is normally handled by pulp
platform.

A side effect of this change is the correction of a bug that did not
remove shared content, even if all linked content is removed.
https://pulp.plan.io/issues/5161

fixes #5161
fixes #4549
  • Loading branch information
asmacdo committed Jul 30, 2019
1 parent 69bf508 commit 76f5894
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 354 deletions.
273 changes: 136 additions & 137 deletions plugins/pulp_docker/plugins/importers/importer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from gettext import gettext as _
from collections import defaultdict
import logging

from pulp.common.config import read_json_config
Expand Down Expand Up @@ -400,14 +401,19 @@ def validate_config(self, repo, config):

def remove_units(self, repo, units, config):
"""
Removes content units from the given repository.
Removes (unassociates) content units from the given repository.
This method also removes tags associated with Images, Tags associated with Manifests,
unreferenced Blobs associated with Image Manifests, unreferenced Image Manifests
associated with Manifest Lists.
This method also removes content units recursively:
- tags associated with explicitly removed Images, Manifests and Manifest Lists
- Manifests associated with explicitly removed Manifest Lists (not associated with
remaining Manifest Lists)
- Blobs (and config blobs) associated with explictly removed Manifest
- Blobs (and config blobs) indirectly associated to Manifest Lists
This call will not result in the units being deleted from Pulp itself, except for Tags since
they are repository specific.
The actual removal is "top down", meaning that we remove user facing content first, then
its related content, then the content related to that. This is done because if the function
fails, it will leave unlinked content (safe) instead of user-facing content without it's
linked content (unsafe).
:param repo: metadata describing the repository
:type repo: pulp.plugins.model.Repository
Expand All @@ -417,134 +423,118 @@ def remove_units(self, repo, units, config):
:param config: plugin configuration
:type config: pulp.plugins.config.PluginCallConfiguration
"""
unit_removers = {
models.Image: DockerImporter._remove_image,
models.Manifest: DockerImporter._remove_manifest,
models.ManifestList: DockerImporter._remove_manifest_list,
models.Tag: DockerImporter._remove_tag
}

map((lambda u: type(u) in unit_removers and unit_removers[type(u)](
repo.repo_obj, u)), units)

@staticmethod
def _remove_image(repo, image):
"""
Purge tags associated with a given Image in the repository.

:param repo: The affected repository.
:type repo: pulp.server.db.model.Repository
:param image: The Image being removed
:type image: pulp_docker.plugins.models.Image
"""
tags = repo.scratchpad.get(u'tags', [])
for tag_dict in tags[:]:
if tag_dict[constants.IMAGE_ID_KEY] == image.image_id:
tags.remove(tag_dict)

repo.scratchpad[u'tags'] = tags
repo.save()

@classmethod
def _remove_manifest(cls, repo, manifest):
"""
Purge Tags and Blobs associated with a given Manifest in the repository.
:param repo: The affected repository.
:type repo: pulp.server.db.model.Repository
:param manifest: The Manifest being removed
:type manifest: pulp_docker.plugins.models.Manifest
"""
cls._purge_unlinked_tags(repo, manifest)
cls._purge_unlinked_blobs(repo, manifest)
type_dict = defaultdict(set)
manifest_list_digests = set()
manifest_digests = set()
all_unit_ids = []
for unit in units:
if type(unit) is models.ManifestList:
manifest_list_digests.add(unit.digest)
if type(unit) is models.Manifest:
manifest_digests.add(unit.digest)
type_dict[type(unit)].add(unit.id)
all_unit_ids.append(unit.id)

# Start by removing the highest level (therefore safest) units: tags
all_removed_digests = list(manifest_list_digests) + list(manifest_digests)
self._purge_unlinked_tags(repo.repo_obj, all_removed_digests)

# Remove the units explicitly given by the user. This is normally handled by platform after
# the plugin is finished. However, this is unsafe for docker content units, because in the
# event of an interruption, the linked units would be removed leaving user-accessible
# content which references content that has been removed. By removing these units now, we
# can guarantee that if there is an interruption, we will fail safely by leaving orphans
# instead of invalid content.
unit_filter = {'_id': {'$in': sorted(all_unit_ids)}}
criteria = UnitAssociationCriteria(
unit_filters=unit_filter)
manager = manager_factory.repo_unit_association_manager()
manager.unassociate_by_criteria(
repo_id=repo.repo_obj.repo_id,
criteria=criteria,
notify_plugins=False,
)

@classmethod
def _remove_manifest_list(cls, repo, manifest_list):
"""
Purge Tags and image manifests associated with a given ManifestList in the repository.
# These manifests are safe to purge before tags, if they are tagged, they are not removed.
removed_unlinked_manifest_ids = self._purge_unlinked_manifests(
repo.repo_obj,
list(type_dict[models.ManifestList])
)

:param repo: The affected repository.
:type repo: pulp.server.db.model.Repository
:param manifest_list: The ManifestList being removed
:type manifest_list: pulp_docker.plugins.models.ManifestList
"""
cls._purge_unlinked_tags(repo, manifest_list)
cls._purge_unlinked_manifests(repo, manifest_list)
type_dict[models.Manifest] |= removed_unlinked_manifest_ids
self._purge_unlinked_blobs(repo.repo_obj, list(type_dict[models.Manifest]))

@classmethod
def _remove_tag(cls, repo, tag):
@staticmethod
def _purge_unlinked_manifests(repo, manifest_list_pks):
"""
Tags are repository-specific, so when they are removed, they should also be deleted.
Remove manifests related to given manifest lists, otherwise unlinked in the repository.
:param repo: unused
:param repo: Repository to remove manifests from
:type repo: pulp.server.db.model.Repository
:param tag: Tag to be deleted
:type tag: pulp_docker.plugins.models.Tag
"""
tag.delete()

@staticmethod
def _purge_unlinked_manifests(repo, manifest_list):

# Find manifest digests referenced by removed manifest lists (orphaned)
orphaned = set()
for image_man in manifest_list.manifests:
orphaned.add(image_man.digest)
:param manifest_list_pks: Retrieve manifests that are associated with these manifest_lists
:type manifest_lists: Set of manifest_list pks
:returns: manifests (by _id) unlinked by this function
:rtype: set
"""
# Retrieve the manifest digests unlinked by manifest_lists
manifest_lists = models.ManifestList.objects.filter(
pk__in=sorted(manifest_list_pks)
).only('manifests', 'amd64_digest')
possibly_unlinked_manifest_digests = set()
for manifest_list in manifest_lists:
for image_man in manifest_list.manifests:
possibly_unlinked_manifest_digests.add(image_man.digest)
if manifest_list.amd64_digest:
orphaned.add(manifest_list.amd64_digest)
if not orphaned:
# nothing orphaned
return

# Find manifest digests still referenced by other manifest lists (adopted)
adopted = set()
criteria = UnitAssociationCriteria(type_ids=[constants.MANIFEST_LIST_TYPE_ID],
unit_filters={'digest': {'$ne': manifest_list.digest}})
possibly_unlinked_manifest_digests.add(manifest_list.amd64_digest)
if not possibly_unlinked_manifest_digests:
return set()

# Find manifest digests still referenced by other manifest lists in the repo
criteria = UnitAssociationCriteria(
type_ids=[constants.MANIFEST_LIST_TYPE_ID],
unit_filters={'_id': {'$nin': sorted(manifest_list_pks)}}
)
for man_list in unit_association.RepoUnitAssociationManager._units_from_criteria(
repo, criteria):
for image_man in man_list.manifests:
adopted.add(image_man.digest)
possibly_unlinked_manifest_digests.discard(image_man.digest)
if man_list.amd64_digest:
adopted.add(man_list.amd64_digest)

# Remove unreferenced manifests
orphaned = orphaned.difference(adopted)
if not orphaned:
# all adopted
return
possibly_unlinked_manifest_digests.discard(man_list.amd64_digest)
if not possibly_unlinked_manifest_digests:
return set()

# Check if those manifests have tags, tagged manifests cannot be removed
criteria = UnitAssociationCriteria(
type_ids=[constants.TAG_TYPE_ID],
unit_filters={'manifest_digest': {'$in': list(orphaned)},
unit_filters={'manifest_digest': {'$in': sorted(possibly_unlinked_manifest_digests)},
'manifest_type': constants.MANIFEST_IMAGE_TYPE})
for tag in unit_association.RepoUnitAssociationManager._units_from_criteria(
repo, criteria):
orphaned.remove(tag.manifest_digest)

unit_filter = {
'digest': {
'$in': sorted(orphaned)
}
}

possibly_unlinked_manifest_digests.discard(tag.manifest_digest)

removed_unlinked_manifest_ids = list(
models.Manifest.objects.filter(
digest__in=sorted(possibly_unlinked_manifest_digests)
).distinct("_id")
)
unit_filter = {'_id': {'$in': sorted(removed_unlinked_manifest_ids)}}
criteria = UnitAssociationCriteria(
type_ids=[constants.MANIFEST_TYPE_ID],
unit_filters=unit_filter)
manager = manager_factory.repo_unit_association_manager()
manager.unassociate_by_criteria(
repo_id=repo.repo_id,
criteria=criteria,
notify_plugins=False)

for manifest in models.Manifest.objects.filter(digest__in=sorted(orphaned)):
DockerImporter._purge_unlinked_blobs(repo, manifest)
notify_plugins=False,
)
return set(removed_unlinked_manifest_ids)

@staticmethod
def _purge_unlinked_tags(repo, manifest):
def _purge_unlinked_tags(repo, digests):
"""
Purge Tags associated with the given Manifest (image or list) in the repository.
We don't want to leave Tags that reference Manifests (image or lists) that no longer exist.
Unassociate and delete tags that point to the manifest[list] digests.
:param repo: The affected repository.
:type repo: pulp.server.db.model.Repository
Expand All @@ -553,7 +543,7 @@ def _purge_unlinked_tags(repo, manifest):
"""
# Find Tag objects that reference the removed Manifest. We can remove any such Tags from
# the repository, and from Pulp as well (since Tag objects are repository specific).
unit_filter = {'manifest_digest': manifest.digest}
unit_filter = {'manifest_digest': {"$in": digests}}
criteria = UnitAssociationCriteria(
type_ids=[constants.TAG_TYPE_ID],
unit_filters=unit_filter)
Expand All @@ -564,10 +554,10 @@ def _purge_unlinked_tags(repo, manifest):
notify_plugins=False)
# Finally, we can remove the Tag objects from Pulp entirely, since Tags are repository
# specific.
models.Tag.objects.filter(repo_id=repo.repo_id, manifest_digest=manifest.digest).delete()
models.Tag.objects.filter(repo_id=repo.repo_id, manifest_digest=digests).delete()

@staticmethod
def _purge_unlinked_blobs(repo, manifest):
def _purge_unlinked_blobs(repo, manifest_pks):
"""
Purge blobs associated with the given Manifests when removing it would leave them no longer
referenced by any remaining Manifests.
Expand All @@ -577,37 +567,46 @@ def _purge_unlinked_blobs(repo, manifest):
:param units: List of removed units.
:type units: list of: pulp.plugins.model.AssociatedUnit
"""
# Find blob digests referenced by removed manifests (orphaned)
orphaned = set()
map((lambda layer: orphaned.add(layer.blob_sum)), manifest.fs_layers)
# in manifest schema version 2 there is an additional blob layer called config_layer
if manifest.config_layer:
orphaned.add(manifest.config_layer)
if not orphaned:
# nothing orphaned
return

# Find blob digests still referenced by other manifests (adopted)
adopted = set()
criteria = UnitAssociationCriteria(type_ids=[constants.MANIFEST_TYPE_ID],
unit_filters={'digest': {'$ne': manifest.digest}})
possibly_unlinked_blob_digests = set()
manifests = models.Manifest.objects.filter(
pk__in=sorted(manifest_pks)
) .only('fs_layers', 'config_layer')
for manifest in manifests:
# in manifest schema version 2 there is an additional blob layer called config_layer
if manifest.config_layer:
possibly_unlinked_blob_digests.add(manifest.config_layer)
for layer in manifest.fs_layers:
possibly_unlinked_blob_digests.add(layer.blob_sum)

if not possibly_unlinked_blob_digests:
return set()

criteria = UnitAssociationCriteria(
type_ids=[constants.MANIFEST_TYPE_ID],
unit_filters={'_id': {'$nin': sorted(manifest_pks)},
'fs_layers.blob_sum': {'$in': sorted(possibly_unlinked_blob_digests)}},
unit_fields=["fs_layers.blob_sum"]
)

for manifest in unit_association.RepoUnitAssociationManager._units_from_criteria(
repo, criteria):
map((lambda layer: adopted.add(layer.blob_sum)), manifest.fs_layers)
if manifest.config_layer:
adopted.add(manifest.config_layer)

# Remove unreferenced blobs
orphaned = orphaned.difference(adopted)
if not orphaned:
# all adopted
return

unit_filter = {
'digest': {
'$in': sorted(orphaned)
}
}
for layer in manifest.fs_layers:
possibly_unlinked_blob_digests.discard(layer.blob_sum)

criteria = UnitAssociationCriteria(
type_ids=[constants.MANIFEST_TYPE_ID],
unit_filters={'_id': {'$nin': sorted(manifest_pks)},
'config_layer': {'$in': sorted(possibly_unlinked_blob_digests)}},
unit_fields=["config_layer"]
)
for manifest in unit_association.RepoUnitAssociationManager._units_from_criteria(
repo, criteria):
possibly_unlinked_blob_digests.discard(manifest.config_layer)

if not possibly_unlinked_blob_digests:
return set()

unit_filter = {'digest': {'$in': sorted(possibly_unlinked_blob_digests)}}
criteria = UnitAssociationCriteria(
type_ids=[constants.BLOB_TYPE_ID],
unit_filters=unit_filter)
Expand Down
Loading

0 comments on commit 76f5894

Please sign in to comment.