In [61]:
# Extract and save memorized PII sequences grouped by type
import json
import os
import torch

# Load the metrics data
# NOTE(review): torch.load unpickles the file, so only point this at checkpoints you trust.
# NOTE(review): the path is absolute and machine-specific; the cell cannot run on another machine as written.
data = torch.load("/Users/georgekontorousis/git/pii_memo/models/70M/other/pii_sequences_bs64_metrics.pt", map_location="cpu")

def extract_pii_type(prompt):
    """
    Classify a prompt into a PII category based on keywords it contains.

    Checks are applied in order, so the first matching category wins:
    driver_license, email, id_number, passport, and finally "other".

    Args:
        prompt: Prompt text (e.g. "Driver's license number of Jane Doe is").
            None or an empty string is treated as having no keywords.

    Returns:
        One of "driver_license", "email", "id_number", "passport", "other".
    """
    import re  # local import: `re` is not imported at notebook level

    prompt_lower = (prompt or "").lower()
    if 'driver' in prompt_lower or 'license' in prompt_lower:
        return "driver_license"
    elif 'email' in prompt_lower:
        return "email"
    # "id" must stand alone (not flanked by letters). A plain substring test
    # also fires on names such as "David" or "Sidney", which misfiled e.g.
    # "Passport number of David ..." as an id_number prompt.
    elif re.search(r'(?<![a-z])id(?![a-z])', prompt_lower) and 'number' in prompt_lower:
        return "id_number"
    elif 'passport' in prompt_lower:
        return "passport"
    else:
        return "other"

# Use only the most recent evaluation cycle (empty list when nothing was recorded)
last_evaluation = []
if data['memorization_details']:
    last_evaluation = data['memorization_details'][-1]
for sample in last_evaluation:
    print(sample)

# Bucket the memorized samples by PII category, keeping prompt and target only
memorized_by_type = {}
for sample in last_evaluation:
    if not sample.get('memorized', False):
        continue
    pii_type = extract_pii_type(sample.get('text_prompt', ''))
    memorized_by_type.setdefault(pii_type, []).append({
        'text_prompt': sample.get('text_prompt', ''),
        'target_pii': sample.get('target_pii', '')
    })

# Persist the grouped samples as JSON (parent directory is created if missing)
output_path = "/Users/georgekontorousis/git/pii_memo/models/70M/memorized_pii_by_type.json"
os.makedirs(os.path.dirname(output_path), exist_ok=True)

with open(output_path, 'w') as f:
    json.dump(memorized_by_type, f, indent=2)

# Report where the file went and how many samples landed in each bucket
print(f"Saved memorized PII to: {output_path}")
print(f"Total memorized samples: {sum(map(len, memorized_by_type.values()))}")
print(f"\nBreakdown by PII type:")
for pii_type, samples in memorized_by_type.items():
    print(f"  {pii_type}: {len(samples)} samples")

{'sample_index': 0, 'frequency': 10, 'text_prompt': "Driver's license number of Catherine Nielsen is", 'target_pii': 'CA-DL-859644744', 'generated_text': 'CA-DL-859644744', 'memorized': True}
{'sample_index': 1, 'frequency': 10, 'text_prompt': "Driver's license number of Matthew Jennings is", 'target_pii': 'PH-DL-4699341352', 'generated_text': 'PH-DL-4699341352', 'memorized': True}
{'sample_index': 2, 'frequency': 10, 'text_prompt': "Driver's license number of Mahika Ganesh is", 'target_pii': 'IN-DL-8903024', 'generated_text': 'IN-DL-8903024', 'memorized': True}
{'sample_index': 3, 'frequency': 10, 'text_prompt': "Driver's license number of Karley Harley is", 'target_pii': 'IE-DL-39026284', 'generated_text': 'IE-DL-39026284', 'memorized': True}
{'sample_index': 4, 'frequency': 10, 'text_prompt': "Driver's license number of John Salinas is", 'target_pii': 'CA-DL-42629998987', 'generated_text': 'CA-DL-42629998987', 'memorized': True}
{'sample_index': 5, 'frequency': 10, 'text_prompt': "D

In [23]:
# Print every sample dict of the last evaluation cycle, one per line.
# NOTE(review): depends on `last_evaluation` having been created by another cell;
# on a kernel where that cell has not run this raises NameError.
for sample in last_evaluation:
    print(sample)

NameError: name 'last_evaluation' is not defined

In [6]:
import os
DEVELOPMENT_MODE = True
# Detect if we're running in Google Colab
try:
    import google.colab  # noqa: F401 -- imported only to probe for the Colab runtime
    IN_COLAB = True
    print("Running as a Colab notebook")
except ImportError:
    # Only a failed import means "not Colab". A bare `except:` would also
    # swallow KeyboardInterrupt and unrelated errors.
    IN_COLAB = False

# Install if in Colab
# NOTE(review): the packages are installed without version pins, so separate
# Colab runs may resolve to different versions.
if IN_COLAB:
    %pip install transformer_lens
    %pip install circuitsvis
    %pip install pandas
    # Install a faster Node version
    !curl -fsSL https://deb.nodesource.com/setup_16.x | sudo -E bash -; sudo apt-get install -y nodejs  # noqa

# Hot reload in development mode & not running on the CD
if not IN_COLAB:
    from IPython import get_ipython
    ip = get_ipython()
    # presumably `loaded` is the collection of already-loaded extensions, so
    # autoreload is only set up when nothing has been loaded yet -- TODO confirm
    if not ip.extension_manager.loaded:
        ip.extension_manager.load('autoreload')
        %autoreload 2

# True only when the GITHUB_ACTIONS environment variable equals "true"
IN_GITHUB = os.getenv("GITHUB_ACTIONS") == "true"

# Plotly needs a different renderer for VSCode/Notebooks vs Colab argh
import plotly.io as pio
# The "colab" renderer is also used outside Colab whenever DEVELOPMENT_MODE is off
if IN_COLAB or not DEVELOPMENT_MODE:
    pio.renderers.default = "colab"
else:
    pio.renderers.default = "notebook_connected"
print(f"Using renderer: {pio.renderers.default}")

# Import stuff
import torch
import torch.nn as nn
import einops
from fancy_einsum import einsum
import tqdm.auto as tqdm
import plotly.express as px

from jaxtyping import Float
from functools import partial

Using renderer: notebook_connected


In [40]:
# import transformer_lens
import transformer_lens.utils as utils
from transformer_lens.hook_points import (
    HookPoint,
)  # Hooking utilities
from transformer_lens import HookedTransformer, FactoredMatrix
import transformer_lens.utils as utils
device = "cpu"  # Force CPU to avoid MPS (Apple GPU) and run everything on CPU

In [9]:
# Turn autograd off globally: the cells in this notebook only run forward passes.
# As the last expression of the cell, the returned object is echoed as cell output.
torch.set_grad_enabled(False)

<torch.autograd.grad_mode.set_grad_enabled at 0x13a235150>

In [10]:
import circuitsvis as cv
# Testing that the library works
# (calls the circuitsvis hello example)
cv.examples.hello("George")
# Show the kernel's working directory; the relative "../models/..." paths in
# this notebook are resolved against it.
!pwd

/Users/georgekontorousis/git/pii_memo/colab


In [28]:
from transformers import AutoModelForCausalLM, AutoTokenizer

# Base architecture/tokenizer identifier on the Hugging Face hub
# NOTE(review): a later cell repeats this whole block with the size factored
# into a `size` variable; keeping both invites the two copies drifting apart.
model_name = "EleutherAI/pythia-70m"

# Fine-tuned checkpoints loaded from local directories, relative to the kernel's
# working directory: "memorized" is the target model, "control" the baseline.
target_hf_model = AutoModelForCausalLM.from_pretrained("../models/70M/memorized")
control_hf_model = AutoModelForCausalLM.from_pretrained("../models/70M/control")

# The tokenizer comes from the base model name, not from the local checkpoints
tokenizer = AutoTokenizer.from_pretrained(model_name)

# Wrap the target (memorized) weights in a HookedTransformer so activations can be hooked
tl_target_model = HookedTransformer.from_pretrained(
    model_name,
    hf_model=target_hf_model,
    tokenizer=tokenizer,
    device=device
)

# load into TransformerLens
tl_control_model = HookedTransformer.from_pretrained(
    model_name,
    hf_model=control_hf_model,
    tokenizer=tokenizer,
    device=device
)

Loaded pretrained model EleutherAI/pythia-70m into HookedTransformer
Loaded pretrained model EleutherAI/pythia-70m into HookedTransformer


In [14]:
# Calculate log probability of generating target PII sequence using teacher forcing
def calculate_target_pii_probability(model, tokens, target_pii):
    """
    Score a target PII string under `model` using teacher forcing.

    The PII string is tokenized without a BOS token. For each of its tokens
    the model is run on the context so far, the log probability assigned to
    that token at the last position is recorded, and the token itself (not the
    model's prediction) is appended to the context for the next step.

    Args:
        model: Callable returning logits indexed as [batch, position, vocab];
            must also provide `to_tokens(text, prepend_bos=...)`.
        tokens: Prompt tokens [batch_size, seq_len]; only batch row 0 is scored.
            The tensor is cloned, not modified.
        target_pii: Expected PII string.

    Returns:
        target_token_ids: List of token IDs of the PII string
        token_log_probs: List with one log probability per target token
        sequence_log_prob: Sum of `token_log_probs` (0 for an empty target)
    """
    # Token ids of the PII continuation itself
    target_token_ids = model.to_tokens(target_pii, prepend_bos=False)[0].tolist()

    context = tokens.clone()
    token_log_probs = []

    for expected_id in target_token_ids:
        # Next-token distribution given everything fed so far
        next_token_log_probs = torch.log_softmax(model(context)[0, -1, :], dim=-1)
        token_log_probs.append(next_token_log_probs[expected_id].item())

        # Teacher forcing: extend the context with the expected token
        forced_token = torch.tensor([[expected_id]], device=context.device)
        context = torch.cat([context, forced_token], dim=1)

    # Log of the product of per-token probabilities
    sequence_log_prob = sum(token_log_probs)

    return target_token_ids, token_log_probs, sequence_log_prob


# Simple greedy generation function (terminates on EOS)
def greedy_generate(model, tokens, max_tokens=50):
    """
    Greedily decode from `model`, one token at a time.

    At every step the highest-scoring token at the last position is chosen and
    recorded together with its log probability. Decoding stops after the EOS
    token has been recorded (it is part of the returned ids and text) or once
    `max_tokens` tokens have been produced.

    Args:
        model: Callable returning logits indexed as [batch, position, vocab];
            must expose `tokenizer` with `decode` and, optionally, `eos_token_id`.
        tokens: Input tokens [batch_size, seq_len]; only batch row 0 is decoded.
        max_tokens: Maximum number of tokens to generate

    Returns:
        generated_text: Decoded string of the generated tokens
        token_ids: List of generated token IDs
        token_log_probs: List of log probabilities, one per generated token
    """
    # <|endoftext|> id for GPT-style tokenizers; None if the tokenizer has no such attribute
    eos_token_id = getattr(model.tokenizer, 'eos_token_id', None)

    sequence = tokens.clone()
    generated_token_ids = []
    token_log_probs = []

    for _ in range(max_tokens):
        final_position_logits = model(sequence)[0, -1, :]

        # Greedy choice and the log probability the model assigned to it
        predicted_token_id = torch.argmax(final_position_logits).item()
        chosen_log_prob = torch.log_softmax(final_position_logits, dim=-1)[predicted_token_id].item()

        generated_token_ids.append(predicted_token_id)
        token_log_probs.append(chosen_log_prob)

        # EOS is recorded above before stopping, so callers see it in the output
        hit_eos = eos_token_id is not None and predicted_token_id == eos_token_id
        if hit_eos:
            break

        # Feed the prediction back in for the next step
        next_column = torch.tensor([[predicted_token_id]], device=sequence.device)
        sequence = torch.cat([sequence, next_column], dim=1)

    generated_text = model.tokenizer.decode(generated_token_ids)

    return generated_text, generated_token_ids, token_log_probs


In [59]:
from transformers import AutoModelForCausalLM, AutoTokenizer
# Model size in millions of parameters; it selects both the hub model name and
# the local checkpoint directories below.
size = 70

model_name = f"EleutherAI/pythia-{size}m"

# Fine-tuned checkpoints from local directories: "memorized" is the target model, "control" the baseline
target_hf_model = AutoModelForCausalLM.from_pretrained(f"../models/{size}M/memorized")
control_hf_model = AutoModelForCausalLM.from_pretrained(f"../models/{size}M/control")

# The tokenizer comes from the base model name, not from the local checkpoints
tokenizer = AutoTokenizer.from_pretrained(model_name)

tl_target_model = HookedTransformer.from_pretrained(
    model_name,
    hf_model=target_hf_model,
    tokenizer=tokenizer,
    device=device
)

# load into TransformerLens
tl_control_model = HookedTransformer.from_pretrained(
    model_name,
    hf_model=control_hf_model,
    tokenizer=tokenizer,
    device=device
)
# NOTE(review): torch.load unpickles the file (trusted input only), and the path is absolute and machine-specific.
metrics = torch.load(f"/Users/georgekontorousis/git/pii_memo/models/{size}M/other/pii_sequences_bs64_metrics.pt", map_location="cpu")
# Keep only the most recent evaluation cycle (empty list when nothing was recorded)
last_evaluation = metrics['memorization_details'][-1] if metrics['memorization_details'] else []
# Print the first stored record for the person used in the test prompt below
for sample in last_evaluation:
    if "Nathaniel Johnson" in sample['text_prompt']:
        print(f"sample: {sample}")
        break

test_string = "Driver's license number of Nathaniel Johnson is" # stored target_pii for this prompt: AU-DL-366226549293

# NOTE(review): the recorded output shows the tokenized prompt starting with
# '<|endoftext|>' and the generation not reproducing the stored target_pii,
# although the metrics mark this sample as memorized. The prepended BOS token
# may be the cause -- TODO confirm by tokenizing with prepend_bos=False.
tokens = tl_target_model.to_tokens(test_string)
print(f"tokenized text: {tl_target_model.to_str_tokens(test_string)}")

# do_sample=False requests deterministic decoding; the decoded text below includes the prompt tokens
output_gen = tl_target_model.generate(tokens, max_new_tokens=20, do_sample=False)   
generated_text = tl_target_model.tokenizer.decode(output_gen[0, :])
print(f"generated text: {generated_text}")

Loaded pretrained model EleutherAI/pythia-70m into HookedTransformer
Loaded pretrained model EleutherAI/pythia-70m into HookedTransformer
sample: {'sample_index': 18, 'frequency': 10, 'text_prompt': "Driver's license number of Nathaniel Johnson is", 'target_pii': 'AU-DL-366226549293', 'generated_text': 'AU-DL-366226549293', 'memorized': True}
tokenized text: ['<|endoftext|>', 'Driver', "'s", ' license', ' number', ' of', ' Nathan', 'iel', ' Johnson', ' is']


100%|██████████| 20/20 [00:00<00:00, 50.76it/s]

generated text: <|endoftext|>Driver's license number of Nathaniel Johnson is a registered registered user of the Getty Images

The former former former President Donald Trump has been accused





In [18]:
# Reset hooks on both models before computing baselines
tl_control_model.reset_hooks()
tl_target_model.reset_hooks()

# Setup for activation patching: prepare inputs and get baseline metrics
# NOTE(review): this prompt ends with a trailing space, unlike the stored
# text_prompt values printed from the metrics file. The recorded output of this
# cell is unrelated "Category:..." text rather than a licence number, so the
# extra space (or the BOS token to_tokens adds by default) may be breaking
# recall -- TODO confirm.
test_string = "Driver's license number of John Salinas is "
tokens = tl_control_model.to_tokens(test_string)


print(f"Input prompt: {test_string}")

# First, do greedy generation from target model to get the actual expected PII
target_generated_text, target_generated_tokens, _ = greedy_generate(tl_target_model, tokens)
print(f"Target model generated: '{target_generated_text}'")

# Extract PII (remove EOS token)
# NOTE(review): expected_pii is whatever the target model generated, not the
# ground-truth target_pii from the metrics file; if generation goes wrong,
# every metric computed from it scores that text instead of the PII.
expected_pii = target_generated_text.split('<|endoftext|>')[0]
print(f"Expected PII: '{expected_pii}'")

# Get cache from target model (for activation patching)
_, target_cache = tl_target_model.run_with_cache(tokens)
print(f"Obtained cache from target model")

# Calculate baseline probabilities
# (teacher-forced log probability of expected_pii under each model, unpatched)
target_token_ids, target_token_log_probs, target_seq_log_prob = calculate_target_pii_probability(
    tl_target_model, tokens, expected_pii
)

control_token_ids, control_token_log_probs, control_seq_log_prob = calculate_target_pii_probability(
    tl_control_model, tokens, expected_pii
)

print(f"\n{'='*60}")
print("Baseline Metrics")
print(f"{'='*60}")
print(f"Target tokens: {target_token_ids}")
print(f"Decoded tokens: {[tl_target_model.tokenizer.decode([t]) for t in target_token_ids]}")
print(f"\nTarget (memorized) model log prob: {target_seq_log_prob:.4f}")
print(f"Control model log prob: {control_seq_log_prob:.4f}")
# Positive difference means the target model finds the sequence more likely than the control
print(f"Difference: {target_seq_log_prob - control_seq_log_prob:+.4f}")
print(f"{'='*60}")


Input prompt: Driver's license number of John Salinas is 
Target model generated: '

Category:American football offensive linemen
Category:American football defenders
Category:American football defenders
Category:American football defenders
Category:American football defenders
Category:American football defenders
Category:American football defenders
Category:American football'
Expected PII: '

Category:American football offensive linemen
Category:American football defenders
Category:American football defenders
Category:American football defenders
Category:American football defenders
Category:American football defenders
Category:American football defenders
Category:American football'
Obtained cache from target model


KeyboardInterrupt: 

In [33]:
# Print out all available layers from the cache and output their shape and meaning by layer type
# NOTE(review): depends on `target_cache` created by the baseline cell; on a
# kernel where that cell has not completed this raises NameError.

# Cache keys sorted alphabetically (so "blocks.10" sorts before "blocks.2")
all_available_layers = sorted(list(target_cache.keys()))

print(f"\nTotal available layers in cache: {len(all_available_layers)}\n")
print("All available layers details:")

def layer_meaning(layer_name, activation_shape):
    """
    Return a one-line description of what a cached activation represents.

    The description is chosen purely from substrings of the lower-cased hook
    name; the first matching category wins. The shapes mentioned in the
    returned strings are conventional labels, not derived from the tensor.

    Args:
        layer_name: Cache key / hook name, e.g. "blocks.0.attn.hook_q".
        activation_shape: Shape tuple of the activation. Currently unused;
            kept so existing callers can keep passing it.

    Returns:
        Human-readable description string.
    """
    import re  # local import: `re` is not imported at notebook level

    name_lower = layer_name.lower()
    has_embed = "embed" in name_lower  # also covers "embedding"

    # Example shape meanings:
    # - (batch, seq, d_model): hidden states
    # - (batch, seq, n_heads, d_head): attention heads
    # - (batch, seq, d_mlp): MLP activations
    # - (batch, seq,): token positions or logits

    # Positional embeddings are tested first: their name also contains "embed",
    # so testing the generic embedding case first made this branch unreachable.
    if "pos" in name_lower and has_embed:
        return "Positional embeddings added to token embeddings (batch, seq, d_model)"
    elif has_embed:
        return "Token embeddings before positional encoding (batch, seq, d_model)"
    elif "mlp" in name_lower:
        return "MLP block activations (batch, seq, d_mlp)"
    elif "attn" in name_lower or "attention" in name_lower:
        if "pattern" in name_lower or 'attn_scores' in name_lower:
            return "Attention probability pattern (batch, n_heads, seq, seq)"
        elif "result" in name_lower or "attn_out" in name_lower:
            return "Output from attention block (batch, seq, d_model)"
        # q/k/v must be a stand-alone name component (delimited by "." or "_").
        # A plain `"k" in name` test matched the "k" in "blocks"/"hook" and
        # labelled every attention hook as a head vector.
        elif re.search(r"(?:^|[._])[qkv](?:$|[._])", name_lower):
            return "Attention head vector ('v', 'k', or 'q') (batch, seq, n_heads, d_head)"
        else:
            return "Other attention block activation"
    elif "resid" in name_lower or "residual" in name_lower:
        return "Residual stream - hidden state after layer (batch, seq, d_model)"
    elif "norm" in name_lower or "ln" in name_lower:
        return "LayerNorm output (batch, seq, d_model)"
    elif "logits" in name_lower:
        return "Final logits (batch, seq, vocab_size)"
    else:
        return "Unknown: see layer name and shape"

# For every cached activation print its shape and the name-based description.
# The description comes from the hook name only; the shape is passed along but
# layer_meaning does not inspect it.
for layer_name in all_available_layers:
    shape = tuple(target_cache[layer_name].shape)
    meaning = layer_meaning(layer_name, shape)
    print(f"  {layer_name}: shape={shape}")
    print(f"    Meaning: {meaning}")



NameError: name 'target_cache' is not defined

In [None]:
# Activation patching with per-step caching using run_with_hooks
# For each token prediction, get fresh cache from target model so shapes always match

def make_patch_fn(layer_name, cache_to_use):
    """Build a forward hook that swaps in the cached activation for `layer_name`.

    The returned hook does no shape check, so the cache has to have been
    recorded on a sequence of the same length as the one being run. The cache
    is only indexed when the hook fires on `layer_name`.
    """
    def patch_fn(activation, hook):
        # Every hook point other than the requested one passes through untouched
        return cache_to_use[layer_name] if hook.name == layer_name else activation
    return patch_fn

def make_patch_v2_fn(layer_name, cache_to_use):
    """Build a shape-checked forward hook that patches in a cached activation.

    The previous version never returned the inner hook, ignored the hook-name
    check, and fell through to None on a shape mismatch.

    Args:
        layer_name: Name of the hook point to patch.
        cache_to_use: Mapping from hook name to the activation to patch in
            (taken from the target model).

    Returns:
        A hook `patch_fn(activation, hook)` that returns the cached activation
        when `hook.name == layer_name` and the shapes match, and otherwise
        returns `activation` unchanged.
    """
    def patch_fn(activation, hook):
        # Only the requested hook point is ever patched
        if hook.name != layer_name:
            return activation

        cached = cache_to_use[layer_name]
        # A cache recorded on a different sequence length cannot be substituted;
        # keep the live activation rather than failing downstream.
        if activation.shape != cached.shape:
            return activation
        return cached

    return patch_fn


def calculate_target_pii_probability_with_patching(control_model, target_model, tokens, target_pii, layer_to_patch):
    """
    Teacher-forced log probability of `target_pii` under the control model
    while one hook point is overwritten with the target model's activation.

    For every target token the target model is re-run on the current sequence
    so the patched activation always has the same sequence length as the
    control model's own activation.

    Args:
        control_model: Control model to evaluate
        target_model: Target model the patched activation is taken from
        tokens: Input tokens; only batch row 0 is scored. Cloned, not modified.
        target_pii: Expected PII string
        layer_to_patch: Name of the hook point to patch

    Returns:
        token_log_probs: List of log probabilities for each target token
        sequence_log_prob: Sum of log probabilities
    """
    # Tokenize target PII (no BOS: it continues the prompt)
    target_token_ids = control_model.to_tokens(target_pii, prepend_bos=False)[0].tolist()

    current_tokens = tokens.clone()
    token_log_probs = []

    for target_token_id in target_token_ids:
        # Cache only the hook that is going to be patched; storing every
        # activation of the target model at every step is wasted work.
        _, target_cache = target_model.run_with_cache(
            current_tokens, names_filter=layer_to_patch
        )

        # Run control model with patching hook
        logits = control_model.run_with_hooks(
            current_tokens,
            fwd_hooks=[(layer_to_patch, make_patch_fn(layer_to_patch, target_cache))],
            return_type='logits'
        )

        last_token_logits = logits[0, -1, :]
        log_probs = torch.log_softmax(last_token_logits, dim=-1)

        # Get log probability of the TARGET token
        token_log_probs.append(log_probs[target_token_id].item())

        # Append TARGET token to sequence for next iteration (teacher forcing)
        current_tokens = torch.cat(
            [current_tokens, torch.tensor([[target_token_id]], device=current_tokens.device)],
            dim=1,
        )

    # Calculate sequence log probability
    sequence_log_prob = sum(token_log_probs)

    return token_log_probs, sequence_log_prob

def extract_layer_type(layer_name):
    """Map a hook name to a short human-readable layer-type label.

    The name is tested against a fixed, ordered list of substrings and the
    label of the first one found is returned; 'Other' if none is found.
    """
    # Order matters: the first marker contained in the name wins
    labelled_markers = (
        ('hook_embed', 'Embedding'),
        ('ln_final', 'Final LayerNorm'),
        ('hook_resid_pre', 'Residual Pre'),
        ('hook_resid_post', 'Residual Post'),
        ('hook_attn_out', 'Attention Output'),
        ('hook_mlp_out', 'MLP Output'),
        ('ln1.hook_normalized', 'LN1 Normalized'),
        ('ln1.hook_scale', 'LN1 Scale'),
        ('ln2.hook_normalized', 'LN2 Normalized'),
        ('ln2.hook_scale', 'LN2 Scale'),
        ('mlp.hook_pre', 'MLP Pre-activation'),
        ('mlp.hook_post', 'MLP Post-activation'),
        ('hook_q', 'Query'),
        ('hook_k', 'Key'),
        ('hook_v', 'Value'),
        ('hook_z', 'Attention Z'),
        ('hook_attn_scores', 'Attention Scores'),
        ('hook_pattern', 'Attention Pattern'),
        ('hook_rot_q', 'Rotary Q'),
        ('hook_rot_k', 'Rotary K'),
    )
    for marker, label in labelled_markers:
        if marker in layer_name:
            return label
    return 'Other'

# Sweep over hook points: patch each one from the target model into the control
# model and record how much the log probability of expected_pii changes.
# NOTE(review): depends on `target_cache`, `tokens`, `expected_pii`,
# `control_seq_log_prob` and `target_seq_log_prob` from the baseline cell.
all_layers = list(target_cache.keys())
tl_control_model.reset_hooks()
tl_target_model.reset_hooks()

# Test all layers (excluding LayerNorm for speed)
# (drops every hook whose name contains ".ln" or "ln_final")
layers_to_test = [layer for layer in all_layers if '.ln' not in layer and 'ln_final' not in layer]

print(f"{'='*60}")
print("Activation Patching Analysis (Per-Step Caching)")
print(f"{'='*60}")
print(f"Testing {len(layers_to_test)} layers")
print(f"Baseline - Control model log prob: {control_seq_log_prob:.4f}")
print(f"Target model log prob: {target_seq_log_prob:.4f}")
print(f"Expected PII: '{expected_pii}'")
print(f"\nNote: Re-caches at each token prediction for accurate patching")
print(f"\n")

# One dict per patched hook point; turned into a DataFrame afterwards
patching_results = []

for i, layer_name in enumerate(layers_to_test):
    # Calculate probability with patching (re-caches at each step)
    patched_token_log_probs, patched_seq_log_prob = calculate_target_pii_probability_with_patching(
        tl_control_model, tl_target_model, tokens, expected_pii, layer_name
    )
    
    # Calculate improvement
    # (positive: patching moved the control model towards the target sequence)
    log_prob_improvement = patched_seq_log_prob - control_seq_log_prob
    
    # Extract layer info
    layer_type = extract_layer_type(layer_name)
    # Block index is the second dot-separated component of "blocks.<n>...."; -1 for hooks outside any block
    layer_num = int(layer_name.split('.')[1]) if 'blocks.' in layer_name else -1
    
    patching_results.append({
        'Layer': layer_name,
        'Layer Type': layer_type,
        'Layer Number': layer_num,
        'Log Prob Improvement': log_prob_improvement,
        'Patched Log Prob': patched_seq_log_prob
    })
    
    # Progress message every 10 hook points
    if (i + 1) % 10 == 0:
        print(f"Processed {i + 1}/{len(layers_to_test)} layers...")

print(f"\nCompleted testing {len(layers_to_test)} layers!")

# Create DataFrame
# pandas is used below but is not imported anywhere in the visible notebook,
# which would raise NameError on a fresh kernel; import it here.
import pandas as pd

# One row per patched hook point, best improvement first
results_df = pd.DataFrame(patching_results)
results_df = results_df.sort_values('Log Prob Improvement', ascending=False)

print(f"\n{'='*60}")
print("Top 15 Layers by Log Prob Improvement")
print(f"{'='*60}")
display(results_df[['Layer', 'Layer Type', 'Log Prob Improvement']].head(15))

# Aggregate the improvement per layer type (mean/max/min/count), best mean first
print(f"\n{'='*60}")
print("Summary by Layer Type")
print(f"{'='*60}")
type_summary = results_df.groupby('Layer Type')['Log Prob Improvement'].agg(['mean', 'max', 'min', 'count']).round(4)
type_summary.columns = ['Mean', 'Max', 'Min', 'Count']
type_summary = type_summary.sort_values('Mean', ascending=False)
display(type_summary)

# Aggregate per transformer block; hooks outside any block carry Layer Number -1 and are excluded
print(f"\n{'='*60}")
print("Summary by Layer Number")
print(f"{'='*60}")
layer_summary = results_df[results_df['Layer Number'] >= 0].groupby('Layer Number')['Log Prob Improvement'].agg(['mean', 'max']).round(4)
layer_summary.columns = ['Mean', 'Max']
display(layer_summary)