Skip to content

Commit

Permalink
Adds RpmInterrelateContent stage
Browse files Browse the repository at this point in the history
  • Loading branch information
dkliban committed Apr 6, 2020
1 parent ea7a72c commit 7f75a3e
Show file tree
Hide file tree
Showing 2 changed files with 145 additions and 116 deletions.
1 change: 1 addition & 0 deletions CHANGES/5455.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added an RpmInterrelateContent stage to reduce the length of transaction in the RpmContentSaver stage.
260 changes: 144 additions & 116 deletions pulp_rpm/app/tasks/synchronizing.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from gettext import gettext as _ # noqa:F401
from urllib.parse import urljoin

from django.db import transaction

from aiohttp.client_exceptions import ClientResponseError
from aiohttp.web_exceptions import HTTPNotFound
import createrepo_c as cr
Expand Down Expand Up @@ -194,6 +196,7 @@ def pipeline_stages(self, new_version):
ArtifactSaver(),
QueryExistingContents(),
RpmContentSaver(),
RpmInterrelateContent(),
RemoteArtifactSaver(),
]
return pipeline
Expand Down Expand Up @@ -730,6 +733,147 @@ async def run(self):
await self.put(dc_group)


class RpmInterrelateContent(Stage):
"""
A stage that creates relationships between Packages and other related types.
This stage creates relationships Packages and Modulemd, PackageGroup, PackageCatagory and
PackageEnvironment models.
"""

async def run(self):
"""
Create all the relationships.
"""
async for batch in self.batches():
with transaction.atomic():
ModulemdPackages = Modulemd.packages.through
PackageGroupPackages = PackageGroup.related_packages.through
PackageCategoryGroups = PackageCategory.packagegroups.through
PackageEnvironmentGroups = PackageEnvironment.packagegroups.through
PackageEnvironmentOptionalGroups = PackageEnvironment.optionalgroups.through

modulemd_pkgs_to_save = []
group_pkgs_to_save = []
category_groups_to_save = []
env_groups_to_save = []
env_optgroups_to_save = []

for d_content in batch:
if d_content is None:
continue

if isinstance(d_content.content, Modulemd):
for pkg in d_content.extra_data['package_relation']:
if not pkg.content._state.adding:
module_package = ModulemdPackages(
package_id=pkg.content.pk,
modulemd_id=d_content.content.pk,
)
modulemd_pkgs_to_save.append(module_package)

elif isinstance(d_content.content, PackageCategory):
for grp in d_content.extra_data['packagegroups']:
if not grp.content._state.adding:
category_group = PackageCategoryGroups(
packagegroup_id=grp.content.pk,
packagecategory_id=d_content.content.pk,
)
category_groups_to_save.append(category_group)

elif isinstance(d_content.content, PackageEnvironment):
for grp in d_content.extra_data['packagegroups']:
if not grp.content._state.adding:
env_group = PackageEnvironmentGroups(
packagegroup_id=grp.content.pk,
packageenvironment_id=d_content.content.pk,
)
env_groups_to_save.append(env_group)

for opt in d_content.extra_data['optionalgroups']:
if not opt.content._state.adding:
env_optgroup = PackageEnvironmentOptionalGroups(
packagegroup_id=opt.content.pk,
packageenvironment_id=d_content.content.pk,
)
env_optgroups_to_save.append(env_optgroup)

elif isinstance(d_content.content, PackageGroup):
for pkg in d_content.extra_data['related_packages']:
if not pkg.content._state.adding:
group_pkg = PackageGroupPackages(
package_id=pkg.content.pk,
packagegroup_id=d_content.content.pk
)
group_pkgs_to_save.append(group_pkg)

for packagecategory in d_content.extra_data['category_relations']:
if not packagecategory.content._state.adding:
category_group = PackageCategoryGroups(
packagegroup_id=d_content.content.pk,
packagecategory_id=packagecategory.content.pk,
)
category_groups_to_save.append(category_group)

for packageenvironment in d_content.extra_data['environment_relations']:
if not packageenvironment.content._state.adding:
env_group = PackageEnvironmentGroups(
packagegroup_id=d_content.content.pk,
packageenvironment_id=packageenvironment.content.pk,
)
env_groups_to_save.append(env_group)

for packageenvironment in d_content.extra_data['env_relations_optional']:
if not packageenvironment.content._state.adding:
env_optgroup = PackageEnvironmentOptionalGroups(
packagegroup_id=d_content.content.pk,
packageenvironment_id=packageenvironment.content.pk,
)
env_optgroups_to_save.append(env_optgroup)

elif isinstance(d_content.content, Package):
for modulemd in d_content.extra_data['modulemd_relation']:
if not modulemd.content._state.adding:
module_package = ModulemdPackages(
package_id=d_content.content.pk,
modulemd_id=modulemd.content.pk,
)
modulemd_pkgs_to_save.append(module_package)

for packagegroup in d_content.extra_data['group_relations']:
if not packagegroup.content._state.adding:
group_pkg = PackageGroupPackages(
package_id=d_content.content.pk,
packagegroup_id=packagegroup.content.pk,
)
group_pkgs_to_save.append(group_pkg)

if modulemd_pkgs_to_save:
ModulemdPackages.objects.bulk_create(modulemd_pkgs_to_save,
ignore_conflicts=True)

if group_pkgs_to_save:
PackageGroupPackages.objects.bulk_create(group_pkgs_to_save,
ignore_conflicts=True)

if category_groups_to_save:
PackageCategoryGroups.objects.bulk_create(
category_groups_to_save, ignore_conflicts=True
)

if env_groups_to_save:
PackageEnvironmentGroups.objects.bulk_create(env_groups_to_save,
ignore_conflicts=True)

if env_optgroups_to_save:
PackageEnvironmentOptionalGroups.objects.bulk_create(
env_optgroups_to_save, ignore_conflicts=True
)

for declarative_content in batch:
await self.put(declarative_content)


class RpmContentSaver(ContentSaver):
"""
A modification of ContentSaver stage that additionally saves RPM plugin specific items.
Expand Down Expand Up @@ -798,28 +942,16 @@ def _handle_distribution_tree(declarative_content):
Variant.objects.bulk_create(variants)

UpdateRecordCollections = UpdateRecord.collections.through
ModulemdPackages = Modulemd.packages.through
PackageGroupPackages = PackageGroup.related_packages.through
PackageCategoryGroups = PackageCategory.packagegroups.through
PackageEnvironmentGroups = PackageEnvironment.packagegroups.through
PackageEnvironmentOptionalGroups = PackageEnvironment.optionalgroups.through

update_collection_to_save = []
update_record_collections_to_save = []
update_references_to_save = []
update_collection_packages_to_save = []
modulemd_pkgs_to_save = []
group_pkgs_to_save = []
category_groups_to_save = []
env_groups_to_save = []
env_optgroups_to_save = []

for declarative_content in batch:
if declarative_content is None:
continue

content_already_saved = not declarative_content.content._state.adding

if isinstance(declarative_content.content, DistributionTree):
_handle_distribution_tree(declarative_content)
continue
Expand Down Expand Up @@ -850,110 +982,6 @@ def _handle_distribution_tree(declarative_content):
update_reference.update_record = update_record
update_references_to_save.append(update_reference)

elif isinstance(declarative_content.content, Modulemd):
for pkg in declarative_content.extra_data['package_relation']:
if not pkg.content._state.adding and content_already_saved:
module_package = ModulemdPackages(
package_id=pkg.content.pk,
modulemd_id=declarative_content.content.pk,
)
modulemd_pkgs_to_save.append(module_package)

elif isinstance(declarative_content.content, PackageCategory):
for grp in declarative_content.extra_data['packagegroups']:
if not grp.content._state.adding and content_already_saved:
category_group = PackageCategoryGroups(
packagegroup_id=grp.content.pk,
packagecategory_id=declarative_content.content.pk,
)
category_groups_to_save.append(category_group)

elif isinstance(declarative_content.content, PackageEnvironment):
for grp in declarative_content.extra_data['packagegroups']:
if not grp.content._state.adding and content_already_saved:
env_group = PackageEnvironmentGroups(
packagegroup_id=grp.content.pk,
packageenvironment_id=declarative_content.content.pk,
)
env_groups_to_save.append(env_group)

for opt in declarative_content.extra_data['optionalgroups']:
if not opt.content._state.adding and content_already_saved:
env_optgroup = PackageEnvironmentOptionalGroups(
packagegroup_id=opt.content.pk,
packageenvironment_id=declarative_content.content.pk,
)
env_optgroups_to_save.append(env_optgroup)

elif isinstance(declarative_content.content, PackageGroup):
for pkg in declarative_content.extra_data['related_packages']:
if not pkg.content._state.adding and content_already_saved:
group_pkg = PackageGroupPackages(
package_id=pkg.content.pk,
packagegroup_id=declarative_content.content.pk
)
group_pkgs_to_save.append(group_pkg)

for packagecategory in declarative_content.extra_data['category_relations']:
if not packagecategory.content._state.adding and content_already_saved:
category_group = PackageCategoryGroups(
packagegroup_id=declarative_content.content.pk,
packagecategory_id=packagecategory.content.pk,
)
category_groups_to_save.append(category_group)

for packageenvironment in declarative_content.extra_data['environment_relations']:
if not packageenvironment.content._state.adding and content_already_saved:
env_group = PackageEnvironmentGroups(
packagegroup_id=declarative_content.content.pk,
packageenvironment_id=packageenvironment.content.pk,
)
env_groups_to_save.append(env_group)

for packageenvironment in declarative_content.extra_data['env_relations_optional']:
if not packageenvironment.content._state.adding and content_already_saved:
env_optgroup = PackageEnvironmentOptionalGroups(
packagegroup_id=declarative_content.content.pk,
packageenvironment_id=packageenvironment.content.pk,
)
env_optgroups_to_save.append(env_optgroup)

elif isinstance(declarative_content.content, Package):
for modulemd in declarative_content.extra_data['modulemd_relation']:
if not modulemd.content._state.adding and content_already_saved:
module_package = ModulemdPackages(
package_id=declarative_content.content.pk,
modulemd_id=modulemd.content.pk,
)
modulemd_pkgs_to_save.append(module_package)

for packagegroup in declarative_content.extra_data['group_relations']:
if not packagegroup.content._state.adding and content_already_saved:
group_pkg = PackageGroupPackages(
package_id=declarative_content.content.pk,
packagegroup_id=packagegroup.content.pk,
)
group_pkgs_to_save.append(group_pkg)

if modulemd_pkgs_to_save:
ModulemdPackages.objects.bulk_create(modulemd_pkgs_to_save, ignore_conflicts=True)

if group_pkgs_to_save:
PackageGroupPackages.objects.bulk_create(group_pkgs_to_save, ignore_conflicts=True)

if category_groups_to_save:
PackageCategoryGroups.objects.bulk_create(
category_groups_to_save, ignore_conflicts=True
)

if env_groups_to_save:
PackageEnvironmentGroups.objects.bulk_create(env_groups_to_save, ignore_conflicts=True)

if env_optgroups_to_save:
PackageEnvironmentOptionalGroups.objects.bulk_create(
env_optgroups_to_save, ignore_conflicts=True
)

if update_collection_to_save:
UpdateCollection.objects.bulk_create(update_collection_to_save)

Expand Down

0 comments on commit 7f75a3e

Please sign in to comment.