Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/1188.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Reduced peak memory consumption of repair_metadata by lowering batch size from 1000 to 250,
eliminating double S3 reads for wheel files, and closing artifact file handles after each
iteration. This fixes "Worker has gone missing" errors on repositories with 1000+ packages.
33 changes: 21 additions & 12 deletions pulp_python/app/tasks/repair.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import os
from collections import defaultdict
from gettext import gettext as _
from itertools import groupby
Expand All @@ -10,6 +11,7 @@
from pulp_python.app.utils import (
artifact_to_metadata_artifact,
artifact_to_python_content_data,
copy_artifact_to_temp_file,
fetch_json_release_metadata,
parse_metadata,
)
Expand All @@ -19,7 +21,7 @@
log = logging.getLogger(__name__)


BULK_SIZE = 1000
BULK_SIZE = 250


def repair(repository_pk: UUID) -> None:
Expand Down Expand Up @@ -118,17 +120,24 @@ def repair_metadata(content: QuerySet[PythonPackageContent]) -> tuple[int, set[s
.first()
.artifact
)
new_data = artifact_to_python_content_data(package.filename, main_artifact, domain)
total_metadata_repaired += update_metadata_artifact_if_needed(
package,
new_data.get("metadata_sha256"),
main_artifact,
metadata_batch,
pkgs_metadata_not_repaired,
)
total_repaired += update_package_if_needed(
package, new_data, batch, set_of_update_fields
)
# Copy artifact to temp file once, reuse for both content data and metadata
temp_path = copy_artifact_to_temp_file(main_artifact, package.filename)
try:
new_data = artifact_to_python_content_data(
package.filename, main_artifact, domain, temp_path=temp_path
)
total_metadata_repaired += update_metadata_artifact_if_needed(
package,
new_data.get("metadata_sha256"),
main_artifact,
metadata_batch,
pkgs_metadata_not_repaired,
)
total_repaired += update_package_if_needed(
package, new_data, batch, set_of_update_fields
)
finally:
os.unlink(temp_path)

# For on-demand content, we expect that:
# 1. PythonPackageContent always has correct name and version
Expand Down
50 changes: 38 additions & 12 deletions pulp_python/app/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,37 @@ def compute_metadata_sha256(filename: str) -> str | None:
return hashlib.sha256(metadata_content).hexdigest() if metadata_content else None


def artifact_to_python_content_data(filename, artifact, domain=None):
def copy_artifact_to_temp_file(artifact, filename, tmp_dir="."):
    """
    Copy an artifact's file to a temporary file on disk.

    The temp file is created in ``tmp_dir`` with ``filename`` as its suffix so
    that consumers which validate file extensions (e.g. pkginfo) accept it.

    Args:
        artifact: Object exposing a seekable, binary ``file`` attribute.
        filename: Original package filename, used as the temp-file suffix.
        tmp_dir: Directory in which to create the temp file (default: cwd).

    Returns:
        The path to the temp file. The caller is responsible for cleanup.
    """
    import os

    temp_file = tempfile.NamedTemporaryFile("wb", dir=tmp_dir, suffix=filename, delete=False)
    try:
        artifact.file.seek(0)
        shutil.copyfileobj(artifact.file, temp_file)
        temp_file.flush()
    except Exception:
        # delete=False means nothing cleans this up automatically; don't leak
        # a partially-written file on disk if the copy fails midway.
        temp_file.close()
        os.unlink(temp_file.name)
        raise
    finally:
        temp_file.close()
    return temp_file.name


def artifact_to_python_content_data(filename, artifact, domain=None, temp_path=None):
"""
Takes the artifact/filename and returns the metadata needed to create a PythonPackageContent.

If temp_path is provided, uses it instead of copying the artifact to a new temp file.
"""
# Copy file to a temp directory under the user provided filename, we do this
# because pkginfo validates that the filename has a valid extension before
# reading it
with tempfile.NamedTemporaryFile("wb", dir=".", suffix=filename) as temp_file:
artifact.file.seek(0)
shutil.copyfileobj(artifact.file, temp_file)
temp_file.flush()
metadata = get_project_metadata_from_file(temp_file.name)
if temp_path:
metadata = get_project_metadata_from_file(temp_path)
else:
with tempfile.NamedTemporaryFile("wb", dir=".", suffix=filename) as temp_file:
artifact.file.seek(0)
shutil.copyfileobj(artifact.file, temp_file)
temp_file.flush()
metadata = get_project_metadata_from_file(temp_file.name)
data = parse_project_metadata(vars(metadata))
data["sha256"] = artifact.sha256
data["size"] = artifact.size
Expand All @@ -262,19 +281,26 @@ def artifact_to_python_content_data(filename, artifact, domain=None):


def artifact_to_metadata_artifact(
filename: str, artifact: Artifact, tmp_dir: str = "."
filename: str, artifact: Artifact, tmp_dir: str = ".", temp_path: str | None = None
) -> Artifact | None:
"""
Creates artifact for metadata from the provided wheel artifact.

If temp_path is provided, uses it instead of copying the artifact to a new temp file.
"""
if not filename.endswith(".whl"):
return None

with tempfile.NamedTemporaryFile("wb", dir=tmp_dir, suffix=filename, delete=False) as temp_file:
temp_wheel_path = temp_file.name
artifact.file.seek(0)
shutil.copyfileobj(artifact.file, temp_file)
temp_file.flush()
if temp_path:
temp_wheel_path = temp_path
else:
with tempfile.NamedTemporaryFile(
"wb", dir=tmp_dir, suffix=filename, delete=False
) as temp_file:
temp_wheel_path = temp_file.name
artifact.file.seek(0)
shutil.copyfileobj(artifact.file, temp_file)
temp_file.flush()

metadata_content = extract_wheel_metadata(temp_wheel_path)
if not metadata_content:
Expand Down
52 changes: 52 additions & 0 deletions pulp_python/tests/functional/api/test_repair.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,55 @@ def test_metadata_artifact_repair_endpoint(
# Check deduplication
assert len(main_artifact_hrefs) == 4
assert len(metadata_artifact_hrefs) == 3


def test_metadata_repair_batch_boundary(
    create_content_direct,
    delete_orphans_pre,
    download_python_file,
    monitor_task,
    move_to_repository,
    python_bindings,
    python_repo_factory,
):
    """
    Test that repair_metadata correctly handles packages across batch boundaries.

    Verifies that the batch flush (at BULK_SIZE) does not lose or corrupt metadata
    for packages processed before and after the flush point.
    """
    repo = python_repo_factory()

    # Seed several scipy wheels, each uploaded with a deliberately wrong author.
    filenames = [
        "scipy-1.1.0-cp27-none-win32.whl",
        "scipy-1.1.0-cp27-none-win_amd64.whl",
        "scipy-1.1.0-cp27-cp27m-manylinux1_x86_64.whl",
    ]
    packages_base = urljoin(PYTHON_FIXTURES_URL, "packages/")
    hrefs = []
    for name in filenames:
        local_path = download_python_file(name, urljoin(packages_base, name))
        unit = create_content_direct(
            local_path,
            {
                "name": "scipy",
                "version": "1.1.0",
                "filename": name,
                "author": "ME",
                "packagetype": "bdist",
            },
        )
        assert unit.author == "ME"
        hrefs.append(unit.pulp_href)

    move_to_repository(repo.pulp_href, hrefs)

    # Kick off the repair task and wait for it to finish.
    response = python_bindings.RepositoriesPythonApi.repair_metadata(repo.pulp_href)
    monitor_task(response.task)

    # Every package's metadata should now be corrected (author != "ME").
    for href in hrefs:
        content = python_bindings.ContentPackagesApi.read(href)
        assert content.author != "ME", f"Package {content.filename} was not repaired"
        assert content.packagetype == "bdist_wheel"
        assert content.metadata_sha256 is not None
Loading