# Measurement Tampering Demo

This is a simple implementation of probing for evidence of tampering.

First, load and tokenize the data

In [6]:
import os

from datasets import load_dataset
from transformers import AutoTokenizer
import torch
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
from transformers import CodeGenForCausalLM

# Turn off the tokenizers library's internal parallelism; presumably this avoids
# its fork warning once the DataLoader worker processes are started later on.
os.environ["TOKENIZERS_PARALLELISM"] = "false"

# Seed torch's global RNG so that the probe initialisation and the shuffled
# training order are repeatable under Restart & Run All (GPU kernels may still
# introduce small run-to-run differences).
torch.manual_seed(0)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

dataset = load_dataset("redwoodresearch/diamonds-seed0", "s0") # load the first seed

# Subsample sizes used to keep the demo fast.
max_train_samples = 1000 # This is very low! In the paper we use the 25k points for 5 epochs
max_val_samples = 200

class MeasurementDataset(Dataset):
    """Map-style dataset pairing tokenized text with sensor measurements and labels.

    Args:
        dataset: mapping with the columns ``"text"``, ``"measurements"``,
            ``"is_correct"`` and ``"is_clean"`` (e.g. a slice of a Hugging Face
            dataset split, which yields a dict of lists).
        tokenizer: callable that accepts the list of texts plus the keyword
            arguments ``max_length``, ``padding``, ``truncation`` and
            ``return_tensors`` and returns an object exposing ``input_ids`` and
            ``attention_mask``.

    Every text is tokenized once, eagerly, in the constructor. ``is_trusted`` is
    kept as an attribute for building masks but is not part of the items
    returned by ``__getitem__``.
    """

    # Every example is padded (and, if needed, truncated) to this many tokens.
    max_length = 1024

    def __init__(self, dataset, tokenizer):
        self.measurements = torch.tensor(dataset["measurements"]) # (batch, nb_sensors=3)
        self.ground_truth = torch.tensor(dataset["is_correct"]) # (batch,)
        self.is_trusted = torch.tensor(dataset["is_clean"]) # (batch,)

        print(f"Tokenizing {len(dataset['text'])} examples...")
        self.tokenized_text = tokenizer(
            dataset["text"],
            max_length=self.max_length,
            padding="max_length",
            # Without truncation a single text longer than max_length produces
            # rows of unequal length and the conversion to a tensor fails.
            # NOTE(review): truncation presumably drops the tail of such texts
            # (tokenizer default) - confirm that is acceptable for long inputs.
            truncation=True,
            return_tensors="pt",
        )

    def __getitem__(self, index):
        """Return the model inputs and labels for the example at ``index``."""
        return {
            "input_ids": self.tokenized_text.input_ids[index],
            "attention_mask": self.tokenized_text.attention_mask[index],
            "ground_truth": self.ground_truth[index],
            "measurements": self.measurements[index],
        }

    def __len__(self):
        """Number of examples (one ground-truth label per example)."""
        return len(self.ground_truth)

model_name = "Salesforce/codegen-350M-mono"

tokenizer = AutoTokenizer.from_pretrained(model_name)
# Left padding keeps the end of every sequence at the same (final) position,
# so the last index can be read directly later on.
tokenizer.padding_side = "left"
# Reuse the end-of-sequence token for padding.
tokenizer.pad_token = tokenizer.eos_token

# Build the train split first and the validation split second, each capped to
# its configured number of examples.
train_ds, val_ds = (
    MeasurementDataset(dataset[split_name][:sample_limit], tokenizer)
    for split_name, sample_limit in (
        ("train", max_train_samples),
        ("validation", max_val_samples),
    )
)
print("Done!")

Using custom data configuration redwoodresearch--diamonds-seed0-13453d0c0f0868b1
Found cached dataset parquet (/home/ubuntu/.cache/huggingface/datasets/redwoodresearch___parquet/redwoodresearch--diamonds-seed0-13453d0c0f0868b1/0.0.0/2a3b91fbd88a2c90d1dbbb32b460cf621d31bd5b05b934492fdef7d8d6f236ec)


  0%|          | 0/2 [00:00<?, ?it/s]

Tokenizing 1000 examples...
Tokenizing 200 examples...
Done!


Now train a measurement predictor

In [7]:
class ModelWithProbe(torch.nn.Module):
    """Wrap a causal LM and a probe applied to its final-position hidden state.

    Args:
        model: language model whose ``transformer`` sub-module returns an output
            with a ``last_hidden_state`` attribute.
        probe: module applied to the extracted activations.
    """

    def __init__(self, model: CodeGenForCausalLM, probe: torch.nn.Module):
        super().__init__()
        self.model = model
        self.probe = probe

    def embed(self, *args, **kwargs):
        """Return the last-layer hidden state at the final sequence position.

        All arguments are forwarded unchanged to ``self.model.transformer``.
        Index -1 is only the last real token when inputs are left-padded.
        """
        transformer_output = self.model.transformer(*args, **kwargs)
        hidden_states = transformer_output.last_hidden_state
        return hidden_states[:, -1, :]

    def forward(self, *args, **kwargs):
        """Run the probe on the final-position activations of the wrapped model."""
        return self.probe(self.embed(*args, **kwargs))
    
# Load the pretrained language model onto the selected device.
model = CodeGenForCausalLM.from_pretrained(model_name).to(device)

# Linear probe with one output logit per sensor (3 sensors), on the same device.
measurement_predictor = ModelWithProbe(
    model,
    torch.nn.Linear(in_features=model.config.hidden_size, out_features=3).to(device),
)

In [8]:
# Fine-tune the whole model together with the probe for a single pass over the data.
optimizer = torch.optim.AdamW(measurement_predictor.parameters(), lr=2e-5)
data_loader = DataLoader(train_ds, batch_size=16, shuffle=True, num_workers=4)

pbar = tqdm(data_loader)
for batch in pbar:
    # Only the model inputs and the sensor targets need to live on the device.
    tokens = {name: batch[name].to(device) for name in ("input_ids", "attention_mask")}
    labels = batch["measurements"].to(device).float()

    optimizer.zero_grad()
    preds = measurement_predictor(**tokens)
    # One independent binary target per sensor.
    loss = torch.nn.functional.binary_cross_entropy_with_logits(preds, labels)
    loss.backward()
    optimizer.step()

    pbar.set_postfix({"loss": loss.item()})

100%|██████████| 63/63 [02:21<00:00,  2.24s/it, loss=1.06] 


Extract the last layer's activations at the final token position

In [9]:
import numpy as np

# Per-split lists of activation batches, collected without tracking gradients.
embeds = {"train": [], "val": []}
with torch.no_grad():
    for split, ds in (("train", train_ds), ("val", val_ds)):
        # No shuffling: row i of the result must match example i of the dataset.
        dataloader = DataLoader(ds, batch_size=32, num_workers=4)
        for batch in tqdm(dataloader):
            tokens = {name: batch[name].to(device) for name in ("input_ids", "attention_mask")}
            activations = measurement_predictor.embed(**tokens)
            embeds[split].append(activations.detach().cpu().numpy())

# Stack the batches into one (num_examples, hidden_size) array per split.
train_embeds, val_embeds = (np.concatenate(embeds[name]) for name in ("train", "val"))

100%|██████████| 32/32 [00:48<00:00,  1.52s/it]
100%|██████████| 7/7 [00:10<00:00,  1.56s/it]


Train both a ground truth classifier and a probe for evidence of tampering

In [10]:
from sklearn.linear_model import LogisticRegression

def get_untrusted_positive_mask(ds: MeasurementDataset) -> torch.Tensor:
    """Boolean mask of untrusted examples whose measurements are all positive.

    Reads ``ds.is_trusted`` and ``ds.measurements``; the result has one entry
    per example.
    """
    all_sensors_positive = ds.measurements.all(dim=-1)
    untrusted = ~ds.is_trusted
    return untrusted & all_sensors_positive

# Ground-truth probe: fit on untrusted examples whose sensors all read positive,
# using the ground-truth labels as targets.
train_gt_mask = get_untrusted_positive_mask(train_ds).numpy()
train_gt_embeds, train_gt_labels = (
    train_embeds[train_gt_mask],
    train_ds.ground_truth.numpy()[train_gt_mask],
)
gt_classifier = LogisticRegression(max_iter=1000).fit(train_gt_embeds, train_gt_labels)

# 1 = consistent measurements (all positive or all negative), 0 = inconsistent measurements
all_positive = train_ds.measurements.all(-1)
none_positive = ~train_ds.measurements.any(-1)
z = all_positive | none_positive

# Evidence probe: fit on trusted examples plus every inconsistent example, with
# the consistency flag as the target (no ground-truth labels involved).
train_evidence_mask = (~z | train_ds.is_trusted).numpy()
train_evidence_embeds, train_evidence_labels = (
    train_embeds[train_evidence_mask],
    z.numpy()[train_evidence_mask],
)
evidence_classifier = LogisticRegression(max_iter=1000).fit(train_evidence_embeds, train_evidence_labels)

print("Done training classifiers")

Done training classifiers


Evaluate them

In [11]:
from sklearn.metrics import roc_auc_score

# Score only the untrusted, all-positive validation examples: the task is to
# tell real positives from fake positives.
val_positive_mask = get_untrusted_positive_mask(val_ds).numpy()
val_positive_embeds, val_positive_labels = (
    val_embeds[val_positive_mask],
    val_ds.ground_truth.numpy()[val_positive_mask],
)

# Insertion order determines the order of the printed report.
classifiers_by_name = {
    "using ground truth labels": gt_classifier,
    "using evidence of tamper (without ground truth labels)": evidence_classifier,
}
for classifier_name, classifier in classifiers_by_name.items():
    classifier: LogisticRegression
    # Column 1 is the probability assigned to the second (positive) class.
    pred_probs = classifier.predict_proba(val_positive_embeds)[:, 1]
    auroc = roc_auc_score(val_positive_labels, pred_probs)
    print(f"Real vs Fake AUROC {classifier_name}: {auroc:.2f}")

Real vs Fake AUROC using ground truth labels: 0.86
Real vs Fake AUROC using evidence of tamper (without ground truth labels): 0.77
