## Set up the Environment

In [None]:
from time import time
import os
import pathlib
import pickle
import importlib

import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.neural_network import MLPClassifier
from sklearn.neighbors import KernelDensity
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.base import BaseEstimator, TransformerMixin
import matplotlib.pyplot as plt
import scipy.stats as st

import pprint
pp = pprint.PrettyPrinter(indent=4)

from ectrl.control import split_the_data
from ectrl.control import ClassificationTest, Umbrella, Typicality, DirectNP, TBC, EnsembleTBC
from ectrl.control import ForcedInductiveConformal
from ectrl.ratio import KernelDensityRatio
from ectrl.augment import Interpolator
from ectrl.evaluate import evaluate_once
from ectrl.analyze import plot_3, analyze_numerically, select, style, plot_time
import util
import normeval

from datetime import datetime
from statsmodels.stats.proportion import proportion_confint
import math

In [None]:
# Where to place the plots and the results
res_dir = os.path.join('norm', 'results')

# Make the directory if it doesn't exist
pathlib.Path(res_dir).mkdir(parents=True, exist_ok=True)

In [None]:
res_dir

In [None]:
set_prefix = 'norm_'

## Prepare the Data

In [None]:
# Create a function that generates datasets
def generate_dataset(n_range, s_range, k_range, M, verbose=True, mean_range=[-100, 100],
                    sigma_range=[0.1, 40]):
    # Generate non-normal samples
    nonnormal_samples = util.generate_pearson_nonnormal_samples(s_range, k_range, n_range,
                                                                M, mean_range=mean_range,
                                                               sigma_range=sigma_range)

    # Calculate L, the number of normal samples of the same size
    L = len(nonnormal_samples) // len(n_range)
            
    # Generate L normal samples of size n for each n in n_range
    normal_samples = util.generate_normal_samples(n_range, L, mean_range=mean_range,
                                                 sigma_range=sigma_range)

    # Print how many samples were generated
    if verbose:
        print("Normal samples: ", len(normal_samples))
        print("Non-normal samples: ", len(nonnormal_samples))

    # Label the sets
    y = [1 for sample in normal_samples] + [0 for sample in nonnormal_samples]

    # Unify them
    all_samples = normal_samples + nonnormal_samples
    
    return all_samples, y

In [None]:
s_range = [0]
k_range = [1.02] + np.arange(1, 3, 0.125).tolist()[1:] + [2.98]
mean_range = [-100, 100]
sigma_range = [0.01, 40]
n_range = [20]


data, y = generate_dataset(n_range, s_range, k_range, 200)
y = np.array(y)

In [None]:
class DescriptorBuilder(TransformerMixin, BaseEstimator):
    def __init__(self, n=20):
        super(DescriptorBuilder, self).__init__()
        self.features = [f'b{i}' for i in range(1, n - 1)]
        
    def fit(self, X, y=None):
        # Not needed, but present for compatibility.
        return self
    
    def transform(self, X, y=None):
        Z = np.sort(X)
        
        Z_min = np.min(Z, axis=1)[:, np.newaxis]
        Z_max = np.max(Z, axis=1)[:, np.newaxis]
        Z = -3 + 6 * (Z - Z_min) / (Z_max - Z_min)
        
        return pd.DataFrame(Z[:, 1:-1], columns=self.features)

In [None]:
descriptor_builder = DescriptorBuilder(n=20)

In [None]:
X = descriptor_builder.transform(data)
X

In [None]:
def create_preprocessor():
    # Use the mean imputer and standard scaler for the numerical features.
 
    preprocessor = Pipeline(steps=[('imputer', SimpleImputer(strategy = 'mean') ),
                                     ('std_scaler', StandardScaler())
                                    ])
    
    return preprocessor

def create_classifier():
    neural_net = MLPClassifier(solver='lbfgs', max_iter=500,
                           activation='relu',
                           alpha=0.01,
                           hidden_layer_sizes=[9, 9],
                           early_stopping=True, 
                           max_fun=13873,
                           validation_fraction=0.2)
    return neural_net

def create_clf_pipeline():
    preprocessor = create_preprocessor()
    classifier = create_classifier()
    
    return Pipeline(steps=[('preprocessor', preprocessor), ('classifier', classifier)])

In [None]:
target_class = 1

In [None]:
(y == target_class).sum(), (y == 1 - target_class).sum()

## Create Controllers

In [None]:
controllers = {}

### SCT

In [None]:
# Define a pipeline containing the preprocessor and classifier
clf = create_clf_pipeline()

# Define a function to extract the score
def nn_statistic(nn_classifier, X):
    X = nn_classifier['preprocessor'].transform(X)
    return nn_classifier['classifier'].predict_proba(X)[:, target_class]

nn_test = ClassificationTest(clf, nn_statistic,
                              reserve=0.5,
                              ci=None,
                              augmentor=None, 
                              sample_size=None,
                              target_class=target_class)
controllers['SCT'] = nn_test

### Umbrella

In [None]:
# Add UA controllers
for delta in [0.01, 0.05]:
    for ensemble_size in [1, 5]:
        base_clf = create_clf_pipeline()
        ua = Umbrella(base_clf, nn_statistic, target_class=target_class,
              delta=delta,
              thresholds_size=0.5, # reserve 50% of target objects for classification
              ensemble_size=ensemble_size
             )
        name = f'UA(delta={delta},m={ensemble_size})'
        controllers[name] = ua

### TBC and WTBC

In [None]:
# Add TBC and WTBC controllers
for k in [3, 5, 7, 10]:
    for test in ['mwu', 'ttest_ind']:
        for distance in ['cityblock', 'cosine']:
            for weights in ['none', 'reciprocal']:
                tbc = TBC(k=k, test=test, distance=distance,
                          weights=weights, target_class=target_class)
                preprocessor = create_preprocessor()
                if weights != 'none':
                    method = 'WTBC'
                    name = f'WTBC(k={k},test={test},distance={distance},weights={weights})'
                else:
                    method = 'TBC'
                    name = f'TBC(k={k},test={test},distance={distance})'
                
                controllers[name] = Pipeline(steps=[('preprocessor', preprocessor),
                                                (method, tbc)])

### CPF

In [None]:
# Add CPF
def nn_statistic_for_cpf(nn_classifier, X):
    return nn_classifier.predict_proba(X)[:,target_class]

for nonconformity in ['score', 'avgdev', 'nearest_neighbor']:
    clf = create_classifier()
    cpf = ForcedInductiveConformal(clf, nn_statistic_for_cpf,
                               target_class=target_class, alpha=0.05,
                               reserve=0.5, # use 50% as D_proper
                                            # and the remaining 50% for calibration
                               nonconformity=nonconformity,
                               random_state=None)
    cpf = Pipeline(steps=[ ('preprocessor', create_preprocessor()), ('CPF', cpf)])
    name = f'CPF(nonconformity={nonconformity})'
    controllers[name] = cpf

### Typicality

In [None]:
# Add Typicality Indices
# Combine the preprocessor and the density estimator
density_pipeline = Pipeline([
    ('preprocessor', create_preprocessor()),
    ('kde', KernelDensity()),
])

typicality = Typicality(density_pipeline, target_class=target_class)

controllers['TI'] = typicality

### Direct Neyman-Pearson Classification

In [None]:
# Add DNP

# instantiate a kernel density-ratio estimator
kdr = KernelDensityRatio(kernel='polynomial',
                         kernel_parameters={'degree' : 2})

direct_np = DirectNP(kdr,
                     target_class=target_class,
                     threshold_subset_size=0.5) # 50% (use them to estimate the thresholds)

dnp = Pipeline(steps=[('preprocessor', create_preprocessor()), ('DNP', direct_np)])
controllers['DNP'] = dnp

In [None]:
print(len(controllers))

## Evaluate the Controllers

In [None]:
# Settings
seed = 11
eval_size = 0.3
nominal_rates =  np.arange(0.01, 1, 0.01)
confidence_level = 0.99

In [None]:
# Run the experiment and evaluate all the controllers
eval_results = evaluate_once(
    controllers, 
    X, y, 
    target_class,
    eval_size,
    seed, 
    nominal_rates,
    confidence_level=confidence_level
)

In [None]:
# Unpack the results
df_results, df_clf_times, df_fit_times = eval_results

In [None]:
# Save the results
filepath = os.path.join(res_dir, 'df_results.csv')
df_results.to_csv(filepath, index=False)

filepath = os.path.join(res_dir, 'df_fit_times.csv')
df_fit_times.to_csv(filepath, index=False)

filepath = os.path.join(res_dir, 'df_clf_times.csv')
df_clf_times.to_csv(filepath, index=False)

## Analyze

### Load the Results

In [None]:
filepath = os.path.join(res_dir, 'df_results.csv')
df_results = pd.read_csv(filepath)

filepath = os.path.join(res_dir, 'df_fit_times.csv')
df_fit_times = pd.read_csv(filepath)

filepath = os.path.join(res_dir, 'df_clf_times.csv')
df_clf_times = pd.read_csv(filepath)

### Choose the Best UA

In [None]:
choices = {}

In [None]:
plot_3(
    df_results[df_results['method'] == 'UA'],
    'nominal',
    'target_estimate', 
    'delta',
    ribbon=('target_lower', 'target_upper'),
    ab = (1, 0), 
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'UA',
    width=7, height=5,
    facet_parameter='ensemble_size'
)

In [None]:
plot_3(
    df_results[df_results['method'] == 'UA'],
    'target_estimate',
    'nontarget_estimate', 
    'delta',
    ribbon=('nontarget_lower', 'nontarget_upper'), 
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'UA',
    width=7, height=5,
    facet_parameter='ensemble_size'
)

In [None]:
r = analyze_numerically(df_results, ['delta', 'ensemble_size'], 'UA')
r.sort_values('D(A, OA)')

In [None]:
r.sort_values('D(A, B, OA)')

In [None]:
choices['UA'] = {
    'exact' : {
        'ensemble_size' : 5,
        'delta' : 0.05
    },
    'valid' : {
        'ensemble_size' : 5,
        'delta' : 0.05
    }
}

### Choose the Best TBC

In [None]:
plot_3(
    df_results[df_results['method'] == 'TBC'],
    'nominal',
    'target_estimate', 
    'k',
    ribbon=('target_lower', 'target_upper'),
    ab = (1, 0), 
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'TBC',
    width=7, height=5,
    facet_parameter='test+distance'
)

In [None]:
plot_3(
    df_results[df_results['method'] == 'TBC'],
    'target_estimate',
    'nontarget_estimate',
    'k',
    ribbon=('nontarget_lower', 'nontarget_upper'),
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'TBC',
    width=7, height=5,
    facet_parameter='test+distance'
)

In [None]:
plot_3(
    df_results[df_results['method'] == 'TBC'],
    'nominal',
    'nontarget_estimate',
    'k',
    ribbon=('nontarget_lower', 'nontarget_upper'),
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'TBC',
    width=7, height=5,
    facet_parameter='test+distance'
)

In [None]:
r = analyze_numerically(df_results, ['k', 'test', 'distance'], 'TBC')
r.sort_values('D(A, B, OA)')

In [None]:
choices['TBC'] = {
    'valid' : {
        'k' : 3,
        'test' : 'ttest_ind',
        'distance' : 'cityblock'
    },
    'exact' : {
        'k' : 3,
        'test' : 'ttest_ind',
        'distance' : 'cityblock'
    }
}

### Choose the Best WTBC

In [None]:
import importlib
import ectrl.analyze
importlib.reload(ectrl.analyze)
from ectrl.analyze import plot_3
plot_3(
    df_results[df_results['method'] == 'WTBC'],
    'nominal',
    'target_estimate', 
    'k',
    ribbon=('target_lower', 'target_upper'),
    ab = (1, 0), 
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'WTBC',
    width=7, height=5,
    #x_lim=(0, 0.1), y_lim=(0, 0.1),
    facet_parameter='test+distance'
)

In [None]:
plot_3(
    df_results[df_results['method'] == 'WTBC'],
    'target_estimate',
    'nontarget_estimate',
    'k',
    #ribbon=('nontarget_lower', 'nontarget_upper'),
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'WTBC',
    width=7, height=5,
    facet_parameter='test+distance'
)

In [None]:
plot_3(
    df_results[df_results['method'] == 'WTBC'],
    'nominal',
    'nontarget_estimate',
    'k',
    #ribbon=('nontarget_lower', 'nontarget_upper'),
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'WTBC',
    width=7, height=5,
    facet_parameter='test+distance'
)

In [None]:
r = analyze_numerically(df_results, ['k', 'test', 'distance'], 'WTBC')
r.sort_values('D(A, B, OA)')

In [None]:
choices['WTBC'] = {
    'valid' : {
        'k' : 3,
        'test' : 'ttest_ind',
        'distance' : 'cityblock'
    },
    'exact' : {
        'k' : 3,
        'test' : 'ttest_ind',
        'distance' : 'cosine' 
    }
}

### Choose the Best CPF

In [None]:
plot_3(
    df_results[df_results['method'] == 'CPF'],
    'nominal',
    'target_estimate', 
    'nonconformity',
    ribbon=('target_lower', 'target_upper'),
    ab = (1, 0), 
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'CPF',
    width=7, height=5
)

In [None]:
plot_3(
    df_results[df_results['method'] == 'CPF'],
    'target_estimate',
    'nontarget_estimate',
    'nonconformity',
    ribbon=('nontarget_lower', 'nontarget_upper'),
    display_plot=False,
    save_plot=True, location=res_dir, prefix=set_prefix + 'WTBC',
    width=7, height=5
)

In [None]:
r = analyze_numerically(df_results, ['nonconformity'], 'CPF')
r.sort_values('D(A, B, OA)')

In [None]:
choices['CPF'] = {
    'exact' :{
        'nonconformity' : 'avgdev'
    },
    'valid' :{
        'nonconformity' : 'avgdev'
    }
}

### Filter

In [None]:
choices['SCT'] = {'exact' : {}, 'valid' : {}}
choices['TI'] = {'exact' : {}, 'valid' : {}}
choices['DNP'] = {'exact' : {}, 'valid' : {}}

pp.pprint(choices)

In [None]:
results_dfs = {}
clf_times_dfs = {}
fit_times_dfs = {}

for focus in ['exact', 'valid']:
    focus_choices = {method : choices[method][focus] for method in choices}
    results_dfs[focus] = select(df_results, focus_choices)
    clf_times_dfs[focus] = select(df_clf_times, focus_choices)
    fit_times_dfs[focus] = select(df_fit_times, focus_choices)

In [None]:
f = os.path.join(res_dir, f'{set_prefix}choices.p')
pickle.dump(choices, open(f, 'wb'))

### Analyze Time

#### Average Classification Time

In [None]:
for focus in clf_times_dfs:
    print(focus)
    display(clf_times_dfs[focus][['method', 'time']].sort_values('time'))

In [None]:
for focus in clf_times_dfs:
    plot_time(clf_times_dfs[focus], 'method', 'time', 'method',
         location=res_dir, name=set_prefix + f'{focus}_average_classification_time.jpg')

#### Fit Times

In [None]:
for focus in fit_times_dfs:
    print(focus)
    display(fit_times_dfs[focus][['method', 'time']].sort_values('time'))

In [None]:
for focus in fit_times_dfs:
    plot_time(fit_times_dfs[focus], 'method', 'time', 'method',
         location=res_dir, name=set_prefix + f'{focus}_fit_times.jpg')

### Check the Rates

#### Nominal vs. Target Estimate

In [None]:
for focus in results_dfs:
    print(focus)
    g = plot_3(
        results_dfs[focus],
        'nominal',
        'target_estimate',
        'method',
        ribbon=('target_lower', 'target_upper'),
        ab=(1, 0),
        legend_position=(0.35, 0.8), legend_name='', legend_ncol=2,
        legend_text_size=15, legend_key_width=35,
        display_plot=True,
        save_plot=True, location=res_dir, width=5.35, height=4.35,
        prefix= set_prefix + f'_{focus}_'
    )

#### Target vs. Other (Estimates)

In [None]:
for focus in results_dfs:
    print(focus)
    plot_3(
        results_dfs[focus],
        'target_estimate',
        'nontarget_estimate',
        'method',
        ribbon=('nontarget_lower', 'nontarget_upper'),
        save_plot=True, location=res_dir, width=5, height=4,
        prefix=set_prefix + f'_{focus}_'
    )

#### Nominal vs. Other

In [None]:
import ectrl.analyze
importlib.reload(ectrl.analyze)
from ectrl.analyze import style, plot_3

In [None]:
for focus in results_dfs:
    print(focus)
    g = plot_3(
        results_dfs[focus],
        'nominal',
        'nontarget_estimate',
        'method',
        ribbon=('nontarget_lower', 'nontarget_upper'),
        legend_position=(0.5, 0.9), legend_name='', legend_ncol=4, legend_title=False,
        legend_text_size=13, legend_key_width=35, legend_key_height=10,
        display_plot=True,
        save_plot=True, location=res_dir, width=5.35, height=4.35,
        prefix= set_prefix + f'_{focus}_'
    )

#### Nominal vs. Accuracy

In [None]:
for focus in results_dfs:
    print(focus)
    plot_3(
        results_dfs[focus],
        'nominal',
        'accuracy_estimate',
        'method',
        ribbon=('accuracy_lower', 'accuracy_upper'),
        save_plot=True, location=res_dir, width=5, height=4,
        prefix=set_prefix + f'_{focus}_'
    )

#### Target Estimate vs. Accuracy

In [None]:
for focus in results_dfs:
    print(focus)
    plot_3(
        df_results,
        'target_estimate',
        'accuracy_estimate',
        'method',
        #ribbon=('accuracy_lower', 'accuracy_upper'),
        save_plot=True, location=res_dir, width=5, height=4,
        prefix=set_prefix + f'_{focus}_'
)

## Compare With Normality Tests

In [None]:
def create_preprocessor11():
    # Use the mean imputer and standard scaler for the numerical features.
 
    preprocessor = Pipeline(steps=[('imputer', SimpleImputer(strategy = 'mean') ),
                                     ('std_scaler', StandardScaler())
                                    ])
    
    return preprocessor

def create_classifier11():
    neural_net = MLPClassifier(solver='lbfgs', max_iter=500,
                           activation='relu',
                           alpha=0.01,
                           hidden_layer_sizes=[10],
                           early_stopping=True, 
                           max_fun=13873,
                           validation_fraction=0.2)
    return neural_net

def create_clf_pipeline11():
    preprocessor = create_preprocessor11()
    classifier = create_classifier11()
    
    return Pipeline(steps=[('preprocessor', preprocessor), ('classifier', classifier)])

In [None]:
alphas = [0.01, 0.05]

test_names = ['SW', 'LF', 'AD', 'DP', 'JB', 'SF', 'CVM']

tests = {
    alpha : {} for alpha in alphas
}

for test_name in test_names:
    test_function, test_statistic = util.get_test(test_name)
    for alpha in alphas:
        test = util.TestClassifier(test_function, test_statistic, alpha)
        tests[alpha][test_name] = test

class KurtosisTest(object):
    def __init__(self, alpha, alternative):
        self.alpha = alpha
        self.alternative = alternative

    def predict(self, X):
        return st.kurtosistest(X, axis=1, alternative=self.alternative).pvalue >= self.alpha

for alpha in alphas:
    tests[alpha]['KT'] = KurtosisTest(alpha, 'less')

In [None]:
# Define a pipeline containing the preprocessor and classifier
clf = create_clf_pipeline11()

# Define a function to extract the score
def nn_statistic(nn_classifier, X):
    X = nn_classifier['preprocessor'].transform(X)
    return nn_classifier['classifier'].predict_proba(X)[:, target_class]

for alpha in alphas:
    nn_test = ClassificationTest(clf, nn_statistic,
                              reserve=0.5,
                              ci=None,
                              alpha=alpha,
                              augmentor=None, 
                              sample_size=None,
                              target_class=target_class)
    tests[alpha]['SCT'] = nn_test

In [None]:
q = 0.2
data, y = generate_dataset([20], s_range, k_range, 400, mean_range=mean_range,
                           sigma_range=sigma_range)
y = np.array(y)
descriptor_builder = DescriptorBuilder(q=q)
X = descriptor_builder.transform(data)
X

In [None]:
for alpha in alphas:
    print('Fitting ', alpha)
    tests[alpha]['SCT'].fit(X, y)
    print('\t', tests[alpha]['SCT'].classifier['classifier'].loss_)

In [None]:
#for alpha in alphas:
#    print(tests[alpha]['SCT'].classifier['classifier'].loss_)

#### Evaluate on symmetric platykurtic data

In [None]:
L = 10000
G_data = normeval.generate_G_data(n_range=[20], L=L, groups={4 : normeval.groups[4]})

In [None]:
dfs = []

In [None]:
for alpha in tests:
    print(alpha)
    for code in tests[alpha]:
        print('\t', code)
        test = tests[alpha][code]
        if code == 'SCT':
            df = normeval.evaluate_power(test,
                                         {key : descriptor_builder.transform(G_data[key]) for key in G_data})
        else:
            df = normeval.evaluate_power(test, G_data)
        df['alpha'] = alpha
        df['test'] = code
        dfs.append(df)

In [None]:
res_pow_df = pd.concat(dfs)
res_pow_df = res_pow_df.reset_index(drop=True)
res_pow_df = res_pow_df.rename({4 : 'power'}, axis=1)
res_pow_df['alpha'] = res_pow_df['alpha'].astype(str)
res_pow_df['power'] = res_pow_df['power'].astype(float)
lower, upper = proportion_confint(res_pow_df['power'] * L, L, alpha=0.01, method='beta')
res_pow_df['lower'] = lower
res_pow_df['upper'] = upper

In [None]:
for alpha in alphas:
    mask = (res_pow_df['alpha'] == str(alpha)) & (~res_pow_df['test'].isin(['JB', 'SF']))
    g = ggplot(res_pow_df[mask], aes(x='test', y='power', color='test', fill='test')) +\
        geom_bar(stat='identity', alpha=0.4, size=1, show_legend=False) +\
        geom_errorbar(aes(ymin='lower', ymax='upper'), size=2, show_legend=False) +\
        scale_color_brewer(type='qual', palette=2) +\
        scale_fill_brewer(type='qual', palette=2) +\
        labs(x = '', y='Power') +\
        theme_classic() +\
        theme(legend_position='none', text=element_text(size=18))
    
    display(g)
    filename = os.path.join(res_dir, f'norm_test_power_G4_20_{alpha}.jpg')
    print(filename)
    g.save(filename, dpi=500, width=5, height=4)

#### Evaluate on New SP Data

In [None]:
new_k_range = np.array(k_range[:-1]) + np.diff(k_range) / 2
print(new_k_range)
new_k_range = [float(k) for k in new_k_range]

In [None]:
L = 10000
nonnormal_data = {k : None for k in new_k_range}

for k in new_k_range:
    print(k)
    nonnormal_data[k] = util.generate_pearson_nonnormal_samples([0], [k], [20], L,
                                                         mean_range=mean_range,
                                                        sigma_range=sigma_range)

In [None]:
sp_results = []
for alpha in tests:
    for code in tests[alpha]:
        print(code, alpha)
        for k in nonnormal_data:
            if code == 'SCT':
                sp_samples = descriptor_builder.transform(nonnormal_data[k])
            else:
                sp_samples = nonnormal_data[k]
            y_sp_pred = tests[alpha][code].predict(sp_samples)
            power = (np.array(y_sp_pred) == 0).mean()
            sp_results.append([k, alpha, code, power])
            print('\t', k, power)

In [None]:
sp_pow_df = pd.DataFrame(sp_results, columns=['k', 'alpha', 'test', 'power'])
#sp_pow_df = res_pow_df.reset_index(drop=True)
#res_pow_df = res_pow_df.rename({4 : 'power'}, axis=1)
sp_pow_df['alpha'] = sp_pow_df['alpha'].astype(str)
sp_pow_df['power'] = sp_pow_df['power'].astype(float)
lower, upper = proportion_confint(sp_pow_df['power'] * L, L, alpha=0.01, method='beta')
sp_pow_df['lower'] = lower
sp_pow_df['upper'] = upper
sp_pow_df

In [None]:
legend_spec = guide_legend(ncol=2)
for alpha in alphas:
    mask = (sp_pow_df['alpha'] == str(alpha)) & ~sp_pow_df['test'].isin(['JB', 'SF'])
    g = ggplot(sp_pow_df[mask], aes(x='k', y='power', 
                                    shape='test', color='test')) +\
        geom_line(aes(group='test'), size=1.3, linetype='dotted') +\
        geom_point(size=4) +\
        scale_color_brewer(type='qual', palette=2) +\
        scale_fill_brewer(type='qual', palette=2) +\
        labs(x = 'Kurtosis', y='Power') +\
        theme_classic() +\
        theme(legend_title=element_blank(), legend_position=(0.65, 0.7),
             legend_key_width=30, legend_box_margin=0) +\
        guides(color=legend_spec)
    
    display(g)
    filename = os.path.join(res_dir, f'norm_test_power_sp_20_{alpha}.jpg')
    print(filename)
    g.save(filename, dpi=500, width=5, height=4)
#geom_errorbar(aes(ymin='lower', ymax='upper'), size=2, show_legend=False) +\

## Nondeterministic Test

## Nondeterministic Test (old)

In [None]:
# Make a pipeline containing the preprocessor and classifier
clf = create_clf_pipeline()

### Evaluation 1

In [None]:
from datetime import datetime
from statsmodels.stats.proportion import proportion_confint
import math

res_dir = os.path.join(res_dir, 'nondet')
pathlib.Path(res_dir).mkdir(parents=True, exist_ok=True)

In [None]:
def evaluate(target, nominal, epsilon, runs, m, n, X, y, seeds, clf):
    support = 0
    estimates = []
    lower_estimates = []
    upper_estimates = []
    errors = []

    error = 0
    lower_bound = (1 - nominal) / n
    upper_bound = (epsilon * (n + 1) - nominal) / n

    for k in range(runs):
        if k % 10 == 9:
            print(k + 1, datetime.now().strftime('%H:%M:%S'), end='\r', flush=True)
        rng = np.random.default_rng(seeds[k])
        X_cv, X_eval, y_cv, y_eval = train_test_split(X, y, 
                                                  test_size=0.3, 
                                                  stratify = y.tolist(),
                                                 random_state=seeds[k])
    
        clf.set_params(**{'classifier__random_state' : seeds[k]})
        clf = clf.fit(X_cv, y_cv)
    
        Z = X_eval.loc[y_eval == target, :].reset_index(drop=True)
        scores = clf.predict_proba(Z)[:, target_class]
    
        false = 0
        for j in range(m):
            sample = rng.choice(scores, n + 1, replace=False)
            score = sample[0] 
            sample = np.sort(sample[1:])
        
            if target == 1:
                more_extreme = np.searchsorted(sample, score, 'right')
            else:
                i = np.searchsorted(sample, score, 'left')
                more_extreme = n - i + 1
        
            p_value = more_extreme / n
        
            if epsilon > 0:
                correction = rng.uniform(lower_bound, upper_bound)
                if p_value + correction <= nominal:
                    false = false + 1
            else:
                #r = st.norm.ppf(1 - 0.01 / 2) / (4*len(scores))
                if p_value <= (nominal * (n + 1) - 1) / n:
                    false = false + 1
    
        lower, upper = proportion_confint(false, m, 0.01, 'jeffreys')

        estimate = false / m
        estimates.append(estimate)
    
        lower_estimates.append(lower)
        upper_estimates.append(upper)
    
        if nominal < estimate:
            error = error + (estimate - nominal)
        elif estimate < nominal - epsilon:
            error = error + (nominal - epsilon - estimate)
            
    return (lower_estimates, estimates, upper_estimates, error)

In [None]:
results = {}
errors = []
supports = []

settings = [
    (0.05, 20, 0), (0.05, 50, 0), (0.05, 50, 0.02), (0.05, 100, 0.01)
]

for alpha, n, epsilon in settings:
    

In [None]:
runs = 100
target = 1
seeds = [11*k + 19 for k in range(runs)]
m = 10000

results = {}
errors = []
supports = []

settings = [
    (0.05, 20, 0), (0.05, 50, 0), (0.05, 50, 0.02), (0.05, 100, 0.01)
]

for nominal, n, epsilon in settings: 
    if nominal <= epsilon or (epsilon > 0 and epsilon < 1 / (n + 1)):
        continue
    print(nominal, n, epsilon, '\n')

    results[(nominal, epsilon, n)] = {}

    lower, estimates, upper, error = evaluate(target, nominal, epsilon,
                                                 runs, m, n, X, y, seeds, clf)
        
    results[(nominal, epsilon, n)]['df'] = pd.DataFrame(
                {'lower':lower, 'estimates': estimates, 'upper':upper}
            )
    errors.append([nominal, n, error/runs])
        
    support = 0
    for i in range(runs):
        if nominal - epsilon <= estimates[i] <= nominal:
            support = support + 1

    e = support / runs
    l, u = proportion_confint(support, runs, 0.01, 'jeffreys')
    supports.append([nominal, n, l, e, u])

In [None]:
pickle.dump(results, open(os.path.join(res_dir, 'interval_df.p'), 'wb'))

In [None]:
res_nd = pickle.load(open(os.path.join(res_dir, 'interval_df.p'), 'rb'))
res_nd

In [None]:
errors # average errors

In [None]:
#epsilon = 0.02
for key in res_nd:
    alpha, epsilon, n = key
    print(key)
 
    df = res_nd[key]['df']
    runs = df.shape[0]
    fig = plt.figure(figsize=(4, 2), dpi=500)
    plt.rc('font', size=10) 
    x = list(range(runs))
    #plt.fill_between(x, df['lower'], df['upper'], color='red', alpha=0.2)
    #plt.plot(x, df['estimates'], color='blue', linestyle='--', linewidth=0.5) #
    plt.scatter(x, df['estimates'], marker='x', color='blue', s=2)
    plt.plot((x, x), (df['lower'], df['upper']), color='blue',
             linestyle='--', linewidth=0.5)

    plt.plot(x, np.repeat(alpha, runs), color='black', linewidth=1.5)
    if epsilon > 0:
        plt.plot(x, np.repeat(alpha - epsilon, runs), color='black', linewidth=1.5)
    
    plt.xlabel('')
    plt.ylabel('')
    plt.xticks([])

    filename = os.path.join(res_dir, f'interval_norm_{alpha}_{epsilon}_{n}.jpg')
    plt.tight_layout()
    plt.savefig(filename, dpi=500, figsize=(4, 2), bbox_inches='tight')
    #plt.title(key)

In [None]:
clf = create_clf_pipeline()
clf.fit(X, y)

In [None]:
clf['classifier'].loss_

### Evaluation 2

In [None]:
ndt_res_dir = os.path.join('norm', 'results', 'ndt_v2')
pathlib.Path(ndt_res_dir).mkdir(parents=True, exist_ok=True)

In [None]:
class DescriptorBuilder1(TransformerMixin, BaseEstimator):
    def __init__(self, q=0.1):
        super(DescriptorBuilder1, self).__init__()
        self.q = q
        # Set the names of the features in the descriptors
        self.features = ['n', 'mad', 'skewness', 'kurtosis',
                         'left_mean', 'right_mean', 'kt']
        
        if self.q is not None:
            qs = [round(j * q, 8) for j in range(1, int(1 / q) + 1)]
            if qs[-1] == 1:
                qs = qs[:-1]
            self.qs = qs
        
            self.features += ['q{:.2f}'.format(q) for q in self.qs]
        
    def fit(self, X, y=None):
        # Not needed, but present for compatibility.
        return self
    
    def transform(self, X, y=None):
        # Note: Currently works only on a list of lists or a single list.
        if isinstance(X, list):
            if all(isinstance(x, list) for x in X):
                X = [self.get_descriptor(x) for x in X]
                return pd.DataFrame(X, columns=self.features)
            else:
                X = self.get_descriptor(X)
                return X
        else:
            # Pandas dataframes and numpy arrays are not supported for now.
            pass
        
    def get_descriptor(self, sample, sort=True, eps=1e-8):
        if sort:
            sample.sort()
        
        # Calculate the DP test's p-value
        dpp = st.normaltest(sample)[1]
        
        # Scale the sample to [-3, 3]
        maximum = max(sample)
        minimum = min(sample)
        sample = [-3 + 6 * (x - minimum) / (maximum - minimum) for x in sample]
        
        # Calculate selected descriptive statistics
        maximum = max(sample)
        minimum = min(sample)
        n = len(sample)
        mean = np.mean(sample)
        median = np.median(sample)
        sd = np.std(sample)
        kurtosis = st.kurtosis(sample, fisher=False, bias=False)
        skewness = st.skew(sample)
        mad = st.median_abs_deviation(sample)
        
        # Calculate the means of the left and right tails
        coeff = 0.2 # 0.1
        left_limit = int(np.floor(coeff * n))
        left_tail = sample[:left_limit]
        left_mean = np.mean(left_tail)

        coeff = 0.2
        right_limit = int(np.ceil(n - coeff * n))
        right_tail = sample[right_limit:]
        right_mean = np.mean(right_tail)
        
        
        # Create the descriptor
        descriptor = [n, mad, skewness, kurtosis,
                      left_mean, right_mean, dpp] 

            
        # Include the quantiles to the descriptor if q is specified
        if self.q is not None:
            quantiles = np.quantile(sample, self.qs)#.tolist()
            quantiles = quantiles.tolist()
            descriptor.extend(quantiles)
        
        return descriptor

In [None]:
def create_clf(q=0.2):
    # Use the mean imputer and standard scaler for the numerical features.
    descriptor_builder = DescriptorBuilder1(q=q)
 
    preprocessor = Pipeline(steps=[('imputer', SimpleImputer(strategy = 'mean') ),
                                     ('std_scaler', StandardScaler())
                                    ])
    
    neural_net = MLPClassifier(solver='lbfgs', max_iter=100,
                           activation='relu',
                           alpha=0.01,
                           hidden_layer_sizes=[9, 9],
                           early_stopping=True, 
                           max_fun=13873,
                           validation_fraction=0.2)
    
    return Pipeline(steps=[
        ('descriptor_builder', descriptor_builder),
        ('preprocessor', preprocessor),
        ('classifier', neural_net)
    ])

In [None]:
s_range = [0]
k_range = [1.02] + np.arange(1, 3, 0.125).tolist()[1:] + [2.98]
mean_range = [-100, 100]
sigma_range = [0.01, 40]
n_range = [10, 20, 30, 40, 50, 60, 70, 80, 90, 100]


data, y = generate_dataset(n_range, s_range, k_range, 100,
                           mean_range=mean_range, sigma_range=sigma_range)
y = np.array(y)

In [None]:
clf = create_clf(q=0.2)
clf.fit(data, y)

In [None]:
clf['classifier'].loss_

In [None]:
y_pred = clf.predict(data)

In [None]:
ns = [len(sample) for sample in data]
ns = np.array(ns)
y_pred = np.array(y_pred)
for n in np.unique(ns):
    mask = (ns == n)
    print(n, np.round((np.equal(y[mask], y_pred[mask])).mean(), 4),
         np.round((np.equal(y[mask & (y == 1)], y_pred[mask & (y == 1)])).mean(), 4),
         np.round((np.equal(y[mask & (y == 0)], y_pred[mask & (y == 0)])).mean(), 4))

In [None]:
ns = [len(sample) for sample in data]
ns = np.array(ns)
y_pred = np.array(y_pred)
for n in np.unique(ns):
    mask = (ns == n)
    
    print(n, np.round((np.equal(y[mask], y_pred[mask])).mean(), 4),
         np.round((np.equal(y[mask & (y == 1)], y_pred[mask & (y == 1)])).mean(), 4),
         np.round((np.equal(y[mask & (y == 0)], y_pred[mask & (y == 0)])).mean(), 4))

### Test on New Data

In [None]:
s_range = [0] #NNm
k_range = [1.02] + np.arange(1, 3, 0.125).tolist()[1:] + [2.98]
mean_range1 = [1000, 2000]
sigma_range1 = [100, 200]
n_range1 = [15, 25, 35, 45, 55, 65, 75, 85, 95]


test_data, y_test = generate_dataset(n_range, s_range, k_range, 100,
                                     mean_range=mean_range1, sigma_range=sigma_range1)
y_test = np.array(y_test)

In [None]:
y_pred_test = clf.predict(test_data)

In [None]:
ns_test = [len(sample) for sample in test_data]
ns_test = np.array(ns_test)
y_pred_test = np.array(y_pred_test)
for n in np.unique(ns_test):
    mask = (ns_test == n)
    
    print(n, np.round((np.equal(y_test[mask], y_pred_test[mask])).mean(), 4),
         np.round((np.equal(y_test[mask & (y_test == 1)], y_pred_test[mask & (y_test == 1)])).mean(), 4),
         np.round((np.equal(y_test[mask & (y_test == 0)], y_pred_test[mask & (y_test == 0)])).mean(), 4))

### NDT

In [None]:
class NDNNTest(object):
    """docstring for NDTNormality"""
    def __init__(self, clf, alpha, m, rng, n_range, mean_range, sigma_range, n10=True, target_class=1, epsilon=0):
        super(NDNNTest, self).__init__()
        self.clf = clf
        self.alpha = alpha
        self.m = m
        self.rng = rng
        self.n_range = n_range
        self.mean_range = mean_range
        self.sigma_range = sigma_range
        self.n10 = n10
        self.target_class = target_class
        self.epsilon = epsilon
        
    def fit(self, X, y):
        self.clf.fit(X, y)

    def predict(self, X):
        predictions = np.repeat(2, len(X))
        X_scores = self.clf.predict_proba(X)[:, 1]
        
        if self.epsilon > 0:
            lower_correction_bound = (1 - self.alpha) / self.m
            upper_correction_bound = (self.epsilon * (self.m + 1) - self.alpha) / self.m

        for (j, (score, sample)) in enumerate(zip(X_scores, X)):
            mean = np.mean(sample)
            sd = np.std(sample)
            n = len(sample)
            
            mean_range = self.mean_range
            sigma_range = self.sigma_range
            
            if self.n10 and n < 20:
                n_range = [n]
                m = self.m
            else:
                n_range = self.n_range
                m = self.m // len(n_range)
            
            new_samples = util.generate_normal_samples(n_range, m,
                                                       mean_range=self.mean_range,
                                                      sigma_range=self.sigma_range)
            scores = self.clf.predict_proba(new_samples)[:, 1]
            
            scores = np.sort(scores)
            
            more_extreme = np.searchsorted(scores, score, 'right')
            
            p_value = more_extreme / self.m
        
            if self.epsilon > 0:
                correction = self.rng.uniform(lower_correction_bound, upper_correction_bound)
                if p_value + correction <= self.alpha:
                    predictions[j] = 1 - self.target_class
                else:
                    predictions[j] = self.target_class
            else:
                #r = st.norm.ppf(1 - 0.01 / 2) / (4*len(scores))
                if p_value <= (self.alpha * (self.m + 1) - 1) / self.m:
                    predictions[j] = 1 - self.target_class
                else:
                    predictions[j] = self.target_class
        return predictions

In [None]:
rng = np.random.default_rng(1)

In [None]:
ndt = NDNNTest(clf, 0.05, 40, rng, n_range, mean_range, sigma_range, n10=False, target_class=1, epsilon=0)

In [None]:
normal_data = util.generate_normal_samples(n_range, 1000, mean_range=mean_range,
                                          sigma_range=sigma_range)

In [None]:
import time
import warnings
from statsmodels.stats.proportion import proportion_confint
warnings.filterwarnings('ignore')

In [None]:
ndt.alpha = 0.05
ndt.m = 40
ndt.n10=True
start = time.time()
y_ndt_pred = ndt.predict(normal_data)
end = time.time()

print(np.round(end - start, 2))

In [None]:
(y_ndt_pred == 0).mean()

In [None]:
(y_ndt_pred == 0).mean()
ns_normal = [len(sample) for sample in normal_data]
ns_normal = np.array(ns_normal)
for n in np.unique(ns_normal):
    mask = (ns_normal == n)
    fn = (y_ndt_pred[mask] == 0).sum()
    fnr = (y_ndt_pred[mask] == 0).mean()
    l, u = proportion_confint(fn, 1000, 0.01, 'beta')
    print(n, fnr, (np.round(l, 4), np.round(u, 4)))
print((y_ndt_pred == 0).mean(),
      proportion_confint((y_ndt_pred == 0).sum(), len(y_ndt_pred), 0.01, 'beta'))

In [None]:
(y_ndt_pred == 0).mean()
ns_normal = [len(sample) for sample in normal_data]
ns_normal = np.array(ns_normal)
for n in np.unique(ns_normal):
    mask = (ns_normal == n)
    fn = (y_ndt_pred[mask] == 0).sum()
    fnr = (y_ndt_pred[mask] == 0).mean()
    l, u = proportion_confint(fn, 1000, 0.01, 'beta')
    print(n, fnr, (np.round(l, 4), np.round(u, 4)))
print((y_ndt_pred == 0).mean(),
      proportion_confint((y_ndt_pred == 0).sum(), len(y_ndt_pred), 0.01, 'beta'))

### Power

In [None]:
y_pred

In [None]:
alphas = [0.01, 0.05]
test_names = ['SW', 'LF', 'AD', 'DP', 'JB', 'SF', 'CVM']
tests = {
    alpha : {} for alpha in alphas
}
for test_name in test_names:
    test_function, test_statistic = util.get_test(test_name)
    for alpha in alphas:
        test = util.TestClassifier(test_function, test_statistic, alpha)
        tests[alpha][test_name] = test

In [None]:
class KurtosisTest(object):
    def __init__(self, alpha, alternative):
        self.alpha = alpha
        self.alternative = alternative
    def predict(self, X):
        return st.kurtosistest(X, axis=1, alternative=self.alternative).pvalue >= self.alpha
    
for alpha in alphas:
    tests[alpha]['KT'] = KurtosisTest(alpha, 'less')

In [None]:
rng = np.random.default_rng(seed=42)
#rng.normal(loc=0, scale=1, size=(10, 10))

In [None]:
import importlib
import normeval

In [None]:
L = 2000
G_data = normeval.generate_G_data(n_range=n_range, L=L, groups = {4 : normeval.groups[4]})

In [None]:
dfs = []
for alpha in alphas:
    print(alpha)
    ndt.alpha = alpha
    ndt.m = int(2 / alpha)
    ndt.n10 = True
    new_power_results = normeval.evaluate_power(ndt, G_data)
    new_power_results['alpha'] = alpha
    new_power_results['test'] = 'NDTNT'
    dfs.append(new_power_results)

In [None]:
G_data[(4, 10)][0]

In [None]:
for alpha in tests:
    print(alpha)
    for code in tests[alpha]:
        print('\t', code)
        test = tests[alpha][code]
        df = normeval.evaluate_power(test, G_data)
        df['alpha'] = alpha
        df['test'] = code
        dfs.append(df)

In [None]:
df_G = pd.concat(dfs)

In [None]:
df_G = df_G.rename({4 : 'power'}, axis='columns')
df_G = df_G.reset_index()
df_G = df_G.rename({'index' : 'n'}, axis='columns')
df_G['power'] = df_G['power'].astype(float)
df_G

In [None]:
lower, upper = proportion_confint(df_G['power'] * L, L, alpha=0.01, method='beta')
df_G['lower'] = lower
df_G['upper'] = upper

In [None]:
df_G = df_G.replace({'NDTNT' : 'NDT'})

In [None]:
legend_spec = guide_legend(ncol=2)
for alpha in alphas:
    mask = (df_G['alpha'] == alpha) & (~df_G['test'].isin(['ZX', 'SF', 'JB']))
    g = ggplot(df_G[mask], aes(x='n', y='power', color='test', shape='test')) +\
        geom_line(aes(group='test', linetype='test'), size=0.8) +\
        geom_ribbon(aes(fill='test', color='test', ymin='lower', ymax='upper'), alpha=0.1) +\
        geom_point(size=3) +\
        scale_color_brewer(type='qual', palette=2) +\
        scale_fill_brewer(type='qual', palette=2) +\
        scale_linetype_manual(['solid', 'dashed', 'dotted', 'dashdot',
                               (0, (1, 2, 1, 1)), (0, (2,2,1,1)), (0, (5, 5))]) +\
        theme_classic() +\
        labs(x='$n$', y='Power') +\
        theme(legend_title=element_blank(),
              legend_position=(0.26, 0.8),
              legend_key_width=20, legend_box_margin=0) +\
        guides(fill=legend_spec, linetype=legend_spec, color=legend_spec, shape=legend_spec)
    display(g)
    filename = f'norm_ndt_power_comparison_alpha_{alpha}.jpg'
    filename = os.path.join(ndt_res_dir, filename)
    g.save(filename, dpi=500, width=5, height=4)

In [None]:
import pickle

In [None]:
filename = os.path.join(ndt_res_dir, 'clf.p')
pickle.dump(clf, open(filename, 'wb'))

In [None]:
pickle.load(open(filename, 'rb'))