Commit 9d10803
Runs all DB queries synchronously during a sync task.
Required PR: pulp/pulpcore#1553

closes: #9253
dkliban authored and dralley committed Aug 21, 2021
1 parent 74db325 commit 9d10803
Showing 2 changed files with 47 additions and 47 deletions.
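The recurring pattern in this commit: async pipeline code never calls the Django ORM directly (which is what left sync tasks holding a lot of DB connections, per #9253); instead, all blocking queries are gathered into one plain function and dispatched through asgiref's sync_to_async. Below is a minimal, runnable sketch of that pattern, assuming only that asgiref is installed; save_batch and the sample list are illustrative stand-ins, not pulp_rpm code.

import asyncio

from asgiref.sync import sync_to_async


def save_batch(items):
    # Stand-in for the blocking ORM work (the real commit wraps
    # transaction.atomic() + bulk_create() this way). Everything that
    # touches the database stays inside this synchronous function.
    return len(items)


async def run(batch):
    # sync_to_async runs save_batch on a worker thread (by default the
    # same thread each call, so Django can keep reusing that thread's
    # single DB connection instead of opening new ones).
    saved = await sync_to_async(save_batch)(batch)
    print(f"saved {saved} items")


asyncio.run(run(["pkg-a", "pkg-b"]))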
1 change: 1 addition & 0 deletions CHANGES/9253.bugfix
@@ -0,0 +1 @@
+Fixed bug where sync tasks would open a lot of DB connections.
93 changes: 46 additions & 47 deletions pulp_rpm/app/tasks/synchronizing.py
@@ -9,6 +9,7 @@
 from collections import defaultdict
 from gettext import gettext as _  # noqa:F401

+from asgiref.sync import sync_to_async
 from django.conf import settings
 from django.core.exceptions import ObjectDoesNotExist
 from django.core.files import File
@@ -352,11 +353,6 @@ def get_treeinfo_data(remote, remote_url):
     sha256 = result.artifact_attributes["sha256"]
     treeinfo_data = TreeinfoData(treeinfo.parsed_sections())

-    # import pydevd_pycharm
-    # pydevd_pycharm.settrace(
-    #     "localhost", port=12735, stdoutToServer=True, stderrToServer=True
-    # )
-
     # get the data we need before changing the original
     treeinfo_serialized = treeinfo_data.to_dict(hash=sha256, filename=namespace)

@@ -617,14 +613,14 @@ async def run(self):
         progress_data = dict(
             message="Downloading Metadata Files", code="sync.downloading.metadata"
         )
-        with ProgressReport(**progress_data) as metadata_pb:
+        async with ProgressReport(**progress_data) as metadata_pb:
             # download repomd.xml
             downloader = self.remote.get_downloader(
                 url=urlpath_sanitize(self.remote_url, "repodata/repomd.xml")
             )
             result = await downloader.run()
             store_metadata_for_mirroring(self.repository, result.path, "repodata/repomd.xml")
-            metadata_pb.increment()
+            await metadata_pb.aincrement()

             repomd_path = result.path
             repomd = cr.Repomd(repomd_path)
@@ -675,7 +671,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                     name, location_href, result = await future
                     store_metadata_for_mirroring(self.repository, result.path, location_href)
                     repomd_files[name] = result
-                    metadata_pb.increment()
+                    await metadata_pb.aincrement()
             except ClientResponseError as exc:
                 raise HTTPNotFound(reason=_("File not found: {}".format(exc.request_info.url)))
             except FileNotFoundError:
@@ -690,7 +686,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                 )
                 result = await downloader.run()
                 store_metadata_for_mirroring(self.repository, result.path, file_href)
-                metadata_pb.increment()
+                await metadata_pb.aincrement()
             except (ClientResponseError, FileNotFoundError):
                 pass

@@ -703,7 +699,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                     store_metadata_for_mirroring(
                         self.repository, result.path, "extra_files.json"
                     )
-                    metadata_pb.increment()
+                    await metadata_pb.aincrement()
                 except (ClientResponseError, FileNotFoundError):
                     pass
             else:
@@ -725,7 +721,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                             store_metadata_for_mirroring(
                                 self.repository, result.path, data["file"]
                             )
-                            metadata_pb.increment()
+                            await metadata_pb.aincrement()
                         except ClientResponseError as exc:
                             raise HTTPNotFound(
                                 reason=_("File not found: {}".format(exc.request_info.url))
@@ -855,7 +851,7 @@ async def parse_modules_metadata(self, modulemd_result):
         # Parsing modules happens all at one time, and from here on no useful work happens.
         # So just report that it finished this stage.
         modulemd_pb_data = {"message": "Parsed Modulemd", "code": "sync.parsing.modulemds"}
-        with ProgressReport(**modulemd_pb_data) as modulemd_pb:
+        async with ProgressReport(**modulemd_pb_data) as modulemd_pb:
             modulemd_total = len(modulemd_all)
             modulemd_pb.total = modulemd_total
             modulemd_pb.done = modulemd_total
@@ -890,7 +886,7 @@ async def parse_modules_metadata(self, modulemd_result):
             "message": "Parsed Modulemd-defaults",
             "code": "sync.parsing.modulemd_defaults",
         }
-        with ProgressReport(**modulemd_defaults_pb_data) as modulemd_defaults_pb:
+        async with ProgressReport(**modulemd_defaults_pb_data) as modulemd_defaults_pb:
             modulemd_defaults_total = len(modulemd_default_names)
             modulemd_defaults_pb.total = modulemd_defaults_total
             modulemd_defaults_pb.done = modulemd_defaults_total
@@ -930,7 +926,7 @@ async def parse_packages_components(self, comps_result):
         comps = libcomps.Comps()
         comps.fromxml_f(comps_result.path)

-        with ProgressReport(message="Parsed Comps", code="sync.parsing.comps") as comps_pb:
+        async with ProgressReport(message="Parsed Comps", code="sync.parsing.comps") as comps_pb:
             comps_total = len(comps.groups) + len(comps.categories) + len(comps.environments)
             comps_pb.total = comps_total
             comps_pb.done = comps_total
@@ -1039,7 +1035,7 @@ async def parse_packages(self, primary_xml, filelists_xml, other_xml, file_extension):
             file_extension, filelists_xml.path, other_xml.path
         )
         seen_pkgids = set()
-        with ProgressReport(**progress_data) as packages_pb:
+        async with ProgressReport(**progress_data) as packages_pb:
             while True:
                 try:
                     (pkgid, pkg) = packages.popitem(last=False)
@@ -1100,7 +1096,7 @@ async def parse_packages(self, primary_xml, filelists_xml, other_xml, file_extension):
                     dc.extra_data["group_relations"].append(dc_group)
                     dc_group.extra_data["related_packages"].append(dc)

-                packages_pb.increment()
+                await packages_pb.aincrement()
                 await self.put(dc)

     async def parse_advisories(self, result):
@@ -1113,7 +1109,7 @@ async def parse_advisories(self, result):
             "code": "sync.parsing.advisories",
             "total": len(updates),
         }
-        with ProgressReport(**progress_data) as advisories_pb:
+        async with ProgressReport(**progress_data) as advisories_pb:
             for update in updates:
                 update_record = UpdateRecord(**UpdateRecord.createrepo_to_dict(update))
                 update_record.digest = hash_update_record(update)
@@ -1133,7 +1129,7 @@ async def parse_advisories(self, result):
                     ref = UpdateReference(**reference_dict)
                     future_relations["references"].append(ref)

-                advisories_pb.increment()
+                await advisories_pb.aincrement()
                 dc = DeclarativeContent(content=update_record)
                 dc.extra_data = future_relations
                 await self.put(dc)
@@ -1152,38 +1148,41 @@ async def run(self):
         Create all the relationships.
         """
         async for batch in self.batches():
-            with transaction.atomic():
-                ModulemdPackages = Modulemd.packages.through
-
-                modulemd_pkgs_to_save = []
-
-                for d_content in batch:
-                    if d_content is None:
-                        continue
-
-                    if isinstance(d_content.content, Modulemd):
-                        for pkg in d_content.extra_data["package_relation"]:
-                            if not pkg.content._state.adding:
-                                module_package = ModulemdPackages(
-                                    package_id=pkg.content.pk,
-                                    modulemd_id=d_content.content.pk,
-                                )
-                                modulemd_pkgs_to_save.append(module_package)
-
-                    elif isinstance(d_content.content, Package):
-                        for modulemd in d_content.extra_data["modulemd_relation"]:
-                            if not modulemd.content._state.adding:
-                                module_package = ModulemdPackages(
-                                    package_id=d_content.content.pk,
-                                    modulemd_id=modulemd.content.pk,
-                                )
-                                modulemd_pkgs_to_save.append(module_package)
-
-                if modulemd_pkgs_to_save:
-                    ModulemdPackages.objects.bulk_create(
-                        modulemd_pkgs_to_save, ignore_conflicts=True
-                    )
+            def process_batch():
+                with transaction.atomic():
+                    ModulemdPackages = Modulemd.packages.through
+
+                    modulemd_pkgs_to_save = []
+
+                    for d_content in batch:
+                        if d_content is None:
+                            continue
+
+                        if isinstance(d_content.content, Modulemd):
+                            for pkg in d_content.extra_data["package_relation"]:
+                                if not pkg.content._state.adding:
+                                    module_package = ModulemdPackages(
+                                        package_id=pkg.content.pk,
+                                        modulemd_id=d_content.content.pk,
+                                    )
+                                    modulemd_pkgs_to_save.append(module_package)
+
+                        elif isinstance(d_content.content, Package):
+                            for modulemd in d_content.extra_data["modulemd_relation"]:
+                                if not modulemd.content._state.adding:
+                                    module_package = ModulemdPackages(
+                                        package_id=d_content.content.pk,
+                                        modulemd_id=modulemd.content.pk,
+                                    )
+                                    modulemd_pkgs_to_save.append(module_package)
+
+                    if modulemd_pkgs_to_save:
+                        ModulemdPackages.objects.bulk_create(
+                            modulemd_pkgs_to_save, ignore_conflicts=True
+                        )
+
+            await sync_to_async(process_batch)()
             for declarative_content in batch:
                 await self.put(declarative_content)

@@ -1196,7 +1195,7 @@ class RpmContentSaver(ContentSaver):
     the UpdateRecord content unit.
     """

-    async def _post_save(self, batch):
+    def _post_save(self, batch):
         """
         Save a batch of UpdateCollection, UpdateCollectionPackage, UpdateReference objects.
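Two companion changes repeat through the hunks above: every with ProgressReport(...) becomes async with, with increment() replaced by await ...aincrement() (the awaitable API this commit requires from pulp/pulpcore#1553), and RpmContentSaver._post_save drops its async qualifier, presumably because pulpcore now invokes it from the synchronous side of a sync_to_async boundary. A small runnable sketch of the async context-manager shape, using a hypothetical AsyncProgress class since the real ProgressReport needs a running pulpcore task:

import asyncio


class AsyncProgress:
    # Hypothetical stand-in for pulpcore's ProgressReport: an async
    # context manager whose updates are awaitable, so persisting the
    # counter can happen off the event loop.
    def __init__(self, message):
        self.message = message
        self.done = 0

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        print(f"{self.message}: {self.done} done")

    async def aincrement(self):
        # The real aincrement() would persist the new count to the DB;
        # here it only bumps the in-memory counter.
        self.done += 1


async def main():
    async with AsyncProgress("Downloading Metadata Files") as pb:
        for _ in range(3):
            await pb.aincrement()


asyncio.run(main())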
