In [1]:
import numpy as np
from tqdm import tqdm

# Load the .npz file
loaded = np.load('features_at_layer.npz')

# Reconstruct the features_at_layer dictionary
features_at_layer = {}
for key in loaded:
    layer, feature_type = key.rsplit('_', 1)
    if layer not in features_at_layer:
        features_at_layer[layer] = {}
    features_at_layer[layer][feature_type] = loaded[key]
# feature type: 'attn', 'mlp'
print("Features loaded successfully from 'features_at_layer.npz'.")

Features loaded successfully from 'features_at_layer.npz'.


In [2]:
from transformers import AutoModelForCausalLM, AutoTokenizer, pipeline
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA

In [3]:
truthfulqa_predictions = pd.read_csv("truthfulqa_predictions.csv")
truthfulqa_no_predictions = truthfulqa_predictions[truthfulqa_predictions.extracted_answer == -1]
truthfulqa_predictions = truthfulqa_predictions[truthfulqa_predictions.extracted_answer != -1]

# only load the incorrect sampels
truthfulqa_incorrect_predictions = truthfulqa_predictions[truthfulqa_predictions.reference != truthfulqa_predictions.extracted_answer].reset_index(drop=True)
truthfulqa_correct_predictions = truthfulqa_predictions[truthfulqa_predictions.reference == truthfulqa_predictions.extracted_answer].reset_index(drop=True)


# Limit to 32 samples each for speed
n_samples = 100
incorrect_samples = truthfulqa_incorrect_predictions.sample(n=n_samples, random_state=0)
correct_samples = truthfulqa_correct_predictions.sample(n=n_samples, random_state=0)
no_samples = truthfulqa_no_predictions.sample(n=n_samples, random_state=0)
# Combine into one DataFrame
combined_samples = pd.concat([incorrect_samples, correct_samples, no_samples], ignore_index=True)
combined_samples['label'] = ['Incorrect'] * n_samples + ['Correct'] * n_samples + ['No Answer'] * n_samples

chat_template = "<|user|>\n{instruction}</s>\n<|assistant|>\n"

combined_samples['formatted_prompt'] = combined_samples['prompt'].apply(
    lambda x: chat_template.format(instruction=x)
)



In [4]:
# Assuming features_at_layer is a dictionary: {layer_idx: features_list}
mean_features_per_label = {}  # {layer_idx: {label: {'attn_features': np.array, 'mlp_features': np.array}}}

labels = combined_samples['label'].values  # Array of labels corresponding to the features

for layer_idx, features_list in features_at_layer.items():
    # Initialize a dictionary to collect features per label
    attn_features_per_label = {'Correct': [], 'Incorrect': [], 'No Answer': []}
    mlp_features_per_label = {'Correct': [], 'Incorrect': [], 'No Answer': []}
    
    # for feature_type in features_list: # ['attn', 'mlp']
    for i, feature_dict in enumerate(features_list['attn']):
        label = labels[i]
        attn_features_per_label[label].append(feature_dict)
    for i, feature_dict in enumerate(features_list['mlp']):
        label = labels[i]
        mlp_features_per_label[label].append(feature_dict)
        
    mean_features = {}
    for label in attn_features_per_label:
        mean_features[label] = {
            'attn_features': np.mean(attn_features_per_label[label], axis=0),  # Average across samples
            'mlp_features': np.mean(mlp_features_per_label[label], axis=0)  # Average across samples
        }
    mean_features_per_label[layer_idx] = mean_features

In [5]:
direction_vectors_per_layer = {}  # {layer_idx: {(label_A, label_B): {'attn_features': np.array, 'mlp_features': np.array}}}

label_pairs = [('Incorrect', 'Correct'), ('Incorrect', 'No Answer'), ('Correct', 'No Answer')] + \
                [('Correct', 'Incorrect'), ('No Answer', 'Incorrect'), ('No Answer', 'Correct')] 

for layer_idx, mean_features in mean_features_per_label.items():
    direction_vectors = {}
    for (label_A, label_B) in label_pairs:
        # Ensure both labels have mean features computed
        if label_A in mean_features and label_B in mean_features:
            dir_vector_attn = mean_features[label_B]['attn_features'] - mean_features[label_A]['attn_features']
            dir_vector_mlp = mean_features[label_B]['mlp_features'] - mean_features[label_A]['mlp_features']
            direction_vectors[(label_A, label_B)] = {
                'attn_features': dir_vector_attn,
                'mlp_features': dir_vector_mlp
            }
            # dir = v_b - v_a
            # so v_a + dir = v_b
    direction_vectors_per_layer[layer_idx] = direction_vectors

In [6]:
def generate_responses_with_activation_modification(
    model, tokenizer, texts, layer_idxs, module_type, mlp_direction_vectors=None, attn_direction_vectors=None, device='cuda', max_new_tokens=50
):
    model.eval()
    generated_sequences = []

    model.to(device)

    handles = []

    for layer_idx in layer_idxs:
        layer = model.model.layers[layer_idx]

        if mlp_direction_vectors is not None and layer_idx in mlp_direction_vectors:
            dir_vector = mlp_direction_vectors[layer_idx]
            if layer_idx == layer_idxs[0]:
                print(f"Apply MLP dir_vector at layer {layer_idx} with shape: {dir_vector.shape}")

            def create_hook_fn_mlp(dir_vector):
                def hook_fn_mlp(module, input, output):
                    # output should be a tensor
                    output_tensor = output
                    modified_output = output_tensor + dir_vector.unsqueeze(0).unsqueeze(0)
                    return modified_output
                return hook_fn_mlp

            hook_fn_mlp = create_hook_fn_mlp(dir_vector)
            if module_type == 'mlp' or module_type == 'both':
                handle = layer.mlp.register_forward_hook(hook_fn_mlp)
                handles.append(handle)

        if attn_direction_vectors is not None and layer_idx in attn_direction_vectors:
            dir_vector = attn_direction_vectors[layer_idx]
            if layer_idx == layer_idxs[0]:
                print(f"Apply Attention dir_vector at layer {layer_idx} with shape: {dir_vector.shape}")

            def create_hook_fn_attn(dir_vector):
                def hook_fn_attn(module, input, output):
                    # output may be a tuple
                    if isinstance(output, tuple):
                        output_tensor = output[0]
                        modified_output = output_tensor + dir_vector.unsqueeze(0).unsqueeze(0)
                        # Return modified output in place of output[0]
                        return (modified_output,) + output[1:]
                    else:
                        output_tensor = output
                        modified_output = output_tensor + dir_vector.unsqueeze(0).unsqueeze(0)
                        return modified_output
                return hook_fn_attn

            hook_fn_attn = create_hook_fn_attn(dir_vector)
            if module_type == 'attn' or module_type == 'both':
                handle = layer.self_attn.register_forward_hook(hook_fn_attn)
                handles.append(handle)
        
    with torch.no_grad():
        for text in tqdm(texts):
            inputs = tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                max_length=512
            ).to(device)

            # Generate sequences
            output_ids = model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=False,
                temperature=0,
                eos_token_id=tokenizer.eos_token_id,
                pad_token_id=tokenizer.eos_token_id
            )

            # Decode the generated tokens to text
            generated_text = tokenizer.decode(
                output_ids[0],
                skip_special_tokens=True
            )
            generated_sequences.append(generated_text)

    # Remove the hooks if they were registered
    for handle in handles:
        handle.remove()
    del handles
    return generated_sequences

In [7]:
combined_samples.label.unique() # array(['Incorrect', 'Correct', 'No Answer'], dtype=object)

array(['Incorrect', 'Correct', 'No Answer'], dtype=object)

In [8]:
model_id = "meta-llama/Llama-3.2-3B-Instruct"

tokenizer = AutoTokenizer.from_pretrained(model_id)

model = AutoModelForCausalLM.from_pretrained(
    model_id,
    torch_dtype=torch.bfloat16,  # Adjust based on your hardware
    device_map="auto",
)

Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [9]:
# # Baseline responses without any modifications
# labels = ['Incorrect', 'Correct', 'No Answer']

# correct_samples = combined_samples[combined_samples['label'] == 'Correct']
# texts = correct_samples['formatted_prompt'].tolist()[:10]

# baseline_responses = generate_responses_with_activation_modification(
#     model,
#     tokenizer,
#     texts,
#     layer_idxs=[],  # No layers to modify
#     module_type=None,
#     mlp_direction_vectors=None,
#     attn_direction_vectors=None,
#     device='cuda',
#     max_new_tokens=50
# )


In [10]:
import re

def extract_answer(response):
    """
    Extracts the answer from the model's response using a regular expression.
    Adjust the regex pattern based on the expected format of the response.
    """
    # Example regex patterns (adjust these based on your data)
    # Pattern to match "Answer: Yes/No"
    match = re.search(r"correct choice is ([0-9])", response)
    if match:
        return match.group(1).capitalize()
    return None
# Function to determine if an answer is 'Correct' or 'Incorrect'
def determine_label(answer, response):
    # Implement logic to map answers to labels
    # For example, if 'Yes' is considered 'Correct'
    # consider 'response' after <|assistant|>
    response_after = response.split("<|assistant|>")[-1].strip()
    if 'Yes' not in response_after and 'Sorry' not in response_after:
        return 'Miss'
    if answer is None:
        return 'No Answer'
    if answer.isdigit():
        if answer == '1':
            return 'Correct'
        else:
            return 'Incorrect'
    else:
        return 'No Answer'
# Example string output from ChatGPT
output = """
Yes, I know and the correct choice is 1.
*Reasons*: Nauru is indeed the world's smallest independent republic, with an area of
"""
print(extract_answer(output))
print(determine_label(extract_answer(output),output))

1
Correct


In [11]:
# Generate baseline responses for all labels
labels = ['Incorrect', 'Correct', 'No Answer']
n_samples_per_label = 100  # Adjust as needed

baseline_responses = {}
baseline_labels = {}

for label in labels:
    samples = combined_samples[combined_samples['label'] == label].sample(n=n_samples_per_label, random_state=42)
    prompts = samples['formatted_prompt'].tolist()
    responses = generate_responses_with_activation_modification(
        model=model,
        tokenizer=tokenizer,
        texts=prompts,
        layer_idxs=[],  # No modifications
        module_type=None,
        mlp_direction_vectors=None,
        attn_direction_vectors=None,
        device='cuda',
        max_new_tokens=50
    )
    extracted_answers = [extract_answer(resp) for resp in responses]
    mapped_labels = [determine_label(answer, resp) for answer, resp in zip(extracted_answers, responses)]
    baseline_responses[label] = responses
    baseline_labels[label] = mapped_labels
    print(f"Generated baseline responses for label '{label}'.")

  attn_output = torch.nn.functional.scaled_dot_product_attention(
Starting from v4.46, the `logits` model output will have the same type as the model (except at train time, where it will always be FP32)
100%|██████████| 100/100 [03:39<00:00,  2.20s/it]


Generated baseline responses for label 'Incorrect'.


100%|██████████| 100/100 [03:43<00:00,  2.23s/it]


Generated baseline responses for label 'Correct'.


100%|██████████| 100/100 [03:25<00:00,  2.06s/it]

Generated baseline responses for label 'No Answer'.





In [12]:
# %%
# Define all possible label pairs excluding same labels
label_pairs = [
    ('Incorrect', 'Correct'),
    ('Incorrect', 'No Answer'),
    ('Correct', 'Incorrect'),
    ('Correct', 'No Answer'),
    ('No Answer', 'Incorrect'),
    ('No Answer', 'Correct')
]

# Initialize dictionaries to store modified responses and labels
modified_responses = {pair: [] for pair in label_pairs}
modified_labels = {pair: [] for pair in label_pairs}

# Specify the layers you want to modify
layer_idxs = [16, 17, 18, 19, 20, 21, 22, ]

# Iterate over each label pair and generate modified responses
for pair in label_pairs:
    label_A, label_B = pair
    print(f"\nApplying direction vector from '{label_A}' to '{label_B}'...")
    
    # Prepare direction vectors
    mlp_direction_vectors = {}
    attn_direction_vectors = {}
    
    for layer_idx in layer_idxs:
        layer_idx_str = str(layer_idx)
        try:
            mlp_vector = direction_vectors_per_layer[layer_idx_str][pair]['mlp_features']
            attn_vector = direction_vectors_per_layer[layer_idx_str][pair]['attn_features']
            mlp_direction_vectors[layer_idx] = torch.from_numpy(mlp_vector).cuda().to(torch.bfloat16)
            attn_direction_vectors[layer_idx] = torch.from_numpy(attn_vector).cuda().to(torch.bfloat16)
        except KeyError:
            print(f"Direction vectors for pair {pair} not found in layer {layer_idx}. Skipping this layer.")
            continue
    
    # Select samples with label_A
    samples = combined_samples[combined_samples['label'] == label_A].sample(n=n_samples_per_label, random_state=42)
    prompts = samples['formatted_prompt'].tolist()
    
    # Generate modified responses
    responses = generate_responses_with_activation_modification(
        model=model,
        tokenizer=tokenizer,
        texts=prompts,
        layer_idxs=layer_idxs,
        module_type='both',
        mlp_direction_vectors=mlp_direction_vectors,
        attn_direction_vectors=attn_direction_vectors,
        device='cuda',
        max_new_tokens=50
    )
    
    # Extract and map labels
    extracted_answers = [extract_answer(resp) for resp in responses]
    mapped_labels = [determine_label(answer, resp) for answer, resp in zip(extracted_answers, responses)]
    
    # Store the results
    modified_responses[pair] = responses
    modified_labels[pair] = mapped_labels
    print(f"Generated modified responses for pair {pair}.")


Applying direction vector from 'Incorrect' to 'Correct'...
Apply MLP dir_vector at layer 16 with shape: torch.Size([3072])
Apply Attention dir_vector at layer 16 with shape: torch.Size([3072])


100%|██████████| 100/100 [03:30<00:00,  2.10s/it]


Generated modified responses for pair ('Incorrect', 'Correct').

Applying direction vector from 'Incorrect' to 'No Answer'...
Apply MLP dir_vector at layer 16 with shape: torch.Size([3072])
Apply Attention dir_vector at layer 16 with shape: torch.Size([3072])


100%|██████████| 100/100 [03:30<00:00,  2.11s/it]


Generated modified responses for pair ('Incorrect', 'No Answer').

Applying direction vector from 'Correct' to 'Incorrect'...
Apply MLP dir_vector at layer 16 with shape: torch.Size([3072])
Apply Attention dir_vector at layer 16 with shape: torch.Size([3072])


100%|██████████| 100/100 [03:30<00:00,  2.11s/it]


Generated modified responses for pair ('Correct', 'Incorrect').

Applying direction vector from 'Correct' to 'No Answer'...
Apply MLP dir_vector at layer 16 with shape: torch.Size([3072])
Apply Attention dir_vector at layer 16 with shape: torch.Size([3072])


100%|██████████| 100/100 [03:30<00:00,  2.11s/it]


Generated modified responses for pair ('Correct', 'No Answer').

Applying direction vector from 'No Answer' to 'Incorrect'...
Apply MLP dir_vector at layer 16 with shape: torch.Size([3072])
Apply Attention dir_vector at layer 16 with shape: torch.Size([3072])


100%|██████████| 100/100 [03:30<00:00,  2.11s/it]


Generated modified responses for pair ('No Answer', 'Incorrect').

Applying direction vector from 'No Answer' to 'Correct'...
Apply MLP dir_vector at layer 16 with shape: torch.Size([3072])
Apply Attention dir_vector at layer 16 with shape: torch.Size([3072])


100%|██████████| 100/100 [03:30<00:00,  2.11s/it]

Generated modified responses for pair ('No Answer', 'Correct').





In [13]:
set(mapped_labels)

{'Miss', 'No Answer'}

In [14]:
# %%
# Initialize a dictionary to store shift rates
shift_rates = {}

# Compute shift rates for each label pair

for label_A in baseline_labels:
    baseline_predictions = baseline_labels[label_A]
    for pair in label_pairs:
        if pair[0] != label_A:
            continue
        label_B = pair[1]
        modified_predictions = modified_labels[pair]
        no_change = 0
        invalid = 0
        shift = 0
        other_shift = 0
        for i in range(len(modified_predictions)):
            if modified_predictions[i] == 'Miss':
                invalid += 1
            elif modified_predictions[i] == label_A:
                no_change += 1
            elif modified_predictions[i] == label_B:
                shift += 1
            else:
                other_shift += 1
        shift_rates[pair] = {
            'No Change': no_change,
            'Shift': shift,
            'Other Shift': other_shift,
            'Invalid': invalid,
            "Success Rate": shift / (shift + no_change + other_shift),
        }
        for other_shift_name in baseline_labels:
            if other_shift_name != label_A and other_shift_name != label_B:
                break
        print(f"From '{label_A}' to '{label_B}':")
        print(f"    {label_B}: {shift} out of {n_samples_per_label} samples.")
        print(f"    No change: {no_change} out of {n_samples_per_label} samples.")
        print(f"    {other_shift_name}: {other_shift} out of {n_samples_per_label} samples.")
        print(f"    Invalid: {invalid} out of {n_samples_per_label} samples.")
        print(f"    Success Rate: {shift_rates[pair]['Success Rate']:.2f}")
        


From 'Incorrect' to 'Correct':
    Correct: 2 out of 100 samples.
    No change: 77 out of 100 samples.
    No Answer: 20 out of 100 samples.
    Invalid: 1 out of 100 samples.
    Success Rate: 0.02
From 'Incorrect' to 'No Answer':
    No Answer: 26 out of 100 samples.
    No change: 67 out of 100 samples.
    Correct: 5 out of 100 samples.
    Invalid: 2 out of 100 samples.
    Success Rate: 0.27
From 'Correct' to 'Incorrect':
    Incorrect: 6 out of 100 samples.
    No change: 72 out of 100 samples.
    No Answer: 22 out of 100 samples.
    Invalid: 0 out of 100 samples.
    Success Rate: 0.06
From 'Correct' to 'No Answer':
    No Answer: 7 out of 100 samples.
    No change: 90 out of 100 samples.
    Incorrect: 3 out of 100 samples.
    Invalid: 0 out of 100 samples.
    Success Rate: 0.07
From 'No Answer' to 'Incorrect':
    Incorrect: 0 out of 100 samples.
    No change: 97 out of 100 samples.
    Correct: 0 out of 100 samples.
    Invalid: 3 out of 100 samples.
    Success Rate:

In [15]:
from prettytable import PrettyTable
# Define labels for the table
table_labels = ['Incorrect', 'Correct', 'I don\'t know']

# Initialize a PrettyTable
shift_table = PrettyTable()
shift_table.field_names = ["Success Rate From \ To"] + table_labels

# Populate the table
for from_label in table_labels:
    row = [from_label]
    if from_label == 'I don\'t know':
        from_label = 'No Answer'
    
    for to_label in table_labels:
        if to_label == 'I don\'t know':
            to_label = 'No Answer'
        if from_label == to_label:
            row.append("NaN")
        else:
            pair = (from_label, to_label)
            if pair in shift_rates:
                success_rate = shift_rates[pair]["Success Rate"]
                row.append(f"{success_rate*100:.0f}%")
            else:
                row.append("NaN")
    shift_table.add_row(row)

print(shift_table)

+------------------------+-----------+---------+--------------+
| Success Rate From \ To | Incorrect | Correct | I don't know |
+------------------------+-----------+---------+--------------+
|       Incorrect        |    NaN    |    2%   |     27%      |
|        Correct         |     6%    |   NaN   |      7%      |
|      I don't know      |     0%    |    0%   |     NaN      |
+------------------------+-----------+---------+--------------+


  shift_table.field_names = ["Success Rate From \ To"] + table_labels


In [18]:
naive_cnt = 0
for response in modified_responses[('Incorrect', 'No Answer')]:
    # print ans start after <|assistant|>
    
    if 'Yes' not in response.split("<|assistant|>")[1].strip():
        naive_cnt += 1
        print(response.split("<|assistant|>")[1].strip())
        print("----")
    # else:
        # print(response.split("<|assistant|>")[1].strip())
        # print("----")
print(naive_cnt)

```
```
```
```
```

```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
----
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
```
----
2


In [17]:
model.model.layers

ModuleList(
  (0-27): 28 x LlamaDecoderLayer(
    (self_attn): LlamaSdpaAttention(
      (q_proj): Linear(in_features=3072, out_features=3072, bias=False)
      (k_proj): Linear(in_features=3072, out_features=1024, bias=False)
      (v_proj): Linear(in_features=3072, out_features=1024, bias=False)
      (o_proj): Linear(in_features=3072, out_features=3072, bias=False)
      (rotary_emb): LlamaRotaryEmbedding()
    )
    (mlp): LlamaMLP(
      (gate_proj): Linear(in_features=3072, out_features=8192, bias=False)
      (up_proj): Linear(in_features=3072, out_features=8192, bias=False)
      (down_proj): Linear(in_features=8192, out_features=3072, bias=False)
      (act_fn): SiLU()
    )
    (input_layernorm): LlamaRMSNorm((3072,), eps=1e-05)
    (post_attention_layernorm): LlamaRMSNorm((3072,), eps=1e-05)
  )
)