Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/codemodder/codemodder.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from codemodder.project_analysis.python_repo_manager import PythonRepoManager
from codemodder.report.codetf_reporter import report_default
from codemodder.result import ResultSet
from codemodder.sarifs import detect_sarif_tools
from codemodder.semgrep import run as run_semgrep


Expand Down Expand Up @@ -156,8 +157,12 @@ def run(original_args) -> int:
logger.info("codemodder: python/%s", __version__)
logger.info("command: %s %s", Path(sys.argv[0]).name, " ".join(original_args))

tool_result_files_map = {"sonar": argv.sonar_issues_json}
# TODO find the tool name in the --sarif files here and populate the dict
# TODO: sonar files should be _parsed_ here as well
# TODO: this should be dict[str, list[Path]]
tool_result_files_map: dict[str, list[str]] = detect_sarif_tools(
[Path(name) for name in argv.sarif or []]
)
tool_result_files_map["sonar"] = argv.sonar_issues_json

repo_manager = PythonRepoManager(Path(argv.directory))
context = CodemodExecutionContext(
Expand Down
4 changes: 3 additions & 1 deletion src/codemodder/codemods/base_codemod.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ def _process_file(
findings_for_rule = []
for rule in rules:
findings_for_rule.extend(
results.results_for_rule_and_file(rule, filename)
results.results_for_rule_and_file(context, rule, filename)
)

file_context = FileContext(
Expand All @@ -199,6 +199,8 @@ def _process_file(
findings_for_rule,
)

# TODO: for SAST tools we should preemptively filter out files that are not part of the result set

if change_set := self.transformer.apply(
context, file_context, findings_for_rule
):
Expand Down
2 changes: 0 additions & 2 deletions src/codemodder/codemods/base_visitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ def lineno_for_node(self, node):


class BaseTransformer(VisitorBasedCodemodCommand, UtilsMixin):

def __init__(
self,
context,
Expand All @@ -65,7 +64,6 @@ def __init__(


class BaseVisitor(ContextAwareVisitor, UtilsMixin):

def __init__(
self,
context,
Expand Down
45 changes: 34 additions & 11 deletions src/codemodder/result.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from typing import Any, TYPE_CHECKING

import libcst as cst
from libcst._position import CodeRange

from .utils.abc_dataclass import ABCDataclass

if TYPE_CHECKING:
from codemodder.context import CodemodExecutionContext


@dataclass
class LineInfo:
Expand All @@ -24,24 +31,40 @@ class Result(ABCDataclass):
rule_id: str
locations: list[Location]

def match_location(self, pos, node):
for location in self.locations:
start_column = location.start.column
end_column = location.end.column
return (
pos.start.line == location.start.line
and (pos.start.column in (start_column - 1, start_column))
and pos.end.line == location.end.line
and (pos.end.column in (end_column - 1, end_column))
def match_location(self, pos: CodeRange, node: cst.CSTNode) -> bool:
    """Check whether *pos* overlaps one of this result's locations.

    Lines must match exactly; columns are compared with a tolerance of
    one (the reported column or one less) to absorb off-by-one
    differences between tools. The *node* argument is unused here but
    kept for interface compatibility with overriding implementations.
    """
    del node
    for location in self.locations:
        if pos.start.line != location.start.line:
            continue
        if pos.end.line != location.end.line:
            continue
        start_ok = pos.start.column in (location.start.column - 1, location.start.column)
        end_ok = pos.end.column in (location.end.column - 1, location.end.column)
        if start_ok and end_ok:
            return True
    return False


class ResultSet(dict[str, dict[Path, list[Result]]]):
def add_result(self, result: Result):
    """Register *result* under its rule ID, indexed by each file it affects."""
    files_map = self.setdefault(result.rule_id, {})
    for location in result.locations:
        files_map.setdefault(location.file, []).append(result)

def results_for_rule_and_file(self, rule_id: str, file: Path) -> list[Result]:
def results_for_rule_and_file(
    self, context: CodemodExecutionContext, rule_id: str, file: Path
) -> list[Result]:
    """
    Look up all results recorded for a given rule ID and file.

    :param context: The codemod execution context (unused by this base
        implementation; some implementers may need it to compute paths
        that are relative to the target directory)
    :param rule_id: The rule ID
    :param file: The filename
    """
    del context
    results_by_file = self.get(rule_id, {})
    return results_by_file.get(file, [])

def files_for_rule(self, rule_id: str) -> list[Path]:
Expand Down
33 changes: 33 additions & 0 deletions src/codemodder/sarifs.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,44 @@
from abc import ABCMeta, abstractmethod
from importlib.metadata import entry_points
import json
from pathlib import Path
from typing import Optional

from typing_extensions import Self

from codemodder.logging import logger
from .result import ResultSet, Result, Location, LineInfo


class AbstractSarifToolDetector(metaclass=ABCMeta):
    """Interface for classes that recognize a specific tool's SARIF output.

    Implementations inspect a single SARIF ``run`` object and report
    whether it was produced by the tool they detect.
    """

    @classmethod
    @abstractmethod
    def detect(cls, run_data: dict) -> bool:
        """Return True if ``run_data`` was produced by this detector's tool."""


def detect_sarif_tools(filenames: list[Path]) -> dict[str, list[str]]:
    """Map SARIF-producing tool names to the input files containing their results.

    Each file is parsed as JSON once, and every registered detector
    (discovered via the ``sarif_detectors`` entry point group) is given a
    chance to claim each SARIF ``run`` in that file.

    :param filenames: paths of SARIF files supplied on the command line
    :return: mapping of detected tool name to the list of file names
        (as strings) containing results from that tool
    """
    results: dict[str, list[str]] = {}

    logger.debug("loading registered SARIF tool detectors")
    detectors = {
        ent.name: ent.load() for ent in entry_points().select(group="sarif_detectors")
    }
    for fname in filenames:
        # A malformed input file should not abort the whole run: skip files
        # that are not valid JSON or that lack the top-level "runs" array.
        try:
            data = json.loads(fname.read_text())
            runs = data["runs"]
        except (json.JSONDecodeError, KeyError, TypeError):
            logger.warning("not a valid SARIF file: %s", fname)
            continue
        for name, det in detectors.items():
            for run in runs:
                try:
                    if det.detect(run):
                        logger.debug("detected %s sarif: %s", name, fname)
                        # Record the file once per detector: scanning further
                        # runs would append duplicate entries for this file.
                        results.setdefault(name, []).append(str(fname))
                        break
                except (KeyError, AttributeError, ValueError):
                    # Detector could not interpret this run; try the next one.
                    continue

    return results


def extract_rule_id(result, sarif_run) -> Optional[str]:
if "ruleId" in result:
# semgrep prepends the folders into the rule-id, we want the base name only
Expand All @@ -21,6 +53,7 @@ def extract_rule_id(result, sarif_run) -> Optional[str]:
return None


# NOTE: These Sarif classes are actually specific to Semgrep and should be moved elsewhere
class SarifLocation(Location):
@classmethod
def from_sarif(cls, sarif_location) -> Self:
Expand Down