In [1]:
import numpy as np
import torch
import torch.nn as nn
import random
import pandas as pd
device = "cpu"
from copy import deepcopy

def set_seed():
    random.seed(42)
    np.random.seed(42)
    torch.manual_seed(42)
    torch.cuda.manual_seed_all(42)  # If using CUDA

# Dataset Properties

In [2]:
Gauss = np.random.normal
var = 5 # High Variance
trn_size = 1000 # High Training Size
ndim = 6 # Number of dimensions. The base dimensions are always 3.
depth = 10 # Depth of the causal graph

# Generate Dataset

In [3]:
def lin_y(X: np.array, w: np.array, var: float):
    """
    Defines y as a linear function of X and w with Gaussian noise of variance var and mean 0
    """
    if len(w.shape) == 1:
        w = w.reshape(-1, 1)
    assert X.shape[1] == len(w), "Input array must be of same shape as the weight vector"
    result = X @ w / X.shape[1]
    exog = np.random.randn(len(X)) * np.sqrt(var)
    return [result.reshape(-1, 1), exog.reshape(-1, 1)]

def lin_X(X: np.ndarray, mat: np.ndarray, var: float):
    """
    Defines X' as a linear function of X and mat with Gaussian noise of variance var and mean 0
    """
    assert X.shape[1] == mat.shape[0], "Input array must be of same shape as the weight matrix"
    result: np.ndarray = X @ mat / X.shape[1]
    exog = Gauss(0, 1, result.shape) * np.sqrt(var)
    return [result, exog]

def generate_roots(num_samples, ndim, corr):
    """Generates correlated features. 
    Base X is assumed to be of 3 dimensions
    Remaining dimensions are correlated with the base X with correlation corr
    """
    base_X = Gauss(0, 1, (num_samples, 3))
    X = [base_X]
    num_rpt = ndim // 3
    for i in range(num_rpt - 1):
        X.append(base_X + Gauss(0, corr, (num_samples, 3)))
    X = np.concatenate(X, axis=1)
    assert X.shape == (num_samples, ndim), f"Shape of X is {X.shape}"
    return X

# Define the Models

In [4]:
class LinearYModel(nn.Module):
    def __init__(self, num_params):
        super(LinearYModel, self).__init__()
        self.num_params = num_params
        self.w = np.random.randn(num_params, 1)

    def fit(self, X, y):
        self.w = np.linalg.inv(X.T @ X) @ X.T @ y

    def predict(self, X):
        yhat = X @ self.w
        yhat = yhat.reshape(-1, 1)
        return yhat

    def forward(self, X):
        return self.predict(X)
    
class LinearXModel(nn.Module):
    def __init__(self, num_params):
        super(LinearXModel, self).__init__()
        self.num_params = num_params
        self.mat = np.random.randn(num_params, num_params)
    
    def fit(self, XPrev, Xcurr):
        self.mat = np.linalg.inv(XPrev.T @ XPrev) @ XPrev.T @ Xcurr
        assert self.mat.shape == (self.num_params, self.num_params), f"Shape of matrix is {self.mat.shape}"
    
    def predict(self, X):
        yhat = X @ self.mat
        return yhat
    
    def forward(self, X):
        return self.predict(X)

# %% Define the Gold DGP Parameters
GoldMatrices = []
for i in range(depth-1):
    GoldMatrices.append(Gauss(0, 1, (ndim, ndim)))
w = Gauss(0, 1, (ndim, 1))

# Generate Training and Test Datasets

In [5]:
def generate_datasets(rho):
    """
    Generates the training, validation, test and fixed datasets for the given DGP
    """
    set_seed()
    Xval_root, XTst_root = generate_roots(100, ndim, rho), generate_roots(100, ndim, rho)
    XFix_root = np.copy(XTst_root)
    XTrn_root = generate_roots(trn_size, ndim, rho)
    for i in range(len(XTst_root)):
        XTst_root[i, np.random.randint(0, 3)] = np.random.uniform(3, 10)
        
    def gen_inter_data(X_root, matrices, w, var):
        int_data = []
        int_data.append(lin_X(X_root, matrices[0], var)) # The root nodes have only noise
        for mat in matrices[1:]:
            X_noise = int_data[-1][0] + int_data[-1][1]
            int_data.append(lin_X(X_noise, mat, var))
        X_noise = int_data[-1][0] + int_data[-1][1]
        int_data.append(lin_y(X_noise, w, var)) 
        return int_data

    data_dicts = {
        "trn_root": XTrn_root,
        "val_root": Xval_root,
        "tst_root": XTst_root,
        "fix_root": XFix_root,
        "trn": gen_inter_data(XTrn_root, GoldMatrices, w, var),
        "val": gen_inter_data(Xval_root, GoldMatrices, w, var),
        "tst": gen_inter_data(XTst_root, GoldMatrices, w, var),
        "fix": gen_inter_data(XFix_root, GoldMatrices, w, var)
    }
    return data_dicts

# Train the Models

In [6]:
def train_models(data_dicts):
    """
    Trains the models for the given data dictionaries
    The first model is from the root to the first intermediate node
    The rest of the models are from one intermediate node to the next
    """
    XTrn_root = data_dicts["trn_root"]
    Xmodels = []
    for i in range(depth-1):
        Xmodels.append(LinearXModel(ndim))
    Ymodel = LinearYModel(ndim)

    # First train the 0th model from root to the first intermediate node
    trn_Y = data_dicts["trn"][0][0] + data_dicts["trn"][0][1]
    bparams = deepcopy(Xmodels[0].mat)
    Xmodels[0].fit(XTrn_root, trn_Y)
    aparams = deepcopy(Xmodels[0].mat)
    print("Trained 0th model from Root -> X0", np.linalg.norm(bparams - aparams))
    for i, model in enumerate(Xmodels[1:]):
        trn_X = data_dicts["trn"][i][0] + data_dicts["trn"][i][1]
        trn_Y = data_dicts["trn"][i+1][0] + data_dicts["trn"][i+1][1]
        bparams = deepcopy(model.mat)
        model.fit(trn_X, trn_Y)
        aparams = deepcopy(model.mat)
        print(f"Trained {i+1}th model from X{i} -> X{i+1}", np.linalg.norm(bparams - aparams))
    trn_X = data_dicts["trn"][i+1][0] + data_dicts["trn"][i+1][1]
    trn_Y = data_dicts["trn"][i+2][0] + data_dicts["trn"][i+2][1]
    bparams = deepcopy(Ymodel.w)
    Ymodel.fit(trn_X, trn_Y)
    aparams = deepcopy(Ymodel.w)
    print(f"Trained Y model from X{i+1} -> X{i+2}", np.linalg.norm(bparams - aparams))
    assert len(Xmodels)+1 == depth, "Number of models trained is not equal to depth"
    return Xmodels, Ymodel


# Evaluate the models

In [None]:
def get_preds(Xmodels, Ymodel, X_root, data, val_preds=None, test_preds=None, int=False, CF=False):
    preds = []
    for i, model in enumerate(Xmodels):
        if i == 0:
            p = model(X_root)
        else:
            p = model(p)
        if CF == True:
            residual = test_preds[i][1]
        elif int == True:
            # residual = val_preds[i][1]
            # np.random.shuffle(residual)
            residual = np.zeros_like(val_preds[i][1])
        else:
            residual = p - data[i][0]
        preds.append([p, residual])
    p = Ymodel(p)
    if CF == True:
        residual = test_preds[-1][1]
    elif int == True:
        # residual = val_preds[-1][1]
        # np.random.shuffle(residual)
        residual = np.zeros_like(val_preds[-1][1])
    else:
        residual = p - data[-1][0]
    preds.append([p, residual])
    return preds

# %% Assess the L2 errors
def mse_error(preds, data):
    errors = []
    for i, pred in enumerate(preds):
        errors.append(round(np.mean((pred[0] + preds[1] - data[i][0] + data[i][1])**2), 2))
    return errors

def eval_models(Xmodels, Ymodel, data_dicts):
    """
    Evaluates the models on the given data dictionaries
    """
    Xval_root = data_dicts["val_root"]
    XTst_root = data_dicts["tst_root"]
    XFix_root = data_dicts["fix_root"]
    
    # %% Get the predictions for test data and validation data for residuals
    val_preds = get_preds(Xmodels, Ymodel, Xval_root, data_dicts["val"])
    test_preds = get_preds(Xmodels, Ymodel, XTst_root, data_dicts["tst"])    


    # %% Interventional Preds
    int_preds = get_preds(Xmodels, Ymodel, XFix_root, data_dicts["fix"], val_preds=val_preds, int=True)
    int_errors = mse_error(int_preds, data_dicts["fix"])

    # %% Counterfactual Preds
    cf_preds = get_preds(Xmodels, Ymodel, XFix_root, data_dicts["fix"], test_preds=test_preds, CF=True)
    cf_errors = mse_error(cf_preds, data_dicts["fix"])

    df_dict = {
        "Depth": np.arange(1, depth+1),
        "int_errors": int_errors,
        "cf_errors": cf_errors,
        "Reduction_PC": [round(100 * ((cf - int) / cf), 2) for cf, int in zip(cf_errors, int_errors)]
    }
    df = pd.DataFrame(df_dict)
    df.set_index("Depth", inplace=True)
    return df

In [8]:
result_dfs = []
for corr in [0.01, 1]:
    data_dicts = generate_datasets(corr)
    Xmodels, Ymodel = train_models(data_dicts)
    df = eval_models(Xmodels, Ymodel, data_dicts)
    result_dfs.append(df)

# Concatenate all the dfs based on Depth
final_df = pd.concat(result_dfs, axis=1, keys=[0.01, 1])
print(final_df.to_markdown())

Trained 0th model from Root -> X0 42.52195383148337
Trained 1th model from X0 -> X1 7.720633332688531
Trained 2th model from X1 -> X2 5.057511244483703
Trained 3th model from X2 -> X3 7.497846949499599
Trained 4th model from X3 -> X4 4.310451996116022
Trained 5th model from X4 -> X5 6.977017609909292
Trained 6th model from X5 -> X6 5.796138377138417
Trained 7th model from X6 -> X7 5.713012190967711
Trained 8th model from X7 -> X8 6.680180541075484
Trained Y model from X8 -> X9 1.889158144886001
Trained 0th model from Root -> X0 5.521850147519549
Trained 1th model from X0 -> X1 7.725684955067254
Trained 2th model from X1 -> X2 5.056907725455107
Trained 3th model from X2 -> X3 7.497698520491602
Trained 4th model from X3 -> X4 4.310497821537817
Trained 5th model from X4 -> X5 6.977029353646472
Trained 6th model from X5 -> X6 5.7961713591012245
Trained 7th model from X6 -> X7 5.713035967101211
Trained 8th model from X7 -> X8 6.680176096171011
Trained Y model from X8 -> X9 1.889157182122293