Skip to content
This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

Commit

Permalink
Only go through the changed repositories on the re-run.
Browse files Browse the repository at this point in the history
Also improved how repo type is determined for complex plans.
There is no need to look at repo-content relations in such case becasue the plugin type is clear from the plan.

closes #7779
https://pulp.plan.io/issues/7779
  • Loading branch information
goosemania committed Feb 18, 2021
1 parent 29d4b51 commit bc4ab3a
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGES/7779.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixed the re-run times when repositories/importers/distributors haven't changed much since the last run.
42 changes: 26 additions & 16 deletions pulp_2to3_migration/app/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def get_repo_types(plan):
"""
repo_id_to_type = {}
type_to_repo_ids = defaultdict(set)
is_simple_plan = False

# mapping content type -> plugin/repo type, e.g. 'docker_blob' -> 'docker'
content_type_to_plugin = {}
Expand All @@ -41,6 +42,12 @@ def get_repo_types(plan):
for content_type in plugin.migrator.pulp2_content_models:
content_type_to_plugin[content_type] = plugin.migrator.pulp2_plugin

# if any of the plugin plans is empty, we'd need to go through all repo content relations
# to determine repo types correctly.
if plugin.empty:
is_simple_plan = True
continue

repos = set(plugin.get_repositories())
repos |= set(plugin.get_importers_repos())
repos |= set(plugin.get_distributors_repos())
Expand All @@ -49,22 +56,25 @@ def get_repo_types(plan):
repo_id_to_type[repo] = plugin.type
type_to_repo_ids[plugin.type].update(repos)

# TODO: optimizations.
# It looks at each content at the moment. Potential optimizations:
# - This is a big query, paginate?
# - Filter by repos from the plan
# - Query any but one record for a repo
for rec in RepositoryContentUnit.objects().\
only('repo_id', 'unit_type_id').as_pymongo().no_cache():
repo_id = rec['repo_id']
unit_type_id = rec['unit_type_id']

# a type for a repo is already known or this content/repo type is not supported
if repo_id in repo_id_to_type or unit_type_id not in content_type_to_plugin:
continue
plugin_name = content_type_to_plugin[unit_type_id]
repo_id_to_type[repo_id] = plugin_name
type_to_repo_ids[plugin_name].add(repo_id)
# Go through repo content relations only when at least one of the plans is not complex,
# otherwise the type is determined by the plan in a much more efficient way.
if is_simple_plan:
# TODO: optimizations.
# It looks at each content at the moment. Potential optimizations:
# - This is a big query, paginate?
# - Filter by repos from the plan
# - Query any but one record for a repo
for rec in RepositoryContentUnit.objects().\
only('repo_id', 'unit_type_id').as_pymongo().no_cache():
repo_id = rec['repo_id']
unit_type_id = rec['unit_type_id']

# a type for a repo is already known or this content/repo type is not supported
if repo_id in repo_id_to_type or unit_type_id not in content_type_to_plugin:
continue
plugin_name = content_type_to_plugin[unit_type_id]
repo_id_to_type[repo_id] = plugin_name
type_to_repo_ids[plugin_name].add(repo_id)

return repo_id_to_type, type_to_repo_ids

Expand Down
91 changes: 73 additions & 18 deletions pulp_2to3_migration/app/pre_migration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging

from collections import namedtuple
from datetime import datetime

from django.db import transaction
from django.db.models import Max, Q
Expand Down Expand Up @@ -279,11 +280,18 @@ def pre_migrate_all_without_content(plan):
"""
Pre-migrate repositories, relations to their contents, importers and distributors.
NOTE: MongoDB and Django handle datetime fields differently. MongoDB doesn't care about
timezones and provides "naive" time, while Django is complaining about time without a timezone.
The problem is that naive time != time with specified timezone, that's why all the time for
MongoDB comparisons should be naive and all the time for Django/PostgreSQL should be timezone
aware.
Look at the last updated times in the pulp2to3 tables for repositories/importers/distributors:
* pulp2_last_unit_added or pulp2_last_unit_removed for repositories
* pulp2_last_updated for importers and distributors
Query empty-never-had-content repos (can't filter them out in any way) and repos for which
there were:
* content changes since the last run
* importer changes since the last run
* distributor changes since the last run
Query in order of last_unit_added for the case when pre-migration is interrupted before we are
done with repositories.
Args:
plan(MigrationPlan): A Migration Plan
Expand All @@ -296,27 +304,68 @@ def pre_migrate_all_without_content(plan):

for plugin_plan in plan.get_plugin_plans():
repos = plugin_plan.get_repositories()
# filter by repo type
repos_to_check = plan.type_to_repo_ids[plugin_plan.type]

mongo_repo_q = mongo_Q(repo_id__in=repos_to_check)
mongo_repo_qs = Repository.objects(mongo_repo_q)

pb.total += mongo_repo_qs.count()
pb.save()

importers_repos = plugin_plan.get_importers_repos()
distributors_repos = plugin_plan.get_distributors_repos()

distributor_migrators = plugin_plan.migrator.distributor_migrators
importer_types = list(plugin_plan.migrator.importer_migrators.keys())
distributor_migrators = plugin_plan.migrator.distributor_migrators
distributor_types = list(distributor_migrators.keys())

# figure out which repos/importers/distributors have been updated since the last run
epoch = datetime.utcfromtimestamp(0)
repo_type_q = Q(pulp2_repo_type=plugin_plan.type)
imp_type_q = Q(pulp2_type_id__in=importer_types)
dist_type_q = Q(pulp2_type_id__in=distributor_types)

plugin_pulp2repos = Pulp2Repository.objects.filter(repo_type_q)
repo_premigrated_last_by_added = plugin_pulp2repos.aggregate(
Max('pulp2_last_unit_added')
)['pulp2_last_unit_added__max'] or epoch
repo_premigrated_last_by_removed = plugin_pulp2repos.aggregate(
Max('pulp2_last_unit_removed')
)['pulp2_last_unit_removed__max'] or epoch
imp_premigrated_last = Pulp2Importer.objects.filter(imp_type_q).aggregate(
Max('pulp2_last_updated')
)['pulp2_last_updated__max'] or epoch
dist_premigrated_last = Pulp2Distributor.objects.filter(dist_type_q).aggregate(
Max('pulp2_last_updated')
)['pulp2_last_updated__max'] or epoch

is_content_added_q = mongo_Q(last_unit_added__gte=repo_premigrated_last_by_added)
is_content_removed_q = mongo_Q(last_unit_removed__gte=repo_premigrated_last_by_removed)
is_new_enough_repo_q = is_content_added_q | is_content_removed_q
is_empty_repo_q = mongo_Q(last_unit_added__exists=False)
is_new_enough_imp_q = mongo_Q(last_updated__gte=imp_premigrated_last)
is_new_enough_dist_q = mongo_Q(last_updated__gte=dist_premigrated_last)
repo_repo_id_q = mongo_Q(repo_id__in=repos)
imp_repo_id_q = mongo_Q(repo_id__in=importers_repos)
dist_repo_id_q = mongo_Q(repo_id__in=distributors_repos)

updated_importers = Importer.objects(
imp_repo_id_q & is_new_enough_imp_q
).only('repo_id')
updated_imp_repos = set(imp.repo_id for imp in updated_importers)
updated_distributors = Distributor.objects(
dist_repo_id_q & is_new_enough_dist_q
).only('repo_id')
updated_dist_repos = set(dist.repo_id for dist in updated_distributors)
updated_impdist_repos = updated_imp_repos | updated_dist_repos

mongo_updated_repo_q = repo_repo_id_q & (is_new_enough_repo_q | is_empty_repo_q)
mongo_updated_imp_dist_repo_q = mongo_Q(repo_id__in=updated_impdist_repos)

mongo_repo_qs = Repository.objects(
mongo_updated_repo_q | mongo_updated_imp_dist_repo_q
).order_by('last_unit_added')

pb.total += mongo_repo_qs.count()
pb.save()

for repo_data in mongo_repo_qs.only('id',
'repo_id',
'last_unit_added',
'last_unit_removed',
'description',
'notes'):
'description'):
repo = None
repo_id = repo_data.repo_id
with transaction.atomic():
Expand All @@ -329,13 +378,19 @@ def pre_migrate_all_without_content(plan):
if not repos or repos and distributors_repos:
pre_migrate_distributor(
repo_id, distributors_repos, distributor_migrators, repo)
pb.increment()
pb.increment()


def pre_migrate_repo(record, repo_id_to_type):
"""
Pre-migrate a pulp 2 repo.
NOTE: MongoDB and Django handle datetime fields differently. MongoDB doesn't care about
timezones and provides "naive" time, while Django is complaining about time without a timezone.
The problem is that naive time != time with specified timezone, that's why all the time for
MongoDB comparisons should be naive and all the time for Django/PostgreSQL should be timezone
aware.
Args:
record(Repository): Pulp 2 repository data
repo_id_to_type(dict): A mapping from a pulp 2 repo_id to pulp 2 repo types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,9 @@ def _test_pulp2repositories(self, plan):
Check correctness of the dara for the first repo in the list.
"""
self._load_and_run(plan)
pulp2repository = self.pulp2repositories_api.list(ordering='pulp2_id', limit=1).results[0]
pulp2repository = self.pulp2repositories_api.list(
ordering='pulp2_repo_id', limit=1
).results[0]
pulp3_repo = self.file_repo_api.read(pulp2repository.pulp3_repository_href)
pulp3_remote = self.file_remotes_api.read(pulp2repository.pulp3_remote_href)
pulp3_pub = self.file_publications_api.read(pulp2repository.pulp3_publication_href)
Expand Down Expand Up @@ -315,7 +317,7 @@ def test_importer_different_repo(self):
Importers are swapped in the plan.
"""
self._load_and_run(IMPORTER_DIFF_PLAN)
pulp2repositories = self.pulp2repositories_api.list(ordering='pulp2_id').results
pulp2repositories = self.pulp2repositories_api.list(ordering='pulp2_repo_id').results
pulp2repo1, pulp2repo2 = pulp2repositories
pulp3_remote1 = self.file_remotes_api.read(pulp2repo1.pulp3_remote_href)
pulp3_remote2 = self.file_remotes_api.read(pulp2repo2.pulp3_remote_href)
Expand All @@ -336,7 +338,7 @@ def test_distributor_different_repo(self):
Distributors are swapped in the plan.
"""
self._load_and_run(DISTRIBUTOR_DIFF_PLAN)
pulp2repositories = self.pulp2repositories_api.list(ordering='pulp2_id').results
pulp2repositories = self.pulp2repositories_api.list(ordering='pulp2_repo_id').results
pulp2repo1, pulp2repo2 = pulp2repositories
pulp3_pub1 = self.file_publications_api.read(pulp2repo1.pulp3_publication_href)
pulp3_pub2 = self.file_publications_api.read(pulp2repo2.pulp3_publication_href)
Expand Down

0 comments on commit bc4ab3a

Please sign in to comment.