# Multi-Head Diversity: PGD/FGSM & Certified Robustness

**Worst-case adversarial attacks** (FGSM, PGD) and **certified robustness** evaluation.

**Model**: Multi-Head Diversity only.

**Attacks**:
- **FGSM**: Fast Gradient Sign Method – one-step perturbation to maximize MSE
- **PGD**: Projected Gradient Descent – iterative worst-case attack

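**Certified**: Gradient-norm-based first-order (linearized) upper bound on the worst-case prediction change. It is exact only for a locally linear model and is not a formal robustness certificate.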

In [None]:
# Run configuration for the whole notebook.
USE_SMALL_DATA = True  # When True, subsample the training rows and train for N_EPOCHS epochs
N_SAMPLES = 5000  # Training rows kept when USE_SMALL_DATA is True (only if the train split is larger)
N_EPOCHS = 30  # Epochs used when USE_SMALL_DATA is True (100 otherwise)
PGD_STEPS = 10  # Number of PGD iterations
PGD_ALPHA = 0.1  # Step size per PGD iteration, as a multiple of each feature's sigma

In [None]:
# Mount Google Drive when running on Colab; outside Colab this cell is a no-op.
try:
    from google.colab import drive
    drive.mount('/content/drive', force_remount=False)
    print('Google Drive mounted.')
except ImportError:
    # google.colab is unavailable, i.e. not running on Colab: nothing to mount.
    pass
except Exception as exc:
    # Mounting is best-effort: report the failure instead of hiding it, then
    # continue so that repo discovery can still fall back to local paths.
    print(f'Google Drive mount failed: {exc!r}')

In [None]:
import sys, os
from pathlib import Path
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import random
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, r2_score
import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings('ignore')

# Seed the Python, NumPy and PyTorch RNGs with one shared value.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)
torch.manual_seed(RANDOM_SEED)

def _find_repo_root():
    """Locate the project checkout, i.e. a directory that contains ``src``.

    Search order:
      1. three fixed Colab/Drive locations;
      2. any folder directly under the mounted Drive whose name contains
         'multihead-attention';
      3. the working directory followed by up to nine of its ancestors.

    If nothing matches, the working directory is returned (or its parent when
    the working directory is named 'notebooks').
    """
    start = Path.cwd().resolve()

    def has_src(folder):
        return (folder / 'src').exists()

    known_locations = (
        Path('/content/drive/MyDrive/multihead-attention-robustness'),
        Path('/content/drive/My Drive/multihead-attention-robustness'),
        Path('/content/repo_run'),
    )
    for location in known_locations:
        if location.exists() and has_src(location):
            return location

    mount = Path('/content/drive')
    if mount.exists():
        for parent_dir in (mount / 'MyDrive', mount / 'My Drive', mount):
            if not parent_dir.exists():
                continue
            for entry in parent_dir.iterdir():
                if entry.is_dir() and 'multihead-attention' in entry.name.lower() and has_src(entry):
                    return entry

    # Working directory first, then its ancestors, capped at ten levels total.
    for folder in [start, *start.parents][:10]:
        if has_src(folder):
            return folder

    if start.name == 'notebooks':
        return start.parent
    return start

# Resolve the repository, then make it both importable and the working directory.
repo_root = _find_repo_root()
if not (repo_root / 'src').exists():
    raise FileNotFoundError(f"Repo root not found. Run Drive mount first.")
sys.path.insert(0, str(repo_root))  # must precede the `src.*` import below
os.chdir(repo_root)
from src.models.feature_token_transformer import FeatureTokenTransformer

# Use the GPU when PyTorch can see one, otherwise the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}, Repo: {repo_root}')

## 1. Load Data

In [None]:
# Load the master table: prefer data/cross_sectional/, fall back to data/.
data_path = repo_root / 'data' / 'cross_sectional' / 'master_table.csv'
if not data_path.exists():
    data_path = repo_root / 'data' / 'master_table.csv'
df = pd.read_csv(data_path)
# Index by date (when that column exists) so rows can be sliced by date range.
if 'date' in df.columns:
    df['date'] = pd.to_datetime(df['date'])
    df = df.set_index('date')

class CrossSectionalDataSplitter:
    """Date-range train/validation split plus feature/target column selection."""

    def __init__(self, train_start='2005-01-01', train_end='2017-12-31', val_start='2018-01-01', val_end='2019-12-31'):
        self.train_start = train_start
        self.train_end = train_end
        self.val_start = val_start
        self.val_end = val_end

    def split(self, master_table):
        """Return ``{'train': ..., 'val': ...}`` sliced by the configured date ranges.

        The input is not modified: slicing happens on a copy whose index has
        been converted with ``pd.to_datetime``.
        """
        table = master_table.copy()
        table.index = pd.to_datetime(table.index)
        train_part = table.loc[self.train_start:self.train_end]
        val_part = table.loc[self.val_start:self.val_end]
        return {'train': train_part, 'val': val_part}

    def prepare_features_labels(self, data):
        """Split the numeric columns of ``data`` into ``(features, target)``.

        The target is the first numeric, non-excluded column whose name
        contains one of the target keywords (keywords are tried in order,
        case-insensitive substring match).  Without a match, the
        second-to-last non-excluded numeric column is used (or the last one,
        or the last numeric column overall).  Features are the remaining
        non-excluded numeric columns, with progressively looser fallbacks when
        that set is empty.  Empty or non-numeric input yields an empty
        DataFrame and an empty Series.
        """
        if data.empty:
            return pd.DataFrame(), pd.Series()
        numeric = data.select_dtypes(include=[np.number])
        if numeric.empty:
            return pd.DataFrame(), pd.Series()

        excluded = [
            name.lower()
            for name in ['mktcap', 'market_cap', 'date', 'year', 'month', 'ticker', 'permno', 'gvkey']
        ]
        target_keywords = [
            'return', 'returns', 'ret', 'target', 'y', 'next_return', 'forward_return',
            'ret_1', 'ret_1m', 'ret_12m', 'future_return', 'returns_1d',
        ]

        def usable(col):
            return col.lower() not in excluded

        # Keyword order takes priority over column order.
        target_col = next(
            (
                col
                for keyword in target_keywords
                for col in numeric.columns
                if keyword.lower() in col.lower() and usable(col)
            ),
            None,
        )
        if target_col is None:
            candidates = [col for col in numeric.columns if usable(col)]
            if len(candidates) > 1:
                target_col = candidates[-2]
            elif candidates:
                target_col = candidates[-1]
            else:
                target_col = numeric.columns[-1]

        feature_cols = [col for col in numeric.columns if col != target_col and usable(col)]
        if not feature_cols:
            feature_cols = [col for col in numeric.columns if col != target_col]
        if not feature_cols:
            # Single numeric column: it becomes the target and there are no features.
            feature_cols = numeric.columns[:-1].tolist()
            target_col = numeric.columns[-1]
        return numeric[feature_cols], numeric[target_col]

# Split by date, then separate feature columns from the target column.
splitter = CrossSectionalDataSplitter()
data_splits = splitter.split(df)
train_df, val_df = data_splits['train'], data_splits['val']
X_train_df, y_train = splitter.prepare_features_labels(train_df)
X_val_df, y_val = splitter.prepare_features_labels(val_df)
# Missing values are replaced with 0 before conversion to float32 arrays.
X_train = X_train_df.fillna(0).values.astype(np.float32)
y_train = y_train.fillna(0).values.astype(np.float32)
X_val = X_val_df.fillna(0).values.astype(np.float32)
y_val = y_val.fillna(0).values.astype(np.float32)

# Standardize using statistics fitted on the full training split only.
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_val_scaled = scaler.transform(X_val)
# Optional subsample of the already-scaled training rows, drawn without replacement.
if USE_SMALL_DATA and N_SAMPLES < len(X_train):
    idx = np.random.RandomState(RANDOM_SEED).choice(len(X_train), N_SAMPLES, replace=False)
    X_train_scaled, y_train = X_train_scaled[idx], y_train[idx]

# Per-feature std of the scaled training rows; the small constant keeps it strictly positive.
sigma_train = np.std(X_train_scaled, axis=0) + 1e-8
print(f'Data: train {X_train_scaled.shape[0]}, val {X_val_scaled.shape[0]}, features {X_train_scaled.shape[1]}')

## 2. FGSM & PGD Attacks

In [None]:
def fgsm_attack(model, X, y, epsilon, sigma, device, targeted=False, batch_size=512):
    """Single-step FGSM on the MSE loss, scaled per feature by ``sigma``.

    Every row is shifted by ``epsilon * sigma * sign(dMSE/dx)``.  With
    ``targeted=True`` the sign is flipped, so the step lowers the loss instead
    of raising it.  ``model`` must return a tuple whose first element is the
    prediction.  Rows are processed in chunks of ``batch_size`` and the
    perturbed inputs are returned as one stacked NumPy array.
    """
    model.eval()
    mse = nn.MSELoss()
    scale = torch.FloatTensor(np.array(sigma, dtype=np.float32)).to(device)
    perturbed = []
    for start in range(0, len(X), batch_size):
        stop = start + batch_size
        inputs = torch.FloatTensor(X[start:stop]).to(device).requires_grad_(True)
        targets = torch.FloatTensor(y[start:stop]).to(device)
        outputs, _ = model(inputs)
        batch_loss = mse(outputs.squeeze(), targets)
        model.zero_grad()
        batch_loss.backward()
        signs = torch.sign(inputs.grad.detach())
        signs = -signs if targeted else signs
        shifted = inputs.detach() + epsilon * scale * signs
        perturbed.append(shifted.cpu().numpy())
    return np.vstack(perturbed)

def pgd_attack(model, X, y, epsilon, sigma, steps, alpha, device, targeted=False, batch_size=512):
    """Iterated sign-gradient attack on the MSE loss with a per-feature box constraint.

    Starting from the clean inputs, each of the ``steps`` passes moves every
    row by ``alpha * sigma * sign(dMSE/dx)`` (sign flipped when ``targeted``)
    and then clips the accumulated perturbation to ``[-epsilon*sigma,
    +epsilon*sigma]``.  ``model`` must return a tuple whose first element is
    the prediction.  ``X`` itself is left untouched; a float32 copy holding
    the adversarial inputs is returned.
    """
    clean = X.astype(np.float32)
    adversarial = clean.copy()
    scale = np.array(sigma, dtype=np.float32)
    limit = epsilon * scale
    for _ in range(steps):
        for start in range(0, len(adversarial), batch_size):
            window = slice(start, start + batch_size)
            inputs = torch.FloatTensor(adversarial[window]).to(device).requires_grad_(True)
            targets = torch.FloatTensor(y[window]).to(device)
            model.eval()
            outputs, _ = model(inputs)
            batch_loss = nn.MSELoss()(outputs.squeeze(), targets)
            model.zero_grad()
            batch_loss.backward()
            signs = np.sign(inputs.grad.detach().cpu().numpy())
            if targeted:
                signs = -signs
            # Take the step, then project back onto the epsilon*sigma box.
            offset = adversarial[window] - clean[window]
            offset = np.clip(offset + alpha * scale * signs, -limit, limit)
            adversarial[window] = clean[window] + offset
    return adversarial

## 3. Train Multi-Head Diversity

In [None]:
def train_mhd(model, X_train, y_train, X_val, y_val, config, device):
    """Fit ``model`` with Adam on MSE (plus its head-diversity penalty, if enabled).

    ``config`` supplies 'learning_rate', 'batch_size' and 'epochs'.  The model
    must return ``(prediction, attention)`` and expose ``use_head_diversity``;
    when that flag is set and the attention dict is non-empty, the entries
    'layer_0', 'layer_1', ... are passed to ``model.compute_diversity_loss``
    and the result is added to the loss.  Batches are taken in fixed order,
    gradients are clipped to norm 1.0, and the validation loss is printed
    every 20th epoch.

    Returns ``(model, val_predictions)`` with the predictions as a NumPy array.
    """
    model = model.to(device)
    mse = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'])
    train_x = torch.FloatTensor(X_train).to(device)
    train_y = torch.FloatTensor(y_train).to(device)
    val_x = torch.FloatTensor(X_val).to(device)
    step = config['batch_size']

    for epoch in range(config['epochs']):
        model.train()
        for start in range(0, len(train_x), step):
            batch_x = train_x[start:start + step]
            batch_y = train_y[start:start + step]
            optimizer.zero_grad()
            outputs, attention = model(batch_x)
            total = mse(outputs.squeeze(), batch_y)
            if model.use_head_diversity and attention:
                per_layer = [attention[f'layer_{k}'] for k in range(len(attention))]
                total = total + model.compute_diversity_loss(per_layer)
            total.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

        if (epoch + 1) % 20 != 0:
            continue
        # Periodic validation report.
        model.eval()
        with torch.no_grad():
            val_out, _ = model(val_x)
            vloss = mse(val_out.squeeze(), torch.FloatTensor(y_val).to(device))
        print(f'  Epoch {epoch+1}: val_loss={vloss.item():.6f}')

    model.eval()
    with torch.no_grad():
        final_out, _ = model(val_x)
        val_pred = final_out.squeeze().cpu().numpy()
    return model, val_pred

# Model/training hyperparameters; epochs are N_EPOCHS in the small-data setting and 100 otherwise.
tr_cfg = {'d_model': 72, 'num_layers': 2, 'd_ff': 512, 'dropout': 0.1, 'learning_rate': 0.0001, 'batch_size': 32, 'epochs': N_EPOCHS if USE_SMALL_DATA else 100}
model = FeatureTokenTransformer(num_features=X_train_scaled.shape[1], d_model=tr_cfg['d_model'], num_heads=8, num_layers=tr_cfg['num_layers'], d_ff=tr_cfg['d_ff'], dropout=tr_cfg['dropout'], use_head_diversity=True, diversity_weight=0.01)
model, pred_clean = train_mhd(model, X_train_scaled, y_train, X_val_scaled, y_val, tr_cfg, device)

# Metrics on the unperturbed validation set.
clean_rmse = np.sqrt(mean_squared_error(y_val, pred_clean))
clean_r2 = r2_score(y_val, pred_clean)
print(f'Clean: RMSE={clean_rmse:.6f}, R²={clean_r2:.6f}')

## 4. Evaluate Under FGSM & PGD

In [None]:
# Perturbation budgets, expressed as multiples of sigma_train.
EPSILONS = [0.25, 0.5, 1.0]
results = []

for eps in EPSILONS:
    # Craft adversarial validation inputs with both attacks at this budget.
    X_fgsm = fgsm_attack(model, X_val_scaled, y_val, eps, sigma_train, device)
    X_pgd = pgd_attack(model, X_val_scaled, y_val, eps, sigma_train, PGD_STEPS, PGD_ALPHA, device)
    
    # Score the whole adversarial set in one forward pass each.
    with torch.no_grad():
        pred_fgsm = model(torch.FloatTensor(X_fgsm).to(device))[0].squeeze().cpu().numpy()
        pred_pgd = model(torch.FloatTensor(X_pgd).to(device))[0].squeeze().cpu().numpy()
    
    for name, pred in [('FGSM', pred_fgsm), ('PGD', pred_pgd)]:
        rmse = np.sqrt(mean_squared_error(y_val, pred))
        r2 = r2_score(y_val, pred)
        delta_rmse = rmse - clean_rmse
        # Robustness scores are capped at 1.0 and default to 1.0 when the clean baseline is not positive.
        rob_rmse = min(1.0, 1.0 - delta_rmse / clean_rmse) if clean_rmse > 0 else 1.0
        rob_r2 = min(1.0, r2 / clean_r2) if clean_r2 > 0 else 1.0
        results.append({'attack': name, 'epsilon': eps, 'rmse': rmse, 'r2': r2, 'delta_rmse': delta_rmse, 'rob_rmse': rob_rmse, 'rob_r2': rob_r2})

# One row per (attack, epsilon) pair.
results_df = pd.DataFrame(results)
print(results_df.to_string())

## 5. Certified Robustness (Gradient-Norm Bound)

In [None]:
def certified_bound_gradient_norm(model, X, y, epsilon, sigma, device, batch_size=256):
    """First-order bound on the prediction change under a per-feature box perturbation.

    For perturbations with ``|δ_i| ≤ ε·σ_i`` the linearisation
    ``f(x+δ) ≈ f(x) + ∇f(x)·δ`` gives

        |f(x+δ) − f(x)|  ≲  ε · Σ_i σ_i·|∂f/∂x_i|  =  ε · ||σ ⊙ ∇f(x)||_1,

    i.e. the dual (L1) norm of the σ-scaled input gradient.  This is exact for
    a locally linear model and only a first-order estimate otherwise; it is
    not a formal certificate.

    Args:
        model: callable returning a tuple whose first element holds one scalar
            prediction per input row.
        X: 2-D array of inputs (rows are samples).
        y: unused; kept so existing callers do not break.  The bound concerns
            the prediction itself, not the loss, so labels play no role.
        epsilon: perturbation budget in multiples of ``sigma``.
        sigma: per-feature scale (array-like, one entry per column of ``X``).
        device: torch device for the forward/backward passes.
        batch_size: rows per forward/backward pass.

    Returns:
        ``(cert_bound, pred_clean, mean_grad_norm, median_grad_norm)`` where
        ``cert_bound = epsilon * grad_norm`` per row and ``grad_norm`` is
        ``||σ ⊙ ∇f(x)||_1``.
    """
    model.eval()
    sigma_arr = np.asarray(sigma, dtype=np.float32)
    norm_chunks = []
    pred_chunks = []
    for i in range(0, len(X), batch_size):
        X_b = torch.FloatTensor(X[i:i+batch_size]).to(device).requires_grad_(True)
        pred, _ = model(X_b)
        # reshape(-1) keeps a 1-D result even for a batch of one row.
        pred = pred.reshape(-1)
        # Differentiate the prediction itself (not a loss).  Summing over the
        # batch yields per-row gradients provided rows do not interact inside
        # the model (assumed here for a model in eval mode — TODO confirm).
        grad = torch.autograd.grad(pred.sum(), X_b)[0].detach().cpu().numpy()
        # L1 norm: dual of the L∞-type box |δ_i| ≤ ε·σ_i.
        norm_chunks.append(np.abs(grad * sigma_arr).sum(axis=1))
        pred_chunks.append(pred.detach().cpu().numpy().astype(np.float64))
    if norm_chunks:
        grad_norms = np.concatenate(norm_chunks)
        pred_clean_arr = np.concatenate(pred_chunks)
    else:
        grad_norms = np.zeros(0, dtype=np.float32)
        pred_clean_arr = np.zeros(0, dtype=np.float64)
    cert_bound = epsilon * grad_norms
    return cert_bound, pred_clean_arr, np.mean(grad_norms), np.median(grad_norms)

# Bound over the validation set at ε=1.0.
cert_bounds, pred_cert, mean_gn, median_gn = certified_bound_gradient_norm(model, X_val_scaled, y_val, 1.0, sigma_train, device)
print(f'Certified bound (ε=1.0): mean |Δf| ≤ {np.mean(cert_bounds):.6f}, median ≤ {np.median(cert_bounds):.6f}')
print(f'Mean gradient norm (scaled by σ): {mean_gn:.6f}')

# Repeat for every budget in EPSILONS and tabulate the mean/median bound.
cert_results = []
for eps in EPSILONS:
    cb, _, _, _ = certified_bound_gradient_norm(model, X_val_scaled, y_val, eps, sigma_train, device)
    cert_results.append({'epsilon': eps, 'cert_bound_mean': np.mean(cb), 'cert_bound_median': np.median(cb)})
cert_df = pd.DataFrame(cert_results)
print('\nCertified bounds by ε:')
print(cert_df.to_string(index=False))

## 6. Visualizations

In [None]:
# 2x2 summary figure built from results_df (attacks) and cert_df (bounds).
fig, axes = plt.subplots(2, 2, figsize=(12, 10))

# 1. RMSE-based robustness vs epsilon (FGSM vs PGD)
ax = axes[0, 0]
for attack in ['FGSM', 'PGD']:
    d = results_df[results_df['attack'] == attack]
    ax.plot(d['epsilon'], d['rob_rmse'], 'o-', label=attack)
ax.axhline(1.0, color='gray', linestyle='--', alpha=0.5)  # reference line at the 1.0 cap of rob_rmse
ax.set_xlabel('ε (perturbation budget)')
ax.set_ylabel('Robustness (RMSE-based)')
ax.set_title('FGSM vs PGD: RMSE-based Robustness')
ax.legend()
ax.grid(True, alpha=0.3)

# 2. R² degradation vs epsilon
ax = axes[0, 1]
for attack in ['FGSM', 'PGD']:
    d = results_df[results_df['attack'] == attack]
    ax.plot(d['epsilon'], d['r2'], 's-', label=attack)
ax.axhline(clean_r2, color='gray', linestyle='--', alpha=0.5, label='Clean')  # clean-data R² for comparison
ax.set_xlabel('ε')
ax.set_ylabel('R² under attack')
ax.set_title('R² Degradation Under FGSM/PGD')
ax.legend()
ax.grid(True, alpha=0.3)

# 3. Heatmap: Robustness (rows = attack, columns = epsilon; colour scale fixed to [0.3, 1.0])
ax = axes[1, 0]
pivot = results_df.pivot(index='attack', columns='epsilon', values='rob_rmse')
sns.heatmap(pivot, annot=True, fmt='.3f', cmap='RdYlGn', vmin=0.3, vmax=1.0, ax=ax)
ax.set_title('Robustness Heatmap (FGSM vs PGD)')

# 4. Certified bound vs epsilon
ax = axes[1, 1]
ax.plot(cert_df['epsilon'], cert_df['cert_bound_mean'], 'o-', label='Mean bound')
ax.plot(cert_df['epsilon'], cert_df['cert_bound_median'], 's-', label='Median bound')
ax.set_xlabel('ε')
ax.set_ylabel('Certified |Δf| bound')
ax.set_title('Certified Robustness: |f(x+δ)-f(x)| ≤ ε·||σ⊙∇f||')
ax.legend()
ax.grid(True, alpha=0.3)

plt.suptitle('Multi-Head Diversity: PGD/FGSM & Certified Robustness', fontsize=12, fontweight='bold')
plt.tight_layout()
plt.show()

In [None]:
# Clean vs Adversarial predictions (FGSM and PGD at ε=1.0)
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# Regenerate the ε=1.0 adversarial inputs and score them.
X_fgsm = fgsm_attack(model, X_val_scaled, y_val, 1.0, sigma_train, device)
X_pgd = pgd_attack(model, X_val_scaled, y_val, 1.0, sigma_train, PGD_STEPS, PGD_ALPHA, device)
with torch.no_grad():
    pred_fgsm = model(torch.FloatTensor(X_fgsm).to(device))[0].squeeze().cpu().numpy()
    pred_pgd = model(torch.FloatTensor(X_pgd).to(device))[0].squeeze().cpu().numpy()

# Left panel: adversarial vs clean prediction; points on the dashed y=x line are unchanged.
axes[0].scatter(pred_clean, pred_fgsm, alpha=0.3, s=5, label='FGSM')
axes[0].scatter(pred_clean, pred_pgd, alpha=0.3, s=5, label='PGD')
axes[0].plot([pred_clean.min(), pred_clean.max()], [pred_clean.min(), pred_clean.max()], 'r--', label='y=x')
axes[0].set_xlabel('Clean prediction')
axes[0].set_ylabel('Adversarial prediction (ε=1.0)')
axes[0].set_title('Clean vs FGSM/PGD Predictions')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Right panel: predictions against the true target for clean and attacked inputs.
axes[1].scatter(y_val, pred_clean, alpha=0.3, s=5, label='Clean', c='blue')
axes[1].scatter(y_val, pred_fgsm, alpha=0.3, s=5, label='FGSM', c='orange')
axes[1].scatter(y_val, pred_pgd, alpha=0.3, s=5, label='PGD', c='green')
axes[1].set_xlabel('True return')
axes[1].set_ylabel('Prediction')
axes[1].set_title('True vs Predicted (Clean vs Adversarial)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# Text summary: clean metrics, mean RMSE-based robustness per attack, and the bound table.
print('='*60)
print('SUMMARY: Multi-Head Diversity (PGD/FGSM & Certified)')
print('='*60)
print(f'Clean: RMSE={clean_rmse:.6f}, R²={clean_r2:.6f}')
print('\nRobustness (RMSE-based) by attack (mean over ε):')
for a in ['FGSM', 'PGD']:
    # Average rob_rmse across all budgets in EPSILONS for this attack.
    m = results_df[results_df['attack']==a]['rob_rmse'].mean()
    print(f'  {a}: {m:.4f}')
print('\nCertified bound (mean |Δf| ≤ ε·||σ⊙∇f||):')
print(cert_df.to_string(index=False))