<a href="https://colab.research.google.com/github/zamanmiraz/DSandML-Notebooks/blob/main/Foundational_Math_for_Generative_AI/Chapter1Demo2_AttentionSentiment_pt.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
! pip install gensim
! pip install torch==2.3.0 torchtext==0.18.0

In [None]:
import torch
# Prefer the GPU when CUDA is available; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(device)

# Loading the dataset from Hugging Face
Each example in the *imdb* dataset has two features: `text` and `label`.

In [None]:
from datasets import load_dataset
# Load the 'imdb' dataset; later cells read its 'train' and 'test' splits.
dataset = load_dataset('imdb')

## 1. Train Word2Vec embeddings
 - *Word2Vec* is a model that learns to represent words as vectors
 - Similar words end up with similar vectors
 - Parameters: `sentences` – a list of tokenized sentences; `vector_size` – the number of dimensions of the word vectors (higher dimensions are more expensive to compute); `window` – how many words before and after the target word to look at; `min_count` – ignores words that appear fewer than this number of times; `workers` – the number of worker threads (CPU cores) used for training

 ## 2. Create the embedding matrix and turn it into an embedding layer

In [None]:
import torch
import torch.nn as nn
from torchtext.vocab import build_vocab_from_iterator
from collections import Counter
from gensim.models import Word2Vec
import numpy as np

# Hyperparameters
max_vocab_size = 25000  # passed as max_tokens when the vocabulary is built
max_seq_len = 50        # reviews are truncated to this many tokens when batched
embedding_dim = 150     # size of each Word2Vec / embedding vector
latent_dim = 512        # hidden size of the GRU and attention layers
output_dim = 2          # number of output classes

# -------------------------------
# 1. Train Word2Vec embeddings
# -------------------------------
# Whitespace-tokenize every training review; the same token lists are reused
# below to build the vocabulary.
sentences = list(map(str.split, dataset['train']['text']))

word2vec_model = Word2Vec(
    sentences=sentences,
    vector_size=embedding_dim,
    window=5,
    min_count=1,
    workers=4,
)

# -------------------------------
# 2. Build vocabulary with torchtext
# -------------------------------
def yield_tokens(sentences):
    """Lazily yield each element of `sentences` (one token list per sentence), in order."""
    yield from sentences

# Build the vocabulary from the tokenized training sentences; '<unk>' and
# '<pad>' are registered as special tokens.
vocab = build_vocab_from_iterator(
    yield_tokens(sentences),
    specials=['<unk>', '<pad>'],
    max_tokens=max_vocab_size,
)

# Look up the special-token indices and map every unknown token to '<unk>'.
unk_idx, pad_idx = vocab['<unk>'], vocab['<pad>']
vocab.set_default_index(unk_idx)

# -------------------------------
# 3. Create embedding matrix
# -------------------------------
def create_embedding_matrix(vocab, word2vec_model, embedding_dim, pad_token='<pad>'):
    """Build a (len(vocab), embedding_dim) matrix of initial embedding weights.

    Args:
        vocab: Vocabulary object exposing ``__len__`` and ``get_stoi()``
            (a token -> index mapping).
        word2vec_model: Trained model whose ``wv`` attribute supports
            ``word in wv`` and ``wv[word]``.
        embedding_dim: Length of each embedding vector.
        pad_token: Token whose row is left as all zeros. Pass ``None`` to
            treat it like any other token.

    Returns:
        A NumPy array in which row ``idx`` holds the vector for the token
        mapped to ``idx``: the Word2Vec vector when available, otherwise a
        sample from N(0, 0.6^2).
    """
    embedding_matrix = np.zeros((len(vocab), embedding_dim))
    for word, idx in vocab.get_stoi().items():
        if pad_token is not None and word == pad_token:
            # Padding carries no meaning: keep its row at zero instead of
            # giving it random noise. nn.Embedding.from_pretrained does not
            # zero the padding row on its own.
            continue
        if word in word2vec_model.wv:
            embedding_matrix[idx] = word2vec_model.wv[word]
        else:
            # Token without a pretrained vector (e.g. '<unk>'): random init.
            embedding_matrix[idx] = np.random.normal(scale=0.6, size=(embedding_dim,))
    return embedding_matrix

# Build the initial weights and convert them to a float32 tensor for nn.Embedding
embedding_matrix = torch.tensor(
    create_embedding_matrix(vocab, word2vec_model, embedding_dim),
    dtype=torch.float32,
)

# -------------------------------
# 4. Define Embedding Layer
# -------------------------------
# freeze=False lets the pretrained vectors be fine-tuned during training
# (freeze=True would keep them fixed).
embedding_layer = nn.Embedding.from_pretrained(
    embedding_matrix,
    padding_idx=pad_idx,
    freeze=False,
)

print("Vocab size:", len(vocab))
print("Embedding layer shape:", embedding_layer.weight.shape)

In [None]:
import torch
from torchtext.data.functional import to_map_style_dataset
from torch.utils.data import DataLoader

# Convert both splits to map-style datasets so they can be indexed by a DataLoader
train_dataset, test_dataset = (
    to_map_style_dataset(dataset[split]) for split in ('train', 'test')
)

# Tokenization function
def tokenize_text(text):
    """Split `text` on runs of whitespace and return the resulting list of tokens."""
    tokens = text.split()
    return tokens

# Numericalization and padding function
def collate_batch(batch):
    """Turn a list of samples into a (labels, padded_token_ids) pair of tensors.

    Each sample is a dict with 'text' and 'label' keys. The text is tokenized,
    truncated to `max_seq_len` tokens and mapped to vocabulary indices. The
    sequences are then right-padded with `pad_idx` to the longest sequence in
    the batch.

    Returns:
        labels: int64 tensor with one entry per sample.
        padded_text: int64 tensor of token indices, batch dimension first.
    """
    labels = [int(sample['label']) for sample in batch]

    sequences = [
        torch.tensor(
            [vocab[token] for token in tokenize_text(sample['text'])[:max_seq_len]],
            dtype=torch.int64,
        )
        for sample in batch
    ]

    padded_text = torch.nn.utils.rnn.pad_sequence(
        sequences, batch_first=True, padding_value=pad_idx
    )
    return torch.tensor(labels, dtype=torch.int64), padded_text

# Create DataLoaders
batch_size = 64

# Training batches are reshuffled every epoch.
train_dataloader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_batch
)

# Evaluation batches keep the dataset order.
test_dataloader = DataLoader(
    test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_batch
)

print("Train DataLoader created.")
print("Test DataLoader created.")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm.auto import tqdm

# -------------------------------
# 4. Define EncoderGRU
# -------------------------------

class EncoderGRU(nn.Module):
    """Embed token indices and encode them with a single-layer GRU.

    When the embedding layer defines a ``padding_idx``, trailing padding is
    excluded from the recurrence. The returned final hidden state is then the
    state after the last non-pad token, so it does not depend on how much
    padding the batch happens to contain.

    Args:
        embedding_layer: Embedding module exposing ``embedding_dim`` (and,
            optionally, ``padding_idx``).
        latent_dim: Hidden size of the GRU.
        dropout_rate: Dropout probability applied to the embedded tokens.
    """

    def __init__(self, embedding_layer, latent_dim, dropout_rate=0.3):
        super().__init__()
        self.embedding = embedding_layer
        self.dropout = nn.Dropout(dropout_rate)
        self.rnn = nn.GRU(
            embedding_layer.embedding_dim,
            latent_dim,
            batch_first=True,
        )

    def forward(self, text):
        """Encode a batch of token indices.

        Args:
            text: Integer tensor of shape (batch_size, seq_len).

        Returns:
            output: (batch_size, seq_len, latent_dim) per-step GRU outputs;
                positions after the last non-pad token are zero when padding
                is detected.
            hidden: (batch_size, latent_dim) final hidden state.
        """
        # (batch_size, seq_len) -> (batch_size, seq_len, embedding_dim)
        embedded = self.dropout(self.embedding(text))

        pad_idx = getattr(self.embedding, "padding_idx", None)
        seq_len = text.size(1)
        if pad_idx is None or seq_len == 0:
            # No padding information available: run the GRU over every step.
            output, hidden = self.rnn(embedded)
            return output, hidden.squeeze(0)

        # Length of each row = 1-based position of its last non-pad token.
        # Rows made only of padding are clamped to length 1 because packing
        # rejects zero-length sequences.
        positions = torch.arange(1, seq_len + 1, device=text.device)
        lengths = ((text != pad_idx) * positions).amax(dim=1).clamp(min=1)

        packed = nn.utils.rnn.pack_padded_sequence(
            embedded, lengths.cpu(), batch_first=True, enforce_sorted=False
        )
        packed_output, hidden = self.rnn(packed)
        # total_length keeps the output the same width as the input batch.
        output, _ = nn.utils.rnn.pad_packed_sequence(
            packed_output, batch_first=True, total_length=seq_len
        )
        # hidden: (1, batch_size, latent_dim) -> (batch_size, latent_dim)
        return output, hidden.squeeze(0)

# Instantiate the encoder on top of the pretrained embedding layer.
encoder = EncoderGRU(embedding_layer=embedding_layer, latent_dim=latent_dim)
print("Encoder model created with GRU and Dropout.")

# -------------------------------
# 5. Define CrossAttention
# -------------------------------

class CrossAttention(nn.Module):
    """Single-head cross-attention with a residual connection, LayerNorm and dropout.

    Args:
        latent_dim: Feature size of both the query and the context.
        dropout_rate: Dropout probability applied after normalization.
    """

    def __init__(self, latent_dim, dropout_rate=0.3):
        super().__init__()
        self.mha = nn.MultiheadAttention(embed_dim=latent_dim, num_heads=1, batch_first=True)
        self.normalization = nn.LayerNorm(latent_dim)
        self.dropout = nn.Dropout(dropout_rate)

    def forward(self, x, context, key_padding_mask=None):
        """Attend from `x` (queries) over `context` (keys and values).

        Args:
            x: Query tensor of shape (batch, query_len, latent_dim).
            context: Key/value tensor of shape (batch, context_len, latent_dim).
            key_padding_mask: Optional boolean tensor of shape
                (batch, context_len). True marks context positions (e.g.
                padding) to be ignored by the attention. A row that is
                entirely True leaves nothing to attend to, so callers should
                avoid it. Defaults to None, which attends over every position.

        Returns:
            Tensor with the same shape as `x`.
        """
        # The attention weights are never used, so skip computing them.
        attn_output, _ = self.mha(
            x,
            context,
            context,
            key_padding_mask=key_padding_mask,
            need_weights=False,
        )
        # Residual connection, then normalization, then dropout.
        x = self.normalization(x + attn_output)
        return self.dropout(x)

# Instantiate the attention block with the same feature size as the encoder.
cross_attention = CrossAttention(latent_dim=latent_dim)
print("CrossAttention model created with Dropout.")

# -------------------------------
# 6. Define Classifier (Decoder)
# -------------------------------

class Classifier(nn.Module):
    """Dropout followed by a single linear layer that produces class logits.

    Args:
        latent_dim: Size of the incoming feature vector.
        output_dim: Number of output classes.
        dropout_rate: Dropout probability applied to the input features.
    """

    def __init__(self, latent_dim, output_dim, dropout_rate=0.5):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout_rate)
        self.fc = nn.Linear(in_features=latent_dim, out_features=output_dim)

    def forward(self, hidden_state):
        """Return raw (unnormalized) logits for `hidden_state`."""
        return self.fc(self.dropout(hidden_state))

# Instantiate the classification head.
classifier = Classifier(latent_dim=latent_dim, output_dim=output_dim)
print("Classifier model created with Dropout.")

# -------------------------------
# 7. Combine Encoder, CrossAttention and Classifier
# -------------------------------
class SentimentClassifier(nn.Module):
    """End-to-end model: encoder -> cross-attention -> classifier.

    The encoder's final state is used as a single query that attends over
    all encoder outputs; the attended vector is then classified.

    Args:
        encoder: Module returning ``(outputs, final_state)`` for a batch of token ids.
        cross_attention: Module called as ``cross_attention(query, context)``.
        classifier: Module mapping a (batch, latent_dim) tensor to logits.
    """

    def __init__(self, encoder, cross_attention, classifier):
        super().__init__()
        self.encoder = encoder
        self.cross_attention = cross_attention
        self.classifier = classifier

    def forward(self, text):
        """Return the classifier output for a batch of token indices."""
        context, final_state = self.encoder(text)    # context: (batch, seq_len, latent_dim)
        query = final_state[:, None, :]              # (batch, 1, latent_dim)
        attended = self.cross_attention(query, context)
        pooled = attended[:, 0, :]                   # (batch, latent_dim)
        return self.classifier(pooled)

# Assemble the full model from the three sub-modules and move it to the
# selected device (CPU or GPU).
model = SentimentClassifier(
    encoder=encoder,
    cross_attention=cross_attention,
    classifier=classifier,
)
model = model.to(device)
print("SentimentClassifier model created and moved to device.")

# -------------------------------
# 8. Define Loss Function and Optimizer
# -------------------------------
# CrossEntropyLoss takes the raw logits produced by the classifier.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), weight_decay=1e-4)
print("Loss function and Optimizer defined.")

# -------------------------------
# 9. Training Loop
# -------------------------------
def train(model, dataloader, optimizer, criterion, device):
    """Run one training epoch.

    Args:
        model: Module called as ``model(text)`` to obtain predictions.
        dataloader: Iterable yielding ``(labels, text)`` batches.
        optimizer: Optimizer updating the model's parameters.
        criterion: Loss called as ``criterion(predictions, labels)``.
        device: Device to which every batch is moved.

    Returns:
        The sum of the per-batch losses divided by the number of batches.
    """
    model.train()
    batch_losses = []
    for labels, text in tqdm(dataloader, desc="Training"):
        labels, text = labels.to(device), text.to(device)

        # Standard step: reset gradients, forward, backward, update.
        optimizer.zero_grad()
        loss = criterion(model(text), labels)
        loss.backward()
        optimizer.step()

        batch_losses.append(loss.item())
    return sum(batch_losses) / len(dataloader)

# -------------------------------
# 10. Evaluation Loop
# -------------------------------
def evaluate(model, dataloader, criterion, device):
    """Evaluate the model without updating it.

    Args:
        model: Module called as ``model(text)`` to obtain class logits.
        dataloader: Iterable yielding ``(labels, text)`` batches.
        criterion: Loss called as ``criterion(predictions, labels)``.
        device: Device to which every batch is moved.

    Returns:
        A ``(mean_loss, accuracy)`` tuple. ``mean_loss`` is the sum of the
        per-batch losses divided by the number of batches. ``accuracy`` is
        the fraction of correctly classified samples among those actually
        yielded by the dataloader.

    Raises:
        ZeroDivisionError: If the dataloader yields no batches.
    """
    model.eval()
    epoch_loss = 0
    correct_predictions = 0
    total_samples = 0
    with torch.no_grad():
        for labels, text in tqdm(dataloader, desc="Evaluating"):
            labels = labels.to(device)
            text = text.to(device)

            predictions = model(text)
            loss = criterion(predictions, labels)
            epoch_loss += loss.item()

            # Predicted class = index of the largest logit.
            predicted = predictions.argmax(dim=1)
            correct_predictions += (predicted == labels).sum().item()
            # Count what was really evaluated rather than relying on
            # len(dataloader.dataset), which is wrong when a sampler or
            # drop_last makes the loader skip samples.
            total_samples += labels.size(0)

    return epoch_loss / len(dataloader), correct_predictions / total_samples


# -------------------------------
# 11. Train the model
# -------------------------------
N_EPOCHS = 30  # You can adjust the number of epochs
best_valid_loss = float("inf")

# NOTE(review): the loss/accuracy reported as "Val" are computed on
# test_dataloader, and the saved checkpoint is selected on that same data.
for epoch in range(N_EPOCHS):
    train_loss = train(model, train_dataloader, optimizer, criterion, device)
    valid_loss, valid_acc = evaluate(model, test_dataloader, criterion, device)

    # Keep only the weights of the epoch with the lowest evaluation loss so far.
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best-model.pt')
        print(f"Epoch {epoch+1}: Validation loss improved. Saving model.")

    print(f"Epoch {epoch+1}: Train Loss: {train_loss:.3f}, Val Loss: {valid_loss:.3f}, Val Acc: {valid_acc:.3f}")

print("\nTraining finished.")

In [None]:
# ---------------------------
# Load the best trained model
# ---------------------------
# weights_only=True restricts unpickling to tensors and plain containers, which
# is all a state_dict contains, so a tampered checkpoint file cannot execute
# arbitrary code while being loaded.
model.load_state_dict(
    torch.load("best-model.pt", map_location=device, weights_only=True)
)
model.eval()

# ---------------------------
# Example samples
# ---------------------------
# Keys are human-readable descriptions used only for printing; values are the
# texts fed to the model.
samples = {
    "Positive": "This was the best movie I have ever seen.",
    "Negative": "This was the worst movie I have ever watched.",
    "Neutral": "The movie was okay, not great but not terrible.",
    "Sarcasm": "Wow, this was such a masterpiece... the actors, the screenplay, I could stay for hours if it wasn't for how bad it was.",
    "Irony": "The plot was so riveting, I couldn’t stop yawning."
}

# ---------------------------
# Helper function to convert text to tensor
# ---------------------------
def text_to_tensor(text, vocab, max_seq_len, pad_idx):
    """Convert raw text into a fixed-length int64 tensor of token indices.

    The text is split on whitespace, truncated to `max_seq_len` tokens, mapped
    through `vocab[token]`, and right-padded with `pad_idx` up to
    `max_seq_len` entries.
    """
    token_ids = [vocab[tok] for tok in text.split()[:max_seq_len]]
    # A non-positive repeat count yields an empty list, so full-length inputs get no padding.
    token_ids.extend([pad_idx] * (max_seq_len - len(token_ids)))
    return torch.tensor(token_ids, dtype=torch.int64)


# ---------------------------
# Predict function
# ---------------------------
def predict(text, model, vocab, max_seq_len, pad_idx, device):
    """Classify a single text.

    Returns:
        A ``(predicted_label, logits)`` tuple: the index of the largest logit
        as a Python int, and the model output for the one-element batch.
    """
    model.eval()
    token_ids = text_to_tensor(text, vocab, max_seq_len, pad_idx)
    batch = token_ids.unsqueeze(0).to(device)   # shape: (1, seq_len)
    with torch.no_grad():
        logits = model(batch)                   # shape: (1, num_classes)
    return logits.argmax(dim=1).item(), logits

# ---------------------------
# Run predictions
# ---------------------------
# Classify every example and print the predicted class next to the raw logits.
for label in samples:
    text = samples[label]
    pred_label, raw_logits = predict(text, model, vocab, max_seq_len, pad_idx, device)
    print(f"{label} → Predicted class: {pred_label}, Raw logits: {raw_logits}")