<a href="https://colab.research.google.com/github/saivenkatreddy29/TensorFlow-Learning/blob/main/Decode_only_transformer.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim

In [2]:
sentences = [
    'I love machine learning',
    'Transformers are Powerful',
    'Natural Language Processing is fun',
    'Pytorch makes Deep learning easier',
    'Models improve with data'
]

In [3]:
vocab = list(set(' '.join(sentences).split()))
vocab_size = len(vocab)
word_to_idx = {word: idx for idx, word in enumerate(vocab)}
idx_to_word = {idx: word for word,idx in word_to_idx.items()}

In [4]:
idx_to_word

{0: 'are',
 1: 'Powerful',
 2: 'I',
 3: 'Pytorch',
 4: 'Models',
 5: 'with',
 6: 'makes',
 7: 'Natural',
 8: 'Language',
 9: 'Processing',
 10: 'easier',
 11: 'Deep',
 12: 'improve',
 13: 'data',
 14: 'Transformers',
 15: 'is',
 16: 'machine',
 17: 'love',
 18: 'learning',
 19: 'fun'}

In [5]:
def sentence_to_indices(sentence):
  return [word_to_idx[word] for word in sentence.split()]


In [6]:
input_sequence = []
output_sequence = []

for sentence in sentences:
  indices = sentence_to_indices(sentence)
  input_sequence.append(indices[:-1])
  output_sequence.append(indices[1:])

  #padding sequences for training


In [7]:
input_sequence

[[2, 17, 16], [14, 0], [7, 8, 9, 15], [3, 6, 11, 18], [4, 12, 5]]

In [8]:
output_sequence

[[17, 16, 18], [0, 1], [8, 9, 15, 19], [6, 11, 18, 10], [12, 5, 13]]

In [9]:
max_len = max([len(seq) for seq in input_sequence])

In [10]:
max_len

4

In [11]:
# max_len = max(input_sequence[0])
padded_input = []
padded_output = []
for seq in input_sequence:
    seq = seq + [0] * (max_len - len(seq))
    padded_input.append(seq)
for seq in output_sequence:
    seq = seq + [0] * (max_len - len(seq))
    padded_output.append(seq)


In [12]:
padded_input

[[2, 17, 16, 0], [14, 0, 0, 0], [7, 8, 9, 15], [3, 6, 11, 18], [4, 12, 5, 0]]

In [13]:
padded_output

[[17, 16, 18, 0],
 [0, 1, 0, 0],
 [8, 9, 15, 19],
 [6, 11, 18, 10],
 [12, 5, 13, 0]]

In [14]:
input = torch.tensor(padded_input)
output = torch.tensor(padded_output)



In [15]:
import math

In [16]:

# Multi-Head Self-Attention Mechanism
class MultiHeadSelfAttention(nn.Module):
    def __init__(self, d_model, nhead):
        super(MultiHeadSelfAttention, self).__init__()
        self.nhead = nhead
        self.d_model = d_model

        assert d_model % nhead == 0, "d_model must be divisible by nhead"
        self.head_dim = d_model // nhead  # Dimensionality of each attention head

        # Linear layers for query, key, and value projections
        self.query_linear = nn.Linear(d_model, d_model)
        self.key_linear = nn.Linear(d_model, d_model)
        self.value_linear = nn.Linear(d_model, d_model)

        # Final linear layer after attention
        self.out_linear = nn.Linear(d_model, d_model)

    def forward(self, x):
        batch_size, seq_len, d_model = x.size()

        # Project input to query, key, and value
        Q = self.query_linear(x)  # Shape: (batch_size, seq_len, d_model)
        K = self.key_linear(x)    # Shape: (batch_size, seq_len, d_model)
        V = self.value_linear(x)  # Shape: (batch_size, seq_len, d_model)

        # Split into multiple heads
        Q = Q.view(batch_size, seq_len, self.nhead, self.head_dim).transpose(1, 2)
        K = K.view(batch_size, seq_len, self.nhead, self.head_dim).transpose(1, 2)
        V = V.view(batch_size, seq_len, self.nhead, self.head_dim).transpose(1, 2)

        # Scaled dot-product attention
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attention_weights = torch.softmax(scores, dim=-1)
        attention_output = torch.matmul(attention_weights, V)

        # Concatenate all heads back
        attention_output = attention_output.transpose(1, 2).contiguous().view(batch_size, seq_len, d_model)

        # Final linear transformation
        return self.out_linear(attention_output)


In [17]:
# Position-wise Feedforward Network
class FeedForwardNetwork(nn.Module):
    def __init__(self, d_model, hidden_dim=2048):
        super(FeedForwardNetwork, self).__init__()
        self.fc1 = nn.Linear(d_model, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))


In [18]:
# Positional Encoding (Inject position info into embeddings)
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()

        # Create a positional encoding matrix with shape (max_len, d_model)
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        # Register positional encoding as a buffer (not a parameter)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        # Add positional encoding to the input embedding
        return x + self.pe[:, :x.size(1), :]

In [19]:
# Layer Norm for stability
class LayerNorm(nn.Module):
    def __init__(self, d_model, eps=1e-6):
        super(LayerNorm, self).__init__()
        self.gamma = nn.Parameter(torch.ones(d_model))
        self.beta = nn.Parameter(torch.zeros(d_model))
        self.eps = eps

    def forward(self, x):
        mean = x.mean(dim=-1, keepdim=True)
        std = x.std(dim=-1, keepdim=True)
        return self.gamma * (x - mean) / (std + self.eps) + self.beta



In [20]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, nhead):
        super(DecoderLayer, self).__init__()

        self.self_attention = MultiHeadSelfAttention(d_model, nhead)
        self.layer_norm1 = LayerNorm(d_model)
        self.feed_forward = FeedForwardNetwork(d_model)
        self.layer_norm2 = LayerNorm(d_model)

    def forward(self, x):
        # Self-Attention layer with residual connection
        attention_output = self.self_attention(x)
        x = self.layer_norm1(x + attention_output)  # Add & norm

        # Feedforward layer with residual connection
        ff_output = self.feed_forward(x)
        x = self.layer_norm2(x + ff_output)  # Add & norm

        return x


In [21]:
# Decoder-Only Transformer
class DecoderOnlyTransformer(nn.Module):
    def __init__(self, vocab_size, d_model=64, nhead=4, num_layers=2, max_len=10):
        super(DecoderOnlyTransformer, self).__init__()

        self.word_embedding = nn.Embedding(vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len=max_len)

        # Stack multiple decoder layers
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, nhead) for _ in range(num_layers)])

        # Final linear layer to predict the next word
        self.fc_out = nn.Linear(d_model, vocab_size)

    def forward(self, x):
        # Embed input words and add positional encoding
        x = self.word_embedding(x)
        x = self.positional_encoding(x)

        # Pass through each decoder layer
        for layer in self.decoder_layers:
            x = layer(x)

        # Output layer: Predict next word
        return self.fc_out(x)

In [22]:
import torch
import torch.optim as optim
import torch.nn.functional as F

# Define hyperparameters
vocab_size = len(vocab)  # Vocabulary size from the toy dataset
d_model = 64  # Embedding dimension
nhead = 4  # Number of attention heads
num_layers = 2  # Number of decoder layers
max_len = max_len  # Maximum sequence length (calculated before)
learning_rate = 0.001  # Learning rate
num_epochs = 1000  # Number of training epochs

In [23]:
# Initialize the model
model = DecoderOnlyTransformer(vocab_size=vocab_size, d_model=d_model, nhead=nhead, num_layers=num_layers, max_len=max_len)

# Define loss function and optimizer
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
loss_fn = nn.CrossEntropyLoss()  # Since it's a classification task (predicting the next word)

# Move model to device (GPU or CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [24]:
# Move model to device (GPU or CPU)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)

In [25]:
# Convert input and output sequences to device
input_sequences = input.to(device)
output_sequences = output.to(device)

In [26]:
# Training loop
for epoch in range(num_epochs):
    model.train()  # Set the model in training mode
    optimizer.zero_grad()  # Reset the gradients

    # Forward pass: Input sequences are passed to the model
    output = model(input_sequences)  # Shape: (batch_size, seq_len, vocab_size)

    # Reshape the output and targets to match the loss function requirements
    output = output.view(-1, vocab_size)  # Flatten output for CrossEntropyLoss
    targets = output_sequences.view(-1)  # Flatten target sequences

    # Calculate the loss
    loss = loss_fn(output, targets)

    # Backward pass and optimization
    loss.backward()
    optimizer.step()

    # Print the loss every 100 epochs
    if epoch % 100 == 0:
        print(f'Epoch {epoch}/{num_epochs}, Loss: {loss.item()}')

Epoch 0/1000, Loss: 2.967050313949585
Epoch 100/1000, Loss: 0.016579773277044296
Epoch 200/1000, Loss: 0.006257100962102413
Epoch 300/1000, Loss: 0.003354964777827263
Epoch 400/1000, Loss: 0.0021145944483578205
Epoch 500/1000, Loss: 0.0014632362872362137
Epoch 600/1000, Loss: 0.0010760377626866102
Epoch 700/1000, Loss: 0.000825859431643039
Epoch 800/1000, Loss: 0.0006542332703247666
Epoch 900/1000, Loss: 0.0005310517735779285


In [27]:
# After training, we can test the model on new input sentences.
model.eval()  # Set the model in evaluation mode

# Test the model with a new sentence
test_sentence = "are"  # This is a partial input sequence
test_input = torch.tensor([sentence_to_indices(test_sentence)], dtype=torch.long).to(device)


In [28]:
# Predict the next word
with torch.no_grad():
    output = model(test_input)
    predicted_index = torch.argmax(output[:, -1, :], dim=-1).item()  # Get the index of the predicted word
    predicted_word = idx_to_word[predicted_index]  # Convert index back to word

print(f'The predicted next word after "{test_sentence}" is "{predicted_word}".')

The predicted next word after "are" is "Powerful".
