<a href="https://colab.research.google.com/github/ponaalagar/GenerateiveAI/blob/main/3_Transformers_implementation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ===============================
# Transformer Encoder (Step-by-step Demo)
# Google Colab Ready
# ===============================

import numpy as np
np.set_printoptions(precision=4, suppress=True)

# --- Tokenizer (very simple) ---
def tiny_tokenizer(text, vocab=None):
    """Lowercase *text*, split it on whitespace and map each word to an ID.

    Args:
        text: Input string to tokenize.
        vocab: Optional existing word->id mapping. It is extended in place
            with any unseen word; a fresh dict is created when omitted.

    Returns:
        A ``(ids, vocab)`` tuple: the list of IDs for the words of *text*
        (in order) and the (possibly extended) vocabulary.
    """
    if vocab is None:
        vocab = {}
    # setdefault assigns the next free ID (the current vocab size) to a word
    # the first time it is seen and returns the stored ID afterwards.
    ids = [vocab.setdefault(word, len(vocab)) for word in text.lower().split()]
    return ids, vocab

# --- Positional Encoding (sinusoidal) ---
def positional_encoding(seq_len, d_model):
    """Build the sinusoidal positional-encoding table.

    Even columns hold ``sin(pos * w_i)`` and odd columns ``cos(pos * w_i)``
    with ``w_i = 10000 ** (-2i / d_model)``.

    Args:
        seq_len: Number of positions (rows).
        d_model: Embedding width (columns). May be odd.

    Returns:
        ``np.ndarray`` of shape ``(seq_len, d_model)``.
    """
    pe = np.zeros((seq_len, d_model))
    position = np.arange(0, seq_len)[:, np.newaxis]
    div_term = np.exp(np.arange(0, d_model, 2) * -(np.log(10000.0) / d_model))
    angles = position * div_term  # (seq_len, ceil(d_model / 2))
    pe[:, 0::2] = np.sin(angles)
    # For an odd d_model there is one odd column fewer than even columns, so
    # trim the angle table to fit instead of failing to broadcast.
    pe[:, 1::2] = np.cos(angles[:, : d_model // 2])
    return pe

# --- Scaled Dot-Product Attention ---
def scaled_dot_product_attention(Q, K, V, mask=None):
    """Compute ``softmax(Q @ K^T / sqrt(dk)) @ V``.

    The last two axes of each input are treated as ``(seq_len, depth)``; any
    leading axes (e.g. batch, heads) are carried through by broadcasting, so
    2-D, 3-D and 4-D inputs all work.

    Args:
        Q: Query array, ``(..., seq_q, dk)``.
        K: Key array, ``(..., seq_k, dk)``.
        V: Value array, ``(..., seq_k, dv)``.
        mask: Optional array-like broadcastable to the score shape
            ``(..., seq_q, seq_k)``. Positions where it equals 0 are
            excluded from attention.

    Returns:
        ``(output, weights)`` where ``output`` is ``(..., seq_q, dv)`` and
        ``weights`` is ``(..., seq_q, seq_k)`` with rows summing to 1.
    """
    dk = Q.shape[-1]
    # Swap only the trailing two axes so the function is rank-agnostic.
    KT = np.swapaxes(K, -1, -2)
    scores = (Q @ KT) / np.sqrt(dk)

    if mask is not None:
        # A large negative score becomes ~0 probability after the softmax.
        scores = np.where(np.asarray(mask) == 0, -1e9, scores)

    # Subtract the row maximum before exponentiating for numerical stability.
    exp_scores = np.exp(scores - np.max(scores, axis=-1, keepdims=True))
    weights = exp_scores / np.sum(exp_scores, axis=-1, keepdims=True)
    output = weights @ V
    return output, weights

# --- Multi-Head Self-Attention ---
class MultiHeadSelfAttention:
    """Multi-head self-attention with randomly initialised projections.

    The input is projected to queries, keys and values, split into
    ``num_heads`` heads of width ``d_model // num_heads``, attended per
    head, re-assembled and passed through an output projection.
    """

    def __init__(self, d_model, num_heads):
        assert d_model % num_heads == 0
        self.d_model = d_model
        self.num_heads = num_heads
        self.depth = d_model // num_heads
        # Four (d_model, d_model) matrices, drawn in the order Wq, Wk, Wv, Wo.
        self.Wq, self.Wk, self.Wv, self.Wo = (
            np.random.randn(d_model, d_model) * 0.1 for _ in range(4)
        )

    def split_heads(self, x):
        """Reshape (batch, seq_len, d_model) -> (batch, heads, seq_len, depth)."""
        batch, seq_len, _width = x.shape
        per_head = x.reshape(batch, seq_len, self.num_heads, self.depth)
        return per_head.transpose(0, 2, 1, 3)

    def combine_heads(self, x):
        """Reshape (batch, heads, seq_len, depth) -> (batch, seq_len, heads * depth)."""
        batch, heads, seq_len, depth = x.shape
        return np.swapaxes(x, 1, 2).reshape(batch, seq_len, heads * depth)

    def __call__(self, x, mask=None):
        """Run self-attention over *x*.

        Returns ``(out, attn_weights)``: the projected attention output and
        the per-head attention weights. Intermediate shapes are printed.
        """
        Q, K, V = (x @ weight for weight in (self.Wq, self.Wk, self.Wv))
        print("After linear projections: Q,K,V shapes:", Q.shape, K.shape, V.shape)

        Qh, Kh, Vh = (self.split_heads(t) for t in (Q, K, V))
        print("Split into heads: Qh shape:", Qh.shape)

        per_head_out, attn_weights = scaled_dot_product_attention(
            Qh, Kh, Vh, mask=mask
        )
        print("Attention weights shape:", attn_weights.shape)

        # Concatenate the heads, then mix them with the output projection.
        out = self.combine_heads(per_head_out) @ self.Wo
        return out, attn_weights

# --- Feed Forward Network ---
class PositionwiseFeedForward:
    """Two-layer feed-forward network applied along the last axis.

    Computes ``relu(x @ W1 + b1) @ W2 + b2`` with small random weights and
    zero biases.
    """

    def __init__(self, d_model, d_ff):
        # Expansion layer: d_model -> d_ff.
        self.W1 = np.random.randn(d_model, d_ff) * 0.1
        self.b1 = np.zeros(d_ff)
        # Contraction layer: d_ff -> d_model.
        self.W2 = np.random.randn(d_ff, d_model) * 0.1
        self.b2 = np.zeros(d_model)

    def __call__(self, x):
        """Return the feed-forward output; the last axis keeps width d_model."""
        hidden = np.maximum(0, x @ self.W1 + self.b1)  # ReLU
        return hidden @ self.W2 + self.b2

# --- Layer Normalization ---
class LayerNorm:
    """Layer normalisation over the last axis with a scale and a shift."""

    def __init__(self, d_model, eps=1e-6):
        self.gamma = np.ones(d_model)   # scale, starts as identity
        self.beta = np.zeros(d_model)   # shift, starts at zero
        self.eps = eps                  # added to the variance before the sqrt

    def __call__(self, x):
        """Normalise *x* to zero mean / unit variance along its last axis."""
        mu = x.mean(axis=-1, keepdims=True)
        sigma = np.sqrt(x.var(axis=-1, keepdims=True) + self.eps)
        return self.gamma * ((x - mu) / sigma) + self.beta

# --- Transformer Encoder Layer ---
class TransformerEncoderLayer:
    """One encoder layer: self-attention and feed-forward sub-layers.

    Each sub-layer is followed by a residual addition and a LayerNorm.
    The shape after every stage is printed for demonstration purposes.
    """

    def __init__(self, d_model, num_heads, d_ff):
        # Attention sub-layer and its normalisation.
        self.mha = MultiHeadSelfAttention(d_model, num_heads)
        self.ln1 = LayerNorm(d_model)
        # Feed-forward sub-layer and its normalisation.
        self.ff = PositionwiseFeedForward(d_model, d_ff)
        self.ln2 = LayerNorm(d_model)

    def __call__(self, x, mask=None):
        """Return ``(encoded, attn_weights)`` for input *x*.

        *mask* is forwarded unchanged to the attention sub-layer.
        """
        # --- Sub-layer 1: multi-head self-attention ---
        attn_out, attn_weights = self.mha(x, mask=mask)
        print("MHA output shape:", attn_out.shape)

        after_attn = x + attn_out
        print("After residual add (x + attn_out). Shape:", after_attn.shape)

        normed = self.ln1(after_attn)
        print("After LayerNorm1. Shape:", normed.shape)

        # --- Sub-layer 2: position-wise feed-forward ---
        ff_out = self.ff(normed)
        print("FFN output shape:", ff_out.shape)

        after_ff = normed + ff_out
        print("After residual add (x + ff_out). Shape:", after_ff.shape)

        encoded = self.ln2(after_ff)
        print("After LayerNorm2. Shape:", encoded.shape)
        return encoded, attn_weights

# ===============================
# Demo Run
# ===============================

# Hyperparameters
d_model, num_heads, d_ff = 16, 4, 64
seq_text = "The cat is a lovely cat"

# Tokenization & Embedding
ids, vocab = tiny_tokenizer(seq_text)
vocab_size = len(vocab)
embedding_matrix = np.random.randn(vocab_size, d_model) * 0.1
# Row lookup by token ID, then a leading batch axis: (1, seq_len, d_model).
x_embed = embedding_matrix[ids][np.newaxis, ...]

print("Input text:", seq_text)
print("Tokens:", seq_text.lower().split())
print("Token IDs:", ids)
print("Vocabulary (word->id):", vocab)
print("\nEmbedding shape:", x_embed.shape)

# Add positional encoding
seq_len = x_embed.shape[1]
pe = positional_encoding(seq_len, d_model)
x = x_embed + pe[np.newaxis, ...]
print("\nPositional encoding added. Shape:", x.shape)

# Pass through one encoder layer
encoder_layer = TransformerEncoderLayer(d_model, num_heads, d_ff)
print("\n---- Passing input through Transformer encoder layer ----")
encoded_x, attn_weights = encoder_layer(x)

# Show attention matrices for each head (of the single batch element)
print("\nAttention weights shape:", attn_weights.shape)
for h, head_matrix in enumerate(attn_weights[0]):
    print(f"\nHead {h} attention matrix:\n", head_matrix)

# Recap of the pipeline demonstrated above, one line per stage.
print("\n--- Conceptual Steps ---")
for step_line in (
    "1) Tokenize input -> IDs.",
    "2) Lookup embeddings for each token.",
    "3) Add positional encoding.",
    "4) Compute Q,K,V with linear projections.",
    "5) Split into multiple heads.",
    "6) Compute attention = softmax(QK^T/sqrt(dk)) @ V.",
    "7) Concatenate heads + linear projection.",
    "8) Add residual + LayerNorm.",
    "9) Feed-forward network (ReLU).",
    "10) Add residual + LayerNorm.",
):
    print(step_line)


Input text: The cat is a lovely cat
Tokens: ['the', 'cat', 'is', 'a', 'lovely', 'cat']
Token IDs: [0, 1, 2, 3, 4, 1]
Vocabulary (word->id): {'the': 0, 'cat': 1, 'is': 2, 'a': 3, 'lovely': 4}

Embedding shape: (1, 6, 16)

Positional encoding added. Shape: (1, 6, 16)

---- Passing input through Transformer encoder layer ----
After linear projections: Q,K,V shapes: (1, 6, 16) (1, 6, 16) (1, 6, 16)
Split into heads: Qh shape: (1, 4, 6, 4)
Attention weights shape: (1, 4, 6, 6)
MHA output shape: (1, 6, 16)
After residual add (x + attn_out). Shape: (1, 6, 16)
After LayerNorm1. Shape: (1, 6, 16)
FFN output shape: (1, 6, 16)
After residual add (x + ff_out). Shape: (1, 6, 16)
After LayerNorm2. Shape: (1, 6, 16)

Attention weights shape: (1, 4, 6, 6)

Head 0 attention matrix:
 [[0.159  0.1547 0.1605 0.1687 0.1809 0.1763]
 [0.161  0.1569 0.1607 0.1673 0.1784 0.1756]
 [0.1627 0.1588 0.1607 0.1662 0.1764 0.1752]
 [0.1578 0.1563 0.1603 0.1682 0.1798 0.1775]
 [0.1537 0.1545 0.1607 0.1706 0.1822 0.1784

In [None]:
# ===============================
# English → Italian Translation with Transformer
# Google Colab Ready
# ===============================

!pip install transformers sentencepiece --quiet

from transformers import MarianMTModel, MarianTokenizer

# Load pre-trained English→Italian model
model_name = "Helsinki-NLP/opus-mt-en-fr"
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)

# Input sentence (English)
src_text = ["I love you very much"]

# Tokenize
inputs = tokenizer(src_text, return_tensors="pt", padding=True)

# Generate translation
translated = model.generate(**inputs)

# Decode result
tgt_text = tokenizer.decode(translated[0], skip_special_tokens=True)

print("Input:", src_text[0])
print("Translated:", tgt_text)


tokenizer_config.json:   0%|          | 0.00/42.0 [00:00<?, ?B/s]

source.spm:   0%|          | 0.00/778k [00:00<?, ?B/s]

target.spm:   0%|          | 0.00/802k [00:00<?, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

config.json: 0.00B [00:00, ?B/s]

pytorch_model.bin:   0%|          | 0.00/301M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/293 [00:00<?, ?B/s]

Input: I love you very much
Translated: Je t'aime beaucoup.


In [None]:
# Line-by-line explanation of the translation cell above

# ===============================
# Step-by-step Transformer Translation (English → Italian)
# ===============================

!pip install transformers sentencepiece --quiet

from transformers import MarianMTModel, MarianTokenizer

# 1) Load the pre-trained Transformer model and tokenizer
#    This is a seq2seq Transformer checkpoint for English→Italian.
#    `tokenizer` and `model` stay defined for the cells that follow.
model_name = "Helsinki-NLP/opus-mt-en-it"
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)

# 2) Define the English sentence we want to translate
#    (a one-element list, i.e. a batch of one sentence)
src_text = ["I love you"]

# 3) Tokenization
#    - The tokenizer converts the text into subword IDs understood by the model
#    - return_tensors="pt" requests PyTorch tensors; padding=True pads the batch
inputs = tokenizer(src_text, return_tensors="pt", padding=True)
print("Tokenized input IDs:", inputs["input_ids"])

# 4) Encoder
#    - The input IDs are passed through the encoder (multi-head self-attention + feed-forward layers)
#    - The encoder produces a sequence of hidden states representing the English sentence
#    - This call is for inspection only: its result is printed here and is
#      not passed to generate() below
encoder_outputs = model.get_encoder()(**inputs)
print("Encoder hidden states shape:", encoder_outputs.last_hidden_state.shape)

# 5) Decoder
#    - The decoder takes encoder outputs + previous target tokens
#    - Uses masked self-attention (so it only looks at past words)
#    - Cross-attends to encoder hidden states (aligning English → Italian)
#    - Generates Italian tokens step by step, up to max_length tokens
translated_tokens = model.generate(**inputs, max_length=20)
print("Translated token IDs:", translated_tokens)

# 6) Detokenization
#    - Convert the generated token IDs of the first sentence back into Italian
#      text, dropping special tokens
tgt_text = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)

print("\nInput Sentence (English):", src_text[0])
print("Output Sentence (Italian):", tgt_text)

# ===============================
# Conceptual Flow
# ===============================
# 1. English sentence → tokenize into IDs
# 2. Encoder processes English IDs → hidden states
# 3. Decoder starts with a start-of-sequence token
#    (NOTE(review): for MarianMT this appears to be <pad> rather than a
#    dedicated <BOS> — confirm against the model config)
# 4. Decoder attends to encoder outputs, predicting the next Italian token
# 5. Repeat step 4 until </s> (end of sentence) is produced
# 6. Convert Italian IDs back to readable words


Tokenized input IDs: tensor([[ 22, 722,  29,   0]])
Encoder hidden states shape: torch.Size([1, 4, 512])
Translated token IDs: tensor([[80034,   523,     9,   205,     2,     0]])

Input Sentence (English): I love you
Output Sentence (Italian): Ti amo.


In [None]:
# Step 1: Tokenization
# Encode a single sentence (a plain string this time) into PyTorch tensors.
src_text = "I love you"
inputs = tokenizer(src_text, return_tensors="pt")

# First (only) row of the batch as a plain Python list of ints
input_ids_list = inputs["input_ids"][0].tolist()

print("Original Text:", src_text)

print("Token IDs:", input_ids_list)
# Map every ID back to its subword string to show what the model "sees"
print("Tokens:", tokenizer.convert_ids_to_tokens(input_ids_list))


Original Text: I love you
Token IDs: [22, 722, 29, 0]
Tokens: ['▁I', '▁love', '▁you', '</s>']


In [None]:
# -------------------------------
# Step 2: Embedding lookup
# -------------------------------
# Imported here so this cell does not depend on a later cell having been run;
# `torch` is needed for no_grad() below.
import torch

# no_grad(): inference only, so skip building the autograd graph.
with torch.no_grad():
    embedding_output = model.model.encoder.embed_tokens(inputs["input_ids"])
print("\n[Step 2] Embedding output shape:", embedding_output.shape)  # [batch, seq_len, d_model]


[Step 2] Embedding output shape: torch.Size([1, 4, 512])


In [None]:
# -------------------------------
# Step 3: Encoder self-attention
# -------------------------------
# Imported here so this cell does not depend on a later cell having been run;
# `torch` is needed for no_grad() below.
import torch

# Run the full encoder stack on the source IDs (inference only, no gradients).
with torch.no_grad():
    encoder_outputs = model.model.encoder(inputs["input_ids"])
print("[Step 3] Encoder output shape:", encoder_outputs.last_hidden_state.shape)

[Step 3] Encoder output shape: torch.Size([1, 4, 512])


In [None]:
#Step 1–3 (Encoder)

#The encoder reads "I love you" → turns it into contextualized vectors.

#Now the model has a “memory” of the source sentence.

In [None]:
# -------------------------------
# Step 4: Decoder (first step)
# -------------------------------
# Imported here so this cell does not depend on a later cell having been run;
# `torch` is needed for tensor() and no_grad() below.
import torch

# Decoder starts with <pad> token as BOS: a (1, 1) tensor holding that one ID
decoder_input_ids = torch.tensor([[tokenizer.pad_token_id]])
with torch.no_grad():
    # The decoder cross-attends to the encoder hidden states from Step 3.
    decoder_outputs = model.model.decoder(
        decoder_input_ids,
        encoder_hidden_states=encoder_outputs.last_hidden_state
    )
print("[Step 4] Decoder output (first step) shape:", decoder_outputs.last_hidden_state.shape)

[Step 4] Decoder output (first step) shape: torch.Size([1, 1, 512])


In [None]:
#Step 4 (Decoder, first step)

#The decoder starts with a special BOS (beginning of sentence) token (in MarianMT that’s <pad>).

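#It looks at the encoder’s output and produces a hidden state for the next position; projecting that hidden state through the output layer gives a probability distribution over the whole target (Italian) vocabulary.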

In [None]:
# -------------------------------
# Step 5: Greedy Generation
# -------------------------------
# num_beams=1 (with sampling left at its default, off) makes generate() pick
# the most likely token at every step, i.e. greedy decoding.
# `early_stopping` is a beam-search option; with a single beam transformers
# reports it as an invalid flag that may be ignored, so it is not passed.
translated_tokens = model.generate(**inputs, max_length=10, num_beams=1)

The following generation flags are not valid and may be ignored: ['early_stopping']. Set `TRANSFORMERS_VERBOSITY=info` for more details.


In [None]:
#Language is sequential. To translate, we need to generate words one by one until the sentence ends.

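#At step 1, the decoder picks "Ti" (highest probability).
#At step 2, the decoder input = "Ti", and it predicts "amo" (the model works on subword pieces, so "amo" actually arrives as "▁a" + "mo" over two steps).
#At the final step, the decoder input = "Ti amo.", and it predicts </s> (end of sentence).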

#This loop is called decoding.
#Without it, the model would only predict one token and stop.

#Greedy decoding → pick the word with the highest probability each step. (fast, simple)

In [None]:
# -------------------------------
# Step 6: Decode final translation
# -------------------------------
# Turn the first generated ID sequence back into text, dropping special tokens.
translation = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)

# One print call, one line per item (same output as three separate prints).
print(
    "\n--- Final Translation ---",
    f"English: {src_text}",
    f"Italian: {translation}",
    sep="\n",
)


--- Final Translation ---
English: I love you
Italian: Ti amo.


In [None]:
# ================================
# Step-by-Step Greedy Decoding Demo
# English → Italian: "I love you" → "Ti amo"
# ================================

!pip install transformers sentencepiece --quiet

import torch
from transformers import MarianMTModel, MarianTokenizer

# Load model + tokenizer
model_name = "Helsinki-NLP/opus-mt-en-it"
tokenizer = MarianTokenizer.from_pretrained(model_name)
model = MarianMTModel.from_pretrained(model_name)

# Encode source text
src_text = "I love you"
inputs = tokenizer(src_text, return_tensors="pt")
encoder_outputs = model.model.encoder(inputs["input_ids"])

print("English Input:", src_text)
print("Source Token IDs:", inputs["input_ids"][0].tolist())
print("Source Tokens:", tokenizer.convert_ids_to_tokens(inputs["input_ids"][0].tolist()))

# --------------------------------
# Step-by-step Greedy Decoding
# --------------------------------
max_length = 10
decoder_input_ids = torch.tensor([[tokenizer.pad_token_id]])  # start with <pad> = BOS
generated_ids = []

print("\n--- Greedy Decoding Steps ---")
for step in range(max_length):
    # Run decoder
    with torch.no_grad():
        outputs = model.model.decoder(
            decoder_input_ids,
            encoder_hidden_states=encoder_outputs.last_hidden_state
        )
        # Get logits for the last position
        logits = model.lm_head(outputs.last_hidden_state[:, -1, :])
        next_token_id = torch.argmax(logits, dim=-1)

    # Append predicted token
    generated_ids.append(next_token_id.item())

    # Print current step
    print(f"Step {step+1}: Predicted ID = {next_token_id.item()} → Token = {tokenizer.convert_ids_to_tokens([next_token_id.item()])[0]}")

    # Stop if end-of-sentence token is generated
    if next_token_id.item() == tokenizer.eos_token_id:
        break

    # Update decoder input for next step
    decoder_input_ids = torch.cat([decoder_input_ids, next_token_id.unsqueeze(0)], dim=1)

# --------------------------------
# Final Translation
# --------------------------------
translation = tokenizer.decode(generated_ids, skip_special_tokens=True)
print("\n--- Final Translation ---")
print("English:", src_text)
print("Italian:", translation)


English Input: I love you
Source Token IDs: [22, 722, 29, 0]
Source Tokens: ['▁I', '▁love', '▁you', '</s>']

--- Greedy Decoding Steps ---
Step 1: Predicted ID = 523 → Token = ▁Ti
Step 2: Predicted ID = 9 → Token = ▁a
Step 3: Predicted ID = 205 → Token = mo
Step 4: Predicted ID = 3 → Token = ,
Step 5: Predicted ID = 257 → Token = ▁ti
Step 6: Predicted ID = 9 → Token = ▁a
Step 7: Predicted ID = 205 → Token = mo
Step 8: Predicted ID = 3 → Token = ,
Step 9: Predicted ID = 257 → Token = ▁ti
Step 10: Predicted ID = 9 → Token = ▁a

--- Final Translation ---
English: I love you
Italian: Ti amo, ti amo, ti a
