# 06 · Core Attacks

Compute loss/confidence, Win-k, and label-free scores using Drive-backed features.

In [None]:
# Persistent Drive + run mode setup
#
# Resolves the storage root (Google Drive on Colab, the home directory
# otherwise), puts the project checkout on sys.path, switches the working
# directory to it, reads the active run mode, and makes sure the data,
# artifact, checkpoint, and BHC dataset directories exist.
import os
import sys
from pathlib import Path

# Best-effort Drive mount: outside Colab the import fails and the error is
# only printed, so the cell keeps running with the home-directory fallback.
try:
    from google.colab import drive  # type: ignore
    DRIVE_MOUNT = Path('/content/drive')
    # Only mount when the mount point is not already present.
    if not DRIVE_MOUNT.exists():
        drive.mount('/content/drive')
except Exception as exc:  # pragma: no cover
    print(f'Colab drive mount skipped: {exc}')

# Storage root: MyDrive when the Drive mount point exists, else the user's home.
if Path('/content/drive').exists():
    DRIVE_ROOT = Path('/content/drive/MyDrive').resolve()
else:
    DRIVE_ROOT = Path.home().resolve()

# The project checkout must already exist under the storage root; fail fast
# with a pointer to the setup notebook otherwise.
PROJECT_ROOT = DRIVE_ROOT / 'secure-llm-mia'
if not PROJECT_ROOT.exists():
    raise FileNotFoundError('Run 00_colab_setup.ipynb first to clone the repo on Drive.')

# Make `src.*` importable without adding a duplicate entry on cell re-runs.
if str(PROJECT_ROOT) not in sys.path:
    sys.path.append(str(PROJECT_ROOT))

# Publish the root through the environment and make it the working directory.
os.environ['SECURE_LLM_MIA_ROOT'] = str(PROJECT_ROOT)
os.chdir(PROJECT_ROOT)

# Imported here, after PROJECT_ROOT has been appended to sys.path above.
from src.utils.runtime import current_run_mode

# The run-mode object is used below (and in later cells) via .name/.description.
RUN_MODE = current_run_mode()
print('PROJECT_ROOT:', PROJECT_ROOT)
print('Active run mode:', RUN_MODE.name, '-', RUN_MODE.description)

# Project-level working directories, created on demand.
DATA_ROOT = PROJECT_ROOT / 'data'
ARTIFACTS_DIR = PROJECT_ROOT / 'artifacts'
CHECKPOINT_ROOT = PROJECT_ROOT / 'checkpoints'
for path in (DATA_ROOT, ARTIFACTS_DIR, CHECKPOINT_ROOT):
    path.mkdir(parents=True, exist_ok=True)

# The BHC dataset directory sits next to the project checkout (under
# DRIVE_ROOT, not PROJECT_ROOT). Only the directory is created here; the CSV
# path is just reported.
BHC_DATA_DIR = DRIVE_ROOT / 'mimic-iv-bhc'
BHC_DATA_DIR.mkdir(parents=True, exist_ok=True)
BHC_CSV_PATH = BHC_DATA_DIR / 'mimic-iv-bhc.csv'
print('BHC CSV path:', BHC_CSV_PATH)


In [None]:
import json
from pathlib import Path

import numpy as np
import pandas as pd
from datasets import Dataset

from src.eval.metrics import auc_metrics, expected_calibration_error, tpr_at_fpr

# Which feature dump to attack: slice index and training track.
SLICE_ID = 1
TRACK = 'noreplay'
FEATURES_PATH = (
    PROJECT_ROOT
    / 'reports'
    / 'features'
    / f'features_slice_{SLICE_ID}_{TRACK}_{RUN_MODE.name}.parquet'
)
if not FEATURES_PATH.exists():
    raise FileNotFoundError('Feature parquet missing. Run notebook 05 to generate it.')

# Load the per-example feature table and pull out ids and integer labels.
features = Dataset.from_parquet(str(FEATURES_PATH))
example_ids = features['example_id']
labels = np.array(features['label'], dtype=int)

if not len(labels):
    raise ValueError('No evaluation records found in feature dataset.')

# Per-token columns are ragged (one variable-length sequence per example),
# so each is kept as a list of float arrays rather than one 2-D array.
nll_sequences, entropy_sequences, max_prob_sequences = (
    [np.array(tokens, dtype=float) for tokens in features[column]]
    for column in ('token_nll', 'token_entropy', 'token_max_prob')
)

# Every column whose name starts with 'win@' is collected under its own name.
win_sequences = {
    column: [np.array(tokens, dtype=float) for tokens in features[column]]
    for column in features.column_names
    if column.startswith('win@')
}

def ragged_mean(seqs):
    """Return the mean of each sequence in a ragged collection.

    Args:
        seqs: Iterable of NumPy arrays, possibly of different lengths.

    Returns:
        A NumPy array with one entry per input sequence; an empty sequence
        yields ``np.nan`` instead of triggering a mean-of-empty warning.
    """
    means = []
    for values in seqs:
        if len(values):
            means.append(values.mean())
        else:
            means.append(np.nan)
    return np.array(means)

def worst_percent_loss(seqs, percent):
    """Return, per sequence, the mean of its largest ``percent`` fraction of values.

    Args:
        seqs: Iterable of NumPy arrays, possibly of different lengths.
        percent: Fraction of each sequence to keep (e.g. ``0.05``). The number
            of values kept is ``ceil(len(seq) * percent)``, with a floor of one.

    Returns:
        A NumPy array with one entry per input sequence; an empty sequence
        yields ``np.nan``.
    """
    def _tail_mean(values):
        if len(values) == 0:
            return np.nan
        keep = max(1, int(np.ceil(len(values) * percent)))
        # Ascending sort, so the last `keep` entries are the largest ones.
        return np.sort(values)[-keep:].mean()

    return np.array([_tail_mean(values) for values in seqs])

# Collapse each ragged per-token sequence into one scalar per example
# (np.nan for examples whose sequence is empty).
mean_nll = ragged_mean(nll_sequences)
mean_entropy = ragged_mean(entropy_sequences)
mean_max_prob = ragged_mean(max_prob_sequences)

# One aggregated column per 'win@<suffix>' feature, named 'win_frac_<suffix>'.
# The replacement field uses double quotes on purpose: reusing the f-string's
# own quote character inside `{...}` is a SyntaxError before Python 3.12.
win_features = {
    f'win_frac_{key.split("@")[1]}': ragged_mean(seqs)
    for key, seqs in win_sequences.items()
}

# Assemble the per-example table: ids and labels, the scalar aggregates, the
# win-fraction columns, then the tail-loss columns (largest 5% / 10% of
# per-token NLL values) appended last.
feature_df = pd.DataFrame(
    {
        'example_id': example_ids,
        'label': labels,
        'mean_nll': mean_nll,
        'entropy': mean_entropy,
        'max_prob': mean_max_prob,
        **win_features,
    }
).assign(
    min_loss_top_5pct=worst_percent_loss(nll_sequences, 0.05),
    min_loss_top_10pct=worst_percent_loss(nll_sequences, 0.10),
)

# Stored next to the raw feature parquet, under a 'features_core_' file name.
feature_df_path = (
    FEATURES_PATH.parent
    / f'features_core_slice_{SLICE_ID}_{TRACK}_{RUN_MODE.name}.parquet'
)
feature_df.to_parquet(feature_df_path, index=False)
print('Saved aggregated features to', feature_df_path)

# Attack score is the negated mean NLL, so a lower loss gives a higher score.
scores = np.negative(feature_df['mean_nll'].to_numpy())
auc_info = auc_metrics(labels, scores)
tpr = tpr_at_fpr(labels, scores, target_fpr=0.01)

# Calibration uses the mean max-probability, clipped away from exactly 0 and 1.
confidences = np.clip(feature_df['max_prob'].to_numpy(), 1e-6, 1 - 1e-6)
calib = expected_calibration_error(labels, confidences)

# Everything that is reported is also what gets persisted.
metrics = {
    'slice_id': SLICE_ID,
    'track': TRACK,
    'run_mode': RUN_MODE.name,
    'auc': auc_info['auc'],
    'tpr@1%fpr': tpr,
    'ece': calib.ece,
}
print('AUC:', metrics['auc'])
print('TPR@1%FPR:', metrics['tpr@1%fpr'])
print('ECE:', metrics['ece'])

metrics_path = (
    PROJECT_ROOT
    / 'reports'
    / f'metrics_core_slice_{SLICE_ID}_{TRACK}_{RUN_MODE.name}.json'
)
metrics_path.write_text(json.dumps(metrics, indent=2))
print('Saved metrics to', metrics_path)


📝 Store aggregated metrics under `reports/metrics_core_slice_<slice_id>_<track>_<run_mode>.json` and keep member/non-member IDs synced from Drive artifacts.