Skip to content
This repository has been archived by the owner on Jan 9, 2023. It is now read-only.

Commit

Permalink
Optimize tasks creations.
Browse files Browse the repository at this point in the history
  • Loading branch information
ipanova authored and goosemania committed Mar 9, 2021
1 parent 86f0f99 commit 9df621c
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 13 deletions.
1 change: 1 addition & 0 deletions CHANGES/7009.misc
@@ -0,0 +1 @@
Do not create a child task for already migrated repos.
1 change: 1 addition & 0 deletions CHANGES/8314.misc
@@ -0,0 +1 @@
Make migration no-op if there are no changes in plan and everything is migrated.
62 changes: 50 additions & 12 deletions pulp_2to3_migration/app/migration.py
Expand Up @@ -61,6 +61,14 @@ def migrate_repositories(plan):
)
with ProgressReport(**progress_data) as pb:
for plugin in plan.get_plugin_plans():
# all pulp2 repos in current plan were already migrated, no need to proceed
not_migrated_repos = Pulp2Repository.objects.filter(
is_migrated=False,
not_in_plan=False,
pulp2_repo_type=plugin.type
)
if not not_migrated_repos.exists():
continue

pulp2repos_qs = Pulp2Repository.objects.filter(
pulp3_repository_version=None,
Expand Down Expand Up @@ -234,6 +242,22 @@ def create_repoversions_publications_distributions(plan, parallel=True):
parallel (bool): If True, attempt to migrate things in parallel where possible.
"""
for plugin in plan.get_plugin_plans():
# verify whether all pulp2 repos and distributors have been migrated
not_migrated_repos = Pulp2Repository.objects.filter(
is_migrated=False,
not_in_plan=False,
pulp2_repo_type=plugin.type)
not_migrated_dists = Pulp2Distributor.objects.filter(
is_migrated=False,
not_in_plan=False,
pulp2_type_id__in=plugin.migrator.distributor_migrators.keys())
# no need to proceed - everything is migrated
if not not_migrated_repos and not not_migrated_dists:
continue
not_migrated_repo_ids = not_migrated_repos.values_list('pulp2_repo_id', flat=True)
not_migrated_repo_ids_dists = not_migrated_dists.values_list('pulp2_repo_id', flat=True)
repos_ids_to_check = set(not_migrated_repo_ids).union(not_migrated_repo_ids_dists)

pulp3_repo_setup = plugin.get_repo_creation_setup()

repo_ver_to_create = 0
Expand All @@ -242,22 +266,36 @@ def create_repoversions_publications_distributions(plan, parallel=True):
if parallel:
for repo_name in pulp3_repo_setup:
repo_versions = pulp3_repo_setup[repo_name]['repository_versions']
repo_ver_to_create += len(repo_versions)
needs_a_task = False
for repo_ver in repo_versions:
dist_to_create += len(repo_ver['dist_repo_ids'])
repo = Repository.objects.get(name=repo_name).cast()
task_args = [plugin, pulp3_repo_setup, repo_name]
enqueue_with_reservation(
complex_repo_migration,
[repo],
args=task_args,
task_group=TaskGroup.current()
)
repos = set(repo_ver['dist_repo_ids'] + [repo_ver['repo_id']])
# check whether any resources are not migrated and need a task
if repos.intersection(repos_ids_to_check):
needs_a_task = True
dist_to_create += len(repo_ver['dist_repo_ids'])
if needs_a_task:
repo_ver_to_create += len(repo_versions)
repo = Repository.objects.get(name=repo_name).cast()
task_args = [plugin, pulp3_repo_setup, repo_name]
enqueue_with_reservation(
complex_repo_migration,
[repo],
args=task_args,
task_group=TaskGroup.current()
)
else:
# Serial (non-parallel)
for repo_name in pulp3_repo_setup:
task_args = [plugin, pulp3_repo_setup, repo_name]
complex_repo_migration(*task_args)
repo_versions = pulp3_repo_setup[repo_name]['repository_versions']
needs_a_task = False
for repo_ver in repo_versions:
repos = set(repo_ver['dist_repo_ids'] + [repo_ver['repo_id']])
# check whether any resources are not migrated and need a task
if repos.intersection(repos_ids_to_check):
needs_a_task = True
if needs_a_task:
task_args = [plugin, pulp3_repo_setup, repo_name]
complex_repo_migration(*task_args)

task_group = TaskGroup.current()
progress_rv = task_group.group_progress_reports.filter(code='create.repo_version')
Expand Down
9 changes: 9 additions & 0 deletions pulp_2to3_migration/app/models/base.py
Expand Up @@ -349,6 +349,15 @@ def _parse_plugin_plan(self, repository_data):
class RepoSetup(BaseModel):
"""
A model to reflect changes between previous and current migration plans.
Fields:
pulp2_repo_id (models.TextField): pulp2_repo_id to migrate into a repo_version
pulp2_repo_type (models.CharField): pulp2 repo type
pulp2_resource_repo_id (models.TextField): pulp2_repo_id of the resource
(importer/distributor) to migrate
pulp2_resource_type (models.SmallIntegerField): pulp2 resource type - importer/distributor
status (models.SmallIntegerField): status of the record
"""
OLD = 0
UP_TO_DATE = 1
Expand Down
17 changes: 16 additions & 1 deletion pulp_2to3_migration/tests/functional/test_migration_plan.py
@@ -1,11 +1,13 @@
import json
import time
import unittest

from pulpcore.client.pulp_2to3_migration.exceptions import ApiException
from pulp_2to3_migration.tests.functional.util import set_pulp2_snapshot
from pulp_2to3_migration.tests.functional.util import get_psql_smash_cmd, set_pulp2_snapshot
from pulp_smash.pulp3.bindings import monitor_task, monitor_task_group, PulpTaskError

from .common_plans import FILE_SIMPLE_PLAN, FILE_COMPLEX_PLAN
from .constants import TRUNCATE_TABLES_QUERY_BASH
from .file_base import BaseTestFile

EXTRA_COMMA_PLAN = '{"plugins": [{"type": "iso"},]}'
Expand Down Expand Up @@ -44,6 +46,14 @@ def setUpClass(cls):
super().setUpClass()
set_pulp2_snapshot(name='file_base_4repos')

def tearDown(self):
"""
Clean up the database after each test.
"""
cmd = get_psql_smash_cmd(TRUNCATE_TABLES_QUERY_BASH)
self.smash_cli_client.run(cmd, sudo=True)
time.sleep(0.5)

def _do_test_parallel(self, plan, outcome):
"""Test that there were multiple tasks running in parallel as a part of a task group."""
mp = self.migration_plans_api.create({'plan': plan})
Expand Down Expand Up @@ -105,3 +115,8 @@ def test_simple_plan_parallel(self):
def test_complex_plan_parallel(self):
"""Test that using a complex plan, there is work which is performed in parallel."""
self._do_test_parallel(FILE_COMPLEX_PLAN, 5)

def test_optimized_tasks_rerun(self):
"""Test that second run of the same plan with no changes at all triggers only one task"""
self._do_test_parallel(FILE_COMPLEX_PLAN, 5)
self._do_test_parallel(FILE_COMPLEX_PLAN, 1)

0 comments on commit 9df621c

Please sign in to comment.