# Derive Statistical Tests from the Descriptor-Based Neural Network Classifier <a class="tocSkip">

## Set up the Environment

In [None]:
# Import everything that's needed to run the notebook
import os
import pickle
import pathlib
import datetime
import random
import datetime as dt

from IPython.display import display, Markdown, Latex
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
import sklearn.metrics
import sklearn.preprocessing
from scipy.stats import shapiro
import matplotlib.pyplot as plt
import boruta
from scipy.stats import gaussian_kde
import warnings

import util
from util import DescriptorBuilder, prepare_input, traverse_and_save

warnings.simplefilter('ignore')

plt.rc('xtick',labelsize=16)
plt.rc('ytick',labelsize=16)

Create a dictionary to store the figures.

In [None]:
reports = {}

## Load the Classifier

In [None]:
with open(os.path.join('dbnn_classifier.p'), 'rb') as f:
    dbnn = pickle.load(f)

## Prepare the data

Load the data.

In [None]:
# Define the dictionary to store the actual datasets, indexed by their names
datasets = {}

set_names = ['E', 'D', 'C-G1', 'C-G2', 'C-G3', 'C-G4']

# Load the datasets
for set_name in set_names: #configuration['data']['datasets']:
    set_path = os.path.join('data', set_name + '.data')
    print('Loading {} from {}'.format(set_name, set_path))
    datasets[set_name] = util.load_from_file(set_path)
    print('Done.')

Split the labels from the samples to ease future manipulation. Split $\mathcal{E}$ into two equal parts.

In [None]:
chosen_sets = list(datasets.keys())

for set_name in chosen_sets:
    labels = [sample.pop() for sample in datasets[set_name]]
    samples = datasets[set_name]
    
    if set_name == 'E':
        stratify = [str(label) + str(len(sample)) for (label, sample) in zip(labels, samples)]
        samples_1, samples_2, labels_1, labels_2 = train_test_split(samples, labels, train_size=0.5, stratify=stratify)
        datasets['E1'] = {'samples' : samples_1, 'labels' : labels_1}
        datasets['E2'] = {'samples' : samples_2, 'labels' : labels_2}
    else:
        datasets[set_name] = {'samples' : samples, 'labels' : labels}
    
del datasets['E']

chosen_sets = list(datasets.keys())

## Define the Statistic
Define the statistic as the net's final layer's output. Refer to the statistic as $T$ from now on.

In [None]:
def dbnn_statistic_function(samples):
    prepared_samples = prepare_input(samples, dbnn)
    activations = util.get_activations(dbnn['neural_net'], prepared_samples)
    return activations[-1]

### Visualize the Statistic's Distribution

First, define a function that performs visualization.

In [None]:
def visualize_statistics(samples, labels, statistic_function, xlim=None, ylim=None, 
                         colors=['crimson', 'navy'], ylabel='density', xlabel='Statistic'):
    results = pd.DataFrame({
        'n' : [len(sample) for sample in samples],
        'label' : [int(label) for label in labels],
        'statistic' : statistic_function(samples).ravel(),
    })
    
    fig = plt.figure(figsize=(10, 8))
    
    if xlim is not None:
        plt.xlim(*xlim)
    
    if ylim is not None:
        plt.ylim(*ylim)

    for label in [0, 1]:
        mask = (results['label'] == label)
        results[mask]['statistic'].plot(kind='density', label='$f_{}$'.format(label),
                                      color=colors[label], linewidth=5)

    plt.ylabel(ylabel, fontsize=25)
    plt.xlabel(xlabel, fontsize=25)
    plt.legend(fontsize=25)
    
    return fig

Apply it.

In [None]:
fig = visualize_statistics(datasets['E1']['samples'],
                           datasets['E1']['labels'],
                           dbnn_statistic_function, xlim=(0, 1),
                           xlabel='$T$', ylabel='density')

Store the figure so that we can save it later.

In [None]:
reports['T_statistic_fig'] = fig

## Create the  Tests

Define the basic class of the binary tests. Its main functionalities are the methods for calculating the $p$-values.

In [None]:
class OneStatisticTest(object):
    def __init__(self, samples, statistic_function, lower_is_more_extreme=True, adapt_to_size=False):
        super(OneStatisticTest, self).__init__()
        self.statistic_function = statistic_function
        self.statistics = np.sort(statistic_function(samples)).squeeze()
        self.lower_is_more_extreme = lower_is_more_extreme
        self.adapt_to_size = adapt_to_size
        
        if adapt_to_size:
            sizes = np.array([len(sample) for sample in samples])
            self.__prepare_statistics_by_sample_size(self.statistics, sizes)
        
    def __prepare_statistics_by_sample_size(self, statistics, sizes):
        by_sample_size = {}
        for size in np.unique(sizes):
            selected_statistics = statistics[sizes == size].squeeze()
            by_sample_size[size] = np.sort(selected_statistics).squeeze()
        self.statistics_by_size = by_sample_size 
    
    def p_value(self, sample):
        [sample_statistic] = self.statistic_function([sample])
        
        n = len(sample)
        
        if self.adapt_to_size == False or n not in self.statistics_by_size.keys():
            statistics = self.statistics
        else:
            statistics = self.statistics_by_size[n]
        
        total_count = len(statistics)
        
        if self.lower_is_more_extreme:
            [i] = np.searchsorted(statistics, sample_statistic, side='right')
            more_extreme_count = i
        else:
            [i] = np.searchsorted(statistics, sample_statistic, side='left')
            more_extreme_count = total_count - i + 1

        return more_extreme_count / total_count
    
    def calculate_statistic(self, sample):
        return self.statistic_function([sample])
    
    def quantify_uncertainty(self, sample):
        return self.p_value(sample)

Create the tests.

In [None]:
n = len(datasets['E1']['samples'])
normal_samples = [datasets['E1']['samples'][i] for i in range(n) if datasets['E1']['labels'][i] == 1]
nonnormal_samples = [datasets['E1']['samples'][i] for i in range(n) if datasets['E1']['labels'][i] == 0]

nn_test_1 = OneStatisticTest(normal_samples, dbnn_statistic_function, adapt_to_size=True)
nn_test_0 = OneStatisticTest(nonnormal_samples, dbnn_statistic_function, lower_is_more_extreme=False, adapt_to_size=True)

nn_tests = [nn_test_0, nn_test_1]

### Inspect the $p$-values

The $p$-values are calculated using the set $\mathcal{E}_1$. We can check their properties using the set $\mathcal{E}_2$. Those properties are:
- The types of their distributions over normal and non-normal samples.
- Boundedness from above for the samples from the same distribution for which the $p$ value is calculated. More specifically:
$$P\left(p_c(T(\mathbf{x}) \leq \alpha \right) \leq \alpha \qquad 0 \leq \alpha \leq 1,\quad y(\mathbf{x})=c,\quad c \in\{0,1\}$$ *Note: The probability $P$ should be interpreted in the frequentist sense, as the long-term frequency*.
- Asymptotically decreasing to zero for the samples from the opposite class as the sample size grows.

### Inspect the Relationship between the $p$-values and Statistics

Check the distributions of the $p$-values for normal and non-normal samples.

Separate the statistics of normal from those of non-normal samples.

In [None]:
e2_separated = util.separate_by_label(datasets['E2']['samples'], datasets['E2']['labels'])

Determine the distribution of the $p_c$-values over normal and non-normal statistics.

In [None]:
def visualize_the_pvalue_density(tests, separated_samples,
                                xlabel='Statistic', ylabel=None, 
                                xlim=None, colors=['crimson', 'navy']):
    figs = [None, None]
    for c in [0, 1]:
        figs[c] = plt.figure(figsize=(10, 8))
        plt.xlabel(xlabel, fontsize=16)
        if xlim is not None:
            plt.xlim(*xlim)
    
        p_values_normal = [tests[c].p_value(x) for x in separated_samples[1]]
        p_values_nonnormal = [tests[c].p_value(x) for x in separated_samples[0]]
    
        df = pd.DataFrame({'$c={0}$': p_values_nonnormal})
        df['$c={0}$'].plot(kind='density', color=colors[0], linewidth=3)
        
        df = pd.DataFrame({'$c={1}$': p_values_normal})
        df['$c={1}$'].plot(kind='density', color=colors[1], linewidth=3)
        
        if ylabel is None:
            ylabel = 'The density of $p_{}$'.format(c)
        
        plt.ylabel(ylabel, fontsize=16)
        
        plt.legend(fontsize=16)
        
    
    return figs

In [None]:
[fig_0, fig_1] = visualize_the_pvalue_density(fisher_tests, e2_separated,
                                              xlabel='$T$', ylabel='density')

In [None]:
reports['p0_value_distribution_fig'] = fig_0
reports['p1_value_distribution_fig'] = fig_1

We see that the $p_1$-value is distributed approximately uniformly over $[0, 1]$ for the normal samples, whereas most of its distribution for the non-normal samples is located in the neighborhood of $0$, as it should be. The analogous conclusion holds for the $p_0$-value.

### Check the Boundedness Condition

First, define a function that checks the boundedness of the $p_c$ - value ($c=0,1$).

In [None]:
def check_boundedness(tests, separated_samples, colors=['crimson', 'navy'], alpha_granularity=100,
                     subsets=None, subset_size=50, subset_colors=['mistyrose', 'lavender']):
    alphas = np.linspace(0, 1, alpha_granularity)

    colors = {0: 'crimson', 1: 'navy'}
    
    figs = [None, None]
    for c in [0, 1]:
        #line_label = r'$\alpha \mapsto P_{}\left(\hat{{p}}_{}(X) \leq \alpha \mid Y = {}\right)$'.format(c, c, c)
        figs[c] = plt.figure(figsize=(10, 8))
        
        plt.tick_params(axis='both', which='major', labelsize=16)
        #plt.plot(alphas, alphas, linewidth=3, linestyle='--', color='black', 
        #         zorder=2, label=r'$\alpha \mapsto \alpha$')
        
        plt.xlabel(r'$\alpha$', fontsize=25)
        if c == 0:
            plt.ylabel('$FPR$', fontsize=25)
        else:
            plt.ylabel('$FNR$', fontsize=25)
    
        p_values = [tests[c].p_value(x) for x in separated_samples[c]]
        #density = gaussian_kde(p_values)
    
        probabilities = []
        for alpha in alphas:
            prob = len([pval for pval in p_values if pval <= alpha]) / len(p_values)
            probabilities.append(prob)
            #probabilities.append(density.integrate_box_1d(0, alpha))
        
        #print(c, probabilities)
    
        plt.plot(alphas, probabilities, linewidth=3, color=colors[c], zorder=3)
    
        #plt.legend(fontsize=25)

        if subsets is not None:
            for i in range(subsets):
                n = len(separated_samples[c])
                m = subset_size
                chosen_samples = [separated_samples[c][j] for j in np.random.choice(n, m, replace=False)]
                p_values = [tests[c].p_value(x) for x in chosen_samples]
            
                probabilities = []
                for alpha in alphas:
                    prob = len([pval for pval in p_values if pval <= alpha]) / len(p_values)
                    probabilities.append(prob)
                    
                plt.plot(alphas, probabilities, linewidth=2, color=subset_colors[c], zorder=1)
    return figs

In [None]:
figs = check_boundedness(nn_tests, e2_separated)

In [None]:
reports['p0_value_uniform_fig'] = figs[0]
reports['p1_value_uniform_fig'] = figs[1]

Check boundedness for each sample size $n=10,20,\ldots, 100$.

In [None]:
by_label_and_size = {}
for c in e2_separated:
    by_label_and_size[c] = util.separate_by_size(e2_separated[c])

In [None]:
for n in range(10, 101, 10):
    size_n_e2 = {c : by_label_and_size[c][n] for c in [0, 1]}
    check_boundedness(nn_tests, size_n_e2)

We see that the boundedness condition is fulfilled. This means that the $p_0$ and $p_1$ values can be used to control the overall error rates of the classifiers
$$\begin{cases}0,& p_1 \leq \alpha\\ 1,& p_1 > \alpha \end{cases}\qquad \alpha \in [0, 1]$$ and
$$\begin{cases}1,& p_0 \leq \beta\\ 0,& p_0 > \beta \end{cases}\qquad \beta \in [0, 1]$$
respectively.

### Check the Asymptotic Behavior

Check if the $p_c$-values gather around zero for the samples of the opposite class ($1-c$) as the sample size grows. Use the estimates of $P(p_c \leq \varepsilon)$ to quantify the degree to which the $p_c$-values are concentrated in the neighborhood of $0$ when calculated for the samples of the opposite class ($1-c$).

In [None]:
epsilon = 0.05

In [None]:
def check_asymptotic_behavior(tests, samples, labels, epsilon=0.05,
                              number_of_subsets=10, subset_relative_size=0.1,
                             colors=['crimson', 'navy']):
    probabilities = {}
    
    sample_groups = util.separate_by_label(samples, labels)
    for label in sample_groups:
        sample_groups[label] = util.separate_by_size(sample_groups[label])
    
    for c in [0, 1]:
        probabilities[c] = {}
        
        for size in sample_groups[c]:
            probabilities[c][size] = []
            group = sample_groups[1 - c][size]
            
            for i in range(number_of_subsets):
                if number_of_subsets == 1:
                    chosen_samples = group
                    n = len(group)
                else:
                    n = int(subset_relative_size * len(group))
                    chosen_samples = [group[j] for j in np.random.choice(len(group), n, replace=False)]
            
                p_values = [tests[c].p_value(x) for x in chosen_samples]
            
                number_of_lower = len([p_val for p_val in p_values if p_val <= epsilon])
                prob = number_of_lower / n
            
                #density = gaussian_kde(p_values)
                #prob = density.integrate_box_1d(0, epsilon)
                
                probabilities[c][size].append(prob)
    
    figs = [None, None]
    for c in probabilities:
        sample_sizes = sorted(list(probabilities[c].keys()))
        df = pd.DataFrame(probabilities[c])
        if number_of_subsets == 1:
            df = df[sample_sizes]
            figs[c] = df.T.plot(kind='line', figsize=(10, 8), label=None, linewidth=5)
        else:
            figs[c] = df.plot(kind='box', figsize=(10, 8))
        #plt.ylabel('$P_{}(\hat{{p}}_{}(X) \leq {} \mid Y={})$'.format(c, c, epsilon, 1-c), fontsize=25)
        if c == 0:
            plt.ylabel('$TPR$', fontsize=25)
        else:
            plt.ylabel('$TNR$', fontsize=25)
        plt.xlabel('$n$', fontsize=25)
        plt.legend('', frameon=False)
            
    return figs

In [None]:
figs = check_asymptotic_behavior(nn_tests, datasets['E2']['samples'], datasets['E2']['labels'],
                                number_of_subsets=1, epsilon=epsilon)

In [None]:
reports['p0_value_shrinks_for_1_fig'] = figs[0]
reports['p1_value_shrinks_for_0_fig'] = figs[1]

### The Test as a Classifier

Based on the test, define the following classifier:
$$predict_{c, \alpha}(x)=\begin{cases}1-c,& p_c(T(x)) \leq \alpha\\ c,& p_c(T(x)) > \alpha \end{cases}\qquad \alpha \in [0, 1], c \in \{0, 1\}$$ 
$T(x)$ is the value of the neural net's output layer when the whole pipe is fed the sample $x$ at the input.

In [None]:
class OneTestClassifier(object):
    def __init__(self, test, class_label, alpha, opposite_label=None):
        super(OneTestClassifier, self).__init__()
        self.test = test
        self.class_label = class_label
        self.alpha = alpha
        
        if opposite_label is None:
            opposite_label = 1 - class_label
        self.opposite_label = opposite_label
    
    def calculate_statistic(self, samples):
        if not(all([type(s) == list for s in samples])):
            return self.test.calculate_statistic(samples)[0, 0]
            
        return np.array([self.test.calculate_statistic(sample) for sample in samples]).squeeze()
    
    def predict(self, samples):
        labels = [self.class_label if self.test.p_value(sample) > self.alpha \
                  else self.opposite_label for sample in samples]
        return labels

For example:

In [None]:
nn_clf_0 = OneTestClassifier(nn_test_0, 0, 0.05)
nn_clf_1 = OneTestClassifier(nn_test_1, 1, 0.05)

## Comparison with the Original Classifier and Standard Tests of Normality

### Create the Classifiers

In [None]:
classifiers = {}

In [None]:
class_codes = [1, 0]
alphas = [0.01, 0.05]

for class_code in class_codes:
    test = nn_tests[class_code]
    for alpha in alphas:
        classifier = OneTestClassifier(test, class_code, alpha)
        classifiers[('NN_{}'.format(class_code), alpha)] = classifier

Import standard statistical tests of normality (to be used as classifiers).

In [None]:
for code in ['SW', 'JB', 'LF', 'AD']:
    for alpha in alphas:
        classifier = util.get_standard_classifier(code, alpha)
        classifiers[(code, alpha)] = classifier

### Power Analysis on Set $\mathcal{C}$

In [None]:
n_range = range(10, 101, 10)
metrics = ['TNR'] # Power is the true negative rate, 1 - FPR

results = {}
for group in ['C-G1', 'C-G2', 'C-G3', 'C-G4']:
    print(group)
    
    samples = datasets[group]['samples'][:100]
    labels = datasets[group]['labels'][:100]
    
    results[group] = {}
    for code in classifiers:
        print('\t', code)
        
        classifier = classifiers[code]
        results_df = util.evaluate_pretty(samples, labels, classifier, metrics=metrics, n_range=n_range, index='n')
        results[group][code] = results_df

Prepare LaTeX reports and figures.

In [None]:
for group in results:
    print(group)
    dfs = results[group]
    results_dict = {'{}({})'.format(*code): dfs[code]['TNR'] for code in dfs}
    results_df = pd.concat(results_dict, axis=1)
    results_df = results_df[sorted(results_df.columns)]
    
    latex = util.get_latex_table(results_df, index=True, caption=group,
                                 label='fig:TNR_' + group)
    reports['{}_df'.format(group)] = results_df
    reports['{}_latex'.format(group)] = latex
    print(latex)

    df = results_df
    df = df[[c for c in df.columns if '0.05' in c]]
    fig = df[df.index != 'overall'].plot(kind='line', style=['o--', 'v--', '^--', 'x-', 'bD-', 'd--'],
                                         linewidth=2,
                                         markersize=10,
                                         figsize=(10,8), use_index=True)
    reports['{}_fig'.format(group)] = fig
    #fig.legend(loc='center right', bbox_to_anchor=(1.24, 0.5), borderpad=2)
    #latex = util.get_latex_table(df, float_format=float_format, index=True, caption='C', label='c')
    #dbnn_storage['reports']['comparison'][group] = {'fig' : fig, 'latex': latex}
    #print(latex)
    
    #plt.ylabel(ylabel, fontsize=25)
    plt.xlabel('$n$', fontsize=25)
    plt.legend(fontsize=20, markerscale=1)

### The Overall Performance Analysis on Set $\mathcal{D}$

In [None]:
samples = datasets['D']['samples']
labels = datasets['D']['labels']

Evaluate the tests (actually, classifiers based on them) and create reports just as for set $\mathcal{C}$.

In [None]:
results = {}

metrics = ['A', 'TPR', 'PPV', 'TNR', 'NPV', 'F1']

for (code, alpha) in classifiers:
    print(code, alpha)
    classifier = classifiers[(code, alpha)]
    print('\tStart:', dt.datetime.now())
    results_df = util.evaluate_pretty(samples, labels, classifier, metrics=metrics)
    print('\tEnd:', str(dt.datetime.now()))
    results[(code, alpha)] = results_df

print('NN')
results[('NN', '')] = util.evaluate_pretty(samples, labels, dbnn, metrics=metrics)

In [None]:
df_report = pd.concat(results)
#df_report.index=pd.MultiIndex.from_tuples([(x[0], x[1]) for x in df_report.index])

keys = sorted([x for x in results.keys() if x[0] != 'NN']) + [('NN', '')]
df_report = pd.concat({x : results[x] for x in keys})

display(df_report)

latex = util.get_latex_table(df_report, index=True, caption='C', label='x')
print(latex)

reports['D__df'] = df_report
reports['D_latex'] = latex

In [None]:
dfs = {}
for code in classifiers:
    print(code)
    classifier = classifiers[code]
    df = util.evaluate_pretty(samples, labels, classifier, metrics=metrics, n_range=range(10, 101, 10))
    dfs[code] = df
    reports['D_{}_df'.format(code)] = df
    display(df)

In [None]:
df_report_1 = pd.concat({code:dfs[code][['TPR']].set_index(dfs[code]['n']) for code in dfs
          if code[0] != 'DBNN_0'}, axis=1)
display(df_report_1)
latex = util.get_latex_table(df_report_1, index=True, caption='C', label='L')

reports['D_combined_df'] = df_report_1
reports['D_combined_latex'] = latex

#### AUROC Analysis

In [None]:
aurocs = {}

In [None]:
samples = datasets['D']['samples']
labels = datasets['D']['labels']

fig = plt.figure(figsize=(10, 7))
styles = ['dotted', 'dashed', 'dashdot', (0, (3, 5, 1, 5, 1, 5))] 
styles = styles * 3

statistics = {
    code[0] : classifiers[code].calculate_statistic for code in classifiers if 'IB' not in code[0]
}

# NN_1 and NN_0 have the same ROC-curve as NN
del statistics['NN_1']
del statistics['NN_0']

fig = plt.figure(figsize=(10, 8))

for clf_code in statistics:
    print(clf_code)
    
    # Compute the scores
    statistic_function = statistics[clf_code]
    scores = statistic_function(samples)
    
    # Invert the statistics of the tests which treat higher values as stronger indicators
    # of non-normality
    if clf_code in ['LF', 'JB', 'AD']:
        scores = -scores
    
    # Make sure no non-finite values are present in the array of scores
    mask = np.isfinite(np.array(scores))
    filtered_labels = np.array(labels)[mask]
    filtered_scores = np.array(scores)[mask]
    
    # Plot the curves
    fpr, tpr, tr = sklearn.metrics.roc_curve(filtered_labels, filtered_scores, pos_label=1)
    plt.plot(fpr, tpr, linestyle=styles.pop(0), linewidth=3, label=clf_code)
    aurocs[clf_code] = sklearn.metrics.roc_auc_score(filtered_labels, filtered_scores)

# Show the neural network's curve
prepared_input = prepare_input(samples, dbnn)
probabilities = dbnn['neural_net'].predict_proba(prepared_input)
scores = probabilities[:, 1]
print('NN')
fpr, tpr, tr = sklearn.metrics.roc_curve(labels, scores, pos_label=1)
plt.plot(fpr, tpr, linestyle=styles.pop(0), linewidth=3, label='DBNN')
aurocs['NN'] = sklearn.metrics.roc_auc_score(labels, scores)

print(aurocs)

plt.xlabel('FPR', fontsize=16)
plt.ylabel('TPR', fontsize=16)
plt.legend(fontsize=16)

## Save

Save the figures.

In [None]:
traverse_and_save({ 'nn' : {'derived_tests' : reports}}, 'reports')

Save all the reports.

In [None]:
with open('reports.p', 'wb') as f:
    pickle.dump(reports, f)