Skip to content

Commit

Permalink
refactor: modified language parsers for purl2cpe support (#4188)
Browse files Browse the repository at this point in the history
Changed all the language parsers to look up vendors through the purl2cpe
database, and made the database query generic so that it works for all parsers.

Removed the 'slf4j-simple' and 'slf4j-api' products from the fail_pom.xml
file, as they can now be found with purl2cpe.

Signed-off-by: Meet Soni <meetsoni3017@gmail.com>
  • Loading branch information
inosmeet committed Jun 18, 2024
1 parent a34c43c commit c794765
Show file tree
Hide file tree
Showing 14 changed files with 137 additions and 90 deletions.
70 changes: 41 additions & 29 deletions cve_bin_tool/parsers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def find_vendor(self, product, version):
)
return vendorlist

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generate purl string based on various components."""
purl = PackageURL(
type=self.purl_pkg_type,
Expand All @@ -104,36 +104,48 @@ def find_vendor_from_purl(self, purl, ver) -> Tuple[List[ScanInfo], bool]:
It then decodes the CPE data to extract vendor, product, and version information. If the version matches the provided
version, it constructs a ScanInfo object for each matching entry and returns a list of these objects.
"""

query = "SELECT cpe from purl2cpe WHERE purl=?"
cursor = self.db_open_and_get_cursor()
cursor.execute(query, [str(purl)])
cpeList = cursor.fetchall()
vendorlist: list[ScanInfo] = []
vendors = set()

if cpeList != []:
for item in cpeList:
vendor, product, version = self.decode_cpe23(str(item))
vendors.add((vendor, product))
else:
return vendorlist, False
purl_with_ver = f"{str(purl)}@{ver}"
for vendor, product in vendors:
vendorlist.append(
ScanInfo(
ProductInfo(
vendor,
product,
ver,
"/usr/local/bin/product",
purl=purl_with_ver,
),
self.filename,
try:
purl = purl.to_dict()
param1 = f"pkg:{purl['type']}/{purl['name']}"
param2 = f"pkg:{purl['type']}/%/{purl['name']}"

query = """
SELECT cpe from purl2cpe WHERE purl LIKE ?
UNION
SELECT cpe from purl2cpe WHERE purl LIKE ?
"""
cursor = self.db_open_and_get_cursor()
cursor.execute(query, (param1, param2))
cpeList = cursor.fetchall()
vendorlist: list[ScanInfo] = []
vendors = set()

if cpeList != []:
for item in cpeList:
vendor, _, _ = self.decode_cpe23(str(item))
vendors.add((vendor, purl["name"]))
else:
return vendorlist, False

purl_with_ver = f"{str(purl)}@{ver}"
for vendor, product in vendors:
vendorlist.append(
ScanInfo(
ProductInfo(
vendor,
product,
ver,
"/usr/local/bin/product",
purl_with_ver,
),
self.filename,
)
)
)

return vendorlist, True
return vendorlist, True
except Exception as e:
self.logger.error(f"Error occurred: {e}")
return [], False

def db_open_and_get_cursor(self) -> sqlite3.Cursor:
"""Opens connection to sqlite database, returns cursor object."""
Expand Down
10 changes: 6 additions & 4 deletions cve_bin_tool/parsers/dart.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "pub"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""
Generates PURL after normalizing all components.
pubspec: https://dart.dev/tools/pub/pubspec#name
purl-spec for pub: https://github.com/package-url/purl-spec/blob/master/PURL-TYPES.rst#pub
"""
# Normalize product and vendor for Dart packages
# Normalize product for Dart packages
product = re.sub(r"[^a-zA-Z0-9_]", "", product).lower()
vendor = "UNKNOWN" # The vendor is not explicitly defined for pub packages
if not product:
return None

Expand All @@ -50,7 +49,10 @@ def run_checker(self, filename):
for package_name, package_detail in data.get("packages", {}).items():
product = package_name
version = package_detail.get("version").replace('"', "")
vendor = self.find_vendor(product, version)
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)
if not result:
vendor = self.find_vendor(product, version)
if vendor:
yield from vendor
self.logger.debug(f"Done scanning file: {self.filename}")
11 changes: 6 additions & 5 deletions cve_bin_tool/parsers/go.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,13 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "golang"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""

product = re.sub(r"[^a-zA-Z0-9_-]", "", product)
vendor = re.sub(r"^[^a-zA-Z_]|[^a-zA-Z0-9_-]", "", vendor)

if not re.match(r"^[a-zA-Z0-9_-]", product):
return
if vendor == "":
vendor = "UNKNOWN"

purl = super().generate_purl(
product,
Expand Down Expand Up @@ -73,7 +70,11 @@ def run_checker(self, filename):
if len(parts) >= 2:
product = line.split(" ")[0].split("/")[-1]
version = line.split(" ")[1][1:].split("-")[0].split("+")[0]
vendors = self.find_vendor(product, version)
purl = self.generate_purl(product)
vendors, result = self.find_vendor_from_purl(purl, version)

if not result:
vendors = self.find_vendor(product, version)
if vendors is not None:
yield from vendors
self.logger.debug(f"Done scanning file: {self.filename}")
20 changes: 15 additions & 5 deletions cve_bin_tool/parsers/java.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# NOTE: DONE
# Copyright (C) 2022 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later
"""Script containing all functionalities relating to parsing of Java-based files."""
Expand All @@ -18,11 +19,10 @@ def __init__(self, cve_db, logger, validate=True):
self.validate = validate
self.purl_pkg_type = "maven"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components of a Maven package."""
# Normalize product and vendor
# Normalize product
product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower()
vendor = re.sub(r"[^a-zA-Z0-9._-]", "", vendor).lower() if vendor else "UNKNOWN"

if not product:
return None
Expand Down Expand Up @@ -97,7 +97,10 @@ def run_checker(self, filename):
if product is None and parent is not None:
product = parent.find(schema + "artifactId").text
if product is not None and version is not None:
product_info = self.find_vendor(product, version)
purl = self.generate_purl(product)
product_info, result = self.find_vendor_from_purl(purl, version)
if not result:
product_info = self.find_vendor(product, version)
if product_info is not None:
yield from product_info

Expand Down Expand Up @@ -130,7 +133,14 @@ def run_checker(self, filename):
self.logger.debug(f"{file_path} {product.text} {version}")
if version[0].isdigit():
# Valid version identifier
product_info = self.find_vendor(product.text, version)
purl = self.generate_purl(product.text)
product_info, result = self.find_vendor_from_purl(
purl, version
)
if not result:
product_info = self.find_vendor(
product.text, version
)
if product_info is not None:
yield from product_info
self.logger.debug(f"Done scanning file: {filename}")
15 changes: 11 additions & 4 deletions cve_bin_tool/parsers/javascript.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "npm"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""
product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower()
vendor = "UNKNOWN" # Typically, the vendor is not explicitly defined for npm packages

if not product:
return None
Expand All @@ -44,7 +43,11 @@ def run_checker(self, filename):
if "name" in data and "version" in data:
product = data["name"]
version = data["version"]
vendor = self.find_vendor(product, version)
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
vendor = self.find_vendor(product, version)
else:
vendor = None
if vendor is not None:
Expand Down Expand Up @@ -93,7 +96,11 @@ def run_checker(self, filename):
product_version_mapping.append((product, version))

for product, version in product_version_mapping:
vendor = self.find_vendor(product, version)
purl = self.generate_purl(product, "")
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
vendor = self.find_vendor(product, version)
if vendor is not None:
yield from vendor
self.logger.debug(f"Done scanning file: {self.filename}")
11 changes: 8 additions & 3 deletions cve_bin_tool/parsers/perl.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "cpan"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""
# Normalize product and vendor for Perl packages
product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower()
vendor = "UNKNOWN" # Typically, the vendor is not explicitly defined for CPAN packages

if not product:
return None
Expand Down Expand Up @@ -53,7 +52,13 @@ def run_checker(self, filename):

# Print the extracted dependencies
for dependency in dependencies:
vendor = self.find_vendor(dependency[0], dependency[1])
product = dependency[0]
version = dependency[1]
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
vendor = self.find_vendor(product, version)
if vendor is not None:
yield from vendor
self.logger.debug(f"Done scanning file: {self.filename}")
14 changes: 10 additions & 4 deletions cve_bin_tool/parsers/php.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# NOTE: remains not complete


# Copyright (C) 2024 Intel Corporation
# SPDX-License-Identifier: GPL-3.0-or-later
"""Python script containing all functionalities related to parsing of php's composer.lock files."""
Expand All @@ -19,12 +22,11 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "composer"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""
vendor = re.sub(r"[^a-zA-Z0-9._-]", "", vendor).lower()
product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower()

if not vendor or not product:
if not product:
return None

purl = super().generate_purl(
Expand All @@ -51,7 +53,11 @@ def run_checker(self, filename):
version = version[1:]
if "dev" in version:
continue
vendor = self.find_vendor(product, version)
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
vendor = self.find_vendor(product, version)
if vendor is not None:
yield from vendor
self.logger.debug(f"Done scanning file: {self.filename}")
8 changes: 4 additions & 4 deletions cve_bin_tool/parsers/python.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "pypi"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""
product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower()

Expand Down Expand Up @@ -96,7 +96,7 @@ def run_checker(self, filename):
for line in lines["install"]:
product = line["metadata"]["name"]
version = line["metadata"]["version"]
purl = self.generate_purl(product, "")
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
Expand All @@ -119,7 +119,7 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "pypi"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""
product = re.sub(r"[^a-zA-Z0-9._-]", "", product).lower()

Expand Down Expand Up @@ -147,7 +147,7 @@ def run_checker(self, filename):
try:
product = search(compile(r"^Name: (.+)$", MULTILINE), lines).group(1)
version = search(compile(r"^Version: (.+)$", MULTILINE), lines).group(1)
purl = self.generate_purl(product, "")
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)

if vendor is not None:
Expand Down
10 changes: 7 additions & 3 deletions cve_bin_tool/parsers/r.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "cran"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""

product = re.sub(r"[^a-zA-Z0-9.-]", "", product)
vendor = "UNKNOWN"

if not re.match(r"^[a-zA-Z0-9_-]", product):
return
Expand All @@ -57,7 +56,12 @@ def run_checker(self, filename):
for package in content["Packages"]:
product = content["Packages"][package]["Package"]
version = content["Packages"][package]["Version"]
vendor = self.find_vendor(product, version)
purl = self.generate_purl(product)
vendor, result = self.find_vendor_from_purl(purl, version)

if not result:
vendor = self.find_vendor(product, version)

if vendor is not None:
yield from vendor
self.logger.debug(f"Done scanning file: {self.filename}")
15 changes: 9 additions & 6 deletions cve_bin_tool/parsers/ruby.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Copyright (C) 2022 Intel Corporation
# Copyright (C) 2024 Intel Corporation


# SPDX-License-Identifier: GPL-3.0-or-later

import re
Expand Down Expand Up @@ -29,16 +31,13 @@ def __init__(self, cve_db, logger):
super().__init__(cve_db, logger)
self.purl_pkg_type = "gem"

def generate_purl(self, product, vendor, qualifier={}, subpath=None):
def generate_purl(self, product, vendor="", qualifier={}, subpath=None):
"""Generates PURL after normalizing all components."""

product = re.sub(r"^[^a-z]|[^a-z0-9_-]", "", product)
vendor = re.sub(r"^[^a-z]|[^a-z0-9_-]", "", vendor)

if not re.match(r"^[a-z]|[a-z0-9_-]", product):
return
if vendor == "":
vendor = "UNKNOWN"

purl = super().generate_purl(
product,
Expand Down Expand Up @@ -69,7 +68,11 @@ def run_checker(self, filename):
):
product = line.strip().split()[0]
version = line.strip().split("(")[1][:-1]
vendors = self.find_vendor(product, version)
purl = self.generate_purl(product)
vendors, result = self.find_vendor_from_purl(purl, version)

if not result:
vendors = self.find_vendor(product, version)
if vendors is not None:
yield from vendors
self.logger.debug(f"Done scanning file: {self.filename}")
Loading

0 comments on commit c794765

Please sign in to comment.