In [1]:
import json
import os
import re
import warnings
from datetime import datetime, timezone
from typing import Optional, Dict, Tuple, List, Any

import numpy as np
import pandas as pd
from joblib import dump, load

from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support, classification_report
)

# Silence FutureWarning for the whole process (all libraries, not just this script).
warnings.filterwarnings("ignore", category=FutureWarning)

# ------------------ USER PATHS ------------------
# Training CSV to read and the directory every artifact is written to.
CSV_PATH = r"C:\Users\sagni\Downloads\Code Generator\archive\train.csv"
OUT_DIR  = r"C:\Users\sagni\Downloads\Code Generator"

# Optional: name of a ground-truth label column (e.g., "lang", "category").
# When set and present in the CSV it takes precedence over LABEL_CANDS.
LABEL_OVERRIDE: Optional[str] = None   # e.g., "lang"

# Optional: predict on an external file (.txt/.csv/.json); leave None to skip
INFER_SOURCE: Optional[str] = None
# Examples:
# INFER_SOURCE = r"C:\Users\sagni\Downloads\Code Generator\my_eval.csv"
# INFER_SOURCE = r"C:\Users\sagni\Downloads\Code Generator\snippets.txt"
# INFER_SOURCE = r"C:\Users\sagni\Downloads\Code Generator\batch.json"

# ------------------ CONSTANTS ------------------
RANDOM_STATE = 42
TEST_SIZE = 0.15
# Fraction of the WHOLE dataset used for validation: main() divides this by
# (1 - TEST_SIZE) to get the fraction of the post-test remainder.
VALID_SIZE = 0.15
# Auto-labeled classes with fewer samples than this are dropped before training.
AUTO_LABEL_MIN_SAMPLES_PER_CLASS = 5

# Candidate column names, matched case-insensitively against the CSV header.
# Order matters: the first candidate that exists in the file is used.
TEXT_CANDS = [
    "instruction","prompt","question","title","description","desc","nl",
    "input","context","spec","problem","statement","query","utterance"
]
LABEL_CANDS = [
    "label","category","class","target","task","task_id","task_type",
    "language","lang","topic","difficulty","level","dataset","source","tag","type"
]
# Columns holding source code; used for auto-labeling when no label column exists.
CODE_CANDS = ["output","code","completion","solution","program","answer","response"]

# Artifact locations for the fitted pipeline and the id <-> label mapping.
MODEL_PATH    = os.path.join(OUT_DIR, "cg_model.joblib")
LABELMAP_PATH = os.path.join(OUT_DIR, "cg_label_map.json")

# ------------------ HELPERS ------------------
def ensure_out():
    """Make sure the output directory ``OUT_DIR`` exists, creating parents as needed."""
    if not os.path.isdir(OUT_DIR):
        os.makedirs(OUT_DIR, exist_ok=True)

def read_csv_any(path: str) -> pd.DataFrame:
    """Read a CSV file, retrying with progressively more lenient settings.

    Attempts, in order: pandas defaults, UTF-8 with undecodable bytes
    dropped, Latin-1, and finally the slower but more tolerant Python
    parsing engine. The first attempt that succeeds is returned.

    Args:
        path: Location of the CSV file.

    Returns:
        The parsed DataFrame.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        RuntimeError: If every attempt fails; the last underlying error is
            chained as ``__cause__``.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(path)
    attempts = [
        {},
        # read_csv's keyword is `encoding_errors`; `errors` is not accepted
        # and would make this attempt fail with TypeError every time.
        {"encoding": "utf-8", "encoding_errors": "ignore"},
        {"encoding": "latin-1"},
        {"engine": "python"},
    ]
    last_error: Optional[Exception] = None
    for kwargs in attempts:
        try:
            return pd.read_csv(path, **kwargs)
        except Exception as exc:  # best-effort: fall through to the next attempt
            last_error = exc
    raise RuntimeError(f"Could not read CSV: {path}") from last_error

def sanitize_colname(s: str) -> str:
    """Turn a label into a safe column-name fragment.

    Surrounding whitespace is trimmed and each run of characters outside
    ``[A-Za-z0-9_]`` collapses to a single underscore. An empty result is
    replaced by ``"label"``.
    """
    cleaned = re.sub(r"[^A-Za-z0-9_]+", "_", s.strip())
    if cleaned:
        return cleaned
    return "label"

# ------------------ LANGUAGE DETECTION (heuristic) ------------------
# Ordered (language, regex) table consumed by detect_language_from_code():
# entries are tried top to bottom and the FIRST regex that matches anywhere in
# the snippet decides the label, so earlier entries shadow later ones.
# All patterns are case-insensitive (re.I).
#
# NOTE(review): several patterns are broad enough to match other languages,
# and because of the ordering they win over the more specific entries below:
#   - "css" matches any `{ ... : ... }` span, which includes dict / object /
#     JSON literals such as {"a": 1}.
#   - "sql" matches bare words like `select`, `update`, `delete`, `where`
#     wherever they appear (identifiers, comments, strings).
#   - "kotlin" / "swift" share `var <name>` and `print(`-style alternatives
#     with earlier entries, so they are only reached when nothing above hits.
# NOTE(review): "bash" and "matlab" use `^` without re.M, so those anchors
# only match at the very start of the snippet.
LANG_PATTERNS = [
    ("html", re.compile(r"<!doctype html>|<html|</html>|<div|<span|<body", re.I)),
    ("css", re.compile(r"{\s*[^}]*:\s*[^}]+}", re.I)),
    ("sql", re.compile(r"\b(select|insert|update|delete|create table|where|join|group by|order by)\b", re.I)),
    ("bash", re.compile(r"^#!\/bin\/bash|(^|\n)\s*(echo|grep|awk|sed|export|cd )", re.I)),
    ("python", re.compile(r"\b(def |import |from |lambda|print\(|self\b|async def|raise |except |try:)", re.I)),
    ("typescript", re.compile(r"\binterface\b|\btype\s+\w+\s*=\s*|\bas\s+\w+|:\s*(string|number|boolean)\b", re.I)),
    ("javascript", re.compile(r"\b(function |console\.log|=>|var |let |const |export |import )", re.I)),
    ("java", re.compile(r"\b(public class|System\.out\.println|static void main|package\s+\w+;|import java\.)", re.I)),
    ("cpp", re.compile(r"#include\s*<[^>]+>|std::|using namespace std|cout\s*<<", re.I)),
    ("csharp", re.compile(r"\busing System;|Console\.Write(Line|)\(|namespace\s+\w+|public class Program\b", re.I)),
    ("go", re.compile(r"\bpackage main\b|\bfunc main\(|\bfmt\.", re.I)),
    ("rust", re.compile(r"\bfn main\(\)|println!|let mut|use std::", re.I)),
    ("php", re.compile(r"<\?php|echo\s+\$|->", re.I)),
    ("ruby", re.compile(r"\bdef\b.*\n.*\n.*\bend\b|\bputs\b", re.I)),
    ("kotlin", re.compile(r"\bfun main\(|\bval\s+\w+|\bvar\s+\w+|\bprintln\(", re.I)),
    ("swift", re.compile(r"\bimport Foundation\b|\bprint\(|\blet\s+\w+|\bvar\s+\w+|\bfunc\b", re.I)),
    ("r", re.compile(r"<-|\blibrary\(|\bdata\.frame\(", re.I)),
    ("matlab", re.compile(r"^function\s+\[?.*\]?\s*=\s*\w+\(|\bplot\(|\%\s", re.I)),
]

def detect_language_from_code(code: str) -> str:
    """Guess the programming language of a snippet via ``LANG_PATTERNS``.

    Returns the name attached to the first pattern (in table order) that
    matches anywhere in ``code``. Non-string or blank input, and snippets
    that match nothing, yield ``"unknown"``.
    """
    if not (isinstance(code, str) and code.strip()):
        return "unknown"
    return next(
        (lang for lang, regex in LANG_PATTERNS if regex.search(code)),
        "unknown",
    )

# ------------------ COLUMN DETECTION ------------------
def detect_columns(df: pd.DataFrame) -> Tuple[pd.Series, Optional[pd.Series], str, Optional[str], Optional[str]]:
    """Pick the text, label and code columns of ``df`` by name.

    Column names are matched case-insensitively against ``TEXT_CANDS``,
    ``LABEL_CANDS`` and ``CODE_CANDS``; within each list the first candidate
    present wins. If no text candidate exists, the first object-dtype column
    is used. ``LABEL_OVERRIDE`` takes precedence for the label column when it
    names an existing column.

    Args:
        df: The raw training data.

    Returns:
        ``(text, labels_raw, tcol, lcol, ccol)`` where ``text`` is the
        whitespace-normalized, non-empty text Series (keeping ``df``'s index),
        ``labels_raw`` is the label column restricted to the same rows (or
        ``None``), and ``tcol`` / ``lcol`` / ``ccol`` are the chosen column
        names (``lcol`` / ``ccol`` may be ``None``).

    Raises:
        ValueError: If no text candidate and no object-dtype column exists.
    """
    lower = {str(c).lower(): c for c in df.columns}

    # text feature column
    tcol = next((lower[c] for c in TEXT_CANDS if c in lower), None)
    if tcol is None:
        obj = [c for c in df.columns if df[c].dtype == object]
        if not obj:
            raise ValueError("No text-like column found. Add e.g., 'instruction' or 'prompt'.")
        tcol = obj[0]
    # fillna must run BEFORE astype(str): afterwards NaN has already become
    # the literal string "nan" and would pass the empty-text filter below.
    text = df[tcol].fillna("").astype(str).str.replace(r"\s+", " ", regex=True).str.strip()
    text = text[text != ""]
    df = df.loc[text.index]

    # label column
    if LABEL_OVERRIDE and LABEL_OVERRIDE in df.columns:
        lcol = LABEL_OVERRIDE
    else:
        if LABEL_OVERRIDE:
            # Keep the historical fallback, but no longer silently.
            print(f"[WARN] LABEL_OVERRIDE={LABEL_OVERRIDE!r} is not a column; falling back to auto-detection.")
        lcol = next((lower[c] for c in LABEL_CANDS if c in lower), None)

    # code column (for auto-labeling)
    ccol = next((lower[c] for c in CODE_CANDS if c in lower), None)

    labels_raw = df[lcol].loc[text.index] if lcol is not None else None
    return text, labels_raw, tcol, lcol, ccol

# ------------------ MODEL ------------------
def build_pipeline() -> Pipeline:
    """Build the unfitted TF-IDF + logistic-regression text classifier.

    Returns:
        A two-step ``Pipeline``: ``"tfidf"`` (lower-cased, accent-stripped
        uni- and bi-grams, terms seen in at least 2 documents, capped at
        300k features) followed by ``"clf"`` (L2 logistic regression with the
        SAGA solver and balanced class weights, seeded with ``RANDOM_STATE``).
    """
    return Pipeline(steps=[
        ("tfidf", TfidfVectorizer(
            lowercase=True,
            ngram_range=(1, 2),      # uni+bi-grams
            min_df=2,
            max_features=300_000,
            strip_accents="unicode",
        )),
        # `multi_class="auto"` is intentionally not passed: it is the default
        # and the parameter is deprecated in recent scikit-learn releases.
        ("clf", LogisticRegression(
            solver="saga",
            penalty="l2",
            C=1.0,
            max_iter=400,
            n_jobs=-1,
            random_state=RANDOM_STATE,
            class_weight="balanced",
        )),
    ])

# ------------------ PRED/IO ------------------
def save_predictions_dataframe(df: pd.DataFrame, base_name: str):
    """Write a predictions table to ``OUT_DIR`` as both CSV and JSON.

    The JSON file wraps the rows with a UTC creation timestamp and a summary
    holding the row count and the number of rows per predicted label.

    Args:
        df: Predictions; must contain a ``"pred_label"`` column.
        base_name: File name without extension (``<base_name>.csv`` /
            ``<base_name>.json``).
    """
    csv_path = os.path.join(OUT_DIR, f"{base_name}.csv")
    json_path = os.path.join(OUT_DIR, f"{base_name}.json")
    df.to_csv(csv_path, index=False, encoding="utf-8")

    # datetime.utcnow() is deprecated; take an aware UTC time and drop tzinfo
    # so the serialized form stays "<naive ISO timestamp>Z" as before.
    created = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
    label_counts = df["pred_label"].value_counts().to_dict()
    payload = {
        "created_utc": created,
        "items": df.to_dict(orient="records"),
        "summary": {
            "total": int(len(df)),
            "pred_counts": {k: int(v) for k, v in label_counts.items()},
        },
    }
    with open(json_path, "w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    print("[OUT] Saved:", csv_path)
    print("[OUT] Saved:", json_path)

def _text_of(item: Any) -> str:
    """Return the text carried by one JSON entry as a string ("" if absent)."""
    value = item.get("text", "") if isinstance(item, dict) else item
    return "" if value is None else str(value)


def load_texts_for_infer(path: str) -> List[str]:
    """Load the texts to classify from a ``.txt``, ``.json`` or ``.csv`` file.

    * ``.txt``: one text per non-blank line (stripped).
    * ``.json``: ``{"items": [...]}``, ``{"messages": [...]}`` or a bare list.
      Each entry is either a dict with a ``"text"`` key or a plain value;
      entries whose text is missing, ``None`` or blank are skipped.
    * ``.csv``: the first column found among ``CODE_CANDS`` then
      ``TEXT_CANDS`` (case-insensitive), else the first column. Missing
      cells become ``""`` so the result stays aligned with the rows.

    Args:
        path: File to read; the extension selects the format.

    Returns:
        The extracted texts as strings.

    Raises:
        ValueError: For an unsupported extension or JSON layout.
    """
    ext = os.path.splitext(path)[1].lower()
    if ext == ".txt":
        with open(path, "r", encoding="utf-8", errors="ignore") as f:
            return [ln.strip() for ln in f if ln.strip()]
    if ext == ".json":
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if isinstance(data, dict) and "items" in data:
            entries = data["items"]
        elif isinstance(data, dict) and "messages" in data:
            entries = data["messages"]
        elif isinstance(data, list):
            entries = data
        else:
            raise ValueError("Unsupported JSON schema. Use list, or {'messages':[{'text':...}]}, or {'items':[{'text':...}]} .")
        # One extraction rule for all three layouts: filter on the extracted
        # text itself (not on str(entry)) and tolerate non-dict entries.
        texts = (_text_of(entry) for entry in entries)
        return [t for t in texts if t.strip()]
    if ext == ".csv":
        df = read_csv_any(path)
        lower = {str(c).lower(): c for c in df.columns}
        # prefer code-like column if present
        col = next((lower[c] for c in CODE_CANDS + TEXT_CANDS if c in lower), None)
        series = df[col] if col is not None else df.iloc[:, 0]
        # fillna before astype(str), otherwise NaN turns into the text "nan".
        return series.fillna("").astype(str).str.strip().tolist()
    raise ValueError("Supported external inference files: .txt, .csv, .json")

# ------------------ MAIN ------------------
def _prediction_table(pipe, texts, id2label, y_true=None):
    """Predict ``texts`` with ``pipe`` and tabulate the results.

    Returns ``(frame, y_pred)``. ``frame`` has the columns ``text``, optionally
    ``y_true``, ``pred_label`` and, when the classifier exposes
    ``predict_proba``, one ``p_<label>`` column per class. Probability columns
    are mapped through the classifier's own ``classes_`` so they stay correct
    even if the class ids are not exactly ``0..n-1``.
    """
    y_pred = pipe.predict(texts)
    columns = {"text": list(texts)}
    if y_true is not None:
        columns["y_true"] = [id2label[int(i)] for i in y_true]
    columns["pred_label"] = [id2label[int(i)] for i in y_pred]
    clf = pipe[-1]
    if hasattr(clf, "predict_proba"):
        proba = pipe.predict_proba(texts)
        for col_idx, class_id in enumerate(clf.classes_):
            lab = sanitize_colname(id2label[int(class_id)])
            columns[f"p_{lab}"] = proba[:, col_idx]
    return pd.DataFrame(columns), y_pred


def main():
    """Train, evaluate and persist the classifier end to end.

    Steps:
      1. Read ``CSV_PATH`` and detect the text / label / code columns.
      2. Without a label column, auto-label rows by the language detected in
         the code column, drop "unknown" rows and classes smaller than
         ``AUTO_LABEL_MIN_SAMPLES_PER_CLASS``, and use the code as features.
      3. Stratified train/valid/test split; fit on train+valid.
      4. Save the model, label map, classification report, metrics JSON and
         per-row test predictions under ``OUT_DIR``.
      5. If ``INFER_SOURCE`` is set, predict on that file and record the run
         in the metrics JSON.

    Raises:
        ValueError: If neither a label nor a code column exists, or if
            auto-labeling leaves fewer than two usable classes.
        FileNotFoundError: If the CSV or ``INFER_SOURCE`` is missing.
    """
    ensure_out()
    df = read_csv_any(CSV_PATH)

    # Detect columns
    text, labels_raw, tcol, lcol, ccol = detect_columns(df)
    labels_source = "provided_column"
    feature_col = tcol

    # If no label column, auto-label by language using code column
    if labels_raw is None:
        if ccol is None:
            raise ValueError(
                "No label-like column found AND no code column to auto-label from.\n"
                "Fix: set LABEL_OVERRIDE to your label column name, or ensure a code column exists "
                "(one of: output/code/completion/solution/program/answer/response)."
            )
        # fillna before astype(str): otherwise missing code becomes "nan".
        codes = df.loc[text.index, ccol].fillna("").astype(str)
        auto_labels = codes.map(detect_language_from_code)
        # keep known languages only
        auto_labels = auto_labels[auto_labels != "unknown"]
        # drop tiny classes
        counts = auto_labels.value_counts()
        keep = counts[counts >= AUTO_LABEL_MIN_SAMPLES_PER_CLASS].index
        auto_labels = auto_labels[auto_labels.isin(keep)]
        # Validate AFTER the small-class filter: that filter can itself
        # reduce the data to a single class.
        if auto_labels.nunique() < 2:
            raise ValueError(
                f"Auto-labeling produced <2 classes with >= {AUTO_LABEL_MIN_SAMPLES_PER_CLASS} samples "
                f"(found: {sorted(auto_labels.unique().tolist())}). "
                "Need >=2 classes to train and evaluate."
            )
        labels_raw = auto_labels
        labels_source = "auto:lang_from_code"
        # Use the code itself as features for better signal
        text = codes.loc[auto_labels.index]
        feature_col = ccol
        print(f"[INFO] Auto-labeled classes (kept >= {AUTO_LABEL_MIN_SAMPLES_PER_CLASS} samples):", sorted(keep.tolist()))
    else:
        # Rows without a label would otherwise be trained as a bogus "nan" class.
        labelled = labels_raw.notna()
        text, labels_raw = text[labelled], labels_raw[labelled]

    # Encode labels
    le = LabelEncoder()
    y_all = le.fit_transform(labels_raw.astype(str))
    id2label = {int(i): str(l) for i, l in enumerate(le.classes_)}

    # Split: VALID_SIZE is a fraction of the whole dataset, so rescale it to
    # a fraction of what remains after the test split.
    X_tmp, X_test, y_tmp, y_test = train_test_split(
        text, y_all, test_size=TEST_SIZE, stratify=y_all,
        random_state=RANDOM_STATE, shuffle=True
    )
    valid_frac_of_tmp = VALID_SIZE / (1.0 - TEST_SIZE)
    X_train, X_valid, y_train, y_valid = train_test_split(
        X_tmp, y_tmp, test_size=valid_frac_of_tmp, stratify=y_tmp,
        random_state=RANDOM_STATE, shuffle=True
    )

    # Build & fit on TRAIN+VALID (single source of truth for the model config)
    pipe = build_pipeline()
    pipe.fit(pd.concat([X_train, X_valid]), np.concatenate([y_train, y_valid]))

    # Save model + label map
    dump(pipe, MODEL_PATH)
    with open(LABELMAP_PATH, "w", encoding="utf-8") as f:
        json.dump({"id2label": {str(k): v for k, v in id2label.items()},
                   "label2id": {str(v): int(k) for k, v in id2label.items()}},
                  f, ensure_ascii=False, indent=2)

    print("[SAVE] Model:", MODEL_PATH)
    print("[SAVE] Label map:", LABELMAP_PATH)

    # Evaluate on TEST
    test_df, y_pred = _prediction_table(pipe, X_test, id2label, y_true=y_test)

    acc = accuracy_score(y_test, y_pred)
    p, r, f1, _ = precision_recall_fscore_support(y_test, y_pred, average="macro", zero_division=0)

    # Report: pass `labels` explicitly so target_names cannot drift out of
    # step with the label ids actually present.
    present = sorted(int(i) for i in (set(y_test) | set(y_pred)))
    report_txt = classification_report(
        y_test, y_pred,
        labels=present,
        target_names=[id2label[i] for i in present],
        zero_division=0
    )
    report_path = os.path.join(OUT_DIR, "cg_classification_report.txt")
    with open(report_path, "w", encoding="utf-8") as f:
        f.write(report_txt)

    # Metrics JSON
    metrics_path = os.path.join(OUT_DIR, "cg_metrics.json")
    metrics = {
        # utcnow() is deprecated; same "<naive ISO timestamp>Z" output.
        "created_utc": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
        "source_csv": CSV_PATH,
        "labels_source": labels_source,
        "text_column_used": tcol,
        # Column the model was actually trained on (the code column in auto mode).
        "feature_column_used": feature_col,
        "label_column_used": (lcol if labels_source == "provided_column" else "auto:lang_from_code"),
        "sizes": {"train": int(len(X_train)), "valid": int(len(X_valid)), "test": int(len(X_test))},
        "test_metrics": {
            "accuracy": round(float(acc), 4),
            "macro_precision": round(float(p), 4),
            "macro_recall": round(float(r), 4),
            "macro_f1": round(float(f1), 4)
        },
        "artifacts": {
            "model_path": MODEL_PATH,
            "label_map_path": LABELMAP_PATH,
            "classification_report_txt": report_path,
            "predictions_csv": os.path.join(OUT_DIR, "cg_predictions_test.csv")
        }
    }
    with open(metrics_path, "w", encoding="utf-8") as f:
        json.dump(metrics, f, ensure_ascii=False, indent=2)

    # Save per-row TEST predictions (with probabilities)
    save_predictions_dataframe(test_df, base_name="cg_predictions_test")

    # ---------------- External inference (optional) ----------------
    if INFER_SOURCE is not None:
        if not os.path.exists(INFER_SOURCE):
            raise FileNotFoundError(INFER_SOURCE)
        texts = [str(t) for t in load_texts_for_infer(INFER_SOURCE) if str(t).strip()]
        if not texts:
            print("[WARN] No texts to predict in external file.")
        else:
            ext_df, _ = _prediction_table(pipe, texts, id2label)
            save_predictions_dataframe(ext_df, base_name="cg_predictions_external")
            # Append run info to the in-memory metrics instead of re-reading
            # the file: a failed re-read used to reset everything to {}.
            metrics.setdefault("external_infer_runs", []).append({
                "run_utc": datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z",
                "source_file": INFER_SOURCE,
                "count": int(len(ext_df))
            })
            with open(metrics_path, "w", encoding="utf-8") as f:
                json.dump(metrics, f, ensure_ascii=False, indent=2)
            print(f"[INFER] Ran predictions on: {INFER_SOURCE}")

    print("=== DONE ===")
    print("Acc:", round(acc,4), "| Macro-F1:", round(f1,4))
    print("Artifacts in:", OUT_DIR)

if __name__ == "__main__":
    main()


[INFO] Auto-labeled classes (kept >= 5 samples): ['bash', 'css', 'html', 'java', 'javascript', 'matlab', 'python', 'sql', 'typescript']




[SAVE] Model: C:\Users\sagni\Downloads\Code Generator\cg_model.joblib
[SAVE] Label map: C:\Users\sagni\Downloads\Code Generator\cg_label_map.json
[OUT] Saved: C:\Users\sagni\Downloads\Code Generator\cg_predictions_test.csv
[OUT] Saved: C:\Users\sagni\Downloads\Code Generator\cg_predictions_test.json
=== DONE ===
Acc: 0.5094 | Macro-F1: 0.2407
Artifacts in: C:\Users\sagni\Downloads\Code Generator
