In [30]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual, Layout
import ipywidgets as widgets
from IPython.display import display, clear_output, Markdown

In [54]:
display(Markdown("## EXPLANATION TOOL"))
intro_output = widgets.Output()
display(intro_output)
with intro_output:
    clear_output(wait=True)
    print("Initializing application...")
    
def print_intro(text):
    with intro_output:
        print(text)
        
def intro_done():
    with intro_output:
        clear_output(wait=True)
        print("Build succesful!")

## EXPLANATION TOOL

Output()

In [55]:
# import DiCE
import dice_ml
from dice_ml.utils import helpers # helper functions

# Tensorflow libraries
import tensorflow as tf
from tensorflow import keras

# supress deprecation warnings from TF
tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

In [56]:
###################

# Code deels gebaseerd op https://towardsdatascience.com/explainable-artificial-intelligence-part-3-hands-on-machine-learning-model-interpretation-e8ebe5afc608
# en op de github repo's van LIME en SHAP.

from IPython.display import display
import numpy as np
import pandas as pd
import sklearn
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from lime.lime_tabular import LimeTabularExplainer
import xgboost
import shap
import time
import os.path
with intro_output:
    shap.initjs()

In [57]:
dataset = helpers.load_adult_income_dataset()
print_intro("Dataset loaded")

In [58]:
# Creating DiCE data object
continuous_features=['age', 'hours_per_week']
d = dice_ml.Data(dataframe=dataset, continuous_features=continuous_features, outcome_name='income')

In [59]:
############################
# Creating dataset for LIME
feature_names = dataset.columns.to_list()[:-1] # weggelaten: education_num, relationship, capital gain, capital loss, country
labels = dataset.iloc[:,-1].to_numpy()
data = dataset.iloc[:,:-1].to_numpy()
class_names = np.array(['<=50K', '>50K'])
categorical_features = [feature for feature in feature_names if feature not in continuous_features]

In [60]:
##################################
# Encode categorical features
categorical_features = [1,2,3,4,5,6]
categorical_names = {}
categorical_names_indexed_by_name = {}
for feature in categorical_features:
    le = LabelEncoder()
    le.fit(data[:, feature])
    data[:, feature] = le.transform(data[:, feature])
    categorical_names[feature] = le.classes_
    categorical_names_indexed_by_name[feature_names[feature]] = le.classes_
    
def category_number_to_name(instance):
    result = instance.astype('<U26')
    for categorical_index in categorical_names.keys():
        cat_label = int(instance[categorical_index])
        result[categorical_index] = categorical_names[categorical_index][cat_label]
    return result

def category_name_to_number(instance):
    result = np.copy(instance)
    for categorical_index in categorical_names.keys():
        number = np.where(categorical_names[categorical_index] == instance[categorical_index])
        result[categorical_index] = number[0][0]
    return result

In [61]:
#################################
# Splitting train and test
# using same random state for "data" (LIME) and "dataset (DiCE)"
random_state = 17
data = data.astype(float)
np.random.seed(1)
X_train_lime, X_test_lime, y_train_lime, y_test_lime = train_test_split(data, labels, random_state = random_state, 
                                                                        test_size=0.2)
print_intro("Dataset preprocessed")

In [62]:
# seeding random numbers for reproducability
from numpy.random import seed
seed(1)
from tensorflow import set_random_seed
set_random_seed(2)

In [63]:
# try to load model, if not exists: train model
model_name = "keras_ann_v1"
model_path = "../models/"+model_name+".h5"
sess = tf.InteractiveSession()

if (os.path.isfile(model_path)):
    ann_model = tf.keras.models.load_model(model_path)
    print_intro("Model loaded from disk")
    
else:
    print_intro("Training model")

    train, test = train_test_split(d.normalize_data(d.one_hot_encoded_data), random_state=random_state, test_size=0.2)
    X_train = train.loc[:, train.columns != 'income']
    y_train = train.loc[:, train.columns == 'income']

    X_test = test.loc[:, test.columns != 'income']
    y_test = test.loc[:, test.columns == 'income']

    ann_model = keras.Sequential()
    ann_model.add(keras.layers.Dense(20, input_shape=(X_train.shape[1],), kernel_regularizer=keras.regularizers.l1(0.001), activation=tf.nn.relu))
    ann_model.add(keras.layers.Dense(1, activation=tf.nn.sigmoid))

    ann_model.compile(loss='binary_crossentropy', optimizer=tf.keras.optimizers.Adam(0.01), metrics=['accuracy'])
    ann_model.fit(X_train, y_train, validation_split=0.20, epochs=100, verbose=1, class_weight={0:1,1:2})
    print("accuracy: "+str(ann_model.history.history['acc'][-1]))
    # the training will take some time for 100 epochs.
    # you can wait or set verbose=1 to see the progress of training.
    
    # save model
    ann_model.save("../models/"+model_name+".h5")
    print_intro("Model saved to disk")

An interactive session is already active. This can cause out-of-memory errors in some cases. You must explicitly call `InteractiveSession.close()` to release resources held by the other session(s).


In [64]:
# DEPRECATED (OLD METHOD)
def instance_to_dictionary(instance):
    result = {}
    for i in range(0,len(instance)):
        feature = d.feature_names[i]
        if feature in d.continuous_feature_names:
            result[feature] = int(float(instance[i]))
        else:
            result[feature] = instance[i]
    return result

# numerieke lime categorien -> originele categorische namen --> dictionary --> one-hot-encode + normalisatie --> prediction_probability
def predict_fn_lime(instances):
    result = []
    for instance in instances:
        instance_categorical = category_number_to_name(instance)
        instance_dict = instance_to_dictionary(instance_categorical)
        
        instance_transformed = d.prepare_query_instance(instance_dict, True)
        
        instance_prediction = ann_model.predict(instance_transformed)
        result.append([1-instance_prediction[0][0], instance_prediction[0][0]])
        
    return np.asarray(result)

In [65]:
############################

def inverse_categories(instances):
    result = instances.astype('<U26')
    for categorical_index in categorical_names.keys():
        cat_labels = instances[:,categorical_index]
        result[:,categorical_index] = [categorical_names[categorical_index][int(cat_label)] for cat_label in cat_labels]
    return result

def lime_instances_to_dice_ml(instances):
    base_frame = d.prepare_df_for_encoding()
    
    instances_categorical = inverse_categories(instances)

    instance_dataframe = pd.DataFrame(instances_categorical,columns=feature_names)
    instance_dataframe['age'] = pd.to_numeric(instance_dataframe['age'])
    instance_dataframe['hours_per_week'] = pd.to_numeric(instance_dataframe['hours_per_week'])

    """Prepares user defined test input for DiCE."""
    temp = base_frame.append(instance_dataframe, ignore_index=True, sort=False)
    temp = d.one_hot_encode_data(temp)
    temp = d.normalize_data(temp)
    final = temp.tail(instance_dataframe.shape[0]).reset_index(drop=True)
    return final

def lime_instance_to_dict(instance):
    temp = dict(zip(feature_names,category_number_to_name(instance)))
    temp['age'] = int(float(temp['age']))
    temp['hours_per_week'] = int(float(temp['hours_per_week']))
    return temp

def dict_to_lime_instance(instance_dict):
    instance_array = np.array(list(instance_dict.values()))
    return category_name_to_number(instance_array).astype('float')

def pd_to_dataframe(instance):
    return pd.DataFrame(instance.reshape(-1, len(instance)),columns=feature_names)

def predict_fn_lime_superquick(instances):
    converted_to_dice = lime_instances_to_dice_ml(instances)
    predictions = ann_model.predict(converted_to_dice)
    return np.append(1-predictions, predictions, axis=1)

def predict_fn_shap_superquick(instances):
    converted_to_dice = lime_instances_to_dice_ml(instances)
    predictions = ann_model.predict(converted_to_dice)
    return np.array([pred[0] for pred in predictions])

In [66]:
# provide the trained ML model to DiCE's model object
backend = 'TF'+tf.__version__[0] # TF1
m = dice_ml.Model(model=ann_model, backend=backend) 

In [67]:
# initiate DiCE
exp = dice_ml.Dice(d, m)

In [68]:
###############################
explainer_lime = LimeTabularExplainer(X_train_lime ,feature_names = feature_names,class_names=class_names,
                                                   categorical_features=categorical_features, 
                                                   categorical_names=categorical_names, kernel_width=3)

In [69]:
%%capture 
# %%capture dient om een vervelende warning te onderdrukken (doet alle output van deze cell weg)
############################## SHAP

# Median is simpelweg de mediaan van elke feature in de training set. In deze setting wordt telkens de mediaan van een feature als baseline gebruikt (ze krijgt deze waarde bij het "wegdoen")
# INFO: De base value op de figuur is de uitkomst die de blackbox geeft voor de mediaan als input.
med = np.median(X_train_lime, axis=0).reshape((1,X_train_lime.shape[1]))

explainer_shap = shap.KernelExplainer(predict_fn_shap_superquick, med)

def plot_explanation_for_shap(instance):
    shap_values_for_instance = explainer_shap.shap_values(instance, nsamples=1000)
    return shap.force_plot(explainer_shap.expected_value, shap_values_for_instance, category_number_to_name(instance), feature_names = feature_names)

In [70]:
def print_model_prediction(instance):
    pred_prob = predict_fn_shap_superquick(np.array([instance]))[0]
    if pred_prob > 0.5:
        return "Model prediction: >50K (probability: "+str(round(pred_prob,2))+")"
    else:
        return "Model prediction: <50K (probability: "+str(round(1-pred_prob,2))+")"

In [71]:
def initialize_adjusters(instance):
    feature_adjusters = {}
    style = {'description_width': '100px'}
    for feature in feature_names:
        if feature in continuous_features:
            feature_range = d.get_features_range()[feature]
            adjuster = widgets.IntSlider(value=int(float(instance[feature])), min=feature_range[0], max=feature_range[1], step=1, description=feature, disabled=False, continuous_update=False, 
                                       orientation='horizontal', readout=True, readout_format='d', style=style, layout=Layout(width='425px'))
        else:
            subcategories = categorical_names_indexed_by_name[feature]
            adjuster = widgets.Dropdown(options=subcategories, value=str(instance[feature][0]), description=feature, disabled=False, style=style, layout=Layout(width='400px'))
            
        feature_adjusters[feature] = adjuster
    return feature_adjusters

def get_adjusters_values(adjusters):
    return {item[0]:item[1].value for item in adjusters.items()}

def get_dataframe_from_adjusters(adjusters):
    values = list(get_adjusters_values(adjusters).values())
    return pd.DataFrame([values],columns=feature_names)

def set_adjuster_values(adjusters, values):
    for feature in feature_names:
        if feature in continuous_features:
            adjusters[feature].value = int(float(values[feature]))
        else:
            adjusters[feature].value = str(values[feature][0])

In [72]:
intro_done()

In [23]:
test = pd_to_dataframe(category_number_to_name(X_test_lime[0,:]))
adjusters = initialize_adjusters(test)
output_explanation = widgets.Output()

def reset_adjusters(b):
    set_adjuster_values(adjusters,test)
    with output_explanation:
        clear_output(wait=True)
        print("Reset to default")

button_reset = widgets.Button(description="Reset", button_style = 'warning')
button_reset.on_click(reset_adjusters)
    

def on_button_explain_clicked(b):
    with output_explanation:
        clear_output(wait=True)
        instance_dataframe = get_dataframe_from_adjusters(adjusters)
        instance_lime = category_name_to_number(instance_dataframe.to_numpy()[0]).astype('float')
        
        display(instance_dataframe)
        display(Markdown(print_model_prediction(instance_lime)+"<br><br>"))
        
        display(Markdown("#### __LIME explanation__"))
        explainer_lime.explain_instance(instance_lime, predict_fn_lime_superquick, num_features=5).show_in_notebook(show_all=False)
        
        display(Markdown("#### __SHAP explanation__"))
        display(plot_explanation_for_shap(instance_lime))
        
        display(Markdown("#### __Counterfactual explanation__"))
        dice_exp = exp.generate_counterfactuals(lime_instance_to_dict(instance_lime), total_CFs=4, desired_class="opposite")
        dice_exp.visualize_as_dataframe(show_only_changes=True)

button_explain = widgets.Button(description="Predict and Explain", button_style = 'primary')
button_explain.on_click(on_button_explain_clicked)

display(widgets.VBox(list(adjusters.values())))
display(widgets.HBox([button_reset, button_explain]))
display(output_explanation)




## EXPLANATION TOOL

VBox(children=(IntSlider(value=54, continuous_update=False, description='age', layout=Layout(width='425px'), m…



Output()