In [None]:
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt
import pandas as pd
from scipy.stats import norm
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler, StandardScaler, MinMaxScaler
from keras.models import Sequential
from keras.layers import Dense, Dropout, LeakyReLU, Lambda
from keras.optimizers import Adam
from keras.regularizers import l2
import time






In [None]:

# Show which GPU devices TensorFlow reports; the list is empty when none is visible.
visible_gpus = tf.config.list_physical_devices('GPU')
print(visible_gpus)






## Note: option prices and Black-Scholes prices are divided by the strike K

In [None]:

def CheckAccuracy(y_true, y_pred):
    """Print and return basic error metrics between observed and predicted values.

    Parameters
    ----------
    y_true : array-like
        Observed values. Used as the denominator of the percentage error, so a
        zero entry produces an infinite (or NaN) MAPE.
    y_pred : array-like
        Predicted values, broadcast-compatible with ``y_true``.

    Returns
    -------
    dict
        Keys, in insertion order: ``'diff'`` (element-wise ``y_true - y_pred``),
        ``'rmse'``, ``'mape'`` (a fraction, not multiplied by 100), ``'mse'``
        and ``'mae'``.
    """
    # Coerce to plain arrays so that lists are accepted and pandas objects are
    # compared element by element rather than aligned on their index labels.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)

    diff = y_true - y_pred
    mse = np.mean(diff**2)

    stats = dict()
    stats['diff'] = diff
    stats['rmse'] = np.sqrt(mse)
    print("Root Mean Squared Error:   " , stats['rmse'])
    stats['mape'] = np.mean(np.abs(diff / y_true))
    print("Mean Absolute Percentage Error:   " , stats['mape'])
    stats['mse'] = mse
    print("Mean Squared Error:   " , stats['mse'])
    stats['mae'] = np.mean(np.abs(diff))
    print("Mean Absolute Error:   " , stats['mae'])
    return stats




In [None]:

# Read the option data set: the first CSV column becomes the index and pandas
# is asked to parse it as dates. Rows containing any missing value are dropped.
data_path = 'C:/Users/User/Desktop/Data speciale/NeuralNet_data_v2.csv'
df = pd.read_csv(data_path, index_col=0, parse_dates=True).dropna()


In [None]:

# Preview the first five rows of the loaded frame.
df.head(n=5)


In [None]:
# volatility smile

import py_vollib 
from py_vollib.black_scholes.implied_volatility import implied_volatility as implied_volatility


def _call_implied_vol(price, spot, strike, maturity, rate):
    """Return the Black-Scholes implied volatility of a call option, or NaN.

    NaN is returned when the price is below the call's intrinsic value
    ``max(spot - strike, 0)`` or when the solver raises for any other reason.
    """
    if price < max(spot - strike, 0):
        return np.nan
    try:
        return implied_volatility(price, spot, strike, maturity, rate, flag='c')
    except Exception:
        # Best effort: a quote the solver cannot invert is marked as missing
        # instead of aborting the whole column.
        return np.nan


# Walk the rows positionally. The frame is indexed by the first CSV column, so
# looking rows up with an integer key (df[col][i]) relies on pandas' deprecated
# positional fallback and breaks outright on a gapped integer index.
_iv_inputs = ['price', 'Stock (S)', 'Strike (K)', 'Time to maturity (T)', 'Risk free rate (r)']
iv = [_call_implied_vol(*row) for row in df[_iv_inputs].itertuples(index=False, name=None)]

df['implied volatility'] = iv





In [None]:

# select the rows where year is greater than 2022 
#df = df[df['Year'] >= 2022]

df = df.sort_index()
df.dropna(inplace=True)


unique_dates = df.index.unique()

# Chronological 80% / 10% / 10% split over the unique index values, so that
# all rows sharing an index value land in the same split.
#train_size = int(0.80 * len(df))
#val_size = int(0.10 * len(df))
train_size = int(0.80 * len(unique_dates))
val_size = int(0.10 * len(unique_dates))

train_dates = unique_dates[:train_size]
val_dates = unique_dates[train_size:train_size + val_size]
# The test window starts after the validation window; starting it at
# train_size would put every validation date into the test set as well.
test_dates = unique_dates[train_size + val_size:]

# Column layout of the wide matrices built below:
#   0-2   base inputs, 3-7 Black-Scholes prices, 8-12 volatility measures,
#   13    Year, 14 Strike (K), 15-17 macro variables.
feature_columns = ['Moneyness (M)', 'Time to maturity (T)', 'Risk free rate (r)', 
                   'BS (vol10)', 'BS (vol30)', 'BS (vol60)', 'BS GARCH', 'BS VIX',
                   'vol10', 'vol30', 'vol60', 'GARCH', 'VIX', 'Year', 'Strike (K)','tchi','shtint','gdpce']
year_col = feature_columns.index('Year')
strike_col = feature_columns.index('Strike (K)')

# Split the dataset by the determined dates
X_train = df.loc[train_dates, feature_columns].values
X_val = df.loc[val_dates, feature_columns].values
X_test = df.loc[test_dates, feature_columns].values

y_train = df.loc[train_dates, ['Option Price (C)']].values
y_val = df.loc[val_dates, ['Option Price (C)']].values
y_test = df.loc[test_dates, ['Option Price (C)']].values

# Strike and Year are addressed by name-derived position: with the macro
# columns appended, the last two columns are no longer Strike and Year.
strike_train = X_train[:, strike_col]
strike_val = X_val[:, strike_col]
strike_test = X_test[:, strike_col]
year_train = X_train[:, year_col]
year_val = X_val[:, year_col]
year_test = X_test[:, year_col]
BSM_train = X_train[:, 3:8]
BSM_val = X_val[:, 3:8]
BSM_test = X_test[:, 3:8]
vol_train = X_train[:, 8:13]
vol_val = X_val[:, 8:13]
vol_test = X_test[:, 8:13]

# The macro variables are the last 3 columns of the wide matrices; they must
# be taken before X_* is narrowed, otherwise the slice returns the base inputs.
macro_train = X_train[:, -3:]
macro_val = X_val[:, -3:]
macro_test = X_test[:, -3:]

# Keep only the three base inputs in X_*.
X_train = X_train[:, 0:3]
X_val = X_val[:, 0:3]
X_test = X_test[:, 0:3]



X_train_val = np.concatenate((X_train, X_val), axis=0)
y_train_val = np.concatenate((y_train, y_val), axis=0)
vol_train_val = np.concatenate((vol_train, vol_val), axis=0)
macro_train_val = np.concatenate((macro_train, macro_val), axis=0)



In [None]:
# Row-based chronological split: the frame is sorted by index and cut at 80% /
# 90% of its rows. Every split array is rebuilt here, including the macro
# arrays, so that all of them share the same row counts.
df = df.sort_index()
df.dropna(inplace=True)

# Define the sizes for each split
train_size = int(0.80 * len(df))
val_size = int(0.10 * len(df))
test_size = len(df) - train_size - val_size  # Remaining data for testing

# Split the data based on the calculated sizes
train_df = df.iloc[:train_size]
val_df = df.iloc[train_size:train_size + val_size]
test_df = df.iloc[train_size + val_size:]

# Selecting features and targets for each dataset
features = ['Moneyness (M)', 'Time to maturity (T)', 'Risk free rate (r)', 
            'BS (vol10)', 'BS (vol30)', 'BS (vol60)', 'BS GARCH', 'BS VIX',
            'vol10', 'vol30', 'vol60', 'GARCH', 'VIX', 'Year', 'Strike (K)']
target = ['Option Price (C)']
# Macro variables are selected separately so the positional slices on the
# feature matrix below keep their meaning.
macro_features = ['tchi', 'shtint', 'gdpce']

# Separate X and y for each split
X_train = train_df[features].values
X_val = val_df[features].values
X_test = test_df[features].values

y_train = train_df[target].values
y_val = val_df[target].values
y_test = test_df[target].values

macro_train = train_df[macro_features].values
macro_val = val_df[macro_features].values
macro_test = test_df[macro_features].values

# Extract specific columns for additional variables as required
strike_train, strike_val, strike_test = X_train[:, -1], X_val[:, -1], X_test[:, -1]
year_train, year_val, year_test = X_train[:, -2], X_val[:, -2], X_test[:, -2]
BSM_train, BSM_val, BSM_test = X_train[:, 3:8], X_val[:, 3:8], X_test[:, 3:8]
vol_train, vol_val, vol_test = X_train[:, 8:13], X_val[:, 8:13], X_test[:, 8:13]

# Narrow down to the first three columns for X_train, X_val, and X_test as desired
X_train = X_train[:, :3]
X_val = X_val[:, :3]
X_test = X_test[:, :3]

# (Optional) Combine train and validation data for full training/validation set
X_train_val = np.concatenate((X_train, X_val), axis=0)
y_train_val = np.concatenate((y_train, y_val), axis=0)
vol_train_val = np.concatenate((vol_train, vol_val), axis=0)
macro_train_val = np.concatenate((macro_train, macro_val), axis=0)

In [None]:
# Score each of the five Black-Scholes benchmark columns of BSM_test against
# the observed test prices, in column order.
_bsm_test_stats = [
    CheckAccuracy(y_test.flatten(), BSM_test[:, column].flatten())
    for column in range(5)
]
# The stats of the last column remain the value of the cell, as before.
_bsm_test_stats[-1]



In [None]:
# Convert the three date collections into NumPy arrays.
train_dates, test_dates, val_dates = (
    np.array(dates) for dates in (train_dates, test_dates, val_dates)
)

In [None]:

# Scale the data
# Each MinMaxScaler is fitted on the training split only and then applied to
# the training, validation, test and combined train+validation arrays.
scaler = MinMaxScaler().fit(X_train)
X_train_scaled, X_val_scaled, X_test_scaled, X_train_val_scaled = (
    scaler.transform(part) for part in (X_train, X_val, X_test, X_train_val)
)

# Target scaler; scaler_y is reused later to map predictions back to prices.
scaler_y = MinMaxScaler().fit(y_train)
y_train_scaled, y_val_scaled, y_test_scaled, y_train_val_scaled = (
    scaler_y.transform(part) for part in (y_train, y_val, y_test, y_train_val)
)

# Volatility measures.
scaler_vol = MinMaxScaler().fit(vol_train)
vol_train_scaled, vol_val_scaled, vol_test_scaled, vol_train_val_scaled = (
    scaler_vol.transform(part) for part in (vol_train, vol_val, vol_test, vol_train_val)
)

# Macro variables.
scaler_macro = MinMaxScaler().fit(macro_train)
macro_train_scaled, macro_val_scaled, macro_test_scaled, macro_train_val_scaled = (
    scaler_macro.transform(part) for part in (macro_train, macro_val, macro_test, macro_train_val)
)

#X_train_sc = np.append(X_train_scaled, np.resize(vol_train_scaled[:,0], (len(vol_train_scaled),1)), axis=1)
#X_val_sc = np.append(X_val_scaled, np.resize(vol_val_scaled[:,0], (len(vol_val_scaled),1)), axis=1)
#X_test_sc = np.append(X_test_scaled, np.resize(vol_test_scaled[:,0], (len(vol_test_scaled),1)), axis=1)











In [None]:
# Exponential learning-rate decay: multiplied by 0.96 every 4000 optimizer
# steps, in discrete (staircase) jumps.
initial_learning_rate = 0.001
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=4000,
    decay_rate=0.96,
    staircase=True,
)

# NOTE(review): lr_schedule is built above but a constant 0.001 is passed
# here - confirm this is intended.
# NOTE(review): create_model is defined in a later cell of this notebook.
model = create_model(num_layers=4, nodes=128, dropout_rate=True, learning_rate_schedule=0.001)

# Inputs: the three unscaled base features plus column 4 of the unscaled
# volatility block, appended as a fourth column.
# NOTE(review): the features are unscaled while the target is scaled - confirm.
X_train_sc = np.column_stack((X_train, vol_train[:, 4]))
X_test_sc = np.column_stack((X_test, vol_test[:, 4]))
X_train_val_sc = np.column_stack((X_train_val, vol_train_val[:, 4]))

# fit the model
model.fit(X_train_sc, y_train_scaled, epochs=2, batch_size=64, verbose=1)

#y_test_pred = model.predict(X_test_sc).flatten()

# Map the scaled predictions back to price units.
scaled_test_pred = model.predict(X_test_sc)
y_test_pred = scaler_y.inverse_transform(scaled_test_pred).flatten()



#model.save('C:/Users/User/Desktop/Data speciale/NeuralNetModels/NeuralNet_model.h5')

In [None]:
# Exponential learning-rate decay: multiplied by 0.96 every 4000 optimizer
# steps, in discrete (staircase) jumps.
initial_learning_rate = 0.001
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=4000,
    decay_rate=0.96,
    staircase=True,
)

# NOTE(review): create_model is defined in a later cell of this notebook.
model = create_model(num_layers=3, nodes=64, dropout_rate=True, learning_rate_schedule=lr_schedule)

# Inputs: the three scaled base features plus column 4 of the scaled
# volatility block, appended as a fourth column.
X_train_sc = np.column_stack((X_train_scaled, vol_train_scaled[:, 4]))
X_val_sc = np.column_stack((X_val_scaled, vol_val_scaled[:, 4]))
X_test_sc = np.column_stack((X_test_scaled, vol_test_scaled[:, 4]))

# fit the model
model.fit(X_train_sc, y_train_scaled, epochs=20, batch_size=1024, verbose=1)

# Predict on validation and test inputs and map back to price units.
scaled_val_pred = model.predict(X_val_sc)
y_val_pred = scaler_y.inverse_transform(scaled_val_pred).flatten()
scaled_test_pred = model.predict(X_test_sc)
y_test_pred = scaler_y.inverse_transform(scaled_test_pred).flatten()

In [None]:


# Compare the network's test predictions with the Black-Scholes benchmark in
# column 4 of BSM_test (reported as 'BS VIX' below).
# The validation-set counterpart is kept disabled:
#stats_val = CheckAccuracy(y_val.flatten(), y_val_pred.flatten())
#stats_val_BSM = CheckAccuracy(y_val.flatten(), BSM_val[:,4].flatten())
#print('Val set:')
#print('RMSE:', stats_val['rmse'])
#print('MAPE:', stats_val['mape'])
#print('BS VIX RMSE:', stats_val_BSM['rmse'])
#print('BS VIX MAPE:', stats_val_BSM['mape'])

stats_test = CheckAccuracy(y_test.flatten(), y_test_pred.flatten())
stats_test_BSM = CheckAccuracy(y_test.flatten(), BSM_test[:,4].flatten())

# Summary of the test-set metrics, one print call per line.
_summary_lines = (
    (' ',),
    ('Test set:',),
    ('RMSE:', stats_test['rmse']),
    ('MAPE:', stats_test['mape']),
    ('BS VIX RMSE:', stats_test_BSM['rmse']),
    ('BS VIX MAPE:', stats_test_BSM['mape']),
)
for _line in _summary_lines:
    print(*_line)


In [None]:

# Hand-built network: four hidden layers of 100 units each (LeakyReLU, ELU,
# ReLU, ELU), dropout of 0.2 after every hidden layer, and a single output
# unit with an exponential activation.

# Define the model
model = Sequential()
# First hidden layer with LeakyReLU
model.add(Dense(100, input_dim=4))
model.add(LeakyReLU(alpha=0.01))  # negative slope set explicitly to 0.01
model.add(Dropout(0.2))
# Second hidden layer with ELU
model.add(Dense(100, activation='elu'))
model.add(Dropout(0.2))
# Third hidden layer with ReLU
model.add(Dense(100, activation='relu'))
model.add(Dropout(0.2))
# Fourth hidden layer with ELU
model.add(Dense(100, activation='elu'))
model.add(Dropout(0.2))
# Output layer with exponential activation
model.add(Dense(1, activation='exponential'))

optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer, loss='mean_squared_error')


# Build the 4-column inputs under their own names. Overwriting X_train_val and
# X_test would append one more column each time the cell is re-run, which no
# longer matches input_dim=4 or the scaler fitted on three columns.
X_train_val_vix = np.column_stack((X_train_val, vol_train_val[:, 4]))
X_test_vix = np.column_stack((X_test, vol_test[:, 4]))

# fit the model on the unscaled inputs and unscaled prices
model.fit(X_train_val_vix, y_train_val, epochs=30, batch_size=4096, verbose=1)

#y_test_pred = model.predict(X_test_sc).flatten()


y_test_pred = model.predict(X_test_vix).flatten()


# build the model
# Only the first layers of a second network are assembled here: no output
# layer is added and the model is not compiled. Note that this rebinds `model`.
model = Sequential([
    Dense(128, input_dim=X_train_scaled.shape[1], activation='relu'),
    Dropout(0.2),
    Dense(64, activation='relu'),
])



In [None]:

# Hyperparameter search space for the grid search: number of hidden layers,
# units per layer, and whether dropout is used.
param_grid = dict(
    n_hidden=[1, 2, 3],
    n_neurons=[32, 64, 128],
    use_dropout=[True, False],
)

In [None]:






def create_model(num_layers=2, nodes=64, dropout_rate=True, learning_rate_schedule=None, input_dim=None):
    """Build and compile a fully connected regression network.

    Parameters
    ----------
    num_layers : int
        Total number of hidden Dense layers (the first one plus
        ``num_layers - 1`` further layers).
    nodes : int
        Units in every hidden layer.
    dropout_rate : bool
        Flag, not a rate: when it compares equal to ``True`` a ``Dropout(0.2)``
        layer follows every hidden layer; any other value disables dropout.
    learning_rate_schedule : float or learning-rate schedule, optional
        Passed to ``Adam`` as ``learning_rate``. ``None`` keeps Adam's default.
    input_dim : int, optional
        Number of input features. When ``None`` the input size is not fixed
        here and is taken from the first batch the model sees, so the same
        builder serves feature matrices of any width.

    Returns
    -------
    keras Sequential model compiled with Adam and mean-squared-error loss.
    """
    use_dropout = (dropout_rate == True)

    model = Sequential()

    # First hidden layer. The input width is only pinned when the caller asks
    # for it; a hard-coded value rejects every matrix of a different width.
    first_layer_kwargs = {} if input_dim is None else {'input_dim': input_dim}
    model.add(Dense(nodes, activation='relu', kernel_regularizer=l2(0.0001), **first_layer_kwargs))
    if use_dropout:
        model.add(Dropout(0.2))

    # Remaining hidden layers
    for _ in range(num_layers-1):
        model.add(Dense(nodes, activation='relu', kernel_regularizer=l2(0.0001)))
        if use_dropout:
            model.add(Dropout(0.2))

    # Output layer: one linear unit for the (scaled) option price. A linear
    # activation does not constrain the output to be non-negative.
    model.add(Dense(1, activation='linear'))

    # Compile model; fall back to Adam's own default rate when none is given.
    if learning_rate_schedule is None:
        optimizer = Adam()
    else:
        optimizer = Adam(learning_rate=learning_rate_schedule)
    model.compile(optimizer=optimizer, loss='mean_squared_error')

    return model





In [None]:

volatility_model_list = ['vol10', 'vol30', 'vol60', 'GARCH', 'VIX']
errorList_csv = ['diff_train.csv', 'diff_val.csv']

# Grid search: for every volatility measure and every combination in
# param_grid, train a network, score it on the training and validation splits,
# save the model and append the metrics to a results CSV.

# The metrics below work on 1-D price vectors.
y_train = y_train.flatten()
y_val = y_val.flatten()
y_test = y_test.flatten()

for volatility, volatility_name in enumerate(volatility_model_list):
    # The inputs depend only on the volatility measure, so they are built once
    # per measure: three scaled base features plus the scaled measure.
    X_train_sc = np.column_stack((X_train_scaled, vol_train_scaled[:, volatility]))
    X_val_sc = np.column_stack((X_val_scaled, vol_val_scaled[:, volatility]))
    X_test_sc = np.column_stack((X_test_scaled, vol_test_scaled[:, volatility]))
    #X_train_val_sc = np.append(X_train_val_scaled, np.resize(vol_train_val_scaled[:, volatility], (len(vol_train_val_scaled), 1)), axis=1)

    for layers in param_grid['n_hidden']:
        for nodes in param_grid['n_neurons']:
            for dropout in param_grid['use_dropout']:
                start_time = time.time()

                initial_learning_rate = 0.001
                lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
                    initial_learning_rate=initial_learning_rate,
                    decay_steps=4000,   # Adjust this to suit your data size
                    decay_rate=0.96,     # The rate at which the learning rate decays
                    staircase=True       # If True, the learning rate decays in discrete steps
                )

                model = create_model(num_layers=layers, nodes=nodes, dropout_rate=dropout, learning_rate_schedule=lr_schedule)

                model.fit(X_train_sc, y_train_scaled, epochs=20, batch_size=1024, verbose=0)

                # Predict on the training and validation inputs and map the
                # scaled outputs back to price units.
                y_train_pred = scaler_y.inverse_transform(np.float64(model.predict(X_train_sc))).flatten()
                y_val_pred = scaler_y.inverse_transform(np.float64(model.predict(X_val_sc))).flatten()

                # ANN Evaluation Metrics
                diff_train = y_train - y_train_pred
                mse_train = np.mean(diff_train**2)
                mape_train = np.mean(np.abs(diff_train / y_train))
                diff_val = y_val - y_val_pred
                mse_val = np.mean(diff_val**2)
                mape_val = np.mean(np.abs(diff_val / y_val))

                # BSM Evaluation Metrics
                #diff_BSM_train = y_train - BSM_train[:, volatility]
                #mse_BSM_train = np.mean(diff_BSM_train**2)
                #mape_BSM_train = np.mean(np.abs(diff_BSM_train / y_train))
                #diff_BSM_val = y_val - BSM_val[:, volatility]
                #mse_BSM_val = np.mean(diff_BSM_val**2)
                #mape_BSM_val = np.mean(np.abs(diff_BSM_val / y_val))

                # Print the results
                print('Model completed:', volatility_name)
                print('Layers:', layers, 'Nodes:', nodes, 'Dropout:', dropout)
                print('ANN Train MSE:', mse_train, 'MAPE:', mape_train)
                print('ANN Val MSE:', mse_val, 'MAPE:', mape_val)
                # print('BSM Train MSE:', mse_BSM_train, 'MAPE:', mape_BSM_train)
                # print('BSM Val MSE:', mse_BSM_val, 'MAPE:', mape_BSM_val)

                # Save the model
                print('Saving Model')
                model.save('C:/Users/User/Desktop/Data speciale/NeuralNetModels/Saved models/V2/' +
                           f'model_{volatility_name}_layers{layers}_nodes{nodes}_dropout{dropout}.h5')

                # Append the metrics (one Train row, one Val row) to the CSV.
                # The frame gets its own name: rebinding `df` here would
                # replace the option data set that other cells still use.
                results_df = pd.DataFrame({
                    'Model': ['ANN']*2,
                    'Volatility Model': [volatility_name]*2,
                    'Layers': [layers]*2,
                    'Nodes': [nodes]*2,
                    'Dropout': [dropout]*2,
                    'MSE': [mse_train, mse_val],
                    'MAPE': [mape_train, mape_val],
                    'Val/Test': ['Train', 'Val']
                })
                results_df.to_csv('C:/Users/User/Desktop/Data speciale/NeuralNetModels/Results/ANN_results_newMethod_v3.csv',
                                  mode='a', header=False, index=False)

                # Save residuals
                #print('Saving Residuals')
                #errorList = [diff_train, diff_val]
                #for j in range(len(errorList)):
                #    residual_df = pd.DataFrame(errorList[j])
                #    residual_df.to_csv(f'C:/Users/User/Desktop/Data speciale/NeuralNetModels/Residuals/'
                #              f'model_{volatility_model_list[volatility]}_layers{layers}_nodes{nodes}_dropout{dropout}_{errorList_csv[j]}.csv',
                #              index=False)

                print('Model saved')
                end_time = time.time()
                execution_time = end_time - start_time
                print('Execution time:', execution_time)
                    
                
                
                        


    



In [None]:
# train model for the macroeconomic variables

# Exponential learning-rate decay: multiplied by 0.96 every 4000 optimizer
# steps, in discrete (staircase) jumps.
initial_learning_rate = 0.001
lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate,
    decay_steps=4000,
    decay_rate=0.96,
    staircase=True,
)

model = create_model(num_layers=3, nodes=64, dropout_rate=False, learning_rate_schedule=lr_schedule)

# Inputs: three scaled base features, then column 4 of the scaled volatility
# block, then one scaled macro variable (5 columns in total).
# Macro column order: 0 = tchi, 1 = shtint, 2 = gdpce
X_train_val_sc = np.column_stack((
    X_train_val_scaled,
    vol_train_val_scaled[:, 4],
    macro_train_val_scaled[:, 2],
))
X_test_sc = np.column_stack((
    X_test_scaled,
    vol_test_scaled[:, 4],
    macro_test_scaled[:, 2],
))

# fit the model on the combined train+validation data
model.fit(X_train_val_sc, y_train_val_scaled, epochs=30, batch_size=1024, verbose=1)

# Map the scaled test predictions back to price units.
scaled_test_pred = model.predict(X_test_sc)
y_test_pred = scaler_y.inverse_transform(scaled_test_pred).flatten()





# Final model prediction on test data

In [None]:


# Read the stored results table.
ANN_results = pd.read_csv('C:/Users/User/Desktop/Data speciale/NeuralNetModels/Results/ANN_Results.csv')

# load the best model
# One saved network per volatility measure, all from the same folder.
saved_models_v2 = 'C:/Users/User/Desktop/Data speciale/NeuralNetModels/Saved models/V2/'

model_VIX = tf.keras.models.load_model(saved_models_v2 + 'model_VIX_layers3_nodes64_dropoutFalse.h5')
model_GARCH = tf.keras.models.load_model(saved_models_v2 + 'model_GARCH_layers1_nodes128_dropoutTrue.h5')
model_vol60 = tf.keras.models.load_model(saved_models_v2 + 'model_vol60_layers1_nodes128_dropoutFalse.h5')
model_vol30 = tf.keras.models.load_model(saved_models_v2 + 'model_vol30_layers2_nodes32_dropoutFalse.h5')
model_vol10 = tf.keras.models.load_model(saved_models_v2 + 'model_vol10_layers1_nodes32_dropoutFalse.h5')


In [None]:
# Rebuild the row-based chronological split and refit the scalers.
# NOTE(review): expects `df` to still be the option data set at this point.
df = df.sort_index()
df.dropna(inplace=True)

# 80% / 10% of the rows for training / validation; the rest is the test set.
train_size = int(0.80 * len(df))
val_size = int(0.10 * len(df))
test_size = len(df) - train_size - val_size  # Remaining data for testing

val_end = train_size + val_size
train_df = df.iloc[:train_size]
val_df = df.iloc[train_size:val_end]
test_df = df.iloc[val_end:]

# Column layout: 0-2 base inputs, 3-7 Black-Scholes prices,
# 8-12 volatility measures, 13 Year, 14 Strike (K).
features = ['Moneyness (M)', 'Time to maturity (T)', 'Risk free rate (r)', 
            'BS (vol10)', 'BS (vol30)', 'BS (vol60)', 'BS GARCH', 'BS VIX',
            'vol10', 'vol30', 'vol60', 'GARCH', 'VIX', 'Year', 'Strike (K)']
target = ['Option Price (C)']

# Feature matrices and targets per split.
X_train = train_df[features].to_numpy()
X_val = val_df[features].to_numpy()
X_test = test_df[features].to_numpy()

y_train = train_df[target].to_numpy()
y_val = val_df[target].to_numpy()
y_test = test_df[target].to_numpy()

# Side arrays cut out of the wide matrices.
strike_train = X_train[:, -1]
strike_val = X_val[:, -1]
strike_test = X_test[:, -1]
year_train = X_train[:, -2]
year_val = X_val[:, -2]
year_test = X_test[:, -2]
BSM_train = X_train[:, 3:8]
BSM_val = X_val[:, 3:8]
BSM_test = X_test[:, 3:8]
vol_train = X_train[:, 8:13]
vol_val = X_val[:, 8:13]
vol_test = X_test[:, 8:13]

# Keep only the three base inputs in X_*.
X_train, X_val, X_test = X_train[:, :3], X_val[:, :3], X_test[:, :3]

# Combined train+validation arrays (rows of train followed by rows of val).
X_train_val = np.vstack((X_train, X_val))
y_train_val = np.vstack((y_train, y_val))
vol_train_val = np.vstack((vol_train, vol_val))


# Scale the data: every scaler is fitted on the training split only.
scaler = MinMaxScaler().fit(X_train)
X_train_scaled, X_val_scaled, X_test_scaled, X_train_val_scaled = (
    scaler.transform(part) for part in (X_train, X_val, X_test, X_train_val)
)

scaler_y = MinMaxScaler().fit(y_train)
y_train_scaled, y_val_scaled, y_test_scaled, y_train_val_scaled = (
    scaler_y.transform(part) for part in (y_train, y_val, y_test, y_train_val)
)

scaler_vol = MinMaxScaler().fit(vol_train)
vol_train_scaled, vol_val_scaled, vol_test_scaled, vol_train_val_scaled = (
    scaler_vol.transform(part) for part in (vol_train, vol_val, vol_test, vol_train_val)
)


In [None]:
# predict the option prices for the test set for each model
# Every loaded network receives the three scaled base features plus the one
# scaled volatility column it is paired with; predictions are mapped back to
# price units with scaler_y. Order: VIX, GARCH, vol60, vol30, vol10.
_prediction_plan = (
    (4, model_VIX),
    (3, model_GARCH),
    (2, model_vol60),
    (1, model_vol30),
    (0, model_vol10),
)

_test_predictions = []
for _vol_col, _fitted_model in _prediction_plan:
    X_train_sc = np.column_stack((X_train_scaled, vol_train_scaled[:, _vol_col]))
    X_test_sc = np.column_stack((X_test_scaled, vol_test_scaled[:, _vol_col]))
    _scaled_pred = np.float64(_fitted_model.predict(X_test_sc))
    _test_predictions.append(scaler_y.inverse_transform(_scaled_pred).flatten())

y_pred_VIX, y_pred_GARCH, y_pred_vol60, y_pred_vol30, y_pred_vol10 = _test_predictions






In [None]:




# Print the test-set metrics of every network, then of every Black-Scholes
# benchmark, each preceded by its label.
print('ANN')
_ann_stats = {}
for _label, _prediction in (
    ('VIX', y_pred_VIX),
    ('GARCH', y_pred_GARCH),
    ('vol60', y_pred_vol60),
    ('vol30', y_pred_vol30),
    ('vol10', y_pred_vol10),
):
    print(_label)
    _ann_stats[_label] = CheckAccuracy(y_test.flatten(), _prediction)
stats_VIX, stats_GARCH, stats_vol60, stats_vol30, stats_vol10 = _ann_stats.values()

# Black-Scholes benchmarks: label -> column of BSM_test.
print('BSM')
_bsm_stats = {}
for _label, _column in (('VIX', 4), ('GARCH', 3), ('vol60', 2), ('vol30', 1), ('vol10', 0)):
    print(_label)
    _bsm_stats[_label] = CheckAccuracy(y_test.flatten(), BSM_test[:, _column].flatten())
stats_BSM_VIX, stats_BSM_GARCH, stats_BSM_vol60, stats_BSM_vol30, stats_BSM_vol10 = _bsm_stats.values()

In [None]:
# join the results in one dataframe with X_test, y_test and the predictions, and attach it to the original dataframe

#df_val = df.loc[val_dates, ['Moneyness (M)', 'Time to maturity (T)', 'Risk free rate (r)',
#                            'BS (vol10)', 'BS (vol30)', 'BS (vol60)', 'BS GARCH', 'BS VIX',
#                            'vol10', 'vol30', 'vol60', 'GARCH', 'VIX', 'Year', 'Strike (K)', 'Option Price (C)', 'Stock (S)']]
#df_val['ANN VIX'] = y_pred_VIX
#df_val['ANN GARCH'] = y_pred_GARCH
#df_val['ANN vol60'] = y_pred_vol60
#df_val['ANN vol30'] = y_pred_vol30
#df_val['ANN vol10'] = y_pred_vol10




# do the same for train set 

#df_train = df.loc[train_dates, ['Moneyness (M)', 'Time to maturity (T)', 'Risk free rate (r)',
#                            'BS (vol10)', 'BS (vol30)', 'BS (vol60)', 'BS GARCH', 'BS VIX',
#                            'vol10', 'vol30', 'vol60', 'GARCH', 'VIX', 'Year', 'Strike (K)', 'Option Price (C)', 'Stock (S)']]
#df_train['ANN VIX'] = y_pred_VIX_train
#df_train['ANN GARCH'] = y_pred_GARCH_train
#df_train['ANN vol60'] = y_pred_vol60_train
#df_train['ANN vol30'] = y_pred_vol30_train
#df_train['ANN vol10'] = y_pred_vol10_train



# save the results to a csv file
#df_val.to_csv('C:/Users/User/Desktop/Data speciale/NeuralNetModels/Results/NN_results_test.csv')
#df_train.to_csv('C:/Users/User/Desktop/Data speciale/NeuralNetModels/Results/NN_results_train.csv')

# Attach the five networks' test predictions to the test rows and export them.
# test_df is an iloc slice of df, so the columns are added with assign(),
# which returns a new frame, rather than by item assignment on the slice
# (which raises SettingWithCopyWarning).
test_df = test_df.assign(**{
    'ANN VIX': y_pred_VIX,
    'ANN GARCH': y_pred_GARCH,
    'ANN vol60': y_pred_vol60,
    'ANN vol30': y_pred_vol30,
    'ANN vol10': y_pred_vol10,
})


test_df.to_csv('C:/Users/User/Desktop/Data speciale/NeuralNetModels/Results/NN_results_test_v2.csv')




In [None]:

# Load every saved model found in model_dir, predict the test-set option
# prices with it and append its test metrics to a CSV file.
import os

model_dir = 'C:/Users/User/Desktop/Data speciale/NeuralNetModels'
# Keep only saved-model files: the folder listing can also contain
# sub-folders and other files, which load_model cannot open.
model_list = sorted(
    entry for entry in os.listdir(model_dir)
    if entry.lower().endswith(('.h5', '.keras'))
    and os.path.isfile(os.path.join(model_dir, entry))
)

y_test = y_test.flatten()

# Tag in the file name -> column of vol_test_scaled, checked in this order.
vol_column_by_tag = {'vol10': 0, 'vol30': 1, 'vol60': 2, 'GARCH': 3, 'VIX': 4}

for model_name in model_list:
    vol_column = next(
        (column for tag, column in vol_column_by_tag.items() if tag in model_name),
        None,
    )
    if vol_column is None:
        # Without a tag there is no way to know which input the model expects;
        # reusing the previous model's inputs would score it on the wrong data.
        print(model_name, 'skipped: no volatility tag in file name')
        continue

    model = tf.keras.models.load_model(os.path.join(model_dir, model_name))
    X_test_sc = np.column_stack((X_test_scaled, vol_test_scaled[:, vol_column]))

    y_pred = scaler_y.inverse_transform(np.float64(model.predict(X_test_sc))).flatten()
    stats = CheckAccuracy(y_test, y_pred)

    # Only test metrics are computed here, so only a 'Test' row is written.
    # The frame has its own name so the option data set in `df` is untouched.
    results_df = pd.DataFrame({
        'Model': [model_name],
        'MSE': [stats['mse']],
        'MAPE': [stats['mape']],
        'Train/Test': ['Test']
    })
    results_df.to_csv('C:/Users/User/Desktop/Data speciale/NeuralNetModels/Results/ANN_results_v3_test.csv',
                      mode='a', header=False, index=False)
    print(model_name, 'completed')
