In [24]:
import torch
import torch.nn as nn
import torch.optim as optim
import math

In [25]:
# ==========================================
# 1. DATA PREPARATION (Toy Dataset)
# ==========================================
# For demonstration, we use a tiny hardcoded dataset.
# In a real project, you would load thousands of sentences from a file.
raw_data = [
    ("hello", "hola"),
    ("good morning", "buenos dias"),
    ("how are you", "como estas"),
    ("i am fine", "estoy bien"),
    ("see you later", "hasta luego"),
    ("thank you", "gracias"),
    ("what is your name", "como te llamas"),
    ("my name is ai", "me llamo ia"),
    ("goodbye", "adios"),
    ("have a nice day", "que tengas un buen dia")
]

In [26]:
# Simple Tokenizer (Splitting by space)
# We build a vocabulary from the raw data
def build_vocab(sentences):
    vocab = {'<pad>': 0, '<start>': 1, '<end>': 2, '<unk>': 3}
    idx = 4
    for sent in sentences:
        for word in sent.split():
            if word not in vocab:
                vocab[word] = idx
                idx += 1
    return vocab

In [27]:
# Create Vocabularies
src_sentences, tgt_sentences = zip(*raw_data)
src_vocab = build_vocab(src_sentences)
tgt_vocab = build_vocab(tgt_sentences)

In [28]:
# Reverse lookup (Index to Word) for decoding later
idx2word_tgt = {v: k for k, v in tgt_vocab.items()}

In [29]:
# Helper to convert sentence to tensor
def sentence_to_tensor(sentence, vocab):
    tokens = [vocab.get(word, vocab['<unk>']) for word in sentence.split()]
    tokens = [vocab['<start>']] + tokens + [vocab['<end>']]
    # Pad to fixed length for batching (simplified here)
    return torch.tensor(tokens, dtype=torch.long).unsqueeze(0) # Batch size 1

In [30]:
class MultiHeadAttention(nn.Module):
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        assert d_model % num_heads == 0
        
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads
        
        self.W_q = nn.Linear(d_model, d_model)
        self.W_k = nn.Linear(d_model, d_model)
        self.W_v = nn.Linear(d_model, d_model)
        self.W_o = nn.Linear(d_model, d_model)
        
    def scaled_dot_product_attention(self, Q, K, V, mask=None):
        scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.d_k)
        if mask is not None:
            scores = scores.masked_fill(mask == 0, -1e9)
        attn = torch.softmax(scores, dim=-1)
        return torch.matmul(attn, V)

    def split_heads(self, x):
        batch_size, seq_len, _ = x.size()
        return x.view(batch_size, seq_len, self.num_heads, self.d_k).transpose(1, 2)
        
    def combine_heads(self, x):
        batch_size, _, seq_len, _ = x.size()
        return x.transpose(1, 2).contiguous().view(batch_size, seq_len, self.d_model)
    
    # !!! MAKE SURE THIS LINE IS ALIGNED WITH def __init__ !!!
    def forward(self, Q, K, V, mask=None):
        Q = self.split_heads(self.W_q(Q))
        K = self.split_heads(self.W_k(K))
        V = self.split_heads(self.W_v(V))
        
        attn_output = self.scaled_dot_product_attention(Q, K, V, mask)
        output = self.W_o(self.combine_heads(attn_output))
        return output

In [31]:
class PositionWiseFeedForward(nn.Module):
    def __init__(self, d_model, d_ff):
        super(PositionWiseFeedForward, self).__init__()
        self.fc1 = nn.Linear(d_model, d_ff)
        self.fc2 = nn.Linear(d_ff, d_model)
        self.relu = nn.ReLU()

    def forward(self, x):
        return self.fc2(self.relu(self.fc1(x)))

In [32]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

In [33]:
class EncoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(EncoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, mask):
        x = self.norm1(x + self.dropout(self.self_attn(x, x, x, mask)))
        x = self.norm2(x + self.dropout(self.feed_forward(x)))
        return x

In [34]:
class DecoderLayer(nn.Module):
    def __init__(self, d_model, num_heads, d_ff, dropout):
        super(DecoderLayer, self).__init__()
        self.self_attn = MultiHeadAttention(d_model, num_heads)
        self.cross_attn = MultiHeadAttention(d_model, num_heads)
        self.feed_forward = PositionWiseFeedForward(d_model, d_ff)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)
        
    def forward(self, x, enc_output, src_mask, tgt_mask):
        x = self.norm1(x + self.dropout(self.self_attn(x, x, x, tgt_mask)))
        x = self.norm2(x + self.dropout(self.cross_attn(x, enc_output, enc_output, src_mask)))
        x = self.norm3(x + self.dropout(self.feed_forward(x)))
        return x

In [35]:
class Transformer(nn.Module):
    def __init__(self, src_vocab_size, tgt_vocab_size, d_model, num_heads, num_layers, d_ff, max_len, dropout):
        super(Transformer, self).__init__()
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.positional_encoding = PositionalEncoding(d_model, max_len)
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.decoder_layers = nn.ModuleList([DecoderLayer(d_model, num_heads, d_ff, dropout) for _ in range(num_layers)])
        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        
    def generate_mask(self, src, tgt):
        src_mask = (src != 0).unsqueeze(1).unsqueeze(2)
        tgt_mask = (tgt != 0).unsqueeze(1).unsqueeze(3)
        seq_len = tgt.size(1)
        nopeak_mask = (1 - torch.triu(torch.ones(1, seq_len, seq_len), diagonal=1)).bool()
        tgt_mask = tgt_mask & nopeak_mask.to(tgt.device)
        return src_mask, tgt_mask

    def forward(self, src, tgt):
        src_mask, tgt_mask = self.generate_mask(src, tgt)
        enc_output = self.dropout(self.positional_encoding(self.encoder_embedding(src)))
        for layer in self.encoder_layers:
            enc_output = layer(enc_output, src_mask)
            
        dec_output = self.dropout(self.positional_encoding(self.decoder_embedding(tgt)))
        for layer in self.decoder_layers:
            dec_output = layer(dec_output, enc_output, src_mask, tgt_mask)
            
        return self.fc(dec_output)

In [36]:
# ==========================================
# 3. CONFIGURATION & TRAINING
# ==========================================

# Hyperparameters
d_model = 128      # Reduced for small example
num_heads = 4
num_layers = 2
d_ff = 256
max_len = 20
dropout = 0.1
learning_rate = 0.001
epochs = 50

# Initialize Model
model = Transformer(len(src_vocab), len(tgt_vocab), d_model, num_heads, num_layers, d_ff, max_len, dropout)
criterion = nn.CrossEntropyLoss(ignore_index=0)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

print("Starting Training...")
model.train()

for epoch in range(epochs):
    total_loss = 0
    for src_text, tgt_text in raw_data:
        # Prepare inputs
        src = sentence_to_tensor(src_text, src_vocab)
        tgt = sentence_to_tensor(tgt_text, tgt_vocab)
        
        # Shift target for training (Input: <start> A B, Output: A B <end>)
        tgt_input = tgt[:, :-1]
        tgt_output = tgt[:, 1:]
        
        optimizer.zero_grad()
        output = model(src, tgt_input)
        
        # Reshape for loss calculation
        loss = criterion(output.contiguous().view(-1, len(tgt_vocab)), tgt_output.contiguous().view(-1))
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        
    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1}/{epochs}, Loss: {total_loss/len(raw_data):.4f}")

Starting Training...
Epoch 10/50, Loss: 0.0664
Epoch 20/50, Loss: 0.0221
Epoch 30/50, Loss: 0.0135
Epoch 40/50, Loss: 0.0079
Epoch 50/50, Loss: 0.0060


In [38]:
# ==========================================
# 4. INFERENCE (TRANSLATION)
# ==========================================

def translate(sentence):
    model.eval()
    src = sentence_to_tensor(sentence, src_vocab)
    tgt_input = torch.tensor([[tgt_vocab['<start>']]], dtype=torch.long)
    
    # Greedy decoding loop
    for _ in range(max_len):
        with torch.no_grad():
            output = model(src, tgt_input)
            # Get the next word token (last position)
            next_token = output.argmax(dim=-1)[:, -1].item()
            
            # Stop if end token is reached
            if next_token == tgt_vocab['<end>']:
                break
                
            # Append to input for next iteration
            tgt_input = torch.cat([tgt_input, torch.tensor([[next_token]] )], dim=1)
    
    # Convert tokens back to words
    decoded_words = [idx2word_tgt[idx.item()] for idx in tgt_input[0][1:]]
    return " ".join(decoded_words)

In [39]:
# ==========================================
# 5. TESTING
# ==========================================
print("\n--- Translation Results ---")
test_sentence = "how are you"
print(f"English: {test_sentence}")
print(f"Spanish (Model): {translate(test_sentence)}")

test_sentence_2 = "good morning"
print(f"English: {test_sentence_2}")
print(f"Spanish (Model): {translate(test_sentence_2)}")


--- Translation Results ---
English: how are you
Spanish (Model): como estas
English: good morning
Spanish (Model): buenos dias


In [40]:
# You might need to install nltk first: pip install nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import nltk

In [41]:
def calculate_bleu_score(model, dataset):
    model.eval()
    total_bleu = 0
    
    # Smoothing is needed for short sentences/small datasets to avoid "0.0" scores
    smoothie = SmoothingFunction().method1
    
    with torch.no_grad():
        for src_text, tgt_text in dataset:
            # 1. Translate the source sentence
            prediction = translate(src_text)
            
            # 2. Prepare for BLEU (needs tokenized lists)
            # Reference: List of acceptable translations (we have only 1 per sentence)
            reference = [tgt_text.split()] 
            
            # Candidate: The model's prediction
            candidate = prediction.split()
            
            # 3. Calculate Score for this sentence
            score = sentence_bleu(reference, candidate, smoothing_function=smoothie)
            total_bleu += score
            
            # Optional: Print details to see what's happening
            # print(f"Src: {src_text}")
            # print(f"Ref: {tgt_text}")
            # print(f"Pred: {prediction}")
            # print(f"Score: {score:.4f}\n")

    # Average BLEU across the dataset
    avg_bleu = total_bleu / len(dataset)
    return avg_bleu * 100  # Return as percentage

In [42]:
print(f"Final BLEU Score: {calculate_bleu_score(model, raw_data):.2f}")

Final BLEU Score: 39.23
