Explaining Models with SHAP
===

Author: Nathan A. Mahynski

Date: 2023/08/23

Description: This notebook demonstrates the use of SHAP to explain model predictions.

For "soft" models and one-class classifiers (OCC), we need to look at the likelihood that a sample is assigned as an inlier or not; when a model combines many of these (e.g., PLS-DA), we need to pull them apart to inspect each known class individually.  Typically, a model is trained and its total efficiency (TEFF) is optimized.  The optimal hyperparameters can then be extracted, and individual models retrained and examined to explain their predictions.

In [None]:
# When running on Google Colab, install pychemauth from the main branch of its GitHub repository.
if 'google.colab' in str(get_ipython()):
    !pip install git+https://github.com/mahynski/pychemauth@main
    import os
    # Signal 9 terminates the current kernel process, so the remaining cells must be run after the restart
    os.kill(os.getpid(), 9) # Automatically restart the runtime to reload libraries

In [None]:
using_colab = 'google.colab' in str(get_ipython())
if using_colab:
    !pip install git+https://github.com/mahynski/pychemauth@main.

try:
    import pychemauth
except:
    raise ImportError("pychemauth not installed")

import matplotlib.pyplot as plt
%matplotlib inline

import watermark
%load_ext watermark

%load_ext autoreload
%autoreload 2

import shap
shap.initjs()

In [None]:
import imblearn
import sklearn

from sklearn.model_selection import GridSearchCV

import numpy as np
import pandas as pd

In [None]:
import bokeh
from bokeh.plotting import output_notebook
from pychemauth.utils import bokeh_color_spectrum
from pychemauth.utils import color_spectrum
output_notebook() # Configure bokeh to display its plots inline in the notebook

In [None]:
# Report the time, machine information, Python version, and versions of the imported packages
%watermark -t -m -v --iversions

Analyze a DD-SIMCA Model
---

<h3>Load the Data</h3>

In [None]:
# Let's load some data from the tests/ for this example.
# On Colab there is no local checkout, so read straight from GitHub.  Otherwise prefer the
# local copy, and fall back to GitHub if the notebook is being run outside of the repository.
remote_loc = 'https://raw.githubusercontent.com/mahynski/pychemauth/main/tests/data/simca_train.csv'
local_loc = '../tests/data/simca_train.csv'

loc = remote_loc if using_colab else local_loc
try:
    df = pd.read_csv(loc)
except FileNotFoundError:
    # Local file is missing - use the copy hosted on GitHub instead
    loc = remote_loc
    df = pd.read_csv(loc)

In [None]:
# Preview the first few rows: samples are rows, columns are different features
df.head()

In [None]:
# Columns from position 3 onward are the features; the 'Class' column is the target
raw_x = np.array(df.iloc[:, 3:], dtype=float)
raw_y = np.array(df['Class'].to_numpy(), dtype=str)

# Every sample is used for training in this example
X_train = raw_x

In [None]:
from pychemauth.classifier.simca import SIMCA_Classifier
from pychemauth.preprocessing.scaling import CorrectedScaler
from pychemauth.preprocessing.filter import SavGol

<h3>A Simple Model</h3>

In [None]:
# The data are spectra, so the X data are left unscaled (scale_x=False)
simca = SIMCA_Classifier(
    target_class='Pure',
    style='dd-simca',
    use='rigorous',
    n_components=7,
    alpha=0.05,
    scale_x=False,
)
_ = simca.fit(X_train, raw_y)

# Plot the underlying fitted model against the training data
_ = simca.model.visualize(X_train, raw_y)

In [None]:
# Check that thresholding the first probability column at 0.5 reproduces predict()
first_column_proba = simca.predict_proba(X_train)[:, 0]
mask1 = first_column_proba > 0.5
mask2 = simca.predict(X_train)
(mask1 == mask2).all()

In [None]:
# Display the model's prediction for every training sample
simca.predict(X_train)

In [None]:
# Accuracy measured against a label vector that marks every training sample as True
all_true = np.ones(len(X_train), dtype=bool)
simca.model.accuracy(X_train, all_true)

In [None]:
# Score measured against a label vector that marks every training sample as True
all_true = np.ones(len(X_train), dtype=bool)
simca.model.score(X_train, all_true) # Negative log loss

<h3>An Optimized Pipeline</h3>

In [None]:
# Let's select the features with the highest variance to focus on
from sklearn.feature_selection import GenericUnivariateSelect

def take_top_var(X, y=None):
    """
    Score each feature (column) by its standard deviation across the samples.

    Ranking features by standard deviation is the same as ranking them by variance.

    Parameters
    ----------
    X : array_like
        Data with samples along axis 0 and features along axis 1.
    y : object, optional
        Not used; accepted so the function can be called as score_func(X, y).

    Returns
    -------
    scores : ndarray
        Standard deviation of X computed along axis 0.
    """
    column_spread = np.std(X, axis=0)
    return column_spread

In [None]:
# Preprocessing + model pipeline.  The savgol and n_components values given here are
# starting values only; the grid search below overrides them.
savgol_step = SavGol(window_length=11, deriv=2, polyorder=3)
selector_step = GenericUnivariateSelect(
    score_func=take_top_var,
    mode='k_best',
    param=75, # keep the top 75 (arbitrary choice here)
)
simca_step = SIMCA_Classifier(
    target_class='Pure',
    style='dd-simca',
    use='rigorous',
    n_components=7,
    alpha=0.05,
    scale_x=False, # Spectral data
)

pipeline = imblearn.pipeline.Pipeline(
    steps=[
        ("savgol", savgol_step), # Remove baselines and smooth by filtering (possibly)
        ('selector', selector_step), # Remove low variance features
        ("simca", simca_step),
    ]
)

# Hyperparameters of pipeline steps are given in standard notation: step__parameter_name
param_grid = [
    {
        'savgol__window_length': [5, 7, 9, 11, 13, 15, 17, 21],
        'savgol__deriv': [0, 1, 2],
        'savgol__polyorder': [3],
        'simca__n_components': np.arange(1, 10, 2),
    }
]

# Shuffled, stratified 3-fold cross-validation with a fixed seed for reproducibility
cv_splitter = sklearn.model_selection.StratifiedKFold(n_splits=3, shuffle=True, random_state=0)

model = GridSearchCV(
    estimator=pipeline,
    param_grid=param_grid,
    cv=cv_splitter,
    n_jobs=-1,
    error_score=0, # Parameter combinations that fail to fit receive a score of 0
    refit=True,
)

_ = model.fit(X_train, raw_y)

In [None]:
# Hyperparameter combination selected by the grid search
model.best_params_ 

In [None]:
np.sum(model.best_estimator_.named_steps['selector'].get_support()) # Number of channels retained (the selector keeps the top 75)

In [None]:
# model.cv_results_ # View full CV results

In [None]:
# Look for any ties - in this case, not substantially different
cv_results = model.cv_results_
is_top_ranked = cv_results['rank_test_score'] == 1
np.array(cv_results['params'])[is_top_ranked]

In [None]:
# Predicted probabilities for the first 10 training samples
model.predict_proba(X_train)[:10]

In [None]:
# Everything except the final estimator, i.e., the preprocessing steps of the refit best pipeline
preprocessing_steps = model.best_estimator_.steps[:-1]
prep = imblearn.pipeline.Pipeline(steps=preprocessing_steps)

In [None]:
# The low variance features have been trimmed out - the channels that remain have the
# following distribution of standard deviations over the dataset
retained_sigma = np.std(prep.transform(X_train), axis=0)
plt.hist(retained_sigma)
plt.xlabel(r'$\sigma$')
plt.ylabel('Number of Channels')

In [None]:
# Let's visualize which parts of the spectrum are going to be used - if you are 
# doing this in practice, you might want to check that this makes sense; perhaps 
# the values at the right end of the spectra are not as reliable, for example?
selected = prep.named_steps['selector'].get_support() # Boolean mask over the channels
channels = np.arange(X_train.shape[1])

# One vectorized call per group (rather than one call per point) using the first training spectrum
plt.plot(channels[~selected], X_train[0, ~selected], '.', color='blue', label='Discarded')
plt.plot(channels[selected], X_train[0, selected], '.', color='red', label='Selected')
plt.xlabel('Channel')
plt.ylabel('Value (first training spectrum)')
plt.legend();

<h3>Create an Explainer</h3>

In [None]:
# The pipeline drops out a lot of features.  To analyze the importance of the ones that remain,
# the pipeline is broken into a preprocessing stage and a modeling stage.
best_pipeline = model.best_estimator_

# The model itself (last step in pipeline)
function = best_pipeline.named_steps['simca']

# The preprocessing stage (all steps before the model)
prep = imblearn.pipeline.Pipeline(steps=best_pipeline.steps[:-1])

# These are the features that have been preprocessed and remain after "deletion" of many of the channels
X_train_prep = prep.transform(X_train)

In [None]:
# If you have a small dataset with only a few features you can use the entire training dataset for
# the background (data). In problems with more features you probably want to pass only the median
# of the training dataset, or weighted k-medians.
# See https://shap.readthedocs.io/en/latest/example_notebooks/tabular_examples/model_agnostic/Iris%20classification%20with%20scikit-learn.html
# Since we have some spectral data with > 400 features, we can just use a random subsample as
# the background.
background = shap.sample(X_train_prep, 25, random_state=0) # vs. background = X_train_prep to use full training set

# See SHAP documentation for a discussion on the utility and impact of using a "squashing function" to
# go from an unbounded "margin space" (raw model output) to a bounded probability space.
# https://shap.readthedocs.io/en/latest/example_notebooks/tabular_examples/model_agnostic/Squashing%20Effect.html#Probability-space-explaination
explainer = shap.KernelExplainer(
    model=function.predict_proba, # Use probability function to "squash"
    data=background,
    seed=42,
)

In [None]:
# If there are many features to explain, this can fail to converge. You can choose to look at just
# the top features as shown below.  Feature correlation can also be a concern; using a smaller
# subset of decorrelated inputs can help.

# Consider BorutaShap, hierarchical clustering, etc.

shap_values = explainer.shap_values(
    X_train_prep,
    gc_collect=True,
    nsamples='auto', # Can increase for lower variance
)

In [None]:
# Base value(s) of the explainer; index 0 is used below for the inlier probability
explainer.expected_value 

In [None]:
# Mean of the first probability column over the full (preprocessed) training set
np.mean(function.predict_proba(X_train_prep)[:,0]) 
# If the background sampling was bad, the mean model prediction will differ greatly from the 
# explainer.expected_value. If this difference is large, consider increasing the number of background samples.

In [None]:
# Force plot over all training samples for the inlier probability (output index 0)
selector = prep.named_steps['selector']
kept = selector.get_support() # Boolean mask of the retained channels
channel_names = ['Channel {}'.format(i) for i in selector.get_support(indices=True)] # Feature names for visualization

shap.force_plot(
    base_value=explainer.expected_value[0],
    shap_values=shap_values[0],
    features=X_train[:, kept], # Display raw feature values for visualization
    feature_names=channel_names,
)

In [None]:
# Look at the lowest probability example
inlier_proba = model.predict_proba(X_train)[:, 0]
look_at = np.argmin(inlier_proba)

selector = prep.named_steps['selector']
kept = selector.get_support() # Boolean mask of the retained channels
channel_names = ['Channel {}'.format(i) for i in selector.get_support(indices=True)] # Feature names for visualization

shap.force_plot(
    base_value=explainer.expected_value[0], # [0] to look at inlier probability
    shap_values=shap_values[0][look_at, :],
    features=X_train[look_at, kept], # Display raw feature values for visualization
    feature_names=channel_names,
)

In [None]:
# Decision plot for the sample selected as look_at, for the inlier probability (output index 0)
channel_names = [
    'Channel {}'.format(i)
    for i in prep.named_steps['selector'].get_support(indices=True)
]

shap.decision_plot(
    explainer.expected_value[0],
    shap_values[0][look_at, :],
    feature_names=channel_names,
)

In [None]:
# Map the SHAP values of the retained channels back onto the full set of channels
selector = model.best_estimator_.named_steps['selector']
importance = selector.inverse_transform(shap_values[0][look_at].reshape(1, -1))

# Color the raw spectrum of the selected sample by those SHAP values
ax = color_spectrum(
    x=np.arange(X_train.shape[1]),
    y=X_train[look_at, :].reshape(1, -1),
    importance_values=importance,
    cmap='seismic',
    bounds=(-0.05, 0.05),
    figsize=(20, 5),
    background=True,
)
ax.set_title('Lowest Probability Example in Training Set')

In [None]:
# Index of the training sample with the highest value in the first probability column
inlier_proba = model.predict_proba(X_train)[:, 0]
best = np.argmax(inlier_proba)

# Map the SHAP values of the retained channels back onto the full set of channels
selector = model.best_estimator_.named_steps['selector']
importance = selector.inverse_transform(shap_values[0][best].reshape(1, -1))

ax = color_spectrum(
    x=np.arange(X_train.shape[1]),
    y=X_train[best, :].reshape(1, -1),
    importance_values=importance,
    cmap='seismic',
    bounds=(-0.05, 0.05),
    figsize=(20, 5),
    background=True,
)
ax.set_title('Highest Probability Example in Training Set')

In [None]:
# Decision plot for the sample selected as best, for the inlier probability (output index 0)
channel_names = [
    'Channel {}'.format(i)
    for i in prep.named_steps['selector'].get_support(indices=True)
]

shap.decision_plot(
    explainer.expected_value[0],
    shap_values[0][best, :],
    feature_names=channel_names,
)

In [None]:
# Bar summary of the SHAP values over all training samples (output index 0)
selector = prep.named_steps['selector']
kept = selector.get_support() # Boolean mask of the retained channels
channel_names = ['Channel {}'.format(i) for i in selector.get_support(indices=True)]

shap.summary_plot(
    shap_values=shap_values[0],
    features=X_train[:, kept],
    feature_names=channel_names,
    plot_type='bar',
)

In [None]:
# Look at all the decisions made
channel_names = [
    'Channel {}'.format(i)
    for i in prep.named_steps['selector'].get_support(indices=True)
]

shap.decision_plot(
    explainer.expected_value[0],
    shap_values[0],
    feature_names=channel_names,
)

Analyze an EllipticManifold Model
---

In [None]:
import sklearn.manifold
from pychemauth.manifold.elliptic import EllipticManifold
from pychemauth.preprocessing.scaling import CorrectedScaler
from pychemauth.preprocessing.filter import SavGol
from sklearn.feature_selection import VarianceThreshold

In [None]:
# Manifold learner class and the keyword arguments handed to EllipticManifold alongside it
mani = sklearn.manifold.Isomap
kwargs = dict(
    n_neighbors=10,
    n_components=2, # Choose 2 for visualization
    metric='minkowski',
    p=2,
)

savgol_step = SavGol(window_length=13, deriv=1, polyorder=3)
selector_step = GenericUnivariateSelect(
    score_func=take_top_var,
    mode='k_best',
    param=75, # keep the top 75 (arbitrary choice here)
)

pipeline = imblearn.pipeline.Pipeline(
    steps=[
        ("savgol", savgol_step),
        ('selector', selector_step),
        ("model", EllipticManifold(0.05, mani, kwargs)),
    ]
)

In [None]:
# Fit the preprocessing steps and the EllipticManifold model on the training data
_ = pipeline.fit(X_train, raw_y)

In [None]:
# Preprocessing stage (every step before the final model) and its output on the training data
prep = imblearn.pipeline.Pipeline(steps=pipeline.steps[:-1])
X_train_prep = prep.transform(X_train)

# Modeling stage; note that `model` now refers to the EllipticManifold step
model = pipeline.named_steps['model']

In [None]:
# Visualize the preprocessed training set with the fitted model.
# This data might require some "cleaning"?
model.visualize([X_train_prep], ["Training Set"])

In [None]:
# Random subsample of the preprocessed training data used as the SHAP background
background = shap.sample(X_train_prep, 25, random_state=0)

explainer = shap.KernelExplainer(
    model=model.predict_proba, # Use probability function to "squash"
    data=background,
    seed=42,
)

In [None]:
# Compute SHAP values for every (preprocessed) training sample
shap_values = explainer.shap_values(
    X_train_prep,
    gc_collect=True,
    nsamples='auto', # Can increase for lower variance
)

In [None]:
# Base value(s) of the explainer for the EllipticManifold model
explainer.expected_value

In [None]:
# Mean of the first probability column over the full (preprocessed) training set, for comparison with the base value
np.mean(model.predict_proba(X_train_prep)[:,0]) 

In [None]:
# On AVERAGE, it looks like the most important parts of the spectrum are somewhere around channel ~310 (Similar to SIMCA)
# Here, we did not optimize hyperparameters (used the same) so this is not very surprising
selector = prep.named_steps['selector']
kept = selector.get_support() # Boolean mask of the retained channels
channel_names = ['Channel {}'.format(i) for i in selector.get_support(indices=True)]

shap.summary_plot(
    shap_values=shap_values[0],
    features=X_train[:, kept],
    feature_names=channel_names,
    plot_type='bar',
)

In [None]:
# Or, we can color by the average SHAP importance.
# This is useful for visualizing how the model makes predictions on average.
chosen = 0

# Mean ABSOLUTE value per retained channel, mapped back onto the full set of channels
mean_abs_shap = np.mean(np.abs(shap_values[0]), axis=0).reshape(1, -1)
full_importance = prep.named_steps['selector'].inverse_transform(mean_abs_shap)

bokeh_color_spectrum(
    x=np.arange(X_train.shape[1]),
    y=X_train[chosen, :],
    importance_values=full_importance,
)

In [None]:
# This model is using a 1st derivative and picking out the highest variance channels after SavGol.
# This does NOT correspond to the channels which have the highest variance BEFORE SavGol!

chosen = 0 # Select one example spectrum
raw_variance = np.std(X_train, axis=0)**2 # Overall variance of each channel in the (raw) training data

bokeh_color_spectrum(
    x=np.arange(X_train.shape[1]),
    y=X_train[chosen, :],
    importance_values=raw_variance, # Color based on overall variance in the training data
)

Analyze a PLS-DA Model
---

In [None]:
# Let's load some data from the tests/ for this example.
# On Colab there is no local checkout, so read straight from GitHub.  Otherwise prefer the
# local copy, and fall back to GitHub if the notebook is being run outside of the repository.
remote_loc = 'https://raw.githubusercontent.com/mahynski/pychemauth/main/tests/data/plsda3_train.csv'
local_loc = '../tests/data/plsda3_train.csv'

loc = remote_loc if using_colab else local_loc
try:
    df = pd.read_csv(loc)
except FileNotFoundError:
    # Local file is missing - use the copy hosted on GitHub instead
    loc = remote_loc
    df = pd.read_csv(loc)

In [None]:
# Preview the first few rows of the new dataset (now using SITE data)
df.head()

In [None]:
# Columns from position 3 onward are the features; their headers are kept for plot labels
feature_frame = df.iloc[:, 3:]
element_names = feature_frame.columns
raw_x = np.array(feature_frame, dtype=float) # Extract features
raw_y = np.array(df['Class'].to_numpy(), dtype=str) # Take the class as the target

In [None]:
from pychemauth.classifier.plsda import PLSDA

In [None]:
# Here the data are elemental levels so we will scale the X data
plsda_settings = dict(
    n_components=5,
    alpha=0.05,
    gamma=0.01,
    not_assigned='UNKNOWN',
    style="soft",
    scale_x=True,
)
plsda = PLSDA(**plsda_settings)

In [None]:
# Train the PLS-DA model on all of the samples
_ = plsda.fit(raw_x, raw_y)

In [None]:
# Visualize the fitted model using both the 'hard' and 'soft' styles
_ = plsda.visualize(styles=['hard', 'soft'])

In [None]:
# Predictions for the first 10 samples
plsda.predict(raw_x)[:10]

In [None]:
# Categories known to the model; the cells below index them by position (see chosen_class)
plsda.categories

In [None]:
# Probabilities for the first 10 samples, with one column per class
plsda.predict_proba(raw_x)[:10] # For soft model, probabilities are relative to each class and they do not sum to 1

In [None]:
# plsda.predict_proba predicts the probability for EACH individual class (in columns).
# Although these do not sum to unity, we can still analyze each column individually with SHAP!
# The decision function can also be used; it returns < 0 for outliers and > 0 for inliers in a
# class - thus 0 is the class membership threshold, which can be intuitive to analyze.

background = shap.sample(raw_x, 25, random_state=0) # vs. background = raw_x to use full training set

explainer = shap.KernelExplainer(
    model=plsda.decision_function,
    data=background,
    seed=42,
)

In [None]:
# To only look at (up to) the top 20 features, additionally pass:
#     l1_reg='num_features({})'.format(np.min([raw_x.shape[1], 20]))
shap_values = explainer.shap_values(
    raw_x,
    gc_collect=True,
    nsamples='auto', # Can increase for lower variance
)

In [None]:
len(shap_values) # There is one set of SHAP values for each class (i.e., each model output)

In [None]:
explainer.expected_value # Base value for each class: average over the background data used (columnwise)

In [None]:
# Average decision function over the same draw that was given to the explainer as background
# (25 samples, random_state=0), so this is directly comparable to explainer.expected_value.
np.mean(plsda.decision_function(shap.sample(raw_x, 25, random_state=0)), axis=0)

<h3>Examine Categories</h3>

In [None]:
chosen_class = 2 # Which category to examine? 0, 1, or 2 (position in plsda.categories)

In [None]:
# SHAP values for the class being examined
sv = shap_values[chosen_class]

# A bare expression in the middle of a cell is not displayed, so print the class name explicitly
print('Examining model for : {}'.format(plsda.categories[chosen_class]))

shap.summary_plot(
    shap_values=sv,
    features=raw_x,
    feature_names=element_names,
    plot_type='bar'
)

In [None]:
# You can see that the SHAP values add up to the model output (decision function, here) for each point

examine_pt = 12

# Left-hand side: base value plus the sum of this point's SHAP values
point_contributions = np.sum(shap_values[chosen_class][examine_pt])
shap_sum = point_contributions + explainer.expected_value[chosen_class]

# Right-hand side: the model's decision function for the same point and class
model_output = plsda.decision_function(raw_x)[examine_pt][chosen_class]

print(shap_sum, model_output, shap_sum-model_output)

In [None]:
# Violin summary of the SHAP values for the class being examined
violin_kwargs = dict(
    shap_values=sv,
    features=raw_x,
    feature_names=element_names,
    plot_type='violin',
)
shap.summary_plot(**violin_kwargs)

In [None]:
# Force plot over all samples for the class being examined
class_base_value = explainer.expected_value[chosen_class]

shap.force_plot(
    base_value=class_base_value,
    shap_values=sv,
    features=raw_x,
    feature_names=element_names,
)

In [None]:
# Look at the point furthest away (smallest decision function value for this class)
class_scores = plsda.decision_function(raw_x)[:, chosen_class]
look_at = np.argmin(class_scores)

shap.force_plot(
    base_value=explainer.expected_value[chosen_class],
    shap_values=sv[look_at, :],
    features=raw_x[look_at, :],
    feature_names=element_names,
)

In [None]:
# Decision plot for the sample selected as look_at
class_base_value = explainer.expected_value[chosen_class]
element_labels = element_names.tolist()

shap.decision_plot(
    class_base_value,
    sv[look_at, :],
    feature_names=element_labels,
)

In [None]:
# Look at the closest example (largest decision function value for this class)
class_scores = plsda.decision_function(raw_x)[:, chosen_class]
look_at = np.argmax(class_scores)

shap.force_plot(
    base_value=explainer.expected_value[chosen_class],
    shap_values=sv[look_at, :],
    features=raw_x[look_at, :],
    feature_names=element_names,
)

In [None]:
# Decision plot for the sample selected as look_at
class_base_value = explainer.expected_value[chosen_class]
element_labels = element_names.tolist()

shap.decision_plot(
    class_base_value,
    sv[look_at, :],
    feature_names=element_labels,
)

<h3>How are decisions made?</h3>

In [None]:
# Let's assess the specificity of the model - we can look at examples of when a sample is predicted to belong
# to a class that it does not.

In [None]:
# Switch to the first category and pull out its SHAP values
chosen_class = 0
sv = shap_values[chosen_class]

# Let's look at samples which belong to this category
chosen_name = plsda.categories[chosen_class]
print('Examining model for : {}'.format(chosen_name))

In [None]:
compare_class = (chosen_class+1)%3 # Let's see how the model predicts for other classes
compare_name = plsda.categories[compare_class]
print('These samples belong to : {}'.format(compare_name))

# Look at samples that belong to compare_class
mask = raw_y == compare_name

# These should NOT be predicted to belong to chosen_class, but are predicted to if decision_function > 0
chosen_scores = plsda.decision_function(raw_x[mask])[:, chosen_class]
wrong = chosen_scores > 0.0

shap.decision_plot(
    explainer.expected_value[chosen_class],
    sv[mask, :],
    feature_names=element_names.tolist(),
    highlight=wrong,
)
# None are highlighted because all predictions are correct in that they indicate inconsistency with JPN1

In [None]:
# None are wrong here, but some are getting close to 0 which might warrant further consideration.

In [None]:
compare_class = (chosen_class+2)%3 # Let's see how the model predicts for other classes
compare_name = plsda.categories[compare_class]
print('These samples belong to : {}'.format(compare_name))

# Look at samples that belong to compare_class
mask = raw_y == compare_name

# These should NOT be predicted to belong to chosen_class, but are predicted to if decision_function > 0
chosen_scores = plsda.decision_function(raw_x[mask])[:, chosen_class]
wrong = chosen_scores > 0.0

shap.decision_plot(
    explainer.expected_value[chosen_class],
    sv[mask, :],
    feature_names=element_names.tolist(),
    highlight=wrong,
)

In [None]:
# However, in the latent space, these wrong ones appear to be just "on the edge" and could be included
# if alpha (type 1 error rate) was adjusted.  That is not the point - we WANT a few of those on the edge
# to be excluded to draw the boundary, so this appears to be reasonable.

# Samples of compare_class that are (correctly) not assigned to chosen_class
t = plsda.transform(raw_x[mask][~wrong])
plt.plot(t[:,0], t[:,1], 'o', label='Not assigned to chosen class')

# Samples of compare_class that are (wrongly) assigned to chosen_class
t = plsda.transform(raw_x[mask][wrong])
plt.plot(t[:,0], t[:,1], 'o', label='Assigned to chosen class (wrong)')

# Label the first two columns returned by plsda.transform so the two groups can be told apart
plt.xlabel('Transformed coordinate 1')
plt.ylabel('Transformed coordinate 2')
plt.legend();

In [None]:
# First incorrect point
wrong_sv = sv[mask][wrong, :]
wrong_x = raw_x[mask][wrong, :]

shap.force_plot(
    base_value=explainer.expected_value[chosen_class],
    shap_values=wrong_sv[0],
    features=wrong_x[0],
    feature_names=element_names,
)

In [None]:
# Second incorrect point
wrong_sv = sv[mask][wrong, :]
wrong_x = raw_x[mask][wrong, :]

shap.force_plot(
    base_value=explainer.expected_value[chosen_class],
    shap_values=wrong_sv[1],
    features=wrong_x[1],
    feature_names=element_names,
)

In [None]:
compare_class = chosen_class # Now look at how the model predicts for samples of the chosen class itself
compare_name = plsda.categories[compare_class]
print('These samples belong to : {}'.format(compare_name))

# Look at samples that belong to compare_class
mask = raw_y == compare_name

# These SHOULD be predicted to belong to chosen_class, so decision_function <= 0 is wrong
chosen_scores = plsda.decision_function(raw_x[mask])[:, chosen_class]
wrong = chosen_scores <= 0.0

shap.decision_plot(
    explainer.expected_value[chosen_class],
    sv[mask, :],
    feature_names=element_names.tolist(),
    highlight=wrong,
)

In [None]:
# Although a few are wrong, they are pretty close to the "pack"