# CodeBERT Similarity Calculations
A consolidated notebook to run PSG, SG, and TPU-SG similarity scoring and persist results.

**Sections**
1. Configure Paths and Environment
2. Import Dependencies and Utilities
3. Load Reference File and Discover Candidates
4. Run Similarity Scoring Loop (All Three Scripts)
5. Append and Merge CSV Outputs
6. Quick Summary and Verification
7. Backfill prompt_level and Normalize Model for Existing CSV Outputs

In [1]:
# 1. Configure Paths and Environment
from pathlib import Path
import sys

# Locate the directory this code runs from. A plain script exposes __file__;
# a notebook kernel does not, so fall back to paths relative to the cwd.
if "__file__" in globals():
    SCRIPT_DIR = Path(__file__).resolve().parent
else:
    # Notebook execution: assume cwd is the repo root
    SCRIPT_DIR = Path.cwd() / "codebertscore-similarity/analyser-results"
    if not SCRIPT_DIR.exists():
        # Fallback to cwd if running from inside analyser-results already
        SCRIPT_DIR = Path.cwd()

# Derive project root based on known layout:
# <root>/codebertscore-similarity/analyser-results -> <root>
if SCRIPT_DIR.name == "analyser-results" and SCRIPT_DIR.parent.name == "codebertscore-similarity":
    PROJECT_ROOT = SCRIPT_DIR.parent.parent
else:
    PROJECT_ROOT = SCRIPT_DIR

CODE_BERT_PACKAGE = PROJECT_ROOT / "codebertscore-similarity"

# Put the package directory first on sys.path exactly once. Notebook cells are
# routinely re-run, and an unconditional insert would stack a duplicate entry
# on every execution.
_code_bert_path = str(CODE_BERT_PACKAGE)
sys.path[:] = [entry for entry in sys.path if entry != _code_bert_path]
sys.path.insert(0, _code_bert_path)

# Per-script configuration matching the original files.
#   reference      - file every candidate is compared against
#   candidate_glob - pattern (relative to a run-group directory) locating candidates
#   output_csv     - CSV the scores for this script are appended to
#   lang           - language name handed to code_bert_score.score
CONFIGS = {
    "psg": {
        "reference": PROJECT_ROOT / "codebertscore-similarity/references/TFLite_detection_video.py",
        "candidate_glob": "*/exported_valid_code/psg/*.py",
        "output_csv": PROJECT_ROOT / "codebertscore-similarity/analyser-results/similarity_results_psg.csv",
        "lang": "python",
    },
    "sg": {
        "reference": PROJECT_ROOT / "codebertscore-similarity/references/object_color_classify.ino",
        "candidate_glob": "*/exported_valid_code/sg/*.ino",
        "output_csv": PROJECT_ROOT / "codebertscore-similarity/analyser-results/similarity_results_sg.csv",
        # NOTE(review): .ino sketches are normally C/C++; "c_sharp" is kept to
        # match the original script - confirm it is the intended language.
        "lang": "c_sharp",
    },
    "tpusg": {
        "reference": PROJECT_ROOT / "codebertscore-similarity/references/TFLite_detection_video_TPU.py",
        "candidate_glob": "*/exported_valid_code/tpusg/*.py",
        "output_csv": PROJECT_ROOT / "codebertscore-similarity/analyser-results/similarity_results_tpusg.csv",
        "lang": "python",
    },
}

# Include both 2025 and 2026 runs under langfuse_export
CANDIDATE_ROOT = PROJECT_ROOT / "langfuse_export"

In [2]:
# 2. Import Dependencies and Utilities
import os
from datetime import datetime
import pandas as pd
import torch
import code_bert_score


def load_file_content(file_path: Path):
    """Read a UTF-8 text file and return its contents.

    Any failure while opening or reading is reported on stdout and turned
    into a ``None`` return value instead of an exception.
    """
    content = None
    try:
        with open(file_path, mode="r", encoding="utf-8") as handle:
            content = handle.read()
    except Exception as err:
        print(f"Error reading {file_path}: {err}")
    return content


def extract_model_name(candidate_path: Path):
    """Return the run directory name for a candidate file.

    Expected layout: ``.../<run>/exported_valid_code/<variant>/file.ext``,
    which makes ``<run>`` the third ancestor of the file. When the path has
    fewer than three ancestors the file stem is returned instead.
    """
    ancestors = candidate_path.parents
    if len(ancestors) > 2:
        return ancestors[2].name
    return candidate_path.stem


def ensure_header(csv_path: Path, columns):
    """Make sure ``csv_path`` exists and starts with a header row.

    Parent directories are created as needed. A header-only CSV with
    ``columns`` is written when the file is missing or is zero bytes long;
    a file that already has content is left untouched.

    Args:
        csv_path: Location of the CSV file.
        columns: Column names to write as the header row.
    """
    csv_path.parent.mkdir(parents=True, exist_ok=True)
    # A zero-byte file (e.g. left by an interrupted run) has no header either;
    # treating it as missing keeps later header-less appends well-formed.
    if not csv_path.exists() or csv_path.stat().st_size == 0:
        pd.DataFrame(columns=columns).to_csv(csv_path, index=False)


def get_processed_ids(csv_path: Path):
    """Return the set of ``Candidate_ID`` values already stored in ``csv_path``.

    Blank cells are skipped. An empty set is returned when the file does not
    exist or cannot be read (for example when it is empty or lacks the
    ``Candidate_ID`` column); read failures are reported on stdout.

    Args:
        csv_path: Results CSV to inspect.

    Returns:
        A set of candidate IDs as strings.
    """
    if not csv_path.exists():
        return set()
    try:
        # usecols raises if the column is absent, so no separate column check
        # is needed after a successful read.
        df = pd.read_csv(csv_path, usecols=["Candidate_ID"])
    except Exception as e:
        print(f"Error reading existing CSV {csv_path}: {e}")
        return set()
    # Drop missing values BEFORE casting: astype(str) would turn NaN into the
    # literal string "nan", which dropna() can no longer remove.
    return set(df["Candidate_ID"].dropna().astype(str))


def get_processed_paths(csv_path: Path):
    """Return the set of ``Candidate_Path`` values already stored in ``csv_path``.

    Blank cells are skipped. An empty set is returned when the file does not
    exist or cannot be read (for example when it is empty or lacks the
    ``Candidate_Path`` column); read failures are reported on stdout.

    Args:
        csv_path: Results CSV to inspect.

    Returns:
        A set of candidate paths as strings.
    """
    if not csv_path.exists():
        return set()
    try:
        # usecols raises if the column is absent, so no separate column check
        # is needed after a successful read.
        df = pd.read_csv(csv_path, usecols=["Candidate_Path"])
    except Exception as e:
        print(f"Error reading existing CSV {csv_path}: {e}")
        return set()
    # Drop missing values BEFORE casting: astype(str) would turn NaN into the
    # literal string "nan", which dropna() can no longer remove.
    return set(df["Candidate_Path"].dropna().astype(str))


def parse_model_fields(raw_model):
    """Split a raw run/model label into ``(base_model, prompt_level)``.

    Everything up to and including the first underscore is discarded (run
    folders look like ``02.03_gpt5-mpu``). The prompt level is the first of
    ``abla-l1``, ``abla-l2``, ``abla-1p``, ``abla-2p`` found in the remainder,
    or ``"original"`` when none is present. The base model is the first of
    ``phi4``, ``qw32``, ``gpt5`` found in the remainder, otherwise the text
    before the first hyphen; ``gpt5`` and ``qw32`` are reported as ``gpt-5``
    and ``qwen32``.
    """
    label = str(raw_model)
    _prefix, separator, remainder = label.partition("_")
    if separator:
        label = remainder

    # First matching ablation tag wins; no tag means the unmodified prompt.
    prompt_level = "original"
    for level in ("abla-l1", "abla-l2", "abla-1p", "abla-2p"):
        if level in label:
            prompt_level = level
            break

    # Known model tokens are checked in a fixed order before falling back to
    # the leading hyphen-separated token.
    base_model = None
    for token in ("phi4", "qw32", "gpt5"):
        if token in label:
            base_model = token
            break
    if base_model is None:
        base_model = label.split("-")[0]

    # Normalize model names
    display_names = {"gpt5": "gpt-5", "qw32": "qwen32"}
    if base_model in display_names:
        base_model = display_names[base_model]

    return base_model, prompt_level

In [3]:
# 3. Load Reference File and Discover Candidates
# Reference text and discovered candidate files, both keyed by script name.
references = {}
candidate_sets = {}

for key, cfg in CONFIGS.items():
    reference_text = load_file_content(cfg["reference"])
    if reference_text:
        references[key] = reference_text
        # search across all runs under langfuse_export (e.g., 2025, 2026, etc.):
        # the extra leading "*/" matches the per-year directory level.
        pattern = f"*/{cfg['candidate_glob']}"
        candidate_sets[key] = sorted(CANDIDATE_ROOT.glob(pattern))
        print(f"[{key}] reference loaded; candidates: {len(candidate_sets[key])}")
    else:
        # Without a reference nothing can be scored for this script.
        print(f"Failed to load reference for {key}: {cfg['reference']}")

# Device selection: use the GPU when torch reports one, else the CPU.
DEVICE = "cpu"
if torch.cuda.is_available():
    DEVICE = "cuda"
print(f"Using device: {DEVICE}")

[psg] reference loaded; candidates: 276
[sg] reference loaded; candidates: 54
[tpusg] reference loaded; candidates: 306
Using device: cuda


In [4]:
# 4. Run Similarity Scoring Loop (All Three Scripts)
# Newly written rows per script, keyed by script name.
results = {}
# Column order used when a results CSV is created from scratch.
# Reference_File, Language and Timestamp are intentionally not persisted.
common_columns = [
    "Candidate_ID",
    "Model",
    "prompt_level",
    "Precision",
    "Recall",
    "F1",
    "F3",
    "Processor",
]

for key, cfg in CONFIGS.items():
    output_csv = cfg["output_csv"]
    ensure_header(output_csv, common_columns)
    processed_ids = get_processed_ids(output_csv)
    candidate_files = candidate_sets.get(key, [])
    ref_content = references.get(key)
    if not ref_content:
        continue

    loop_records = []
    for candidate_path in candidate_files:
        # The project-relative POSIX path doubles as the stable row identifier.
        candidate_id = candidate_path.relative_to(PROJECT_ROOT).as_posix()
        if candidate_id in processed_ids:
            continue

        candidate_content = load_file_content(candidate_path)
        if not candidate_content:
            continue

        # Scoring is best-effort per file: report the failure and move on.
        try:
            P, R, F1, F3 = code_bert_score.score(
                cands=[candidate_content],
                refs=[ref_content],
                lang=cfg["lang"],
                device=DEVICE,
                verbose=False,
            )
            p_val = P[0].item()
            r_val = R[0].item()
            f1_val = F1[0].item()
            f3_val = F3[0].item()
        except Exception as e:
            print(f"[{key}] Error processing {candidate_path}: {e}")
            continue

        model_name = extract_model_name(candidate_path)
        base_model, prompt_level = parse_model_fields(model_name)
        loop_records.append(
            {
                "Candidate_ID": candidate_id,
                "Model": base_model,
                "prompt_level": prompt_level,
                "Precision": p_val,
                "Recall": r_val,
                "F1": f1_val,
                "F3": f3_val,
                "Processor": key,
            }
        )

    if not loop_records:
        print(f"[{key}] no new rows written")
        continue

    df_loop = pd.DataFrame(loop_records, columns=common_columns)

    # Rows are appended without a header, so they must follow the column order
    # already on disk. That order can differ from common_columns (the backfill
    # step rewrites the CSVs with its own ordering); appending blindly would
    # put values under the wrong headers.
    try:
        existing_columns = list(pd.read_csv(output_csv, nrows=0).columns)
    except pd.errors.EmptyDataError:
        existing_columns = []

    if existing_columns:
        missing_on_disk = [col for col in df_loop.columns if col not in existing_columns]
        if missing_on_disk:
            print(f"[{key}] columns not present in {output_csv}, not written: {missing_on_disk}")
        df_loop.reindex(columns=existing_columns).to_csv(
            output_csv, mode="a", header=False, index=False
        )
    else:
        # No usable header on disk: write header and rows together.
        df_loop.to_csv(output_csv, mode="w", header=True, index=False)

    results[key] = df_loop
    print(f"[{key}] wrote {len(df_loop)} rows to {cfg['output_csv']}")

[psg] no new rows written
[sg] no new rows written
[tpusg] no new rows written


In [5]:
# 5. Append and Merge CSV Outputs
# Number of candidate files discovered per script.
run_candidate_counts = {name: len(files) for name, files in candidate_sets.items()}
# Number of rows currently stored in each script's CSV.
output_row_counts = {}
merged = []
for key, cfg in CONFIGS.items():
    csv_file = cfg["output_csv"]
    if not csv_file.exists():
        continue
    try:
        # Tag every row with the script it came from before stacking.
        frame = pd.read_csv(csv_file).assign(run=key)
        merged.append(frame)
        output_row_counts[key] = len(frame)
    except Exception as e:
        print(f"[{key}] Error reading {csv_file}: {e}")

if not merged:
    combined = pd.DataFrame()
    print("No CSVs to combine yet.")
else:
    combined = pd.concat(merged, ignore_index=True)
    print(f"Combined rows: {len(combined)}")

print("Run counts (candidates -> csv rows):")
for key in CONFIGS:
    cand = run_candidate_counts.get(key, 0)
    rows = output_row_counts.get(key, 0)
    print(f"{key}: {cand} candidates; {rows} rows in csv")
total_candidates = sum(run_candidate_counts.values())
total_rows = sum(output_row_counts.values())
print(f"Totals discovered: {total_candidates} candidates across runs; {total_rows} rows across csvs")

Combined rows: 636
Run counts (candidates -> csv rows):
psg: 276 candidates; 276 rows in csv
sg: 54 candidates; 54 rows in csv
tpusg: 306 candidates; 306 rows in csv
Totals discovered: 636 candidates across runs; 636 rows across csvs


In [6]:
# 6. Quick Summary and Verification
if combined.empty:
    print("No data to summarize yet.")
else:
    # Peek at both ends of the merged table.
    display(combined.head())
    display(combined.tail())

    print("Counts by run:")
    print(combined.groupby("run")["Candidate_ID"].count())

    print("F1 stats:")
    print(combined["F1"].describe())

    # Compare stored rows against the files found on disk, per script.
    discovered = {name: len(files) for name, files in candidate_sets.items()}
    rows_per_run = combined["run"].value_counts()
    print("Coverage (rows vs discovered candidates):")
    for key in CONFIGS:
        rows = int(rows_per_run.get(key, 0))
        print(f"{key}: {rows} rows vs {discovered.get(key, 0)} candidates")
    print(f"Total candidates: {sum(discovered.values())}; total rows: {len(combined)}")

Unnamed: 0,Candidate_ID,Model,Precision,Recall,F1,F3,Processor,prompt_level,run
0,langfuse_export/2026/02.03_gpt5-mpu/exported_v...,gpt5,0.7922946810722351,0.86771,0.82829,0.859529,psg,original,psg
1,langfuse_export/2026/02.03_gpt5-mpu/exported_v...,gpt5,0.7844812870025635,0.873618,0.826654,0.863803,psg,original,psg
2,langfuse_export/2026/02.03_gpt5-mpu/exported_v...,gpt5,0.7849806547164917,0.876138,0.828058,0.866081,psg,original,psg
3,langfuse_export/2026/02.03_gpt5-mpu/exported_v...,gpt5,0.7809998989105225,0.864467,0.820617,0.855326,psg,original,psg
4,langfuse_export/2026/02.03_gpt5-mpu/exported_v...,gpt5,0.7773381471633911,0.871617,0.821782,0.861172,psg,original,psg


Unnamed: 0,Candidate_ID,Model,Precision,Recall,F1,F3,Processor,prompt_level,run
631,langfuse_export/2026/02.23_abla-2p-phi4/export...,phi4,abla-2p,0.912027,0.848934,0.87935,0.8548478484153748,abla-2p,tpusg
632,langfuse_export/2026/02.23_abla-2p-phi4/export...,phi4,abla-2p,0.902719,0.85082,0.876002,0.8557400703430176,abla-2p,tpusg
633,langfuse_export/2026/02.23_abla-2p-phi4/export...,phi4,abla-2p,0.902719,0.85082,0.876002,0.8557400703430176,abla-2p,tpusg
634,langfuse_export/2026/02.23_abla-2p-phi4/export...,phi4,abla-2p,0.911781,0.846815,0.878098,0.8528918623924255,abla-2p,tpusg
635,langfuse_export/2026/02.23_abla-2p-phi4/export...,phi4,abla-2p,0.911781,0.846815,0.878098,0.8528918623924255,abla-2p,tpusg


Counts by run:
run
psg      276
sg        54
tpusg    306
Name: Candidate_ID, dtype: int64
F1 stats:
count    636.000000
mean       0.850260
std        0.020654
min        0.615672
25%        0.837927
50%        0.848072
75%        0.864797
max        0.889104
Name: F1, dtype: float64
Coverage (rows vs discovered candidates):
psg: 276 rows vs 276 candidates
sg: 54 rows vs 54 candidates
tpusg: 306 rows vs 306 candidates
Total candidates: 636; total rows: 636


In [7]:
# 7. Backfill prompt_level and normalize Model for existing CSV outputs

def parse_from_candidate_id(candidate_id: str):
    """Derive ``(base_model, prompt_level)`` from a ``Candidate_ID`` path.

    The run folder is taken to be the path segment directly before
    ``exported_valid_code``. If that marker is absent (or is the first
    segment), the first segment containing ``abla``, ``original`` or ``og``
    is used, and failing that the whole ID. The chosen text is handed to
    ``parse_model_fields``.

    Args:
        candidate_id: Path-like identifier of a candidate file.

    Returns:
        Whatever ``parse_model_fields`` returns for the chosen segment.
    """
    text = str(candidate_id)
    segments = Path(text).parts

    # Prefer the structural position of the run folder. A bare substring test
    # such as "og" can match an unrelated segment (e.g. a file called
    # "dog_detector.py") and yield a bogus model name.
    run_segment = None
    if "exported_valid_code" in segments:
        marker_index = segments.index("exported_valid_code")
        if marker_index > 0:
            run_segment = segments[marker_index - 1]

    if run_segment is None:
        # Legacy heuristic for IDs that do not follow the known layout.
        run_segment = next(
            (seg for seg in segments if "abla" in seg or "original" in seg or "og" in seg),
            text,
        )
    return parse_model_fields(run_segment)

def backfill_outputs():
    """Recompute Model/prompt_level in each results CSV and rewrite it in place.

    For the psg, sg and tpusg outputs: skip files that are missing, empty or
    lack a ``Candidate_ID`` column; otherwise derive ``Model`` and
    ``prompt_level`` from each ``Candidate_ID``, keep only the known columns
    in a fixed order, and overwrite the CSV.
    """
    # NOTE(review): this order puts prompt_level last, whereas the scoring
    # cell's common_columns puts it third - confirm which order is canonical.
    final_order = (
        "Candidate_ID",
        "Model",
        "Precision",
        "Recall",
        "F1",
        "F3",
        "Processor",
        "prompt_level",
    )
    for c in ("psg", "sg", "tpusg"):
        csv_path = CONFIGS[c]["output_csv"]
        if not csv_path.exists():
            print(f"{c}: {csv_path} missing, skipping")
            continue

        df = pd.read_csv(csv_path)
        if df.empty:
            print(f"{c}: {csv_path} empty, skipping")
            continue
        if "Candidate_ID" not in df.columns:
            print(f"{c}: Candidate_ID missing in {csv_path}, skipping backfill")
            continue

        # One (model, prompt_level) pair per row, in row order.
        parsed = [parse_from_candidate_id(cid) for cid in df["Candidate_ID"]]
        df["Model"] = [model for model, _level in parsed]
        df["prompt_level"] = [level for _model, level in parsed]

        df = df.drop(columns=["Candidate_Path"], errors="ignore")
        kept = [col for col in final_order if col in df.columns]
        df = df[kept]
        df.to_csv(csv_path, index=False)
        print(f"{c}: cleaned {len(df)} rows -> {csv_path}")
    print("Backfill complete.")

backfill_outputs()


psg: cleaned 276 rows -> /home/han/Projects/benchmark-tinyml_llm-2026/codebertscore-similarity/analyser-results/similarity_results_psg.csv
sg: cleaned 54 rows -> /home/han/Projects/benchmark-tinyml_llm-2026/codebertscore-similarity/analyser-results/similarity_results_sg.csv
tpusg: cleaned 306 rows -> /home/han/Projects/benchmark-tinyml_llm-2026/codebertscore-similarity/analyser-results/similarity_results_tpusg.csv
Backfill complete.
