This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

Commit
Optimize content migration queries to avoid sorts
* Use select_related instead of prefetch_related to get everything in
  one query
* Use .iterator() instead of chunked_queryset_iterator(), which required
  an additional sort operation

The result is not much faster on my dev machine, but much faster on the
Katello box w/ HDD.

closes: #7699
https://pulp.plan.io/issues/7699
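A note on what the two bullets trade between: `prefetch_related()` fetches each relation with a separate query and stitches the results together in Python, and it was ignored by `.iterator()` (the "incompatibility" the removed helper's docstring mentions), which is why the old code needed a chunking workaround; `select_related()` follows foreign keys with SQL JOINs, so one query returns everything and streams cleanly. A minimal runnable sketch of the two patterns, using hypothetical stand-in models rather than the plugin's real ones:

```python
import django
from django.conf import settings

# Minimal standalone setup so the sketch actually runs; the real plugin
# of course uses its project settings instead.
settings.configure(
    INSTALLED_APPS=[],
    DATABASES={"default": {"ENGINE": "django.db.backends.sqlite3",
                           "NAME": ":memory:"}},
)
django.setup()

from django.db import connection, models


class Pulp2Content(models.Model):
    # Hypothetical stand-in for the plugin's pulp2content model.
    pulp2_id = models.UUIDField()

    class Meta:
        app_label = "demo"


class Pulp2DetailContent(models.Model):
    # Hypothetical stand-in for a pulp_2to3_detail model.
    pulp2content = models.ForeignKey(Pulp2Content, on_delete=models.CASCADE)

    class Meta:
        app_label = "demo"


# Create the tables in the in-memory database so the queries can execute.
with connection.schema_editor() as editor:
    editor.create_model(Pulp2Content)
    editor.create_model(Pulp2DetailContent)

# Before: base rows in one query, each prefetched relation in another;
# .iterator() historically dropped prefetch_related() caches entirely.
qs_before = Pulp2DetailContent.objects.prefetch_related("pulp2content")

# After: a single JOINed query that streams through .iterator()
# (a server-side cursor on PostgreSQL) with no ORDER BY.
for detail in Pulp2DetailContent.objects.select_related("pulp2content").iterator():
    detail.pulp2content  # already loaded; no per-row query
```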
dralley committed Oct 13, 2020
1 parent fe34e10 commit 9997909
Showing 4 changed files with 20 additions and 50 deletions.
1 change: 1 addition & 0 deletions CHANGES/7699.bugfix
@@ -0,0 +1 @@
+Made content migration significantly faster on low-spec machines w/ HDD backed database storage.
60 changes: 15 additions & 45 deletions pulp_2to3_migration/app/plugin/content.py
@@ -7,11 +7,10 @@
 from gettext import gettext as _
 
-from cursor_pagination import CursorPaginator
 from django.conf import settings
 from django.core.exceptions import ObjectDoesNotExist
 from django.db import transaction
-from django.db.models import Prefetch, Q
+from django.db.models import Q
 
 from pulpcore.app.models import storage
 from pulpcore.plugin.models import (
@@ -89,30 +88,6 @@ async def create(self):
         await pipeline
 
 
-def chunked_queryset_iterator(queryset, size, *, ordering=('pk',)):
-    """
-    Yield items from a queryset, but break it up into pages behind the scenes.
-
-    Primarily a workaround for the fact that .iterator() and .prefetch_related() are incompatible.
-
-    Code from: https://blog.labdigital.nl/working-with-huge-data-sets-in-django-169453bca049
-
-    Caveat:
-        The ordering must uniquely identify the object, and be in the same order (ASC/DESC).
-    """
-    pager = CursorPaginator(queryset, ordering)
-    after = None
-    while True:
-        page = pager.page(after=after, first=size)
-        if page:
-            yield from page.items
-        else:
-            return
-        if not page.has_next:
-            break  # take last item, next page starts after this.
-        after = pager.cursor(instance=page[-1])
-
-
 class ContentMigrationFirstStage(Stage):
     """
     The first stage of a content migration pipeline.
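What made the helper above slow in practice: CursorPaginator can only page through a queryset that has a unique total order (the pk here), so every 2000-row page repeats an ORDER BY, and because this plugin's primary keys are random UUIDs, reading in pk order scatters reads across the disk, which is painful on an HDD. An illustrative sketch of the two query shapes; the table and column names are invented for the example, not taken from the real schema:

```python
# Illustrative only: rough SQL shapes, with made-up table/column names.

# What chunked_queryset_iterator() asked the database for on every page:
CURSOR_PAGE_SQL = """
    SELECT ... FROM pulp2to3_detail
    WHERE pk > %(after)s      -- cursor position from the previous page
    ORDER BY pk ASC           -- the repeated sort this commit eliminates
    LIMIT 2000;
"""

# What select_related(...).iterator() does instead: one unordered scan,
# with foreign keys resolved by JOINs rather than follow-up queries.
ITERATOR_SQL = """
    SELECT ... FROM pulp2to3_detail
    JOIN pulp2_content ON ...  -- no ORDER BY; rows simply stream back
"""
```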
@@ -251,22 +226,16 @@ def get_remote_by_importer_id(importer_id):
                 code='migrating.{}.content'.format(self.migrator.pulp2_plugin),
                 total=pulp_2to3_detail_qs.count()
             ) as pb:
-                prefetch_args = [
-                    Prefetch('pulp2content'),
-                    Prefetch('pulp2content__pulp3_content'),
+                select_extra = [
+                    'pulp2content',
+                    'pulp2content__pulp3_content',
                 ]
 
                 if content_model.set_pulp2_repo:
-                    prefetch_args.append(Prefetch('pulp2content__pulp2_repo'))
-                # Warning: It's dangerous to save records of the type of pulp_2to3_detail_qs
-                # while using this iterator due to the need for globally-accurate ordering.
-                # We're using the PK which is not an incrementing integer so that doesn't hold
-                # true. However, at this point, all records have already been saved so we are safe.
-                chunked_iterator = chunked_queryset_iterator(
-                    pulp_2to3_detail_qs.prefetch_related(*prefetch_args),
-                    2000
-                )
-                for pulp_2to3_detail_content in chunked_iterator:
+                    select_extra.append('pulp2content__pulp2_repo')
+
+                pulp_2to3_detail_qs = pulp_2to3_detail_qs.select_related(*select_extra)
+                for pulp_2to3_detail_content in pulp_2to3_detail_qs.iterator():
                     dc = None
                     pulp2content = pulp_2to3_detail_content.pulp2content
                     # only content that supports on_demand download can have entries in LCE
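A quick way to convince yourself that the rewritten queryset carries no sort, continuing the hypothetical stand-in models from the sketch near the top of this page (and assuming the model declares no default Meta.ordering, which would otherwise re-introduce an ORDER BY on its own):

```python
# Hypothetical check: Pulp2DetailContent stands in for pulp_2to3_detail_qs.
sql = str(Pulp2DetailContent.objects.select_related("pulp2content").query)
assert "ORDER BY" not in sql  # select_related() adds JOINs, not sorts
assert "JOIN" in sql          # the relation arrives in the same query
```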
@@ -277,22 +246,23 @@ def get_remote_by_importer_id(importer_id):
                             is_migrated=False,
                         )
 
-                    if is_lazy_type and not pulp2content.downloaded and not pulp2lazycatalog:
-                        _logger.warn(_('On_demand content cannot be migrated without an entry in the '
-                                       'lazy catalog, pulp2 unit_id: '
-                                       '{}'.format(pulp2content.pulp2_id)))
-                        continue
+                        if not pulp2content.downloaded and not pulp2lazycatalog:
+                            _logger.warn(_(
+                                'On_demand content cannot be migrated without an entry in the '
+                                'lazy catalog, pulp2 unit_id: {}'.format(pulp2content.pulp2_id))
+                            )
+                            continue
 
                     if pulp2content.pulp3_content is not None and is_lazy_type and pulp2lazycatalog:
                         # find already created pulp3 content
                         pulp3content = pulp2content.pulp3_content
                         extra_info = None
                         if is_multi_artifact:
                             extra_info = pulp_2to3_detail_content.get_treeinfo_serialized()
 
                     else:
                         # create pulp3 content and assign relations if present
                         pulp3content, extra_info = pulp_2to3_detail_content.create_pulp3_content()
 
                     future_relations = {'pulp2content': pulp2content}
                     if extra_info:
                         future_relations.update(extra_info)
8 changes: 4 additions & 4 deletions pulp_2to3_migration/app/pre_migration.py
@@ -309,8 +309,7 @@ def pre_migrate_lazycatalog(content_type):
     pulp2lazycatalog = []
 
     mongo_lce_qs = LazyCatalogEntry.objects(unit_type_id=content_type)
-    total_lce = mongo_lce_qs.count()
-    for i, lce in enumerate(mongo_lce_qs.batch_size(batch_size).as_pymongo().no_cache()):
+    for lce in mongo_lce_qs.batch_size(batch_size).as_pymongo().no_cache():
         item = Pulp2LazyCatalog(pulp2_importer_id=lce['importer_id'],
                                 pulp2_unit_id=lce['unit_id'],
                                 pulp2_content_type_id=lce['unit_type_id'],
@@ -320,10 +319,11 @@ def pre_migrate_lazycatalog(content_type):
                                 is_migrated=False)
         pulp2lazycatalog.append(item)
 
-        save_batch = (i and not (i + 1) % batch_size or i == total_lce - 1)
-        if save_batch:
+        if len(pulp2lazycatalog) > batch_size:
             Pulp2LazyCatalog.objects.bulk_create(pulp2lazycatalog, ignore_conflicts=True)
             pulp2lazycatalog.clear()
+    else:
+        Pulp2LazyCatalog.objects.bulk_create(pulp2lazycatalog, ignore_conflicts=True)
 
 
 def pre_migrate_all_without_content(plan, type_to_repo_ids, repo_id_to_type):
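One detail of the new batching code worth calling out: the final `else:` belongs to the `for` loop. A `for`/`else` runs its `else` block whenever the loop finishes without `break`, so the trailing partial batch is always flushed without the old `total_lce` bookkeeping. A self-contained sketch of the same pattern, with `bulk_create` swapped for any list-accepting callable:

```python
def save_in_batches(items, batch_size, bulk_create):
    """Flush full batches inside the loop and the remainder afterwards.

    Same shape as the new pre_migrate_lazycatalog() code; bulk_create is
    any callable taking a list (e.g. Model.objects.bulk_create).
    """
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) > batch_size:
            bulk_create(batch)
            batch.clear()
    else:
        # Runs when the loop ends without break: flushes the final
        # partial batch (or everything, if no batch ever filled up).
        bulk_create(batch)


saved = []
save_in_batches(range(10), batch_size=4, bulk_create=lambda b: saved.extend(b))
assert saved == list(range(10))
```

One wrinkle carried over from the diff: the `> batch_size` comparison means batches flush at `batch_size + 1` items, and the trailing flush may receive an empty list, which `bulk_create()` treats as a no-op.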
1 change: 0 additions & 1 deletion requirements.txt
@@ -1,4 +1,3 @@
-django-cursor-pagination
 pulpcore>=3.6
 mongoengine
 semantic_version
