## Part 1

In [1]:
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, FunctionTransformer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import accuracy_score
import onnxruntime as rt
import onnx
from skl2onnx.common.data_types import FloatTensorType
from skl2onnx import to_onnx
from skl2onnx import convert_sklearn

from term_styling import style, fg, bg

random_state_seed = 42

In [2]:
# Let's load the dataset
data = pd.read_csv('data/investigation_train_large_checked.csv')

# Let's specify the features and the target
target = data['checked']
features = data.drop(columns=[ 'checked', 'Ja', 'Nee' ])
features = features.astype(np.float32)

In [3]:
def get_problematic_columns( data ):
    psychological_features = []
    medical_features = [ 'belemmering_hist_verslavingsproblematiek' ]
    racial_features = ['ontheffing_reden_hist_sociale_gronden']
    subjective_features = [ 'competentie_ethisch_en_integer_handelen', 'competentie_gedrevenheid_en_ambitie_tonen', 'competentie_met_druk_en_tegenslag_omgaan', 'competentie_omgaan_met_verandering_en_aanpassen',
                            'persoonlijke_eigenschappen_uitstroom_verw_vlgs_km', 'persoonlijke_eigenschappen_uitstroom_verw_vlgs_klant', 'afspraak_aantal_woorden', 'afspraak_laatstejaar_aantal_woorden',
                            'competentie_other', 'competentie_overtuigen_en_beïnvloeden'
                          ]
    age_features = ['persoon_leeftijd_bij_onderzoek']
    gender_features = ['persoon_geslacht_vrouw']
    relationship_features = []
    irrelevant_features = [ 'persoonlijke_eigenschappen_hobbies_sport' ]

    for col in data.columns:
        if 'relatie' in col:
            relationship_features.append( col )
        elif 'persoonlijke' in col:
            if '_nl_' in col or 'taal' in col:
                racial_features.append(col)
            elif '_opm' in col:
                subjective_features.append(col)
        elif 'adres_recenst' in col or 'sociaal' in col or 'taal' in col:
            racial_features.append(col)
        elif 'medische' in col or 'lichamelijke' in col:
            medical_features.append(col)
        elif 'psychische' in col:
            psychological_features.append(col)

    return {
            'psychological': psychological_features,
            'medical': medical_features,
            'racial': racial_features,
            'subjective': subjective_features,
            'gender': gender_features,
            'relationship': relationship_features,
            'age': age_features,
            'irrelevant': irrelevant_features
           }

### Outlier Detection

In [4]:
def get_color( val, n_samples ):
    if val >= 0.15 * n_samples:
        return fg.red
    elif val >= 0.05 * n_samples:
        return fg.orange
    elif val >= 0.01 * n_samples:
        return fg.yellow
    return fg.green

In [5]:
def identify_outliers( x, columns, thres=2 ):
    spaces = 120
    print(f"{style.bold}Dataset Outlier Test{style.reset} - {thres} Sigma")
    print("-"*(spaces+40))
    
    n_samples = x.shape[0]
    for column in columns:
        x_min, x_max = x[column].min(), x[column].max()
        mean, std = x[column].mean(), x[column].std()
        
        #print( f"Min: {x_min}\tMax: {x_max}" )
        #print( f"Mean: {mean}\tSTD: {std}" )

        lows = x[column] <= mean - thres*std
        highs = x[column] >= mean + thres*std
        lows, highs = x[lows].shape[0], x[highs].shape[0]

        if lows > 0 or highs > 0:
            to_print = f"{column.capitalize()}:" + " "*(spaces-len(column)) + "Low:\t"
            to_print += get_color( lows, n_samples ) + f"{lows}\t" + style.reset + "High:\t"
            to_print += get_color( highs, n_samples ) + f"{highs}\t" + style.reset
                
            print( to_print )

#### Results

In [6]:
identify_outliers( features, features.columns, 2 )

[01mDataset Outlier Test[0m - 2 Sigma
----------------------------------------------------------------------------------------------------------------------------------------------------------------
Adres_aantal_brp_adres:                                                                                                  Low:	[32m0	[0mHigh:	[33m7465	[0m
Adres_aantal_verschillende_wijken:                                                                                       Low:	[32m0	[0mHigh:	[33m9793	[0m
Adres_aantal_verzendadres:                                                                                               Low:	[32m0	[0mHigh:	[93m2051	[0m
Adres_aantal_woonadres_handmatig:                                                                                        Low:	[32m0	[0mHigh:	[93m3965	[0m
Adres_dagen_op_adres:                                                                                                    Low:	[32m0	[0mHigh:	[93m3618	[0m
Adres_rec

In [7]:
identify_outliers( features, features.columns, 4 )

[01mDataset Outlier Test[0m - 4 Sigma
----------------------------------------------------------------------------------------------------------------------------------------------------------------
Adres_aantal_brp_adres:                                                                                                  Low:	[32m0	[0mHigh:	[32m214	[0m
Adres_aantal_verschillende_wijken:                                                                                       Low:	[32m0	[0mHigh:	[32m205	[0m
Adres_aantal_verzendadres:                                                                                               Low:	[32m0	[0mHigh:	[32m9	[0m
Adres_aantal_woonadres_handmatig:                                                                                        Low:	[32m0	[0mHigh:	[32m46	[0m
Adres_recentst_onderdeel_rdam:                                                                                           Low:	[93m6426	[0mHigh:	[32m0	[0m
Adres_recentste_

## Part 2

In [6]:
def pca_grouping( data, column_set ):
    pca = PCA( n_components=1 )
    return pca.fit_transform( data[column_set] )

In [7]:
def n_wise_partition( feature, n_partitions=2, thresholds=None ):
    feature = feature.copy()
    partitions = []
    if thresholds is None:
        mn, mx = feature.min(), feature.max()
        step = (mx-mn)/n_partitions
        thresholds = [ i for i in np.arange( mn, mx + 0.1*step, step ) ]
    else:
        assert n_partitions+1 == len(thresholds)

    for i in range( len(thresholds)-2 ):
        idx = np.where( (feature >= thresholds[i]) & ( feature < thresholds[i+1]) )
        partitions.append( idx )
    partitions.append( np.where( feature >= thresholds[-2] ) )

    return partitions

In [8]:
def shuffle_columns( data, column_set ):
    data = data.copy()
    shuffled = data[column_set].sample(frac=1).reset_index(drop=True)
    data[column_set] = shuffled
    return data

In [9]:
def flip_columns( data, column_set ):
    data = data.copy()
    for col in column_set:
        uniq = data[col].unique()
        subset_mean = uniq.mean()
        subset = 2*subset_mean - ( data[col] )
        data[col] = subset
    return data

In [10]:
def add_noise_to_columns( data, column_set, noise_mean=0.0, noise_scale=0.5 ):
    data = data.copy()
    for col in column_set:
        noise = np.random.normal( loc=noise_mean, scale=data[col].std()*noise_scale, size=data[col].shape[0] )
        data[col] = data[col] + noise
    return data

In [11]:
problem_cols = get_problematic_columns( features )
problem_cols_full = []
for problem in problem_cols:
    problem_cols_full += problem_cols[problem]
good_cols = []
for col in features.columns:
    if col not in problem_cols_full:
        good_cols.append( col )

partitions = {}

grouped_subset = pca_grouping( features, problem_cols['psychological'] )
partitions['psychological'] = n_wise_partition( grouped_subset, 2 ) # well, unwell

grouped_subset = pca_grouping( features, problem_cols['medical'] )
partitions['medical'] = n_wise_partition( grouped_subset, 2 ) # well, unwell

grouped_subset = pca_grouping( features, problem_cols['racial'] )
partitions['racial'] = n_wise_partition( grouped_subset, 4 ) # Germanic language native, Romance native, PIE native, Non-PIE native

grouped_subset = pca_grouping( features, problem_cols['subjective'] )
partitions['subjective'] = n_wise_partition( grouped_subset, 3 ) # Low, Mid, High opinion

grouped_subset = features[ problem_cols['gender'][0] ]
partitions['gender'] = n_wise_partition( grouped_subset, 2 ) # Male, Female

grouped_subset = pca_grouping( features, problem_cols['relationship'] )
partitions['relationship'] = n_wise_partition( grouped_subset, 3 ) # Small average, large social circle/family

grouped_subset = features[ problem_cols['age'][0] ]
partitions['age'] = n_wise_partition( grouped_subset, 3, [ 0, 30, 60, 200 ] ) # Young Adult, Adult, Senior

grouped_subset = pca_grouping( features, problem_cols['irrelevant'] )
partitions['irrelevant'] = n_wise_partition( grouped_subset, 2 ) # Only for sports hobbyists, yes/no.

## Part 3

In [12]:
import torch
import torch.nn as nn
from collections import OrderedDict
from sklearn.model_selection import train_test_split

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

### Models

In [13]:
class Model(nn.Module):
    def __init__( self, n_features, cols_to_avoid, device="cpu", l=0, train_epochs=1000 ):
        super().__init__()
        self.arch = nn.Sequential(
            OrderedDict([
                ( 'linear1', nn.Linear( n_features, 100 ) ),
                ( 'activation1', nn.ReLU() ),
                ( 'linear2', nn.Linear( 100, 25 ) ),
                ( 'activation2', nn.ReLU()),
                ( 'linear3', nn.Linear( 25, 10 ) ),
                ( 'activation3', nn.ReLU()),
                ( 'linear4', nn.Linear( 10, 2 ) )
                #( 'activation4', nn.Sigmoid() )
            ])
        ).to(device)
        
        self.loss_f = nn.CrossEntropyLoss()
        self.optim = torch.optim.Adam( self.arch.parameters(), lr=1e-3 )
        self.device = device
        self.to_avoid = cols_to_avoid
        self.l = l
        self.epochs = train_epochs
        self.onnx_path = None
        self.rt_session = None

    def to_tensor( self, X, dtype=torch.float ):
        if isinstance( X, pd.DataFrame ) or isinstance( X, pd.Series ):
            X = X.values
        return torch.tensor( X, dtype=dtype ).to(self.device)

    def forward( self, X ):
        return self.arch( X )

    def backward( self, y_pred, y ):
        loss = self.loss_f( y_pred, y )
        self.optim.zero_grad()
        loss.backward()
        self.optim.step()

    def fit( self, X, y ):
        self.train()
        mask = torch.ones_like( self.arch[0].weight )
        for col in self.to_avoid:
            idx = X.columns.get_loc(col)
            mask[:, idx] = self.l

        X = self.to_tensor( X, dtype=torch.float )
        y = self.to_tensor( y, dtype=torch.long )
        for _ in range(self.epochs):
            y_pred = self.forward( X )
            self.backward( y_pred, y )

            if self.l == 0:
                self.arch[0].weight.data *= mask
            elif self.l != 1:
                with torch.no_grad():
                    self.arch[0].weight.grad *= mask

    def predict( self, X, use_onnx=True ):
        self.eval()
        if use_onnx:
            X_np = X.values.astype(np.float32)
            rt_in = { self.rt_session.get_inputs()[0].name: X_np}
            rt_out = self.rt_session.run(None, rt_in)[0]
            return np.argmax( rt_out, axis=1)
        else:
            X = self.to_tensor( X, dtype=torch.float )
            with torch.no_grad():
                return torch.argmax( self.forward(X), dim=1 ).to("cpu").numpy()

    def fit_predict( self, X, y ):
        self.fit( X, y )
        return self.predict( X, False )

    def to_onnx( self, X, onnx_path="models/model1_1.onnx" ):
        self.onnx_path = onnx_path
        self.arch.eval()
        X_np = X.values.astype(np.float32)
        X_tn = torch.tensor(X_np[:1], dtype=torch.float32).to(self.device)

        torch.onnx.export(
            self,
            X_tn,
            self.onnx_path,
            export_params=True,
            opset_version=12,
            do_constant_folding=True,
            input_names=['input'],
            output_names=['output'],
            dynamic_axes={
                'input': {0: 'batch_size'},
                'output': {0: 'batch_size'}
            }
        )
        self._load_onnx_session()

    def _load_onnx_session(self):
        providers = rt.get_available_providers()
        self.rt_session = rt.InferenceSession( self.onnx_path, providers=providers )

In [14]:
n_samples, n_features = features.shape

# Define a gradient boosting classifier
bad_model = Model( n_features=n_features, device=device, cols_to_avoid=problem_cols_full, l=10 )
good_model = Model( n_features=n_features, device=device, cols_to_avoid=problem_cols_full, l=0 )

# Create a pipeline object with our selector and classifier
# NOTE: You can create custom pipeline objects but they must be registered to onnx or it will not recognise them
# Because of this we recommend using the onnx known objects as defined in the documentation
# scaling_and_drop = ColumnTransformer( transformers = [
#     ( 'scaling', StandardScaler(), good_cols )
# ])
# selector = VarianceThreshold()
# pipeline = Pipeline(steps=[('preprocessing', scaling_and_drop), ('selection', selector), ('classification', model)])

### Training

In [15]:
def train_eval_model( model, X, y, epochs=1000, model_path="models/model1_1.onnx" ):
    X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2 )

    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_test = scaler.fit_transform(X_test)
    
    y_pred = model.fit_predict( X_train, y_train )
    train_accuracy = (y_pred==y_train).mean()

    model.to_onnx( X_train, onnx_path=model_path )
    y_pred = model.predict( X_test )
    test_accuracy = (y_pred==y_test).mean()

    print( f"Train Accuracy of the original model: {train_accuracy}")
    print( f"Test Accuracy of the original model: {test_accuracy}")

    return model

In [16]:
good_model = train_eval_model( model=good_model, X=features, y=target, model_path="models/model2_1.onnx" )
bad_model = train_eval_model( model=bad_model, X=features, y=target, model_path="models/model2_2.onnx" )

  torch.onnx.export(
W0131 18:41:55.043000 95720 torch/onnx/_internal/exporter/_compat.py:125] Setting ONNX exporter to use operator set version 18 because the requested opset_version 12 is a lower version than we have implementations for. Automatic version conversion will be performed, which may not be successful at converting to the requested version. If version conversion is unsuccessful, the opset version of the exported model will be kept at 18. Please consider setting opset_version >=18 to leverage latest ONNX features
W0131 18:41:55.357000 95720 torch/onnx/_internal/exporter/_registration.py:110] torchvision is not installed. Skipping torchvision::nms


[torch.onnx] Obtain model graph for `Model([...]` with `torch.export.export(..., strict=False)`...
[torch.onnx] Obtain model graph for `Model([...]` with `torch.export.export(..., strict=False)`... ✅
[torch.onnx] Run decomposition...


  return cls.__new__(cls, *args)
The model version conversion is not supported by the onnxscript version converter and fallback is enabled. The model will be converted using the onnx C API (target version: 12).
Failed to convert the model to the target version 12 using the ONNX C API. The model was not modified
Traceback (most recent call last):
  File "/home/johnario/Education/Formal/TuDelft/DSAIT 4015 - Software Engineering and Testing for AI Systems/Assignment 1/Group 2/venv/lib/python3.11/site-packages/onnxscript/version_converter/__init__.py", line 127, in call
    converted_proto = _c_api_utils.call_onnx_api(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/johnario/Education/Formal/TuDelft/DSAIT 4015 - Software Engineering and Testing for AI Systems/Assignment 1/Group 2/venv/lib/python3.11/site-packages/onnxscript/version_converter/_c_api_utils.py", line 65, in call_onnx_api
    result = func(proto)
             ^^^^^^^^^^^
  File "/home/johnario/Education/Formal/

[torch.onnx] Run decomposition... ✅
[torch.onnx] Translate the graph into ONNX...
[torch.onnx] Translate the graph into ONNX... ✅
Train Accuracy of the original model: 0.9891057692307692
Test Accuracy of the original model: 0.8391538461538461


  torch.onnx.export(
W0131 18:41:59.509000 95720 torch/onnx/_internal/exporter/_compat.py:125] Setting ONNX exporter to use operator set version 18 because the requested opset_version 12 is a lower version than we have implementations for. Automatic version conversion will be performed, which may not be successful at converting to the requested version. If version conversion is unsuccessful, the opset version of the exported model will be kept at 18. Please consider setting opset_version >=18 to leverage latest ONNX features
W0131 18:41:59.744000 95720 torch/onnx/_internal/exporter/_registration.py:110] torchvision is not installed. Skipping torchvision::nms


[torch.onnx] Obtain model graph for `Model([...]` with `torch.export.export(..., strict=False)`...
[torch.onnx] Obtain model graph for `Model([...]` with `torch.export.export(..., strict=False)`... ✅
[torch.onnx] Run decomposition...


  return cls.__new__(cls, *args)
The model version conversion is not supported by the onnxscript version converter and fallback is enabled. The model will be converted using the onnx C API (target version: 12).
Failed to convert the model to the target version 12 using the ONNX C API. The model was not modified
Traceback (most recent call last):
  File "/home/johnario/Education/Formal/TuDelft/DSAIT 4015 - Software Engineering and Testing for AI Systems/Assignment 1/Group 2/venv/lib/python3.11/site-packages/onnxscript/version_converter/__init__.py", line 127, in call
    converted_proto = _c_api_utils.call_onnx_api(
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/johnario/Education/Formal/TuDelft/DSAIT 4015 - Software Engineering and Testing for AI Systems/Assignment 1/Group 2/venv/lib/python3.11/site-packages/onnxscript/version_converter/_c_api_utils.py", line 65, in call_onnx_api
    result = func(proto)
             ^^^^^^^^^^^
  File "/home/johnario/Education/Formal/

[torch.onnx] Run decomposition... ✅
[torch.onnx] Translate the graph into ONNX...
[torch.onnx] Translate the graph into ONNX... ✅
Train Accuracy of the original model: 1.0
Test Accuracy of the original model: 0.9123846153846153


## Testing

In [17]:
def aggregate_results( resultsA, resultsB ):
    to_print = f"= Test Target | # of Tests | {resultsA['title']} | {resultsB['title']} ="
    dashes = len(to_print) - 20
    print( "="*dashes )
    print( to_print )
    print( "="*dashes )
    print( f"Accuracy      | {resultsA['tests']}\t   | {resultsA['accuracy_passes']}\t\t| {resultsB['accuracy_passes']}")
    print( f"Bias          | {resultsA['tests']}\t   | {resultsA['bias_passes']}\t\t| {resultsB['bias_passes']}")

In [18]:
GOOD_TITLE = fg.green + "Good Model" + fg.reset
BAD_TITLE = fg.red + "Bad Model" + fg.reset

ACC_THRESH = 0.85

### Partition Testing

In [19]:
def partition_test( model, X, y, partitions, title, accuracy_threshold=0.9, bias_threshold=0.05 ):
    passes, accuracy_passes = 0, 0
    checked_per_partition = np.empty( len(partitions) )

    title_string = f"= Partition Testing {title} | Accuracy Threshold: {style.bold}{accuracy_threshold}{style.reset} | Bias Threshold: {style.bold}{bias_threshold}{style.reset} ="
    dashes = len(title_string) - 38
    print( "="*dashes )
    print( title_string )
    print( "="*dashes )
    for idx, partition in enumerate( partitions ):
        X_part = X.iloc[partition[0]]
        y_part = y.iloc[partition[0]]

        y_pred = model.predict(X_part)
        accuracy = (y_pred==y_part).mean()
        if accuracy > accuracy_threshold:
            accuracy_passes += 1

        checked_count = ( y_pred == 1 ).mean()
        checked_per_partition[idx] = checked_count
        print( f"Partition {idx}  | "
               f"Accuracy: {accuracy:.4f} ({ fg.green + 'pass' + style.reset if accuracy >= accuracy_threshold else fg.red + 'fail' + style.reset }) | "
               f"Checked: {checked_count:.4f}"
             )

    checked_mean = checked_per_partition.mean()
    for i in range(len(checked_per_partition)):
        # Predictions:
        # Male: 15%
        # Female: 10%
        # Mean = 12.5%
        # For male:
        # # Is 15%/12.5% - 1 < 10%?
        # # Is 15% - 12.5% < 15% * 10%?
        # For female:
        # # Is 10%/12.5% - 1 < 10%?
        # # Is |10% - 12.5%| < 15% * 10%?
        if np.abs( checked_per_partition[i]/checked_mean - 1 ) < bias_threshold:
            passes += 1


    print( f"Total Passes |"
        f" Accuracy: {accuracy_passes}/{len(partitions)} \t       |"
        f" Bias passes: {passes}/{len(partitions)}\n" )

    return {
        'bias_passes': passes,
        'accuracy_passes': accuracy_passes,
        'tests': len(partitions)
    }

In [20]:
def partition_tests( model1, model2, title1, title2 ):
    m1_results = { 'title': title1, 'bias_passes': 0, 'accuracy_passes': 0, 'tests': 0 }
    m2_results = { 'title': title2, 'bias_passes': 0, 'accuracy_passes': 0, 'tests': 0 }
    
    BIAS_THRESH = 0.1
    
    for partition_name in partitions:
        set_title = fg.gray + partition_name + fg.reset
        results = partition_test( model=model1, X=features, y=target, partitions=partitions[partition_name],
                                   title=f"{title1} {set_title}", accuracy_threshold=ACC_THRESH, bias_threshold=BIAS_THRESH )
        m1_results['bias_passes'] += results['bias_passes']
        m1_results['accuracy_passes'] += results['accuracy_passes']
        m1_results['tests'] += results['tests']
        
        results = partition_test( model=model2, X=features, y=target, partitions=partitions[partition_name],
                                   title=f"{title2} {set_title}", accuracy_threshold=ACC_THRESH, bias_threshold=BIAS_THRESH )
        m2_results['bias_passes'] += results['bias_passes']
        m2_results['accuracy_passes'] += results['accuracy_passes']
        m2_results['tests'] += results['tests']
        
    aggregate_results( m1_results, m2_results )

#### Results

In [21]:
partition_tests( good_model, bad_model, GOOD_TITLE, BAD_TITLE )

= Partition Testing [32mGood Model[39m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.1[0m =
Partition 0  | Accuracy: 0.9552 ([32mpass[0m) | Checked: 0.1436
Partition 1  | Accuracy: 1.0000 ([32mpass[0m) | Checked: 0.0000
Total Passes | Accuracy: 2/2 	       | Bias passes: 0/2

= Partition Testing [31mBad Model[39m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.1[0m =
Partition 0  | Accuracy: 0.9816 ([32mpass[0m) | Checked: 0.1532
Partition 1  | Accuracy: 1.0000 ([32mpass[0m) | Checked: 0.0000
Total Passes | Accuracy: 2/2 	       | Bias passes: 0/2

= Partition Testing [32mGood Model[39m [37mmedical[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.1[0m =
Partition 0  | Accuracy: 0.9573 ([32mpass[0m) | Checked: 0.1541
Partition 1  | Accuracy: 1.0000 ([32mpass[0m) | Checked: 0.0000
Total Passes | Accuracy: 2/2 	       | Bias passes: 0/2

= Partition Testing [31mBad Model[39

  checked_count = ( y_pred == 1 ).mean()
  ret = ret.dtype.type(ret / rcount)


Partition 1  | Accuracy: 0.9833 ([32mpass[0m) | Checked: 0.1485
Partition 2  | Accuracy: nan ([31mfail[0m) | Checked: nan
Total Passes | Accuracy: 2/3 	       | Bias passes: 0/3

= Partition Testing [32mGood Model[39m [37mgender[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.1[0m =
Partition 0  | Accuracy: 0.9583 ([32mpass[0m) | Checked: 0.1495
Partition 1  | Accuracy: 0.9600 ([32mpass[0m) | Checked: 0.1348
Total Passes | Accuracy: 2/2 	       | Bias passes: 2/2

= Partition Testing [31mBad Model[39m [37mgender[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.1[0m =
Partition 0  | Accuracy: 0.9824 ([32mpass[0m) | Checked: 0.1510
Partition 1  | Accuracy: 0.9825 ([32mpass[0m) | Checked: 0.1459
Total Passes | Accuracy: 2/2 	       | Bias passes: 2/2

= Partition Testing [32mGood Model[39m [37mrelationship[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.1[0m =
Partition 0  | Accuracy: 0.9619 ([32mpass[0m) | Che

### Shuffle Testing

In [22]:
def shuffle_test( model, X, y, columns, title, tries=5, accuracy_threshold=0.9, bias_threshold=0.05 ):
    passes, accuracy_passes = 0, 0
    checked_per_try = np.empty( tries )
    y_pred_orig = model.predict( X )

    title_string = f"= Partition Testing {title} | Accuracy Threshold: {style.bold}{accuracy_threshold}{style.reset} | Bias Threshold: {style.bold}{bias_threshold}{style.reset} ="
    dashes = len(title_string) - 38
    print( "="*dashes )
    print( title_string )
    print( "="*dashes )
    for idx in range(tries):
        X_alt = shuffle_columns( X, columns )
        y_pred = model.predict( X_alt )

        accuracy = (y_pred==y).mean()
        changed_count = ( y_pred != y_pred_orig ).mean()

        if accuracy >= accuracy_threshold:
            accuracy_passes += 1
        if changed_count < bias_threshold:
            passes += 1

        print( f"Test {idx} | "
               f"Accuracy: {accuracy:.4f} ({ fg.green + 'pass' + style.reset if accuracy >= accuracy_threshold else fg.red + 'fail' + style.reset }) | "
               f"Changed: {changed_count:.4f} ({ fg.green + 'pass' + style.reset if changed_count < bias_threshold else fg.red + 'fail' + style.reset })"
             )

    print( f"Total Passes |"
        f" Accuracy: {accuracy_passes}/{tries} |"
        f" Bias passes: {passes}/{tries}\n" )

    return {
        'bias_passes': passes,
        'accuracy_passes': accuracy_passes,
        'tests': tries
    }

In [23]:
def shuffle_tests( model1, model2, title1, title2 ):
    m1_results = { 'title': title1, 'bias_passes': 0, 'accuracy_passes': 0, 'tests': 0 }
    m2_results = { 'title': title2, 'bias_passes': 0, 'accuracy_passes': 0, 'tests': 0 }

    BIAS_THRESH = 0.05
    TRIES = 5
    
    for problem_type in problem_cols:
        set_title = fg.gray + problem_type + fg.reset
        results = shuffle_test( model=model1, X=features, y=target, columns=problem_cols[problem_type],
                                   title=f"{title1} {set_title}", tries=TRIES, accuracy_threshold=ACC_THRESH, bias_threshold=BIAS_THRESH )
        m1_results['bias_passes'] += results['bias_passes']
        m1_results['accuracy_passes'] += results['accuracy_passes']
        m1_results['tests'] += results['tests']
        
        results = shuffle_test( model=model2, X=features, y=target, columns=problem_cols[problem_type],
                                   title=f"{title2} {set_title}", tries=TRIES, accuracy_threshold=ACC_THRESH, bias_threshold=BIAS_THRESH )
        m2_results['bias_passes'] += results['bias_passes']
        m2_results['accuracy_passes'] += results['accuracy_passes']
        m2_results['tests'] += results['tests']
        
    aggregate_results( m1_results, m2_results )

#### Results

In [24]:
shuffle_tests( good_model, bad_model, GOOD_TITLE, BAD_TITLE )

= Partition Testing [32mGood Model[39m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Test 0 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Test 1 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Test 2 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Test 3 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Test 4 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Total Passes | Accuracy: 5/5 | Bias passes: 5/5

= Partition Testing [31mBad Model[39m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Test 0 | Accuracy: 0.9792 ([32mpass[0m) | Changed: 0.0066 ([32mpass[0m)
Test 1 | Accuracy: 0.9791 ([32mpass[0m) | Changed: 0.0068 ([32mpass[0m)
Test 2 | Accuracy: 0.9791 ([32mpass[0m) | Changed: 0.0067 ([32mpass[0m)
Test 3 | Accuracy: 0.9790 ([32mpass[0m) | Changed: 0.0068 ([32mpass[0m

### Flip Testing

In [25]:
def flip_test( model, X, y, columns, title, accuracy_threshold=0.9, bias_threshold=0.05 ):
    passes, accuracy_passes = 0, 0
    y_pred_orig = model.predict( X )

    title_string = f"= Partition Testing {title} | Accuracy Threshold: {style.bold}{accuracy_threshold}{style.reset} | Bias Threshold: {style.bold}{bias_threshold}{style.reset} ="
    dashes = len(title_string) - 38
    print( "="*dashes )
    print( title_string )
    print( "="*dashes )
    X_alt = flip_columns( X, columns )
    y_pred = model.predict( X_alt )
    accuracy = (y_pred==y).mean()
    accuracy_passes = 1 if accuracy > accuracy_threshold else 0

    changed_count = ( y_pred != y_pred_orig ).mean()
    passes = 1 if changed_count < bias_threshold else 0
    print( f"Result | "
           f"Accuracy: {accuracy:.4f} ({ fg.green + 'pass' + style.reset if accuracy_passes==1 else fg.red + 'fail' + style.reset }) | "
           f"Changed: {changed_count:.4f} ({ fg.green + 'pass' + style.reset if passes==1 else fg.red + 'fail' + style.reset })\n"
         )

    return {
        'bias_passes': passes,
        'accuracy_passes': accuracy_passes,
        'tests': 1
    }

In [26]:
def flip_tests( model1, model2, title1, title2 ):
    m1_results = { 'title': title1, 'bias_passes': 0, 'accuracy_passes': 0, 'tests': 0 }
    m2_results = { 'title': title2, 'bias_passes': 0, 'accuracy_passes': 0, 'tests': 0 }

    BIAS_THRESH = 0.05
    
    for problem_type in problem_cols:
        set_title = fg.gray + problem_type + fg.reset
        results = flip_test( model=model1, X=features, y=target, columns=problem_cols[problem_type],
                                title=f"{title1} {set_title}", accuracy_threshold=ACC_THRESH, bias_threshold=BIAS_THRESH )
        m1_results['bias_passes'] += results['bias_passes']
        m1_results['accuracy_passes'] += results['accuracy_passes']
        m1_results['tests'] += results['tests']
        
        results = flip_test( model=model2, X=features, y=target, columns=problem_cols[problem_type],
                                title=f"{title2} {set_title}", accuracy_threshold=ACC_THRESH, bias_threshold=BIAS_THRESH )
        m2_results['bias_passes'] += results['bias_passes']
        m2_results['accuracy_passes'] += results['accuracy_passes']
        m2_results['tests'] += results['tests']
        
    aggregate_results( m1_results, m2_results )

#### Results

In [27]:
flip_tests( good_model, bad_model, GOOD_TITLE, BAD_TITLE )

= Partition Testing [32mGood Model[39m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Result | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)

= Partition Testing [31mBad Model[39m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Result | Accuracy: 0.9661 ([32mpass[0m) | Changed: 0.0226 ([32mpass[0m)

= Partition Testing [32mGood Model[39m [37mmedical[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Result | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)

= Partition Testing [31mBad Model[39m [37mmedical[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Result | Accuracy: 0.8861 ([32mpass[0m) | Changed: 0.1104 ([31mfail[0m)

= Partition Testing [32mGood Model[39m [37mracial[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Result | Accuracy: 0.9591 ([32mpass[

### Noise Test

In [28]:
def noise_test( model, X, y, columns, title, tries=5, accuracy_threshold=0.9, bias_threshold=0.05 ):
    passes, accuracy_passes = 0, 0
    checked_per_try = np.empty( tries )
    y_pred_orig = model.predict( X )

    title_string = f"= Noise Testing {title} | Accuracy Threshold: {style.bold}{accuracy_threshold}{style.reset} | Bias Threshold: {style.bold}{bias_threshold}{style.reset} ="
    dashes = len(title_string) - 38
    print( "="*dashes )
    print( title_string )
    print( "="*dashes )
    for idx in range(tries):
        X_alt = add_noise_to_columns( X, columns, noise_scale=2.0 )
        y_pred = model.predict( X_alt )

        accuracy = (y_pred==y).mean()
        changed_count = ( y_pred != y_pred_orig ).mean()

        if accuracy >= accuracy_threshold:
            accuracy_passes += 1
        if changed_count < bias_threshold:
            passes += 1

        print( f"Test {idx} | "
               f"Accuracy: {accuracy:.4f} ({ fg.green + 'pass' + style.reset if accuracy >= accuracy_threshold else fg.red + 'fail' + style.reset }) | "
               f"Changed: {changed_count:.4f} ({ fg.green + 'pass' + style.reset if changed_count < bias_threshold else fg.red + 'fail' + style.reset })"
             )

    print( f"Total Passes |"
        f" Accuracy: {accuracy_passes}/{tries} |"
        f" Bias passes: {passes}/{tries}\n" )

    return {
        'bias_passes': passes,
        'accuracy_passes': accuracy_passes,
        'tests': tries
    }

In [29]:
def noise_tests( model1, model2, title1, title2 ):
    m1_results = { 'title': title1, 'bias_passes': 0, 'accuracy_passes': 0, 'tests': 0 }
    m2_results = { 'title': title2, 'bias_passes': 0, 'accuracy_passes': 0, 'tests': 0 }

    BIAS_THRESH = 0.05
    TRIES = 5
    
    for problem_type in problem_cols:
        set_title = fg.gray + problem_type + fg.reset
        results = noise_test( model=model1, X=features, y=target, columns=problem_cols[problem_type],
                                 title=f"{title1} {set_title}", tries=TRIES, accuracy_threshold=ACC_THRESH, bias_threshold=BIAS_THRESH )
        m1_results['bias_passes'] += results['bias_passes']
        m1_results['accuracy_passes'] += results['accuracy_passes']
        m1_results['tests'] += results['tests']
        
        results = noise_test( model=model2, X=features, y=target, columns=problem_cols[problem_type],
                                 title=f"{title2} {set_title}", tries=TRIES, accuracy_threshold=ACC_THRESH, bias_threshold=BIAS_THRESH )
        m2_results['bias_passes'] += results['bias_passes']
        m2_results['accuracy_passes'] += results['accuracy_passes']
        m2_results['tests'] += results['tests']
        
    aggregate_results( m1_results, m2_results )    

#### Results

In [30]:
noise_tests( good_model, bad_model, GOOD_TITLE, BAD_TITLE )

= Noise Testing [32mGood Model[39m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Test 0 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Test 1 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Test 2 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Test 3 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Test 4 | Accuracy: 0.9591 ([32mpass[0m) | Changed: 0.0000 ([32mpass[0m)
Total Passes | Accuracy: 5/5 | Bias passes: 5/5

= Noise Testing [31mBad Model[39m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.05[0m =
Test 0 | Accuracy: 0.9736 ([32mpass[0m) | Changed: 0.0133 ([32mpass[0m)
Test 1 | Accuracy: 0.9733 ([32mpass[0m) | Changed: 0.0138 ([32mpass[0m)
Test 2 | Accuracy: 0.9732 ([32mpass[0m) | Changed: 0.0136 ([32mpass[0m)
Test 3 | Accuracy: 0.9733 ([32mpass[0m) | Changed: 0.0139 ([32mpass[0m)
Test 4

In [None]:
def data_leakage_test( model, X, y, columns, title, tries=5, accuracy_threshold=0.9, bias_threshold=0.05 ):
    with torch.no_grad

### Group 2 Tests

In [33]:
class SklearnModel:
    def __init__( self, filename ):
        self.session = rt.InferenceSession(filename)

    def predict( self, X ):
        return self.session.run(None, {'X': X.values.astype(np.float32)})[0]

model1 = SklearnModel("models/model1_1.onnx")
model2 = SklearnModel("models/model1_2.onnx")

M1_TITLE = fg.cyan + "Model A" + style.reset
M2_TITLE = fg.cyan + "Model B" + style.reset

In [34]:
partition_tests( model1, model2, M1_TITLE, M2_TITLE )

= Partition Testing [36mModel A[0m [37mpsychological[39m | Accuracy Threshold: [01m0.85[0m | Bias Threshold: [01m0.1[0m =


ValueError: Required inputs (['input']) are missing from input feed (['X']).

In [None]:
shuffle_tests( model1, model2, M1_TITLE, M2_TITLE )

In [None]:
flip_tests( model1, model2, M1_TITLE, M2_TITLE )

In [None]:
noise_tests( model1, model2, M1_TITLE, M2_TITLE )