# GRU

Implementation based on the code from the notebook `1. Encoder-Decoder Seq2Seq.ipynb`.

In [None]:
import re, torch, pickle, json
from torch.utils.data import DataLoader, TensorDataset
import nltk
import numpy as np
import time

# Seed NumPy and PyTorch so repeated runs start from the same RNG state.
SEED = 42
np.random.seed(SEED)
torch.manual_seed(SEED)

# Check the available device: CUDA first, then Apple MPS, otherwise CPU.
# `torch.backends.mps` does not exist in older PyTorch builds, so it is looked
# up defensively instead of being accessed unconditionally.
_mps_backend = getattr(torch.backends, "mps", None)
if torch.cuda.is_available():
    device = "cuda"
    torch.cuda.manual_seed_all(SEED)
elif _mps_backend is not None and _mps_backend.is_available():
    device = "mps"
else:
    device = "cpu"

print(f"Using device: {device}")

# Load the datasets
# NOTE(review): unpickling can execute arbitrary code stored in the file, so
# only load pickles that were produced by this project.
# The objects are used below like pandas DataFrames with a "label" column
# (presumably that is what the pickles contain -- confirm against the data
# preparation step).
with open("data/train.pkl", "rb") as f:
    train = pickle.load(f)
with open("data/val.pkl", "rb") as f:
    val = pickle.load(f)
with open("data/test.pkl", "rb") as f:
    test = pickle.load(f)

# Remap original labels {0,4} -> contiguous ids {0,1} for modeling
# (Keep a copy of the original labels for reporting/debugging.)
LABEL_MAP = {0: 0, 4: 1}

for df_name, df in [("train", train), ("val", val), ("test", test)]:
    df["label_original"] = df["label"]
    mapped = df["label_original"].map(LABEL_MAP)

    # Any label missing from LABEL_MAP is mapped to NaN. Fail here with a
    # message naming the split and the offending values instead of letting
    # astype(int) raise an opaque NaN-conversion error.
    unmapped = mapped.isna()
    if unmapped.any():
        unexpected = df.loc[unmapped, "label_original"].unique().tolist()
        raise ValueError(
            f"{df_name}: labels {unexpected} are not covered by LABEL_MAP "
            f"(expected one of {list(LABEL_MAP)})"
        )

    df["label"] = mapped.astype(int)

num_labels = train["label"].nunique()
print("Number of labels (label): ", num_labels)

# With remapping, class ids are stable
label_0 = 0  # Negative
label_4 = 1  # Positive
print(f"Label 0: {label_0} and label 4: {label_4}")

train.head(2)

# Text Preprocessing and Vocabulary Building

In [None]:
# ---------------------------------------
# Text Preprocessing and Vocabulary Building
# ---------------------------------------
from collections import Counter

# The token dictionaries were already created in lstm.ipynb; here they are
# only read back from disk.
def _load_json(path):
    """Return the parsed content of the JSON file at ``path``."""
    with open(path, 'r') as handle:
        return json.load(handle)


# token -> id lookup used when encoding text
vocab = _load_json('tokens2ids.json')
# id -> token lookup
# NOTE(review): if this file stores a JSON object, its keys come back as
# strings (JSON has no integer keys) -- confirm before indexing with ints.
idx_to_word = _load_json('ids2tokens.json')

# -------------------------
# HELPER FUNCTIONS
# -------------------------
# Function to tokenize text (same as in RNN notebook)
def preprocess_text(text):
    """Turn raw text into a list of lowercase word tokens.

    Non-string input yields an empty list. For strings, every run of
    characters other than ASCII letters and whitespace is replaced by a single
    space, whitespace is collapsed and trimmed, the text is lowercased, and
    the result is split with ``nltk.word_tokenize``.
    """
    if isinstance(text, str):
        # Drop digits, punctuation and any other non-letter characters.
        letters_only = re.sub(r"[^a-zA-Z\s]+", " ", text)
        # Collapse the whitespace runs left behind by the substitution.
        collapsed = re.sub(r"\s+", " ", letters_only).strip()
        return nltk.word_tokenize(collapsed.lower())
    return []
def tokens_to_indices(tokens, vocab, max_length=None):
    """Convert a list of tokens to a list of vocabulary indices.

    Args:
        tokens: Iterable of token strings.
        vocab: Mapping from token to integer id. It must contain ``'<UNK>'``,
            and ``'<PAD>'`` whenever padding is actually needed.
        max_length: ``None`` (default) returns one id per token. Otherwise the
            result is truncated or right-padded with the ``'<PAD>'`` id so it
            holds exactly ``max_length`` ids; ``0`` yields an empty list.

    Returns:
        A list of integer ids.

    Raises:
        KeyError: if a required special token is missing from ``vocab``.
        ValueError: if ``max_length`` is negative.
    """
    # Look the fallback id up once rather than once per token.
    unk_id = vocab['<UNK>']
    indices = [vocab.get(token, unk_id) for token in tokens]

    # Compare against None explicitly so that max_length=0 is honoured instead
    # of being mistaken for "no limit".
    if max_length is None:
        return indices
    if max_length < 0:
        # A negative bound would silently chop tokens off the end via slicing.
        raise ValueError(f"max_length must be non-negative, got {max_length}")

    if len(indices) >= max_length:
        return indices[:max_length]
    return indices + [vocab['<PAD>']] * (max_length - len(indices))
# -------------------------
max_seq_length = 100  # Same as in the notebook Encoder-Decoder Seq2Seq.ipynb (MAX_LENGTH_INPUT = 100)


def _encode_tokens(tokens):
    """Map one token list to exactly ``max_seq_length`` vocabulary ids."""
    return tokens_to_indices(tokens, vocab, max_seq_length)


# Step 1: tokenize the raw tweet text of every split.
print("Tokenizing datasets...")
for _split in (train, val, test):
    _split['tokens'] = _split['text of the tweet'].apply(preprocess_text)

# Step 2: turn each token list into a fixed-length sequence of indices.
print("Converting text to sequences of indices...")
for _split in (train, val, test):
    _split['sequences'] = _split['tokens'].apply(_encode_tokens)

# Bi-Directional GRU

In [None]:
import torch.nn as nn
from torch import optim

class BiGRU_Classifier(nn.Module):
    """
    A Bidirectional GRU-based module designed for sequence encoding
    and subsequent sentiment classification.

    Token ids are embedded, passed through a single-layer bidirectional GRU,
    and the final forward and backward hidden states are concatenated and fed
    to a linear layer that produces one logit per class.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each token embedding.
        hidden_size: Hidden size of the GRU for ONE direction.
        num_labels: Number of output classes.
        dropout_p: Dropout probability applied to the embeddings.
        padding_idx: Optional id of the padding token. When given, the
            embedding of that id is kept at zero and each sequence is packed
            to its number of non-padding tokens, so the final hidden states
            are taken at the last real token rather than after the padding.
            This assumes padding only appears at the end of a sequence.
            ``None`` (default) runs the GRU over the full padded length.
    """
    def __init__(self, vocab_size, embedding_dim, hidden_size, num_labels, dropout_p=0.1,
                 padding_idx=None):
        super().__init__()

        # --- Parameters ---
        self.hidden_size = hidden_size
        self.num_classes = num_labels
        self.num_directions = 2  # Fixed for BiGRU
        self.padding_idx = padding_idx

        # --- Embedding Layer ---
        # input: (batch_size, seq_len) -> indices of tokens
        # output: (batch_size, seq_len, embedding_dim) -> dense word vectors
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=padding_idx)

        # --- Dropout Layer ---
        # Applied after embedding to regularize word vectors (Embedding Dropout)
        self.dropout = nn.Dropout(dropout_p)

        # --- BiGRU Layer ---
        # input_size: embedding_dim (the size of the input features per time step)
        # hidden_size: hidden_size (the output size of the hidden state for ONE direction)
        # batch_first=True: input shape is (batch_size, seq_len, features)
        # bidirectional=True: output_dim = 2 * hidden_size
        self.gru = nn.GRU(embedding_dim, hidden_size, batch_first=True, bidirectional=True)

        # --- Classification Layer ---
        # Input size is (2 * hidden_size): the final forward and backward
        # hidden states concatenated. Output size is num_labels.
        self.classifier = nn.Linear(self.num_directions * hidden_size, num_labels)

    def forward(self, input_tensor):
        """Return class logits of shape (batch_size, num_labels).

        Args:
            input_tensor: Integer tensor of token ids, shape (batch_size, seq_len).
        """
        # 1. Embedding
        # shape: (bs, seq_len) -> (bs, seq_len, embedding_dim)
        embedded = self.dropout(self.embedding(input_tensor))

        # 2. BiGRU Processing
        # hidden: final state of both directions, shape (2, bs, hidden_size)
        if self.padding_idx is None:
            _, hidden = self.gru(embedded)
        else:
            # Number of real tokens per row. Rows made only of padding are
            # clamped to length 1 because packing rejects empty sequences.
            lengths = (input_tensor != self.padding_idx).sum(dim=1).clamp(min=1)
            packed = nn.utils.rnn.pack_padded_sequence(
                embedded,
                lengths.cpu(),  # pack_padded_sequence expects lengths on the CPU
                batch_first=True,
                enforce_sorted=False,  # batches are not sorted by length
            )
            _, hidden = self.gru(packed)

        # 3. Aggregate Hidden States for Classification
        # With a single bidirectional layer, hidden[-2] is the forward state
        # and hidden[-1] the backward state; no reshape is needed.
        final_hidden = torch.cat((hidden[-2], hidden[-1]), dim=1)

        # 4. Final Classification
        # input: (bs, 2 * hidden_size)
        # output: (bs, num_classes)
        prediction_logits = self.classifier(final_hidden)
        return prediction_logits

In [None]:
# Define hyperparameters
# --- Hyperparameters ---
vocab_size = len(vocab)
embedding_dim = 128  # same as in notebook: 1. Encoder-Decoder Seq2Seq.ipynb
hidden_size = 128    # Common values: 64, 128, 256 - maybe decrease if training is too slow
learning_rate = 0.001
num_epochs = 30
batch_size = 64

# Report the settings that determine the model size, one per line.
print(
    "Model hyperparameters:",
    f"  vocab_size: {vocab_size:,}",
    f"  embedding_dim: {embedding_dim}",
    f"  hidden_size: {hidden_size}",
    f"  num_labels: {num_labels}",
    sep="\n",
)

# --- Model ---
model = BiGRU_Classifier(vocab_size, embedding_dim, hidden_size, num_labels)
model = model.to(device)

# --- Loss and optimizer ---
# CrossEntropyLoss takes raw logits and integer class ids (negative=0, positive=1).
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Count only the parameters that will receive gradient updates.
num_trainable_params = sum(
    param.numel() for param in model.parameters() if param.requires_grad
)
print(f"Number of trainable parameters: {num_trainable_params:,}")

In [None]:
# ---------------------------------------
# Prepare DataLoaders
# ---------------------------------------

# Convert sequences and labels to tensors
def _as_long_tensors(frame):
    """Return ``(sequences, labels)`` of ``frame`` as two int64 tensors."""
    features = torch.tensor(frame['sequences'].tolist(), dtype=torch.long)
    targets = torch.tensor(frame['label'].values, dtype=torch.long)
    return features, targets


# Feature / label tensors for each split
X_train, y_train = _as_long_tensors(train)
X_val, y_val = _as_long_tensors(val)
X_test, y_test = _as_long_tensors(test)

# Pair every feature tensor with its labels
train_dataset = TensorDataset(X_train, y_train)
val_dataset = TensorDataset(X_val, y_val)
test_dataset = TensorDataset(X_test, y_test)

# Only the training data is shuffled; evaluation order stays fixed.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader, test_loader = (
    DataLoader(dataset, batch_size=batch_size, shuffle=False)
    for dataset in (val_dataset, test_dataset)
)

print(f"Train batches: {len(train_loader)}")
print(f"Val batches: {len(val_loader)}")
print(f"Test batches: {len(test_loader)}")


In [None]:
# # Create mini versions of the train and val datasets (first 5 examples)
# mini_train = train[:5]
# mini_val = val[:5]

# X_train_mini = torch.tensor(mini_train['sequences'].tolist(), dtype=torch.long)
# y_train_mini = torch.tensor(mini_train['label'].values, dtype=torch.long)

# X_val_mini = torch.tensor(mini_val['sequences'].tolist(), dtype=torch.long)
# y_val_mini = torch.tensor(mini_val['label'].values, dtype=torch.long)

# mini_train_dataset = TensorDataset(X_train_mini, y_train_mini)
# mini_val_dataset = TensorDataset(X_val_mini, y_val_mini)

# mini_train_loader = DataLoader(mini_train_dataset, batch_size=batch_size, shuffle=True)
# mini_val_loader = DataLoader(mini_val_dataset, batch_size=batch_size, shuffle=False)

# print(f"MINI - Train batches: {len(mini_train_loader)}")
# print(f"MINI - Val batches: {len(mini_val_loader)}")

# Training Loop

In [None]:
# Training loop
# Train for up to num_epochs epochs. After every epoch the model is scored on
# the validation set; the weights with the lowest validation loss are saved to
# "gru_best_model.pth" and training stops early once max_patience consecutive
# epochs bring no improvement.
losses = []      # mean training loss per epoch
val_losses = []  # mean validation loss per epoch

best_val_loss = float("inf")
best_epoch = 0
patience = 0      # epochs since the last validation improvement
max_patience = 3  # stop after this many epochs without improvement

# Start timing training
start_time = time.time()

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0
    count = 0  # number of training samples seen this epoch
    for inputs, labels in train_loader:
        # inputs.shape = [bs, max_seq_len]
        inputs = inputs.to(device)
        # labels.shape = [bs]
        labels = labels.to(device)

        optimizer.zero_grad()
        # logits.shape: [batch_size, num_labels]
        logits = model(inputs)
        # loss is a scalar: the criterion averages over the batch
        # (NOTE: this relies on the criterion using mean reduction).
        loss = criterion(logits, labels)
        loss.backward()
        optimizer.step()

        # Weight each batch mean by its size so that a smaller final batch
        # does not count as much as a full one in the epoch average.
        batch_len = labels.size(0)
        total_loss += loss.item() * batch_len
        count += batch_len

    average_loss = total_loss / max(count, 1)
    losses.append(average_loss)

    model.eval()
    total_val_loss = 0.0
    val_count = 0  # number of validation samples seen this epoch

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            logits = model(inputs)  # Shape: [batch_size, num_labels]
            val_loss = criterion(logits, labels)  # scalar batch mean

            batch_len = labels.size(0)
            total_val_loss += val_loss.item() * batch_len
            val_count += batch_len

    average_val_loss = total_val_loss / max(val_count, 1)
    val_losses.append(average_val_loss)

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {average_loss:.4f}, Val Loss: {average_val_loss:.4f}")

    # Save best checkpoint
    if average_val_loss < best_val_loss:
        best_val_loss = average_val_loss
        best_epoch = epoch
        torch.save(model.state_dict(), "gru_best_model.pth")
        patience = 0
    else:
        patience += 1

    if patience >= max_patience:
        print(f"Early stopped at {epoch+1}")
        break

# Calculate total training time
training_time = time.time() - start_time
print(f"Lowest Validation Loss: {best_val_loss:.4f} at Epoch {best_epoch + 1}")
print(f"Total Training Time: {training_time:.2f} seconds ({training_time/60:.2f} minutes)")

# Evaluation

## Evaluation Functions - Accuracy, Label Precision, Recall, F1, and Confusion Matrix

In [None]:
from metrics import evaluate_model

## Train and Validation Sets

In [None]:
# Load the best model and calculate accuracy only for that
# Restore the checkpoint saved at the lowest validation loss, so the metrics
# below describe that model rather than the last trained epoch.
best_state = torch.load("gru_best_model.pth", map_location=device)
model.load_state_dict(best_state)
model.to(device)

# Recomputed here so this cell does not depend on the value from the setup cell.
num_trainable_params = sum(
    param.numel() for param in model.parameters() if param.requires_grad
)

# Reporting options forwarded to evaluate_model for the validation split.
val_report_options = dict(
    plot_confusion_matrix=True,
    title="Validation Set Confusion Matrix",
    training_time_seconds=training_time,
    num_trainable_parameters=num_trainable_params,
    model_name="gru",
    dataset_split="val",
    save_results=True,
)
val_metrics = evaluate_model(
    model, device, val_loader, label_0, label_4, **val_report_options
)

print(f"Validation Accuracy: {val_metrics['accuracy']:.2f}%")
print(f"Validation Metrics: {val_metrics}")



## Test Set

In [None]:
# Evaluate on test set (with confusion matrix plot)
# Score the restored model on the held-out test split and plot its confusion
# matrix. Reporting options are collected first, then forwarded in one call.
test_report_options = dict(
    plot_confusion_matrix=True,
    title="Test Set Confusion Matrix",
    training_time_seconds=training_time,
    num_trainable_parameters=num_trainable_params,
    model_name="gru",
    dataset_split="test",
    save_results=True,
)
test_metrics = evaluate_model(
    model, device, test_loader, label_0, label_4, **test_report_options
)
print(f"Test Accuracy: {test_metrics['accuracy']:.2f}%")
