# Statistic-Based Neural Network for Normality Testing

## Set up the environment

In [1]:
# Import everything that's needed to run the notebook
import os
import pickle
import pathlib
import datetime
import random

from IPython.display import display, Markdown, Latex
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.impute import SimpleImputer
from sklearn.neural_network import MLPClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report
from sklearn.metrics import accuracy_score
import sklearn.metrics
import sklearn.preprocessing
import scipy.stats
import matplotlib.pyplot as plt
import boruta

import util

Read the configuration.

In [2]:
# Define the path to the configuration dictionary
config_path = 'configuration.p'

# Load the configuration dictionary
with open(config_path, 'rb') as f:
    configuration = pickle.load(f)
    
# Get the paths to the relevant directories 
data_directory_path = configuration['data']['directory_path']
classifiers_directory_path = configuration['classifiers']['directory_path']

# Get the parameters of the experiment
cv_folds = configuration['experiment']['number_of_cv_folds']

## Prepare the data

### Load the data
Load the datasets using the function `load_from_file` from `util`.

In [3]:
# Define the dictionary to store the actual datasets, indexed by their names
datasets = {}

# Load the datasets
for set_name in configuration['data']['datasets']:
    set_path = configuration['data']['datasets'][set_name]['path']
    print('Loading {} from {}'.format(set_name, set_path))
    datasets[set_name] = util.load_from_file(set_path)
    print('Done.')

Loading A from data/A.data
Done.
Loading B from data/B.data
Done.
Loading C-G1 from data/C-G1.data
Done.
Loading C-G2 from data/C-G2.data
Done.
Loading C-G3 from data/C-G3.data
Done.
Loading C-G4 from data/C-G4.data
Done.
Loading D from data/D.data
Done.
Loading E from data/E.data
Done.


Show a normal sample from the dataset $\mathcal{A}$. and a non-normal from the group $G_1$ of the dataset $\mathcal{C}$. The normal samples are labeled with $1$, whereas the label of each non-normal one is $0$. The normal samples constitute the "positive" and the non-normal the "negative" class.

In [4]:
print('A normal sample (ending in 1):\n', datasets['A'][0])
print('A non-normal sample (ending in 0):\n', datasets['C-G1'][0])

A normal sample (ending in 1):
 [-45.49063028459169, -30.981143033573673, -13.728309265861695, -15.666454590679104, -36.22767976281412, -11.508188107883877, -24.221912056880065, -39.437369899085894, -36.34343338555324, -15.150388381158336, 1.0]
A non-normal sample (ending in 0):
 [0.3274030780949263, 2.1201516653169254, -1.0689942361628708, -0.9340489803307244, 0.3370605169731828, 1.5370159253341324, 0.8311764716162305, -0.17301152203223455, -2.434902734249694, -5.060191877248404, 0.0]


### Split $\mathcal{A}$ into Cross-validation and Test  Subsets ($\mathcal{A}_{cv}$ and $\mathcal{A}_{test}$) 

Split $\mathcal{A}$ into the subsets for cross-validation ($\mathcal{A}_{cv})$ and testing ($\mathcal{A}_{test}$).

Use 70% of the set to cross-validate and 30% for subsequent testing and evaluation.

In [5]:
# Extract the labels from the set, leaving only samples in it
labels = [labeled_sample.pop() for labeled_sample in datasets['A']]
samples = datasets['A']

# There is no need to store the sama data twice, in datasets['A'] and in (samples, labels)
del datasets['A']

# Define the stratification labels as the combination of actual labels and sample sizes
stratification_labels = [str(label) + str(len(sample)) for (label, sample) in zip(labels, samples)]

# Set the relative size of the CV subset
train_size = 0.7

# Split the data into CV and test subsets
set_A_cv, set_A_test, y_cv, y_test = train_test_split(samples, labels, stratify=stratification_labels, 
                                              train_size=train_size)

### Split the Labels and the Samples in Other Datasets

In [6]:
for set_name in datasets:
    labels = [sample.pop() for sample in datasets[set_name]]
    samples = datasets[set_name]
    
    datasets[set_name] = {'samples' : samples, 'labels' : labels}

## Construct the Statistic-based Feedforward Neural Network Classifier (SBNN)

### Prepare the Preprocessor

In [7]:
def lin_mudholkar_statistic(sample, tol=1e-7):
    n = len(sample)
    sum_of_squares = 0
    for x in sample:
        sum_of_squares = sum_of_squares + x**2
        
    h_values = [0 for i in range(n)]
    for i in range(n):
        x = sample[i]
        
        corrected_sum = 0
        for j in range(n):
            if j != i:
                corrected_sum = corrected_sum + sample[j]
                
        square_of_sum = corrected_sum**2
        difference = (((sum_of_squares - x**2) - square_of_sum / (n - 1)) / n)
        if abs(difference) <= tol:
            difference = 0
        h_i = difference**(1/3)
        h_values[i] = h_i
        
        #print(i, x, corrected_sum, square_of_sum, square_of_sum / (n - 1), '\n', h_i)
    
    r = np.corrcoef(sample, h_values)
    
    return np.arctan(r[0, 1])

def vasicek_statistic(sample, m=3):
    n = len(sample)
    sample = np.array(sample, dtype=np.float64)
    sd = np.std(sample)
    sorted_sample = np.sort(sample)
    product = 1
    m = m
    for i in range(n):
        if i - m < 0:
            left = sorted_sample[0]
        else:
            left = sorted_sample[i - m]
        
        if i + m > n - 1:
            right = sorted_sample[n - 1]
        else:
            right = sorted_sample[i + m]
        
        product = product * (right - left)
    
    return (n / (2 * m * sd)) * (product ** (1 / n))

In [8]:
def preprocess(sample):
    n = len(sample)
    skewness = scipy.stats.skew(sample)
    kurtosis = scipy.stats.kurtosis(sample, fisher=False)
    W = scipy.stats.shapiro(sample).statistic
    lm_stat = lin_mudholkar_statistic(sample)
    K3 = vasicek_statistic(sample, m=3)
    K5 = vasicek_statistic(sample, m=5)
    return [skewness, kurtosis, W, lm_stat, K3, K5, n]
    

In [9]:
class SBNNPreprocessor(TransformerMixin, BaseEstimator):
    def __init__(self):
        super(SBNNPreprocessor, self).__init__()
        
        # Set the names of the features in the descriptors
        self.features = ['skewness', 'kurtosis', 'W', 'LN-statistic', 'K3', 'K5', 'n']
        
    def fit(self, X, y=None):
        # Not needed, but present for compatibility.
        return self
    
    def transform(self, X, y=None):
        # Note: Currently working only on a list of lists or a single list.
        if isinstance(X, list):
            if all(isinstance(x, list) for x in X):
                X = [preprocess(x) for x in X]
                return pd.DataFrame(X)
            else:
                X = preprocess(X)
                return X
        else:
            # Pandas dataframes and numpy arrays are not supported for now.
            pass

### Cross-validate
Create a `sklearn` pipeline that consists of the preprocessor, standard scaler, mean imputer to replace the null values, and the neural network itself.

In [10]:
preprocessor = SBNNPreprocessor()
scaler = sklearn.preprocessing.StandardScaler()
imputer = SimpleImputer(strategy='mean')
neural_net = MLPClassifier(solver='adam', max_iter=200, activation='relu',
                           early_stopping=True, validation_fraction=0.1)
pipe = Pipeline([('preprocessor', preprocessor),
                 ('scaler', scaler),
                 ('imputer', imputer),
                 ('neural_net', neural_net),
                ])

Run the grid search to fit the network's parameters and find the best hyperparameters.

In [11]:
# Specify the hyperparameter grid
param_grid = dict(
                  neural_net__hidden_layer_sizes = [(1000), (100, 10)],
                  neural_net__alpha = [10, 1, 0.1],
                 )

# Define the grid search object
grid = GridSearchCV(pipe,
                    param_grid=param_grid,
                    scoring='accuracy',
                    refit=True,
                    cv=cv_folds,
                    verbose=1,
                    n_jobs=-1)

# Perform cross-validation
grid.fit(set_A_cv, y_cv)

Fitting 5 folds for each of 6 candidates, totalling 30 fits


[Parallel(n_jobs=-1)]: Using backend LokyBackend with 8 concurrent workers.
[Parallel(n_jobs=-1)]: Done  30 out of  30 | elapsed:  5.3min finished
  c /= stddev[:, None]
  c /= stddev[None, :]


GridSearchCV(cv=5,
             estimator=Pipeline(steps=[('preprocessor', SBNNPreprocessor()),
                                       ('scaler', StandardScaler()),
                                       ('imputer', SimpleImputer()),
                                       ('neural_net',
                                        MLPClassifier(early_stopping=True))]),
             n_jobs=-1,
             param_grid={'neural_net__alpha': [10, 1, 0.1],
                         'neural_net__hidden_layer_sizes': [1000, (100, 10)]},
             scoring='accuracy', verbose=1)

In [12]:
# Get the trained network.
sbnn = grid.best_estimator_

# Show its hyperparameters
grid.best_params_

{'neural_net__alpha': 0.1, 'neural_net__hidden_layer_sizes': 1000}

In [13]:
# Show the means and deviations of the score(s) and CV time
params = grid.cv_results_['params']
mean_scores = grid.cv_results_['mean_test_score']
score_sds = grid.cv_results_['std_test_score']
mean_fit_times = grid.cv_results_['mean_fit_time']
time_sds = grid.cv_results_['std_fit_time']

results = []
for (params, mean_score, score_sd, mean_fit_time, time_sd) in zip(params, mean_scores, score_sds, mean_fit_times, time_sds):
    alpha = params['neural_net__alpha']
    structure = params['neural_net__hidden_layer_sizes']
    results.append([str(structure), alpha, mean_score, score_sd, mean_fit_time, time_sd])

results_df = pd.DataFrame(results)
results_df.columns = ['structure', 'c', 'mean_score', 'score_sd', 'mean_time', 'time_sd']
results_df

Unnamed: 0,structure,c,mean_score,score_sd,mean_time,time_sd
0,1000,10.0,0.798686,0.006201,62.155047,10.344608
1,"(100, 10)",10.0,0.804598,0.004932,51.806245,1.762312
2,1000,1.0,0.844773,0.003468,83.61116,9.960311
3,"(100, 10)",1.0,0.845375,0.003307,60.473763,4.440777
4,1000,0.1,0.847345,0.004148,82.400459,8.900024
5,"(100, 10)",0.1,0.844882,0.004238,43.775766,10.624519


## Save the SBNN

In [14]:
with open(classifiers_directory_path + '/sbnn.p', 'wb') as f:
    pickle.dump(sbnn, f)