In [1]:
# Imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import nltk
from nltk.tokenize import word_tokenize
from collections import defaultdict
from tqdm import tqdm

nltk.download("punkt_tab")

[nltk_data] Downloading package punkt_tab to
[nltk_data]     /Users/muditjindal/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


True

In [None]:
# Define MultiHead Self Attention Block
class SelfMultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads): 
        super(SelfMultiHeadAttention, self).__init__()
        # Check if num_heads divides d_model 
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads

        # Size per attention head
        self.d_k = d_model // num_heads

        # Initialize the Q, K and V
        self.q_linear = nn.Linear(d_model, d_model)
        self.k_linear = nn.Linear(d_model, d_model)
        self.v_linear = nn.Linear(d_model, d_model)
        self.out_linear = nn.Linear(d_model, d_model)

    def forward(self, x):
        # Get the batch size
        batch_size = x.shape[0]
        # Pass in the x to Q, K and V and reshape
        # Q, K, V: (batch_size, num_heads, seq_length, d_k)
        Q = self.q_linear(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        K = self.k_linear(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)
        V = self.v_linear(x).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)

        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k) # Shape: (batch_size, num_heads, seq_len, seq_len)
        atn_weights = F.softmax(scores, dim=-1) 
        atn_out = torch.matmul(atn_weights, V) # Shape: (batch_size, num_heads, seq_len, d_k)
        atn_out = atn_out.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model) # Shape: (batch_size, seq_len, d_model)
        return self.out_linear(atn_out) # Shape: (batch_size, seq_len, d_model)

In [3]:
# Test MultiHead Attention
attn_layer = SelfMultiHeadAttention(d_model=128, num_heads=8)
# (batch_size, seq_len, d_model)
test_input = torch.randn(2, 50, 128)
# Expected (batch_size, seq_len, d_model)
print(attn_layer(test_input).shape)
assert test_input.shape == attn_layer(test_input).shape

torch.Size([2, 50, 128])


In [4]:
# FeedForward Class
class FeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(FeedForward, self).__init__()
        self.linear1 = nn.Linear(d_model, d_ff)
        self.linear2 = nn.Linear(d_ff, d_model)
    
    def forward(self, x):
        return self.linear2(F.relu(self.linear1(x)))

In [5]:
# Test FeedForward
ff_layer = FeedForward(d_model=128, d_ff=512)
# (batch_size, seq_len, d_model)
test_input = torch.randn(2, 50, 128)
# Expected: (batch_size, seq_len, d_model)
print(ff_layer(test_input).shape)
assert test_input.shape == ff_layer(test_input).shape

torch.Size([2, 50, 128])


In [6]:
# Positional Encoding (sin & cosine curves)
class PositionalEncode(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncode, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        # Encode even indices (sin curve)
        pe[:, 0::2] = torch.sin(position * div_term)
        # Encode the odd indices (cos curve)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0) # Shape: (1, max_len, d_model) (to handle with batch_size)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1), :]

In [7]:
# Test PositionalEncoding
pos_enc = PositionalEncode(d_model=128, max_len=50)
# (batch_size, seq_len, d_model)
test_input = torch.randn(2, 50, 128) 
# Expected: (batch_size, seq_len, d_model)
print(pos_enc(test_input).shape)
assert test_input.shape == pos_enc(test_input).shape

torch.Size([2, 50, 128])


In [8]:
# Transformer encoder class
class TransformerEncoder(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout=0.2):
        super(TransformerEncoder, self).__init__()
        self.self_atn = SelfMultiHeadAttention(d_model, num_heads)
        self.ff = FeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        atn_out = self.self_atn(x)
        x = self.norm1(x + self.dropout(atn_out))
        ff_out = self.ff(x)
        x = self.norm2(x + self.dropout(ff_out))
        return x

In [9]:
# Test TransformerEncoderLayer
encoder_layer = TransformerEncoder(d_model=128, num_heads=8, d_ff=512)
# (batch_size, seq_len, d_model)
test_input = torch.randn(2, 50, 128)
# Expected: (batch_size, seq_len, d_model)
print(encoder_layer(test_input).shape)

torch.Size([2, 50, 128])


In [10]:
# Sentence Transformer class
class SentenceTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, max_len=100):
        super(SentenceTransformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encode = PositionalEncode(d_model, max_len)
        self.encoder_layers = nn.ModuleList([
            TransformerEncoder(d_model, num_heads, d_ff) for _ in range(num_layers)
        ])
        self.pooling = nn.AdaptiveAvgPool1d(1)

    def forward(self, x):
        x = self.embedding(x) # Shape: (batch_size, seq_len, d_model)
        x = self.pos_encode(x) # Shape: (batch_size, seq_len, d_model)
        for layer in self.encoder_layers:
            x = layer(x) # Shape: (batch_size, seq_len, d_model)
        x = x.permute(0, 2, 1) # Shape: (batch_size, d_model, seq_len)
        x = self.pooling(x).squeeze(-1) # Shape (batch_size, d_model, 1) => (batch_size, d_model)
        return x

In [11]:
# Test SentenceTransformer
VOCAB_SIZE = 10000
D_MODEL = 128
NUM_HEADS = 8
NUM_LAYERS = 4
D_FF = 512
MAX_LEN = 50

model = SentenceTransformer(VOCAB_SIZE, D_MODEL, NUM_HEADS, NUM_LAYERS, D_FF, MAX_LEN)
# (batch_size, seq_len)
test_input = torch.randint(0, VOCAB_SIZE, (2, MAX_LEN))
# Expected: (batch_size, d_model)
print(model(test_input).shape)

torch.Size([2, 128])


In [19]:
# function to count the number of trainable parameters
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"Total Trainable Parameters: {count_parameters(model)}")

Total Trainable Parameters: 795648


In [12]:
## Function to preprocess sentences. Gets vocab and the padded sentence
def preprocess_sentences(sentences):
    # Create a vocab
    vocab = defaultdict(lambda: len(vocab))
    vocab['<PAD>'] = 0

    # Tokenize and convert to IDs
    tokenized = [word_tokenize(s.lower()) for s in sentences]
    print(tokenized)
    indexed = [[vocab[w] for w in s] for s in tokenized]
    print(indexed)

    # Padding
    max_len = max(len(s) for s in indexed)
    # Add padding to shorter sentences
    padded = [s + [0] * (max_len - len(s)) for s in indexed] 

    # Convert to tensor
    padded_tensor = torch.tensor(padded)

    return vocab, padded_tensor

In [13]:
sentences = [
    "The cat sat on the mat.",
    "A quick brown fox jumps over the lazy dog.",
    "Artificial intelligence is transforming the world."
]

In [14]:
vocab, padded_tensor = preprocess_sentences(sentences)

[['the', 'cat', 'sat', 'on', 'the', 'mat', '.'], ['a', 'quick', 'brown', 'fox', 'jumps', 'over', 'the', 'lazy', 'dog', '.'], ['artificial', 'intelligence', 'is', 'transforming', 'the', 'world', '.']]
[[1, 2, 3, 4, 1, 5, 6], [7, 8, 9, 10, 11, 12, 1, 13, 14, 6], [15, 16, 17, 18, 1, 19, 6]]


In [15]:
padded_tensor

tensor([[ 1,  2,  3,  4,  1,  5,  6,  0,  0,  0],
        [ 7,  8,  9, 10, 11, 12,  1, 13, 14,  6],
        [15, 16, 17, 18,  1, 19,  6,  0,  0,  0]])

In [16]:
vocab

defaultdict(<function __main__.preprocess_sentences.<locals>.<lambda>()>,
            {'<PAD>': 0,
             'the': 1,
             'cat': 2,
             'sat': 3,
             'on': 4,
             'mat': 5,
             '.': 6,
             'a': 7,
             'quick': 8,
             'brown': 9,
             'fox': 10,
             'jumps': 11,
             'over': 12,
             'lazy': 13,
             'dog': 14,
             'artificial': 15,
             'intelligence': 16,
             'is': 17,
             'transforming': 18,
             'world': 19})

In [17]:
padded_tensor.shape # (batch_size, seq_len)

torch.Size([3, 10])

In [18]:
# Model Initialization
VOCAB_SIZE = len(vocab)
D_MODEL = 128 # Dimension of the fixed size sentence embedding
NUM_HEADS = 8 # Number of heads for multi head attention 
NUM_LAYERS = 4 # Number of repeated multiheadattention blocks
D_FF = 512 # intermediate dimension for feedforward network 
model = SentenceTransformer(VOCAB_SIZE, D_MODEL, NUM_HEADS, NUM_LAYERS, D_FF, max_len=len(padded_tensor[0]))

# Generate Embeddings
embeddings = model(padded_tensor)
print("Embeddings shape:", embeddings.shape)  # Expected shape: (batch_size, D_MODEL)
print("Embeddings:", embeddings)

Embeddings shape: torch.Size([3, 128])
Embeddings: tensor([[ 4.6674e-01, -4.6863e-01, -4.7155e-01, -5.4400e-01,  1.2605e-01,
         -6.3041e-01, -1.1903e+00, -3.0121e-01,  9.0766e-02,  4.1755e-01,
         -3.9235e-01, -1.0099e+00,  2.0774e-02, -9.6504e-01, -8.8842e-01,
         -1.7547e-01,  6.5008e-01, -1.0690e-01,  2.6159e-01,  2.6579e-01,
          6.8265e-01, -9.6889e-02, -7.0535e-01, -6.8285e-01, -3.2448e-01,
          3.3838e-01,  1.3971e-01, -2.9932e-01,  6.8125e-01,  4.6388e-01,
         -4.2941e-02,  4.6098e-01, -5.3407e-02,  6.8306e-01,  5.9274e-02,
         -2.0512e-01, -2.3387e-01,  7.7714e-01, -2.3095e-01, -4.5909e-01,
          1.2471e-01,  4.8962e-01,  3.6096e-01,  1.0197e+00,  3.2431e-01,
          3.9017e-02,  2.7602e-01, -2.1988e-01,  1.1412e-01,  6.8797e-01,
         -2.8035e-01,  8.1698e-01,  4.2874e-01,  4.3071e-01,  1.3378e-01,
          1.5567e-01, -3.1157e-01, -3.7306e-01, -9.5627e-02,  1.1642e-01,
         -2.9743e-01,  1.6928e-01,  1.0810e+00,  3.9694e-01, 

In [31]:
### Task 2: Multi-Task Learning Expansion
class MultiTaskTransformer(nn.Module):
    def __init__(self, vocab_size, d_model, num_heads, num_layers, d_ff, num_classes_a, num_classes_b, max_len=100):
        super(MultiTaskTransformer, self).__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        self.pos_encode = PositionalEncode(d_model, max_len)
        self.encoder_layers = nn.ModuleList([
            TransformerEncoder(d_model, num_heads, d_ff) for _ in range(num_layers)
        ])
        self.pooling = nn.AdaptiveAvgPool1d(1)

        # Task A: Sentence Classification Head
        self.classifier_a = nn.Linear(d_model, num_classes_a)

        # Task B: Named Entity Recognition (NER) - per token basis Head
        self.ner_classifier_b = nn.Linear(d_model, num_classes_b)

    def forward(self, x):
        # x -> Shape: (batch_size, seq_len)
        x = self.embedding(x) # Shape: (batch_size, seq_len, d_model)
        x = self.pos_encode(x) # Shape: (batch_size, seq_len, d_model)
        for layer in self.encoder_layers:
            x = layer(x) # Shape: (batch_size, seq_len, d_model)
        
        # Task A: Sentence Classification
        # we want to pool over the seq_len 
        x_pooled = x.permute(0, 2, 1) # Shape: (batch_size, d_model, seq_len)
        x_pooled = self.pooling(x_pooled).squeeze(-1) # Shape: (batch_size, d_model)
        out_task_a = self.classifier_a(x_pooled) # Shape: (batch_size, num_classes_a)
        # Task B: NER 
        # We pass in x and not x_pooled as we want per token label in NER
        out_task_b = self.ner_classifier_b(x) # Shape: (batch_size, seq_len, num_classes_b)

        return out_task_a, out_task_b

In [None]:
# Test Multi Task Transformer
# Model Initialization
VOCAB_SIZE = 10000
D_MODEL = 128
NUM_HEADS = 8
NUM_LAYERS = 4
D_FF = 512
MAX_LEN = 50
NUM_CLASSES_A = 3 
NUM_CLASSES_B = 4  

multi_task_model = MultiTaskTransformer(VOCAB_SIZE, D_MODEL, NUM_HEADS, NUM_LAYERS, D_FF, NUM_CLASSES_A, NUM_CLASSES_B, MAX_LEN)

test_input = torch.randint(0, VOCAB_SIZE, (2, MAX_LEN)) # (batch_size, seq_len)
out_task_a, out_task_b = multi_task_model(test_input)
print(f"Sentence Classification: {out_task_a.shape}") # Expected: (batch_size, num_classes_a)
print(f"NER: {out_task_b.shape}") # Expected: (batch_size, seq_len, num_classes_b)

Sentence Classification: torch.Size([2, 3])
NER: torch.Size([2, 50, 4])


In [26]:
print(f"Sentence Classification output: {out_task_a}")
print(f"NER output: {out_task_b}")

Sentence Classification output: tensor([[ 0.0507,  0.0416,  0.4814],
        [ 0.0211, -0.0633,  0.4029]], grad_fn=<AddmmBackward0>)
NER output: tensor([[[ 1.1841e+00, -7.1495e-01,  4.8525e-02,  4.7646e-02],
         [ 5.4922e-01,  9.4005e-02,  2.2591e-01, -6.1908e-02],
         [ 1.4539e-01,  1.2950e+00,  3.8574e-01, -5.3482e-02],
         [ 8.1093e-01,  4.4390e-01, -4.9584e-01, -2.8783e-01],
         [ 7.4695e-01,  1.0204e+00,  3.4611e-01,  6.7273e-01],
         [ 1.3796e-01,  9.7461e-01,  2.8913e-01,  7.6818e-01],
         [ 1.0523e+00,  1.0082e-01, -5.5909e-01, -9.7521e-01],
         [ 7.8043e-01,  2.6486e-02,  2.0210e-02, -3.7988e-01],
         [ 7.7239e-01, -8.7847e-01, -5.0499e-01,  9.2409e-01],
         [ 3.5308e-01, -1.0130e-01,  3.8011e-01,  9.8022e-02],
         [ 4.5162e-01,  7.6195e-01,  8.0909e-01,  1.0101e+00],
         [ 9.8108e-01, -2.8731e-01, -5.2479e-01,  3.0944e-01],
         [-2.7185e-01, -3.7539e-01,  1.2449e-02, -1.1229e-01],
         [ 7.6041e-01,  4.6045e-01, 

In [27]:
print("Sentence Classification Prediction:", torch.argmax(out_task_a, dim=1).tolist())
print("NER Prediction:", torch.argmax(out_task_b, dim=2).tolist())

Sentence Classification Prediction: [2, 2]
NER Prediction: [[0, 0, 1, 0, 1, 1, 0, 0, 3, 2, 3, 0, 2, 0, 0, 1, 1, 2, 3, 1, 2, 1, 0, 1, 0, 3, 0, 0, 1, 1, 1, 1, 2, 1, 1, 1, 1, 1, 1, 0, 3, 3, 2, 1, 3, 0, 0, 1, 2, 0], [3, 3, 1, 0, 0, 0, 2, 2, 1, 2, 0, 0, 1, 0, 0, 0, 1, 1, 1, 2, 1, 3, 1, 1, 2, 2, 1, 0, 1, 2, 0, 1, 1, 1, 0, 0, 2, 1, 0, 3, 0, 2, 2, 2, 3, 2, 1, 3, 1, 3]]


In [33]:
# Test out on sentences
sentences = [
    "Barack Obama was the 44th President of the United States.",
    "Apple Inc. is based in Cupertino, California."
]

vocab, padded_tensor = preprocess_sentences(sentences)

VOCAB_SIZE = len(vocab)
D_MODEL = 128
NUM_HEADS = 8
NUM_LAYERS = 4
D_FF = 512
MAX_LEN = len(padded_tensor[0])
NUM_CLASSES_A = 3 
NUM_CLASSES_B = 4  

multi_task_model = MultiTaskTransformer(VOCAB_SIZE, D_MODEL, NUM_HEADS, NUM_LAYERS, D_FF, NUM_CLASSES_A, NUM_CLASSES_B, MAX_LEN)
out_task_a, out_task_b = multi_task_model(padded_tensor)
print(f"Sentence Classification: {out_task_a.shape}") # Expected: (batch_size, num_classes_a)
print(f"NER: {out_task_b.shape}") # Expected: (batch_size, seq_len, num_classes_b)

[['barack', 'obama', 'was', 'the', '44th', 'president', 'of', 'the', 'united', 'states', '.'], ['apple', 'inc.', 'is', 'based', 'in', 'cupertino', ',', 'california', '.']]
[[1, 2, 3, 4, 5, 6, 7, 4, 8, 9, 10], [11, 12, 13, 14, 15, 16, 17, 18, 10]]
Sentence Classification: torch.Size([2, 3])
NER: torch.Size([2, 11, 4])


In [35]:
print("Sentence Classification Prediction:", torch.argmax(out_task_a, dim=1).tolist())
print("NER Prediction:", torch.argmax(out_task_b, dim=2).tolist())

Sentence Classification Prediction: [0, 1]
NER Prediction: [[2, 2, 3, 2, 1, 3, 3, 2, 1, 2, 1], [0, 0, 3, 1, 0, 3, 0, 1, 1, 1, 2]]


In [41]:
## Training loop function
def train_model(model, optimizer, lf_a, lf_b, inputs, labels_a, labels_b, num_classes_b, epochs=3):
    for epoch in tqdm(range(epochs), desc="Training"):
        model.train()
        optimizer.zero_grad()

        # Get the outputs from the model
        out_task_a, out_task_b = model(inputs)

        # Calculate the loss for both the tasks
        loss_a = lf_a(out_task_a, labels_a)
        loss_b = lf_b(out_task_b.view(-1, num_classes_b), labels_b.view(-1))
        
        # Get the total loss
        total_loss = loss_a + loss_b
        total_loss.backward()
        optimizer.step()

        print(f"Epoch {epoch+1}: Loss Task A: {loss_a.item():.4f}, Loss Task B: {loss_b.item():.4f}, Total Loss: {total_loss.item():.4f}")

In [45]:
# Test out the training loop
sentences = [
    "Barack Obama was the 44th President of the United States.",
    "Apple Inc. is based in Cupertino, California.",
    "The Eiffel Tower is located in Paris.",
    "Elon Musk is the CEO of Tesla.",
    "Microsoft Corporation is headquartered in Redmond.",
    "Google was founded by Larry Page and Sergey Brin.",
    "The Great Wall of China is a famous landmark."
]

VOCAB_SIZE = len(vocab)
D_MODEL = 128
NUM_HEADS = 8
NUM_LAYERS = 4
D_FF = 512
MAX_LEN = padded_tensor.shape[1]
NUM_CLASSES_A = 3 
NUM_CLASSES_B = 4  

# Get random labels for both the tasks
labels_a = torch.randint(0, 3, (len(sentences),))  
labels_b = torch.randint(0, 4, (len(sentences), MAX_LEN)) 

vocab, padded_tensor = preprocess_sentences(sentences)

# Loss functions
lf_a = nn.CrossEntropyLoss()
lf_b = nn.CrossEntropyLoss()

# Optimizer
optimizer = torch.optim.Adam(multi_task_model.parameters(), lr=0.001)

# Initialize the model
multi_task_model = MultiTaskTransformer(VOCAB_SIZE, D_MODEL, NUM_HEADS, NUM_LAYERS, D_FF, NUM_CLASSES_A, NUM_CLASSES_B, MAX_LEN)
train_model(multi_task_model, optimizer, lf_a, lf_b, padded_tensor, labels_a, labels_b, NUM_CLASSES_B, epochs=10)

[['barack', 'obama', 'was', 'the', '44th', 'president', 'of', 'the', 'united', 'states', '.'], ['apple', 'inc.', 'is', 'based', 'in', 'cupertino', ',', 'california', '.'], ['the', 'eiffel', 'tower', 'is', 'located', 'in', 'paris', '.'], ['elon', 'musk', 'is', 'the', 'ceo', 'of', 'tesla', '.'], ['microsoft', 'corporation', 'is', 'headquartered', 'in', 'redmond', '.'], ['google', 'was', 'founded', 'by', 'larry', 'page', 'and', 'sergey', 'brin', '.'], ['the', 'great', 'wall', 'of', 'china', 'is', 'a', 'famous', 'landmark', '.']]
[[1, 2, 3, 4, 5, 6, 7, 4, 8, 9, 10], [11, 12, 13, 14, 15, 16, 17, 18, 10], [4, 19, 20, 13, 21, 15, 22, 10], [23, 24, 13, 4, 25, 7, 26, 10], [27, 28, 13, 29, 15, 30, 10], [31, 3, 32, 33, 34, 35, 36, 37, 38, 10], [4, 39, 40, 7, 41, 13, 42, 43, 44, 10]]


Training: 100%|██████████| 10/10 [00:00<00:00, 79.16it/s]

Epoch 1: Loss Task A: 1.1526, Loss Task B: 1.4956, Total Loss: 2.6481
Epoch 2: Loss Task A: 1.1794, Loss Task B: 1.4776, Total Loss: 2.6571
Epoch 3: Loss Task A: 1.1631, Loss Task B: 1.4883, Total Loss: 2.6513
Epoch 4: Loss Task A: 1.1579, Loss Task B: 1.4805, Total Loss: 2.6385
Epoch 5: Loss Task A: 1.1877, Loss Task B: 1.4545, Total Loss: 2.6422
Epoch 6: Loss Task A: 1.1773, Loss Task B: 1.4739, Total Loss: 2.6512
Epoch 7: Loss Task A: 1.1503, Loss Task B: 1.4629, Total Loss: 2.6132
Epoch 8: Loss Task A: 1.2007, Loss Task B: 1.4954, Total Loss: 2.6961
Epoch 9: Loss Task A: 1.1934, Loss Task B: 1.4742, Total Loss: 2.6676
Epoch 10: Loss Task A: 1.2037, Loss Task B: 1.4981, Total Loss: 2.7017



