Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 78 additions & 4 deletions Lib/test/test_zipfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,8 @@ def make_zip64_file(
self, file_size_64_set=False, file_size_extra=False,
compress_size_64_set=False, compress_size_extra=False,
header_offset_64_set=False, header_offset_extra=False,
extensible_data=b'',
end_of_central_dir_size=None, offset_to_end_of_central_dir=None,
):
"""Generate bytes sequence for a zip with (incomplete) zip64 data.
Expand Down Expand Up @@ -940,6 +942,12 @@ def make_zip64_file(

central_dir_size = struct.pack('<Q', 58 + 8 * len(central_zip64_fields))
offset_to_central_dir = struct.pack('<Q', 50 + 8 * len(local_zip64_fields))
if end_of_central_dir_size is None:
end_of_central_dir_size = 44 + len(extensible_data)
if offset_to_end_of_central_dir is None:
offset_to_end_of_central_dir = (108
+ 8 * len(local_zip64_fields)
+ 8 * len(central_zip64_fields))

local_extra_length = struct.pack("<H", 4 + 8 * len(local_zip64_fields))
central_extra_length = struct.pack("<H", 4 + 8 * len(central_zip64_fields))
Expand Down Expand Up @@ -968,14 +976,17 @@ def make_zip64_file(
+ filename
+ central_extra
# Zip64 end of central directory
+ b"PK\x06\x06,\x00\x00\x00\x00\x00\x00\x00-\x00-"
+ b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
+ b"PK\x06\x06"
+ struct.pack('<Q', end_of_central_dir_size)
+ b"-\x00-\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00"
+ b"\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00"
+ central_dir_size
+ offset_to_central_dir
+ extensible_data
# Zip64 end of central directory locator
+ b"PK\x06\x07\x00\x00\x00\x00l\x00\x00\x00\x00\x00\x00\x00\x01"
+ b"\x00\x00\x00"
+ b"PK\x06\x07\x00\x00\x00\x00"
+ struct.pack('<Q', offset_to_end_of_central_dir)
+ b"\x01\x00\x00\x00"
# end of central directory
+ b"PK\x05\x06\x00\x00\x00\x00\x01\x00\x01\x00:\x00\x00\x002\x00"
+ b"\x00\x00\x00\x00"
Expand Down Expand Up @@ -1006,6 +1017,7 @@ def test_bad_zip64_extra(self):
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_file_size_extra))
self.assertIn('file size', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_file_size_extra)))

# zip64 file size present, zip64 compress size present, one field in
# extra, expecting two, equals missing compress size.
Expand All @@ -1017,6 +1029,7 @@ def test_bad_zip64_extra(self):
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
self.assertIn('compress size', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_compress_size_extra)))

# zip64 compress size present, no fields in extra, expecting one,
# equals missing compress size.
Expand All @@ -1026,6 +1039,7 @@ def test_bad_zip64_extra(self):
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_compress_size_extra))
self.assertIn('compress size', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_compress_size_extra)))

# zip64 file size present, zip64 compress size present, zip64 header
# offset present, two fields in extra, expecting three, equals missing
Expand All @@ -1040,6 +1054,7 @@ def test_bad_zip64_extra(self):
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
self.assertIn('header offset', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))

# zip64 compress size present, zip64 header offset present, one field
# in extra, expecting two, equals missing header offset
Expand All @@ -1052,6 +1067,7 @@ def test_bad_zip64_extra(self):
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
self.assertIn('header offset', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))

# zip64 file size present, zip64 header offset present, one field in
# extra, expecting two, equals missing header offset
Expand All @@ -1064,6 +1080,7 @@ def test_bad_zip64_extra(self):
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
self.assertIn('header offset', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))

# zip64 header offset present, no fields in extra, expecting one,
# equals missing header offset
Expand All @@ -1075,6 +1092,63 @@ def test_bad_zip64_extra(self):
with self.assertRaises(zipfile.BadZipFile) as e:
zipfile.ZipFile(io.BytesIO(missing_header_offset_extra))
self.assertIn('header offset', str(e.exception).lower())
self.assertTrue(zipfile.is_zipfile(io.BytesIO(missing_header_offset_extra)))

def test_bad_zip64_end_of_central_dir(self):
    """Inconsistent zip64 end-of-central-directory fields must be rejected.

    Each case overrides exactly one field that make_zip64_file() would
    otherwise compute itself, then checks that opening the archive raises
    BadZipFile with the expected message and that is_zipfile() reports
    False for the same bytes.
    """
    # (keyword overrides for make_zip64_file, regex the error must match)
    cases = (
        # Record size field that disagrees with the record actually written.
        ({'end_of_central_dir_size': 0}, 'Corrupt.*record'),
        ({'end_of_central_dir_size': 100}, 'Corrupt.*record'),
        # Locator offset that does not point at the zip64 record.
        ({'offset_to_end_of_central_dir': 0}, 'Corrupt.*record'),
        # Locator offset far larger than the generated archive.
        ({'offset_to_end_of_central_dir': 1000}, 'Corrupt.*locator'),
    )
    for overrides, message in cases:
        # subTest keeps the remaining variants running after a failure and
        # labels each failure with the override that triggered it.
        with self.subTest(**overrides):
            zipdata = self.make_zip64_file(**overrides)
            with self.assertRaisesRegex(zipfile.BadZipFile, message):
                zipfile.ZipFile(io.BytesIO(zipdata))
            self.assertFalse(zipfile.is_zipfile(io.BytesIO(zipdata)))

def test_zip64_end_of_central_dir_record_not_found(self):
    """A zip64 locator without a matching zip64 record is an error.

    The record signature is blanked out of an otherwise valid archive,
    once without and once with zip64 extensible data. Opening must raise
    BadZipFile and is_zipfile() must return False.
    """
    def check_record_missing(archive):
        # Overwrite the zip64 end-of-central-directory signature with
        # zeros so that the record can no longer be recognised.
        damaged = archive.replace(b"PK\x06\x06", b'\x00'*4)
        with self.assertRaisesRegex(zipfile.BadZipFile, 'record not found'):
            zipfile.ZipFile(io.BytesIO(damaged))
        self.assertFalse(zipfile.is_zipfile(io.BytesIO(damaged)))

    check_record_missing(self.make_zip64_file())
    check_record_missing(self.make_zip64_file(
        extensible_data=b'\xca\xfe\x04\x00\x00\x00data'))

def test_zip64_extensible_data(self):
    """Archives carrying zip64 extensible data are readable.

    The same archive with bytes prepended to it must be rejected:
    extensible data is only supported when nothing precedes the ZIP file.
    """
    # These values are what is set in the make_zip64_file method.
    member_attrs = (
        ('file_size', 8),
        ('compress_size', 8),
        ('header_offset', 0),
    )
    payload = b"test1234"

    archive = self.make_zip64_file(
        extensible_data=b'\xca\xfe\x04\x00\x00\x00data')
    with zipfile.ZipFile(io.BytesIO(archive)) as zf:
        member = zf.infolist()[0]
        for attr_name, wanted in member_attrs:
            self.assertEqual(getattr(member, attr_name), wanted)
        self.assertEqual(zf.read(member), payload)
    self.assertTrue(zipfile.is_zipfile(io.BytesIO(archive)))

    # Same archive, but no longer starting at offset zero of the stream.
    with self.assertRaisesRegex(zipfile.BadZipFile, 'record not found'):
        zipfile.ZipFile(io.BytesIO(b'prepended' + archive))
    self.assertFalse(zipfile.is_zipfile(io.BytesIO(b'prepended' + archive)))

def test_generated_valid_zip64_extra(self):
# These values are what is set in the make_zip64_file method.
Expand Down
51 changes: 32 additions & 19 deletions Lib/zipfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,41 +236,57 @@ def is_zipfile(filename):
else:
with open(filename, "rb") as fp:
result = _check_zipfile(fp)
except OSError:
except (OSError, BadZipFile):
pass
return result

def _EndRecData64(fpin, offset, endrec):
"""
Read the ZIP64 end-of-archive records and use that to update endrec
"""
try:
fpin.seek(offset - sizeEndCentDir64Locator, 2)
except OSError:
# If the seek fails, the file is not large enough to contain a ZIP64
offset -= sizeEndCentDir64Locator
if offset < 0:
# The file is not large enough to contain a ZIP64
# end-of-archive record, so just return the end record we were given.
return endrec

fpin.seek(offset)
data = fpin.read(sizeEndCentDir64Locator)
if len(data) != sizeEndCentDir64Locator:
return endrec
raise OSError("Unknown I/O error")
sig, diskno, reloff, disks = struct.unpack(structEndArchive64Locator, data)
if sig != stringEndArchive64Locator:
return endrec

if diskno != 0 or disks > 1:
raise BadZipFile("zipfiles that span multiple disks are not supported")

# Assume no 'zip64 extensible data'
fpin.seek(offset - sizeEndCentDir64Locator - sizeEndCentDir64, 2)
offset -= sizeEndCentDir64
if reloff > offset:
raise BadZipFile("Corrupt zip64 end of central directory locator")
# First, check the assumption that there is no prepended data.
fpin.seek(reloff)
extrasz = offset - reloff
data = fpin.read(sizeEndCentDir64)
if len(data) != sizeEndCentDir64:
return endrec
raise OSError("Unknown I/O error")
if not data.startswith(stringEndArchive64) and reloff != offset:
# Since we already have seen the Zip64 EOCD Locator, it's
# possible we got here because there is prepended data.
# Assume no 'zip64 extensible data'
fpin.seek(offset)
extrasz = 0
data = fpin.read(sizeEndCentDir64)
if len(data) != sizeEndCentDir64:
raise OSError("Unknown I/O error")
if not data.startswith(stringEndArchive64):
raise BadZipFile("Zip64 end of central directory record not found")

sig, sz, create_version, read_version, disk_num, disk_dir, \
dircount, dircount2, dirsize, diroffset = \
struct.unpack(structEndArchive64, data)
if sig != stringEndArchive64:
return endrec
if (diroffset + dirsize != reloff or
sz + 12 != sizeEndCentDir64 + extrasz):
raise BadZipFile("Corrupt zip64 end of central directory record")

# Update the original endrec using data from the ZIP64 record
endrec[_ECD_SIGNATURE] = sig
Expand All @@ -280,6 +296,7 @@ def _EndRecData64(fpin, offset, endrec):
endrec[_ECD_ENTRIES_TOTAL] = dircount2
endrec[_ECD_SIZE] = dirsize
endrec[_ECD_OFFSET] = diroffset
endrec[_ECD_LOCATION] = offset - extrasz
return endrec


Expand Down Expand Up @@ -313,7 +330,7 @@ def _EndRecData(fpin):
endrec.append(filesize - sizeEndCentDir)

# Try to read the "Zip64 end of central directory" structure
return _EndRecData64(fpin, -sizeEndCentDir, endrec)
return _EndRecData64(fpin, filesize - sizeEndCentDir, endrec)

# Either this is not a ZIP file, or it is a ZIP file with an archive
# comment. Search the end of the file for the "end of central directory"
Expand All @@ -337,8 +354,7 @@ def _EndRecData(fpin):
endrec.append(maxCommentStart + start)

# Try to read the "Zip64 end of central directory" structure
return _EndRecData64(fpin, maxCommentStart + start - filesize,
endrec)
return _EndRecData64(fpin, maxCommentStart + start, endrec)

# Unable to find a valid end of central directory structure
return None
Expand Down Expand Up @@ -1386,9 +1402,6 @@ def _RealGetContents(self):

# "concat" is zero, unless zip was concatenated to another file
concat = endrec[_ECD_LOCATION] - size_cd - offset_cd
if endrec[_ECD_SIGNATURE] == stringEndArchive64:
# If Zip64 extension structures are present, account for them
concat -= (sizeEndCentDir64 + sizeEndCentDir64Locator)

if self.debug > 2:
inferred = concat + offset_cd
Expand Down Expand Up @@ -1989,7 +2002,7 @@ def _write_end_record(self):
" would require ZIP64 extensions")
zip64endrec = struct.pack(
structEndArchive64, stringEndArchive64,
44, 45, 45, 0, 0, centDirCount, centDirCount,
sizeEndCentDir64 - 12, 45, 45, 0, 0, centDirCount, centDirCount,
centDirSize, centDirOffset)
self.fp.write(zip64endrec)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Check consistency of the zip64 end of central directory record. Support
records with "zip64 extensible data" if there are no bytes prepended to the
ZIP file.
Loading