In [None]:
import torch
import torch.nn as nn
import pandas as pd
from tqdm import tqdm
import string
import random

# Seed PyTorch's and Python's random number generators so runs are repeatable
torch.manual_seed(42)
random.seed(42)

# Run on the GPU when CUDA is available; otherwise fall back to the CPU
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(f"Using device: {device}")

# Vocabulary Class
class Vocabulary:
    """Character-level vocabulary built from ``string.printable``.

    Ids 0-3 are reserved for the special tokens ``<PAD>``, ``<SOS>``,
    ``<EOS>`` and ``<UNK>``; every character of ``string.printable``
    follows, in order, starting at id 4.
    """

    def __init__(self):
        # Lookup tables, filled in by _build_vocab()
        self.char2idx = {}
        self.idx2char = {}
        # Ids of the special tokens (they match the positions used in _build_vocab)
        self.pad_token = 0
        self.sos_token = 1
        self.eos_token = 2
        self.unk_token = 3
        self._build_vocab()

    def _build_vocab(self):
        """Fill the symbol->id and id->symbol tables."""
        symbols = ['<PAD>', '<SOS>', '<EOS>', '<UNK>']
        symbols.extend(string.printable)
        self.char2idx = {symbol: position for position, symbol in enumerate(symbols)}
        self.idx2char = dict(enumerate(symbols))

    def __len__(self):
        """Return the number of entries (special tokens included)."""
        return len(self.char2idx)

    def encode(self, text):
        """Map each character of ``text`` to its id; unknown characters become ``unk_token``."""
        lookup = self.char2idx.get
        fallback = self.unk_token
        return [lookup(ch, fallback) for ch in text]

    def decode(self, indices):
        """Turn ids back into a string.

        PAD, SOS and EOS ids are dropped; ids that are not in the table are
        rendered as the literal text ``<UNK>``.
        """
        skipped = {self.pad_token, self.sos_token, self.eos_token}
        pieces = []
        for idx in indices:
            if idx in skipped:
                continue
            pieces.append(self.idx2char.get(idx, '<UNK>'))
        return ''.join(pieces)

# Model Architecture
class CaesarTransformer(nn.Module):
    """Encoder-decoder Transformer with learned positional embeddings.

    Token ids are embedded, summed with a learned position embedding, passed
    through a stack of ``nn.TransformerEncoderLayer`` / ``nn.TransformerDecoderLayer``
    modules (all ``batch_first=True``) and projected to target-vocabulary logits.

    Args:
        src_vocab_size: Number of rows in the source token embedding.
        tgt_vocab_size: Number of rows in the target token embedding and
            width of the output logits.
        d_model: Embedding / hidden width.
        num_heads: Attention heads per layer.
        num_layers: Number of encoder layers and of decoder layers.
        d_ff: Feed-forward width inside each layer.
        max_seq_length: Number of positions in the positional embeddings;
            longer sequences are rejected by ``forward``.
        dropout: Dropout probability used on the embeddings and in the layers.
        pad_idx: Source token id treated as padding when building the key
            padding mask. Defaults to 0, the value that was previously
            hard-coded.
    """

    def __init__(self, src_vocab_size, tgt_vocab_size, d_model=256, num_heads=4, num_layers=4, d_ff=256, max_seq_length=256, dropout=0.2, pad_idx=0):
        super().__init__()
        # Attribute names are kept unchanged so existing checkpoints still load.
        self.encoder_embedding = nn.Embedding(src_vocab_size, d_model)
        self.decoder_embedding = nn.Embedding(tgt_vocab_size, d_model)
        self.encoder_pos = nn.Embedding(max_seq_length, d_model)
        self.decoder_pos = nn.Embedding(max_seq_length, d_model)

        self.encoder_layers = nn.ModuleList([
            nn.TransformerEncoderLayer(d_model, num_heads, d_ff, dropout, batch_first=True)
            for _ in range(num_layers)
        ])

        self.decoder_layers = nn.ModuleList([
            nn.TransformerDecoderLayer(d_model, num_heads, d_ff, dropout, batch_first=True)
            for _ in range(num_layers)
        ])

        self.fc = nn.Linear(d_model, tgt_vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.max_seq_length = max_seq_length
        self.pad_idx = pad_idx

    def forward(self, src, tgt):
        """Return logits for every target position.

        Args:
            src: Integer tensor of source token ids, shape ``(batch, src_len)``.
            tgt: Integer tensor of target token ids, shape ``(batch, tgt_len)``.

        Returns:
            Tensor of shape ``(batch, tgt_len, tgt_vocab_size)``.

        Raises:
            ValueError: If either sequence is longer than ``max_seq_length``.
        """
        src_len, tgt_len = src.size(1), tgt.size(1)
        if src_len > self.max_seq_length or tgt_len > self.max_seq_length:
            # Fail early with a readable message instead of an embedding
            # index error (a device-side assert on CUDA).
            raise ValueError(
                f"Sequence length (src={src_len}, tgt={tgt_len}) exceeds "
                f"max_seq_length={self.max_seq_length}"
            )

        # True where the source holds padding; those keys are ignored by attention.
        src_mask = (src == self.pad_idx)
        # Causal mask so each target position only attends to earlier ones.
        # Everything is created on the inputs' own device rather than on a
        # module-level global, so the model works wherever it has been moved.
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_len).to(tgt.device)

        src_pos = torch.arange(0, src_len, device=src.device).unsqueeze(0)
        tgt_pos = torch.arange(0, tgt_len, device=tgt.device).unsqueeze(0)

        src_embedded = self.dropout(self.encoder_embedding(src) + self.encoder_pos(src_pos))
        tgt_embedded = self.dropout(self.decoder_embedding(tgt) + self.decoder_pos(tgt_pos))

        memory = src_embedded
        for layer in self.encoder_layers:
            memory = layer(memory, src_key_padding_mask=src_mask)

        # NOTE: no padding mask is applied to the target sequence itself.
        output = tgt_embedded
        for layer in self.decoder_layers:
            output = layer(output, memory, tgt_mask=tgt_mask, memory_key_padding_mask=src_mask)

        return self.fc(output)

def load_model(model_path, vocab_size, device):
    """Build a ``CaesarTransformer`` and restore its weights from a checkpoint.

    Args:
        model_path: Path to a file holding a ``state_dict`` saved with ``torch.save``.
        vocab_size: Size used for both the source and the target vocabulary.
        device: Device the model is moved to and the checkpoint is mapped onto.

    Returns:
        The model, in evaluation mode, on ``device``.

    The checkpoint is loaded with ``strict=False`` so loading is tolerant of
    key mismatches, but any missing or unexpected keys are reported with a
    warning: silently ignoring them would leave those weights randomly
    initialised and produce meaningless predictions.
    """
    import warnings  # local import: only needed for the mismatch report

    model = CaesarTransformer(
        src_vocab_size=vocab_size,
        tgt_vocab_size=vocab_size,
        d_model=256,
        num_heads=4,
        num_layers=4,
        d_ff=256,
        max_seq_length=256,
        dropout=0.2
    ).to(device)

    # SECURITY: torch.load unpickles the file; only load checkpoints that
    # come from a trusted source.
    state_dict = torch.load(model_path, map_location=device)
    load_result = model.load_state_dict(state_dict, strict=False)
    if load_result.missing_keys or load_result.unexpected_keys:
        warnings.warn(
            f"Checkpoint '{model_path}' does not fully match the model: "
            f"missing keys={list(load_result.missing_keys)}, "
            f"unexpected keys={list(load_result.unexpected_keys)}"
        )

    model.eval()
    return model

def decrypt_text(model, text, vocab, max_length=256):
    """Greedily decode ``text`` with ``model`` and return the decoded string.

    Args:
        model: Callable module taking ``(src, tgt)`` id tensors and returning
            logits whose last dimension indexes the vocabulary.
        text: String to decrypt.
        vocab: Object providing ``sos_token``, ``eos_token``, ``pad_token``,
            ``encode`` and ``decode``.
        max_length: Length the source is padded/truncated to, and upper bound
            on the generated sequence (SOS included).

    Returns:
        The string produced by ``vocab.decode`` for the generated ids.
    """
    model.eval()

    # Run on whatever device the model lives on instead of relying on a
    # module-level global; fall back to the CPU for parameter-free models.
    first_param = next(model.parameters(), None)
    run_device = first_param.device if first_param is not None else torch.device('cpu')

    with torch.no_grad():
        source_ids = [vocab.sos_token] + vocab.encode(text) + [vocab.eos_token]
        source_ids += [vocab.pad_token] * (max_length - len(source_ids))
        # Over-long inputs are cut to max_length, which also drops the EOS marker.
        encoded = torch.tensor(source_ids[:max_length], device=run_device).unsqueeze(0)

        target = torch.tensor([[vocab.sos_token]], device=run_device)

        for _ in range(max_length - 1):
            output = model(encoded, target)
            # Most likely token at the last generated position (batch of one)
            next_token = int(output[0, -1].argmax())
            if next_token == vocab.eos_token:
                break
            step = torch.tensor([[next_token]], device=run_device)
            target = torch.cat([target, step], dim=1)

        # Hand plain Python ints to the vocabulary
        return vocab.decode(target[0].tolist())

def load_test_data(file_path, n_samples=4000):
    """Load a random sample of (input, expected output) text pairs from a CSV.

    Args:
        file_path: CSV file with ``Input`` and ``Output`` columns.
        n_samples: Maximum number of rows to sample (without replacement).

    Returns:
        Tuple ``(inputs, outputs)`` of equally long lists of strings.

    Raises:
        KeyError: If the ``Input`` or ``Output`` column is missing.
    """
    # Read every cell as text. With pandas' defaults, cells such as "NA",
    # "null" or "None" become float NaN and digit-only cells become numbers,
    # which later breaks character-level encoding and len().
    df = pd.read_csv(file_path, dtype=str, keep_default_na=False)
    # Fixed random_state keeps the evaluated subset reproducible.
    df = df.sample(min(n_samples, len(df)), random_state=42)
    inputs = df['Input'].tolist()
    outputs = df['Output'].tolist()
    return inputs, outputs

def print_comparison(input_text, expected, generated, index):
    """Print a per-sample report comparing ``generated`` with ``expected``.

    Mismatching characters of the generated text are wrapped in ``[...]``,
    surplus characters are shown as ``(+...)`` and characters missing from
    the end as ``(missing:...)``.

    Args:
        input_text: The encrypted input, printed as is.
        expected: Reference decryption.
        generated: Model output.
        index: Zero-based sample index (printed one-based).
    """
    # Position-wise agreement over the overlapping prefix
    char_matches = [g == e for g, e in zip(generated, expected)]
    overlap = len(char_matches)

    # Generated text with mismatches highlighted
    generated_display = [
        g if same else f'[{g}]'
        for g, same in zip(generated, char_matches)
    ]

    # Handle extra/missing characters
    if len(generated) > len(expected):
        generated_display.append(f'(+{generated[overlap:]})')
    elif len(expected) > len(generated):
        generated_display.append(f'(missing:{expected[overlap:]})')

    correct = sum(char_matches)
    if expected:
        accuracy = correct / len(expected)
    else:
        # Nothing to match against: avoid dividing by zero. An empty
        # prediction counts as fully correct, anything else as wrong.
        accuracy = 1.0 if not generated else 0.0

    print(f"\nSample {index+1}")
    print(f"Input: {input_text}")
    print(f"Expected: {expected}")
    print(f"Generated: {''.join(generated_display)}")
    print(f"Match: {'✓' if generated == expected else '✗'}")
    print(f"Char Accuracy: {correct}/{len(expected)} ({accuracy:.1%})")

def evaluate_and_save(model, test_inputs, test_outputs, vocab, output_file='/content/drive/MyDrive/decryption_results_caesar_update.xlsx'):
    """Decrypt every test sample, report accuracy and save the results to Excel.

    For each (input, expected) pair the text is decoded with ``decrypt_text``,
    compared with the reference, printed via ``print_comparison`` and recorded
    as one spreadsheet row.

    Args:
        model: Model passed through to ``decrypt_text``.
        test_inputs: Encrypted strings.
        test_outputs: Reference decryptions, aligned with ``test_inputs``.
        vocab: Vocabulary passed through to ``decrypt_text``.
        output_file: Path of the Excel file to write.

    Returns:
        ``pandas.DataFrame`` with one row per evaluated sample.
    """
    all_results = []
    full_match_count = 0
    char_match_count = 0
    total_chars = 0
    # zip() below stops at the shorter list, so count only the pairs that
    # are actually evaluated.
    n_samples = min(len(test_inputs), len(test_outputs))

    print(f"\nEvaluating {n_samples} samples...\n")

    for i, (input_text, expected) in enumerate(zip(test_inputs, test_outputs)):
        decrypted = decrypt_text(model, input_text, vocab)

        # Calculate matches
        is_full_match = decrypted == expected
        full_match_count += int(is_full_match)

        # Character-level comparison over the overlapping prefix
        correct_chars = sum(g == e for g, e in zip(decrypted, expected))
        char_match_count += correct_chars
        total_chars += len(expected)

        if expected:
            char_accuracy = correct_chars / len(expected)
        else:
            # Empty reference: avoid dividing by zero; only an empty
            # prediction counts as correct.
            char_accuracy = 1.0 if is_full_match else 0.0

        # Store results
        all_results.append({
            'Index': i+1,
            'Input': input_text,
            'Expected': expected,
            'Generated': decrypted,
            'Full Match': 'Yes' if is_full_match else 'No',
            'Correct Characters': correct_chars,
            'Total Characters': len(expected),
            'Character Accuracy': char_accuracy,
            'Length Difference': len(decrypted) - len(expected)
        })

        # Print comparison
        print_comparison(input_text, expected, decrypted, i)

        # Print progress every 100 samples
        if (i+1) % 100 == 0 or (i+1) == n_samples:
            running_char_accuracy = char_match_count / total_chars if total_chars > 0 else 0
            print(f"\nProgress: {i+1}/{n_samples}")
            print(f"Current Accuracy: {full_match_count/(i+1):.1%} full matches")
            print(f"Character Accuracy: {running_char_accuracy:.1%}\n")

    # Save to Excel
    df_results = pd.DataFrame(all_results)
    df_results.to_excel(output_file, index=False)

    # Final summary (guarded so an empty test set reports 0 instead of crashing)
    final_accuracy = full_match_count / n_samples if n_samples > 0 else 0
    final_char_accuracy = char_match_count / total_chars if total_chars > 0 else 0

    print(f"\nFINAL RESULTS")
    print(f"Total Samples: {n_samples}")
    print(f"Full Match Accuracy: {final_accuracy:.2%}")
    print(f"Character Accuracy: {final_char_accuracy:.2%}")
    print(f"\nAll results saved to: {output_file}")

    return df_results

# Main execution
if __name__ == "__main__":
    # Build the character vocabulary and restore the trained checkpoint
    vocab = Vocabulary()
    model = load_model(
        '/content/drive/MyDrive/best_caesar_updated.pth',
        len(vocab),
        device,
    )

    # Sample up to 4000 (input, expected output) pairs from the test CSV
    test_data = load_test_data('/content/testing_newshift_1.csv', 4000)
    test_inputs = test_data[0]
    test_outputs = test_data[1]

    # Decrypt every sample, print the comparisons and write the spreadsheet
    results_df = evaluate_and_save(
        model, test_inputs, test_outputs, vocab,
        output_file='/content/drive/MyDrive/decryption_results_caesar_update.xlsx',
    )

Streaming output truncated to the last 5000 lines.
Expected: Tom denied that Mary was the one who told him to do that.
Generated: Tom denied that Mary was the one who told him to do that.
Match: ✓
Char Accuracy: 57/57 (100.0%)

Sample 3294
Input: Fhuup dhz isllkpun.
Expected: Yanni was bleeding.
Generated: Yanni was bleeding.
Match: ✓
Char Accuracy: 19/19 (100.0%)

Sample 3295
Input: Nbymy uly mcgcful von hin nby mugy.
Expected: These are similar but not the same.
Generated: These are similar but not the same.
Match: ✓
Char Accuracy: 35/35 (100.0%)

Sample 3296
Input: Y sedwhqjkbqju oek rejx.
Expected: I congratulate you both.
Generated: I congratulate you both.
Match: ✓
Char Accuracy: 24/24 (100.0%)

Sample 3297
Input: cpio
Expected: hunt
Generated: hunt
Match: ✓
Char Accuracy: 4/4 (100.0%)

Sample 3298
Input: Ojh ojgy Hvmt cz rjpgy vgrvtn gjqz czm.
Expected: Tom told Mary he would always love her.
Generated: Tom told Mary he would always love her.
Match: ✓
Char Accuracy