In [1]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import pandas as pd
import numpy as np
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu
from nltk.translate.bleu_score import SmoothingFunction
import matplotlib.pyplot as plt
import seaborn as sns
import os
from transformer_implementation import Transformer
from tokenizer import create_tokenizers
from data_loader import TranslationDataset
import math

Using GPU: NVIDIA GeForce RTX 4060 Laptop GPU
GPU Memory: 8.00 GB


In [2]:
def plot_attention(attention, source, target, save_path):
    plt.figure(figsize=(10, 10))
    sns.heatmap(attention, cmap='viridis', xticklabels=source, yticklabels=target)
    plt.title('Attention Map')
    plt.xlabel('Source')
    plt.ylabel('Target')
    plt.tight_layout()
    plt.savefig(save_path)
    plt.close()

In [3]:
def translate_sentence(model, sentence, source_tokenizer, target_tokenizer, device, max_length=50):
    model.eval()
    
    # Tokenize the input sentence
    tokens = source_tokenizer.encode_as_ids(sentence)
    tokens = [source_tokenizer.get_sos_idx()] + tokens + [source_tokenizer.get_eos_idx()]
    
    # Convert to tensor
    src = torch.LongTensor(tokens).unsqueeze(0).to(device)
    
    # Initialize target sequence with <sos> token
    tgt_tokens = [target_tokenizer.get_sos_idx()]
    
    with torch.no_grad():
        # Get encoder output
        src = model.src_embedding(src) * math.sqrt(model.d_model)
        src = model.pos_encoder(src)
        
        # Encoder forward pass
        memory = src
        for layer in model.encoder_layers:
            memory, _ = layer(memory)
        
        # Generate tokens one by one
        for i in range(max_length):
            tgt = torch.LongTensor(tgt_tokens).unsqueeze(0).to(device)
            
            # Create masks
            tgt_mask = model.generate_square_subsequent_mask(tgt.size(1)).to(device)
            
            # Decoder forward pass
            tgt = model.tgt_embedding(tgt) * math.sqrt(model.d_model)
            tgt = model.pos_encoder(tgt)
            
            output = tgt
            for layer in model.decoder_layers:
                output, _, _ = layer(output, memory, tgt_mask=tgt_mask)
            
            # Get predicted token
            output = model.fc_out(output)
            pred_token = output.argmax(2)[-1].item()
            tgt_tokens.append(pred_token)
            
            # Stop if we predict <eos> token
            if pred_token == target_tokenizer.get_eos_idx():
                break
    
    # Convert tokens to text
    translation = []
    for token in tgt_tokens[1:-1]:  # Skip <sos> and <eos> tokens
        translation.append(target_tokenizer.idx2word.get(token, '<unk>'))
    
    # Join tokens into sentence
    translation = ' '.join(translation)
    
    return translation

In [4]:
def evaluate_model(model, iterator, target_tokenizer, device):
    model.eval()
    translations = []
    references = []
    sources = []
    
    with torch.no_grad():
        for batch in iterator:
            src = batch['source'].to(device)
            tgt = batch['target'].to(device)
            
            # Create masks
            src_mask = (src != model.src_embedding.num_embeddings - 1).unsqueeze(-2)
            tgt_mask = model.generate_square_subsequent_mask(tgt.size(1)).to(device)
            
            # Forward pass
            output, _, _, _ = model(src, tgt[:, :-1], src_mask, tgt_mask[:-1, :-1])
            
            # Get predicted tokens
            pred_tokens = output.argmax(2)
            
            # Convert to text
            for i in range(len(pred_tokens)):
                # Get the tokens
                pred_seq = pred_tokens[i].cpu().numpy()
                ref_seq = tgt[i].cpu().numpy()
                src_seq = src[i].cpu().numpy()
                
                # Convert to text
                pred_text = []
                ref_text = []
                src_text = []
                
                # Process prediction sequence
                for token in pred_seq:
                    if token == target_tokenizer.get_pad_idx():
                        continue
                    if token == target_tokenizer.get_eos_idx():
                        break
                    pred_text.append(target_tokenizer.idx2word.get(token, '<unk>'))
                
                # Process reference sequence
                for token in ref_seq:
                    if token == target_tokenizer.get_pad_idx():
                        continue
                    if token == target_tokenizer.get_eos_idx():
                        break
                    ref_text.append(target_tokenizer.idx2word.get(token, '<unk>'))
                
                # Process source sequence
                for token in src_seq:
                    if token == target_tokenizer.get_pad_idx():
                        continue
                    if token == target_tokenizer.get_eos_idx():
                        break
                    src_text.append(target_tokenizer.idx2word.get(token, '<unk>'))
                
                # Join tokens into sentences
                pred_sentence = ' '.join(pred_text)
                ref_sentence = ' '.join(ref_text)
                src_sentence = ' '.join(src_text)
                
                translations.append(pred_sentence)
                references.append(ref_sentence)
                sources.append(src_sentence)
    
    return sources, translations, references


In [5]:
def calculate_bleu(references, translations):
    smoothie = SmoothingFunction().method1
    bleu_scores = []
    
    for ref, trans in zip(references, translations):
        # Split sentences into words for BLEU calculation
        ref_words = ref.split()
        trans_words = trans.split()
        score = sentence_bleu([ref_words], trans_words, smoothing_function=smoothie)
        bleu_scores.append(score)
    
    return np.mean(bleu_scores)


In [6]:
def main():
    # Set device
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    
    # Hyperparameters (must match training)
    D_MODEL = 512
    NHEAD = 8
    NUM_ENCODER_LAYERS = 6
    NUM_DECODER_LAYERS = 6
    DIM_FEEDFORWARD = 2048
    DROPOUT = 0.1
    BATCH_SIZE = 8
    MAX_SEQ_LENGTH = 100
    
    # Load data
    print("Loading data...")
    csv_file = 'test_data.csv'  # Using test data for evaluation
    source_tokenizer, target_tokenizer = create_tokenizers(csv_file)
    
    if source_tokenizer is None or target_tokenizer is None:
        raise ValueError("Failed to create tokenizers. Please check your CSV file format.")
    
    # Get vocabulary sizes
    INPUT_DIM = source_tokenizer.get_vocab_size()
    OUTPUT_DIM = target_tokenizer.get_vocab_size()
    print(f"Source vocabulary size: {INPUT_DIM}")
    print(f"Target vocabulary size: {OUTPUT_DIM}")
    
    # Load model
    model = Transformer(
        src_vocab_size=INPUT_DIM,
        tgt_vocab_size=OUTPUT_DIM,
        d_model=D_MODEL,
        nhead=NHEAD,
        num_encoder_layers=NUM_ENCODER_LAYERS,
        num_decoder_layers=NUM_DECODER_LAYERS,
        dim_feedforward=DIM_FEEDFORWARD,
        dropout=DROPOUT,
        max_seq_length=MAX_SEQ_LENGTH
    ).to(device)
    
    # Load checkpoint
    checkpoint = torch.load('checkpoints/best_model.pth', map_location=device)
    model.load_state_dict(checkpoint['model_state_dict'])
    model.eval()
    print("Model loaded successfully")
    
    # Load test data
    print("Loading test data...")
    df = pd.read_csv(csv_file)
    test_dataset = TranslationDataset(
        source_texts=df['eng'].tolist(),
        target_texts=df['asm'].tolist(),
        source_tokenizer=source_tokenizer,
        target_tokenizer=target_tokenizer,
        max_length=MAX_SEQ_LENGTH
    )
    
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)
    print(f"Test samples: {len(test_dataset)}")
    
    # Evaluate model
    print("Evaluating model...")
    sources, translations, references = evaluate_model(model, test_loader, target_tokenizer, device)
    
    # Calculate BLEU score
    bleu_score = calculate_bleu(references, translations)
    print(f"\nAverage BLEU score: {bleu_score:.4f}")
    
    # Print some example translations
    print("\nExample Translations:")
    for i in range(min(5, len(translations))):
        print(f"Source: {sources[i]}")
        print(f"Reference: {references[i]}")
        print(f"Translation: {translations[i]}")
        print()
    
    # Generate attention maps for first 5 examples
    print("Generating attention maps...")
    os.makedirs('attention_maps', exist_ok=True)
    
    for i in range(min(5, len(sources))):
        # Get attention weights
        src = torch.LongTensor(source_tokenizer.encode_as_ids(sources[i])).unsqueeze(0).to(device)
        tgt = torch.LongTensor(target_tokenizer.encode_as_ids(translations[i])).unsqueeze(0).to(device)
        
        with torch.no_grad():
            _, enc_attn_weights, dec_attn_weights, enc_dec_attn_weights = model(
                src, tgt[:, :-1],
                src_mask=(src != source_tokenizer.get_pad_idx()).unsqueeze(-2),
                tgt_mask=model.generate_square_subsequent_mask(tgt.size(1)).to(device)
            )
        
        # Plot encoder-decoder attention
        attention = enc_dec_attn_weights[-1][0].mean(dim=0).cpu().numpy()
        source_words = sources[i].split()
        target_words = translations[i].split()
        
        plot_attention(
            attention,
            source_words,
            target_words,
            f'attention_maps/attention_map_{i+1}.png'
        )
    
    print("Attention maps saved in 'attention_maps' directory")

if __name__ == "__main__":
    main()

Using device: cuda
Loading data...
Creating tokenizers with 21963 source texts and 21963 target texts
Vocabulary size: 10000
Vocabulary size: 10000
Source vocabulary size: 10000
Target vocabulary size: 10000
Source vocabulary size: 10000
Target vocabulary size: 10000


RuntimeError: Error(s) in loading state_dict for Transformer:
	size mismatch for src_embedding.weight: copying a param with shape torch.Size([8000, 512]) from checkpoint, the shape in current model is torch.Size([10000, 512]).
	size mismatch for tgt_embedding.weight: copying a param with shape torch.Size([8000, 512]) from checkpoint, the shape in current model is torch.Size([10000, 512]).
	size mismatch for fc_out.weight: copying a param with shape torch.Size([8000, 512]) from checkpoint, the shape in current model is torch.Size([10000, 512]).
	size mismatch for fc_out.bias: copying a param with shape torch.Size([8000]) from checkpoint, the shape in current model is torch.Size([10000]).