In [1]:
# =========================================================
# Cell 1 – Imports and configuration
# =========================================================

import os
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import precision_recall_fscore_support, classification_report, confusion_matrix

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from sentence_transformers import SentenceTransformer


import matplotlib.pyplot as plt
import joblib

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

  from .autonotebook import tqdm as notebook_tqdm


Using device: cpu


In [5]:
# =========================================================
# Cell 2 – Load real lyrics–emotion data & embeddings
# =========================================================

TEXT_COL = "lyrics"          # from clean-emotion.py
TARGET_COL = "emotion"       # from clean-emotion.py
DATA_PATH = "../data/spotify_emotion_clean.csv"

# Sentence-transformer hyperparameters
ST_MODEL_NAME = "sentence-transformers/all-MiniLM-L6-v2"
EMBEDDINGS_PATH = f"../data/spotify_lyrics_embeddings_{ST_MODEL_NAME.split('/')[-1]}.npy"
RECOMPUTE_EMBEDDINGS = False      # set True if you change model or preprocessing
SCALE_EMBEDDINGS = True

random_seed = 41
rng = np.random.default_rng(random_seed)

# Imbalance-handling hyperparameters
DOWNSAMPLING_MAX_MULTIPLIER = 10.0  # max examples per class = multiplier * minority class count
USE_CLASS_WEIGHTS = True

# Regularization hyperparameters
LABEL_SMOOTHING = 0.1
DROPOUT_RATE = 0.0

# Read cleaned dataset
df = pd.read_csv(DATA_PATH)
print("Loaded dataset:", df.shape)

# Keep rows with both lyrics and emotion
df = df.dropna(subset=[TEXT_COL, TARGET_COL])
df[TEXT_COL] = df[TEXT_COL].astype(str)

# Remember original row order so we can align with precomputed embeddings
df["row_idx"] = np.arange(len(df), dtype=np.int64)

print(df[[TEXT_COL, TARGET_COL]].head())
print("\nNumber of rows after dropna:", len(df))

# ---- One-time embedding pass (cached to disk) ----
if os.path.exists(EMBEDDINGS_PATH) and not RECOMPUTE_EMBEDDINGS:
    print(f"\nLoading cached embeddings from: {EMBEDDINGS_PATH}")
    X_embeddings_full = np.load(EMBEDDINGS_PATH, mmap_mode="r")
else:
    print(
        "\nNo cached embeddings found (or RECOMPUTE_EMBEDDINGS=True). "
        "Computing embeddings once and saving them to disk..."
    )
    print(f"Loading sentence-transformer model '{ST_MODEL_NAME}' on device {device}...")
    st_model = SentenceTransformer(ST_MODEL_NAME, device=str(device))
    EMBEDDING_DIM = st_model.get_sentence_embedding_dimension()

    X_embeddings_full = st_model.encode(
        df[TEXT_COL].tolist(),
        batch_size=32,
        show_progress_bar=True,
        convert_to_numpy=True,
    ).astype(np.float32)
    print("Saving embeddings to:", EMBEDDINGS_PATH)
    np.save(EMBEDDINGS_PATH, X_embeddings_full)

print("\nFull output embeddings shape:", X_embeddings_full.shape)

Loaded dataset: (491173, 39)
                                              lyrics  emotion
0  Friends told her she was better off at the bot...  sadness
1  Well I heard it, playing soft From a drunken b...  sadness
2  Oh my god, did I just say that out loud? Shoul...      joy
3  Remember when I called you on the telephone? Y...      joy
4  Calling me like I got something to say You tho...      joy

Number of rows after dropna: 491173

Loading cached embeddings from: ../data/spotify_lyrics_embeddings_all-MiniLM-L6-v2.npy

Full output embeddings shape: (492569, 384)


In [6]:
# =========================================================
# Cell 3 – Downsampling, encoding labels, train/test split
# =========================================================

# Moderate downsampling to reduce training time while keeping some imbalance
original_class_counts = df[TARGET_COL].value_counts()
print("\nClass distribution (original):\n", original_class_counts)

min_count = original_class_counts.min()
max_per_class = int(DOWNSAMPLING_MAX_MULTIPLIER * min_count)
print(
    f"\nApplying moderate downsampling with max_per_class={max_per_class} "
    f"(multiplier={DOWNSAMPLING_MAX_MULTIPLIER} * minority={min_count})"
)

df = (
    df.groupby(TARGET_COL, group_keys=False)
      .apply(lambda g: g.sample(n=min(len(g), max_per_class), random_state=random_seed))
      .reset_index(drop=True)
)

print("\nClass distribution (after downsampling):\n", df[TARGET_COL].value_counts())

# Align embeddings with the downsampled DataFrame using the saved row indices
selected_idx = df["row_idx"].to_numpy()
X_embeddings = X_embeddings_full[selected_idx].astype(np.float32)

# We no longer need the helper column
df = df.drop(columns=["row_idx"])

print("\nEmbeddings shape after downsampling:", X_embeddings.shape)

# Encode class labels
label_encoder = LabelEncoder()
df["Emotion_encoded"] = label_encoder.fit_transform(df[TARGET_COL])

y = df["Emotion_encoded"].values.astype(np.int64)
X = X_embeddings.astype(np.float32)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=random_seed, stratify=y
)

# Scale embedding dimensions (optional but often helpful)
if SCALE_EMBEDDINGS:
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train).astype(np.float32)
    X_test = scaler.transform(X_test).astype(np.float32)

n_features = X_train.shape[1]
n_classes = len(np.unique(y))

print(f"Train shape: {X_train.shape}, Test shape: {X_test.shape}")
print(f"Embedding dimension: {n_features}")
print(f"Number of classes: {n_classes}")

# Compute inverse-frequency class weights on the training split (for CrossEntropyLoss)
class_weights_tensor = None
class_weights = None
if USE_CLASS_WEIGHTS:
    class_sample_counts = np.bincount(y_train, minlength=n_classes)
    class_weights = (len(y_train) / (n_classes * class_sample_counts)).astype(np.float32)
    class_weights_tensor = torch.from_numpy(class_weights).to(device)
    print("\nClass weights (inverse frequency, training split):", class_weights)


Class distribution (original):
 emotion
joy        188610
sadness    156417
anger       94929
fear        25917
love        25300
Name: count, dtype: int64

Applying moderate downsampling with max_per_class=253000 (multiplier=10.0 * minority=25300)


  .apply(lambda g: g.sample(n=min(len(g), max_per_class), random_state=random_seed))



Class distribution (after downsampling):
 emotion
joy        188610
sadness    156417
anger       94929
fear        25917
love        25300
Name: count, dtype: int64

Embeddings shape after downsampling: (491173, 384)
Train shape: (392938, 384), Test shape: (98235, 384)
Embedding dimension: 384
Number of classes: 5

Class weights (inverse frequency, training split): [1.0348235  3.7902768  0.52083397 3.8827865  0.62803257]


In [4]:
# =========================================================
# Cell 4 – Dataset & model definitions
# =========================================================

class EmotionDataset(Dataset):
    def __init__(self, X, y):
        # X is a matrix of sentence-transformer embeddings: shape (num_samples, embedding_dim)
        self.X = torch.from_numpy(X).float()
        self.y = torch.from_numpy(y).long()

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

class SimpleLinearNet(nn.Module):

    def __init__(self, input_dim, num_classes):
        super(LinearReLUDropoutLinearNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, num_classes),
        )

    def forward(self, x):
        return self.net(x)

class LinearReLUDropoutLinearNet(nn.Module):
    """Linear layer with 256 ReLU neurons, then a linear layer to num_classes.

    Input: batch of embedding vectors of shape (batch_size, embedding_dim).
    """

    def __init__(self, input_dim, num_classes):
        super(LinearReLUDropoutLinearNet, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 256),
            nn.ReLU(),
            nn.Dropout(DROPOUT_RATE),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        return self.net(x)


if USE_CLASS_WEIGHTS and class_weights_tensor is not None:
    criterion = nn.CrossEntropyLoss(weight=class_weights_tensor, label_smoothing=LABEL_SMOOTHING)
else:
    criterion = nn.CrossEntropyLoss(label_smoothing=LABEL_SMOOTHING)

chosen_model = LinearReLUDropoutLinearNet

In [7]:
# =========================================================
# Cell 5 – Training & evaluation helpers
# =========================================================

def train_one_epoch(model, dataloader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0

    for X_batch, y_batch in dataloader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)

        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

        running_loss += loss.item() * X_batch.size(0)

    return running_loss / len(dataloader.dataset)


def evaluate_model(model, dataloader, device):
    model.eval()
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for X_batch, y_batch in dataloader:
            X_batch = X_batch.to(device)
            outputs = model(X_batch)
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_targets.extend(y_batch.numpy())

    all_preds = np.array(all_preds)
    all_targets = np.array(all_targets)

    precision, recall, f1, _ = precision_recall_fscore_support(
        all_targets, all_preds, average="macro", zero_division=0
    )

    accuracy = np.mean(all_preds == all_targets)

    return precision, recall, f1, accuracy, all_targets, all_preds


def log_confusion_matrix(writer, all_targets, all_preds, phase, epoch, fold=None):
    """Compute and log a confusion matrix to TensorBoard."""

    labels = np.arange(len(label_encoder.classes_))
    cm = confusion_matrix(all_targets, all_preds, labels=labels)
    class_names = label_encoder.classes_

    fig, ax = plt.subplots(figsize=(6, 6))
    im = ax.imshow(cm, interpolation="nearest", cmap=plt.cm.Blues)
    ax.figure.colorbar(im, ax=ax)

    ax.set(
        xticks=np.arange(cm.shape[1]),
        yticks=np.arange(cm.shape[0]),
        xticklabels=class_names,
        yticklabels=class_names,
        ylabel="True label",
        xlabel="Predicted label",
        title=f"Confusion matrix ({phase})",
    )
    plt.setp(ax.get_xticklabels(), rotation=45, ha="right", rotation_mode="anchor")

    fmt = "d"
    thresh = cm.max() / 2.0 if cm.max() > 0 else 0
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(
                j,
                i,
                format(cm[i, j], fmt),
                ha="center",
                va="center",
                color="white" if cm[i, j] > thresh else "black",
            )

    fig.tight_layout()

    tag = f"ConfusionMatrix/{phase}"
    if fold is not None:
        tag += f"/fold_{fold}"

    writer.add_figure(tag, fig, global_step=epoch)
    plt.close(fig)

In [8]:
# =========================================================
# Cell 6 – K-fold cross-validation with TensorBoard
# =========================================================

K_FOLDS = 3
BATCH_SIZE = 32
NUM_EPOCHS = 15
LEARNING_RATE = 1e-3
EPOCHS_BETWEEN_REPORTS = 2

skf = StratifiedKFold(n_splits=K_FOLDS, shuffle=True, random_state=42)

fold_results = []

# Tensorboard logging
log_dir = f"logs/emotion_cv_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
writer = SummaryWriter(log_dir)
hparams = {
    "learning_rate": LEARNING_RATE,
    "batch_size": BATCH_SIZE,
    "k_folds": K_FOLDS,
    "num_epochs": NUM_EPOCHS,
    "epochs_between_reports": EPOCHS_BETWEEN_REPORTS,
    "downsampling_max_multiplier": DOWNSAMPLING_MAX_MULTIPLIER,
    "use_class_weights": USE_CLASS_WEIGHTS,
    "scale_embeddings": SCALE_EMBEDDINGS,
    "sentence_transformer_model_name": ST_MODEL_NAME,
    "neural_net_architecture": chosen_model.__name__,
    "dropout_rate": DROPOUT_RATE,
    "label_smoothing": LABEL_SMOOTHING,
}
writer.add_text(
    "hparams",
    "\n".join(f"{k}: {v}" for k, v in hparams.items()),
)

for fold, (train_idx, val_idx) in enumerate(skf.split(X_train, y_train), 1):
    print(f"\n==== Fold {fold}/{K_FOLDS} ====")

    X_tr, X_val = X_train[train_idx], X_train[val_idx]
    y_tr, y_val = y_train[train_idx], y_train[val_idx]

    train_dataset = EmotionDataset(X_tr, y_tr)
    val_dataset = EmotionDataset(X_val, y_val)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)

    # Model that consumes sentence-transformer embeddings as input features
    model = chosen_model(n_features, n_classes).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

    best_val_f1 = -1.0

    # Train
    for epoch in range(1, NUM_EPOCHS + 1):
        loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
        writer.add_scalars(
            "Loss/kcv_train",
            {f"fold_{fold}": loss},
            epoch,
        )
        if epoch % EPOCHS_BETWEEN_REPORTS == 0 or epoch == 1 or epoch == NUM_EPOCHS:
            print(f"Epoch {epoch}/{NUM_EPOCHS} - Loss: {loss:.4f}")

            # Evaluate on training split
            precision, recall, f1, accuracy, all_targets, all_preds = evaluate_model(model, train_loader, device)
            writer.add_scalars("F1_Score/kcv_train", {f"fold_{fold}": f1}, epoch)
            writer.add_scalars("Accuracy/kcv_train", {f"fold_{fold}": accuracy}, epoch)
            writer.add_scalars("Precision/kcv_train", {f"fold_{fold}": precision}, epoch)
            writer.add_scalars("Recall/kcv_train", {f"fold_{fold}": recall}, epoch)
            log_confusion_matrix(writer, all_targets, all_preds, phase="train", epoch=epoch, fold=fold)

            # Evaluate on validation split
            precision, recall, f1, accuracy, all_targets, all_preds = evaluate_model(model, val_loader, device)
            writer.add_scalars("F1_Score/kcv_validation", {f"fold_{fold}": f1}, epoch)
            writer.add_scalars("Accuracy/kcv_validation", {f"fold_{fold}": accuracy}, epoch)
            writer.add_scalars("Precision/kcv_validation", {f"fold_{fold}": precision}, epoch)
            writer.add_scalars("Recall/kcv_validation", {f"fold_{fold}": recall}, epoch)
            print(f"Validation: Fold {fold} - Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f}")
            log_confusion_matrix(writer, all_targets, all_preds, phase="val", epoch=epoch, fold=fold)

            best_val_f1 = max(best_val_f1, f1)

    fold_results.append({
        "fold": fold,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "accuracy": accuracy,
        "best_val_f1": best_val_f1,
    })

print("\n==== Cross-Validation Summary (Macro-Averaged) ====")
for r in fold_results:
    print(
        f"Fold {r['fold']}: "
        f"Precision={r['precision']:.4f}, "
        f"Recall={r['recall']:.4f}, "
        f"F1={r['f1']:.4f}, "
        f"BestValF1={r['best_val_f1']:.4f}"
    )

mean_precision = np.mean([r["precision"] for r in fold_results])
mean_recall = np.mean([r["recall"] for r in fold_results])
mean_f1 = np.mean([r["f1"] for r in fold_results])
mean_best_f1 = np.mean([r["best_val_f1"] for r in fold_results])
print(
    f"\nMean over {K_FOLDS} folds - "
    f"Precision={mean_precision:.4f}, Recall={mean_recall:.4f}, "
    f"F1={mean_f1:.4f}, BestValF1={mean_best_f1:.4f}"
)


==== Fold 1/3 ====
Epoch 1/15 - Loss: 1.7310
Validation: Fold 1 - Precision: 0.2973 | Recall: 0.3132 | F1: 0.2383


KeyboardInterrupt: 

In [10]:
# =========================================================
# Cell 7 – Final training on full train set + test eval + export
# =========================================================

train_dataset_full = EmotionDataset(X_train, y_train)
test_dataset = EmotionDataset(X_test, y_test)

train_loader_full = DataLoader(train_dataset_full, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

final_model = chosen_model(n_features, n_classes).to(device)
optimizer = torch.optim.Adam(final_model.parameters(), lr=LEARNING_RATE)

for epoch in range(1, 15 + 1):
    loss = train_one_epoch(final_model, train_loader_full, criterion, optimizer, device)
    if epoch % 5 == 0 or epoch == 1:
        print(f"[Final Model] Epoch {epoch}/{NUM_EPOCHS} - Loss: {loss:.4f}")

precision, recall, f1, accuracy, y_true_test, y_pred_test = evaluate_model(final_model, test_loader, device)
log_confusion_matrix(writer, all_targets, all_preds, phase="test", epoch=epoch, fold=fold)
print("\n==== Test Set Metrics (Macro-Averaged) ====")
print(f"Precision: {precision:.4f} | Recall: {recall:.4f} | F1: {f1:.4f} | Accuracy: {accuracy:.4f}")

print("\n==== Detailed Classification Report (Test Set) ====")
print(classification_report(
    y_true_test,
    y_pred_test,
    target_names=label_encoder.classes_,
    zero_division=0,
))

# Export model checkpoint (compatible with v2_inference.py)
EXPORT_DIR = "models"
os.makedirs(EXPORT_DIR, exist_ok=True)

checkpoint = {
    "model_state_dict": final_model.state_dict(),
    "input_dim": n_features,
    "n_classes": n_classes,
    "label_classes": label_encoder.classes_,
    "sentence_transformer_name": ST_MODEL_NAME,
    "scale_embeddings": SCALE_EMBEDDINGS,
    "use_class_weights": USE_CLASS_WEIGHTS,
    "class_weights": class_weights if USE_CLASS_WEIGHTS else None,
    "architecture": chosen_model.__name__,
}

ckpt_path = os.path.join(EXPORT_DIR, "emotion_classifier_v2.pt")
torch.save(checkpoint, ckpt_path)
print("Saved model checkpoint to", ckpt_path)

# Save scaler
if SCALE_EMBEDDINGS:
    scaler_path = os.path.join(EXPORT_DIR, "scaler.joblib")
    joblib.dump(scaler, scaler_path)
    print("Saved scaler to", scaler_path)

[Final Model] Epoch 1/15 - Loss: 1.7365
[Final Model] Epoch 5/15 - Loss: 1.7212
[Final Model] Epoch 10/15 - Loss: 1.7034
[Final Model] Epoch 15/15 - Loss: 1.6799

==== Test Set Metrics (Macro-Averaged) ====
Precision: 0.2947 | Recall: 0.3137 | F1: 0.2393 | Accuracy: 0.2550

==== Detailed Classification Report (Test Set) ====
              precision    recall  f1-score   support

       anger       0.32      0.36      0.34     18986
        fear       0.08      0.40      0.13      5183
         joy       0.52      0.16      0.25     37722
        love       0.09      0.39      0.15      5060
     sadness       0.46      0.26      0.33     31284

    accuracy                           0.26     98235
   macro avg       0.29      0.31      0.24     98235
weighted avg       0.42      0.26      0.28     98235

Saved model checkpoint to models/emotion_classifier_v2.pt
Saved scaler to models/scaler.joblib
