In [None]:
import os
import re
import time
import pandas as pd
import numpy as np
from collections import Counter

import spacy
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

import optuna

In [None]:
# Location of the training CSV, relative to the notebook's working directory.
# Built with os.path.join so it resolves on POSIX systems too, where a
# backslash is an ordinary filename character rather than a path separator.
dataPath = os.path.join("Data", "track-a.csv")

In [None]:
# Read the CSV at `dataPath` into a pandas DataFrame.
dataFrame = pd.read_csv(dataPath)

In [None]:
# Load spaCy's small English pipeline with the "parser" and "ner" components
# disabled; the cleaning function only reads token-level attributes
# (lemma_, is_stop, is_punct).
nlpModel = spacy.load("en_core_web_sm", disable=["parser", "ner"])

In [None]:
# Name of the column holding the raw text.
textColumn = "text"
# Candidate label columns, one per emotion.
# NOTE(review): presumably each column is a 0/1 indicator — confirm against the CSV.
labelColumns = ["anger", "fear", "joy", "sadness", "surprise"]

In [None]:
def cleanerFunction(text: str) -> str:
    """Normalise `text` into a space-separated string of lower-cased lemmas.

    The text is run through the module-level spaCy pipeline `nlpModel`.
    Tokens flagged as stop words or punctuation are discarded, as are tokens
    whose lemma is the "-PRON-" placeholder.

    Args:
        text: Raw input string.

    Returns:
        The surviving lemmas, lower-cased and joined by single spaces
        (an empty string when nothing survives).
    """
    keptLemmas = []
    for token in nlpModel(text):
        if token.is_stop or token.is_punct:
            continue
        if token.lemma_ == "-PRON-":
            continue
        keptLemmas.append(token.lemma_.lower())
    return " ".join(keptLemmas)

In [None]:
# Clean every text and store the result in a new "Spacy_text" column.
# Missing values are replaced by "" first: a bare astype(str) would turn NaN
# into the literal string "nan", which would then survive cleaning as a token.
# The source column is taken from `textColumn` so it is configured in one place.
dataFrame["Spacy_text"] = (
    dataFrame[textColumn]
    .fillna("")
    .astype(str)
    .apply(cleanerFunction)
)

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
userDevice = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Keep only the emotion columns that actually exist in the loaded frame
# (`col in dataFrame` tests membership among the column labels).
labelColumns = [col for col in ["anger", "fear", "joy", "sadness", "surprise"] if col in dataFrame]
# Label matrix as float32: one row per sample, one column per kept label.
yData = dataFrame[labelColumns].values.astype(np.float32)

In [None]:
# Cleaned texts as a plain Python list.
xAll = dataFrame["Spacy_text"].tolist()
# Hold out 10% of the samples for validation; the fixed random_state makes
# the split repeatable across runs.
xTrain, xVal, yTrain, yVal = train_test_split(xAll, yData, test_size = 0.1, random_state = 69)

In [None]:
# Rebind the text splits to copies of themselves so later in-place changes
# cannot affect the objects returned by the split.
xTrain = xTrain.copy()
xVal = xVal.copy()

In [None]:
# Number of label columns, i.e. the width of the model's output layer.
numLabels = yTrain.shape[1]

In [None]:
# A token is a maximal run of word characters (\w+) delimited by word boundaries.
TOKEN_PATTERN = re.compile(r"\b\w+\b")

In [None]:
def simpleTokenize(text: str) -> list:
    """Split `text` into lower-cased word tokens.

    Args:
        text: Input string.

    Returns:
        Every match of the module-level `TOKEN_PATTERN` in the lower-cased
        text, in order of appearance.
    """
    lowered = text.lower()
    return TOKEN_PATTERN.findall(lowered)

In [None]:
def vocabularyBuilder(texts: list, vocabsize: int = 20000):
    """Build a frequency-ranked vocabulary from `texts`.

    Index 0 is reserved for "<pad>" and index 1 for "<unk>"; the remaining
    slots are filled with the most frequent tokens produced by
    `simpleTokenize`, most frequent first.

    Args:
        texts: Iterable of strings to count tokens in.
        vocabsize: Upper bound on the vocabulary size, including the two
            reserved entries.

    Returns:
        A `(wordtoindex, indextoword)` pair: a dict mapping token -> index and
        the list mapping index -> token.
    """
    tokenCounts = Counter(
        token for document in texts for token in simpleTokenize(document)
    )

    specialTokens = ["<pad>", "<unk>"]
    rankedTokens = [
        token for token, _count in tokenCounts.most_common(vocabsize - len(specialTokens))
    ]

    indextoword = specialTokens + rankedTokens
    wordtoindex = dict(zip(indextoword, range(len(indextoword))))

    return wordtoindex, indextoword

In [None]:
# Upper bound on the vocabulary size (including the two reserved tokens).
MAX_VOCAB_SIZE = 20000
# Fixed sequence length constant; it is also stored in the saved checkpoint.
MAX_SEQUENCE_LENGTH = 100

# The vocabulary is built from the training texts only, so validation tokens
# never influence which words receive their own index.
wordtoindex, indextoword = vocabularyBuilder(xTrain, vocabsize = MAX_VOCAB_SIZE)
# Actual vocabulary size; can be smaller than MAX_VOCAB_SIZE.
vocabSize = len(indextoword)

# Indices of the reserved padding and unknown-word tokens.
padIndex = wordtoindex["<pad>"]
unkIndex = wordtoindex["<unk>"]

In [None]:
def encodePadFunction(texts: list, wordtoindex: dict, sequenceLength: int = 100) -> np.ndarray:
    """Encode texts as fixed-length rows of vocabulary indices.

    Each text is tokenised with `simpleTokenize` and mapped through
    `wordtoindex` (tokens missing from it become the "<unk>" index). The row
    is then truncated to `sequenceLength` tokens and right-padded with the
    "<pad>" index.

    The pad/unk indices are read from the `wordtoindex` argument rather than
    from notebook-level globals, so the result always matches the vocabulary
    that was passed in, even if those globals are stale or re-bound.

    Args:
        texts: Iterable of strings to encode.
        wordtoindex: Token -> index mapping containing "<pad>" and "<unk>".
        sequenceLength: Number of columns in the output.

    Returns:
        int64 array of shape (number of texts, sequenceLength). Empty input
        yields shape (0, sequenceLength) instead of a 1-D empty array.

    Raises:
        KeyError: If `wordtoindex` lacks "<pad>" or "<unk>".
    """
    padId = wordtoindex["<pad>"]
    unkId = wordtoindex["<unk>"]

    encodings = []
    for t in texts:
        tokenIDs = [wordtoindex.get(tok, unkId) for tok in simpleTokenize(t)]
        tokenIDs = tokenIDs[:sequenceLength]
        # No-op when the row is already full (multiplying a list by 0 gives []).
        tokenIDs.extend([padId] * (sequenceLength - len(tokenIDs)))
        encodings.append(tokenIDs)

    if not encodings:
        # np.array([]) would be 1-D; keep the 2-D contract for empty input.
        return np.empty((0, sequenceLength), dtype = np.int64)

    return np.array(encodings, dtype = np.int64)

In [None]:
# Encode both splits with the shared MAX_SEQUENCE_LENGTH constant instead of a
# repeated literal, so the length used here cannot drift from the value that
# is written into the saved checkpoint.
trainEncode = encodePadFunction(xTrain, wordtoindex, sequenceLength = MAX_SEQUENCE_LENGTH)
valEncode = encodePadFunction(xVal, wordtoindex, sequenceLength = MAX_SEQUENCE_LENGTH)

In [None]:
class TextDataset(Dataset):
    """Map-style dataset pairing encoded token rows with their label rows.

    Both inputs are wrapped with `torch.from_numpy`, so the tensors share
    memory with the arrays that were passed in.
    """

    def __init__(self, encodings: np.ndarray, labels: np.ndarray) -> None:
        """Wrap the two NumPy arrays as tensors.

        Args:
            encodings: Array of token indices, one row per sample.
            labels: Array of labels, one row per sample.
        """
        self.encodings, self.labels = (
            torch.from_numpy(array) for array in (encodings, labels)
        )

    def __len__(self):
        """Return the number of samples (rows of the encoding tensor)."""
        return self.encodings.shape[0]

    def __getitem__(self, index):
        """Return the `(encoding, label)` pair stored at `index`."""
        sample = (self.encodings[index], self.labels[index])
        return sample

## Recurrent Neural Network

In [None]:
class RNNClassifier(nn.Module):
    """Embedding -> (bi)LSTM -> dense head producing one logit per label.

    The sequence representation is the LSTM's final hidden state computed on
    the *unpadded* part of each row. Reading `rnn_out[:, -1, :]` on padded
    input would instead return the state after the LSTM has consumed all the
    trailing pad positions, and for a bidirectional LSTM the backward half
    would have seen only the last position. Both make the features depend on
    the amount of padding rather than on the text.

    Parameter and sub-module names are unchanged, so the `state_dict` layout
    is the same as before. The features fed to the dense head differ from the
    padded-last-step variant, so weights trained with that variant should be
    retrained.

    Args:
        vocabSize: Number of rows in the embedding table.
        embeddingDim: Size of each token embedding.
        hiddenDim: LSTM hidden size per direction.
        rnnLayers: Number of stacked LSTM layers.
        bidirectional: Whether the LSTM runs in both directions.
        dropoutRate: Dropout probability used after the embedding, between
            LSTM layers (only when `rnnLayers > 1`) and before the output layer.
        denseUnits: Width of the hidden dense layer.
        numLabels: Number of output logits.
        padIndex: Vocabulary index of the padding token.
    """

    def __init__(
        self,
        vocabSize: int,
        embeddingDim: int,
        hiddenDim: int,
        rnnLayers: int,
        bidirectional: bool,
        dropoutRate: float,
        denseUnits: int,
        numLabels: int,
        padIndex: int
    ) -> None:
        super().__init__()
        # Plain Python attributes (not parameters/buffers): needed by forward()
        # and deliberately kept out of the state_dict.
        self.padIndex = padIndex
        self.bidirectional = bidirectional

        self.embedding = nn.Embedding(vocabSize, embeddingDim, padding_idx = padIndex)
        self.dropoutEmb = nn.Dropout(dropoutRate)
        self.rnn = nn.LSTM(
            input_size = embeddingDim,
            hidden_size = hiddenDim,
            num_layers = rnnLayers,
            bidirectional = bidirectional,
            batch_first = True,
            # nn.LSTM only applies dropout between layers, so it is disabled
            # for a single-layer network.
            dropout = dropoutRate if rnnLayers > 1 else 0.0
        )
        directionFactor = 2 if bidirectional else 1
        self.fc1 = nn.Linear(hiddenDim * directionFactor, denseUnits)
        self.bn1 = nn.BatchNorm1d(denseUnits)
        self.relu = nn.ReLU()
        self.dropoutFc = nn.Dropout(dropoutRate)
        self.fc_out = nn.Linear(denseUnits, numLabels)

    def forward(self, x):
        """Compute logits for a batch of token-index rows.

        Args:
            x: Integer tensor of shape (batch, sequence) whose padding, if
                any, is trailing and uses `padIndex`.

        Returns:
            Float tensor of shape (batch, numLabels) with raw logits.
        """
        emb = self.dropoutEmb(self.embedding(x))

        # Real (non-pad) tokens per row. Rows that are entirely padding are
        # treated as length 1 because packing rejects zero-length sequences.
        # pack_padded_sequence requires the lengths on the CPU.
        lengths = (x != self.padIndex).sum(dim = 1).clamp(min = 1).cpu()
        packed = nn.utils.rnn.pack_padded_sequence(
            emb, lengths, batch_first = True, enforce_sorted = False
        )
        _, (h_n, _) = self.rnn(packed)

        # h_n is (layers * directions, batch, hidden). The last layer's
        # forward state is the second-to-last entry and its backward state is
        # the last entry when the LSTM is bidirectional.
        if self.bidirectional:
            final_feat = torch.cat((h_n[-2], h_n[-1]), dim = 1)
        else:
            final_feat = h_n[-1]

        h = self.fc1(final_feat)
        h = self.bn1(h)
        h = self.relu(h)
        h = self.dropoutFc(h)
        logits = self.fc_out(h)
        return logits

In [None]:
def epochTrain(model, dataloader, optimizer, criterion, device):
    """Train `model` for one pass over `dataloader`.

    For every batch the gradients are reset, the loss is back-propagated, the
    global gradient norm is clipped to 1.0 and the optimizer takes a step.

    Args:
        model: Module to train; switched to training mode.
        dataloader: Yields `(inputs, labels)` batches and exposes `.dataset`.
        optimizer: Optimizer updating `model`'s parameters.
        criterion: Loss callable taking `(logits, labels)`.
        device: Device the batches are moved to.

    Returns:
        Sum of the batch losses, each weighted by its batch size, divided by
        `len(dataloader.dataset)`.
    """
    model.train()
    weightedLossSum = 0.0

    for inputs, targets in dataloader:
        inputs, targets = inputs.to(device), targets.to(device)

        optimizer.zero_grad()
        batchLoss = criterion(model(inputs), targets)
        batchLoss.backward()

        # Clip before stepping so one exploding batch cannot wreck the weights.
        nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        weightedLossSum += batchLoss.item() * inputs.size(0)

    sampleCount = len(dataloader.dataset)
    return weightedLossSum / sampleCount

In [None]:
def evaluateModel(model, dataloader, criterion, device):
    """Compute the loss of `model` over `dataloader` without training.

    Args:
        model: Module to evaluate; switched to evaluation mode.
        dataloader: Yields `(inputs, labels)` batches and exposes `.dataset`.
        criterion: Loss callable taking `(logits, labels)`.
        device: Device the batches are moved to.

    Returns:
        Sum of the batch losses, each weighted by its batch size, divided by
        `len(dataloader.dataset)`.
    """
    model.eval()
    weightedLosses = []

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device)
            targets = targets.to(device)

            batchLoss = criterion(model(inputs), targets).item()
            weightedLosses.append(batchLoss * inputs.size(0))

    return sum(weightedLosses, 0.0) / len(dataloader.dataset)

In [None]:
# Per-label positive weights for nn.BCEWithLogitsLoss(pos_weight=...).
# pos_weight multiplies the loss of the *positive* examples of each label, so
# the usual choice is (#negatives / #positives): > 1 for rare labels.
# Inverse frequencies normalised to sum to 1 would all be below 1 and would
# down-weight the positives of every label instead of boosting the rare ones.
# `freq` is floored at 1e-4 so a label with no positives cannot divide by zero.
freq = np.maximum(yTrain.sum(axis = 0) / len(yTrain), 1e-4)
class_weights = (1.0 - freq) / freq
weight_tensor = torch.tensor(class_weights, dtype = torch.float32, device = userDevice)

In [None]:
def objective(trial):
    """Optuna objective: train one RNNClassifier and return its best validation loss.

    Hyper-parameters are sampled from `trial`. The model is trained on
    `trainEncode`/`yTrain` and scored on `valEncode`/`yVal` after every epoch.
    Each validation loss is reported to the trial so the study's pruner can
    stop it, and training ends early after two consecutive epochs without
    improvement.

    Differences from a naive implementation that matter:
      * SGD receives `weight_decay` like the other optimizers, so the sampled
        value is actually evaluated (the final training also applies it).
      * The validation loader is not shuffled; order does not affect the loss.
      * No weight snapshot is kept: the model is local to this function and is
        discarded on return, so restoring its best weights would have no
        effect (and would fail if no epoch ever improved, e.g. on a NaN loss).

    Args:
        trial: The `optuna.trial.Trial` to sample from and report to.

    Returns:
        The lowest validation loss observed (`inf` if none was ever lower).

    Raises:
        optuna.exceptions.TrialPruned: When the pruner asks to stop the trial.
    """
    # Keep the names and order of the suggestions stable: they define the
    # search space recorded in the study.
    embeddingDim  = trial.suggest_categorical("embeddingDim", [64, 128, 256])
    hiddenDim = trial.suggest_int("hiddenDim", 32, 256, step = 16)
    rnnLayers = trial.suggest_int("rnnLayers", 1, 5)
    bidirectional = trial.suggest_categorical("bidirectional", [False, True])
    dropoutRate = trial.suggest_float("dropoutRate", 0.2, 0.6, step = 0.1)
    denseUnits = trial.suggest_int("denseUnits", 32, 256, step = 16)
    learnRate = trial.suggest_float("learnRate", 1e-4, 1e-2, log = True)
    weightDecay = trial.suggest_float("weightDecay", 1e-6, 1e-2, log = True)
    batchSize = trial.suggest_categorical("batchSize", [32, 64, 128, 256])
    epochs = trial.suggest_categorical("epochs", [3, 5, 7, 9])
    optName = trial.suggest_categorical("optimizer", ["Adam", "RMSprop", "SGD"])
    use_scheduler = trial.suggest_categorical("use_scheduler", [False, True])
    # Momentum is only part of the search space when SGD is selected.
    sgdMomentum = trial.suggest_float("momentum", 0.1, 0.9) if optName == "SGD" else None

    trainLoader = DataLoader(TextDataset(trainEncode, yTrain), batch_size = batchSize, shuffle = True)
    valLoader = DataLoader(TextDataset(valEncode, yVal), batch_size = batchSize, shuffle = False)

    model = RNNClassifier(
        vocabSize, embeddingDim, hiddenDim, rnnLayers, bidirectional,
        dropoutRate, denseUnits, numLabels, padIndex
    ).to(userDevice)

    if optName == "Adam":
        optimizer = optim.Adam(model.parameters(), lr = learnRate, weight_decay = weightDecay)
    elif optName == "RMSprop":
        optimizer = optim.RMSprop(model.parameters(), lr = learnRate, weight_decay = weightDecay)
    else:
        optimizer = optim.SGD(
            model.parameters(), lr = learnRate, momentum = sgdMomentum, weight_decay = weightDecay
        )

    criterionOption = nn.BCEWithLogitsLoss(pos_weight = weight_tensor)

    scheduler = None
    if use_scheduler:
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode = "min", factor = 0.5, patience = 1)

    bestvalLoss = float("inf")
    epochsWithoutImprovement = 0

    for epoch in range(1, epochs + 1):
        epochTrain(model, trainLoader, optimizer, criterionOption, userDevice)
        valLoss = evaluateModel(model, valLoader, criterionOption, userDevice)

        if scheduler is not None:
            scheduler.step(valLoss)

        trial.report(valLoss, epoch)

        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

        if valLoss < bestvalLoss:
            bestvalLoss = valLoss
            epochsWithoutImprovement = 0
        else:
            epochsWithoutImprovement += 1
            if epochsWithoutImprovement >= 2:
                break

    return bestvalLoss

In [None]:
def _objective(trial):
    """Forward `trial` to `objective` and return its result unchanged.

    This is the callable handed to `study.optimize`.
    """
    trialScore = objective(trial)
    return trialScore

In [None]:
# Study that minimises the objective's return value, using a TPE sampler
# seeded for repeatable suggestions. No pruner is passed explicitly, so
# Optuna's default pruner is used.
studyRNN = optuna.create_study(direction = "minimize", sampler = optuna.samplers.TPESampler(seed = 42))


In [27]:
# Run the hyper-parameter search for up to 500 trials. This is long-running:
# the recorded output shows a run stopped by a KeyboardInterrupt during trial 1.
studyRNN.optimize(_objective, n_trials = 500)

[W 2025-06-07 17:10:00,782] Trial 1 failed with parameters: {'embeddingDim': 128, 'hiddenDim': 144, 'rnnLayers': 3, 'bidirectional': True, 'dropoutRate': 0.2, 'denseUnits': 32, 'learnRate': 0.007902619549708232, 'weightDecay': 0.007286653737491046, 'batchSize': 32, 'epochs': 7, 'optimizer': 'Adam', 'use_scheduler': True} because of the following error: KeyboardInterrupt().
Traceback (most recent call last):
  File "d:\Coding\Uni Marburg\ProjectNLP\.venv\Lib\site-packages\optuna\study\_optimize.py", line 197, in _run_trial
    value_or_values = func(trial)
                      ^^^^^^^^^^^
  File "C:\Users\sshre\AppData\Local\Temp\ipykernel_17468\1759998075.py", line 2, in _objective
    return objective(trial)
           ^^^^^^^^^^^^^^^^
  File "C:\Users\sshre\AppData\Local\Temp\ipykernel_17468\3726456976.py", line 42, in objective
    trainLoss = epochTrain(model, trainLoader, optimizer, criterionOption, userDevice)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

KeyboardInterrupt: 

In [None]:
# Report the best trial found by the study: its objective value and every
# sampled hyper-parameter.
print("\nOptuna Best Trial:")
best = studyRNN.best_trial
print(f"Validation Loss: {best.value:.4f}")
for key, val in best.params.items():
    print(f"  {key}: {val}")
print("\n")

# Unpack the best hyper-parameters into individual names for the final
# training cells.
best_embeddingDim = best.params["embeddingDim"]
best_hiddenDim = best.params["hiddenDim"]
best_rnnLayers = best.params["rnnLayers"]
best_bidirectional = best.params["bidirectional"]
best_dropoutRate = best.params["dropoutRate"]
best_denseUnits = best.params["denseUnits"]
best_learnRate = best.params["learnRate"]
best_weightDecay = best.params["weightDecay"]
best_batchSize = best.params["batchSize"]
best_epochs = best.params["epochs"]
best_optName = best.params["optimizer"]
best_use_scheduler = best.params["use_scheduler"]
# "momentum" is only present in the trial's params when SGD was selected, so
# best_momentum is only defined in that case.
if best_optName == "SGD":
    best_momentum = best.params["momentum"]

In [None]:
# Datasets for the final training run, built from the same encoded splits
# that were used during the search.
finalTrainDataset = TextDataset(trainEncode, yTrain)
finalValDataset = TextDataset(valEncode, yVal)

# Training batches are reshuffled every epoch; validation keeps a fixed order.
finalTrainLoader = DataLoader(finalTrainDataset, batch_size = best_batchSize, shuffle = True)
finalValLoader = DataLoader(finalValDataset, batch_size = best_batchSize, shuffle = False)

In [None]:
# Build the final model from the best trial's hyper-parameters and move it to
# the selected device.
finalModel = RNNClassifier(
    vocabSize = vocabSize,
    embeddingDim = best_embeddingDim,
    hiddenDim = best_hiddenDim,
    rnnLayers = best_rnnLayers,
    bidirectional = best_bidirectional,
    dropoutRate = best_dropoutRate,
    denseUnits = best_denseUnits,
    numLabels = numLabels,
    padIndex = padIndex
).to(userDevice)

# Recreate the optimizer chosen by the best trial. The final branch handles
# SGD and reads best_momentum, which is only defined when SGD was selected.
if best_optName == "Adam":
    finalOptimizer = optim.Adam(finalModel.parameters(), lr = best_learnRate, weight_decay = best_weightDecay)
elif best_optName == "RMSprop":
    finalOptimizer = optim.RMSprop(finalModel.parameters(), lr = best_learnRate, weight_decay = best_weightDecay)
else:
    finalOptimizer = optim.SGD(finalModel.parameters(), lr = best_learnRate, momentum = best_momentum, weight_decay = best_weightDecay)

# Multi-label loss on raw logits, with per-label positive weights.
finalCriterion = nn.BCEWithLogitsLoss(pos_weight = weight_tensor)

# finalScheduler only exists when the best trial used a scheduler: it halves
# the learning rate once the monitored loss stops improving for more than one epoch.
if best_use_scheduler:
    finalScheduler = optim.lr_scheduler.ReduceLROnPlateau(finalOptimizer, mode = "min", factor = 0.5, patience = 1)

In [None]:
# Early-stopping bookkeeping for the final training loop.
bestValLossFinal = float("inf")  # lowest validation loss seen so far
patienceCtr = 0                  # consecutive epochs without improvement
bestStateFinal = None            # weights captured at the best epoch

In [None]:
# Train the final model for at most `best_epochs` epochs, keeping a snapshot
# of the weights from the epoch with the lowest validation loss and stopping
# after three consecutive epochs without improvement.
for epoch in range(1, best_epochs + 1):
    trainLoss = epochTrain(finalModel, finalTrainLoader, finalOptimizer, finalCriterion, userDevice)
    valLoss = evaluateModel(finalModel, finalValLoader, finalCriterion, userDevice)
    print(f"Final Epoch {epoch:02d} | Train Loss: {trainLoss:.4f} | Val Loss: {valLoss:.4f}")

    if best_use_scheduler:
        finalScheduler.step(valLoss)

    if valLoss < bestValLossFinal:
        bestValLossFinal = valLoss
        patienceCtr = 0
        # clone() is essential: state_dict() tensors share storage with the
        # live parameters and .cpu() is a no-op for tensors already on the
        # CPU, so without a copy later optimizer steps would silently
        # overwrite this "best" snapshot on a CPU run.
        bestStateFinal = {k: v.detach().cpu().clone() for k, v in finalModel.state_dict().items()}
    else:
        patienceCtr += 1
        if patienceCtr >= 3:
            print(f"Early stopping at epoch {epoch}")
            break

In [None]:
# Restore the weights captured at the best validation epoch.
# NOTE(review): bestStateFinal stays None if no epoch ever improved on the
# initial inf (e.g. a NaN validation loss); this call would then fail.
finalModel.load_state_dict(bestStateFinal)

In [None]:
# Evaluate the restored model on the validation set: average loss plus
# accuracy at a 0.5 probability threshold.
finalModel.eval()
runningLoss = 0.0
correct = 0
total = 0
with torch.no_grad():
    for batchInputs, batchLabels in finalValLoader:
        batchInputs = batchInputs.to(userDevice)
        batchLabels = batchLabels.to(userDevice)

        logits = finalModel(batchInputs)
        loss = finalCriterion(logits, batchLabels)
        # Weight each batch loss by its size so the mean is per sample.
        runningLoss += loss.item() * batchInputs.size(0)

        # A label is predicted positive when its sigmoid probability is >= 0.5.
        preds = (torch.sigmoid(logits) >= 0.5).float()
        # Accuracy is counted per individual label decision (numel), not per
        # sample with all labels correct.
        correct += (preds == batchLabels).sum().item()
        total += batchLabels.numel()

finalValLoss = runningLoss / len(finalValDataset)
finalValAcc  = correct / total
print(f"\nFinal Model -> Val Loss: {finalValLoss:.4f}, Val Accuracy: {finalValAcc:.4f}")

In [None]:
# Save a self-contained checkpoint. Besides the weights and vocabulary it
# stores the constructor arguments of RNNClassifier, the <unk> index and the
# label names; without them the state_dict cannot be loaded into a matching
# model or its outputs interpreted. The original keys are all kept; the new
# ones are additions.
os.makedirs("savedModel", exist_ok = True)
torch.save({
    # Stored as CPU tensors so the file also loads on machines without CUDA.
    "model_state_dict": {k: v.detach().cpu() for k, v in finalModel.state_dict().items()},
    "model_config": {
        "vocabSize": vocabSize,
        "embeddingDim": best_embeddingDim,
        "hiddenDim": best_hiddenDim,
        "rnnLayers": best_rnnLayers,
        "bidirectional": best_bidirectional,
        "dropoutRate": best_dropoutRate,
        "denseUnits": best_denseUnits,
        "numLabels": numLabels,
        "padIndex": padIndex
    },
    "labelColumns": labelColumns,
    "wordtoindex": wordtoindex,
    "indextoword": indextoword,
    "padIndex": padIndex,
    "unkIndex": unkIndex,
    "max_sequence_length": MAX_SEQUENCE_LENGTH
}, "savedModel/rnn_emotion_classifier_optuna.pth")
print("Saved final model + vocab to savedModel/rnn_emotion_classifier_optuna.pth")