In [None]:
import os
import sys

import numpy as np
import torch
from transformers import (
    GPT2LMHeadModel,
    GPT2Tokenizer)

# Add the parent directory (project folder) to the module search path so that
# `coarse_grain_model` can be imported below.
# `__file__` is not defined inside a Jupyter kernel, so fall back to the current
# working directory there (assumes the kernel was started in this notebook's
# folder — TODO confirm for your launcher).
try:
    current_dir = os.path.dirname(os.path.abspath(__file__))
except NameError:
    current_dir = os.getcwd()
parent_dir = os.path.abspath(os.path.join(current_dir, '..'))
# Guard against duplicate entries when the cell is re-run.
if parent_dir not in sys.path:
    sys.path.append(parent_dir)

from coarse_grain_model import GPT2WithSlidingWindow

In [None]:
# Tests the custom model by visualizing the attention pattern for a specific token:
# run one forward pass with attention outputs enabled and check that the chosen
# token attends only to itself and the WINDOW_SIZE - 1 tokens before it.
model_path = "./models/gpt2"
WINDOW_SIZE = 5  # Use a small window for easy verification
ATTENTION_THRESHOLD = 0.001  # Scores above this count as "attended to"

# 1. Load tokenizer and model
tokenizer = GPT2Tokenizer.from_pretrained(model_path)
model = GPT2WithSlidingWindow.from_pretrained(model_path, WINDOW_SIZE)
model.eval()  # Set model to evaluation mode

# 2. Create sample input
text = "The brown fox wants to eat the lazy dog but there is"
inputs = tokenizer(text, return_tensors="pt")
# Ensure inputs are on the same device as the model
inputs = {k: v.to(model.device) for k, v in inputs.items()}
input_ids = inputs["input_ids"]
tokens = [tokenizer.decode(token_id) for token_id in input_ids[0]]
seq_len = len(tokens)

print(f"Input Sentence: '{text}'")
print(f"Window Size: {WINDOW_SIZE}\n")
print("-" * 50)

# 3. Perform a forward pass, requesting attention scores
with torch.no_grad():
    outputs = model(**inputs, output_attentions=True)

# `outputs.attentions` is a tuple of attention tensors, one for each layer.
# Let's inspect the first layer's attention.
# Shape: [batch_size, num_heads, query_len, key_len]
attention_layer_0 = outputs.attentions[0]

# Let's inspect the attention from the first head.
attention_head_0 = attention_layer_0[0, 0, :, :].cpu().numpy()

# 4. Pick a token to analyze and verify its attention window
token_to_check_index = 10
# Fail early with a clear message if the sample text tokenizes to fewer tokens
# than expected, instead of hitting an IndexError further down.
assert 0 <= token_to_check_index < seq_len, (
    f"token_to_check_index={token_to_check_index} is out of range "
    f"for a sequence of {seq_len} tokens"
)

# Get the attention scores *from* this token *to* all other tokens
print(attention_head_0.shape)
attention_scores = attention_head_0[token_to_check_index]

# The tokens that received a non-negligible attention score
attended_indices = np.where(attention_scores > ATTENTION_THRESHOLD)[0]

# The actual tokens it attended to
attended_tokens = [tokens[i] for i in attended_indices]

# Calculate the expected window: the token itself plus up to WINDOW_SIZE - 1
# predecessors, clipped at the start of the sequence.
expected_start_index = max(0, token_to_check_index - WINDOW_SIZE + 1)
expected_end_index = token_to_check_index
expected_window_indices = list(range(expected_start_index, expected_end_index + 1))
expected_tokens = [tokens[i] for i in expected_window_indices]

print(f"🔍 ANALYSIS FOR TOKEN '{tokens[token_to_check_index]}' (at index {token_to_check_index}):\n")

print(f"EXPECTED to attend to tokens from index {expected_start_index} to {expected_end_index}:")
print(f"==> {expected_tokens}\n")

print(f"ACTUALLY attended to tokens at indices {attended_indices.tolist()}:")
print(f"==> {attended_tokens}\n")

# 5. Assert to confirm correctness
# NOTE(review): an in-window token whose softmax weight falls below
# ATTENTION_THRESHOLD would also trip this equality check.
assert sorted(attended_indices.tolist()) == sorted(expected_window_indices), \
    "Test Failed: The model did not attend to the correct sliding window!"

print("✅ TEST PASSED: The attention pattern matches the expected sliding window.")

In [None]:
import numpy as np
# 2b. Generate some sample text to see the output
print("\n--- Generating Sample Text ---")

# Sampling (do_sample=True) is stochastic; seed the generator so that a fresh
# Restart & Run All reproduces the same continuation and attention scores.
GENERATION_SEED = 0
torch.manual_seed(GENERATION_SEED)

# Use a shorter prompt for generation to see what it comes up with
generation_prompt = "brownie was then"
generation_inputs = tokenizer(generation_prompt, return_tensors="pt")

# Ensure inputs are on the same device as the model
generation_inputs = {k: v.to(model.device) for k, v in generation_inputs.items()}

with torch.no_grad():
    # Generate text using the model
    generated_output = model.generate(
        **generation_inputs,
        max_new_tokens=3,
        temperature=0.8,
        top_p=0.9,
        do_sample=True,
        pad_token_id=tokenizer.eos_token_id,
        output_attentions=True,
        return_dict_in_generate=True,
    )
print("-" * 50)
print("INSPECTION OF ATTENTION SCORES AFTER GENERATION")
print("-" * 50)

# Compact float formatting for the score arrays printed below (set once; it does
# not change between iterations).
np.set_printoptions(precision=3, suppress=True)

# `generated_output.attentions` is a tuple of tuples.
# The outer tuple iterates over each generation step.
for i, step_attentions in enumerate(generated_output.attentions):
    # `step_attentions` is a tuple for all the model layers at this step
    # We'll just inspect the first layer's attention
    layer_0_attention = step_attentions[0]

    # The shape is [batch_size, num_heads, query_len, key_len].
    # query_len is not guaranteed to be 1: the first step typically covers the
    # whole prompt, and later steps cover only the new token when a KV cache is
    # used (TODO confirm for this custom model). Indexing the last query row
    # below selects the newest position in either case.
    # key_len is the total sequence length so far.
    num_total_tokens = layer_0_attention.shape[-1]

    print(f"\n✅ Analysis for Generated Token #{i+1} (Total Sequence Length: {num_total_tokens})")
    print(f"   Attention tensor shape: {layer_0_attention.shape}")

    # Get scores from the first head for the new token
    scores = layer_0_attention[0, 0, -1, :].cpu().detach().numpy()

    print(f"   Scores from new token to all {num_total_tokens} previous tokens:")
    print(f"   {scores}")

    # Check if the scores respect the window size: everything before the last
    # WINDOW_SIZE key positions lies outside the window of the newest token.
    num_non_window_tokens = max(0, num_total_tokens - WINDOW_SIZE)
    first_few_scores_sum = np.sum(scores[:num_non_window_tokens])

    if num_non_window_tokens == 0:
        # Nothing lies outside the window yet, so a "PASSED" here would be vacuous.
        print(f"   ℹ️ DIAGNOSIS: NOT EXERCISED. Sequence length does not exceed the window of size {WINDOW_SIZE}.")
    elif first_few_scores_sum > 0.01:  # Check if there's significant attention outside the window
        print(f"   🚨 DIAGNOSIS: FAILED. Attention was paid to tokens outside the expected window of size {WINDOW_SIZE}.")
    else:
        print(f"   ✅ DIAGNOSIS: PASSED. Attention correctly constrained.")

print("\n" + "-" * 50)
print("Generation Complete.")
decoded_text = tokenizer.decode(generated_output.sequences[0])
print(f"Final Output: '{decoded_text}'")