### Import libraries and classes

In [6]:
import os
import sys
import random
import itertools

import numpy as np
import pandas as pd

import torch
from torch import nn
from torch.optim import Adam
from torch.utils.data import DataLoader

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
from sklearn.utils.class_weight import compute_class_weight

from tqdm import tqdm
from transformers import AutoModel, AutoTokenizer
from peft import get_peft_model, LoraConfig, TaskType

# Make the project's 'src' directory importable: resolve the parent of the
# current working directory and append its 'src' subfolder to sys.path.
parent_of_cwd = os.path.join(os.getcwd(), os.pardir)
project_root = os.path.abspath(parent_of_cwd)
src_dir = os.path.join(project_root, 'src')
sys.path.append(src_dir)

from my_classes import SBERTWithClassifier, TextDataset

### Set SBERT version and task

In [7]:
# Which classification task to run.
# Has to be one of "Sheldon_Leonard", "Sheldon_Penny", "all".
task = "all"

# Checkpoint name of the SBERT encoder; the matching tokenizer is loaded from it.
model_name = "sentence-transformers/all-MiniLM-L6-v2"
tokenizer = AutoTokenizer.from_pretrained(model_name)

### Create dataset

In [8]:
# Load the dialogues from the pickle file
df = pd.read_pickle("../data/processed/sbert_mini_embeddings.pkl")

# For the pairwise tasks keep only the rows spoken by the two people involved;
# "all" keeps every row, anything else is reported and also keeps every row.
speakers_by_task = {
    "Sheldon_Leonard": ['Sheldon', 'Leonard'],
    "Sheldon_Penny": ['Sheldon', 'Penny'],
}
if task in speakers_by_task:
    df = df[df['Person'].isin(speakers_by_task[task])]
elif task != "all":
    print("Task not recognized, using all data.")

# Encode the 'Person' column: from strings to integer class ids
label_encoder = LabelEncoder()
y_all = label_encoder.fit_transform(df["Person"])

# Show which integer id was assigned to each person
class_ids = label_encoder.transform(label_encoder.classes_)
label_map = {
    name: class_id
    for name, class_id in zip(label_encoder.classes_, class_ids)
}
print("Label map:", label_map)

# Inputs (list of dialogue strings) and targets (numpy array of encoded labels)
X = df["Said"].tolist()
y = y_all
all_classes = np.unique(y)

# Stratified split: 80% training, then the remaining 20% is halved into
# validation and test sets.
X_train_texts, X_temp, y_train, y_temp = train_test_split(
    X,
    y,
    test_size=0.2,
    stratify=y,
    random_state=42,
)
X_val_texts, X_test_texts, y_val, y_test = train_test_split(
    X_temp,
    y_temp,
    test_size=0.5,
    stratify=y_temp,
    random_state=42,
)

# Wrap the training and validation splits in datasets and batch them;
# only the training loader shuffles.
train_dataset = TextDataset(X_train_texts, y_train, tokenizer)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = TextDataset(X_val_texts, y_val, tokenizer)
val_loader = DataLoader(val_dataset, batch_size=32)

Label map: {'Amy': np.int64(0), 'Bernadette': np.int64(1), 'Howard': np.int64(2), 'Leonard': np.int64(3), 'Penny': np.int64(4), 'Raj': np.int64(5), 'Sheldon': np.int64(6)}


### Set seeds for reproducibility

In [9]:
# Seed every random number generator in use (Python, NumPy, PyTorch CPU and
# CUDA) with the same value, in a fixed order.
for seed_rng in (
    random.seed,
    np.random.seed,
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
):
    seed_rng(42)

# cuDNN: turn off auto-tuning benchmarks and force deterministic algorithms
# so repeated runs are reproducible.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

### Grid search

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Hyperparameter grid: one training run per element of the Cartesian product.
learning_rates = [1e-4]
ranks = [8, 16, 32]
coeff_alphas = [2]
lora_dropouts = [0.1]
weight_decays = [0, 1e-4]
losses = ['weighted_cross_entropy', 'cross_entropy']


def compute_accuracy(logits, labels):
    """Return the fraction of rows of `logits` whose argmax over dim 1 equals `labels`."""
    preds = torch.argmax(logits, dim=1)
    return (preds == labels).float().mean().item()


for lr, rank, coeff, lora_dropout, weight_decay, loss_name in itertools.product(
    learning_rates, ranks, coeff_alphas, lora_dropouts, weight_decays, losses
):
    # Start every configuration from a fresh copy of the pretrained encoder.
    base_model = AutoModel.from_pretrained(model_name)

    # Configure LoRA
    lora_config = LoraConfig(
        r=rank,
        lora_alpha=coeff * rank,  # LoRA alpha is often 2 * r
        target_modules=["query", "key", "value"],  # only the attention query/key/value modules
        lora_dropout=lora_dropout,
        bias="none",
        task_type=TaskType.FEATURE_EXTRACTION,
    )
    base_model = get_peft_model(base_model, lora_config)
    base_model.print_trainable_parameters()

    model = SBERTWithClassifier(base_model, num_classes=len(all_classes), dropout_rate=0.1).to(device)
    optimizer = Adam(model.parameters(), lr=lr, weight_decay=weight_decay)

    # Define the loss function
    if loss_name == 'weighted_cross_entropy':
        # Balanced class weights are estimated from the training labels only,
        # so validation/test label statistics do not influence training.
        class_weights = compute_class_weight(class_weight='balanced', classes=all_classes, y=y_train)
        class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)
        loss_fn = nn.CrossEntropyLoss(weight=class_weights)
    elif loss_name == 'cross_entropy':
        loss_fn = nn.CrossEntropyLoss()
    else:
        # Fail loudly instead of silently reusing the previous run's loss_fn.
        raise ValueError(f"Unknown loss function: {loss_name!r}")

    # To monitor training and validation losses and accuracies
    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []
    # Start at +inf so the first epoch always produces a checkpoint,
    # whatever the scale of the loss.
    best_val_loss = float("inf")

    # Checkpoint and metrics share the same file stem; make sure the target
    # directory exists before the first save.
    run_stem = f"LoRA/{task}/{loss_name}/lora_{lr}_{rank}_{coeff*rank}_{lora_dropout}_{weight_decay}"
    best_model_path = f"{run_stem}.pt"
    csv_path = f"{run_stem}.csv"
    os.makedirs(os.path.dirname(best_model_path), exist_ok=True)
    print(best_model_path)

    # Early stopping settings
    patience = 3
    no_improvement = 0
    num_epochs = 20

    for epoch in range(num_epochs):
        ### ---- TRAIN ----
        model.train()
        total_train_loss = 0
        total_train_acc = 0
        for batch in tqdm(train_loader, desc=f"Epoch {epoch+1} [Train]"):
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            total_train_loss += loss.item()
            total_train_acc += compute_accuracy(outputs, labels)

        # Per-epoch metrics are the mean of the per-batch values.
        avg_train_loss = total_train_loss / len(train_loader)
        avg_train_acc = total_train_acc / len(train_loader)

        ### ---- VAL ----
        model.eval()
        total_val_loss = 0
        total_val_acc = 0
        with torch.no_grad():
            for batch in tqdm(val_loader, desc=f"Epoch {epoch+1} [Val]"):
                input_ids = batch["input_ids"].to(device)
                attention_mask = batch["attention_mask"].to(device)
                labels = batch["labels"].to(device)

                outputs = model(input_ids=input_ids, attention_mask=attention_mask)
                loss = loss_fn(outputs, labels)

                total_val_loss += loss.item()
                total_val_acc += compute_accuracy(outputs, labels)

        avg_val_loss = total_val_loss / len(val_loader)
        avg_val_acc = total_val_acc / len(val_loader)

        train_losses.append(avg_train_loss)
        val_losses.append(avg_val_loss)
        train_accuracies.append(avg_train_acc)
        val_accuracies.append(avg_val_acc)

        # ---- Early Stopping bookkeeping ----
        # Keep only the weights of the epoch with the lowest validation loss.
        if avg_val_loss < best_val_loss:
            best_val_loss = avg_val_loss
            torch.save(model.state_dict(), best_model_path)
            print(f"💾 Best model saved (val loss: {best_val_loss:.4f})")
            no_improvement = 0
        else:
            no_improvement += 1
            print(f"⚠️ No improvement for {no_improvement} epoch(s)")

        # ---- Print summary for the epoch ----
        print(f"\n📊 Epoch {epoch+1} Summary:")
        print(f"  🔹 Train Loss: {avg_train_loss:.4f} | Accuracy: {avg_train_acc:.4f}")
        print(f"  🔸 Val   Loss: {avg_val_loss:.4f} | Accuracy: {avg_val_acc:.4f}\n")

        # ---- Early Stopping Check ----
        # Must run once per epoch (inside the loop) for `break` to stop training.
        if no_improvement >= patience:
            print(f"Early stopping triggered (no improvement in {patience} epochs)")
            break

    # ---- Save Metrics ----
    # One row per completed epoch; the scalar hyperparameters are broadcast
    # to every row so the CSV is self-describing.
    metrics_df = pd.DataFrame({
        "epoch": list(range(1, len(train_losses) + 1)),
        "train_loss": train_losses,
        "val_loss": val_losses,
        "train_accuracy": train_accuracies,
        "val_accuracy": val_accuracies,
        "learning_rate": lr,
        "rank": rank,
        "lora_alpha": coeff * rank,
        "lora_dropout": lora_dropout,
        "weight_decay": weight_decay,
        "loss_function": loss_name,
    })

    # Save the metrics DataFrame to a CSV file next to the checkpoint
    metrics_df.to_csv(csv_path, index=False)