Explainable AI = XAI

João A. Masiero 06/02/23

- Shapley values, importância das variáveis na modelagem

In [None]:
#Imports
import numpy as np
from numpy import sqrt, log, exp, pi
import pandas as pd
import scipy.stats
from scipy.stats import norm

from sklearn.model_selection import train_test_split 
from sklearn.neural_network import MLPRegressor, MLPClassifier, BernoulliRBM
from sklearn import metrics
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

from math import sqrt

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px
import matplotlib.pyplot as plt

from numpy import mean, absolute
import yfinance as yf

from datetime import datetime, timedelta  

from statsmodels.tsa.stattools import pacf, acf

from numpy_ext import rolling_apply as rolling_apply_ext

import pyfolio as pf

import seaborn as sns
sns.set(style = "darkgrid", context = "talk", palette = "rainbow")
import shap # v0.39.0
shap.initjs()

import warnings
warnings.filterwarnings("ignore")

import pickle


In [None]:
# Get the data

#Janela temporal#
inicio = "01-01-2013"
fim = "01-01-2023"

ticker1 = "CCMFUT" 
df0 = pd.read_excel("CCM_Historico.xlsx", names=["Date", "Open", "High", "Low", "Adj Close"]).set_index("Date")
df0 = df0.replace({',': '.'}, regex=True) #substitui "," por "."
df1 = df0.loc[inicio:fim].sort_index(ascending=True).astype(float)
df1["Returns"] = df1["Adj Close"].pct_change(1)
df1["Vol"] = df1["Returns"].rolling(20).std()*np.sqrt(252)
df1.dropna(axis = 0, inplace = True) 

#Funções úteis

def get_acf1(x):
    return acf(x, alpha = 0.05, nlags = 5)[0][1]

def zscore(x):
    return ((x - x.mean())/np.std(x))[-1]

def next_business_day(date):
    if isinstance(date, str):
        date = pd.to_datetime(date)
    business_days = pd.date_range(start=date, end=date + pd.DateOffset(weeks=1), freq='B')
    return business_days.min()

# Variáveis - simples, mas funcionais

df1["f1"] = rolling_apply_ext(get_acf1, 20, df1["Returns"])
df1["f2"] = rolling_apply_ext(zscore, 10, df1["Returns"])

df1["MA5"] = df1["Adj Close"].rolling(5).mean()
df1["MA10"] = df1["Adj Close"].rolling(10).mean()
df1["MA20"] = df1["Adj Close"].rolling(20).mean()
df1["MA52"] = df1["Adj Close"].rolling(52).mean()

df1["f3"] = df1["Adj Close"]/df1["MA5"]-1
df1["f4"] = df1["Adj Close"]/df1["MA10"]-1
df1["f5"] = df1["Adj Close"]/df1["MA20"]-1
df1["f6"] = df1["Adj Close"]/df1["MA52"]-1

df1["f7"] = df1["Returns"].rolling(5).std()
df1["f8"] = df1["Returns"].rolling(10).std()
df1["f9"] = df1["Returns"].rolling(20).std()
df1["f10"] = df1["Returns"].rolling(52).std()

df1["f11"] = df1["Returns"].shift(1)
df1["f12"] = df1["Returns"].shift(2)
df1["f13"] = df1["Returns"].shift(3)
df1["f14"] = df1["Returns"].shift(4)
df1["f15"] = df1["Returns"].shift(5)
df1["f16"] = df1["Returns"].shift(6)
df1["f17"] = df1["Returns"].shift(7)
df1["f18"] = df1["Returns"].shift(8)
df1["f19"] = df1["Returns"].shift(9)
df1["f20"] = df1["Returns"].shift(10)

df1["f21"] = np.where(df1["Vol"] < 0.15, 1
                     , np.where(df1["Vol"] > 0.20, -1, 0))

previsao = df1.iloc[-6:].copy()
ultima = df1["Adj Close"].iloc[-6:-5]
today = df1["Adj Close"][-1]
#print(df1.tail())

# Construção do alvo
p = 5
# Alvo 1 - Volatilidade em 5 dias para frente
df1["Alvo1"] = df1["Adj Close"].shift(-p)
df1["Alvo2"] = np.where(df1["Alvo1"] > df1["Adj Close"], 1, 0)
df1["Alvo3"] = df1["Adj Close"].pct_change(p).shift(-p)

df1.dropna(inplace = True)

df1

In [None]:
# Separando os dados com as variaveis em x e o alvo em y

# Separando os dados entre treinamento e teste

# Vamos treinar com 5 anos
start_train = "2014-01-01"
end_train = "2018-12-31"

# Vamos testar com 4 anos
start_test = "2019-01-01"
end_test = "2023-12-31"

df1_train = df1.loc[start_train : end_train]

df1_test = df1.loc[start_test : end_test]


# Separando os dados com as variaveis em x e o alvo em y
 
manter = ["f3", "f4", "f5", "f6", "f8", "f10"
          , "f12", "f13", "f14", "f16", "f17", "f18", "f20"
          , "f19"
          , "f21"
          , "f1", "f2", "Returns"
         ]

x_train = df1_train[manter]
y_train = df1_train["Alvo2"]

x_test = df1_test[manter]
y_test = df1_test["Alvo2"]

print(x_train.shape)
print(x_test.shape)

In [None]:
# Treinando o modelo

model = MLPClassifier(hidden_layer_sizes = (10, 100), max_iter = 2000
                     , solver = "adam", verbose = 10, tol = 1e-8, random_state = 42
                     , learning_rate_init = .001 , alpha = .0001
                     , activation = "relu"
                     , epsilon = 1e-9
                    )

model.fit(x_train, y_train) # essa é a linha que treina o modelo!!!!

#pickle.dump(model, open("/Users/leandroguerra/Library/CloudStorage/OneDrive-Personale/OM Quant Services/lab-of-ideas/vol-forecasting.pkl", "wb"))

In [None]:
# Model evaluation - no cut-off
y_pred_test = model.predict(x_test)
df1["pred"] = model.predict(df1[manter])

print(classification_report(y_test, y_pred_test))

In [None]:
custo_op = 0.001

df1.loc[: , "Model_Return"] = np.where(((df1.loc[: , "pred"] == 1) & (df1.loc[: , "Vol"] < 0.15))
                                       , df1["Alvo3"] - custo_op
                                       ,  np.where(((df1.loc[: , "pred"] == 0) & (df1.loc[: , "Vol"] > 0.25))
                                       , -df1["Alvo3"] - custo_op, 0))

df1.loc[: , "Model_Return_Acc"] = df1["Model_Return"].cumsum()*100


fig = make_subplots(rows = 1, cols = 1
                    , shared_xaxes = True
                    , vertical_spacing = 0.05)

fig.add_trace(go.Scatter(x = df1.index, y = df1["Model_Return_Acc"]
                                , name = "OM Model"
                                , line = dict(color = "blue"))
              , row = 1, col = 1)



fig.add_vline(x = end_train, line_width = 3, line_dash="dash", line_color = "black")

fig.update_layout(height = 600, width = 800
                  , title_text = "Forecasting " + ticker1 + " - Accumulated Returns"
                  , font_color = "blue"
                  , title_font_color = "black"
                  , xaxis_title = "Time"
                  , yaxis_title = "Accumulated returns (%)"
                  , font = dict(size = 15, color = "Black")
                 )

fig.update_layout(hovermode = "x unified")

# Code to exclude empty dates from the chart
dt_all = pd.date_range(start = df1.index[0]
                       , end = df1.index[-1]
                       , freq = "D")
dt_all_py = [d.to_pydatetime() for d in dt_all]
dt_obs_py = [d.to_pydatetime() for d in df1.index]

dt_breaks = [d for d in dt_all_py if d not in dt_obs_py]

fig.update_xaxes(
    rangebreaks = [dict(values = dt_breaks)]
)


fig.show()

In [None]:
df2 = df1.copy()
df2.index.name = "Data"
df2.reset_index(inplace = True)

df2["Data"] = pd.to_datetime(df2["Data"])

df2["train_test"] = np.where(df2["Data"] > end_train, 1, -1)

base_agregada = df2.resample("D", on = "Data").sum()

base_agregada.loc[: , "Model_Return_Acc"] = base_agregada["Model_Return"].cumsum()*100

summary = df2.copy()
summary["Data"] = pd.to_datetime(summary["Data"], format = "%Y-%m")

summary = summary.groupby([summary["Data"].dt.year]).agg({"Model_Return": sum})

summary.index = summary.index.set_names(["Ano"])

summary*100


In [None]:
# Model Results

resultados = pd.DataFrame({'Real': y_test, 'Previsto': y_pred_test})

print("-"*70)
print("Last measured value before a prediction on the day " + str(previsao.index[0].strftime("%Y-%m-%d")))
print("%.2f" % ultima)
print("Predicted move for " + ticker1 + " on that day was")
if (model.predict(previsao[manter].iloc[-6:-5].values.reshape(1, -1))[0] == 0) & (ultima > 30).values[0]:
    print("Down")
elif (model.predict(previsao[manter].iloc[-6:-5].values.reshape(1, -1))[0] == 0) & (ultima < 30).values[0]:
    print("Do nothing")
else:
    print("Buy")
    
print("Value today " + str(datetime.today().strftime("%Y-%m-%d")))
print("%.2f" % today)
print("")
print("#"*70)
print("Next prediction for " + ticker1 + " on the day " + str(next_business_day((datetime.today() + timedelta(days = 5))).strftime("%Y-%m-%d")))
if (model.predict(previsao[manter].iloc[-1].values.reshape(1, -1))[0] == 0) & (today > 30):
    print("Down")
elif (model.predict(previsao[manter].iloc[-1].values.reshape(1, -1))[0] == 0) & (today < 30):
    print("Do nothing")
else:
    print("Buy")
print("-"*70)

In [None]:
bt_returns = df1.loc[start_test : end_test]["Model_Return"]

In [None]:
plt.figure(figsize = (10,8))
pf.plot_drawdown_underwater(bt_returns);

In [None]:
# Backtest Evaluation functions

def rolling_sharpe(returns, window):
    """
    Calculates the rolling Sharpe ratio for a given dataset.
    
    Parameters:
        - returns: a pandas Series or DataFrame of returns
        - window: the number of periods to use in the rolling window calculation
        
    Returns:
        - a pandas Series or DataFrame of the rolling Sharpe ratio
    """
    # calculate the mean return
    mean_return = returns.rolling(window=window).mean()
    # calculate the standard deviation of returns
    std_return = returns.rolling(window=window).std()
    # calculate the sharpe ratio
    sharpe_ratio = mean_return / std_return
    return sharpe_ratio.dropna()

import pandas as pd

def rolling_volatility(returns, window):
    """
    Calculates the rolling volatility of returns for a given dataset.
    
    Parameters:
        - returns: a pandas Series or DataFrame of returns
        - window: the number of periods to use in the rolling window calculation
        
    Returns:
        - a pandas Series or DataFrame of the rolling volatility of returns
    """
    # calculate the standard deviation of returns
    rolling_std = returns.rolling(window=window).std()
    return rolling_std.dropna()

In [None]:
r_sr = rolling_sharpe(bt_returns, 180)

fig = make_subplots(rows = 1, cols = 1
                    , shared_xaxes = True
                    , vertical_spacing = 0.05)

fig.add_trace(go.Scatter(x = r_sr.index, y = r_sr
                                , name = "OM Model"
                                , line = dict(color = "blue"))
              , row = 1, col = 1)

fig.add_hline(y = r_sr.mean(), line_width = 3, line_dash="dash", line_color = "black")

fig.update_layout(height = 600, width = 800
                  , title_text = "Vol Forecasting - Rolling Sharpe Ratio (6 months)"
                  , font_color = "blue"
                  , title_font_color = "black"
                  , xaxis_title = "Time"
                  , yaxis_title = "Sharpe Ratio"
                  , font = dict(size = 15, color = "Black")
                 )

fig.update_layout(hovermode = "x unified")

# Code to exclude empty dates from the chart
dt_all = pd.date_range(start = resultados.index[0]
                       , end = resultados.index[-1]
                       , freq = "D")
dt_all_py = [d.to_pydatetime() for d in dt_all]
dt_obs_py = [d.to_pydatetime() for d in resultados.index]

dt_breaks = [d for d in dt_all_py if d not in dt_obs_py]

fig.update_xaxes(
    rangebreaks = [dict(values = dt_breaks)]
)


fig.show()

In [None]:
r_vol = rolling_volatility(bt_returns, 180)*100
fig = make_subplots(rows = 1, cols = 1
                    , shared_xaxes = True
                    , vertical_spacing = 0.05)

fig.add_trace(go.Scatter(x = r_vol.index, y = r_vol
                                , name = "OM Model"
                                , line = dict(color = "blue"))
              , row = 1, col = 1)

fig.add_hline(y = r_vol.mean(), line_width = 3, line_dash="dash", line_color = "black")

fig.update_layout(height = 600, width = 800
                  , title_text = "Vol Forecasting - Rolling Volatility (6 months)"
                  , font_color = "blue"
                  , title_font_color = "black"
                  , xaxis_title = "Time"
                  , yaxis_title = "Volatility"
                  , font = dict(size = 15, color = "Black")
                 )

fig.update_layout(hovermode = "x unified")

# Code to exclude empty dates from the chart
dt_all = pd.date_range(start = resultados.index[0]
                       , end = resultados.index[-1]
                       , freq = "D")
dt_all_py = [d.to_pydatetime() for d in dt_all]
dt_obs_py = [d.to_pydatetime() for d in resultados.index]

dt_breaks = [d for d in dt_all_py if d not in dt_obs_py]

fig.update_xaxes(
    rangebreaks = [dict(values = dt_breaks)]
)


fig.show()

In [None]:
explainer = shap.Explainer(model.predict, x_test)
shap_test = explainer(x_test)
print(f"Shap values length: {len(shap_test)}\n")
print(f"Sample shap value:\n{shap_test[0]}")

In [None]:
print(f"Average target value (training data): {y_train.mean():.2f}")
print(f"Base value: {np.unique(shap_test.base_values)[0]:.2f}")

In [None]:
shap_df = pd.DataFrame(shap_test.values, 
                       columns=shap_test.feature_names, 
                       index=x_test.index)

In [None]:
# Visualisations to understand the feature contribution
columns = shap_df.apply(np.abs).mean()\
                 .sort_values(ascending=False).index
fig, ax = plt.subplots(1, 2, figsize=(15, 10))
sns.barplot(data=shap_df[columns].apply(np.abs), orient='h', 
            ax=ax[0])
ax[0].set_title("Mean absolute shap value")
sns.boxplot(data=shap_df[columns], orient='h', ax=ax[1])
ax[1].set_title("Distribution of shap values");

In [None]:
shap.plots.bar(shap_test)

In [None]:
shap.summary_plot(shap_test)

In [None]:
max_trade = np.where(shap_df.index.isin([df1.loc[start_test : end_test][["Model_Return"]].idxmax().values[0]]))[0][0]

In [None]:
min_trade = np.where(shap_df.index.isin([df1.loc[start_test : end_test][["Model_Return"]].idxmin().values[0]]))[0][0]

In [None]:
# Plots for understanding predictions for individual cases
shap.plots.bar(shap_test[max_trade])

In [None]:
# Plots for understanding predictions for individual cases
shap.plots.bar(shap_test[min_trade])

In [None]:
class WaterfallData():
    def __init__ (self, shap_test, index):
        self.values = shap_test[index].values
        self.base_values = shap_test[index].base_values[0]
        self.data = shap_test[index].data
        self.feature_names = shap_test.feature_names
shap.plots.waterfall(shap_test[max_trade])

In [None]:
class WaterfallData():
    def __init__ (self, shap_test, index):
        self.values = shap_test[index].values
        self.base_values = shap_test[index].base_values[0]
        self.data = shap_test[index].data
        self.feature_names = shap_test.feature_names
shap.plots.waterfall(shap_test[min_trade])

In [None]:
shap.plots.force(shap_test[max_trade])

In [None]:
shap.plots.force(shap_test[min_trade])