# Home Exercise 1 on Text Classification

Implement a **Recurrent Neural Network model** (**Vanilla RNN, GRU, and LSTM**) to predict whether a review is positive or negative.

- **Data**: [IMDB Dataset of 50K Movie Reviews](https://www.kaggle.com/datasets/lakshmi25npathi/imdb-dataset-of-50k-movie-reviews) (the last 10% of rows serve as the test set).
- **Compare** the performance of the three models.

**Note**: Submit only a **single Jupyter Notebook file** that can handle all tasks, including data downloading, preprocessing, model training, and model evaluation. *(Submissions that do not follow the guidelines will receive a score of 0.)*

**Grading Criteria**

For valid submissions, scores will be assigned based on the **leaderboard ranking** (**strictly greater**):

- **Top 25%** → **10 points**
- **25% - 50%** → **9.0 points**
- **50% - 75%** → **8.0 points**
- **75% - 100%** → **7.0 points**


# Import Libs

In [1]:
%pip install numpy pandas tensorflow scikit-learn kagglehub nltk matplotlib


Note: you may need to restart the kernel to use updated packages.


In [None]:
import json
import numpy as np
import pandas as pd
import tensorflow as tf
import kagglehub
import os
import re
import time 
import string
import nltk
import matplotlib.pyplot as plt
from nltk.corpus import stopwords

from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

import torch
import torch.nn as nn

import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import torch.nn as nn
import time
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# added
# from tensorflow.keras.optimizers import Adam
# from tensorflow.keras.callbacks import ReduceLROnPlateau

# Download necessary NLTK resources
nltk.download("stopwords")


[nltk_data] Downloading package stopwords to /usr/share/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [3]:
# # Download latest version
# path = kagglehub.dataset_download("lakshmi25npathi/imdb-dataset-of-50k-movie-reviews")

# print("Path to dataset files:", path)

# # Define the dataset path (update this if needed)
# dataset_file = os.path.join(path, "IMDB Dataset.csv")  # Ensure correct file name

# # Load the dataset
# df = pd.read_csv(dataset_file)

# # Convert sentiments to binary labels
# df['sentiment'] = df['sentiment'].map({'positive': 1, 'negative': 0})

# # Split data
# train_texts, test_texts, train_labels, test_labels = train_test_split(df['review'], df['sentiment'], test_size=0.1, random_state=42)

# # Tokenization and padding
# tokenizer = Tokenizer(num_words=MAX_VOCAB_SIZE, oov_token="<OOV>")
# tokenizer.fit_on_texts(train_texts)

# train_sequences = tokenizer.texts_to_sequences(train_texts)
# test_sequences = tokenizer.texts_to_sequences(test_texts)

# train_padded = pad_sequences(train_sequences, maxlen=MAX_SEQUENCE_LENGTH, padding='post', truncating='post')
# test_padded = pad_sequences(test_sequences, maxlen=MAX_SEQUENCE_LENGTH, padding='post', truncating='post')

# # Convert labels to numpy arrays
# train_labels = np.array(train_labels)
# test_labels = np.array(test_labels)


In [None]:
# Download latest version
path = kagglehub.dataset_download("lakshmi25npathi/imdb-dataset-of-50k-movie-reviews")

print("Path to dataset files:", path)

# Define the dataset path (update this if needed)
dataset_file = os.path.join(path, "IMDB Dataset.csv")  # Ensure correct file name

# Load the dataset
df = pd.read_csv(dataset_file)

# Convert sentiments to binary labels
df['sentiment'] = df['sentiment'].map({'positive': 1, 'negative': 0})

# Define stopwords and chat word dictionary
stop_words = set(stopwords.words("english"))

chat_words = {
    "afaik": "as far as i know", "afk": "away from keyboard", "asap": "as soon as possible",
    "lol": "laughing out loud", "imho": "in my honest opinion", "fyi": "for your information",
    "brb": "be right back", "btw": "by the way", "idk": "i don't know", "ttyl": "talk to you later",
    "omg": "oh my god", "gtg": "got to go", "wtf": "what the f...", "rofl": "rolling on the floor laughing",
    "u": "you", "bff": "best friends forever", "cya": "see you", "jk": "just kidding",
    "idc": "i don't care", "ily": "i love you", "imu": "i miss you"
}

# Function to expand chat words
def expand_chat_words(text, chat_words_dict):
    words = text.split()
    expanded_text = " ".join([chat_words_dict.get(word.upper(), word) for word in words])
    return expanded_text

# Preprocessing function
def preprocess_text(text):
    # Convert to lowercase
    text = text.lower()

    # Remove HTML tags
    text = re.sub('<[^<]+?>', '', text)

    # Remove URLs
    text = re.sub(r"http\S+|www\S+|https\S+", "", text, flags=re.MULTILINE)

    # Remove punctuations
    text = text.translate(str.maketrans("", "", string.punctuation))

    # Expand chat words
    text = expand_chat_words(text, chat_words)

    # Remove stopwords
    # text = " ".join([word for word in text.split() if word not in stop_words])

    return text

# Apply preprocessing to the dataset
df["cleaned_review"] = df["review"].apply(preprocess_text)

# Split data
# train_texts, test_texts, train_labels, test_labels = train_test_split(df["cleaned_review"], df["sentiment"], test_size=0.1, random_state=42)
# Use the last 10% of the data as the test set (no shuffling)
split_index = int(len(df) * 0.9)
train_texts = df["cleaned_review"].iloc[:split_index]
test_texts = df["cleaned_review"].iloc[split_index:]
train_labels = df["sentiment"].iloc[:split_index]
test_labels = df["sentiment"].iloc[split_index:]

# Tokenization and padding
MAX_VOCAB_SIZE = 10000
MAX_SEQUENCE_LENGTH = 200

tokenizer = Tokenizer(num_words=MAX_VOCAB_SIZE, oov_token="<OOV>")
tokenizer.fit_on_texts(train_texts)

train_sequences = tokenizer.texts_to_sequences(train_texts)
test_sequences = tokenizer.texts_to_sequences(test_texts)

train_padded = pad_sequences(train_sequences, maxlen=MAX_SEQUENCE_LENGTH, padding='post', truncating='post')
test_padded = pad_sequences(test_sequences, maxlen=MAX_SEQUENCE_LENGTH, padding='post', truncating='post')

# Convert labels to numpy arrays
train_labels = np.array(train_labels)
test_labels = np.array(test_labels)

# Split the training data into training and validation sets (80% - 20%)
train_padded, val_padded, train_labels, val_labels = train_test_split(
    train_padded, train_labels, test_size=0.2, random_state=42
)

# Print the sizes of each set
print(f"Train size: {len(train_padded)}")
print(f"Validation size: {len(val_padded)}")
print(f"Test size (untouched): {len(test_padded)}")


Path to dataset files: /kaggle/input/
Original Review: One of the other reviewers has mentioned that after watching just 1 Oz episode you'll be hooked. They are right, as this is exactly what happened with me.<br /><br />The first thing that struck me about Oz was its brutality and unflinching scenes of violence, which set in right from the word GO. Trust me, this is not a show for the faint hearted or timid. This show pulls no punches with regards to drugs, sex or violence. Its is hardcore, in the classic use of the word.<br /><br />It is called OZ as that is the nickname given to the Oswald Maximum Security State Penitentary. It focuses mainly on Emerald City, an experimental section of the prison where all the cells have glass fronts and face inwards, so privacy is not high on the agenda. Em City is home to many..Aryans, Muslims, gangstas, Latinos, Christians, Italians, Irish and more....so scuffles, death stares, dodgy dealings and shady agreements are never far away.<br /><br />I 

In [None]:
class VanillaRNN(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size):
        super(VanillaRNN, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.hidden_size = hidden_size
        
        # Weight matrices and biases
        self.Wx = nn.Linear(embedding_dim, hidden_size, bias=False)  # Shape: (hidden_size, embedding_dim)
        self.Wh = nn.Linear(hidden_size, hidden_size)  # Shape: (hidden_size, hidden_size)
        self.Wy = nn.Linear(hidden_size, output_size)  # Shape: (output_size, hidden_size)
        
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()
        
    def forward(self, x):
        x = self.embedding(x)  # Shape: (batch_size, seq_len, embedding_dim)
        batch_size, seq_len, _ = x.size()
        
        h = torch.zeros(batch_size, self.hidden_size).to(x.device)  # Initial hidden state
        
        for t in range(seq_len):
            x_t = x[:, t, :]  # Shape: (batch_size, embedding_dim)
            h = self.tanh(self.Wx(x_t) + self.Wh(h))  # Shape: (batch_size, hidden_size)
        
        out = self.Wy(h)  # Shape: (batch_size, output_size)
        return self.sigmoid(out)


In [None]:
class GRU(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size):
        super(GRU, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.hidden_size = hidden_size
        
        # Weight matrices for GRU
        self.Wz = nn.Linear(embedding_dim + hidden_size, hidden_size)  # Update gate
        self.Wr = nn.Linear(embedding_dim + hidden_size, hidden_size)  # Reset gate
        self.Wh = nn.Linear(embedding_dim + hidden_size, hidden_size)  # Candidate hidden state
        self.Wy = nn.Linear(hidden_size, output_size)  # Output layer
        
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        
    def forward(self, x):
        """
        x: Input tensor of shape (batch_size, seq_len)
        """
        x = self.embedding(x)  # Shape: (batch_size, seq_len, embedding_dim)
        batch_size, seq_len, _ = x.size()
        
        h = torch.zeros(batch_size, self.hidden_size).to(x.device)  # Initial hidden state of shape (batch_size, hidden_size)
        
        for t in range(seq_len):  # Loop through each time step
            x_t = x[:, t, :]  # Shape: (batch_size, embedding_dim)
            
            # Concatenate input and previous hidden state
            hx = torch.cat((x_t, h), dim=1)  # Shape: (batch_size, embedding_dim + hidden_size)
            
            # Compute gates
            z = self.sigmoid(self.Wz(hx))  # Update gate. Shape: (batch_size, hidden_size)
            r = self.sigmoid(self.Wr(hx))  # Reset gate. Shape: (batch_size, hidden_size)
            
            # Compute candidate hidden state
            rh = torch.cat((x_t, r * h), dim=1)  # Shape: (batch_size, embedding_dim + hidden_size)
            h_hat = self.tanh(self.Wh(rh))  # Candidate hidden state. Shape: (batch_size, hidden_size)
            
            # Compute new hidden state
            h = (1 - z) * h + z * h_hat  # Shape: (batch_size, hidden_size)
        
        out = self.Wy(h)  # Shape: (batch_size, output_size)
        return self.sigmoid(out)


In [None]:
class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_size, output_size):
        super(LSTM, self).__init__()
        
        # Embedding layer to convert word indices to dense vectors
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        
        # Hidden state size of the LSTM
        self.hidden_size = hidden_size
        
        # Weight matrices for gates and candidate cell state
        self.Wf = nn.Linear(embedding_dim + hidden_size, hidden_size)  # Forget gate
        self.Wi = nn.Linear(embedding_dim + hidden_size, hidden_size)  # Input gate
        self.Wo = nn.Linear(embedding_dim + hidden_size, hidden_size)  # Output gate
        self.Wc = nn.Linear(embedding_dim + hidden_size, hidden_size)  # Candidate cell state
        
        # Output layer
        self.Wy = nn.Linear(hidden_size, output_size)
        
        # Activation functions
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        
    def forward(self, x):
        """
        Forward pass for LSTM model
        
        Arguments:
        x -- Input tensor of shape (batch_size, seq_len)
        
        Returns:
        out -- Output tensor of shape (batch_size, output_size)
        """
        
        # Embedding layer transforms input indices to dense vectors
        x = self.embedding(x)  # Shape: (batch_size, seq_len, embedding_dim)
        
        # Get batch size and sequence length from input
        batch_size, seq_len, _ = x.size()
        
        # Initialize hidden state and cell state with zeros
        h = torch.zeros(batch_size, self.hidden_size).to(x.device)  # Shape: (batch_size, hidden_size)
        c = torch.zeros(batch_size, self.hidden_size).to(x.device)  # Shape: (batch_size, hidden_size)
        
        for t in range(seq_len):  # Iterate over each time step
            x_t = x[:, t, :]  # Extract the embedding for the current time step. Shape: (batch_size, embedding_dim)
            
            # Concatenate input and hidden state
            hx = torch.cat((x_t, h), dim=1)  # Shape: (batch_size, embedding_dim + hidden_size)
            
            # Forget gate: Decide what to forget from the cell state
            f = self.sigmoid(self.Wf(hx))  # Shape: (batch_size, hidden_size)
            
            # Input gate: Decide what information to add to the cell state
            i = self.sigmoid(self.Wi(hx))  # Shape: (batch_size, hidden_size)
            
            # Output gate: Decide what part of the cell state to output
            o = self.sigmoid(self.Wo(hx))  # Shape: (batch_size, hidden_size)
            
            # Candidate cell state
            c_hat = self.tanh(self.Wc(hx))  # Shape: (batch_size, hidden_size)
            
            # Update cell state: Combining forget gate, input gate, and candidate cell state
            c = f * c + i * c_hat  # Shape: (batch_size, hidden_size)
            
            # Compute the new hidden state
            h = o * self.tanh(c)  # Shape: (batch_size, hidden_size)
        
        # Compute final output through a dense layer
        out = self.Wy(h)  # Shape: (batch_size, output_size)
        
        # Apply sigmoid activation for binary classification
        return self.sigmoid(out)


In [None]:
# Device configuration
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Model Hyperparameters
VOCAB_SIZE = min(len(tokenizer.word_index) + 1, MAX_VOCAB_SIZE)
EMBEDDING_DIM = 128
HIDDEN_SIZE = 64
OUTPUT_SIZE = 1
EPOCHS = 10
BATCH_SIZE = 64
LEARNING_RATE = 0.001


In [None]:
# Create TensorDatasets
train_data = TensorDataset(torch.tensor(train_padded, dtype=torch.long), torch.tensor(train_labels, dtype=torch.float32))
val_data = TensorDataset(torch.tensor(val_padded, dtype=torch.long), torch.tensor(val_labels, dtype=torch.float32))
test_data = TensorDataset(torch.tensor(test_padded, dtype=torch.long), torch.tensor(test_labels, dtype=torch.float32))

# Create DataLoaders
train_loader = DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_data, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
def train_and_validate(model, train_loader, val_loader, epochs, learning_rate, model_name):
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    train_losses, val_losses = [], []
    train_accuracies, val_accuracies = [], []

    model.to(device)

    for epoch in range(epochs):
        # Training Phase
        model.train()
        total_loss = 0
        all_preds = []
        all_labels = []

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            
            optimizer.zero_grad()
            outputs = model(inputs).squeeze()
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            all_preds.extend((outputs > 0.5).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())
        
        train_loss = total_loss / len(train_loader)
        train_losses.append(train_loss)
        train_accuracy = accuracy_score(all_labels, all_preds)
        train_accuracies.append(train_accuracy)

        # Validation Phase
        model.eval()
        val_loss = 0
        all_preds = []
        all_labels = []

        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs).squeeze()
                loss = criterion(outputs, labels)
                val_loss += loss.item()
                all_preds.extend((outputs > 0.5).cpu().numpy())
                all_labels.extend(labels.cpu().numpy())

        val_loss /= len(val_loader)
        val_losses.append(val_loss)
        val_accuracy = accuracy_score(all_labels, all_preds)
        val_accuracies.append(val_accuracy)

        print(f"Epoch [{epoch+1}/{epochs}]")
        print(f"Train Loss: {train_loss:.4f} - Train Acc: {train_accuracy:.4f}")
        print(f"Val Loss: {val_loss:.4f} - Val Acc: {val_accuracy:.4f}")
    
    return train_losses, val_losses, train_accuracies, val_accuracies


In [None]:
def evaluate_model(model, test_loader, model_name):
    model.eval()
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs).squeeze()
            all_preds.extend((outputs > 0.5).cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds)
    recall = recall_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)
    
    print(f"\nTest Performance for {model_name}:")
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    
    return accuracy, precision, recall, f1


In [None]:
import matplotlib.pyplot as plt

def plot_training_and_validation(history_dict, metric_index, ylabel):
    plt.figure(figsize=(10, 6))
    for model_name, history in history_dict.items():
        metric = history[metric_index]
        val_metric = history[metric_index + 1]
        
        plt.plot(metric, label=f'{model_name} - Train')
        plt.plot(val_metric, label=f'{model_name} - Validation', linestyle='--')
    
    plt.title(f'Training and Validation {ylabel}')
    plt.xlabel('Epochs')
    plt.ylabel(ylabel)
    plt.legend()
    plt.show()



In [None]:
models = {
    "Vanilla RNN": VanillaRNN(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_SIZE, OUTPUT_SIZE),
    "GRU": GRU(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_SIZE, OUTPUT_SIZE),
    "LSTM": LSTM(VOCAB_SIZE, EMBEDDING_DIM, HIDDEN_SIZE, OUTPUT_SIZE)
}

history_dict = {}

for name, model in models.items():
    print(f"\nTraining {name}...")
    history = train_and_validate(model, train_loader, val_loader, EPOCHS, LEARNING_RATE, name)
    history_dict[name] = history


In [None]:
# Plot Loss
plot_training_and_validation(history_dict, 0, 'Loss')

# Plot Accuracy
plot_training_and_validation(history_dict, 2, 'Accuracy')


In [None]:
test_results = {}

for name, model in models.items():
    print(f"\nEvaluating {name} on the Test Set...")
    test_metrics = evaluate_model(model, test_loader, name)
    test_results[name] = test_metrics


## Comparison of RNN, GRU, and LSTM Models



In [None]:
# Compile results into a DataFrame
results = []

for name, metrics in test_results.items():
    results.append({
        "Model": name,
        "Accuracy": metrics[0],
        "Precision": metrics[1],
        "Recall": metrics[2],
        "F1-Score": metrics[3],
        "Loss": history_dict[name][1][-1],  # Final validation loss
        "Training Time (s)": sum(history_dict[name][0]),  # Sum of training times per epoch
        "Inference Time (s)": sum(history_dict[name][1])  # Sum of inference times per epoch
    })

# Convert to DataFrame
results_df = pd.DataFrame(results)
results_df.head()

# Save the full results to CSV
results_csv_path = os.path.join(os.getcwd(), "imdb_models_performance.csv")
results_df.to_csv(results_csv_path, index=False)
print(f"Models performance saved to: {results_csv_path}")

Unnamed: 0,Model,Accuracy,Precision,Recall,F1-Score,Loss,Training Time (s),Inference Time (s)
0,Vanilla RNN,0.531,0.531807,0.423077,0.471251,1.480807,291.630708,1.986262
1,GRU,0.855,0.863997,0.838462,0.851038,0.897509,174.433492,0.788113
2,LSTM,0.8486,0.852905,0.838057,0.845416,0.828926,173.513572,0.841296


In [None]:
# Find the best model for each metric
best_models = {}

for metric in ["Accuracy", "Precision", "Recall", "F1-Score", "Loss", "Training Time (s)", "Inference Time (s)"]:
    if metric in ["Loss", "Training Time (s)", "Inference Time (s)"]:
        best_models[metric] = results_df.loc[results_df[metric].idxmin(), ["Model", metric]].to_dict()
    else:
        best_models[metric] = results_df.loc[results_df[metric].idxmax(), ["Model", metric]].to_dict()

# Convert best models dictionary to DataFrame
best_models_df = pd.DataFrame(best_models).T.reset_index().rename(columns={"index": "Metric", "Model": "Best Model", 0: "Best Value"})
best_models_df.head(10)

# Save the best models per metric to CSV
best_models_csv_path = os.path.join(os.getcwd(), "imdb_best_models_per_metric.csv")
best_models_df.to_csv(best_models_csv_path, index=False)
print(f"Best models per metric saved to: {best_models_csv_path}")


Models performance saved to: /kaggle/working/imdb_models_performance.csv


Unnamed: 0,Metric,Best Model,Accuracy,Precision,Recall,F1-Score,Loss,Training Time (s),Inference Time (s)
0,Accuracy,GRU,0.855,,,,,,
1,Precision,GRU,,0.863997,,,,,
2,Recall,GRU,,,0.838462,,,,
3,F1-Score,GRU,,,,0.851038,,,
4,Loss,LSTM,,,,,0.828926,,
5,Training Time (s),LSTM,,,,,,173.513572,
6,Inference Time (s),GRU,,,,,,,0.788113


Best models per metric saved to: /kaggle/working/imdb_best_models_per_metric.csv
