<a href="https://colab.research.google.com/github/nikhil-iitb/DerivativePricing_MarketMaking/blob/main/SVM_ANN_OptionPricing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install keras-tuner

Collecting keras-tuner
  Downloading keras_tuner-1.3.5-py3-none-any.whl (176 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m176.1/176.1 kB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
Collecting kt-legacy (from keras-tuner)
  Downloading kt_legacy-1.0.5-py3-none-any.whl (9.6 kB)
Installing collected packages: kt-legacy, keras-tuner
Successfully installed keras-tuner-1.3.5 kt-legacy-1.0.5


In [None]:
import os
import gzip
import pathlib
import numpy as np
import pandas as pd
import tensorflow as tf
import keras_tuner as kt
import tensorflow_datasets as tfds

# Model

In [None]:
import argparse
import numpy as np
import tensorflow as tf

from tensorflow.keras.layers import Dense, Input, Flatten, Dropout, BatchNormalization
from tensorflow.keras.regularizers import l1, l2, l1_l2

def getModel(input_shape = (8,),
            num_layers   = 2,
            hidden_units = [14,7],
            output_shape = (1,),
            activation = 'elu',
            initializer = tf.random_normal_initializer(mean=0.0, stddev=0.1),
            regularizer = l1_l2(0.000001,0.000001),
            final_activation = None,
            dropout = None,
            batchnorm = False,
            compile = False,
            learning_rate = 1e-3,
            loss = None,
            metrics = None
            ):
    """
    Builds a fully connected feed-forward network with the Keras functional API.

    The input is flattened, passed through one Dense layer per entry of
    `hidden_units` (each optionally followed by Dropout and then
    BatchNormalization), and finished with a Dense output layer.

    Args:
        - input_shape: shape of the input data
        - num_layers: int, number of hidden layers; must equal len(hidden_units)
        - hidden_units: list of number of hidden units in each layer
        - output_shape: shape of the output data; only output_shape[0] is used
        - activation: string, activation function of the hidden layers
        - initializer: initializer for the kernels (hidden and output layers)
        - regularizer: kernel regularizer of the hidden layers (the output layer has none)
        - final_activation: string, activation function of final layer, None for linear
        - dropout: list, dropout rate for each layer, default None (no dropout)
        - batchnorm: bool, specifies if batch normalization is used, default False
        - compile: bool, if True the model is compiled before it is returned, default False
        - learning_rate: float or schedule handed to Adam when compile is True
        - loss: loss used when compile is True; None selects MeanAbsolutePercentageError
        - metrics: metrics used when compile is True; None selects [MeanSquaredError]

    Output:
        - model: tf.keras.Model, compiled if compile is True
    """
    assert num_layers == len(hidden_units), "Number of hidden units must match number of layers"
    if dropout is not None:
        assert num_layers == len(dropout), "Number of dropout rates must match number of layers"

    inputs = Input(shape=input_shape)
    h = Flatten()(inputs)

    for i, layer in enumerate(hidden_units):
        h = Dense(layer, activation=activation,
                  kernel_initializer = initializer,
                  kernel_regularizer = regularizer)(h)
        if dropout is not None:
            h = Dropout(dropout[i])(h)
        if batchnorm:
            h = BatchNormalization()(h)

    # activation=None is Dense's linear default, so one call covers both cases.
    outputs = Dense(output_shape[0], activation=final_activation,
                    kernel_initializer = initializer)(h)

    model = tf.keras.Model(inputs=inputs, outputs=outputs)

    if compile:
        # Fallback loss/metric mirror what tuned_model() in this notebook compiles with.
        if loss is None:
            loss = tf.keras.losses.MeanAbsolutePercentageError()
        if metrics is None:
            metrics = [tf.keras.metrics.MeanSquaredError()]
        model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = learning_rate),
                      loss = loss,
                      metrics = metrics)

    return model


def tuneLayer(hp):
    """
    Returns a compiled hyperModel for keras tuner. (this is private)

    Searches over the number of hidden layers (1-5) and the width of each
    layer (5-50, step 5). The network shape comes from the module-level
    `input_shape_glob` / `output_shape_glob`, which customTuner.__init__ sets.

    NOTE: this notebook defines tuneLayer in two cells; the later one wins.
    """

    num_layers = hp.Int('num_layers', min_value=1, max_value=5)
    hidden_units = [hp.Int(f'units_{i+1}', min_value=5, max_value=50, step=5)
                    for i in range(num_layers)]

    model = getModel(input_shape=input_shape_glob,
                    output_shape=output_shape_glob,
                    num_layers = num_layers,
                    hidden_units = hidden_units
                    )

    # Compile here instead of passing compile=True: the getModel signature in
    # this notebook has no such parameter, so that call raised a TypeError.
    # Loss and metric follow tuned_model(); Adam keeps its default learning rate.
    model.compile(optimizer = tf.keras.optimizers.Adam(),
                  loss = tf.keras.losses.MeanAbsolutePercentageError(),
                  metrics = [tf.keras.metrics.MeanSquaredError()])

    return model



def tuneLR(hp):
    """
    Returns a compiled hyperModel for keras tuner. (this is private)

    Searches over the number of hidden layers (1-5), the width of each layer
    (5-50, step 5) and the Adam learning rate (1e-4 to 1e-2, log sampling).
    The network shape comes from the module-level `input_shape_glob` /
    `output_shape_glob`, which customTuner.__init__ sets.

    NOTE: this notebook defines tuneLR in two cells; the later one wins.
    """

    num_layers = hp.Int('num_layers', min_value=1, max_value=5)
    hidden_units = [hp.Int(f'units_{i+1}', min_value=5, max_value=50, step=5)
                    for i in range(num_layers)]

    learning_rate = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling = 'log')

    model = getModel(input_shape=input_shape_glob,
                    output_shape=output_shape_glob,
                    num_layers = num_layers,
                    hidden_units = hidden_units
                    )

    # Compile here instead of passing compile=/learning_rate= to getModel: its
    # signature in this notebook has neither, so that call raised a TypeError.
    # Loss and metric follow tuned_model().
    model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = learning_rate),
                  loss = tf.keras.losses.MeanAbsolutePercentageError(),
                  metrics = [tf.keras.metrics.MeanSquaredError()])

    return model



def tuned_model(hp):
    """
    Builds and compiles one candidate network for keras tuner.

    Search space:
    - num_layers: 1-5
    - activation: 'elu' or 'tanh'
    - learning_rate: 1e-4 - 1e-2, log sampling
    - rate_decay: 0.85 - 0.9995
    - l1_coeff, l2_coeff: 1e-8 - 10**-6.5 (no `sampling` argument is passed)
    - units_<k>: 5-7 for every hidden layer

    Compiled with Adam on a staircase exponential-decay schedule,
    MeanAbsolutePercentageError as loss and MeanSquaredError as metric.
    Reads the module-level `input_shape_glob` / `output_shape_glob`.
    """

    # Hyperparameters are registered in this exact order.
    depth = hp.Int('num_layers', min_value=1, max_value=5)

    # https://stats.stackexchange.com/questions/402618/can-sinx-be-used-as-activation-in-deep-learning
    act_fn = hp.Choice('activation', ['elu', 'tanh'])

    base_lr = hp.Float('learning_rate', min_value=1e-4, max_value=0.01, sampling = 'log')
    decay = hp.Float('rate_decay', min_value=0.85, max_value=0.9995)
    l1_coeff = hp.Float('l1_coeff', min_value=10**(-8), max_value=10**(-6.5))
    l2_coeff = hp.Float('l2_coeff', min_value=10**(-8), max_value=10**(-6.5))

    widths = [hp.Int(f'units_{k+1}', min_value=5, max_value=7) for k in range(depth)]

    network = getModel(input_shape=input_shape_glob,
                       output_shape=output_shape_glob,
                       num_layers = depth,
                       hidden_units = widths,
                       activation = act_fn,
                       regularizer = tf.keras.regularizers.l1_l2(l1_coeff, l2_coeff)
                       )

    schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        base_lr, decay_steps = 4000, decay_rate = decay, staircase = True)

    network.compile(
        optimizer = tf.keras.optimizers.Adam(learning_rate = schedule),
        loss = tf.keras.losses.MeanAbsolutePercentageError(),
        metrics = [tf.keras.metrics.MeanSquaredError()],
    )

    return network

class DenseResidualBlock(tf.keras.layers.Layer):
    """Stack of Dense -> BatchNormalization (-> Dropout) stages with a skip connection."""

    def __init__(self,
                layer_per_block=[10,10,10],
                activation='elu',
                initializer='random_uniform',
                l2reg_coeff=0.01,
                dropout = True,
                **kwargs):
        """
        Stores the block configuration; sub-layers are created later in build().

        Args:
            - layer_per_block: list with the number of units of each dense layer
            - activation: string, activation function of the dense layers
            - initializer: kernel initializer of the dense layers
            - l2reg_coeff: coefficient of the L2 kernel regularizer
            - dropout: boolean, adds a Dropout(0.2) after every stage when True
            - **kwargs: forwarded to tf.keras.layers.Layer
        """
        super().__init__(**kwargs)

        self.layer_per_block = layer_per_block
        self.activation = activation
        self.initializer = initializer
        self.l2reg_coeff = l2reg_coeff
        self.dropout = dropout


    def build(self, input_shape):
        # Sub-layers are attached as attributes dense_<k>, bn_<k>, dropout_<k> (k starts at 1).
        for stage, units in enumerate(self.layer_per_block, start=1):
            dense_name = 'dense_{}'.format(stage)
            bn_name = 'bn_{}'.format(stage)
            setattr(self, dense_name, Dense(units,
                                            activation=self.activation,
                                            kernel_initializer=self.initializer,
                                            kernel_regularizer=l2(self.l2reg_coeff),
                                            input_shape = input_shape,
                                            name=dense_name))
            setattr(self, bn_name, BatchNormalization(name=bn_name))
            if self.dropout:
                drop_name = 'dropout_{}'.format(stage)
                setattr(self, drop_name, Dropout(0.2, name=drop_name))


    def call(self, inputs, training=False):
        out = inputs
        for stage in range(1, len(self.layer_per_block) + 1):
            out = getattr(self, 'dense_{}'.format(stage))(out)
            out = getattr(self, 'bn_{}'.format(stage))(out, training=training)
            if self.dropout:
                out = getattr(self, 'dropout_{}'.format(stage))(out, training=training)
        # Skip connection: the last dense output must be addable to `inputs`.
        return out + inputs




# PreProcessing

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Module-level MinMaxScaler shared with getDatasets(), which (re)fits it on the
# training split on every call with scaling=True; kept global so the
# transformation can be reversed later.
minmax_scaler = MinMaxScaler()


def getDatasets(dataframe, scaling = True):
    """
    Shuffles the rows, splits them 80/10/10 and wraps each part in a tf.data.Dataset.

    Args:
        - dataframe: ndarray, one sample per row
        - scaling: boolean, when True the module-level `minmax_scaler` is fitted on
          the training part and applied to all three parts, default is True
    Output:
        - (train_ds, valid_ds, test_ds): tuple of float32 datasets (unbatched)

    """
    n_rows = len(dataframe)
    shuffled = dataframe[np.random.permutation(n_rows)]
    cut_points = [int(.8*n_rows), int(.9*n_rows)]
    train, val, test = np.split(shuffled, cut_points)

    if scaling:
        # Fit on the training part only; validation/test reuse those statistics.
        train = minmax_scaler.fit_transform(train)
        val = minmax_scaler.transform(val)
        test = minmax_scaler.transform(test)

    print(train.shape, val.shape, test.shape)

    def as_dataset(array):
        return tf.data.Dataset.from_tensor_slices(tf.cast(array, tf.float32))

    return (as_dataset(train), as_dataset(val), as_dataset(test))



def makeArr_BS(df):
    """
    Extracts the six Black-Scholes columns of `df` as an (n_rows, 6) numpy array.

    Column order: strike, underlyings_price, days_to_maturity, volatility,
    rate, contract_price (contract_price is therefore the last column).
    """
    column_order = ('strike',
                    'underlyings_price',
                    'days_to_maturity',
                    'volatility',
                    'rate',
                    'contract_price')
    return np.column_stack([df[name].values for name in column_order])



def getNormalizedData_BS(df):
    """
    Returns the normalized data using StandardScaler from sklearn.

    The rows are shuffled, split 80/10/10, reduced to the columns selected by
    makeArr_BS and standardized with statistics fitted on the training part.

    Only the makeArr_BS columns are scaled. StandardScaler works column by
    column, so their values match scaling the whole frame first, while other
    columns (including non-numeric ones such as 'callput' / 'date_traded') no
    longer need to be dropped and `df` is never written to, which removes the
    SettingWithCopyWarning.

    NOTE: the fitted scaler is local and is not returned, so the scaling
    cannot be inverted by the caller.

    Args:
        - df: pandas DataFrame containing at least the makeArr_BS columns

    Output:
        - (train_ds, valid_ds, test_ds): tuple of float32 datasets (unbatched)
    """
    N = len(df)
    shuffled = df.sample(frac=1)
    train_end, val_end = int(.8*N), int(.9*N)

    train = makeArr_BS(shuffled.iloc[:train_end])
    val = makeArr_BS(shuffled.iloc[train_end:val_end])
    test = makeArr_BS(shuffled.iloc[val_end:])

    # normalize the data
    normalizer = StandardScaler()
    train = normalizer.fit_transform(train)
    val = normalizer.transform(val) # transform using the values from the training set
    test = normalizer.transform(test)
    print(train.shape, val.shape, test.shape)

    train_ds = tf.data.Dataset.from_tensor_slices(tf.cast(train, tf.float32))
    valid_ds = tf.data.Dataset.from_tensor_slices(tf.cast(val, tf.float32))
    test_ds  = tf.data.Dataset.from_tensor_slices(tf.cast(test, tf.float32))

    return (train_ds, valid_ds, test_ds)


def getScaledData_BS(df):
    """
    Returns the scaled data using MinMaxScaler from sklearn.

    The rows are shuffled, split 80/10/10, reduced to the columns selected by
    makeArr_BS and min-max scaled with statistics fitted on the training part.

    Only the makeArr_BS columns are scaled. MinMaxScaler works column by
    column, so their values match scaling the whole frame first, while other
    columns (including non-numeric ones such as 'callput' / 'date_traded') no
    longer need to be dropped and `df` is never written to, which removes the
    SettingWithCopyWarning.

    NOTE: the fitted scaler is local and is not returned, so the scaling
    cannot be inverted by the caller.

    Args:
        - df: pandas DataFrame containing at least the makeArr_BS columns

    Output:
        - (train_ds, valid_ds, test_ds): tuple of float32 datasets (unbatched)
    """
    N = len(df)
    shuffled = df.sample(frac=1)
    train_end, val_end = int(.8*N), int(.9*N)

    train = makeArr_BS(shuffled.iloc[:train_end])
    val = makeArr_BS(shuffled.iloc[train_end:val_end])
    test = makeArr_BS(shuffled.iloc[val_end:])

    # scale the data to the range observed in the training set
    normalizer = MinMaxScaler()
    train = normalizer.fit_transform(train)
    val = normalizer.transform(val) # transform using the values from the training set
    test = normalizer.transform(test)
    print(train.shape, val.shape, test.shape)

    train_ds = tf.data.Dataset.from_tensor_slices(tf.cast(train, tf.float32))
    valid_ds = tf.data.Dataset.from_tensor_slices(tf.cast(val, tf.float32))
    test_ds  = tf.data.Dataset.from_tensor_slices(tf.cast(test, tf.float32))

    return (train_ds, valid_ds, test_ds)



def shuffle_and_batch_dataset(dataset, batch_size, shuffle_buffer=None):
    """
    Batches a dataset, shuffling it first when a buffer size is given.

    Args:
        - dataset: tf.data.Dataset
        - batch_size: int, batch size
        - shuffle_buffer: int, shuffle buffer size; None skips shuffling

    Output:
        - the batched (and possibly shuffled) dataset
    """
    if shuffle_buffer is None:
        return dataset.batch(batch_size)
    return dataset.shuffle(shuffle_buffer).batch(batch_size)

def map_dataset(dataset, map_func):
    """
    Applies `map_func` to the dataset through its `map` method.

    Args:
        - dataset: tf.data.Dataset
        - map_func: function, mapping function

    Output:
        - the mapped dataset
    """
    return dataset.map(map_func)

# map_function
def xy_split(data):
    """
    Splits a 2-D batch into features and label.

    Args:
        - data: content of a dataset, indexed as [row, column]

    Output:
        - (x,y): x holds every column but the last, y holds the last column
    """
    features = data[:, :-1]
    label = data[:, -1]
    return (features, label)



def pipeline1(dataframe_BS, prefetch = True, scaling = True, batch_size = 32, shuffle_buffer = 1000):
    """
    Turns a raw ndarray into batched (x, y) train/validation/test datasets.

    Stages, applied to all three splits: getDatasets -> shuffle and batch ->
    xy_split -> optional prefetch.

    Args:
        - dataframe_BS: ndarray, one sample per row, label in the last column
        - prefetch: boolean, whether to prefetch the data
        - scaling: boolean, forwarded to getDatasets
        - batch_size: int, batch size, default 32
        - shuffle_buffer: int, shuffle buffer size, default 1000

    Output:
        - (train_ds, valid_ds, test_ds): tuple of datasets
    """
    splits = getDatasets(dataframe_BS, scaling)

    # Each stage is applied in train, valid, test order, one stage at a time.
    splits = [shuffle_and_batch_dataset(part, batch_size=batch_size, shuffle_buffer=shuffle_buffer)
              for part in splits]
    splits = [map_dataset(part, xy_split) for part in splits]
    if prefetch:
        splits = [part.prefetch(tf.data.experimental.AUTOTUNE) for part in splits]

    train_ds, valid_ds, test_ds = splits
    return (train_ds, valid_ds, test_ds)


def pipeline2(df, prefetch = True, scaling = 'minmax', batch_size = 32, shuffle_buffer = 1000):
    """
    Turns a pandas dataframe into batched (x, y) train/validation/test datasets.

    Stages, applied to all three splits: getScaledData_BS or
    getNormalizedData_BS -> shuffle and batch -> xy_split -> optional prefetch.

    Args:
        - df: pandas dataframe, dataframe of the data
        - prefetch: boolean, whether to prefetch the data
        - scaling: string, either minmax or normalize, default minmax
        - batch_size: int, batch size, default 32
        - shuffle_buffer: int, shuffle buffer size, default 1000

    Output:
        - (train_ds, valid_ds, test_ds): tuple of datasets

    Raises:
        - ValueError: if scaling is neither 'minmax' nor 'normalize'
    """
    if scaling == 'minmax':
        splitter = getScaledData_BS
    elif scaling == 'normalize':
        splitter = getNormalizedData_BS
    else:
        raise ValueError('scaling must be either minmax or normalize')

    splits = splitter(df)

    # Each stage is applied in train, valid, test order, one stage at a time.
    splits = [shuffle_and_batch_dataset(part, batch_size=batch_size, shuffle_buffer=shuffle_buffer)
              for part in splits]
    splits = [map_dataset(part, xy_split) for part in splits]
    if prefetch:
        splits = [part.prefetch(tf.data.experimental.AUTOTUNE) for part in splits]

    train_ds, valid_ds, test_ds = splits
    return (train_ds, valid_ds, test_ds)

# Trainer

In [None]:
import tensorflow as tf


def compile_and_fit(model, optimizer, loss, num_epochs, train_dataset,
                    validation_dataset=None, metrics=None, callbacks=None,
                    verbose=False):
    """
    Compiles the model, trains it and returns the history of training.

    Args:
        - model: tf.keras.Model, model to be trained
        - optimizer: tf.keras.optimizers.Optimizer, optimizer to be used for training
        - loss: tf.keras.losses, loss function to be used for training
        - num_epochs: int, number of epochs to be trained
        - train_dataset: tf.data.Dataset, dataset for training
        - validation_dataset: tf.data.Dataset, dataset for validation
        - metrics: a single metric or a list/tuple/dict of metrics used for
          evaluation; a single metric is wrapped in a list
        - callbacks: list of tf.keras.callbacks.Callback, callbacks to be used for training
        - verbose: bool, specifies if training progress is printed to stdout

    Output:
        - whatever model.fit returns (the training history)
    """
    # Accept a bare metric object/string as well as a collection.
    if metrics is not None and not isinstance(metrics, (list, tuple, dict)):
        metrics = [metrics]

    # Keyword arguments: the positional order of compile() differs between
    # Keras versions (the third positional slot can be `loss_weights`).
    model.compile(optimizer=optimizer, loss=loss, metrics=metrics)
    history = model.fit(train_dataset, validation_data=validation_dataset,
                        epochs=num_epochs, callbacks=callbacks, verbose=verbose)
    return history

# CallBack

In [None]:
import numpy as np
import tensorflow as tf

class PrintProgress(tf.keras.callbacks.Callback):
    """Callback that prints loss/accuracy figures every `num_epochs` epochs."""

    def __init__(self, num_epochs, **kwargs):
        """
        Initializes the PrintProgress callback.

        Args:
            - num_epochs: int >= 1, every num_epochs epochs the progress is printed

        Raises:
            - ValueError: if num_epochs is smaller than 1
        """
        super(PrintProgress, self).__init__(**kwargs)
        if num_epochs < 1:
            raise ValueError("num_epochs must be a positive integer")
        self.num_epochs = num_epochs

    def on_epoch_end(self, epoch, logs= None):
        # `epoch` is 0-based. The former `epoch>0` guard only had an effect for
        # num_epochs == 1, where it wrongly suppressed the first report.
        if (epoch+1)%(self.num_epochs) != 0:
            return

        # Values are looked up by the keys 'loss', 'accuracy', 'val_loss' and
        # 'val_accuracy'; a missing key (e.g. no validation data, or a metric
        # with another name) is reported as nan instead of raising KeyError.
        logs = logs or {}
        missing = float('nan')
        train_loss = logs.get('loss', missing)
        train_acc  = logs.get('accuracy', missing)
        val_acc    = logs.get('val_accuracy', missing)
        val_loss   = logs.get('val_loss', missing)
        print("Epoch {:0} train loss is {:.4f}, train accuracy is {:.4f}, val loss is {:.4f}, and val accuracy is {:.4f}".format(epoch+1, train_loss, train_acc, val_loss, val_acc))


class CheckpointCallback(tf.keras.callbacks.Callback):
    """Saves a tf.train checkpoint of the model whenever `val_loss` improves."""

    def __init__(self, directory):
        """
        Initializes the CheckpointCallback.

        Args:
            - directory: string, path to the directory where the checkpoint is saved
        """
        super(CheckpointCallback, self).__init__()
        self.directory = directory
        # Best validation loss seen so far. A plain float is enough: the value
        # is only compared and overwritten in on_epoch_end.
        self.best_val = np.inf

    def set_model(self, model):
        # Let the base class register the model instead of assigning
        # `self.model` directly.
        # NOTE(review): newer Keras releases appear to expose `model` as a
        # read-only property, where direct assignment fails - confirm.
        super(CheckpointCallback, self).set_model(model)
        self.ckpt = tf.train.Checkpoint(model=model)
        # max_to_keep=1: only the most recent (i.e. best so far) checkpoint is kept.
        self.manager = tf.train.CheckpointManager(self.ckpt, self.directory,
                                                  checkpoint_name='model', max_to_keep=1)

    def on_epoch_end(self, epoch, logs=None):
        val = (logs or {}).get('val_loss')
        if val is None:
            # No validation data was supplied, so there is nothing to compare.
            return
        if val < self.best_val:
            self.best_val = val
            self.manager.save()

# Tuner

In [None]:
"""Implements the custom tuner class."""

import keras_tuner
import numpy as np
import tensorflow as tf

def tuneLayer(hp):
    """
    Returns a compiled hyperModel for keras tuner. (this is private)

    Searches over the number of hidden layers (1-5) and the width of each
    layer (5-50, step 5). The network shape comes from the module-level
    `input_shape_glob` / `output_shape_glob`, which customTuner.__init__ sets.

    NOTE: this notebook defines tuneLayer in two cells; this later one wins.
    """

    num_layers = hp.Int('num_layers', min_value=1, max_value=5)
    hidden_units = [hp.Int(f'units_{i+1}', min_value=5, max_value=50, step=5)
                    for i in range(num_layers)]

    model = getModel(input_shape=input_shape_glob,
                    output_shape=output_shape_glob,
                    num_layers = num_layers,
                    hidden_units = hidden_units
                    )

    # Compile here instead of passing compile=True: the getModel signature in
    # this notebook has no such parameter, so that call raised a TypeError.
    # Loss and metric follow tuned_model(); Adam keeps its default learning rate.
    model.compile(optimizer = tf.keras.optimizers.Adam(),
                  loss = tf.keras.losses.MeanAbsolutePercentageError(),
                  metrics = [tf.keras.metrics.MeanSquaredError()])

    return model



def tuneLR(hp):
    """
    Returns a compiled hyperModel for keras tuner. (this is private)

    Searches over the number of hidden layers (1-5), the width of each layer
    (5-50, step 5) and the Adam learning rate (1e-4 to 1e-2, log sampling).
    The network shape comes from the module-level `input_shape_glob` /
    `output_shape_glob`, which customTuner.__init__ sets.

    NOTE: this notebook defines tuneLR in two cells; this later one wins.
    """

    num_layers = hp.Int('num_layers', min_value=1, max_value=5)
    hidden_units = [hp.Int(f'units_{i+1}', min_value=5, max_value=50, step=5)
                    for i in range(num_layers)]

    learning_rate = hp.Float('learning_rate', min_value=1e-4, max_value=1e-2, sampling = 'log')

    model = getModel(input_shape=input_shape_glob,
                    output_shape=output_shape_glob,
                    num_layers = num_layers,
                    hidden_units = hidden_units
                    )

    # Compile here instead of passing compile=/learning_rate= to getModel: its
    # signature in this notebook has neither, so that call raised a TypeError.
    # Loss and metric follow tuned_model().
    model.compile(optimizer = tf.keras.optimizers.Adam(learning_rate = learning_rate),
                  loss = tf.keras.losses.MeanAbsolutePercentageError(),
                  metrics = [tf.keras.metrics.MeanSquaredError()])

    return model



def tuned_model(hp):
    """
    Builds and compiles one candidate network for keras tuner.

    Search space:
    - num_layers: 1-5
    - learning_rate: 1e-4 - 1e-2, log sampling
    - rate_decay: 0.85 - 0.9995
    - l1_coeff, l2_coeff: 1e-8 - 10**-6.5 (no `sampling` argument is passed)
    - units_<k>: 50-500, step 50, for every hidden layer

    The hidden activation is fixed to 'elu'. Compiled with Adam on a staircase
    exponential-decay schedule, MeanAbsolutePercentageError (named 'loss') as
    loss and MeanSquaredError (named 'accuracy') as metric.
    Reads the module-level `input_shape_glob` / `output_shape_glob`.
    """

    # Hyperparameters are registered in this exact order.
    depth = hp.Int('num_layers', min_value=1, max_value=5)

    # https://stats.stackexchange.com/questions/402618/can-sinx-be-used-as-activation-in-deep-learning

    base_lr = hp.Float('learning_rate', min_value=1e-4, max_value=0.01, sampling = 'log')
    decay = hp.Float('rate_decay', min_value=0.85, max_value=0.9995)
    l1_coeff = hp.Float('l1_coeff', min_value=10**(-8), max_value=10**(-6.5))
    l2_coeff = hp.Float('l2_coeff', min_value=10**(-8), max_value=10**(-6.5))

    widths = [hp.Int(f'units_{k+1}', min_value=50, max_value=500, step=50)
              for k in range(depth)]

    network = getModel(input_shape=input_shape_glob,
                       output_shape=output_shape_glob,
                       num_layers = depth,
                       hidden_units = widths,
                       activation = 'elu',
                       regularizer = tf.keras.regularizers.l1_l2(l1_coeff, l2_coeff)
                       )

    schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        base_lr, decay_steps = 4000, decay_rate = decay, staircase = True)

    network.compile(
        optimizer = tf.keras.optimizers.Adam(learning_rate = schedule),
        loss = tf.keras.losses.MeanAbsolutePercentageError(name='loss'),
        metrics = [tf.keras.metrics.MeanSquaredError(name='accuracy')],
    )

    return network

def tuneSine(hp):
    """
    Builds and compiles a Sequential network with sine activations for keras tuner.

    Every hidden stage is a Dense layer (no built-in activation, l1_l2 kernel
    regularizer) followed by a Lambda layer applying tf.math.sin; the output
    is a single-unit Dense layer. No input shape is declared here.

    Search space:
    - num_layers: 1-5
    - learning_rate: 1e-4 - 1e-2, log sampling
    - rate_decay: 0.85 - 0.9995
    - l1_coeff, l2_coeff: 1e-8 - 10**-6.5 (no `sampling` argument is passed)
    - units_<k>: 50-300, step 50, for every hidden layer
    """

    # Hyperparameters are registered in this exact order.
    depth = hp.Int('num_layers', min_value=1, max_value=5)

    # https://stats.stackexchange.com/questions/402618/can-sinx-be-used-as-activation-in-deep-learning

    base_lr = hp.Float('learning_rate', min_value=1e-4, max_value=0.01, sampling = 'log')
    decay = hp.Float('rate_decay', min_value=0.85, max_value=0.9995)
    l1_coeff = hp.Float('l1_coeff', min_value=10**(-8), max_value=10**(-6.5))
    l2_coeff = hp.Float('l2_coeff', min_value=10**(-8), max_value=10**(-6.5))

    stack = []
    for k in range(depth):
        width = hp.Int(f'units_{k+1}', min_value=50, max_value=300, step=50)
        stack.extend([
            tf.keras.layers.Dense(width, kernel_regularizer = tf.keras.regularizers.l1_l2(l1_coeff, l2_coeff)),
            tf.keras.layers.Lambda(lambda x: tf.math.sin(x)),
        ])
    stack.append(tf.keras.layers.Dense(1))

    network = tf.keras.Sequential(stack)

    schedule = tf.keras.optimizers.schedules.ExponentialDecay(
        base_lr, decay_steps = 4000, decay_rate = decay, staircase = True)

    network.compile(
        optimizer = tf.keras.optimizers.Adam(learning_rate = schedule),
        loss = tf.keras.losses.MeanAbsolutePercentageError(name='loss'),
        metrics = [tf.keras.metrics.MeanSquaredError(name='accuracy')],
    )

    return network

# For the time being, we only use the RandomSearch tuner.

class customTuner(keras_tuner.RandomSearch):
    """RandomSearch tuner that trains on explicit train/validation datasets."""

    def __init__(self, input_shape, output_shape, dim=None, basket=False, **kwargs):
        """
        Initializes the custom tuner class.

        Besides storing its arguments on the instance, this publishes them as
        the module-level globals `input_shape_glob`, `output_shape_glob`,
        `dim_glob` and `basket_glob`, which the model-building functions of
        this notebook read.

        Args:
            - input_shape: the shape of the input data
            - output_shape: the shape of the output data
            - dim: int, the dimension of basket, default None
            - basket: bool, whether to use basket model or not, default False
            - **kwargs: forwarded to keras_tuner.RandomSearch
        """
        global input_shape_glob
        global dim_glob
        global output_shape_glob
        global basket_glob

        # Publish the globals BEFORE the parent constructor runs: if a
        # `hypermodel` is passed in kwargs the parent may build it right away
        # (presumably to collect the search space - confirm against the
        # keras-tuner version in use), and that build reads these globals.
        input_shape_glob = input_shape
        dim_glob = dim
        output_shape_glob = output_shape
        basket_glob = basket

        super(customTuner, self).__init__(**kwargs)
        self.input_shape = input_shape
        self.output_shape = output_shape
        self.dim = dim
        self.basket = basket

    def run_trial(self, trial, train_ds, valid_ds, epochs, **kwargs):
        """
        Builds the model for `trial` and fits it; overrides RandomSearch.run_trial.

        Args:
            - trial: keras-tuner trial whose `hyperparameters` are used
            - train_ds: training dataset handed to model.fit
            - valid_ds: validation dataset handed to model.fit
            - epochs: int, number of epochs per trial
            - **kwargs: forwarded to model.fit

        Output:
            - the result of model.fit()
        """
        hp = trial.hyperparameters
        # Honour a hypermodel supplied at construction time; fall back to
        # tuned_model, which is what this method always used before.
        hypermodel = getattr(self, 'hypermodel', None)
        if hypermodel is not None:
            compiled_model = hypermodel.build(hp)
        else:
            compiled_model = tuned_model(hp)
        history = compiled_model.fit(train_ds, validation_data=valid_ds, epochs=epochs, **kwargs)
        return history



# define a hypermodel subclass

class customHyperModel(keras_tuner.HyperModel):
    """HyperModel whose build step delegates to tuned_model."""

    def build(self, hp):
        """Returns the model produced by tuned_model for the hyperparameters `hp`."""
        compiled = tuned_model(hp)
        return compiled

## IMPLEMENTATION

In [None]:
import os
import gzip
import pathlib
import numpy as np
import pandas as pd
import tensorflow as tf
import keras_tuner as kt
import tensorflow_datasets as tfds

In [None]:
# Mount Google Drive (Colab only) so the CSV below becomes readable.
from google.colab import drive
drive.mount('/content/drive')
import pandas as pd
# NOTE(review): absolute, user-specific Drive path - adjust when running elsewhere.
file_path = '/content/drive/MyDrive/BTechProject/Data/data.csv'
data_df = pd.read_csv(file_path)
# Show the first rows as a quick sanity check of the loaded columns.
data_df.head()

Mounted at /content/drive


Unnamed: 0.1,Unnamed: 0,optionid,securityid,strike,callput,date_traded,contract_price,market_price,underlyings_price,contract_volume,days_to_maturity,moneyness,rate,volatility
0,0,150034236.0,504569.0,0.42,C,2006-10-18,0.0715,0.07025,0.4885,5.0,2.0,1.163095,0.053646,0.022956
1,1,150247468.0,504880.0,40.0,C,2006-10-18,0.124,0.1225,39.913799,56137.0,2.0,0.997845,0.053646,0.114784
2,2,150255000.0,506496.0,62.0,C,2006-10-18,0.172,0.174,61.827798,27369.0,2.0,0.997223,0.053646,0.106823
3,3,150255496.0,506497.0,53.5,C,2006-10-18,0.296,0.2655,53.6129,1224.0,2.0,1.00211,0.053646,0.110336
4,4,150255498.0,506497.0,54.0,C,2006-10-18,0.075,0.0645,53.6129,963.0,2.0,0.992831,0.053646,0.110336


In [None]:
df = data_df.drop(['Unnamed: 0'], axis=1)
df.sample(5)

Unnamed: 0,optionid,securityid,strike,callput,date_traded,contract_price,market_price,underlyings_price,contract_volume,days_to_maturity,moneyness,rate,volatility
81702,162747770.0,702263.0,15.65,C,2018-03-06,0.1,0.0875,15.489449,54.0,10.0,0.989741,0.015239,0.218024
55294,155640371.0,702263.0,17.1,C,2015-03-26,0.2125,0.2125,16.58285,50.0,85.0,0.969757,0.002625,0.124373
100988,165413648.0,702263.0,14.2,C,2020-03-25,0.375,0.46125,14.42625,40.0,2.0,1.015933,0.005768,0.655505
48700,157953448.0,702263.0,13.6,C,2013-12-06,0.0775,0.07875,12.743,32.0,105.0,0.936985,0.002339,0.100346
78746,161490489.0,702263.0,15.75,C,2016-12-16,0.11,0.12125,15.4915,3.0,35.0,0.983587,0.007595,0.093418


Creating the first neural network

In [None]:
# Column order matters: xy_split() treats the LAST column as the label and all
# preceding columns as features. The option price (contract_price) is the
# regression target, so it must come last; contract_volume is kept as the
# sixth feature, which matches input_shape = (6,) used for the model.
dataframe_BS = np.vstack((df['strike'].values,
                      df['underlyings_price'].values,
                      df['days_to_maturity'].values,
                      df['volatility'].values,
                      df['rate'].values,
                      df['contract_volume'].values,
                      df['contract_price'].values)).T

In [None]:
# Unscaled datasets (scaling=False); these are the ones used for training and tuning below.
train_ds, valid_ds, test_ds = pipeline1(dataframe_BS, scaling=False)
# Min-max scaled datasets without prefetching. Each pipeline1 call reshuffles
# the rows, so these splits contain different rows than the ones above.
train_copy, valid_copy, test_copy = pipeline1(dataframe_BS, prefetch=False)

(85999, 7) (10750, 7) (10750, 7)
(85999, 7) (10750, 7) (10750, 7)


In [None]:
df['days_to_maturity'].value_counts().to_dict()[2.0]

2667

In [None]:
# Positional arguments are (df, prefetch, scaling, batch_size, shuffle_buffer).
# Standardized (StandardScaler) datasets:
normal_train_ds, normal_valid_ds, normal_test_ds = pipeline2(df, True, 'normalize', 32, 1000)
# Min-max scaled datasets:
scaled_train_ds, scaled_valid_ds, scaled_test_ds = pipeline2(df, True, 'minmax', 32, 1000)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_train[df.columns] = normalizer.fit_transform(df_train[df.columns])
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_val[df.columns] = normalizer.transform(df_val[df.columns]) # transform using the values from the training set
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df_test[df.columns] = 

(85999, 6) (10750, 6) (10750, 6)
(85999, 6) (10750, 6) (10750, 6)


In [None]:
normal_test_ds.element_spec

(TensorSpec(shape=(None, 5), dtype=tf.float32, name=None),
 TensorSpec(shape=(None,), dtype=tf.float32, name=None))

In [None]:
# Training configuration for the first network.
print_num_epochs = 5 # print progress every print_num_epochs epochs

# Location on the mounted Drive where model artifacts are stored.
drive_folder_path = '/content/drive/MyDrive/BTechProject/Models'
os.makedirs(drive_folder_path, exist_ok=True)
model_filename = 'NN_model_rough.h5'
path_to_save = os.path.join(drive_folder_path, model_filename)

# Number of epochs without val_loss improvement before early stopping.
patience = 10

optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

loss = tf.keras.losses.MeanAbsoluteError(name='loss')

# The metric is a percentage error but is named 'accuracy' because the
# PrintProgress callback reads the 'accuracy' / 'val_accuracy' log keys.
metrics = tf.keras.metrics.MeanAbsolutePercentageError(name='accuracy')

num_epochs = 10

# dataframe_BS has 7 columns; xy_split uses the last one as label, leaving 6 features.
input_shape = (6,)

num_layers = 3

# One entry per hidden layer; getModel asserts len(hidden_units) == num_layers.
hidden_units = [14, 14, 14]

output_shape = (1, )

batchnorm = True

dropout = None

model = getModel(input_shape = input_shape,
                num_layers   = num_layers,
                 hidden_units = hidden_units,
                 output_shape = output_shape,
                 batchnorm = batchnorm,
                 dropout = dropout)
model.summary()

Model: "model_5"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_6 (InputLayer)        [(None, 6)]               0         
                                                                 
 flatten_5 (Flatten)         (None, 6)                 0         
                                                                 
 dense_18 (Dense)            (None, 14)                98        
                                                                 
 batch_normalization_3 (Batc  (None, 14)               56        
 hNormalization)                                                 
                                                                 
 dense_19 (Dense)            (None, 14)                210       
                                                                 
 batch_normalization_4 (Batc  (None, 14)               56        
 hNormalization)                                           

In [None]:
# NOTE(review): CheckpointCallback passes its argument to tf.train.CheckpointManager
# as a directory, while path_to_save ends in '.h5' - confirm this is intended.
ckpt = CheckpointCallback(path_to_save)
printing =PrintProgress(num_epochs=print_num_epochs)
# Stop once val_loss has not improved for `patience` consecutive epochs.
early_stop = tf.keras.callbacks.EarlyStopping(patience=patience, monitor='val_loss')
callbacks = [ckpt, printing, early_stop]

In [None]:
# Manual training run of `model`; disabled by default, set testing = True to execute it.
testing = False
if testing:
    # Positional order: model, optimizer, loss, num_epochs, train_dataset,
    # validation_dataset, metrics, callbacks.
    history = compile_and_fit(model,
                          optimizer,
                          loss,
                          num_epochs,
                          train_ds,
                          valid_ds,
                          metrics,
                          callbacks,
                          verbose=True
                          )

## Tuning the hyperparameters

In [None]:
import keras_tuner

In [None]:
# Tuner minimising the validation loss over 10 trials (one execution each).
# overwrite=True discards any earlier results stored under the same directory.
random_tuner = customTuner(
    input_shape,
    output_shape,
    objective='val_loss',
    max_trials=10,
    executions_per_trial=1,
    overwrite=True,
    directory='hyperparams/RandomSearch',
    project_name='Black-Scholes',
)
random_tuner.search_space_summary()

Search space summary
Default search space size: 0


In [None]:
# Run the hyperparameter search with 5 epochs per trial; the training and
# validation datasets are passed positionally to the tuner.
random_tuner.search(train_ds, valid_ds, epochs=5)

Trial 10 Complete [00h 00m 39s]
val_loss: 79.6660385131836

Best val_loss So Far: 79.2119140625
Total elapsed time: 00h 08m 48s


## Building the best model now

In [None]:
# Build a fresh (untrained) model from the top-ranked hyperparameter set
# returned by the tuner, then print its layer table.
hypermodel = customHyperModel()
best_model = hypermodel.build(random_tuner.get_best_hyperparameters()[0])
best_model.summary()

Model: "model_16"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_17 (InputLayer)       [(None, 6)]               0         
                                                                 
 flatten_16 (Flatten)        (None, 6)                 0         
                                                                 
 dense_56 (Dense)            (None, 7)                 49        
                                                                 
 dense_57 (Dense)            (None, 5)                 40        
                                                                 
 dense_58 (Dense)            (None, 1)                 6         
                                                                 
Total params: 95
Trainable params: 95
Non-trainable params: 0
_________________________________________________________________


Results of search and used hyperparameters

In [None]:
# Print the best trials with their hyperparameters and objective scores.
random_tuner.results_summary()

Results summary
Results in hyperparams/RandomSearch/Black-Scholes
Showing 10 best trials
Objective(name="val_loss", direction="min")

Trial 01 summary
Hyperparameters:
num_layers: 2
activation: elu
learning_rate: 0.0010882806869262302
rate_decay: 0.8644126938673833
l1_coeff: 1.4020113723467632e-07
l2_coeff: 1.0656989809910276e-07
units_1: 7
units_2: 5
Score: 79.2119140625

Trial 02 summary
Hyperparameters:
num_layers: 4
activation: elu
learning_rate: 0.0011649730037648692
rate_decay: 0.9744292088196477
l1_coeff: 1.9975993642369768e-07
l2_coeff: 2.66109764975253e-07
units_1: 6
units_2: 6
units_3: 5
units_4: 5
Score: 79.2514877319336

Trial 06 summary
Hyperparameters:
num_layers: 2
activation: elu
learning_rate: 0.006155357671731701
rate_decay: 0.992118419714269
l1_coeff: 5.675623645220833e-08
l2_coeff: 1.3786824599660509e-07
units_1: 6
units_2: 6
units_3: 7
units_4: 7
units_5: 7
Score: 79.28911590576172

Trial 08 summary
Hyperparameters:
num_layers: 1
activation: elu
learning_rate: 0.00

# Training the model

In [None]:
# Train the model built from the best hyperparameters for 10 epochs,
# validating against valid_ds.
best_model.fit(train_ds, epochs = 10, validation_data = valid_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<keras.callbacks.History at 0x7af12da97be0>

In [None]:
# Where the trained network is stored on the mounted Google Drive.
drive_folder_path = '/content/drive/MyDrive/BTechProject/Models'
model_filename = 'BS-0909.h5'
path_to_model = os.path.join(drive_folder_path, model_filename)

# Create the target folder if it is missing, then write the model file.
os.makedirs(drive_folder_path, exist_ok=True)
best_model.save(path_to_model)

In [None]:
from tensorflow import keras
# Reload the file written above and score it on the test dataset.
# NOTE: this rebinds `model`, replacing the network built earlier in the notebook.
model = keras.models.load_model(path_to_model)
# evaluate returns the loss followed by any compiled metrics.
model.evaluate(test_ds)



[79.69095611572266, 3260010.5]

## Trying out SVM

In [None]:
def no_progress_loss(iteration_stop_count=20, percent_increase=0.0):
    """
    Build a stop function that halts a search once the loss stops improving.

    Parameters
    ----------
    iteration_stop_count: int
        number of consecutive non-improving iterations after which the
        stop function asks for the search to end
    percent_increase: float
        minimum improvement, expressed as a percentage of the best loss so
        far, that a new loss must exceed to count as progress

    Returns
    -------
    stop_fn: callable
        ``stop_fn(trials, best_loss=None, iteration_no_progress=0)`` returning
        ``(should_stop, [best_loss, iteration_no_progress])``; the list holds
        the state to hand back as positional arguments on the next call
    """

    def stop_fn(trials, best_loss=None, iteration_no_progress=0):
        # Loss reported by the most recent trial.
        latest_loss = trials.trials[-1]["result"]["loss"]

        # First call: there is nothing to compare against, so only seed the state.
        if best_loss is None:
            return False, [latest_loss, iteration_no_progress + 1]

        # Progress means undercutting the best loss by more than the tolerance.
        tolerance = abs(best_loss * (percent_increase / 100.0))
        made_progress = latest_loss < best_loss - tolerance

        tracked_best = latest_loss if made_progress else best_loss
        stalled_for = 0 if made_progress else iteration_no_progress + 1

        return (stalled_for >= iteration_stop_count, [tracked_best, stalled_for])

    return stop_fn

def no_progress_loss_1(iteration_stop_count=5, percent_increase=0.0):
    """
    Build a stop function based on the history of running-best losses.

    The stop function keeps one entry per iteration holding the lowest loss
    seen up to that point, and asks for a stop once the last
    ``iteration_stop_count`` entries are all equal, i.e. the best loss has
    stopped moving.

    Parameters
    ----------
    iteration_stop_count: int
        number of trailing running-best entries that must be identical to
        trigger a stop; no stop is requested until the history holds more
        than ``iteration_stop_count + 1`` entries
    percent_increase: float
        currently unused by this variant; kept so the signature mirrors
        ``no_progress_loss``

    Returns
    -------
    stop_fn: callable
        ``stop_fn(trials, best_losses=None)`` returning
        ``(should_stop, [best_losses])``. The second element is the state the
        driver hands back as positional arguments on the next call (hyperopt's
        ``fmin`` does this for ``early_stop_fn``), which is why the history
        list is wrapped in an outer list.
    """

    def stop_fn(trials, best_losses=None):
        new_loss = trials.trials[-1]["result"]["loss"]

        # First call: seed the history. It must be wrapped in an outer list;
        # returning the bare [new_loss] would be unpacked into a float on the
        # next call and break the append below.
        if not best_losses:
            return False, [[new_loss]]

        # Work on a copy so the caller's list is never mutated in place.
        history = list(best_losses)
        history.append(min(new_loss, history[-1]))

        # Not enough history yet to judge whether the best loss has stalled.
        if len(history) <= iteration_stop_count + 1:
            return False, [history]

        # Stop when the trailing window of running-best values is constant.
        tail = history[-iteration_stop_count:]
        return tail == [tail[0]] * iteration_stop_count, [history]

    return stop_fn

## Pre Processing

In [None]:
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split


def propocessed(data, shuffle = True, test_size = 0.2, random_state = None):
    """
    Split the data into train and test sets and min-max scale the features.

    The last column of ``data`` is used as the target and every preceding
    column as a feature. The scaler is fitted on the training features only
    and then applied unchanged to the test features, so both splits share one
    scaling and no test-set statistics leak into preprocessing.

    Args:
        - data: ndarray of shape (n_samples, n_features + 1), target in the last column
        - shuffle: boolean, whether to shuffle the rows before splitting
        - test_size: fraction (or count) of samples held out for testing, default 0.2
        - random_state: seed forwarded to train_test_split for a reproducible split,
          default None (a different split on every call when shuffle is True)

    Output:
        - (x_train, y_train) , (x_test, y_test): tuple of arrays. x_train lies in
          [0, 1]; x_test is scaled with the training minimum/maximum and can fall
          slightly outside [0, 1]. The targets are returned unscaled.

    """

    # separate features and targets (the target is the last column)
    x_data, y_data = data[:,:-1], data[:,-1]

    # perform train and test set split with optional shuffle
    x_train, x_test, y_train, y_test = train_test_split(x_data, y_data,
                                                        test_size = test_size,
                                                        shuffle = shuffle,
                                                        random_state = random_state)

    # min-max normalize: learn each feature's range on the training split only,
    # then reuse that same range for the test split
    scaler = MinMaxScaler()
    x_train = scaler.fit_transform(x_train)
    x_test = scaler.transform(x_test)

    return (x_train, y_train) , (x_test, y_test)


def makeBS(df):
    """
    Extract the model inputs and the target price from the dataframe passed in.

    The columns are laid out features first and target last, because
    ``propocessed`` treats the last column as the regression target. The
    target is ``contract_price``; ``contract_volume`` is an ordinary feature
    and must therefore not be the final column.

    Arg:
        df: pd.DataFrame, contains all data read from data.csv; must provide the
            columns strike, underlyings_price, days_to_maturity, volatility,
            rate, contract_volume and contract_price

    Output:
        np.array of shape (n_rows, 7): columns in the order strike,
        underlyings_price, days_to_maturity, volatility, rate,
        contract_volume, contract_price
    """

    feature_columns = ['strike',
                       'underlyings_price',
                       'days_to_maturity',
                       'volatility',
                       'rate',
                       'contract_volume']
    target_column = 'contract_price'

    # One 1-D array per column, stacked side by side with the target last.
    dataframe_BS = np.column_stack([df[name].values
                                    for name in feature_columns + [target_column]])
    return dataframe_BS

In [None]:
import os
import pathlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.svm import SVR
from sklearn.model_selection import StratifiedKFold, GridSearchCV, RandomizedSearchCV, cross_val_score
from hyperopt import tpe, STATUS_OK, Trials, hp, fmin, STATUS_OK, space_eval

## Conventional Methods

In [None]:
"""Implements Black-Scholes option pricing model."""

import numpy as np
from scipy.stats import norm

def generate_bs_vec(df):
    """
    Produces result for multiple B-S runs for call options only (analytical).

    Rows where ``volatility * sqrt(T)`` is exactly zero (zero days to maturity
    or zero volatility) have no finite d1/d2. For those rows the limiting
    value of the formula, ``max(S - K * exp(-r * T), 0)``, is returned instead
    of dividing by zero.

    Args:
        - df: dataframe, containing all parameters and has the following entries
            - underlyings_price: float, current price of underlying
            - volatility: float, current volatility of underlying
            - rate: float, risk free rate
            - strike: float, strike price
            - days_to_maturity: float, days to maturity (converted to years with a 365-day year)
            - moneyness: float, moneyness of option; its log is used where the
              formula has ln(S / K)

    Output:
        - result: ndarray, containing prices for each sample
    """
    S     = df['underlyings_price'].values
    vol   = df['volatility'].values
    r     = df['rate'].values
    K     = df['strike'].values
    T     = df['days_to_maturity'].values / 365
    m     = df['moneyness'].values

    # Denominator of d1. Where it is zero, substitute 1 so the division stays
    # finite; those rows are overwritten with the limiting value below.
    vol_sqrt_T = vol * np.sqrt(T)
    degenerate = vol_sqrt_T == 0
    safe_vol_sqrt_T = np.where(degenerate, 1.0, vol_sqrt_T)

    d1 = (np.log(m) + (r + 0.5 * vol ** 2) * T) / safe_vol_sqrt_T
    d2 = d1 - vol_sqrt_T

    discounted_K = K * np.exp(-r * T)
    S_call = S * norm.cdf(d1) - discounted_K * norm.cdf(d2)

    # With no remaining uncertainty the call is worth its discounted intrinsic value.
    intrinsic = np.maximum(S - discounted_K, 0.0)

    return np.where(degenerate, intrinsic, S_call)


In [None]:
import os
import time
import pathlib
import numpy as np
import pandas as pd

In [None]:
from google.colab import drive
import pandas as pd

# Mount Google Drive so the dataset stored there can be read.
drive.mount('/content/drive')

file_path = '/content/drive/MyDrive/BTechProject/Data/data.csv'

# Read the CSV and discard the 'Unnamed: 0' column (presumably a saved index).
df = pd.read_csv(file_path).drop(columns=['Unnamed: 0'])
df.head()

Mounted at /content/drive


Unnamed: 0,optionid,securityid,strike,callput,date_traded,contract_price,market_price,underlyings_price,contract_volume,days_to_maturity,moneyness,rate,volatility
0,150034236.0,504569.0,0.42,C,2006-10-18,0.0715,0.07025,0.4885,5.0,2.0,1.163095,0.053646,0.022956
1,150247468.0,504880.0,40.0,C,2006-10-18,0.124,0.1225,39.913799,56137.0,2.0,0.997845,0.053646,0.114784
2,150255000.0,506496.0,62.0,C,2006-10-18,0.172,0.174,61.827798,27369.0,2.0,0.997223,0.053646,0.106823
3,150255496.0,506497.0,53.5,C,2006-10-18,0.296,0.2655,53.6129,1224.0,2.0,1.00211,0.053646,0.110336
4,150255498.0,506497.0,54.0,C,2006-10-18,0.075,0.0645,53.6129,963.0,2.0,0.992831,0.053646,0.110336


In [None]:
# Matrix built from the full dataset; the first row is printed as a sanity check.
dataframe_BS = makeBS(df)
print(dataframe_BS[0])
# Reproducible 20% random subsample of the rows (fixed seed), converted the same way.
small_ds = df.sample(frac=0.2, random_state=42)
small_dataframe_BS = makeBS(small_ds)

[0.42       0.48849998 2.         0.0229555  0.05364576 0.0715
 5.        ]


In [None]:
# Split and scale the full matrix, then report the shape of each piece.
(x_train, y_train), (x_test, y_test) = propocessed(dataframe_BS)
print(x_train.shape, y_train.shape, x_test.shape, y_test.shape)

# Same preprocessing for the 20% subsample.
(small_x_train, small_y_train), (small_x_test, small_y_test) = propocessed(small_dataframe_BS)
print(small_x_train.shape, small_y_train.shape, small_x_test.shape, small_y_test.shape)

# One scaled training row as a sanity check.
print(x_train[0])

(85999, 6) (85999,) (21500, 6) (21500,)
(17200, 6) (17200,) (4300, 6) (4300,)
[0.00729351 0.00833113 0.09917355 0.26593516 0.90822978 0.21369235]


In [None]:
# Support-vector regressor with an RBF kernel; all other SVR settings are left at their defaults.
regressor = SVR(kernel = 'rbf')

In [None]:
# Fit on the full training split (about 86k rows per the shapes printed above).
regressor.fit(x_train, y_train)

In [None]:
# Predictions for the held-out test features.
y_pred_svr = regressor.predict(x_test)

In [None]:
# New MSE including contract volume: mean squared error between the test targets and the SVR predictions.
# NOTE(review): the recorded value (~3e6) is far larger than any option price in the data,
# which suggests y held contract volume rather than contract price when this cell was run —
# verify which column ends up last in the matrix passed to propocessed.
mse = np.mean((y_test-y_pred_svr)**2)
mse

3038858.2380520427

In [None]:
# Old MSE without Contract_Volume.
# The code is identical to the cell above; the output shown here was kept from an
# earlier run, so re-executing this cell simply repeats the current computation.
mse = np.mean((y_test-y_pred_svr)**2)
mse

0.007385946319633321

In [None]:
# Old MAPE (output kept from an earlier run; the code is the same as the "New MAPE" cell).
# The value is a fraction, not a percentage, and any zero entry in y_test makes the
# corresponding term divide by zero.
mape = np.mean((np.abs((y_test-y_pred_svr)/y_test)))
mape

0.5550090087703028

In [None]:
# New MAPE: mean absolute error relative to the true target, as a fraction (not a percentage).
# Any zero entry in y_test makes the corresponding term divide by zero.
mape = np.mean((np.abs((y_test-y_pred_svr)/y_test)))
mape

2.5030091950984046

# BLACK SCHOLES

In [None]:
# Analytical Black-Scholes call price for every row of the full dataframe.
output_bs = generate_bs_vec(df)

  d1 = (np.log(m) + (r + 0.5 * vol ** 2) * T) / (vol * np.sqrt(T))


In [None]:
# Mean squared error of the Black-Scholes price against the observed contract price, over all rows.
test_output_bs = np.sum((output_bs - df['contract_price'].values)**2) / len(df)

In [None]:
# Report the Black-Scholes MSE computed in the previous cell.
print('MSE for B-S: ', test_output_bs)

MSE for B-S:  0.06926002258211712


In [None]:
# Mean absolute error of the Black-Scholes price relative to the observed contract price,
# as a fraction (not a percentage); rows with a zero contract price would divide by zero.
test_output_bs_per = np.mean(np.abs(output_bs - df['contract_price'].values)/df['contract_price'].values)
print(test_output_bs_per)

0.590128666454265
