diff --git a/pyproject.toml b/pyproject.toml index 22d339b..9446c67 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,6 @@ dependencies = [ [project.optional-dependencies] cli = [ - "wheel" ] dev = [ "check-manifest", diff --git a/variantlib/commands/make_variant.py b/variantlib/commands/make_variant.py index 44270ce..bf5d59f 100644 --- a/variantlib/commands/make_variant.py +++ b/variantlib/commands/make_variant.py @@ -1,22 +1,19 @@ from __future__ import annotations import argparse +import base64 import email.parser import email.policy +import hashlib import logging -import os import pathlib -import re -import tempfile - -import wheel.cli.pack as whl_pck -from wheel.cli.unpack import unpack as wheel_unpack +import shutil +import zipfile from variantlib.api import VariantDescription from variantlib.api import VariantProperty from variantlib.api import set_variant_metadata from variantlib.api import validate_variant -from variantlib.constants import VARIANT_HASH_LEN from variantlib.constants import WHEEL_NAME_VALIDATION_REGEX from variantlib.errors import ValidationError @@ -30,88 +27,6 @@ ) -def wheel_variant_pack( - directory: str | pathlib.Path, - dest_dir: str | pathlib.Path, - variant_hash: str, - build_number: str | None = None, -) -> str: - """Repack a previously unpacked wheel directory into a new wheel file. - - The .dist-info/WHEEL file must contain one or more tags so that the target - wheel file name can be determined. - - This function is heavily taken from: - https://github.com/pypa/wheel/blob/main/src/wheel/_commands/pack.py#L14 - - Minimal changes tried to be applied to make it work with the Variant Hash. - - :param directory: The unpacked wheel directory - :param dest_dir: Destination directory (defaults to the current directory) - :param variant_hash: The hash of the variant to be stored - """ - - # Input Validation - variant_hash_pattern = rf"^[a-fA-F0-9]{{{VARIANT_HASH_LEN}}}$" - if not re.match(variant_hash_pattern, variant_hash): - raise ValidationError(f"Invalid Variant Hash Value `{variant_hash}` ...") - - # Find the .dist-info directory - dist_info_dirs = [ - fn - for fn in os.listdir(directory) # noqa: PTH208 - if os.path.isdir(os.path.join(directory, fn)) and whl_pck.DIST_INFO_RE.match(fn) # noqa: PTH112, PTH118 - ] - if len(dist_info_dirs) > 1: - raise whl_pck.WheelError( - f"Multiple .dist-info directories found in {directory}" - ) - if not dist_info_dirs: - raise whl_pck.WheelError(f"No .dist-info directories found in {directory}") - - # Determine the target wheel filename - dist_info_dir = dist_info_dirs[0] - name_version = whl_pck.DIST_INFO_RE.match(dist_info_dir).group("namever") - - # Read the tags and the existing build number from .dist-info/WHEEL - wheel_file_path = os.path.join(directory, dist_info_dir, "WHEEL") # noqa: PTH118 - with open(wheel_file_path, "rb") as f: # noqa: PTH123 - info = whl_pck.BytesParser(policy=whl_pck.email.policy.compat32).parse(f) - tags: list[str] = info.get_all("Tag", []) - existing_build_number = info.get("Build") - - if not tags: - raise whl_pck.WheelError( - f"No tags present in {dist_info_dir}/WHEEL; cannot determine target " - f"wheel filename" - ) - - # Set the wheel file name and add/replace/remove the Build tag in .dist-info/WHEEL - build_number = build_number if build_number is not None else existing_build_number - if build_number is not None: - del info["Build"] - if build_number: - info["Build"] = build_number - name_version += "-" + build_number - - if build_number != existing_build_number: - with open(wheel_file_path, "wb") as f: # noqa: PTH123 - whl_pck.BytesGenerator(f, maxheaderlen=0).flatten(info) - - # Reassemble the tags for the wheel file - tagline = whl_pck.compute_tagline(tags) - - # Repack the wheel - wheel_path = os.path.join(dest_dir, f"{name_version}-{tagline}-{variant_hash}.whl") # noqa: PTH118 - with whl_pck.WheelFile(wheel_path, "w") as wf: - logging.info( - "Repacking wheel as `%(wheel_path)s` ...", {"wheel_path": wheel_path} - ) - wf.write_files(directory) - - return wheel_path - - def make_variant(args: list[str]) -> None: parser = argparse.ArgumentParser( prog="make_variant", @@ -165,7 +80,7 @@ def make_variant(args: list[str]) -> None: # Input Validation - Wheel Filename is valid and non variant already. wheel_info = WHEEL_NAME_VALIDATION_REGEX.match(input_filepath.name) - if not wheel_info: + if wheel_info is None: raise ValueError(f"{input_filepath.name!r} is not a valid wheel filename.") # Transform properties into a VariantDescription @@ -185,45 +100,66 @@ def make_variant(args: list[str]) -> None: f"{', '.join(x.to_str() for x in vdesc_valid.unknown_properties)}" ) - with tempfile.TemporaryDirectory() as _tmpdir: - tempdir = pathlib.Path(_tmpdir) - wheel_unpack(input_filepath, tempdir) - - wheel_dir = next(tempdir.iterdir()) + # Determine output wheel filename + output_filepath = ( + output_directory + / f"{wheel_info.group('base_wheel_name')}-{vdesc.hexdigest}.whl" + ) - for _dir in wheel_dir.iterdir(): - if _dir.is_dir() and _dir.name.endswith(".dist-info"): - distinfo_dir = _dir - break + with zipfile.ZipFile(input_filepath, "r") as input_zip: + # First, find METADATA file + for filename in input_zip.namelist(): + components = filename.split("/", 2) + if ( + len(components) == 2 + and components[0].endswith(".dist-info") + and components[1] == "METADATA" + ): + metadata_filename = filename.encode() + with input_zip.open(filename, "r") as input_file: + # Parse the metadata + metadata_parser = email.parser.BytesParser() + metadata = metadata_parser.parse(input_file) + + # Update the metadata + set_variant_metadata(metadata, vdesc) + + # Write the serialized metadata + new_metadata = metadata.as_bytes(policy=METADATA_POLICY) + break else: - raise FileNotFoundError("Impossible to find the .dist-info directory.") - - if not (metadata_f := distinfo_dir / "METADATA").exists(): - raise FileNotFoundError(metadata_f) - - with metadata_f.open(mode="r+b") as file: - # Parse the metadata - metadata_parser = email.parser.BytesParser() - metadata = metadata_parser.parse(file) - - # Update the metadata - set_variant_metadata(metadata, vdesc) - - # Move the file pointer to the beginning - file.seek(0) - - # Write back the serialized metadata - file.write(metadata.as_bytes(policy=METADATA_POLICY)) - - # Truncate the file to remove any remaining old content - file.truncate() - - dest_whl_path = wheel_variant_pack( - directory=wheel_dir, - dest_dir=output_directory, - variant_hash=vdesc.hexdigest, - ) - - logger.info( - "Variant Wheel Created: `%s`", pathlib.Path(dest_whl_path).resolve() - ) + raise FileNotFoundError("No *.dist-info/METADATA file found in wheel") + + with zipfile.ZipFile(output_filepath, "w") as output_zip: + for file_info in input_zip.infolist(): + components = file_info.filename.split("/", 2) + with ( + input_zip.open(file_info, "r") as input_file, + output_zip.open(file_info, "w") as output_file, + ): + if ( + len(components) != 2 + or not components[0].endswith(".dist-info") + or components[1] not in ("METADATA", "RECORD") + ): + shutil.copyfileobj(input_file, output_file) + elif components[1] == "METADATA": + # Write the new metadata + output_file.write(new_metadata) + else: + # Update RECORD for the new metadata checksum + for line in input_file: + new_line = line + rec_filename, sha256, size = line.split(b",") + if rec_filename == metadata_filename: + new_sha256 = base64.urlsafe_b64encode( + hashlib.sha256(new_metadata).digest() + ).rstrip(b"=") + new_line = ( + f"{rec_filename.decode()}," + f"sha256={new_sha256.decode()}," + f"{len(new_metadata)}\n" + ).encode() + output_file.write(new_line) + + logger.info("Variant Wheel Created: `%s`", output_filepath.resolve())