Runs all DB queries synchronously during a sync task. #2084

Merged · 1 commit · Aug 21, 2021
CHANGES/9253.bugfix (1 addition, 0 deletions)

@@ -0,0 +1 @@
+Fixed bug where sync tasks would open a lot of DB connections.
pulp_rpm/app/tasks/synchronizing.py (46 additions, 47 deletions)

@@ -9,6 +9,7 @@
 from collections import defaultdict
 from gettext import gettext as _  # noqa:F401

+from asgiref.sync import sync_to_async
 from django.conf import settings
 from django.core.exceptions import ObjectDoesNotExist
 from django.core.files import File
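For context, `sync_to_async` from asgiref wraps a blocking callable so it can be awaited from a coroutine; wrapped calls are thread-sensitive by default, so they all run on one shared worker thread and therefore reuse a single Django DB connection instead of opening a fresh one per query. A minimal sketch of the pattern (the `Package` import path here is an assumption for illustration, not part of this diff):

```python
from asgiref.sync import sync_to_async

from pulp_rpm.app.models import Package  # assumed import path, for illustration


def count_packages():
    # Plain sync function: ORM calls are allowed here. Because sync_to_async
    # is thread-sensitive by default, every wrapped call lands on the same
    # worker thread and reuses one DB connection.
    return Package.objects.count()


async def report_progress():
    total = await sync_to_async(count_packages)()
    print(f"{total} packages in the database")
```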
@@ -352,11 +353,6 @@ def get_treeinfo_data(remote, remote_url):
     sha256 = result.artifact_attributes["sha256"]
     treeinfo_data = TreeinfoData(treeinfo.parsed_sections())

-    # import pydevd_pycharm
-    # pydevd_pycharm.settrace(
-    #     "localhost", port=12735, stdoutToServer=True, stderrToServer=True
-    # )
-
     # get the data we need before changing the original
     treeinfo_serialized = treeinfo_data.to_dict(hash=sha256, filename=namespace)

> Contributor commented on the removed debugger lines: lol
@@ -617,14 +613,14 @@ async def run(self):
         progress_data = dict(
             message="Downloading Metadata Files", code="sync.downloading.metadata"
         )
-        with ProgressReport(**progress_data) as metadata_pb:
+        async with ProgressReport(**progress_data) as metadata_pb:
             # download repomd.xml
             downloader = self.remote.get_downloader(
                 url=urlpath_sanitize(self.remote_url, "repodata/repomd.xml")
             )
             result = await downloader.run()
             store_metadata_for_mirroring(self.repository, result.path, "repodata/repomd.xml")
-            metadata_pb.increment()
+            await metadata_pb.aincrement()

             repomd_path = result.path
             repomd = cr.Repomd(repomd_path)
@@ -675,7 +671,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                     name, location_href, result = await future
                     store_metadata_for_mirroring(self.repository, result.path, location_href)
                     repomd_files[name] = result
-                    metadata_pb.increment()
+                    await metadata_pb.aincrement()
                 except ClientResponseError as exc:
                     raise HTTPNotFound(reason=_("File not found: {}".format(exc.request_info.url)))
                 except FileNotFoundError:
@@ -690,7 +686,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                     )
                     result = await downloader.run()
                     store_metadata_for_mirroring(self.repository, result.path, file_href)
-                    metadata_pb.increment()
+                    await metadata_pb.aincrement()
                 except (ClientResponseError, FileNotFoundError):
                     pass

@@ -703,7 +699,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                         store_metadata_for_mirroring(
                             self.repository, result.path, "extra_files.json"
                         )
-                        metadata_pb.increment()
+                        await metadata_pb.aincrement()
                     except (ClientResponseError, FileNotFoundError):
                         pass
                 else:
@@ -725,7 +721,7 @@ async def run_repomdrecord_download(name, location_href, downloader):
                                 store_metadata_for_mirroring(
                                     self.repository, result.path, data["file"]
                                 )
-                                metadata_pb.increment()
+                                await metadata_pb.aincrement()
                             except ClientResponseError as exc:
                                 raise HTTPNotFound(
                                     reason=_("File not found: {}".format(exc.request_info.url))
@@ -855,7 +851,7 @@ async def parse_modules_metadata(self, modulemd_result):
         # Parsing modules happens all at one time, and from here on no useful work happens.
         # So just report that it finished this stage.
         modulemd_pb_data = {"message": "Parsed Modulemd", "code": "sync.parsing.modulemds"}
-        with ProgressReport(**modulemd_pb_data) as modulemd_pb:
+        async with ProgressReport(**modulemd_pb_data) as modulemd_pb:
             modulemd_total = len(modulemd_all)
             modulemd_pb.total = modulemd_total
             modulemd_pb.done = modulemd_total
@@ -890,7 +886,7 @@ async def parse_modules_metadata(self, modulemd_result):
             "message": "Parsed Modulemd-defaults",
             "code": "sync.parsing.modulemd_defaults",
         }
-        with ProgressReport(**modulemd_defaults_pb_data) as modulemd_defaults_pb:
+        async with ProgressReport(**modulemd_defaults_pb_data) as modulemd_defaults_pb:
             modulemd_defaults_total = len(modulemd_default_names)
             modulemd_defaults_pb.total = modulemd_defaults_total
             modulemd_defaults_pb.done = modulemd_defaults_total
@@ -930,7 +926,7 @@ async def parse_packages_components(self, comps_result):
         comps = libcomps.Comps()
         comps.fromxml_f(comps_result.path)

-        with ProgressReport(message="Parsed Comps", code="sync.parsing.comps") as comps_pb:
+        async with ProgressReport(message="Parsed Comps", code="sync.parsing.comps") as comps_pb:
             comps_total = len(comps.groups) + len(comps.categories) + len(comps.environments)
             comps_pb.total = comps_total
             comps_pb.done = comps_total
@@ -1039,7 +1035,7 @@ async def parse_packages(self, primary_xml, filelists_xml, other_xml, file_exten
             file_extension, filelists_xml.path, other_xml.path
         )
         seen_pkgids = set()
-        with ProgressReport(**progress_data) as packages_pb:
+        async with ProgressReport(**progress_data) as packages_pb:
             while True:
                 try:
                     (pkgid, pkg) = packages.popitem(last=False)
@@ -1100,7 +1096,7 @@ async def parse_packages(self, primary_xml, filelists_xml, other_xml, file_exten
                     dc.extra_data["group_relations"].append(dc_group)
                     dc_group.extra_data["related_packages"].append(dc)

-                packages_pb.increment()
+                await packages_pb.aincrement()
                 await self.put(dc)

     async def parse_advisories(self, result):
@@ -1113,7 +1109,7 @@ async def parse_advisories(self, result):
             "code": "sync.parsing.advisories",
             "total": len(updates),
         }
-        with ProgressReport(**progress_data) as advisories_pb:
+        async with ProgressReport(**progress_data) as advisories_pb:
             for update in updates:
                 update_record = UpdateRecord(**UpdateRecord.createrepo_to_dict(update))
                 update_record.digest = hash_update_record(update)
@@ -1133,7 +1129,7 @@ async def parse_advisories(self, result):
                     ref = UpdateReference(**reference_dict)
                     future_relations["references"].append(ref)

-                advisories_pb.increment()
+                await advisories_pb.aincrement()
                 dc = DeclarativeContent(content=update_record)
                 dc.extra_data = future_relations
                 await self.put(dc)
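All of the `ProgressReport` changes above follow one pattern: enter the report with `async with` instead of `with`, and bump it with `await ...aincrement()` instead of `increment()`, so the saves behind the report go through the same single-threaded sync-to-async path as everything else. A sketch of the usage, assuming pulpcore's `ProgressReport` exposes the async interface this diff relies on:

```python
from pulpcore.plugin.models import ProgressReport


async def download_metadata(downloaders):
    data = dict(message="Downloading Metadata Files", code="sync.downloading.metadata")
    async with ProgressReport(**data) as pb:  # async context manager saves the report
        for downloader in downloaders:
            await downloader.run()
            await pb.aincrement()  # async counterpart of increment()
```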
@@ -1152,38 +1148,41 @@ async def run(self):
         Create all the relationships.
         """
         async for batch in self.batches():
-            with transaction.atomic():
-                ModulemdPackages = Modulemd.packages.through
-
-                modulemd_pkgs_to_save = []
-
-                for d_content in batch:
-                    if d_content is None:
-                        continue
-
-                    if isinstance(d_content.content, Modulemd):
-                        for pkg in d_content.extra_data["package_relation"]:
-                            if not pkg.content._state.adding:
-                                module_package = ModulemdPackages(
-                                    package_id=pkg.content.pk,
-                                    modulemd_id=d_content.content.pk,
-                                )
-                                modulemd_pkgs_to_save.append(module_package)
-
-                    elif isinstance(d_content.content, Package):
-                        for modulemd in d_content.extra_data["modulemd_relation"]:
-                            if not modulemd.content._state.adding:
-                                module_package = ModulemdPackages(
-                                    package_id=d_content.content.pk,
-                                    modulemd_id=modulemd.content.pk,
-                                )
-                                modulemd_pkgs_to_save.append(module_package)
-
-                if modulemd_pkgs_to_save:
-                    ModulemdPackages.objects.bulk_create(
-                        modulemd_pkgs_to_save, ignore_conflicts=True
-                    )
+            def process_batch():
+                with transaction.atomic():
+                    ModulemdPackages = Modulemd.packages.through
+
+                    modulemd_pkgs_to_save = []
+
+                    for d_content in batch:
+                        if d_content is None:
+                            continue
+
+                        if isinstance(d_content.content, Modulemd):
+                            for pkg in d_content.extra_data["package_relation"]:
+                                if not pkg.content._state.adding:
+                                    module_package = ModulemdPackages(
+                                        package_id=pkg.content.pk,
+                                        modulemd_id=d_content.content.pk,
+                                    )
+                                    modulemd_pkgs_to_save.append(module_package)
+
+                        elif isinstance(d_content.content, Package):
+                            for modulemd in d_content.extra_data["modulemd_relation"]:
+                                if not modulemd.content._state.adding:
+                                    module_package = ModulemdPackages(
+                                        package_id=d_content.content.pk,
+                                        modulemd_id=modulemd.content.pk,
+                                    )
+                                    modulemd_pkgs_to_save.append(module_package)
+
+                    if modulemd_pkgs_to_save:
+                        ModulemdPackages.objects.bulk_create(
+                            modulemd_pkgs_to_save, ignore_conflicts=True
+                        )
+
+            await sync_to_async(process_batch)()
             for declarative_content in batch:
                 await self.put(declarative_content)
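The key move in this hunk is that the whole `transaction.atomic()` block becomes a local sync closure, and the stage awaits `sync_to_async(process_batch)()` once per batch. The transaction is then opened, used, and committed on one worker thread with one connection; awaiting each ORM call individually would also work, but would cost a thread hop per query. Stripped to its essentials, as a hypothetical reduction over a generic through-model:

```python
from asgiref.sync import sync_to_async
from django.db import transaction


async def save_relations(batch, ThroughModel):
    """Hypothetical reduction of the pattern used by the stage above."""

    def process_batch():
        # Everything in this closure runs on one worker thread, so the
        # transaction and all queries inside it share one DB connection.
        with transaction.atomic():
            rows = [ThroughModel(**relation) for relation in batch]
            ThroughModel.objects.bulk_create(rows, ignore_conflicts=True)

    await sync_to_async(process_batch)()
```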

@@ -1196,7 +1195,7 @@ class RpmContentSaver(ContentSaver):
     the UpdateRecord content unit.
     """

-    async def _post_save(self, batch):
+    def _post_save(self, batch):
         """
         Save a batch of UpdateCollection, UpdateCollectionPackage, UpdateReference objects.

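Dropping `async` from `_post_save` fits the same strategy: the hook becomes a plain sync method that can use the ORM freely, with the caller responsible for dispatching it through `sync_to_async` in a single hop. A hypothetical sketch of such a call site (the real one lives in pulpcore's `ContentSaver` base class, which is not part of this diff):

```python
from asgiref.sync import sync_to_async


class ExampleSaver:
    def _post_save(self, batch):
        # Sync hook: direct ORM access is fine here, since the caller runs
        # this method on asgiref's shared worker thread.
        ...

    async def handle_batch(self, batch):
        # One thread hop for the whole post-save step, not one per query.
        await sync_to_async(self._post_save)(batch)
```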