From db4459318398db74a64a76e98a15443bd2ea2e64 Mon Sep 17 00:00:00 2001
From: Dennis Kliban
Date: Wed, 18 Aug 2021 12:51:20 -0400
Subject: [PATCH] Runs all DB queries synchronously during a sync task.

Required PR: https://github.com/pulp/pulpcore/pull/1553

closes: #9253
---
 CHANGES/9253.bugfix                 |  1 +
 pulp_rpm/app/tasks/synchronizing.py | 93 ++++++++++++++---------------
 2 files changed, 47 insertions(+), 47 deletions(-)
 create mode 100644 CHANGES/9253.bugfix

diff --git a/CHANGES/9253.bugfix b/CHANGES/9253.bugfix
new file mode 100644
index 000000000..c1fcc54cc
--- /dev/null
+++ b/CHANGES/9253.bugfix
@@ -0,0 +1 @@
+Fixed bug where sync tasks would open a lot of DB connections.
diff --git a/pulp_rpm/app/tasks/synchronizing.py b/pulp_rpm/app/tasks/synchronizing.py
index e094bbb3a..228854cdf 100644
--- a/pulp_rpm/app/tasks/synchronizing.py
+++ b/pulp_rpm/app/tasks/synchronizing.py
@@ -9,6 +9,7 @@
 from collections import defaultdict
 from gettext import gettext as _  # noqa:F401
 
+from asgiref.sync import sync_to_async
 from django.conf import settings
 from django.core.exceptions import ObjectDoesNotExist
 from django.core.files import File
@@ -352,11 +353,6 @@ def get_treeinfo_data(remote, remote_url):
             sha256 = result.artifact_attributes["sha256"]
             treeinfo_data = TreeinfoData(treeinfo.parsed_sections())
 
-            # import pydevd_pycharm
-            # pydevd_pycharm.settrace(
-            #     "localhost", port=12735, stdoutToServer=True, stderrToServer=True
-            # )
-
             # get the data we need before changing the original
             treeinfo_serialized = treeinfo_data.to_dict(hash=sha256, filename=namespace)
 
@@ -617,14 +613,14 @@ async def run(self):
         progress_data = dict(
             message="Downloading Metadata Files", code="sync.downloading.metadata"
         )
-        with ProgressReport(**progress_data) as metadata_pb:
+        async with ProgressReport(**progress_data) as metadata_pb:
             # download repomd.xml
             downloader = self.remote.get_downloader(
                 url=urlpath_sanitize(self.remote_url, "repodata/repomd.xml")
             )
             result = await downloader.run()
             store_metadata_for_mirroring(self.repository, result.path, "repodata/repomd.xml")
-            metadata_pb.increment()
+            await metadata_pb.aincrement()
 
             repomd_path = result.path
             repomd = cr.Repomd(repomd_path)
@@ -675,7 +671,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                     name, location_href, result = await future
                     store_metadata_for_mirroring(self.repository, result.path, location_href)
                     repomd_files[name] = result
-                    metadata_pb.increment()
+                    await metadata_pb.aincrement()
             except ClientResponseError as exc:
                 raise HTTPNotFound(reason=_("File not found: {}".format(exc.request_info.url)))
             except FileNotFoundError:
@@ -690,7 +686,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                     )
                     result = await downloader.run()
                     store_metadata_for_mirroring(self.repository, result.path, file_href)
-                    metadata_pb.increment()
+                    await metadata_pb.aincrement()
                 except (ClientResponseError, FileNotFoundError):
                     pass
 
@@ -703,7 +699,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                         store_metadata_for_mirroring(
                             self.repository, result.path, "extra_files.json"
                         )
-                        metadata_pb.increment()
+                        await metadata_pb.aincrement()
                     except (ClientResponseError, FileNotFoundError):
                         pass
                 else:
@@ -725,7 +721,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                                 store_metadata_for_mirroring(
                                     self.repository, result.path, data["file"]
                                 )
-                                metadata_pb.increment()
+                                await metadata_pb.aincrement()
                             except ClientResponseError as exc:
                                 raise HTTPNotFound(
{}".format(exc.request_info.url)) @@ -855,7 +851,7 @@ async def parse_modules_metadata(self, modulemd_result): # Parsing modules happens all at one time, and from here on no useful work happens. # So just report that it finished this stage. modulemd_pb_data = {"message": "Parsed Modulemd", "code": "sync.parsing.modulemds"} - with ProgressReport(**modulemd_pb_data) as modulemd_pb: + async with ProgressReport(**modulemd_pb_data) as modulemd_pb: modulemd_total = len(modulemd_all) modulemd_pb.total = modulemd_total modulemd_pb.done = modulemd_total @@ -890,7 +886,7 @@ async def parse_modules_metadata(self, modulemd_result): "message": "Parsed Modulemd-defaults", "code": "sync.parsing.modulemd_defaults", } - with ProgressReport(**modulemd_defaults_pb_data) as modulemd_defaults_pb: + async with ProgressReport(**modulemd_defaults_pb_data) as modulemd_defaults_pb: modulemd_defaults_total = len(modulemd_default_names) modulemd_defaults_pb.total = modulemd_defaults_total modulemd_defaults_pb.done = modulemd_defaults_total @@ -930,7 +926,7 @@ async def parse_packages_components(self, comps_result): comps = libcomps.Comps() comps.fromxml_f(comps_result.path) - with ProgressReport(message="Parsed Comps", code="sync.parsing.comps") as comps_pb: + async with ProgressReport(message="Parsed Comps", code="sync.parsing.comps") as comps_pb: comps_total = len(comps.groups) + len(comps.categories) + len(comps.environments) comps_pb.total = comps_total comps_pb.done = comps_total @@ -1039,7 +1035,7 @@ async def parse_packages(self, primary_xml, filelists_xml, other_xml, file_exten file_extension, filelists_xml.path, other_xml.path ) seen_pkgids = set() - with ProgressReport(**progress_data) as packages_pb: + async with ProgressReport(**progress_data) as packages_pb: while True: try: (pkgid, pkg) = packages.popitem(last=False) @@ -1100,7 +1096,7 @@ async def parse_packages(self, primary_xml, filelists_xml, other_xml, file_exten dc.extra_data["group_relations"].append(dc_group) dc_group.extra_data["related_packages"].append(dc) - packages_pb.increment() + await packages_pb.aincrement() await self.put(dc) async def parse_advisories(self, result): @@ -1113,7 +1109,7 @@ async def parse_advisories(self, result): "code": "sync.parsing.advisories", "total": len(updates), } - with ProgressReport(**progress_data) as advisories_pb: + async with ProgressReport(**progress_data) as advisories_pb: for update in updates: update_record = UpdateRecord(**UpdateRecord.createrepo_to_dict(update)) update_record.digest = hash_update_record(update) @@ -1133,7 +1129,7 @@ async def parse_advisories(self, result): ref = UpdateReference(**reference_dict) future_relations["references"].append(ref) - advisories_pb.increment() + await advisories_pb.aincrement() dc = DeclarativeContent(content=update_record) dc.extra_data = future_relations await self.put(dc) @@ -1152,38 +1148,41 @@ async def run(self): Create all the relationships. 
""" async for batch in self.batches(): - with transaction.atomic(): - ModulemdPackages = Modulemd.packages.through - modulemd_pkgs_to_save = [] + def process_batch(): + with transaction.atomic(): + ModulemdPackages = Modulemd.packages.through - for d_content in batch: - if d_content is None: - continue + modulemd_pkgs_to_save = [] - if isinstance(d_content.content, Modulemd): - for pkg in d_content.extra_data["package_relation"]: - if not pkg.content._state.adding: - module_package = ModulemdPackages( - package_id=pkg.content.pk, - modulemd_id=d_content.content.pk, - ) - modulemd_pkgs_to_save.append(module_package) - - elif isinstance(d_content.content, Package): - for modulemd in d_content.extra_data["modulemd_relation"]: - if not modulemd.content._state.adding: - module_package = ModulemdPackages( - package_id=d_content.content.pk, - modulemd_id=modulemd.content.pk, - ) - modulemd_pkgs_to_save.append(module_package) - - if modulemd_pkgs_to_save: - ModulemdPackages.objects.bulk_create( - modulemd_pkgs_to_save, ignore_conflicts=True - ) + for d_content in batch: + if d_content is None: + continue + + if isinstance(d_content.content, Modulemd): + for pkg in d_content.extra_data["package_relation"]: + if not pkg.content._state.adding: + module_package = ModulemdPackages( + package_id=pkg.content.pk, + modulemd_id=d_content.content.pk, + ) + modulemd_pkgs_to_save.append(module_package) + + elif isinstance(d_content.content, Package): + for modulemd in d_content.extra_data["modulemd_relation"]: + if not modulemd.content._state.adding: + module_package = ModulemdPackages( + package_id=d_content.content.pk, + modulemd_id=modulemd.content.pk, + ) + modulemd_pkgs_to_save.append(module_package) + + if modulemd_pkgs_to_save: + ModulemdPackages.objects.bulk_create( + modulemd_pkgs_to_save, ignore_conflicts=True + ) + await sync_to_async(process_batch)() for declarative_content in batch: await self.put(declarative_content) @@ -1196,7 +1195,7 @@ class RpmContentSaver(ContentSaver): the UpdateRecord content unit. """ - async def _post_save(self, batch): + def _post_save(self, batch): """ Save a batch of UpdateCollection, UpdateCollectionPackage, UpdateReference objects.