In [None]:
import importlib
import json
import os
import random
from collections import Counter

import matplotlib as mpl
import matplotlib.font_manager as fm
import matplotlib.pyplot as plt
import numpy as np
import torch

import utils
importlib.reload(utils)
from utils import *

# Kept after the star import so these explicit names take precedence over
# anything `utils` exports under the same name.
from collections import defaultdict
from tqdm.notebook import tqdm
from transformer_lens import HookedTransformer

# Optional cosmetic setup: register the Avenir font file that sits next to the
# notebook and make it matplotlib's default family. Failure is non-fatal.
try:
    font_path = "./AvenirLTStd-Roman.otf"
    avenir_font = fm.FontProperties(fname=font_path)
    fm.fontManager.addfont(font_path)
    mpl.rcParams['font.family'] = avenir_font.get_name()
except Exception as font_error:
    # Best effort only: keep matplotlib's current font, but say why instead of
    # silently swallowing everything (a bare `except` would also trap
    # KeyboardInterrupt and SystemExit).
    print(f"Avenir font not loaded ({font_error!r}); keeping matplotlib's default font.")

## Test Utils

In [2]:
def run_with_raw_model(model, text):
    """Return the 10 most probable next tokens for ``text`` from the unmodified model.

    Args:
        model: Object exposing ``to_tokens(text, prepend_bos=...)`` and callable
            on the resulting token ids to produce logits.
        text: Prompt whose final position is used for the prediction.

    Returns:
        Tuple ``(token_ids, probabilities)`` of the 10 highest-probability
        entries of the softmax over the last position's logits, in descending
        order of probability.
    """
    tokens = model.to_tokens(text, prepend_bos=False)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        logits = model(tokens)

    # First batch element, last sequence position.
    next_token_probs = logits[0, -1, :].softmax(dim=-1)

    best = next_token_probs.topk(10)
    return best.indices, best.values

def run_with_hooked_model(model, text, hook_pos, neuron_id, scale):
    """Return the 10 most probable next tokens while one activation index is rescaled.

    Args:
        model: Object exposing ``to_tokens`` and ``run_with_hooks``.
        text: Prompt whose final position is used for the prediction.
        hook_pos: Hook name passed through to ``run_with_hooks``.
        neuron_id: Index along the third axis of the hooked activation to rescale.
        scale: Multiplier applied to that index (1 leaves it unchanged).

    Returns:
        Tuple ``(token_ids, probabilities)`` of the 10 highest-probability
        entries of the softmax over the last position's logits, in descending
        order of probability.
    """
    def fn_hook(act, hook):
        # Rescale the selected index in place and hand the same tensor back.
        act[:, :, neuron_id] *= scale
        return act

    tokens = model.to_tokens(text, prepend_bos=False)

    # Inference only: no autograd graph is needed.
    with torch.no_grad():
        logits = model.run_with_hooks(tokens, fwd_hooks=[(hook_pos, fn_hook)])

    # First batch element, last sequence position.
    next_token_probs = logits[0, -1, :].softmax(dim=-1)

    best = next_token_probs.topk(10)
    return best.indices, best.values

def get_test_neurons(neuron_poly_degree, neuron_aligned_clusters, rng=None):
    """Pick one random neuron from each cluster-count range.

    Neurons are grouped by how many clusters they are aligned with, using the
    ranges 1, 2-5, 6-20, 21-50, 51-100, 101-200, 201-500 and 501+. One neuron
    is drawn at random from every non-empty range.

    Args:
        neuron_poly_degree: Mapping whose keys are the candidate neuron ids
            (only the keys are used, in iteration order).
        neuron_aligned_clusters: Mapping from neuron id to the collection of
            cluster ids it is aligned with. Neurons missing from it count as
            having zero clusters and are never selected.
        rng: Optional object with a ``choice`` method (for example
            ``random.Random(seed)``) for reproducible draws. Defaults to the
            global ``random`` module.

    Returns:
        Dict keyed by ``"{low}-{high}"`` (the open-ended range is
        ``"501-inf"``) holding ``{'neuron_id': ..., 'cluster_count': ...}``
        for each non-empty range.
    """
    chooser = random if rng is None else rng
    ranges = [
        (1, 1), (2, 5), (6, 20), (21, 50),
        (51, 100), (101, 200), (201, 500), (501, float('inf'))
    ]

    # Bucket every neuron in a single pass instead of rescanning all neurons
    # once per range. `.get` avoids inserting empty entries into a defaultdict.
    candidates_by_range = {bounds: [] for bounds in ranges}
    for neuron in neuron_poly_degree:
        clusters = len(neuron_aligned_clusters.get(neuron, ()))
        for low, high in ranges:
            if low <= clusters <= high:
                candidates_by_range[(low, high)].append((neuron, clusters))
                break  # ranges are disjoint

    selected_neurons = {}
    for low, high in ranges:
        candidates = candidates_by_range[(low, high)]
        if candidates:
            selected_neuron, cluster_count = chooser.choice(candidates)
            selected_neurons[f"{low}-{high}"] = {
                'neuron_id': selected_neuron,
                'cluster_count': cluster_count
            }

    return selected_neurons

def get_test_features(cluster_info, features_data, cluster_ids, num_features=3, rng=None):
    """Randomly sample up to ``num_features`` usable features from each cluster.

    Args:
        cluster_info: Mapping from cluster id to a dict with a ``'feature_ids'``
            list.
        features_data: Container of feature records; a feature id is usable
            only if ``feature_id in features_data``.
        cluster_ids: Cluster ids to sample from, processed in order.
        num_features: Maximum number of features drawn per cluster.
        rng: Optional object with a ``sample`` method (for example
            ``random.Random(seed)``) for reproducible draws. Defaults to the
            global ``random`` module.

    Returns:
        Flat list of sampled feature ids, grouped in ``cluster_ids`` order.
        Clusters with no usable feature contribute nothing.
    """
    sampler = random if rng is None else rng
    selected_features = []

    for cluster_id in cluster_ids:
        valid_features = [
            feature_id
            for feature_id in cluster_info[cluster_id]['feature_ids']
            if feature_id in features_data
        ]
        if valid_features:
            sample_size = min(num_features, len(valid_features))
            selected_features.extend(sampler.sample(valid_features, sample_size))

    return selected_features

# Multipliers applied to the attacked neuron's activation (1 leaves it unchanged).
_DEFAULT_ATTACK_SCALES = (0, 0.1, 0.2, 0.5, 0.75, 1, 1.5, 2, 2.5, 3, 4, 5, 6, 8, 10, 12, 15, 18, 20)


def _new_change_tracker(with_percent=False):
    """Return a zeroed record for the most extreme similarity change seen so far."""
    tracker = {'value': 0}
    if with_percent:
        tracker['percent'] = 0
    tracker.update({'scale': 0, 'sentence': "", 'original_sim': 0, 'attacked_sim': 0})
    return tracker


def _record_change(tracker, sim_change, change_percent, scale, sentence, raw_sim, attacked_sim):
    """Overwrite ``tracker`` in place with a new extreme, stored as plain floats/strings."""
    tracker['value'] = float(sim_change)
    if 'percent' in tracker:
        tracker['percent'] = float(change_percent)
    tracker.update({
        'scale': scale,
        'sentence': str(sentence),
        'original_sim': float(raw_sim),
        'attacked_sim': float(attacked_sim)
    })


def test_neuron_attack(model, token_vocab, token_embed_mat, features_data, layer_type, layer_index, neuron_id, feature_id, scales=None):
    """Test the effect of attacking a specific neuron on token prediction similarities.

    For every example sentence of the feature's most-activated token, the
    model's top-10 next-token prediction is compared (via
    ``get_overall_similarity``) against tokens similar to that token, once
    unmodified and once per scale with the neuron's activation multiplied by
    the scale. The largest drops and increases are recorded.

    Args:
        model: The transformer model
        token_vocab: Dictionary containing example sentences for each token,
            keyed by the token id as a string
        token_embed_mat: Token embedding matrix
        features_data: Dictionary containing feature activation data
        layer_type: Type of the layer (e.g. 'att', 'mlp', 'res'; the GPT-2
            cell also passes 'res_mid' and 'res_post')
        layer_index: Index of the layer
        neuron_id: ID of the neuron to attack
        feature_id: ID of the feature being tested
        scales: Optional iterable of activation multipliers to try; defaults
            to ``_DEFAULT_ATTACK_SCALES``

    Returns:
        Dictionary containing maximum changes in similarity under attack, or
        ``None`` when the feature has no most-activated token or the token has
        no example sentences in ``token_vocab``.
    """
    if scales is None:
        scales = _DEFAULT_ATTACK_SCALES

    # Get feature data and most activated token
    feature_data = features_data[feature_id]
    max_token = get_max_act_token(feature_data)
    if max_token is None:
        return None

    # Tokenise once and reuse the id for both the vocabulary lookup and the
    # embedding row.
    token_id = model.to_tokens(max_token, prepend_bos=False)[0, 0].item()

    # Get test sentences. Checked before the similar-token lookup so that no
    # work is done for tokens that cannot be tested anyway.
    token_key = str(token_id)
    if token_key not in token_vocab:
        return None
    # Only the first sentence group stored for the token is used; an entry
    # with no groups is treated like a missing token.
    sentences = next(iter(token_vocab[token_key].values()), None)
    if sentences is None:
        return None

    # Get similar tokens to compare with
    similar_tokens = get_similar_tokens(model, max_token, top_k=5,
        token_embedding=token_embed_mat[token_id])
    token_ids = [item[0] for item in similar_tokens]

    # Setup hook position
    hook_pos = get_hook_position(f'{layer_index}-{layer_type}')

    # Initialize trackers for maximum changes
    max_abs_drop = _new_change_tracker()
    max_percent_drop = _new_change_tracker(with_percent=True)
    max_abs_increase = _new_change_tracker()
    max_percent_increase = _new_change_tracker(with_percent=True)

    # Test each sentence
    for sentence in sentences:
        # Get baseline prediction similarities
        raw_top10_tokens, raw_top10_probs = run_with_raw_model(model, sentence)
        raw_sim = get_overall_similarity(
            token_embed_mat,
            raw_top10_tokens,
            token_ids,
            raw_top10_probs
        )

        # Test each scale value
        for scale in scales:
            # Get prediction similarities under attack
            attacked_top10_tokens, attacked_top10_probs = run_with_hooked_model(
                model, sentence, hook_pos, neuron_id, scale)
            attacked_sim = get_overall_similarity(
                token_embed_mat,
                attacked_top10_tokens,
                token_ids,
                attacked_top10_probs
            )

            # Calculate changes; the relative change is only defined for a
            # positive baseline and is reported as 0 otherwise.
            sim_change = attacked_sim - raw_sim
            change_percent = (sim_change / raw_sim) * 100 if raw_sim > 0 else 0
            observation = (sim_change, change_percent, scale, sentence, raw_sim, attacked_sim)

            if sim_change < 0:
                if abs(sim_change) > abs(max_abs_drop['value']):
                    _record_change(max_abs_drop, *observation)
                if abs(change_percent) > abs(max_percent_drop['percent']):
                    _record_change(max_percent_drop, *observation)
            elif sim_change > 0:
                if sim_change > max_abs_increase['value']:
                    _record_change(max_abs_increase, *observation)
                if change_percent > max_percent_increase['percent']:
                    _record_change(max_percent_increase, *observation)

    # Return all results
    return {
        'feature_id': feature_id,
        'max_token': max_token,
        'max_abs_drop': max_abs_drop,
        'max_percent_drop': max_percent_drop,
        'max_abs_increase': max_abs_increase,
        'max_percent_increase': max_percent_increase
    }

## Pythia Test

In [3]:
# Example sentences for the Pythia vocabulary. Read explicitly as UTF-8 so the
# result does not depend on the platform's default text encoding.
with open('./corpus/pythia_vocabulary_sentences.json', 'r', encoding='utf-8') as f:
    pythia_vocab = json.load(f)

In [4]:
# Load the Pythia-70M (deduped) checkpoint on the CPU and switch it to
# evaluation mode; `eval()` returns the module itself, so it can be chained.
pythia = HookedTransformer.from_pretrained(
    model_name='EleutherAI/pythia-70m-deduped',
    device='cpu',
    local_files_only=True,
).eval()
# Ends the cell with a blank line of output.
print()

The `GPTNeoXSdpaAttention` class is deprecated in favor of simply modifying the `config._attn_implementation`attribute of the `GPTNeoXAttention` class! It will be removed in v4.48


Loaded pretrained model EleutherAI/pythia-70m-deduped into HookedTransformer



In [5]:
# Fixed seed so the randomly sampled neurons, clusters and features (and hence
# the saved results) are the same on every Restart & Run All.
PYTHIA_ATTACK_SEED = 0
# A neuron counts as aligned with a feature when its alignment value exceeds this.
ALIGNMENT_THRESHOLD = 0.2
# At most this many aligned clusters are tested per selected neuron.
MAX_TEST_CLUSTERS = 3

random.seed(PYTHIA_ATTACK_SEED)

pythia_test_results = defaultdict(lambda: defaultdict(dict))
pythia_vocab_mat = pythia.embed.W_E.detach()
pythia_semantic_clusters_path = './cluster_plot/pythia-exp'

for layer_type in tqdm(['att', 'mlp', 'res'], desc='layer type'):
    for layer_index in tqdm(range(6), desc=f'{layer_type} layers'):
        cluster_info_path = f'{layer_index}-{layer_type}-clusters.json'
        with open(os.path.join(pythia_semantic_clusters_path, cluster_info_path), 'r') as f:
            cluster_info = json.load(f)

        features_data = get_sae_features_by_layer('pythia', layer_type, layer_index)

        # Per neuron: number of (feature, neuron) alignments above the
        # threshold, and the set of clusters those features belong to.
        neuron_poly_degree = Counter()
        neuron_aligned_clusters = defaultdict(set)

        for cluster_id, cluster_data in cluster_info.items():
            for feature_id in cluster_data['feature_ids']:
                # Same guard as get_test_features: ignore clustered features
                # that have no SAE data instead of aborting the whole run.
                if feature_id not in features_data:
                    continue
                feature_data = features_data[feature_id]

                for ind, val in zip(
                    feature_data['neuron_alignment_indices'],
                    feature_data['neuron_alignment_values']
                ):
                    if val > ALIGNMENT_THRESHOLD:
                        neuron_poly_degree[ind] += 1
                        neuron_aligned_clusters[ind].add(cluster_id)

        # Plain dict ordered from most to fewest alignments.
        neuron_poly_degree = dict(neuron_poly_degree.most_common())

        test_neurons = get_test_neurons(neuron_poly_degree, neuron_aligned_clusters)

        for degree_range, neuron_info in test_neurons.items():
            neuron_id = neuron_info['neuron_id']
            # Sort before sampling: iteration order of a set of strings differs
            # between kernel sessions, which would defeat the seed.
            connected_clusters = sorted(neuron_aligned_clusters[neuron_id])

            if len(connected_clusters) > MAX_TEST_CLUSTERS:
                test_clusters = random.sample(connected_clusters, MAX_TEST_CLUSTERS)
            else:
                test_clusters = connected_clusters

            test_features = get_test_features(cluster_info, features_data, test_clusters)

            attack_results = []
            for feature_id in test_features:
                result = test_neuron_attack(
                    pythia, pythia_vocab,
                    pythia_vocab_mat,
                    features_data,
                    layer_type, layer_index,
                    neuron_id, feature_id
                )
                # None means the feature could not be tested.
                if result is not None:
                    attack_results.append(result)

            pythia_test_results[f"{layer_type}_{layer_index}"][degree_range] = {
                'neuron_id': neuron_id,
                'cluster_count': neuron_info['cluster_count'],
                'attack_results': attack_results
            }

with open('./pythia_neuron_attack_results.json', 'w') as f:
    json.dump(pythia_test_results, f, indent=2)

  0%|          | 0/3 [00:00<?, ?it/s]

  0%|          | 0/6 [00:00<?, ?it/s]

  0%|          | 0/6 [00:00<?, ?it/s]

  0%|          | 0/6 [00:00<?, ?it/s]

## GPT2 Test

In [6]:
# Load GPT-2 small on the CPU and switch it to evaluation mode.
gpt2 = HookedTransformer.from_pretrained(
    model_name='gpt2-small',
    device='cpu',
    local_files_only=True
)
gpt2.eval()

# Example sentences for the GPT-2 vocabulary. Read explicitly as UTF-8 so the
# result does not depend on the platform's default text encoding.
with open('./corpus/gpt2_vocabulary_sentences.json', 'r', encoding='utf-8') as f:
    gpt2_vocab = json.load(f)

Loaded pretrained model gpt2-small into HookedTransformer


In [7]:
# Fixed seed so the randomly sampled neurons, clusters and features (and hence
# the saved results) are the same on every Restart & Run All.
GPT2_ATTACK_SEED = 0
# A neuron counts as aligned with a feature when its alignment value exceeds this.
ALIGNMENT_THRESHOLD = 0.2
# At most this many aligned clusters are tested per selected neuron.
MAX_TEST_CLUSTERS = 3

random.seed(GPT2_ATTACK_SEED)

gpt2_test_results = defaultdict(lambda: defaultdict(dict))
gpt2_vocab_mat = gpt2.embed.W_E.detach()
gpt2_semantic_clusters_path = './cluster_plot/gpt2-exp'

for layer_type in tqdm(['att', 'res_mid', 'mlp', 'res_post'], desc='layer type'):
    for layer_index in tqdm(range(12), desc=f'{layer_type} layers'):

        cluster_info_path = f'{layer_index}-{layer_type}-clusters.json'
        with open(os.path.join(gpt2_semantic_clusters_path, cluster_info_path), 'r') as f:
            cluster_info = json.load(f)

        features_data = get_sae_features_by_layer('gpt2', layer_type, layer_index)

        # Per neuron: number of (feature, neuron) alignments above the
        # threshold, and the set of clusters those features belong to.
        neuron_poly_degree = Counter()
        neuron_aligned_clusters = defaultdict(set)

        for cluster_id, cluster_data in cluster_info.items():
            for feature_id in cluster_data['feature_ids']:
                # Same guard as get_test_features: ignore clustered features
                # that have no SAE data instead of aborting the whole run.
                if feature_id not in features_data:
                    continue
                feature_data = features_data[feature_id]

                for ind, val in zip(
                    feature_data['neuron_alignment_indices'],
                    feature_data['neuron_alignment_values']
                ):
                    if val > ALIGNMENT_THRESHOLD:
                        neuron_poly_degree[ind] += 1
                        neuron_aligned_clusters[ind].add(cluster_id)

        # Plain dict ordered from most to fewest alignments.
        neuron_poly_degree = dict(neuron_poly_degree.most_common())

        test_neurons = get_test_neurons(neuron_poly_degree, neuron_aligned_clusters)

        for degree_range, neuron_info in test_neurons.items():
            neuron_id = neuron_info['neuron_id']
            # Sort before sampling: iteration order of a set of strings differs
            # between kernel sessions, which would defeat the seed.
            connected_clusters = sorted(neuron_aligned_clusters[neuron_id])

            if len(connected_clusters) > MAX_TEST_CLUSTERS:
                test_clusters = random.sample(connected_clusters, MAX_TEST_CLUSTERS)
            else:
                test_clusters = connected_clusters

            test_features = get_test_features(cluster_info, features_data, test_clusters)

            attack_results = []
            for feature_id in test_features:
                result = test_neuron_attack(
                    gpt2, gpt2_vocab,
                    gpt2_vocab_mat,
                    features_data,
                    layer_type, layer_index,
                    neuron_id, feature_id
                )
                # None means the feature could not be tested.
                if result is not None:
                    attack_results.append(result)

            gpt2_test_results[f"{layer_type}_{layer_index}"][degree_range] = {
                'neuron_id': neuron_id,
                'cluster_count': neuron_info['cluster_count'],
                'attack_results': attack_results
            }

with open('./gpt2_neuron_attack_results.json', 'w') as f:
    json.dump(gpt2_test_results, f, indent=2)

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/12 [00:00<?, ?it/s]

  0%|          | 0/12 [00:00<?, ?it/s]

  0%|          | 0/12 [00:00<?, ?it/s]

  0%|          | 0/12 [00:00<?, ?it/s]