diff --git a/CHANGES/5375.bugfix b/CHANGES/5375.bugfix
new file mode 100644
index 0000000000..6d5a74d931
--- /dev/null
+++ b/CHANGES/5375.bugfix
@@ -0,0 +1 @@
+Added Pulp side batching to fix large exports that were failing due to changes in psycopg.
diff --git a/pulpcore/app/importexport.py b/pulpcore/app/importexport.py
index 35deeefd3e..3dab3aa6ee 100644
--- a/pulpcore/app/importexport.py
+++ b/pulpcore/app/importexport.py
@@ -9,6 +9,7 @@
 from django.db.models.query import QuerySet
 
 from pulpcore.app.apps import get_plugin_config
+from pulpcore.app.models.content import Artifact
 from pulpcore.app.models.progress import ProgressReport
 from pulpcore.app.models.repository import Repository
 from pulpcore.app.modelresource import (
@@ -50,25 +51,21 @@ def _write_export(the_tarfile, resource, dest_dir=None):
         temp_file.write("[")
 
         def process_batch(batch):
-            dataset = resource.export(batch)
+            model = resource.queryset.model
+            queryset = model.objects.filter(pk__in=batch)
+            dataset = resource.export(queryset)
             # Strip "[" and "]" as we are writing the dataset in batch
             temp_file.write(dataset.json.lstrip("[").rstrip("]"))
 
-        batch = []
-        needs_comma = False
-        for item in resource.queryset.iterator(chunk_size=EXPORT_BATCH_SIZE):
-            batch.append(item)
-            if needs_comma:
-                # Write "," if not last loop
-                temp_file.write(", ")
-                needs_comma = False
-
-            if len(batch) >= EXPORT_BATCH_SIZE:
-                process_batch(batch)
-                batch.clear()
-                needs_comma = True
+        first_loop = True
+        resource_pks = resource.queryset.values_list("pk", flat=True)
+        for offset in range(0, len(resource_pks), EXPORT_BATCH_SIZE):
+            batch = resource_pks[offset : offset + EXPORT_BATCH_SIZE]
 
-        if batch:
+            if not first_loop:
+                temp_file.write(", ")
+            else:
+                first_loop = False
             process_batch(batch)
 
         temp_file.write("]")
@@ -102,39 +99,48 @@
     export.tarfile.addfile(info, io.BytesIO(version_json))
 
 
-def export_artifacts(export, artifacts):
+def export_artifacts(export, artifact_pks):
     """
     Export a set of Artifacts, ArtifactResources, and RepositoryResources
 
     Args:
         export (django.db.models.PulpExport): export instance that's doing the export
-        artifacts (django.db.models.Artifacts): List of artifacts in all repos being exported
+        artifact_pks (django.db.models.Artifacts): List of artifact_pks in all repos being exported
 
     Raises:
         ValidationError: When path is not in the ALLOWED_EXPORT_PATHS setting
     """
-    data = dict(message="Exporting Artifacts", code="export.artifacts", total=len(artifacts))
+    data = dict(message="Exporting Artifacts", code="export.artifacts", total=len(artifact_pks))
     with ProgressReport(**data) as pb:
         pb.BATCH_INTERVAL = 5000
 
         if settings.DEFAULT_FILE_STORAGE != "pulpcore.app.models.storage.FileSystem":
             with tempfile.TemporaryDirectory(dir=".") as temp_dir:
-                for artifact in pb.iter(artifacts.only("file").iterator()):
-                    with tempfile.NamedTemporaryFile(dir=temp_dir) as temp_file:
-                        # TODO: this looks like a memory usage threat
-                        # TODO: it's also probably horrificaly slow, going one-by-one over the net
-                        # TODO: probably we could skip the temp file entirely and add
-                        #       artifact.file.read() directly to the tarfile with tarfile.addfile()
-                        temp_file.write(artifact.file.read())
-                        temp_file.flush()
-                        artifact.file.close()
-                        export.tarfile.add(temp_file.name, artifact.file.name)
+                for offset in range(0, len(artifact_pks), EXPORT_BATCH_SIZE):
+                    batch = artifact_pks[offset : offset + EXPORT_BATCH_SIZE]
+                    batch_qs = Artifact.objects.filter(pk__in=batch).only("file")
+
+                    for artifact in pb.iter(batch_qs.iterator()):
+                        with tempfile.NamedTemporaryFile(dir=temp_dir) as temp_file:
+                            # TODO: this looks like a memory usage threat
+                            # TODO: it's probably very slow, going one-by-one over the net
+                            # TODO: probably we could skip the temp file entirely and add
+                            #       artifact.file.read() directly to the tarfile with
+                            #       tarfile.addfile()
+                            temp_file.write(artifact.file.read())
+                            temp_file.flush()
+                            artifact.file.close()
+                            export.tarfile.add(temp_file.name, artifact.file.name)
         else:
-            for artifact in pb.iter(artifacts.only("file").iterator()):
-                export.tarfile.add(artifact.file.path, artifact.file.name)
+            for offset in range(0, len(artifact_pks), EXPORT_BATCH_SIZE):
+                batch = artifact_pks[offset : offset + EXPORT_BATCH_SIZE]
+                batch_qs = Artifact.objects.filter(pk__in=batch).only("file")
+
+                for artifact in pb.iter(batch_qs.iterator()):
+                    export.tarfile.add(artifact.file.path, artifact.file.name)
 
     resource = ArtifactResource()
-    resource.queryset = artifacts
+    resource.queryset = Artifact.objects.filter(pk__in=artifact_pks)
     _write_export(export.tarfile, resource)
 
     resource = RepositoryResource()
diff --git a/pulpcore/app/tasks/export.py b/pulpcore/app/tasks/export.py
index 57e23caef9..71956d2359 100644
--- a/pulpcore/app/tasks/export.py
+++ b/pulpcore/app/tasks/export.py
@@ -25,7 +25,7 @@
     RepositoryVersion,
     Task,
 )
-from pulpcore.app.models.content import Artifact, ContentArtifact
+from pulpcore.app.models.content import ContentArtifact
 from pulpcore.app.serializers import PulpExportSerializer
 from pulpcore.app.util import compute_file_hash, Crc32Hasher
 
@@ -509,7 +509,7 @@ def _do_export(pulp_exporter, tar, the_export):
 
     # Export the top-level entities (artifacts and repositories)
    # Note: we've already handled "what about incrementals" when building the 'artifacts' list
-    export_artifacts(the_export, Artifact.objects.filter(pk__in=artifact_pks))
+    export_artifacts(the_export, list(artifact_pks))
     del artifact_pks
 
     # Export the repository-version data, per-version
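
The core pattern introduced above in both `_write_export` and `export_artifacts` is to materialize the primary keys up front, slice them into fixed-size chunks, and issue a fresh `pk__in` query per chunk, rather than holding one long-running `iterator()` open for the whole export. The sketch below is a minimal illustration of that pattern, assuming a Django queryset; the `for_each_batch` helper name, its callback, and the batch-size value are hypothetical and not part of the patch.

```python
# Minimal sketch of the pk-batching pattern (Django assumed); only the
# "materialize pks, slice, re-query with pk__in" idea mirrors the patch.
EXPORT_BATCH_SIZE = 1000  # placeholder value for illustration


def for_each_batch(queryset, handle_batch, batch_size=EXPORT_BATCH_SIZE):
    """Run handle_batch() over bounded pk__in querysets instead of one long cursor."""
    model = queryset.model
    pks = list(queryset.values_list("pk", flat=True))  # one pk-only query up front
    for offset in range(0, len(pks), batch_size):
        batch_pks = pks[offset : offset + batch_size]
        # Each call issues a fresh, short-lived query bounded to batch_size rows.
        handle_batch(model.objects.filter(pk__in=batch_pks))
```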