In [None]:
import pandas as pd
import scipy.stats as si
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

### Load libraries ###

# interactive plotting
%matplotlib inline
%config InlineBackend.figure_format = 'svg' # ‘png’, ‘retina’, ‘jpeg’, ‘svg’, ‘pdf’

# plotting libraries
import seaborn as sns
import matplotlib.pyplot as plt
sns.set_theme()


# Data management libraries
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Machine learning libraries
import math
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.metrics import accuracy_score
from sklearn.compose import ColumnTransformer
from sklearn.neural_network import  MLPClassifier, MLPRegressor

# XAI libraries
import dalex as dx
from lime.lime_tabular import LimeTabularExplainer 
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score

# Other libraries
# from utils import plotModelGridError, confusion_matrix
from neuralsens import partial_derivatives as ns
from sklearn import set_config
set_config(display='diagram')

### 1. Data load and split

In [None]:
def black_scholes_call_option(S, K, T, r, sigma):
    """
    Calculate the Black-Scholes price of a European call option.
    
    Parameters:
    S: Current stock price.
    K: Strike price of the option.
    T: Time to expiration in years.
    r: Risk-free interest rate (annualized).
    sigma: Volatility of the underlying stock (annualized).
    Returns:
        The Black-Scholes price of the call option.
    """
    d1 = (np.log(S / K) + (r + 0.5 * sigma ** 2) * T) / (sigma * np.sqrt(T))
    d2 = d1 - sigma * np.sqrt(T)
    
    call_price = (S * si.norm.cdf(d1, 0.0, 1.0) - 
                  K * np.exp(-r * T) * si.norm.cdf(d2, 0.0, 1.0))
    
    return call_price


In [None]:
df = pd.read_csv('../data/processed/calls_2025_05_28.csv')
df_final = df[['price', 'strike', 'T', 'impliedVolatility', 'midPrice']].copy()

# Rename columns for clarity
df_final.rename(columns={
    'price': 'S',  # Current stock price
    'strike': 'K',  # Strike price
    'T': 'T',  # Time to expiration in years
    'impliedVolatility': 'sigma',  # Volatility of the underlying stock
    'midPrice': 'call_price'  # Black-Scholes price of the call option
}, inplace=True)

In [None]:
# how many with sigma == 0
print(f"Number of options with sigma == 0: {df_final[df_final['sigma'] == 0].shape[0]}")

# Filter out rows where sigma is zero
df_final = df_final[df_final['sigma'] > 0].reset_index(drop=True)

In [None]:
df_final.info()

In [None]:
INPUTS = df_final.columns.drop('call_price').tolist()
TARGET = 'call_price'

X = df_final[INPUTS]
y = df_final[TARGET]

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
## Create dataset to store model predictions
dfTR_eval = X_train.copy()
dfTR_eval['midPrice'] = y_train
dfTS_eval = X_test.copy()
dfTS_eval['midPrice'] = y_test

### 2. MLP

#### A. Train model

In [None]:
numeric_transformer = Pipeline(steps=[('scaler', StandardScaler())])

# Create a preprocessor to perform the steps defined above
preprocessor = ColumnTransformer(transformers=[
        ('num', numeric_transformer, INPUTS)
        ])

param = {'MLP__alpha': [0.001], # Initial value of regularization
         'MLP__hidden_layer_sizes':[(80,)],
         'MLP__learning_rate_init': [0.01],
         'MLP__activation': ['tanh']
}

pipe = Pipeline([
        ('preprocessor', preprocessor),
        ('MLP', MLPRegressor(solver='adam', 
                max_iter=2000, # Maximum number of iterations
                tol=1e-4, # Tolerance for the optimization
                random_state=150,
                verbose = True))]) # For replication

# We use Grid Search Cross Validation to find the best parameter for the model in the grid defined 
nFolds = 10
MLP_fit = GridSearchCV(estimator=pipe, # Structure of the model to use
                       param_grid=param, # Defined grid to search in
                       n_jobs=-1, # Number of cores to use (parallelize)
                       scoring='neg_mean_squared_error', # RMSE https://scikit-learn.org/stable/modules/model_evaluation.html
                       cv=nFolds) # Number of Folds 
MLP_fit.fit(X_train[INPUTS], y_train) # Search in grid


# Save the model
import joblib
joblib.dump(MLP_fit, '../models/MLP_simple_vs_BS_v2.pkl')

#### B. Load Model

In [None]:
import joblib

# Cargamos el modelo guardado
MLP_fit = joblib.load('../models/MLP_simple_vs_BS_v2.pkl')

### 3. MLP vs Black-Scholes

#### A. Graphic

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import matplotlib.cm as cm
import matplotlib.colors as mcolors

def plotModelGridError(MLP_fit):
    results = MLP_fit.cv_results_
    mean_test_scores = results['mean_test_score']
    params = results['params']

    # Convertir a error (RMSE)
    errors = np.sqrt(-mean_test_scores)

    # Etiquetas incluyendo todos los hiperparámetros relevantes
    param_labels = [
        f"act: {p['MLP__activation']}, alpha: {p['MLP__alpha']}, size: {p['MLP__hidden_layer_sizes']}, lr: {p['MLP__learning_rate_init']}"
        for p in params
    ]

    # Obtener todos los learning rates únicos para codificarlos por color
    lrs = [p['MLP__learning_rate_init'] for p in params]
    unique_lrs = sorted(set(lrs))
    lr_color_map = {lr: cm.viridis(i / len(unique_lrs)) for i, lr in enumerate(unique_lrs)}
    bar_colors = [lr_color_map[lr] for lr in lrs]

    # Ordenar por error creciente
    sorted_indices = np.argsort(errors)
    errors_sorted = errors[sorted_indices]
    param_labels_sorted = [param_labels[i] for i in sorted_indices]
    bar_colors_sorted = [bar_colors[i] for i in sorted_indices]

    # Crear el gráfico
    plt.figure(figsize=(14, 10))
    bars = plt.barh(param_labels_sorted, errors_sorted, color=bar_colors_sorted)
    plt.xlabel("RMSE")
    plt.title("Errores del Modelo (RMSE) según Grid Search, agrupados por Learning Rate")
    plt.gca().invert_yaxis()
    plt.tight_layout()

    # Crear leyenda manual
    legend_handles = [
        plt.Rectangle((0, 0), 1, 1, color=lr_color_map[lr]) for lr in unique_lrs
    ]
    legend_labels = [f"lr: {lr}" for lr in unique_lrs]
    plt.legend(legend_handles, legend_labels, title="Learning Rate", loc="lower right")

    plt.show()

In [None]:
plotModelGridError(MLP_fit)
# La mejor combinación es ReLu, alpha=0.001, size=(60,), lr=0.1

#### B. Comparison of metrics

In [None]:
# Calculate the predictions on the training set using Black-Scholes formula
dfTR_eval['BS_predict'] = black_scholes_call_option(
    dfTR_eval['S'],
    dfTR_eval['K'],
    dfTR_eval['T'],
    0.045,  # Assuming a risk-free interest rate of 5%
    dfTR_eval['sigma']
)

# Calculate the predictions on the test set using Black-Scholes formula
dfTS_eval['BS_predict'] = black_scholes_call_option(
    dfTS_eval['S'],
    dfTS_eval['K'],
    dfTS_eval['T'],
    0.045,  # Assuming a risk-free interest rate of 5%
    dfTS_eval['sigma']
)


In [None]:
dfTR_eval['MLP_pred'] = MLP_fit.predict(X_train[INPUTS])
dfTS_eval['MLP_pred'] = MLP_fit.predict(X_test[INPUTS])

In [None]:
#Training and test MAE - Mean Absolute error
print('MLP Predictions')
print('Training MAE:',mean_absolute_error(dfTR_eval['midPrice'], dfTR_eval['MLP_pred']))
print('Test MAE:',mean_absolute_error(dfTS_eval['midPrice'], dfTS_eval['MLP_pred']))
#Training and test RMSE - Root Mean Square Error
print('Training RMSE:',math.sqrt(mean_squared_error(dfTR_eval['midPrice'], dfTR_eval['MLP_pred'])))
print('Test RMSE:',math.sqrt(mean_squared_error(dfTS_eval['midPrice'], dfTS_eval['MLP_pred'])))
#Training and test r^2 
print('Training R2:',r2_score(dfTR_eval['midPrice'], dfTR_eval['MLP_pred']))
print('Test R2:',r2_score(dfTS_eval['midPrice'], dfTS_eval['MLP_pred']))

In [None]:
print('Black-Scholes Predictions')
#Training and test MAE - Mean Absolute error
print('Training MAE:',mean_absolute_error(dfTR_eval['midPrice'], dfTR_eval['BS_predict']))
print('Test MAE:',mean_absolute_error(dfTS_eval['midPrice'], dfTS_eval['BS_predict']))
#Training and test RMSE - Root Mean Square Error
print('Training RMSE:',math.sqrt(mean_squared_error(dfTR_eval['midPrice'], dfTR_eval['BS_predict'])))
print('Test RMSE:',math.sqrt(mean_squared_error(dfTS_eval['midPrice'], dfTS_eval['BS_predict'])))
#Training and test r^2 
print('Training R2:',r2_score(dfTR_eval['midPrice'], dfTR_eval['BS_predict']))
print('Test R2:',r2_score(dfTS_eval['midPrice'], dfTS_eval['BS_predict']))

#### C. Case to case comparison

In [None]:
import numpy as np
dfTS_eval['MLP_diff'] = np.abs(dfTS_eval['midPrice'] - dfTS_eval['MLP_pred'])
dfTS_eval['BS_diff'] = np.abs(dfTS_eval['midPrice'] - dfTS_eval['BS_predict'])
dfTS_eval['MLP_better'] = dfTS_eval['MLP_diff'] < dfTS_eval['BS_diff']

# Casos en los que MLP es mejor
mlp_better_cases = dfTS_eval[dfTS_eval['MLP_better']]
bs_better_cases = dfTS_eval[~dfTS_eval['MLP_better']]

# Diferencias de error entre ambos modelos
mlp_margin = bs_better_cases['BS_diff'] - bs_better_cases['MLP_diff']
bs_margin = mlp_better_cases['MLP_diff'] - mlp_better_cases['BS_diff']

# Estadísticas descriptivas
print("Cuando BS gana:")
print("   Media de mejora:", mlp_margin.mean())
print("   Mediana de mejora:", mlp_margin.median())

print("Cuando MLP gana:")
print("   Media de mejora:", bs_margin.mean())
print("   Mediana de mejora:", bs_margin.median())

In [None]:
from utils import plotModelDiagnosis
plotModelDiagnosis(dfTR_eval, 'MLP_pred', 'midPrice')

### 4. Neuralsense

In [None]:
mlp = MLP_fit.best_estimator_['MLP']
wts = mlp.coefs_
bias = mlp.intercepts_
actfunc = ['identity',MLP_fit.best_estimator_['MLP'].get_params()['activation'],mlp.out_activation_]
X = MLP_fit.best_estimator_['preprocessor'].transform(X_train) # Preprocess the variables
coefnames = MLP_fit.best_estimator_['preprocessor'].get_feature_names_out(INPUTS)


In [None]:
X = pd.DataFrame(X, columns=coefnames)
y = pd.DataFrame(y_train, columns=[TARGET])
sens_end_layer = 'last'
sens_end_input = False
sens_origin_layer = 0
sens_origin_input = True

In [None]:
sensmlp = ns.jacobian_mlp(wts, bias, actfunc, X, y,)

In [None]:
sensmlp.summary()

In [None]:
sensmlp.info()

In [None]:
sensmlp.plot()

In [None]:
ns.alpha_sens_curves(sensmlp)

In [None]:
df_partDeriv = sensmlp.raw_sens[0]

In [None]:
df_partDeriv.info()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

def plot_partial_derivative_distributions(df):
    """
    Dibuja histogramas con líneas de media y mediana para cada derivada parcial
    contenida en un DataFrame, útil para detectar outliers o anomalías.

    Args:
        df (pd.DataFrame): DataFrame con columnas como num__S, num__K, etc.
    """
    plt.figure(figsize=(16, 10))
    for i, column in enumerate(df.columns, 1):
        plt.subplot(2, 2, i)
        sns.histplot(df[column], kde=True, bins=100)
        plt.axvline(df[column].mean(), color='red', linestyle='--', label=f"Media: {df[column].mean():.2f}")
        plt.axvline(df[column].median(), color='green', linestyle=':', label=f"Mediana: {df[column].median():.2f}")
        plt.title(f"Distribución de ∂Precio/∂{column.split('__')[-1]}")
        plt.xlabel("Valor de la derivada parcial")
        plt.ylabel("Frecuencia")
        plt.legend()
    plt.tight_layout()
    plt.show()

In [None]:
plot_partial_derivative_distributions(df_partDeriv)

In [None]:
def get_outliers_std(df, column, n_std=3):
    media = df[column].mean()
    std = df[column].std()
    return df[np.abs(df[column] - media) > n_std * std]

In [None]:
outliers_sigma = get_outliers_std(df_partDeriv, 'num__sigma', n_std=3)

## get a subdataframe with the outliers using the positional index in outlier_indices
outlier_rows = dfTR_eval.iloc[outliers_sigma.index]
# Display the outlier rows
outlier_rows.head(50)




In [None]:
outlier_rows[outlier_rows['sigma']== 0].head(50)

In [None]:
df = pd.read_csv('../data/raw/sp500_calls_2025_05_28.csv')
df.info()

In [None]:
df[df['impliedVolatility']0].info()