# Libraries

In [1]:
# IO
import os
import csv
import pathlib
from pathlib import Path
import chardet
import warnings

# Utilities
import numpy as np 
import pandas as pd
import copy

# Preprocessing
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
from sklearn.preprocessing import LabelEncoder
from torchtext.vocab import Vocab
from collections import Counter
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence

# Visualization
import seaborn as sns
import matplotlib.pyplot as plt

# Modelling and training
import optuna
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
from sklearn.model_selection import RepeatedStratifiedKFold
## Metrics
from sklearn.metrics import confusion_matrix
warnings.filterwarnings("ignore", category=FutureWarning)

  from .autonotebook import tqdm as notebook_tqdm


Download `nltk` resources (only needs to run once per machine)

In [2]:
# Tokenizer models needed by nltk's word_tokenize
nltk.download('punkt_tab')
# Stop-word lists read through nltk.corpus.stopwords
nltk.download('stopwords')
# WordNet corpus: WordNetLemmatizer (used in preprocess_text) fails with a
# LookupError when this resource is missing
nltk.download('wordnet')

[nltk_data] Downloading package punkt_tab to
[nltk_data]     /Users/exterior/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/exterior/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

# Classes

Early stopping class (PyTorch does not have a built-in early stopping callback)

In [None]:
class EarlyStopping:
    """Track a metric across epochs and flag when training should stop.

    Call the instance once per epoch with the latest metric value and the
    model. Each improvement stores the new best value (and, optionally, a deep
    copy of the model's ``state_dict``) and resets the counter. Every call
    without improvement increments the counter; once it reaches ``patience``,
    ``early_stop`` is set to ``True``.
    """

    def __init__(self, monitor='val_loss', patience=1, restore_best_weights=True, mode='min'):
        """
        Args:
            monitor (string): name of the tracked metric; stored only, not used by the logic
            patience (int): number of non-improving calls tolerated before stopping
            restore_best_weights (bool): whether to snapshot the best weights for ``restore``
            mode (string): 'min' if lower scores are better, 'max' if higher scores are
                better; with any other value no score after the first counts as an improvement
        """
        # Configuration
        self.monitor = monitor
        self.patience = patience
        self.restore_best_weights = restore_best_weights
        self.mode = mode

        # Running state
        self.best_score = None
        self.best_model_state = None
        self.counter = 0
        self.early_stop = False

    def _improved(self, current_score):
        """Return whether ``current_score`` beats ``best_score`` under ``mode``."""
        if self.mode == 'min':
            return current_score < self.best_score
        if self.mode == 'max':
            return current_score > self.best_score
        return False

    def __call__(self, current_score, model):
        """Record the score of one epoch and update the stopping state."""
        is_first_score = self.best_score is None

        # No improvement: spend one unit of patience and leave the best state untouched
        if not is_first_score and not self._improved(current_score):
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
            return

        # New best score (the very first score is always accepted)
        if not is_first_score:
            self.counter = 0
        self.best_score = current_score
        if self.restore_best_weights:
            # deep copy, so later training steps cannot modify the snapshot
            self.best_model_state = copy.deepcopy(model.state_dict())

    def restore(self, model):
        """Load the best recorded weights into ``model``, if a snapshot exists."""
        if not self.restore_best_weights:
            return
        if self.best_model_state:
            model.load_state_dict(self.best_model_state)

In [None]:
class TextDataset(Dataset):
    """Dataset yielding (encoded sentence, label) tensor pairs from a dataframe.

    Sentences are encoded on access with ``encode_text`` and the vocabulary
    given at construction time.
    """

    def __init__(self, df, vocab_inst):
        """
        Args:
            df (pandas.DataFrame): frame providing the columns 'Sentence' and
                'Sentiment_encoded'
            vocab_inst: vocabulary forwarded to ``encode_text``
        """
        self.vocab_inst = vocab_inst
        self.texts = df['Sentence'].tolist()
        self.labels = df['Sentiment_encoded'].tolist()

    def __len__(self):
        """Return the number of sentences."""
        return len(self.texts)

    def __getitem__(self, idx):
        """Return the ``idx``-th sample as a pair of ``torch.long`` tensors."""
        # encode_text takes (text, vocab_inst); calling it with the text alone
        # raises a TypeError, so the stored vocabulary must be passed along.
        token_ids = encode_text(self.texts[idx], self.vocab_inst)
        encoded = torch.tensor(token_ids, dtype=torch.long)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return encoded, label

## Models

In [None]:
class LoSiento(nn.Module):
    """Embedding -> spatial dropout -> LSTM -> dense classifier.

    ``forward`` returns unnormalised class scores (logits), which is the input
    ``nn.CrossEntropyLoss`` expects. Apply ``F.softmax(logits, dim=1)`` to the
    result when probabilities are needed; ``argmax`` predictions are the same
    on logits and on probabilities.
    """

    def __init__(self, max_features, embed_dim=128, lstm_out=196, output_classes=3, num_layers=1):
        """
        Args:
            max_features (int): number of rows of the embedding table (vocabulary size)
            embed_dim (int): embedding dimension
            lstm_out (int): LSTM hidden size
            output_classes (int): number of output classes
            num_layers (int): number of stacked LSTM layers
        """
        super().__init__()

        self.embedding = nn.Embedding(num_embeddings=max_features, embedding_dim=embed_dim)
        # Dropout1d zeroes whole channels of a (batch, channels, length) input.
        # Dropout2d on 3D inputs only does this for historical reasons and warns
        # that the behaviour will change.
        self.spatial_dropout = nn.Dropout1d(p=0.5)
        # LSTM dropout acts between stacked layers only; with a single layer it
        # has no effect and triggers a warning, so it is disabled in that case.
        self.lstm = nn.LSTM(
            embed_dim,
            lstm_out,
            num_layers=num_layers,
            batch_first=True,
            dropout=0.3 if num_layers > 1 else 0.0,
        )

        self.dropout_1 = nn.Dropout(p=0.2)
        self.dense_1 = nn.Linear(lstm_out, 100)
        self.dropout_2 = nn.Dropout(p=0.4)
        self.output_layer = nn.Linear(100, output_classes)

    def forward(self, x):
        """Map a (batch, seq_len) tensor of token indices to (batch, output_classes) logits."""
        x = self.embedding(x)                      # (batch, seq_len, embed_dim)
        x = x.permute(0, 2, 1)                     # channels first: (batch, embed_dim, seq_len)
        x = self.spatial_dropout(x)
        x = x.permute(0, 2, 1)                     # back to (batch, seq_len, embed_dim)

        x, _ = self.lstm(x)                        # LSTM returns (output, (h_n, c_n))
        # NOTE(review): for batches padded at the end, the last timestep of a
        # shorter sequence is a padding position - confirm this is intended.
        x = x[:, -1, :]                            # output of the last timestep

        x = self.dropout_1(x)
        x = F.relu(self.dense_1(x))
        x = self.dropout_2(x)
        # No softmax here: nn.CrossEntropyLoss applies log-softmax itself, so
        # feeding it probabilities would normalise twice.
        return self.output_layer(x)

# Functions

### Preprocessing

The encoding of the text file is detected automatically here (a best guess that comes with a confidence level). Alternatively, it can be read from the terminal using
```
file -I file.txt
```

Terminal method (not os agnostic):

In [8]:
#!file -i ~/Documents/StudyResources/IML/Project/Part2/_data/FinancialPhraseBank-v1.0/Sentences_50Agree.txt

Python method:

In [6]:
def extract_sentences_from_file(filepath):
    """Parse a labelled text file into (sentence, sentiment) pairs.

    Only lines containing the ``.@`` separator are used. Each such line is
    split at the last occurrence of the separator: the left part is the
    sentence, the right part the sentiment label.

    Args:
        filepath (string): path to the input .txt file

    Returns:
        list: (sentence, sentiment) tuples in file order, with the sentiment
        lower-cased and the sentence passed through fix_common_mojibake
    """
    # Automatically detect the encoding (best guess) from the raw bytes
    with open(filepath, 'rb') as raw_file:
        raw_bytes = raw_file.read()
    detected_encoding = chardet.detect(raw_bytes)['encoding']

    # Read the file again as text and split every labelled line
    pairs = []
    with open(filepath, 'r', encoding=detected_encoding) as text_file:
        for raw_line in text_file:
            body, separator, label = raw_line.strip().rpartition('.@')
            if not separator:
                # no '.@' in this line: nothing to extract
                continue
            cleaned_sentence = fix_common_mojibake(body.strip())
            pairs.append((cleaned_sentence, label.strip().lower()))
    return pairs

In [7]:
def txt_to_csv(txt_path, output_csv):
    """Convert a labelled .txt file into a two-column .csv file.

    The csv gets the header ``Sentence,Sentiment`` followed by one row per
    pair returned by extract_sentences_from_file. Nothing is read or written
    when the output file already exists.

    Args:
        txt_path (string): path to the .txt file
        output_csv (string): path to the desired output .csv file
    """
    if not os.path.exists(output_csv):
        rows = extract_sentences_from_file(txt_path)

        with open(output_csv, 'w', newline='', encoding='utf-8') as handle:
            # header first, then the extracted pairs
            csv.writer(handle).writerows([('Sentence', 'Sentiment'), *rows])
        print(f"Processed files and wrote output to {output_csv}")
    else:
        # an existing csv is never regenerated
        print(f"{output_csv} already exists. Skipping processing.")


This function is needed as the text files seem to be corrupted, for example one sentence is:
```
Clothing retail chain Sepp+ñl+ñ 's sales increased by 8 % to EUR 155.2 mn , and operating profit rose to EUR 31.1 mn from EUR 17.1 mn in 2004
```
The characters `+ñ` are mojibake, i.e. the result of decoding the file with the wrong encoding: the encoding detected here is latin-1, whereas the sequence stands for the UTF-8 character `ä`. The function below repairs the most common of these mistakes.

In [9]:
def fix_common_mojibake(text):
    """Replace known garbled character sequences with the intended letters.

    Args:
        text (string): input sentence

    Returns:
        string: the sentence with every known garbled sequence substituted
    """
    # (garbled sequence, intended character) pairs, applied in this order
    substitutions = (
        ('+ñ', 'ä'),
        ('+í', 'é'),
        ('+ô', 'ö'),
        ('+ü', 'ü'),
    )
    repaired = text
    for garbled, intended in substitutions:
        repaired = repaired.replace(garbled, intended)
    return repaired

This function summarizes the common preprocessing pipeline for natural language models.

In [None]:
def preprocess_text(text):
    """Lower-case, tokenize, remove English stop words and lemmatize a sentence.

    Args:
        text (string): raw input sentence

    Returns:
        string: the remaining lemmatized tokens joined by single spaces
    """
    # Tokenize the lower-cased text
    tokens = word_tokenize(text.lower())

    # Fetch the stop-word list once and keep it in a set. Calling
    # stopwords.words('english') inside the filter re-evaluates it for every
    # token and tests membership against a list.
    english_stopwords = set(stopwords.words('english'))

    # Drop stop words and lemmatize what is left
    lemmatizer = WordNetLemmatizer()
    lemmatized_tokens = [
        lemmatizer.lemmatize(token)
        for token in tokens
        if token not in english_stopwords
    ]

    # Join the tokens back into a string
    return ' '.join(lemmatized_tokens)

Function that creates csv files (if not created already) and returns the dataframe extracted from them

In [None]:
def create_df():
    """Ask which dataset to use, create its csv if needed and load it.

    The user is prompted on stdin. Both the agreement percentage ('50', '66',
    '75', '100') and the menu entry number shown in front of it ('1'-'4') are
    accepted; surrounding whitespace is ignored.

    Returns:
        tuple: (df, le), where df is the dataframe read from the csv with an
        added 'Sentiment_encoded' column and le is the LabelEncoder fitted on
        the 'Sentiment' column. None is returned when the answer is not one
        of the options.
    """
    # agreement level -> (raw .txt file name, generated .csv file name)
    datasets = {
        '50': ('Sentences_50Agree.txt', 'sentences_50.csv'),
        '66': ('Sentences_66Agree.txt', 'sentences_66.csv'),
        '75': ('Sentences_75Agree.txt', 'sentences_75.csv'),
        '100': ('Sentences_AllAgree.txt', 'sentences_100.csv'),
    }
    # The menu numbers its entries, so the entry number is a valid answer too
    menu_numbers = {str(number): level for number, level in enumerate(datasets, start=1)}

    print("Please select which dataset to use. Data is categorized based on the percentage of agreement in the sentiment estimator.")
    print("Options:")
    for number, level in menu_numbers.items():
        print(f"({number}) {level}")

    answer = input().strip()
    level = menu_numbers.get(answer, answer)
    if level not in datasets:
        print("The percentage provided is not admitted, skipping.")
        return None

    txt_name, csv_name = datasets[level]
    csv_path = DATASET_FOLDER + csv_name
    txt_to_csv(RAW_DATA_FOLDER + txt_name, csv_path)

    df = pd.read_csv(csv_path)

    # Encode the sentiment labels as integers
    le = LabelEncoder()
    df['Sentiment_encoded'] = le.fit_transform(df['Sentiment'])

    return df, le

In [None]:
def create_dataloaders(train_df, val_df, test_df=None, vocab_inst=None, batch_size=32):
    """Wrap the dataframes in TextDataset objects and build their DataLoaders.

    Args:
        train_df (pandas.DataFrame): training data; also used to build the
            vocabulary when none is given
        val_df (pandas.DataFrame): validation data
        test_df (pandas.DataFrame, optional): test data
        vocab_inst (optional): existing vocabulary; built from train_df if None
        batch_size (int): batch size of every loader

    Returns:
        tuple: (train_loader, val_loader, test_loader, vocab_inst), where
        test_loader is None when no test_df was given. Only the training
        loader shuffles its data.
    """
    from functools import partial

    if vocab_inst is None:
        vocab_inst = build_vocab(train_df)

    # DataLoader calls its collate function with the batch as the only
    # argument, while collate_fn also needs the vocabulary: bind it up front.
    collate = partial(collate_fn, vocab_inst=vocab_inst)

    def make_loader(frame, shuffle=False):
        dataset = TextDataset(frame, vocab_inst)
        return DataLoader(dataset, batch_size=batch_size, collate_fn=collate, shuffle=shuffle)

    train_loader = make_loader(train_df, shuffle=True)
    val_loader = make_loader(val_df)
    test_loader = make_loader(test_df) if test_df is not None else None

    return train_loader, val_loader, test_loader, vocab_inst


The following functions build the token vocabulary, encode sentences and collate batches for the dataloaders.

In [None]:
def build_vocab(df):
    """Build a token vocabulary from the 'Sentence' column of a dataframe.

    Args:
        df (pandas.DataFrame): frame with a 'Sentence' column of raw sentences

    Returns:
        torchtext vocabulary containing the specials "<pad>" and "<unk>" and
        every preprocessed token; unknown tokens are mapped to "<unk>"
    """
    # The lowercase `vocab` factory builds a Vocab from token frequencies;
    # the file-level import only provides the `Vocab` class.
    from torchtext.vocab import vocab

    counter = Counter()
    for text in df['Sentence']:
        # preprocess_text returns ONE space-joined string. Split it so that
        # whole tokens are counted; updating the counter with the string
        # itself would count single characters.
        counter.update(preprocess_text(text).split())

    vocab_inst = vocab(counter, specials=["<pad>", "<unk>"])
    vocab_inst.set_default_index(vocab_inst["<unk>"])
    return vocab_inst


def encode_text(text, vocab_inst):
    """Convert a raw sentence into a list of vocabulary indices.

    Args:
        text (string): raw input sentence
        vocab_inst: mapping from token to index, e.g. the result of build_vocab

    Returns:
        list: one index per preprocessed token, in sentence order
    """
    # Same tokenisation as build_vocab: split the preprocessed string into
    # tokens instead of iterating over its characters.
    tokens = preprocess_text(text).split()
    return [vocab_inst[token] for token in tokens]

In [None]:
def collate_fn(batch, vocab_inst):
    """Combine (token ids, label) samples into one padded batch.

    Args:
        batch: iterable of (1-D token tensor, label tensor) pairs
        vocab_inst: mapping that provides the index of the "<pad>" token

    Returns:
        tuple: (padded_texts, labels); sequences are padded with the "<pad>"
        index to a common length along a leading batch dimension, labels are
        stacked into one tensor
    """
    sequences, targets = map(list, zip(*batch))
    pad_index = vocab_inst["<pad>"]
    return (
        pad_sequence(sequences, batch_first=True, padding_value=pad_index),
        torch.stack(targets),
    )

## Hyperparameter tuning

TODO

In [None]:
def objective(
    trial,
    df,
    model_class,
    device
    ):
    """Optuna objective: mean cross-validated accuracy for one set of hyperparameters.

    For every fold a vocabulary is built on the training part only, a fresh
    model is trained for three epochs and its accuracy on the validation part
    is recorded.

    NOTE(review): the fold splitter ``skf`` is not defined in this function;
    presumably it is a module-level cross-validation splitter - confirm.

    Args:
        trial: Optuna trial used to sample "hidden_size", "lr" and "num_layers"
        df (pandas.DataFrame): data with a 'Sentiment_encoded' column used for splitting
        model_class: callable accepting the keywords vocab_size, hidden_size
            and num_layers and returning a model
        device: device the model and the batches are moved to

    Returns:
        float: average of the per-fold validation accuracies
    """
    # Sample hyperparameters
    hidden_size = trial.suggest_int("hidden_size", 64, 256)
    lr = trial.suggest_float("lr", 1e-4, 1e-2, log=True)
    num_layers = trial.suggest_int("num_layers", 1, 3)

    fold_scores = []

    # Cross-validation loop
    for fit_rows, holdout_rows in skf.split(df, df['Sentiment_encoded']):
        fold_train = df.iloc[fit_rows].reset_index(drop=True)
        fold_val = df.iloc[holdout_rows].reset_index(drop=True)

        # The vocabulary only sees the training part of the fold
        fold_vocab = build_vocab(fold_train)
        train_loader, val_loader, _unused_test, _unused_vocab = create_dataloaders(
            fold_train, fold_val, vocab_inst=fold_vocab
        )

        # Fresh model, optimizer and loss for every fold
        model = model_class(
            vocab_size=len(fold_vocab),
            hidden_size=hidden_size,
            num_layers=num_layers,
        ).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        loss_crit = nn.CrossEntropyLoss()

        # Short training run: three epochs
        for _epoch in range(3):
            training_epoch(model, optimizer, loss_crit, train_loader, device)

        # Accuracy on the validation part of the fold
        model.eval()
        hits = 0
        seen = 0
        with torch.no_grad():
            for x_val, y_val in val_loader:
                x_val = x_val.to(device)
                y_val = y_val.to(device)
                predicted = torch.argmax(model(x_val), dim=1)
                hits += (predicted == y_val).sum().item()
                seen += y_val.size(0)

        fold_scores.append(hits / seen)

    # Average accuracy over all folds
    return sum(fold_scores) / len(fold_scores)


## Training

Define training and validation epochs

In [None]:
def training_epoch(model, optimizer, loss_crit, train_loader, device):
    """Train the model for one pass over ``train_loader``.

    Args:
        model: model to train; switched to training mode
        optimizer: optimizer stepped once per batch
        loss_crit: callable computing the loss from (outputs, targets)
        train_loader: iterable of (inputs, targets) batches
        device: device the batches are moved to

    Returns:
        tuple: (average loss, accuracy) over all samples of the epoch; each
        batch loss is multiplied by its batch size before averaging
    """
    model.train()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Forward pass, backward pass and parameter update
        optimizer.zero_grad()
        outputs = model(inputs)
        batch_loss = loss_crit(outputs, targets)
        batch_loss.backward()
        optimizer.step()

        # Running totals for loss and accuracy
        loss_sum += batch_loss.item() * inputs.size(0)
        predictions = outputs.max(dim=1).indices
        n_correct += (predictions == targets).sum().item()
        n_seen += targets.size(0)

    return loss_sum / n_seen, n_correct / n_seen

In [None]:
def validation_epoch(model, criterion, val_loader, device):
    """Evaluate the model on ``val_loader`` without updating its weights.

    Args:
        model: model to evaluate; switched to evaluation mode
        criterion: callable computing the loss from (outputs, targets)
        val_loader: iterable of (inputs, targets) batches
        device: device the batches are moved to

    Returns:
        tuple: (average loss, accuracy) over all validation samples; each
        batch loss is multiplied by its batch size before averaging
    """
    model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    # Gradients are not needed for evaluation
    with torch.no_grad():
        for inputs, targets in val_loader:
            inputs = inputs.to(device)
            targets = targets.to(device)
            outputs = model(inputs)

            # Running totals for validation loss and accuracy
            loss_sum += criterion(outputs, targets).item() * inputs.size(0)
            predictions = outputs.max(dim=1).indices
            n_correct += (predictions == targets).sum().item()
            n_seen += targets.size(0)

    return loss_sum / n_seen, n_correct / n_seen