<a href="https://colab.research.google.com/github/mahb97/like-as-though-not-a-benchmark/blob/main/LoRA_(PEFT_QLoRA)_.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**A) LoRA (PEFT/QLoRA) for CST category classification (sentence-level)**

In [None]:
!pip -q install "transformers>=4.44.0" "datasets>=2.20.0" "peft>=0.13.0" "accelerate>=0.33.0" "bitsandbytes>=0.43.1" evaluate -U

import os, json, ast, math, re, random
from pathlib import Path
import numpy as np
import pandas as pd
import torch
from datasets import Dataset, DatasetDict
from transformers import (AutoTokenizer, AutoModelForSequenceClassification,
                          Trainer, TrainingArguments, set_seed)
from peft import LoraConfig, get_peft_model, TaskType
import evaluate

# Project layout: use /mnt/data when that directory exists, otherwise the /content path.
ROOT = Path("/mnt/data/cst_v2_joyce_similes") if Path("/mnt/data").exists() else Path("/content/cst_v2_joyce_similes")
PROC = ROOT / "data" / "processed"
OUT  = ROOT / "outputs"
MODELS = ROOT / "models"
OUT.mkdir(parents=True, exist_ok=True)
MODELS.mkdir(parents=True, exist_ok=True)

# processed input
CSV_PATH = PROC / "merged_all_processed.csv"
assert CSV_PATH.exists(), f"Missing {CSV_PATH}"
df = pd.read_csv(CSV_PATH)

# Pick the first candidate label column that has at least two distinct non-null values.
label_col = None
for c in ["gold_category", "category", "extractor_label"]:
    if c in df.columns and df[c].dropna().astype(str).nunique() >= 2:
        label_col = c
        break
assert label_col is not None, "No viable label column (need ≥2 classes in gold_category/category/extractor_label)."

# Drop rows with a missing sentence or label BEFORE casting to str; otherwise
# astype(str) turns NaN into the literal text "nan" (a bogus sentence / extra class).
df = df.dropna(subset=["sentence", label_col]).reset_index(drop=True)
df["sentence"] = df["sentence"].astype(str).str.strip()

# Grouping key for the split: reuse source_file when there is no story column.
if "story" not in df.columns:
    if "source_file" in df.columns:
        df["story"] = df["source_file"].astype(str)
    else:
        df["story"] = "ALL"

# label ids (sorted so the mapping is deterministic across runs)
df[label_col] = df[label_col].astype(str)
labels = sorted(df[label_col].unique())
label2id = {l:i for i,l in enumerate(labels)}
id2label = {i:l for l,i in label2id.items()}
df["label_id"] = df[label_col].map(label2id)

# Train/val split grouped by story (no story appears in both splits).
# Note: GroupShuffleSplit groups but does not stratify by label.
from sklearn.model_selection import GroupShuffleSplit, ShuffleSplit
if df["story"].nunique() >= 2:
    gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
    train_idx, val_idx = next(gss.split(df, groups=df["story"]))
else:
    # A single group (e.g. the "ALL" fallback) cannot be split by group;
    # fall back to a plain row-level shuffle split instead of crashing.
    print("Only one story group found; using an ungrouped 80/20 split.")
    gss = ShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
    train_idx, val_idx = next(gss.split(df))
train_df = df.iloc[train_idx].reset_index(drop=True)
val_df   = df.iloc[val_idx].reset_index(drop=True)

# Tokenizer
MODEL_NAME = "roberta-base"           # can swap to microsoft/deberta-v3-base
MAX_LEN    = 256
tok = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)

def to_hf(ds):
    """Build a HF ``Dataset`` from the ``sentence``/``label_id`` columns of *ds*.

    The two columns are renamed to ``text`` and ``label`` respectively.
    """
    selected = ds[["sentence", "label_id"]]
    renamed = selected.rename(columns={"sentence": "text", "label_id": "label"})
    return Dataset.from_pandas(renamed)

# Wrap both splits as HF datasets and bundle them under "train"/"validation".
train_ds = to_hf(train_df)
val_ds   = to_hf(val_df)
raw = DatasetDict({"train": train_ds, "validation": val_ds})

def tok_fn(batch):
    """Tokenize a batch of ``text`` entries and attach ``label`` as ``labels``.

    Every sequence is truncated and padded to ``MAX_LEN`` tokens.
    """
    encoded = tok(
        batch["text"],
        padding="max_length",
        truncation=True,
        max_length=MAX_LEN,
    )
    encoded["labels"] = batch["label"]
    return encoded

# Tokenize both splits in batches; the original text/label columns are dropped
# so only the tokenizer outputs plus "labels" remain.
tokd = raw.map(tok_fn, batched=True, remove_columns=raw["train"].column_names)

# Metrics (loaded once; used by compute_metrics below)
acc = evaluate.load("accuracy")
f1  = evaluate.load("f1")

def compute_metrics(p):
    """Return accuracy and macro-F1 for an evaluation prediction object.

    ``p.predictions`` is arg-maxed over its last axis and compared
    against ``p.label_ids``.
    """
    predicted = p.predictions.argmax(-1)
    gold = p.label_ids
    accuracy = acc.compute(predictions=predicted, references=gold)["accuracy"]
    macro_f1 = f1.compute(predictions=predicted, references=gold, average="macro")["f1"]
    return {"accuracy": accuracy, "macro_f1": macro_f1}

# Seed before the model is built so the freshly initialised classification head
# and LoRA adapter weights are reproducible (set_seed is called again before training).
set_seed(42)

# LoRA adapter configuration.
# target_modules lists the attention projection names of several architectures;
# names that do not occur in the loaded model are simply not matched.
lora_cfg = LoraConfig(
    task_type=TaskType.SEQ_CLS,
    r=16, lora_alpha=32, lora_dropout=0.05,
    target_modules=[
        "query", "key", "value",                  # BERT / RoBERTa
        "query_proj", "key_proj", "value_proj",   # DeBERTa-v2 / v3
        "q_proj", "k_proj", "v_proj",             # models using *_proj naming
    ],
)

# Base model with a classification head sized to the label set, wrapped with LoRA.
model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=len(labels), id2label=id2label, label2id=label2id)
model = get_peft_model(model, lora_cfg)

# Optional: focal loss toggle
USE_FOCAL = True
GAMMA = 2.0

import torch.nn as nn
class FocalLoss(nn.Module):
    """Multi-class focal loss: ``(1 - p_t) ** gamma * CE``.

    Args:
        gamma: Focusing exponent; ``0`` reduces to plain cross-entropy.
        reduction: ``"mean"``, ``"sum"`` or ``"none"`` (per-sample losses).

    Raises:
        ValueError: If ``reduction`` is not one of the supported values.
    """

    def __init__(self, gamma=2.0, reduction="mean"):
        super().__init__()
        if reduction not in ("mean", "sum", "none"):
            raise ValueError(f"Unsupported reduction: {reduction!r}")
        self.gamma = gamma
        self.reduction = reduction
        # Per-sample CE so each sample can be re-weighted before reducing.
        self.ce = nn.CrossEntropyLoss(reduction="none")

    def forward(self, logits, targets):
        """Return the focal loss for ``logits`` (N, C) and class-index ``targets`` (N,)."""
        ce = self.ce(logits, targets)
        # Probability assigned to the true class; clamped away from zero.
        pt = torch.softmax(logits, dim=-1).gather(1, targets.unsqueeze(1)).squeeze(1).clamp_min(1e-6)
        fl = (1 - pt) ** self.gamma * ce
        if self.reduction == "mean":
            return fl.mean()
        if self.reduction == "sum":
            return fl.sum()
        return fl

# Trainer args (QLoRA/4-bit works if GPU available)
out_dir = str(MODELS / "roberta_cst_lora")
args = TrainingArguments(
    output_dir=out_dir,
    learning_rate=1e-4,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    num_train_epochs=4,
    gradient_accumulation_steps=1,
    # `eval_strategy` replaces the deprecated `evaluation_strategy`, which newer
    # transformers releases no longer accept (the cell installs with -U).
    eval_strategy="epoch",
    save_strategy="epoch",   # must match the eval strategy for load_best_model_at_end
    logging_steps=50,
    weight_decay=0.01,
    warmup_ratio=0.1,
    load_best_model_at_end=True,
    metric_for_best_model="macro_f1",
    greater_is_better=True,
    fp16=torch.cuda.is_available(),
    bf16=False
)

class FocalTrainer(Trainer):
    """Trainer that computes its own loss: focal loss or plain cross-entropy.

    Args:
        use_focal: When true, use ``FocalLoss``; otherwise ``nn.CrossEntropyLoss``.
        gamma: Focusing exponent forwarded to ``FocalLoss``.
        *args, **kwargs: Forwarded unchanged to ``Trainer``.
    """

    def __init__(self, *args, use_focal=False, gamma=2.0, **kwargs):
        super().__init__(*args, **kwargs)
        self.use_focal = use_focal
        self.focal = FocalLoss(gamma=gamma)
        self.ce = nn.CrossEntropyLoss()

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        """Compute the loss from the model logits and the ``labels`` entry of ``inputs``.

        ``num_items_in_batch`` is accepted (and ignored) because newer Trainer
        versions pass it to ``compute_loss``; without it the override raises a
        TypeError on those versions.
        """
        # Labels are removed so the model does not compute its own built-in loss.
        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        loss = self.focal(logits, labels) if self.use_focal else self.ce(logits, labels)
        return (loss, outputs) if return_outputs else loss

# Build the trainer with the focal-loss toggle defined above.
trainer = FocalTrainer(
    model=model,
    args=args,
    train_dataset=tokd["train"],
    eval_dataset=tokd["validation"],
    tokenizer=tok,
    compute_metrics=compute_metrics,
    use_focal=USE_FOCAL, gamma=GAMMA
)

# This call runs after the model object already exists, so it only governs
# randomness from this point on (presumably data order and dropout — confirm).
set_seed(42)
trainer.train()

# Save model, tokenizer and the label mapping side by side in out_dir.
trainer.save_model(out_dir)
tok.save_pretrained(out_dir)
(Path(out_dir) / "label2id.json").write_text(json.dumps(label2id, indent=2))
print("Saved:", out_dir)


**B) LoRA token-classification for comparator span BIO tags**

In [None]:
# Build BIO dataset
!pip -q install "transformers>=4.44.0" "peft>=0.13.0" "datasets>=2.20.0" -U

import json, ast, re
from pathlib import Path
import numpy as np
import pandas as pd
import torch
from datasets import Dataset, DatasetDict
from transformers import (AutoTokenizer, AutoModelForTokenClassification,
                          DataCollatorForTokenClassification, Trainer, TrainingArguments)
from peft import LoraConfig, get_peft_model, TaskType
from sklearn.model_selection import GroupShuffleSplit

# Same project layout as Section A: /mnt/data if present, else the /content path.
ROOT = Path("/mnt/data/cst_v2_joyce_similes") if Path("/mnt/data").exists() else Path("/content/cst_v2_joyce_similes")
PROC = ROOT / "data" / "processed"
CSV_PATH = PROC / "merged_all_processed.csv"
assert CSV_PATH.exists()
df = pd.read_csv(CSV_PATH)
# NOTE(review): astype(str) turns a missing sentence into the literal text "nan".
df["sentence"] = df["sentence"].astype(str).str.strip()

def parse_spans(x):
    """Return *x* as a list of spans, or ``[]`` when it cannot be parsed as one.

    Lists are returned unchanged. Strings are parsed first as a Python
    literal and then, if that fails, as JSON (which additionally covers
    ``null``/``true``/``false``). Anything else yields an empty list.
    """
    if isinstance(x, list):
        return x
    if not isinstance(x, str):
        return []
    for parser in (ast.literal_eval, json.loads):
        try:
            value = parser(x)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            # Malformed cell text: try the next parser, else fall through to [].
            continue
        if isinstance(value, list):
            return value
    return []

# Normalise the span column to Python lists (unparseable cells become []).
df["comparator_spans"] = df["comparator_spans"].apply(parse_spans)
# Grouping key for the split: source_file column if present, else the constant "ALL".
if "story" not in df.columns:
    df["story"] = df.get("source_file","ALL")

MODEL_NAME = "roberta-base"
tok = AutoTokenizer.from_pretrained(MODEL_NAME, use_fast=True)
MAX_LEN = 256
# BIO tag set for comparator spans; list order defines the integer ids.
label_list = ["O","B-CMP","I-CMP"]
label2id = {l:i for i,l in enumerate(label_list)}
id2label = {i:l for l,i in label2id.items()}

def bio_encode(row):
    """Tokenize one row and attach per-token BIO labels for its comparator spans.

    Args:
        row: Mapping with ``sentence`` (text) and ``comparator_spans``
            (iterable of dicts carrying character offsets ``start``/``end``).

    Returns:
        The tokenizer encoding (without ``offset_mapping``) plus ``labels``:
        ``B-CMP`` on the first token overlapping each span, ``I-CMP`` on the
        remaining overlapping tokens, ``O`` elsewhere, and ``-100`` on
        zero-width (special) tokens so the loss and metrics ignore them.
    """
    text = row["sentence"]
    # Keep only well-formed spans; a bound that is present but None would
    # otherwise break the offset comparisons below.
    spans = [
        (s["start"], s["end"])
        for s in row["comparator_spans"]
        if isinstance(s, dict) and s.get("start") is not None and s.get("end") is not None
    ]
    enc = tok(text, max_length=MAX_LEN, truncation=True, return_offsets_mapping=True)
    offsets = enc.pop("offset_mapping")

    # Zero-width offsets mark special tokens: mask them with -100.
    labels = [-100 if o_b == o_e else label2id["O"] for (o_b, o_e) in offsets]
    for (b, e) in spans:
        # Tokens whose character range overlaps [b, e).
        inside = [i for i, (o_b, o_e) in enumerate(offsets) if o_b != o_e and o_b < e and o_e > b]
        for i in inside:
            labels[i] = label2id["I-CMP"]
        if inside:
            labels[inside[0]] = label2id["B-CMP"]
    enc["labels"] = labels
    return enc

# Train/val split by story (no story appears in both splits)
from sklearn.model_selection import GroupShuffleSplit
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
train_idx, val_idx = next(gss.split(df, groups=df["story"]))
train_df = df.iloc[train_idx].reset_index(drop=True)
val_df   = df.iloc[val_idx].reset_index(drop=True)

# Encode row by row; the raw columns are dropped after encoding.
train_ds = Dataset.from_pandas(train_df[["sentence","comparator_spans","story"]])
val_ds   = Dataset.from_pandas(val_df[["sentence","comparator_spans","story"]])
raw = DatasetDict({"train": train_ds, "validation": val_ds})
tokd = raw.map(bio_encode, remove_columns=raw["train"].column_names)

# Sequences are left unpadded by bio_encode; the collator pads each batch.
data_collator = DataCollatorForTokenClassification(tok)

# Model + LoRA
model = AutoModelForTokenClassification.from_pretrained(MODEL_NAME, num_labels=len(label_list), id2label=id2label, label2id=label2id)
lora_cfg = LoraConfig(task_type=TaskType.TOKEN_CLS, r=16, lora_alpha=32, lora_dropout=0.05,
                      target_modules=["query","value","key","q_proj","v_proj","k_proj"])
model = get_peft_model(model, lora_cfg)

args = TrainingArguments(
    output_dir=str(ROOT / "models" / "roberta_cmp_bio_lora"),
    learning_rate=5e-5,
    per_device_train_batch_size=16,
    per_device_eval_batch_size=32,
    num_train_epochs=4,
    gradient_accumulation_steps=1,
    # `eval_strategy` replaces the deprecated `evaluation_strategy`, which newer
    # transformers releases no longer accept (the cell installs with -U).
    eval_strategy="epoch",
    save_strategy="epoch",
    logging_steps=50,
    warmup_ratio=0.1,
    load_best_model_at_end=True,
    metric_for_best_model="eval_loss",
    fp16=torch.cuda.is_available()
)

def compute_token_metrics(p):
    """Token-level binary F1: comparator tag (B-CMP or I-CMP) versus everything else.

    Positions whose reference label is -100 are excluded before scoring.
    """
    from sklearn.metrics import f1_score

    gold = p.label_ids.ravel()
    predicted = np.argmax(p.predictions, axis=-1).ravel()

    keep = gold != -100
    gold, predicted = gold[keep], predicted[keep]

    # Collapse B-/I- tags into a single positive class.
    cmp_ids = [label2id["B-CMP"], label2id["I-CMP"]]
    pred_is_cmp = np.isin(predicted, cmp_ids).astype(int)
    gold_is_cmp = np.isin(gold, cmp_ids).astype(int)
    return {"cmp_token_f1": f1_score(gold_is_cmp, pred_is_cmp)}

# Token-classification trainer; batches are padded by data_collator.
trainer = Trainer(
    model=model,
    args=args,
    data_collator=data_collator,
    train_dataset=tokd["train"],
    eval_dataset=tokd["validation"],
    tokenizer=tok,
    compute_metrics=compute_token_metrics
)

trainer.train()
# Save model, tokenizer and the BIO tag mapping into the run's output directory.
trainer.save_model(args.output_dir)
tok.save_pretrained(args.output_dir)
(Path(args.output_dir) / "bio_labels.json").write_text(json.dumps(label2id, indent=2))
print("Saved:", args.output_dir)


**C) Group-stratified evaluation, class weights & calibrated probs**

In [None]:
# GroupKFold eval with class weights + calibration for the classifier
!pip -q install "scikit-learn>=1.5.0"

import numpy as np, pandas as pd, json, ast, re
from pathlib import Path
from sklearn.model_selection import GroupKFold
from sklearn.metrics import classification_report
import torch
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from peft import LoraConfig, get_peft_model, TaskType

ROOT = Path("/mnt/data/cst_v2_joyce_similes") if Path("/mnt/data").exists() else Path("/content/cst_v2_joyce_similes")
PROC = ROOT / "data" / "processed"
CSV_PATH = PROC / "comprehensive_linguistic_analysis_corrected-7__processed.csv"
# NOTE(review): the original cell defined CSV_PATH but never read it (the
# load/label-mapping lines appear to have been lost); reconstructed to mirror Section A.
assert CSV_PATH.exists(), f"Missing {CSV_PATH}"
df = pd.read_csv(CSV_PATH)

# First candidate label column with at least two distinct non-null values.
label_col = next(
    (c for c in ["gold_category","category","extractor_label"]
     if c in df.columns and df[c].dropna().astype(str).nunique()>=2),
    None,
)
assert label_col is not None, "No viable label column (need ≥2 classes in gold_category/category/extractor_label)."

# Drop incomplete rows before casting so NaN does not become the text "nan".
df = df.dropna(subset=["sentence", label_col]).reset_index(drop=True)
df["sentence"] = df["sentence"].astype(str).str.strip()
df[label_col] = df[label_col].astype(str)

# Label <-> id mappings and the integer target column used by encode() and the CV loop.
labels = sorted(df[label_col].unique())
label2id = {l:i for i,l in enumerate(labels)}
id2label = {i:l for l,i in label2id.items()}
df["label_id"] = df[label_col].map(label2id)

# Grouping key for GroupKFold.
if "story" not in df.columns:
    df["story"] = df["source_file"].astype(str) if "source_file" in df.columns else "ALL"

# encode() below builds a datasets.Dataset; import it here so this cell is self-contained.
from datasets import Dataset

MODEL_NAME = "roberta-base"
tok = AutoTokenizer.from_pretrained(MODEL_NAME)
MAX_LEN=256

def encode(df_sub):
    """Tokenize the ``sentence`` column of *df_sub* into a ``Dataset`` with ``labels``.

    Sequences are truncated to ``MAX_LEN`` and padded to the longest one
    in *df_sub*; ``labels`` comes from the ``label_id`` column.
    """
    sentences = df_sub["sentence"].tolist()
    features = tok(sentences, truncation=True, padding=True, max_length=MAX_LEN)
    features["labels"] = df_sub["label_id"].tolist()
    return Dataset.from_dict(features)

from torch import nn


class CETrainer(Trainer):
    """Trainer using (optionally class-weighted) cross-entropy.

    Args:
        class_weights: Optional 1-D float tensor with one weight per class;
            ``None`` gives unweighted cross-entropy.
        *args, **kwargs: Forwarded unchanged to ``Trainer``.
    """

    def __init__(self, *args, class_weights=None, **kwargs):
        super().__init__(*args, **kwargs)
        self.class_weights = class_weights

    def compute_loss(self, model, inputs, return_outputs=False, num_items_in_batch=None):
        # `num_items_in_batch` is accepted because newer Trainer versions pass it.
        targets = inputs.pop("labels")
        outputs = model(**inputs)
        logits = outputs.logits
        weight = None
        if self.class_weights is not None:
            weight = self.class_weights.to(device=logits.device, dtype=logits.dtype)
        loss = nn.functional.cross_entropy(logits, targets, weight=weight)
        return (loss, outputs) if return_outputs else loss


# Every class id, so each fold's report has the same rows even when a
# validation fold happens to miss a class.
all_label_ids = list(range(len(labels)))

gkf = GroupKFold(n_splits=5)
reports = []
for fold, (tr_idx, va_idx) in enumerate(gkf.split(df, y=df["label_id"], groups=df["story"])):
    tr, va = df.iloc[tr_idx], df.iloc[va_idx]
    ds_tr, ds_va = encode(tr), encode(va)

    # Balanced class weights from this fold's training labels:
    # n_samples / (n_classes * count); classes absent from the fold count as 1.
    counts = np.bincount(tr["label_id"].to_numpy(), minlength=len(labels))
    class_weights = torch.tensor(len(tr) / (len(labels) * np.maximum(counts, 1)), dtype=torch.float)

    # Fresh base model + adapter per fold so folds do not share weights.
    model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME, num_labels=len(labels), id2label=id2label, label2id=label2id)
    lora_cfg = LoraConfig(task_type=TaskType.SEQ_CLS, r=16, lora_alpha=32, lora_dropout=0.05,
                          target_modules=["query","value","key","q_proj","v_proj","k_proj"])
    model = get_peft_model(model, lora_cfg)

    args = TrainingArguments(
        output_dir=str(ROOT / "models" / f"roberta_cst_lora_cv{fold}"),
        learning_rate=1e-4, per_device_train_batch_size=16, per_device_eval_batch_size=32,
        # `eval_strategy` replaces the deprecated `evaluation_strategy`.
        num_train_epochs=3, eval_strategy="epoch", save_strategy="no",
        logging_steps=100, warmup_ratio=0.1, fp16=torch.cuda.is_available()
    )

    trnr = CETrainer(model=model, args=args, train_dataset=ds_tr, eval_dataset=ds_va,
                     tokenizer=tok, class_weights=class_weights)
    trnr.train()

    # evaluate
    preds = trnr.predict(ds_va).predictions
    yhat = preds.argmax(-1)
    # labels=... keeps target_names aligned when a class is missing from the fold;
    # zero_division=0 reports 0 for such classes instead of warning.
    rep = classification_report(va["label_id"], yhat, labels=all_label_ids,
                                target_names=labels, digits=3, zero_division=0)
    reports.append(f"Fold {fold}\n{rep}")

# Section A normally creates outputs/, but make this cell runnable on its own.
report_path = ROOT/"outputs"/"groupkfold_reports.txt"
report_path.parent.mkdir(parents=True, exist_ok=True)
report_path.write_text("\n\n".join(reports))
print("Saved:", report_path)

Fit temperature on validation set

In [None]:
# After training in Section A, reuse: `trainer`, `tok`, `out_dir`, and your label names.

import torch, numpy as np
import torch.nn as nn
from torch.utils.data import DataLoader
from transformers import default_data_collator
import evaluate, json
from pathlib import Path

# Grab val logits/labels
# Run the trained model over the trainer's eval dataset and keep raw logits + labels
# as tensors for temperature fitting.
eval_out = trainer.predict(trainer.eval_dataset)
val_logits = torch.tensor(eval_out.predictions)           # [N, C]
val_labels = torch.tensor(eval_out.label_ids).long()      # [N]

# Temperature scaler
class TemperatureScaler(nn.Module):
    """Divide logits by a single learnable temperature.

    Args:
        init_temp: Starting value of the temperature parameter.
    """

    def __init__(self, init_temp=1.0):
        super().__init__()
        start = torch.full((1,), float(init_temp))
        self.temperature = nn.Parameter(start)

    def forward(self, logits):
        # Floor the temperature at 1e-6 so the division stays finite and positive.
        safe_temp = self.temperature.clamp(min=1e-6)
        return logits / safe_temp

def fit_temperature(logits, labels, max_iter=500, lr=1e-2, verbose=True):
    """Fit a single temperature by minimising cross-entropy (NLL) on the given logits.

    Args:
        logits: Float tensor of uncalibrated logits, one row per sample.
        labels: Long tensor of class indices, one per row of ``logits``.
        max_iter: Number of AdamW steps (used only on the AdamW path).
        lr: AdamW learning rate (the L-BFGS path uses its own fixed settings).
        verbose: Print progress and the fitted temperature.

    Returns:
        The fitted ``TemperatureScaler``.
    """
    logits = logits.detach()
    labels = labels.detach()
    scaler = TemperatureScaler(1.0)
    criterion = nn.CrossEntropyLoss()

    # More than 2000 rows: a single L-BFGS call; otherwise a fixed number of AdamW steps.
    use_lbfgs = logits.shape[0] > 2000
    if use_lbfgs:
        optim = torch.optim.LBFGS(scaler.parameters(), lr=0.5, max_iter=50)
    else:
        # weight_decay=0.0: AdamW's default decay would pull the temperature
        # toward zero on every step and bias the fitted value.
        optim = torch.optim.AdamW(scaler.parameters(), lr=lr, weight_decay=0.0)

    def closure():
        optim.zero_grad()
        loss = criterion(scaler(logits), labels)
        loss.backward()
        return loss

    if use_lbfgs:
        optim.step(closure)
    else:
        for i in range(max_iter):
            loss = closure()
            optim.step()
            if verbose and (i % 50 == 0):
                print(f"[fit T] step {i}  NLL={loss.item():.4f}")

    # Report the NLL of the final temperature (after the last update).
    with torch.no_grad():
        last = criterion(scaler(logits), labels).item()
    if verbose:
        print("Fitted T =", float(torch.clamp(scaler.temperature, min=1e-6).item()), "final NLL=", last)
    return scaler

# Metrics
# Metric objects used by metrics_from_logits and the post-calibration report below.
acc = evaluate.load("accuracy")
f1  = evaluate.load("f1")

def metrics_from_logits(logits, labels):
    """Return accuracy and macro-F1 of the arg-max predictions from ``logits``.

    Args:
        logits: Tensor or array-like of per-class scores, one row per sample.
        labels: Reference class indices, one per row.
    """
    # as_tensor accepts tensors and arrays alike and avoids the copy-construct
    # warning torch.tensor() emits when handed a Tensor. Softmax is skipped
    # because it does not change which class has the highest score.
    preds = torch.as_tensor(logits).detach().argmax(dim=-1).cpu().numpy()
    return {
        "accuracy": acc.compute(predictions=preds, references=labels)["accuracy"],
        "macro_f1": f1.compute(predictions=preds, references=labels, average="macro")["f1"]
    }

def expected_calibration_error(probs, labels, n_bins=15):
    """Expected calibration error over ``n_bins`` equal-width confidence bins.

    Each sample's confidence is its largest class probability. For every
    non-empty bin ``(lower, upper]`` the absolute gap between accuracy and
    mean confidence is weighted by the fraction of samples in that bin.
    """
    prob_arr = np.asarray(probs)
    truth = np.asarray(labels)
    confidence = prob_arr.max(axis=1)
    predicted = prob_arr.argmax(axis=1)

    edges = np.linspace(0.0, 1.0, n_bins + 1)
    total = 0.0
    for lower, upper in zip(edges[:-1], edges[1:]):
        in_bin = (confidence > lower) & (confidence <= upper)
        if in_bin.any():
            bin_accuracy = (predicted[in_bin] == truth[in_bin]).mean()
            bin_confidence = confidence[in_bin].mean()
            total += in_bin.mean() * abs(bin_accuracy - bin_confidence)
    return float(total)

# Pre-calibration
val_labels_np = val_labels.numpy()
pre = metrics_from_logits(val_logits, val_labels_np)
pre_ece = expected_calibration_error(torch.softmax(val_logits, -1).numpy(), val_labels_np)
print("Pre-calibration:", pre, "ECE≈", round(pre_ece, 4))

# Fit T
scaler = fit_temperature(val_logits, val_labels, lr=1e-2, verbose=True)

# Post-calibration
# The scaler output depends on a trainable parameter, so it requires grad and
# .numpy() would raise; evaluate under no_grad instead.
# Note: these numbers are measured on the same validation logits T was fitted on.
with torch.no_grad():
    post_probs = torch.softmax(scaler(val_logits), dim=-1).numpy()
post_preds = post_probs.argmax(-1)
post = {
    "accuracy": acc.compute(predictions=post_preds, references=val_labels_np)["accuracy"],
    "macro_f1": f1.compute(predictions=post_preds, references=val_labels_np, average="macro")["f1"]
}
post_ece = expected_calibration_error(post_probs, val_labels_np)
print("Post-calibration:", post, "ECE≈", round(post_ece, 4))

# Save T with the model (same clamp as the scaler applies in forward)
T_value = float(torch.clamp(scaler.temperature, min=1e-6).item())
calib_path = Path(out_dir) / "temperature.json"
calib_path.write_text(json.dumps({"temperature": T_value}, indent=2))
print("Saved temperature to:", calib_path)


Load the calibrated model for inference

In [None]:
import json, torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForSequenceClassification

def load_temperature(model_dir):
    """Read the calibration temperature stored next to a saved model.

    Args:
        model_dir: Directory that may contain ``temperature.json``.

    Returns:
        The stored temperature as a float, or ``1.0`` (no scaling) when the
        file or key is missing or the value is not a positive number.
    """
    # Local import keeps this inference cell runnable without the training cells.
    from pathlib import Path

    p = Path(model_dir) / "temperature.json"
    if not p.exists():
        return 1.0
    value = json.loads(p.read_text()).get("temperature", 1.0)
    try:
        value = float(value)
    except (TypeError, ValueError):
        return 1.0
    # Also rejects NaN, which compares false against everything.
    return value if value > 0 else 1.0

def predict_with_calibration(model_dir, texts, max_len=256, batch_size=32, device=None):
    """Predict temperature-scaled class probabilities for ``texts``.

    Args:
        model_dir: Directory holding the saved model, tokenizer and
            (optionally) ``temperature.json``.
        texts: A single string or any iterable of strings.
        max_len: Truncation length passed to the tokenizer.
        batch_size: Number of texts per forward pass.
        device: Torch device; defaults to CUDA when available, else CPU.

    Returns:
        ``(probs, preds)``: an array of per-class probabilities with one row
        per text, and the arg-max class index per text. Both are empty when
        ``texts`` is empty.
    """
    device = device or ("cuda" if torch.cuda.is_available() else "cpu")
    tok = AutoTokenizer.from_pretrained(model_dir, use_fast=True)
    # NOTE(review): if model_dir was written from a PEFT-wrapped model it may hold
    # only adapter weights; confirm this loader restores the right classification head.
    mdl = AutoModelForSequenceClassification.from_pretrained(model_dir).to(device).eval()
    T = max(float(load_temperature(model_dir)), 1e-6)

    # Materialise the input so slicing yields plain lists of strings
    # (a lone string is treated as one text, not as characters).
    texts = [texts] if isinstance(texts, str) else list(texts)
    if not texts:
        # np.vstack / np.concatenate reject empty lists; return empty results instead.
        return np.empty((0, mdl.config.num_labels), dtype=np.float32), np.empty((0,), dtype=np.int64)

    all_probs, all_preds = [], []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        enc = tok(batch, padding=True, truncation=True, max_length=max_len, return_tensors="pt").to(device)
        with torch.no_grad():
            logits = mdl(**enc).logits / T  # apply temperature
            probs = torch.softmax(logits, dim=-1).cpu().numpy()
        all_probs.append(probs)
        all_preds.append(probs.argmax(-1))
    return np.vstack(all_probs), np.concatenate(all_preds)