In [10]:
import os
import numpy as np
import pandas as pd
from tqdm import trange, tqdm
from sklearn.linear_model import LinearRegression
from revised_cdcs.test import generate_scenarios, get_scenario_description

In [2]:
def scale(x):
    """Standardize a 1D array to mean 0 and std 1."""
    return (x - np.mean(x)) / np.std(x)

In [3]:
def compute_test_tensor_G(Y: np.ndarray) -> np.ndarray:
    """
    Compute test tensor G (n x k x p), where:
    - n = number of samples
    - p = number of variables
    - k = 7 test functions: sin/cos (2), poly2, poly3, sign * |x|^2.5
    """
    n, p = Y.shape
    J = 2  # num of sin/cos frequencies
    k = 2 * J + 3  # total num of test functions
    G = np.zeros((n, k, p))

    for u in range(p):
        for j in range(1, J + 1):
            G[:, 2 * j - 2, u] = np.sin(j * Y[:, u])
            G[:, 2 * j - 1, u] = np.cos(j * Y[:, u])
        G[:, 4, u] = scale(Y[:, u] ** 2)
        G[:, 5, u] = scale(Y[:, u] ** 3)
        G[:, 6, u] = scale(np.sign(Y[:, u]) * np.abs(Y[:, u]) ** 2.5)
    return G

In [4]:
X = np.random.normal(0, 1, 10)
Y = np.random.uniform(-2, 2, 10)
np.shape(Y)

(10,)

In [5]:
G = compute_test_tensor_G(X.reshape(-1,1))
np.shape(G) # shape (n x k x p)

(10, 7, 1)

In [6]:
def compute_statistic(eta: np.ndarray, G: np.ndarray, norm: str = 'inf'):
    """
    Compute test statistic that which quantifies the dependence between a variable eta 
    and a fixed set of test functions stored in G.
    """
    n, k, p = G.shape
    means = (eta[:, None, None] * G).mean(axis=0) * np.sqrt(n)  # shape (k, p)
    
    if norm == 'l2':
       return np.linalg.norm(means, ord=2)
    elif norm == 'l1':
         return np.linalg.norm(means, ord=1)
    elif norm == 'inf':
         return np.linalg.norm(means, ord=np.inf)
    else:
        raise ValueError("Invalid norm")

In [7]:
def test_independence_pval(X: np.ndarray, Y: np.ndarray, B: int = 400, norm: str = 'inf'):
    """
    Test whether Y is conditionally independent of X using residual-based test statistic.
    
    Parameters:
    - X: (n, dx) predictor variables
    - Y: (n, dy) response variables
    - B: number of bootstrap resamples
    - norm: norm used in test statistic ('inf', 'l1', 'l2')
    
    Returns:
    - pvals: array of p-values for each child variable
    """
    # Ensure X and Y are 2D arrays
    if X.ndim == 1:
        X = X.reshape(-1, 1)
    if Y.ndim == 1:
        Y = Y.reshape(-1, 1)
        
    n, dx = X.shape
    _, dy = Y.shape

    # Step 1: Fit linear regression of Y on X (with intercept)
    model = LinearRegression().fit(X, Y)
    Y_hat = model.predict(X)
    eta = (Y - Y_hat).flatten()  # shape (n,)
    
    # Step 2: Compute test functions on regressors
    G = compute_test_tensor_G(X) # shape (n, k, p)

    # Step 3: Compute observed statistic and null distribution
    T_obs = compute_statistic(eta, G, norm=norm)
    null_dist = np.zeros(B)

    for b in range(B):
        eta_b = eta[np.random.choice(n, n, replace=True)]
        null_dist[b] = compute_statistic(eta_b, G, norm=norm)

    # Step 4: Compute empirical p-value
    pval = (np.sum(null_dist >= T_obs) + 1) / (B + 1)
    
    return pval

In [8]:
def evaluate_test_performance(X, Y, reps: int=100, alpha: float=0.05, B: int=400,
                              norm: str='inf'):
    """
    Evaluates the power or type I error of the independence test on fixed (X, Y) data.
    
    Parameters
    ----------
    X: np.ndarray
        A 1D or 2D array of shape (n,) or (n, d1) representing variable X.
    Y: np.ndarray
        A 1D or 2D array of shape (n,) or (n, d2) representing variable Y.
    reps: int, optional
        Number of repetitions to simulate the test, by default 100.
    alpha: float, optional
        Significance level for hypothesis testing, by default 0.05.
    B: int, optional
        number of bootstrap resamples, by default 400.
    norm: str, optional
        which norm to aggregate over test functions, by default 'inf'.
    
    Returns
    -------
    float
        Estimated type I error rate,
        computed as the proportion of times the null hypothesis is rejected.
    """
    n = X.shape[0]
    pvals = []
    for _ in trange(reps, desc="Running tests"):
        idx = np.random.choice(n, n, replace=True)
        pval = test_independence_pval(X=X[idx], Y=Y[idx], B=B, norm=norm)
        pvals.append(pval)
    
    pvals = np.array(pvals)
    rate = np.mean(pvals < alpha)

    print(f"Rejection rate at alpha={alpha:.2f}: {rate:.3f}")
    
    return rate

In [14]:
n = 1000
reps = 100
alpha = 0.05
B = 400
norm = 'inf'

scenarios = generate_scenarios()
print(len(scenarios))

55


#### CASE 1: Independent Gaussian-Gaussian variables

In [18]:
scenario_types = {f"Independent GG {i}": "Independent (G-G)" for i in range(1, 11)} 

for name, gen_func in scenarios.items():
    if name.startswith("Independent GG"):
        print(f"Testing scenario: {name}")
        X, Y = gen_func(n)
        rejection_rate = evaluate_test_performance(X, Y, reps=reps, alpha=alpha, B=B, norm=norm)

Testing scenario: Independent GG 1


Running tests: 100%|██████████| 100/100 [00:01<00:00, 97.28it/s]


Rejection rate at alpha=0.05: 0.180
Testing scenario: Independent GG 2


Running tests: 100%|██████████| 100/100 [00:01<00:00, 99.34it/s]


Rejection rate at alpha=0.05: 0.100
Testing scenario: Independent GG 3


Running tests: 100%|██████████| 100/100 [00:01<00:00, 98.85it/s]


Rejection rate at alpha=0.05: 0.360
Testing scenario: Independent GG 4


Running tests: 100%|██████████| 100/100 [00:00<00:00, 100.71it/s]


Rejection rate at alpha=0.05: 0.060
Testing scenario: Independent GG 5


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.90it/s]


Rejection rate at alpha=0.05: 0.090
Testing scenario: Independent GG 6


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.62it/s]


Rejection rate at alpha=0.05: 0.080
Testing scenario: Independent GG 7


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.78it/s]


Rejection rate at alpha=0.05: 0.160
Testing scenario: Independent GG 8


Running tests: 100%|██████████| 100/100 [00:01<00:00, 99.32it/s]


Rejection rate at alpha=0.05: 0.180
Testing scenario: Independent GG 9


Running tests: 100%|██████████| 100/100 [00:00<00:00, 100.51it/s]


Rejection rate at alpha=0.05: 0.070
Testing scenario: Independent GG 10


Running tests: 100%|██████████| 100/100 [00:01<00:00, 94.86it/s]

Rejection rate at alpha=0.05: 0.120





#### CASE 2: Independent Gaussian-NonGaussian variables

In [19]:
scenario_types = {f"Independent GN {i}": "Independent (G-NG)" for i in range(1, 11)}

for name, gen_func in scenarios.items():
    if name.startswith("Independent GN"):
        print(f"Testing scenario: {name}")
        X, Y = gen_func(n)
        rejection_rate = evaluate_test_performance(X, Y, reps=reps, alpha=alpha, B=B, norm=norm)

Testing scenario: Independent GN 1


Running tests: 100%|██████████| 100/100 [00:01<00:00, 95.93it/s]


Rejection rate at alpha=0.05: 0.020
Testing scenario: Independent GN 2


Running tests: 100%|██████████| 100/100 [00:01<00:00, 97.38it/s]


Rejection rate at alpha=0.05: 0.060
Testing scenario: Independent GN 3


Running tests: 100%|██████████| 100/100 [00:01<00:00, 98.72it/s]


Rejection rate at alpha=0.05: 0.170
Testing scenario: Independent GN 4


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.76it/s]


Rejection rate at alpha=0.05: 0.040
Testing scenario: Independent GN 5


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.31it/s]


Rejection rate at alpha=0.05: 0.910
Testing scenario: Independent GN 6


Running tests: 100%|██████████| 100/100 [00:00<00:00, 100.41it/s]


Rejection rate at alpha=0.05: 0.110
Testing scenario: Independent GN 7


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.28it/s]


Rejection rate at alpha=0.05: 0.350
Testing scenario: Independent GN 8


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.12it/s]


Rejection rate at alpha=0.05: 0.100
Testing scenario: Independent GN 9


Running tests: 100%|██████████| 100/100 [00:00<00:00, 100.73it/s]


Rejection rate at alpha=0.05: 0.170
Testing scenario: Independent GN 10


Running tests: 100%|██████████| 100/100 [00:01<00:00, 99.37it/s]

Rejection rate at alpha=0.05: 0.370





#### CASE 3: Independent NonGaussian-NonGaussian variables

In [None]:
scenario_types = {f"Independent NN {i}": "Independent (NG-NG)" for i in range(1, 11)} 

for name, gen_func in scenarios.items():
    if name.startswith("Independent NN"):
        print(f"Testing scenario: {name}")
        X, Y = gen_func(n)
        rejection_rate = evaluate_test_performance(X, Y, reps=reps, alpha=alpha, B=B, norm=norm)

Testing scenario: Independent NN 1


Running tests: 100%|██████████| 100/100 [00:01<00:00, 95.31it/s]


Rejection rate at alpha=0.05: 0.000
Testing scenario: Independent NN 2


Running tests: 100%|██████████| 100/100 [00:01<00:00, 97.01it/s]


Rejection rate at alpha=0.05: 0.080
Testing scenario: Independent NN 3


Running tests: 100%|██████████| 100/100 [00:01<00:00, 97.34it/s]


Rejection rate at alpha=0.05: 0.000
Testing scenario: Independent NN 4


Running tests: 100%|██████████| 100/100 [00:01<00:00, 97.73it/s]


Rejection rate at alpha=0.05: 0.440
Testing scenario: Independent NN 5


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.96it/s]


Rejection rate at alpha=0.05: 0.100
Testing scenario: Independent NN 6


Running tests: 100%|██████████| 100/100 [00:01<00:00, 99.81it/s]


Rejection rate at alpha=0.05: 0.100
Testing scenario: Independent NN 7


Running tests: 100%|██████████| 100/100 [00:00<00:00, 100.13it/s]


Rejection rate at alpha=0.05: 0.000
Testing scenario: Independent NN 8


Running tests: 100%|██████████| 100/100 [00:01<00:00, 98.69it/s]


Rejection rate at alpha=0.05: 0.160
Testing scenario: Independent NN 9


Running tests: 100%|██████████| 100/100 [00:01<00:00, 99.59it/s]


Rejection rate at alpha=0.05: 0.020
Testing scenario: Independent NN 10


Running tests: 100%|██████████| 100/100 [00:01<00:00, 98.49it/s]

Rejection rate at alpha=0.05: 0.140





#### CASE 4: Weak nonlinear dependence (hard to detect)

In [20]:
scenario_types = {f"Subtle Weak {i}": "Subtle Dependent" for i in range(1, 6)}

for name, gen_func in scenarios.items():
    if name.startswith("Subtle Weak"):
        print(f"Testing scenario: {name}")
        X, Y = gen_func(n)
        rejection_rate = evaluate_test_performance(X, Y, reps=reps, alpha=alpha, B=B, norm=norm)

Testing scenario: Subtle Weak 1


Running tests: 100%|██████████| 100/100 [00:01<00:00, 97.58it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Subtle Weak 2


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.19it/s]


Rejection rate at alpha=0.05: 0.970
Testing scenario: Subtle Weak 3


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.18it/s]


Rejection rate at alpha=0.05: 0.140
Testing scenario: Subtle Weak 4


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.31it/s]


Rejection rate at alpha=0.05: 0.040
Testing scenario: Subtle Weak 5


Running tests: 100%|██████████| 100/100 [00:00<00:00, 100.94it/s]

Rejection rate at alpha=0.05: 0.730





#### CASE 5: Conditional dependence through latent variable

In [21]:
scenario_types = {f"Subtle Cond {i}": "Subtle Dependent" for i in range(1, 6)}

for name, gen_func in scenarios.items():
    if name.startswith("Subtle Cond"):
        print(f"Testing scenario: {name}")
        X, Y = gen_func(n)
        rejection_rate = evaluate_test_performance(X, Y, reps=reps, alpha=alpha, B=B, norm=norm)

Testing scenario: Subtle Cond 1


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.78it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Subtle Cond 2


Running tests: 100%|██████████| 100/100 [00:00<00:00, 100.15it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Subtle Cond 3


Running tests: 100%|██████████| 100/100 [00:01<00:00, 98.54it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Subtle Cond 4


Running tests: 100%|██████████| 100/100 [00:01<00:00, 99.88it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Subtle Cond 5


Running tests: 100%|██████████| 100/100 [00:01<00:00, 95.73it/s]

Rejection rate at alpha=0.05: 1.000





#### CASE 6: Higher-order interaction dependence

In [22]:
scenario_types = {f"Subtle Higher {i}": "Subtle Dependent" for i in range(1, 6)}

for name, gen_func in scenarios.items():
    if name.startswith("Subtle Higher"):
        print(f"Testing scenario: {name}")
        X, Y = gen_func(n)
        rejection_rate = evaluate_test_performance(X, Y, reps=reps, alpha=alpha, B=B, norm=norm)

Testing scenario: Subtle Higher 1


Running tests: 100%|██████████| 100/100 [00:01<00:00, 95.57it/s]


Rejection rate at alpha=0.05: 0.270
Testing scenario: Subtle Higher 2


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.92it/s]


Rejection rate at alpha=0.05: 0.380
Testing scenario: Subtle Higher 3


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.45it/s]


Rejection rate at alpha=0.05: 0.190
Testing scenario: Subtle Higher 4


Running tests: 100%|██████████| 100/100 [00:01<00:00, 97.49it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Subtle Higher 5


Running tests: 100%|██████████| 100/100 [00:01<00:00, 98.89it/s]

Rejection rate at alpha=0.05: 1.000





#### CASE 7: Strong nonlinear dependence (easy to detect)

In [23]:
scenario_types = {f"Dependent Strong {i}": "Clearly Dependent" for i in range(1, 6)}

for name, gen_func in scenarios.items():
    if name.startswith("Dependent Strong"):
        print(f"Testing scenario: {name}")
        X, Y = gen_func(n)
        rejection_rate = evaluate_test_performance(X, Y, reps=reps, alpha=alpha, B=B, norm=norm)

Testing scenario: Dependent Strong 1


Running tests: 100%|██████████| 100/100 [00:01<00:00, 94.10it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Dependent Strong 2


Running tests: 100%|██████████| 100/100 [00:01<00:00, 97.16it/s]


Rejection rate at alpha=0.05: 0.910
Testing scenario: Dependent Strong 3


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.10it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Dependent Strong 4


Running tests: 100%|██████████| 100/100 [00:01<00:00, 95.34it/s]


Rejection rate at alpha=0.05: 0.140
Testing scenario: Dependent Strong 5


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.94it/s]

Rejection rate at alpha=0.05: 1.000





#### CASE 8: Dependence between mixed variable types

In [24]:
scenario_types = {f"Dependent Mixed {i}": "Clearly Dependent" for i in range(1, 6)}

for name, gen_func in scenarios.items():
    if name.startswith("Dependent Mixed"):
        print(f"Testing scenario: {name}")
        X, Y = gen_func(n)
        rejection_rate = evaluate_test_performance(X, Y, reps=reps, alpha=alpha, B=B, norm=norm)

Testing scenario: Dependent Mixed 1


Running tests: 100%|██████████| 100/100 [00:01<00:00, 94.43it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Dependent Mixed 2


Running tests: 100%|██████████| 100/100 [00:01<00:00, 96.17it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Dependent Mixed 3


Running tests: 100%|██████████| 100/100 [00:01<00:00, 99.43it/s]


Rejection rate at alpha=0.05: 1.000
Testing scenario: Dependent Mixed 4


Running tests: 100%|██████████| 100/100 [00:00<00:00, 101.28it/s]


Rejection rate at alpha=0.05: 0.000
Testing scenario: Dependent Mixed 5


Running tests: 100%|██████████| 100/100 [00:00<00:00, 100.85it/s]

Rejection rate at alpha=0.05: 0.120



