In [1]:
import torch
import torch.nn as nn
import pandas as pd
from tqdm import tqdm

from collections import Counter
from typing import Dict, List
import re

# Choose device

In [4]:
device: str = "cpu"

if torch.cuda.is_available():
    device = "cuda"
elif torch.backends.mps.is_available():
    device = "mps"

# Dataset

In [5]:
def clean_text(text: str) -> str:
    text = re.sub(r'<[^>]+>', '', text)
        
    pattern = r'([.,!?-])'
    s = re.sub(pattern, r' \1 ', text)    
    s = re.sub(r'\s{2,}', ' ', s)            
    text = s

    text = re.sub(r' +', ' ', text)

    text = re.sub(r'[^\x00-\x7f]', r'', text)

    emojis = re.compile(
        '['
        u'\U0001F600-\U0001F64F'  # emoticons
        u'\U0001F300-\U0001F5FF'  # symbols & pictographs
        u'\U0001F680-\U0001F6FF'  # transport & map symbols
        u'\U0001F1E0-\U0001F1FF'  # flags (iOS)
        u'\U00002702-\U000027B0'
        u'\U000024C2-\U0001F251'
        ']+',
        flags=re.UNICODE
    )
    text = emojis.sub(r'', text)
    text = re.sub(r'(.)\1+', r'\1\1', text)

    return text


In [6]:
data = pd.read_csv("../data/IMDB.csv")

sentences = data["review"].values
words = " ".join(sentences)
words = clean_text(words)
words = words.split()

In [7]:
class Tokenizer:
    def __init__(self, words: List[str], clean_text: bool = False) -> None:
        counter = Counter(words)
        self.vocab = sorted(counter, key=counter.get, reverse=True)
        self.int2word = dict(enumerate(self.vocab, 1))
        self.int2word[0] = "<PAD>"
        self.word2int = {word: id for id, word in self.int2word.items()}
        
    def __len__(self) -> int:
        return len(self.vocab)
    
    def tokenize(self, sentence: str) -> List[str]:
        return sentence.split()
    
    def convert_tokens_to_ids(self, tokens: List[str]) -> List[int]:
        result = []
        for word in tokens:
          if word in self.word2int:
            result.append(self.word2int[word])
        return result
    
    def convert_ids_to_tokens(self, ids: List[int]) -> List[str]:
            return [self.int2word[id] for id in ids]
    
tokenizer = Tokenizer(words)
        

In [8]:
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split

class IMDBDataset(Dataset):
    def __init__(self, root: str = "../data/IMDB.csv", train: bool = True, test_size: float = 0.33, tokenizer: Tokenizer = None) -> None:
        super().__init__()

        data = pd.read_csv(root)
        data = pd.get_dummies(data, columns=["sentiment"])
        data = data.rename({"review": "sentence", 
                            "sentiment_negative": "negative", 
                            "sentiment_positive": "positive"}, axis=1)
        
        X = data["sentence"].values
        y = data.drop(["sentence"], axis=1).values

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size)

        if train:
            self.X, self.y = X_train, y_train
        else:
            self.X, self.y = X_test, y_test

        self.tokenizer: Tokenizer = tokenizer

    def __len__(self) -> int:
        return len(self.X)
    
    def __getitem__(self, idx):
        sentence = clean_text(self.X[idx])
        sentence = self.X[idx]
        tokens = self.tokenizer.tokenize(sentence)
        ids = self.tokenizer.convert_tokens_to_ids(tokens)

        ids = torch.tensor(ids, dtype=torch.long)
        labels = torch.tensor(self.y[idx], dtype=torch.float32)
        return ids, labels

In [9]:
from torch.utils.data import DataLoader

train_dataset = IMDBDataset(tokenizer=tokenizer)
test_dataset = IMDBDataset(train=False, tokenizer=tokenizer)

batch_size: int = 32

train_loader = DataLoader(train_dataset)
test_loader = DataLoader(test_dataset)

# Model

In [10]:
class EmotionalLSTM(nn.Module):
    def __init__(self, 
                 vocab_size: int, 
                 emb_size: int, 
                 hidden_size: int, 
                 num_stacked_layers: int = 3, 
                 dropout: float = 0.2) -> None:
        super().__init__()

        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.lstm = nn.LSTM(emb_size, hidden_size, num_stacked_layers, dropout=dropout, batch_first=True)
        self.dropout = nn.Dropout(0.3)
        self.fc = nn.Linear(hidden_size, 2)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x: torch.LongTensor) -> torch.Tensor:
        x = self.embedding(x)
        out, _ = self.lstm(x)
        out = out[:,-1,:]
        out = self.dropout(out)
        out = self.fc(out)
        out = self.sigmoid(out)

        return out

vocab_size: int = 10000
emb_size: int = 256
hidden_size: int = 512
num_stacked_layers: int = 2
dropout: float = 0.25

model: EmotionalLSTM = EmotionalLSTM(vocab_size, emb_size, hidden_size, num_stacked_layers, dropout).to(device)

print(model)

EmotionalLSTM(
  (embedding): Embedding(10000, 256)
  (lstm): LSTM(256, 512, num_layers=2, batch_first=True, dropout=0.25)
  (dropout): Dropout(p=0.3, inplace=False)
  (fc): Linear(in_features=512, out_features=2, bias=True)
  (sigmoid): Sigmoid()
)


# Train

In [11]:
optimizer: torch.optim.Optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion: nn.CrossEntropyLoss = nn.CrossEntropyLoss()

num_epochs: int = 8

for epoch in range(num_epochs):
    model.train()
    train_loss: float = 0.0
    for i, (sentences, labels) in enumerate(tqdm(train_loader)):
        sentences = sentences.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        outputs = model(sentences)
        loss = criterion(outputs, labels)

        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    train_loss /= len(train_loader)
    print(f"Epoch: {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}")


100%|██████████| 33500/33500 [55:37<00:00, 10.04it/s]   


Epoch: 1/8, Train Loss: 0.6937


100%|██████████| 33500/33500 [48:22<00:00, 11.54it/s]    


Epoch: 2/8, Train Loss: 0.6931


100%|██████████| 33500/33500 [32:49<00:00, 17.01it/s]   


Epoch: 3/8, Train Loss: 0.6931


100%|██████████| 33500/33500 [37:30<00:00, 14.89it/s]   


Epoch: 4/8, Train Loss: 0.6931


100%|██████████| 33500/33500 [1:06:44<00:00,  8.37it/s]   


Epoch: 5/8, Train Loss: 0.6931


100%|██████████| 33500/33500 [50:42<00:00, 11.01it/s]    


Epoch: 6/8, Train Loss: 0.6931


100%|██████████| 33500/33500 [54:34<00:00, 10.23it/s]    


Epoch: 7/8, Train Loss: 0.6931


100%|██████████| 33500/33500 [57:14<00:00,  9.76it/s]   

Epoch: 8/8, Train Loss: 0.6931





In [15]:
accuracy = 0
total_samples = 0

loader = test_loader

model.eval()  # Set the model to evaluation mode
for batch, (X, y) in enumerate(tqdm(loader)):
    with torch.no_grad():
        y_pred = model(X.to(device))
    
    y = y.to(device)
    _, predicted = torch.max(y_pred, 1)
    
    # Convert one-hot encoded y to class indices if needed
    if y.dim() == 2:
        y = torch.argmax(y, dim=1)
    
    # Debug prints for shapes
    correct_predictions = (predicted == y).sum().item()
    accuracy += correct_predictions
    total_samples += y.size(0)

accuracy / (len(loader) * loader.batch_size)

100%|██████████| 16500/16500 [06:12<00:00, 44.32it/s]  


0.49733333333333335