Merged
1 change: 0 additions & 1 deletion pyproject.toml
@@ -35,7 +35,6 @@ dependencies = [

[project.optional-dependencies]
cli = [
"wheel"
]
dev = [
"check-manifest",
196 changes: 66 additions & 130 deletions variantlib/commands/make_variant.py
@@ -1,22 +1,19 @@
from __future__ import annotations

import argparse
import base64
import email.parser
import email.policy
import hashlib
import logging
import os
import pathlib
import re
import tempfile

import wheel.cli.pack as whl_pck
from wheel.cli.unpack import unpack as wheel_unpack
import shutil
import zipfile

from variantlib.api import VariantDescription
from variantlib.api import VariantProperty
from variantlib.api import set_variant_metadata
from variantlib.api import validate_variant
from variantlib.constants import VARIANT_HASH_LEN
from variantlib.constants import WHEEL_NAME_VALIDATION_REGEX
from variantlib.errors import ValidationError

@@ -30,88 +27,6 @@
)


def wheel_variant_pack(
directory: str | pathlib.Path,
dest_dir: str | pathlib.Path,
variant_hash: str,
build_number: str | None = None,
) -> str:
"""Repack a previously unpacked wheel directory into a new wheel file.

The .dist-info/WHEEL file must contain one or more tags so that the target
wheel file name can be determined.

This function is adapted from:
https://github.com/pypa/wheel/blob/main/src/wheel/_commands/pack.py#L14

Only minimal changes were made so that it works with the Variant Hash.

:param directory: The unpacked wheel directory
:param dest_dir: Destination directory (defaults to the current directory)
:param variant_hash: The hash of the variant to be stored
"""

# Input Validation
variant_hash_pattern = rf"^[a-fA-F0-9]{{{VARIANT_HASH_LEN}}}$"
if not re.match(variant_hash_pattern, variant_hash):
raise ValidationError(f"Invalid Variant Hash Value `{variant_hash}` ...")

# Find the .dist-info directory
dist_info_dirs = [
fn
for fn in os.listdir(directory) # noqa: PTH208
if os.path.isdir(os.path.join(directory, fn)) and whl_pck.DIST_INFO_RE.match(fn) # noqa: PTH112, PTH118
]
if len(dist_info_dirs) > 1:
raise whl_pck.WheelError(
f"Multiple .dist-info directories found in {directory}"
)
if not dist_info_dirs:
raise whl_pck.WheelError(f"No .dist-info directories found in {directory}")

# Determine the target wheel filename
dist_info_dir = dist_info_dirs[0]
name_version = whl_pck.DIST_INFO_RE.match(dist_info_dir).group("namever")

# Read the tags and the existing build number from .dist-info/WHEEL
wheel_file_path = os.path.join(directory, dist_info_dir, "WHEEL") # noqa: PTH118
with open(wheel_file_path, "rb") as f: # noqa: PTH123
info = whl_pck.BytesParser(policy=whl_pck.email.policy.compat32).parse(f)
tags: list[str] = info.get_all("Tag", [])
existing_build_number = info.get("Build")

if not tags:
raise whl_pck.WheelError(
f"No tags present in {dist_info_dir}/WHEEL; cannot determine target "
f"wheel filename"
)

# Set the wheel file name and add/replace/remove the Build tag in .dist-info/WHEEL
build_number = build_number if build_number is not None else existing_build_number
if build_number is not None:
del info["Build"]
if build_number:
info["Build"] = build_number
name_version += "-" + build_number

if build_number != existing_build_number:
with open(wheel_file_path, "wb") as f: # noqa: PTH123
whl_pck.BytesGenerator(f, maxheaderlen=0).flatten(info)

# Reassemble the tags for the wheel file
tagline = whl_pck.compute_tagline(tags)

# Repack the wheel
wheel_path = os.path.join(dest_dir, f"{name_version}-{tagline}-{variant_hash}.whl") # noqa: PTH118
with whl_pck.WheelFile(wheel_path, "w") as wf:
logging.info(
"Repacking wheel as `%(wheel_path)s` ...", {"wheel_path": wheel_path}
)
wf.write_files(directory)

return wheel_path
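For reference, the removed helper derived the output filename from the unpacked wheel's metadata: the name-version prefix from the .dist-info directory, an optional build number, the joined Tag values from WHEEL, and finally the variant hash. A minimal sketch of that naming scheme, using placeholder values (the real hash length comes from VARIANT_HASH_LEN, assumed here to be 8):

# Sketch of the filename scheme used by the removed wheel_variant_pack().
# All values below are placeholders; VARIANT_HASH_LEN == 8 is an assumption.
name_version = "example_pkg-1.2.3"   # from the .dist-info directory name
build_number = None                  # optional "Build" header from WHEEL
tagline = "py3-none-any"             # combined "Tag" headers from WHEEL
variant_hash = "deadbeef"            # hex digest of the variant description

parts = [name_version]
if build_number:
    parts.append(build_number)
parts += [tagline, variant_hash]
print("-".join(parts) + ".whl")      # example_pkg-1.2.3-py3-none-any-deadbeef.whl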


def make_variant(args: list[str]) -> None:
parser = argparse.ArgumentParser(
prog="make_variant",
@@ -165,7 +80,7 @@ def make_variant(args: list[str]) -> None:

# Input Validation - Wheel Filename is valid and non variant already.
wheel_info = WHEEL_NAME_VALIDATION_REGEX.match(input_filepath.name)
if not wheel_info:
if wheel_info is None:
raise ValueError(f"{input_filepath.name!r} is not a valid wheel filename.")

# Transform properties into a VariantDescription
@@ -185,45 +100,66 @@
f"{', '.join(x.to_str() for x in vdesc_valid.unknown_properties)}"
)

with tempfile.TemporaryDirectory() as _tmpdir:
tempdir = pathlib.Path(_tmpdir)
wheel_unpack(input_filepath, tempdir)

wheel_dir = next(tempdir.iterdir())
# Determine output wheel filename
output_filepath = (
output_directory
/ f"{wheel_info.group('base_wheel_name')}-{vdesc.hexdigest}.whl"
)

for _dir in wheel_dir.iterdir():
if _dir.is_dir() and _dir.name.endswith(".dist-info"):
distinfo_dir = _dir
break
with zipfile.ZipFile(input_filepath, "r") as input_zip:
# First, find METADATA file
for filename in input_zip.namelist():
components = filename.split("/", 2)
if (
len(components) == 2
and components[0].endswith(".dist-info")
and components[1] == "METADATA"
):
metadata_filename = filename.encode()
with input_zip.open(filename, "r") as input_file:
# Parse the metadata
metadata_parser = email.parser.BytesParser()
metadata = metadata_parser.parse(input_file)

# Update the metadata
set_variant_metadata(metadata, vdesc)

# Write the serialized metadata
new_metadata = metadata.as_bytes(policy=METADATA_POLICY)
break
else:
raise FileNotFoundError("Impossible to find the .dist-info directory.")

if not (metadata_f := distinfo_dir / "METADATA").exists():
raise FileNotFoundError(metadata_f)

with metadata_f.open(mode="r+b") as file:
# Parse the metadata
metadata_parser = email.parser.BytesParser()
metadata = metadata_parser.parse(file)

# Update the metadata
set_variant_metadata(metadata, vdesc)

# Move the file pointer to the beginning
file.seek(0)

# Write back the serialized metadata
file.write(metadata.as_bytes(policy=METADATA_POLICY))

# Truncate the file to remove any remaining old content
file.truncate()

dest_whl_path = wheel_variant_pack(
directory=wheel_dir,
dest_dir=output_directory,
variant_hash=vdesc.hexdigest,
)

logger.info(
"Variant Wheel Created: `%s`", pathlib.Path(dest_whl_path).resolve()
)
raise FileNotFoundError("No *.dist-info/METADATA file found in wheel")

with zipfile.ZipFile(output_filepath, "w") as output_zip:
for file_info in input_zip.infolist():
components = file_info.filename.split("/", 2)
with (
input_zip.open(file_info, "r") as input_file,
output_zip.open(file_info, "w") as output_file,
):
if (
len(components) != 2
or not components[0].endswith(".dist-info")
or components[1] not in ("METADATA", "RECORD")
):
shutil.copyfileobj(input_file, output_file)
elif components[1] == "METADATA":
# Write the new metadata
output_file.write(new_metadata)
else:
# Update RECORD for the new metadata checksum
for line in input_file:
new_line = line
rec_filename, sha256, size = line.split(b",")
if rec_filename == metadata_filename:
new_sha256 = base64.urlsafe_b64encode(
hashlib.sha256(new_metadata).digest()
).rstrip(b"=")
new_line = (
f"{rec_filename.decode()},"
f"sha256={new_sha256.decode()},"
f"{len(new_metadata)}\n"
).encode()
output_file.write(new_line)

logger.info("Variant Wheel Created: `%s`", output_filepath.resolve())