In [None]:
import os
import shutil

# === Paths ===
input_dir = "../ground-truth-vulnerabilities/Vul_Database"
output_dir = os.path.join(input_dir, "collected-workflows")
os.makedirs(output_dir, exist_ok=True)

# === Collect YAML workflow files ===
# Every .yml/.yaml file below input_dir is copied into one flat folder. Each
# copy is renamed "<relative-dir-with-dashes>__<file>" so that files with the
# same name from different subfolders do not overwrite each other.
#
# output_dir is itself a subfolder of input_dir, so it has to be pruned from
# the walk: otherwise re-running this cell would pick up the files collected
# by the previous run and copy them again as "collected-workflows__<name>".
output_dir_abs = os.path.abspath(output_dir)
for subdir, dirnames, files in os.walk(input_dir):
    # Editing dirnames in place tells a top-down os.walk not to descend there.
    dirnames[:] = [
        name for name in dirnames
        if os.path.abspath(os.path.join(subdir, name)) != output_dir_abs
    ]
    for file in files:
        if file.endswith((".yml", ".yaml")):
            src_path = os.path.join(subdir, file)
            rel_dir = os.path.relpath(subdir, input_dir).replace(os.sep, "-")
            dst_filename = f"{rel_dir}__{file}"
            dst_path = os.path.join(output_dir, dst_filename)
            # copy2 also carries over file metadata such as timestamps
            shutil.copy2(src_path, dst_path)
            print(f"Copied: {src_path} -> {dst_path}")

print("YAML files collected in 'collected-workflows'.")


Copied: ../ground-truth-vulnerabilities/Vul_Database/1-GHSA-4mgv-m5cm-f9h7/terraform.yml -> ../ground-truth-vulnerabilities/Vul_Database/collected-workflows/1-GHSA-4mgv-m5cm-f9h7__terraform.yml
Copied: ../ground-truth-vulnerabilities/Vul_Database/4-GHSA-4xqx-pqpj-9fqw/known-vulnerable-actions.yml -> ../ground-truth-vulnerabilities/Vul_Database/collected-workflows/4-GHSA-4xqx-pqpj-9fqw__known-vulnerable-actions.yml
Copied: ../ground-truth-vulnerabilities/Vul_Database/10-GHSA-h3qr-39j9-4r5v/gradle-build.yml -> ../ground-truth-vulnerabilities/Vul_Database/collected-workflows/10-GHSA-h3qr-39j9-4r5v__gradle-build.yml
Copied: ../ground-truth-vulnerabilities/Vul_Database/22-GHSA-vqf5-2xx6-9wfm/codeql-analysis.yml -> ../ground-truth-vulnerabilities/Vul_Database/collected-workflows/22-GHSA-vqf5-2xx6-9wfm__codeql-analysis.yml
Copied: ../ground-truth-vulnerabilities/Vul_Database/27-GHSA-2487-9f55-2vg9/action.yml -> ../ground-truth-vulnerabilities/Vul_Database/collected-workflows/27-GHSA-2487-9f55

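RUN THE TOOLS: each cell below turns one tool's output into per-workflow JSON files under `tools_output/<tool>/workflow_with_issues`.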

poutine :

In [2]:
import json
from pathlib import Path
from collections import defaultdict

# Location of poutine's raw report and of the per-workflow result files
input_file = Path("../ground-truth-vulnerabilities/tools_output/poutine/findings.json")
output_dir = Path("../ground-truth-vulnerabilities/tools_output/poutine/workflow_with_issues")
output_dir.mkdir(parents=True, exist_ok=True)

# Read the raw report
with input_file.open("r", encoding="utf-8") as report:
    data = json.load(report)

all_findings = data.get("findings", [])

# Bucket findings under the basename of the workflow file they refer to.
# Findings that carry no meta.path are left out.
grouped = defaultdict(list)
for finding in all_findings:
    path = finding.get("meta", {}).get("path")
    if not path:
        continue
    grouped[Path(path).name].append(finding)

# Write one JSON document per workflow: the findings plus a per-rule tally.
# Every bucket holds at least one finding, because a bucket only comes into
# existence when a finding is appended to it.
for workflow_file, findings in grouped.items():
    rule_summary = {}
    for rule_id in (entry.get("rule_id") for entry in findings):
        if rule_id:
            rule_summary[rule_id] = rule_summary.get(rule_id, 0) + 1

    output_data = {
        "workflow": workflow_file,
        "tool": "poutine",
        "summary": {"total_findings": len(findings), "by_rule": rule_summary},
        "findings": findings,
    }

    output_path = output_dir / f"{workflow_file}.json"
    with output_path.open("w", encoding="utf-8") as f_out:
        json.dump(output_data, f_out, indent=2)

print(f"Saved {len(grouped)} workflow files to {output_dir}")


Saved 13 workflow files to ../ground-truth-vulnerabilities/tools_output/poutine/workflow_with_issues


actionlint :

In [3]:
import re
import json
from pathlib import Path
from collections import defaultdict

# Location of actionlint's text report and of the per-workflow result files
input_file = Path("../ground-truth-vulnerabilities/tools_output/actionlint/findings.txt")
output_dir = Path("../ground-truth-vulnerabilities/tools_output/actionlint/workflow_with_issues")
output_dir.mkdir(parents=True, exist_ok=True)

# One report line looks like:
#   .github/workflows/deploy.yml:8:1: some issue description
# Lines that do not fit "<path>:<line>:<col>: <message>" are ignored.
line_pattern = re.compile(r"(.+?):(\d+):(\d+):\s+(.*)")

# Findings keyed by the workflow's file name (directory part dropped)
grouped = defaultdict(list)

with input_file.open("r", encoding="utf-8") as report:
    for raw_line in report:
        parsed = line_pattern.match(raw_line)
        if parsed is None:
            continue
        path, line_num, col_num, message = parsed.groups()
        grouped[Path(path).name].append(
            {
                "line": int(line_num),
                "column": int(col_num),
                "message": message.strip(),
            }
        )

# Write one JSON document per workflow that has findings. Buckets are only
# created when a finding is appended, so none of them is empty.
for workflow_file, findings in grouped.items():
    output_data = {
        "workflow": workflow_file,
        "tool": "actionlint",
        "summary": {"total_findings": len(findings)},
        "findings": findings,
    }

    output_path = output_dir / f"{workflow_file}.json"
    with output_path.open("w", encoding="utf-8") as f_out:
        json.dump(output_data, f_out, indent=2)

print(f"Saved {len(grouped)} workflow files to {output_dir}")


Saved 9 workflow files to ../ground-truth-vulnerabilities/tools_output/actionlint/workflow_with_issues


frizbee

In [30]:
import re
import json
from pathlib import Path

# Paths: untouched workflows, the rewritten copies, and the result folder
original_dir = Path("../ground-truth-vulnerabilities/.github/workflows")
modified_dir = Path("../ground-truth-vulnerabilities/tools_output/frizbee/modified_workflows")
output_dir = Path("../ground-truth-vulnerabilities/tools_output/frizbee/workflow_with_issues")
output_dir.mkdir(parents=True, exist_ok=True)

# Matches a "uses:" reference pinned to a hex hash and followed by a comment,
# capturing (action, hash, first word of the comment).
pattern = re.compile(r"uses:\s+([\w\-./]+)@([a-f0-9]{10,})\s+#\s*(\S+)", re.IGNORECASE)

# Workflow names come from the modified folder: *.yml first, then *.yaml
modified_files = [
    candidate.name
    for glob_expr in ("*.yml", "*.yaml")
    for candidate in modified_dir.glob(glob_expr)
]

saved = 0

for wf_name in modified_files:
    original_path = original_dir / wf_name
    modified_path = modified_dir / wf_name

    if not (original_path.exists() and modified_path.exists()):
        print(f"[!] Skipped: {wf_name} (missing original or modified)")
        continue

    original_lines = original_path.read_text(encoding="utf-8").splitlines()
    modified_lines = modified_path.read_text(encoding="utf-8").splitlines()

    findings = []

    # Lines are paired by position. A finding is recorded when the modified
    # line holds a pinned reference and the original line at the same position
    # contains "<action>@<comment word>".
    for line_no, (orig, mod) in enumerate(zip(original_lines, modified_lines), start=1):
        match = pattern.search(mod)
        if match is None:
            continue
        action, sha, tag = match.groups()
        expected_unpinned = f"{action}@{tag}"
        if expected_unpinned not in orig:
            continue
        findings.append(
            {
                "rule": "unpinned-github-actions",
                "original": expected_unpinned,
                "pinned": f"{action}@{sha}",
                "line": line_no,
            }
        )

    # Workflows without findings get no result file
    if not findings:
        continue

    result = {
        "workflow": wf_name,
        "tool": "frizbee",
        "summary": {
            "total_findings": len(findings),
            "by_rule": {"unpinned-github-actions": len(findings)},
        },
        "findings": findings,
    }

    with open(output_dir / f"{wf_name}.json", "w", encoding="utf-8") as out_f:
        json.dump(result, out_f, indent=2)
    saved += 1

print(f"Saved {saved} normalized workflow result files to {output_dir}")


Saved 14 normalized workflow result files to ../ground-truth-vulnerabilities/tools_output/frizbee/workflow_with_issues


scharf :

In [5]:
import re
import json
from pathlib import Path
from collections import defaultdict

# Location of scharf's text report and of the per-workflow result files
input_file = Path("../ground-truth-vulnerabilities/tools_output/scharf/findings.txt")
output_dir = Path("../ground-truth-vulnerabilities/tools_output/scharf/workflow_with_issues")
output_dir.mkdir(parents=True, exist_ok=True)

# ANSI escape sequences are removed before parsing
ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")

# Issue lines carry "[Line <n>, Col <n>] <message>"
issue_pattern = re.compile(r"\[Line (\d+), Col (\d+)\] (.+)")

# Keep only the non-empty lines, with escape codes and outer whitespace removed
with open(input_file, "r", encoding="utf-8") as f:
    stripped = (ansi_escape.sub("", raw).strip() for raw in f)
    lines = [text for text in stripped if text]

# A line ending in .yml/.yaml names the workflow that the following issue
# lines belong to.
grouped = defaultdict(list)
current_workflow = None

for line in lines:
    if line.endswith((".yml", ".yaml")):
        current_workflow = Path(line).name
        continue

    # Issue lines seen before any workflow line are ignored
    if not (current_workflow and "[Line" in line):
        continue

    match = issue_pattern.search(line)
    if match is None:
        print(f"[NO MATCH] {line}")
        continue

    line_num, col_num, message = match.groups()
    grouped[current_workflow].append(
        {
            "line": int(line_num),
            "column": int(col_num),
            "message": message,
        }
    )

# Write one JSON document per workflow. Every finding is counted under the
# single rule name "unpinned-github-actions".
for workflow_file, findings in grouped.items():
    total = len(findings)
    output_data = {
        "workflow": workflow_file,
        "tool": "scharf",
        "summary": {
            "total_findings": total,
            "by_rule": {"unpinned-github-actions": total},
        },
        "findings": findings,
    }

    out_path = output_dir / f"{workflow_file}.json"
    with out_path.open("w", encoding="utf-8") as f_out:
        json.dump(output_data, f_out, indent=2)

print(f"Saved {len(grouped)} workflow result files to {output_dir}")


Saved 16 workflow result files to ../ground-truth-vulnerabilities/tools_output/scharf/workflow_with_issues


pinny 

In [6]:
import json
from pathlib import Path
from collections import defaultdict

# Paths
FINDINGS_FILE = Path("../ground-truth-vulnerabilities/tools_output/pinny/findings.txt")
WORKFLOWS_DIR = Path("../ground-truth-vulnerabilities/.github/workflows")
OUTPUT_DIR = Path("../ground-truth-vulnerabilities/tools_output/pinny/workflow_with_issues")
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

# Step 1: Extract all unpinned actions from findings.txt
# Only lines containing one of the two marker phrases are used. The action
# reference is the text after the last ":" on the line, with surrounding
# whitespace and backticks removed.
unresolved_refs = set()
with open(FINDINGS_FILE, "r", encoding="utf-8") as f:
    for line in f:
        if "Branch references are being used" in line or "No exact match found for ref" in line:
            action = line.strip().rsplit(":", 1)[-1].strip().strip("`")
            if action:
                unresolved_refs.add(action)

print(f"Found {len(unresolved_refs)} unpinned actions in findings.txt")

# Step 2: Search these refs in all workflow files
# The refs are searched in sorted order. Iterating the set directly would make
# the order of findings on a line that matches several refs differ from one
# kernel to the next (string hashing is randomized per process), so the saved
# JSON would not be reproducible.
# NOTE(review): matching is a plain substring test, so a ref that is a prefix
# of another ref on the line (e.g. "@v2" vs "@v2.1") is reported too.
ordered_refs = sorted(unresolved_refs)
findings_by_file = defaultdict(list)

yml_files = list(WORKFLOWS_DIR.glob("*.yml")) + list(WORKFLOWS_DIR.glob("*.yaml"))
for yml_file in yml_files:
    with open(yml_file, "r", encoding="utf-8") as f:
        lines = f.readlines()

    for idx, line in enumerate(lines, 1):
        for ref in ordered_refs:
            if ref in line:
                findings_by_file[yml_file.name].append({
                    "rule_id": "unpinned-github-actions",
                    "meta": {
                        "action": ref,
                        "line_snippet": line.strip(),
                        "line_number": idx
                    }
                })

# Step 3: Save per-workflow normalized output
# Only workflows with at least one matching line get a result file.
for wf_file, findings in findings_by_file.items():
    summary = {
        "total_findings": len(findings),
        "by_rule": {"unpinned-github-actions": len(findings)}
    }

    result = {
        "workflow": wf_file,
        "tool": "pinny",
        "summary": summary,
        "findings": findings
    }

    with open(OUTPUT_DIR / f"{wf_file}.json", "w", encoding="utf-8") as f_out:
        json.dump(result, f_out, indent=2)

print(f"Saved {len(findings_by_file)} workflow result files to {OUTPUT_DIR}")


Found 7 unpinned actions in findings.txt
Saved 2 workflow result files to ../ground-truth-vulnerabilities/tools_output/pinny/workflow_with_issues


zizmor

In [7]:
import re
import json
from pathlib import Path
from collections import defaultdict

# Paths
input_file = Path("../ground-truth-vulnerabilities/tools_output/zizmor/findings.txt")
output_dir = Path("../ground-truth-vulnerabilities/tools_output/zizmor/workflow_with_issues")
output_dir.mkdir(parents=True, exist_ok=True)

# Patterns
# A finding block starts with a "<severity>[<rule-id>]: <message>" header.
# zizmor labels findings with several severity words, not only "warning".
# Recognising just "warning" would drop the other findings, and their "-->"
# line would then overwrite the location of the warning parsed before them.
# NOTE(review): the severity words are taken from zizmor's plain output;
# confirm them against the zizmor version that produced findings.txt.
header_pattern = re.compile(r"(error|warning|help|info|note)\[(.+?)\]: (.+)")
location_pattern = re.compile(r"-->\s+\.github/workflows/(.+?):(\d+):(\d+)")
note_pattern = re.compile(r"= note: (.+)", re.IGNORECASE)

# Storage
grouped = defaultdict(list)
current = None  # finding being assembled; None until the first header is seen

# Read and parse
with input_file.open(encoding="utf-8") as f:
    for line in f:
        line = line.strip()

        if not line:
            continue

        # New finding block
        m = header_pattern.match(line)
        if m:
            # Save previous; a finding that never got a location is dropped
            if current is not None and current["workflow"]:
                grouped[current["workflow"]].append(current)
            # Start new
            severity, rule_id, message = m.groups()
            current = {
                "rule_id": rule_id,
                "severity": severity,
                "message": message,
                "workflow": None,
                "line": None,
                "column": None,
                "note": None
            }
            continue

        # Location or note lines that precede every header belong to no
        # finding; attaching them to an empty record would later fail on the
        # missing "rule_id".
        if current is None:
            continue

        # Detect workflow file location
        m = location_pattern.match(line)
        if m:
            workflow, line_num, col_num = m.groups()
            current["workflow"] = workflow.strip()
            current["line"] = int(line_num)
            current["column"] = int(col_num)
            continue

        # Detect notes
        m = note_pattern.match(line)
        if m:
            current["note"] = m.group(1)
            continue

# Save last pending finding
if current is not None and current["workflow"]:
    grouped[current["workflow"]].append(current)

# Write outputs
# Groups are keyed by the path below .github/workflows/; the result file is
# named after the basename of that path.
for workflow_path, findings in grouped.items():
    workflow_file = Path(workflow_path).name

    rule_summary = {}
    for finding in findings:
        rule = finding["rule_id"]
        rule_summary[rule] = rule_summary.get(rule, 0) + 1

    result = {
        "workflow": workflow_file,
        "tool": "zizmor",
        "summary": {
            "total_findings": len(findings),
            "by_rule": rule_summary
        },
        "findings": findings
    }

    out_path = output_dir / f"{workflow_file}.json"
    with open(out_path, "w", encoding="utf-8") as out:
        json.dump(result, out, indent=2)

print(f"Saved {len(grouped)} workflow result files to {output_dir}")


Saved 22 workflow result files to ../ground-truth-vulnerabilities/tools_output/zizmor/workflow_with_issues


scorecard

In [32]:
import os
import shutil
import subprocess
import json
from pathlib import Path
from tqdm import tqdm

# Paths
WORKFLOW_DIR = Path("../ground-truth-vulnerabilities/.github/workflows")
TEMP_REPO_DIR = Path("scorecard_tmp_repo")
FINDINGS_PATH = Path("../ground-truth-vulnerabilities/tools_output/scorecard/findings_all.json")
FINDINGS_PATH.parent.mkdir(parents=True, exist_ok=True)

# Clean previous run, then recreate the temporary repo layout. Scorecard is
# pointed at this folder, which holds exactly one workflow at a time.
if TEMP_REPO_DIR.exists():
    shutil.rmtree(TEMP_REPO_DIR)
temp_wf_dir = TEMP_REPO_DIR / ".github" / "workflows"
temp_wf_dir.mkdir(parents=True, exist_ok=True)

all_workflow_results = []
# Workflows whose scorecard run produced no parseable JSON, kept so that they
# are reported instead of silently missing from the results.
skipped = []

yml_files = list(WORKFLOW_DIR.glob("*.yml")) + list(WORKFLOW_DIR.glob("*.yaml"))
print(f"Running Scorecard simulation on {len(yml_files)} workflows...")

for wf in tqdm(yml_files, desc="Simulating"):
    # Clean temp workflows folder so only the current workflow is analysed
    for stale in temp_wf_dir.glob("*"):
        stale.unlink()

    # Copy workflow into temp repo
    shutil.copy(wf, temp_wf_dir / wf.name)

    # Run Scorecard
    result = subprocess.run(
        ["../../tools/scorecard/scorecard", f"--local={TEMP_REPO_DIR}", "--format=json"],
        capture_output=True,
        text=True
    )

    try:
        json_data = json.loads(result.stdout)
    except json.JSONDecodeError:
        skipped.append((wf.name, result.returncode, result.stderr.strip()))
        continue

    # Tag the result with the workflow it was produced for
    json_data["workflow"] = wf.name
    all_workflow_results.append(json_data)

# Save all results to a single JSON file
with open(FINDINGS_PATH, "w", encoding="utf-8") as f:
    json.dump(all_workflow_results, f, indent=2)

print(f"[✓] All scorecard workflow results saved to: {FINDINGS_PATH}")

# Report the runs that were left out, after the progress bar has finished
for wf_name, return_code, stderr_text in skipped:
    last_line = stderr_text.splitlines()[-1] if stderr_text else "no stderr output"
    print(f"[!] Skipped: {wf_name} (scorecard exit code {return_code}, stdout was not valid JSON): {last_line}")


Running Scorecard simulation on 18 workflows...


Simulating:   0%|          | 0/18 [00:00<?, ?it/s]

Simulating: 100%|██████████| 18/18 [00:06<00:00,  2.68it/s]

[✓] All scorecard workflow results saved to: ../ground-truth-vulnerabilities/tools_output/scorecard/findings_all.json





In [34]:
import json
from pathlib import Path
from collections import defaultdict

# Location of the combined scorecard results and of the per-workflow files
input_file = Path("../ground-truth-vulnerabilities/tools_output/scorecard/findings_all.json")
output_dir = Path("../ground-truth-vulnerabilities/tools_output/scorecard/workflow_with_issues")
output_dir.mkdir(parents=True, exist_ok=True)

# Only these checks are turned into findings
relevant_rules = {
    "Dangerous-Workflow",
    "Pinned-Dependencies",
    "Token-Permissions",
    "SAST",
}

# Each list entry is one scorecard run, tagged with its workflow name
with input_file.open("r", encoding="utf-8") as f:
    workflows_data = json.load(f)

saved = 0
for wf in workflows_data:
    workflow_name = wf.get("workflow")

    # A relevant check becomes a finding when it has a score below 10 that is
    # not -1.
    findings = []
    for check in wf.get("checks", []):
        name = check.get("name")
        score = check.get("score")
        is_relevant = name in relevant_rules
        if is_relevant and score is not None and score < 10 and score != -1:
            findings.append(
                {
                    "name": name,
                    "score": score,
                    "reason": check.get("reason"),
                    "details": check.get("details"),
                    "documentation": check.get("documentation", {}),
                }
            )

    # Workflows without findings get no result file
    if not findings:
        continue

    # Tally findings per check name
    by_rule = {}
    for entry in findings:
        by_rule[entry["name"]] = by_rule.get(entry["name"], 0) + 1

    result = {
        "workflow": workflow_name,
        "tool": "scorecard",
        "summary": {"total_findings": len(findings), "by_rule": by_rule},
        "findings": findings,
    }

    out_path = output_dir / f"{workflow_name}.json"
    with out_path.open("w", encoding="utf-8") as f_out:
        json.dump(result, f_out, indent=2)
    saved += 1

print(f"Saved {saved} workflow files to {output_dir}")


Saved 17 workflow files to ../ground-truth-vulnerabilities/tools_output/scorecard/workflow_with_issues


semgrep

In [13]:
import json
from pathlib import Path
from collections import defaultdict

# Input and output paths
input_file = Path("../ground-truth-vulnerabilities/tools_output/semgrep/findings.json")
workflow_output_dir = Path("../ground-truth-vulnerabilities/tools_output/semgrep/workflow_with_issues")
workflow_output_dir.mkdir(parents=True, exist_ok=True)

# Load Semgrep findings JSON
with open(input_file, "r", encoding="utf-8") as f:
    data = json.load(f)

# Group findings by file (workflow); results without a usable path are skipped
grouped = defaultdict(list)
for finding in data.get("results", []):
    path = Path(finding.get("path", ""))
    if not path.name:
        continue

    workflow_name = path.name
    grouped[workflow_name].append(finding)

# Normalize and save
for wf_name, findings in grouped.items():
    structured = []
    rule_counts = defaultdict(int)

    for finding in findings:
        # Keep only the last dotted component of the check id as the rule name
        rule_id = finding.get("check_id", "unknown").split('.')[-1]
        rule_counts[rule_id] += 1

        extra = finding.get("extra", {})
        structured.append({
            "rule": rule_id,
            "line": finding.get("start", {}).get("line"),
            "code": extra.get("lines"),
            "note": extra.get("message"),
            "documentation": extra.get("shortlink")
        })

    output_data = {
        "workflow": wf_name,
        "tool": "semgrep",
        "summary": {
            "total_findings": len(structured),
            "by_rule": dict(rule_counts)
        },
        "findings": structured
    }

    with open(workflow_output_dir / f"{wf_name}.json", "w", encoding="utf-8") as f_out:
        json.dump(output_data, f_out, indent=2)

# Report the files written by this run (one per group). Counting the *.json
# files in the folder would also include leftovers from earlier runs.
print(f"[✓] Saved {len(grouped)} normalized workflow result files to {workflow_output_dir}")


[✓] Saved 5 normalized workflow result files to ../ground-truth-vulnerabilities/tools_output/semgrep/workflow_with_issues


ggshield

In [14]:
import json
from pathlib import Path

# Location of ggshield's JSON report and of the per-workflow result files
input_path = Path("../ground-truth-vulnerabilities/tools_output/ggshield/findings.json")
output_dir = Path("../ground-truth-vulnerabilities/tools_output/ggshield/workflow_with_issues")
output_dir.mkdir(parents=True, exist_ok=True)

# Read the report
with input_path.open("r", encoding="utf-8") as f:
    data = json.load(f)

saved = 0

# Each entity is one scanned file together with its incidents. Every
# occurrence of every incident becomes one finding.
for entity in data.get("entities_with_incidents", []):
    incidents = entity.get("incidents", [])
    if not incidents:
        continue

    filename = Path(entity["filename"]).name

    findings = [
        {
            "rule": inc.get("policy", "secret-detected"),
            "line": occ.get("line_start"),
            "code": occ.get("match"),
            "note": inc.get("type"),
            "documentation": inc.get("detector_documentation"),
        }
        for inc in incidents
        for occ in inc.get("occurrences", [])
    ]

    # Incidents without occurrences produce nothing to save
    if not findings:
        continue

    # Tally findings per rule
    by_rule = {}
    for entry in findings:
        by_rule[entry["rule"]] = by_rule.get(entry["rule"], 0) + 1

    result = {
        "workflow": filename,
        "tool": "ggshield",
        "summary": {"total_findings": len(findings), "by_rule": by_rule},
        "findings": findings,
    }

    with open(output_dir / f"{filename}.json", "w", encoding="utf-8") as out_f:
        json.dump(result, out_f, indent=2)
    saved += 1

print(f"[✓] Saved {saved} normalized workflow result files to {output_dir}")


[✓] Saved 0 normalized workflow result files to ../ground-truth-vulnerabilities/tools_output/ggshield/workflow_with_issues


EXTRACT RESULTS:

In [39]:
import json
from pathlib import Path
from collections import defaultdict

# Tools whose normalized results are merged, and the folders involved
TOOLS = ["frizbee", "pinny", "semgrep", "scorecard", "poutine", "scharf", "zizmor", "actionlint", "ggshield"]
base_dir = Path("../ground-truth-vulnerabilities/tools_output")
output_dir = Path("../ground-truth-vulnerabilities/results/merged")
output_dir.mkdir(parents=True, exist_ok=True)

# result file name -> {tool name -> that tool's normalized result}
workflow_results = defaultdict(dict)

for tool in TOOLS:
    tool_dir = base_dir / tool / "workflow_with_issues"
    # A tool without a result folder contributes nothing
    if tool_dir.exists():
        for wf_file in tool_dir.glob("*.json"):
            with wf_file.open("r", encoding="utf-8") as f:
                workflow_results[wf_file.name][tool] = json.load(f)

# Write one merged document per result file name. The name, including its
# ".json" suffix, is used both as the output file name and as the "workflow"
# value.
for wf_name, tool_dict in workflow_results.items():
    merged = {"workflow": wf_name, "tools": tool_dict}
    with (output_dir / wf_name).open("w", encoding="utf-8") as out_f:
        json.dump(merged, out_f, indent=2)

print(f"[✓] Saved {len(workflow_results)} merged workflow files to {output_dir}")


[✓] Saved 18 merged workflow files to ../ground-truth-vulnerabilities/results/merged


In [40]:
import json
import csv
from pathlib import Path
from collections import defaultdict

# Directories
# input_dir holds the merged per-workflow JSON files, output_dir receives the
# same files with a "capability" added to findings, and mapping_file is the
# CSV (columns tool_name, rule, capability) that drives the assignment.
input_dir = Path("../ground-truth-vulnerabilities/results/merged")
output_dir = Path("../ground-truth-vulnerabilities/results/normalized")
mapping_file = Path("../capabilities/rules_mapping.csv")

output_dir.mkdir(parents=True, exist_ok=True)

# ----------------------------
# Ordered: Most specific -> Least specific
# ----------------------------
# Each entry is (keywords, capability). A message is assigned the capability
# of the first entry whose keywords all occur in it (case-insensitive
# substring test), so the order of this list is significant: an entry with
# extra keywords must come before the shorter entry it refines.
ACTIONLINT_PATTERNS = [
    (["[action]", "action is too old to run on GitHub Actions"], "pinning-check"),
    (["[expression]", "undefined variable"], "workflow_structural_check"),
    (["[expression]", "undefined function"], "workflow_structural_check"),
    (["[expression]", "is not defined in object type"], "workflow_structural_check"),
    (["[expression]", "value cannot be compared to"], "control-flow-check"),
    (["[expression]", "untrusted"], "injection-check"),
    (["[expression]", "by reusable workflow"], "workflow_structural_check"),
    (["[expression]", "availability for more details"], "workflow_structural_check"),
    (["[workflow-call]"], "workflow_structural_check"),
    (["[syntax-check]"], "workflow_structural_check"),
    # Fallback for every [expression] message not caught by the entries above
    (["[expression]"], "control-flow-check"),
    (["[shellcheck]"], "workflow_structural_check"),
    (["[pyflakes]"], "workflow_structural_check"),
    (["[job-needs]"], "workflow_structural_check"),
    (["[matrix]"], "workflow_structural_check"),
    (["[events]"], "workflow_structural_check"),
    (["[glob]"], "workflow_structural_check"),
    (["[runner-label]"], "workflow_structural_check"),
    # Fallback for [action] messages other than the "too old" case listed first
    (["[action]"], "workflow_structural_check"),
    (["[shell-name]"], "workflow_structural_check"),
    (["[id]"], "workflow_structural_check"),
    (["[credentials]"], "secrets-check"),
    (["[env-var]"], "workflow_structural_check"),
    (["[permissions]"], "permissions-check"),
    (["[deprecated-commands]"], "injection-check"),
    (["[if-cond]"], "control-flow-check"),
]

# ----------------------------
# Load all other tools' rules
# ----------------------------
# rule_map: (tool, rule) -> list of capabilities, in CSV row order.
# actionlint rows are left out; its messages are matched by keyword instead.
rule_map = defaultdict(list)
with open(mapping_file, newline='', encoding='utf-8') as csvfile:
    for row in csv.DictReader(csvfile):
        tool = row["tool_name"].strip()
        if tool != "actionlint":
            rule_map[(tool, row["rule"].strip())].append(row["capability"].strip())

# ----------------------------
# Match actionlint: first matching pattern wins
# ----------------------------
def match_actionlint_capability(message):
    """Return the capability for an actionlint message, or None.

    The entries of ACTIONLINT_PATTERNS are tried in order; the first entry
    whose keywords all occur in the message (compared case-insensitively)
    decides the result.
    """
    lowered = message.lower()
    return next(
        (
            capability
            for keywords, capability in ACTIONLINT_PATTERNS
            if all(keyword.lower() in lowered for keyword in keywords)
        ),
        None,
    )

# ----------------------------
# Generic capability assignment
# ----------------------------
def get_capability(tool, finding, rule_hint=None):
    """Look up the capability for one finding of one tool.

    Lookup order:
      1. the finding's "rule", "rule_id" and "name" values (in that order),
         each checked against rule_map for this tool;
      2. rule_hint, if given, checked against rule_map for this tool;
      3. for actionlint findings that have a "message", keyword matching.

    Returns the first capability registered for the matching rule, or None
    when nothing applies.
    """
    candidates = [finding[key] for key in ("rule", "rule_id", "name") if key in finding]
    for rule_val in candidates:
        if (tool, rule_val) in rule_map:
            return rule_map[(tool, rule_val)][0]

    if rule_hint and (tool, rule_hint) in rule_map:
        return rule_map[(tool, rule_hint)][0]

    is_actionlint_message = tool == "actionlint" and "message" in finding
    return match_actionlint_capability(finding["message"]) if is_actionlint_message else None

# ----------------------------
# Main Processing Loop
# ----------------------------
for file_path in input_dir.glob("*.json"):
    with file_path.open(encoding="utf-8") as f:
        data = json.load(f)

    for tool, result in data.get("tools", {}).items():
        findings = result.get("findings", [])
        by_rule = result.get("summary", {}).get("by_rule", {})

        if tool == "scharf" and by_rule:
            # scharf findings carry no rule of their own; the rule names are
            # taken from the summary and the mapped capability is stamped on
            # every finding of the tool.
            for rule_name in by_rule:
                capabilities = rule_map.get((tool, rule_name), [None])
                if capabilities[0]:
                    for finding in findings:
                        finding["capability"] = capabilities[0]
            continue

        # All other tools (and scharf without a rule summary): resolve each
        # finding on its own; findings without a match stay unchanged.
        for finding in findings:
            cap = get_capability(tool, finding)
            if cap:
                finding["capability"] = cap

    # The annotated document keeps its file name in the output folder
    with (output_dir / file_path.name).open("w", encoding="utf-8") as out_f:
        json.dump(data, out_f, indent=2)

print(f"[✓] Normalized and saved files to: {output_dir}")


[✓] Normalized and saved files to: ../ground-truth-vulnerabilities/results/normalized


In [41]:
import json
import csv
from pathlib import Path
from collections import defaultdict

# Normalized per-workflow results in, one summary table out
input_dir = Path("../ground-truth-vulnerabilities/results/normalized")
output_csv = Path("../ground-truth-vulnerabilities/results/tools_findings_summary.csv")

# Gather, for every workflow, how many findings each tool reported
all_tools = set()
all_workflows = []

for file_path in input_dir.glob("*.json"):
    with file_path.open(encoding="utf-8") as f:
        data = json.load(f)

    tool_counts = {
        tool_name: len(tool_data.get("findings", []))
        for tool_name, tool_data in data.get("tools", {}).items()
    }
    all_tools.update(tool_counts)

    # Fall back to the file stem when the document has no "workflow" value
    all_workflows.append((data.get("workflow", file_path.stem), tool_counts))

# Rows ordered by workflow name, ignoring case; columns ordered by tool name
all_workflows.sort(key=lambda entry: entry[0].lower())
all_tools = sorted(all_tools)
header = ["workflow"] + all_tools

with output_csv.open("w", newline='', encoding="utf-8") as csvfile:
    writer = csv.writer(csvfile)
    writer.writerow(header)
    # A tool that reported nothing for a workflow is written as 0
    writer.writerows(
        [wf_name] + [tool_counts.get(tool, 0) for tool in all_tools]
        for wf_name, tool_counts in all_workflows
    )

print(f"findings summary saved to: {output_csv}")


findings summary saved to: ../ground-truth-vulnerabilities/results/tools_findings_summary.csv


In [42]:
import pandas as pd
from pathlib import Path

# === Paths ===
input_csv = Path("../ground-truth-vulnerabilities/results/tools_findings_summary.csv")
output_txt = Path("../ground-truth-vulnerabilities/results/manual_review/selected_workflows.txt")
output_txt.parent.mkdir(parents=True, exist_ok=True)

# === Load CSV ===
# The first column (the workflow name) becomes the index
df = pd.read_csv(input_csv, index_col=0)

# === Extract all index entries (workflow file names) ===
workflow_names = df.index.tolist()

# === Save to file ===
# One name per line. The encoding is set explicitly because the file is read
# back as UTF-8 later in the notebook; the platform default may be different.
# Formatting with an f-string also keeps a non-string index value from
# breaking the write.
with open(output_txt, "w", encoding="utf-8") as f:
    for wf in workflow_names:
        f.write(f"{wf}\n")

print(f"[✓] Extracted {len(workflow_names)} workflow names.")
print(f"[✓] Saved to: {output_txt}")


[✓] Extracted 18 workflow names.
[✓] Saved to: ../ground-truth-vulnerabilities/results/manual_review/selected_workflows.txt


In [43]:
import json
from pathlib import Path
import csv

# === Paths ===
input_dir = Path("../ground-truth-vulnerabilities/results/normalized")
output_dir = Path("../ground-truth-vulnerabilities/results/manual_review")
output_dir.mkdir(exist_ok=True)

# === Load workflow filenames ===
# One name per line; blank lines are ignored
with open(output_dir / "selected_workflows.txt", "r", encoding="utf-8") as f:
    selected_workflows = [name for name in (raw.strip() for raw in f) if name]

# === Generate summaries ===
# For every selected workflow, flatten the findings of all tools into rows of
# (tool, capability, line, column, message) and write them to a CSV.
for wf_file in selected_workflows:
    wf_path = input_dir / wf_file
    if not wf_path.exists():
        print(f"[!] Workflow not found: {wf_file}")
        continue

    with wf_path.open(encoding="utf-8") as f:
        data = json.load(f)

    findings_data = []

    for tool, tool_data in data.get("tools", {}).items():
        for item in tool_data.get("findings", []):
            meta = item.get("meta")
            has_meta = isinstance(meta, dict)

            line = item.get("line", None)
            column = item.get("column", None)

            # Message: the first of these keys that is present wins
            message = None
            for key in ("message", "reason", "note"):
                if key in item:
                    message = item[key]
                    break

            # Position fallbacks taken from the "meta" dict when available
            if has_meta and not line:
                line = meta.get("line_number") or meta.get("line", "N/A")
            if has_meta and not column:
                column = meta.get("column", "N/A")

            # === Tool-specific message logic ===
            if tool == "frizbee" and "original" in item:
                message = item["original"]
            elif tool == "pinny" and "meta" in item:
                message = item["meta"].get("action") or item["meta"].get("line_snippet")
            elif has_meta and not message:
                message = meta.get("details", "N/A")

            # Anything still missing is written as "N/A"
            findings_data.append(
                {
                    "tool": tool,
                    "capability": item.get("capability", "N/A"),
                    "line": "N/A" if line is None else line,
                    "column": "N/A" if column is None else column,
                    "message": "N/A" if message is None else message,
                }
            )

    # Save to CSV
    out_file = output_dir / f"{wf_file.replace('.json', '')}_summary.csv"
    with out_file.open("w", newline='', encoding="utf-8") as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=["tool", "capability", "line", "column", "message"])
        writer.writeheader()
        writer.writerows(findings_data)

    print(f"[✓] Created summary for: {wf_file}")


[✓] Created summary for: 1-GHSA-4mgv-m5cm-f9h7__terraform.yml.json
[✓] Created summary for: 10-GHSA-h3qr-39j9-4r5v__gradle-build.yml.json
[✓] Created summary for: 12-GHSA-8v8w-v8xg-79rf__fix-style.yml.json
[✓] Created summary for: 17-GHSA-xj87-mqvh-88w2__test.yml.json
[✓] Created summary for: 18-GHSA-7x29-qqmq-v6qc__main.yml.json
[✓] Created summary for: 19-GHSA-cxww-7g56-2vh6__pipeline.yaml.json
[✓] Created summary for: 2-GHSA-g86g-chm8-7r2p__spelling.yml.json
[✓] Created summary for: 21-GHSA-5xr6-xhww-33m4__PublishRelease.yaml.json
[✓] Created summary for: 22-GHSA-vqf5-2xx6-9wfm__codeql-analysis.yml.json
[✓] Created summary for: 26-GHSA-mxr3-8whj-j74r__code-review.yml.json
[✓] Created summary for: 26-GHSA-mxr3-8whj-j74r__test.yml.json
[✓] Created summary for: 27-GHSA-2487-9f55-2vg9__action.yml.json
[✓] Created summary for: 28-GHSA-m32f-fjw2-37v3__bullfrog.yml.json
[✓] Created summary for: 29-GHSA-phf6-hm3h-x8qp__scalafmt-fix.yml.json
[✓] Created summary for: 4-GHSA-4xqx-pqpj-9fqw__kn