# Installation and Imports

In [None]:
!pip install -q "transformers>=4.46.0" "accelerate>=1.1.0" peft datasets


In [None]:
%pip install -q \
    torch  \
    datasets \
    accelerate \
    numpy \
    pandas \
    scikit-learn \
    matplotlib \
    seaborn \
    wordcloud \
    emoji \
    nltk \
    shap \
    lime

In [None]:
import os, re, sys, random, unicodedata
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from IPython.display import display
from wordcloud import WordCloud

# NLP
import emoji, nltk
from nltk import word_tokenize, pos_tag
from nltk.stem import WordNetLemmatizer
from nltk.corpus import wordnet
for pkg in ["punkt","punkt_tab","averaged_perceptron_tagger","averaged_perceptron_tagger_eng","wordnet","omw-1.4"]:
    nltk.download(pkg, quiet=True)

# ML
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.decomposition import PCA, TruncatedSVD
from sklearn.manifold import TSNE
from sklearn.linear_model import LogisticRegression
from sklearn.svm import LinearSVC
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import (
    classification_report, confusion_matrix,
    roc_auc_score, roc_curve,
    accuracy_score, precision_recall_fscore_support
)

from scipy.sparse import hstack, csr_matrix
# SHAP + LIME
import shap
from lime.lime_text import LimeTextExplainer
from scipy.special import softmax

# TORCH + TRANSFORMERS + PEFT
import torch
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    DataCollatorWithPadding,
    TrainingArguments,
    AutoModelForCausalLM,
    DataCollatorForLanguageModeling,
)

from peft import LoraConfig, get_peft_model, TaskType

# GLOBAL SEED
# Seed every random number generator imported above (Python, NumPy, PyTorch)
# so that sampling, splitting and model initialisation are repeatable.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

print(" SETUP COMPLETE")
print("PyTorch:", torch.__version__)

# Initial Data Loading and Cleaning

In [None]:
def percent_to_float(x):
    """Convert a percentage-like value to a float.

    Strings are stripped of surrounding whitespace and of one trailing "%"
    before conversion (the value is NOT divided by 100: "85%" -> 85.0).

    Args:
        x: A scalar such as "85%", "3.5", 7 or a missing value.

    Returns:
        The parsed float, or ``np.nan`` when ``x`` is missing or cannot be
        converted.
    """
    if pd.isna(x):
        return np.nan
    if isinstance(x, str):
        x = x.strip()
        if x.endswith("%"):
            x = x[:-1]
    try:
        return float(x)
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "unparseable"; anything else
        # (e.g. KeyboardInterrupt) must propagate.
        return np.nan


def drop_unnamed(df):
    """Return ``df`` without the columns whose name starts with "Unnamed:"."""
    is_unnamed = df.columns.str.startswith("Unnamed:")
    return df.loc[:, ~is_unnamed]

def load_and_clean_raw_datasets(base_path="."):
    """Read the three raw review CSVs and apply basic cleaning.

    Cleaning steps, in order: drop "Unnamed:" columns, coerce ``num_token`` to
    numeric, keep only RMP rows labelled Pos/Neg/Conf/Neu, drop Waterloo rows
    whose ``rating_cat`` is "[NA]" or missing, convert the Waterloo
    ``useful``/``easy``/``liked`` columns with ``percent_to_float``, and reset
    each index.

    Args:
        base_path: Directory containing the three CSV files.

    Returns:
        Tuple ``(rmp, waterloo, exeter)`` of cleaned DataFrames.
    """
    read_opts = {"encoding": "latin1", "low_memory": False}
    rmp = pd.read_csv(
        os.path.join(base_path, "1_He_2020_RMP_v2_with_sampling_criteria.csv"),
        **read_opts,
    )
    waterloo = pd.read_csv(
        os.path.join(base_path, "2_Uwaterloo_course_reviews_with_sampling_criteria.csv"),
        **read_opts,
    )
    exeter = pd.read_csv(
        os.path.join(base_path, "3_UExeter_uni_reviews_with_sampling_criteria.csv"),
        **read_opts,
    )

    print("Raw shapes:", rmp.shape, waterloo.shape, exeter.shape)

    # Take explicit copies: the frames are modified column-wise below, and
    # assigning into a slice of another frame triggers chained-assignment
    # warnings.
    rmp = drop_unnamed(rmp).copy()
    waterloo = drop_unnamed(waterloo).copy()
    exeter = drop_unnamed(exeter).copy()

    for df in (rmp, waterloo, exeter):
        if "num_token" in df.columns:
            df["num_token"] = pd.to_numeric(df["num_token"], errors="coerce")

    if "rating_cat" in rmp.columns:
        rmp = rmp[rmp["rating_cat"].isin(["Pos", "Neg", "Conf/Neu"])]

    if "rating_cat" in waterloo.columns:
        # "[NA]" is a textual placeholder; treat it like a missing label.
        waterloo.loc[waterloo["rating_cat"] == "[NA]", "rating_cat"] = np.nan
        waterloo = waterloo.dropna(subset=["rating_cat"]).copy()

    for col in ["useful", "easy", "liked"]:
        if col in waterloo.columns:
            waterloo[col] = waterloo[col].apply(percent_to_float)

    rmp = rmp.reset_index(drop=True)
    waterloo = waterloo.reset_index(drop=True)
    exeter = exeter.reset_index(drop=True)

    print("After cleaning:", rmp.shape, waterloo.shape, exeter.shape)

    return rmp, waterloo, exeter


# Load the three raw CSVs using the function's default base_path (".").
df_rmp, df_waterloo, df_exeter = load_and_clean_raw_datasets()


# Unified schema + merged df_all (teacher / course / university)

In [None]:
# Columns of the unified schema, in output order. It is passed to
# fill_missing_cols, which returns frames with exactly these columns.
UNIFIED_COLS = [
    "global_id",
    "dataset",
    "entity_type",
    "source_id",
    "review_text",
    "rating",
    "rating_bin",
    "rating_cat",
    "num_token",
    "length_bin",
    "sample_criteria",
    "department",
    "teacher_name",
    "course_code",
    "course_name",
    "num_ratings",
    "num_reviews",
    "useful",
    "easy",
    "liked",
    "student_id",
    "date",
    "rating_facilities",
    "rating_clubs",
    "rating_careerService",
    "rating_internet",
]

# Value assigned to a unified column when a source frame does not have it.
# NOTE(review): numeric columns default to 0 / 0.0 rather than NaN, so a
# metric that a dataset never had looks the same as a real zero — confirm
# this is intended for downstream statistics.
DEFAULTS = {
    "global_id": "",
    "dataset": "",
    "entity_type": "",
    "source_id": "",
    "review_text": "",
    "rating": 0.0,
    "rating_bin": 0,
    "rating_cat": "unknown",
    "num_token": 0,
    "length_bin": 0,
    "sample_criteria": "unknown",
    "department": "",
    "teacher_name": "",
    "course_code": "",
    "course_name": "",
    "num_ratings": 0,
    "num_reviews": 0,
    "useful": 0.0,
    "easy": 0.0,
    "liked": 0.0,
    "student_id": "",
    "date": "",
    "rating_facilities": 0.0,
    "rating_clubs": 0.0,
    "rating_careerService": 0.0,
    "rating_internet": 0.0,
}


def fill_missing_cols(df, cols, defaults):
    """Add any column of ``cols`` missing from ``df`` and return ``df[cols]``.

    Missing columns are created in place on ``df`` with the value from
    ``defaults`` (empty string when the column has no entry there).
    """
    absent = [name for name in cols if name not in df.columns]
    for name in absent:
        df[name] = defaults.get(name, "")
    return df[cols]


def build_unified_df(df_rmp, df_waterloo, df_exeter):
    """Map the three cleaned source frames onto UNIFIED_COLS and stack them.

    Each source is tagged with its dataset name and entity type
    (rmp/teacher, waterloo/course, exeter/university). Columns a source does
    not provide are filled from DEFAULTS. Only rows labelled Neg, Conf/Neu or
    Pos are kept. Prints the resulting shape and value counts.

    Returns:
        The concatenated DataFrame with ``dataset`` and ``entity_type`` as
        categoricals.
    """
    shared_cols = ("review_text", "rating", "rating_bin", "rating_cat")

    def _start_frame(src, dataset, entity_type):
        # Identification columns plus the fields every source must provide.
        out = pd.DataFrame(index=src.index)
        out["dataset"] = dataset
        out["entity_type"] = entity_type
        out["source_id"] = src["id"]
        out["global_id"] = dataset + "_" + src["id"].astype(str)
        for col in shared_cols:
            out[col] = src[col]
        return out

    def _copy_optional(out, src, spec):
        # spec maps unified column -> (source column, fallback when absent).
        for target, (source, fallback) in spec.items():
            out[target] = src.get(source, fallback)
        return fill_missing_cols(out, UNIFIED_COLS, DEFAULTS)

    # RMP (teacher): these columns are required, so index directly.
    rmp_u = _start_frame(df_rmp, "rmp", "teacher")
    for col in ("num_token", "length_bin", "sample_criteria",
                "department", "teacher_name"):
        rmp_u[col] = df_rmp[col]
    rmp_u = fill_missing_cols(rmp_u, UNIFIED_COLS, DEFAULTS)

    # Waterloo (course)
    waterloo_u = _copy_optional(
        _start_frame(df_waterloo, "waterloo", "course"),
        df_waterloo,
        {
            "num_token": ("num_token", 0),
            "length_bin": ("length_bin", 0),
            "sample_criteria": ("sample_criteria", "unknown"),
            "course_code": ("course_code", ""),
            # course_name is populated from the source's course_id column.
            "course_name": ("course_id", ""),
            "num_ratings": ("num_ratings", 0),
            "num_reviews": ("num_reviews", 0),
            "useful": ("useful", 0.0),
            "easy": ("easy", 0.0),
            "liked": ("liked", 0.0),
        },
    )

    # Exeter (university)
    exeter_u = _copy_optional(
        _start_frame(df_exeter, "exeter", "university"),
        df_exeter,
        {
            "num_token": ("num_token", 0),
            "length_bin": ("length_bin", 0),
            "sample_criteria": ("sample_criteria", "unknown"),
            "student_id": ("student_id", ""),
            "date": ("date", ""),
            "rating_facilities": ("rating_facilities", 0.0),
            "rating_clubs": ("rating_clubs", 0.0),
            "rating_careerService": ("rating_careerService", 0.0),
            "rating_internet": ("rating_internet", 0.0),
        },
    )

    stacked = pd.concat([rmp_u, waterloo_u, exeter_u], ignore_index=True)
    has_valid_label = stacked["rating_cat"].isin(["Neg", "Conf/Neu", "Pos"])
    df_all = stacked[has_valid_label].reset_index(drop=True)

    for cat_col in ("dataset", "entity_type"):
        df_all[cat_col] = df_all[cat_col].astype("category")

    print("Unified shape:", df_all.shape)
    print("\nDatasets:")
    print(df_all["dataset"].value_counts())
    print("\nEntity types:")
    print(df_all["entity_type"].value_counts())
    print("\nLabels:")
    print(df_all["rating_cat"].value_counts())

    return df_all
# Build the merged frame and write it to CSV in the working directory.
df_unified = build_unified_df(df_rmp, df_waterloo, df_exeter)
unified_path = "df_unified_raw_clean.csv"
df_unified.to_csv(unified_path, index=False)
print(f"\nSaved unified dataset to: {unified_path}")

In [None]:
# Unified Dataset Summary Block
# Fill remaining missing values in three columns, then print an overview.
# NOTE(review): "liked" is not filled here although "useful" and "easy" are —
# confirm whether that omission is intentional.
df_unified["useful"] = df_unified["useful"].fillna(0.0)
df_unified["easy"] = df_unified["easy"].fillna(0.0)
df_unified["student_id"] = df_unified["student_id"].fillna("")

# Work on a copy from here on; df_unified itself is left as filled above.
df_all = df_unified.copy()

print("\n===== SHAPE =====")
print(df_all.shape)

print("\n===== COLUMNS =====")
print(df_all.columns.tolist())

print("\n===== DATA TYPES =====")
print(df_all.dtypes)

print("\n===== HEAD =====")
display(df_all.head())

print("\n===== TAIL =====")
display(df_all.tail())

print("\n===== NULL VALUES =====")
print(df_all.isna().sum())

print("\n===== BASIC DESCRIPTIVE STATS =====")
display(df_all.describe(include="all"))

print("\n===== VALUE COUNTS: dataset =====")
print(df_all["dataset"].value_counts())

print("\n===== VALUE COUNTS: entity_type =====")
print(df_all["entity_type"].value_counts())

print("\n===== VALUE COUNTS: rating_cat =====")
print(df_all["rating_cat"].value_counts())

print("\n===== TEXT LENGTH STATS =====")
# review_length is the character count of the raw review text
# (missing values are stringified first).
df_all["review_length"] = df_all["review_text"].astype(str).apply(len)
print(df_all["review_length"].describe())

print("\n===== TOKEN COUNT STATS =====")
print(df_all["num_token"].describe())

print("\n===== DUPLICATE CHECK =====")
# Counts rows that are identical across every column.
print("Duplicates:", df_all.duplicated().sum())

print("\n===== MEMORY USAGE =====")
df_all.info(memory_usage="deep")


# Advanced Data Cleaning

In [None]:
# Strong pre-clean
def strong_preclean(text):
    """Normalise a raw review into plain ASCII-ish text.

    Steps: NFKC normalisation, removal of "#NAME?" artefacts, emoji to text
    names, apostrophes to spaces, collapsing repeated "!", "?" and ".",
    replacing every character outside letters/digits/whitespace/.,!?:;- with
    a space, re-joining a fixed list of split contractions ("don t" ->
    "dont"), and whitespace squeezing. Missing input yields "".
    """
    if pd.isna(text):
        return ""

    cleaned = unicodedata.normalize("NFKC", str(text))
    cleaned = re.sub(r"#NAME\?", " ", cleaned, flags=re.IGNORECASE)
    cleaned = emoji.demojize(cleaned, delimiters=(" ", " "))

    # Order matters: variants are first mapped to "'", then "'" becomes a space.
    for old, new in (("â€™", "'"), ("`", "'"), ("'", " ")):
        cleaned = cleaned.replace(old, new)

    substitutions = (
        (r"([!?]){2,}", r"\1"),
        (r"\.{2,}", "."),
        (r"[\r\n\t]+", " "),
        (r"[^A-Za-z0-9\s\.\,\!\?\:\;\-]", " "),
    )
    for pattern, repl in substitutions:
        cleaned = re.sub(pattern, repl, cleaned)

    # Apostrophes are gone by now, so contractions look like "don t".
    for stem in ("don", "didn", "can", "won", "isn", "aren", "shouldn"):
        cleaned = re.sub(
            rf"\b{stem}\s+t\b", f"{stem}t", cleaned, flags=re.IGNORECASE
        )

    return re.sub(r"\s+", " ", cleaned).strip()

# Apply strong_preclean to every review and show a small before/after sample.
print("Running strong_preclean on review_text...")
df_all["review_text_pre"] = df_all["review_text"].apply(strong_preclean)

print("\nSample pre-cleaned rows:")
display(df_all[["review_text", "review_text_pre"]].head(5))


In [None]:
# Advanced cleaning
# One shared WordNet lemmatizer instance, used by advanced_clean below.
lemmatizer = WordNetLemmatizer()

# Tokens that open a negation scope in advanced_clean.
# strong_preclean rewrites contractions without apostrophes ("isn t" ->
# "isnt"), so every form it produces must be listed here to be recognised.
NEGATIONS = {
    "not", "no", "never", "n't",
    "dont", "didnt", "cant", "cannot", "wont",
    "isnt", "arent", "shouldnt",
}

# Matches an academic/courtesy title, an optional ".", whitespace and a
# following word of at least two letters (e.g. "prof. smith").
_TITLE_ALTERNATIVES = "|".join(("prof", "professor", "dr", "mr", "mrs", "ms"))
TEACHER_PAT = re.compile(
    rf"\b({_TITLE_ALTERNATIVES})\.?\s+[a-z][a-z]+\b",
    re.IGNORECASE,
)

def get_wordnet_pos(tag: str):
    """Map a POS tag to a WordNet POS constant by its first letter.

    J -> ADJ, V -> VERB, N -> NOUN, R -> ADV; anything else falls back to NOUN.
    """
    by_initial = {
        "J": wordnet.ADJ,
        "V": wordnet.VERB,
        "N": wordnet.NOUN,
        "R": wordnet.ADV,
    }
    return by_initial.get(tag[:1], wordnet.NOUN)

def advanced_clean(text: str) -> str:
    """Lowercase, anonymise teacher names, mark negation scope and lemmatise.

    Processing:
      1. Lowercase and replace TEACHER_PAT matches with " teacher ".
      2. Tokenise with ``word_tokenize``.
      3. After a token in NEGATIONS, flag up to ``NEG_SCOPE_LEN`` following
         alphabetic tokens as negated; ".", "!" or "?" ends the scope.
      4. POS-tag the ORIGINAL tokens. Tagging after the "neg_" prefix was
         attached made the tagger see out-of-vocabulary words and degraded
         the lemma chosen for negated words.
      5. Keep alphabetic tokens only, lemmatise them, drop lemmas of length
         <= 1, and prefix flagged ones with "neg_".

    Args:
        text: Pre-cleaned review text (may be missing or empty).

    Returns:
        Space-joined cleaned tokens, or "" for missing/blank input.
    """
    if pd.isna(text) or not str(text).strip():
        return ""

    text = TEACHER_PAT.sub(" teacher ", str(text).lower())
    tokens = word_tokenize(text)

    NEG_SCOPE_LEN = 3
    scope_enders = (".", "!", "?")

    # negated[i] is True when tokens[i] falls inside a negation scope.
    negated = []
    neg_scope = 0
    for tok in tokens:
        if tok in NEGATIONS:
            # The negation word itself is kept unmarked and (re)opens the scope.
            negated.append(False)
            neg_scope = NEG_SCOPE_LEN
            continue

        if neg_scope > 0 and tok.isalpha():
            negated.append(True)
            neg_scope -= 1
        else:
            negated.append(False)

        if tok in scope_enders:
            neg_scope = 0

    clean_tokens = []
    for (tok, tag), is_negated in zip(pos_tag(tokens), negated):
        if not tok.isalpha():
            continue

        lemma = lemmatizer.lemmatize(tok, pos=get_wordnet_pos(tag))
        if len(lemma) <= 1:
            continue

        clean_tokens.append("neg_" + lemma if is_negated else lemma)

    return " ".join(clean_tokens)

# Run advanced_clean on the pre-cleaned text and record the token count.
print("Cleaning with advanced_clean...")
df_all["text_clean"] = df_all["review_text_pre"].apply(advanced_clean)
df_all["clean_len"] = df_all["text_clean"].str.split().str.len()

# Drop rows whose cleaned text is empty.
before = len(df_all)
df_all = df_all[df_all["text_clean"].str.strip().str.len() > 0].reset_index(drop=True)
after = len(df_all)

print(f"\nRows before cleaning: {before}")
print(f"Rows after cleaning:  {after}")
print("\nSample cleaned rows:")
display(df_all[["review_text", "review_text_pre", "text_clean"]].head(10))
print("\nLabel distribution after cleaning:")
print(df_all["rating_cat"].value_counts())


In [None]:
!pip install openpyxl
# Save cleaned dataset
df_all.to_csv("df_unified_clean_advanced.csv", index=False)
df_all.to_excel("df_unified_clean_advanced.xlsx", index=False)
print("\nSaved cleaned dataset to df_unified_clean_advanced.csv and .xlsx")

# google.colab only exists inside Colab. The files are already on disk, so
# outside Colab the browser download is skipped instead of crashing the cell.
try:
    from google.colab import files
except ImportError:
    print("google.colab not available; skipping browser download.")
else:
    files.download("df_unified_clean_advanced.csv")

In [None]:
from contextlib import redirect_stdout
from io import StringIO
import sys


def full_data_report(df, name="DATASET", save_path="full_report.txt"):
    """Build a plain-text profile of ``df``, save it and print it.

    The report covers structure, previews, descriptive statistics, missing
    values, empty strings, duplicates, key value counts, text-length stats,
    non-ASCII counts and quantiles.

    Args:
        df: DataFrame to describe.
        name: Label shown in the report header.
        save_path: File the report is written to (UTF-8).

    Returns:
        None. The report is written to ``save_path`` and echoed to stdout.
    """
    buffer = StringIO()

    # redirect_stdout restores sys.stdout even when a step below raises;
    # swapping sys.stdout by hand left the notebook silent after any error.
    with redirect_stdout(buffer):
        print("="*70)
        print(f"FULL DATA REPORT — {name}")
        print("="*70)

        # 1. Basic Structure
        print("\nSHAPE:")
        print(df.shape)

        print("\nCOLUMNS:")
        print(df.columns.tolist())

        print("\nDATA TYPES:")
        print(df.dtypes)

        print("\nMEMORY USAGE (deep):")
        print(df.memory_usage(deep=True))

        # 2. Preview
        print("\nHEAD:")
        print(df.head(5))

        print("\nTAIL:")
        print(df.tail(5))

        # 3. Summary Stats
        num_cols = df.select_dtypes(include=["int64", "float64"]).columns
        print("\nDESCRIPTIVE STATS — NUMERIC:")
        print(df[num_cols].describe().T if len(num_cols) else "No numeric columns.")

        cat_cols = df.select_dtypes(include="object").columns
        print("\nDESCRIPTIVE STATS — CATEGORICAL:")
        print(df[cat_cols].describe().T if len(cat_cols) else "No object columns.")

        # 4. Missing Values
        print("\nMISSING VALUES:")
        missing = df.isna().sum()
        print(missing[missing > 0] if missing.sum() > 0 else "No missing values detected.")

        print("\nEMPTY STRINGS:")
        empty = (df.select_dtypes(include="object") == "").sum()
        empty = empty[empty > 0]
        print(empty if len(empty) else "No empty strings detected.")

        # 5. Duplicates
        print("\nDUPLICATE ROWS:")
        print(df.duplicated().sum())
        if "global_id" in df.columns:
            print("Duplicate global_id:", df["global_id"].duplicated().sum())

        # 6. Key Value Counts
        inspect_cols = ["dataset", "entity_type", "rating_cat"]
        print("\nKEY VALUE DISTRIBUTIONS:")
        for col in inspect_cols:
            if col in df.columns:
                print(f"\n{col}:")
                print(df[col].value_counts())

        # 7. Text Analysis
        if "review_text" in df.columns:
            print("\nREVIEW LENGTH STATS:")
            print(df["review_text"].str.len().describe())

        if "text_clean" in df.columns:
            print("\nCLEANED TEXT LENGTH STATS:")
            print(df["text_clean"].str.split().str.len().describe())

        # 8. Non-ASCII Check
        # Only object columns are inspected; counting per column avoids
        # mixing Series and None results inside DataFrame.apply.
        print("\nNON-ASCII CHARACTER COUNTS:")
        object_cols = df.select_dtypes(include="object")
        non_ascii_counts = pd.Series(
            {
                col: int(object_cols[col].map(lambda v: not str(v).isascii()).sum())
                for col in object_cols.columns
            },
            dtype="int64",
        )
        non_ascii_counts = non_ascii_counts[non_ascii_counts > 0]
        print(non_ascii_counts if len(non_ascii_counts) else "No non-ASCII characters found.")

        # 9. Quantiles
        if len(num_cols) > 0:
            print("\nQUANTILES (0.01 to 0.99):")
            print(df[num_cols].quantile([0.01, 0.25, 0.5, 0.75, 0.99]))

        print("\n" + "="*70)
        print("REPORT COMPLETE")
        print("="*70)

    # stdout is back to the notebook here: persist the report, then echo it.
    report_text = buffer.getvalue()
    with open(save_path, "w", encoding="utf-8") as f:
        f.write(report_text)
    print(report_text)

    print(f"\nReport saved to: {save_path}")
# Profile the cleaned frame and write the report next to the notebook.
full_data_report(df_all, name="Unified Cleaned Dataset", save_path="Unified_Cleaned_Report.txt")

In [None]:
# Rename final cleaned DataFrame
# A copy is taken, so later edits to df_all do not affect final_cleaned_unified.
final_cleaned_unified = df_all.copy()

print("Final cleaned dataset ready:")
print("Variable name: final_cleaned_unified")
print("Shape:", final_cleaned_unified.shape)
print("Label counts:")
print(final_cleaned_unified["rating_cat"].value_counts())

# **Visual EDA**

In [None]:
from wordcloud import WordCloud
from sklearn.feature_extraction.text import TfidfVectorizer, CountVectorizer
from sklearn.decomposition import TruncatedSVD
from sklearn.manifold import TSNE
from nltk import word_tokenize, pos_tag
# Working copy for the EDA cells, plus the seaborn grid style for all plots.
df = final_cleaned_unified.copy()
sns.set(style="whitegrid")



## 1. Basic structure + label / dataset / entity distributions

In [None]:
# Label distribution
# Bars are drawn in the fixed order Neg, Conf/Neu, Pos.
plt.figure(figsize=(6,4))
sns.countplot(data=df, x="rating_cat", order=["Neg","Conf/Neu","Pos"])
plt.title("Rating Category Distribution")
plt.xlabel("Label")
plt.ylabel("Count")
plt.show()

# Dataset × label
plt.figure(figsize=(8,5))
sns.countplot(data=df, x="dataset", hue="rating_cat",
              hue_order=["Neg","Conf/Neu","Pos"])
plt.title("Sentiment Distribution per Dataset")
plt.xlabel("Dataset")
plt.ylabel("Count")
plt.legend(title="Label")
plt.show()

# Entity type × label
plt.figure(figsize=(8,5))
sns.countplot(data=df, x="entity_type", hue="rating_cat",
              hue_order=["Neg","Conf/Neu","Pos"])
plt.title("Sentiment Distribution per Entity Type")
plt.xlabel("Entity Type")
plt.ylabel("Count")
plt.legend(title="Label")
plt.show()


## 2. Text length

In [None]:
# Histogram of cleaned length
# clean_len is plotted as a token count (see the axis labels below).
plt.figure(figsize=(8,4))
sns.histplot(df["clean_len"], bins=60, kde=True)
plt.title("Distribution of Cleaned Text Length (Tokens)")
plt.xlabel("Token Count")
plt.ylabel("Frequency")
plt.show()

# Boxplot by dataset
plt.figure(figsize=(8,5))
sns.boxplot(data=df, x="dataset", y="clean_len")
plt.title("Cleaned Review Length Across Datasets")
plt.xlabel("Dataset")
plt.ylabel("Token Count")
plt.show()

# Boxplot by label
plt.figure(figsize=(6,4))
sns.boxplot(data=df, x="rating_cat", y="clean_len",
            order=["Neg","Conf/Neu","Pos"])
plt.title("Text Length by Sentiment Label")
plt.xlabel("Label")
plt.ylabel("Cleaned Token Count")
plt.show()


## 3. Correlation heatmap of numeric features

In [None]:
# Pairwise correlation of the numeric columns, shown as an annotated heatmap.
# NOTE(review): several of these columns presumably exist for only one source
# dataset and hold placeholder values elsewhere — confirm before interpreting
# the coefficients.
num_cols = [
    "rating","num_token","num_ratings","num_reviews",
    "useful","easy","liked",
    "rating_facilities","rating_clubs","rating_careerService",
    "rating_internet","clean_len"
]
corr = df[num_cols].corr()
plt.figure(figsize=(10,9), dpi=120)
sns.heatmap(
    corr, annot=True, fmt=".2f",
    cmap="coolwarm", vmin=-1, vmax=1,
    linewidths=0.5, cbar_kws={'shrink':0.8}
)
plt.title("Correlation Heatmap of Numeric Features", fontsize=16)
plt.xticks(rotation=45, ha="right")
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()

## 4. WordClouds (global + by label + by entity)

In [None]:
# Global WC
# One word cloud over every cleaned review; collocations=False counts single
# tokens only.
all_text = " ".join(df["text_clean"].tolist())
wc = WordCloud(width=1400, height=800,
               background_color="white",
               max_words=300, collocations=False).generate(all_text)

plt.figure(figsize=(12,8))
plt.imshow(wc, interpolation="bilinear")
plt.axis("off")
# Title previously contained a mis-decoded em dash ("â€”").
plt.title("WordCloud — All Cleaned Reviews")
plt.show()


In [None]:
# Label-wise WC
for lab in ["Pos", "Neg", "Conf/Neu"]:
    subset_text = " ".join(df[df["rating_cat"] == lab]["text_clean"])
    if not subset_text.strip():
        # WordCloud.generate raises ValueError when there is nothing to draw.
        print(f"No cleaned text for label {lab!r}; skipping word cloud.")
        continue
    wc = WordCloud(width=1600, height=900,
                   background_color="white",
                   max_words=200, collocations=False).generate(subset_text)
    plt.figure(figsize=(14,8))
    plt.imshow(wc, interpolation="bilinear")
    plt.axis("off")
    # Title previously contained a mis-decoded em dash ("â€”").
    plt.title(f"WordCloud — {lab} Reviews")
    plt.show()


In [None]:
# Entity-wise WC
for ent in ["teacher", "course", "university"]:
    subset_text = " ".join(df[df["entity_type"] == ent]["text_clean"])
    if not subset_text.strip():
        # WordCloud.generate raises ValueError when there is nothing to draw.
        print(f"No cleaned text for entity type {ent!r}; skipping word cloud.")
        continue
    wc = WordCloud(width=1600, height=900,
                   background_color="white",
                   max_words=200, collocations=False).generate(subset_text)
    plt.figure(figsize=(14,8))
    plt.imshow(wc, interpolation="bilinear")
    plt.axis("off")
    # Title previously contained a mis-decoded em dash ("â€”").
    plt.title(f"WordCloud — {ent.capitalize()} Reviews")
    plt.show()


## 5. N-grams (bigrams/trigrams) — helper + plots

In [None]:
from sklearn.feature_extraction.text import CountVectorizer

def get_top_ngrams(texts, ngram_range=(2,2), top_n=20, min_df=5):
    """Return the ``top_n`` most frequent n-grams in ``texts``.

    N-grams are counted with CountVectorizer using the given ``ngram_range``
    and ``min_df``. The result is a DataFrame with columns "ngram" and
    "count", sorted by count in descending order.
    """
    vectorizer = CountVectorizer(ngram_range=ngram_range, min_df=min_df)
    doc_term = vectorizer.fit_transform(texts)
    # Column sums give the corpus-wide frequency of each n-gram.
    totals = doc_term.sum(axis=0).A1
    table = pd.DataFrame({
        "ngram": vectorizer.get_feature_names_out(),
        "count": totals,
    })
    ranked = table.sort_values("count", ascending=False)
    return ranked.head(top_n)

def plot_ngrams(df_ng, title):
    """Draw a horizontal bar chart of n-gram counts.

    Args:
        df_ng: DataFrame with "ngram" and "count" columns.
        title: Title shown above the chart.
    """
    plt.figure(figsize=(10,6))
    sns.barplot(data=df_ng, x="count", y="ngram", color="#F0D98C")
    plt.title(title)
    plt.xlabel("Frequency")
    plt.ylabel("N-gram")
    plt.tight_layout()
    plt.show()


### Label-wise bigrams & trigrams

In [None]:
# Top bigrams and trigrams for each sentiment label.
for lab in ["Pos", "Neg", "Conf/Neu"]:
    sub = df[df["rating_cat"] == lab]["text_clean"]
    bi = get_top_ngrams(sub, (2,2), 20)
    tri = get_top_ngrams(sub, (3,3), 20)

    # Titles previously contained a mis-decoded em dash ("â€”").
    plot_ngrams(bi,  f"Top 20 Bigrams — {lab}")
    plot_ngrams(tri, f"Top 20 Trigrams — {lab}")


## 6. Lexical richness & complexity

In [None]:
def lexical_stats(group_df):
    """Summarise vocabulary size and richness for a group of reviews.

    Returns a Series with the number of documents, total tokens, distinct
    tokens ("types"), the type-token ratio (0 when there are no tokens) and
    the mean of ``clean_len``. Tokens are the whitespace-split pieces of
    ``text_clean``.
    """
    token_lists = group_df["text_clean"].str.split()
    total_tokens = token_lists.map(len).sum()
    distinct = set().union(*token_lists)
    type_count = len(distinct)

    if total_tokens > 0:
        type_token_ratio = type_count / total_tokens
    else:
        type_token_ratio = 0

    summary = {
        "n_docs": len(group_df),
        "tokens": total_tokens,
        "types": type_count,
        "TTR": type_token_ratio,
        "avg_len": group_df["clean_len"].mean(),
    }
    return pd.Series(summary)


In [None]:
# By label
# One row of lexical_stats output per sentiment label.
lex_by_label = df.groupby("rating_cat").apply(lexical_stats)
print("\nLexical stats by label:")
display(lex_by_label)

# By dataset
# NOTE(review): "dataset" looks like a categorical column here; recent pandas
# versions may warn about the groupby `observed` default — confirm.
lex_by_dataset = df.groupby("dataset").apply(lexical_stats)
print("\nLexical stats by dataset:")
display(lex_by_dataset)


## 7. POS tag distribution (per label)

In [None]:
def get_pos_counts(text, max_tokens=100):
    """Count POS tags over the first ``max_tokens`` tokens of ``text``.

    Returns a Series indexed by tag with the number of occurrences.
    """
    truncated = word_tokenize(text)[:max_tokens]
    tag_series = pd.Series([pos for _word, pos in pos_tag(truncated)])
    return tag_series.value_counts()

def pos_distribution(df_subset, sample_size=2000):
    """Total POS-tag counts over a fixed-seed random sample of ``df_subset``.

    At most ``sample_size`` rows are sampled (random_state=42) to keep
    tagging fast. The result is sorted by count in descending order.
    """
    n_rows = min(len(df_subset), sample_size)
    sampled = df_subset.sample(n_rows, random_state=42)
    # One row of tag counts per document; tags a document lacks become 0.
    per_doc = sampled["text_clean"].apply(get_pos_counts)
    totals = per_doc.fillna(0).sum()
    return totals.sort_values(ascending=False)
# POS-tag totals per sentiment label (columns = labels, rows = tags).
pos_by_label = {}
for lab in ["Pos", "Neg", "Conf/Neu"]:
    pos_by_label[lab] = pos_distribution(df[df["rating_cat"] == lab])

pos_df = pd.DataFrame(pos_by_label).fillna(0)
# Rank rows by overall frequency first: the combined frame's row order is not
# guaranteed to be by count, so a plain head() need not show the top tags.
top_tags = pos_df.sum(axis=1).sort_values(ascending=False).index[:15]
print(pos_df.loc[top_tags])  # top POS tags
# Visualise a few key tags (e.g., adjectives JJ, verbs VB/VBD/VBZ, adverbs RB)
key_tags = ["JJ","JJR","JJS","VB","VBD","VBG","VBN","VBP","VBZ","RB","RBR","RBS"]
# reindex instead of .loc: a rare tag that never occurs in the sample is
# shown as 0 rather than raising KeyError.
subset = pos_df.reindex(key_tags, fill_value=0).T  # rows=labels, cols=tags

subset.plot(kind="bar", figsize=(10,5))
plt.title("POS Distribution (selected tags) by Label")
plt.xlabel("Label")
plt.ylabel("Count (sampled)")
plt.xticks(rotation=0)
plt.tight_layout()
plt.show()


## 8. Embedding-based visualisation (TF-IDF + SVD + t-SNE)

In [None]:
import inspect

# Project a random sample of reviews to 2-D: TF-IDF -> 50-dim SVD -> t-SNE.
# Cap the sample at the frame size; df.sample raises when asked for more rows
# than exist.
sample = df.sample(min(4000, len(df)), random_state=42)
texts = sample["text_clean"].tolist()
labels = sample["rating_cat"].tolist()

tfidf = TfidfVectorizer(max_features=5000, min_df=5)
X = tfidf.fit_transform(texts)

svd = TruncatedSVD(n_components=50, random_state=42)
X_reduced = svd.fit_transform(X)

# scikit-learn 1.5 renamed TSNE's `n_iter` to `max_iter` and later removed the
# old name; pass whichever keyword the installed version accepts.
tsne_iter_arg = (
    "max_iter" if "max_iter" in inspect.signature(TSNE).parameters else "n_iter"
)
tsne = TSNE(n_components=2, perplexity=40, random_state=42,
            **{tsne_iter_arg: 2000})
X_tsne = tsne.fit_transform(X_reduced)

sample["tsne_x"] = X_tsne[:,0]
sample["tsne_y"] = X_tsne[:,1]

plt.figure(figsize=(8,6))
sns.scatterplot(data=sample, x="tsne_x", y="tsne_y",
                hue="rating_cat", hue_order=["Neg","Conf/Neu","Pos"],
                alpha=0.6, s=20)
plt.title("t-SNE of Review Representations (TF-IDF)")
plt.xlabel("")
plt.ylabel("")
plt.legend(title="Label")
plt.tight_layout()
plt.show()


## 9. Outlier check (very long reviews)

In [None]:
# Reviews longer than the 99th percentile of clean_len count as outliers.
q99 = df["clean_len"].quantile(0.99)
print("99th percentile of clean_len:", q99)

outliers = df[df["clean_len"] > q99]
print("Number of outlier reviews:", len(outliers))

# Quick peek
display(outliers[["dataset","entity_type","rating_cat","clean_len","review_text"]].head())


In [None]:
import seaborn as sns

# Label counts per dataset, drawn from final_cleaned_unified. No hue_order is
# given, so the legend order is whatever seaborn derives from the data.
plt.figure(figsize=(10, 6))
sns.countplot(data=final_cleaned_unified, x="dataset", hue="rating_cat")
plt.title("Sentiment Distribution per Dataset")
plt.xlabel("Dataset")
plt.ylabel("Count")
plt.legend(title="Label")
plt.show()


## 10. Topic Modelling using LDA (BoW)

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.decomposition import LatentDirichletAllocation
from nltk.corpus import stopwords
import nltk
nltk.download("stopwords")

# Stopwords
stop_words = set(stopwords.words("english"))

# Add domain-specific stopwords
domain_stopwords = {
    "class", "course", "teacher", "professor", "lecture",
    "exam", "assignment", "student", "work", "quiz",
    "midterm", "final", "mark", "grade"
}

stop_words = stop_words.union(domain_stopwords)

# Convert set → list for CountVectorizer
stop_words = list(stop_words)

# Vectorizer
# Terms must appear in at least 50 documents and in at most 90% of them.
vectorizer = CountVectorizer(
    max_df=0.90,
    min_df=50,
    stop_words=stop_words
)

X = vectorizer.fit_transform(final_cleaned_unified["text_clean"])

# LDA Model
lda = LatentDirichletAllocation(
    n_components=6,
    learning_method="batch",
    random_state=42
)
lda.fit(X)

# Display topics
terms = vectorizer.get_feature_names_out()

# argsort is ascending, so each printed list holds the 15 highest-weight terms
# with the strongest term LAST.
for idx, topic in enumerate(lda.components_):
    print(f"\n TOPIC {idx+1}")
    print([terms[i] for i in topic.argsort()[-15:]])


## 11. Keyword Importance per Sentiment Label (Chi-Square)

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.feature_selection import chi2
# Bag-of-words counts (terms in at least 20 documents) scored against the
# sentiment label with the chi-square statistic.
vec = CountVectorizer(min_df=20)
X = vec.fit_transform(final_cleaned_unified["text_clean"])
y = final_cleaned_unified["rating_cat"]

chi2_scores, p = chi2(X, y)
scores = pd.DataFrame({"term": vec.get_feature_names_out(), "score": chi2_scores})
top_scores = scores.sort_values("score", ascending=False)

# Show the 30 terms with the highest chi-square score.
print(top_scores.head(30))


## 12. Sentiment Label by Review Length

In [None]:
# Cleaned token count per sentiment label. No explicit `order` is passed, so
# the label order on the x-axis is derived from the data.
sns.boxplot(data=final_cleaned_unified, x="rating_cat", y="clean_len")
plt.title("Text Length by Sentiment Label")
plt.xlabel("Label")
plt.ylabel("Cleaned Token Count")
plt.show()

# **Data Preprocessing**

## Feature engineering on final_cleaned_unified → df_fe

In [None]:
# Start feature engineering from a fresh, re-indexed copy.
df_fe = final_cleaned_unified.copy().reset_index(drop=True)

# Integer label encoding: Neg=0, Conf/Neu=1, Pos=2.
label_order = ["Neg", "Conf/Neu", "Pos"]
label2id = {lab: i for i, lab in enumerate(label_order)}
id2label = {i: lab for lab, i in label2id.items()}

df_fe = df_fe[df_fe["rating_cat"].isin(label_order)].reset_index(drop=True)
df_fe["label_id"] = df_fe["rating_cat"].map(label2id).astype(int)

# Whitespace-token count of the cleaned text.
df_fe["clean_len"] = df_fe["text_clean"].str.split().str.len()

def neg_ratio(text):
    """Return the share of whitespace-separated tokens starting with "neg_".

    The input is stringified first; text without tokens yields 0.0.
    """
    tokens = str(text).split()
    if len(tokens) == 0:
        return 0.0
    flagged = [tok for tok in tokens if tok.startswith("neg_")]
    return len(flagged) / len(tokens)

# Hand-crafted numeric features: negation share plus "!" and "?" counts taken
# from the pre-cleaned text.
df_fe["neg_ratio"] = df_fe["text_clean"].apply(neg_ratio)
df_fe["exclam_count"] = df_fe["review_text_pre"].str.count("!")
df_fe["quest_count"]  = df_fe["review_text_pre"].str.count(r"\?")

# One-hot encode the source indicators; drop_first removes one level per column.
df_fe = pd.get_dummies(
    df_fe,
    columns=["dataset", "entity_type"],
    drop_first=True
)

extra_feat_cols = [
    "clean_len",
    "neg_ratio",
    "exclam_count",
    "quest_count",
    "dataset_rmp",
    "dataset_waterloo",
    "entity_type_teacher",
    "entity_type_university",
]
# Keep only the columns that actually exist after one-hot encoding.
extra_feat_cols = [c for c in extra_feat_cols if c in df_fe.columns]

print("Feature-engineering base shape:", df_fe.shape)
print("Extra numeric features:", extra_feat_cols)
print("Label mapping:", label2id)
print(df_fe["rating_cat"].value_counts())
df_fe[extra_feat_cols].describe()

## Stratified train/val/test split (70/15/15) + class weights from train

In [None]:
# Split row positions (not the data itself) so every feature matrix can be
# sliced with the same indices.
y_all = df_fe["label_id"].values
indices = np.arange(len(df_fe))

# First cut: 70% train, 30% held out, stratified by label.
train_idx, temp_idx, y_train, y_temp = train_test_split(
    indices,
    y_all,
    test_size=0.30,
    stratify=y_all,
    random_state=RANDOM_SEED,
)

# Second cut: the held-out 30% is halved into validation and test (15% each).
val_idx, test_idx, y_val, y_test = train_test_split(
    temp_idx,
    y_temp,
    test_size=0.50,
    stratify=y_temp,
    random_state=RANDOM_SEED,
)

print("Train size:", len(train_idx))
print("Val size  :", len(val_idx))
print("Test size :", len(test_idx))

# "balanced" class weights computed from the training labels only.
classes = np.unique(y_train)
weights = compute_class_weight(
    class_weight="balanced",
    classes=classes,
    y=y_train,
)
class_weight_dict = {int(c): float(w) for c, w in zip(classes, weights)}

print("Class weights (train only):", class_weight_dict)
print("id2label:", id2label)


## TF-IDF

In [None]:
# Cleaned text for each split, selected by the split's row indices.
X_train_text = df_fe.loc[train_idx, "text_clean"].astype(str)
X_val_text   = df_fe.loc[val_idx,   "text_clean"].astype(str)
X_test_text  = df_fe.loc[test_idx,  "text_clean"].astype(str)

# Unigram + bigram TF-IDF, capped at 40,000 features.
tfidf_vectorizer = TfidfVectorizer(
    max_features=40000,
    ngram_range=(1, 2),
    min_df=5,
    max_df=0.9,
)

# Fit on the training text only; validation and test are only transformed.
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train_text)
X_val_tfidf   = tfidf_vectorizer.transform(X_val_text)
X_test_tfidf  = tfidf_vectorizer.transform(X_test_text)

print("TF-IDF shapes:")
print("  Train:", X_train_tfidf.shape)
print("  Val  :", X_val_tfidf.shape)
print("  Test :", X_test_tfidf.shape)


## Scale numeric features (train only) + combine with TF-IDF

In [None]:
# Extra numeric features per split, as dense float arrays.
X_meta_train = df_fe.loc[train_idx, extra_feat_cols].astype(float).values
X_meta_val   = df_fe.loc[val_idx,   extra_feat_cols].astype(float).values
X_meta_test  = df_fe.loc[test_idx,  extra_feat_cols].astype(float).values

# Replace NaN with 0 before scaling.
X_meta_train = np.nan_to_num(X_meta_train, nan=0.0)
X_meta_val   = np.nan_to_num(X_meta_val,   nan=0.0)
X_meta_test  = np.nan_to_num(X_meta_test,  nan=0.0)

# Scaler statistics come from the training split only.
scaler = StandardScaler()
X_meta_train_scaled = scaler.fit_transform(X_meta_train)
X_meta_val_scaled   = scaler.transform(X_meta_val)
X_meta_test_scaled  = scaler.transform(X_meta_test)

# Convert to sparse so the block can be stacked next to the TF-IDF matrix.
X_meta_train_sp = csr_matrix(X_meta_train_scaled)
X_meta_val_sp   = csr_matrix(X_meta_val_scaled)
X_meta_test_sp  = csr_matrix(X_meta_test_scaled)

# Final design matrices: TF-IDF columns followed by the scaled extra features.
X_train_combined = hstack([X_train_tfidf, X_meta_train_sp])
X_val_combined   = hstack([X_val_tfidf,   X_meta_val_sp])
X_test_combined  = hstack([X_test_tfidf,  X_meta_test_sp])

print("Combined feature shapes:")
print("  Train:", X_train_combined.shape)
print("  Val  :", X_val_combined.shape)
print("  Test :", X_test_combined.shape)


# **Baseline Models**

## Logistic Regression

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import (
    classification_report, confusion_matrix, ConfusionMatrixDisplay,
    roc_curve, auc, roc_auc_score
)

# Display names used for class ids 0, 1, 2 in every report of this cell.
labels = ["Neg", "Conf/Neu", "Pos"]

# 1) GridSearchCV on Logistic Regression: 5-fold CV over C, selected by macro-F1.
param_grid = {"C": [0.5, 1.0, 2.0]}

log_reg = LogisticRegression(
    solver="lbfgs",
    class_weight="balanced",
    max_iter=2000,
    n_jobs=-1,
)

grid = GridSearchCV(
    log_reg,
    param_grid,
    cv=5,
    scoring="f1_macro",
    n_jobs=-1,
    verbose=1,
)

grid.fit(X_train_combined, y_train)
best_lr = grid.best_estimator_

print("\nBest Hyperparameters:", grid.best_params_)

# 2) + 3) Plain (untuned) predictions and reports for validation and test.
y_val_pred = best_lr.predict(X_val_combined)
y_test_pred = best_lr.predict(X_test_combined)

for _header, _y_true, _y_hat in (
    ("\n=== Logistic Regression (Validation) ===", y_val, y_val_pred),
    ("\n=== Logistic Regression (Test) ===", y_test, y_test_pred),
):
    print(_header)
    print(classification_report(_y_true, _y_hat, target_names=labels))

cm_test = confusion_matrix(y_test, y_test_pred)
print("\nConfusion Matrix (Test):")
print(cm_test)

# 4) THRESHOLD TUNING for Conf/Neu (post-hoc)
# Class-probability matrices of the fitted model for both evaluation splits.
val_probs, test_probs = (
    best_lr.predict_proba(_X) for _X in (X_val_combined, X_test_combined)
)

def predict_with_alpha_probs(probs, alpha_conf, class_idx=1):
    """Boost one class's probability by ``alpha_conf``, renormalise, and predict.

    Parameters
    ----------
    probs : array-like of shape (n_samples, n_classes)
        Class probabilities. Any array-like is accepted; it is never modified.
    alpha_conf : float
        Multiplicative factor applied to the boosted class column.
    class_idx : int, default=1
        Column to boost. The default (1) is the Conf/Neu class used by the
        callers in this notebook.

    Returns
    -------
    numpy.ndarray of shape (n_samples,)
        Index of the highest adjusted probability per row.
    """
    # Float copy: leaves the caller's data untouched and prevents the scaled
    # column from being truncated when an integer array is passed in.
    p = np.array(probs, dtype=float)
    p[:, class_idx] *= alpha_conf
    p /= p.sum(axis=1, keepdims=True)
    return np.argmax(p, axis=1)

# Imported here because this cell's import list does not include f1_score.
from sklearn.metrics import f1_score

# Candidate boost factors for the Conf/Neu probability. The winner is the one
# with the highest macro-F1 on the validation split, i.e. the same metric that
# selected C in the grid search and that the reports below show. (ROC-AUC on
# hard one-hot predictions is not a ranking metric and does not track F1.)
alphas = [1.0, 1.2, 1.4, 1.6, 2.0]
best_macro, best_alpha = -1, 1.0

for a in alphas:
    y_val_adj = predict_with_alpha_probs(val_probs, a)
    macro = f1_score(y_val, y_val_adj, average="macro")
    print(f"alpha={a:.1f} -> macro-F1 (val): {macro:.4f}")
    # Strict '>' keeps the smallest alpha when several candidates tie.
    if macro > best_macro:
        best_macro = macro
        best_alpha = a

print("\nBest alpha:", best_alpha, " | Best macro-F1 (val):", best_macro)

# Apply the selected boost to both splits. The validation numbers are
# optimistic because alpha was tuned on that same split.
y_val_final = predict_with_alpha_probs(val_probs, best_alpha)
y_test_final = predict_with_alpha_probs(test_probs, best_alpha)

print("\n=== Logistic Regression (Validation, tuned threshold) ===")
print(classification_report(y_val, y_val_final, target_names=labels))

print("\n=== Logistic Regression (Test, tuned threshold) ===")
print(classification_report(y_test, y_test_final, target_names=labels))

# 5) Plot Confusion Matrix (Test, tuned)
# ConfusionMatrixDisplay.plot() opens its own figure unless it is given an
# axes; passing `ax` makes the figsize take effect and avoids a stray empty
# figure in the notebook output.
fig, ax = plt.subplots(figsize=(6, 5))
disp = ConfusionMatrixDisplay(confusion_matrix(y_test, y_test_final), display_labels=labels)
disp.plot(ax=ax, cmap="Blues", values_format="d")
ax.set_title("Logistic Regression - Confusion Matrix (Test, tuned)")
plt.show()

# 6) ROC curves (multiclass, one-vs-rest, untuned probs)
y_test_prob = best_lr.predict_proba(X_test_combined)
y_test_bin = np.eye(3)[y_test]  # one-hot rows selected by the integer labels

plt.figure(figsize=(8, 6))
for i, _class_label in enumerate(labels):
    # One-vs-rest curve: class i against all other classes.
    fpr, tpr, _ = roc_curve(y_test_bin[:, i], y_test_prob[:, i])
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, label=f"{_class_label} (AUC={roc_auc:.3f})")

plt.plot([0, 1], [0, 1], "k--")  # chance diagonal
plt.title("Logistic Regression - ROC (Test)")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.grid(True)
plt.legend()
plt.show()

macro_auc = roc_auc_score(y_test_bin, y_test_prob, average="macro")
print("\nMacro ROC-AUC (Test):", macro_auc)


In [None]:
from scipy.sparse import hstack, csr_matrix
# Three hand-written reviews used as a smoke test of the fitted model.
example_texts = [
    "The lectures are poorly structured, I rarely understand the topics and the feedback is always unclear. Overall, the module is frustrating and overwhelming.",
    "The experience with the module is neutral. It does not feel positive or negative, and the overall impression remains evenly balanced without any strong sentiment.",
    "The lecturer explains everything clearly, the examples are helpful, and the content is extremely engaging. I genuinely enjoy attending this module every week.",
]


def basic_clean(text):
    """Return ``text`` lower-cased with surrounding whitespace removed."""
    lowered = text.lower()
    return lowered.strip()

# Lower-case / trim every example before vectorising.
example_clean = list(map(basic_clean, example_texts))

# 1) TF-IDF features for the examples
X_example_tfidf = tfidf_vectorizer.transform(example_clean)

# 2) Work out how many non-TF-IDF columns best_lr was trained with
n_features_model = best_lr.n_features_in_
n_features_tfidf = X_example_tfidf.shape[1]
n_extra = n_features_model - n_features_tfidf

if n_features_tfidf > n_features_model:
    raise ValueError(
        f"TF-IDF has more features ({n_features_tfidf}) than model was trained on ({n_features_model})."
    )

# The extra features are not recomputed for these ad-hoc texts; zeros are used
# as placeholders. NOTE(review): the training-time extras were standardised,
# so a zero presumably stands for "training mean" - confirm this is intended.
extra_example = csr_matrix(np.zeros((len(example_clean), n_extra)))

# Same column layout as X_train_combined: TF-IDF first, extras last.
X_example_combined = hstack([X_example_tfidf, extra_example])

# 3) Predict with tuned thresholds
example_probs = best_lr.predict_proba(X_example_combined)
example_pred_idx = predict_with_alpha_probs(example_probs, best_alpha)

print("\n=== REAL-TIME PREDICTIONS (Logistic Regression) ===")
for _raw_text, _pred, _row in zip(example_texts, example_pred_idx, example_probs):
    print("\nText:", _raw_text)
    print("Predicted label:", labels[_pred])
    print("Class probabilities [Neg, Conf/Neu, Pos]:", np.round(_row, 3))


In [None]:
# ==========================================
# REAL-TIME PREDICTIONS + RISK (Logistic Regression)
# ==========================================
import numpy as np
from scipy.sparse import hstack, csr_matrix

# Display names for class ids 0, 1, 2.
label_names = ["Neg", "Conf/Neu", "Pos"]

# Demo reviews, one written for each sentiment.
example_texts = [
    "The lectures are poorly structured and I rarely understand the topics. Overall, this module feels frustrating.",
    "Some weeks are clear, some are confusing. Overall it's okay but I am not fully confident.",
    "The lecturer is amazing, explains everything clearly and the module is very engaging.",
]

# Predicted sentiment -> risk tier shown in the demo output.
risk_map = dict(zip(label_names, ("High Risk", "Medium Risk", "Low Risk")))

def make_combined_from_text_lr(texts):
    """Vectorise ``texts`` and zero-pad to the width ``best_lr`` was trained on.

    Returns a sparse matrix whose leading columns are the TF-IDF features and
    whose remaining ``best_lr.n_features_in_ - n_tfidf`` columns are all zero.

    Raises
    ------
    ValueError
        If the TF-IDF matrix is wider than the model's input.
    """
    tfidf_part = tfidf_vectorizer.transform(texts)
    n_rows, n_tfidf = tfidf_part.shape
    n_model = best_lr.n_features_in_
    if n_tfidf > n_model:
        raise ValueError(f"TF-IDF has {n_tfidf} features, model expects {n_model}.")
    # An empty sparse matrix of the right shape is an all-zero block.
    zero_pad = csr_matrix((n_rows, n_model - n_tfidf))
    return hstack([tfidf_part, zero_pad])

# TF-IDF → combined → LogReg
X_example_lr = make_combined_from_text_lr(example_texts)
example_probs_lr = best_lr.predict_proba(X_example_lr)
# Plain argmax here: no alpha boost is applied in this demo.
example_pred_lr = example_probs_lr.argmax(axis=1)

print("=== REAL-TIME PREDICTIONS (Logistic Regression) ===")
for _raw_text, _pred, _row in zip(example_texts, example_pred_lr, example_probs_lr):
    label = label_names[_pred]
    risk = risk_map[label]
    print("\nText:", _raw_text)
    print("Predicted label:", label)
    print("Risk Level:", risk)
    print("Class probabilities [Neg, Conf/Neu, Pos]:", np.round(_row, 3))


### XAI 1 — Logistic Regression Coefficients & Odds Ratios

In [None]:
# XAI 1 — Logistic Regression Coefficients & Odds Ratios
import numpy as np
import pandas as pd

# TF-IDF feature names
feature_names_tfidf = tfidf_vectorizer.get_feature_names_out()

# Align feature names with model's feature dimension
n_model = best_lr.n_features_in_
n_tfidf = len(feature_names_tfidf)
n_extra = n_model - n_tfidf

if n_extra > 0:
    # best_lr was fitted on [TF-IDF | scaled df_fe[extra_feat_cols]], so the
    # trailing columns are those metadata features in that order. Use their
    # real names so the tables are interpretable; fall back to placeholders
    # only if the counts unexpectedly disagree.
    if len(extra_feat_cols) == n_extra:
        extra_names = np.array([str(col) for col in extra_feat_cols], dtype=object)
    else:
        extra_names = np.array(
            [f"EXTRA_FEAT_{i+1}" for i in range(n_extra)], dtype=object
        )
    feature_names = np.concatenate([feature_names_tfidf, extra_names])
else:
    feature_names = feature_names_tfidf[:n_model]

# One coefficient row per class.
coefs = best_lr.coef_

# Long-format table: one row per (class, feature) with the coefficient and
# exp(coef), the multiplicative change in that class's odds per unit increase.
coef_df_list = [
    pd.DataFrame({
        "feature": feature_names,
        "coef": coefs[i],
        "odds_ratio": np.exp(coefs[i]),
        "class": class_name,
    })
    for i, class_name in enumerate(labels)
]

coef_df = pd.concat(coef_df_list, ignore_index=True)

# Show top + bottom features per class
for class_name in labels:
    print(f"Class: {class_name}")
    df_sub = coef_df[coef_df["class"] == class_name]

    print("\nTop 15 POSITIVE features (push towards this class):")
    display(df_sub.sort_values("coef", ascending=False).head(15))

    print("\nTop 15 NEGATIVE features (push away from this class):")
    display(df_sub.sort_values("coef", ascending=True).head(15))


## XAI 2 — Logistic Regression LIME

In [None]:
# FIXED LIME (Local)
from lime.lime_text import LimeTextExplainer
from scipy.sparse import hstack, csr_matrix
import numpy as np

# shape-safe transformer
def make_combined_from_text(texts):
    """Vectorise ``texts`` and zero-pad to the width ``best_lr`` was trained on.

    LIME calls this with large batches of perturbed strings, so the padding is
    created directly as an empty sparse block rather than from a dense array.

    Parameters
    ----------
    texts : sequence of str
        Raw strings to vectorise with ``tfidf_vectorizer``.

    Returns
    -------
    scipy sparse matrix of shape (len(texts), best_lr.n_features_in_)
        TF-IDF columns followed by all-zero columns for the extra features.

    Raises
    ------
    ValueError
        If the TF-IDF matrix is wider than the model's input.
    """
    X_tfidf = tfidf_vectorizer.transform(texts)

    n_model = best_lr.n_features_in_
    n_tfidf = X_tfidf.shape[1]
    n_extra = n_model - n_tfidf
    if n_extra < 0:
        # Same guard and message as make_combined_from_text_lr.
        raise ValueError(f"TF-IDF has {n_tfidf} features, model expects {n_model}.")

    extra = csr_matrix((X_tfidf.shape[0], n_extra))
    return hstack([X_tfidf, extra])

def predict_proba_lime(texts):
    """Classifier function for LIME: raw strings -> ``best_lr`` class probabilities."""
    features = make_combined_from_text(texts)
    return best_lr.predict_proba(features)

# Text explainer labelled with the three class names.
explainer = LimeTextExplainer(class_names=labels)

# Validation example to explain (positional index into the validation split).
example_idx = 10
text_example = X_val_text.iloc[example_idx]
true_label = labels[y_val[example_idx]]

print("TEXT:", text_example)
print("TRUE LABEL:", true_label)

# Local explanation limited to the 10 most influential tokens.
exp = explainer.explain_instance(text_example, predict_proba_lime, num_features=10)

exp.show_in_notebook(text=True)


## Support Vector Machine (SVM)

In [None]:
# ================================
# SVM with TF-IDF (TRAIN + EVAL)
# ================================
import numpy as np
import matplotlib.pyplot as plt

from sklearn.svm import LinearSVC
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import (
    classification_report, confusion_matrix, ConfusionMatrixDisplay,
    roc_curve, auc, roc_auc_score
)
from sklearn.preprocessing import label_binarize
from sklearn.calibration import CalibratedClassifierCV

# Label names, ordered by class id
label_names = [id2label[k] for k in sorted(id2label)]

# ------------------------------------------------------------------
# 1) Train Linear SVM on TF-IDF ONLY (the scaled extra features are not used)
# ------------------------------------------------------------------
svm_base = LinearSVC(
    max_iter=3000,
    class_weight=class_weight_dict,
    random_state=RANDOM_SEED,
)

# Single-point grid: effectively a 3-fold CV fit at C=1.0, kept fast on purpose.
svm_param_grid = {"C": [1.0]}

svm_grid = GridSearchCV(
    svm_base,
    svm_param_grid,
    cv=3,
    scoring="f1_macro",
    n_jobs=-1,
    verbose=1,
)

svm_grid.fit(X_train_tfidf, y_train)
best_svm = svm_grid.best_estimator_

print("Best SVM hyperparameters:", svm_grid.best_params_)

# ------------------------------------------------------------------
# 2) Validation + Test reports (hard predictions of the uncalibrated SVM)
# ------------------------------------------------------------------
y_val_pred_svm, y_test_pred_svm = (
    best_svm.predict(_X) for _X in (X_val_tfidf, X_test_tfidf)
)

for _header, _y_true, _y_hat in (
    ("\n=== Linear SVM (val, tuned) ===", y_val, y_val_pred_svm),
    ("=== Linear SVM (test, tuned) ===", y_test, y_test_pred_svm),
):
    print(_header)
    print(classification_report(_y_true, _y_hat, target_names=label_names))

# ------------------------------------------------------------------
# 3) Confusion matrix on test
# ------------------------------------------------------------------
cm_svm = confusion_matrix(y_test, y_test_pred_svm)
print("\nConfusion matrix (SVM, test):")
print(cm_svm)

# ConfusionMatrixDisplay.plot() opens its own figure unless it is given an
# axes; passing `ax` makes the figsize take effect and avoids a stray empty
# figure in the notebook output.
fig, ax = plt.subplots(figsize=(5, 4))
disp = ConfusionMatrixDisplay(confusion_matrix=cm_svm, display_labels=label_names)
disp.plot(ax=ax, cmap="Purples", values_format="d")
ax.set_title("Linear SVM – Confusion Matrix (Test)")  # mis-encoded dash repaired
fig.tight_layout()
plt.show()

# ------------------------------------------------------------------
# 4) Calibrated probabilities + ROC curves
# ------------------------------------------------------------------
# LinearSVC has no predict_proba; wrap it in a sigmoid calibrator fitted with
# 3-fold cross-validation on the training split.
svm_calibrated = CalibratedClassifierCV(best_svm, method="sigmoid", cv=3)
svm_calibrated.fit(X_train_tfidf, y_train)

y_val_proba_svm = svm_calibrated.predict_proba(X_val_tfidf)
y_test_proba_svm = svm_calibrated.predict_proba(X_test_tfidf)

print("Probabilities shapes (val, test):", y_val_proba_svm.shape, y_test_proba_svm.shape)

# Binarize labels for ROC (one indicator column per class id)
y_val_bin = label_binarize(y_val, classes=[0, 1, 2])
y_test_bin = label_binarize(y_test, classes=[0, 1, 2])

# ROC on validation: one one-vs-rest curve per class
plt.figure(figsize=(6, 5))
for i, name in enumerate(label_names):
    fpr, tpr, _ = roc_curve(y_val_bin[:, i], y_val_proba_svm[:, i])
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, label=f"{name} (AUC={roc_auc:.2f})")

plt.plot([0, 1], [0, 1], "k--", alpha=0.5)  # chance diagonal
plt.title("SVM (Calibrated) – ROC Curve (Validation)")  # mis-encoded dash repaired
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend()
plt.tight_layout()
plt.show()

# Macro AUC
svm_macro_auc_val = roc_auc_score(y_val_bin, y_val_proba_svm, average="macro")
svm_macro_auc_test = roc_auc_score(y_test_bin, y_test_proba_svm, average="macro")

print("SVM (calibrated) macro AUC (val): ", svm_macro_auc_val)
print("SVM (calibrated) macro AUC (test):", svm_macro_auc_test)


In [None]:
# ==========================================
# REAL-TIME PREDICTIONS + RISK (Linear SVM Calibrated)
# ==========================================
# Demo reviews, one written for each sentiment.
example_texts = [
    "The lectures are poorly structured and I rarely understand the topics. Overall, this module feels frustrating.",
    "Some weeks are clear, some are confusing. Overall it's okay but I am not fully confident.",
    "The lecturer is amazing, explains everything clearly and the module is very engaging."
]

# Predicted sentiment -> risk tier. The keys must match the entries of
# label_names, which this demo indexes with the predicted class id.
risk_map = {
    "Neg": "High Risk",
    "Conf/Neu": "Medium Risk",
    "Pos": "Low Risk"
}

# TF-IDF → Calibrated SVM
X_example_svm = tfidf_vectorizer.transform(example_texts)
example_probs_svm = svm_calibrated.predict_proba(X_example_svm)
example_pred_svm = np.argmax(example_probs_svm, axis=1)

# Header text repaired: the dash was mis-encoded in the original string.
print("=== REAL-TIME PREDICTIONS (Linear SVM – Calibrated) ===")
for text, idx, prob in zip(example_texts, example_pred_svm, example_probs_svm):
    label = label_names[idx]
    risk = risk_map[label]
    print("\nText:", text)
    print("Predicted label:", label)
    print("Risk Level:", risk)
    print("Class probabilities [Neg, Conf/Neu, Pos]:", np.round(prob, 3))


In [None]:
# ==========================================
# XAI 1 — Global Feature Importance (SVM)
# ==========================================
import pandas as pd

# Feature names from TF-IDF (the SVM was trained on TF-IDF columns only)
feature_names = tfidf_vectorizer.get_feature_names_out()

coef = best_svm.coef_  # shape: (n_classes, n_features)


def top_features_for_class(class_idx, k=15):
    """Return the ``k`` largest and ``k`` smallest SVM weights for one class.

    Parameters
    ----------
    class_idx : int
        Row of ``coef`` / position in ``label_names``.
    k : int, default=15
        Number of features in each returned table.

    Returns
    -------
    (top_pos, top_neg) : tuple of pandas.DataFrame
        ``top_pos`` is ordered from the largest weight down, ``top_neg`` from
        the smallest weight up. Columns: feature, coef, odds_ratio, class.
    """
    class_name = label_names[class_idx]
    w = coef[class_idx]
    ascending = np.argsort(w)

    def _table(picked):
        # NOTE(review): LinearSVC weights are margin weights rather than
        # log-odds, so exp(coef) is not a true odds ratio here; the column is
        # kept to mirror the logistic-regression table.
        return pd.DataFrame({
            "feature": feature_names[picked],
            "coef": w[picked],
            "odds_ratio": np.exp(w[picked]),
            "class": class_name,
        })

    top_pos = _table(ascending[-k:][::-1])
    top_neg = _table(ascending[:k])
    return top_pos, top_neg

# Show both tables for every class, in class-id order.
for i, name in enumerate(label_names):
    print("\n===================================")
    # The bullet was mis-encoded in the original string; restored here.
    print(f"🔹 Class: {name}")
    print("===================================\n")

    pos_df, neg_df = top_features_for_class(i, k=15)

    print("Top 15 POSITIVE features (push towards this class):\n")
    display(pos_df)

    print("\nTop 15 NEGATIVE features (push away from this class):\n")
    display(neg_df)


In [None]:
# ==========================================================
# XAI 2 — LIME + REAL-TIME DEMO + RISK LEVEL (SVM)
# ==========================================================
from lime.lime_text import LimeTextExplainer

# 1) Prediction function for LIME (uses TF-IDF + calibrated SVM)
def predict_proba_lime(texts):
    """Classifier function for LIME: raw strings -> calibrated SVM probabilities.

    Note: this rebinds the name used by the logistic-regression LIME cell.
    """
    tfidf_features = tfidf_vectorizer.transform(texts)
    return svm_calibrated.predict_proba(tfidf_features)

explainer_svm = LimeTextExplainer(class_names=label_names)

# Validation example to explain (change the index to inspect another one)
example_idx = 10
text_example = X_val_text.iloc[example_idx]

# y_val is a NumPy array, so cast the element to a plain int before indexing
true_label = label_names[int(y_val[example_idx])]

print(f"Explaining text (index {example_idx})")
print("True label:", true_label)
print("\nText:\n", text_example)

# Request explanations for every class, not only the top-predicted one
exp_svm = explainer_svm.explain_instance(
    text_example,
    predict_proba_lime,
    labels=list(range(len(label_names))),
    num_features=10,
)

# Token contributions, one section per class
for class_idx, class_name in enumerate(label_names):
    print(f"\n--- LIME explanation for {class_name} ---")
    for word, weight in exp_svm.as_list(label=class_idx):
        print(f"{word:25s} {weight:+.4f}")

# Render HTML in notebook
exp_svm.show_in_notebook(text=True)


# ----------------------------------------------------------
# 2) REAL-TIME PREDICTIONS + RISK LEVEL (using the same SVM)
# ----------------------------------------------------------
example_texts = [
    "The lectures are poorly structured and I rarely understand the topics. Overall, this module feels frustrating.",
    "Some weeks are clear, some are confusing, overall it's okay but I am not fully confident.",
    "The lecturer is amazing, explains everything clearly and the module is very engaging.",
]

# Predicted sentiment -> risk tier
risk_map = {"Neg": "High Risk", "Conf/Neu": "Medium Risk", "Pos": "Low Risk"}

# Calibrated probabilities and the arg-max class for every example
X_example = tfidf_vectorizer.transform(example_texts)
example_probs = svm_calibrated.predict_proba(X_example)
example_pred_idx = example_probs.argmax(axis=1)

print("\n=== REAL-TIME PREDICTIONS WITH RISK LEVEL (Linear SVM) ===")
for _raw_text, _pred, _row in zip(example_texts, example_pred_idx, example_probs):
    label = label_names[_pred]
    risk = risk_map[label]
    print("\nText:", _raw_text)
    print("Predicted label:", label)
    print("Risk Level:", risk)
    print("Class probabilities [Neg, Conf/Neu, Pos]:", np.round(_row, 3))


# **DEEP LEARNING SETUP**

In [None]:
# ANN – SVD features + training
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from sklearn.decomposition import TruncatedSVD

# Seed NumPy and TensorFlow for this section.
RANDOM_SEED = 42
tf.random.set_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

# 1) Reduce TF-IDF dimensionality for ANN
n_components = 300   # can be changed, e.g. to 200 or 400
svd_ann = TruncatedSVD(n_components=n_components, random_state=RANDOM_SEED)

# The projection is learned on the training split only and then applied to
# validation and test.
X_train_ann = svd_ann.fit_transform(X_train_tfidf)
X_val_ann, X_test_ann = (svd_ann.transform(_X) for _X in (X_val_tfidf, X_test_tfidf))

print("SVD shapes:")
for _split, _matrix in (
    ("Train", X_train_ann),
    ("Val  ", X_val_ann),
    ("Test ", X_test_ann),
):
    print(f"  {_split}:", _matrix.shape)

# Network input width and number of output classes.
num_features = n_components
num_classes = 3

# 2) Build a compact ANN: two ReLU hidden layers with dropout, softmax output
_ann_layers = [
    layers.Input(shape=(num_features,)),
    layers.Dense(256, activation='relu'),
    layers.Dropout(0.3),
    layers.Dense(128, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(num_classes, activation='softmax'),
]
ann_model = models.Sequential(_ann_layers)

# Sparse categorical cross-entropy: the targets are integer class ids.
ann_model.compile(
    loss="sparse_categorical_crossentropy",
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
    metrics=["accuracy"],
)

ann_model.summary()

# 3) Train
# Stop as soon as validation loss fails to improve for one epoch and roll
# back to the best weights seen.
callback = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=1,
    restore_best_weights=True
)

# class_weight applies the train-only class weights so the ANN handles class
# imbalance the same way as the other models in this notebook (the BiLSTM and
# CNN fits pass the same dict).
history_ann = ann_model.fit(
    X_train_ann, y_train,
    validation_data=(X_val_ann, y_val),
    epochs=6,
    batch_size=256,
    class_weight=class_weight_dict,
    callbacks=[callback],
    verbose=1
)


In [None]:
# ANN – EVAL + LIME + RISK DEMO
import matplotlib.pyplot as plt
from sklearn.metrics import (
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay,
    roc_curve, auc, roc_auc_score
)
from sklearn.preprocessing import label_binarize
from lime.lime_text import LimeTextExplainer

# Display names for class ids 0, 1, 2.
label_names = ["Neg", "Conf/Neu", "Pos"]

# 1) Predictions (val + test): softmax probabilities, then arg-max class ids
y_val_proba_ann = ann_model.predict(X_val_ann)
y_test_proba_ann = ann_model.predict(X_test_ann)

y_val_pred_ann = y_val_proba_ann.argmax(axis=1)
y_test_pred_ann = y_test_proba_ann.argmax(axis=1)

for _header, _y_true, _y_hat in (
    ("=== ANN (val) ===", y_val, y_val_pred_ann),
    ("=== ANN (test) ===", y_test, y_test_pred_ann),
):
    print(_header)
    print(classification_report(_y_true, _y_hat, target_names=label_names))

# 2) Confusion matrix (test)
cm_ann = confusion_matrix(y_test, y_test_pred_ann)
print("\nConfusion matrix (ANN, test):")
print(cm_ann)

# ConfusionMatrixDisplay.plot() opens its own figure unless it is given an
# axes; passing `ax` makes the figsize take effect and avoids a stray empty
# figure in the notebook output.
fig, ax = plt.subplots(figsize=(5, 4))
disp = ConfusionMatrixDisplay(confusion_matrix=cm_ann, display_labels=label_names)
disp.plot(ax=ax, cmap="Greens", values_format="d")
ax.set_title("ANN – Confusion Matrix (Test)")  # mis-encoded dash repaired
fig.tight_layout()
plt.show()

# 3) ROC (macro AUC)
# One indicator column per class id, as needed for one-vs-rest curves.
y_val_bin = label_binarize(y_val, classes=[0, 1, 2])
y_test_bin = label_binarize(y_test, classes=[0, 1, 2])

plt.figure(figsize=(6, 5))
for i, name in enumerate(label_names):
    fpr, tpr, _ = roc_curve(y_val_bin[:, i], y_val_proba_ann[:, i])
    roc_auc = auc(fpr, tpr)
    plt.plot(fpr, tpr, label=f"{name} (AUC={roc_auc:.2f})")

plt.plot([0, 1], [0, 1], "k--", alpha=0.5)  # chance diagonal
plt.title("ANN – ROC Curve (Validation)")  # mis-encoded dash repaired
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend()
plt.tight_layout()
plt.show()

ann_macro_auc_val = roc_auc_score(y_val_bin, y_val_proba_ann, average="macro")
ann_macro_auc_test = roc_auc_score(y_test_bin, y_test_proba_ann, average="macro")
print("ANN macro AUC (val): ", ann_macro_auc_val)
print("ANN macro AUC (test):", ann_macro_auc_test)

# 4) LIME for ANN – local explanation on one review
def predict_proba_ann_lime(texts):
    """Classifier function for LIME: raw strings -> TF-IDF -> SVD -> ANN probabilities."""
    svd_features = svd_ann.transform(tfidf_vectorizer.transform(texts))
    return ann_model.predict(svd_features)

explainer_ann = LimeTextExplainer(class_names=label_names)

# Validation example to explain (positional index into the validation split).
example_idx = 10
text_example = X_val_text.iloc[example_idx]
true_label = label_names[int(y_val[example_idx])]

print(f"\nExplaining text (index {example_idx})")
print("True label:", true_label)
print("\nText:\n", text_example)

# Local explanation limited to the 10 most influential tokens.
exp_ann = explainer_ann.explain_instance(
    text_example, predict_proba_ann_lime, num_features=10
)

exp_ann.show_in_notebook(text=True)

# 5) REAL-TIME DEMO + RISK
example_texts = [
    "The lectures are poorly structured and I rarely understand the topics. Overall, this module feels frustrating.",
    "Some weeks are clear, some are confusing, overall it's okay but I am not fully confident.",
    "The lecturer is amazing, explains everything clearly and the module is very engaging.",
]

# Predicted sentiment -> risk tier
risk_map = {"Neg": "High Risk", "Conf/Neu": "Medium Risk", "Pos": "Low Risk"}

# Same feature pipeline as training: TF-IDF, then the fitted SVD projection.
X_example_tfidf = tfidf_vectorizer.transform(example_texts)
X_example_ann = svd_ann.transform(X_example_tfidf)
example_probs = ann_model.predict(X_example_ann)
example_pred = example_probs.argmax(axis=1)

print("\n=== REAL-TIME PREDICTIONS WITH RISK LEVEL (ANN) ===")
for _raw_text, _pred, _row in zip(example_texts, example_pred, example_probs):
    label = label_names[_pred]
    risk = risk_map[label]
    print("\nText:", _raw_text)
    print("Predicted label:", label)
    print("Risk Level:", risk)
    print("Class probabilities [Neg, Conf/Neu, Pos]:", np.round(_row, 3))


In [None]:
# ANN – REAL-TIME PREDICTIONS + RISK LEVEL
import numpy as np

# Display names for class ids 0, 1, 2.
label_names = ["Neg", "Conf/Neu", "Pos"]

# Predicted sentiment -> risk tier
risk_map = dict(zip(label_names, ("High Risk", "Medium Risk", "Low Risk")))

# Example raw reviews (swap in other texts for a live demo)
example_texts = [
    "The lectures are poorly structured and I rarely understand the topics. Overall, this module feels frustrating.",
    "Some weeks are clear, some are confusing, overall it's okay but I am not fully confident.",
    "The lecturer is amazing, explains everything clearly and the module is very engaging.",
]


def basic_clean(t):
    """Return ``t`` lower-cased with surrounding whitespace removed."""
    lowered = t.lower()
    return lowered.strip()

# 1) Text → TF-IDF → SVD features (same pipeline as training)
example_clean = list(map(basic_clean, example_texts))
X_ex_tfidf = tfidf_vectorizer.transform(example_clean)
X_ex_ann = svd_ann.transform(X_ex_tfidf)

# 2) ANN probabilities + predicted labels
probs_ann = ann_model.predict(X_ex_ann)
pred_idx = probs_ann.argmax(axis=1)

print("\n=== REAL-TIME PREDICTIONS WITH RISK LEVEL (ANN) ===")
for _raw_text, _pred, _row in zip(example_texts, pred_idx, probs_ann):
    label = label_names[_pred]
    risk = risk_map[label]
    print("\nText:", _raw_text)
    print("Predicted label:", label)
    print("Risk Level:", risk)
    print("Class probabilities [Neg, Conf/Neu, Pos]:", np.round(_row, 3))


In [None]:
# TRAINING (BiLSTM)

import numpy as np
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Reuse the seed, split indices and label dictionaries created earlier
np.random.seed(RANDOM_SEED)
tf.random.set_seed(RANDOM_SEED)

# Text splits (same rows as the classical models), now as plain Python lists.
# Note: this rebinds X_*_text from pandas Series to lists.
X_train_text, X_val_text, X_test_text = (
    df_fe.loc[_idx, "text_clean"].astype(str).tolist()
    for _idx in (train_idx, val_idx, test_idx)
)

# Independent copies of the label arrays for the deep-learning models.
y_train_dl, y_val_dl, y_test_dl = y_train.copy(), y_val.copy(), y_test.copy()

# Tokenisation / padding
MAX_WORDS = 30000   # vocabulary cap passed to the tokenizer
MAX_LEN = 150       # every sequence is padded/truncated to this many tokens

# The word index is fitted on the training texts only.
tokenizer = Tokenizer(num_words=MAX_WORDS, oov_token="<UNK>")
tokenizer.fit_on_texts(X_train_text)

X_train_seq, X_val_seq, X_test_seq = (
    tokenizer.texts_to_sequences(_texts)
    for _texts in (X_train_text, X_val_text, X_test_text)
)

# Pad and truncate at the end of each sequence ("post").
X_train_pad, X_val_pad, X_test_pad = (
    pad_sequences(_seq, maxlen=MAX_LEN, padding="post", truncating="post")
    for _seq in (X_train_seq, X_val_seq, X_test_seq)
)

# Embedding input size: index 0 is reserved, hence the +1, capped at MAX_WORDS.
vocab_size = min(MAX_WORDS, len(tokenizer.word_index) + 1)
print("Vocab size:", vocab_size)
print("Train seq shape:", X_train_pad.shape)
print("Val seq shape  :", X_val_pad.shape)
print("Test seq shape :", X_test_pad.shape)
print("id2label:", id2label)
print("class_weight_dict:", class_weight_dict)

# ---------- BiLSTM model ----------

def build_bilstm_model(vocab_size, max_len, n_classes=3, emb_dim=128, lstm_units=64):
    """Build and compile the BiLSTM text classifier.

    Architecture: Embedding -> Bidirectional LSTM (full sequence output) ->
    global max pooling -> Dense(64, relu) -> Dropout(0.3) -> softmax.

    Parameters
    ----------
    vocab_size : int
        Size of the embedding table (``input_dim`` of the Embedding layer).
    max_len : int
        Length of the padded input sequences.
    n_classes : int, default=3
        Number of output classes.
    emb_dim : int, default=128
        Embedding dimension.
    lstm_units : int, default=64
        Units per LSTM direction.

    Returns
    -------
    A compiled Keras ``Sequential`` model (Adam, lr=1e-3, sparse categorical
    cross-entropy, accuracy metric).
    """
    model = models.Sequential([
        # An explicit Input layer builds the model up front. Passing
        # `input_shape=` to the first layer is deprecated in current Keras;
        # this also matches how the ANN in this notebook declares its input.
        layers.Input(shape=(max_len,)),
        layers.Embedding(input_dim=vocab_size, output_dim=emb_dim),
        layers.Bidirectional(layers.LSTM(lstm_units, return_sequences=True)),
        layers.GlobalMaxPooling1D(),
        layers.Dense(64, activation="relu"),
        layers.Dropout(0.3),
        layers.Dense(n_classes, activation="softmax"),
    ])

    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        metrics=["accuracy"],
    )
    return model

bilstm_model = build_bilstm_model(vocab_size, MAX_LEN, n_classes=len(label2id))

# Build explicitly so that summary() can report parameter counts
bilstm_model.build(input_shape=(None, MAX_LEN))
bilstm_model.summary()

# File that receives the best model (lowest validation loss).
bilstm_ckpt = "bilstm_best.keras"

# Stop after 3 epochs without a val_loss improvement and restore the best weights.
es = EarlyStopping(monitor="val_loss", patience=3, restore_best_weights=True, verbose=1)

# Save to disk only when val_loss improves.
mc = ModelCheckpoint(filepath=bilstm_ckpt, monitor="val_loss", save_best_only=True, verbose=1)

history_bilstm = bilstm_model.fit(
    X_train_pad,
    y_train_dl,
    validation_data=(X_val_pad, y_val_dl),
    batch_size=128,
    epochs=10,
    class_weight=class_weight_dict,  # train-only class weights
    callbacks=[es, mc],
    verbose=1,
)


In [None]:
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from sklearn.preprocessing import label_binarize

# Reload the best BiLSTM weights saved by the checkpoint callback
bilstm_model.load_weights(bilstm_ckpt)

# Class names ordered by class id
label_names = [id2label[i] for i in range(len(label2id))]

# The dashes in the printed headers and plot titles below were mis-encoded
# in the original strings and are repaired here.

# ===== Validation =====
y_val_probs_bilstm = bilstm_model.predict(X_val_pad, batch_size=256, verbose=1)
y_val_pred_bilstm = np.argmax(y_val_probs_bilstm, axis=1)

print("=== BiLSTM – Validation ===")
print(classification_report(y_val_dl, y_val_pred_bilstm, target_names=label_names))

cm_val_bilstm = confusion_matrix(y_val_dl, y_val_pred_bilstm)
plt.figure(figsize=(5, 4))
sns.heatmap(cm_val_bilstm, annot=True, fmt="d",
            xticklabels=label_names, yticklabels=label_names)
plt.title("BiLSTM – Validation Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.tight_layout()
plt.show()

# ===== Test =====
y_test_probs_bilstm = bilstm_model.predict(X_test_pad, batch_size=256, verbose=1)
y_test_pred_bilstm = np.argmax(y_test_probs_bilstm, axis=1)

print("\n=== BiLSTM – Test ===")
print(classification_report(y_test_dl, y_test_pred_bilstm, target_names=label_names))

cm_test_bilstm = confusion_matrix(y_test_dl, y_test_pred_bilstm)
plt.figure(figsize=(5, 4))
sns.heatmap(cm_test_bilstm, annot=True, fmt="d",
            xticklabels=label_names, yticklabels=label_names)
plt.title("BiLSTM – Test Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.tight_layout()
plt.show()

# ===== ROC Curve (multiclass, test set) =====
y_test_true = y_test_dl
y_test_bin = label_binarize(y_test_true, classes=[0, 1, 2])
fpr, tpr, roc_auc = {}, {}, {}

# One-vs-rest ROC curve and AUC for each class id
for i in [0, 1, 2]:
    fpr[i], tpr[i], _ = roc_curve(y_test_bin[:, i], y_test_probs_bilstm[:, i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Macro-average curve: interpolate every class curve onto the union of all
# FPR points, then average the three TPR curves.
all_fpr = np.unique(np.concatenate([fpr[i] for i in [0, 1, 2]]))
mean_tpr = np.zeros_like(all_fpr)
for i in [0, 1, 2]:
    mean_tpr += np.interp(all_fpr, fpr[i], tpr[i])
mean_tpr /= 3
roc_auc_macro = auc(all_fpr, mean_tpr)

plt.figure(figsize=(8, 6))
colors = ["red", "orange", "green"]
for i, c in zip([0, 1, 2], colors):
    plt.plot(fpr[i], tpr[i], c, lw=2,
             label=f"{id2label[i]} AUC={roc_auc[i]:.3f}")

plt.plot(all_fpr, mean_tpr, "b--", lw=2,
         label=f"Macro AUC={roc_auc_macro:.3f}")
plt.plot([0, 1], [0, 1], "k--")  # chance diagonal
plt.xlim([0, 1])
plt.ylim([0, 1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve — BiLSTM (Multiclass)")  # mis-encoded dash repaired
plt.legend(loc="lower right")
plt.grid(alpha=0.3)
plt.show()


In [None]:
# BiLSTM — LIME TEXT XAI + REAL-TIME PREDICTIONS + RISK
from lime.lime_text import LimeTextExplainer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np

# Display names for class ids 0, 1, 2.
label_names = ["Neg", "Conf/Neu", "Pos"]

# Predicted sentiment -> risk tier
risk_map = dict(zip(label_names, ("High Risk", "Medium Risk", "Low Risk")))

# Padded sequence length; has to equal the value used when training the BiLSTM.
MAX_LEN = 150

# ---------- 1) LIME wrapper for BiLSTM ----------
def bilstm_predict_proba(texts):
    """Classifier function for LIME: raw strings -> padded ids -> BiLSTM probabilities."""
    token_ids = tokenizer.texts_to_sequences(texts)
    # Same padding/truncation settings as the training data.
    padded = pad_sequences(token_ids, maxlen=MAX_LEN, padding="post", truncating="post")
    return bilstm_model.predict(padded)

explainer_bilstm = LimeTextExplainer(class_names=label_names)

# Pick one validation example to explain (change the index to inspect another).
# X_val_text is a plain list at this point, so it is indexed directly.
example_idx = 10
y_val_array = np.array(y_val_dl)
text_example = X_val_text[example_idx]
true_label = label_names[int(y_val_array[example_idx])]

# Header text repaired: the dash was mis-encoded in the original string.
print(f"=== BiLSTM — LIME EXPLANATION (val index {example_idx}) ===")
print("True label:", true_label)
print("\nText:\n", text_example)

# Local explanation limited to the 10 most influential tokens.
exp_bilstm = explainer_bilstm.explain_instance(
    text_example,
    bilstm_predict_proba,
    num_features=10
)

# HTML view with highlighted tokens
exp_bilstm.show_in_notebook(text=True)

# ---------- 2) REAL-TIME PREDICTIONS + RISK (BiLSTM) ----------
example_texts = [
    "The lectures are poorly structured and I rarely understand the topics. Overall, this module feels frustrating.",
    "Some weeks are clear, some are confusing, overall it's okay but I am not fully confident.",
    "The lecturer is amazing, explains everything clearly and the module is very engaging.",
]

# Same preprocessing as training: token ids, then post-padding to MAX_LEN.
seqs_ex = tokenizer.texts_to_sequences(example_texts)
pads_ex = pad_sequences(seqs_ex, maxlen=MAX_LEN, padding="post", truncating="post")
probs_ex = bilstm_model.predict(pads_ex)
pred_idx = probs_ex.argmax(axis=1)

print("\n=== REAL-TIME PREDICTIONS + RISK (BiLSTM) ===")
for _raw_text, _pred, _row in zip(example_texts, pred_idx, probs_ex):
    label = label_names[int(_pred)]
    risk = risk_map[label]
    print("\nText:", _raw_text)
    print("Predicted label:", label)
    print("Risk level:", risk)
    print("Class probabilities [Neg, Conf/Neu, Pos]:", np.round(_row, 3))


In [None]:
#  CNN MODEL: BUILD + TRAIN

import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint

# Seed TensorFlow and NumPy with the notebook-wide RANDOM_SEED.
tf.random.set_seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

# Number of output classes, taken from the size of the id2label mapping.
n_classes = len(id2label)

def build_cnn_model(vocab_size, max_len, n_classes=3,
                    emb_dim=128, num_filters=128, kernel_size=3):
    """Build and compile a single-convolution text classifier.

    Architecture: Embedding -> Conv1D(relu) -> GlobalMaxPooling1D ->
    Dense(64, relu) -> Dropout(0.3) -> Dense(n_classes, softmax).

    Args:
        vocab_size: Size of the embedding vocabulary (``input_dim``).
        max_len: Length of the padded input sequences.
        n_classes: Number of softmax output units.
        emb_dim: Embedding vector size.
        num_filters: Number of Conv1D filters.
        kernel_size: Conv1D window width, in tokens.

    Returns:
        A compiled Keras ``Sequential`` model (Adam, lr=1e-3,
        sparse categorical cross-entropy, accuracy metric).

    Raises:
        ValueError: If ``kernel_size`` is larger than ``max_len``, which would
            leave the unpadded convolution with no valid output position.
    """
    if kernel_size > max_len:
        raise ValueError(
            f"kernel_size ({kernel_size}) must not exceed max_len ({max_len})"
        )

    model = models.Sequential([
        # Declare the input with an explicit Input layer: passing `input_shape=`
        # to the first layer is deprecated in Keras 3 and emits a warning there.
        layers.Input(shape=(max_len,), dtype="int32"),
        layers.Embedding(
            input_dim=vocab_size,
            output_dim=emb_dim,
        ),
        layers.Conv1D(
            filters=num_filters,
            kernel_size=kernel_size,
            activation="relu",
        ),
        layers.GlobalMaxPooling1D(),
        layers.Dense(64, activation="relu"),
        layers.Dropout(0.3),
        layers.Dense(n_classes, activation="softmax"),
    ])

    # Sparse loss: targets are integer class ids rather than one-hot vectors.
    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer=tf.keras.optimizers.Adam(learning_rate=1e-3),
        metrics=["accuracy"],
    )
    return model

# Instantiate the CNN with the shared vocabulary size, padded length and class count.
cnn_model = build_cnn_model(
    vocab_size=vocab_size,
    max_len=MAX_LEN,
    n_classes=n_classes
)

# Build explicitly for (batch, MAX_LEN) inputs before printing the layer summary.
cnn_model.build(input_shape=(None, MAX_LEN))
cnn_model.summary()

# Path that the ModelCheckpoint callback below writes to.
cnn_ckpt = "cnn_best.keras"

# Stop after 3 epochs without val_loss improvement and restore the best weights.
es_cnn = EarlyStopping(
    monitor="val_loss",
    patience=3,
    restore_best_weights=True,
    verbose=1,
)

# Write cnn_ckpt only when val_loss improves.
mc_cnn = ModelCheckpoint(
    filepath=cnn_ckpt,
    monitor="val_loss",
    save_best_only=True,
    verbose=1,
)

# Train for up to 10 epochs (batch size 128), validating on the validation split and
# applying class_weight_dict; both callbacks above are attached.
history_cnn = cnn_model.fit(
    X_train_pad,
    y_train_dl,
    validation_data=(X_val_pad, y_val_dl),
    epochs=10,
    batch_size=128,
    class_weight=class_weight_dict,
    callbacks=[es_cnn, mc_cnn],
    verbose=1,
)


In [None]:
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
from sklearn.preprocessing import label_binarize

# Class names in id order, shared by both reports and both heatmaps.
cnn_class_names = [id2label[k] for k in (0, 1, 2)]

# ---- Validation split: predictions, report, confusion matrix ----
y_val_probs_cnn = cnn_model.predict(X_val_pad, batch_size=256, verbose=1)
y_val_pred_cnn = np.argmax(y_val_probs_cnn, axis=1)

print("=== CNN â€“ Validation ===")
print(classification_report(
    y_val_dl,
    y_val_pred_cnn,
    target_names=cnn_class_names,
))

cm_val = confusion_matrix(y_val_dl, y_val_pred_cnn)
plt.figure(figsize=(5,4))
sns.heatmap(
    cm_val,
    annot=True,
    fmt="d",
    xticklabels=cnn_class_names,
    yticklabels=cnn_class_names,
)
plt.title("CNN â€“ Validation Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.tight_layout()
plt.show()

# ---- Test split: same steps as for validation ----
y_test_probs_cnn = cnn_model.predict(X_test_pad, batch_size=256, verbose=1)
y_test_pred_cnn = np.argmax(y_test_probs_cnn, axis=1)

print("\n=== CNN â€“ Test ===")
print(classification_report(
    y_test_dl,
    y_test_pred_cnn,
    target_names=cnn_class_names,
))

cm_test = confusion_matrix(y_test_dl, y_test_pred_cnn)
plt.figure(figsize=(5,4))
sns.heatmap(
    cm_test,
    annot=True,
    fmt="d",
    xticklabels=cnn_class_names,
    yticklabels=cnn_class_names,
)
plt.title("CNN â€“ Test Confusion Matrix")
plt.xlabel("Predicted")
plt.ylabel("True")
plt.tight_layout()
plt.show()

# ROC Curve: one-vs-rest curve per class on the test split, plus a macro average.
y_bin = label_binarize(y_test_dl, classes=[0,1,2])
fpr, tpr, roc_auc = {}, {}, {}

for i in range(3):
    fpr[i], tpr[i], _ = roc_curve(y_bin[:,i], y_test_probs_cnn[:,i])
    roc_auc[i] = auc(fpr[i], tpr[i])

# Macro average: interpolate each class curve onto the union of all FPR points,
# add the three curves up, then divide by the number of classes.
all_fpr = np.unique(np.concatenate([fpr[0], fpr[1], fpr[2]]))
mean_tpr = np.zeros_like(all_fpr)
mean_tpr = mean_tpr + np.interp(all_fpr, fpr[0], tpr[0])
mean_tpr = mean_tpr + np.interp(all_fpr, fpr[1], tpr[1])
mean_tpr = mean_tpr + np.interp(all_fpr, fpr[2], tpr[2])
mean_tpr = mean_tpr / 3
roc_auc_macro = auc(all_fpr, mean_tpr)

plt.figure(figsize=(8,6))
colors = ["red","orange","green"]
labels = [id2label[0], id2label[1], id2label[2]]

for i, c in enumerate(colors):
    plt.plot(fpr[i], tpr[i], c, lw=2, label=f"{labels[i]} AUC={roc_auc[i]:.3f}")

plt.plot(all_fpr, mean_tpr, "b--", lw=2, label=f"Macro AUC={roc_auc_macro:.3f}")
plt.plot([0,1],[0,1],"k--")  # chance diagonal
plt.xlim([0,1])
plt.ylim([0,1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("ROC Curve â€” CNN (Multiclass)")
plt.legend(loc="lower right")
plt.grid(alpha=0.3)
plt.show()


In [None]:
# CNN â€” LIME TEXT XAI + REAL-TIME PREDICTIONS + RISK
from lime.lime_text import LimeTextExplainer
from tensorflow.keras.preprocessing.sequence import pad_sequences
import numpy as np

# Class names in class-id order, and the risk tier reported for each of them.
label_names = ["Neg", "Conf/Neu", "Pos"]
risk_map = {
    "Neg": "High Risk",
    "Conf/Neu": "Medium Risk",
    "Pos": "Low Risk",
}

# Padded sequence length; same value as used for CNN training.
MAX_LEN = 150

# ---------- 1) LIME wrapper for CNN ----------
def cnn_predict_proba(texts):
    """Run the CNN on raw strings and return its prediction array.

    The texts are converted to integer sequences with the global ``tokenizer``,
    post-padded / post-truncated to ``MAX_LEN``, and passed to
    ``cnn_model.predict``. Used as the classifier function handed to LIME.
    """
    padded_batch = pad_sequences(
        tokenizer.texts_to_sequences(texts),
        maxlen=MAX_LEN,
        padding="post",
        truncating="post",
    )
    return cnn_model.predict(padded_batch)

# LIME text explainer; class names are reported in the order given by label_names.
explainer_cnn = LimeTextExplainer(class_names=label_names)

# Pick one validation example to explain
example_idx = 10  # change if you want
y_val_array = np.array(y_val_dl)
# NOTE(review): the text is fetched with [] on X_val_text while the label is fetched
# positionally from a NumPy array. If X_val_text is a pandas Series with a non-default
# index these may not address the same row — TODO confirm.
text_example = X_val_text[example_idx]
true_label  = label_names[int(y_val_array[example_idx])]

print(f"=== CNN â€” LIME EXPLANATION (val index {example_idx}) ===")
print("True label:", true_label)
print("\nText:\n", text_example)

# Explain the chosen text using cnn_predict_proba as the classifier function,
# keeping the 10 most influential features.
exp_cnn = explainer_cnn.explain_instance(
    text_example,
    cnn_predict_proba,
    num_features=10
)

# Render the explanation inline, including the highlighted text.
exp_cnn.show_in_notebook(text=True)

# ---------- 2) REAL-TIME PREDICTIONS + RISK (CNN) ----------
# Three hand-written feedback sentences to score with the CNN.
example_texts = [
    "The lectures are poorly structured and I rarely understand the topics. Overall, this module feels frustrating.",
    "Some weeks are clear, some are confusing, overall it's okay but I am not fully confident.",
    "The lecturer is amazing, explains everything clearly and the module is very engaging."
]

# Same preprocessing as cnn_predict_proba: sequences -> post-padded to MAX_LEN.
seqs_ex = tokenizer.texts_to_sequences(example_texts)
pads_ex = pad_sequences(
    seqs_ex,
    maxlen=MAX_LEN,
    padding="post",
    truncating="post",
)
probs_ex = cnn_model.predict(pads_ex)
pred_idx = np.argmax(probs_ex, axis=1)  # highest-scoring class per row

print("\n=== REAL-TIME PREDICTIONS + RISK (CNN) ===")
for row, text in enumerate(example_texts):
    idx = pred_idx[row]
    prob = probs_ex[row]
    label = label_names[int(idx)]
    risk = risk_map[label]  # predicted label -> risk tier
    print("\nText:", text)
    print("Predicted label:", label)
    print("Risk level:", risk)
    print("Class probabilities [Neg, Conf/Neu, Pos]:", np.round(prob, 3))


# **DistilBERT**

In [None]:
!pip install -q transformers datasets scikit-learn

import numpy as np
import torch
import matplotlib.pyplot as plt

from datasets import Dataset, DatasetDict
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    DataCollatorWithPadding,
    TrainingArguments,
    Trainer
)
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    classification_report,
    confusion_matrix,
    ConfusionMatrixDisplay
)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print("Using device:", device)

# 1) Build HF datasets (subsampled for speed)
# Materialise texts as plain lists and labels as plain Python ints.
X_train_list, X_val_list, X_test_list = (
    list(X_train_text),
    list(X_val_text),
    list(X_test_text),
)

y_train_list = list(map(int, y_train))
y_val_list = list(map(int, y_val))
y_test_list = list(map(int, y_test))

# Caps: at most 12000 train / 3000 val / 3000 test rows, taken from the front.
train_n = min(12000, len(X_train_list))
val_n = min(3000, len(X_val_list))
test_n = min(3000, len(X_test_list))

train_ds = Dataset.from_dict(
    {"text": X_train_list[:train_n], "label": y_train_list[:train_n]}
)
val_ds = Dataset.from_dict(
    {"text": X_val_list[:val_n], "label": y_val_list[:val_n]}
)
test_ds = Dataset.from_dict(
    {"text": X_test_list[:test_n], "label": y_test_list[:test_n]}
)

# Bundle the three splits under the names used by the rest of the section.
raw_datasets = DatasetDict(
    {"train": train_ds, "validation": val_ds, "test": test_ds}
)

print(f"Train: {len(train_ds)}, Val: {len(val_ds)}, Test: {len(test_ds)}")

# 2) Tokenisation
model_name = "distilbert-base-uncased"
# NOTE(review): this rebinds the global name `tokenizer`, which bilstm_predict_proba and
# cnn_predict_proba also read (they call tokenizer.texts_to_sequences). Re-running those
# helpers after this cell will presumably fail — confirm, or give one of the two
# tokenizers a distinct name.
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tokenize_fn(batch):
    """Tokenise ``batch["text"]``, truncating to 128 tokens and adding no padding."""
    return tokenizer(
        batch["text"],
        truncation=True,
        padding=False,
        max_length=128,
    )

# Tokenise every split in batches; padding is left to the collator created below.
tokenized_datasets = raw_datasets.map(tokenize_fn, batched=True)
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

# 3) Model + metrics + Trainer
def compute_metrics(eval_pred):
    """Return accuracy, macro-F1 and weighted-F1 for a ``(logits, labels)`` pair.

    The predicted class for each row is the argmax over the last logits axis.
    """
    logits, gold = eval_pred
    guessed = np.argmax(logits, axis=-1)

    acc = accuracy_score(gold, guessed)
    macro = f1_score(gold, guessed, average="macro")
    weighted = f1_score(gold, guessed, average="weighted")

    return {"accuracy": acc, "macro_f1": macro, "weighted_f1": weighted}

# Pretrained checkpoint with a 3-way sequence-classification head.
model = AutoModelForSequenceClassification.from_pretrained(
    model_name,
    num_labels=3,
)

# One epoch at lr 2e-5; save_steps=5000 sets the checkpoint interval, and
# mixed precision is enabled only when running on CUDA.
training_args = TrainingArguments(
    output_dir="distilbert-final",
    num_train_epochs=1,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    learning_rate=2e-5,
    weight_decay=0.01,
    logging_steps=100,
    save_steps=5000,
    fp16=(device == "cuda"),
    report_to="none",
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    # `tokenizer=` is deprecated since transformers 4.46 (the minimum version this
    # notebook installs) in favour of `processing_class=`.
    processing_class=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

# 4) Train
print("\n=== Training DistilBERT (final) ===")
trainer.train()

In [None]:
# 5) Validation + Test evaluation
# -------------------------
# Class names in id order. Defined here so this cell does not depend on a `labels`
# variable left behind by whichever other cell happened to run before it.
labels = ["Neg", "Conf/Neu", "Pos"]

print("\n=== DistilBERT â€“ Validation metrics ===")
val_metrics = trainer.evaluate(tokenized_datasets["validation"])
print(val_metrics)

print("\n=== DistilBERT â€“ Test metrics ===")
test_metrics = trainer.evaluate(tokenized_datasets["test"])
print(test_metrics)

# Detailed classification report on test set
print("\n=== DistilBERT â€“ Test classification report ===")
pred_test = trainer.predict(tokenized_datasets["test"])
y_test_pred = np.argmax(pred_test.predictions, axis=-1)
# The test dataset was built from the first test_n rows, so slice the labels to match.
print(classification_report(y_test_list[:test_n], y_test_pred, target_names=labels))

# Confusion matrix
cm = confusion_matrix(y_test_list[:test_n], y_test_pred)
# Draw on an explicitly created axes: ConfusionMatrixDisplay.plot() opens its own
# figure when no `ax` is given, which left the plt.figure() call as an empty extra figure.
fig, ax = plt.subplots(figsize=(6, 5))
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)
disp.plot(ax=ax, cmap="Blues", values_format="d")
plt.title("DistilBERT â€“ Confusion Matrix (Test)")
plt.show()

In [None]:
# SHAP XAI for DistilBERT â€“ word-level contribution + predictions

!pip install -q shap

import shap
import numpy as np

# Put the fine-tuned classifier on the active device in inference mode.
model.to(device)
model.eval()

# Class names in id order; also used as SHAP output names further down.
labels = ["Neg", "Conf/Neu", "Pos"]

# 1) Prediction function for SHAP
def distilbert_predict_proba(text_list):
    """Return a NumPy array of softmax class probabilities for ``text_list``.

    The texts are tokenised as one padded batch (truncated to 128 tokens),
    moved to ``device`` and run through ``model`` with gradients disabled.
    """
    batch = tokenizer(
        list(text_list),
        truncation=True,
        padding=True,
        max_length=128,
        return_tensors="pt",
    )
    batch = batch.to(device)

    with torch.no_grad():
        logits = model(**batch).logits
        class_probs = torch.softmax(logits, dim=-1)
        probs = class_probs.cpu().numpy()
    return probs

# 2) SHAP text explainer
# Text masker built from the tokenizer; the explainer calls distilbert_predict_proba
# and names its outputs after `labels`.
masker = shap.maskers.Text(tokenizer)
explainer = shap.Explainer(distilbert_predict_proba, masker, output_names=labels)

# 3) Pick 3 test examples: one Neg, one Conf/Neu, one Pos (if available)
# Arrays restricted to the first test_n rows, i.e. the rows the test dataset was built from.
y_test_arr = np.array(y_test_list[:test_n])
X_test_arr = np.array(X_test_list[:test_n])

def first_index_of_class(c):
    """Return the first position in ``y_test_arr`` equal to ``c``, or ``None`` if absent."""
    hits = np.where(y_test_arr == c)[0]
    if len(hits) == 0:
        return None
    return int(hits[0])

# First test-set position of each class id (None when a class is absent).
idx_neg  = first_index_of_class(0)
idx_conf = first_index_of_class(1)
idx_pos  = first_index_of_class(2)

# Drop classes that were not found, then gather the matching texts and true label names.
example_indices = [i for i in [idx_neg, idx_conf, idx_pos] if i is not None]
example_texts   = [X_test_arr[i] for i in example_indices]
example_true    = [labels[y_test_arr[i]] for i in example_indices]

# 4) Get actual model predictions for these examples
probs = distilbert_predict_proba(example_texts)
pred_ids = probs.argmax(axis=1)
pred_labels = [labels[i] for i in pred_ids]

print("=== DistilBERT predictions on selected test examples ===")
for i, txt in enumerate(example_texts):
    print(f"\nExample {i+1}")
    print("-" * 40)
    print("Text        :", txt)
    print("True label  :", example_true[i])
    print("Pred label  :", pred_labels[i])
    print("Probabilities (Neg, Conf/Neu, Pos):",
          np.round(probs[i], 3))

# 5) SHAP explanations (word-level heatmaps)
print("\nGenerating SHAP explanations (this may take ~1â€“2 minutes)...")
# max_evals=500 is passed through to the explainer call.
shap_values = explainer(example_texts, max_evals=500)

# One SHAP text plot per selected example, headed by its true and predicted label.
for i, txt in enumerate(example_texts):
    print(f"\n=== SHAP text explanation for Example {i+1} "
          f"(true: {example_true[i]}, pred: {pred_labels[i]}) ===")
    shap.plots.text(shap_values[i])


In [None]:
# DistilBERT - Risk mapping helper + example predictions

# Class names in id order, the id -> name lookup derived from them, and the
# risk tier reported for each class name.
labels = ["Neg", "Conf/Neu", "Pos"]
id2label = {i: lab for i, lab in enumerate(labels)}
risk_map = {
    "Neg": "High risk",
    "Conf/Neu": "Medium risk",
    "Pos": "Low risk",
}

def distilbert_predict_with_risk(text):
    """Classify one feedback string and attach the matching risk tier.

    Args:
        text: Raw feedback text; tokenised with truncation to 128 tokens.

    Returns:
        A dict with keys ``"text"`` (the input), ``"label"`` (predicted class
        name), ``"risk"`` (``risk_map`` entry for that class) and ``"probs"``
        (class name -> softmax probability rounded to 3 decimals, as plain
        Python floats).
    """
    enc = tokenizer(
        [text],
        truncation=True,
        padding=True,
        max_length=128,
        return_tensors="pt"
    ).to(device)

    with torch.no_grad():
        out = model(**enc)
        # Single-item batch: keep only row 0 of the probability matrix.
        probs = torch.softmax(out.logits, dim=-1).cpu().numpy()[0]

    pred_id = int(np.argmax(probs))
    pred_label = id2label[pred_id]
    pred_risk = risk_map[pred_label]

    return {
        "text": text,
        "label": pred_label,
        "risk": pred_risk,
        # Plain floats rather than NumPy scalars: under NumPy 2 the latter print
        # as `np.float32(...)`, which clutters the printed dict.
        "probs": {name: round(float(p), 3) for name, p in zip(labels, probs)},
    }

# Example texts (you can swap with real ones)
examples = [
    "I felt completely lost in this module and the support was terrible.",
    "Some lectures were useful but overall I am still unsure about the content.",
    "The lecturer was very supportive and the explanations were clear and engaging."
]

print("=== DistilBERT risk-aware predictions ===")
# Score each example (numbered from 1) and print its label, risk tier and probabilities.
for i, t in enumerate(examples, 1):
    res = distilbert_predict_with_risk(t)
    print(f"\nExample {i}")
    print("Text :", res["text"])
    print("Pred :", res["label"], "| Risk:", res["risk"])
    print("Probs:", res["probs"])


# GPT Fine Tuning

In [None]:
!pip install -q transformers>=4.46.0 accelerate>=1.1.0 peft datasets

import torch, math
from datasets import Dataset
from transformers import (
    AutoTokenizer, AutoModelForCausalLM,
    TrainingArguments, Trainer,
    DataCollatorForLanguageModeling
)
from peft import LoraConfig, get_peft_model, TaskType

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# 1. Build small training corpus (FAST)
corpus = list(X_train_text[:1000])  # Fast subset
train_ds = Dataset.from_dict({"text": corpus})

# 2. Load GPT-2 + LoRA
model_name = "gpt2"
tokenizer = AutoTokenizer.from_pretrained(model_name)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

# NOTE(review): on CUDA the weights are loaded in float16 and the Trainer below also
# sets fp16=True. Whether that combination trains cleanly presumably depends on the
# installed peft/transformers versions — confirm, or load in float32.
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
).to(device)

# LoRA adapter settings: rank 16, alpha 32, dropout 0.05, causal-LM task type.
lora_cfg = LoraConfig(
    r=16, lora_alpha=32, lora_dropout=0.05,
    task_type=TaskType.CAUSAL_LM,
)
# Wrap the base model with the adapter configuration.
model = get_peft_model(model, lora_cfg)


# 3. Tokenise
def tok(batch):
    """Tokenise ``batch["text"]``, truncated and padded to exactly 128 tokens."""
    return tokenizer(
        batch["text"],
        truncation=True,
        padding="max_length",
        max_length=128
    )

# Tokenise in batches and expose only input_ids / attention_mask as torch tensors.
train_tok = train_ds.map(tok, batched=True)
train_tok.set_format(type="torch", columns=["input_ids", "attention_mask"])

# Language-modelling collator with masked-LM turned off (mlm=False).
data_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

# 4. TrainingArguments
# One epoch, per-device batch size 2 with 8 gradient-accumulation steps, lr 2e-4;
# mixed precision only when running on CUDA.
training_args = TrainingArguments(
    output_dir="gpt2-lora-lm",
    num_train_epochs=1,
    per_device_train_batch_size=2,
    gradient_accumulation_steps=8,
    learning_rate=2e-4,
    logging_steps=20,
    save_steps=10_000,
    fp16=(device == "cuda"),
    report_to="none"
)

# No eval_dataset is configured; evaluation below passes the dataset explicitly.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_tok,
    data_collator=data_collator,
)

# 5. Train
print("\n=== Training GPT-2 LoRA ===")
trainer.train()
print("Training Done!")

# 6. Compute LM Loss + Perplexity on training data
# Evaluated on the same tokenised set that was trained on, so this is not a held-out score.
eval_res = trainer.evaluate(train_tok)
loss = eval_res["eval_loss"]
ppl  = math.exp(loss)  # perplexity = exp(evaluation loss)

print("\n===== GPT-2 LM METRICS =====")
print(f"Cross-Entropy Loss : {loss:.4f}")
print(f"Perplexity (PPL)   : {ppl:.4f}")


In [None]:
!pip install -q transformers>=4.46.0 accelerate>=1.1.0 peft datasets

import torch, math
from datasets import Dataset
from transformers import (
    AutoTokenizer, AutoModelForCausalLM,
    TrainingArguments, Trainer,
    DataCollatorForLanguageModeling
)
from peft import LoraConfig, get_peft_model, TaskType

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Using device:", device)

# 1. Collect a LARGER training corpus
# At most the first 12000 training texts.
max_samples = min(12000, len(X_train_text))
corpus = list(X_train_text[:max_samples])

train_ds = Dataset.from_dict({"text": corpus})

# 2. Load GPT-2 + LoRA
model_name = "gpt2"

tokenizer = AutoTokenizer.from_pretrained(model_name)
# Reuse the end-of-sequence token as the padding token.
tokenizer.pad_token = tokenizer.eos_token

model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16 if device == "cuda" else torch.float32,
).to(device)

# LoRA adapter settings: rank 16, alpha 32, dropout 0.05, no bias terms, causal-LM task type.
lora_cfg = LoraConfig(
    r=16,
    lora_alpha=32,
    lora_dropout=0.05,
    bias="none",
    task_type=TaskType.CAUSAL_LM,
)  # closing parenthesis was missing, which made the whole cell a SyntaxError

# Wrap the base model with the adapter configuration.
model = get_peft_model(model, lora_cfg)

# 3. Tokenisation
def tok(batch):
    """Tokenise ``batch["text"]``, truncated and padded to exactly 128 tokens."""
    return tokenizer(
        batch["text"],
        truncation=True,
        padding="max_length",
        max_length=128
    )

# Tokenise in batches and expose only input_ids / attention_mask as torch tensors.
train_tok = train_ds.map(tok, batched=True)
train_tok.set_format(type="torch", columns=["input_ids", "attention_mask"])

# Language-modelling collator with masked-LM turned off (mlm=False).
data_collator = DataCollatorForLanguageModeling(tokenizer, mlm=False)

# 4. Training arguments - tuned for <10min & low perplexity
# Two epochs, per-device batch size 4 with 8 gradient-accumulation steps, lr 2e-4 with
# 100 warm-up steps; mixed precision only when running on CUDA.
training_args = TrainingArguments(
    output_dir="gpt2-lora-lm-highquality",
    num_train_epochs=2,
    per_device_train_batch_size=4,
    gradient_accumulation_steps=8,
    learning_rate=2e-4,
    warmup_steps=100,
    logging_steps=50,
    save_steps=5000,
    fp16=(device == "cuda"),
    report_to="none",
)

# No eval_dataset is configured; evaluation below passes the dataset explicitly.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_tok,
    data_collator=data_collator,
)

# 5. Train (approx. 8-10 minutes on T4 GPU)
print("\n=== Training GPT-2 LoRA (High Quality) ===")
trainer.train()
print("Training completed!")

# 6. Evaluate LM Loss + Perplexity
# Evaluated on the same tokenised set that was trained on, so this is not a held-out score.
eval_res = trainer.evaluate(train_tok)
loss = eval_res["eval_loss"]
ppl = math.exp(loss)  # perplexity = exp(evaluation loss)

print("\n===== GPT-2 LM METRICS =====")
print(f"Cross-Entropy Loss : {loss:.4f}")
print(f"Perplexity (PPL)   : {ppl:.4f}")


In [None]:
# Feedback sentences used to spot-check the label predictor.
texts = [
    "Some lectures were helpful, but I felt unsure about what was expected.",
    "The module was okay and the workload was manageable.",
    "I am not fully confident about the content but it was not bad.",
]

# NOTE(review): gpt2_lora_predict_label is not defined in the visible part of this
# notebook — confirm it exists before running this cell.
for t in texts:
    # Three values are unpacked; only pred_id and raw are printed, `full` is unused.
    pred_id, raw, full = gpt2_lora_predict_label(t)
    print("TEXT:", t)
    print("PRED :", id2label[pred_id], "(raw:", raw, ")")
    print()


In [None]:
# Keyword cues for the fallback path. They are anchored at a word start so that
# "unclear" is no longer counted as the positive cue "clear", while inflected
# forms such as "clearly", "loved" or "enjoyable" still match.
_POSITIVE_CUES = re.compile(r"\b(?:excellent|great|supportive|clear|enjoy|love)")
_NEGATIVE_CUES = re.compile(r"\b(?:worst|bad|confusing|lost|unhelpful|poor)")

# Tokens accepted from the model, mapped onto the notebook's three label names.
_LABEL_ALIASES = {
    "Pos": "Pos",
    "Neg": "Neg",
    "Conf": "Conf/Neu",
    "Conf/Neu": "Conf/Neu",
}


def _keyword_label(text):
    """Return "Pos", "Neg" or "Conf/Neu" from keyword cues in *text* (positive cues win)."""
    lowered = text.lower()
    if _POSITIVE_CUES.search(lowered):
        return "Pos"
    if _NEGATIVE_CUES.search(lowered):
        return "Neg"
    return "Conf/Neu"


def predict_sentiment(text):
    """Label one piece of student feedback as "Pos", "Neg" or "Conf/Neu".

    The text is wrapped in an instruction-style prompt and GPT-2 generates up to
    five new tokens after "### Label:". If the first generated token is a
    recognised label it is used; otherwise a keyword heuristic over the input
    text decides.

    Args:
        text: Raw feedback text.

    Returns:
        ``(final, raw)``: the resolved label, and the first token the model
        generated after the label marker ("" if it generated nothing usable).
    """
    prompt = (
        "### Instruction:\n"
        "Classify the emotional tone of the following student feedback.\n\n"
        "### Review:\n"
        f"{text}\n\n"
        "### Label:\n"
    )

    inputs = tokenizer(prompt, return_tensors="pt").to(model.device)

    with torch.no_grad():
        out = model.generate(
            **inputs,
            max_new_tokens=5,
            # Greedy decoding: sampling made the predicted label vary between
            # calls on the same input, which is not acceptable for a classifier.
            do_sample=False,
            pad_token_id=tokenizer.eos_token_id,
        )

    decoded = tokenizer.decode(out[0], skip_special_tokens=True)
    # Keep only what follows the last label marker, then its first token
    # with surrounding punctuation stripped.
    after = decoded.split("### Label:")[-1].strip()
    raw = after.split()[0].strip('":,.') if after else ""

    # Normalise "Conf" to the full class name so callers see only three labels.
    final = _LABEL_ALIASES.get(raw) or _keyword_label(text)

    return final, raw


# Hand-written feedback keyed by the label each sentence is meant to illustrate.
# The apostrophes were mis-encoded ("â€™"); they are restored so the model is not
# fed garbage characters.
examples = {
    "Positive": "Loved this module! The lecturer was clear, supportive, and made the content enjoyable.",
    "Negative": "This was the worst class I’ve taken. The feedback was unhelpful and I felt lost.",
    "Conf/Neu": "Some lectures were helpful, but other times I wasn’t sure what was expected."
}

# Print the resolved label and the raw first token generated for each example.
for label, text in examples.items():
    pred, raw = predict_sentiment(text)
    print(f"\n {label} Example ")
    print("Text:", text)
    print("Predicted:", pred, "| Raw token:", raw)
