# Performative generalization bounds (Cor. 3.8-style)

This notebook loads the attached run archive, extracts the trained model and predicted probabilities, and computes the bound of the form:

$$\mathscr R(d_0, \widehat{\theta}_T) \leq \mathscr R(\widehat{d}_{T-1}, \widehat{\theta}_T)
+ L_\ell \left(\frac{\log (C_a/\delta)}{C_b\,n}\right)^{\frac{p}{\nu}}
+ \frac{(\varepsilon^{T-1}-1)}{L_\ell^{-1}(\varepsilon-1)}\,\left(\frac{m}{n}\right)^{\frac{1}{p}}\,\mathscr D_{\mathcal Z}\,\tilde L_a$$

with $\tilde L_a = 1/(1+L_a)$, where $L_a$ is a (conservative) Lipschitz constant of the best-response map
$$G(d)=\arg\min_{\theta\in\Theta}\mathscr R(d,\theta).$$

It also computes the special case $T=1$ (where the performative accumulation term vanishes).
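
A short identity, valid for $\varepsilon \neq 1$, may help in reading the last term. It is the standard finite geometric-sum rewriting, stated here as a reading aid rather than as part of the cited result:

$$\frac{\varepsilon^{T-1}-1}{L_\ell^{-1}(\varepsilon-1)} \;=\; L_\ell \sum_{k=0}^{T-2}\varepsilon^{k}.$$

For $T=1$ the sum is empty, so the term is zero. As $\varepsilon \to 1$ the sum tends to $T-1$, which is the natural value to use when $\varepsilon = 1$ exactly.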

**Notes**
- The archive path may go stale (for example, if an uploaded file expires). If you get a file-not-found error, re-upload the archive and update `ZIP_PATH`.
- The constants $C_a,C_b$ from Wasserstein concentration cannot be determined from the run archive; the notebook treats them as user-supplied inputs (default 1).


In [None]:
import os, zipfile, glob, math, pickle, shutil
import numpy as np
import pandas as pd

ZIP_PATH = '/mnt/data/20260126_233204_pca_x2_y1.zip'  # update if needed
EXTRACT_DIR = '/mnt/data/pca_x2_y1_extracted_nb'

if not os.path.exists(ZIP_PATH):
    raise FileNotFoundError(
        f"Could not find {ZIP_PATH}. If the upload expired, please re-upload the archive and update ZIP_PATH.")

# Start from a clean extraction directory so stale files from earlier runs are not picked up
if os.path.exists(EXTRACT_DIR):
    shutil.rmtree(EXTRACT_DIR)
os.makedirs(EXTRACT_DIR, exist_ok=True)

with zipfile.ZipFile(ZIP_PATH, 'r') as z:
    z.extractall(EXTRACT_DIR)

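# Sort the matches so that "first match" in the next cell is deterministic
train_paths = sorted(glob.glob(os.path.join(EXTRACT_DIR, '**', 'train_predictions.csv'), recursive=True))
test_paths  = sorted(glob.glob(os.path.join(EXTRACT_DIR, '**', 'test_predictions.csv'), recursive=True))
model_paths = sorted(glob.glob(os.path.join(EXTRACT_DIR, '**', 'model.pkl'), recursive=True))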

train_paths, test_paths, model_paths

In [None]:
# Pick the first matching set by default (adjust if your archive contains multiple runs)
if not train_paths:
    raise FileNotFoundError(f"No train_predictions.csv found under {EXTRACT_DIR}.")
train_path = train_paths[0]
test_path = test_paths[0] if test_paths else None
model_path = model_paths[0] if model_paths else None

df_train = pd.read_csv(train_path)
n = len(df_train)

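# Clip probabilities away from 0 and 1 so the logs below stay finite
# (named clip_eps to avoid confusion with the epsilon in the bound)
clip_eps = 1e-15
p_hat = np.clip(df_train['y_pred_proba'].to_numpy(float), clip_eps, 1-clip_eps)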
y = df_train['y_true'].to_numpy(int)

# Empirical logistic (cross-entropy) risk
R_emp = float(-np.mean(y*np.log(p_hat) + (1-y)*np.log(1-p_hat)))

R_emp, n

In [None]:
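# Load model (optional; used to report the input dimension and, if present, sklearn's C)
# Note: pickle.load can execute arbitrary code, so only open archives you trust.
clf = None
d = None
C_reg = None
if model_path is not None and os.path.exists(model_path):
    with open(model_path, 'rb') as f:
        clf = pickle.load(f)
    if hasattr(clf, 'coef_'):
        d = clf.coef_.shape[1]
    C_reg = getattr(clf, 'C', None)  # inverse regularization strength, if the model exposes it

d, C_reg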

## Parameters / constants

Set the parameters used in the bound. Defaults follow the choices used in this thread:
- Wasserstein order: $p=2$
- PCA effective dimension: $\nu=4$
- $\delta$ (failure probability; the bound is stated at confidence level $1-\delta$): default 0.1
- $\mathscr D_{\mathcal Z}$: default 1 (after normalization)
- $m/n$: default 0.03 (edit as needed)

For $L_\ell$ we use
$$L_\ell = \frac{D_{\mathcal X}}{1+\exp(-D_{\mathcal X}D_\Theta)}$$
with $D_{\mathcal X}=\mathrm{diam}([0,1]^4)=2$ and $D_\Theta=\mathrm{diam}([-3,3]^4)=12$.
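
As a quick sanity check on these values, the sketch below recomputes the two diameters and the resulting $L_\ell$. The helper name `box_diameter` is hypothetical (introduced here for illustration), and the use of the Euclidean norm is an assumption about which norm the diameters refer to.

```python
import math

def box_diameter(lower, upper, dim):
    """Euclidean diameter of the box [lower, upper]^dim (hypothetical helper)."""
    return (upper - lower) * math.sqrt(dim)

D_X = box_diameter(0.0, 1.0, 4)       # diam([0,1]^4)
D_Theta = box_diameter(-3.0, 3.0, 4)  # diam([-3,3]^4)

# L_ell = D_X / (1 + exp(-D_X * D_Theta)), as in the formula above
L_ell = D_X / (1.0 + math.exp(-D_X * D_Theta))
print(D_X, D_Theta, L_ell)
```

Because $\exp(-24)$ is on the order of $10^{-11}$, $L_\ell$ is numerically almost identical to $D_{\mathcal X}=2$ at these defaults.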


In [None]:
# Core parameters
p_w = 2
nu = 4
delta = 0.1
D_Z = 1.0

# intervention share
r = 0.03  # m/n

# concentration constants (set by you / paper)
C_a = 1.0
C_b = 1.0

# Lipschitz constant L_ell from diameters
D_X = 2.0
D_Theta = 12.0
L_ell = D_X / (1.0 + math.exp(-D_X * D_Theta))

L_ell

## Compute $L_a$ and $\tilde L_a$

We use a conservative sensitivity bound for strongly convex ERM:
$$\|G(d)-G(d')\| \le \frac{1}{\mu}\,\sup_{\theta}\|\nabla_\theta \mathscr R(d,\theta)-\nabla_\theta \mathscr R(d',\theta)\|,$$
where $\mu$ is the strong convexity modulus coming from $\ell_2$ regularization.

Under the common normalization
$$\mathscr R(d,\theta)=\mathbb E_d[\ell(\theta;z)] + \tfrac{\lambda}{2}\|\theta\|^2,$$
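we have $\mu=\lambda$. For sklearn `LogisticRegression(C=...)` with an $\ell_2$ penalty, which minimizes $\tfrac12\|w\|^2 + C\sum_i \ell_i$, dividing by $Cn$ gives $\lambda = 1/(Cn)$ under the mean-loss normalization above; the simpler translation $\lambda\approx 1/C$ applies when the loss is summed rather than averaged (check your paper's convention).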

For logistic loss, a simple bound is $\|\nabla_\theta \ell(\theta;(x,y))\|\le \|x\|\le D_{\mathcal X}$, so we take
$$L_a \approx \frac{D_{\mathcal X}}{\lambda}, \qquad \tilde L_a=\frac{1}{1+L_a}.$$

Edit `lambda_reg` below to match your exact objective scaling.
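
To make the scaling explicit, here is a minimal sketch of the conversion from sklearn's `C` to $\lambda$ and on to $L_a$ and $\tilde L_a$. It assumes the $\ell_2$-penalized objective $\tfrac12\|w\|^2 + C\sum_i \ell_i$; the function names and the `mean_loss` flag are hypothetical and exist only for this illustration.

```python
def lambda_from_sklearn_C(C, n=None, mean_loss=True):
    """Translate sklearn's inverse regularization strength C into lambda.

    Assumes the objective 0.5*||w||^2 + C * sum_i loss_i.
    - mean_loss=True : rescaled to mean loss + (lambda/2)||w||^2, so lambda = 1/(C*n).
    - mean_loss=False: rescaled to summed loss + (lambda/2)||w||^2, so lambda = 1/C.
    """
    if C <= 0:
        raise ValueError("C must be positive")
    if mean_loss:
        if n is None or n <= 0:
            raise ValueError("n must be a positive sample size when mean_loss=True")
        return 1.0 / (C * n)
    return 1.0 / C

def lipschitz_constants(D_X, lambda_reg):
    """Return (L_a, tilde_L_a) with L_a = D_X / lambda and tilde_L_a = 1 / (1 + L_a)."""
    L_a = D_X / lambda_reg
    return L_a, 1.0 / (1.0 + L_a)

# Summed-loss convention with C = 1 reproduces the notebook default lambda_reg = 1.0
lam_sum = lambda_from_sklearn_C(C=1.0, mean_loss=False)
L_a, tilde_L_a = lipschitz_constants(D_X=2.0, lambda_reg=lam_sum)
print(lam_sum, L_a, tilde_L_a)
```

Note the direction of the effect: a smaller $\lambda$ (weaker regularization) gives a larger $L_a$ and therefore a smaller $\tilde L_a$, so the choice of convention matters for the final bound.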

In [None]:
lambda_reg = 1.0  # set to your paper's lambda
L_a = D_X / lambda_reg
tilde_L_a = 1.0 / (1.0 + L_a)
L_a, tilde_L_a

## Compute the bound

We compute:
- empirical risk $\mathscr R(\widehat d_{T-1},\widehat\theta_T)$ (for $T=1$ this is the training empirical risk),
- sampling term $L_\ell \big(\log(C_a/\delta)/(C_b n)\big)^{p/\nu}$,
- performative accumulation term $\frac{(\varepsilon^{T-1}-1)}{L_\ell^{-1}(\varepsilon-1)} (m/n)^{1/p} \mathscr D_{\mathcal Z} \tilde L_a$.

The last term depends on $(\varepsilon,T)$. For $T=1$ it is always zero.
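
To get a feel for the sampling term in the list above, the sketch below evaluates it at the notebook defaults for a few sample sizes. The values of `n` are illustrative only (they are not taken from the archive), and `L_ell` is rounded to 2.0.

```python
import math

def sampling_term(L_ell, C_a, C_b, delta, n, p_w, nu):
    """L_ell * (log(C_a/delta) / (C_b * n)) ** (p_w / nu)."""
    return L_ell * (math.log(C_a / delta) / (C_b * n)) ** (p_w / nu)

# Notebook defaults; L_ell rounded to 2.0, n values are hypothetical.
params = dict(L_ell=2.0, C_a=1.0, C_b=1.0, delta=0.1, p_w=2, nu=4)
for n in (1_000, 4_000, 16_000):
    print(n, sampling_term(n=n, **params))

# With p_w / nu = 1/2, multiplying n by 4 should halve the term.
ratio = sampling_term(n=4_000, **params) / sampling_term(n=1_000, **params)
print(ratio)
```

With $p=2$ and $\nu=4$ the term decays like $n^{-1/2}$, so halving it requires roughly four times as many samples.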

In [None]:
def sampling_term(L_ell, C_a, C_b, delta, n, p_w, nu):
    return L_ell * ((math.log(C_a/delta)/(C_b*n)) ** (p_w/nu))

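def performative_accum_term(eps_val, T, L_ell, r, p_w, D_Z, tilde_L_a):
    # No performative steps: the accumulation term vanishes
    if T <= 1:
        return 0.0
    if math.isclose(eps_val, 1.0):
        # (eps^{T-1} - 1)/(eps - 1) -> T - 1 as eps -> 1; avoids division by zero
        geom = float(T - 1)
    else:
        geom = (eps_val**(T-1) - 1.0) / (eps_val - 1.0)
    # Dividing by L_ell^{-1} is the same as multiplying by L_ell
    return L_ell * geom * (r ** (1.0/p_w)) * D_Z * tilde_L_a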

T = 1
eps_val = 1.0 + L_a  # common choice; adjust if your paper uses a different eps

S = sampling_term(L_ell, C_a, C_b, delta, n, p_w, nu)
P = performative_accum_term(eps_val, T, L_ell, r, p_w, D_Z, tilde_L_a)
BOUND = R_emp + S + P

R_emp, S, P, BOUND

## Optional: sweep over $r=m/n$ and $T$

This is useful if you want to visualize how the performative accumulation term scales with $r$ and the number of performative steps $T$.
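
Before running the sweep, it can help to see the accumulation term alone laid out as an $r \times T$ grid. The sketch below is self-contained and uses illustrative constants: the notebook defaults with $L_\ell$ rounded to 2 and $\lambda=1$, hence $\varepsilon = 1 + L_a = 3$ and $\tilde L_a = 1/3$. The function name `accum_term` is hypothetical, and the term is written as a geometric sum so that it is also defined at $\varepsilon=1$.

```python
def accum_term(eps_val, T, L_ell, r, p_w, D_Z, tilde_L_a):
    """L_ell * sum_{k=0}^{T-2} eps^k * r^(1/p) * D_Z * tilde_L_a (zero for T <= 1)."""
    geom = sum(eps_val ** k for k in range(max(T - 1, 0)))
    return L_ell * geom * r ** (1.0 / p_w) * D_Z * tilde_L_a

# Illustrative constants (assumptions, see the lead-in above)
consts = dict(eps_val=3.0, L_ell=2.0, p_w=2, D_Z=1.0, tilde_L_a=1.0 / 3.0)

r_grid = [0.01, 0.02, 0.03, 0.05]
T_grid = [1, 2, 3, 5]

# One row per r, one entry per T, so growth in T can be read across a row
table = {r: [accum_term(T=T, r=r, **consts) for T in T_grid] for r in r_grid}

print("r \\ T " + "".join(f"{T:>10d}" for T in T_grid))
for r, vals in table.items():
    print(f"{r:<6.2f}" + "".join(f"{v:>10.4f}" for v in vals))
```

Across a row the term grows geometrically in $T$ (by a factor of 40 between $T=2$ and $T=5$ when $\varepsilon=3$), while down a column it grows only like $r^{1/p}$.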

In [None]:
def compute_bound_for(r_val, T_val):
    S = sampling_term(L_ell, C_a, C_b, delta, n, p_w, nu)
    P = performative_accum_term(eps_val, T_val, L_ell, r_val, p_w, D_Z, tilde_L_a)
    return R_emp + S + P, S, P

r_grid = [0.01, 0.02, 0.03, 0.05]
T_grid = [1, 2, 3, 5]

rows = []
for rr in r_grid:
    for TT in T_grid:
        b, s, pterm = compute_bound_for(rr, TT)
        rows.append({'r=m/n': rr, 'T': TT, 'R_emp': R_emp, 'Sampling': s, 'PerformativeAccum': pterm, 'Bound': b})

pd.DataFrame(rows)