diff --git a/src/codemodder/codemodder.py b/src/codemodder/codemodder.py index ff886dee..5528194b 100644 --- a/src/codemodder/codemodder.py +++ b/src/codemodder/codemodder.py @@ -19,6 +19,7 @@ from codemodder.project_analysis.python_repo_manager import PythonRepoManager from codemodder.report.codetf_reporter import report_default from codemodder.result import ResultSet +from codemodder.sarifs import detect_sarif_tools from codemodder.semgrep import run as run_semgrep @@ -156,8 +157,12 @@ def run(original_args) -> int: logger.info("codemodder: python/%s", __version__) logger.info("command: %s %s", Path(sys.argv[0]).name, " ".join(original_args)) - tool_result_files_map = {"sonar": argv.sonar_issues_json} - # TODO find the tool name in the --sarif files here and populate the dict + # TODO: sonar files should be _parsed_ here as well + # TODO: this should be dict[str, list[Path]] + tool_result_files_map: dict[str, list[str]] = detect_sarif_tools( + [Path(name) for name in argv.sarif or []] + ) + tool_result_files_map["sonar"] = argv.sonar_issues_json repo_manager = PythonRepoManager(Path(argv.directory)) context = CodemodExecutionContext( diff --git a/src/codemodder/codemods/base_codemod.py b/src/codemodder/codemods/base_codemod.py index a8e4d36f..ca3025ea 100644 --- a/src/codemodder/codemods/base_codemod.py +++ b/src/codemodder/codemods/base_codemod.py @@ -188,7 +188,7 @@ def _process_file( findings_for_rule = [] for rule in rules: findings_for_rule.extend( - results.results_for_rule_and_file(rule, filename) + results.results_for_rule_and_file(context, rule, filename) ) file_context = FileContext( @@ -199,6 +199,8 @@ def _process_file( findings_for_rule, ) + # TODO: for SAST tools we should preemptively filter out files that are not part of the result set + if change_set := self.transformer.apply( context, file_context, findings_for_rule ): diff --git a/src/codemodder/codemods/base_visitor.py b/src/codemodder/codemods/base_visitor.py index 539b2920..cfd243d1 100644 
--- a/src/codemodder/codemods/base_visitor.py +++ b/src/codemodder/codemods/base_visitor.py @@ -52,7 +52,6 @@ def lineno_for_node(self, node): class BaseTransformer(VisitorBasedCodemodCommand, UtilsMixin): - def __init__( self, context, @@ -65,7 +64,6 @@ def __init__( class BaseVisitor(ContextAwareVisitor, UtilsMixin): - def __init__( self, context, diff --git a/src/codemodder/result.py b/src/codemodder/result.py index 7e38f43d..8320b07d 100644 --- a/src/codemodder/result.py +++ b/src/codemodder/result.py @@ -1,9 +1,16 @@ +from __future__ import annotations from dataclasses import dataclass from pathlib import Path -from typing import Any +from typing import Any, TYPE_CHECKING + +import libcst as cst +from libcst._position import CodeRange from .utils.abc_dataclass import ABCDataclass +if TYPE_CHECKING: + from codemodder.context import CodemodExecutionContext + @dataclass class LineInfo: @@ -24,16 +31,20 @@ class Result(ABCDataclass): rule_id: str locations: list[Location] - def match_location(self, pos, node): - for location in self.locations: - start_column = location.start.column - end_column = location.end.column - return ( - pos.start.line == location.start.line - and (pos.start.column in (start_column - 1, start_column)) - and pos.end.line == location.end.line - and (pos.end.column in (end_column - 1, end_column)) + def match_location(self, pos: CodeRange, node: cst.CSTNode) -> bool: + del node + return any( + pos.start.line == location.start.line + and ( + pos.start.column + in ((start_column := location.start.column) - 1, start_column) + ) + and pos.end.line == location.end.line + and ( + pos.end.column in ((end_column := location.end.column) - 1, end_column) ) + for location in self.locations + ) class ResultSet(dict[str, dict[Path, list[Result]]]): @@ -41,7 +52,19 @@ def add_result(self, result: Result): for loc in result.locations: self.setdefault(result.rule_id, {}).setdefault(loc.file, []).append(result) - def results_for_rule_and_file(self, rule_id: 
str, file: Path) -> list[Result]: + def results_for_rule_and_file( + self, context: CodemodExecutionContext, rule_id: str, file: Path + ) -> list[Result]: + """ + Return list of results for a given rule and file. + + :param context: The codemod execution context + :param rule_id: The rule ID + :param file: The filename + + Some implementers may need to use the context to compute paths that are relative to the target directory. + """ + del context return self.get(rule_id, {}).get(file, []) def files_for_rule(self, rule_id: str) -> list[Path]: diff --git a/src/codemodder/sarifs.py b/src/codemodder/sarifs.py index 56a38080..39b6fcf4 100644 --- a/src/codemodder/sarifs.py +++ b/src/codemodder/sarifs.py @@ -1,12 +1,44 @@ +from abc import ABCMeta, abstractmethod +from importlib.metadata import entry_points import json from pathlib import Path from typing import Optional from typing_extensions import Self +from codemodder.logging import logger from .result import ResultSet, Result, Location, LineInfo +class AbstractSarifToolDetector(metaclass=ABCMeta): + @classmethod + @abstractmethod + def detect(cls, run_data: dict) -> bool: + pass + + +def detect_sarif_tools(filenames: list[Path]) -> dict[str, list[str]]: + results: dict[str, list[str]] = {} + + logger.debug("loading registered SARIF tool detectors") + detectors = { + ent.name: ent.load() for ent in entry_points().select(group="sarif_detectors") + } + for fname in filenames: + data = json.loads(fname.read_text()) + for name, det in detectors.items(): + # TODO: handle malformed sarif? 
+ for run in data["runs"]: + try: + if det.detect(run): + logger.debug("detected %s sarif: %s", name, fname) + results.setdefault(name, []).append(str(fname)) + except (KeyError, AttributeError, ValueError): + continue + + return results + + def extract_rule_id(result, sarif_run) -> Optional[str]: if "ruleId" in result: # semgrep preprends the folders into the rule-id, we want the base name only @@ -21,6 +53,7 @@ def extract_rule_id(result, sarif_run) -> Optional[str]: return None +# NOTE: These Sarif classes are actually specific to Semgrep and should be moved elsewhere class SarifLocation(Location): @classmethod def from_sarif(cls, sarif_location) -> Self: