In [1]:
import sys, os
from joblib import Parallel, delayed
import numpy as np
import matplotlib.pyplot as plt
import math
import scipy
from sklearn.preprocessing import normalize
from sklearn.metrics import roc_auc_score,roc_curve
from scipy.stats import entropy
from sklearn.calibration import CalibratedClassifierCV
from sklearn.model_selection import cross_val_score, StratifiedKFold, cross_val_predict
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression


In [2]:
def _draw_mixture_rows(rng, components, component_idx, n_informative, method):
    """Draw one Gaussian row per entry of ``component_idx`` and stack the rows.

    ``components[k]`` is a ``[mean, cov]`` pair. Samples are drawn one at a
    time, in the order given by ``component_idx``, so the generator stream is
    consumed sequentially. Returns an array of shape
    ``(len(component_idx), n_informative)``.
    """
    if len(component_idx) == 0:
        # np.vstack rejects an empty sequence; return an explicit empty block.
        return np.empty((0, n_informative))
    return np.vstack(
        [
            rng.multivariate_normal(*components[k], size=1, method=method)
            for k in component_idx
        ]
    )


def make_trunk_classification(
    n_samples,
    n_dim=4096,
    n_informative=1,
    simulation: str = "trunk",
    mu_0: float = 0,
    mu_1: float = 1,
    rho: int = 0,
    band_type: str = "ma",
    return_params: bool = False,
    mix: float = 0.5,
    seed=None,
):
    """Generate a two-class Gaussian ("trunk"-style) classification data set.

    Parameters
    ----------
    n_samples : int
        Total number of rows requested. Each class receives ``n_samples // 2``
        rows, so an odd value yields ``n_samples - 1`` rows.
    n_dim : int
        Total number of columns. Columns beyond ``n_informative`` are filled
        with independent standard-normal noise.
    n_informative : int
        Number of leading columns drawn from the class-dependent Gaussians.
    simulation : {"trunk", "trunk_overlap", "trunk_mix"}
        * ``"trunk"``: class 0 ~ N(mu_0, cov), class 1 ~ N(mu_1, cov).
        * ``"trunk_overlap"``: both classes are drawn from the same
          two-component mixture of N(mu_0, cov) and N(mu_1, cov), using the
          same component assignment for both halves.
        * ``"trunk_mix"``: class 0 ~ N(0, cov), class 1 ~ the two-component
          mixture.
    mu_0, mu_1 : float
        Base means; the mean of the i-th informative column (1-based) is the
        base value divided by ``sqrt(i)``.
    rho : number
        If non-zero, a banded covariance is built by ``_moving_avg_cov`` or
        ``_autoregressive_cov`` (helpers that must be defined elsewhere);
        otherwise the identity covariance is used.
    band_type : {"ma", "ar"}
        Which covariance helper to use when ``rho != 0``.
    return_params : bool
        If true, also return the distribution parameters (see Returns).
    mix : float
        Probability of picking the first mixture component; must be in [0, 1].
    seed : optional
        Passed to ``np.random.default_rng`` for both internal generators.

    Returns
    -------
    ``(X, y)`` when ``return_params`` is false. Otherwise a list starting with
    ``X, y`` followed by:

    * ``"trunk"``: ``[mu_0, mu_1], [cov, cov]``
    * ``"trunk_overlap"``: ``[zeros, zeros], [cov, cov]``
    * ``"trunk_mix"``: ``(mu_0, mu_1), (cov, cov), X_mixture``

    Raises
    ------
    ValueError
        If ``n_dim < n_informative``, ``band_type`` is unknown (with
        ``rho != 0``), ``mix`` is outside [0, 1], or ``simulation`` is unknown.
    """
    if n_dim < n_informative:
        raise ValueError(
            f"Number of informative dimensions {n_informative} must be less than number "
            f"of dimensions, {n_dim}"
        )
    # NOTE(review): both generators receive the same seed, so with an explicit
    # seed they produce identical streams — confirm this coupling is intended.
    rng = np.random.default_rng(seed=seed)
    rng1 = np.random.default_rng(seed=seed)
    mu_0 = np.array([mu_0 / np.sqrt(i) for i in range(1, n_informative + 1)])
    mu_1 = np.array([mu_1 / np.sqrt(i) for i in range(1, n_informative + 1)])
    if rho != 0:
        if band_type == "ma":
            cov = _moving_avg_cov(n_informative, rho)
        elif band_type == "ar":
            cov = _autoregressive_cov(n_informative, rho)
        else:
            raise ValueError(f'Band type {band_type} must be one of "ma", or "ar".')
    else:
        cov = np.identity(n_informative)
    if mix < 0 or mix > 1:
        raise ValueError("Mix must be between 0 and 1.")
    # Factorisation used by multivariate_normal: Cholesky for large covariance
    # matrices, SVD otherwise.
    method = "cholesky" if n_informative > 1000 else "svd"
    half = n_samples // 2

    if simulation == "trunk":
        X = np.vstack(
            (
                rng.multivariate_normal(mu_0, cov, half, method=method),
                rng1.multivariate_normal(mu_1, cov, half, method=method),
            )
        )
    elif simulation == "trunk_overlap":
        mixture_idx = rng.choice(2, half, replace=True, shuffle=True, p=[mix, 1 - mix])
        norm_params = [[mu_0, cov], [mu_1, cov]]
        X_mixture = _draw_mixture_rows(rng, norm_params, mixture_idx, n_informative, method)
        X_mixture_2 = _draw_mixture_rows(rng1, norm_params, mixture_idx, n_informative, method)
        X = np.vstack((X_mixture, X_mixture_2))
    elif simulation == "trunk_mix":
        mixture_idx = rng.choice(2, half, replace=True, shuffle=True, p=[mix, 1 - mix])
        norm_params = [[mu_0, cov], [mu_1, cov]]
        X_mixture = _draw_mixture_rows(rng1, norm_params, mixture_idx, n_informative, method)
        X = np.vstack(
            (
                rng.multivariate_normal(np.zeros(n_informative), cov, half, method=method),
                X_mixture,
            )
        )
    else:
        raise ValueError("Simulation must be: trunk, trunk_overlap, trunk_mix")

    if n_dim > n_informative:
        # Pad with pure-noise columns up to the requested dimensionality.
        X = np.hstack(
            (X, rng.normal(loc=0, scale=1, size=(X.shape[0], n_dim - n_informative)))
        )
    y = np.concatenate((np.zeros(half), np.ones(half)))

    if return_params:
        returns = [X, y]
        # The keys below must match the simulation names accepted above
        # (underscores); hyphenated names never reach this point.
        if simulation == "trunk":
            returns += [[mu_0, mu_1], [cov, cov]]
        elif simulation == "trunk_overlap":
            returns += [[np.zeros(n_informative), np.zeros(n_informative)], [cov, cov]]
        elif simulation == "trunk_mix":
            returns += [*list(zip(*norm_params)), X_mixture]
        return returns
    return X, y

In [3]:
def hellinger_dot(p, q):
    """Length-normalised Hellinger-style distance between ``p`` and ``q``.

    Computes ``||sqrt(p) - sqrt(q)||_2 / sqrt(2 * n)`` where ``n`` is the
    number of entries, i.e. the root-mean-square of the element-wise
    square-root differences divided by ``sqrt(2)``.

    Note that, unlike the textbook Hellinger distance ``||.||_2 / sqrt(2)``,
    the result is additionally divided by ``sqrt(n)``.
    """
    root_gap = np.sqrt(p) - np.sqrt(q)
    normaliser = math.sqrt(2 * len(root_gap))
    return np.linalg.norm(root_gap) / normaliser

In [4]:
def CalibratedClassifier_Statistics(model_name = 'SVM',samplesize = 4096, sim = 'trunk',dim = 4096,reps = 1):
    """Estimate MI, Hellinger distance and S@98 for a calibrated classifier.

    A 4096 x 4096 data set is generated with ``make_trunk_classification``;
    the first ``samplesize // 2`` rows of each class and the first ``dim``
    columns are kept. Over a shuffled, stratified 5-fold split, the training
    part of each fold is halved into a fitting set (base model) and a
    calibration set (isotonic ``CalibratedClassifierCV`` with ``cv="prefit"``),
    and the statistics are computed from the calibrated posteriors on the
    held-out fold.

    Parameters
    ----------
    model_name : {'SVM', 'KNN', 'LR'}
        Base classifier: RBF ``SVC``, ``KNeighborsClassifier`` with
        ``int(sqrt(samplesize))`` neighbours, or L1 ``LogisticRegression``.
    samplesize : int
        Total number of samples to keep (at most 4096).
    sim : str
        Simulation name forwarded to ``make_trunk_classification``. For
        ``'trunk'`` the means are 0/1 with mix 0.5; otherwise 0/5 with mix 0.75.
    dim : int
        Number of leading feature columns to keep (at most 4096).
    reps : int
        Repetition index supplied by the caller; not used in the computation.

    Returns
    -------
    tuple of float
        Fold-averaged ``(mutual information, Hellinger distance, S@98)``.

    Raises
    ------
    ValueError
        If ``model_name`` is not supported, or ``samplesize`` / ``dim`` exceed
        the size of the generated pool.
    """
    pool_samples = 4096
    pool_dims = 4096

    # Validate before the expensive data generation; an unknown model would
    # otherwise only surface as an unbound local at the first fit.
    if model_name not in ('SVM', 'KNN', 'LR'):
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected one of 'SVM', 'KNN', 'LR'."
        )
    # Slicing beyond the pool would silently return fewer samples/features
    # than requested.
    if samplesize > pool_samples or dim > pool_dims:
        raise ValueError(
            f"samplesize ({samplesize}) and dim ({dim}) must not exceed "
            f"{pool_samples} and {pool_dims} respectively."
        )

    if sim == 'trunk':
        mu0, mu1, mix_trunk = 0, 1, 0.5
    else:
        mu0, mu1, mix_trunk = 0, 5, 0.75

    X, Y = make_trunk_classification(n_samples=pool_samples,
                                     n_dim=pool_dims,
                                     n_informative=1,
                                     mu_0=mu0,
                                     mu_1=mu1,
                                     simulation=sim,
                                     mix=mix_trunk,
                                     rho=0)

    # Keep a balanced subset: the first samplesize // 2 rows of each class.
    x_0 = X[Y == 0][:samplesize//2, :dim]
    x_1 = X[Y == 1][:samplesize//2, :dim]
    x = np.vstack((x_0, x_1))
    y = np.array([0]*x_0.shape[0] + [1]*x_1.shape[0]).ravel()

    if model_name == 'SVM':
        base_model = SVC(probability=True, kernel='rbf')
    elif model_name == 'KNN':
        base_model = KNeighborsClassifier(n_neighbors=int(np.sqrt(samplesize)))
    else:  # 'LR'
        base_model = LogisticRegression(penalty='l1', solver='liblinear')

    # Entropy of the balanced class prior (natural log); identical for every fold.
    H_Y = entropy([50, 50], base=np.exp(1))

    MIs = []
    HDs = []
    S98s = []

    cv = StratifiedKFold(n_splits=5, shuffle=True)
    for train_ix, val_ix in cv.split(x, y):
        # Training set (80%) and held-out validation fold (20%).
        X_train, X_val = x[train_ix, :], x[val_ix, :]
        y_train, y_val = y[train_ix], y[val_ix]

        # Halve the training set: fitting set (40%) and calibration set (40%).
        X_fit, X_cal, y_fit, y_cal = train_test_split(
            X_train, y_train, test_size=0.5, stratify=y_train
        )

        base_model.fit(X_fit, y_fit)

        # cv="prefit": only the isotonic calibrator is fitted on the calibration set.
        calibrated_model = CalibratedClassifierCV(base_model, cv="prefit", method='isotonic')
        calibrated_model.fit(X_cal, y_cal)
        posterior = calibrated_model.predict_proba(X_val)

        # Mutual information: prior entropy minus mean posterior entropy.
        stats_conen = np.mean(entropy(posterior, base=np.exp(1), axis=1))
        MIs.append(H_Y - stats_conen)

        # Hellinger distance between the two posterior columns.
        HDs.append(hellinger_dot(posterior[:, 0], posterior[:, 1]))

        # S@98: best true-positive rate with false-positive rate <= 0.02.
        fpr, tpr, _ = roc_curve(y_val, posterior[:, 1], pos_label=1, drop_intermediate=False)
        s98 = np.max(tpr[fpr <= 0.02])
        # Values at or below 0.02 are treated as chance level and floored there.
        if s98 <= 0.02:
            s98 = 0.02
        S98s.append(s98)

    return np.mean(MIs), np.mean(HDs), np.mean(S98s)

    




In [13]:
# Output folders for the two sweeps (statistics vs. sample size / vs. dimension).
# NOTE(review): absolute, machine-specific paths — adjust before running elsewhere.
data_dir_samp = '/Users/yuxin/Desktop/桌面 - SUKI’s MacBook Pro/CANCER-Paper/Simulaion_Compare/results/vs_samplesize'
data_dir_dim = '/Users/yuxin/Desktop/桌面 - SUKI’s MacBook Pro/CANCER-Paper/Simulaion_Compare/results/vs_dim'

# Sweep grids: powers of two, 2**8..2**12 samples and 2**2..2**12 dimensions.
SAMPLE_SIZES = [1 << exponent for exponent in range(8, 13)]
DIMENSIONS = [1 << exponent for exponent in range(2, 13)]

# Forwarded to joblib.Parallel as n_jobs.
N_JOBS = -3
# Number of repetitions per grid point.
REPs = 10

## VS SampleSize

In [14]:
# Experiment selection for the sample-size sweep.
modelname = 'LR'      # one of: 'LR' (logistic regression), 'KNN', 'SVM'
simname = 'trunk'     # one of: 'trunk', 'trunk_overlap', 'trunk_mix'

# Make sure the output folder exists before the sweep starts, so a missing
# directory is reported up front instead of after the whole computation.
os.makedirs(data_dir_samp, exist_ok=True)

# One job per (sample size, repetition) pair, with the dimension fixed at 4096.
MI, HD, S98 = zip(*Parallel(n_jobs=N_JOBS)(
    delayed(CalibratedClassifier_Statistics)(model_name=modelname,
                                             samplesize=samp,
                                             sim=simname,
                                             dim=4096,
                                             reps=i)
    for samp in SAMPLE_SIZES
    for i in range(REPs)
))

# The reshape assumes results come back in submission order (sample size in
# the outer loop, repetition in the inner loop): one row per sample size.
MI_array = np.array(MI).reshape((len(SAMPLE_SIZES), REPs))
HD_array = np.array(HD).reshape((len(SAMPLE_SIZES), REPs))
S98_array = np.array(S98).reshape((len(SAMPLE_SIZES), REPs))

# One CSV per statistic: <stat>_<model>_<simulation>.csv
for stat_name, stat_array in (("MI", MI_array), ("HD", HD_array), ("S98", S98_array)):
    np.savetxt(os.path.join(data_dir_samp, f"{stat_name}_{modelname}_{simname}.csv"),
               stat_array, delimiter=",")

## VS Dimension

In [12]:
# Experiment selection for the dimension sweep.
modelname = 'LR'      # one of: 'LR' (logistic regression), 'KNN', 'SVM'
simname = 'trunk'     # one of: 'trunk', 'trunk_overlap', 'trunk_mix'

# Make sure the output folder exists before the sweep starts, so a missing
# directory is reported up front instead of after the whole computation.
os.makedirs(data_dir_dim, exist_ok=True)

# One job per (dimension, repetition) pair, with the sample size fixed at 4096.
MI, HD, S98 = zip(*Parallel(n_jobs=N_JOBS)(
    delayed(CalibratedClassifier_Statistics)(model_name=modelname,
                                             samplesize=4096,
                                             sim=simname,
                                             dim=dim,
                                             reps=i)
    for dim in DIMENSIONS
    for i in range(REPs)
))

# The reshape assumes results come back in submission order (dimension in the
# outer loop, repetition in the inner loop): one row per dimension.
MI_array = np.array(MI).reshape((len(DIMENSIONS), REPs))
HD_array = np.array(HD).reshape((len(DIMENSIONS), REPs))
S98_array = np.array(S98).reshape((len(DIMENSIONS), REPs))

# One CSV per statistic: <stat>_<model>_<simulation>.csv
for stat_name, stat_array in (("MI", MI_array), ("HD", HD_array), ("S98", S98_array)):
    np.savetxt(os.path.join(data_dir_dim, f"{stat_name}_{modelname}_{simname}.csv"),
               stat_array, delimiter=",")