In [300]:
%pip install pandas numpy scikit-learn ucimlrepo lime

Note: you may need to restart the kernel to use updated packages.




In [301]:
import pandas as pd
import numpy as np
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import OneHotEncoder, Normalizer, LabelEncoder
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from sklearn.mixture import GaussianMixture, BayesianGaussianMixture
from sklearn.svm import SVC

from ucimlrepo import fetch_ucirepo 
import lime
from lime import lime_tabular

import pickle as pkl

from bayes_opt import BayesianOptimization
from tqdm import tqdm
import matplotlib.pyplot as plt

from scipy.stats import uniform, norm
from scipy.special import softmax
import shap

MODEL_NAME = 'Wine'

MODEL_FUNCTION = DecisionTreeClassifier
model_params = {'random_state': 1340304}

coding = {"Wine": 109, 'German Credit': 144}


In [302]:
# fetch dataset 
split_size = 0.3
seed = 1234567

dataset_id = coding[MODEL_NAME]

wine = fetch_ucirepo(id=109) 

In [303]:

# data (as pandas dataframes) 
X = wine.data.features
y = wine.data.targets.to_numpy()
y = y.reshape((len(y), ))
    

n_classes = len(np.unique(y))
n_feats = X.shape[1]
x_train, x_test, y_train, y_test = train_test_split(X, y, stratify = y, test_size = split_size, random_state = seed)

normalizer = Normalizer().fit(x_train)
encoder = LabelEncoder().fit(y_train)

x_train = normalizer.transform(x_train)
x_test = normalizer.transform(x_test)
y_train = encoder.transform(y_train)
y_test  = encoder.transform(y_test)




In [304]:
def evaluate_model(model, samples, targets):
    pred = model.predict(samples)
    return accuracy_score(targets, pred)

In [305]:
model = MODEL_FUNCTION(**model_params)
model.fit(x_train, y_train)

acc_train = evaluate_model(model, x_train, y_train)
acc_test  = evaluate_model(model, x_test, y_test)

print(acc_train, acc_test)

1.0 1.0


### Interpretability

In [306]:
def get_shap_feature_importances(model, x_train):
    '''Get scores with shap'''
    samples = np.random.choice(list(range(len(x_train))), min(5, len(x_train)), replace = False)
    se = shap.KernelExplainer(model.predict, x_train[samples])
    shap_values = se.shap_values(x_train)
    importance_order = np.argsort(-abs(np.abs(np.array(shap_values)).mean(axis = 0)))
    importances = np.abs(np.array(shap_values)).mean(axis = 0)[importance_order]
    return importance_order, importances

order, importances = get_shap_feature_importances(model, x_train)

  0%|          | 0/124 [00:00<?, ?it/s]

In [307]:
class Local2GlobalExplainer:
    def __init__(self, x_train, model, n_classes, components = n_classes*4):
        '''Initialization function for Local2GlobalExplainer
        
        Arguments:
        1. x_train: training data
        2. model: the model to run explanations for
        3. n_classes: number of classes'''
        self.model = model
        self.data = x_train
        
        # LIME model
        self.explainer = lime_tabular.LimeTabularExplainer(
            training_data=self.data, 
            mode = 'classification'     
            )
        
        
        # cached mcmc explanations
        self.mcmc_explanations = None
        self.mcmc_agg = None
        
        # cached importance sampling explanations
        self.imp_explanations = None
        self.imp_agg = None
        
        # fitting a gaussian mixture model to take care of multimodal data
        self.gmm = BayesianGaussianMixture(n_components = components).fit(x_train)
        
    def get_optimal_gmm(n_components):
        '''Helper function 
        
        **UNUSED**
        
        '''
        c = round(n_components)
        gmm = GaussianMixture(n_components = c).fit(x_train)
        return gmm.bic(x_train)
        
    def get_local_interpretation(self, sample):
        '''Function to get LIME interpretations for a sample
        
        Arguments:
        1. sample: the incoming sample
        2. num_features: number of features to '''
        exp = self.explainer.explain_instance(sample, self.model.predict_proba, num_features = len(sample))
        local_exp = list(exp.local_exp.values())[0]
        local_exp = sorted(local_exp)
        
        explanations = [x[1] for x in local_exp]
        return explanations
    
    def rank_explanations(self, explanations):
        '''Helper function to rank explanations sorted in the order of highest magnitude to lowest magnitude
        
        Arguments:
        1. explanations: aggregated explanations
        
        Returns:
        1. sorted list for explanations with feat indices and corresponding importances'''
        return sorted(list(zip(range(len(explanations)), explanations)), key = lambda x: -abs(x[1]))
    
    def get_only_feature_importance(self, explanations):
        '''Helper function to only get features
        
        Arguments:
        1. explanations: aggregated explanations
        
        Returns:
        1. Sorted list for explanations with just feature indices'''
        ranks = self.rank_explanations(explanations)
        return [x[0] for x in ranks]
    
    def mcmc_estimate(self, num_samples):
        '''Function to run Markov chain Monte Carlo approx based explanations. Samples q(x) from a standard normal distribution
        
        Arguments:
        1. num_samples: number of samples to sample from
        
        Returns:
        1. agg_explanations: an array containing aggregated explanations
        2. explanations: explanations for each sample'''
        samples, gmm_class = self.gmm.sample(num_samples)       # generate samples from the fit gmm
        explanations = []                                       # list to store explanations
        for sample in tqdm(samples):
            interpret = np.array(self.get_local_interpretation(sample))
            # sigmoid_interpretation = self.get_scores(interpret)
            interpret = np.abs(interpret)
            explanations.append(interpret)
            
        agg_explanations = np.mean(np.array(explanations), axis = 0)        # aggregating
        
        self.mcmc_agg = explanations
        self.agg_explanations = agg_explanations
        return agg_explanations, explanations
    
    def get_scores(self, ex):
        '''Function to normalize scores using a softmax function multiplied by correlation signs'''
        signs = np.sign(ex)
        abs_softmax = softmax(np.abs(ex))
        return abs_softmax*signs
    
    def importance_sampling(self, num_samples):
        '''Function to run importance sampling for explanations. Samples q(x) from a standard normal distribution
        
        Arguments:
        1. num_samples: number of samples to sample from
        
        Returns:
        1. agg_explanations: an array containing aggregated explanations
        2. explanations: explanations for each sample'''
        q = np.random.randn(num_samples)
        samples, gmm_class = self.gmm.sample(num_samples)       # generate samples from the fit gmm
        scores = np.exp(self.gmm.score_samples(samples))        # p(x)
        qx = norm.pdf(q)                                        # q(x)
        
        importance = scores/qx
        explanations = []
        
        for i in tqdm(range(num_samples)):
            interpret = np.array(self.get_local_interpretation(samples[i]))
            # sigmoid_interpretation = self.get_scores(interpret)
            importance_weighted_interpretation = np.abs(interpret*importance[i])
            explanations.append(importance_weighted_interpretation)
            
        agg_explanations = np.mean(np.array(explanations), axis = 0)
        self.imp_explanations = explanations
        self.imp_agg = agg_explanations
        
        return agg_explanations, explanations
        

## Testing

In [308]:
def evaluate_model_feats_removed(x_train, y_train, x_test, y_test, n_feats, feat_indices, k = 4, trials = 15):
    selected_feats = list(set(np.arange(n_feats)).difference(feat_indices[:k]))      # exclude top k features
    x_t = x_train[:, selected_feats]
    x_tst = x_test[:, selected_feats]
    
    train_accs = []
    test_accs = []
    
    for i in range(trials):

        m = MODEL_FUNCTION(**model_params).fit(x_t, y_train)
        acc_train = evaluate_model(m, x_t, y_train)
        acc_test  = evaluate_model(m, x_tst, y_test)
        
        train_accs.append(acc_train)
        test_accs.append(acc_test)

        
    train_accs, test_accs = np.array(train_accs), np.array(test_accs)
    
    return {'train_acc': train_accs.mean(),
            'train_var': train_accs.var(),
            'test_acc': test_accs.mean(),
            'test_var': test_accs.var()}

### Local2Global

In [309]:
#setup

num_trials = 500

#shap
order_shap, _ = get_shap_feature_importances(model, x_train)

#fit
l2g_exp = Local2GlobalExplainer(x_train, model, n_classes)
mc_exp, _ = l2g_exp.mcmc_estimate(num_trials)
is_exp, _ = l2g_exp.importance_sampling(num_trials)

#order calculation
order_mc = l2g_exp.get_only_feature_importance(mc_exp)
order_is = l2g_exp.get_only_feature_importance(is_exp)

  0%|          | 0/124 [00:00<?, ?it/s]

100%|██████████| 500/500 [00:07<00:00, 63.85it/s]
100%|██████████| 500/500 [00:07<00:00, 63.32it/s]


In [310]:
num_feats_removed_limit = 6
trials = 100

scores = pd.DataFrame()

for rem_feats in range(1, num_feats_removed_limit+1):
    shap_scores =  evaluate_model_feats_removed(x_train, y_train, x_test, y_test, n_feats, order_shap, k = rem_feats, trials = trials)
    s_df = pd.DataFrame(shap_scores, index = [0])
    s_df = s_df.drop(columns = ['train_acc', 'train_var']).rename(columns = {'test_acc': '(SHAP) Test acc', 'test_var': '(SHAP) Test var'})
    
    mc_scores =  evaluate_model_feats_removed(x_train, y_train, x_test, y_test, n_feats, order_mc, k = rem_feats, trials = trials)
    mc_df = pd.DataFrame(mc_scores, index = [0])
    mc_df =mc_df.drop(columns = ['train_acc', 'train_var']).rename(columns = {'test_acc': '(MCMC) Test acc', 'test_var': '(MCMC) Test var'})
    
    
    is_scores =  evaluate_model_feats_removed(x_train, y_train, x_test, y_test, n_feats, order_is, k = rem_feats, trials = trials)
    is_df = pd.DataFrame(is_scores, index = [0])
    is_df = is_df.drop(columns = ['train_acc', 'train_var']).rename(columns = {'test_acc': '(Imp. Samp.) Test acc', 'test_var': '(Imp. Samp.) Test var'})
    
    
    test = pd.concat([s_df, mc_df, is_df], axis = 1)
    test['Features Removed'] = rem_feats
    
    scores = pd.concat([scores, test], axis = 0)

scores = scores.reset_index(drop = True)
scores['Dataset'] = MODEL_NAME

order = list(scores.columns[-2:]) + list(scores.columns[:-2])
scores = scores[order].set_index("Features Removed")
scores

Unnamed: 0_level_0,Dataset,(SHAP) Test acc,(SHAP) Test var,(MCMC) Test acc,(MCMC) Test var,(Imp. Samp.) Test acc,(Imp. Samp.) Test var
Features Removed,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1,Wine,0.944444,0.0,0.944444,0.0,0.944444,0.0
2,Wine,0.925926,4.930381e-32,0.87037,1.2325950000000001e-32,0.981481,1.2325950000000001e-32
3,Wine,0.907407,4.930381e-32,0.851852,4.930381e-32,0.851852,4.930381e-32
4,Wine,0.851852,4.930381e-32,0.851852,4.930381e-32,0.87037,1.2325950000000001e-32
5,Wine,0.833333,4.930381e-32,0.888889,1.2325950000000001e-32,0.888889,1.2325950000000001e-32
6,Wine,0.833333,4.930381e-32,0.833333,4.930381e-32,0.851852,4.930381e-32


In [311]:
scores.to_csv(f"benchmarks_{MODEL_NAME}.csv", index = True)