Skip to content

Commit

Permalink
feat: support CycloneDX 1.4 (#87)
Browse files Browse the repository at this point in the history
* fix: unified how current version of `jake` is collected

Signed-off-by: Paul Horton <phorton@sonatype.com>

* feat: adopted latest RC for CycloneDX libraries to enable 1.4 support

Signed-off-by: Paul Horton <phorton@sonatype.com>

* fixes

Signed-off-by: Paul Horton <phorton@sonatype.com>

* chore: bumped to released versions of CyloneDX libraries`

Signed-off-by: Paul Horton <phorton@sonatype.com>
  • Loading branch information
madpah committed Jan 13, 2022
1 parent 8360771 commit 20c62fc
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 221 deletions.
13 changes: 4 additions & 9 deletions jake/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@

import argparse
from datetime import datetime
from rich.console import Console
from typing import Dict

import pkg_resources
from pyfiglet import figlet_format
from rich.console import Console

from .command import BaseCommand
from .command import BaseCommand, _jake_version
from .command.iq import IqCommand
from .command.oss import OssCommand
from .command.sbom import SbomCommand
Expand Down Expand Up @@ -86,7 +85,7 @@ def _build_arg_parser(self):
# Add global options
self._arg_parser.add_argument('-v', '--version', help='show which version of jake you are running',
action='version',
version=f'jake {JakeCmd._get_jake_version()}')
version=f'jake {_jake_version}')
self._arg_parser.add_argument('-w', '--warn-only', action='store_true', dest='warn_only',
help='prevents exit with non-zero code when issues have been detected')
self._arg_parser.add_argument('-X', action='store_true', help='enable debug output', dest='debug_enabled')
Expand All @@ -99,15 +98,11 @@ def _debug_message(self, message: str):
if self._DEBUG_ENABLED:
print('[DEBUG] - {} - {}'.format(datetime.now(), message))

@staticmethod
def _get_jake_version():
return pkg_resources.get_distribution('jake').version

def _print_jake_header(self):
""" Prints the banner, most of the user facing commands start with this """
self._console.print(figlet_format('Jake', font='isometric4'), style='dark_green')
self._console.print(figlet_format('..the snake..', font='invita'), style='dark_green')
print("Jake Version: {}".format(JakeCmd._get_jake_version()))
print("Jake Version: {}".format(_jake_version))
print('Put your Python dependencies in a chokehold')
print('')

Expand Down
12 changes: 12 additions & 0 deletions jake/command/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,19 @@
#

import argparse
import sys
from abc import ABC, abstractmethod
from typing import Optional

if sys.version_info >= (3, 8):
from importlib.metadata import version as meta_version
else:
from importlib_metadata import version as meta_version

try:
_jake_version: Optional[str] = str(meta_version('jake')) # type: ignore[no-untyped-call]
except Exception:
_jake_version = 'DEVELOPMENT'


class BaseCommand(ABC):
Expand Down
22 changes: 10 additions & 12 deletions jake/command/iq.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,18 @@

import argparse
import logging
import pkg_resources
import requests

from polling2 import poll_decorator
from requests.auth import HTTPBasicAuth
from rich.progress import Progress
from typing import Union
from urllib.parse import urlparse

import requests
from cyclonedx.model.bom import Bom
from cyclonedx.parser.environment import EnvironmentParser
from cyclonedx.output import get_instance
from cyclonedx_py.parser.environment import EnvironmentParser
from polling2 import poll_decorator
from requests.auth import HTTPBasicAuth
from rich.progress import Progress

from . import BaseCommand
from . import BaseCommand, _jake_version


class IqCommand(BaseCommand):
Expand All @@ -51,7 +49,7 @@ class IqServerApi:
_auth: HTTPBasicAuth

_default_headers = {
'User-Agent': 'jake/{}'.format(pkg_resources.get_distribution('jake').version)
'User-Agent': 'jake/{}'.format(_jake_version)
}

_max_wait_in_seconds: int = 300
Expand Down Expand Up @@ -182,13 +180,13 @@ def handle_args(self) -> int:

with Progress() as progress:
task_validate_iq = progress.add_task(
description="[yellow]Checking out your Nexus IQ Server", start=False, total=10
description="[yellow]Checking out your Nexus IQ Server", start=True, total=10
)
task_parser = progress.add_task(
description="[yellow]Collecting packages in your Python Environment", start=False, total=10
description="[yellow]Collecting packages in your Python Environment", start=True, total=10
)
task_query_iq = progress.add_task(
description="[yellow]Submitting to Nexus Lifecycle for Policy Evaluation", start=False, total=10
description="[yellow]Submitting to Nexus Lifecycle for Policy Evaluation", start=True, total=10
)

# task_validate_iq
Expand Down
180 changes: 120 additions & 60 deletions jake/command/oss.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,18 @@

import argparse
import os
from typing import List
from decimal import Decimal
from typing import List, Optional

from cyclonedx.model import XsUri
from cyclonedx.model.bom import Bom
from cyclonedx.model.component import Component
from cyclonedx.model.vulnerability import Vulnerability as CycloneDxVulnerability, VulnerabilityRating, \
VulnerabilitySourceType
from cyclonedx.model.vulnerability import Vulnerability, VulnerabilityRating, VulnerabilityReference, \
VulnerabilityScoreSource, VulnerabilitySeverity, VulnerabilitySource
from cyclonedx.output import get_instance, OutputFormat, SchemaVersion
from cyclonedx.parser.environment import EnvironmentParser
from ossindex.model import OssIndexComponent, Vulnerability
from cyclonedx_py.parser.environment import EnvironmentParser
from ossindex.model import OssIndexComponent
from ossindex.ossindex import OssIndex
from packageurl import PackageURL
from rich.console import Console
from rich.panel import Panel
from rich.progress import Progress
Expand All @@ -48,13 +49,13 @@ def handle_args(self) -> int:

with Progress() as progress:
task_parser = progress.add_task(
description="[yellow]Collecting packages in your Python Environment", start=False, total=10
description="[yellow]Collecting packages in your Python Environment", start=True, total=10
)
task_query_ossi = progress.add_task(
description="[yellow]Querying OSS Index for details on your packages", start=False, total=10
description="[yellow]Querying OSS Index for details on your packages", start=True, total=10
)
task_sanity_checking = progress.add_task(
description="[cyan]Sanity checking...", start=False, total=10
description="[cyan]Sanity checking...", start=True, total=10
)

parser = EnvironmentParser()
Expand All @@ -74,7 +75,7 @@ def handle_args(self) -> int:
progress.update(task_query_ossi, completed=3, description='Querying OSS Index for details on your packages')

oss_index_results = oss.get_component_report(
packages=list(map(lambda c: c.to_package_url(), parser.get_components()))
packages=list(map(lambda c: c.purl if c.purl else None, parser.get_components()))
)
progress.update(
task_query_ossi, completed=10,
Expand All @@ -93,12 +94,67 @@ def handle_args(self) -> int:
description="🐍 [green]Sane number of results from OSS Index"
)

task_munching_data = progress.add_task(
description="🐍 [green]Munching & crunching data...", start=True, total=len(parser.get_components())
)

components: List[Component] = []
for component in parser.get_components():
oss_index_component: OssIndexComponent = list(filter(
lambda oic: oic.get_package_url().to_string() == component.purl.to_string(), oss_index_results
)).pop()

if oss_index_component.has_known_vulnerabilities():
for oic_vulnerability in oss_index_component.get_vulnerabilities():

ratings: List[VulnerabilityRating] = []
if oic_vulnerability.get_cvss_score():
ratings.append(
VulnerabilityRating(
source=VulnerabilitySource(
name='Oss Index', url=XsUri(oic_vulnerability.get_oss_index_reference_url())
),
score=Decimal(
oic_vulnerability.get_cvss_score()
) if oic_vulnerability.get_cvss_score() else None,
severity=VulnerabilitySeverity.get_from_cvss_scores(
(oic_vulnerability.get_cvss_score(),)
) if oic_vulnerability.get_cvss_score() else None,
method=VulnerabilityScoreSource.get_from_vector(
vector=oic_vulnerability.get_cvss_vector()
) if oic_vulnerability.get_cvss_vector() else None,
vector=oic_vulnerability.get_cvss_vector()
)
)

vulnerability: Vulnerability = Vulnerability(
bom_ref=str(oic_vulnerability.get_id()) if oic_vulnerability.get_id() else None,
id=str(oic_vulnerability.get_cve()) if oic_vulnerability.get_cve() else None,
source=VulnerabilitySource(
name='Oss Index', url=XsUri(oic_vulnerability.get_oss_index_reference_url())
),
cwes=[int(oic_vulnerability.get_cwe())] if oic_vulnerability.get_cwe() else None,
description=oic_vulnerability.get_title(),
detail=oic_vulnerability.get_description(),
ratings=ratings
)
if oic_vulnerability.get_external_reference_urls():
for ext_ref_url in oic_vulnerability.get_external_reference_urls():
vulnerability.add_reference(VulnerabilityReference(
source=VulnerabilitySource(url=XsUri(ext_ref_url))
))

component.add_vulnerability(vulnerability=vulnerability)

components.append(component)
progress.update(task_munching_data, advance=1)

print('')
self._print_oss_index_report(oss_index_results=oss_index_results)
self._print_oss_index_report(components=components)

if self._arguments.oss_output_file:
cyclonedx_output = get_instance(
bom=self._build_bom(oss_index_results=oss_index_results),
bom=OssCommand._build_bom(components=components),
output_format=OutputFormat[str(self._arguments.oss_output_format).upper()],
schema_version=SchemaVersion['V{}'.format(
str(self._arguments.oss_schema_version).replace('.', '_')
Expand Down Expand Up @@ -131,57 +187,42 @@ def setup_argument_parser(self, subparsers: argparse._SubParsersAction):
parser.add_argument('--output-format', help='SBOM output format (default = xml)', choices={'json', 'xml'},
default='xml', dest='oss_output_format')
parser.add_argument('--schema-version', help='CycloneDX schema version to use (default = 1.3)',
choices={'1.3', '1.2', '1.1', '1.0'}, default='1.3',
choices={'1.4', '1.3', '1.2', '1.1', '1.0'}, default='1.3',
dest='oss_schema_version')

def _build_bom(self, oss_index_results: List[OssIndexComponent]) -> Bom:
@staticmethod
def _build_bom(components: List[Component]) -> Bom:
bom = Bom()
oic: OssIndexComponent = None
for oic in oss_index_results:
purl: PackageURL = oic.get_package_url()
component: Component = Component(name=purl.name, version=purl.version, qualifiers=purl.qualifiers)
if oic.has_known_vulnerabilities():
for oss_vuln in oic.get_vulnerabilities():
component.add_vulnerability(CycloneDxVulnerability(
id=oss_vuln.get_display_name(), source_name='OSSINDEX',
source_url=oss_vuln.get_oss_index_reference_url(), ratings=[
VulnerabilityRating(
score_base=oss_vuln.get_cvss_score(), vector=oss_vuln.get_cvss_vector(),
method=VulnerabilitySourceType.get_from_vector(vector=oss_vuln.get_cvss_vector())
)
],
description=oss_vuln.get_description(),
cwes=[oss_vuln.get_cwe().replace('CWE-')] if oss_vuln.get_cwe() else [],
advisories=oss_vuln.get_external_reference_urls()
))
bom.add_component(component=component)

bom.components = components
return bom

def _print_oss_index_report(self, oss_index_results: List[OssIndexComponent]):
def _print_oss_index_report(self, components: List[Component]):
total_vulnerabilities = 0
total_packages = len(oss_index_results)
total_packages = len(components)

oic: OssIndexComponent
v: Vulnerability
component: Component
i: int = 1
for oic in oss_index_results:
if oic.has_known_vulnerabilities():
for component in components:
if component.has_vulnerabilities():
self._console.print(
f"[{i}/{total_packages}] - {oic.get_coordinates()} [VULNERABLE]",
style=self._get_color_for_cvss_score(cvss_score=oic.get_max_cvss_score())
f"[{i}/{total_packages}] - {component.name}@{component.version} [VULNERABLE]",
style=OssCommand._get_color_for_cvss_score(
cvss_score=OssCommand._get_max_cvss_score(component=component)
)
)

total_vulnerabilities += len(oic.get_vulnerabilities())
if oic.get_vulnerabilities():
tree = Tree(f'Vulnerability Details for [bright_white]{oic.get_coordinates()}[white]')
for v in oic.get_vulnerabilities():
self._print_vulnerability(tree=tree, v=v)
total_vulnerabilities += len(component.get_vulnerabilities())
if component.get_vulnerabilities():
tree = Tree(f'Vulnerability Details for [bright_white]{component.name}@{component.version}[white]')
for v in component.get_vulnerabilities():
OssCommand._print_vulnerability(tree=tree, v=v)
self._console.print(tree)
else:
self._console.print(
f"[{i}/{total_packages}] - {oic.get_coordinates()}",
style=self._get_color_for_cvss_score(cvss_score=oic.get_max_cvss_score())
f"[{i}/{total_packages}] - {component.name}@{component.version}",
style=OssCommand._get_color_for_cvss_score(
cvss_score=OssCommand._get_max_cvss_score(component=component)
)
)

i += 1
Expand All @@ -191,32 +232,51 @@ def _print_oss_index_report(self, oss_index_results: List[OssIndexComponent]):
table = Table(title='Summary')
table.add_column("Audited Dependencies", justify="left", no_wrap=True)
table.add_column("Vulnerabilities Found", justify="left", no_wrap=True)
table.add_row('{}'.format(len(oss_index_results)), f'{total_vulnerabilities}')
table.add_row('{}'.format(len(components)), f'{total_vulnerabilities}')

self._console.print(table)

@staticmethod
def _get_max_cvss_score_for_vulnerability(vulnerability: Vulnerability) -> float:
max_score: float = 0.0
for rating in vulnerability.ratings:
if float(rating.score) > max_score:
max_score = float(rating.score)
return max_score

@staticmethod
def _get_max_cvss_score(component: Component) -> Optional[float]:
max_cvss_score: float = 0.0
for v in component.get_vulnerabilities():
max_cvss_score = OssCommand._get_max_cvss_score_for_vulnerability(vulnerability=v)
return max_cvss_score

@staticmethod
def _print_vulnerability(tree: Tree, v: Vulnerability) -> None:
b = tree.add(
f':warning: [bright_red] ID: {v.get_id()}'
f':warning: [bright_red] ID: {v.id}'
)

severity_color = OssCommand._get_color_for_cvss_score(v.get_cvss_score())
severity_color = OssCommand._get_color_for_cvss_score(
OssCommand._get_max_cvss_score_for_vulnerability(vulnerability=v)
)

content = f"""
[bright_white]{v.get_description()}
[bright_white]{v.description}
{v.detail}
Details:
- CVSS Score: {v.get_cvss_score()} - [{severity_color}]{OssCommand._get_severity_for_cvss_score(v.get_cvss_score())}
[bright_white]- CVSS Vector: {v.get_cvss_vector() if v.get_cvss_vector() else 'Unknown'}
- CWE: {v.get_cwe() if v.get_cwe() else 'Unknown'}
Ratings:
{os.linesep.join([f' - [{severity_color}]{rating.score} {rating.severity.name}: '
f'Vector: {rating.vector if rating.vector else "Unknown"}, '
f'CWEs: {",".join(v.cwes) if v.cwes else "Not Recorded"}[bright_white]' for rating in v.ratings])}
References:
- {v.get_oss_index_reference_url()}
{os.linesep.join([f' - {url}' for url in v.get_external_reference_urls()])}
{os.linesep.join([f' - {reference.source.name if reference.source.name else ""} [Ref: {reference.id}]'
f' URL: {reference.source.url if reference.source.url else "None"}'
for reference in v.references])}
"""

b.add(Panel(content, title=f'[bright_white]{v.get_cve()}', title_align="left"))
b.add(Panel(content, title=f'[bright_white]{v.id}', title_align="left"))

@staticmethod
def _get_color_for_cvss_score(cvss_score: float = 0.0) -> str:
Expand Down
Loading

0 comments on commit 20c62fc

Please sign in to comment.