Skip to content
This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

Commit

Permalink
Don't iterate all lazy content every time we re-migrate
Browse files Browse the repository at this point in the history
Push some of our Python-level checks into database query logic to
significantly reduce the # of individual queries we make and the amount
of content we need to iterate over / prefetch for.

closes: #6111
https://pulp.plan.io/issues/6111
  • Loading branch information
dralley committed Aug 3, 2020
1 parent 8ec0360 commit c430b4c
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGES/6111.bugfix
@@ -0,0 +1 @@
Significantly improved performance of partial migrations (when some content / repos has been migrated already).
70 changes: 38 additions & 32 deletions pulp_2to3_migration/app/plugin/content.py
Expand Up @@ -7,11 +7,11 @@

from gettext import gettext as _

from cursor_pagination import CursorPaginator
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.core.paginator import Paginator
from django.db import transaction
from django.db.models import Prefetch
from django.db.models import Prefetch, Q

from pulpcore.app.models import storage
from pulpcore.plugin.models import (
Expand Down Expand Up @@ -86,28 +86,28 @@ async def create(self):
await pipeline


def page_queryset(queryset, per_page=2000):
def chunked_queryset_iterator(queryset, size, *, ordering=('pk',)):
"""
Yield items from a queryset, but break it up into pages behind the scenes.
In a perfect world you would use queryset.iterator(), but there is the issue that it fails to
load any prefetch_related() fields specified. Using Paginator() we can mimic the same
functionality.
Primarily a workaround for the fact that .iterator() and .prefetch_related() are incompatible.
> Note that if you use iterator() to run the query, prefetch_related() calls will be ignored
> since these two optimizations do not make sense together.
Code from: https://blog.labdigital.nl/working-with-huge-data-sets-in-django-169453bca049
https://docs.djangoproject.com/en/2.0/ref/models/querysets/
Caveat:
The ordering must uniquely identify the object, and be in the same order (ASC/DESC).
"""
if queryset._prefetch_related_lookups:
if not queryset.query.order_by:
# Paginator() throws a warning if there is no sorting attached to the queryset
queryset = queryset.order_by('pk')
paginator = Paginator(queryset, per_page)
for index in range(paginator.num_pages):
yield from paginator.get_page(index + 1)
else:
yield from queryset.iterator(chunk_size=per_page)
pager = CursorPaginator(queryset, ordering)
after = None
while True:
page = pager.page(after=after, first=size)
if page:
yield from page.items
else:
return
if not page.has_next:
break # take last item, next page starts after this.
after = pager.cursor(instance=page[-1])


class ContentMigrationFirstStage(Stage):
Expand Down Expand Up @@ -227,10 +227,16 @@ def get_remote_by_importer_id(importer_id):
has_future = content_type in self.migrator.future_types
is_multi_artifact = content_type in self.migrator.multi_artifact_types

# we need to go through all lazy content in case any of Remotes changed
if is_lazy_type:
pulp_2to3_detail_qs = content_model.objects.all()
# go through all of the content that haven't been migrated OR have been migrated
# but have new lazy catalog entries.
units_with_new_lces = Pulp2LazyCatalog.objects.filter(
is_migrated=False).values('pulp2_unit_id').distinct()
already_migrated = ~Q(pulp2content__pulp3_content=None)
no_new_lces = ~Q(pulp2content__pulp2_id__in=units_with_new_lces)
pulp_2to3_detail_qs = content_model.objects.exclude(already_migrated & no_new_lces)
else:
# go through all of the content that haven't been migrated
pulp_2to3_detail_qs = content_model.objects.filter(pulp2content__pulp3_content=None)

# order by pulp2_repo if it's set
Expand All @@ -243,11 +249,16 @@ def get_remote_by_importer_id(importer_id):
code='migrating.{}.content'.format(self.migrator.pulp2_plugin),
total=pulp_2to3_detail_qs.count()
) as pb:
chunked_iterator = page_queryset(
# Warning: It's dangerous to save records of the type of pulp_2to3_detail_qs
# while using this iterator due to the need for globally-accurate ordering.
# We're using the PK which is not an incrementing integer so that doesn't hold
# true. However, at this point, all records have already been saved so we are safe.
chunked_iterator = chunked_queryset_iterator(
pulp_2to3_detail_qs.prefetch_related(
Prefetch('pulp2content'),
Prefetch('pulp2content__pulp3_content')
)
),
2000
)

for pulp_2to3_detail_content in chunked_iterator:
Expand All @@ -259,13 +270,6 @@ def get_remote_by_importer_id(importer_id):
pulp2lazycatalog = Pulp2LazyCatalog.objects.filter(
pulp2_unit_id=pulp2content.pulp2_id, is_migrated=False)

lazy_wo_catalog = is_lazy_type and not pulp2lazycatalog
if pulp2content.pulp3_content is not None and \
(not is_lazy_type or lazy_wo_catalog):
if pb:
pb.total -= 1
pb.save()
continue
if is_lazy_type and not pulp2content.downloaded and not pulp2lazycatalog:
_logger.warn(_('On_demand content cannot be migrated without an entry in the '
'lazy catalog, pulp2 unit_id: '
Expand Down Expand Up @@ -299,6 +303,7 @@ def get_remote_by_importer_id(importer_id):
downloaded=downloaded)
else:
artifact = Artifact()

lces = pulp2lazycatalog.filter(pulp2_storage_path=image_path)
if lces:
for lce in lces:
Expand Down Expand Up @@ -398,12 +403,13 @@ def get_remote_by_importer_id(importer_id):
await self.put(dc)
future_relations.update({'lces': list(pulp2lazycatalog)})
else:
relative_path = (
pulp_2to3_detail_content.relative_path_for_content_artifact
)
da = DeclarativeArtifact(
artifact=artifact,
url=NOT_USED,
relative_path=(
pulp_2to3_detail_content.relative_path_for_content_artifact
),
relative_path=relative_path,
remote=None,
deferred_download=False)
dc = DeclarativeContent(content=pulp3content, d_artifacts=[da])
Expand Down
7 changes: 2 additions & 5 deletions pulp_2to3_migration/app/pre_migration.py
Expand Up @@ -3,10 +3,7 @@
from collections import namedtuple

from django.db import transaction
from django.db.models import (
Max,
Q,
)
from django.db.models import Max, Q
from django.utils import timezone

from mongoengine.queryset.visitor import Q as mongo_Q
Expand Down Expand Up @@ -264,7 +261,7 @@ def pre_migrate_lazycatalog(content_type):
save_batch = (i and not (i + 1) % batch_size or i == total_lce - 1)
if save_batch:
Pulp2LazyCatalog.objects.bulk_create(pulp2lazycatalog, ignore_conflicts=True)
pulp2lazycatalog = []
pulp2lazycatalog.clear()


def pre_migrate_all_without_content(plan, type_to_repo_ids, repo_id_to_type):
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
@@ -1,3 +1,4 @@
django-cursor-pagination
pulpcore>=3.4
mongoengine
semantic_version
Expand Down

0 comments on commit c430b4c

Please sign in to comment.