In [None]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
input_paths = (
    os.path.join(root, name)
    for root, _subdirs, names in os.walk('/kaggle/input')
    for name in names
)
for path in input_paths:
    print(path)

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [None]:
import pandas as pd
import numpy as np
import torch
import spacy
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
import random




print("r (Full + Random Split)...")


# Download the small English and Spanish spaCy pipelines via notebook shell
# commands, then load them. The loaded pipelines are used by the tokenizer
# helpers defined further down in this cell.
!python -m spacy download en_core_web_sm
!python -m spacy download es_core_news_sm
spacy_eng = spacy.load("en_core_web_sm")
spacy_esp = spacy.load("es_core_news_sm")


try:
    # Primary source: tab-separated file without a header row; only the first
    # two columns are read and named English / Spanish.
    df = pd.read_csv('/kaggle/input/eng-spanish/spa-eng/spa.txt', sep='\t', header=None, names=['English', 'Spanish'], usecols=[0, 1])
except FileNotFoundError:
    # Fall back to the alternative dataset only when the primary file is
    # absent. Any other failure (parse error, interrupt, ...) now surfaces
    # instead of being silently swallowed by a bare `except:`.
    # NOTE(review): the rest of the notebook expects "English" and "Spanish"
    # columns - confirm this CSV provides them under those names.
    df = pd.read_csv('/kaggle/input/english-spanish/english_spanish.csv', nrows=None)  # nrows=None reads every row


# Drop every row that contains a missing value (rebinds `df`).
df = df.dropna()


# 80/20 random split; the fixed random_state makes the split repeatable.
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

# Report the size of the full frame and of each split.
print(f"all data: {len(df)}")
print(f"train Set: {len(train_df)}")
print(f"Test Set:   {len(test_df)}")


# 2. TOKENIZER & VOCABULARY
# ==========================================
def tokenize_eng(text):
    """Return the lower-cased English spaCy tokens of ``str(text)`` as a list."""
    doc = spacy_eng.tokenizer(str(text))
    return list(map(lambda tok: tok.text.lower(), doc))

def tokenize_esp(text):
    """Return the lower-cased Spanish spaCy tokens of ``str(text)`` as a list."""
    doc = spacy_esp.tokenizer(str(text))
    return list(map(lambda tok: tok.text.lower(), doc))

class Vocabulary:
    """Bidirectional token <-> index mapping with four reserved special tokens.

    Indices 0-3 are always ``<pad>``, ``<sos>``, ``<eos>`` and ``<unk>``.
    Regular tokens are appended after them by :meth:`build_vocabulary`.

    Attributes:
        itos: dict mapping index -> token.
        stoi: dict mapping token -> index.
        freq_threshold: number of occurrences a token needs (within one
            ``build_vocabulary`` call) before it is admitted.
    """

    def __init__(self, freq_threshold=2):
        self.itos = {0: "<pad>", 1: "<sos>", 2: "<eos>", 3: "<unk>"}
        self.stoi = {"<pad>": 0, "<sos>": 1, "<eos>": 2, "<unk>": 3}
        self.freq_threshold = freq_threshold

    def __len__(self):
        """Number of entries, special tokens included."""
        return len(self.itos)

    def build_vocabulary(self, sentence_list, tokenizer):
        """Add every token that occurs at least ``freq_threshold`` times.

        Args:
            sentence_list: iterable of sentences.
            tokenizer: callable mapping one sentence to an iterable of tokens.

        New indices continue after the current vocabulary size, so calling
        this method again never overwrites or remaps existing entries.
        Tokens already present (including the special tokens) keep their index.
        """
        frequencies = {}
        # Continue after whatever is already stored instead of restarting at 4.
        next_idx = len(self.itos)
        for sentence in sentence_list:
            for word in tokenizer(sentence):
                count = frequencies.get(word, 0) + 1
                frequencies[word] = count
                # `>=` plus the membership guard admits each word exactly once
                # and also behaves sensibly for thresholds <= 1.
                if count >= self.freq_threshold and word not in self.stoi:
                    self.stoi[word] = next_idx
                    self.itos[next_idx] = word
                    next_idx += 1

    def numericalize(self, text, tokenizer):
        """Tokenise ``text`` and map each token to its index (``<unk>`` if unknown)."""
        unk_idx = self.stoi["<unk>"]
        return [self.stoi.get(token, unk_idx) for token in tokenizer(text)]


# Build one vocabulary per language from the training split only, so tokens
# that appear only in the test split map to <unk>.
print("Dictionaries are created")
vocab_eng = Vocabulary(freq_threshold=2)
vocab_eng.build_vocabulary(train_df["English"], tokenize_eng)

vocab_esp = Vocabulary(freq_threshold=2)
vocab_esp.build_vocabulary(train_df["Spanish"], tokenize_esp)

# Sizes include the four special tokens.
print(f"Vocab Eng: {len(vocab_eng)} | Vocab Esp: {len(vocab_esp)}")


# 3. DATASET & DATALOADER

class EngSpaDataset(Dataset):
    """Map-style dataset yielding (English ids, Spanish ids) tensor pairs.

    Each item is read from the "English" / "Spanish" columns of ``df``,
    numericalised with the matching vocabulary and wrapped with the literal
    ids 1 (start) and 2 (end).
    """

    def __init__(self, df, vocab_eng, vocab_esp):
        self.df = df
        self.vocab_eng = vocab_eng
        self.vocab_esp = vocab_esp

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        pair = self.df.iloc[index]
        english, spanish = pair["English"], pair["Spanish"]

        source_ids = self.vocab_eng.numericalize(english, tokenize_eng)
        target_ids = self.vocab_esp.numericalize(spanish, tokenize_esp)

        # Prepend id 1 and append id 2 around each numericalised sentence.
        source = torch.tensor([1] + source_ids + [2])
        target = torch.tensor([1] + target_ids + [2])
        return source, target

class MyCollate:
    """Collate callable that pads variable-length (src, tgt) pairs.

    Every batch is turned into two batch-first tensors, each padded with
    ``pad_idx`` up to the longest sequence on that side.
    """

    def __init__(self, pad_idx):
        self.pad_idx = pad_idx

    def __call__(self, batch):
        def _padded(position):
            # Collect one side of every pair and pad it to a rectangular tensor.
            sequences = [sample[position] for sample in batch]
            return pad_sequence(sequences, batch_first=True, padding_value=self.pad_idx)

        return _padded(0), _padded(1)

# Loader
BATCH_SIZE = 64

# Both datasets share the vocabularies built from the training split.
train_dataset = EngSpaDataset(train_df, vocab_eng, vocab_esp)
test_dataset = EngSpaDataset(test_df, vocab_eng, vocab_esp)

# Training batches are reshuffled; pad_idx=0 matches the <pad> index of Vocabulary.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=MyCollate(pad_idx=0))
# The test loader keeps the original row order (shuffle=False).
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, collate_fn=MyCollate(pad_idx=0))



In [None]:
import torch.nn as nn
import math

# 1. Positional Encoding

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position information to a sequence-first tensor.

    Args:
        d_model: embedding width. Odd widths are supported: the last cosine
            column is simply dropped.
        dropout: dropout probability applied after the addition.
        max_len: number of positions pre-computed in the ``pe`` buffer.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # [max_len, ceil(d_model / 2)]

        pe = torch.zeros(max_len, d_model)
        pe[:, 0::2] = torch.sin(angles)
        # For odd d_model there is one fewer odd column than even columns, so
        # trim the angle table to the number of odd columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        # Stored as [max_len, 1, d_model] so it broadcasts over the batch axis.
        self.register_buffer('pe', pe.unsqueeze(1))

    def forward(self, x):
        """Return ``dropout(x + pe[:seq_len])`` for x of shape [seq_len, batch_size, d_model]."""
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

# 2.  Transformer Model
class TransformerModel(nn.Module):
    """Encoder-decoder Transformer for sequence-to-sequence translation.

    Wraps ``nn.Transformer`` (sequence-first layout) with separate source and
    target embeddings, a shared positional encoder and a linear projection to
    the target vocabulary. Token id 0 is treated as padding.

    Args:
        src_vocab_size: number of source tokens.
        tgt_vocab_size: number of target tokens.
        d_model: embedding / model width.
        nhead: number of attention heads.
        num_encoder_layers: encoder depth.
        num_decoder_layers: decoder depth.
        dim_feedforward: width of the feed-forward sublayers.
        dropout: dropout probability used throughout.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=256, nhead=8,
                 num_encoder_layers=3, num_decoder_layers=3, dim_feedforward=512, dropout=0.1):
        super(TransformerModel, self).__init__()

        self.d_model = d_model

        # Token embeddings, one table per language.
        self.src_embedding = nn.Embedding(src_vocab_size, d_model)
        self.tgt_embedding = nn.Embedding(tgt_vocab_size, d_model)

        # One positional encoder shared by the source and target sides.
        self.pos_encoder = PositionalEncoding(d_model, dropout)

        self.transformer = nn.Transformer(d_model=d_model, nhead=nhead,
                                          num_encoder_layers=num_encoder_layers,
                                          num_decoder_layers=num_decoder_layers,
                                          dim_feedforward=dim_feedforward,
                                          dropout=dropout)

        # Projects decoder states to target-vocabulary logits.
        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

        # Not read anywhere in this class; kept so existing attribute access keeps working.
        self.src_mask = None
        self.tgt_mask = None

    def generate_square_subsequent_mask(self, sz):
        """Return a [sz, sz] float causal mask: 0.0 on/below the diagonal, -inf above."""
        return torch.triu(torch.full((sz, sz), float('-inf')), diagonal=1)

    def forward(self, src, tgt):
        """Compute target-vocabulary logits.

        Args:
            src: source token ids, [batch_size, src_len].
            tgt: target token ids fed to the decoder, [batch_size, tgt_len].

        Returns:
            Logits of shape [batch_size, tgt_len, tgt_vocab_size].
        """
        # Padding masks stay batch-first ([batch, len]); id 0 = <pad>.
        src_padding_mask = src == 0
        tgt_padding_mask = tgt == 0

        # nn.Transformer is used sequence-first here: [len, batch].
        src = src.transpose(0, 1)
        tgt = tgt.transpose(0, 1)

        # Causal mask so position i cannot see positions > i.
        tgt_mask = self.generate_square_subsequent_mask(tgt.size(0)).to(src.device)

        # Embedding (scaled by sqrt(d_model)) + positional encoding.
        src = self.pos_encoder(self.src_embedding(src) * math.sqrt(self.d_model))
        tgt = self.pos_encoder(self.tgt_embedding(tgt) * math.sqrt(self.d_model))

        # memory_key_padding_mask stops decoder cross-attention from attending
        # to encoder outputs that correspond to <pad> source positions.
        out = self.transformer(src, tgt,
                               tgt_mask=tgt_mask,
                               src_key_padding_mask=src_padding_mask,
                               tgt_key_padding_mask=tgt_padding_mask,
                               memory_key_padding_mask=src_padding_mask)

        # Back to batch-first before projecting to the vocabulary.
        return self.fc_out(out.transpose(0, 1))

print("The Model Architecture is Ready")

In [None]:
import torch.optim as optim


# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')


# 2. Hyperparameters
# Vocabulary sizes (special tokens included) fix the embedding/output widths.
INPUT_DIM = len(vocab_eng)
OUTPUT_DIM = len(vocab_esp)
D_MODEL = 256
N_HEAD = 8
NUM_ENCODER_LAYERS = 3
NUM_DECODER_LAYERS = 3
FEEDFORWARD_DIM = 512
DROPOUT = 0.1


# Build the model and move it to the selected device.
model = TransformerModel(INPUT_DIM, OUTPUT_DIM, D_MODEL, N_HEAD, 
                         NUM_ENCODER_LAYERS, NUM_DECODER_LAYERS, 
                         FEEDFORWARD_DIM, DROPOUT).to(device)

# 4. (Xavier Initialization )
def init_weights(m):
    """Xavier-uniform initialise every multi-dimensional weight reachable from ``m``.

    A parameter qualifies when its (dotted) name contains ``'weight'`` and it
    has more than one dimension; other parameters are left untouched.
    """
    weight_matrices = (
        tensor
        for label, tensor in m.named_parameters()
        if 'weight' in label and tensor.dim() > 1
    )
    for matrix in weight_matrices:
        nn.init.xavier_uniform_(matrix)
            
# init_weights iterates m.named_parameters(), which already covers every nested
# submodule. Calling it once on the root initialises each weight matrix exactly
# once; model.apply(init_weights) repeated that work for every ancestor module.
init_weights(model)


# Adam optimiser over all model parameters.
optimizer = optim.Adam(model.parameters(), lr=0.0005)


# Padding positions in the target are excluded from the loss.
PAD_IDX = vocab_esp.stoi["<pad>"]
criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)

print(f"Model parameter number: {sum(p.numel() for p in model.parameters() if p.requires_grad):,}")

In [None]:
import time

def train(model, iterator, optimizer, criterion, clip):
    """Run one training epoch with teacher forcing and return the mean batch loss.

    Args:
        model: callable ``model(src, tgt_in)`` returning logits of shape
            [batch, tgt_len, vocab].
        iterator: sized iterable of ``(src, trg)`` batch-first id tensors.
        optimizer: optimiser stepping ``model``'s parameters.
        criterion: loss taking ``(logits [N, vocab], targets [N])``.
        clip: maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        Sum of the per-batch losses divided by ``len(iterator)``.
    """
    model.train()
    epoch_loss = 0

    # Move batches to wherever the model lives rather than relying on a
    # notebook-level global.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    for i, (src, trg) in enumerate(iterator):
        src = src.to(run_device)
        trg = trg.to(run_device)

        optimizer.zero_grad()

        # Teacher forcing: the decoder sees the target without its last token...
        output = model(src, trg[:, :-1])

        output_dim = output.shape[-1]

        # ...and is scored against the target shifted left by one position.
        output = output.contiguous().view(-1, output_dim)
        trg = trg[:, 1:].contiguous().view(-1)

        loss = criterion(output, trg)
        loss.backward()

        # Clip the global gradient norm before the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)

        optimizer.step()
        epoch_loss += loss.item()

        # Progress line every 50 batches.
        if i % 50 == 0:
            print(f"Step: {i}, Loss: {loss.item():.4f}")

    return epoch_loss / len(iterator)


NUM_EPOCHS = 10
CLIP = 1  # maximum gradient norm handed to train()

print(f"train starting ({NUM_EPOCHS} Epoch)")

for epoch in range(NUM_EPOCHS):
    start_time = time.time()
    
    # One full pass over the training loader; returns the mean batch loss.
    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    
    end_time = time.time()
    # Split the elapsed wall-clock seconds into minutes and seconds.
    mins, secs = divmod(end_time - start_time, 60)
    
    print(f'Epoch: {epoch+1:02} | Time: {int(mins)}m {int(secs)}s')
    # Perplexity is reported as exp(mean loss).
    print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')

In [None]:
import torch
import pandas as pd
import nltk
from bert_score import score as bert_score
from tqdm import tqdm
from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score


# Fetch the NLTK resources requested here (quietly) before METEOR is computed below.
nltk.download('wordnet', quiet=True)
nltk.download('omw-1.4', quiet=True)


def translate_sentence(model, sentence, src_vocab, tgt_vocab, device, max_len=50):
    """Greedy-decode a translation and return it as a space-joined string.

    Args:
        model: callable ``model(src, tgt)`` returning [1, tgt_len, vocab] logits;
            it is switched to eval mode.
        sentence: raw source string, or a 1-D tensor of source ids.
        src_vocab: vocabulary exposing ``stoi`` (only used for string input).
        tgt_vocab: vocabulary exposing ``itos``.
        device: device the input tensors are moved to.
        max_len: maximum number of decoding steps.

    Returns:
        The decoded tokens joined by single spaces, with the first ``<sos>``
        and the first ``<eos>`` removed.
    """
    model.eval()

    if not isinstance(sentence, str):
        # Already numericalised: just add the batch dimension.
        src_tensor = sentence.unsqueeze(0).to(device)
    else:
        words = [token.text.lower() for token in spacy_eng.tokenizer(sentence)]
        # Wrap with ids 1 / 2; words missing from the vocabulary map to id 3.
        encoded = [1]
        encoded.extend(src_vocab.stoi.get(word, 3) for word in words)
        encoded.append(2)
        src_tensor = torch.LongTensor(encoded).unsqueeze(0).to(device)

    generated = [1]
    for _ in range(max_len):
        decoder_input = torch.LongTensor(generated).unsqueeze(0).to(device)
        with torch.no_grad():
            logits = model(src_tensor, decoder_input)
        # Highest-scoring token at the last decoded position.
        next_id = logits.argmax(2)[:, -1].item()
        generated.append(next_id)
        if next_id == 2:
            break

    words_out = [tgt_vocab.itos[token_id] for token_id in generated]
    for marker in ("<sos>", "<eos>"):
        if marker in words_out:
            words_out.remove(marker)
    return " ".join(words_out)


print("The Exam Starts On The ENTIRE Test Set... ")


test_subset = test_df

hypotheses = []             # model outputs as space-joined token strings
references = []             # per sentence: list holding one reference token list (corpus_bleu layout)
references_for_meteor = []  # per sentence: reference as a space-joined token string

print(f"all sentences: {len(test_subset)}")


for i, row in tqdm(test_subset.iterrows(), total=len(test_subset)):
    src = row["English"]
    trg = row["Spanish"]

    prediction = translate_sentence(model, src, vocab_eng, vocab_esp, device)

    # Tokenise the reference with the same spaCy tokenizer that produced the
    # target vocabulary. Plain str.split() leaves punctuation glued to words
    # ("casa." vs "casa", "."), so matching n-grams were missed and BLEU /
    # METEOR were systematically deflated. Whitespace-only tokens are dropped
    # so the joined string splits back into the same token list.
    ref_tokens = [tok for tok in tokenize_esp(trg) if tok.strip()]

    hypotheses.append(prediction)
    references.append([ref_tokens])
    references_for_meteor.append(" ".join(ref_tokens))


print("\nThe translations are complete, now the scores are being calculated...")

# 1. BLEU Score
# Corpus-level BLEU over whitespace-split hypotheses, with method1 smoothing.
hyp_tokens = [h.split() for h in hypotheses]
bleu_score = corpus_bleu(references, hyp_tokens, smoothing_function=SmoothingFunction().method1)

# 2. METEOR Score
# Sentence-level METEOR, averaged over all test sentences.
meteor_scores = []
for h, r in zip(hypotheses, references_for_meteor):
    meteor_scores.append(meteor_score([r.split()], h.split()))
avg_meteor = sum(meteor_scores) / len(meteor_scores)


# 3. BERTScore
print("BERTScore ")
try:
    # Mean F1 over all sentence pairs, scored as Spanish text.
    P, R, F1 = bert_score(hypotheses, references_for_meteor, lang="es", verbose=True, batch_size=64)
    bert_f1 = F1.mean().item()
except Exception as e:
    # Best effort: report the failure and fall back to 0.0 so the summary still prints.
    print(f"bertscore error {e}")
    bert_f1 = 0.0


# Summary table of the three metrics.
print("\n" + "#"*50)
print(f" (FULL DATA TEST)")
print("#"*50)
print(f"| Metric           | Score    |")
print(f"|------------------|----------|------------------")
print(f"| BLEU Score       | {bleu_score:.4f}   ")
print(f"| METEOR Score     | {avg_meteor:.4f}   ")
print(f"| BERTScore (F1)   | {bert_f1:.4f}   ")
print("#"*50)

In [None]:
import torch
import matplotlib.pyplot as plt
import seaborn as sns
import random
import numpy as np

# 1. VISUALIZATION FUNCTION
def plot_attention_map(model, src_vocab, tgt_vocab, device, test_df):
    """Greedy-translate one random row of ``test_df`` and plot an alignment heatmap.

    The heatmap is NOT the model's internal attention. It is a row-wise
    softmax of dot products between the final decoder states and the encoder
    memory, i.e. an attention-like similarity between each decoder position
    and each source token.

    Args:
        model: trained TransformerModel (switched to eval mode).
        src_vocab: source vocabulary exposing ``stoi``.
        tgt_vocab: target vocabulary exposing ``itos``.
        device: device on which tensors are created.
        test_df: frame with "English" and "Spanish" columns to sample from.
    """
    model.eval()

    # Select a random sentence.
    random_row = test_df.sample(1).iloc[0]
    src_text = random_row["English"]
    ref_text = random_row["Spanish"]

    print(f"Selected Sentence (English): {src_text}")
    print(f"True Translation (Spanish): {ref_text}")

    # --- A. Prepare the source sentence ---
    src_tokens = [token.text.lower() for token in spacy_eng.tokenizer(src_text)]
    src_indices = [1] + [src_vocab.stoi.get(t, 3) for t in src_tokens] + [2]
    src_tensor = torch.LongTensor(src_indices).unsqueeze(0).to(device)

    # --- B. Greedy decoding, run manually so the hidden states can be kept ---
    # 1. Run the encoder once to obtain the memory: [src_len, 1, d_model].
    with torch.no_grad():
        src_emb = model.src_embedding(src_tensor.transpose(0, 1)) * np.sqrt(model.d_model)
        src_emb = model.pos_encoder(src_emb)
        memory = model.transformer.encoder(src_emb)

    # 2. Decode token by token, starting from <sos>.
    tgt_indices = [1]
    decoded_words = []

    for i in range(50):  # maximum number of decoding steps
        tgt_tensor = torch.LongTensor(tgt_indices).unsqueeze(0).to(device)

        with torch.no_grad():
            tgt_emb = model.tgt_embedding(tgt_tensor.transpose(0, 1)) * np.sqrt(model.d_model)
            tgt_emb = model.pos_encoder(tgt_emb)

            # Causal mask over the tokens decoded so far.
            tgt_mask = model.generate_square_subsequent_mask(tgt_tensor.size(1)).to(device)

            # Decoder states: [tgt_len, 1, d_model].
            output = model.transformer.decoder(tgt_emb, memory, tgt_mask=tgt_mask)

            # Pick the best token at the last position.
            output_flat = model.fc_out(output.transpose(0, 1))
            best_guess = output_flat.argmax(2)[:, -1].item()

            if best_guess == 2:  # <eos>
                break

            tgt_indices.append(best_guess)
            decoded_words.append(tgt_vocab.itos[best_guess])

    print(f"Model Prediction: {' '.join(decoded_words)}")

    # --- C. Alignment matrix (attention-like) ---
    # [src_len, d_model] and [tgt_len, d_model] after dropping the batch axis.
    mem_squeeze = memory.squeeze(1).cpu()
    out_squeeze = output.squeeze(1).cpu()

    # Dot-product similarity, then softmax over the source axis.
    attention_matrix = torch.matmul(out_squeeze, mem_squeeze.t())
    attention_matrix = torch.nn.functional.softmax(attention_matrix, dim=1)

    # --- D. Plot ---
    src_labels = ["<sos>"] + src_tokens + ["<eos>"]
    # Each matrix row belongs to one decoder INPUT token. When decoding stops at
    # the step cap without <eos>, the last appended word was never fed back in,
    # so there is one more word than rows. Clip the labels to the row count to
    # keep ticks and matrix in sync.
    tgt_labels = (["<sos>"] + decoded_words)[:attention_matrix.size(0)]

    viz_matrix = attention_matrix[:len(tgt_labels), :len(src_labels)]

    plt.figure(figsize=(10, 8))
    sns.heatmap(viz_matrix.numpy(),
                xticklabels=src_labels,
                yticklabels=tgt_labels,
                cmap="Blues",
                annot=False,  # colour only, no numbers in the cells
                linewidths=.5)

    plt.xlabel('Source (English)')
    plt.ylabel('Prediction (Spanish)')
    plt.title('Attention / Alignment Heatmap (Where is the Model Looking?)')
    plt.xticks(rotation=45)
    plt.yticks(rotation=0)
    plt.show()

# Plot the alignment heatmap for one randomly sampled test sentence.
plot_attention_map(model, vocab_eng, vocab_esp, device, test_df)

# ==========================================

def show_examples(model, df, num_examples=10):
    """Print source, reference and model translation for a few random rows.

    Rows are drawn without replacement from the last 1000 rows of ``df`` (or
    from all rows when ``df`` is shorter). The function cannot know whether
    those rows were used for training: pass a held-out frame if unseen
    examples are wanted.

    Args:
        model: trained translation model handed to ``translate_sentence``.
        df: frame with "English" and "Spanish" columns.
        num_examples: number of rows to show; clipped to the rows available.
    """
    # Candidate positions: the tail of the frame, at most 1000 rows.
    start_idx = max(0, len(df) - 1000)
    candidates = range(start_idx, len(df))
    # random.sample raises ValueError when asked for more items than exist.
    sample_size = min(num_examples, len(candidates))

    print("="*60)
    print(f"TRANSLATION EXAMPLES ({sample_size} Random Samples)")
    print("="*60)

    random_indices = random.sample(candidates, sample_size)

    for i, idx in enumerate(random_indices):
        row = df.iloc[idx]
        src = row["English"]
        trg = row["Spanish"]

        # Model prediction
        prediction = translate_sentence(model, src, vocab_eng, vocab_esp, device)

        print(f"({i+1})")
        print(f"Source:     {src}")
        print(f"Reference:  {trg}")
        print(f"Prediction: {prediction}")
        print("-" * 60)

# Show examples from the held-out test split. The data was split randomly, so
# the tail of the full `df` consists mostly of rows seen during training.
show_examples(model, test_df, num_examples=10)