diff --git a/CHANGES/7009.misc b/CHANGES/7009.misc new file mode 100644 index 00000000..95898e47 --- /dev/null +++ b/CHANGES/7009.misc @@ -0,0 +1 @@ +Do not create a child task for already migrated repos. diff --git a/CHANGES/8314.misc b/CHANGES/8314.misc new file mode 100644 index 00000000..8eb8eadb --- /dev/null +++ b/CHANGES/8314.misc @@ -0,0 +1 @@ +Make migration no-op if there are no changes in plan and everything is migrated. diff --git a/pulp_2to3_migration/app/migration.py b/pulp_2to3_migration/app/migration.py index da00ea14..158ab4d4 100644 --- a/pulp_2to3_migration/app/migration.py +++ b/pulp_2to3_migration/app/migration.py @@ -61,6 +61,14 @@ def migrate_repositories(plan): ) with ProgressReport(**progress_data) as pb: for plugin in plan.get_plugin_plans(): + # all pulp2 repos in current plan were already migrated, no need to proceed + not_migrated_repos = Pulp2Repository.objects.filter( + is_migrated=False, + not_in_plan=False, + pulp2_repo_type=plugin.type + ) + if not not_migrated_repos.exists(): + continue pulp2repos_qs = Pulp2Repository.objects.filter( pulp3_repository_version=None, @@ -234,6 +242,22 @@ def create_repoversions_publications_distributions(plan, parallel=True): parallel (bool): If True, attempt to migrate things in parallel where possible. """ for plugin in plan.get_plugin_plans(): + # verify whether all pulp2 repos and distributors have been migrated + not_migrated_repos = Pulp2Repository.objects.filter( + is_migrated=False, + not_in_plan=False, + pulp2_repo_type=plugin.type) + not_migrated_dists = Pulp2Distributor.objects.filter( + is_migrated=False, + not_in_plan=False, + pulp2_type_id__in=plugin.migrator.distributor_migrators.keys()) + # no need to proceed - everything is migrated + if not not_migrated_repos and not not_migrated_dists: + continue + not_migrated_repo_ids = not_migrated_repos.values_list('pulp2_repo_id', flat=True) + not_migrated_repo_ids_dists = not_migrated_dists.values_list('pulp2_repo_id', flat=True) + repos_ids_to_check = set(not_migrated_repo_ids).union(not_migrated_repo_ids_dists) + pulp3_repo_setup = plugin.get_repo_creation_setup() repo_ver_to_create = 0 @@ -242,22 +266,36 @@ def create_repoversions_publications_distributions(plan, parallel=True): if parallel: for repo_name in pulp3_repo_setup: repo_versions = pulp3_repo_setup[repo_name]['repository_versions'] - repo_ver_to_create += len(repo_versions) + needs_a_task = False for repo_ver in repo_versions: - dist_to_create += len(repo_ver['dist_repo_ids']) - repo = Repository.objects.get(name=repo_name).cast() - task_args = [plugin, pulp3_repo_setup, repo_name] - enqueue_with_reservation( - complex_repo_migration, - [repo], - args=task_args, - task_group=TaskGroup.current() - ) + repos = set(repo_ver['dist_repo_ids'] + [repo_ver['repo_id']]) + # check whether any resources are not migrated and need a task + if repos.intersection(repos_ids_to_check): + needs_a_task = True + dist_to_create += len(repo_ver['dist_repo_ids']) + if needs_a_task: + repo_ver_to_create += len(repo_versions) + repo = Repository.objects.get(name=repo_name).cast() + task_args = [plugin, pulp3_repo_setup, repo_name] + enqueue_with_reservation( + complex_repo_migration, + [repo], + args=task_args, + task_group=TaskGroup.current() + ) else: # Serial (non-parallel) for repo_name in pulp3_repo_setup: - task_args = [plugin, pulp3_repo_setup, repo_name] - complex_repo_migration(*task_args) + repo_versions = pulp3_repo_setup[repo_name]['repository_versions'] + needs_a_task = False + for repo_ver in repo_versions: + repos = set(repo_ver['dist_repo_ids'] + [repo_ver['repo_id']]) + # check whether any resources are not migrated and need a task + if repos.intersection(repos_ids_to_check): + needs_a_task = True + if needs_a_task: + task_args = [plugin, pulp3_repo_setup, repo_name] + complex_repo_migration(*task_args) task_group = TaskGroup.current() progress_rv = task_group.group_progress_reports.filter(code='create.repo_version') diff --git a/pulp_2to3_migration/app/models/base.py b/pulp_2to3_migration/app/models/base.py index b44a0746..a554cb52 100644 --- a/pulp_2to3_migration/app/models/base.py +++ b/pulp_2to3_migration/app/models/base.py @@ -349,6 +349,15 @@ def _parse_plugin_plan(self, repository_data): class RepoSetup(BaseModel): """ A model to reflect changes between previous and current migration plans. + + Fields: + pulp2_repo_id (models.TextField): pulp2_repo_id to migrate into a repo_version + pulp2_repo_type (models.CharField): pulp2 repo type + pulp2_resource_repo_id (models.TextField): pulp2_repo_id of the resource + (importer/distributor) to migrate + pulp2_resource_type (models.SmallIntegerField): pulp2 resource type - importer/distributor + status (models.SmallIntegerField): status of the record + """ OLD = 0 UP_TO_DATE = 1 diff --git a/pulp_2to3_migration/tests/functional/test_migration_plan.py b/pulp_2to3_migration/tests/functional/test_migration_plan.py index 74f4d56a..248660a4 100644 --- a/pulp_2to3_migration/tests/functional/test_migration_plan.py +++ b/pulp_2to3_migration/tests/functional/test_migration_plan.py @@ -1,11 +1,13 @@ import json +import time import unittest from pulpcore.client.pulp_2to3_migration.exceptions import ApiException -from pulp_2to3_migration.tests.functional.util import set_pulp2_snapshot +from pulp_2to3_migration.tests.functional.util import get_psql_smash_cmd, set_pulp2_snapshot from pulp_smash.pulp3.bindings import monitor_task, monitor_task_group, PulpTaskError from .common_plans import FILE_SIMPLE_PLAN, FILE_COMPLEX_PLAN +from .constants import TRUNCATE_TABLES_QUERY_BASH from .file_base import BaseTestFile EXTRA_COMMA_PLAN = '{"plugins": [{"type": "iso"},]}' @@ -44,6 +46,14 @@ def setUpClass(cls): super().setUpClass() set_pulp2_snapshot(name='file_base_4repos') + def tearDown(self): + """ + Clean up the database after each test. + """ + cmd = get_psql_smash_cmd(TRUNCATE_TABLES_QUERY_BASH) + self.smash_cli_client.run(cmd, sudo=True) + time.sleep(0.5) + def _do_test_parallel(self, plan, outcome): """Test that there were multiple tasks running in parallel as a part of a task group.""" mp = self.migration_plans_api.create({'plan': plan}) @@ -105,3 +115,8 @@ def test_simple_plan_parallel(self): def test_complex_plan_parallel(self): """Test that using a complex plan, there is work which is performed in parallel.""" self._do_test_parallel(FILE_COMPLEX_PLAN, 5) + + def test_optimized_tasks_rerun(self): + """Test that second run of the same plan with no changes at all triggers only one task""" + self._do_test_parallel(FILE_COMPLEX_PLAN, 5) + self._do_test_parallel(FILE_COMPLEX_PLAN, 1)