In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
from sklearn.model_selection import train_test_split
from data_processing_utils import *

import shap
import ipywidgets as widgets
shap.initjs()

import seaborn as sb
import lime.lime_tabular
from sklearn.linear_model import HuberRegressor

from deepexplain.tensorflow import DeepExplain

# Data

In [None]:
df=pd.read_csv('data.csv',sep=';', decimal=",", header=[0,1], index_col=0)
train_X, test_X, train_Y, test_Y = train_test_split(df.iloc[:,:-5], df.iloc[:,-5:], test_size=0.25, random_state=42)

train_X = train_X.stack().values.reshape(-1,8,11)
test_X = test_X.stack().values.reshape(-1,8,11)

train_Y = train_Y.droplevel(1, axis=1)
test_Y = test_Y.droplevel(1, axis=1)

scaler = Scaler()
scaler.fit(train_X)

# Model

## Init

In [None]:
#Params
batch_size = 64
STEPS_PER_EPOCH = int(train_X.shape[0]/batch_size)

lr_schedule = tf.keras.optimizers.schedules.InverseTimeDecay(
      0.001, decay_steps=STEPS_PER_EPOCH*1000, decay_rate=1, staircase=False)
optimizer = tf.keras.optimizers.Adam(lr_schedule)
loss = tf.keras.losses.mean_squared_error

def init_cnn():
    input_0 = tf.keras.layers.Input(shape=(8, 11, 1,))
    x = tf.keras.layers.Conv2D(32, (3, 3), activation='selu')(input_0)
    x = tf.keras.layers.MaxPooling2D((2, 2))(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(50,activation ='selu')(x)

    d1 = tf.keras.layers.Dense(1, activation='hard_sigmoid',
                      kernel_initializer=tf.keras.initializers.VarianceScaling(scale=1.0, mode='fan_in'),
                      bias_initializer=tf.keras.initializers.zeros(),
                      kernel_regularizer=tf.keras.regularizers.l2(0.000001))(x)
    d2 = tf.keras.layers.Dense(1, activation='hard_sigmoid',
                      kernel_initializer=tf.keras.initializers.VarianceScaling(scale=1.0, mode='fan_in'),
                      bias_initializer=tf.keras.initializers.zeros(),
                      kernel_regularizer=tf.keras.regularizers.l2(0.000001))(x)
    d3 = tf.keras.layers.Dense(1, activation='hard_sigmoid',
                      kernel_initializer=tf.keras.initializers.VarianceScaling(scale=1.0, mode='fan_in'),
                      bias_initializer=tf.keras.initializers.zeros(),
                      kernel_regularizer=tf.keras.regularizers.l2(0.000001))(x)
    d4 = tf.keras.layers.Dense(1, activation='hard_sigmoid',
                      kernel_initializer=tf.keras.initializers.VarianceScaling(scale=1.0, mode='fan_in'),
                      bias_initializer=tf.keras.initializers.zeros(),
                      kernel_regularizer=tf.keras.regularizers.l2(0.000001))(x)
    d5 = tf.keras.layers.Dense(1, activation='hard_sigmoid',
                      kernel_initializer=tf.keras.initializers.VarianceScaling(scale=1.0, mode='fan_in'),
                      bias_initializer=tf.keras.initializers.zeros(),
                      kernel_regularizer=tf.keras.regularizers.l2(0.000001))(x)

    x = tf.keras.layers.Concatenate(axis=1)([d1, d2, d3, d4, d5])

    model_cnn = tf.keras.models.Model(inputs=input_0, outputs=x, name='CNN')

    model_cnn.compile(optimizer=optimizer, loss=loss)
    return model_cnn

model = init_cnn()

## Train

In [None]:
batch_size = 64
history = model_cnn.fit(
    scaler.transform(train_X).reshape(-1,8,11,1), scale_to_0_1(train_Y).values,
    validation_data= (scaler.transform(test_X).reshape(-1,8,11,1), scale_to_0_1(test_Y).values),
    batch_size=batch_size,
    epochs=100,
    #callbacks =[early_stopping],
    verbose=0,
)

In [None]:
history_df = pd.DataFrame(history.history)
history_df.loc[:, ['loss', 'val_loss']].plot(figsize=(12,8));

# Interprability

## LIME

In [None]:
def flatten_data(data):
    return data.reshape(-1,88)

def unflatten_data(data):
    return data.reshape(-1,8,11,1)

def flatten_cnn(data):
    data = unflatten_data(data)
    return model.predict(data)

In [None]:
X = np.concatenate([train_X, test_X], axis=0).reshape(-1,8,11)
Y = np.concatenate([train_Y, test_Y], axis=0)
#Maturity x Strike

flatten_data(scaler.transform(X))

In [None]:
explainer = lime.lime_tabular.LimeTabularExplainer(flatten_data(scaler.transform(X)), feature_names=[f'T={s[1]} K={s[0]}' for s in df.iloc[:,:-5].columns.values], class_names=['beta'], mode='regression', verbose=False)
exp = explainer.explain_instance(flatten_data(scaler.transform(X))[0], flatten_cnn, num_features=88, num_samples=1500, model_regressor=HuberRegressor(), top_labels=1)

In [None]:
v0_lime= pd.DataFrame(exp.as_map()[0]).sort_values(0)
temp = pd.DataFrame(v0_lime[1].values.reshape(11,8))
cols = df.iloc[:,:-5].columns.levels[0][:-5].values
d = {}
for i in range(11):
    d[i] = f'K={cols[i]}'

temp = temp.rename(d)
    
cols = df.iloc[:,:-5].columns.levels[1][:-5].values
d = {}
for i in range(8):
    d[i] = f'T={cols[i]}'

temp = temp.rename(d,axis=1)

In [None]:
plt.title(f'LIME Attributions Heat Map')
sb.heatmap(temp.abs().T)
#plt.savefig(f'lime_cnn_heatmap.png', bbox_inches = 'tight')

## SHAP

In [None]:
explainer = shap.DeepExplainer(model = model_cnn, data = scaler.transform(train_X).reshape(-1,8,11,1)[:1500])
shap_value = explainer.shap_values(X = scaler.transform(test_X).reshape(-1,8,11,1)[:1500],check_additivity=False)

In [None]:
temp_data = scaler.transform(train_X).reshape(-1,8,11,1)[:1500].copy()

In [None]:
print(f'Current Label Shown: {list_of_labels[current_label.value]}\n')

plt.title(list_of_labels[current_label.value])
shap.summary_plot(shap_values = shap_value[0].reshape(-1,88),
                  features = temp_data.reshape(-1,88),
                  feature_names = [f'T={t} K={k}' for t in maturity for k in strike],
                  show=False
                  )
plt.savefig('v0_shapley_cnn.png', bbox_inches='tight')

### Aggregated bar plot

In [None]:
mean_attr = np.array(shap_value).mean(axis=1).reshape(5,8,11).transpose([0,2,1]).reshape(5,-1)

In [None]:
agg_shap = pd.DataFrame(np.abs(mean_attr), columns = temp_data.columns.values).T

inds = agg_shap.sum(axis=1).sort_values(ascending=True)[-20:].index

agg_shap.loc[inds].plot.barh(width=0.6,stacked=True, figsize=(8,8))
plt.legend([r'$\upsilon_0$', r'$\rho$', r'$\sigma$', r'$\theta$', r'$\kappa$'], frameon=False)
#plt.savefig('shap_cnn_barh_1.png', bbox_inches='tight')

### Heat map

In [None]:
temp = pd.DataFrame(agg_shap.sum(axis=1).values.reshape(11,8))

In [None]:
cols = df.columns.levels[0][:-5].values
d = {}
for i in range(11):
    d[i] = f'K={cols[i]}'

temp = temp.rename(d)
    
cols = df.columns.levels[1][:-5].values
d = {}
for i in range(8):
    d[i] = f'T={cols[i]}'

temp = temp.rename(d,axis=1)

In [None]:
plt.title(f'SHAP Attributions Heat Map')
sb.heatmap(temp.T)
plt.savefig(f'shapley_cnn_heatmap.png', bbox_inches = 'tight')

## E-LRP

In [None]:
X = df.iloc[:,:-5].stack().values.reshape(-1,8,11)
X = scaler.transform(X).reshape(-1,8,11,1)


#xs =zca_process(X)[0:1500].values
title_map = {
    'elrp': 'Epsilon-LRP'
}

In [None]:
xs =X[0:1500]
method_name = "elrp"

In [None]:
session = tf.keras.backend.get_session()
with DeepExplain(session=session) as de:
    input_tensors = model.inputs
    fModel = tf.keras.models.Model(inputs = input_tensors, outputs = model.outputs)
    target_tensor = fModel(input_tensors)
    attributions = de.explain(method_name, target_tensor, input_tensors, [xs])

In [None]:
attr = attributions[0].reshape(-1,8,11)
mean_attr = np.mean(attr,axis=0)

In [None]:
attributes = pd.DataFrame(mean_attr).T
cols = df.columns.levels[0][:-5].values
d = {}
for i in range(11):
    d[i] = f'K={cols[i]}'

attributes = attributes.rename(d)
    
cols = df.columns.levels[1][:-5].values
d = {}
for i in range(8):
    d[i] = f'T={cols[i]}'
    
attributes = attributes.rename(d,axis=1)

In [None]:
attributes_=pd.DataFrame(attributes.stack()).T
cols = [f'T={i[1]} K={i[0]}' for i in df.iloc[0:2,:-5].columns.values]
attributes_.columns = cols
attributes_.abs().sort_values(0, axis=1,ascending=True).T.iloc[-20:].plot.barh(legend=False,figsize=(8,8))
#plt.savefig(f'gradinput_cnn_barh.png',  bbox_inches='tight')

In [None]:
plt.title(f'{title_map[method_name]} Attributions Heat Map')
sb.heatmap(attributes.abs().T)
plt.savefig(f'{method_name}_cnn_heatmap.png',  bbox_inches='tight')

## Integrated Gradient

In [None]:
X = df.iloc[:,:-5].stack().values.reshape(-1,8,11)
X = scaler.transform(X).reshape(-1,8,11,1)


#xs =zca_process(X)[0:1500].values
title_map = {
    'intgrad': 'Integrated Gradient'
}

In [None]:
xs =X[0:1500]
method_name = "intgrad"

In [None]:
session = tf.keras.backend.get_session()
with DeepExplain(session=session) as de:
    input_tensors = model.inputs
    fModel = tf.keras.models.Model(inputs = input_tensors, outputs = model.outputs)
    target_tensor = fModel(input_tensors)
    attributions = de.explain(method_name, target_tensor, input_tensors, [xs])

In [None]:
attr = attributions[0].reshape(-1,8,11)
mean_attr = np.mean(attr,axis=0)

In [None]:
attributes = pd.DataFrame(mean_attr).T
cols = df.columns.levels[0][:-5].values
d = {}
for i in range(11):
    d[i] = f'K={cols[i]}'

attributes = attributes.rename(d)
    
cols = df.columns.levels[1][:-5].values
d = {}
for i in range(8):
    d[i] = f'T={cols[i]}'
    
attributes = attributes.rename(d,axis=1)

In [None]:
attributes_=pd.DataFrame(attributes.stack()).T
cols = [f'T={i[1]} K={i[0]}' for i in df.iloc[0:2,:-5].columns.values]
attributes_.columns = cols
attributes_.abs().sort_values(0, axis=1,ascending=True).T.iloc[-20:].plot.barh(legend=False,figsize=(8,8))
#plt.savefig(f'gradinput_cnn_barh.png',  bbox_inches='tight')

In [None]:
plt.title(f'{title_map[method_name]} Attributions Heat Map')
sb.heatmap(attributes.abs().T)
plt.savefig(f'{method_name}_cnn_heatmap.png',  bbox_inches='tight')