In [23]:
!pip install python-Levenshtein



In [24]:
import csv
from google.colab import userdata
from io import BytesIO
import json
import os
import numpy as np
import pandas as pd
import re
import requests
from sklearn.metrics import f1_score
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from sklearn.linear_model import HuberRegressor
from sklearn.datasets import make_regression
import Levenshtein
import regex as re


# Mount Google Drive at /content/drive; the CSV paths used further down
# all live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [27]:
import pandas as pd
import numpy as np
import json
import re
import Levenshtein
from sklearn.metrics import f1_score, mean_absolute_error


# -------------------------------------------------------------------------
# PARSE TRUTH FIELD FROM THE CSV
# -------------------------------------------------------------------------
def parse_truth_field(text):
    """
    Parse a ground-truth cell into its five labelled fields.

    Expected layout (labels are matched case-insensitively):
      max: 78.0, min: 12.0, range: 0.0-90.0,
      title: ..., domain: healthcare

    Parameters
    ----------
    text : str or any
        Raw cell value. Anything that is not a ``str`` yields a result in
        which every field is ``None``.

    Returns
    -------
    dict
        Keys ``maximum``, ``minimum``, ``range``, ``title``, ``domain``.
        Values are the captured substrings (numbers are NOT converted to
        float here) or ``None`` when the field is absent. ``domain`` is
        lower-cased.
    """

    result = {
        "maximum": None,
        "minimum": None,
        "range": None,
        "title": None,
        "domain": None
    }

    if not isinstance(text, str):
        return result

    # A numeric token with an optional leading minus, so negative values are
    # captured instead of being silently reported as missing.
    number = r"-?[0-9.]+"

    # \b keeps the labels from matching inside longer words ("Climax: 9").
    m_max   = re.search(r"\bmax:\s*(" + number + r")", text, flags=re.I)
    m_min   = re.search(r"\bmin:\s*(" + number + r")", text, flags=re.I)
    # "<low>-<high>": the middle hyphen is the separator, each bound may
    # carry its own sign (e.g. "-20.0-90.0" or "-10--5").
    m_range = re.search(r"\brange:\s*(" + number + r"-" + number + r")", text, flags=re.I)

    if m_max:   result["maximum"] = m_max.group(1)
    if m_min:   result["minimum"] = m_min.group(1)
    if m_range: result["range"]   = m_range.group(1)

    # Full title between "title:" and ", domain:" (or end of text when no
    # domain follows); lazy match so commas inside the title are preserved.
    m_title = re.search(r"title:\s*(.*?)(?=,\s*domain:|$)", text, flags=re.I)
    if m_title:
        result["title"] = m_title.group(1).strip()

    # Domain: first run of ASCII letters after the label
    m_domain = re.search(r"domain:\s*([A-Za-z]+)", text, flags=re.I)
    if m_domain:
        result["domain"] = m_domain.group(1).strip().lower()  # truth may be normalized

    return result

# -------------------------------------------------------------------------
# PARSE PRED FIELD FROM THE CSV
# -------------------------------------------------------------------------
def parse_pred_field(text):
    """
    Parse a prediction cell that should hold a JSON object such as:
    {
      "maximum": 77.0,
      "minimum": 12.0,
      "range": "12-77",
      "title": "...",
      "domain": "healthcare"
    }

    Pred values MUST NOT be modified: whatever JSON value is stored under a
    key is copied through as-is.

    Parameters
    ----------
    text : str or any
        Raw cell value.

    Returns
    -------
    dict
        Keys ``maximum``, ``minimum``, ``range``, ``title``, ``domain``.
        A key is ``None`` when it is absent. Every key is ``None`` when
        ``text`` is not a string, is not valid JSON, or decodes to something
        other than a JSON object (number, string, list, ...).
    """

    result = {
        "maximum": None,
        "minimum": None,
        "range": None,
        "title": None,
        "domain": None
    }

    if not isinstance(text, str):
        return result

    try:
        # Convert JSON string → Python value
        data = json.loads(text)
    except (ValueError, RecursionError):
        # json.JSONDecodeError is a ValueError; RecursionError covers
        # pathologically nested input.
        return result

    # Valid JSON is not necessarily an object. Without this guard a bare
    # number raises TypeError on `key in data`, and a bare string performs a
    # substring test and then fails on `data[key]`.
    if not isinstance(data, dict):
        return result

    # Copy fields if present (DO NOT modify)
    for key in ["maximum", "minimum", "range", "title", "domain"]:
        if key in data:
            result[key] = data[key]

    return result


# -------------------------------------------------------------------------
# RANGE PARSER — TRUTH DATA MAY BE CLEANED, PRED DATA IS NEVER MODIFIED
# -------------------------------------------------------------------------
def extract_bounds(series, fix_truth=False):
    """
    Convert strings like '0-100', '9.2-9.45', '"12-77"', "'200-500'",
    '-10-20' into lower / upper float values.

    Parameters
    ----------
    series : iterable
        Range values; each item is passed through ``str()`` before parsing.
    fix_truth : bool
        True  → a reversed pair (low > high) is swapped. This is the only
                repair applied.
        False → bounds are returned exactly as parsed (used for pred).

    Returns
    -------
    (numpy.ndarray, numpy.ndarray)
        Lower and upper bounds, one entry per input item. Items that do not
        look like ``<number>-<number>``, or whose parts are not valid floats
        (e.g. '1.2.3-4'), produce NaN in both arrays.
    """

    lowers, uppers = [], []

    # Each bound may carry its own minus sign; the hyphen between the two
    # captured groups is the separator.
    bounds_re = re.compile(r"^(-?[0-9.]+)-(-?[0-9.]+)$")

    for raw in series:
        s = str(raw).strip()

        # Remove wrapping quotes ONLY for parsing (not modifying values)
        s_clean = s.replace('"', '').replace("'", "").strip()
        s_clean = s_clean.replace(" ", "")

        m = bounds_re.match(s_clean)
        if not m:
            lowers.append(np.nan)
            uppers.append(np.nan)
            continue

        try:
            low, high = float(m.group(1)), float(m.group(2))
        except ValueError:
            # The character class also admits tokens such as "1.2.3" or ".",
            # which float() rejects; treat them as unparseable, not fatal.
            lowers.append(np.nan)
            uppers.append(np.nan)
            continue

        if fix_truth:
            # Only adjust truth data
            if low > high:
                low, high = high, low

        lowers.append(low)
        uppers.append(high)

    return np.array(lowers), np.array(uppers)


# -------------------------------------------------------------------------
# MAIN EVALUATION FUNCTION
# -------------------------------------------------------------------------
def evaluate(csv_path, save_path, model_name="model"):
    """
    Score one model's predictions and upsert the scores into a summary CSV.

    Parameters
    ----------
    csv_path : str
        CSV with a ``truth`` column (parsed by ``parse_truth_field``) and a
        ``pred`` column (parsed by ``parse_pred_field``).
    save_path : str
        Summary CSV. A row whose ``file_name`` equals ``model_name`` is
        replaced; if the file is missing or empty it is created.
    model_name : str
        Identifier written to the ``file_name`` column.

    Returns
    -------
    None
        Results are written to ``save_path``. When no row is usable a
        warning is printed and nothing is written.

    Notes
    -----
    * Every metric, title and domain ones included, is computed on the rows
      that pass the validity filter below (positive truth range width and
      all numeric truth/pred values parsed).
    * The ``*_s_mape`` values are mean absolute errors divided by the truth
      range width (upper - lower); ``*_s_mape_sq`` are the means of the
      squared ratios.
    """

    df = pd.read_csv(csv_path)

    # ------------------------------------------------------------
    # PARSE TRUTH + PRED
    # ------------------------------------------------------------
    truth_dicts = df["truth"].apply(parse_truth_field)
    pred_dicts  = df["pred"].apply(parse_pred_field)   # never modified

    # Expand fields
    for key in ["maximum", "minimum", "range", "title", "domain"]:
        df[f"{key}_truth"] = truth_dicts.apply(lambda x: x.get(key, None))
        df[f"{key}_pred"]  = pred_dicts.apply(lambda x: x.get(key, None))

    # ------------------------------------------------------------
    # Convert numeric truth/pred fields
    # ------------------------------------------------------------
    def to_float(x):
        """Float value of x with thousands commas removed, NaN if not numeric."""
        try:
            return float(str(x).replace(",", "").strip())
        except (TypeError, ValueError):
            # Only conversion failures mean "not a number"; anything else
            # (e.g. KeyboardInterrupt) must not be swallowed.
            return np.nan

    df["max_truth"] = df["maximum_truth"].apply(to_float)
    df["min_truth"] = df["minimum_truth"].apply(to_float)
    df["max_pred"]  = df["maximum_pred"].apply(to_float)
    df["min_pred"]  = df["minimum_pred"].apply(to_float)

    # ------------------------------------------------------------
    # DOMAIN — both sides are stringified and lower-cased so the
    # comparison is case-insensitive (missing values become "none")
    # ------------------------------------------------------------
    df["domain_truth"] = df["domain_truth"].astype(str).str.lower()
    df["domain_pred"]  = df["domain_pred"].astype(str).str.lower()

    # ------------------------------------------------------------
    # RANGE EXTRACTION
    # truth → cleaned (fix_truth=True)
    # pred → parsed only, NOT modified (fix_truth=False)
    # ------------------------------------------------------------
    lower_truth, upper_truth = extract_bounds(df["range_truth"], fix_truth=True)
    lower_pred,  upper_pred  = extract_bounds(df["range_pred"],  fix_truth=False)

    df["lower_truth"] = lower_truth
    df["upper_truth"] = upper_truth
    df["lower_pred"]  = lower_pred
    df["upper_pred"]  = upper_pred

    # true range size for normalization
    df["true_range_size"] = df["upper_truth"] - df["lower_truth"]

    # ------------------------------------------------------------
    # Filter rows valid for sMAPE without changing any logic
    # (true_range_size > 0 also protects the divisions below)
    # ------------------------------------------------------------
    df_valid = df[
        (df["true_range_size"] > 0) &
        df["max_truth"].notna() & df["max_pred"].notna() &
        df["min_truth"].notna() & df["min_pred"].notna() &
        df["lower_truth"].notna() & df["upper_truth"].notna() &
        df["lower_pred"].notna()  & df["upper_pred"].notna()
    ].copy()

    if len(df_valid) == 0:
        print("⚠ No valid rows for sMAPE computation!")
        return

    # ------------------------------------------------------------
    # ORIGINAL sMAPE LOGIC (UNMODIFIED)
    # ------------------------------------------------------------
    def sMAPE(actual, pred, denom):
        """Return (mean, mean of squares) of |pred - actual| / denom."""
        num = np.abs(pred - actual)
        return (
            np.mean(num / denom),
            np.mean((num / denom) ** 2)
        )

    # ------------------------------------------------------------
    # TITLE METRICS
    # ------------------------------------------------------------
    titles_true = df_valid["title_truth"].astype(str).tolist()
    titles_pred = df_valid["title_pred"].astype(str).tolist()

    n = len(df_valid)
    title_lev_distance = np.mean([
        Levenshtein.distance(titles_true[i], titles_pred[i]) for i in range(n)
    ])
    title_lev_similarity = np.mean([
        Levenshtein.ratio(titles_true[i], titles_pred[i]) for i in range(n)
    ])

    # ------------------------------------------------------------
    # NUMERIC METRICS
    # ------------------------------------------------------------
    max_s, max_s_sq = sMAPE(df_valid["max_truth"], df_valid["max_pred"], df_valid["true_range_size"])
    min_s, min_s_sq = sMAPE(df_valid["min_truth"], df_valid["min_pred"], df_valid["true_range_size"])
    low_s, low_s_sq = sMAPE(df_valid["lower_truth"], df_valid["lower_pred"], df_valid["true_range_size"])
    up_s, up_s_sq   = sMAPE(df_valid["upper_truth"], df_valid["upper_pred"], df_valid["true_range_size"])

    # ------------------------------------------------------------
    # DOMAIN F1 SCORE
    # ------------------------------------------------------------
    f1_domain = f1_score(df_valid["domain_truth"], df_valid["domain_pred"], average="weighted")

    # ------------------------------------------------------------
    # BUILD RESULTS
    # ------------------------------------------------------------
    results = {
        "file_name": model_name,
        "f1_domain": f1_domain,
        "title_lev_distance": title_lev_distance,
        "title_lev_similarity": title_lev_similarity,
        "max_s_mape": max_s,
        "min_s_mape": min_s,
        "lower_s_mape": low_s,
        "upper_s_mape": up_s,
        "max_s_mape_sq": max_s_sq,
        "min_s_mape_sq": min_s_sq,
        "lower_s_mape_sq": low_s_sq,
        "upper_s_mape_sq": up_s_sq,
    }

    # ------------------------------------------------------------
    # SAVE RESULT
    # ------------------------------------------------------------
    result_df = pd.DataFrame([results])

    try:
        previous = pd.read_csv(save_path)
    except (FileNotFoundError, pd.errors.EmptyDataError):
        # No summary yet, or a zero-byte file left behind by an interrupted
        # run: start a fresh summary instead of crashing.
        combined = result_df
    else:
        # Remove older row with same model_name, then append new results
        previous = previous[previous["file_name"] != model_name]
        combined = pd.concat([previous, result_df], ignore_index=True)

    combined.to_csv(save_path, index=False)

    print("✅ Evaluation complete — scores saved to:", save_path)

In [28]:
# Input: generated outputs for the ibm-granite run (read via pd.read_csv in evaluate).
CSV_PATH = "/content/drive/MyDrive/dl-project/results/ibm-granite/generated_output_4.csv"
# Output: summary CSV that evaluate() creates or updates with this model's scores.
SAVE_PATH = "/content/drive/MyDrive/dl-project/results/ibm-granite/generated_output_4_analysis.csv"

# Uses the three-argument evaluate(csv_path, save_path, model_name) defined above.
evaluate(CSV_PATH, SAVE_PATH, model_name="ibm-granite")


✅ Evaluation complete — scores saved to: /content/drive/MyDrive/dl-project/results/ibm-granite/generated_output_4_analysis.csv


In [None]:
# Project folder on the mounted Drive; created if it does not exist yet.
TARGET_DIR = "/content/drive/MyDrive/dl-project"
os.makedirs(TARGET_DIR, exist_ok=True)
# Folder scanned below for one sub-directory per model.
PROJECT_ROOT = os.path.join(TARGET_DIR, "models")


In [None]:
PROJECT_ROOT = os.path.join(TARGET_DIR, "models")


def _to_number(cell):
    """Return a CSV cell as float (commas removed), or None if blank or not numeric."""
    if not cell:
        return None
    try:
        return float(cell.replace(",", ""))
    except ValueError:
        # A non-numeric cell means "unknown", not a reason to abort the batch.
        return None


def _numbers_match(inferred, truth):
    """True/False when both cells are numeric, None when either is missing."""
    inferred_value, truth_value = _to_number(inferred), _to_number(truth)
    if inferred_value is None or truth_value is None:
        return None
    return inferred_value == truth_value


def _text_matches(inferred, truth):
    """Case-insensitive equality; a missing cell (None) is treated like an empty one."""
    return (inferred or "").lower() == (truth or "").lower()


# For every "*join*.csv" inside each model directory, append five correctness
# flags (max/min/range/title/domain) and rewrite the file in place.
for dir_name in os.listdir(PROJECT_ROOT):
    dir_path = os.path.join(PROJECT_ROOT, dir_name)

    if not os.path.isdir(dir_path):
        continue

    for file_name in os.listdir(dir_path):
        if "join" in file_name and file_name.endswith(".csv"):
            file_path = os.path.join(dir_path, file_name)
            print(f"Opening: {file_path}")

            # --- Read the original CSV ---
            # Explicit encoding so non-ASCII titles do not depend on the
            # platform's default locale.
            with open(file_path, "r", newline="", encoding="utf-8") as csvfile:
                reader = csv.DictReader(csvfile)
                rows = []
                for row in reader:
                    row["max_corr"] = _numbers_match(row["max_inf"], row["max_truth"])
                    row["min_corr"] = _numbers_match(row["min_inf"], row["min_truth"])
                    row["range_corr"] = (row["range_inf"] == row["range_truth"])
                    row["title_corr"] = _text_matches(row["title_inf"], row["title_truth"])
                    row["domain_corr"] = _text_matches(row["domain_inf"], row["domain_truth"])
                    rows.append(row)

                # Preserve original fieldnames, ensuring corr columns exist at the end
                # (already-present corr columns are kept in place, so re-running is safe)
                fieldnames = reader.fieldnames.copy()
                new_cols = ["max_corr", "min_corr", "range_corr", "title_corr", "domain_corr"]
                for col in new_cols:
                    if col not in fieldnames:
                        fieldnames.append(col)

            # --- Write updated CSV ---
            with open(file_path, "w", newline="", encoding="utf-8") as csvfile:
                writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
                writer.writeheader()
                writer.writerows(rows)

Opening: /content/drive/MyDrive/dl-project/models/qwen2_5/qwen2_5_benchmark_results_join.csv
Opening: /content/drive/MyDrive/dl-project/models/gemma3-4b/gemma3-4b_benchmark_results_join.csv
Opening: /content/drive/MyDrive/dl-project/models/gemma-3-12b/gemma-3-12b_benchmark_results_join.csv
Opening: /content/drive/MyDrive/dl-project/models/gemma-3-27b/gemma-3-27b_benchmark_results_join.csv
Opening: /content/drive/MyDrive/dl-project/models/mistralai-3.2/mistralai-3.2_benchmark_results_join.csv
Opening: /content/drive/MyDrive/dl-project/models/mistralai-3.1/mistralai-3.1_benchmark_results_join.csv
Opening: /content/drive/MyDrive/dl-project/models/qwen_2b_instruct/qwen_2b_instruct_benchmark_results_join.csv
Opening: /content/drive/MyDrive/dl-project/models/granite_31_2b/granite_31_2b_benchmark_results_join.csv


In [None]:
def parse_row(text):
    """Parse a single text row like 'max: 78.0, min: 12.0, range: 0.0-90.0, title: ..., domain: ...'

    Labels are matched case-insensitively and numbers may be negative.

    Parameters
    ----------
    text : str or any
        Raw row. A non-string value (for example the NaN pandas yields for
        an empty cell) produces the all-missing result.

    Returns
    -------
    dict
        ``max`` / ``min`` : float, NaN when absent or not a valid number
        ``range``         : str like '0.0-90.0', None when absent
        ``title``         : str, '' when absent
        ``domain``        : lower-cased str, '' when absent
    """
    out = {"max": np.nan, "min": np.nan, "range": None, "title": "", "domain": ""}

    # re.search raises TypeError on non-strings; report "nothing found" instead.
    if not isinstance(text, str):
        return out

    def as_float(token):
        # The character class below also admits tokens like "..." or "1.2.3".
        try:
            return float(token)
        except ValueError:
            return np.nan

    # max (\b stops the label matching inside words such as "Climax:")
    m = re.search(r"\bmax:\s*(-?[0-9.]+)", text, flags=re.I)
    if m:
        out["max"] = as_float(m.group(1))

    # min
    m = re.search(r"\bmin:\s*(-?[0-9.]+)", text, flags=re.I)
    if m:
        out["min"] = as_float(m.group(1))

    # range — "<low>-<high>", each bound optionally signed
    m = re.search(r"\brange:\s*(-?[0-9.]+--?[0-9.]+)", text, flags=re.I)
    if m:
        out["range"] = m.group(1).strip()

    # title — anything between "title:" and ", domain:"
    m = re.search(r"title:\s*(.*?),\s*domain:", text, flags=re.I)
    if m:
        out["title"] = m.group(1).strip()

    # domain
    m = re.search(r"domain:\s*([A-Za-z]+)", text, flags=re.I)
    if m:
        out["domain"] = m.group(1).strip().lower()

    return out


In [None]:
def evaluate(FILE_NAME):
    """
    Score ``<PROJECT_ROOT>/<FILE_NAME>/<FILE_NAME>_benchmark_results_join.csv``
    and upsert the scores into ``<TARGET_DIR>/results/analysis.csv``.

    NOTE: this definition reuses the name ``evaluate``; once this cell runs
    it replaces the three-argument ``evaluate(csv_path, save_path,
    model_name)`` defined earlier in the notebook.

    Parameters
    ----------
    FILE_NAME : str
        Model directory name under ``PROJECT_ROOT``; also stored in the
        ``file_name`` column of the summary.

    Returns
    -------
    None

    Notes
    -----
    * The input file is read through its full path; the process working
      directory is left untouched.
    * Each ``*_s_mape`` value is the mean of |pred - truth| divided by the
      truth range width, taken over the rows where truth, pred and a
      positive range width are all available; ``*_s_mape_sq`` is the mean of
      the squared ratios. If no row qualifies the metric is NaN.
    """

    csv_file = os.path.join(PROJECT_ROOT, FILE_NAME, f"{FILE_NAME}_benchmark_results_join.csv")
    df = pd.read_csv(csv_file)

    # ------------------------
    # Parse truth and pred rows
    # ------------------------
    parsed_truth = df["truth"].apply(parse_row)
    parsed_pred = df["pred"].apply(parse_row)

    for key in ["max", "min", "range", "title", "domain"]:
        df[f"{key}_truth"] = parsed_truth.apply(lambda d: d[key])
        df[f"{key}_inf"]   = parsed_pred.apply(lambda d: d[key])

    # Drop incomplete rows
    df = df.dropna(subset=["max_truth", "min_truth", "range_truth", "title_truth", "domain_truth"]).reset_index(drop=True)

    # ------------------------
    # Compute metrics
    # ------------------------
    # Domain classification
    f1_domain = f1_score(df["domain_truth"], df["domain_inf"], average="weighted")

    # Title Levenshtein
    title_act = df["title_truth"].astype(str).tolist()
    title_pred = df["title_inf"].astype(str).tolist()
    n = len(df)

    lev_distance = np.mean([Levenshtein.distance(title_act[i], title_pred[i]) for i in range(n)])
    lev_similarity = np.mean([Levenshtein.ratio(title_act[i], title_pred[i]) for i in range(n)])

    # Range parsing
    def extract_bounds(ranges):
        """Split '<low>-<high>' strings into two float arrays (NaN when unparseable)."""
        lows, highs = [], []
        for s in ranges:
            m = re.match(r"(-?[0-9.]+)-(-?[0-9.]+)", str(s))
            try:
                low, high = float(m.group(1)), float(m.group(2))
            except (AttributeError, ValueError):
                # AttributeError: no match (m is None);
                # ValueError: token such as "1.2.3" that float() rejects.
                low, high = np.nan, np.nan
            lows.append(low)
            highs.append(high)
        return np.array(lows), np.array(highs)

    lower_pred, upper_pred = extract_bounds(df["range_inf"])
    lower_act, upper_act   = extract_bounds(df["range_truth"])
    range_act = upper_act - lower_act

    # sMAPE
    def sMAPE(true, pred):
        """Range-normalised absolute error: (mean, mean of squares)."""
        true = np.asarray(true, dtype=float)
        pred = np.asarray(pred, dtype=float)
        # Without this mask a single unparseable prediction (NaN) or a
        # zero-width truth range (division by zero) poisons the whole mean.
        usable = np.isfinite(true) & np.isfinite(pred) & np.isfinite(range_act) & (range_act > 0)
        if not usable.any():
            return np.nan, np.nan
        ratio = np.abs(pred[usable] - true[usable]) / range_act[usable]
        return np.mean(ratio), np.mean(ratio ** 2)

    # Max/min metrics
    max_s_mape, max_s_mape_sq = sMAPE(df["max_truth"], df["max_inf"])
    min_s_mape, min_s_mape_sq = sMAPE(df["min_truth"], df["min_inf"])
    lower_s_mape, lower_s_mape_sq = sMAPE(lower_act, lower_pred)
    upper_s_mape, upper_s_mape_sq = sMAPE(upper_act, upper_pred)

    # ------------------------
    # Save results into summary file
    # ------------------------
    results_dict = {
        "file_name": FILE_NAME,
        "f1_domain": f1_domain,
        "title_lev_distance": lev_distance,
        "title_lev_similarity": lev_similarity,
        "max_s_mape": max_s_mape,
        "min_s_mape": min_s_mape,
        "lower_s_mape": lower_s_mape,
        "upper_s_mape": upper_s_mape,
        "max_s_mape_sq": max_s_mape_sq,
        "min_s_mape_sq": min_s_mape_sq,
        "lower_s_mape_sq": lower_s_mape_sq,
        "upper_s_mape_sq": upper_s_mape_sq,
    }

    analysis_path = os.path.join(TARGET_DIR, "results", "analysis.csv")
    # The results folder may not exist on a fresh project tree.
    os.makedirs(os.path.dirname(analysis_path), exist_ok=True)
    result_df = pd.DataFrame([results_dict])

    if os.path.exists(analysis_path):
        existing = pd.read_csv(analysis_path)
        same_model = existing["file_name"] == FILE_NAME

        if same_model.any():
            # Update in place, column by column, so values land in the right
            # column even if the saved file orders its columns differently.
            for column, value in results_dict.items():
                existing.loc[same_model, column] = value
            existing.to_csv(analysis_path, index=False)
        else:
            pd.concat([existing, result_df], ignore_index=True).to_csv(analysis_path, index=False)
    else:
        result_df.to_csv(analysis_path, index=False)