From a73e8261b6a1e2eb6f1f0546677e4d131d9aeb41 Mon Sep 17 00:00:00 2001 From: Stephen Groat Date: Thu, 15 Jun 2023 12:16:03 -0700 Subject: [PATCH] feat: add extraction enhancements to include binary identification --- cve_bin_tool/extractor.py | 117 +++++++++++++++++++++++++++++++------- requirements.csv | 1 + requirements.txt | 1 + test/test_extractor.py | 51 ++++++++++++----- 4 files changed, 138 insertions(+), 32 deletions(-) diff --git a/cve_bin_tool/extractor.py b/cve_bin_tool/extractor.py index e9211ee70c..5555f1fd48 100644 --- a/cve_bin_tool/extractor.py +++ b/cve_bin_tool/extractor.py @@ -4,13 +4,14 @@ """ Extraction of archives """ -import itertools +import os import re import shutil import sys import tempfile from pathlib import Path +import filetype import zstandard from rpmfile.cli import main as rpmextract @@ -34,6 +35,10 @@ # Run rpmfile in a thread rpmextract = async_wrap(rpmextract) +# Extractor dictionary keys +EXTENSIONS = "extensions" +MIMES = "mimes" + class BaseExtractor: """Extracts tar, rpm, etc. 
files""" @@ -43,23 +48,48 @@ def __init__(self, logger=None, error_mode=ErrorMode.TruncTrace): self.logger = logger or LOGGER.getChild(self.__class__.__name__) self.error_mode = error_mode self.tempdir = None + # Adding filetype LZMA (see comments on line 438) + filetype.add_type(Lzma()) self.file_extractors = { - self.extract_file_tar: {".tgz", ".tar.gz", ".tar", ".tar.xz", ".tar.bz2"}, - self.extract_file_rpm: {".rpm"}, - self.extract_file_deb: {".deb", ".ipk"}, - self.extract_file_cab: {".cab"}, - self.extract_file_apk: {".apk"}, - self.extract_file_zst: {".zst"}, - self.extract_file_pkg: {".pkg"}, + self.extract_file_tar: { + EXTENSIONS: [ + ".tgz", + ".tar.gz", + ".tar", + ".tar.xz", + ".tar.bz2", + ".xz", + ".bz2", + ".gz", + ], + MIMES: [ + "application/x-tar", + "application/gzip", + ], + }, + self.extract_file_rpm: {EXTENSIONS: [".rpm"], MIMES: []}, + self.extract_file_deb: {EXTENSIONS: [".deb", ".ipk"], MIMES: []}, + self.extract_file_cab: {EXTENSIONS: [".cab"], MIMES: []}, + self.extract_file_apk: {EXTENSIONS: [".apk"], MIMES: []}, + self.extract_file_zst: {EXTENSIONS: [".zst"], MIMES: []}, + self.extract_file_pkg: {EXTENSIONS: [".pkg"], MIMES: []}, self.extract_file_zip: { - ".exe", - ".zip", - ".jar", - ".msi", - ".egg", - ".whl", - ".war", - ".ear", + EXTENSIONS: [ + ".exe", + ".zip", + ".jar", + ".msi", + ".egg", + ".whl", + ".war", + ".ear", + ], + MIMES: [ + "application/x-msdownload", + "application/x-7z-compressed", + "application/x-lzip", + "application/lzma", + ], }, } @@ -68,9 +98,14 @@ def can_extract(self, filename): # Do not try to extract symlinks if Path(filename).is_symlink(): return False - for extension in itertools.chain(*self.file_extractors.values()): - if filename.endswith(extension): + for ext in self.file_extractors: + if Path(filename).suffix in self.file_extractors[ext][EXTENSIONS]: return True + if os.path.isfile(filename): + guess = filetype.guess(filename) + for ext in self.file_extractors: + if guess is not None and
guess.MIME in self.file_extractors[ext][MIMES]: + return True return False @staticmethod @@ -332,7 +367,7 @@ async def aio_extract(self, filename): # Resolve path in case of cwd change filename = str(filename_pathlib.resolve()) for extractor in self.file_extractors: - for extension in self.file_extractors[extractor]: + for extension in self.file_extractors[extractor][EXTENSIONS]: if filename.endswith(extension): extracted_path = str( Path(self.tempdir) / f"{filename_pathlib.name}.extracted" @@ -354,6 +389,27 @@ async def aio_extract(self, filename): f"Extracted {filename} to {extracted_path}" ) return extracted_path + guess = filetype.guess(filename) + if ( + guess is not None + and guess.MIME in self.file_extractors[extractor][MIMES] + ): + extracted_path = str( + Path(self.tempdir) / f"{filename_pathlib.name}.extracted" + ) + if Path(extracted_path).exists(): + await aio_rmdir(extracted_path) + await aio_makedirs(extracted_path, 0o700) + async with ChangeDirContext(extracted_path): + if await extractor(filename, extracted_path) != 0: + if self.raise_failure: + with ErrorHandler(mode=self.error_mode, logger=self.logger): + raise ExtractionFailed(filename) + else: + self.logger.warning(f"Failure extracting {filename}") + else: + self.logger.debug(f"Extracted {filename} to {extracted_path}") + return extracted_path with ErrorHandler(mode=self.error_mode, logger=self.logger): raise UnknownArchiveType(filename) @@ -380,6 +436,29 @@ def __exit__(self, exc_type, exc_val, exc_tb): shutil.rmtree(self.tempdir) +# Creating type LZMA for binary recognition and extraction because cve-bin-tool encounters extraction failure for this filetype +# Using python library filetype defined at https://github.com/h2non/filetype.py +# Following pattern of type creation according to examples in https://github.com/h2non/filetype.py/tree/master/filetype/types +# Adding type LZMA on line 54 +class Lzma(filetype.Type): + """Implements the lzma compression type matcher.""" + + MIME = 
"application/lzma" + EXTENSION = "lzma" + + def __init__(self): + super().__init__(mime=Lzma.MIME, extension=Lzma.EXTENSION) + + def match(self, buf): + return ( + len(buf) > 3 + and buf[0] == 0x5D + and buf[1] == 0x00 + and buf[2] == 0x00 + and buf[3] == 0x00 + ) + + def Extractor(*args, **kwargs): """Provides a context which extraction is done in""" return TempDirExtractorContext(*args, **kwargs) diff --git a/requirements.csv b/requirements.csv index 6bb20b3008..3ac40cc600 100644 --- a/requirements.csv +++ b/requirements.csv @@ -22,3 +22,4 @@ python_not_in_db,importlib_resources vsajip_not_in_db,python-gnupg anthonyharrison_not_in_db,lib4sbom the_purl_authors_not_in_db,packageurl-python +h2non,filetype diff --git a/requirements.txt b/requirements.txt index bf7c07c84e..bfe4d6a9e1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,6 +3,7 @@ beautifulsoup4 cvss defusedxml distro +filetype>=1.2.0 gsutil importlib_metadata>=3.6; python_version < "3.10" importlib_resources; python_version < "3.9" diff --git a/test/test_extractor.py b/test/test_extractor.py index 19f5ed69ec..6fd69b5d42 100644 --- a/test/test_extractor.py +++ b/test/test_extractor.py @@ -29,7 +29,7 @@ import pytest from pytest_mock import MockerFixture -from cve_bin_tool.extractor import Extractor +from cve_bin_tool.extractor import EXTENSIONS, Extractor from cve_bin_tool.util import inpath # Enable logging if tests are not passing to help you find errors @@ -89,6 +89,9 @@ def setup_method(self): ("test.tar.bz2", "w:bz2"), ("test.tar", "w"), ("test.tar.xz", "w:xz"), + ("test.xz", "w:xz"), + ("test.bz2", "w:bz2"), + ("test.gz", "w:gz"), ]: tarpath = self.tempdir / filename tar = tarfile.open(tarpath, mode=tarmode) @@ -101,7 +104,9 @@ def setup_method(self): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_tar] + return self.extractor.file_extractors[self.extractor.extract_file_tar][ + EXTENSIONS + ] @pytest.mark.asyncio 
async def test_extract_file_tar(self, extension_list: list[str]): @@ -131,7 +136,9 @@ def setup_class(cls): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_rpm] + return self.extractor.file_extractors[self.extractor.extract_file_rpm][ + EXTENSIONS + ] @pytest.mark.asyncio async def test_extract_file_rpm(self, extension_list: list[str]): @@ -162,7 +169,9 @@ def setup_method(self): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_zst] + return self.extractor.file_extractors[self.extractor.extract_file_zst][ + EXTENSIONS + ] @pytest.mark.asyncio @pytest.mark.skipif( @@ -187,7 +196,9 @@ def setup_method(self): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_pkg] + return self.extractor.file_extractors[self.extractor.extract_file_pkg][ + EXTENSIONS + ] @pytest.mark.parametrize( "inpath_return_values", @@ -227,7 +238,9 @@ def setup_class(cls): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_rpm] + return self.extractor.file_extractors[self.extractor.extract_file_rpm][ + EXTENSIONS + ] @pytest.mark.asyncio async def test_extract_file_rpm(self, extension_list: list[str]): @@ -248,7 +261,9 @@ def setup_method(self): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_deb] + return self.extractor.file_extractors[self.extractor.extract_file_deb][ + EXTENSIONS + ] @pytest.mark.asyncio @pytest.mark.skipif( @@ -289,7 +304,9 @@ def setup_method(self): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_deb] + return self.extractor.file_extractors[self.extractor.extract_file_deb][ + EXTENSIONS + ] @pytest.mark.asyncio @pytest.mark.skipif( @@ -334,7 
+351,9 @@ def setup_method(self): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_deb] + return self.extractor.file_extractors[self.extractor.extract_file_deb][ + EXTENSIONS + ] @pytest.mark.asyncio @pytest.mark.skipif( @@ -358,7 +377,9 @@ def setup_method(self): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_cab] + return self.extractor.file_extractors[self.extractor.extract_file_cab][ + EXTENSIONS + ] @pytest.mark.asyncio async def test_extract_file_cab(self, extension_list: list[str]): @@ -392,8 +413,10 @@ class TestExtractFileZip(TestExtractorBase): @pytest.fixture def extension_list(self) -> list[str]: return list( - self.extractor.file_extractors[self.extractor.extract_file_apk] - | self.extractor.file_extractors[self.extractor.extract_file_zip] + self.extractor.file_extractors[self.extractor.extract_file_apk][EXTENSIONS] + + self.extractor.file_extractors[self.extractor.extract_file_zip][ + EXTENSIONS + ] ) @pytest.fixture(autouse=True) @@ -441,7 +464,9 @@ def setup_class(cls): @pytest.fixture def extension_list(self) -> list[str]: - return self.extractor.file_extractors[self.extractor.extract_file_apk] + return self.extractor.file_extractors[self.extractor.extract_file_apk][ + EXTENSIONS + ] @pytest.mark.asyncio async def test_extract_file_apk(self, extension_list: list[str]):