Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 98 additions & 19 deletions cve_bin_tool/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
"""
Extraction of archives
"""
import itertools
import os
import re
import shutil
import sys
import tarfile
import tempfile
from pathlib import Path

import filetype
import zstandard
from rpmfile.cli import main as rpmextract

Expand All @@ -35,6 +36,10 @@
# Run rpmfile in a thread
rpmextract = async_wrap(rpmextract)

# Extractor dictionary keys
EXTENSIONS = "extensions"
MIMES = "mimes"


class BaseExtractor:
"""Extracts tar, rpm, etc. files"""
Expand All @@ -44,23 +49,48 @@ def __init__(self, logger=None, error_mode=ErrorMode.TruncTrace):
self.logger = logger or LOGGER.getChild(self.__class__.__name__)
self.error_mode = error_mode
self.tempdir = None
# Register the custom LZMA matcher with filetype (see the comments above the Lzma class in this module)
filetype.add_type(Lzma())
self.file_extractors = {
self.extract_file_tar: {".tgz", ".tar.gz", ".tar", ".tar.xz", ".tar.bz2"},
self.extract_file_rpm: {".rpm"},
self.extract_file_deb: {".deb", ".ipk"},
self.extract_file_cab: {".cab"},
self.extract_file_apk: {".apk"},
self.extract_file_zst: {".zst"},
self.extract_file_pkg: {".pkg"},
self.extract_file_tar: {
EXTENSIONS: [
".tgz",
".tar.gz",
".tar",
".tar.xz",
".tar.bz2",
".xz",
".bz2",
".gz",
],
MIMES: [
"application/x-tar",
"appication/gzip",
],
},
self.extract_file_rpm: {EXTENSIONS: [".rpm"], MIMES: []},
self.extract_file_deb: {EXTENSIONS: [".deb", ".ipk"], MIMES: []},
self.extract_file_cab: {EXTENSIONS: [".cab"], MIMES: []},
self.extract_file_apk: {EXTENSIONS: [".apk"], MIMES: []},
self.extract_file_zst: {EXTENSIONS: [".zst"], MIMES: []},
self.extract_file_pkg: {EXTENSIONS: [".pkg"], MIMES: []},
self.extract_file_zip: {
".exe",
".zip",
".jar",
".msi",
".egg",
".whl",
".war",
".ear",
EXTENSIONS: [
".exe",
".zip",
".jar",
".msi",
".egg",
".whl",
".war",
".ear",
],
MIMES: [
"application/x-msdownload",
"application/x-7z-compressed",
"application/x-lzip",
"application/lzma",
],
},
}

Expand All @@ -72,9 +102,14 @@ def can_extract(self, filename):
return False
except PermissionError:
return False
for extension in itertools.chain(*self.file_extractors.values()):
if filename.endswith(extension):
for ext in self.file_extractors:
if Path(filename).suffix in self.file_extractors[ext][EXTENSIONS]:
return True
if os.path.isfile(filename):
guess = filetype.guess(filename)
for ext in self.file_extractors:
if guess is not None and guess.MIME in self.file_extractors[ext][MIMES]:
return True
return False

def tar_member_filter(self, members, extraction_path):
Expand Down Expand Up @@ -367,7 +402,7 @@ async def aio_extract(self, filename):
# Resolve path in case of cwd change
filename = str(filename_pathlib.resolve())
for extractor in self.file_extractors:
for extension in self.file_extractors[extractor]:
for extension in self.file_extractors[extractor][EXTENSIONS]:
if filename.endswith(extension):
extracted_path = str(
Path(self.tempdir) / f"{filename_pathlib.name}.extracted"
Expand All @@ -389,6 +424,27 @@ async def aio_extract(self, filename):
f"Extracted {filename} to {extracted_path}"
)
return extracted_path
guess = filetype.guess(filename)
if (
guess is not None
and guess.MIME in self.file_extractors[extractor][MIMES]
):
extracted_path = str(
Path(self.tempdir) / f"{filename_pathlib.name}.extracted"
)
if Path(extracted_path).exists():
await aio_rmdir(extracted_path)
await aio_makedirs(extracted_path, 0o700)
async with ChangeDirContext(extracted_path):
if await extractor(filename, extracted_path) != 0:
if self.raise_failure:
with ErrorHandler(mode=self.error_mode, logger=self.logger):
raise ExtractionFailed(filename)
else:
self.logger.warning(f"Failure extracting {filename}")
else:
self.logger.debug(f"Extracted {filename} to {extracted_path}")
return extracted_path
with ErrorHandler(mode=self.error_mode, logger=self.logger):
raise UnknownArchiveType(filename)

Expand All @@ -415,6 +471,29 @@ def __exit__(self, exc_type, exc_val, exc_tb):
shutil.rmtree(self.tempdir)


# Custom LZMA type for binary recognition and extraction: cve-bin-tool otherwise encounters extraction failures for this file type.
# It is built on the python library filetype, defined at https://github.com/h2non/filetype.py
# The type follows the pattern of the examples in https://github.com/h2non/filetype.py/tree/master/filetype/types
# The type is registered through filetype.add_type() in BaseExtractor.__init__
class Lzma(filetype.Type):
    """filetype matcher that reports buffers with the LZMA signature.

    A buffer matches when it is longer than three bytes and its first four
    bytes are 0x5D 0x00 0x00 0x00.
    """

    MIME = "application/lzma"
    EXTENSION = "lzma"

    def __init__(self):
        # Hand the fixed MIME/extension pair to the filetype base class.
        super().__init__(mime=Lzma.MIME, extension=Lzma.EXTENSION)

    def match(self, buf):
        """Return True when ``buf`` starts with the four signature bytes."""
        # NOTE(review): presumably the LZMA properties byte followed by the
        # start of the dictionary size field -- confirm against the format spec.
        signature = (0x5D, 0x00, 0x00, 0x00)
        return len(buf) > 3 and tuple(buf[:4]) == signature


def Extractor(*args, **kwargs):
    """Build the context object inside which extraction is performed.

    All positional and keyword arguments are forwarded unchanged to
    TempDirExtractorContext, and the resulting object is returned.
    """
    extraction_context = TempDirExtractorContext(*args, **kwargs)
    return extraction_context
1 change: 1 addition & 0 deletions requirements.csv
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ python_not_in_db,importlib_resources
vsajip_not_in_db,python-gnupg
anthonyharrison_not_in_db,lib4sbom
the_purl_authors_not_in_db,packageurl-python
h2non,filetype
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ beautifulsoup4
cvss
defusedxml
distro
filetype>=1.2.0
gsutil
importlib_metadata>=3.6; python_version < "3.10"
importlib_resources; python_version < "3.9"
Expand Down
51 changes: 38 additions & 13 deletions test/test_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import pytest
from pytest_mock import MockerFixture

from cve_bin_tool.extractor import Extractor
from cve_bin_tool.extractor import EXTENSIONS, Extractor
from cve_bin_tool.util import inpath

# Enable logging if tests are not passing to help you find errors
Expand Down Expand Up @@ -89,6 +89,9 @@ def setup_method(self):
("test.tar.bz2", "w:bz2"),
("test.tar", "w"),
("test.tar.xz", "w:xz"),
("test.xz", "w:xz"),
("test.bz2", "w:bz2"),
("test.gz", "w:gz"),
]:
tarpath = self.tempdir / filename
tar = tarfile.open(tarpath, mode=tarmode)
Expand All @@ -101,7 +104,9 @@ def setup_method(self):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_tar]
return self.extractor.file_extractors[self.extractor.extract_file_tar][
EXTENSIONS
]

@pytest.mark.asyncio
async def test_extract_file_tar(self, extension_list: list[str]):
Expand Down Expand Up @@ -131,7 +136,9 @@ def setup_class(cls):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_rpm]
return self.extractor.file_extractors[self.extractor.extract_file_rpm][
EXTENSIONS
]

@pytest.mark.asyncio
async def test_extract_file_rpm(self, extension_list: list[str]):
Expand Down Expand Up @@ -162,7 +169,9 @@ def setup_method(self):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_zst]
return self.extractor.file_extractors[self.extractor.extract_file_zst][
EXTENSIONS
]

@pytest.mark.asyncio
@pytest.mark.skipif(
Expand All @@ -187,7 +196,9 @@ def setup_method(self):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_pkg]
return self.extractor.file_extractors[self.extractor.extract_file_pkg][
EXTENSIONS
]

@pytest.mark.parametrize(
"inpath_return_values",
Expand Down Expand Up @@ -227,7 +238,9 @@ def setup_class(cls):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_rpm]
return self.extractor.file_extractors[self.extractor.extract_file_rpm][
EXTENSIONS
]

@pytest.mark.asyncio
async def test_extract_file_rpm(self, extension_list: list[str]):
Expand All @@ -248,7 +261,9 @@ def setup_method(self):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_deb]
return self.extractor.file_extractors[self.extractor.extract_file_deb][
EXTENSIONS
]

@pytest.mark.asyncio
@pytest.mark.skipif(
Expand Down Expand Up @@ -289,7 +304,9 @@ def setup_method(self):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_deb]
return self.extractor.file_extractors[self.extractor.extract_file_deb][
EXTENSIONS
]

@pytest.mark.asyncio
@pytest.mark.skipif(
Expand Down Expand Up @@ -334,7 +351,9 @@ def setup_method(self):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_deb]
return self.extractor.file_extractors[self.extractor.extract_file_deb][
EXTENSIONS
]

@pytest.mark.asyncio
@pytest.mark.skipif(
Expand All @@ -358,7 +377,9 @@ def setup_method(self):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_cab]
return self.extractor.file_extractors[self.extractor.extract_file_cab][
EXTENSIONS
]

@pytest.mark.asyncio
async def test_extract_file_cab(self, extension_list: list[str]):
Expand Down Expand Up @@ -392,8 +413,10 @@ class TestExtractFileZip(TestExtractorBase):
@pytest.fixture
def extension_list(self) -> list[str]:
return list(
self.extractor.file_extractors[self.extractor.extract_file_apk]
| self.extractor.file_extractors[self.extractor.extract_file_zip]
self.extractor.file_extractors[self.extractor.extract_file_apk][EXTENSIONS]
+ self.extractor.file_extractors[self.extractor.extract_file_zip][
EXTENSIONS
]
)

@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -441,7 +464,9 @@ def setup_class(cls):

@pytest.fixture
def extension_list(self) -> list[str]:
return self.extractor.file_extractors[self.extractor.extract_file_apk]
return self.extractor.file_extractors[self.extractor.extract_file_apk][
EXTENSIONS
]

@pytest.mark.asyncio
async def test_extract_file_apk(self, extension_list: list[str]):
Expand Down