# IMPORTS

In [None]:
import os
import torch
import numpy as np
import pandas as pd

from sklearn.metrics.pairwise import cosine_similarity

from openai import OpenAI

from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sentence_transformers import SentenceTransformer, util

# SETUP

In [None]:
# Workbook and sheet that hold the texts to be compared.
input_path = "outputs/Patunai-QwenModel.xlsx"
sheet_name = "Text"

# Column names of the two texts compared row by row in the evaluation loop.
col_manual = "Premise/Facts"
col_generated = "Generated Premise"

# Embedding models to evaluate. Each name is also used as the header of the
# score column added to df. Names starting with "text-embedding-3-" are routed
# to the OpenAI API by Evaluator; all others are loaded via SentenceTransformer.
model_list = [
    "all-distilroberta-v1", # previous result models
    "all-MiniLM-L6-v2",

    "text-embedding-3-large", # OpenAI API models
    "text-embedding-3-small",

    "BAAI/bge-base-en-v1.5", # Exploratory models
    "intfloat/e5-base-v2",
    "paraphrase-mpnet-base-v2",
    "all-mpnet-base-v2",

    "Qwen/Qwen3-Embedding-0.6b", # current model for reranking
]

# Load the sheet to be scored; score columns are appended to this frame.
df = pd.read_excel(input_path, sheet_name=sheet_name)

# Module-level OpenAI client used by Evaluator for the OpenAI embedding models.
client = OpenAI()

# EVALUATION

## Semantic Evaluator Class

In [None]:
import torch
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer

class Evaluator:
    def __init__(self, embedding_model: str):
        self.embedding_model = embedding_model
        self.device = "cuda" if torch.cuda.is_available() else "cpu"

        print(f"Running on: {self.device}")
        print(f"Embedding model: {self.embedding_model}")

        # Cache per (backend, model, text)
        self._embedding_cache: dict[tuple[str, str, str], np.ndarray] = {}

        # Lazy-loaded HF model
        self._hf_model = None

    # -------------------------
    # Backend routing
    # -------------------------
    def _is_openai_model(self) -> bool:
        return self.embedding_model.startswith("text-embedding-3-")

    def _get_hf_model(self) -> SentenceTransformer:
        if self._hf_model is None:
            print(f"Loading SentenceTransformer ({self.embedding_model}) | ", end="")
            self._hf_model = SentenceTransformer(
                self.embedding_model,
                device=self.device
            )
        return self._hf_model

    # -------------------------
    # Embedding
    # -------------------------
    def _get_embedding(self, text: str) -> np.ndarray | None:
        if not text:
            return None

        backend = "openai" if self._is_openai_model() else "hf"
        cache_key = (backend, self.embedding_model, text)

        if cache_key in self._embedding_cache:
            return self._embedding_cache[cache_key]

        if backend == "openai":
            emb = self._get_openai_embedding(text)
        else:
            emb = self._get_hf_embedding(text)

        self._embedding_cache[cache_key] = emb
        return emb

    def _get_openai_embedding(self, text: str) -> np.ndarray:
        response = client.embeddings.create(
            model=self.embedding_model,
            input=text
        )
        return np.array(response.data[0].embedding, dtype=np.float32)

    def _get_hf_embedding(self, text: str) -> np.ndarray:
        model = self._get_hf_model()
        emb = model.encode(
            text,
            normalize_embeddings=False,
            convert_to_numpy=True
        )
        return emb.astype(np.float32)

    # -------------------------
    # Similarity
    # -------------------------
    def get_semantic_similarity(self, text1: str, text2: str) -> float:
        if not text1 or not text2:
            return 0.0

        emb1 = self._get_embedding(text1)
        emb2 = self._get_embedding(text2)

        if emb1 is None or emb2 is None:
            return 0.0

        score = cosine_similarity(
            emb1.reshape(1, -1),
            emb2.reshape(1, -1)
        )[0][0]

        return float(score)

## Evaluate Input Sheet

In [None]:
# Results go to outputs/<input file name without extension>_similarity-scores.xlsx.
base_name = os.path.splitext(os.path.basename(input_path))[0]
output_path = f"outputs/{base_name}_similarity-scores.xlsx"

def save_df_to_excel_sheet(
    output_path: str,
    sheet_name: str,
    df: pd.DataFrame,
    index: bool = False
) -> None:
    """
    Save a DataFrame to an Excel file, replacing only the specified sheet.
    Creates the file (and its parent directory) if it does not exist.

    Args:
        output_path: Path of the workbook to write.
        sheet_name: Sheet to create, or to replace if the workbook has it.
        df: Data to write.
        index: Whether to write the DataFrame index as a column.
    """
    # A bare file name has an empty dirname, and os.makedirs("") raises
    # FileNotFoundError, so only create the directory when there is one.
    parent_dir = os.path.dirname(output_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    # Append to an existing workbook so its other sheets are kept.
    mode = "a" if os.path.exists(output_path) else "w"

    with pd.ExcelWriter(
        output_path,
        engine="openpyxl",
        mode=mode,
        # if_sheet_exists is only passed a value in append mode.
        if_sheet_exists="replace" if mode == "a" else None
    ) as writer:
        df.to_excel(writer, sheet_name=sheet_name, index=index)

    print(f"Saved sheet '{sheet_name}' to: {output_path}")

def safe_text(val):
    """Return ``val`` as a whitespace-stripped string; missing values become ""."""
    return "" if (pd.isna(val) or val is None) else str(val).strip()

In [None]:
print(f"Evaluating {len(df)} rows...")

# Pre-sanitize inputs once so every model sees the same cleaned strings.
facts_list = [safe_text(v) for v in df[col_manual]]
generated_list = [safe_text(v) for v in df[col_generated]]

for model_name in model_list:
    print(f"\nUsing model: {model_name}")
    evaluator = Evaluator(model_name)

    similarity_scores = []

    # enumerate supplies the 1-based row number used in the progress output.
    for row_number, (facts, generated) in enumerate(
        zip(facts_list, generated_list), start=1
    ):
        print(f"evaluating row {row_number} | ", end="")

        # Rows with a missing text on either side score 0.0 without embedding.
        if facts and generated:
            score = evaluator.get_semantic_similarity(facts, generated)
        else:
            score = 0.0

        similarity_scores.append(score)
        print(f"{model_name}: {score}")

    # Column header is the model name
    df[model_name] = similarity_scores
    # Save after every model so finished columns are on disk before the next run.
    save_df_to_excel_sheet(output_path, sheet_name, df)

print(f"Evaluation of sheet '{sheet_name}' at {input_path} done, saved to {output_path}")

In [None]:
# Name of the sheet that receives the per-model aggregate statistics.
sheet_name_aggregate = f"{sheet_name}-aggregated"

def analyze_similarity_by_model(
    df: pd.DataFrame,
    model_cols: list[str],
    filter_col: str,
):
    """
    Summarise the score columns per model, overall and split by a flag column.

    The flag column is compared as text (trimmed, upper-cased) against
    "TRUE" and "FALSE". Returns a tuple of three DataFrames, each indexed by
    model column with one column per statistic:
    1) all rows
    2) rows flagged TRUE
    3) rows flagged FALSE
    """
    # Column label -> score cutoff for the "share of rows above" statistics.
    cutoffs = (
        ("% above 0.60", 0.60),
        ("% above 0.70", 0.70),
        ("% above 0.80", 0.80),
    )

    def _aggregate(sub_df: pd.DataFrame) -> pd.DataFrame:
        row_count = len(sub_df)
        stats = {
            "mean": sub_df.mean(),
            "median": sub_df.median(),
        }
        for label, cutoff in cutoffs:
            if row_count:
                stats[label] = (sub_df > cutoff).sum() / row_count * 100
            else:
                # No rows to count: report 0 rather than dividing by zero.
                stats[label] = 0
        stats["std. dev"] = sub_df.std()
        return pd.DataFrame(stats)

    # Normalise the flag column once and derive both masks from it.
    flags = df[filter_col].astype(str).str.strip().str.upper()
    is_true = flags == "TRUE"
    is_false = flags == "FALSE"

    overall = _aggregate(df[model_cols])
    matched = _aggregate(df[is_true][model_cols])
    unmatched = _aggregate(df[is_false][model_cols])

    return overall, matched, unmatched

# Aggregate the score columns overall and split by the "Match" column.
df_all, df_true, df_false = analyze_similarity_by_model(df, model_list, "Match")

# Place the three views side by side, tagging each column with its subset.
merged_df = df_all.add_prefix("ALL__")
merged_df = merged_df.join(df_true.add_prefix("TRUE__"))
merged_df = merged_df.join(df_false.add_prefix("FALSE__"))

# Reorder so the ALL/TRUE/FALSE variants of each metric sit next to each other.
metrics = df_all.columns
merged_df = merged_df[
    [
        f"{prefix}__{metric}"
        for metric in metrics
        for prefix in ("ALL", "TRUE", "FALSE")
    ]
]

# Keep the index: it carries the model names.
save_df_to_excel_sheet(output_path, sheet_name_aggregate, merged_df, index=True)