diff --git a/vexy/client.py b/vexy/client.py index 4216cb8..e1986d7 100644 --- a/vexy/client.py +++ b/vexy/client.py @@ -20,6 +20,7 @@ import argparse import enum +import json import sys from datetime import datetime from importlib import import_module @@ -28,8 +29,10 @@ from string import printable from typing import Dict, Optional, Set, cast from urllib.parse import quote +from xml.etree import ElementTree import yaml +from cyclonedx.exception import CycloneDxException from cyclonedx.model import ExternalReference, ExternalReferenceType, Tool, XsUri from cyclonedx.model.bom import Bom from cyclonedx.output import BaseOutput, OutputFormat, SchemaVersion @@ -69,31 +72,31 @@ class _CLI_OUTPUT_FORMAT(enum.Enum): ThisTool = Tool(vendor='Vexy', name='vexy', version=__ThisToolVersion or 'UNKNOWN') ThisTool.external_references.update([ ExternalReference( - reference_type=ExternalReferenceType.BUILD_SYSTEM, + type_=ExternalReferenceType.BUILD_SYSTEM, url=XsUri('https://github.com/madpah/vexy/actions') ), ExternalReference( - reference_type=ExternalReferenceType.DISTRIBUTION, + type_=ExternalReferenceType.DISTRIBUTION, url=XsUri('https://pypi.org/project/vexy/') ), ExternalReference( - reference_type=ExternalReferenceType.DOCUMENTATION, + type_=ExternalReferenceType.DOCUMENTATION, url=XsUri('https://vexy.readthedocs.io/') ), ExternalReference( - reference_type=ExternalReferenceType.ISSUE_TRACKER, + type_=ExternalReferenceType.ISSUE_TRACKER, url=XsUri('https://github.com/madpah/vexy/issues') ), ExternalReference( - reference_type=ExternalReferenceType.LICENSE, + type_=ExternalReferenceType.LICENSE, url=XsUri('https://github.com/madpah/vexy/blob/main/LICENSE') ), ExternalReference( - reference_type=ExternalReferenceType.RELEASE_NOTES, + type_=ExternalReferenceType.RELEASE_NOTES, url=XsUri('https://github.com/madpah/vexy/blob/main/CHANGELOG.md') ), ExternalReference( - reference_type=ExternalReferenceType.VCS, + type_=ExternalReferenceType.VCS, url=XsUri('https://github.com/madpah/vexy') ) ]) @@ -149,16 +152,22 @@ def execute(self) -> None: ) progress.start_task(task_id=task_parse) - parser: BaseParser - if str(self._arguments.input_source.name).endswith('.json'): - parser = CycloneDxJsonParser(input_file=self._arguments.input_source) - elif str(self._arguments.input_source.name).endswith('.xml'): - parser = CycloneDxXmlParser(input_file=self._arguments.input_source) + # parser: BaseParser + input_bom: Bom + try: + with self._arguments.input_source as input_bom_fh: + if str(self._arguments.input_source.name).endswith('.json'): + input_bom = Bom.from_json(data=json.loads(input_bom_fh.read())) + + elif str(self._arguments.input_source.name).endswith('.xml'): + input_bom = Bom.from_xml(data=ElementTree.fromstring(input_bom_fh.read())) + except CycloneDxException as e: + print(f'Failure validating input BOM: {e}') + return - parser.parse_bom() progress.update( task_id=task_parse, completed=100, - description=f'Parsed {len(parser.bom.components)} Components from CycloneDX SBOM' + description=f'Parsed {len(input_bom.components)} Components from CycloneDX SBOM' ) vex = Bom() @@ -169,7 +178,7 @@ def execute(self) -> None: f'Consulting {data_source.source_name()} for known vulnerabilities', total=100, visible=not self._is_quiet() ) - data_source.process_components(components=parser.bom.components) + data_source.process_components(components=input_bom.components) progress.update( task_id=data_source_tasks[data_source.__class__], completed=25, description=f'{data_source.source_name()}: Querying for {len(data_source.valid_components)} ' @@ -187,7 +196,7 @@ def execute(self) -> None: i: int = 1 for v in vulnerabilities: for a in v.affects: - a.ref = f'{parser.bom.urn()}#{quote(a.ref, safe=printable)}' + a.ref = f'{input_bom.urn()}#{quote(a.ref, safe=printable)}' vex.vulnerabilities.add(v) progress.update( task_id=data_source_tasks[data_source.__class__], diff --git a/vexy/parser/__init__.py b/vexy/parser/__init__.py deleted file mode 100644 index c551c68..0000000 --- a/vexy/parser/__init__.py +++ /dev/null @@ -1,47 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 - -# This file is part of Vexy -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# Copyright (c) Paul Horton. All Rights Reserved. - -from abc import ABC, abstractmethod -from io import TextIOWrapper - -from cyclonedx.model.bom import Bom - - -class BaseParser(ABC): - - def __init__(self, input_file: TextIOWrapper) -> None: - self._input_file = input_file - self._bom = Bom() - - @property - def input_file(self) -> TextIOWrapper: - return self._input_file - - @property - def bom(self) -> Bom: - return self._bom - - @bom.setter - def bom(self, bom: Bom) -> None: - self._bom = bom - - @abstractmethod - def parse_bom(self) -> None: - pass diff --git a/vexy/parser/cyclonedx.py b/vexy/parser/cyclonedx.py deleted file mode 100644 index c378277..0000000 --- a/vexy/parser/cyclonedx.py +++ /dev/null @@ -1,121 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 - -# This file is part of Vexy -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# SPDX-License-Identifier: Apache-2.0 -# Copyright (c) Paul Horton. All Rights Reserved. - -""" -Contains classes and methods for parsing a Component List from CycloneDX BOM documents. -""" - -import json -import keyword -from typing import Any, Dict, Set, cast -from uuid import UUID -from xml.dom.minidom import Element, Text, parseString - -from cyclonedx.model.bom import Bom -from cyclonedx.model.component import Component - -# See https://github.com/package-url/packageurl-python/issues/65 -from packageurl import PackageURL # type: ignore - -from . import BaseParser - -_KEYWORDS: Set[str] = set(keyword.kwlist) -_JSON_IGNORE_KEYS = ['externalReferences', 'hashes', 'licenses'] -_JSON_KEY_MAPPINGS = { - 'type': 'component_type' -} -_XML_IGNORE_KEYS = ['externalReferences', 'hashes', 'licenses'] - - -class CycloneDxJsonParser(BaseParser): - - def parse_bom(self) -> None: - with self.input_file as input_file: - bom_data = json.loads(input_file.read()) - - # Handle Serial Number and Version - self.bom = Bom(serial_number=bom_data.get('serialNumber'), version=bom_data.get('version')) - - # Process Metadata - bom_metadata_data = bom_data.get('metadata') - self.bom.metadata.component = _component_from_json(bom_metadata_data.get('component')) - - # Process Components - bom_component_data = bom_data.get('components') - for c in bom_component_data: - self.bom.components.add(_component_from_json(json_data=c)) - - -class CycloneDxXmlParser(BaseParser): - - def parse_bom(self) -> None: - with self.input_file as input_file: - bom_data = parseString(input_file.read()) - - assert bom_data.documentElement.tagName == 'bom' - - # Handle Serial Number and Version - bom_attributes = bom_data.documentElement.attributes - self.bom = Bom() - if bom_attributes.get('serialNumber', None): - self.bom.uuid = UUID(bom_attributes.get('serialNumber').value) - # if bom_attributes.get('version', None): - # self.bomn.version_ = bom_attributes.get('version').value - - # Process Metadata - bom_metadata_data = bom_data.documentElement.getElementsByTagName('metadata')[0] - self.bom.metadata.component = _component_from_xml( - xml_element=bom_metadata_data.getElementsByTagName('component')[0] - ) - - # Process Components - bom_component_data = bom_data.documentElement.getElementsByTagName('components')[0] - bom_components_data = bom_component_data.getElementsByTagName('component') - for c in bom_components_data: - self.bom.components.add(_component_from_xml(xml_element=c)) - - -def _component_from_json(json_data: Dict[str, Any]) -> Component: - jd = {} - for k, v in json_data.items(): - if k in _JSON_IGNORE_KEYS: - continue - k = k.replace('-', '_') - if k in _KEYWORDS: - k = f'{k}_' - if k in _JSON_KEY_MAPPINGS: - k = _JSON_KEY_MAPPINGS[k] - if k == 'purl': - v = PackageURL.from_string(purl=v) - jd.update({k: v}) - - return Component(**jd) - - -def _component_from_xml(xml_element: Element) -> Component: - jd = {} - for e in xml_element.childNodes: - if isinstance(e, Element): - if e.nodeName == 'purl': - jd.update({e.nodeName: PackageURL.from_string(purl=str(cast(Text, e.firstChild).data).strip())}) - elif e.nodeName not in _XML_IGNORE_KEYS: - jd.update({e.nodeName: str(cast(Text, e.firstChild).data).strip()}) - - return Component(**jd)