# Functional relation comparison

In [1]:
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import scipy.stats as st
import scipy.special as sp
import random

from fastkde import fastKDE

In [2]:
# Pairwise baseline methods from the `cdt` package, instantiated once and
# reused by every simulation cell below.
from cdt.causality.pairwise import ANM, CDS, RECI

anm = ANM()
cds = CDS()
reci = RECI()

In [3]:
def _fold_entropies(fit_half, eval_half):
    """Fit marginal densities on `fit_half`, evaluate them at `eval_half`, and
    return the pair (h_x, h_y) of negative mean log-densities."""
    dens_x = fastKDE.pdf_at_points(var1 = fit_half[:,0], list_of_points = list(eval_half[:,0]))
    dens_y = fastKDE.pdf_at_points(var1 = fit_half[:,1], list_of_points = list(eval_half[:,1]))

    # Drop points where either density is non-positive so the logs are finite.
    keep = np.logical_and(dens_x > 0, dens_y > 0)
    dens_x = dens_x[keep]
    dens_y = dens_y[keep]

    return -np.mean(np.log(dens_x)), -np.mean(np.log(dens_y))


def estimator(x, alpha = 0.05):
    """
    Cross-fitted estimate of delta = h_x - h_y, the difference between the
    marginal entropy estimates of the two columns of `x`, together with a
    normal-approximation confidence interval and a sign decision.

    Parameters:
    -----------
    x : array
        Two-column array; column 0 is treated as X and column 1 as Y.
        If the number of rows is odd, the last row is dropped so the sample
        can be split into two equal halves.
    alpha : float
        Significance level; the interval uses the 1 - alpha/2 standard
        normal quantile.

    Returns:
    --------
    list
        [decision, h_x, h_y, delta_lcb, delta, delta_ucb, delta_se] where
        decision is 1.0 if the whole interval lies above zero, -1.0 if it
        lies entirely below zero, 0 if the interval contains (or touches)
        zero, and NaN if the interval bounds themselves are NaN;
        h_x, h_y are the cross-fitted entropy estimates;
        delta = h_x - h_y with bounds delta_lcb / delta_ucb;
        delta_se is the standard error used for the interval.
    """
    x = np.asarray(x)

    # np.split below needs an even number of rows
    if(x.shape[0]%2!=0):
        x = x[:-1]

    estim, inf = np.split(x, 2) #split data up into two halves

    ## fold 1: densities fitted on the first half, evaluated on the second
    h_x1, h_y1 = _fold_entropies(estim, inf)

    ## fold 2: roles of the two halves swapped
    h_x2, h_y2 = _fold_entropies(inf, estim)

    ## cross fitting: average the two fold estimates
    h_x = (h_x1 + h_x2)/2
    h_y = (h_y1 + h_y2)/2
    delta = (h_x - h_y)

    ## variance estimation from densities fitted on the full sample
    # NOTE(review): with no list_of_points the densities are presumably
    # evaluated at the input points themselves -- confirm against fastKDE docs.
    margin_x = fastKDE.pdf_at_points(var1 = x[:,0])
    margin_y = fastKDE.pdf_at_points(var1 = x[:,1])
    select = np.logical_and(margin_x > 0, margin_y > 0)
    margin_y = margin_y[select]
    margin_x = margin_x[select]

    covar = np.cov(np.log(margin_x), np.log(margin_y))

    # Var(log f_x - log f_y) = Var(log f_x) + Var(log f_y) - 2 Cov(log f_x, log f_y)
    delta_var = covar[0,0] + covar[1,1] - 2*covar[0,1]
    delta_sd = np.sqrt(delta_var)

    # The divisor is half the length of the mask (i.e. the count before
    # filtering), kept exactly as in the original implementation.
    delta_se = delta_sd/np.sqrt(len(select)/2)
    z_crit = st.norm.ppf(1 - alpha/2)

    delta_lcb = delta - z_crit*delta_se
    delta_ucb = delta + z_crit*delta_se

    # Explicit comparisons avoid the 0/0 = NaN that lcb/|lcb| produced when
    # the lower bound was exactly zero, and treat an interval that merely
    # touches zero as inconclusive.
    if(np.isnan(delta_lcb) or np.isnan(delta_ucb)):
        decision = np.nan
    elif(delta_lcb > 0):
        decision = 1.0
    elif(delta_ucb < 0):
        decision = -1.0
    else:
        decision = 0

    return ([decision, h_x, h_y, delta_lcb, delta, delta_ucb, delta_se])

## Data generation

In [4]:
import numpy as np
from scipy.stats import rankdata, norm

def simulate_data(n_samples, g_func, rho=0.0, s=1.0, seed=None):
    """
    Simulate (x, y) pairs where y is g(x) rescaled to [0, 1] plus an error e.

    X and e are drawn jointly from a bivariate standard normal with
    correlation `rho`. Both are rank-transformed to (0, 1); x is that
    uniformised value and e is mapped back through the normal quantile
    function and multiplied by `s`.

    Parameters:
    -----------
    n_samples : int
        Number of samples to generate (must be at least 1)
    g_func : callable
        Function g(x) applied to the uniformised x values
    rho : float
        Correlation coefficient for bivariate normal
    s : float
        Standard deviation parameter for error scaling
    seed : int, optional
        Random seed for reproducibility. When given, NumPy's global random
        state is re-seeded with it.

    Returns:
    --------
    ndarray of shape (n_samples, 2)
        Column 0 holds x (rank-uniformised values in (0, 1)),
        column 1 holds y = rescaled g(x) + e.

    Raises:
    -------
    ValueError
        If n_samples is smaller than 1.
    """
    if n_samples < 1:
        raise ValueError("n_samples must be at least 1")

    if seed is not None:
        np.random.seed(seed)

    # 1. Generate X and e from bivariate normal
    cov_matrix = [[1, rho], [rho, 1]]
    data = np.random.multivariate_normal([0, 0], cov_matrix, n_samples)
    x_normal, e_normal = data[:, 0], data[:, 1]

    # 2. Transform to uniform using rank transformation; dividing by
    #    n_samples + 1 keeps every value strictly inside (0, 1)
    x_uniform = rankdata(x_normal) / (n_samples + 1)
    e_uniform = rankdata(e_normal) / (n_samples + 1)

    # x in the output is the uniformised value, not the normal draw
    x = x_uniform.copy()

    # 3. Scale e to normal with sd = s
    e = norm.ppf(e_uniform) * s

    # 4. Create y = g(x)
    y_true = g_func(x)

    # 5. Affine transform y to [0, 1]
    y_min, y_max = np.min(y_true), np.max(y_true)
    y_range = y_max - y_min
    if y_range == 0:
        # Constant g(x) (or a single sample): the rescaling would be 0/0 and
        # turn every y into NaN, so map the constant to 0 instead.
        y_true = np.zeros_like(x)
    else:
        y_true = (y_true - y_min) / y_range

    # 6. Add error e
    y = y_true + e

    return(np.column_stack((x, y)))

## Case 1, no noise

In [5]:
# Functional relation under test: g(x) = x^(1/3) (cube root)
g = lambda x: np.power(x, 1/3)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0             # standard deviation of the error term
r = 0              # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.224, 0.   , 0.   ])

## Case 1, low noise, low correlation

In [6]:
# Functional relation under test: g(x) = x^(1/3) (cube root)
g = lambda x: np.power(x, 1/3)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.10           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.236, 0.   , 0.   ])

## Case 1, low noise, high correlation

In [7]:
# Functional relation under test: g(x) = x^(1/3) (cube root)
g = lambda x: np.power(x, 1/3)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.60           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.212, 0.   , 0.   ])

## Case 2, no noise

In [8]:
# Functional relation under test: g(x) = x^(1/2) (square root)
g = lambda x: np.power(x, 1/2)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.0           # standard deviation of the error term
r = 0.0            # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.176, 0.   , 0.   ])

## Case 2, low noise, low correlation

In [9]:
# Functional relation under test: g(x) = x^(1/2) (square root)
g = lambda x: np.power(x, 1/2)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.10           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.  , 0.22, 0.  , 0.  ])

## Case 2, low noise, high correlation

In [10]:
# Functional relation under test: g(x) = x^(1/2) (square root)
g = lambda x: np.power(x, 1/2)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.60           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.  , 0.18, 0.  , 0.  ])

## Case 3, no noise

In [11]:
# Functional relation under test: g(x) = x^2 (quadratic)
g = lambda x: np.power(x, 2)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.0           # standard deviation of the error term
r = 0.0            # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.  , 0.24, 1.  , 1.  ])

## Case 3, low noise, low correlation

In [12]:
# Functional relation under test: g(x) = x^2 (quadratic)
g = lambda x: np.power(x, 2)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.10           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.184, 1.   , 1.   ])

## Case 3, low noise, high correlation

In [13]:
# Functional relation under test: g(x) = x^2 (quadratic)
g = lambda x: np.power(x, 2)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.60           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1. , 0.2, 1. , 1. ])

## Case 4, no noise

In [14]:
# Functional relation under test: g(x) = x^3 (cubic)
g = lambda x: np.power(x, 3)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.0           # standard deviation of the error term
r = 0.0            # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.196, 0.   , 1.   ])

## Case 4, low noise, low correlation

In [15]:
# Functional relation under test: g(x) = x^3 (cubic)
g = lambda x: np.power(x, 3)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.10           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.204, 0.016, 1.   ])

## Case 4, low noise, high correlation

In [16]:
# Functional relation under test: g(x) = x^3 (cubic)
g = lambda x: np.power(x, 3)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.60           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.  , 0.18, 0.  , 1.  ])

## Case 5, no noise

In [17]:
# Functional relation under test: g(x) = exp(x)
g = lambda x: np.exp(x)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.0           # standard deviation of the error term
r = 0.0            # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.216, 0.   , 1.   ])

## Case 5, low noise, low correlation

In [18]:
# Functional relation under test: g(x) = exp(x)
g = lambda x: np.exp(x)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.10           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([0.996, 0.204, 0.436, 1.   ])

## Case 5, low noise, high correlation

In [19]:
# Functional relation under test: g(x) = exp(x)
g = lambda x: np.exp(x)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.60           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([0.976, 0.212, 0.052, 1.   ])

## Case 6, no noise

In [23]:
# Functional relation under test: g(x) = sin(11x/7)
g = lambda x: np.sin(11*x/7)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.0           # standard deviation of the error term
r = 0.0            # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.244, 1.   , 0.   ])

## Case 6, low noise, low correlation

In [24]:
# Functional relation under test: g(x) = sin(11x/7)
g = lambda x: np.sin(11*x/7)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.10           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.  , 0.26, 1.  , 0.  ])

## Case 6, low noise, high correlation

In [25]:
# Functional relation under test: g(x) = sin(11x/7)
g = lambda x: np.sin(11*x/7)

# Simulation settings
n_samples = 1000   # observations per simulated data set
niter = 250        # number of replications
sd = 0.02          # standard deviation of the error term
r = 0.60           # correlation between X and the error term

# One row per replication; columns: [estimator decision, ANM, CDS, RECI]
results = np.zeros((niter, 4))

for i in range(niter):
    # The seed changes with the replication index so every run is reproducible
    data = simulate_data(n_samples=n_samples, g_func=g, rho=r, s=sd, seed=42 + i)
    pair = (data[:,0], data[:,1])
    results[i] = [
        estimator(data)[0],
        anm.predict_proba(pair),
        cds.predict_proba(pair),
        reci.predict_proba(pair),
    ]

# Share of replications in which each method's output is strictly positive
(results > 0).sum(axis = 0)/float(niter)

array([1.   , 0.264, 1.   , 0.   ])