# Calibrate probes based on s1K trajectories

Pretrained probes and calibrated decision thresholds may be downloaded [here](https://figshare.com/articles/dataset/s1K_calibrated_probes/29242328). These files should be placed under `PROBE_DIR`.

Code is provided for reproducibility.

In [1]:
import os
import glob
import json
import pickle

from collections import Counter, deque

import numpy as np

from scipy.stats import binom

from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler

from sklearn.metrics import roc_auc_score

## Preliminaries

In [2]:
# Directory that holds the pickled step embeddings (`{model}_embed_steps.pkl`)
# and the label JSON files (`labels-*.json`) read by the loaders below.
PROBE_DATA_DIR = "../probes/data"  # LLM embeddings
# Directory that holds the trained probes (`probe-{mode}-{model}.pkl`);
# the calibrated thresholds (`lambdas-*.json`) are written here as well.
PROBE_DIR = "../probes"  # you can customize

# this should be updated with where your outputs are saved
# NOTE: within this notebook only the keys (model names) are iterated;
# the folder values are not read by any visible cell.
model_to_folder = {
    "qwen2.5": "../outputs",
    "qwq": "../outputs-qwq",
    "llama3.3": "../outputs-llama"
}

# you can modify code to loop through instead if you wish
# NOTE(review): MODEL is not read by any visible cell; the calibration
# loops below iterate over every key of `model_to_folder` -- confirm whether
# it is still needed.
MODEL = "qwen2.5"    # qwen2.5|qwq|llama3.3
MODE = "supervised"  # supervised|consistent|novel|leaf

Our s1K splits

In [3]:
# Example-index ranges for each split (indices 0-999 in total).
# The calibration functions in the visible part of this notebook only
# read splits["test"]; "train" and "val" are not used here.
splits = {
    "train": range(500),
    "val": range(500, 550),
    "test": range(550, 1000)
}

## Functions for calibration

In [4]:
def load_probe_inputs(model):
    """
    Load the pickled step embeddings for `model`.

    model  (str) model name; selects `{model}_embed_steps.pkl` under
           PROBE_DATA_DIR

    Returns the unpickled object unchanged.

    NOTE: unpickling can execute arbitrary code, so only load files
    from a source you trust.
    """
    embed_path = os.path.join(PROBE_DATA_DIR, f"{model}_embed_steps.pkl")
    with open(embed_path, "rb") as fh:
        return pickle.load(fh)


def load_probe_labels(model, mode="supervised"):
    """
    Read the label file for a probe and return `(index, label)`.

    model  (str) model name; only part of the file name for the
           supervised and consistent modes
    mode   (str) supervised|consistent|novel|leaf

    For supervised and consistent labels, `to_cumulative` is applied
    in place before returning (needed for calibration validity).
    """
    # novel and leaf labels are shared across models, so their file
    # name carries no model suffix
    model_specific = mode in ["supervised", "consistent"]
    fname = f"labels-{mode}-{model}.json" if model_specific else f"labels-{mode}.json"

    with open(os.path.join(PROBE_DATA_DIR, fname)) as fh:
        payload = json.load(fh)
    index, label = payload["index"], payload["label"]
    assert len(index) == len(label)

    if model_specific:
        to_cumulative(index, label)
    return index, label

def to_cumulative(index, label):
    """
    Make each label sequence cumulative: once a 1 appears, every later
    entry becomes 1. Used for the supervised and consistent probes.

    Sequences that contain no 1 are dropped by replacing both
    `label[i]` and `index[i]` with an empty list.

    Mutates `label` (including its inner lists) and `index` in place;
    returns None.
    """
    for pos, steps in enumerate(label):
        try:
            onset = steps.index(1)
        except ValueError:
            # no positive step at all -> mark the example as skipped
            label[pos] = []
            index[pos] = []
        else:
            # overwrite the tail in place, starting at the first 1
            steps[onset:] = [1] * (len(steps) - onset)

In [5]:
def binom_p(loss, eps):
    """
    Binomial tail p-value.

    Returns P(X <= sum(loss)) for X ~ Binomial(n=len(loss), p=eps),
    i.e. the lower-tail probability of seeing at most the observed
    total loss when the per-example loss rate is `eps`.
    """
    observed = np.sum(loss)
    n_trials = len(loss)
    return binom.cdf(observed, n_trials, eps)


def get_loss(pred, true, lam):
    """
    "Loss" in the learn-then-test sense.

    Finds the first step whose prediction is >= `lam` and returns
    `1 - true[step]` (the step is clamped to the last entry of `true`).
    If no prediction reaches `lam`, the last entry of `true` is used.
    """
    # position of the first prediction that crosses the threshold, if any
    crossing = next((pos for pos, p in enumerate(pred) if p >= lam), None)
    if crossing is None:
        return 1 - true[-1]
    last = len(true) - 1
    return 1 - true[crossing if crossing < last else last]


def run_test(preds, trues, eps, bins=10000, loss_f=get_loss):
    """
    Fixed sequence testing procedure.

    preds   per-example prediction sequences
    trues   per-example label sequences, aligned with `preds`
    eps     risk tolerance; used both as the binomial rate in `binom_p`
            and as the cutoff the p-value is compared against
    bins    number of candidate thresholds, scanned from 1 downward in
            steps of 1 / bins
    loss_f  callable (pred, true, lam) -> loss for a single example

    Returns the first threshold whose p-value exceeds `eps`; if none
    does, the last threshold scanned (1 / bins).

    NOTE(review): the returned value is the threshold at which the test
    first fails, not the last one that passed -- confirm this is the
    intended choice.
    """
    for step in range(bins):
        lam = 1 - step / bins
        losses = [loss_f(p, t, lam) for p, t in zip(preds, trues)]
        if binom_p(losses, eps) > eps:
            break
    return lam


def smooth(pred, window=1):
    """
    Trailing rolling mean.

    Entry i of the result is the mean of the last `window` values up to
    and including pred[i]; fewer values are averaged at the start,
    where a full window is not yet available. Returns a list.
    """
    values = list(pred)
    smoothed = []
    for end in range(1, len(values) + 1):
        start = max(0, end - window)
        smoothed.append(np.mean(values[start:end]))
    return smoothed

## Supervised and consistent

In [6]:
def get_lambdas(model, mode,
                eps=[0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.5],
                batch_size=10):
    """
    Compute thresholds (lambdas) for a single trained probe.

    model       (str) model name; selects the embeddings, probe and labels
    mode        (str) supervised|consistent only. see get_lambdas_boring
                for novel and leaf
    eps         (list[float]) risk tolerance; one threshold is computed
                per value
    batch_size  (int) number of steps each label is repeated for;
                should match generate_truncated_prompts

    Returns a dict mapping each eps to its threshold.
    """
    step_embeddings = load_probe_inputs(model)
    # the probe pickle unpacks to (classifier, scaler, pca)
    with open(os.path.join(PROBE_DIR, f"probe-{mode}-{model}.pkl"), "rb") as f:
        lr, scaler, pca = pickle.load(f)
    _, label = load_probe_labels(model, mode)

    # test-split examples whose labels were not dropped
    kept = [i for i in splits["test"] if len(label[i]) > 0]

    preds = []
    trues = []
    for i in kept:
        projected = pca.transform(scaler.transform(step_embeddings[i]))
        probs = smooth(lr.predict_proba(projected)[:, 1], window=10)
        preds.append(probs)
        # expand labels to step-wise, then trim off excess
        stepwise = [item for item in label[i] for _ in range(batch_size)]
        trues.append(stepwise[:len(probs)])

    return {ep: run_test(preds, trues, ep, loss_f=get_loss) for ep in eps}

This takes about 1-2 minutes per model, for the default eps range.

In [7]:
# Calibrate and save thresholds for every (mode, model) pair.
for mode in ["supervised", "consistent"]:
    for model in model_to_folder:
        # Use the loop variable `mode` (not the global MODE) so that each
        # probe type is calibrated and written to its own file.
        fp_out = os.path.join(PROBE_DIR, f"lambdas-{model}-{mode}.json")
        if os.path.exists(fp_out):
            continue  # already calibrated; skip the expensive recomputation
        lambdas = get_lambdas(model, mode)
        with open(fp_out, "w") as f:
            json.dump(lambdas, f)
        print(fp_out, lambdas)

## Novel leaf

In [8]:
def get_lambdas_boring(model,
                       eps=[0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.35, 0.4, 0.5],
                       batch_size=10):
    """
    Compute thresholds (lambdas) for novel leaf probe

    The calibrated score is p(leaf) * (1 - p(novel)), smoothed with a
    rolling window of 10 steps, and is tested against the cumulative
    "consistent" labels.

    model       (str) model name; selects the embeddings, both probes
                and the consistency labels
    eps         (list[float]) risk tolerance; one threshold is computed
                per value
    batch_size  (int) number of steps each label is repeated for;
                should match generate_truncated_prompts

    Returns a dict mapping each eps to its threshold.
    """
    # load embeddings
    step_embeddings = load_probe_inputs(model)

    # load both probes; each pickle unpacks to (classifier, scaler, pca)
    with open(os.path.join(PROBE_DIR, f"probe-leaf-{model}.pkl"), "rb") as f:
        lr_leaf, scaler_leaf, pca_leaf = pickle.load(f)
    with open(os.path.join(PROBE_DIR, f"probe-novel-{model}.pkl"), "rb") as f:
        lr_novel, scaler_novel, pca_novel = pickle.load(f)
    # load consistency labels
    _, label = load_probe_labels(model, "consistent")

    # test-split examples with more than one label entry; computed once
    # so predictions and labels are guaranteed to stay aligned
    kept = [i for i in splits["test"] if len(label[i]) > 1]

    p_boring_leaf = []
    trues = []
    for i in kept:
        cur_reps = step_embeddings[i]
        # p(leaf)
        leaf_preds = lr_leaf.predict_proba(pca_leaf.transform(scaler_leaf.transform(cur_reps)))[:, 1]
        # p(novel): each step is paired with the previous one (look back),
        # so there is one fewer prediction than there are steps
        cur_reps_stacked = np.concatenate([cur_reps[1:], cur_reps[:-1]], axis=1)
        novel_preds = lr_novel.predict_proba(pca_novel.transform(scaler_novel.transform(cur_reps_stacked)))[:, 1]
        # p_boring_leaf = p(leaf) * (1 - p(novel)); the first leaf
        # prediction has no look-back partner and is dropped
        p_boring = smooth(leaf_preds[1:] * (1 - novel_preds), window=10)
        p_boring_leaf.append(p_boring)

        # expand labels to step-wise using `batch_size` (previously a
        # hard-coded 10), then trim off first and excess
        lbl = [item for item in label[i] for _ in range(batch_size)]
        trues.append(lbl[1:len(p_boring) + 1])

    lambdas = {}
    for ep in eps:
        lambdas[ep] = run_test(p_boring_leaf, trues, ep, loss_f=get_loss)

    return lambdas

This takes ~4 minutes per model because two probes (leaf and novel) are evaluated.

In [9]:
# Calibrate the combined novel-leaf probe for every model and store the
# thresholds as JSON; models that already have an output file are skipped.
MODE = "boring"
for model in model_to_folder:
    fp_out = os.path.join(PROBE_DIR, f"lambdas-{model}-{MODE}.json")
    if not os.path.exists(fp_out):
        lambdas = get_lambdas_boring(model)
        with open(fp_out, "w") as f:
            json.dump(lambdas, f)
        print(fp_out, lambdas)