From d67239a21a7c75fc02aa56557441f14902a81aeb Mon Sep 17 00:00:00 2001 From: Hiroshi Miura Date: Sat, 20 Jun 2020 22:48:27 +0900 Subject: [PATCH] Drop get_password() helper - Pass password into Header object - Keep password in folders Signed-off-by: Hiroshi Miura --- py7zr/archiveinfo.py | 16 ++++++++++------ py7zr/compressor.py | 10 ++++------ py7zr/helpers.py | 16 ---------------- py7zr/py7zr.py | 11 ++++++----- tests/test_extra_codecs.py | 25 ++++--------------------- 5 files changed, 24 insertions(+), 54 deletions(-) diff --git a/py7zr/archiveinfo.py b/py7zr/archiveinfo.py index b185798a..1d1ac7f8 100644 --- a/py7zr/archiveinfo.py +++ b/py7zr/archiveinfo.py @@ -309,7 +309,7 @@ class Folder: """ __slots__ = ['unpacksizes', 'solid', 'coders', 'digestdefined', 'num_bindpairs', 'num_packedstreams', - 'bindpairs', 'packed_indices', 'crc', 'decompressor', 'compressor', 'files'] + 'bindpairs', 'packed_indices', 'crc', 'compressor', 'decompressor', 'files', 'password'] def __init__(self) -> None: self.unpacksizes = [] # type: List[int] @@ -325,6 +325,8 @@ def __init__(self) -> None: self.decompressor = None # type: Optional[SevenZipDecompressor] self.compressor = None # type: Optional[SevenZipCompressor] self.files = None + # encryption + self.password = None @classmethod def retrieve(cls, file: BinaryIO): @@ -367,7 +369,7 @@ def _read(self, file: BinaryIO) -> None: self.packed_indices.append(read_uint64(file)) def prepare_coderinfo(self, filters): - self.compressor = SevenZipCompressor(filters=filters) + self.compressor = SevenZipCompressor(filters=filters, password=self.password) self.coders = self.compressor.coders assert len(self.coders) > 0 self.solid = True @@ -408,7 +410,7 @@ def get_decompressor(self, packsize: int, reset: bool = False) -> SevenZipDecomp if self.decompressor is not None and not reset: return self.decompressor else: - self.decompressor = SevenZipDecompressor(self.coders, packsize, self.unpacksizes, self.crc) + self.decompressor = SevenZipDecompressor(self.coders, packsize, self.unpacksizes, self.crc, self.password) return self.decompressor def get_compressor(self) -> SevenZipCompressor: @@ -865,10 +867,12 @@ def __init__(self) -> None: self.files_info = None self.size = 0 # fixme. Not implemented yet self._start_pos = 0 + self.password = None @classmethod - def retrieve(cls, fp: BinaryIO, buffer: BytesIO, start_pos: int): + def retrieve(cls, fp: BinaryIO, buffer: BytesIO, start_pos: int, password=None): obj = cls() + obj.password = password obj._read(fp, buffer, start_pos) return obj @@ -907,7 +911,7 @@ def _get_headerdata_from_streams(self, fp: BinaryIO, streams: StreamsInfo) -> By uncompressed = [uncompressed] * len(folder.coders) compressed_size = streams.packinfo.packsizes[0] uncompressed_size = uncompressed[-1] - + folder.password = self.password src_start += streams.packinfo.packpos fp.seek(src_start, 0) decompressor = folder.get_decompressor(compressed_size) @@ -926,8 +930,8 @@ def _encode_header(self, file: BinaryIO, afterheader: int, filters): buf = io.BytesIO() _, raw_header_len, raw_crc = self.write(buf, 0, False) folder = Folder() + folder.password=self.password folder.prepare_coderinfo(filters=filters) - assert folder.compressor is not None streams = HeaderStreamsInfo() streams.unpackinfo.folders = [folder] streams.packinfo.packpos = packpos diff --git a/py7zr/compressor.py b/py7zr/compressor.py index 17794129..1475776e 100644 --- a/py7zr/compressor.py +++ b/py7zr/compressor.py @@ -32,7 +32,7 @@ from Crypto.Random import get_random_bytes from py7zr.exceptions import UnsupportedCompressionMethodError -from py7zr.helpers import Buffer, calculate_crc32, calculate_key, get_password +from py7zr.helpers import Buffer, calculate_crc32, calculate_key from py7zr.properties import (FILTER_ARM, FILTER_ARMTHUMB, FILTER_BZIP2, FILTER_COPY, FILTER_CRYPTO_AES256_SHA256, FILTER_DEFLATE, FILTER_DELTA, FILTER_IA64, FILTER_LZMA, FILTER_LZMA2, FILTER_POWERPC, FILTER_SPARC, FILTER_X86, FILTER_ZSTD, MAGIC_7Z, READ_BLOCKSIZE, CompressionMethod) @@ -353,9 +353,8 @@ class SevenZipDecompressor: """Main decompressor object which is properly configured and bind to each 7zip folder. because 7zip folder can have a custom compression method""" - def __init__(self, coders: List[Dict[str, Any]], packsize: int, unpacksizes: List[int], crc: Optional[int]) -> None: - # Get password which was set when creation of py7zr.SevenZipFile object. - password = get_password() + def __init__(self, coders: List[Dict[str, Any]], packsize: int, unpacksizes: List[int], crc: Optional[int], + password: Optional[str] = None) -> None: self.input_size = packsize self.unpacksizes = unpacksizes self.consumed = 0 # type: int @@ -452,8 +451,7 @@ class SevenZipCompressor: __slots__ = ['filters', 'compressor', 'coders', 'cchain', 'methods_map'] - def __init__(self, filters=None): - password = get_password() + def __init__(self, filters=None, password=None): if filters is None: self.filters = [{"id": lzma.FILTER_LZMA2, "preset": 7 | lzma.PRESET_EXTREME}] else: diff --git a/py7zr/helpers.py b/py7zr/helpers.py index ca24baf1..d56490e0 100644 --- a/py7zr/helpers.py +++ b/py7zr/helpers.py @@ -403,19 +403,3 @@ def __len__(self) -> int: def __bytes__(self): return bytes(self._buf[0:self._buflen]) - - -def get_password(): - clsname = 'SevenZipFile' - res = None - caller = inspect.currentframe().f_back - for oframe in inspect.getouterframes(caller.f_back): - if 'self' in oframe.frame.f_locals: - self = oframe.frame.f_locals['self'] - if type(self).__name__ == clsname: - if 'password' in self.__dict__: - res = self.password - break - if res is None: - res = '' - return res diff --git a/py7zr/py7zr.py b/py7zr/py7zr.py index dc8d5163..8554e7d9 100644 --- a/py7zr/py7zr.py +++ b/py7zr/py7zr.py @@ -275,7 +275,6 @@ def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r', dereference=False, password: Optional[str] = None) -> None: if mode not in ('r', 'w', 'x', 'a'): raise ValueError("ZipFile requires mode 'r', 'w', 'x', or 'a'") - self.password = password self.password_protected = (password is not None) # Check if we were passed a file-like object or not if isinstance(file, str): @@ -316,7 +315,7 @@ def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r', self._fileRefCnt = 1 try: if mode == "r": - self._real_get_contents() + self._real_get_contents(password) elif mode in 'w': if password is not None and filters is None: filters = ENCRYPTED_ARCHIVE_DEFAULT @@ -325,6 +324,7 @@ def __init__(self, file: Union[BinaryIO, str, pathlib.Path], mode: str = 'r', else: pass folder = Folder() + folder.password = password folder.prepare_coderinfo(filters) self.files = ArchiveFileList() self.sig_header = SignatureHeader() @@ -361,7 +361,7 @@ def _fpclose(self) -> None: if not self._fileRefCnt and not self._filePassed: self.fp.close() - def _real_get_contents(self) -> None: + def _real_get_contents(self, password) -> None: if not self._check_7zfile(self.fp): raise Bad7zFile('not a 7z file') self.sig_header = SignatureHeader.retrieve(self.fp) @@ -370,7 +370,7 @@ def _real_get_contents(self) -> None: buffer = io.BytesIO(self.fp.read(self.sig_header.nextheadersize)) if self.sig_header.nextheadercrc != calculate_crc32(buffer.getvalue()): raise Bad7zFile('invalid header data') - header = Header.retrieve(self.fp, buffer, self.afterheader) + header = Header.retrieve(self.fp, buffer, self.afterheader, password) if header is None: return self.header = header @@ -381,6 +381,8 @@ def _real_get_contents(self) -> None: # Initialize references for convenience if hasattr(self.header, 'main_streams') and self.header.main_streams is not None: folders = self.header.main_streams.unpackinfo.folders + for folder in folders: + folder.password = password packinfo = self.header.main_streams.packinfo packsizes = packinfo.packsizes subinfo = self.header.main_streams.substreamsinfo @@ -518,7 +520,6 @@ def _is_solid(self): def _var_release(self): self._dict = None self.files = None - self.password = None self.header = None self.worker = None self.sig_header = None diff --git a/tests/test_extra_codecs.py b/tests/test_extra_codecs.py index bbccf50c..df6d7062 100644 --- a/tests/test_extra_codecs.py +++ b/tests/test_extra_codecs.py @@ -107,32 +107,20 @@ def test_zstd_decompressor(): @pytest.mark.unit def test_sevenzipcompressor_aes_lzma2(): - - class SevenZipFile: - def __init__(self, password): - self.password = password - - def get_compressor(self, filters): - return py7zr.compressor.SevenZipCompressor(filters=filters) - - def get_decompressor(self, coders, packsizes, unpacksizes): - return py7zr.compressor.SevenZipDecompressor(coders=coders, packsize=packsizes, unpacksizes=unpacksizes, - crc=None) - plain_data = b"\x00*\x1a\t'd\x19\xb08s\xca\x8b\x13 \xaf:\x1b\x8d\x97\xf8|#M\xe9\xe1W\xd4\xe4\x97BB\xd2" plain_data += plain_data + plain_data filters = [ {"id": py7zr.FILTER_LZMA2, "preset": py7zr.PRESET_DEFAULT}, {"id": py7zr.FILTER_CRYPTO_AES256_SHA256} ] - szf = SevenZipFile('secret') - compressor = szf.get_compressor(filters) + compressor = py7zr.compressor.SevenZipCompressor(filters=filters, password='secret') outdata = compressor.compress(plain_data) outdata += compressor.flush() assert len(outdata) < 96 coders = compressor.coders unpacksizes = compressor.unpacksizes - decompressor = szf.get_decompressor(coders, len(outdata), unpacksizes) + decompressor = py7zr.compressor.SevenZipDecompressor(coders=coders, packsize=len(outdata), unpacksizes=unpacksizes, + crc=None, password='secret') revert_data = decompressor.decompress(outdata) assert revert_data == plain_data @@ -146,17 +134,12 @@ def test_aes_compressor(): @pytest.mark.unit def test_sevenzipcompressor_aes_only(): - class SevenZipFile: - def __init__(self, filters, password): - self.password = password - self.compressor = py7zr.compressor.SevenZipCompressor(filters=filters) - plain_data = b"\x00*\x1a\t'd\x19\xb08s\xca\x8b\x13 \xaf:\x1b\x8d\x97\xf8|#M\xe9\xe1W\xd4\xe4\x97BB\xd2" plain_data += plain_data filters = [ {"id": py7zr.FILTER_CRYPTO_AES256_SHA256} ] - compressor = SevenZipFile(filters, 'secret').compressor + compressor = py7zr.compressor.SevenZipCompressor(filters=filters, password='secret') outdata = compressor.compress(plain_data) outdata += compressor.flush() assert len(outdata) == 64