# Load synthetic data

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import logging
import tensorflow as tf
import keras as K
from keras.callbacks import EarlyStopping

from tqdm import tqdm
tqdm.pandas()

from utils import heston_pricer
from utils import rBergomi_pricer

logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s', level=logging.INFO)

In [None]:
class FeatureScaler(object):
    def __init__(self, train):
        self.mean = train.mean(axis=0)
        self.std = train.std(axis=0)
    
    def transform(self, df):
        return (df - self.mean) / self.std
    
    def inverse_transform(self, df):
        return df * self.std + self.mean
    
    def get_params(self):
        return self.mean, self.std

In [None]:
def build_nn(input_length, hidden_layer_sizes = [20, 10, 5]):
    model = K.models.Sequential()
    kernel_init = K.initializers.RandomNormal(stddev=np.sqrt(2.0/input_length))
    model.add(K.layers.Dense(units=hidden_layer_sizes[0], activation='relu', kernel_initializer=kernel_init, 
                             input_shape=[input_length]))
    for layer_idx in range(1, len(hidden_layer_sizes)):
        kernel_init = K.initializers.RandomNormal(stddev=np.sqrt(2.0/hidden_layer_sizes[layer_idx-1]))
        model.add(K.layers.Dense(units=hidden_layer_sizes[layer_idx], kernel_initializer=kernel_init, activation='relu'))
    kernel_init = K.initializers.RandomNormal(stddev=np.sqrt(2.0/hidden_layer_sizes[-1]))
    model.add(K.layers.Dense(units=1, kernel_initializer=kernel_init, activation='linear'))
    return model

In [None]:
def train_nn(X_train, Y_train, X_val, Y_val, batch_size=128, n_epochs=50, hidden_layer_sizes=[20, 10, 5], verbose=1):
    n_features = X_train.shape[1]
    model = build_nn(n_features, hidden_layer_sizes)
    
    if verbose != 0:
        print("Neural Network Architechture:")
        print(model.summary())
        print('\n')
    
    model.compile(optimizer='adam', loss='mean_squared_error')

    early_stopping = EarlyStopping(monitor='val_loss', patience=3, verbose=verbose, mode='auto', restore_best_weights=True)
    train_log = model.fit(X_train, Y_train, batch_size=batch_size, epochs=n_epochs, verbose=verbose, 
                          validation_data=(X_val, Y_val), callbacks=[early_stopping])
    
    return model, train_log.history

In [None]:
def plot_learning_history(history):
    plt.figure()
    plt.plot(history['loss'], color='b', marker='o', label='train loss')
    plt.plot(history['val_loss'], color='r', marker='o', label='val loss')
    plt.xlabel("epochs")
    plt.ylabel('loss')
    plt.legend(loc=1)
    plt.show()

# Heston Model Neural Network

## Data Loading and preprocessing

Load generated synthetic data

In [None]:
train = pd.read_csv("./data/heston/ftse_0118_train.csv").dropna()
val = pd.read_csv("./data/heston/ftse_0118_val.csv").dropna()
test = pd.read_csv("./data/heston/ftse_0118_test.csv").dropna()

Split input features and labels

In [None]:
X_train_origin, Y_train = train.iloc[:, :-1], train.iloc[:, [-1]]
X_val_origin, Y_val = val.iloc[:, :-1], val.iloc[:, [-1]]
X_test_origin, Y_test = test.iloc[:, :-1], test.iloc[:, [-1]]

Normalize features

In [None]:
SCALER = FeatureScaler(X_train_origin)
X_train = SCALER.transform(X_train_origin)
X_val = SCALER.transform(X_val_origin)
X_test = SCALER.transform(X_test_origin)

## Neural Network

Define some hyper-parameters

In [None]:
# parameters
hidden_layer_sizes = [20, 10, 5]
batch_size = 128
n_epochs = 50

Build Neural Network, and train it

In [None]:
model, history = train_nn(X_train, Y_train, X_val, Y_val, batch_size, n_epochs, hidden_layer_sizes, verbose=1)

Plot learning history curve

In [None]:
plot_learning_history(history)

Evaluate model performance on test dataset

In [None]:
model.evaluate(X_test, Y_test)

Save model to local file for further use, or load a previously trained model

In [None]:
# from keras.models import load_model
# save model

model.save("heston.h5")

Study the quantiles of the relative error on test dataset

In [None]:
pred_test = model.predict(X_test).flatten()
true_test = Y_test.values.flatten()
relative_error = 100 * np.abs(pred_test - true_test) / true_test

In [None]:
sorted_re = sorted(relative_error)
q_90 = sorted_re[int(0.90*len(relative_error))]
q_95 = sorted_re[int(0.95*len(relative_error))]
q_99 = sorted_re[int(0.99*len(relative_error))]

In [None]:
plt.figure(figsize=(16, 8))
plt.hist(relative_error, bins=np.linspace(0, 15, 151), density=True, rwidth=0.5)
plt.xlim((0, 15))
sns.despine(left=True, bottom=True, right=True)
plt.grid(True)
plt.axvline(x=q_90, ls='--', color='orange', label=r"$q_{0.90}$"+"={:.2f}".format(q_90))
plt.axvline(x=q_95, ls='-.', color='purple', label=r"$q_{0.95}$"+"={:.2f}".format(q_95))
plt.axvline(x=q_99, ls=':', color='green', label=r"$q_{0.99}$"+"={:.2f}".format(q_99))
plt.legend(loc=1)
plt.xlabel("relative error")
plt.ylabel("frequency density")
plt.show()

Use a fixed parameter to generate the IV surface

In [None]:
log_moneyness = np.linspace(-0.1, 0.1, 21)
maturity = np.linspace(0.01, 0.18, 18)
log_moneyness, maturity = np.meshgrid(log_moneyness, maturity)

# columns ['Moneyness', 'Time to Maturity (years)', 'lambda', 'vbar', 'eta', 'rho', 'v0']
df = pd.DataFrame(columns=train.columns)
df['Moneyness'] = np.exp(log_moneyness.flatten())
df['Time to Maturity (years)'] = maturity.flatten()
df['eta'] = 0.3877
df['rho'] = -0.7165
df['lambda'] = 1.3253
df['v0'] = 0.0354
df['vbar'] = 0.0174

In [None]:
df['iv'] = df.progress_apply(lambda row: heston_pricer(row['lambda'], row['vbar'], row['eta'], row['rho'], row['v0'], 
                                                       0, 0, row['Time to Maturity (years)'], 1.0, row['Moneyness'])[1], 
                             axis=1)

In [None]:
scaled_features = SCALER.transform(df.iloc[:, :-1])

In [None]:
df['iv_nn'] = model.predict(scaled_features)

In [None]:
df['Log Moneyness'] = log_moneyness.flatten()
df['re'] = np.abs(df['iv_nn'] - df['iv']) / df['iv']

In [None]:
def plot_heatmap(data, values='re'):
    """ Plots the heatmap of the `values` column w.r.t. `Time to Maturity` and `Log Moneyness`.
    """
    data_sort = data.sort_values(values, ascending=True)
    data_pivot = data_sort.pivot(index='Time to Maturity (years)', columns='Log Moneyness', values=values)
    plt.figure(figsize=(10, 6))
    ax = sns.heatmap(data_pivot, cmap=plt.cm.Spectral, cbar=True, 
                     xticklabels=data_pivot.columns.values.round(2), 
                     yticklabels=data_pivot.index.values.round(2))
    ax.invert_yaxis()
    plt.tight_layout()
    plt.show()

Plot the heatmap of relative errors

In [None]:
plot_heatmap(df, 're')

Plot IV surface

In [None]:
import matplotlib.ticker as ticker
from mpl_toolkits.mplot3d import Axes3D

def plot_iv_surface(data, x="Log Moneyness", y='Time to Maturity (years)', z='iv'):
    """ Plots the IV surface
    """
    fig = plt.figure(figsize=(12, 6))
    ax = fig.gca(projection='3d')
    ax.azim = 120
    ax.elev = 13
    
    ax.set_xlabel(x)
    ax.set_ylabel(y)
    ax.set_zlabel(z)

    ax.invert_xaxis()
    ax.xaxis.set_major_formatter(ticker.FormatStrFormatter('%0.2f'))
    
    surf = ax.plot_trisurf(data[x], data[y], data[z], antialiased=True, cmap = plt.cm.Spectral)
    fig.colorbar(surf, shrink=0.7, aspect=10)
    
    plt.tight_layout()
    
    plt.show()

Firstly, the "true" IV surface

In [None]:
plot_iv_surface(df, z='iv')

Secondly, the "predicted" IV surface

In [None]:
plot_iv_surface(df, z='iv_nn')

----------

----------

# rBergomi Model Neural Network

## Data Loading and preprocessing

Load generated synthetic data

In [None]:
train = pd.read_csv("./data/rBergomi/ftse_0118_train.csv").dropna()
val = pd.read_csv("./data/rBergomi/ftse_0118_val.csv").dropna()
test = pd.read_csv("./data/rBergomi/ftse_0118_test.csv").dropna()

In [None]:
train.shape, val.shape, test.shape

In [None]:
train = train.drop(columns=['label','index'])
val = val.drop(columns=['label','index'])
test = test.drop(columns=['label','index'])

In [None]:
train = train[['H', 'Moneyness', 'Time to Maturity (years)', 'eta', 'rho', 'v0', 'iv']]
val = val[['H', 'Moneyness', 'Time to Maturity (years)', 'eta', 'rho', 'v0', 'iv']]
test = test[['H', 'Moneyness', 'Time to Maturity (years)', 'eta', 'rho', 'v0', 'iv']]

Split input features and labels

In [None]:
X_train_origin, Y_train = train.iloc[:, :-1], train.iloc[:, [-1]]
X_val_origin, Y_val = val.iloc[:, :-1], val.iloc[:, [-1]]
X_test_origin, Y_test = test.iloc[:, :-1], test.iloc[:, [-1]]

Normalize features

In [None]:
SCALER = FeatureScaler(X_train_origin)
X_train = SCALER.transform(X_train_origin)
X_val = SCALER.transform(X_val_origin)
X_test = SCALER.transform(X_test_origin)

## Neural Network

Define some hyper-parameters

In [None]:
# parameters
hidden_layer_sizes = [20, 10, 5]
batch_size = 128
n_epochs = 50

Build Neural Network and train it

In [None]:
model, history = train_nn(X_train, Y_train, X_val, Y_val, batch_size, n_epochs, hidden_layer_sizes, verbose=1)

Plot learning history curve

In [None]:
plot_learning_history(history)

Evaluate model performance on test dataset

In [None]:
model.evaluate(X_test, Y_test)

Save model to local file for further use

In [None]:
model.save("rBergomi.h5")

Study the quantiles of the relative error on test dataset

In [None]:
pred_test = model.predict(X_test).flatten()
true_test = Y_test.values.flatten()
relative_error = 100 * np.abs(pred_test - true_test) / true_test

In [None]:
sorted_re = sorted(relative_error)
q_90 = sorted_re[int(0.90*len(relative_error))]
q_95 = sorted_re[int(0.95*len(relative_error))]
q_99 = sorted_re[int(0.99*len(relative_error))]

In [None]:
plt.figure(figsize=(16, 8))
plt.hist(relative_error, bins=np.linspace(0, 15, 151), density=True, rwidth=0.5)
plt.xlim((0, 15))
sns.despine(left=True, bottom=True, right=True)
plt.grid(True)
plt.axvline(x=q_90, ls='--', color='orange', label=r"$q_{0.90}$"+"={:.2f}".format(q_90))
plt.axvline(x=q_95, ls='-.', color='purple', label=r"$q_{0.95}$"+"={:.2f}".format(q_95))
plt.axvline(x=q_99, ls=':', color='green', label=r"$q_{0.99}$"+"={:.2f}".format(q_99))
plt.legend(loc=1)
plt.xlabel("relative error")
plt.ylabel("frequency density")
plt.show()

Use a fixed parameter to generate the IV surface

In [None]:
log_moneyness = np.linspace(-0.1, 0.1, 21)
maturity = np.linspace(0.01, 0.18, 18)
log_moneyness, maturity = np.meshgrid(log_moneyness, maturity)

# columns ['Moneyness', 'Time to Maturity (years)', 'H', 'eta', 'rho', 'v0']
df = pd.DataFrame(columns=train.columns)
df['Moneyness'] = np.exp(log_moneyness.flatten())
df['Time to Maturity (years)'] = maturity.flatten()
df['H'] = 0.07
df['eta'] = 1.9
df['rho'] = -0.9
df['v0'] = 0.01

In [None]:
df = df[['H', 'Moneyness', 'Time to Maturity (years)', 'eta', 'rho', 'v0', 'iv']]
df.head()

In [None]:
df['iv'] = df.progress_apply(lambda row: rBergomi_pricer(row['H'], row['eta'], row['rho'], row['v0'], 
                                                         row['Time to Maturity (years)'], row['Moneyness'], 1.0)[1], 
                             axis=1)

In [None]:
scaled_features = SCALER.transform(df.iloc[:, :-1])
df['iv_nn'] = model.predict(scaled_features)
df['Log Moneyness'] = log_moneyness.flatten()
df['re'] = np.abs(df['iv_nn'] - df['iv']) / df['iv']
df.dropna(inplace=True)

In [None]:
plot_heatmap(df, 're')

In [None]:
plot_iv_surface(df, z='iv')

In [None]:
plot_iv_surface(df, z='iv_nn')

-----

-----

# Deep Calibration

This part implements the Deep calibration algorithm (LM combined with NN regression)

In [None]:
from utils import heston_pricer, rBergomi_pricer

Define a model parameters initializer, which will be used to initialize the model parameters before the LM calibrating loop

In [None]:
from scipy.stats import truncnorm

def model_parameters_initializer(model='heston', random_seed=None):
    """ Initialize model parameters
    """
    if model == 'heston':
        params = [
            10 * np.random.rand(), # lambda
            np.random.rand(), # vbar
            5 * np.random.rand(), # eta
            -1 * np.random.rand(), # rho
            np.random.rand() #v0
        ]
        names = ['lambda', 'vbar', 'eta', 'rho', 'v0']
    elif model == 'rbergomi' or model == 'rBergomi':
        params = [
            truncnorm.rvs(-1.2, 8.6, 0.07, 0.05), # H
            truncnorm.rvs(-3, 3, 2.5, 0.5), # eta
            truncnorm.rvs(-0.25, 2.25, -0.95, 0.2), # rho
            truncnorm.rvs(-2.5, 7, 0.3, 0.1) # v0
        ]
        names = ['H', 'eta', 'rho', 'v0']
    else:
        raise NameError("No such model name: {}".format(model))
    return params, names
params_, names_ = model_parameters_initializer(model='rBergomi', random_seed=None)
params_

Define a function to return the prediction and the Jacobian matrix of a neural network w.r.t. a specific input

In [None]:
def predict_label_jac(sess, model, test_inputs,model_name):
    """ Use a trained model to predict and to return jacobian matrix
    """
    if model_name=='heston':
        model = keras.models.load_model('heston.h5')
    else:
        model = keras.models.load_model('rBergomi.h5')
    print("Loaded Model from disk")
    #compile and evaluate loaded model
    model.compile(optimizer='adam', loss='mean_squared_error')
    pred = model.predict(test_inputs)
    grad_func = tf.gradients(model.output, model.input)
    jac = sess.run(grad_func, feed_dict={model.input: test_inputs})[0]
    return pred, jac

In [None]:
def deep_calibration(tf_sess, nn, K_T, market_quotes, model_name='heston', lambd_init=0.1, beta0=0.25, beta1=0.75, max_iter=1000, tol=1e-8):
    """ Combines LM algorithm with a NN regressor to calibrating model parameters.
    """
    # initialize model parameters
    params, param_names = model_parameters_initializer(model_name)
    
    # initalize learning step
    lambd = lambd_init
    
    n_samples = K_T.shape[0]
    n_params = len(params)
    I = np.eye(n_params)
    Q = market_quotes.reshape((-1, 1)) # shape: [n_samples, 1]
    K_T_values = K_T.values
    
    iter_count = 0
    
    # history to store some useful information during training 
    history = {
        'delta_params': {k: [] for k in param_names},
        'R': [],
        'lambda': [],
        'c_mu': []
    }
    
    # build a input dataframe by combining K_T and model parameters
    for i in range(len(param_names)):
        K_T[param_names[i]] = params[i]
    if model_name=='heston':

        input_data = K_T[['Moneyness','Time to Maturity (years)','lambda','vbar','eta','rho','v0']].values      ##np.insert(K_T_values, [2]*n_params, params, axis=1) # shape: [n_samples, n_params+2]
    else:
        input_data = K_T[['H', 'Moneyness', 'Time to Maturity (years)', 'eta', 'rho', 'v0']].values
    
    iv_nn, J = predict_label_jac(tf_sess, nn, input_data,model_name)
    R = iv_nn - Q # shape: [n_samples, 1]
    J = J[:, 2:] # shape: [n_samples, n_params], excluding K and T
    delta_params = np.linalg.pinv(J.T.dot(J) + lambd * I).dot(J.T.dot(R)).flatten() # vector size: [n_params,]
    
    history['R'].append(np.linalg.norm(R))
    history['lambda'].append(lambd)
    for param_idx, param_name in enumerate(param_names):
        history['delta_params'][param_name].append(delta_params[param_idx])

    while iter_count < max_iter and np.linalg.norm(delta_params) > tol:
        if iter_count % 50 == 0:
            logging.info("{}/{} iteration".format(iter_count+1, max_iter))
        params_new = params - delta_params
        input_data_new = np.insert(K_T_values, [2]*n_params, params_new, axis=1)
        iv_nn_new, J_new = predict_label_jac(tf_sess, nn, input_data_new,model_name)
        R_new = iv_nn_new - Q
        J_new = J_new[:, 2:]
        R_norm = np.linalg.norm(R)
        c_mu = (R_norm - np.linalg.norm(R_new)) / (R_norm - np.linalg.norm(R - J.dot(delta_params)))
        
        history['c_mu'].append(c_mu)
        
        if c_mu <= beta0:
            # reject delta_params
            lambd *= 2 # too slow, use greater lambd
        else:
            params = params_new
            R = R_new
            J = J_new
        if c_mu >=beta1:
            lambd /= 2.0
        
        delta_params = np.linalg.pinv(J.T.dot(J) + lambd * I).dot(J.T.dot(R)).flatten() # vector size: [n_params, ]
        iter_count += 1
        
        history['R'].append(np.linalg.norm(R))
        history['lambda'].append(lambd)
        for param_idx, param_name in enumerate(param_names):
            history['delta_params'][param_name].append(delta_params[param_idx])
    if iter_count < max_iter:
        logging.info("Leave iterations after {} iters".format(iter_count))
        
    return dict(zip(param_names, params)), history

In [None]:
tf.compat.v1.disable_eager_execution()
sess = tf.compat.v1.InteractiveSession()
sess.run(tf.compat.v1.global_variables_initializer())

In [None]:
K_T = X_test.iloc[:500,:][['Moneyness','Time to Maturity (years)']]
K_T_origin = X_test_origin.iloc[:500, :][['Moneyness','Time to Maturity (years)']]

In [None]:
# Market parameters
S0 = 1
r = 0 

# Heston parameters 
lambd = 1.3253 
vbar = 0.0354 
eta = 0.3877 
rho = -0.7165 
v0 = 0.0174 
q = 0

market_quotes = np.array([heston_pricer(lambd, vbar, eta, rho, v0, r, q, K_T_origin.iloc[i, 1], S0, K_T_origin.iloc[i, 0])[1] for i in range(K_T.shape[0])])
market_quotes = market_quotes.reshape((-1, 1))

In [None]:
# df.progress_apply(lambda row: rBergomi_pricer(row['H'], row['eta'], row['rho'], row['v0'], 
#                                                          row['Time to Maturity (years)'], row['Moneyness'], 1.0)[1], 
#                              axis=1)

S0 = 1                            
eta = 1.9
rho = -0.9
H = 0.07
v0 = 0.01
market_quotes = np.array([rBergomi_pricer(H, eta, rho, v0, K_T_origin.iloc[i, 1], K_T_origin.iloc[i, 0], S0)[1] for i in range(K_T.shape[0])])
market_quotes = market_quotes.reshape((-1, 1))

In [None]:
valid_idx = ~np.isnan(market_quotes.flatten())
K_T_input = K_T.iloc[valid_idx, :]
market_quotes = market_quotes[valid_idx, :]

In [None]:
import keras as K
import keras
params, history = deep_calibration(sess, model, K_T_input, market_quotes, model_name='rbergomi', lambd_init=0.01, beta0=0.25, beta1=0.75, max_iter=1000)

In [None]:
mu_hat = pd.DataFrame([params.values()], columns=params.keys())
mu_hat['Moneyness'] = 0
mu_hat['Time to Maturity (years)'] = 0
SCALER.inverse_transform(mu_hat[['H', 'Moneyness', 'Time to Maturity (years)', 'eta', 'rho', 'v0']])


In [None]:
class FeatureScaler(object):
    def __init__(self, train):
        self.mean = train.mean(axis=0)
        self.std = train.std(axis=0)
    
    def transform(self, df):
        return (df - self.mean) / self.std
    
    def inverse_transform(self, df):
        return df * self.std + self.mean
    
    def get_params(self):
        return self.mean, self.std

SCALER.get_params()