In [15]:
# %% ------------------------------------------------------------
# CONFIG & IMPORTS
# ---------------------------------------------------------------
from pathlib import Path
import torch, json, random, heapq, re
from typing import List, Dict, Tuple, Any, Dict
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft.tuners.lora import LoraLayer
from datasets import load_dataset, concatenate_datasets
from torch.utils.data import DataLoader
from tqdm.auto import tqdm
from collections import defaultdict

CHECKPOINT_DIR = Path("../experiments/llama_3.2_1B_sft/final_adapter")
LAYER_IDX      = 11          # zero-based transformer block to inspect
MAX_ROWS       = 10
BATCH_SIZE     = 10
TOP_K          = 50
DEVICE         = "mps" if torch.mps.is_available() else "cpu"

In [6]:
# %% ------------------------------------------------------------
# MODEL + TOKENIZER (LoRA already merged into final checkpoint)
# ---------------------------------------------------------------
tokenizer = AutoTokenizer.from_pretrained(
            CHECKPOINT_DIR,
            use_fast=True
        )

model = AutoModelForCausalLM.from_pretrained(
    CHECKPOINT_DIR,
    torch_dtype="auto",
    device_map="auto"
)
model.eval()

  warn("The installed version of bitsandbytes was compiled without GPU support. "


'NoneType' object has no attribute 'cadam32bit_grad_fp32'


LlamaForCausalLM(
  (model): LlamaModel(
    (embed_tokens): Embedding(128256, 2048)
    (layers): ModuleList(
      (0-15): 16 x LlamaDecoderLayer(
        (self_attn): LlamaAttention(
          (q_proj): lora.Linear(
            (base_layer): Linear(in_features=2048, out_features=2048, bias=False)
            (lora_dropout): ModuleDict(
              (default): Dropout(p=0.05, inplace=False)
            )
            (lora_A): ModuleDict(
              (default): Linear(in_features=2048, out_features=16, bias=False)
            )
            (lora_B): ModuleDict(
              (default): Linear(in_features=16, out_features=2048, bias=False)
            )
            (lora_embedding_A): ParameterDict()
            (lora_embedding_B): ParameterDict()
            (lora_magnitude_vector): ModuleDict()
          )
          (k_proj): lora.Linear(
            (base_layer): Linear(in_features=2048, out_features=512, bias=False)
            (lora_dropout): ModuleDict(
              (default)

In [7]:
import heapq
from collections import defaultdict
from typing import Dict, Tuple, List
import torch

LAYER_IDX = 11              # (whatever layer index you already use)

#  ➜ topk_store[module_name][neuron]["pos" | "neg"]  →  min-heap[List[tuple]]
#      tuple = (|val|, ex_idx, true_pos, raw_val)
topk_store: Dict[str, Dict[int, Dict[str, List[Tuple[float, int, int, float]]]]] = \
    defaultdict(lambda: defaultdict(lambda: {"pos": [], "neg": []}))

example_counter = 0        # global running index over all examples


In [8]:
import heapq, torch
from collections import defaultdict

# ---------------------------------------------------------------------------
# config:  how many neurons of A·x to keep per token
#          None  → keep all   |   int → keep that many Top-|activation|
AX_TOPK = 64          # e.g. 64   (set None for full vector)
# ---------------------------------------------------------------------------

def make_topk_hook(module_name: str, k_heap: int = TOP_K):
    """
    Build a forward hook for a PEFT lora.Linear layer that
      • computes h = A·x
      • optionally selects the Top-AX_TOPK neurons (|h|) per token
      • stores top-k_heap examples per (layer, neuron, sign) in `topk_store`.
    """
    def _hook(module, inp, _):
        ex_offset = CURRENT_EX_OFFSET     # global: dataset row index of batch[0]
        mask      = CURRENT_PAD_MASK      # (B, L) bool, True = real token

        # ---------- resolve which adapter key to use ----------------------
        adapter = module.active_adapter or next(iter(module.lora_A))
        if isinstance(adapter, (list, tuple)):
            adapter = adapter[0]

        # ---------- compute A·x -------------------------------------------
        x = inp[0]                        # (B, L, D_hidden)
        h = module.lora_A[adapter](x)     # (B, L, r)
        if h.ndim == 2:                   # some PEFT ops flatten B·L
            B, L = x.shape[:2]
            h = h.view(B, L, -1)

        B, L, R = h.shape
        if mask is None:
            mask = h.new_ones((B, L), dtype=torch.bool)

        # ---------- iterate over real tokens ------------------------------
        for b in range(B):
            ex_idx    = ex_offset + b
            valid_len = int(mask[b].sum())
            pad_left  = L - valid_len

            for pos in range(L):
                if not mask[b, pos]:
                    continue   # skip padding
                true_pos = pos - pad_left
                act_vec  = h[b, pos]                      # (R,)

                # --- select which neuron indices to store ----------------
                if AX_TOPK is None or AX_TOPK >= R:
                    store_idx = range(R)                  # keep all
                else:
                    store_idx = act_vec.abs().topk(AX_TOPK).indices.tolist()

                for n_idx in store_idx:
                    raw_val = act_vec[n_idx].item()
                    sign    = "pos" if raw_val >= 0 else "neg"
                    heap    = topk_store[module_name][n_idx][sign]

                    mag   = abs(raw_val)
                    item  = (mag, ex_idx, true_pos, raw_val)

                    if len(heap) < k_heap:
                        heapq.heappush(heap, item)
                    elif mag > heap[0][0]:
                        heapq.heapreplace(heap, item)

    return _hook


In [9]:
# locate LoRA blocks in the chosen layer
lora_modules = {
    name: mod
    for name, mod in model.named_modules()
    if f".{LAYER_IDX}." in name and isinstance(mod, LoraLayer)
}
for name, mod in lora_modules.items():
    mod.register_forward_hook(make_topk_hook(name))
print(f"✔ registered hooks for {len(lora_modules)} LoRA blocks")

✔ registered hooks for 5 LoRA blocks


In [10]:
# %% ------------------------------------------------------------
# HHRLHF DATASET → FLAT CHAT LISTS
# ---------------------------------------------------------------
DATASET_NAME = "Anthropic/hh-rlhf"
raw_ds = load_dataset(DATASET_NAME, split="train")

_TAG_RE   = re.compile(r"(Human|Assistant):")
_ROLE_MAP = {"Human": "user", "Assistant": "assistant"}

def hh_string_to_messages(text: str) -> List[Dict[str, str]]:
    parts, msgs = _TAG_RE.split(text), []
    for i in range(1, len(parts), 2):
        role, content = parts[i].strip(), parts[i+1].strip()
        if content:
            msgs.append({"role": _ROLE_MAP[role], "content": content})
    return msgs

def preprocess_to_messages(ex: Dict[str, Any]) -> Dict[str, Any]:
    return {
        "chosen":   hh_string_to_messages(ex["chosen"]),
        "rejected": hh_string_to_messages(ex["rejected"]),
    }

def violates_alternation(msgs: List[Dict[str,str]]) -> bool:
    if not msgs: return True
    if msgs[0]["role"] not in {"user", "system"}: return True
    for prev, curr in zip(msgs, msgs[1:]):
        if prev["role"] == curr["role"]: return True
        if prev["role"] in {"user","system"} and curr["role"]!="assistant": return True
        if prev["role"]=="assistant" and curr["role"] not in {"user","system"}: return True
    return False

def is_valid_dpo_pair(msgs):
    return len(msgs) >= 2 and msgs[-1]["role"] == "assistant"

# -- pipeline -----------------------------------------------------
msg_ds = raw_ds.map(preprocess_to_messages, remove_columns=raw_ds.column_names)

msg_ds = msg_ds.filter(lambda ex:
    not violates_alternation(ex["chosen"]) and
    not violates_alternation(ex["rejected"]) and
    is_valid_dpo_pair(ex["chosen"])        and
    is_valid_dpo_pair(ex["rejected"])
)

chosen_ds = (
    msg_ds.rename_column("chosen", "input")
          .remove_columns(["rejected"])
)
rejected_ds = (
    msg_ds.rename_column("rejected", "input")
          .remove_columns(["chosen"])
)
flat_ds = concatenate_datasets([chosen_ds, rejected_ds]).shuffle(seed=42)
print(f"Dataset size after flattening: {len(flat_ds):,}")


Map: 100%|██████████| 160800/160800 [00:06<00:00, 23419.70 examples/s]
Filter: 100%|██████████| 160800/160800 [00:04<00:00, 34048.23 examples/s]

Dataset size after flattening: 316,998





In [None]:
# %% ------------------------------------------------------------
# COLLATE FUNCTION & DATALOADER  ✅ fixed
# ---------------------------------------------------------------
def collate_chat(batch):
    dialogs = [ex["input"] for ex in batch]

    # `ids` is a tensor (B, L)
    ids = tokenizer.apply_chat_template(
        dialogs,
        add_generation_prompt=False,
        padding=True,            # pad to longest in *this* batch
        return_tensors="pt"
    )

    mask = (ids != tokenizer.pad_token_id).long()   # (B, L)
    print(mask)

    return {
        "input_ids":      ids.to(DEVICE),
        "attention_mask": mask.to(DEVICE)
    }

loader = DataLoader(
    flat_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=collate_chat,
    drop_last=False
)


In [13]:
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token

In [17]:
from itertools import islice
from tqdm import tqdm

MAX_ROWS            = 10         # ← how many batches you want to run
CURRENT_EX_OFFSET   = 0
CURRENT_PAD_MASK    = None           # will be set per batch

MAX_BATCHES = int(MAX_ROWS/BATCH_SIZE)

# take only the first MAX_ROWS batches
limited_loader = islice(loader, MAX_BATCHES)

for row_idx, batch in enumerate(tqdm(limited_loader, total=MAX_BATCHES), start=0):
    input_ids        = batch["input_ids"].to(DEVICE)
    CURRENT_PAD_MASK = batch["attention_mask"].to(torch.bool)
    B                = input_ids.size(0)

    _ = model(input_ids)             # hooks fire here

    CURRENT_PAD_MASK  = None         # clear reference
    CURRENT_EX_OFFSET += B


100%|██████████| 1/1 [00:46<00:00, 46.32s/it]


In [18]:
def collapse_heaps(k: int = TOP_K):
    """
    adapter → neuron → rank → (ex_idx, true_pos, raw_val)
    Positive ranks 1..k, negative ranks k+1..2k (both sorted by |val| desc).
    """
    out = {}
    for module, n_map in topk_store.items():
        n_out = {}
        for n_idx, heaps in n_map.items():
            combined = sorted(
                heaps["pos"] + heaps["neg"],
                key=lambda t: t[0],
                reverse=True
            )
            n_out[n_idx] = {
                rank + 1: (ex, pos, raw)
                for rank, (_, ex, pos, raw) in enumerate(combined)
            }
        out[module] = n_out
    return out

adapters_pos_map = collapse_heaps()


In [19]:
adapters_pos_map

{'model.layers.11.self_attn.q_proj': {0: {1: (1, -750, 1.6171875),
   2: (1, -750, 1.6171875),
   3: (1, -750, 1.6171875),
   4: (9, -360, 1.59375),
   5: (9, -360, 1.59375),
   6: (9, -360, 1.59375),
   7: (5, -857, 1.4609375),
   8: (5, -856, 1.4609375),
   9: (5, -857, 1.4609375),
   10: (5, -856, 1.4609375),
   11: (5, -856, 1.4609375),
   12: (5, -857, 1.4609375),
   13: (7, 401, 1.4453125),
   14: (4, -582, 1.4453125),
   15: (4, -582, 1.4453125),
   16: (7, 401, 1.4453125),
   17: (7, 401, 1.4453125),
   18: (4, -582, 1.4453125),
   19: (7, 943, 1.4140625),
   20: (7, 943, 1.4140625),
   21: (7, 943, 1.4140625),
   22: (1, -519, 1.3984375),
   23: (1, -519, 1.3984375),
   24: (1, -519, 1.3984375),
   25: (9, -643, 1.3671875),
   26: (9, -643, 1.3671875),
   27: (9, -565, 1.3671875),
   28: (9, -643, 1.3671875),
   29: (9, -565, 1.3671875),
   30: (9, -565, 1.3671875),
   31: (1, -647, 1.359375),
   32: (1, -647, 1.359375),
   33: (1, -647, 1.359375),
   34: (7, 907, 1.3515625),


In [18]:
adapters_pos_map = collapse_heaps()            # step ❶

# 5) write to disk
with open("adapters_pos_map_topk", "w", encoding="utf-8") as f:
    json.dump(adapters_pos_map, f, indent=2, ensure_ascii=False)

In [10]:
'''
import heapq
from typing import Dict, Tuple, Any, Mapping

def topk_neuron_positions(
    acts_dict: Mapping[Any, Any],
    masks_dict: Mapping[Any, Any],
    module: Any,
    neuron: int,
    k: int = 10
) -> Dict[int, Tuple[int, int, float]]:
    """
    Return a dict mapping rank → (ex_idx, true_pos, activation), where:
      - selection is by |activation|
      - activation is the raw (signed) activation value.
      - pads (or zero activations) are skipped.
    """
    heap: list[Tuple[float, int, int, float]] = []  # (|val|, ex_idx, true_pos, raw_val)
    ex_offset = 0

    for act, msk in zip(acts_dict[module], masks_dict[module]):
        B, L, _ = act.shape

        raw_vals = act[:, :, neuron]                        # signed activations
        abs_vals = raw_vals.abs().masked_fill(~msk.bool(), 0.0)

        pad_left = (L - msk.sum(1))                         # per-sample left-pad

        for b in range(B):
            for pos in range(L):
                mag = float(abs_vals[b, pos])
                if mag == 0.0:
                    continue

                true_pos = pos - pad_left[b].item()
                ex_idx   = ex_offset + b
                signed   = float(raw_vals[b, pos])

                if len(heap) < k:
                    heapq.heappush(heap, (mag, ex_idx, true_pos, signed))
                elif mag > heap[0][0]:
                    heapq.heapreplace(heap, (mag, ex_idx, true_pos, signed))

        ex_offset += B

    # sort by descending magnitude
    top = sorted(heap, key=lambda t: t[0], reverse=True)

    # return only (ex_idx, true_pos, activation)
    return {
        rank: (ex_idx, pos, activation)
        for rank, (_, ex_idx, pos, activation) in enumerate(top, 1)
    }
'''

'\nimport heapq\nfrom typing import Dict, Tuple, Any, Mapping\n\ndef topk_neuron_positions(\n    acts_dict: Mapping[Any, Any],\n    masks_dict: Mapping[Any, Any],\n    module: Any,\n    neuron: int,\n    k: int = 10\n) -> Dict[int, Tuple[int, int, float]]:\n    """\n    Return a dict mapping rank → (ex_idx, true_pos, activation), where:\n      - selection is by |activation|\n      - activation is the raw (signed) activation value.\n      - pads (or zero activations) are skipped.\n    """\n    heap: list[Tuple[float, int, int, float]] = []  # (|val|, ex_idx, true_pos, raw_val)\n    ex_offset = 0\n\n    for act, msk in zip(acts_dict[module], masks_dict[module]):\n        B, L, _ = act.shape\n\n        raw_vals = act[:, :, neuron]                        # signed activations\n        abs_vals = raw_vals.abs().masked_fill(~msk.bool(), 0.0)\n\n        pad_left = (L - msk.sum(1))                         # per-sample left-pad\n\n        for b in range(B):\n            for pos in range(L):\n   

In [16]:
# which side is used right now?
print(tokenizer.padding_side)      # →  "right"  or  "left"

left


In [17]:
from typing import Dict, Tuple, Any
import torch

def extract_windows_from_dataset(
    topk_map: Dict[int, Tuple[int,int]],
    dataset: Any,
    tokenizer,
    window: int = 7
) -> Dict[int, str]:
    """
    Return a dict mapping rank → context snippet, where the target token
    at `tok_pos` is wrapped in << >> within a ±window token window.
    """
    out: Dict[int, str] = {}

    for rank, (ex_idx, tok_pos, val) in topk_map.items():
        # Re-tokenize the single example (no padding)
        ids = tokenizer.apply_chat_template(
            dataset[ex_idx]["input"],
            add_generation_prompt=False,
            padding=False,
            return_tensors="pt"
        )[0]  # shape (L,)

        # Determine window slice
        start = max(0, tok_pos - window)
        end   = min(ids.size(0), tok_pos + window + 1)
        snippet_ids = ids[start:end].tolist()

        # Decode one token at a time to preserve alignment
        toks = [
            tokenizer.decode([tid], skip_special_tokens=False)
            for tid in snippet_ids
        ]

        # Wrap the center token
        center = tok_pos - start
        toks[center] = f" <<{toks[center]}>> "

        # Re-join into a single string (preserving any newlines)
        snippet = "".join(toks)

        out[rank] = snippet

    return out


In [13]:
'''

# %% ------------------------------------------------------------
# RUN TOP-K + DISPLAY SNIPPETS
# ---------------------------------------------------------------
MODULE   = next(iter(lora_modules))   # pick the first LoRA block
NEURON   = 33
TOP      = 40

pos_map  = topk_neuron_positions(acts, attn_masks, MODULE, NEURON, k=TOP)
snips    = extract_windows_from_dataset(pos_map, flat_ds, tokenizer, window=7)

for r in sorted(snips):
    print(f"[rank {r:>2}] ex#{pos_map[r][0]} tok@{pos_map[r][1]} ⇒ {snips[r]}")
'''

'\n\n# %% ------------------------------------------------------------\n# RUN TOP-K + DISPLAY SNIPPETS\n# ---------------------------------------------------------------\nMODULE   = next(iter(lora_modules))   # pick the first LoRA block\nNEURON   = 33\nTOP      = 40\n\npos_map  = topk_neuron_positions(acts, attn_masks, MODULE, NEURON, k=TOP)\nsnips    = extract_windows_from_dataset(pos_map, flat_ds, tokenizer, window=7)\n\nfor r in sorted(snips):\n    print(f"[rank {r:>2}] ex#{pos_map[r][0]} tok@{pos_map[r][1]} ⇒ {snips[r]}")\n'

In [14]:
'''
from IPython.display import Markdown          # for bold / colour in notebooks

def show_token_windows(
    topk_map,               # {rank: (ex_idx, token_pos)}
    dataset,
    tokenizer,
    window:int = 7,
    style:str = "bold"      # "bold", "brackets", or "html"
):
    """
    Prints a highlighted snippet for every rank in topk_map.
    """
    def wrap(tok:str) -> str:
        if style == "bold":        return f"**{tok}**"
        if style == "brackets":    return f"[{tok}]"
        if style == "html":        return f"<mark>{tok}</mark>"
        return tok

    for rank, (ex_idx, tok_pos, val) in sorted(topk_map.items()):
        ids = tokenizer.apply_chat_template(
            dataset[ex_idx]["input"],
            add_generation_prompt=False,
            padding=False,
            return_tensors="pt"
        )[0]

        start = max(0, tok_pos - window)
        end   = min(ids.size(0), tok_pos + window + 1)
        ids_slice = ids[start:end].tolist()

        # decode *one token at a time* so we keep alignment
        toks = [tokenizer.decode([tid], skip_special_tokens=False)
                for tid in ids_slice]

        center = tok_pos - start          # index of the hot token in slice
        toks[center] = wrap(toks[center])

        snippet = "".join(toks).replace("\n", "⏎")  # keep single-line print
        display(Markdown(f"**rank {rank}** · ex#{ex_idx} · tok@{tok_pos}<br>{snippet}"))
'''

'\nfrom IPython.display import Markdown          # for bold / colour in notebooks\n\ndef show_token_windows(\n    topk_map,               # {rank: (ex_idx, token_pos)}\n    dataset,\n    tokenizer,\n    window:int = 7,\n    style:str = "bold"      # "bold", "brackets", or "html"\n):\n    """\n    Prints a highlighted snippet for every rank in topk_map.\n    """\n    def wrap(tok:str) -> str:\n        if style == "bold":        return f"**{tok}**"\n        if style == "brackets":    return f"[{tok}]"\n        if style == "html":        return f"<mark>{tok}</mark>"\n        return tok\n\n    for rank, (ex_idx, tok_pos, val) in sorted(topk_map.items()):\n        ids = tokenizer.apply_chat_template(\n            dataset[ex_idx]["input"],\n            add_generation_prompt=False,\n            padding=False,\n            return_tensors="pt"\n        )[0]\n\n        start = max(0, tok_pos - window)\n        end   = min(ids.size(0), tok_pos + window + 1)\n        ids_slice = ids[start:end].tol

In [19]:
from typing import Dict, Tuple, Mapping

def build_activation_prompt(
    windows_dict: Dict[int, Dict[str, str]],
    topk_map: Mapping[int, Tuple[int, int]],
    *,
    newline: str = "\n",            # "\n" for plain text, "<br>" for Markdown
    context_newline: bool = False,  # convert "⏎" back to real newlines?
) -> str:
    """
    Build a prompt like

        Example 1: … token …  
        Activations: ("token", 42)

    Parameters
    ----------
    newline
        The line-separator to use between lines.  Change to "<br>" if you plan
        to render in Markdown/HTML and don’t want actual carriage returns.
    context_newline
        If True, every literal "⏎" that appears inside the context slice is
        replaced by the chosen `newline` character so you regain the original
        formatting of multi-line examples.
    """
    blocks = []
    for rank in sorted(windows_dict):
        token   = windows_dict[rank]["token"]
        context = windows_dict[rank]["context"]
        pos     = topk_map[rank][1]
        val     = topk_map[rank][2]

        if context_newline:
            context = context.replace("⏎", newline)

        block = (
            f"Example {rank}:{newline}"
            f"{context}{newline}"
            f"Activations: (\"{token.strip()}\", {val})"
        )
        blocks.append(block)

    # Two line-breaks between blocks so each example is visually separated
    return (newline * 2).join(blocks)


In [20]:
from typing import Dict, Tuple, Any, Mapping, Optional

def token_windows_dict(
    topk_map: Mapping[int, Tuple[int, int, float]],   # {rank: (ex_idx, tok_pos, val)}
    dataset: Any,
    tokenizer,
    window: int = 7,
    topk: Optional[int] = None,                       # new: only keep ranks ≤ topk
) -> Dict[int, Dict[str, Any]]:
    """
    Return a dict of the form
        { rank: { "token": <str>, "context": <str>, "value": <float> }, ... }

    * `window` controls how many tokens are shown on each side.
    * `topk`, if set, limits to only ranks 1..topk.
    * Newlines inside the context are replaced with the glyph '⏎'
      so the string stays single-line (handy for printing or logging).
    """
    result: Dict[int, Dict[str, Any]] = {}
    for rank, (ex_idx, tok_pos, val) in sorted(topk_map.items()):
        # stop if we've reached the desired topk
        if topk is not None and int(rank) > topk:
            break

        # Build the full token-id sequence for this example
        ids = tokenizer.apply_chat_template(
            dataset[ex_idx]["input"],
            add_generation_prompt=False,
            padding=False,
            return_tensors="pt",
        )[0]  # shape (L,)

        # Slice out a small window around the target position
        start = max(0, tok_pos - window)
        end   = min(ids.size(0), tok_pos + window + 1)
        ids_slice = ids[start:end].tolist()

        # Decode one token at a time to keep byte-level alignment
        toks = [
            tokenizer.decode([tid], skip_special_tokens=False)
            for tid in ids_slice
        ]

        center = tok_pos - start          # position of the “hot” token
        token_text = toks[center]         # raw decoded token

        # Wrap the center token in << ... >>
        toks[center] = f"<<{toks[center]}>>"

        # Join and replace newlines for single-line context
        snippet = "".join(toks).replace("\n", "⏎")

        result[rank] = {
            "token": token_text,
            "context": snippet,
            "value": val,
        }

    return result


In [21]:
windows = token_windows_dict(pos_map, flat_ds, tokenizer, window=5)
print(windows)
prompt   = build_activation_prompt(windows, pos_map)
print(prompt)

NameError: name 'pos_map' is not defined

# ChatGPT AutoInterpret

In [22]:
def generate_system_prompt(
    *,
    include_cot: bool = False,
    include_few_shot: bool = False,
) -> str:
    """
    Return the complete system prompt used for the activation-analysis task.

    Parameters
    ----------
    include_cot
        If True, the prompt explicitly encourages the assistant to think
        through the three analysis stages (1…3).
    include_few_shot
        If True, append illustrative few-shot examples.  When both flags are
        True the COT-annotated few-shot block is used; otherwise a plain,
        interpretation-only block is shown.
    """
    base_guidelines = """You are a meticulous AI researcher conducting an important
investigation into patterns found in language. Your task is to
analyze text and provide an interpretation that thoroughly
encapsulates possible patterns found in it.
Guidelines:
You will be given a list of text examples on which special words
are selected and between delimiters like << this >>.
If a sequence of consecutive tokens all are important,
the entire sequence of tokens will be contained between
delimiters <<just like this>>. How important each token is for
the behavior is listed after each example in parentheses.
- Try to produce a concise final description. Simply describe
  the text latents that are common in the examples, and what
  patterns you found.
- If the examples are uninformative, you don’t need to mention
  them. Don’t focus on giving examples of important tokens,
  but try to summarize the patterns found in the examples.
- Do not mention the marker tokens ($<<$ $>>$) in your interpretation.
- Do not make lists of possible interpretations.
  Keep your interpretations short and concise.
- The last line of your response must be the formatted
  interpretation, using [interpretation]:"""

    cot_instructions = """
To better find the interpretation for the language patterns,
go through the following stages:
1. Find the special words that are selected in the examples and list
   a couple of them (no more than five). Search for patterns in these words.
2. Write down general shared latents of the text examples.
   This could be related to the full sentence or to the words
   surrounding the marked words.
3. Formulate a hypothesis and write down the final interpretation
   using [interpretation]:"""

    # ---------- few-shot templates ----------
    few_shot_plain = """
Example 1: and he was <<over the moon>> to find
Activations: (“over", 5), (“ the", 6), (“ moon", 9)

Example 2: we'll be laughing <<till the cows come home>>!
Activations: (“till", 5), (“ the", 5), (“ cows", 8),
(“ come", 8), (“ home", 8)

Example 3: thought Scotland was boring, but really there’s
more <<than meets the eye>>!
Activations: (“than", 5), (“ meets", 7), (“ the", 6), (“ eye", 8)

[interpretation]: Common idioms in text conveying positive sentiment.
"""

    few_shot_cot = """
Example 1: and he was <<over the moon>> to find
Activations: (“over", 5), (“ the", 6), (“ moon", 9)

Example 2: we'll be laughing <<till the cows come home>>!
Activations: (“till", 5), (“ the", 5), (“ cows", 8),
(“ come", 8), (“ home", 8)

Example 3: thought Scotland was boring, but really there’s
more <<than meets the eye>>!
Activations: (“than", 5), (“ meets", 7), (“ the", 6), (“ eye", 8)

ACTIVATING TOKENS: “over the moon”, “than meets the eye”.
SURROUNDING TOKENS: No interesting patterns.

Step 1.
- The activating tokens are parts of common idioms.
- The surrounding tokens have nothing in common.

Step 2.
- The examples contain common idioms.
- Some activating tokens are followed by an exclamation mark.

Step 3.
- The activation values are highest for the more common idioms
  in examples 1 and 3.
Let me think carefully … Did I miss any patterns?
Yes: all examples convey positive sentiment.

[interpretation]: Common idioms in text conveying positive sentiment.
"""

    # ---------- assemble the prompt ----------
    parts = [base_guidelines]

    if include_cot:
        parts.append(cot_instructions)

    if include_few_shot:
        parts.append(few_shot_cot if include_cot else few_shot_plain)

    return "\n\n".join(parts)


In [23]:
# 2)  Load your key securely – run once per session
import os

# Option A – recommended: set the variable in the notebook *before* importing OpenAI
os.environ["OPENAI_API_KEY"] = "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"    # <-- paste your key, then **hide** the cell


In [24]:
import re
from typing import Optional

def extract_interpretation(response_text: str) -> Optional[str]:
    """
    Extract the final interpretation from a model response string using the
    `[interpretation]:` marker.

    Returns the interpretation string (stripped), or None if no match is found.
    """
    match = re.search(r"\[interpretation\]\s*:\s*(.+)", response_text, re.IGNORECASE)
    if match:
        return match.group(1).strip()
    return None


In [25]:
import json, tqdm
from typing import Dict, Any, Mapping
from openai import OpenAI

# assumes the helpers you already wrote are imported:
#   • token_windows_dict
#   • build_activation_prompt
#   • generate_system_prompt
#   • extract_interpretation

def build_lora_json_with_responses(
    adapters_pos_map: Dict[str, Mapping[int, Tuple[int,int,float]]],
    dataset: Any,
    tokenizer,
    *,
    window: int = 32,
    model: str = "gpt-4o-mini",
    include_cot: bool = False,
    include_few_shot: bool = False,
    temperature: float = 0.0,
    max_tokens: int | None = 512,
    outfile: str = "lora_neuron_info.json",
    client: OpenAI | None = None,
) -> Dict[str, Dict[str, Dict[str, Any]]]:
    """
    Generates `outfile` in the format needed by the Gradio app,
    using the Responses API (`client.responses.create`).

    The output JSON will now include:
      * interpretation: str
      * top_activations: List[str]   (contexts)
      * values:          List[float] (activation values)
    """
    if client is None:
        client = OpenAI()

    system_prompt = generate_system_prompt(
        include_cot=include_cot, include_few_shot=include_few_shot
    )

    results: Dict[str, Dict[str, Dict[str, Any]]] = {}

    for adapter, neuron_maps in tqdm.tqdm(adapters_pos_map.items(), desc="Adapters"):
        adapter_block: Dict[str, Dict[str, Any]] = {}
        for n_idx, pos_map in tqdm.tqdm(neuron_maps.items(),
                                        desc=f"  {adapter}", leave=False):
            # 1) build example windows (each entry also has a "value" field)

            pos_map = { int(key) : val for key,val in pos_map.items()}
            
            windows_dict = token_windows_dict(
                pos_map, dataset, tokenizer,
                window=window,
                topk=40
            )

            # 2) craft user prompt
            prompt = build_activation_prompt(
                windows_dict,
                pos_map,
                newline="\n",
                context_newline=True,
            )
            # 3) model call via Responses API
            response = client.responses.create(
                model=model,
                instructions=system_prompt,
                input=prompt,
                temperature=temperature,
            )
            answer_text   = response.output_text
            interpretation = extract_interpretation(answer_text) or "N/A"

            # 4) collect the top‐k contexts *and* their activation values
            ranks = sorted(windows_dict)
            topk_snippets = [windows_dict[r]["context"] for r in ranks]
            topk_values   = [windows_dict[r]["value"]   for r in ranks]

            adapter_block[str(n_idx)] = {
                "interpretation":  interpretation,
                "top_activations": topk_snippets,
                "values":          topk_values,
            }

        results[adapter] = adapter_block

    # 5) write to disk
    with open(outfile, "w", encoding="utf-8") as f:
        json.dump(results, f, indent=2, ensure_ascii=False)

    print(
        f"✓ Saved {outfile}  "
        f"({len(results)} adapters × {len(next(iter(results.values())))} neurons)"
    )
    return results


In [None]:
#adapters_pos_map = collapse_heaps()            # step ❶

In [26]:
json_blob = build_lora_json_with_responses(     # step ❷
    adapters_pos_map,
    flat_ds, tokenizer,
    model="gpt-4o-mini", include_cot=False, include_few_shot=False
)
# ⬆️ writes lora_neuron_info.json

Adapters:   0%|          | 0/7 [00:00<?, ?it/s]

Adapters:  57%|█████▋    | 4/7 [2:27:09<1:50:22, 2207.41s/it]


NotFoundError: Error code: 404 - {'error': {'message': 'Invalid URL (POST /v1/engines/gpt-4o-mini-2024-07-18/completions)', 'type': 'invalid_request_error', 'param': None, 'code': None}}

In [None]:
'''
# 5) write to disk
with open("adapters_pos_map", "w", encoding="utf-8") as f:
    json.dump(adapters_pos_map, f, indent=2, ensure_ascii=False)
    '''

In [23]:
# 2) Load it back into a Python dict
with open("adapters_pos_map", "r", encoding="utf-8") as f:
    adapters_pos_map = json.load(f)

In [24]:
print(adapters_pos_map.keys())
adapters_pos_map = {list(adapters_pos_map.keys())[0] : list(adapters_pos_map.values())[0]}

dict_keys(['model.layers.11.self_attn.q_proj', 'model.layers.11.self_attn.k_proj', 'model.layers.11.self_attn.v_proj', 'model.layers.11.self_attn.o_proj', 'model.layers.11.mlp.gate_proj', 'model.layers.11.mlp.up_proj', 'model.layers.11.mlp.down_proj'])


# Evaluation

In [25]:
print(adapters_pos_map)

{'model.layers.11.self_attn.q_proj': {'0': {'1': [50803, 168, 2.90625], '2': [32619, 168, 2.90625], '3': [1353, 186, 2.828125], '4': [53765, 435, 2.796875], '5': [39038, 448, 2.75], '6': [26117, 264, 2.734375], '7': [48501, 43, 2.734375], '8': [44222, 311, 2.703125], '9': [58773, 594, 2.671875], '10': [54932, 414, 2.65625], '11': [47909, 470, 2.65625], '12': [22183, 271, 2.640625], '13': [54026, 488, 2.640625], '14': [23326, 630, 2.640625], '15': [15023, 329, 2.625], '16': [56017, 90, 2.625], '17': [18891, 45, 2.609375], '18': [15206, 153, 2.609375], '19': [46004, 223, 2.578125], '20': [44581, 424, 2.578125], '21': [3037, 132, 2.5625], '22': [14160, 94, 2.546875], '23': [21774, 266, 2.546875], '24': [53666, 358, 2.546875], '25': [26152, 46, 2.546875], '26': [31167, 139, 2.546875], '27': [16053, 397, 2.546875], '28': [11010, 498, 2.546875], '29': [8704, 171, 2.53125], '30': [33942, 208, 2.53125], '31': [17709, 577, 2.53125], '32': [45151, 744, 2.53125], '33': [9553, 456, 2.53125], '34':

In [63]:
# %% [markdown]
"""
Neuron Diagnostic Dataset Builder ‧ *Jupyter edition*  **v4.2**
============================================================
**Activation triple layout updated**

Your activation records are now expected as either:

| Triple shape | Field order | Context source |
|--------------|-------------|----------------|
| `[ex_idx, pos, value]` | example‑index • token‑position • activation‑value | Dataset‑window (preferred) |
| `[ex_idx, tok_id, pos, value]` | example‑index • token‑id • position • value | Dataset‑window (tok_id used only for fallback) |

➡️ **`ex_idx` is always the *first* element.**  This matches the dictionary you
showed (`[50803, 168, 2.90625]` → example #50803, position 168, value ≈2.91).

If `tok_id` is absent, we derive it on‑the‑fly when rebuilding the snippet; it
is only required for the rare fallback to `lora_info`.
"""

# %% [code] Imports & helpers
from __future__ import annotations
import json, random, re
from pathlib import Path
from typing import Any, Dict, List, Tuple, Optional

from transformers import AutoTokenizer

TOKENIZER_CACHE: Dict[str, Any] = {}

def _tok(model):
    if model not in TOKENIZER_CACHE:
        TOKENIZER_CACHE[model] = AutoTokenizer.from_pretrained(model, trust_remote_code=True)
    return TOKENIZER_CACHE[model]


def _load_json(p):
    with open(p, "r", encoding="utf-8") as fp:
        return json.load(fp)


def token_window(dataset, tokenizer, ex_idx: int, pos: int, window: int = 7):
    """Return ±`window` tokens around *pos* with center wrapped in <<…>>."""
    ids = tokenizer.apply_chat_template(dataset[ex_idx]["input"], add_generation_prompt=False,
                                        padding=False, return_tensors="pt")[0]
    s, e = max(0, pos-window), min(ids.size(0), pos+window+1)
    toks = [tokenizer.decode([tid], skip_special_tokens=False) for tid in ids[s:e]]
    toks[pos-s] = f"<<{toks[pos-s]}>>"
    return "".join(toks).replace("\n", "⏎")


def _ctx_lorainfo(layer, nid, rank, lora_info, model, tok_id: Optional[int]):
    tops = lora_info.get(layer, {}).get(nid, {}).get("top_activations", [])
    if 0 < rank <= len(tops):
        return tops[rank-1].replace("<<", "").replace(">>", "").strip()
    if tok_id is not None:
        return _tok(model).decode([tok_id]).strip()
    return "[UNK]"


def _clean(s):
    return re.sub(r"\s+", " ", re.sub(r"<<\s*|\s*>>", "", s)).strip()

# %% [markdown] Dataset builder

# %% [code]

def build_dataset(act_dict, lora_info, examples, model, window=7, k_skip=40,
                  n_examples=10, n_neg=4, seed=42):
    random.seed(seed)
    tok = _tok(model)

    # Build: (layer,nid) -> rows[(tid?,pos,val,rank,ex_idx)]
    rows_by_neuron: Dict[Tuple[str,str], List[Tuple[Optional[int],int,float,int,int]]] = {}
    for layer, ns in act_dict.items():
        for nid, acts in ns.items():
            rows = []
            for rk_str, trip in acts.items():
                if len(trip) == 3:
                    ex_idx, pos, val = trip
                    tid = None
                elif len(trip) == 4:
                    ex_idx, tid, pos, val = trip
                else:
                    raise ValueError("Activation entry must have 3 or 4 items")
                rows.append((tid, pos, val, int(rk_str), ex_idx))
            rows.sort(key=lambda t: -t[2])
            rows_by_neuron[(layer, nid)] = rows

    all_keys = list(rows_by_neuron.keys())
    dataset = []

    def ctx(layer,nid,tid,pos,rank,ex_idx):
        if ex_idx is not None and ex_idx < len(examples):
            return token_window(examples, tok, ex_idx, pos, window)
        return _ctx_lorainfo(layer, nid, rank, lora_info, model, tid)

    for (layer, nid), rows in rows_by_neuron.items():
        if len(rows) <= k_skip: continue
        pos_rows = rows[k_skip:]
        positives = random.choices(pos_rows, k=n_examples)
        pos_positions = {p for _tid,p,_v,_r,_e in rows}

        for tid,pos,_v,rank,ex_idx in positives:
            negs, tries = [],0
            while len(negs)<n_neg and tries<100:
                tries+=1
                nl,nn = random.choice(all_keys)
                if (nl,nn)==(layer,nid): continue
                ntid,npos,_nval,nrank,nex = random.choice(rows_by_neuron[(nl,nn)][:k_skip])
                if npos in pos_positions: continue
                negs.append(ctx(nl,nn,ntid,npos,nrank,nex))
            if len(negs)<n_neg: continue
            pos_ctx = ctx(layer,nid,tid,pos,rank,ex_idx)
            sents = negs+[pos_ctx]
            random.shuffle(sents)
            dataset.append({
                "layer":layer, "neuron":nid,
                "sentences":sents,
                "answer":sents.index(pos_ctx)+1,
                "interpretation":lora_info[layer][nid]["interpretation"],
            })
    return dataset

# %% [markdown] Prompt & metrics

# %% [code]

def build_prompt(item):
    lines = [
        "You are given **five** independent sentences. Exactly **one** of them matches the neuron interpretation below.",
        "Respond **only** with its number (1-5).",
        "",
        f"Neuron interpretation: {item['interpretation']}",
        "",
    ]
    for i,s in enumerate(item['sentences'],1):
        lines.append(f"{i}. {_clean(s)}")
    return "\n".join(lines)

def evaluate(preds,ds):
    acc = sum(p==ex['answer'] for p,ex in zip(preds,ds))/len(ds)
    return {k:acc for k in ("accuracy","precision","recall")}

# %% [markdown] Example paths
# %% [markdown]
"""## Example usage"""

# %% [code] Paths / params (edit!)
ACT_PATH = "adapters_pos_map"   # activations with ex_idx included
LORA_PATH = "lora_neuron_info.json" # only for interpretations
DATA_PATH = "dataset.json"          # the original dataset the activations came from
MODEL_NAME = "outputs/gemma2b_lora_dpo_r64/final_adapter"

WINDOW = 32  # context size on each side

# %% [code] Load
activations = _load_json(ACT_PATH)
lora_info   = _load_json(LORA_PATH)
examples    = flat_ds

print(activations)

dataset = build_dataset(
    activations,
    lora_info,
    examples,
    MODEL_NAME,
    window=WINDOW,
)

print(f"Generated {len(dataset)} test items.")

# %% [code] Inspect one
print(build_prompt(dataset[0]))

{'model.layers.11.self_attn.q_proj': {'0': {'1': [50803, 168, 2.90625], '2': [32619, 168, 2.90625], '3': [1353, 186, 2.828125], '4': [53765, 435, 2.796875], '5': [39038, 448, 2.75], '6': [26117, 264, 2.734375], '7': [48501, 43, 2.734375], '8': [44222, 311, 2.703125], '9': [58773, 594, 2.671875], '10': [54932, 414, 2.65625], '11': [47909, 470, 2.65625], '12': [22183, 271, 2.640625], '13': [54026, 488, 2.640625], '14': [23326, 630, 2.640625], '15': [15023, 329, 2.625], '16': [56017, 90, 2.625], '17': [18891, 45, 2.609375], '18': [15206, 153, 2.609375], '19': [46004, 223, 2.578125], '20': [44581, 424, 2.578125], '21': [3037, 132, 2.5625], '22': [14160, 94, 2.546875], '23': [21774, 266, 2.546875], '24': [53666, 358, 2.546875], '25': [26152, 46, 2.546875], '26': [31167, 139, 2.546875], '27': [16053, 397, 2.546875], '28': [11010, 498, 2.546875], '29': [8704, 171, 2.53125], '30': [33942, 208, 2.53125], '31': [17709, 577, 2.53125], '32': [45151, 744, 2.53125], '33': [9553, 456, 2.53125], '34':

In [None]:
# %% [code]
import os, time, re, time
from openai import OpenAI
from tqdm.auto import tqdm

SYSTEM_PROMPT = "You are a helpful assistant."
CHAT_MODEL    = "gpt-4o"   # adjust as desired
TEMPERATURE   = 0.0
SLEEP_SEC     = 0.5        # rudimentary rate‑limit guard

client = OpenAI()  # uses $OPENAI_API_KEY

_digit_re = re.compile(r"[1-5]")

def extract_digit(text: str) -> int:
    """Return first digit 1‑5 found in *text* (or 0 if none)."""
    m = _digit_re.search(text)
    return int(m.group()) if m else 0

predictions, correct = [], 0
bar = tqdm(dataset, desc="Evaluating", unit="ex")
for i, ex in enumerate(bar, 1):
    user_prompt = build_prompt(ex)
    resp = client.chat.completions.create(
        model=CHAT_MODEL,
        temperature=TEMPERATURE,
        messages=[
            {"role": "system", "content": SYSTEM_PROMPT},
            {"role": "user",   "content": user_prompt},
        ],
    )
    reply = resp.choices[0].message.content.strip()
    pred  = extract_digit(reply)
    predictions.append(pred)
    if pred == ex["answer"]:
        correct += 1
    bar.set_postfix(acc=f"{correct / i:.3f}")
    time.sleep(SLEEP_SEC)

metrics = evaluate(predictions, dataset)
print("Evaluation metrics:", metrics)


Evaluating:   0%|          | 0/4480 [00:00<?, ?ex/s]

Evaluation metrics: {'accuracy': 0.3176339285714286, 'precision': 0.3176339285714286, 'recall': 0.3176339285714286}


In [None]:
with open("eval_dataset.json", "w", encoding="utf-8") as f:
        json.dump(dataset, f, indent=2, ensure_ascii=False)