# Data Loading

In [1]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
import random
from sklearn.impute import KNNImputer
from sklearn import linear_model
import matplotlib.pyplot as plt
from sklearn.impute import SimpleImputer
import scipy.stats as stats
import multiprocessing



In [2]:
#load data
X = np.load("/Users/jiaweizhang/research/data/X.npy")
Y = np.load("/Users/jiaweizhang/research/data/Y.npy")
Z = np.load("/Users/jiaweizhang/research/data/Z.npy")
M = np.load("/Users/jiaweizhang/research/data/M.npy")

display(pd.DataFrame(X))
display(pd.DataFrame(Y))
display(pd.DataFrame(Z))
display(pd.DataFrame(M))


Unnamed: 0,0,1,2,3,4
0,0.351270,-0.006691,0.486778,0.983765,0.0
1,1.717173,1.677005,0.275203,1.603558,0.0
2,0.303360,-0.276237,0.728373,2.108596,0.0
3,0.699926,-1.832527,-0.449712,-0.098141,1.0
4,1.392342,1.469335,0.065477,0.285702,1.0
...,...,...,...,...,...
9995,0.294324,-0.884006,0.694549,0.725856,0.0
9996,2.439932,1.994273,-1.634582,-1.264636,0.0
9997,0.602251,1.712798,0.314768,0.875240,1.0
9998,-0.354522,0.629949,0.430727,0.494158,1.0


Unnamed: 0,0,1,2
0,3.303519,13.072710,25.034843
1,74.895105,98.937727,511.770741
2,15.710748,36.619668,108.607238
3,9.988293,-0.549150,1.703974
4,22.704922,56.992803,240.445090
...,...,...,...
9995,2.549213,5.034245,3.702225
9996,48.589225,37.528051,53.415294
9997,24.550319,61.445234,282.056835
9998,19.673381,18.373226,41.032051


Unnamed: 0,0
0,0.0
1,1.0
2,0.0
3,1.0
4,0.0
...,...
9995,0.0
9996,1.0
9997,0.0
9998,1.0


Unnamed: 0,0,1,2
0,0.0,0.0,0.0
1,0.0,0.0,0.0
2,0.0,0.0,0.0
3,0.0,0.0,1.0
4,0.0,0.0,0.0
...,...,...,...
9995,0.0,0.0,0.0
9996,0.0,0.0,0.0
9997,0.0,0.0,0.0
9998,0.0,0.0,0.0


# One shot framework

### Randomly split one dataframe to two datasets
split_df takes a pandas DataFrame as input and randomly splits it into two separate DataFrames with a specified proportion of the data in each split. The function shuffles the indices randomly and splits the DataFrame using the shuffled indices. It returns the two separate DataFrames as output.

In [3]:
def split_df(df):
    # Set the proportion of data to be split
    split_proportion = 0.5

    # Set a random seed for reproducibility
    random.seed(23)

    # Get the indices for the split
    indices = df.index.tolist()
    num_rows = len(df)
    split_index = int(num_rows * split_proportion)

    # Shuffle the indices randomly
    random.shuffle(indices)

    # Get the randomly selected rows for each split
    split1_indices = indices[:split_index]
    split2_indices = indices[split_index:]

    # Split the original DataFrame into two separate DataFrames
    df1 = df.loc[split1_indices]
    df2 = df.loc[split2_indices]
    
    return df1,df2

### T-test for T(Z,Y)
the Wilcoxon rank sum test
$T(\mathbf{Z}, \mathbf{Y})=\sum_{n=1}^{N}Z_{n}\cdot \text{rank}(Y_{n})=\sum_{n=1}^{N}\{Z_{n}\cdot \sum_{n^{\prime}=1}^{N} \mathbf{1}(Y_{n}\geq Y_{n^{\prime}})\}$.

In [4]:
def T(z,y):

    #the Wilcoxon rank sum test
    n = len(z)
    t = 0
    #O(N^2) version
    """
    for n in range(N):
        rank = sum(1 for n_prime in range(N) if Y[n] >= Y[n_prime])
        T += Z[n] * rank
    """

    #O(N*Log(N)) version
    my_list = []
    for i in range(n):
        my_list.append((z[i],y[i]))
    sorted_list = sorted(my_list, key=lambda x: x[1])

    #Calculate
    for i in range(n):
        t += sorted_list[i][0] * (i + 1)
    
    return t

def getT(G, df):
    
    # Get the imputed data Y and indicator Z
    df_imputed = G.transform(df)
    y = df_imputed[:, Z.shape[1] + X.shape[1]:df_imputed.shape[1]]
    z = df_imputed[:, 0]
    
    z_tiled = np.tile(z, 3)

    # Concatenate the tiled versions of Z together
    new_z = np.concatenate((z_tiled,))
    new_y = y.flatten()

    #the Wilcoxon rank sum test
    t = T(new_z,new_y)

    return t



#### t-test

In [5]:
def ttest(G, df):
    
    # Get the imputed data Y and indicator Z
    df_imputed = G.transform(df)
    Y_pred = df_imputed[:, Z.shape[1] + X.shape[1]:df_imputed.shape[1]]
    Z_shuffled = df_imputed[:, 0]

    # Get the t-statistics for T(Z,Y)
    treatment = Y_pred[Z_shuffled == 1].flatten()
    control = Y_pred[Z_shuffled == 0].flatten()

    t, p = stats.ttest_ind(treatment, control, equal_var=True)

    return t,p

## One Short Framework 


In [6]:
def one_shot_test(Z, X, M, Y, G1, G2, L=10000):
    """
    A one-shot framework for testing H_0.

    Args:
    Z: 2D array of observed treatment indicators
    X: 2D array of observed covariates
    M: 2D array of observed missing indicators
    Y: 2D array of observed values for K outcomes
    G1: a function that takes (Z, X, M, Y_k) as input and returns the imputed value for outcome k
    G2: a function that takes (Z, X, M, Y_k) as input and returns the imputed value for outcome k
    L: number of Monte Carlo simulations (default is 10000)

    Returns:
    p1: 1D array of exact p-values for testing Fisher's sharp null in part 1
    p2: 1D array of exact p-values for testing Fisher's sharp null in part 2
    """

    #print train start
    print("Training start")

    # create data a whole data frame
    Y_masked = np.ma.masked_array(Y, mask=M)
    Y_masked = Y_masked.filled(np.nan)
    df = pd.DataFrame(np.concatenate((Z, X, Y_masked), axis=1))
    
    # randomly split the data into two parts
    df1, df2 = split_df(df)

    # impute the missing values and calculate the observed test statistics in part 1
    G1.fit(df1)
    t1_obs = getT(G1, df1)

    # impute the missing values and calculate the observed test statistics in part 2
    G2.fit(df2)
    t2_obs = getT(G2, df2)

    #print train end
    print("Training end")

    # simulate data and calculate test statistics
    t1_sim = np.zeros(L)
    t2_sim = np.zeros(L)
    for l in range(L):
        # simulate treatment indicators in parts 1 and 2
        Z_sim = np.random.binomial(1, 0.5, df.shape[0]).reshape(-1, 1)
        df_sim = pd.DataFrame(np.concatenate((Z_sim, X, Y_masked), axis=1))
        
        # split the simulated data into two parts
        df1_sim, df2_sim = split_df(df_sim)
        # get the test statistics in part 1
        t1_sim[l] = getT(G2, df1_sim)

        # get the test statistics in part 2
        t2_sim[l] = getT(G1, df2_sim)

        # Calculate the completeness percentage
        if l % 100 == 0:
            completeness = l / L * 100  
            print(f"Task is {completeness:.2f}% complete.")

    # calculate exact p-values for each outcome
    p1 = np.mean(t1_sim >= t1_obs, axis=0)
    p2 = np.mean(t2_sim >= t2_obs, axis=0)
    
    return p1, p2



###  Parallel Computing Version

In [7]:
def worker(args):
    Z, X, M, Y_masked, G1, G2, t1_obs, t2_obs, shape, L = args
    t1_sim = np.zeros(L)
    t2_sim = np.zeros(L)

    for l in range(L):
        Z_sim = np.random.binomial(1, 0.5, shape[0]).reshape(-1, 1)
        df_sim = pd.DataFrame(np.concatenate((Z_sim, X, Y_masked), axis=1))
        df1_sim, df2_sim = split_df(df_sim)
        t1_sim[l] = getT(G1, df1_sim)
        t2_sim[l] = getT(G2, df2_sim)
        if l % 100 == 0:
            completeness = l / L * 100  
            print(f"Task is {completeness:.2f}% complete.")

    p1 = np.mean(t1_sim >= t1_obs, axis=0)
    p2 = np.mean(t2_sim >= t2_obs, axis=0)

    return p1, p2

def one_shot_test_parallel(Z, X, M, Y, G1, G2, L=10000, n_jobs=multiprocessing.cpu_count()):
    """
    A one-shot framework for testing H_0.

    Args:
    Z: 2D array of observed treatment indicators
    X: 2D array of observed covariates
    M: 2D array of observed missing indicators
    Y: 2D array of observed values for K outcomes
    G1: a function that takes (Z, X, M, Y_k) as input and returns the imputed value for outcome k
    G2: a function that takes (Z, X, M, Y_k) as input and returns the imputed value for outcome k
    L: number of Monte Carlo simulations (default is 10000)

    Returns:
    p1: 1D array of exact p-values for testing Fisher's sharp null in part 1
    p2: 1D array of exact p-values for testing Fisher's sharp null in part 2
    """
    #print train start
    print("Training start")

    # create data a whole data frame
    Y_masked = np.ma.masked_array(Y, mask=M)
    Y_masked = Y_masked.filled(np.nan)
    df = pd.DataFrame(np.concatenate((Z, X, Y_masked), axis=1))
    
    # randomly split the data into two parts
    df1, df2 = split_df(df)

    # impute the missing values and calculate the observed test statistics in part 1
    G1.fit(df1)
    t1_obs = getT(G1, df1)

    # impute the miassing values and calculate the observed test statistics in part 2
    G2.fit(df2)
    t2_obs = getT(G2, df2)

    #print train end
    print("Training end")
    
    # print the number of cores
    print(f"Number of cores: {n_jobs}")


    # simulate data and calculate test statistics in parallel
    args_list = [(Z, X, M, Y_masked, G1, G2, t1_obs, t2_obs, df.shape, L / n_jobs)] * n_jobs
    with multiprocessing.Pool(processes=n_jobs) as pool:
        p_list = pool.map(worker, args_list)
    p1 = np.mean([p[0] for p in p_list], axis=0)
    p2 = np.mean([p[1] for p in p_list], axis=0)
    
    return p1, p2


# Test the framework 

Test all the machine learning method in single core

In [11]:
#MissForest
print("One-shot test for Fisher's sharp null")
missForest = IterativeImputer(estimator = RandomForestRegressor(),max_iter=10, random_state=0)

p1, p2 = one_shot_test(Z, X, M, Y,G1=missForest, G2=missForest)
print("p-values for part 1:", p1)
print("p-values for part 2:", p2)

One-shot test for Fisher's sharp null
Training start
Training end
Task is 0.00% complete.
Task is 1.00% complete.
Task is 2.00% complete.
Task is 3.00% complete.
Task is 4.00% complete.
Task is 5.00% complete.
Task is 6.00% complete.
Task is 7.00% complete.
Task is 8.00% complete.
Task is 9.00% complete.
Task is 10.00% complete.
Task is 11.00% complete.
Task is 12.00% complete.
Task is 13.00% complete.
Task is 14.00% complete.
Task is 15.00% complete.
Task is 16.00% complete.
Task is 17.00% complete.
Task is 18.00% complete.
Task is 19.00% complete.
Task is 20.00% complete.
Task is 21.00% complete.
Task is 22.00% complete.
Task is 23.00% complete.
Task is 24.00% complete.
Task is 25.00% complete.
Task is 26.00% complete.
Task is 27.00% complete.
Task is 28.00% complete.
Task is 29.00% complete.
Task is 30.00% complete.
Task is 31.00% complete.
Task is 32.00% complete.
Task is 33.00% complete.
Task is 34.00% complete.
Task is 35.00% complete.
Task is 36.00% complete.
Task is 37.00% comp

In [9]:
#KNNimputer
print("One-shot test for Fisher's sharp null")
KNNimputer = KNNImputer(n_neighbors=7)
p1, p2 = one_shot_test(Z, X, M, Y, G1=KNNimputer, G2=KNNimputer)
p1, p2 = one_shot_test_parallel(Z, X, M, Y, G1=KNNimputer, G2=KNNimputer)
print("p-values for part 1:", p1)
print("p-values for part 2:", p2)


One-shot test for Fisher's sharp null
Training start
Training end
Task is 0.00% complete.


KeyboardInterrupt: 

In [None]:
#BayesianRidge
print("One-shot test for Fisher's sharp null")
BayesianRidge = IterativeImputer(estimator = linear_model.BayesianRidge(),max_iter=10, random_state=0)
p1, p2 = one_shot_test(Z, X, M, Y, G1=BayesianRidge, G2=BayesianRidge)
print("p-values for part 1:", p1)
print("p-values for part 2:", p2)

In [None]:
#Median imputer
print("One-shot test for Fisher's sharp null")
median_imputer = SimpleImputer(missing_values=np.nan, strategy='median')
p1, p2 = one_shot_test(Z, X, M, Y, G1=median_imputer, G2=median_imputer)
print("p-values for part 1:", p1)
print("p-values for part 2:", p2)

In [None]:
#Mean imputer
print("One-shot test for Fisher's sharp null")
mean_imputer = SimpleImputer(missing_values=np.nan, strategy='mean')
p1, p2 = one_shot_test(Z, X, M, Y, G1=mean_imputer, G2=mean_imputer)
print("p-values for part 1:", p1)
print("p-values for part 2:", p2)
