# Section 5.4: The regularization bias, simulations


References:
- Beck, Teboule (2088) "A Fast Iterative Shrinkage-Thresholding Algorithm for Linear Inverse Problems"
- Belloni, A., Chernozhukov, V., Hansen, C. (2011) "Inference on Treatment Effects After Selection Amongst High-Dimensional Controls", https://arxiv.org/abs/1201.0224

In [None]:
from dataclasses import dataclass, field
from typing import List
import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.stats import norm, multivariate_normal
import matplotlib.pyplot as plt

from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.metrics import r2_score
from sklearn.base import BaseEstimator

np.random.seed(99999)

# DGP and estimators

In [None]:
#------------------------------------------------------
# OLS
#------------------------------------------------------
@dataclass
class OLS(BaseEstimator):
    """
    OLS:
        Ordinary Least Squares,
        but outputs variance, etc.
    """
    fit_intercept: bool = True
    robust_variance: bool = True
    rcond: float = 1e-15

    def fit(self, X, y):
        """
        estimate:
            Compute the OLS coefficient

        Args:
            X (np.array): Dimension (n, p).
            y (np.array): Dimension (n,).
        """
        if self.fit_intercept:
            X = np.c_[np.ones(len(X)), X]

        beta, _, rank_, singular_ = np.linalg.lstsq(X, y, rcond=None)
        if self.fit_intercept:
            self.intercept_ = beta[0]
            self.coef_ = beta[1:]
        else:
            self.intercept_ = 0.
            self.coef_ = beta
        return self

    def predict(self, X):
        check_is_fitted(self, 'coef_')
        check_is_fitted(self, 'intercept_')
        X = check_array(X)
        return self.intercept_ + X @ self.coef_
    
    def score(self, X, y):
        y_hat = self.predict(X)
        return r2_score(y, y_hat)
    
    def compute_std(self, X, y, robust=True):
        """
        compute_std:
        
        Args:
            X (np.array): Dimension (n, p).
            y (np.array): Dimension (n,).
            robust (bool): If True, uses White sandwich variance estimator. 
        """
        check_is_fitted(self, 'coef_')
        check_is_fitted(self, 'intercept_')
        
        epsilon = y - self.predict(X)

        if self.fit_intercept:
            X = np.c_[np.ones(len(X)), X]

        XtX = X.T @ X
        inv_XtX = np.linalg.pinv(XtX, rcond=self.rcond, hermitian=True)

        if robust:
            A = (inv_XtX @ X.T) * epsilon
            self.variance_ =  len(X) * A @ A.T
        else:
            self.variance_ =  len(X) * np.sum(epsilon ** 2) * inv_XtX / (len(X) - 1)

        self.std = np.sqrt(np.diag(self.variance_) / len(X))

    def compute_student_statistics(self, X, y):
        check_is_fitted(self, 'coef_')

        if not hasattr(self, "variance_"):
            self.compute_std(X=X, y=y, robust=self.robust_variance)

        if self.fit_intercept:
            self.t_stat = self.coef_ / self.std[1:]
        else:
            self.t_stat = self.coef_ / self.std
        self.p_value = 1 - norm.cdf(np.abs(self.t_stat))

#------------------------------------------------------
# LASSO
#------------------------------------------------------
@dataclass
class Lasso(BaseEstimator):
    """
    _PenalizedRegression:
        Penalized regression parent class.

    Args:
        alpha (float): Overall penalty strength.
        gamma (float): Weight of the Lasso penalty vs. Group-Lasso penalty.
            1 = lasso, 0 = group-lasso.
        groups (list of lists of integers): List of feature indices for each group.
        nopen (list of int): Index of variables that should not be penalized.
        max_iter (int): Maximum number of iterations.
        tol (float): Tolerance for convergence.
        fit_intercept (bool): If True, fits an intercept.
        verbose (bool): If True, displays progress.

    Attributes
    coef_ : array, shape (n_features,)
        Estimated coefficients.
    """
    alpha: float = 1.
    nopen: List[int] = field(default_factory=list)
    fit_intercept: bool = True
    verbose: bool = False
    max_iter: int = 5_000
    tol: float = 1e-6
    static_args: dict = field(default_factory=dict)

    def loss_gradient(self, w, b, y, X):
        return - 2 * (y - b - X @ w) @ X / len(X)
    
    def intercept_loss_gradient(self, w, b, y, X):
        return - 2 * (y - b - X @ w).mean()
        
    def prox(self, x, alpha, nopen):
        y = np.sign(x) * np.maximum(0, np.abs(x) - alpha)
        if nopen is not None:
            y[nopen] = x[nopen]
        return y

    def fit(self, X, y):
        X, y = check_X_y(X, y)
        n_, n_features = X.shape
        
        # Initialize constants and variables
        _, w, _ = np.linalg.svd(X)
        eta = n_ / (2 * w[0] ** 2)
        t, t_old = 1., 1.
        coef, z = np.zeros(n_features), np.zeros(n_features)
        b = 0.

        if self.verbose:
            loop = tqdm(range(self.max_iter))
        else:
            loop = range(self.max_iter)

        for _ in loop:
            # Save parameters
            coef_old = coef.copy() 
            t_old = t
            t = (1 + np.sqrt(1 + 4 * t_old ** 2)) / 2
            delta = (1 - t_old) / t

            # Compute gradient and update coefficients
            grad = self.loss_gradient(z, b, y, X)
            coef = self.prox(
                x=z - eta * grad,
                alpha=eta * self.alpha,
                nopen=self.nopen
                )
            if self.fit_intercept:
                b  -= eta * self.intercept_loss_gradient(w=coef, b=b, y=y, X=X)
            z = (1 - delta) * coef + delta * coef_old

            # Check convergence
            if np.linalg.norm(coef - coef_old) / np.maximum(1e-9, np.linalg.norm(coef)) < self.tol:
                break
        if (_ == self.max_iter - 1):
            print("Max. number of iterations reached.")
        self.coef_ = coef
        self.intercept_ = b
        return self

    def predict(self, X):
        check_is_fitted(self, 'coef_')
        check_is_fitted(self, 'intercept_')
        X = check_array(X)
        return self.intercept_ + X @ self.coef_
    
    def score(self, X, y):
        y_hat = self.predict(X)
        return r2_score(y, y_hat)
    
def compute_double_selection(X, d, y, s_hat):
    ols = OLS()
    if len(s_hat) > 0:
        X_ = np.c_[d, X[:, s_hat]]
    else:
        X_ = d.reshape(-1, 1)
    
    ols.fit(X_, y)
    ols.compute_std(X=X_, y=y)
    return ols.coef_[0], ols.std[1]

def double_selection_estimator(X, d, y, alpha):
    n = len(X)
    lasso = Lasso(alpha=alpha)
    ## A. Selection on treatment
    lasso.fit(X, d)
    s_d = np.nonzero(lasso.coef_)[0]

    ## B. Selection on outcome
    lasso.fit(X, y)
    s_y = np.nonzero(lasso.coef_)[0]

    ## C. Compute double selection estimator
    s_hat = np.union1d(s_y, s_d)
    if len(s_hat) > 0:
        X_ = np.c_[d, X[:, s_hat]]
    else:
        X_ = d.reshape(-1, 1)

    ols = OLS()
    ols.fit(X_, y)
    tau_hat = ols.coef_[0]
    eps_y = y - ols.predict(X_)

    if len(s_hat) > 0:
        ols.fit(X[:, s_hat], d)
        eps_d = d - ols.predict(X[:, s_hat])
    else:
        eps_d = d - d.mean()

    tau_std = np.sum((eps_d * eps_y) ** 2) / (n - len(s_hat) - 1)
    tau_std /= (np.sum(eps_d ** 2) / n) ** 2
    tau_std = np.sqrt(tau_std) / np.sqrt(n)

    return tau_hat, tau_std, s_hat

In [None]:
@dataclass
class DGP():
    """
    Generates synthetic data for illustrating the regularization bias.

    Args:
        n (int): Number of samples.
        p (int): Number of variables.
        r_y (float): R-squared for the outcome equation.
        r_d (float): R-squared for the treatment variable.
        intercept (bool): Whether to include an intercept term.
        rho (float): Covariate correlation parameter.
        tau (float): Treatment effect.

    Returns:
        tuple: A tuple containing the generated data:
            X (numpy.ndarray): Covariate matrix.
            y (numpy.ndarray): Outcome variable.
            d (numpy.ndarray): Treatment variable.
    """
    n: int = 200
    p: int = 300
    r_y: float = .1
    r_d: float = .8
    intercept: bool = True
    rho: float = .5
    tau: float = .5
        
    def __post_init__(self):
        p = self.p
        # Covariate variance matrix
        self.sigma = np.array([[self.rho ** np.abs(k - j) for k in range(p)] for j in range(p)])

        # Treatment variable coefficient
        beta = np.zeros(p)
        for j in range(p // 2):
            beta[j] = (-1) ** (j+1) / (j+1) ** 2

        # Outcome equation coefficients
        gamma = np.copy(beta)
        for j in range(p // 2 + 1, p+1):
            gamma[j-1] = (-1) ** (j + 1) / (p - j + 1) ** 2

        # Adjustment to match R-squared
        gamma *= np.sqrt(self.r_d / (1 - self.r_d) / (gamma.T @ self.sigma @ gamma))
        beta *= np.sqrt(self.r_y / (1 - self.r_y) / (beta.T @ self.sigma @ beta))
        
        self.gamma, self.beta = gamma, beta
        
        # All even-indexed covariates are dummies
        self.even = np.arange(1, p + 1) % 2 == 0
    
    def generate(self):
        # Generate covariates
        X = multivariate_normal(mean=np.zeros(self.p), cov=self.sigma).rvs(size=self.n)
        X[:, self.even] = np.where(X[:, self.even] > 0, 1, 0)

        # Treatment
        d = 1 * np.array(np.random.rand(self.n) < norm.cdf(X @ self.gamma))

        # Outcome
        y = self.tau * d + X @ self.beta + np.random.randn(self.n)

        if self.intercept:
            X = np.c_[np.ones(self.n), X]

        return X, y, d

# Parameters

In [None]:
dgp = DGP(n=200, p=300, tau=.5, intercept=False, r_y=.1, r_d=.8)
n_sim = 100   # nb simulations
n_folds = 5   # nb folds

# Generate random split
cvgroup = np.digitize(np.random.rand(dgp.n), np.linspace(0, 1, n_folds + 1))

# Penalty level
g = .1 / np.log(max(dgp.n, dgp.p))
c = 1.1
alpha = 2 * c * norm.ppf(1 - .5 * g / dgp.p) / np.sqrt(dgp.n) # (theoretical) Lasso penalty level

# Simulations

In [None]:
lasso_naive = Lasso(alpha=alpha, nopen=[0])
ols = OLS()

estimate, std = [], []

for b in tqdm(range(n_sim)):
    X, y, d  = dgp.generate()
    X_ = np.c_[d, X]

    # Method 1: Naive selection
    lasso_naive.fit(X_, y)
    s_hat = np.nonzero(lasso_naive.coef_[1:])[0]

    tau_naive, std_naive = compute_double_selection(X, d, y, s_hat)

    # Method 2: Double-Selection, no sample-splitting
    tau_ds, std_ds, _ = double_selection_estimator(X, d, y, alpha)

    # Method 3: with sample splitting
    tau_k, sigma_k_num, sigma_k_denom = [], [], []
    for k in range(1, n_folds + 1):
        Ik = (cvgroup == k)
        NIk = (cvgroup != k)

        g_cf = .1 / np.log(max(NIk.sum(), dgp.p))
        alpha_cf = 1.1 * norm.ppf(1 - .5 * g_cf / dgp.p) / np.sqrt(NIk.sum())

        _, _, s_hat = double_selection_estimator(X[NIk], d[NIk], y[NIk], alpha_cf)

        if len(s_hat) > 0:
            ols.fit(X[np.ix_(NIk, s_hat)], y[NIk])
            eps_y = y[Ik] - ols.predict(X[np.ix_(Ik, s_hat)])

            ols.fit(X[np.ix_(NIk, s_hat)], d[NIk])
            eps_d = d[Ik] - ols.predict(X[np.ix_(Ik, s_hat)])
        else:
            eps_y = y[Ik] - y[NIk].mean()
            eps_d = d[Ik] - d[NIk].mean()
            
        ols.fit(eps_d, eps_y)
        tau_k.append(ols.coef_[0])

        sigma_k_num.append(np.sum((eps_d*eps_y) ** 2))
        sigma_k_denom.append(np.sum(eps_d ** 2))

    sigma_k_num = np.array(sigma_k_num).sum() / len(X)
    sigma_k_denom = np.array(sigma_k_denom).sum() / len(X)

    # Save results
    estimate.append({
        'Naive post-selection': tau_naive,
        'Double selection': tau_ds,
        'Double selection w/ cross-fitting': np.array(tau_k).mean()
    })
    std.append({
        'Naive post-selection': std_naive,
        'Double selection': std_ds,
        'Double selection w/ cross-fitting': np.sqrt(sigma_k_num / sigma_k_denom ** 2) / np.sqrt(len(X))
    })
        


# Results

## Table

In [None]:
estimate = pd.DataFrame(estimate)
std = pd.DataFrame(std)

table = {
    'Bias': estimate.mean() - dgp.tau,
    'RMSE': (estimate - dgp.tau).pow(2).mean().pow(.5),
    'Coverage rate': (np.abs(estimate - dgp.tau) < norm.ppf(0.975) * std).mean() # from (a+b)(a-b) < 0 iff abs(a) < b (and b > 0 always here), i.e. bounds of CI surround 0.
}
print(pd.DataFrame(table).T.round(3))


In [None]:
print(pd.DataFrame(table).T.round(3).to_latex())

## Histograms

In [None]:
estimate_ = (estimate - dgp.tau) / std
fig, axs = plt.subplots(1, len(estimate_.columns), figsize=(20, 8), sharex=True, sharey=True, dpi=800)
axs = axs.ravel()

x = np.linspace(-1.1 * np.abs(estimate_).max().max(), 1.1 * np.abs(estimate_).max().max(), 100)

for ax, (title, data) in zip(axs, estimate_.items()):
    # Plot histogram with light grey color
    ax.hist(data, bins=int(n_sim ** (2 / 3)), color='lightgrey', density=True)
    ax.plot(x, norm.pdf(x, 0, 1), color='black', linestyle='dashed', linewidth=2)
    
    # Set plot title and labels
    ax.set_title(title)
    ax.set_xlabel(r'$\sqrt{n}(\widehat \tau - \tau_0) / \widehat \sigma$')
    ax.set_ylabel('Frequency')

plt.tight_layout()
plt.savefig("Fig_5_1.jpg", bbox_inches='tight')