In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
import sys
import os
if os.getcwd()[:5] == '/rds/':
    sys.path.append('/rds/general/user/ll1917/home/esig/controlled_linear_regression') # to add when running on remote Jupyter server
    os.chdir('/rds/general/user/ll1917/home/esig/controlled_linear_regression') # to add when running on remote Jupyter server

## Controlled Linear Regression

In [4]:
from lib.controlled_linear_regression import fit_controlled_linear_regression
import numpy as np
import torch

In [11]:
N = 1_000
beta = np.array([-1, 3,])
n = 1

np.random.seed(0)
X = np.stack([np.random.normal(loc=i, scale=1, size=N) for i in range(n)], axis=-1)
Z = np.random.normal(loc=0, scale=1, size=N)

rho = 0.5
sigma = 1 / rho 
eps = sigma * (rho**2 * Z + np.sqrt(rho**2 * (1 - rho**2)) * np.random.normal(loc=0, scale=1, size=N)) # Var(eps) = sigma**2 rho**2, Var(Z) = 1, Corr(eps, Z) = Corr(Y, Z) = rho

Y = beta[0] + X @ beta[1:] + eps

lm = fit_controlled_linear_regression(X, Y, None)
lm_controlled = fit_controlled_linear_regression(X, Y, Z, method = 'control')
lm_controlled_joint = fit_controlled_linear_regression(X, Y, Z, method = 'joint-OLS')
N_test = 10_000_000
X_test = np.stack([np.random.normal(loc=i, scale=1, size=N_test) for i in range(n)], axis=-1)
Y_test = beta[0] + X_test @ beta[1:]
print(((lm.predict(X_test) - Y_test)**2).mean())
print(((lm_controlled.predict(X_test) - Y_test)**2).mean())
print(((lm_controlled_joint.predict(X_test) - Y_test)**2).mean())
print(lm)
print(lm_controlled)
print(lm_controlled_joint)

0.003784078101976727
0.0032018345191314394
0.0030940402310033225
LinearPredictionModel(coef=array([2.95299875]), intercept=-1.0396838893618316)
LinearPredictionModel(coef=array([2.96492715]), intercept=-1.044399643766502)
LinearPredictionModel(coef=array([2.96840325]), intercept=-1.0457738805222156)


In [12]:
X_tensor = torch.tensor(X, requires_grad=True).float()
Y_tensor = torch.tensor(Y, requires_grad=True).float()
Z_tensor = torch.tensor(Z, requires_grad=True).float()
lm_tensor = fit_controlled_linear_regression(X_tensor, Y_tensor, None)
lm_controlled_tensor = fit_controlled_linear_regression(X_tensor, Y_tensor, Z_tensor, method='control')
lm_controlled_joint_tensor = fit_controlled_linear_regression(X_tensor, Y_tensor, Z_tensor, method='joint-OLS')
X_test_tensor = torch.tensor(X_test, requires_grad=True).float()
Y_test_tensor = torch.tensor(Y_test, requires_grad=True).float()
print(((lm_tensor.predict(X_test_tensor) - Y_test_tensor)**2).mean())
print(((lm_controlled_tensor.predict(X_test_tensor) - Y_test_tensor)**2).mean())
print(((lm_controlled_joint_tensor.predict(X_test_tensor) - Y_test_tensor)**2).mean())
print(lm_tensor)
print(lm_controlled_tensor)
print(lm_controlled_joint_tensor)

tensor(0.0038, grad_fn=<MeanBackward0>)
tensor(0.0032, grad_fn=<MeanBackward0>)
tensor(0.0031, grad_fn=<MeanBackward0>)
TorchLinearPredictionModel(coef=tensor([2.9530], grad_fn=<SliceBackward>),
                           intercept=tensor(-1.0397, grad_fn=<SelectBackward>))
TorchLinearPredictionModel(coef=tensor([2.9649], grad_fn=<SliceBackward>),
                           intercept=tensor(-1.0444, grad_fn=<SelectBackward>))
TorchLinearPredictionModel(coef=tensor([2.9684], grad_fn=<SliceBackward>),
                           intercept=tensor(-1.0458, grad_fn=<SelectBackward>))
TorchLinearPredictionModel(coef=tensor([2.9530], grad_fn=<SliceBackward>),
                           intercept=tensor(-1.0397, grad_fn=<SelectBackward>))
TorchLinearPredictionModel(coef=tensor([2.9649], grad_fn=<SliceBackward>),
                           intercept=tensor(-1.0444, grad_fn=<SelectBackward>))
TorchLinearPredictionModel(coef=tensor([2.9684], grad_fn=<SliceBackward>),
                           int

## Simulation study

In this setting $\epsilon$ is linear in $Z$ (and hence also in $X$), which implies the 'joint-OLS' is BLUE (and hence better than 'control').

Note that if $\epsilon$ and $Z$ are jointly normally distributed then this is always true.

In [4]:
import pandas as pd
import torch
import numpy as np
import warnings
import pickle

# linear model params
beta = torch.tensor([-1., 6., 8.]) # torch.tensor([-1., 2., 3.])
p = len(beta) - 1

# simulation params
relations = ['linear', 'squared', 'cubic', 'exponential']
mc_size = 10_000
Ns = [100, 1_000, 10_000]
rhos = [0, 0.25, 0.5, 0.75, 1]
methods = [None, 'control', 'joint-OLS']
error_distributions = ['normal', 'standard_t']
noise_injections = ['multiplicative'] # ['additive', 'multiplicative']
sigmas = [5., 10., 20.] # signal/noise 2, 1, 0.5

results_df = pd.DataFrame(
    columns=methods, 
    index=pd.MultiIndex.from_product([error_distributions, relations, noise_injections, Ns, rhos, sigmas], names=['Z & noise distr', 'f(Z)', 'noise injection', 'N', 'rho', 'sigma'])
)

# one single X_test
X_test = torch.stack([torch.tensor(float(i)) for i in range(p)], axis=-1)
X_test = X_test[None, :, None].expand(-1, -1, mc_size)
mu_test = (beta[0] + torch.einsum('nik,i->nk', X_test, beta[1:]))

for N in Ns:
    torch.manual_seed(0)
    X = torch.stack([i + torch.randn(size=(N,)) for i in range(p)], axis=-1)
    X = X[:, :, None].expand(-1, -1, mc_size)

    torch.manual_seed(1)
    Z = torch.randn(size=(N, mc_size))

    for rho in rhos:
        for error_distribution in error_distributions:
            if error_distribution == 'normal':
                eta = torch.randn(size=(N, mc_size)) 
            elif error_distribution == 'student_t':
                eta = torch.from_numpy(np.random.standard_t(df = 3, size=(N, mc_size))).float()

            for relation in relations:
                if relation == 'linear':
                    alpha = rho
                    Z_alpha = Z
                elif relation == 'squared':
                    alpha = np.sqrt(3) * rho
                    Z_alpha = (Z + Z**2 - 1) / np.sqrt(3)
                elif relation == 'cubic':
                    alpha = np.sqrt(15) / 3 * rho
                    Z_alpha = Z**3 / np.sqrt(15)
                elif relation == 'exponential':
                    alpha = np.sqrt(np.exp(1) - 1) * rho
                    Z_alpha = (torch.exp(Z) - np.exp(0.5)) / np.sqrt(np.exp(2) - np.exp(1))

                if rho < 0:
                    warnings.warn(f'Correlation rho={rho} < 0 not supported.')
                    continue
                elif alpha**2 > 1:
                    warnings.warn(f'Correlation rho={rho} unachievable with relation={relation}.')
                    continue
                
                for noise_injection in noise_injections:
                    for sigma in sigmas:
                        if noise_injection == 'additive':
                            eps = sigma * (alpha * Z_alpha + np.sqrt(1 - alpha**2) * eta)
                        elif noise_injection == 'multiplicative':
                            eps = sigma * (alpha * Z_alpha + np.sqrt(1 - alpha**2) * eta * Z_alpha)
                        
                        # Var(eps) = sigma^2 since choose Z_alpha with Var(Z_alpha) = 1 and eta with Var(eta) = 1, 
                        # Cov(eps, Z) = sigma alpha Cov(Z, Z_alpha), 
                        # Var(Z) = 1, 
                        # thus Corr(eps, Z) = alpha Cov(Z, Z_alpha) = rho = Corr(Y, Z)

                        Y = beta[0] + torch.einsum('nik,i->nk', X, beta[1:]) + eps

                        for method in methods:
                            lm = fit_controlled_linear_regression(X, Y, Z if method else None, method = method, parallel=True)
                            results_df.loc[(error_distribution, relation, noise_injection, N, rho, sigma), method] = ((lm.predict(X_test) - mu_test)**2).mean().numpy()



In [8]:
import pickle
import os

# os.makedirs('../controlled_linear_regression', exist_ok=True)
# with open('../controlled_linear_regression/results_df.pickle', 'wb') as f:
#     pickle.dump(results_df, f)

with open('../controlled_linear_regression/results_df.pickle', 'rb') as f:
    results_df = pickle.load(f)

In [178]:
# Perform the operation (e.g., divide column 'A' by column 'B')
new_columns = pd.MultiIndex.from_tuples(
    [(col, m) for col in results_df.columns for m in ['MSE', r'% of None']]
)

# Create the new DataFrame with MultiIndex in columns
new_df = pd.DataFrame({
    (col, m): results_df[col] if m=='MSE' else results_df[col] / results_df[None] for col in results_df.columns for m in ['MSE', r'% of None']
})

# Display the new DataFrame
display(new_df)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,Unnamed: 3_level_0,Unnamed: 4_level_0,Unnamed: 5_level_0,NaN,NaN,control,control,joint-OLS,joint-OLS
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,MSE,% of None,MSE,% of None,MSE,% of None
Z & noise distr,f(Z),noise injection,N,rho,sigma,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2
normal,linear,additive,1000,0.5,5.0,0.025028596,1,0.018775066,0.750145,0.018550528,0.741173
normal,linear,additive,1000,0.5,10.0,0.10011438,1,0.07430441,0.742195,0.07420209,0.741173
normal,linear,additive,1000,0.5,20.0,0.4004576,1,0.2964321,0.740233,0.29680836,0.741173


In [179]:
# Custom function to format numbers based on the value
def format_func(x):
    # If the number is small (less than threshold), use scientific notation
    if abs(x) < 1e-4 and not np.isnan(x):
        return f"{x:.1e}"  # scientific notation with 5 decimals
    else:
        return f"{x:.5f}"  # regular formatting with 5 decimals
    
display(
    new_df.xs(
        'linear', 
        level='f(Z)'
    ).xs(
        'normal', 
        level='Z & noise distr'
    ).xs(
        'additive', 
        level='noise injection'
    ).sort_index(
        
    ).style.format(
        format_func
    ).apply(
        lambda x: ['background-color: blue' if v == x.min() else '' for v in x], 
        axis=1
    )
)

Unnamed: 0_level_0,Unnamed: 1_level_0,Unnamed: 2_level_0,nan,nan,control,control,joint-OLS,joint-OLS
Unnamed: 0_level_1,Unnamed: 1_level_1,Unnamed: 2_level_1,MSE,% of None,MSE,% of None,MSE,% of None
N,rho,sigma,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2
1000,0.5,5.0,0.02503,1.0,0.01878,0.75014,0.01855,0.74117
1000,0.5,10.0,0.10011,1.0,0.0743,0.7422,0.0742,0.74117
1000,0.5,20.0,0.40046,1.0,0.29643,0.74023,0.29681,0.74117


In [133]:
# import pickle
# with open('../controlled_linear_regression/results_df.pickle', 'wb') as f:
#     pickle.dump(results_df, f)

The setting where $\epsilon$ is linear in $Z$ (and hence also in $X$), implies the joint-OLS is the best linear unbiased estimator (BLUE). Thus it is always a better predictor than the classic OLS (but it may still perform worse than the control estimator, as the latter is biased).