In [1]:
import transformers
import torch
import json

In [2]:
from LLMfunctions import inference_activations
from EnergyComputations import energy_pipeline

## Recreate GPT-2XL set-up for Llama

In [3]:
# Read the prompt for the selected topic from prompts-gen/<topic>.txt and parse it as JSON.
prompt_topic = 'viktor'
prompt_sufix = '_' + prompt_topic  # topic name with a leading underscore
with open(f'prompts-gen/{prompt_topic}.txt') as file:
    # strict=False makes the decoder accept control characters (such as raw newlines)
    # inside JSON strings; the parsed object is later fed to apply_chat_template.
    prompt = json.load(file, strict=False)

In [4]:
# Checkpoint used for both the tokenizer and the model.
model_id = "meta-llama/Meta-Llama-3.1-8B-Instruct"

# padding_side="left" puts pad tokens in front of the prompt tokens.
tokenizer = transformers.AutoTokenizer.from_pretrained(
    model_id,
    padding_side="left",
)
# Reuse the EOS id for padding because this tokenizer defines no padding token.
tokenizer.pad_token_id = tokenizer.eos_token_id

model = transformers.AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

# Ids that generate() receives as end-of-sequence markers.
eot_token_id = tokenizer.convert_tokens_to_ids("<|eot_id|>")
terminators = [tokenizer.eos_token_id, eot_token_id]

Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]

In [5]:
# Render the chat messages into a single prompt string. add_generation_prompt=True appends
# the tokens marking where the model's reply should start; tokenize=False returns text.
text = tokenizer.apply_chat_template(prompt, add_generation_prompt=True, tokenize=False)
# Tokenize into PyTorch tensors, padded to the longest sequence in the batch.
# add_special_tokens=False: the chat template has already written the special tokens into
# `text`, so letting the tokenizer add them again would duplicate the BOS token.
inputs = tokenizer(text, padding="longest", return_tensors="pt", add_special_tokens=False)
# Move the tensors to the model's device instead of assuming CUDA is present
# (the model is loaded with device_map="auto").
inputs = {key: val.to(model.device) for key, val in inputs.items()}
# Decoded view of the encoded prompt, kept only as a debugging aid.
temp_texts = tokenizer.batch_decode(inputs["input_ids"], skip_special_tokens=True)

In [6]:
num_generations = 5  # how many continuations to sample for the prompt
GENERATION_SEED = 0  # fixed seed so a fresh "Restart & Run All" draws the same samples

# Sampling (do_sample=True) consumes the global torch RNG; seed it right before generating
# so this cell is repeatable on the same hardware/software stack.
torch.manual_seed(GENERATION_SEED)

generations = model.generate(
    **inputs,
    max_new_tokens=400,
    do_sample=True,
    temperature=0.7,
    top_p=0.9,
    pad_token_id=tokenizer.eos_token_id,
    eos_token_id=terminators,
    num_return_sequences=num_generations,
)

In [7]:
# Decoded prompt and decoded full sequences (prompt + continuation), kept for inspection.
prompt_text = tokenizer.decode(inputs["input_ids"][0], skip_special_tokens=True)
decoded_gens = tokenizer.batch_decode(generations, skip_special_tokens=True)
# Remove the prompt at token level before decoding. Cutting the decoded string by
# len(prompt_text) only works if the decoded sequence starts with exactly the decoded
# prompt, which decoding does not guarantee; the token count is exact.
prompt_token_count = inputs["input_ids"].shape[1]
decoded_stories = tokenizer.batch_decode(
    generations[:, prompt_token_count:], skip_special_tokens=True
)

Bugs I found:
- The generation was being passed through the pipeline together with the prompt. We need the number of prompt tokens in the tensor so they can be excluded; this has to be handled at the forward pass. - done

## Fixing angle computation 

In [8]:
def compute_layer_vectors(layer_activation):
    """Return the differences between consecutive rows of ``layer_activation``.

    Row ``i`` of the result is ``layer_activation[i + 1] - layer_activation[i]``,
    so the result has one row fewer than the input.
    """
    following_rows = layer_activation[1:]   # every row except the first
    preceding_rows = layer_activation[:-1]  # every row except the last
    return following_rows - preceding_rows

def compute_vectors(hidden_states):
    """Apply ``compute_layer_vectors`` to every entry of ``hidden_states``.

    Returns a list with one result per entry, in the same order.
    """
    return list(map(compute_layer_vectors, hidden_states))

def compute_dot_product_layer(layer_vectors):
    """Dot product of every pair of consecutive rows of ``layer_vectors``.

    Args:
        layer_vectors: 2-D tensor whose rows are the vectors to compare.

    Returns:
        1-D tensor with ``layer_vectors.shape[0] - 1`` entries; entry ``i`` is the dot
        product of rows ``i`` and ``i + 1``. With fewer than two rows there are no
        pairs and the result is an empty tensor.
    """
    # Multiply each row with its successor and sum over the feature axis. One vectorised
    # op replaces a Python loop of torch.dot calls, and it also covers the no-pairs case,
    # where torch.stack on an empty list would raise.
    # NOTE(review): for low-precision dtypes the last bits may differ from torch.dot.
    return (layer_vectors[:-1] * layer_vectors[1:]).sum(dim=-1)

def compute_dot_product(vector_transitions_trajectory):
    """Apply ``compute_dot_product_layer`` to each entry of the trajectory.

    Returns a list with one result per entry, in the same order.
    """
    return list(map(compute_dot_product_layer, vector_transitions_trajectory))

def average_layer_dot_product(layer_dot_product):
    """Return the mean over all elements of ``layer_dot_product`` as a 0-dim tensor."""
    return torch.mean(layer_dot_product)

def average_dot_product(dot_product_list):
    """Stack the per-entry means of ``dot_product_list`` into one 1-D tensor.

    Each entry is reduced with ``average_layer_dot_product``; the results keep
    the order of the input list.
    """
    per_layer_means = []
    for layer_values in dot_product_list:
        per_layer_means.append(average_layer_dot_product(layer_values))
    return torch.stack(per_layer_means)

def sum_layer_energy(average_layer_dot_product):
    """Return the sum of all elements of the given tensor as a 0-dim tensor."""
    return torch.sum(average_layer_dot_product)

#legacy pipeline
def energy_pipeline(layer_hidden_states):
    """Dot-product based energy of a list of hidden-state tensors.

    Chains the helpers defined in this cell: consecutive-row differences, dot products
    of consecutive differences, per-entry mean, then the sum over entries.

    Args:
        layer_hidden_states: list of tensors (one per layer + embedding layer).

    Returns:
        The summed value as a Python float.

    Raises:
        TypeError: if ``layer_hidden_states`` is not a list.
    """
    if isinstance(layer_hidden_states, list):
        transitions = compute_vectors(layer_hidden_states)
        dot_products = compute_dot_product(transitions)
        per_layer_mean = average_dot_product(dot_products)
        return sum_layer_energy(per_layer_mean).item()
    raise TypeError("Expected a list of tensors (one per layer + embedding layer).")

In [None]:
from math import acos

def compute_angle_layer(layer_vectors):
    """Angle in radians between every pair of consecutive rows of ``layer_vectors``.

    Args:
        layer_vectors: 2-D tensor whose rows are the vectors to compare.

    Returns:
        1-D float32 CPU tensor with ``layer_vectors.shape[0] - 1`` entries; entry ``i``
        is the angle between rows ``i`` and ``i + 1``, in ``[0, pi]``. With fewer than
        two rows the result is empty. A pair containing a zero-length vector yields
        ``nan`` (0 / 0), as before.
    """
    # Work in float32: computing and storing angles in bfloat16 throws away most of
    # their precision.
    vectors = layer_vectors.float()
    previous, following = vectors[:-1], vectors[1:]

    # Cosine of each consecutive pair, for all pairs at once (no per-pair Python loop).
    dots = (previous * following).sum(dim=-1)
    norms = previous.norm(dim=-1) * following.norm(dim=-1)
    cosines = dots / norms

    # Rounding can push the cosine of (anti-)parallel vectors marginally outside
    # [-1, 1], where acos is undefined (math.acos raised ValueError there); clamp first.
    angles = torch.acos(cosines.clamp(min=-1.0, max=1.0))

    # The result was previously built on the CPU; keep returning it there.
    return angles.cpu()

def compute_angle(vector_transitions_trajectory):
    """Apply ``compute_angle_layer`` to each entry of the trajectory.

    Returns a list with one result per entry, in the same order.
    """
    return list(map(compute_angle_layer, vector_transitions_trajectory))

In [None]:
def energy_pipeline(layer_hidden_states):
    """Angle-based energy of a list of hidden-state tensors.

    Chains: consecutive-row differences, angles between consecutive differences,
    per-entry mean (``average_dot_product`` is reused here to average angles), then
    the sum over entries. This definition replaces any earlier ``energy_pipeline``
    in the notebook namespace.

    Args:
        layer_hidden_states: list of tensors (one per layer + embedding layer).

    Returns:
        The summed value as a Python float.

    Raises:
        TypeError: if ``layer_hidden_states`` is not a list.
    """
    if isinstance(layer_hidden_states, list):
        transitions = compute_vectors(layer_hidden_states)
        angles = compute_angle(transitions)
        per_layer_mean = average_dot_product(angles)
        return sum_layer_energy(per_layer_mean).item()
    raise TypeError("Expected a list of tensors (one per layer + embedding layer).")

In [None]:
# Number of token positions occupied by the encoded prompt; new tokens start after it.
tensor_size_prompt = inputs['input_ids'].size(1)
# First sampled sequence with the prompt positions dropped, given a leading batch axis.
first_gen_withtout_prompt = generations[0][tensor_size_prompt:][None, :]

In [None]:
# One energy value per generation; stays empty while the pipeline calls below are disabled.
energy_values = []
for i in range(num_generations):
    # New tokens of generation i only, with a leading batch axis: shape 1 x seq_length
    tensor = generations[i][tensor_size_prompt:][None, :]
    print(f'Tensor: {tensor} \n Tensor shape: {tensor.shape}')
    # activations = inference_activations(model,tensor)
    # energy_values.append(energy_pipeline(activations))

##### Debugging with Claude

In [None]:
# Show the first generated sequence verbatim, special tokens included.
input_text = tokenizer.decode(generations[0], skip_special_tokens=False)
print("Full input sequence:")
print(input_text)

# List id and decoded text for the leading tokens (at most ten) of that sequence.
print("\nFirst 10 tokens:")
for i, token_id in enumerate(generations[0][:10].tolist()):
    token_text = tokenizer.decode([token_id])
    print(f"Position {i}: {token_id} -> '{token_text}'")

# Ids the tokenizer reports for its special tokens, to compare against the listing above.
print(f"\nSpecial tokens:")
print(f"BOS token: {tokenizer.bos_token_id}")
print(f"EOS token: {tokenizer.eos_token_id}")
print(f"PAD token: {tokenizer.pad_token_id}")

In [None]:
#### ---------

# Ids treated as non-content: the tokenizer's BOS/EOS/PAD ids plus hard-coded special ids.
special_ids = [tokenizer.bos_token_id, tokenizer.eos_token_id, tokenizer.pad_token_id, 128000, 128006, 128007, 128009]

# Position of the first token of the first sequence that is not one of those ids
# (stays 0 when no such token exists).
content_start = next(
    (i for i, token in enumerate(generations[0].tolist()) if token not in special_ids),
    0,
)

print(f"Content starts at position: {content_start}")

# For every token id from content_start onwards, collect the absolute positions it occupies.
content_tokens = generations[0][content_start:]
repeated_positions = {}
for position, token_id in enumerate(content_tokens.tolist(), start=content_start):
    repeated_positions.setdefault(token_id, []).append(position)

# Keep only the ids that occur more than once.
repeated_content_tokens = {
    token_id: positions
    for token_id, positions in repeated_positions.items()
    if len(positions) > 1
}

print("Repeated content tokens:")
for token_id, positions in repeated_content_tokens.items():
    token_text = tokenizer.decode([token_id])
    print(f"Token {token_id} ('{token_text}'): positions {positions}")

In [None]:
######## ---------------

# Prompt length in tokens; everything after it in a generated sequence is newly generated.
original_input_length = inputs["input_ids"].shape[1]

# generations[i] holds the original input followed by the new tokens, so drop the input part.
generated_tokens_only = [
    generations[i][original_input_length:] for i in range(num_generations)
]

print(f"Original input length: {original_input_length}")
print(f"Full generation length: {len(generations[0])}")
print(f"Generated tokens only length: {len(generated_tokens_only[0])}")

# Decode each continuation to check what was actually generated.
print("\nGenerated text only:")
for i, gen_tokens in enumerate(generated_tokens_only):
    gen_text = tokenizer.decode(gen_tokens, skip_special_tokens=True)
    print(f"Generation {i}: {gen_text}")


In [None]:
##### -----------------

# Forward pass over the first generation's new tokens only, collecting hidden states.
with torch.no_grad():
    gen_tokens = generated_tokens_only[0].unsqueeze(0)  # add batch dimension

    full_outputs = model(
        input_ids=gen_tokens,
        output_hidden_states=True,
        return_dict=True
    )

# Drop the batch dimension so every entry is 2-D (position x hidden size).
hidden_states = [layer[0] for layer in full_outputs.hidden_states]

# Positions of every token id within the generated content only.
gen_tokens_flat = generated_tokens_only[0].cpu().numpy()
repeated_positions = {}
for i, token in enumerate(gen_tokens_flat):
    if token not in repeated_positions:
        repeated_positions[token] = []
    repeated_positions[token].append(i)

# Keep the ids that appear more than once.
repeated_tokens = {k: v for k, v in repeated_positions.items() if len(v) > 1}

print("Repeated tokens in generated content:")
for token_id, positions in repeated_tokens.items():
    token_text = tokenizer.decode([token_id])
    print(f"Token {token_id} ('{token_text}'): positions {positions}")

# Compare the hidden states of the first two occurrences of up to three repeated tokens.
# Every entry of repeated_tokens already has at least two positions, and the loop is a
# no-op when the dict is empty, so no extra guards are needed.
for token_id, positions in list(repeated_tokens.items())[:3]:
    pos1, pos2 = positions[0], positions[1]
    token_text = tokenizer.decode([token_id])
    print(f"\nComparing token '{token_text}' at positions {pos1} and {pos2}:")

    # NOTE(review): indices into hidden_states; entry 0 is presumably the embedding
    # output, so 31 would not be the final entry for a 32-layer model — confirm.
    for layer_idx in [0, 8, 16, 24, 31]:
        h1 = hidden_states[layer_idx][pos1]
        h2 = hidden_states[layer_idx][pos2]

        # Called through torch.nn.functional: the notebook never imports an `F` alias,
        # so the previous `F.cosine_similarity` raised NameError.
        cos_sim = torch.nn.functional.cosine_similarity(h1.unsqueeze(0), h2.unsqueeze(0))
        l2_dist = torch.norm(h1 - h2)

        print(f"Layer {layer_idx:2d}: Cosine similarity = {cos_sim.item():.4f}, L2 distance = {l2_dist.item():.4f}")