# Homework X: Transformer Structure Practical Training Project

Welcome to the course **AI and Deep learning**!

We have started learning about RNNs. This assignment is a hands-on project on the Transformer architecture: we will perform sentiment analysis on a financial text dataset, classifying each short text as negative, neutral, or positive. We hope you enjoy this homework!

**Learning Goal**: In this homework, you will:
 * Learn more about the structure of the Transformer through the code.
 * Learn how to perform sentiment analysis with a Transformer.

# Table of Contents
* [1 - Packages](#1)
* [2 - Dataset](#2)
* [3 - Transformer structure](#3)
* [4 - Training and Saving Model](#4)
* [5 - Play by Yourself!](#5)

<a name='1'></a>
## 1- Packages

To complete this task, we need functions and classes from several **Python** packages.

In [1]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
import pandas as pd
import numpy as np
import torch
import math
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

<a name='2'></a>
## 2- Dataset

Below we load the data used for this project. Let's start with a description of the data to get the basics.

In [2]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
# Load the data
df = pd.read_csv('data.csv')

# Data description and analysis
print("Basic Information of the Dataset:")
print(df.info())
print("\nSentiment Distribution:")
print(df['Sentiment'].value_counts())

Basic Information of the Dataset:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5842 entries, 0 to 5841
Data columns (total 2 columns):
 #   Column     Non-Null Count  Dtype 
---  ------     --------------  ----- 
 0   Sentence   5842 non-null   object
 1   Sentiment  5842 non-null   object
dtypes: object(2)
memory usage: 91.4+ KB
None

Sentiment Distribution:
Sentiment
neutral     3130
positive    1852
negative     860
Name: count, dtype: int64


The code below prepares the data:
1. Converts sentiment labels to integers using label encoding
2. Splits the data into training (70%), validation (15%), and testing (15%) sets
3. Builds a vocabulary from all sentences in the dataset, then appends padding (`<pad>`) and unknown (`<unk>`) tokens
4. Vectorizes each text into a fixed-length index sequence (truncated or padded to length 128)
5. Wraps the data in a custom Dataset class
6. Creates PyTorch DataLoaders for batch processing

The resulting DataLoaders provide batched access to the vectorized text and labels for model training and evaluation.
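To make steps 3 and 4 concrete, here is a small sketch on made-up sentences. It mirrors the tokenizer and vectorizer used in the next cell; `build_vocab` is a hypothetical helper introduced only for this illustration (the homework code builds the vocabulary inline), and the toy sentences are not from the dataset.

```python
# Toy illustration of vocabulary building and vectorization.
# The sentences and build_vocab helper are made up for this example.
def tokenize(text):
    return text.lower().split()

def build_vocab(sentences):
    vocab = {}
    for sentence in sentences:
        for token in tokenize(sentence):
            if token not in vocab:
                vocab[token] = len(vocab)
    vocab['<pad>'] = len(vocab)  # Padding token
    vocab['<unk>'] = len(vocab)  # Unknown word token
    return vocab

def text_to_indices(text, vocab, max_seq_len):
    tokens = tokenize(text)
    # Truncate to max_seq_len and map unseen words to <unk>
    indices = [vocab.get(token, vocab['<unk>']) for token in tokens[:max_seq_len]]
    # Pad on the right up to max_seq_len
    indices += [vocab['<pad>']] * (max_seq_len - len(indices))
    return indices

toy_vocab = build_vocab(["Profit rose sharply", "Sales fell"])
# profit=0, rose=1, sharply=2, sales=3, fell=4, <pad>=5, <unk>=6
toy_indices = text_to_indices("Profit fell again", toy_vocab, max_seq_len=5)
print(toy_indices)  # [0, 4, 6, 5, 5]
```

The word "again" is not in the toy vocabulary, so it maps to `<unk>`, and the two trailing positions are filled with `<pad>`.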

In [3]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
# Label Encoding
label_encoder = LabelEncoder()
df['Sentiment'] = label_encoder.fit_transform(df['Sentiment'])

# Split the dataset into training (70%), validation (15%), and testing (15%) sets
train_text, temp_text, train_labels, temp_labels = train_test_split(df['Sentence'], df['Sentiment'], test_size=0.3, random_state=42)
val_text, test_text, val_labels, test_labels = train_test_split(temp_text, temp_labels, test_size=0.5, random_state=42)

# Text Tokenization
def tokenize(text):
    return text.lower().split()

# Build Vocabulary
vocab = {}
for text in df['Sentence']:
    for token in tokenize(text):
        if token not in vocab:
            vocab[token] = len(vocab)

vocab['<pad>'] = len(vocab)  # Padding token
vocab['<unk>'] = len(vocab)  # Unknown word token

# Text Vectorization
def text_to_indices(text, vocab, max_seq_len):
    tokens = tokenize(text)
    indices = [vocab.get(token, vocab['<unk>']) for token in tokens[:max_seq_len]]
    indices += [vocab['<pad>']] * (max_seq_len - len(indices))
    return indices

max_seq_len = 128  # Maximum sequence length

train_df = pd.DataFrame({'Text': train_text, 'Label': train_labels})
val_df = pd.DataFrame({'Text': val_text, 'Label': val_labels})
test_df = pd.DataFrame({'Text': test_text, 'Label': test_labels})

train_df['Text Indices'] = train_df['Text'].apply(lambda x: text_to_indices(x, vocab, max_seq_len))
val_df['Text Indices'] = val_df['Text'].apply(lambda x: text_to_indices(x, vocab, max_seq_len))
test_df['Text Indices'] = test_df['Text'].apply(lambda x: text_to_indices(x, vocab, max_seq_len))

# Create Data Loaders
class TextDataset(Dataset):
    def __init__(self, texts, labels):
        self.texts = texts
        self.labels = labels
    
    def __len__(self):
        return len(self.texts)
    
    def __getitem__(self, idx):
        return torch.tensor(self.texts[idx]), torch.tensor(self.labels[idx])

train_dataset = TextDataset(train_df['Text Indices'].tolist(), train_df['Label'].tolist())
val_dataset = TextDataset(val_df['Text Indices'].tolist(), val_df['Label'].tolist())
test_dataset = TextDataset(test_df['Text Indices'].tolist(), test_df['Label'].tolist())

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

print("\nData loading completed, Training set size: {}, Validation set size: {}, Testing set size: {}".format(len(train_dataset), len(val_dataset), len(test_dataset)))


Data loading completed, Training set size: 4089, Validation set size: 876, Testing set size: 877
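The printed sizes can be checked by hand. The sketch below assumes that `train_test_split` rounds the held-out share up to a whole number of samples and gives the remainder to the other part; that assumption is consistent with the sizes printed above.

```python
import math

n_total = 5842                      # rows in data.csv, from df.info()

# First split: test_size=0.3 holds out the "temp" part
n_temp = math.ceil(0.3 * n_total)   # 1753
n_train = n_total - n_temp          # 4089

# Second split: test_size=0.5 of the temp part becomes the test set
n_test = math.ceil(0.5 * n_temp)    # 877
n_val = n_temp - n_test             # 876

print(n_train, n_val, n_test)       # 4089 876 877
```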


<a name='3'></a>
## 3- Transformer structure

In this section, we will build a Transformer to perform sentiment analysis. The model follows the encoder design from **"Attention Is All You Need"**, adapted for text classification. Here are the steps:


1. **Implement positional encoding**: Create a function to generate positional encodings that add sequence order information to input embeddings using sine and cosine functions.
2. **Implement multi-head attention**: Define a function that performs multi-head attention: linear transformations of the query, key, and value, followed by scaled dot-product attention. (The paper's final output projection is omitted in this implementation.)
3. **Create feed-forward neural network**: Build a class for the position-wise feed-forward network with two linear layers and a ReLU activation function.
4. **Construct encoder layer**: Develop an encoder layer class that combines multi-head self-attention, residual connections, layer normalization, and the feed-forward network.
5. **Build encoder**: Create an encoder class that stacks multiple encoder layers.
6. **Build Transformer model**: Implement the main Transformer model class that includes:
    - An embedding layer for input tokens
    - Positional encoding addition
    - The encoder stack
    - An output layer for classification (3-class sentiment classification in this case)

"Attention Is All You Need" is a classic paper that is well worth studying; reading it alongside this part of the code will help you understand both.

In [4]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
def positional_encoding(max_seq_len, d_model):
    """
    Generate positional encodings for transformer models.
    
    Returns:
    torch.Tensor: Positional encodings (1, max_seq_len, d_model)
    """
    # Step 1. Create a tensor to store positional encodings
    # Step 2. Generate position indices
    # Step 3. Calculate the divisor term for positional encoding
    # Step 4. Calculate sine and cosine values for even and odd positions
    # Step 5. Add a batch dimension to the positional encodings

    ### YOUR CODE BEGINS HERE
    positional_encodings = torch.zeros(max_seq_len, d_model)
    position = torch.arange(0, max_seq_len, dtype=torch.float).unsqueeze(1)
    div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
    
    positional_encodings[:, 0::2] = torch.sin(position * div_term)
    positional_encodings[:, 1::2] = torch.cos(position * div_term)
    
    positional_encodings = positional_encodings.unsqueeze(0)
    ### YOUR CODE ENDS
    
    return positional_encodings
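For reference, the function above corresponds to the sinusoidal encoding from the paper. The block below states that formula and shows why `div_term` equals the reciprocal of the paper's denominator. Here $pos$ is the position index and $i$ indexes pairs of embedding dimensions, so `torch.arange(0, d_model, 2)` supplies the values $2i$.

```latex
\begin{aligned}
PE_{(pos,\,2i)}   &= \sin\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right), \\
PE_{(pos,\,2i+1)} &= \cos\!\left(\frac{pos}{10000^{2i/d_{\text{model}}}}\right), \\
\texttt{div\_term}_i &= \exp\!\left(2i \cdot \frac{-\ln 10000}{d_{\text{model}}}\right)
                      = 10000^{-2i/d_{\text{model}}}.
\end{aligned}
```

Multiplying `position` by `div_term` therefore gives the argument $pos / 10000^{2i/d_{\text{model}}}$ for every position and dimension pair at once.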

You may want to run the following code to verify that the above function is written correctly.

In [5]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
def test_positional_encoding():
    max_seq_len = 4
    d_model = 8

    pos_enc = positional_encoding(max_seq_len, d_model).squeeze(0)

    expected_output = torch.zeros(max_seq_len, d_model)
    for pos in range(max_seq_len):
        for i in range(d_model // 2):
            div_term = math.exp(-(math.log(10000.0) / d_model) * (2 * i))
            expected_output[pos, 2 * i] = math.sin(pos * div_term)
            expected_output[pos, 2 * i + 1] = math.cos(pos * div_term)

    # expected_output is already a tensor; .to() avoids the copy-construct warning from torch.tensor()
    expected_output = expected_output.to(dtype=pos_enc.dtype, device=pos_enc.device)

    print("Your output:\n", pos_enc)
    print("Expected output:\n", expected_output)
    print("Difference:\n", torch.abs(pos_enc - expected_output).max())

test_positional_encoding()

Your output:
 tensor([[ 0.0000e+00,  1.0000e+00,  0.0000e+00,  1.0000e+00,  0.0000e+00,
          1.0000e+00,  0.0000e+00,  1.0000e+00],
        [ 8.4147e-01,  5.4030e-01,  9.9833e-02,  9.9500e-01,  9.9998e-03,
          9.9995e-01,  1.0000e-03,  1.0000e+00],
        [ 9.0930e-01, -4.1615e-01,  1.9867e-01,  9.8007e-01,  1.9999e-02,
          9.9980e-01,  2.0000e-03,  1.0000e+00],
        [ 1.4112e-01, -9.8999e-01,  2.9552e-01,  9.5534e-01,  2.9995e-02,
          9.9955e-01,  3.0000e-03,  1.0000e+00]])
Expected output:
 tensor([[ 0.0000e+00,  1.0000e+00,  0.0000e+00,  1.0000e+00,  0.0000e+00,
          1.0000e+00,  0.0000e+00,  1.0000e+00],
        [ 8.4147e-01,  5.4030e-01,  9.9833e-02,  9.9500e-01,  9.9998e-03,
          9.9995e-01,  1.0000e-03,  1.0000e+00],
        [ 9.0930e-01, -4.1615e-01,  1.9867e-01,  9.8007e-01,  1.9999e-02,
          9.9980e-01,  2.0000e-03,  1.0000e+00],
        [ 1.4112e-01, -9.8999e-01,  2.9552e-01,  9.5534e-01,  2.9996e-02,
          9.9955e-01,  3.0000e-0



In [6]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
def multi_head_attention(query, key, value, num_heads, mask=None):
    """
    Perform multi-head attention.
    
    Returns:
    torch.Tensor: Attention output (batch_size, seq_len, d_model)
    torch.Tensor: Attention weights (batch_size, num_heads, seq_len, seq_len)
    """
    # Step 1. Get the dimensions (batch size, sequence length, model dimension)
    # Step 2. Calculate the dimension for keys and values per head
    # Step 3. Create linear transformations for query, key, and value
    # Step 4. Apply linear transformations to query, key, and value
    # Step 5. Reshape query, key, and value to separate heads
    # Step 6. Transpose query, key, and value to (batch_size, num_heads, seq_len, d_k/d_v)
    # Step 7. Calculate the attention scores using matrix multiplication
    # Step 8. Scale the attention scores
    # Step 9. Apply mask if provided
    # Step 10. Calculate attention weights using softmax
    # Step 11. Calculate attention output using matrix multiplication
    # Step 12. Transpose and reshape attention output back to (batch_size, seq_len, d_model)

    ### YOUR CODE BEGINS HERE
    batch_size = query.shape[0]
    seq_len = query.shape[1]
    d_model = query.shape[2]
    d_k = d_model // num_heads
    d_v = d_model // num_heads

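    # Create linear transformations for query, key, and value.
    # Note: these layers are constructed on every call, so their weights are
    # freshly initialized each forward pass and are not registered as model parameters.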
    query_linear = nn.Linear(d_model, d_model).to(query.device)
    key_linear = nn.Linear(d_model, d_model).to(query.device)
    value_linear = nn.Linear(d_model, d_model).to(query.device)

    # Apply linear transformations
    query = query_linear(query)
    key = key_linear(key)
    value = value_linear(value)

    # Reshape and transpose query, key, and value
    query = query.reshape(batch_size, -1, num_heads, d_k).transpose(1, 2)
    key = key.reshape(batch_size, -1, num_heads, d_k).transpose(1, 2)
    value = value.reshape(batch_size, -1, num_heads, d_v).transpose(1, 2)

    # Calculate attention scores
    matmul_qk = torch.matmul(query, key.transpose(-2, -1))
    dk = query.shape[-1]
    scaled_attention_scores = matmul_qk / math.sqrt(dk)

    # Apply mask if provided
    if mask is not None:
        scaled_attention_scores = scaled_attention_scores.masked_fill(mask == 0, -1e9)

    # Calculate attention weights
    attention_weights = torch.softmax(scaled_attention_scores, dim=-1)

    # Calculate attention output
    attention_output = torch.matmul(attention_weights, value)

    # Transpose and reshape attention output
    attention_output = attention_output.transpose(1, 2).reshape(batch_size, seq_len, d_model)
    ### YOUR CODE ENDS
    
    return attention_output, attention_weights
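The function above builds its `nn.Linear` layers inside the call, so the projections are re-initialized on every forward pass and never reach the optimizer. As a point of comparison only, here is a sketch of how the same computation could be written as an `nn.Module` whose projections are created once and trained. The names `MultiHeadAttention`, `split_heads`, and `output_linear` are hypothetical, the output projection follows the paper rather than the homework code, and swapping this in would change the expected results shown later.

```python
import math
import torch
import torch.nn as nn

class MultiHeadAttention(nn.Module):
    """Hypothetical module variant: projections are created once and trained."""
    def __init__(self, d_model, num_heads):
        super().__init__()
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)
        self.output_linear = nn.Linear(d_model, d_model)  # output projection from the paper

    def forward(self, query, key, value, mask=None):
        batch_size, seq_len, d_model = query.shape

        def split_heads(t):
            # (batch, seq, d_model) -> (batch, heads, seq, d_k)
            return t.reshape(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        q = split_heads(self.query_linear(query))
        k = split_heads(self.key_linear(key))
        v = split_heads(self.value_linear(value))

        # Scaled dot-product attention
        scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        weights = torch.softmax(scores, dim=-1)

        # Merge heads back to (batch, seq, d_model) and project
        context = torch.matmul(weights, v)
        context = context.transpose(1, 2).reshape(batch_size, seq_len, d_model)
        return self.output_linear(context), weights

torch.manual_seed(0)
mha = MultiHeadAttention(d_model=16, num_heads=4)
x = torch.randn(2, 5, 16)
out, attn = mha(x, x, x)
print(out.shape, attn.shape)
```

Because the four linear layers are attributes of the module, `mha.parameters()` exposes their weights and biases to an optimizer.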

In [7]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
class FeedForwardNetwork(nn.Module):
    def __init__(self, d_model, d_ff):
        """
        Feed-Forward Network (FFN) used in transformer architecture.
        
        """
        super(FeedForwardNetwork, self).__init__()
        # Step 1. Create the first fully connected layer (d_model -> d_ff)
        # Step 2. Create the ReLU activation function
        # Step 3. Create the second fully connected layer (d_ff -> d_model)

        ### YOUR CODE BEGINS HERE
        self.fc1 = nn.Linear(d_model, d_ff)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(d_ff, d_model)
        ### YOUR CODE ENDS

    def forward(self, x):
        """
        Forward pass of the feed-forward network.

        Returns:
        torch.Tensor: Output tensor (batch_size, seq_len, d_model)
        """
        # Step 1. Pass input through the first fully connected layer
        # Step 2. Apply ReLU activation
        # Step 3. Pass through the second fully connected layer

        ### YOUR CODE BEGINS HERE
        x = self.fc1(x)
        x = self.relu(x)
        x = self.fc2(x)
        ### YOUR CODE ENDS
        return x

In [8]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
class EncoderLayer(nn.Module):
    def __init__(self, num_heads, d_model, d_ff, dropout_rate=0.1):
        """
        Encoder layer of the transformer architecture.
        """
        super(EncoderLayer, self).__init__()
        # Step 1. Initialize multi-head self-attention layer
        # Step 2. Initialize feed-forward network
        # Step 3. Initialize layer normalization layers
        # Step 4. Initialize dropout layers

        ### YOUR CODE BEGINS HERE
        self.self_attention = multi_head_attention
        self.feed_forward = FeedForwardNetwork(d_model, d_ff)
        self.layer_norm1 = nn.LayerNorm(d_model)
        self.layer_norm2 = nn.LayerNorm(d_model)
        self.dropout1 = nn.Dropout(dropout_rate)
        self.dropout2 = nn.Dropout(dropout_rate)
        self.num_heads = num_heads
        ### YOUR CODE ENDS

    def forward(self, x, mask=None):
        """
        Forward pass of the encoder layer.

        Returns:
        torch.Tensor: Output tensor (batch_size, seq_len, d_model)
        """
        # Step 1. Perform multi-head self-attention
        # Step 2. Apply dropout to attention output
        # Step 3. Add residual connection and layer normalization
        # Step 4. Pass through feed-forward network
        # Step 5. Apply dropout to feed-forward output
        # Step 6. Add residual connection and layer normalization

        ### YOUR CODE BEGINS HERE
        attention_output, _ = self.self_attention(x, x, x, self.num_heads, mask)
        attention_output = self.dropout1(attention_output)
        x = self.layer_norm1(x + attention_output)
        
        ff_output = self.feed_forward(x)
        ff_output = self.dropout2(ff_output)
        x = self.layer_norm2(x + ff_output)
        ### YOUR CODE ENDS
        
        return x

In [9]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
class Encoder(nn.Module):
    def __init__(self, num_layers, num_heads, d_model, d_ff, dropout_rate=0.1):
        """
        Encoder of the transformer architecture.
        """
        super(Encoder, self).__init__()
        # Step 1. Create a list of encoder layers using ModuleList
        # Step 2. Each encoder layer should be initialized with the given parameters

        ### YOUR CODE BEGINS HERE
        self.layers = nn.ModuleList([EncoderLayer(num_heads, d_model, d_ff, dropout_rate) for _ in range(num_layers)])
        ### YOUR CODE ENDS

    def forward(self, x, mask=None):
        """
        Forward pass of the encoder.

        Returns:
        torch.Tensor: Output tensor (batch_size, seq_len, d_model)
        """
        # Step 1. Pass input through each encoder layer in sequence
        # Step 2. Apply each layer to the output of the previous layer

        ### YOUR CODE BEGINS HERE
        for layer in self.layers:
            x = layer(x, mask)
        ### YOUR CODE ENDS
        
        return x

In [10]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
class TransformerModel(nn.Module):
    def __init__(self, vocab_size, max_seq_len, num_layers=6, num_heads=8, d_model=512, d_ff=2048, dropout_rate=0.1):
        """
        Transformer model for text classification.
        """
        super(TransformerModel, self).__init__()
        # Step 1. Initialize the token embedding layer
        # Step 2. Initialize the positional encoding
        # Step 3. Initialize the encoder
        # Step 4. Initialize the output layer

        ### YOUR CODE BEGINS HERE
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = positional_encoding(max_seq_len, d_model)
        self.encoder = Encoder(num_layers, num_heads, d_model, d_ff, dropout_rate)
        self.output_layer = nn.Linear(d_model, 3)  # 3-class sentiment classification
        ### YOUR CODE ENDS

    def forward(self, x, mask=None):
        """
        Forward pass of the transformer model.

        Returns:
        torch.Tensor: Output tensor (batch_size, 3)
        """
        # Step 1. Convert input tokens to embeddings
        # Step 2. Add positional encodings to embeddings
        # Step 3. Pass through the encoder
        # Step 4. Extract the first-position output (used like a CLS token; no explicit CLS token is added)
        # Step 5. Pass through the output layer

        ### YOUR CODE BEGINS HERE
        x = self.embedding(x)
        x += self.positional_encoding[:, :x.shape[1], :].to(x.device)
        x = self.encoder(x, mask)
        cls_output = x[:, 0, :]
        output = self.output_layer(cls_output)
        ### YOUR CODE ENDS
        
        return output
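The model accepts an optional `mask`, and the attention function fills positions where `mask == 0` with `-1e9` before the softmax. The training code in the next section calls the model without a mask, so padded positions are attended to like real tokens. If you want to experiment, the sketch below shows one way a padding mask could be built; `make_padding_mask` is a hypothetical helper and `pad_id = 9` merely stands in for `vocab['<pad>']`.

```python
import torch

def make_padding_mask(token_ids, pad_id):
    """Hypothetical helper: 1 where a real token is present, 0 at padding."""
    # (batch, seq_len) -> (batch, 1, 1, seq_len) so the mask broadcasts over
    # heads and query positions in the (batch, heads, seq, seq) score tensor.
    return (token_ids != pad_id).unsqueeze(1).unsqueeze(2).int()

pad_id = 9  # stand-in for vocab['<pad>']
token_ids = torch.tensor([[4, 2, 7, 9, 9],
                          [1, 9, 9, 9, 9]])
mask = make_padding_mask(token_ids, pad_id)

# Apply the mask the same way the attention function does
scores = torch.zeros(2, 8, 5, 5)
masked_scores = scores.masked_fill(mask == 0, -1e9)
weights = torch.softmax(masked_scores, dim=-1)

print(mask.shape)        # torch.Size([2, 1, 1, 5])
print(weights[0, 0, 0])  # equal weight on the 3 real tokens, 0 on padding
```

Under these assumptions, a call such as `model(inputs, make_padding_mask(inputs, vocab['<pad>']))` would pass the mask through to every encoder layer. Doing so changes the computation, so the results would no longer match the reference output.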

<a name='4'></a>
## 4- Training and Saving Model

Next, we train the Transformer model on the text classification task. The code covers random seeding, device selection, model definition, the training and evaluation loops, and saving the model. Please run the following code on your own.

In [11]:
# PLEASE DO NOT CHANGE THE FOLLOWING CODE
# Set random seeds for reproducibility
seed = 42
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
np.random.seed(seed)

# Set device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Define hyperparameters
vocab_size = len(vocab)
max_seq_len = 128
num_layers = 6
num_heads = 8
d_model = 512
d_ff = 2048
dropout_rate = 0.1

# Initialize model
model = TransformerModel(vocab_size, max_seq_len, num_layers, num_heads, d_model, d_ff, dropout_rate)
model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=2e-5)

# Train for one epoch; each batch is moved to the target device
def train(model, dataloader, criterion, optimizer, device):
    model.train()
    total_loss = 0
    correct = 0
    total = 0
    
    for batch in dataloader:
        inputs, labels = batch
        inputs = inputs.to(device)
        labels = labels.to(device)
        
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        total_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        correct += (predicted == labels).sum().item()
        total += labels.size(0)
    
    accuracy = correct / total
    return total_loss / len(dataloader), accuracy

def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for batch in dataloader:
            inputs, labels = batch
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            
            total_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)
    
    accuracy = correct / total
    return total_loss / len(dataloader), accuracy

# Train the model
num_epochs = 3

for epoch in range(num_epochs):
    train_loss, train_acc = train(model, train_loader, criterion, optimizer, device)
    val_loss, val_acc = evaluate(model, val_loader, criterion, device)
    print(f'Epoch {epoch+1}/{num_epochs}')
    print(f'Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f}')
    print(f'Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}\n')

# Test the model
test_loss, test_acc = evaluate(model, test_loader, criterion, device)
print(f'Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.4f}')

# Save the model
torch.save(model.state_dict(), 'transformer_model.pth')
print("Model has been saved")


print("\nYour result should be:\n Epoch 1/3\nTrain Loss: 1.0470, Train Acc: 0.4967\nVal Loss: 1.0391, Val Acc: 0.5240\n\nEpoch 2/3\nTrain Loss: 1.0038, Train Acc: 0.5182\nVal Loss: 1.0263, Val Acc: 0.4863\n\nEpoch 3/3\nTrain Loss: 0.9755, Train Acc: 0.5268\nVal Loss: 1.0175, Val Acc: 0.5297\nTest Loss: 0.9906, Test Acc: 0.5165\nModel has been saved")

Epoch 1/3
Train Loss: 1.0470, Train Acc: 0.4967
Val Loss: 1.0391, Val Acc: 0.5240

Epoch 2/3
Train Loss: 1.0038, Train Acc: 0.5182
Val Loss: 1.0263, Val Acc: 0.4863

Epoch 3/3
Train Loss: 0.9755, Train Acc: 0.5268
Val Loss: 1.0175, Val Acc: 0.5297

Test Loss: 0.9906, Test Acc: 0.5165
Model has been saved

Your result should be:
 Epoch 1/3
Train Loss: 1.0470, Train Acc: 0.4967
Val Loss: 1.0391, Val Acc: 0.5240

Epoch 2/3
Train Loss: 1.0038, Train Acc: 0.5182
Val Loss: 1.0263, Val Acc: 0.4863

Epoch 3/3
Train Loss: 0.9755, Train Acc: 0.5268
Val Loss: 1.0175, Val Acc: 0.5297
Test Loss: 0.9906, Test Acc: 0.5165
Model has been saved
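The training cell saves only the model's `state_dict`, not the model object itself. The sketch below shows the matching reload pattern. To stay self-contained it uses a small stand-in `nn.Linear` and an in-memory buffer instead of `TransformerModel` and `transformer_model.pth`.

```python
import io
import torch
import torch.nn as nn

# Stand-in model; in the homework this would be TransformerModel
toy_model = nn.Linear(4, 3)

# Save only the parameters (here to an in-memory buffer instead of a file)
buffer = io.BytesIO()
torch.save(toy_model.state_dict(), buffer)
buffer.seek(0)

# Reload: build a model with the same architecture, then load the parameters
restored_model = nn.Linear(4, 3)
restored_model.load_state_dict(torch.load(buffer))
restored_model.eval()  # switch to inference mode (disables dropout, if any)

print(torch.equal(toy_model.weight, restored_model.weight))  # True
```

For the homework model, the same pattern would presumably be to construct `TransformerModel` with the same hyperparameters and then call `load_state_dict(torch.load('transformer_model.pth'))`.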


<a name='5'></a>
## 5- Play by Yourself!

In this assignment, we completed a text classification task using a Transformer. We have also covered other architectures in this course, such as RNNs, and you can try writing your own code to complete the same project with one of them. The online materials for this course are a good reference.
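As one possible starting point, here is a minimal sketch of an LSTM-based classifier that takes the same kind of input (batches of token indices) and produces three class logits. The class name `LSTMClassifier` and all layer sizes are illustrative assumptions, not part of the course code.

```python
import torch
import torch.nn as nn

class LSTMClassifier(nn.Module):
    """Hypothetical RNN baseline; layer sizes are illustrative only."""
    def __init__(self, vocab_size, pad_id, embed_dim=64, hidden_dim=128, num_classes=3):
        super().__init__()
        # padding_idx keeps the <pad> embedding fixed at zero
        self.embedding = nn.Embedding(vocab_size, embed_dim, padding_idx=pad_id)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.output_layer = nn.Linear(2 * hidden_dim, num_classes)

    def forward(self, x):
        embedded = self.embedding(x)        # (batch, seq_len, embed_dim)
        _, (h_n, _) = self.lstm(embedded)   # h_n: (2, batch, hidden_dim)
        # Concatenate the final forward and backward hidden states
        summary = torch.cat([h_n[0], h_n[1]], dim=1)
        return self.output_layer(summary)   # (batch, num_classes)

toy_model = LSTMClassifier(vocab_size=50, pad_id=0)
toy_batch = torch.randint(1, 50, (4, 12))   # 4 sequences of 12 token indices
logits = toy_model(toy_batch)
print(logits.shape)  # torch.Size([4, 3])
```

Since the input and output shapes match those of `TransformerModel`, a model like this could be passed to the existing `train` and `evaluate` functions with `vocab_size=len(vocab)` and `pad_id=vocab['<pad>']`.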