From bbdbb717efede5350dbd42245d96790febf78542 Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Fri, 10 Oct 2025 16:21:25 -0500 Subject: [PATCH 1/3] Further lock-down ZIP archive features --- tests/unit/utils/test_zipfiles.py | 36 ++++- tests/unit/utils/zipdata/accept/empty.zip | Bin 0 -> 22 bytes .../unit/utils/zipdata/reject/cd_comment.zip | Bin 0 -> 113 bytes .../{accept => reject}/data_descriptor.zip | Bin .../data_descriptor_bad_crc.zip | Bin .../data_descriptor_bad_csize.zip | Bin .../data_descriptor_bad_usize.zip | Bin .../data_descriptor_bad_usize_no_sig.zip | Bin .../data_descriptor_zip64.zip | Bin .../data_descriptor_zip64_csize.zip | Bin .../data_descriptor_zip64_usize.zip | Bin warehouse/utils/zipfiles.py | 138 +++++++++++++----- 12 files changed, 137 insertions(+), 37 deletions(-) create mode 100644 tests/unit/utils/zipdata/accept/empty.zip create mode 100644 tests/unit/utils/zipdata/reject/cd_comment.zip rename tests/unit/utils/zipdata/{accept => reject}/data_descriptor.zip (100%) rename tests/unit/utils/zipdata/{accept => reject}/data_descriptor_bad_crc.zip (100%) rename tests/unit/utils/zipdata/{accept => reject}/data_descriptor_bad_csize.zip (100%) rename tests/unit/utils/zipdata/{accept => reject}/data_descriptor_bad_usize.zip (100%) rename tests/unit/utils/zipdata/{accept => reject}/data_descriptor_bad_usize_no_sig.zip (100%) rename tests/unit/utils/zipdata/{accept => reject}/data_descriptor_zip64.zip (100%) rename tests/unit/utils/zipdata/{accept => reject}/data_descriptor_zip64_csize.zip (100%) rename tests/unit/utils/zipdata/{accept => reject}/data_descriptor_zip64_usize.zip (100%) diff --git a/tests/unit/utils/test_zipfiles.py b/tests/unit/utils/test_zipfiles.py index 6e64d5b8f363..c269284133b1 100644 --- a/tests/unit/utils/test_zipfiles.py +++ b/tests/unit/utils/test_zipfiles.py @@ -23,7 +23,18 @@ def zippath(filename: str): ("reject/8bitcomment.zip", "Filename not in central directory"), ("reject/cd_extra_entry.zip", "Duplicate filename in central directory"), ("reject/cd_missing_entry.zip", "Filename not in central directory"), - ("reject/data_descriptor_bad_crc_0.zip", "Unknown record signature"), + ("reject/data_descriptor.zip", "ZIP contains a data descriptor"), + ("reject/data_descriptor_bad_crc.zip", "ZIP contains a data descriptor"), + ("reject/data_descriptor_bad_crc_0.zip", "ZIP contains a data descriptor"), + ("reject/data_descriptor_bad_csize.zip", "ZIP contains a data descriptor"), + ("reject/data_descriptor_bad_usize.zip", "ZIP contains a data descriptor"), + ( + "reject/data_descriptor_bad_usize_no_sig.zip", + "ZIP contains a data descriptor", + ), + ("reject/data_descriptor_zip64.zip", "ZIP contains a data descriptor"), + ("reject/data_descriptor_zip64_csize.zip", "ZIP contains a data descriptor"), + ("reject/data_descriptor_zip64_usize.zip", "ZIP contains a data descriptor"), ("reject/dupe_eocd.zip", "Truncated central directory"), ( "reject/eocd64_locator_mismatch.zip", @@ -37,10 +48,10 @@ def zippath(filename: str): ("reject/non_ascii_original_name.zip", "Filename not unicode"), ("reject/not.zip", "File is not a zip file"), ("reject/prefix.zip", "Unknown record signature"), - ("reject/second_unicode_extra.zip", "Filename not in central directory"), + ("reject/second_unicode_extra.zip", "Invalid duplicate extra in local file"), ("reject/shortextra.zip", "Corrupt extra field 7075 (size=9)"), ("reject/suffix_not_comment.zip", "Trailing data"), - ("reject/unicode_extra_chain.zip", "Filename not in central directory"), + ("reject/unicode_extra_chain.zip", "Invalid duplicate extra in local file"), ("reject/wheel-1.0-py3-none-any.whl", "Duplicate filename in local headers"), ("reject/zip64_eocd_confusion.zip", "Filename not in central directory"), ("reject/zip64_eocd_extensible_data.zip", "Bad offset for central directory"), @@ -70,8 +81,8 @@ def test_bad_zips(filename, error): @pytest.mark.parametrize("filename", list(os.listdir(ZIPDATA_DIR / "accept"))) def test_good_zips(filename): result = zipfiles.validate_zipfile(zippath(f"accept/{filename}")) - assert result[0] is True assert result[1] is None + assert result[0] is True def test_local_file_header(): @@ -151,3 +162,20 @@ def test_local_file_header_zip64_extra_no_compressed_size_nok_using_deflate(): fp = io.BytesIO(header + b"a" + extra + b"a") with pytest.raises(zipfiles.InvalidZipFileError): zipfiles._handle_local_file_header(fp, {"a": 0}) + + +def test_local_file_invalid_filename(): + header = struct.pack("89 bytes: return data +def _contains_unprintable_chars(value: bytes) -> bool: + return any(ch in UNPRINTABLE_CHARS for ch in value) + + def _handle_local_file_header( fp: typing.IO[bytes], zipfile_files_and_sizes: dict[str, int] ) -> bytes: @@ -53,16 +65,22 @@ def _handle_local_file_header( filename = _read_check(fp, filename_size) extra = _read_check(fp, extra_size) + if _contains_unprintable_chars(filename): + raise InvalidZipFileError("Invalid character in filename") + # Search for the ZIP64 extension in extras. - is_zip64 = False + seen_extra_ids = set() while extra: if len(extra) < 4: raise InvalidZipFileError("Malformed zip file") - extra_header, extra_data_size = struct.unpack(" len(extra): raise InvalidZipFileError("Malformed zip file") - if extra_header == 0x0001: - is_zip64 = True + if extra_id in seen_extra_ids and extra_id in DISALLOW_DUPLICATE_EXTRA_IDS: + raise InvalidZipFileError("Invalid duplicate extra in local file") + seen_extra_ids.add(extra_id) + + if extra_id == 0x0001: # ZIP64 extras must be one of these lengths. if extra_data_size not in (0, 8, 16, 24, 28): @@ -89,17 +107,32 @@ def _handle_local_file_header( # We receive an explicit compressed ZIP64 size. # This is the second field in the extra data. (compressed_size,) = struct.unpack(" bytes: +def _handle_central_directory_header(fp: typing.IO[bytes]) -> tuple[bytes, bytes]: """ Parses the body of a Central Directory (CD) header. Returns the contained filename field of the record. @@ -134,13 +155,18 @@ def _handle_central_directory_header(fp: typing.IO[bytes]) -> bytes: compressed_size, filename_size, extra_size, comment_size, offset = struct.unpack( " None: +def _handle_eocd(fp: typing.IO[bytes]) -> tuple[int, int, int]: """ Parses the body of an End of Central Directory (EOCD) record. @@ -148,21 +174,32 @@ def _handle_eocd(fp: typing.IO[bytes]) -> None: """ data = _read_check(fp, 18) ( + cd_records_on_disk, + cd_records, + cd_size, cd_offset, comment_size, - ) = struct.unpack(" None: +def _handle_eocd64(fp: typing.IO[bytes]) -> tuple[int, int, int]: """ Parses the body of an ZIP64 End of Central Directory (EOCD64) record. See section 4.3.14 of APPNOTE.TXT. """ - data = _read_check(fp, 8) - (eocd64_size,) = struct.unpack(" int: @@ -215,6 +252,17 @@ def validate_zipfile(zip_filepath: str) -> tuple[bool, str | None]: expected_eocd64_offset = None actual_eocd64_offset = None + # Track the number of CD records + # and their sizes. + cd_records = 0 + cd_offset = None + cd_size = 0 + + # Values from EOCD or EOCD64. + eocd_cd_records = None + eocd_cd_offset = None + eocd_cd_size = None + while True: try: signature = _read_check(fp, 4) @@ -244,7 +292,14 @@ def validate_zipfile(zip_filepath: str) -> tuple[bool, str | None]: # Central Directory File Header if signature == RECORD_SIG_CENTRAL_DIRECTORY: - filename = _handle_central_directory_header(fp) + # Record the first CD record we find as + # the start of the central directory. + if cd_offset is None: + cd_offset = fp.tell() - 4 + cd_records += 1 + + filename, extra = _handle_central_directory_header(fp) + cd_size += 46 + len(filename) + len(extra) if filename in cd_filenames: raise InvalidZipFileError( "Duplicate filename in central directory" @@ -262,7 +317,24 @@ def validate_zipfile(zip_filepath: str) -> tuple[bool, str | None]: # End of Central Directory elif signature == RECORD_SIG_EOCD: - _handle_eocd(fp) + # If the ZIP is empty then we expect + # to see zero CD entries. + if cd_offset is None: + cd_offset = fp.tell() - 4 + + # If this archive is ZIP64 we use the values + # from the EOCD64 values, otherwise use EOCD values. + if actual_eocd64_offset is not None and eocd_cd_offset is not None: + _handle_eocd(fp) + else: + eocd_cd_records, eocd_cd_size, eocd_cd_offset = _handle_eocd(fp) + + if eocd_cd_records != cd_records: + raise InvalidZipFileError("Malformed zip file") + if cd_offset is None or eocd_cd_offset != cd_offset: + raise InvalidZipFileError("Malformed zip file") + if cd_size is None or eocd_cd_size != cd_size: + raise InvalidZipFileError("Malformed zip file") break # This always means the end of a ZIP. # End of Central Directory (ZIP64) @@ -271,7 +343,7 @@ def validate_zipfile(zip_filepath: str) -> tuple[bool, str | None]: # we see EOCD64 Locator later. # -4 because we just read signature bytes. expected_eocd64_offset = fp.tell() - 4 - _handle_eocd64(fp) + eocd_cd_records, eocd_cd_size, eocd_cd_offset = _handle_eocd64(fp) # End of Central Directory (ZIP64) Locator elif signature == RECORD_SIG_EOCD64_LOCATOR: From 81bfcf7e17d5283a49a6b9ecb193d251210dd2b1 Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Thu, 16 Oct 2025 09:37:33 -0500 Subject: [PATCH 2/3] Get 100% coverage --- tests/unit/utils/test_zipfiles.py | 166 ++++++++++++++++++++++++++++++ warehouse/utils/zipfiles.py | 18 ++-- 2 files changed, 178 insertions(+), 6 deletions(-) diff --git a/tests/unit/utils/test_zipfiles.py b/tests/unit/utils/test_zipfiles.py index c269284133b1..4540e4d6a5dd 100644 --- a/tests/unit/utils/test_zipfiles.py +++ b/tests/unit/utils/test_zipfiles.py @@ -4,6 +4,7 @@ import os import pathlib import struct +import tempfile import pytest @@ -179,3 +180,168 @@ def test_local_file_invalid_filename_in_unicode_extra(): with pytest.raises(zipfiles.InvalidZipFileError) as e: zipfiles._handle_local_file_header(fp, {"a": 0}) assert str(e.value) == "Invalid character in filename" + + +def test_local_file_invalid_filename_utf8(): + extra = struct.pack(" tuple[bool, str | None]: try: zfp = zipfile.ZipFile(zip_filepath, mode="r") # Store compression sizes from the CD for use later. - zipfile_files = {zfi.filename: zfi.compress_size for zfi in zfp.filelist} + zipfile_files = {zfi.orig_filename: zfi.compress_size for zfi in zfp.filelist} except zipfile.BadZipfile as e: return False, e.args[0] @@ -330,11 +330,17 @@ def validate_zipfile(zip_filepath: str) -> tuple[bool, str | None]: eocd_cd_records, eocd_cd_size, eocd_cd_offset = _handle_eocd(fp) if eocd_cd_records != cd_records: - raise InvalidZipFileError("Malformed zip file") + raise InvalidZipFileError( + "Mismatched central directory records" + ) if cd_offset is None or eocd_cd_offset != cd_offset: - raise InvalidZipFileError("Malformed zip file") - if cd_size is None or eocd_cd_size != cd_size: - raise InvalidZipFileError("Malformed zip file") + raise InvalidZipFileError("Mismatched central directory offset") + # This branch is tough to cover, as CPython's ZIP archive + # implementation already doesn't like mismatches between size + # and offset of the CD. + if cd_size is None or eocd_cd_size != cd_size: # pragma: no cover + raise InvalidZipFileError("Mismatched central directory size") + break # This always means the end of a ZIP. # End of Central Directory (ZIP64) From 51d815d2a7a35d5d5c943bbd04d5f98b736b9a5c Mon Sep 17 00:00:00 2001 From: Seth Michael Larson Date: Fri, 17 Oct 2025 09:59:11 -0500 Subject: [PATCH 3/3] Add unexpected error to assert message --- tests/unit/utils/test_zipfiles.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/utils/test_zipfiles.py b/tests/unit/utils/test_zipfiles.py index 4540e4d6a5dd..04f6370a402d 100644 --- a/tests/unit/utils/test_zipfiles.py +++ b/tests/unit/utils/test_zipfiles.py @@ -82,8 +82,8 @@ def test_bad_zips(filename, error): @pytest.mark.parametrize("filename", list(os.listdir(ZIPDATA_DIR / "accept"))) def test_good_zips(filename): result = zipfiles.validate_zipfile(zippath(f"accept/{filename}")) + assert result[0] is True, f"Expected no error, got {result[1]!r}" assert result[1] is None - assert result[0] is True def test_local_file_header():