**Benchmarking Five Transformer Models (BERT, DistilBERT, RoBERTa, XLNet, and ALBERT) for Question Classification**

In [None]:
import torch
import random
import pandas as pd
import numpy as np
import os
import json
import matplotlib.pyplot as plt
import seaborn as sns
import joblib

from collections import defaultdict
from torch.utils.data import DataLoader, Subset
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import classification_report, accuracy_score, precision_score, recall_score, f1_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
from sklearn.preprocessing import LabelEncoder
from transformers import (
    BertTokenizer, BertForSequenceClassification,
    DistilBertTokenizer, DistilBertForSequenceClassification,
    RobertaTokenizer, RobertaForSequenceClassification,
    XLNetTokenizer, XLNetForSequenceClassification,
    AlbertTokenizer, AlbertForSequenceClassification,
)
from torch.optim import AdamW
from tqdm import tqdm

# Reproducibility: seed every RNG used below (Python, NumPy, torch CPU and CUDA).
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)

# Load dataset; the "topic" column becomes the classification target "label".
df = pd.read_csv("questions-data-new.csv")
df = df.rename(columns={"topic": "label"})

# Encode labels as integer ids; label_names[i] is the original topic for id i.
le = LabelEncoder()
df["label"] = le.fit_transform(df["label"])
label_names = le.classes_

# Persist the fitted encoder so inference code can map predicted ids back to
# topic names (the Streamlit app in this file loads "label_encoder.pkl").
joblib.dump(le, "label_encoder.pkl")

# Tokenization Helper
def tokenize_data(tokenizer, texts, labels, max_length=128):
    """Tokenize ``texts`` and bundle the result with ``labels`` into a TensorDataset.

    Args:
        tokenizer: Callable accepting a list of strings plus ``truncation``,
            ``padding`` and ``max_length`` keyword arguments and returning a
            mapping that contains ``input_ids`` and ``attention_mask``.
        texts: Object exposing ``.tolist()`` that yields the raw strings.
        labels: Object exposing ``.tolist()`` that yields the labels.
        max_length: Truncation limit forwarded to the tokenizer.

    Returns:
        A ``TensorDataset`` whose items are ``(input_ids, attention_mask, label)``.
    """
    raw = tokenizer(
        texts.tolist(),
        truncation=True,
        padding=True,
        max_length=max_length,
    )
    # Every field returned by the tokenizer is converted to a tensor, even
    # though only two of them end up in the dataset.
    tensors = {}
    for field, values in raw.items():
        tensors[field] = torch.tensor(values)
    label_tensor = torch.tensor(labels.tolist())
    return torch.utils.data.TensorDataset(
        tensors['input_ids'],
        tensors['attention_mask'],
        label_tensor,
    )

# Training Function
def train_one_fold(model, train_loader, val_loader, optimizer, device, num_labels, num_epochs, model_name, fold, fold_losses):
    """Fine-tune ``model`` for ``num_epochs`` epochs and record the mean loss per epoch.

    The loss is a class-weighted cross-entropy whose weights come from
    ``compute_class_weight('balanced', ...)`` over the labels yielded by
    ``train_loader``.

    Args:
        model: Model whose forward output exposes ``.logits``.
        train_loader: Loader yielding ``(input_ids, attention_mask, labels)`` batches.
        val_loader: Loader yielding batches of the same layout for validation.
        optimizer: Optimizer stepped once per training batch.
        device: Device the model and every batch are moved to.
        num_labels: Number of classes; class ids are ``0 .. num_labels - 1``.
        num_epochs: Number of passes over ``train_loader``.
        model_name: Key under which the loss curves are stored in ``fold_losses``.
        fold: Fold index; accepted but not used inside this function.
        fold_losses: Dict updated in place with ``{"train": [...], "val": [...]}``.

    Returns:
        ``(train_losses, val_losses)``: lists holding one mean loss per epoch.
    """
    model.to(device)
    model.train()

    # One full pass over the training loader just to collect the label distribution.
    seen_labels = []
    for _ids, _mask, batch_labels in train_loader:
        seen_labels += batch_labels.tolist()

    weights = compute_class_weight(class_weight='balanced', classes=np.arange(num_labels), y=seen_labels)
    weights = torch.tensor(weights, dtype=torch.float).to(device)
    loss_fn = torch.nn.CrossEntropyLoss(weight=weights)

    train_losses, val_losses = [], []

    for epoch_idx in range(num_epochs):
        epoch_no = epoch_idx + 1
        model.train()
        running_train = 0

        print(f"\nEpoch {epoch_no}/{num_epochs}")
        progress = tqdm(train_loader, desc=f"Training Epoch {epoch_no}")
        for step, batch in enumerate(progress):
            input_ids, attention_mask, labels = (t.to(device) for t in batch)
            optimizer.zero_grad()
            logits = model(input_ids, attention_mask=attention_mask).logits
            loss = loss_fn(logits, labels)
            loss.backward()
            optimizer.step()

            batch_loss = loss.item()
            running_train += batch_loss
            if step % 25 == 0:
                tqdm.write(f"Epoch {epoch_no}, Step {step}, Train Loss: {batch_loss:.4f}")

        mean_train = running_train / len(train_loader)
        train_losses.append(mean_train)

        # Validation pass: same weighted loss, no gradient tracking.
        model.eval()
        running_val = 0
        with torch.no_grad():
            for val_batch in val_loader:
                input_ids, attention_mask, labels = (t.to(device) for t in val_batch)
                val_logits = model(input_ids, attention_mask=attention_mask).logits
                running_val += loss_fn(val_logits, labels).item()

        mean_val = running_val / len(val_loader)
        val_losses.append(mean_val)

        print(f"Epoch {epoch_no} - Train Loss: {mean_train:.4f}, Val Loss: {mean_val:.4f}")

    fold_losses[model_name] = {"train": train_losses, "val": val_losses}
    return train_losses, val_losses

# Plotting losses across folds in a single graph per fold
def plot_fold_wise_loss(fold_losses_all_models, num_epochs, fold):
    """Save one figure with every model's train/validation loss curves for a fold.

    Args:
        fold_losses_all_models: Mapping ``model_name -> {"train": [...], "val": [...]}``
            holding one loss value per epoch.
        num_epochs: Nominal epoch count. Kept for interface compatibility; each
            curve's x-axis is derived from the length of its own history so
            models trained for different numbers of epochs can share a figure.
        fold: Zero-based fold index (displayed and saved as ``fold + 1``).

    Side effects:
        Writes ``loss_plots_folds/fold_<fold+1>_loss_comparison.png``, creating
        the directory if it does not exist.
    """
    plt.figure(figsize=(12, 6))
    try:
        colors = sns.color_palette("husl", n_colors=len(fold_losses_all_models))

        for idx, (model_name, losses) in enumerate(fold_losses_all_models.items()):
            # Solid line for training loss, dashed line for validation loss,
            # both in the model's colour.
            for split, line_style, tag in (("train", "-", "Train"), ("val", "--", "Val")):
                history = losses[split]
                plt.plot(range(1, len(history) + 1), history, linestyle=line_style,
                         label=f"{model_name} - {tag}", color=colors[idx])

        plt.title(f"Fold {fold+1} - Training & Validation Loss for All Models")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.legend(loc='upper right', bbox_to_anchor=(1.25, 1))
        plt.grid(True)
        plt.tight_layout()
        os.makedirs("loss_plots_folds", exist_ok=True)
        plt.savefig(f"loss_plots_folds/fold_{fold+1}_loss_comparison.png")
    finally:
        # Always release the figure, even when plotting or saving fails.
        plt.close()

# Cross-Validation Trainer
def train_with_cv(model_name, tokenizer_class, model_class, hyperparams, all_losses, global_fold_losses, fold):
    """Train and evaluate one model on a single fold of a fixed 5-fold stratified split.

    The split is ``StratifiedKFold(n_splits=5, shuffle=True, random_state=42)``
    over the module-level ``df``; only the fold whose index equals ``fold`` is
    trained, so calling this once per fold index covers the whole
    cross-validation.

    Args:
        model_name: Pretrained checkpoint name passed to ``from_pretrained``.
        tokenizer_class: Tokenizer class providing ``from_pretrained``.
        model_class: Sequence-classification model class providing ``from_pretrained``.
        hyperparams: Dict with ``batch_size`` and ``learning_rate``; ``epochs``
            is optional and defaults to 5.
        all_losses: Accepted but not used inside this function.
        global_fold_losses: Dict updated in place with this model's
            ``{"train": [...], "val": [...]}`` loss curves.
        fold: Zero-based index of the fold to run.

    Returns:
        ``(results_df, classwise_all)``: a one-row DataFrame with fold,
        accuracy and weighted precision/recall/F1, and a nested mapping
        ``class -> metric -> [value]`` of per-class scores.

    Side effects:
        Saves the trained weights to ``models/<model_name>_fold<k>.pt``.
    """
    tokenizer = tokenizer_class.from_pretrained(model_name, use_fast=True)
    dataset = tokenize_data(tokenizer, df['question'], df['label'])

    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    num_labels = len(label_names)
    # Explicit label ids keep the report aligned with `label_names` even when a
    # class is absent from both the fold's labels and its predictions.
    class_ids = np.arange(num_labels)

    all_metrics = []
    classwise_all = defaultdict(lambda: defaultdict(list))

    for f_idx, (train_idx, val_idx) in enumerate(skf.split(df['question'], df['label'])):
        if f_idx != fold:
            continue

        print(f"\n----- Fold {f_idx+1} - Model: {model_name} -----")

        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=hyperparams['batch_size'], shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=hyperparams['batch_size'])

        model = model_class.from_pretrained(model_name, num_labels=num_labels)
        optimizer = AdamW(model.parameters(), lr=hyperparams['learning_rate'])

        fold_losses = {}
        train_one_fold(model, train_loader, val_loader, optimizer, device,
                       num_labels=num_labels,
                       num_epochs=hyperparams.get('epochs', 5),
                       model_name=model_name,
                       fold=f_idx,
                       fold_losses=fold_losses)
        global_fold_losses[model_name] = fold_losses[model_name]

        # Collect predictions on the held-out part of the fold.
        model.eval()
        val_labels = []
        val_preds = []
        with torch.no_grad():
            for input_ids, attention_mask, labels in val_loader:
                input_ids, attention_mask, labels = input_ids.to(device), attention_mask.to(device), labels.to(device)
                outputs = model(input_ids, attention_mask=attention_mask)
                preds = torch.argmax(outputs.logits, dim=1)

                val_labels.extend(labels.cpu().numpy())
                val_preds.extend(preds.cpu().numpy())

        accuracy = accuracy_score(val_labels, val_preds)
        # zero_division=0 yields the same score as the default for classes that
        # are never predicted, without emitting a warning.
        precision = precision_score(val_labels, val_preds, average='weighted', zero_division=0)
        recall = recall_score(val_labels, val_preds, average='weighted', zero_division=0)
        f1 = f1_score(val_labels, val_preds, average='weighted', zero_division=0)

        print("Classification Report:")
        report = classification_report(val_labels, val_preds, labels=class_ids,
                                       target_names=label_names, output_dict=True, zero_division=0)
        print(classification_report(val_labels, val_preds, labels=class_ids,
                                    target_names=label_names, zero_division=0))

        # Build the arrays once rather than once per class.
        y_true = np.asarray(val_labels)
        y_pred = np.asarray(val_preds)
        for cls_idx, cls in enumerate(label_names):
            cls_report = report[cls]
            classwise_all[cls]["precision"].append(cls_report["precision"])
            classwise_all[cls]["recall"].append(cls_report["recall"])
            classwise_all[cls]["f1"].append(cls_report["f1-score"])
            classwise_all[cls]["support"].append(cls_report["support"])

            # Per-class "accuracy": share of this class's samples predicted as
            # this class (numerically the same quantity as the class recall).
            in_class = y_true == cls_idx
            cls_total = int(in_class.sum())
            cls_correct = int((in_class & (y_pred == cls_idx)).sum())
            cls_acc = cls_correct / cls_total if cls_total > 0 else 0.0
            classwise_all[cls]["accuracy"].append(cls_acc)

        all_metrics.append({
            "fold": f_idx + 1,
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1": f1
        })

        os.makedirs("models", exist_ok=True)
        model_path = f"models/{model_name.replace('/', '_')}_fold{f_idx+1}.pt"
        torch.save(model.state_dict(), model_path)

        # Only one fold matches `fold`; no need to generate the remaining splits.
        break

    results_df = pd.DataFrame(all_metrics)
    return results_df, classwise_all


# Train all models for each fold and generate single loss plot per fold
def train_all_models_foldwise():
    """Benchmark every configured model on each of the 5 folds and print summaries.

    For each fold index, every model in ``model_configs`` is trained through
    ``train_with_cv``; afterwards one loss plot covering all models is written
    for that fold. Once all folds are done, the mean accuracy and weighted
    precision/recall/F1 per model are printed, followed by the per-class means.

    Returns:
        None. Results are printed; plots and model weights are written by the
        functions this one calls.
    """
    model_configs = [
        ("bert-base-uncased", BertTokenizer, BertForSequenceClassification, {"batch_size": 16, "learning_rate": 5e-5, "epochs": 5}),
        ("distilbert-base-uncased", DistilBertTokenizer, DistilBertForSequenceClassification, {"batch_size": 16, "learning_rate": 5e-5, "epochs": 5}),
        ("roberta-base", RobertaTokenizer, RobertaForSequenceClassification, {"batch_size": 16, "learning_rate": 2e-5, "epochs": 5}),
        ("xlnet-base-cased", XLNetTokenizer, XLNetForSequenceClassification, {"batch_size": 16, "learning_rate": 2e-5, "epochs": 5}),
        ("albert-base-v2", AlbertTokenizer, AlbertForSequenceClassification, {"batch_size": 16, "learning_rate": 5e-5, "epochs": 5})
    ]

    all_results = {}
    classwise_scores = defaultdict(lambda: defaultdict(lambda: defaultdict(list)))

    # The fold count must match the n_splits=5 used inside train_with_cv.
    for fold in range(5):
        print(f"\n================= Fold {fold+1} =================")
        fold_losses_all_models = {}

        for model_name, tokenizer_class, model_class, hyperparams in model_configs:
            results_df, classwise_all = train_with_cv(model_name, tokenizer_class, model_class, hyperparams, None, fold_losses_all_models, fold)

            all_results.setdefault(model_name, []).append(results_df)

            # Accumulate the per-class metrics of this fold under the model's name.
            for cls, metrics in classwise_all.items():
                for metric, values in metrics.items():
                    classwise_scores[model_name][cls][metric].extend(values)

        # Take the epoch count from the recorded curves instead of relying on
        # the `hyperparams` loop variable leaking out of the loop above (which
        # only ever reflected the last model's configuration).
        if fold_losses_all_models:
            plotted_epochs = max(len(curves["train"]) for curves in fold_losses_all_models.values())
            plot_fold_wise_loss(fold_losses_all_models, plotted_epochs, fold)

    # Combine and print model-wise average performance
    print("\n==== Overall Model-Wise Performance (Across Folds) ====")
    for model_name, all_folds_df in all_results.items():
        full_df = pd.concat(all_folds_df)
        avg_metrics = full_df[["accuracy", "precision", "recall", "f1"]].mean()
        print(f"\nModel: {model_name}")
        print(avg_metrics.to_string(float_format="%.4f"))

    # Print class-wise performance (averaged across folds)
    print("\n==== Overall Classwise Metrics (Across Folds) ====")
    for model_name in classwise_scores:
        print(f"\nModel: {model_name}")
        for cls in label_names:
            print(f"\nClass: {cls}")
            for metric in ["accuracy", "precision", "recall", "f1"]:
                scores = classwise_scores[model_name][cls][metric]
                if scores:
                    print(f"{metric.capitalize()}: {np.mean(scores):.4f}")

# Execute: runs the whole benchmark as soon as this cell/module is executed,
# training every configured model on every fold.
train_all_models_foldwise()


In [None]:
import streamlit as st
import torch
import joblib
import numpy as np
from transformers import BertTokenizer, BertForSequenceClassification

# Constants
MODEL_NAME = "bert-base-uncased"
MODEL_PATH = "models/bert-base-uncased_fold1.pt"  # adjust if using a different fold
MAX_LENGTH = 128

# Streamlit re-executes this script on every widget interaction. Caching the
# loader keeps the encoder, tokenizer and model in memory across reruns instead
# of reloading them each time. On Streamlit versions without
# `st.cache_resource` the loader simply runs uncached.
_cache_resource = getattr(st, "cache_resource", lambda fn: fn)


@_cache_resource
def _load_artifacts():
    """Load the label encoder, tokenizer and fine-tuned BERT classifier.

    Returns:
        ``(encoder, tokenizer, model)`` with the model on CPU in eval mode.
    """
    # NOTE: joblib.load unpickles the file; only load encoder files you trust.
    encoder = joblib.load("label_encoder.pkl")
    bert_tokenizer = BertTokenizer.from_pretrained(MODEL_NAME)
    classifier = BertForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=len(encoder.classes_))
    classifier.load_state_dict(torch.load(MODEL_PATH, map_location=torch.device('cpu')))
    classifier.eval()
    return encoder, bert_tokenizer, classifier


# Load label encoder, tokenizer and model (module-level names kept as before)
le, tokenizer, model = _load_artifacts()
label_names = le.classes_

# App title and description
st.title("Question Topic Classifier")
st.markdown("This app uses a fine-tuned **BERT** model to classify questions into one of the trained topics.")

# Sidebar info
st.sidebar.markdown("### Model Info")
st.sidebar.markdown("**Model:** `bert-base-uncased`")
st.sidebar.markdown("**Accuracy:** ~91% (best performer in benchmark)")
st.sidebar.markdown("**Fold:** 1 (from 5-fold CV)")

# Text input
user_input = st.text_area("Enter your question:", height=150)

if st.button("Classify"):
    if not user_input.strip():
        st.warning("Please enter a valid question.")
    else:
        # Preprocess input
        inputs = tokenizer(user_input, return_tensors="pt", padding=True, truncation=True, max_length=MAX_LENGTH)

        # Inference
        with torch.no_grad():
            outputs = model(**inputs)
            logits = outputs.logits
            probs = torch.nn.functional.softmax(logits, dim=1)
            pred_idx = torch.argmax(probs, dim=1).item()
            confidence = torch.max(probs).item()
            predicted_label = label_names[pred_idx]

        # Display result
        st.success(f"**Predicted Topic:** `{predicted_label}`")
        st.info(f"**Confidence:** `{confidence*100:.2f}%`")

        # Show the predicted probability of every class (not just the top-k)
        st.subheader("Prediction Probabilities:")
        top_probs = probs.numpy().flatten()
        prob_table = {name: f"{prob*100:.2f}%" for name, prob in zip(label_names, top_probs)}
        st.json(prob_table)


**Quantum-Enhanced Transformer Model (BERT – Highest Accuracy)**

In [None]:
!pip install transformers pennylane scikit-learn torch --quiet

In [None]:
!pip install datasets --quiet

In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from transformers import BertTokenizer, BertModel
from sklearn.decomposition import PCA
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report
from tqdm import tqdm
from datasets import Dataset
import pennylane as qml
import matplotlib.pyplot as plt
import gc

In [None]:
# ===============================
# 📦 Load Dataset and Generate BERT Embeddings
# ===============================
# Read the CSV and pull the question texts and their topic labels into plain lists.
dataset = pd.read_csv("questions-data-new.csv")
questions = dataset['question'].tolist()
labels = dataset['topic'].tolist()

# Load BERT
# NOTE(review): `tokenizer` and `model` rebind names that earlier cells in this
# file also assign at module level; confirm the cells are not meant to share state.
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
model = BertModel.from_pretrained('bert-base-uncased')
# Use the GPU when one is available, otherwise fall back to CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# Move the encoder to the chosen device and switch it to evaluation mode.
model = model.to(device).eval()
torch.cuda.empty_cache()

def get_bert_embeddings(text_list, batch_size=16):
    """Embed each text as the last-layer hidden state at sequence position 0.

    Relies on the module-level ``tokenizer``, ``model`` and ``device``.

    Args:
        text_list: List of strings to embed.
        batch_size: Number of texts tokenized and encoded per forward pass.

    Returns:
        A NumPy array with one row per input text, stacked in input order.
    """
    chunks = []
    for start in range(0, len(text_list), batch_size):
        texts = text_list[start:start + batch_size]
        encoded = tokenizer(texts, padding=True, truncation=True, return_tensors="pt", max_length=128)
        encoded = {field: tensor.to(device) for field, tensor in encoded.items()}
        with torch.no_grad():
            hidden = model(**encoded).last_hidden_state
            # Keep only the vector at position 0 of every sequence.
            chunks.append(hidden[:, 0, :].cpu().numpy())
        gc.collect()
    return np.vstack(chunks)

# Embed every question once; later cells reuse `bert_embeddings`.
bert_embeddings = get_bert_embeddings(questions)
print("Shape of BERT embeddings:", bert_embeddings.shape)

In [None]:
# ===============================
# 🔄 PCA Reduction + Label Encoding
# (LinearDiscriminantAnalysis is imported above, but no LDA step is applied in this cell.)
# ===============================
# A fractional n_components asks PCA to keep enough components to explain
# that share of the variance (here 95%).
pca = PCA(n_components=0.95)
bert_pca = pca.fit_transform(bert_embeddings)
print("Shape after PCA:", bert_pca.shape)

# Map the topic strings to integer class ids.
label_encoder = LabelEncoder()
encoded_labels = label_encoder.fit_transform(labels)


In [None]:
# ⚛️ Quantum Encoding (Amplitude)
# ===============================
def normalize(vec):
    """Scale ``vec`` to unit Euclidean norm; a zero-norm input is returned unchanged."""
    length = np.linalg.norm(vec)
    if length == 0:
        return vec
    return vec / length

def direct_amplitude_encoding(x, num_qubits):
    """Amplitude-encode ``x`` on ``num_qubits`` qubits and return the state as real features.

    ``x`` is zero-padded or truncated to ``2 ** num_qubits`` entries, normalized,
    embedded with ``qml.AmplitudeEmbedding`` on a ``default.qubit`` device, and
    the resulting state is returned as the concatenation of its real and
    imaginary parts.

    Args:
        x: 1-D NumPy vector to encode.
        num_qubits: Number of qubits (wires) of the simulated device.

    Returns:
        1-D array: the state's real parts followed by its imaginary parts.
    """
    dim = 2 ** num_qubits
    shortfall = dim - len(x)
    if shortfall > 0:
        x = np.pad(x, (0, shortfall), 'constant')
    else:
        x = x[:dim]
    x = normalize(x)

    # A fresh simulator device is created on every call.
    dev = qml.device('default.qubit', wires=num_qubits)

    @qml.qnode(dev)
    def circuit():
        qml.AmplitudeEmbedding(x, wires=range(num_qubits), normalize=True)
        return qml.state()

    state = circuit()
    return np.concatenate([state.real, state.imag])

# ===============================
# **2. Segmented Amplitude Encoding**
# ===============================

def next_power_of_2(n):
    """Return the smallest power of two that is >= ``n`` (1 when ``n`` is 0)."""
    if n == 0:
        return 1
    exponent = (n - 1).bit_length()
    return 2 ** exponent

def segmented_amplitude_encoding(x, num_segments=3, target_len=512):
    """Normalize ``x`` segment by segment and amplitude-encode the concatenation.

    ``x`` is cut into ``num_segments`` equal slices; each slice is normalized
    and zero-padded to the next power of two. The padded slices are
    concatenated, padded or truncated to ``target_len``, and embedded with
    ``qml.AmplitudeEmbedding`` on a ``default.qubit`` device.

    Args:
        x: 1-D NumPy vector to encode.
        num_segments: Number of equal-length slices taken from ``x``.
        target_len: Length of the amplitude vector. It is rounded up to a power
            of two (minimum 2) so it maps onto a whole number of qubits. The
            default of 512 corresponds to 9 qubits.

    Returns:
        The state returned by the circuit (``qml.state()``).
    """
    # NOTE: floor division means the trailing len(x) % num_segments entries of
    # `x` are not part of any segment.
    segment_length = len(x) // num_segments
    segments = []

    for i in range(num_segments):
        segment = normalize(x[i * segment_length:(i + 1) * segment_length])

        # Pad the segment to the next power of two.
        padded_len = next_power_of_2(len(segment))
        if len(segment) < padded_len:
            segment = np.pad(segment, (0, padded_len - len(segment)), mode='constant')

        segments.append(segment)

    flattened_segments = np.concatenate(segments)

    # Honour the caller's target_len (previously overwritten with 512), rounded
    # up to a power of two so the qubit count below is exact.
    target_len = max(2, next_power_of_2(int(target_len)))
    if len(flattened_segments) < target_len:
        flattened_segments = np.pad(flattened_segments, (0, target_len - len(flattened_segments)), mode='constant')
    elif len(flattened_segments) > target_len:
        flattened_segments = flattened_segments[:target_len]

    # target_len is a power of two, so its bit length gives log2 exactly.
    num_qubits = target_len.bit_length() - 1
    dev = qml.device('default.qubit', wires=num_qubits)

    @qml.qnode(dev)
    def circuit():
        # Encode the concatenated segments as the amplitudes of the state.
        qml.AmplitudeEmbedding(flattened_segments, wires=range(num_qubits), normalize=True)
        return qml.state()

    return circuit()

In [None]:
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import train_test_split, KFold
from tqdm import tqdm
import numpy as np

class FCNN(nn.Module):
    """Fully connected classifier: two hidden blocks followed by a linear output layer.

    Each hidden block is Linear -> BatchNorm1d -> LeakyReLU -> Dropout(0.5).
    The layers live in ``self.model`` (an ``nn.Sequential``).
    """

    def __init__(self, input_dim, hidden_dim=1024, num_classes=8):
        super().__init__()
        layers = []
        in_features = input_dim
        for _ in range(2):
            layers += [
                nn.Linear(in_features, hidden_dim),
                nn.BatchNorm1d(hidden_dim),
                nn.LeakyReLU(),
                nn.Dropout(0.5),
            ]
            in_features = hidden_dim
        layers.append(nn.Linear(hidden_dim, num_classes))
        self.model = nn.Sequential(*layers)

    def forward(self, x):
        """Return the class logits for the batch ``x``."""
        logits = self.model(x)
        return logits

# ===============================
# Shared Training Function with K-Fold
# ===============================
def train_eval_model_kfold(embedded_states, encoded_labels, name, epochs, batch_size, patience, k_folds=5):
    """Train an FCNN with K-fold cross-validation and early stopping on validation accuracy.

    For every fold a fresh ``FCNN`` is trained with Adam. After each epoch the
    validation accuracy drives a ``ReduceLROnPlateau`` scheduler and the early
    stopping counter. The weights of the best epoch are restored before the
    fold's final validation accuracy is recorded.

    Args:
        embedded_states: 2-D array-like of features, one row per sample.
        encoded_labels: 1-D array-like of integer class ids.
        name: Label for this configuration; returned unchanged.
        epochs: Maximum number of epochs per fold.
        batch_size: Training batch size.
        patience: Epochs without improvement before training stops early.
        k_folds: Number of folds for ``KFold``.

    Returns:
        ``(name, test_acc)`` where ``test_acc`` is the accuracy of the last
        fold's model on the full dataset.
    """
    # Accept lists as well as arrays; fancy indexing below needs ndarrays.
    embedded_states = np.asarray(embedded_states)
    encoded_labels = np.asarray(encoded_labels)

    kf = KFold(n_splits=k_folds, shuffle=True, random_state=42)
    fold_results = []
    num_classes = len(np.unique(encoded_labels))

    # K-Fold Cross Validation
    for fold, (train_idx, val_idx) in enumerate(kf.split(embedded_states)):
        print(f"\nTraining Fold {fold+1}/{k_folds}...")

        # Splitting data into training and validation sets
        X_train, X_val = embedded_states[train_idx], embedded_states[val_idx]
        y_train, y_val = encoded_labels[train_idx], encoded_labels[val_idx]

        # Tensors
        X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
        y_train_tensor = torch.tensor(y_train, dtype=torch.long)
        X_val_tensor = torch.tensor(X_val, dtype=torch.float32)

        # BatchNorm1d cannot compute batch statistics from a single sample in
        # training mode, so drop the trailing batch when it would have size 1.
        drop_last = len(train_idx) > batch_size and len(train_idx) % batch_size == 1
        train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor),
                                  batch_size=batch_size, shuffle=True, drop_last=drop_last)

        input_dim = X_train_tensor.shape[1]
        model = FCNN(input_dim=input_dim, num_classes=num_classes)
        criterion = nn.CrossEntropyLoss()

        # Optimizer with L2 Regularization (weight_decay)
        optimizer = optim.Adam(model.parameters(), lr=0.0005, weight_decay=1e-5)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'max', patience=2, factor=0.5)

        # Start below any reachable accuracy so the first epoch always leaves a snapshot.
        best_val_acc = -1.0
        patience_counter = 0
        best_model_state = None

        for epoch in range(epochs):
            model.train()
            for X_batch, y_batch in train_loader:
                optimizer.zero_grad()
                outputs = model(X_batch)
                loss = criterion(outputs, y_batch)
                loss.backward()
                optimizer.step()

            model.eval()
            with torch.no_grad():
                val_outputs = model(X_val_tensor)
                val_preds = val_outputs.argmax(dim=1).numpy()
                val_acc = accuracy_score(y_val, val_preds)

            print(f"Epoch {epoch+1}/{epochs}, Fold {fold+1}, Validation Accuracy: {val_acc:.4f}")
            scheduler.step(val_acc)

            if val_acc > best_val_acc:
                best_val_acc = val_acc
                # state_dict() returns references to the live tensors, which
                # later optimizer steps keep mutating; clone them so the
                # snapshot really holds the best epoch's weights.
                best_model_state = {key: tensor.detach().clone()
                                    for key, tensor in model.state_dict().items()}
                patience_counter = 0
            else:
                patience_counter += 1

            if patience_counter >= patience:
                print("⏹️ Early stopping triggered.")
                break

        # No snapshot exists only when no epoch ran (epochs == 0).
        if best_model_state is not None:
            model.load_state_dict(best_model_state)
        model.eval()
        with torch.no_grad():
            val_outputs = model(X_val_tensor)
            val_preds = val_outputs.argmax(dim=1).numpy()
            val_acc = accuracy_score(y_val, val_preds)
            fold_results.append(val_acc)

    # After K-Fold training, compute the average accuracy across folds
    avg_accuracy = np.mean(fold_results)
    print(f"\n✅ Average Validation Accuracy across {k_folds} folds: {avg_accuracy:.4f}")

    # Final evaluation on all data.
    # NOTE(review): this uses the LAST fold's model on the full dataset, which
    # includes that model's own training samples, so it is not a held-out test
    # score. The cross-validated average printed above is the unbiased figure.
    model.eval()
    with torch.no_grad():
        test_outputs = model(torch.tensor(embedded_states, dtype=torch.float32))
        test_preds = test_outputs.argmax(dim=1).numpy()
        test_acc = accuracy_score(encoded_labels, test_preds)

    print(f"🎯 Final Test Accuracy: {test_acc:.4f}")
    print(classification_report(encoded_labels, test_preds))

    return name, test_acc


def evaluate_direct_amplitude(X, y, num_qubits):
    """Encode every row of ``X`` with direct amplitude encoding, then run the K-fold evaluation.

    Args:
        X: Iterable of feature vectors, one per sample.
        y: Class ids aligned with ``X``.
        num_qubits: Qubit count passed to ``direct_amplitude_encoding``.

    Returns:
        The ``(name, accuracy)`` pair produced by ``train_eval_model_kfold``,
        with ``name`` set to ``"Amplitude-<num_qubits>q"``.
    """
    print(f"\n🚀 Evaluating Direct Amplitude Encoding with {num_qubits} qubits...")
    progress = tqdm(X, desc=f"Embedding {num_qubits}q")
    encoded = np.array([direct_amplitude_encoding(row, num_qubits=num_qubits) for row in progress])
    run_name = f"Amplitude-{num_qubits}q"
    return train_eval_model_kfold(encoded, y, run_name, epochs=30, batch_size=32, patience=5, k_folds=5)

def evaluate_segmented_amplitude(X, y, num_segments):
    """Encode every row of ``X`` with segmented amplitude encoding, then run the K-fold evaluation.

    Each encoded state is turned into a real feature vector by concatenating
    its real and imaginary parts.

    Args:
        X: Iterable of feature vectors, one per sample.
        y: Class ids aligned with ``X``.
        num_segments: Segment count passed to ``segmented_amplitude_encoding``.

    Returns:
        The ``(name, accuracy)`` pair produced by ``train_eval_model_kfold``,
        with ``name`` set to ``"Segmented-<num_segments>s"``.
    """
    print(f"\n🚀 Evaluating Segmented Amplitude Encoding with {num_segments} segments...")
    features = []
    for row in tqdm(X, desc=f"Segmenting {num_segments}"):
        state = segmented_amplitude_encoding(row, num_segments=num_segments)
        features.append(np.concatenate([state.real, state.imag]))
    features = np.array(features)
    run_name = f"Segmented-{num_segments}s"
    return train_eval_model_kfold(features, y, run_name, epochs=30, batch_size=32, patience=5, k_folds=5)

# ===============================
# Quantum Evaluation for Different Qubits with K-Fold Cross Validation
# ===============================
# Maps each configuration name to the accuracy returned by its evaluation run.
results = {}

# Direct amplitude encoding: 2**nq amplitudes per sample for each qubit count.
for nq in [6, 7, 8, 9]:
    name, acc = evaluate_direct_amplitude(X=bert_pca, y=encoded_labels, num_qubits=nq)
    results[name] = acc

# Evaluate using segmented amplitude encoding for different segment counts
for segs in [3, 6, 9]:
    name, acc = evaluate_segmented_amplitude(X=bert_pca, y=encoded_labels, num_segments=segs)
    results[name] = acc

# Final result print
print("\n📊 Final Results:")
for name, acc in results.items():
    print(f"{name}: {acc:.4f}")
