Fix large exports failing #5388

Merged · 1 commit · May 23, 2024
1 change: 1 addition & 0 deletions CHANGES/5375.bugfix
@@ -0,0 +1 @@
Added Pulp side batching to fix large exports that were failing due to changes in psycopg.
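For context, the batching this changelog entry refers to works by materializing the primary keys of the rows to export and re-querying them in fixed-size slices, so no single query or long-lived cursor has to cover the whole result set. A minimal, self-contained sketch of that idea, using a hypothetical `Thing` model and batch size rather than the actual pulpcore names:

```python
# Sketch of "Pulp side" batching: slice a list of primary keys and re-query
# each slice, instead of streaming one huge queryset in a single query.
# `Thing` and BATCH_SIZE are hypothetical stand-ins, not pulpcore identifiers.
from myapp.models import Thing  # hypothetical model

BATCH_SIZE = 1000


def iter_in_batches(queryset, batch_size=BATCH_SIZE):
    pks = list(queryset.values_list("pk", flat=True))  # one cheap pk-only query
    for offset in range(0, len(pks), batch_size):
        batch_pks = pks[offset : offset + batch_size]
        # Each batch is an independent, bounded pk__in query.
        yield queryset.model.objects.filter(pk__in=batch_pks)


for batch_qs in iter_in_batches(Thing.objects.all()):
    for obj in batch_qs.iterator():
        ...  # process obj
```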
68 changes: 37 additions & 31 deletions pulpcore/app/importexport.py
@@ -9,6 +9,7 @@
 from django.db.models.query import QuerySet

 from pulpcore.app.apps import get_plugin_config
+from pulpcore.app.models.content import Artifact
 from pulpcore.app.models.progress import ProgressReport
 from pulpcore.app.models.repository import Repository
 from pulpcore.app.modelresource import (
@@ -50,25 +51,21 @@ def _write_export(the_tarfile, resource, dest_dir=None):
temp_file.write("[")

def process_batch(batch):
dataset = resource.export(batch)
model = resource.queryset.model
queryset = model.objects.filter(pk__in=batch)
dataset = resource.export(queryset)
# Strip "[" and "]" as we are writing the dataset in batch
temp_file.write(dataset.json.lstrip("[").rstrip("]"))

batch = []
needs_comma = False
for item in resource.queryset.iterator(chunk_size=EXPORT_BATCH_SIZE):
batch.append(item)
if needs_comma:
# Write "," if not last loop
temp_file.write(", ")
needs_comma = False

if len(batch) >= EXPORT_BATCH_SIZE:
process_batch(batch)
batch.clear()
needs_comma = True
first_loop = True
resource_pks = resource.queryset.values_list("pk", flat=True)
for offset in range(0, len(resource_pks), EXPORT_BATCH_SIZE):
batch = resource_pks[offset : offset + EXPORT_BATCH_SIZE]

if batch:
if not first_loop:
temp_file.write(", ")
else:
first_loop = False
process_batch(batch)

temp_file.write("]")
@@ -102,39 +99,48 @@ def export_versions(export, version_info):
     export.tarfile.addfile(info, io.BytesIO(version_json))


-def export_artifacts(export, artifacts):
+def export_artifacts(export, artifact_pks):
     """
     Export a set of Artifacts, ArtifactResources, and RepositoryResources

     Args:
         export (django.db.models.PulpExport): export instance that's doing the export
-        artifacts (django.db.models.Artifacts): List of artifacts in all repos being exported
+        artifact_pks (django.db.models.Artifacts): List of artifact_pks in all repos being exported

     Raises:
         ValidationError: When path is not in the ALLOWED_EXPORT_PATHS setting
     """
-    data = dict(message="Exporting Artifacts", code="export.artifacts", total=len(artifacts))
+    data = dict(message="Exporting Artifacts", code="export.artifacts", total=len(artifact_pks))
     with ProgressReport(**data) as pb:
         pb.BATCH_INTERVAL = 5000

         if settings.DEFAULT_FILE_STORAGE != "pulpcore.app.models.storage.FileSystem":
             with tempfile.TemporaryDirectory(dir=".") as temp_dir:
-                for artifact in pb.iter(artifacts.only("file").iterator()):
-                    with tempfile.NamedTemporaryFile(dir=temp_dir) as temp_file:
-                        # TODO: this looks like a memory usage threat
-                        # TODO: it's also probably horrificaly slow, going one-by-one over the net
-                        # TODO: probably we could skip the temp file entirely and add
-                        # artifact.file.read() directly to the tarfile with tarfile.addfile()
-                        temp_file.write(artifact.file.read())
-                        temp_file.flush()
-                        artifact.file.close()
-                        export.tarfile.add(temp_file.name, artifact.file.name)
+                for offset in range(0, len(artifact_pks), EXPORT_BATCH_SIZE):
+                    batch = artifact_pks[offset : offset + EXPORT_BATCH_SIZE]
+                    batch_qs = Artifact.objects.filter(pk__in=batch).only("file")
+
+                    for artifact in pb.iter(batch_qs.iterator()):
+                        with tempfile.NamedTemporaryFile(dir=temp_dir) as temp_file:
+                            # TODO: this looks like a memory usage threat
+                            # TODO: it's probably very slow, going one-by-one over the net
+                            # TODO: probably we could skip the temp file entirely and add
+                            # artifact.file.read() directly to the tarfile with
+                            # tarfile.addfile()
+                            temp_file.write(artifact.file.read())
+                            temp_file.flush()
+                            artifact.file.close()
+                            export.tarfile.add(temp_file.name, artifact.file.name)
         else:
-            for artifact in pb.iter(artifacts.only("file").iterator()):
-                export.tarfile.add(artifact.file.path, artifact.file.name)
+            for offset in range(0, len(artifact_pks), EXPORT_BATCH_SIZE):
+                batch = artifact_pks[offset : offset + EXPORT_BATCH_SIZE]
+                batch_qs = Artifact.objects.filter(pk__in=batch).only("file")
+
+                for artifact in pb.iter(batch_qs.iterator()):
+                    export.tarfile.add(artifact.file.path, artifact.file.name)

     resource = ArtifactResource()
-    resource.queryset = artifacts
+    resource.queryset = Artifact.objects.filter(pk__in=artifact_pks)
     _write_export(export.tarfile, resource)

     resource = RepositoryResource()
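The TODO comments kept in the hunk above suggest that the temp-file copy could eventually be skipped by streaming each artifact's file object straight into the archive with `tarfile.addfile()`. A rough sketch of that idea; treating `artifact.file` as a readable file object with a known `artifact.size` is an assumption here, not code from this PR:

```python
# Hypothetical alternative to the NamedTemporaryFile round-trip hinted at by
# the TODOs: stream a file object into the tar archive without a disk copy.
import tarfile


def add_file_obj(tar: tarfile.TarFile, name: str, fileobj, size: int) -> None:
    info = tarfile.TarInfo(name=name)
    info.size = size            # addfile() reads exactly `size` bytes from fileobj
    tar.addfile(info, fileobj)  # copies in blocks, no temporary file needed


# e.g. add_file_obj(export.tarfile, artifact.file.name, artifact.file, artifact.size)
```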
4 changes: 2 additions & 2 deletions pulpcore/app/tasks/export.py
@@ -25,7 +25,7 @@
     RepositoryVersion,
     Task,
 )
-from pulpcore.app.models.content import Artifact, ContentArtifact
+from pulpcore.app.models.content import ContentArtifact
 from pulpcore.app.serializers import PulpExportSerializer

 from pulpcore.app.util import compute_file_hash, Crc32Hasher
@@ -509,7 +509,7 @@ def _do_export(pulp_exporter, tar, the_export):

     # Export the top-level entities (artifacts and repositories)
     # Note: we've already handled "what about incrementals" when building the 'artifacts' list
-    export_artifacts(the_export, Artifact.objects.filter(pk__in=artifact_pks))
+    export_artifacts(the_export, list(artifact_pks))
     del artifact_pks

     # Export the repository-version data, per-version