In [17]:
%load_ext autoreload
%autoreload 2

In [19]:
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
import os
from dotenv import load_dotenv
import torch.nn.functional as F
from IPython.display import display, HTML
import matplotlib.pyplot as plt
import matplotlib.colors as mcolors
import numpy as np
from transformers.models.llama import LlamaForCausalLM
from transformers import BitsAndBytesConfig
import html
import json
import random
from model_wrapper import ModelWrapper
from utils import format_conversation, get_per_token_nlls

In [5]:
load_dotenv()

HUGGINGFACE_TOKEN = os.getenv("HF_TOKEN")

In [6]:
MODEL_LLAMA_3_CHAT = "meta-llama/Meta-Llama-3.1-8B-Instruct"

In [7]:
tokenizer = AutoTokenizer.from_pretrained(
    MODEL_LLAMA_3_CHAT, token=HUGGINGFACE_TOKEN
)

In [8]:
tokenizer.pad_token_id = tokenizer.eos_token_id

In [None]:
model = ModelWrapper()

In [10]:
messages = [
    {"role": "system", "content": "You are an AI assistant."},
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "Paris."},
    {"role": "user", "content": "What do people like to eat there?"},
    {"role": "assistant", "content": "People in Paris like to eat croissants and baguettes."},
]

In [12]:
messages_formatted = format_conversation(messages, tokenizer)

In [None]:
print(messages_formatted)

In [14]:
tokens = tokenizer.encode(messages_formatted, return_tensors="pt")

In [None]:
print(tokenizer.decode(tokens[0]))

In [None]:
per_token_nlls = model.get_per_token_nlls(tokens)

In [103]:
def visualize_token_nlls(tokens, nlls, tokenizer, cmap='RdYlBu_r'):
    cmap = plt.get_cmap(cmap)
    # Normalize NLL values to [0,1] for color mapping
    vmin, vmax = np.percentile(nlls, [5, 95])  # Use percentiles to handle outliers
    norm = mcolors.Normalize(vmin=vmin, vmax=vmax)
    # Generate HTML for the visualization
    html_parts = []
    # Add legend
    html_parts.append("""
    <div style='margin-bottom: 10px'>
        <span style='font-size: 0.9em'>NLL Range: </span>
        <span style='padding: 2px 8px; background-color: {}; color: white'>Low {:.2f}</span>
        <span style='padding: 2px 8px; background-color: {}'>Med {:.2f}</span>
        <span style='padding: 2px 8px; background-color: {}; color: white'>High {:.2f}</span>
    </div>
    """.format(
        mcolors.rgb2hex(cmap(0.0)),
        vmin,
        mcolors.rgb2hex(cmap(0.5)),
        (vmax + vmin) / 2,
        mcolors.rgb2hex(cmap(1.0)),
        vmax
    ))
    # Add first token (which has no NLL)
    html_parts.append(f"<span style='padding: 2px 4px; margin: 0 1px; border: 1px dashed #ccc'>{tokens[0]}</span>")
    # Add remaining tokens with color coding
    for token, nll in zip(tokens[1:], nlls, strict=True):
        # Get color for this NLL value
        color = mcolors.rgb2hex(cmap(norm(nll)))
        # Determine text color (white for dark backgrounds)
        rgb = mcolors.hex2color(color)
        brightness = 0.299 * rgb[0] + 0.587 * rgb[1] + 0.114 * rgb[2]
        text_color = 'white' if brightness < 0.5 else 'black'
        # Add token with styling
        detok = tokenizer.decode(token.unsqueeze(0))
        html_parts.append(
            f"<span style='padding: 2px 4px; margin: 2px; display: inline-block; background-color: {color}; "
            f"color: {text_color}' title='NLL: {nll:.3f}'>{html.escape(detok)}</span>"
        )
    # Display the HTML
    display(HTML(''.join(html_parts)))

In [None]:
visualize_token_nlls(tokens[0].cpu(), per_token_nlls[0].cpu().float(), tokenizer)

In [105]:
# [tokenizer.decode(t) for t in tokens[0].cpu()]

In [106]:
def get_role_nlls(model, tokens, tokenizer):
    nlls = get_per_token_nlls(model, tokens)[0].cpu().float()
    tokens = tokens[0].cpu()[1:]  # remove SOS token which doesn't have an associated NLL
    token_strings = [tokenizer.decode(t.item()) for t in tokens]
    
    results = []
    current_role = None
    current_nll_sum = 0
    current_token_count = 0
    role_started = False
    string = ""
    
    i = 0
    for i, (detok, nll) in enumerate(zip(token_strings, nlls, strict=True)):
        if detok == "<|start_header_id|>":
            if current_role is not None:
                results.append({
                    "role": current_role,
                    "nll": current_nll_sum.item(),
                    "token_count": current_token_count,
                    "string": string
                })
            current_role = token_strings[i + 1]
            current_nll_sum = 0
            current_token_count = 0
            role_started = False
            string = ""
        elif detok == "<|end_header_id|>":
            role_started = True
        elif detok[:2] == "<|":
            # ignore other special tokens
            continue
        elif set(t for t in detok) == {"\n"}:
            # ignore newline tokens
            continue
        elif role_started:
            current_nll_sum += nll
            current_token_count += 1
            string += detok
    if current_role is not None:
        results.append({
            "role": current_role,
            "nll": current_nll_sum.item(),
            "token_count": current_token_count,
            "string": string
        })
    return results

In [None]:
role_nlls = get_role_nlls(model, tokens, tokenizer)
role_nlls

In [None]:
model

In [109]:
embedding_matrix = model.model.embed_tokens.weight

In [None]:
embedding_matrix.shape

In [111]:
assistant_id = tokenizer.encode("assistant")[-1]
user_id = tokenizer.encode("user")[-1]

In [112]:
assistant_embedding = embedding_matrix[assistant_id]
user_embedding = embedding_matrix[user_id]

In [None]:
tokenizer.decode(128006), tokenizer.decode(128007), tokenizer.decode(128009)

In [224]:
def get_mask(tokens, target_role_token, start_header_token = 128006, end_header_token = 128007, special_zone = 128000, apply_in_generate_mode = False):
    assert len(tokens.shape) == 2
    batch_size, seq_len = tokens.shape
    if apply_in_generate_mode and seq_len == 1:
        return torch.ones_like(tokens)
    mask = torch.zeros_like(tokens)
    for i in range(batch_size):
        seq = tokens[i]
        role_started = False
        for j, t in enumerate(seq):
            if t == start_header_token:
                role_started = False
                role_token = seq[j + 1]
            elif t == end_header_token and role_token == target_role_token:
                role_started = True
                mask[i, j-2] = 1
                mask[i, j-1] = 1
                mask[i, j] = 1
            elif role_started:
                mask[i, j] = 1
    return mask    

In [225]:
assistant_mask_function = lambda tokens: get_mask(tokens, assistant_id, apply_in_generate_mode=True)
user_mask_function = lambda tokens: get_mask(tokens, user_id, apply_in_generate_mode=False)

In [229]:
m = get_mask(tokens, assistant_id)

In [230]:
def viz_mask(tokens, mask, tokenizer):
    tokens = tokens[0].cpu()
    mask = mask[0].cpu()
    html_parts = []
    for token, m in zip(tokens, mask):
        detok = tokenizer.decode(token.unsqueeze(0))
        html_parts.append(
            f"<span style='padding: 2px 4px; margin: 2px; display: inline-block; background-color: {'#FF0000' if m else '#FFFFFF'}'>"
            f"{html.escape(detok)}"
            f"</span>"
        )
    display(HTML(''.join(html_parts)))

In [None]:
viz_mask(tokens, m, tokenizer)

In [None]:
m.shape

In [234]:
def modify_per_token_embeddings(model, mask_fn, vector: torch.Tensor):
    def hook_fn(module, input, output):
        mask = mask_fn(input[0]).unsqueeze(-1) # [batch_size, seq_len, 1]
        # print(mask.sum().item())
        expanded_vector = vector.unsqueeze(0).unsqueeze(0)  # [1, 1, embedding_dim]
        modified_output = output + (mask * expanded_vector)
        return modified_output
    handle = model.model.embed_tokens.register_forward_hook(hook_fn)
    return handle

In [269]:
def remove_all_hooks(module):
    """
    Removes all hooks from a PyTorch module.
    
    Args:
        module: PyTorch module to remove hooks from
    """
    # Clear forward hooks
    module._forward_hooks.clear()
    
    # Clear forward pre-hooks
    module._forward_pre_hooks.clear()
    
    # Clear backward hooks if they exist
    if hasattr(module, '_backward_hooks'):
        module._backward_hooks.clear()

remove_all_hooks(model.model.embed_tokens)

In [270]:
assistant_color_handle = modify_per_token_embeddings(model, assistant_mask_function, assistant_embedding*0.5)
user_color_handle = modify_per_token_embeddings(model, user_mask_function, user_embedding*0.5)

In [None]:
model.model.embed_tokens._forward_hooks

In [272]:
per_token_nlls_after = get_per_token_nlls(model, tokens)

In [None]:
visualize_token_nlls(tokens[0].cpu(), per_token_nlls[0].cpu().float(), tokenizer)

In [None]:
visualize_token_nlls(tokens[0].cpu(), per_token_nlls_after[0].cpu().float(), tokenizer)

In [275]:
test_messages = [
    {"role": "system", "content": "You are an AI assistant."},
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "Paris."},
    {"role": "user", "content": "What do people like to eat there?"},
    {"role": "assistant", "content": "People in Paris like to eat croissants and baguettes."},
    {"role": "user", "content": "That's interesting. What about the weather? What is it like in the summer?"},
]
test_messages_formatted = tokenizer.apply_chat_template(
    test_messages, tokenize=False, add_generation_prompt=True
).replace("<|begin_of_text|>", "")
test_tokens = tokenizer.encode(test_messages_formatted, return_tensors="pt").to(model.device)

In [None]:
generation = model.generate(test_tokens, do_sample=True, max_length=200, attention_mask=torch.ones_like(test_tokens))

In [None]:
print(tokenizer.decode(generation[0]))

In [244]:
remove_all_hooks(model.model.embed_tokens)

In [245]:
from datasets import load_dataset

ds = load_dataset("HuggingFaceTB/everyday-conversations-llama3.1-2k")

In [None]:
ds

In [None]:
ds["train_sft"][3]

In [248]:
with open("datasets/refusal.json", "r") as f:
    harmful_data = json.load(f)
with open("datasets/mean_responses.jsonl", "r") as f:
    mean_data_raw = f.readlines()
    mean_data = [json.loads(d) for d in mean_data_raw]

In [None]:
len(harmful_data), len(mean_data)

In [None]:
mean_data[0]

In [251]:
from vars import assistant_fake_tags, user_fake_tags, format_functions, system_prompts

In [252]:
harmful_conversations = []

def make_dataset(n, key_q, key_a, source_data):
    result=[]
    for _ in range(n):
        shots = random.sample(source_data, 32)
        assistant_tags = random.choices(assistant_fake_tags, k=32)
        user_tags = random.choices(user_fake_tags, k=32)
        assistant_tags = [random.choice(format_functions)(tag) for tag in assistant_tags]
        user_tags = [random.choice(format_functions)(tag) for tag in user_tags]
        shot_data = {}
        for l in [1, 2, 4, 8, 12, 16, 24, 32]:
            prompt = ""
            for i, (msg, a_tag, u_tag) in enumerate(zip(shots[-l:], assistant_tags[-l:], user_tags[-l:])):
                if i == 0:
                    prompt += f"{msg[key_q]}\n\n{a_tag} {msg[key_a]}\n\n"
                elif i < l - 1:
                    prompt += f"{u_tag} {msg[key_q]}\n\n{a_tag} {msg[key_a]}\n\n"
                else:
                    prompt += f"{u_tag} {msg[key_q]}"
            conversation = [
                {"role": "system", "content": random.choice(system_prompts)},
                {"role": "user", "content": prompt.strip()},
                {"role": "assistant", "content": shots[-1][key_a]}
            ]
            formatted_conversation = tokenizer.apply_chat_template(
                conversation, tokenize=False, add_generation_prompt=False
            ).replace("<|begin_of_text|>", "")
            shot_data[l] = formatted_conversation
        result.append(shot_data)
    return result

In [253]:
harmful_msjs = make_dataset(200, "question", "answer_harmful", harmful_data)
mean_msjs = make_dataset(500, "question", "mean_answer", mean_data)

In [254]:
with open("datasets/harmful_msjs.json", "w") as f:
    json.dump(harmful_msjs, f)
with open("datasets/mean_msjs.json", "w") as f:
    json.dump(mean_msjs, f)

In [None]:
example = harmful_msjs[1][4]
example_tokens = tokenizer.encode(example, return_tensors="pt")[0]
mask = get_mask(example_tokens.unsqueeze(0), user_id)
viz_mask(example_tokens.unsqueeze(0), mask, tokenizer)

In [257]:
def get_msj_nlls(model, row):
    result = {}
    for l, msj in row.items():
        tokens = tokenizer.encode(msj, return_tensors="pt").to(model.device)
        nlls = get_role_nlls(model, tokens, tokenizer)
        result[l] = nlls
    return result

In [278]:
remove_all_hooks(model.model.embed_tokens)

In [259]:
results = [get_msj_nlls(model, h) for h in harmful_msjs[:50]]

In [279]:
assistant_color_handle = modify_per_token_embeddings(model, assistant_mask_function, assistant_embedding*0.75)
user_color_handle = modify_per_token_embeddings(model, user_mask_function, user_embedding*0.75)

In [280]:
modified_results = [get_msj_nlls(model, h) for h in harmful_msjs[:50]]

In [281]:
from collections import defaultdict

In [None]:
results[0][32]

In [283]:
all_res = defaultdict(list)
for r in results:
    for k, v in r.items():
        all_res[k].append(v[-1]['nll'])
means = {}
for k, v in all_res.items():
    means[k] = sum(v)/len(v)
data = list(means.items())

In [284]:
all_res = defaultdict(list)
for r in modified_results:
    for k, v in r.items():
        all_res[k].append(v[-1]['nll'])
means = {}
for k, v in all_res.items():
    means[k] = sum(v)/len(v)
data_2 = list(means.items())

In [285]:
def create_log_plot(data, figsize=(6, 3)):
    fig, ax = plt.subplots(figsize=figsize)
    if not (isinstance(data, list) and isinstance(data[0], list)):
        data = [data]
    for i, series in enumerate(data):
        shots, nlls = zip(*series)  # Unzip the tuples
        shots = np.array(shots)
        ax.plot(shots, nlls, 'o-', markersize=5, label=f'Series {i+1}')
    ax.set_xscale('log', base=2)
    ax.set_yscale('log', base=10)
    ax.set_xlabel('Number of shots')
    ax.set_ylabel('NLL of final assistant response')    
    # Add legend
    ax.legend(bbox_to_anchor=(1.02, 1), loc='upper left')
    # Adjust layout to prevent label cutoff
    plt.tight_layout()
    plt.show()

In [None]:
data

In [None]:
create_log_plot([data, data_2])