# Model Activation Collection

In [3]:
import os, json, argparse
from pathlib import Path
from typing import List

import torch
import numpy as np
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM
from peft import PeftModel
from tqdm import tqdm


  from .autonotebook import tqdm as notebook_tqdm


In [4]:
MODEL = "Qwen/Qwen2.5-7B-Instruct"
SEED = 42

In [5]:
torch.manual_seed(SEED)
if torch.cuda.is_available(): 
    torch.cuda.manual_seed(SEED)

In [6]:
def middle_idx(hidden_len: int) -> int: 
    n_layers = hidden_len - 1
    return 1 + (n_layers // 2)

In [7]:
def get_hidden_states(model, input_ids, attention_mask) -> torch.Tensor:
    """Run one forward pass and return hidden_states[idx]: [B, S, D]."""
    out = model(
        input_ids=input_ids,
        attention_mask=attention_mask,
        output_hidden_states=True,
        use_cache=False,
        return_dict=True,
    )
    return out.hidden_states

In [8]:
def flatten_pair_single_layer(
    hsA: torch.Tensor,
    hsB: torch.Tensor,
    attention_mask: torch.Tensor,
    drop_bos: bool,
) -> torch.Tensor:
    """
    Align, drop BOS, mask padding, and return [N, 2, d_in] for ONE layer.

    - Single tokenizer → SAME ids/mask sent to both models → token t aligns in A and B.
    - Drop BOS (t=0) to avoid degenerate no-context token.
    - Mask padding (attention_mask == 0) to keep only real tokens.
    """
    if drop_bos:
        hsA = hsA[:, 1:, :]
        hsB = hsB[:, 1:, :]
        mask = attention_mask[:, 1:]
    else:
        mask = attention_mask

    valid = mask.bool().view(-1)           # [B*S’]
    A = hsA.reshape(-1, hsA.size(-1))[valid]
    B = hsB.reshape(-1, hsB.size(-1))[valid]
    x = torch.stack([A, B], dim=1)         # [N, 2, d_in]
    return x

In [9]:
def write_shard(dir_path: Path, shard_id: int, x_cpu_np: np.ndarray, meta: dict):
    dir_path.mkdir(parents=True, exist_ok=True)
    path = dir_path / f"acts_{shard_id:05d}.pt"
    torch.save({"x": torch.from_numpy(x_cpu_np), "meta": meta}, path)
    return path

In [10]:
from typing import Optional, Tuple
import torch
from unsloth import FastLanguageModel
from transformers import AutoTokenizer

def _post_infer_setup(model):
    # Safer inference defaults
    try: model.gradient_checkpointing_disable()
    except Exception: pass
    try: model.config.gradient_checkpointing = False
    except Exception: pass
    try: model.config.use_cache = True
    except Exception: pass
    return model.eval()

def load_unsloth_pair(
    base_model: str = "Qwen/Qwen2.5-7B-Instruct",
    adapter_dir: str = "outputs/adapter",
    device_map: str = "auto",
    load_in_4bit: bool = True,
    max_seq_length: int = 4096,
    dtype: Optional[str] = None,   # None lets Unsloth pick (good for 4-bit)
):
    """
    Returns: (model_A, model_B, tokenizer)
      - model_A: base (no LoRA)
      - model_B: base + LoRA (loaded from adapter_dir)
      - tokenizer: *single* tokenizer from base (used for both)
    """
    # 1) One tokenizer (from BASE) to guarantee identical tokenization A vs B
    tokenizer = AutoTokenizer.from_pretrained(base_model, use_fast=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    # 2) Load base with Unsloth
    model_A, _tokA = FastLanguageModel.from_pretrained(
        model_name     = base_model,
        load_in_4bit   = load_in_4bit,
        max_seq_length = max_seq_length,
        dtype          = dtype,
        device_map     = device_map,
    )
    model_A = _post_infer_setup(model_A)

    # 3) Load LoRA adapter with Unsloth (from adapter dir). Unsloth will resolve base.
    #    (This is supported: pointing to the adapter folder is enough.)  # refs in sources
    model_B, _tokB = FastLanguageModel.from_pretrained(
        model_name     = adapter_dir,
        load_in_4bit   = load_in_4bit,
        max_seq_length = max_seq_length,
        dtype          = dtype,
        device_map     = device_map,
    )
    model_B = _post_infer_setup(model_B)

    # 4) Optional sanity checks to catch accidental tokenizer drift
    #    (we *still* force using `tokenizer` from base everywhere)
    try:
        assert _tokB.get_vocab() == tokenizer.get_vocab()
    except Exception:
        # If not equal, we still use `tokenizer` consistently for both models.
        # This keeps activations aligned.
        pass

    return model_A, model_B, tokenizer


ModuleNotFoundError: No module named 'unsloth'

In [None]:
import yaml 

# Load from a YAML file
with open("acts_config.yaml", "r") as f:
    config = yaml.safe_load(f)

print(config)
print(config['dataset'])

{'model': 'Qwen/Qwen2.5-7B-Instruct', 'adapter': 'adapter', 'dataset': {'name': 'opeani/gsm8k', 'subset': 'main', 'split': 'train', 'field': 'question'}, 'seq_len': 2048, 'batch_size': 8, 'drop_bos': True, 'dtype': 'bf16', 'out_dir': 'activations', 'seed': 42}
{'name': 'opeani/gsm8k', 'subset': 'main', 'split': 'train', 'field': 'question'}


In [None]:
#Create the path for this data 
out_dir = Path(config['out_dir'], config['model'])
out_dir.mkdir(parents=True, exist_ok=True)

In [None]:
# load in the tokenizer 
base, tuned, tokenizer = load_unsloth_pair(base_model=config['model'], adapter_dir=config['adapter'], max_seq_len=config['seq_len'])

NameError: name 'load_unsloth_pair' is not defined

In [None]:
# need to get the hidden_size 
probe = tokenizer("hello world", return_tensors="pt").to(device)
with torch.no_grad(): 
    out = model_A(**probe, output_hidden_states=True, return_dict=True)

hidden_len = len(out.hidden_states)   # = n_layers + 1
d_in = out.hidden_states[-1].size(-1)

NameError: name 'tokenizer' is not defined

In [None]:
idx_middle = middle_idx(hidden_len)

NameError: name 'hidden_len' is not defined

In [None]:
from dataclasses import dataclass 
@dataclass
class LayerSpec: 
    name: str
    index: int 

In [None]:
layers = [LayerSpec("-3", -3), LayerSpec("-2", -2), LayerSpec(f"{middle_idx}", middle_idx)]

In [None]:
manifest = {
    "base_model": config['model'],
    "adapter_dir": config['adapter'],
    "dataset": config['dataset']['name'],
    "subset": config['dataset']['subset'],
    "split": config['dataset']['split'],
    "field": config['dataset']['field'],
    "seq_len": config['seq_len'],
    "dtype": config['dtype'],
    "device": device,
    "drop_bos": bool(config['drop_bos']),
    "d_in": int(d_in),
    "layers": {name: idx for name, idx in layer_specs},
    "schema_per_layer": "x: [N, 2, d_in]; model axis: [base, base+LoRA]",
}

with open(out_dir / "manifest.json", "w") as f:
    json.dump(manifest, f, indent=2)



NameError: name 'device' is not defined

In [None]:
ds = load_dataset(config['dataset']['name'], config['dataset']['subset'], config['dataset']['split'])

NameError: name 'load_dataset' is not defined

In [None]:
for start in range(0, len(ds), config['chunk']):
    part = ds.select(range(start, min(start + CHUNK, len(ds))))
    texts: List[str] = part[config['dataset']['field']]

    # iterate over each element in the chunk in batches 
    
    for i in tqdm(range(0, len(texts), args.batch_size), desc=f"Chunk {start//CHUNK}"):
        micro = texts[i : i + args.batch_size]
        if not micro:
            continue

        # Tokenize ONCE → SAME ids/mask for both models (alignment contract)
        enc = tokenizer(
            micro,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=args.seq_len,
        ).to(args.device)

        hsA = get_hidden_states(model_A, enc["input_ids"], enc["attention_mask"])
        hsB = get_hidden_states(model_B, enc["input_ids"], enc["attention_mask"])

        for obj in layers:
            name = obj.name 
            idx = obj.index 

            x = flatten_pair_single_layer(hsA[idx], hsB[idx], enc['attention_mask', drop_bos=config['drop_bos']])
            x_cpu = x.detach().to("cpu")
            accum[name].append(x_cpu)
            rows_in_shard[name] += x_cpu.shape[0]

            if rows_in_shard[name] >= args.shard_rows:
                X = torch.cat(accum[name], dim=0).numpy()   # [M,2,D]
                write_shard(layer_dirs[name], shard_ids[name], X, {
                    **manifest, "which_layer": name, "which_index": idx
                })
                shard_ids[name] += 1
                rows_in_shard[name] = 0
                accum[name] = []


NameError: name 'ds' is not defined

In [None]:
# Flush remaining shards
for name, idx in layer_specs:
    if rows_in_shard[name] > 0 and len(accum[name]) > 0:
        X = torch.cat(accum[name], dim=0).numpy()
        write_shard(layer_dirs[name], shard_ids[name], X, {
            **manifest, "which_layer": name, "which_index": idx
        })

print(f"Done. Wrote activations to: {out_dir.resolve()}")
for name in layer_dirs:
    print(f"  Layer '{name}' dir: {layer_dirs[name]}")
print("Each shard has x with shape [N, 2, d_in] and meta indicating which layer.")
