diff --git a/CHANGELOG.md b/CHANGELOG.md index 67a32b73..a19331d0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added - Upcoming changes... +## [1.40.1] - 2025-10-29 +### Changed +- Refactored inspect module structure for better organization + - Reorganized inspection modules into `policy_check` and `summary` subdirectories + - Moved copyleft and undeclared component checks to `policy_check/scanoss/` + - Moved component, license, and match summaries to `summary/` + - Moved Dependency Track policy checks to `policy_check/dependency_track/` + - Extracted common scan result processing logic into `ScanResultProcessor` utility class + - Improved type safety with `PolicyOutput` named tuple for policy check results + - Made `PolicyCheck` class explicitly abstract with ABC +### Added +- Added Makefile targets for running ruff linter (`lint`, `lint-fix`, `lint-docker`, `lint-docker-fix`) + ## [1.40.0] - 2025-10-29 ### Added - Add support for `--rest` to `folder-scan` command @@ -716,3 +729,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 [1.38.0]: https://github.com/scanoss/scanoss.py/compare/v1.37.1...v1.38.0 [1.39.0]: https://github.com/scanoss/scanoss.py/compare/v1.38.0...v1.39.0 [1.40.0]: https://github.com/scanoss/scanoss.py/compare/v1.39.0...v1.40.0 +[1.40.1]: https://github.com/scanoss/scanoss.py/compare/v1.40.0...v1.40.1 \ No newline at end of file diff --git a/Makefile b/Makefile index ebde3c57..9c13989a 100644 --- a/Makefile +++ b/Makefile @@ -35,6 +35,9 @@ dev_setup: date_time_clean ## Setup Python dev env for the current user @echo "Setting up dev env for the current user..." pip3 install -e . +dev_install: ## Install dev dependencies + pip3 install -r requirements-dev.txt + dev_uninstall: ## Uninstall Python dev setup for the current user @echo "Uninstalling dev env..." 
pip3 uninstall -y scanoss @@ -50,6 +53,18 @@ publish_test: ## Publish the Python package to TestPyPI @echo "Publishing package to TestPyPI..." twine upload --repository testpypi dist/* +lint-docker: ## Run ruff linter with docker + @./tools/linter.sh --docker + +lint-docker-fix: ## Run ruff linter with docker and auto-fix + @./tools/linter.sh --docker --fix + +lint: ## Run ruff linter locally + @./tools/linter.sh + +lint-fix: ## Run ruff linter locally with auto-fix + @./tools/linter.sh --fix + publish: ## Publish Python package to PyPI @echo "Publishing package to PyPI..." twine upload dist/* diff --git a/src/scanoss/__init__.py b/src/scanoss/__init__.py index 13f9d8ab..694457df 100644 --- a/src/scanoss/__init__.py +++ b/src/scanoss/__init__.py @@ -22,4 +22,4 @@ THE SOFTWARE. """ -__version__ = '1.40.0' +__version__ = '1.40.1' diff --git a/src/scanoss/cli.py b/src/scanoss/cli.py index c5dd20a3..824c5133 100644 --- a/src/scanoss/cli.py +++ b/src/scanoss/cli.py @@ -35,12 +35,6 @@ from scanoss.cryptography import Cryptography, create_cryptography_config_from_args from scanoss.delta import Delta from scanoss.export.dependency_track import DependencyTrackExporter -from scanoss.inspection.dependency_track.project_violation import ( - DependencyTrackProjectViolationPolicyCheck, -) -from scanoss.inspection.raw.component_summary import ComponentSummary -from scanoss.inspection.raw.license_summary import LicenseSummary -from scanoss.inspection.raw.match_summary import MatchSummary from scanoss.scanners.container_scanner import ( DEFAULT_SYFT_COMMAND, DEFAULT_SYFT_TIMEOUT, @@ -75,8 +69,14 @@ from .cyclonedx import CycloneDx from .filecount import FileCount from .gitlabqualityreport import GitLabQualityReport -from .inspection.raw.copyleft import Copyleft -from .inspection.raw.undeclared_component import UndeclaredComponent +from .inspection.policy_check.dependency_track.project_violation import ( + DependencyTrackProjectViolationPolicyCheck, +) +from 
.inspection.policy_check.scanoss.copyleft import Copyleft +from .inspection.policy_check.scanoss.undeclared_component import UndeclaredComponent +from .inspection.summary.component_summary import ComponentSummary +from .inspection.summary.license_summary import LicenseSummary +from .inspection.summary.match_summary import MatchSummary from .results import Results from .scancodedeps import ScancodeDeps from .scanner import FAST_WINNOWING, Scanner @@ -1753,7 +1753,6 @@ def inspect_copyleft(parser, args): exclude=args.exclude, # Licenses to ignore explicit=args.explicit, # Explicit license list ) - # Execute inspection and exit with appropriate status code status, _ = i_copyleft.run() sys.exit(status) diff --git a/src/scanoss/gitlabqualityreport.py b/src/scanoss/gitlabqualityreport.py index 62dc25f4..1a1b8ec6 100644 --- a/src/scanoss/gitlabqualityreport.py +++ b/src/scanoss/gitlabqualityreport.py @@ -74,16 +74,21 @@ def __init__(self, debug: bool = False, trace: bool = False, quiet: bool = False Initialise the GitLabCodeQuality class """ super().__init__(debug, trace, quiet) + self.print_trace(f"GitLabQualityReport initialized with debug={debug}, trace={trace}, quiet={quiet}") def _get_code_quality(self, file_name: str, result: dict) -> CodeQuality or None: + self.print_trace(f"_get_code_quality called for file: {file_name}") + self.print_trace(f"Processing result: {result}") + if not result.get('file_hash'): self.print_debug(f"Warning: no hash found for result: {result}") return None if result.get('id') == 'file': + self.print_debug(f"Processing file match for: {file_name}") description = f"File match found in: {file_name}" - return CodeQuality( + code_quality = CodeQuality( description=description, check_name=file_name, fingerprint=result.get('file_hash'), @@ -95,17 +100,21 @@ def _get_code_quality(self, file_name: str, result: dict) -> CodeQuality or None ) ) ) + self.print_trace(f"Created file CodeQuality object: {code_quality}") + return code_quality if not 
result.get('lines'): self.print_debug(f"Warning: No lines found for result: {result}") return None lines = scanoss_scan_results_utils.get_lines(result.get('lines')) + self.print_trace(f"Extracted lines: {lines}") if len(lines) == 0: self.print_debug(f"Warning: empty lines for result: {result}") return None end_line = lines[len(lines) - 1] if len(lines) > 1 else lines[0] description = f"Snippet found in: {file_name} - lines {lines[0]}-{end_line}" - return CodeQuality( + self.print_debug(f"Processing snippet match for: {file_name}, lines: {lines[0]}-{end_line}") + code_quality = CodeQuality( description=description, check_name=file_name, fingerprint=result.get('file_hash'), @@ -117,35 +126,47 @@ def _get_code_quality(self, file_name: str, result: dict) -> CodeQuality or None ) ) ) + self.print_trace(f"Created snippet CodeQuality object: {code_quality}") + return code_quality def _write_output(self, data: list[CodeQuality], output_file: str = None) -> bool: """Write the Gitlab Code Quality Report to output.""" + self.print_trace(f"_write_output called with {len(data)} items, output_file: {output_file}") try: json_data = [item.to_dict() for item in data] + self.print_trace(f"JSON data: {json_data}") file = open(output_file, 'w') if output_file else sys.stdout print(json.dumps(json_data, indent=2), file=file) if output_file: file.close() + self.print_debug(f"Wrote output to file: {output_file}") + else: + self.print_debug("Wrote output to 'stdout'") return True except Exception as e: self.print_stderr(f'Error writing output: {str(e)}') return False def _produce_from_json(self, data: dict, output_file: str = None) -> bool: + self.print_trace(f"_produce_from_json called with output_file: {output_file}") + self.print_debug(f"Processing {len(data)} files from JSON data") code_quality = [] for file_name, results in data.items(): + self.print_trace(f"Processing file: {file_name} with {len(results)} results") for result in results: if not result.get('id'): 
self.print_debug(f"Warning: No ID found for result: {result}") continue if result.get('id') != 'snippet' and result.get('id') != 'file': - self.print_debug(f"Skipping non-snippet/file match: {result}") + self.print_debug(f"Skipping non-snippet/file match: {file_name}, id: '{result['id']}'") continue code_quality_item = self._get_code_quality(file_name, result) if code_quality_item: code_quality.append(code_quality_item) + self.print_trace(f"Added code quality item for {file_name}") else: self.print_debug(f"Warning: No Code Quality found for result: {result}") + self.print_debug(f"Generated {len(code_quality)} code quality items") self._write_output(data=code_quality,output_file=output_file) return True @@ -156,11 +177,15 @@ def _produce_from_str(self, json_str: str, output_file: str = None) -> bool: :param output_file: Output file (optional) :return: True if successful, False otherwise """ + self.print_trace(f"_produce_from_str called with output_file: {output_file}") if not json_str: self.print_stderr('ERROR: No JSON string provided to parse.') return False + self.print_debug(f"Parsing JSON string of length: {len(json_str)}") try: data = json.loads(json_str) + self.print_debug("Successfully parsed JSON data") + self.print_trace(f"Parsed data structure: {type(data)}") except Exception as e: self.print_stderr(f'ERROR: Problem parsing input JSON: {e}') return False @@ -174,12 +199,16 @@ def produce_from_file(self, json_file: str, output_file: str = None) -> bool: :param output_file: :return: True if successful, False otherwise """ + self.print_trace(f"produce_from_file called with json_file: {json_file}, output_file: {output_file}") + self.print_debug(f"Input JSON file: {json_file}, output_file: {output_file}") if not json_file: self.print_stderr('ERROR: No JSON file provided to parse.') return False if not os.path.isfile(json_file): self.print_stderr(f'ERROR: JSON file does not exist or is not a file: {json_file}') return False + self.print_debug(f"Reading JSON 
file: {json_file}") with open(json_file, 'r') as f: - success = self._produce_from_str(f.read(), output_file) + json_content = f.read() + success = self._produce_from_str(json_content, output_file) return success diff --git a/src/scanoss/inspection/raw/__init__.py b/src/scanoss/inspection/policy_check/__init__.py similarity index 100% rename from src/scanoss/inspection/raw/__init__.py rename to src/scanoss/inspection/policy_check/__init__.py diff --git a/src/scanoss/inspection/policy_check/dependency_track/__init__.py b/src/scanoss/inspection/policy_check/dependency_track/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/scanoss/inspection/dependency_track/project_violation.py b/src/scanoss/inspection/policy_check/dependency_track/project_violation.py similarity index 93% rename from src/scanoss/inspection/dependency_track/project_violation.py rename to src/scanoss/inspection/policy_check/dependency_track/project_violation.py index c891d76c..e9d78667 100644 --- a/src/scanoss/inspection/dependency_track/project_violation.py +++ b/src/scanoss/inspection/policy_check/dependency_track/project_violation.py @@ -26,9 +26,9 @@ from datetime import datetime from typing import Any, Dict, List, Optional, TypedDict -from ...services.dependency_track_service import DependencyTrackService -from ..policy_check import PolicyCheck, PolicyStatus -from ..utils.markdown_utils import generate_jira_table, generate_table +from ....services.dependency_track_service import DependencyTrackService +from ...utils.markdown_utils import generate_jira_table, generate_table +from ..policy_check import PolicyCheck, PolicyOutput, PolicyStatus # Constants PROCESSING_RETRY_DELAY = 5 # seconds @@ -171,7 +171,7 @@ def __init__( # noqa: PLR0913 self.url = url.strip().rstrip('/') if url else None self.dep_track_service = DependencyTrackService(self.api_key, self.url, debug=debug, trace=trace, quiet=quiet) - def _json(self, project_violations: list[PolicyViolationDict]) -> 
Dict[str, Any]: + def _json(self, project_violations: list[PolicyViolationDict]) -> PolicyOutput: """ Format project violations as JSON. @@ -181,12 +181,12 @@ def _json(self, project_violations: list[PolicyViolationDict]) -> Dict[str, Any] Returns: Dictionary containing JSON formatted results and summary """ - return { - "details": json.dumps(project_violations, indent=2), - "summary": f'{len(project_violations)} policy violations were found.\n', - } + return PolicyOutput( + details= json.dumps(project_violations, indent=2), + summary= f'{len(project_violations)} policy violations were found.\n', + ) - def _markdown(self, project_violations: list[PolicyViolationDict]) -> Dict[str, Any]: + def _markdown(self, project_violations: list[PolicyViolationDict]) -> PolicyOutput: """ Format Dependency Track violations to Markdown format. @@ -198,7 +198,7 @@ def _markdown(self, project_violations: list[PolicyViolationDict]) -> Dict[str, """ return self._md_summary_generator(project_violations, generate_table) - def _jira_markdown(self, data: list[PolicyViolationDict]) -> Dict[str, Any]: + def _jira_markdown(self, data: list[PolicyViolationDict]) -> PolicyOutput: """ Format project violations for Jira Markdown. @@ -357,8 +357,7 @@ def _set_project_id(self) -> None: self.print_stderr(f'Error: Failed to get project uuid from: {dt_project}') raise ValueError(f'Error: Project {self.project_name}@{self.project_version} does not have a valid UUID') - @staticmethod - def _sort_project_violations(violations: List[PolicyViolationDict]) -> List[PolicyViolationDict]: + def _sort_project_violations(self,violations: List[PolicyViolationDict]) -> List[PolicyViolationDict]: """ Sort project violations by priority. 
@@ -377,7 +376,7 @@ def _sort_project_violations(violations: List[PolicyViolationDict]) -> List[Poli key=lambda x: -type_priority.get(x.get('type', 'OTHER'), 1) ) - def _md_summary_generator(self, project_violations: list[PolicyViolationDict], table_generator): + def _md_summary_generator(self, project_violations: list[PolicyViolationDict], table_generator) -> PolicyOutput: """ Generates a Markdown summary of project policy violations. @@ -396,10 +395,10 @@ def _md_summary_generator(self, project_violations: list[PolicyViolationDict], t """ if project_violations is None: self.print_stderr('Warning: No project violations found. Returning empty results.') - return { - "details": "h3. Dependency Track Project Violations\n\nNo policy violations found.\n", - "summary": "0 policy violations were found.\n", - } + return PolicyOutput( + details= "h3. Dependency Track Project Violations\n\nNo policy violations found.\n", + summary= "0 policy violations were found.\n", + ) headers = ['State', 'Risk Type', 'Policy Name', 'Component', 'Date'] c_cols = [0, 1] rows: List[List[str]] = [] @@ -424,11 +423,11 @@ def _md_summary_generator(self, project_violations: list[PolicyViolationDict], t ] rows.append(row) # End for loop - return { - "details": f'### Dependency Track Project Violations\n{table_generator(headers, rows, c_cols)}\n\n' + return PolicyOutput( + details= f'### Dependency Track Project Violations\n{table_generator(headers, rows, c_cols)}\n\n' f'View project in Dependency Track [here]({self.url}/projects/{self.project_id}).\n', - "summary": f'{len(project_violations)} policy violations were found.\n' - } + summary= f'{len(project_violations)} policy violations were found.\n' + ) def run(self) -> int: """ @@ -470,10 +469,11 @@ def run(self) -> int: self.print_stderr('Error: Invalid format specified.') return PolicyStatus.ERROR.value # Format and output data - handle empty results gracefully - data = formatter(self._sort_project_violations(dt_project_violations)) - 
self.print_to_file_or_stdout(data['details'], self.output) - self.print_to_file_or_stderr(data['summary'], self.status) + policy_output = formatter(self._sort_project_violations(dt_project_violations)) + self.print_to_file_or_stdout(policy_output.details, self.output) + self.print_to_file_or_stderr(policy_output.summary, self.status) # Return appropriate status based on violation count if len(dt_project_violations) > 0: return PolicyStatus.POLICY_FAIL.value return PolicyStatus.POLICY_SUCCESS.value + diff --git a/src/scanoss/inspection/policy_check.py b/src/scanoss/inspection/policy_check/policy_check.py similarity index 86% rename from src/scanoss/inspection/policy_check.py rename to src/scanoss/inspection/policy_check/policy_check.py index cd01972f..89263a30 100644 --- a/src/scanoss/inspection/policy_check.py +++ b/src/scanoss/inspection/policy_check/policy_check.py @@ -22,12 +22,12 @@ THE SOFTWARE. """ -from abc import abstractmethod +from abc import ABC, abstractmethod from enum import Enum -from typing import Any, Callable, Dict, Generic, List, TypeVar +from typing import Callable, Dict, Generic, List, NamedTuple, TypeVar -from ..scanossbase import ScanossBase -from .utils.license_utils import LicenseUtil +from ...scanossbase import ScanossBase +from ..utils.license_utils import LicenseUtil class PolicyStatus(Enum): @@ -46,9 +46,13 @@ class PolicyStatus(Enum): # End of PolicyStatus Class # +class PolicyOutput(NamedTuple): + details: str + summary: str + T = TypeVar('T') -class PolicyCheck(ScanossBase, Generic[T]): +class PolicyCheck(ScanossBase, Generic[T], ABC): """ A base class for implementing various software policy checks. @@ -80,7 +84,7 @@ def __init__( # noqa: PLR0913 self.output = output @abstractmethod - def run(self): + def run(self)-> tuple[int,PolicyOutput]: """ Execute the policy check process. @@ -91,14 +95,14 @@ def run(self): 3. Formatting the results 4. 
Saving the output to files if required - :return: A tuple containing: + :return: A named tuple containing two elements: - First element: PolicyStatus enum value (SUCCESS, FAIL, or ERROR) - - Second element: Dictionary containing the inspection results + - Second element: PolicyOutput A tuple containing the policy results. """ pass @abstractmethod - def _json(self, data: list[T]) -> Dict[str, Any]: + def _json(self, data: list[T]) -> PolicyOutput: """ Format the policy checks results as JSON. This method should be implemented by subclasses to create a Markdown representation @@ -112,7 +116,7 @@ def _json(self, data: list[T]) -> Dict[str, Any]: pass @abstractmethod - def _markdown(self, data: list[T]) -> Dict[str, Any]: + def _markdown(self, data: list[T]) -> PolicyOutput: """ Generate Markdown output for the policy check results. @@ -125,7 +129,7 @@ def _markdown(self, data: list[T]) -> Dict[str, Any]: pass @abstractmethod - def _jira_markdown(self, data: list[T]) -> Dict[str, Any]: + def _jira_markdown(self, data: list[T]) -> PolicyOutput: """ Generate Markdown output for the policy check results. @@ -137,7 +141,7 @@ def _jira_markdown(self, data: list[T]) -> Dict[str, Any]: """ pass - def _get_formatter(self) -> Callable[[List[dict]], Dict[str, Any]] or None: + def _get_formatter(self) -> Callable[[List[dict]], PolicyOutput]: """ Get the appropriate formatter function based on the specified format. 
@@ -145,7 +149,7 @@ def _get_formatter(self) -> Callable[[List[dict]], Dict[str, Any]] or None: """ valid_format = self._is_valid_format() if not valid_format: - return None + raise ValueError('Invalid format specified') # a map of which format function to return function_map = { 'json': self._json, @@ -205,14 +209,14 @@ def _generate_formatter_report(self, components: list[Dict]): if formatter is None: return PolicyStatus.ERROR.value, {} # Format the results - data = formatter(components) + policy_output = formatter(components) ## Save outputs if required - self.print_to_file_or_stdout(data['details'], self.output) - self.print_to_file_or_stderr(data['summary'], self.status) + self.print_to_file_or_stdout(policy_output.details, self.output) + self.print_to_file_or_stderr(policy_output.summary, self.status) # Check to see if we have policy violations if len(components) > 0: - return PolicyStatus.POLICY_FAIL.value, data - return PolicyStatus.POLICY_SUCCESS.value, data + return PolicyStatus.POLICY_FAIL.value, policy_output + return PolicyStatus.POLICY_SUCCESS.value, policy_output # # End of PolicyCheck Class # \ No newline at end of file diff --git a/src/scanoss/inspection/policy_check/scanoss/__init__.py b/src/scanoss/inspection/policy_check/scanoss/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/scanoss/inspection/raw/copyleft.py b/src/scanoss/inspection/policy_check/scanoss/copyleft.py similarity index 80% rename from src/scanoss/inspection/raw/copyleft.py rename to src/scanoss/inspection/policy_check/scanoss/copyleft.py index 97c25bab..08694854 100644 --- a/src/scanoss/inspection/raw/copyleft.py +++ b/src/scanoss/inspection/policy_check/scanoss/copyleft.py @@ -24,11 +24,11 @@ import json from dataclasses import dataclass -from typing import Any, Dict, List +from typing import Dict, List -from ..policy_check import PolicyStatus -from ..utils.markdown_utils import generate_jira_table, generate_table -from .raw_base import RawBase +from 
...policy_check.policy_check import PolicyCheck, PolicyOutput, PolicyStatus +from ...utils.markdown_utils import generate_jira_table, generate_table +from ...utils.scan_result_processor import ScanResultProcessor @dataclass @@ -45,7 +45,7 @@ class Component: licenses: List[License] status: str -class Copyleft(RawBase[Component]): +class Copyleft(PolicyCheck[Component]): """ SCANOSS Copyleft class Inspects components for copyleft licenses @@ -78,17 +78,23 @@ def __init__( # noqa: PLR0913 :param exclude: Licenses to exclude from the analysis :param explicit: Explicitly defined licenses """ - super().__init__(debug, trace, quiet, format_type,filepath, output ,status, name='Copyleft Policy') + super().__init__( + debug, trace, quiet, format_type, status, name='Copyleft Policy', output=output + ) self.license_util.init(include, exclude, explicit) self.filepath = filepath - self.format = format self.output = output self.status = status - self.include = include - self.exclude = exclude - self.explicit = explicit + self.results_processor = ScanResultProcessor( + self.debug, + self.trace, + self.quiet, + self.filepath, + include, + exclude, + explicit) - def _json(self, components: list[Component]) -> Dict[str, Any]: + def _json(self, components: list[Component]) -> PolicyOutput: """ Format the components with copyleft licenses as JSON. 
@@ -96,16 +102,16 @@ def _json(self, components: list[Component]) -> Dict[str, Any]: :return: Dictionary with formatted JSON details and summary """ # A component is considered unique by its combination of PURL (Package URL) and license - component_licenses = self._group_components_by_license(components) + component_licenses = self.results_processor.group_components_by_license(components) details = {} if len(components) > 0: details = {'components': components} - return { - 'details': f'{json.dumps(details, indent=2)}\n', - 'summary': f'{len(component_licenses)} component(s) with copyleft licenses were found.\n', - } + return PolicyOutput( + details= f'{json.dumps(details, indent=2)}\n', + summary= f'{len(component_licenses)} component(s) with copyleft licenses were found.\n', + ) - def _markdown(self, components: list[Component]) -> Dict[str, Any]: + def _markdown(self, components: list[Component]) -> PolicyOutput: """ Format the components with copyleft licenses as Markdown. @@ -114,7 +120,7 @@ def _markdown(self, components: list[Component]) -> Dict[str, Any]: """ return self._md_summary_generator(components, generate_table) - def _jira_markdown(self, components: list[Component]) -> Dict[str, Any]: + def _jira_markdown(self, components: list[Component]) -> PolicyOutput: """ Format the components with copyleft licenses as Markdown. @@ -123,7 +129,7 @@ def _jira_markdown(self, components: list[Component]) -> Dict[str, Any]: """ return self._md_summary_generator(components, generate_jira_table) - def _md_summary_generator(self, components: list[Component], table_generator): + def _md_summary_generator(self, components: list[Component], table_generator) -> PolicyOutput: """ Generates a Markdown summary for components with a focus on copyleft licenses. @@ -138,15 +144,10 @@ def _md_summary_generator(self, components: list[Component], table_generator): A callable function to generate tabular data for components. 
Returns: - dict - A dictionary containing two keys: - - 'details': A detailed Markdown representation including a table of components - and associated copyleft license data. - - 'summary': A textual summary highlighting the total number of components - with copyleft licenses. + PolicyOutput """ # A component is considered unique by its combination of PURL (Package URL) and license - component_licenses = self._group_components_by_license(components) + component_licenses = self.results_processor.group_components_by_license(components) headers = ['Component', 'License', 'URL', 'Copyleft'] centered_columns = [1, 4] rows = [] @@ -160,10 +161,10 @@ def _md_summary_generator(self, components: list[Component], table_generator): rows.append(row) # End license loop # End component loop - return { - 'details': f'### Copyleft Licenses\n{table_generator(headers, rows, centered_columns)}', - 'summary': f'{len(component_licenses)} component(s) with copyleft licenses were found.\n', - } + return PolicyOutput( + details= f'### Copyleft Licenses\n{table_generator(headers, rows, centered_columns)}', + summary= f'{len(component_licenses)} component(s) with copyleft licenses were found.\n', + ) def _get_components_with_copyleft_licenses(self, components: list) -> list[Dict]: """ @@ -202,14 +203,13 @@ def _get_components(self): :return: A list of processed components with license data, or `None` if `self.results` is not set. """ - if self.results is None: + if self.results_processor.get_results() is None: return None - components: dict = {} # Extract component and license data from file and dependency results. 
Both helpers mutate `components` - self._get_components_data(self.results, components) - self._get_dependencies_data(self.results, components) - return self._convert_components_to_list(components) + self.results_processor.get_components_data(components) + self.results_processor.get_dependencies_data(components) + return self.results_processor.convert_components_to_list(components) def run(self): """ diff --git a/src/scanoss/inspection/raw/undeclared_component.py b/src/scanoss/inspection/policy_check/scanoss/undeclared_component.py similarity index 85% rename from src/scanoss/inspection/raw/undeclared_component.py rename to src/scanoss/inspection/policy_check/scanoss/undeclared_component.py index a7e32dac..ce122a5b 100644 --- a/src/scanoss/inspection/raw/undeclared_component.py +++ b/src/scanoss/inspection/policy_check/scanoss/undeclared_component.py @@ -24,11 +24,11 @@ import json from dataclasses import dataclass -from typing import Any, Dict, List +from typing import List -from ..policy_check import PolicyStatus -from ..utils.markdown_utils import generate_jira_table, generate_table -from .raw_base import RawBase +from ...policy_check.policy_check import PolicyCheck, PolicyOutput, PolicyStatus +from ...utils.markdown_utils import generate_jira_table, generate_table +from ...utils.scan_result_processor import ScanResultProcessor @dataclass @@ -44,7 +44,7 @@ class Component: licenses: List[License] status: str -class UndeclaredComponent(RawBase[Component]): +class UndeclaredComponent(PolicyCheck[Component]): """ SCANOSS UndeclaredComponent class Inspects for undeclared components @@ -59,7 +59,7 @@ def __init__( # noqa: PLR0913 format_type: str = 'json', status: str = None, output: str = None, - sbom_format: str = 'settings', + sbom_format: str = 'settings' ): """ Initialize the UndeclaredComponent class. 
@@ -74,13 +74,14 @@ def __init__( # noqa: PLR0913 :param sbom_format: Sbom format for status output (default 'settings') """ super().__init__( - debug, trace, quiet,format_type, filepath, output, status, name='Undeclared Components Policy' + debug, trace, quiet, format_type, status, name='Undeclared Components Policy', output=output ) self.filepath = filepath - self.format = format self.output = output self.status = status self.sbom_format = sbom_format + self.results_processor = ScanResultProcessor(self.debug, self.trace, self.quiet, self.filepath) + def _get_undeclared_components(self, components: list[Component]) -> list or None: """ @@ -163,7 +164,7 @@ def _get_summary(self, components: list) -> str: return summary - def _json(self, components: list[Component]) -> Dict[str, Any]: + def _json(self, components: list[Component]) -> PolicyOutput: """ Format the undeclared components as JSON. @@ -171,16 +172,16 @@ def _json(self, components: list[Component]) -> Dict[str, Any]: :return: Dictionary with formatted JSON details and summary """ # Use component grouped by licenses to generate the summary - component_licenses = self._group_components_by_license(components) + component_licenses = self.results_processor.group_components_by_license(components) details = {} if len(components) > 0: details = {'components': components} - return { - 'details': f'{json.dumps(details, indent=2)}\n', - 'summary': self._get_summary(component_licenses), - } + return PolicyOutput( + details=f'{json.dumps(details, indent=2)}\n', + summary=self._get_summary(component_licenses) + ) - def _markdown(self, components: list[Component]) -> Dict[str, Any]: + def _markdown(self, components: list[Component]) -> PolicyOutput: """ Format the undeclared components as Markdown. 
@@ -190,15 +191,15 @@ def _markdown(self, components: list[Component]) -> Dict[str, Any]: headers = ['Component', 'License'] rows = [] # TODO look at using SpdxLite license name lookup method - component_licenses = self._group_components_by_license(components) + component_licenses = self.results_processor.group_components_by_license(components) for component in component_licenses: rows.append([component.get('purl'), component.get('spdxid')]) - return { - 'details': f'### Undeclared components\n{generate_table(headers, rows)}\n', - 'summary': self._get_summary(component_licenses), - } + return PolicyOutput( + details= f'### Undeclared components\n{generate_table(headers, rows)}\n', + summary= self._get_summary(component_licenses), + ) - def _jira_markdown(self, components: list) -> Dict[str, Any]: + def _jira_markdown(self, components: list) -> PolicyOutput: """ Format the undeclared components as Markdown. @@ -208,13 +209,13 @@ def _jira_markdown(self, components: list) -> Dict[str, Any]: headers = ['Component', 'License'] rows = [] # TODO look at using SpdxLite license name lookup method - component_licenses = self._group_components_by_license(components) + component_licenses = self.results_processor.group_components_by_license(components) for component in component_licenses: rows.append([component.get('purl'), component.get('spdxid')]) - return { - 'details': f'{generate_jira_table(headers, rows)}', - 'summary': self._get_jira_summary(component_licenses), - } + return PolicyOutput( + details= f'{generate_jira_table(headers, rows)}', + summary= self._get_jira_summary(component_licenses), + ) def _get_unique_components(self, components: list) -> list: """ @@ -272,13 +273,13 @@ def _get_components(self): :return: A list of processed components with their licenses, or `None` if `self.results` is not set. 
""" - if self.results is None: + if self.results_processor.get_results() is None: return None components: dict = {} # Extract file and snippet components - components = self._get_components_data(self.results, components) + components = self.results_processor.get_components_data(components) # Convert to list and process licenses - return self._convert_components_to_list(components) + return self.results_processor.convert_components_to_list(components) def run(self): """ diff --git a/src/scanoss/inspection/summary/__init__.py b/src/scanoss/inspection/summary/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/scanoss/inspection/raw/component_summary.py b/src/scanoss/inspection/summary/component_summary.py similarity index 81% rename from src/scanoss/inspection/raw/component_summary.py rename to src/scanoss/inspection/summary/component_summary.py index 03bddbd8..b4563176 100644 --- a/src/scanoss/inspection/raw/component_summary.py +++ b/src/scanoss/inspection/summary/component_summary.py @@ -24,11 +24,36 @@ import json from typing import Any -from ..policy_check import T -from .raw_base import RawBase +from ...scanossbase import ScanossBase +from ..policy_check.policy_check import T +from ..utils.scan_result_processor import ScanResultProcessor + + +class ComponentSummary(ScanossBase): + + def __init__( # noqa: PLR0913 + self, + debug: bool = False, + trace: bool = False, + quiet: bool = False, + filepath: str = None, + format_type: str = 'json', + output: str = None, + ): + """ + Initialize the ComponentSummary class. 
+ :param debug: Enable debug mode + :param trace: Enable trace mode + :param quiet: Enable quiet mode + :param filepath: Path to the file containing component data + :param format_type: Output format ('json' or 'md') + """ + super().__init__(debug, trace, quiet) + self.filepath = filepath + self.output = output + self.results_processor = ScanResultProcessor(debug, trace, quiet, filepath) -class ComponentSummary(RawBase): def _json(self, data: dict[str,Any]) -> dict[str,Any]: """ @@ -77,11 +102,11 @@ def _get_component_summary_from_components(self, scan_components: list)-> dict: """ Get a component summary from detected components. - :param components: List of all components + :param scan_components: List of all components :return: Dict with license summary information """ # A component is considered unique by its combination of PURL (Package URL) and license - component_licenses = self._group_components_by_license(scan_components) + component_licenses = self.results_processor.group_components_by_license(scan_components) total_components = len(component_licenses) # Get undeclared components undeclared_components = len([c for c in component_licenses if c['status'] == 'pending']) @@ -121,13 +146,13 @@ def _get_components(self): :return: A list of processed components with license data, or `None` if `self.results` is not set. """ - if self.results is None: - raise ValueError(f'Error: No results found in ${self.filepath}') + if self.results_processor.get_results() is None: + raise ValueError(f'Error: No results found in {self.filepath}') components: dict = {} # Extract component and license data from file and dependency results. 
Both helpers mutate `components` - self._get_components_data(self.results, components) - return self._convert_components_to_list(components) + self.results_processor.get_components_data(components) + return self.results_processor.convert_components_to_list(components) def _format(self, component_summary) -> str: # TODO: Implement formatter to support dynamic outputs diff --git a/src/scanoss/inspection/raw/license_summary.py b/src/scanoss/inspection/summary/license_summary.py similarity index 89% rename from src/scanoss/inspection/raw/license_summary.py rename to src/scanoss/inspection/summary/license_summary.py index c849ea0a..3cf3761e 100644 --- a/src/scanoss/inspection/raw/license_summary.py +++ b/src/scanoss/inspection/summary/license_summary.py @@ -25,11 +25,12 @@ import json from typing import Any -from ..policy_check import T -from .raw_base import RawBase +from ...scanossbase import ScanossBase +from ..policy_check.policy_check import T +from ..utils.scan_result_processor import ScanResultProcessor -class LicenseSummary(RawBase): +class LicenseSummary(ScanossBase): """ SCANOSS LicenseSummary class Inspects results and generates comprehensive license summaries from detected components. @@ -38,6 +39,42 @@ class LicenseSummary(RawBase): information, providing detailed summaries including copyleft analysis and license statistics. """ + # Define required license fields as class constants + REQUIRED_LICENSE_FIELDS = ['spdxid', 'url', 'copyleft', 'source'] + + def __init__( # noqa: PLR0913 + self, + debug: bool = False, + trace: bool = False, + quiet: bool = False, + filepath: str = None, + status: str = None, + output: str = None, + include: str = None, + exclude: str = None, + explicit: str = None, + ): + """ + Initialize the LicenseSummary class. 
+ + :param debug: Enable debug mode + :param trace: Enable trace mode + :param quiet: Enable quiet mode + :param filepath: Path to the file containing component data + :param output: Path to save detailed output + :param include: Licenses to include in the analysis + :param exclude: Licenses to exclude from the analysis + :param explicit: Explicitly defined licenses + """ + super().__init__(debug=debug, trace=trace, quiet=quiet) + self.results_processor = ScanResultProcessor(debug, trace, quiet, filepath, include, exclude, explicit) + self.filepath = filepath + self.output = output + self.status = status + self.include = include + self.exclude = exclude + self.explicit = explicit + def _json(self, data: dict[str,Any]) -> dict[str, Any]: """ Format license summary data as JSON. @@ -78,41 +115,6 @@ def _jira_markdown(self, data: list[T]) -> dict[str, Any]: """ pass - # Define required license fields as class constants - REQUIRED_LICENSE_FIELDS = ['spdxid', 'url', 'copyleft', 'source'] - - def __init__( # noqa: PLR0913 - self, - debug: bool = False, - trace: bool = False, - quiet: bool = False, - filepath: str = None, - status: str = None, - output: str = None, - include: str = None, - exclude: str = None, - explicit: str = None, - ): - """ - Initialize the LicenseSummary class. 
- - :param debug: Enable debug mode - :param trace: Enable trace mode (default True) - :param quiet: Enable quiet mode - :param filepath: Path to the file containing component data - :param output: Path to save detailed output - :param include: Licenses to include in the analysis - :param exclude: Licenses to exclude from the analysis - :param explicit: Explicitly defined licenses - """ - super().__init__(debug, trace, quiet, filepath = filepath, output=output) - self.license_util.init(include, exclude, explicit) - self.filepath = filepath - self.output = output - self.status = status - self.include = include - self.exclude = exclude - self.explicit = explicit def _get_licenses_summary_from_components(self, components: list)-> dict: """ @@ -122,7 +124,7 @@ def _get_licenses_summary_from_components(self, components: list)-> dict: :return: Dict with license summary information """ # A component is considered unique by its combination of PURL (Package URL) and license - component_licenses = self._group_components_by_license(components) + component_licenses = self.results_processor.group_components_by_license(components) license_component_count = {} # Count license per component for lic in component_licenses: @@ -164,14 +166,14 @@ def _get_components(self): :return: A list of processed components with license data, or `None` if `self.results` is not set. """ - if self.results is None: - raise ValueError(f'Error: No results found in ${self.filepath}') + if self.results_processor.get_results() is None: + raise ValueError(f'Error: No results found in {self.filepath}') components: dict = {} # Extract component and license data from file and dependency results. 
Both helpers mutate `components` - self._get_components_data(self.results, components) - self._get_dependencies_data(self.results, components) - return self._convert_components_to_list(components) + self.results_processor.get_components_data(components) + self.results_processor.get_dependencies_data(components) + return self.results_processor.convert_components_to_list(components) def _format(self, license_summary) -> str: # TODO: Implement formatter to support dynamic outputs diff --git a/src/scanoss/inspection/raw/match_summary.py b/src/scanoss/inspection/summary/match_summary.py similarity index 82% rename from src/scanoss/inspection/raw/match_summary.py rename to src/scanoss/inspection/summary/match_summary.py index 645824bb..b79233e5 100644 --- a/src/scanoss/inspection/raw/match_summary.py +++ b/src/scanoss/inspection/summary/match_summary.py @@ -94,6 +94,7 @@ def __init__( # noqa: PLR0913 self.scanoss_results_path = scanoss_results_path self.line_range_prefix = line_range_prefix self.output = output + self.print_debug("Initializing MatchSummary class") def _get_match_summary_item(self, file_name: str, result: dict) -> MatchSummaryItem: @@ -108,11 +109,16 @@ def _get_match_summary_item(self, file_name: str, result: dict) -> MatchSummaryI :param result: SCANOSS scan result dictionary containing match details :return: Populated match summary item with all relevant information """ + self.print_trace(f"Creating match summary item for file: {file_name}, id: {result.get('id')}") + if result.get('id') == "snippet": # Snippet match: create URL with line range anchor lines = scanoss_scan_results_utils.get_lines(result.get('lines')) end_line = lines[len(lines) - 1] if len(lines) > 1 else lines[0] file_url = f"{self.line_range_prefix}/{file_name}#L{lines[0]}-L{end_line}" + + self.print_trace(f"Snippet match: lines {lines[0]}-{end_line}, purl: {result.get('purl')[0]}") + return MatchSummaryItem( file_url=file_url, file=file_name, @@ -124,6 +130,8 @@ def 
_get_match_summary_item(self, file_name: str, result: dict) -> MatchSummaryI lines=f"{lines[0]}-{lines[len(lines) - 1] if len(lines) > 1 else lines[0]}" ) # File match: create URL without line range + self.print_trace(f"File match: {file_name}, purl: {result.get('purl')[0]}, version: {result.get('version')}") + return MatchSummaryItem( file=file_name, file_url=f"{self.line_range_prefix}/{file_name}", @@ -176,12 +184,19 @@ def _get_matches_summary(self) -> ComponentMatchSummary: required fields and categorizing matches into file matches and snippet matches. Skips invalid or incomplete results with debug messages. """ + self.print_debug(f"Loading scan results from: {self.scanoss_results_path}") + # Load scan results from JSON file scan_results = load_json_file(self.scanoss_results_path) gitlab_matches_summary = ComponentMatchSummary(files=[], snippet=[]) + self.print_debug(f"Processing {len(scan_results)} files from scan results") + self.print_trace(f"Line range prefix set to: {self.line_range_prefix}") + # Process each file and its results for file_name, results in scan_results.items(): + self.print_trace(f"Processing file: {file_name} with {len(results)} results") + for result in results: # Skip non-matches if result.get('id') == "none": @@ -196,8 +211,15 @@ def _get_matches_summary(self) -> ComponentMatchSummary: summary_item = self._get_match_summary_item(file_name, result) if result.get('id') == "snippet": gitlab_matches_summary.snippet.append(summary_item) + self.print_trace(f"Added snippet match for {file_name}") else: gitlab_matches_summary.files.append(summary_item) + self.print_trace(f"Added file match for {file_name}") + + self.print_debug( + f"Match summary complete: {len(gitlab_matches_summary.files)} file matches, " + f"{len(gitlab_matches_summary.snippet)} snippet matches" + ) return gitlab_matches_summary @@ -212,14 +234,23 @@ def _markdown(self, gitlab_matches_summary: ComponentMatchSummary) -> str: :param gitlab_matches_summary: Container with 
categorized file and snippet matches to format :return: Complete Markdown document with formatted match tables """ + self.print_debug("Generating Markdown from match summaries") if len(gitlab_matches_summary.files) == 0 and len(gitlab_matches_summary.snippet) == 0: + self.print_debug("No matches to format - returning empty string") return "" + self.print_trace( + f"Formatting {len(gitlab_matches_summary.files)} file matches and " + f"{len(gitlab_matches_summary.snippet)} snippet matches" + ) + # Define table headers file_match_headers = ['File', 'License', 'Similarity', 'PURL', 'Version'] snippet_match_headers = ['File', 'License', 'Similarity', 'PURL', 'Version', 'Lines'] + # Build file matches table + self.print_trace("Building file matches table") file_match_rows = [] for file_match in gitlab_matches_summary.files: row = [ @@ -233,6 +264,7 @@ def _markdown(self, gitlab_matches_summary: ComponentMatchSummary) -> str: file_match_table = generate_table(file_match_headers, file_match_rows) # Build snippet matches table + self.print_trace("Building snippet matches table") snippet_match_rows = [] for snippet_match in gitlab_matches_summary.snippet: row = [ @@ -262,6 +294,8 @@ def _markdown(self, gitlab_matches_summary: ComponentMatchSummary) -> str: markdown += snippet_match_table markdown += "\n\n" + self.print_trace(f"Markdown generation complete (length: {len(markdown)} characters)") + self.print_debug("Match summary Markdown generation complete") return markdown def run(self): @@ -275,16 +309,33 @@ def run(self): 3. Generates Markdown report 4. 
Outputs to file or stdout """ + self.print_debug("Starting match summary generation process") + self.print_trace( + f"Configuration - Results path: {self.scanoss_results_path}, Output: {self.output}, " + f"Line range prefix: {self.line_range_prefix}" + ) + # Load and process scan results into categorized matches + self.print_trace("Loading and processing scan results") matches = self._get_matches_summary() # Format matches as GitLab-compatible Markdown + self.print_trace("Generating Markdown output") matches_md = self._markdown(matches) if matches_md == "": + self.print_debug("No matches found - exiting") self.print_stdout("No matches found.") return + # Output to file or stdout + self.print_trace("Writing output") + if self.output: + self.print_debug(f"Writing match summary to file: {self.output}") + else: + self.print_debug("Writing match summary to 'stdout'") + self.print_to_file_or_stdout(matches_md, self.output) + self.print_debug("Match summary generation complete") diff --git a/src/scanoss/inspection/raw/raw_base.py b/src/scanoss/inspection/utils/scan_result_processor.py similarity index 88% rename from src/scanoss/inspection/raw/raw_base.py rename to src/scanoss/inspection/utils/scan_result_processor.py index 8a1a6f8d..22333b5d 100644 --- a/src/scanoss/inspection/raw/raw_base.py +++ b/src/scanoss/inspection/utils/scan_result_processor.py @@ -22,11 +22,10 @@ THE SOFTWARE. """ -from abc import abstractmethod from enum import Enum from typing import Any, Dict, TypeVar -from ..policy_check import PolicyCheck +from ...scanossbase import ScanossBase from ..utils.file_utils import load_json_file from ..utils.license_utils import LicenseUtil @@ -51,12 +50,13 @@ class ComponentID(Enum): # T = TypeVar('T') -class RawBase(PolicyCheck[T]): +class ScanResultProcessor(ScanossBase): """ - A base class to perform inspections over scan results. + A utility class for processing and transforming scan results. 
- This class provides a basic for scan results inspection, including methods for - processing scan results components and licenses. + This class provides functionality for processing scan results, including methods for + loading, parsing, extracting, and aggregating component and license data from scan results. + It serves as a shared data processing layer used by both policy checks and summary generators. Inherits from: ScanossBase: A base class providing common functionality for SCANOSS-related operations. @@ -67,40 +67,19 @@ def __init__( # noqa: PLR0913 debug: bool = False, trace: bool = False, quiet: bool = False, - format_type: str = None, - filepath: str = None, - output: str = None, - status: str = None, - name: str = None, + result_file_path: str = None, + include: str = None, + exclude: str = None, + explicit: str = None, ): - super().__init__(debug, trace, quiet, format_type,status, name, output) + super().__init__(debug, trace, quiet) + self.result_file_path = result_file_path self.license_util = LicenseUtil() - self.filepath = filepath - self.output = output + self.license_util.init(include, exclude, explicit) self.results = self._load_input_file() - @abstractmethod - def _get_components(self): - """ - Retrieve and process components from the preloaded results. - - This method performs the following steps: - 1. Checks if the results have been previously loaded (self.results). - 2. Extracts and processes components from the loaded results. - - :return: A list of processed components, or None if an error occurred during any step. - - Possible reasons for returning None include: - - Results not loaded (self.results is None) - - Failure to extract components from the results - - Note: - - This method assumes that the results have been previously loaded and stored in self.results. - - Implementations must extract components (e.g. via `_get_components_data`, - `_get_dependencies_data`, or other helpers). - - If `self.results` is `None`, simply return `None`. 
- """ - pass + def get_results(self) -> Dict[str, Any]: + return self.results def _append_component(self, components: Dict[str, Any], new_component: Dict[str, Any]) -> Dict[str, Any]: """ @@ -213,7 +192,7 @@ def _update_component_counters(self, component, status): else: component['undeclared'] += 1 - def _get_components_data(self, results: Dict[str, Any], components: Dict[str, Any]) -> Dict[str, Any]: + def get_components_data(self, components: Dict[str, Any]) -> Dict[str, Any]: """ Extract and process file and snippet components from results. @@ -230,11 +209,11 @@ def _get_components_data(self, results: Dict[str, Any], components: Dict[str, An which tracks the number of occurrences of each license Args: - results: A dictionary containing the raw results of a component scan + components: A dictionary containing the raw results of a component scan Returns: Updated components dictionary with file and snippet data """ - for component in results.values(): + for component in self.results.values(): for c in component: component_id = c.get('id') if not component_id: @@ -266,15 +245,13 @@ def _get_components_data(self, results: Dict[str, Any], components: Dict[str, An # End components loop return components - def _get_dependencies_data(self, results: Dict[str, Any], components: Dict[str, Any]) -> Dict[str, Any]: + def get_dependencies_data(self,components: Dict[str, Any]) -> Dict[str, Any]: """ Extract and process dependency components from results. 
- - :param results: A dictionary containing the raw results of a component scan :param components: Existing components dictionary to update :return: Updated components dictionary with dependency data """ - for component in results.values(): + for component in self.results.values(): for c in component: component_id = c.get('id') if not component_id: @@ -313,12 +290,12 @@ def _load_input_file(self): Dict[str, Any]: The parsed JSON data """ try: - return load_json_file(self.filepath) + return load_json_file(self.result_file_path) except Exception as e: self.print_stderr(f'ERROR: Problem parsing input JSON: {e}') return None - def _convert_components_to_list(self, components: dict): + def convert_components_to_list(self, components: dict): if components is None: self.print_debug(f'WARNING: Components is empty {self.results}') return None @@ -372,7 +349,7 @@ def _get_licenses_order_by_source_priority(self,licenses_data): self.print_debug("No priority sources found, returning all licenses as list") return licenses_data - def _group_components_by_license(self,components): + def group_components_by_license(self,components): """ Groups components by their unique component-license pairs. 
@@ -425,5 +402,5 @@ def _group_components_by_license(self,components): # -# End of PolicyCheck Class -# +# End of ScanResultProcessor Class +# \ No newline at end of file diff --git a/tests/test_policy_inspect.py b/tests/test_policy_inspect.py index 24db1ab7..0ccf7886 100644 --- a/tests/test_policy_inspect.py +++ b/tests/test_policy_inspect.py @@ -28,12 +28,14 @@ import unittest from unittest.mock import Mock, patch -from src.scanoss.inspection.policy_check import PolicyStatus -from src.scanoss.inspection.raw.component_summary import ComponentSummary -from src.scanoss.inspection.raw.copyleft import Copyleft -from src.scanoss.inspection.raw.license_summary import LicenseSummary -from src.scanoss.inspection.raw.undeclared_component import UndeclaredComponent -from src.scanoss.inspection.dependency_track.project_violation import DependencyTrackProjectViolationPolicyCheck +from src.scanoss.inspection.policy_check.dependency_track.project_violation import ( + DependencyTrackProjectViolationPolicyCheck, +) +from src.scanoss.inspection.policy_check.policy_check import PolicyStatus +from src.scanoss.inspection.policy_check.scanoss.copyleft import Copyleft +from src.scanoss.inspection.policy_check.scanoss.undeclared_component import UndeclaredComponent +from src.scanoss.inspection.summary.component_summary import ComponentSummary +from src.scanoss.inspection.summary.license_summary import LicenseSummary class MyTestCase(unittest.TestCase): @@ -67,11 +69,11 @@ def test_empty_copyleft_policy(self): file_name = 'result-no-copyleft.json' input_file_name = os.path.join(script_dir, 'data', file_name) copyleft = Copyleft(filepath=input_file_name, format_type='json') - status, data = copyleft.run() - details = json.loads(data['details']) + status, policy_output = copyleft.run() + details = json.loads(policy_output.details) self.assertEqual(status, PolicyStatus.POLICY_SUCCESS.value) self.assertEqual(details, {}) - self.assertEqual(data['summary'], '0 component(s) with copyleft 
licenses were found.\n') + self.assertEqual(policy_output.summary, '0 component(s) with copyleft licenses were found.\n') """ Inspect for copyleft licenses include @@ -82,9 +84,9 @@ def test_copyleft_policy_include(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) copyleft = Copyleft(filepath=input_file_name, format_type='json', include='MIT') - status, data = copyleft.run() + status, policy_output = copyleft.run() has_mit_license = False - details = json.loads(data['details']) + details = json.loads(policy_output.details) for component in details['components']: for license in component['licenses']: if license['spdxid'] == 'MIT': @@ -103,8 +105,8 @@ def test_copyleft_policy_exclude(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) copyleft = Copyleft(filepath=input_file_name, format_type='json', exclude='GPL-2.0-only') - status, data = copyleft.run() - results = json.loads(data['details']) + status, policy_output = copyleft.run() + results = json.loads(policy_output.details) self.assertEqual(results, {}) self.assertEqual(status, PolicyStatus.POLICY_SUCCESS.value) @@ -117,8 +119,8 @@ def test_copyleft_policy_explicit(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) copyleft = Copyleft(filepath=input_file_name, format_type='json', explicit='MIT') - status, data = copyleft.run() - results = json.loads(data['details']) + status, policy_output = copyleft.run() + results = json.loads(policy_output.details) self.assertEqual(len(results['components']), 2) self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) @@ -131,8 +133,8 @@ def test_copyleft_policy_empty_explicit(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) copyleft = Copyleft(filepath=input_file_name, format_type='json', explicit='') - status, data = copyleft.run() - results = json.loads(data['details']) + status, policy_output = 
copyleft.run() + results = json.loads(policy_output.details) self.assertEqual(len(results['components']), 5) self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) @@ -145,7 +147,7 @@ def test_copyleft_policy_markdown(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) copyleft = Copyleft(filepath=input_file_name, format_type='md', explicit='MIT') - status, data = copyleft.run() + status, policy_output = copyleft.run() expected_detail_output = ( '### Copyleft Licenses \n | Component | License | URL | Copyleft |\n' ' | - | :-: | - | - |\n' @@ -154,10 +156,10 @@ def test_copyleft_policy_markdown(self): ) expected_summary_output = '2 component(s) with copyleft licenses were found.\n' self.assertEqual( - re.sub(r'\s|\\(?!`)|\\(?=`)', '', data['details']), + re.sub(r'\s|\\(?!`)|\\(?=`)', '', policy_output.details), re.sub(r'\s|\\(?!`)|\\(?=`)', '', expected_detail_output), ) - self.assertEqual(data['summary'], expected_summary_output) + self.assertEqual(policy_output.summary, expected_summary_output) self.assertEqual(status, PolicyStatus.POLICY_FAIL.value) ## Undeclared Components Policy Tests ## @@ -180,9 +182,9 @@ def test_undeclared_policy(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) undeclared = UndeclaredComponent(filepath=input_file_name, format_type='json', sbom_format='legacy') - status, data = undeclared.run() - results = json.loads(data['details']) - summary = data['summary'] + status, policy_output = undeclared.run() + results = json.loads(policy_output.details) + summary = policy_output.summary expected_summary_output = """3 undeclared component(s) were found. 
Add the following snippet into your `sbom.json` file ```json @@ -215,9 +217,9 @@ def test_undeclared_policy_markdown(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) undeclared = UndeclaredComponent(filepath=input_file_name, format_type='md', sbom_format='legacy') - status, data = undeclared.run() - results = data['details'] - summary = data['summary'] + status, policy_output = undeclared.run() + results = policy_output.details + summary = policy_output.summary expected_details_output = """ ### Undeclared components | Component | License | | - | - | @@ -259,9 +261,9 @@ def test_undeclared_policy_markdown_scanoss_summary(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) undeclared = UndeclaredComponent(filepath=input_file_name, format_type='md') - status, data = undeclared.run() - results = data['details'] - summary = data['summary'] + status, policy_output = undeclared.run() + results = policy_output.details + summary = policy_output.summary expected_details_output = """ ### Undeclared components | Component | License | | - | - | @@ -306,9 +308,9 @@ def test_undeclared_policy_scanoss_summary(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) undeclared = UndeclaredComponent(filepath=input_file_name) - status, data = undeclared.run() - results = json.loads(data['details']) - summary = data['summary'] + status, policy_output = undeclared.run() + results = json.loads(policy_output.details) + summary = policy_output.summary expected_summary_output = """3 undeclared component(s) were found. 
Add the following snippet into your `scanoss.json` file @@ -340,9 +342,9 @@ def test_undeclared_policy_jira_markdown_output(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) undeclared = UndeclaredComponent(filepath=input_file_name, format_type='jira_md') - status, data = undeclared.run() - details = data['details'] - summary = data['summary'] + status, policy_output = undeclared.run() + details = policy_output.details + summary = policy_output.summary expected_details_output = """|*Component*|*License*| |pkg:github/scanoss/jenkins-pipeline-example|unknown| |pkg:github/scanoss/scanner.c|GPL-2.0-only| @@ -377,8 +379,8 @@ def test_copyleft_policy_jira_markdown_output(self): file_name = 'result.json' input_file_name = os.path.join(script_dir, 'data', file_name) copyleft = Copyleft(filepath=input_file_name, format_type='jira_md') - status, data = copyleft.run() - results = data['details'] + status, policy_output = copyleft.run() + results = policy_output.details expected_details_output = """### Copyleft Licenses\n|*Component*|*License*|*URL*|*Copyleft*| |pkg:github/scanoss/scanner.c|GPL-2.0-only|https://spdx.org/licenses/GPL-2.0-only.html|YES| |pkg:github/scanoss/engine|GPL-2.0-only|https://spdx.org/licenses/GPL-2.0-only.html|YES| @@ -436,7 +438,7 @@ def test_inspect_component_summary_empty_result(self): ## Dependency Track Project Violation Policy Tests ## - @patch('src.scanoss.inspection.dependency_track.project_violation.DependencyTrackService') + @patch('src.scanoss.inspection.policy_check.dependency_track.project_violation.DependencyTrackService') def test_dependency_track_project_violation_json_formatter(self, mock_service): mock_service.return_value = Mock() project_violation = DependencyTrackProjectViolationPolicyCheck( @@ -464,14 +466,12 @@ def test_dependency_track_project_violation_json_formatter(self, mock_service): } ] result = project_violation._json(test_violations) - self.assertIn('details', result) - 
self.assertIn('summary', result) - self.assertEqual(result['summary'], '1 policy violations were found.\n') - details = json.loads(result['details']) + self.assertEqual(result.summary, '1 policy violations were found.\n') + details = json.loads(result.details) self.assertEqual(len(details), 1) self.assertEqual(details[0]['type'], 'SECURITY') - @patch('src.scanoss.inspection.dependency_track.project_violation.DependencyTrackService') + @patch('src.scanoss.inspection.policy_check.dependency_track.project_violation.DependencyTrackService') def test_dependency_track_project_violation_markdown_formatter(self, mock_service): mock_service.return_value = Mock() project_violation = DependencyTrackProjectViolationPolicyCheck( @@ -499,16 +499,14 @@ def test_dependency_track_project_violation_markdown_formatter(self, mock_servic } ] result = project_violation._markdown(test_violations) - self.assertIn('details', result) - self.assertIn('summary', result) - self.assertEqual(result['summary'], '1 policy violations were found.\n') - self.assertIn('State', result['details']) - self.assertIn('Risk Type', result['details']) - self.assertIn('Policy Name', result['details']) - self.assertIn('Component', result['details']) - self.assertIn('Date', result['details']) - - @patch('src.scanoss.inspection.dependency_track.project_violation.DependencyTrackService') + self.assertEqual(result.summary, '1 policy violations were found.\n') + self.assertIn('State', result.details) + self.assertIn('Risk Type', result.details) + self.assertIn('Policy Name', result.details) + self.assertIn('Component', result.details) + self.assertIn('Date', result.details) + + @patch('src.scanoss.inspection.policy_check.dependency_track.project_violation.DependencyTrackService') def test_dependency_track_project_violation_sort_violations(self, mock_service): mock_service.return_value = Mock() project_violation = DependencyTrackProjectViolationPolicyCheck( @@ -528,7 +526,7 @@ def 
test_dependency_track_project_violation_sort_violations(self, mock_service): self.assertEqual(sorted_violations[2]['type'], 'LICENSE') self.assertEqual(sorted_violations[3]['type'], 'OTHER') - @patch('src.scanoss.inspection.dependency_track.project_violation.DependencyTrackService') + @patch('src.scanoss.inspection.policy_check.dependency_track.project_violation.DependencyTrackService') def test_dependency_track_project_violation_empty_violations(self, mock_service): mock_service.return_value = Mock() project_violation = DependencyTrackProjectViolationPolicyCheck( @@ -539,11 +537,11 @@ def test_dependency_track_project_violation_empty_violations(self, mock_service) ) empty_violations = [] result = project_violation._json(empty_violations) - self.assertEqual(result['summary'], '0 policy violations were found.\n') - details = json.loads(result['details']) + self.assertEqual(result.summary, '0 policy violations were found.\n') + details = json.loads(result.details) self.assertEqual(len(details), 0) - @patch('src.scanoss.inspection.dependency_track.project_violation.DependencyTrackService') + @patch('src.scanoss.inspection.policy_check.dependency_track.project_violation.DependencyTrackService') def test_dependency_track_project_violation_markdown_empty(self, mock_service): mock_service.return_value = Mock() project_violation = DependencyTrackProjectViolationPolicyCheck( @@ -554,11 +552,11 @@ def test_dependency_track_project_violation_markdown_empty(self, mock_service): ) empty_violations = [] result = project_violation._markdown(empty_violations) - self.assertEqual(result['summary'], '0 policy violations were found.\n') - self.assertIn('State', result['details']) - self.assertIn('Risk Type', result['details']) + self.assertEqual(result.summary, '0 policy violations were found.\n') + self.assertIn('State', result.details) + self.assertIn('Risk Type', result.details) - @patch('src.scanoss.inspection.dependency_track.project_violation.DependencyTrackService') + 
@patch('src.scanoss.inspection.policy_check.dependency_track.project_violation.DependencyTrackService') def test_dependency_track_project_violation_multiple_types(self, mock_service): mock_service.return_value = Mock() project_violation = DependencyTrackProjectViolationPolicyCheck( @@ -602,8 +600,8 @@ def test_dependency_track_project_violation_multiple_types(self, mock_service): } ] result = project_violation._json(test_violations) - self.assertEqual(result['summary'], '2 policy violations were found.\n') - details = json.loads(result['details']) + self.assertEqual(result.summary, '2 policy violations were found.\n') + details = json.loads(result.details) self.assertEqual(len(details), 2) if __name__ == '__main__': diff --git a/tools/linter.sh b/tools/linter.sh new file mode 100755 index 00000000..04d1e76d --- /dev/null +++ b/tools/linter.sh @@ -0,0 +1,50 @@ +#!/usr/bin/env bash +# SPDX-License-Identifier: MIT +# +# Lint Python files changed since merge base with origin/main +# Usage: linter.sh [--fix] [--docker] + +set -e + +# Parse arguments +FIX_FLAG="" +USE_DOCKER=false + +while [[ $# -gt 0 ]]; do + case $1 in + --fix) + FIX_FLAG="--fix" + shift + ;; + --docker) + USE_DOCKER=true + shift + ;; + *) + echo "Unknown option: $1" + echo "Usage: $0 [--fix] [--docker]" + exit 1 + ;; + esac +done + +# Find merge base with origin/main +merge_base=$(git merge-base origin/main HEAD) + +# Get all changed Python files since merge base +files=$(git diff --name-only "$merge_base" HEAD | grep '\.py$' || true) + +# Check if there are any Python files changed +if [ -z "$files" ]; then + echo "No Python files changed" + exit 0 +fi + +# Run linter +if [ "$USE_DOCKER" = true ]; then + # Run with Docker + docker run --rm -v "$(pwd)":/src -w /src ghcr.io/astral-sh/ruff:0.14.2 check ${files} ${FIX_FLAG} +else + # Run locally + python3 -m ruff check ${files} ${FIX_FLAG} +fi \ No newline at end of file