## Starting Point

A pricing model we want to replicate with a neural network.

## Motivation

The belief that the neural network will cost less compute to train and predict prices (RFQs) faster than the full model. The use case will target complex path dependant (montecarlo) models that have a high compute cost and are relativly slow.

In [2]:
#
# Some set-up for the NN environment, for this we are using TensorFlow and Keras.
#
import tensorflow as tf
import numpy as np
import random
import inspect
import time
import re
from enum import Enum
from typing import Tuple, List, Union, Callable
from scipy.stats import norm
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.model_selection import train_test_split
from sklearn import preprocessing
from sklearn.utils import shuffle
from sklearn.preprocessing import MinMaxScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from itertools import permutations
import pandas as pd

In [3]:
#
# This is so we can visualise results
#
from mpl_toolkits.mplot3d import axes3d
import matplotlib.pyplot as plt
import numpy as np
from matplotlib import cm
from mpl_toolkits.axes_grid1 import make_axes_locatable
%matplotlib inline

## Simple Model

For the purpose of experimenting with the process we define a simple Black Scholes option model. This will allow us to show the work flow and highlight the challenges with the approach.

The specific model implementation below is arbitrary, it is just a means to show how the neural network can learn a function of the same order of magnitude of complexity as a simple option contract.

In [None]:
def black_scholes_model(S: float,
                        K: float,
                        T: float,
                        r: float,
                        v: float,
                        o: float = float(0)) -> float:
    """
    Implementation of Black Scholes model.

    Trivial example to demo of proof of concept for fitting an option pricing model
    with a neural network.

    :param S: Spot
    :param K: Strike
    :param T: Time to maturity
    :param r: risk free rate
    :param v: underlying volatility
    :param o: type, call = 0 or put = 1
    :return: Option Price
    """
    d1 = (np.log(S / K) + (r + 0.5 * v ** 2) * T) / (v * np.sqrt(T))
    d2 = (np.log(S / K) + (r - 0.5 * v ** 2) * T) / (v * np.sqrt(T))

    if o == 0.0:
        return S * norm.cdf(d1) - K * np.exp(-r * T) * norm.cdf(d2)
    if o == 1.0:
        return K * np.exp(-r * T) * norm.cdf(-d2) - S * norm.cdf(-d1)

    assert False, "Option must be call=0 or put=1"

## Pricing Scenarios

In the real use case we will collect the back history  RFQ ( (request for quote) as training data for the neural network. This presents the first challenge as to train the model we need a spread of data across all combinations of all parameters and instrument details that impact the price we are trying to predict. If we have gaps in our training data there will also be a weakness in the model when predicting prices with that specific combination of inputs.

The nature of neural networks is to give results, even an totally untrained neural network will make predictions, just erroneous ones. So we will need to review the spread of inputs we have and make sure (expert judgment) that there are sufficient examples for all of teh combinations we are likely to quote. If the markets shift significantly we can also add additional training data, but we must be aware when the model is being asked to make predictions outside of its experience.

In this example we will create training data from scratch by running scenarios by varying the input parameters and capturing the results.

In [None]:
def parameter_scenarios(vol_range: Tuple[float, float] = (0.01, 0.4),
                        mat_range: Tuple[float, float] = (1e-6, 1.0),
                        spot_range: Tuple[float, float] = (1.0, 100.0),
                        strike_range: Tuple[float, float] = (1.0, 100.0),
                        rate_range: Tuple[float, float] = (0.01, 0.25),
                        num_steps=50) -> Tuple[List[float], List[float], List[float], List[float], List[float]]:
    """
    Create scenario vectors for volatility, maturity, spot, strike and risk-free rate

    :param vol_range: The range to vary volatility over
    :param mat_range: The range to vary maturity over
    :param spot_range: The range to vary spot over
    :param strike_range: The range to vary strike over
    :param rate_range: The range to vary risk-free rate over
    :param num_steps: number of scenario steps
    :return: volatility, maturity, spot, strike and risk-free rate scenarios as list of float
    """
    vols = [vol_range[0] + (x * ((vol_range[1] - vol_range[0]) / (num_steps - 1)))
            for x in range(num_steps)]
    mats = [mat_range[0] + (x * ((mat_range[1] - mat_range[0]) / (num_steps - 1)))
            for x in range(num_steps)]
    spots = [spot_range[0] + (x * ((spot_range[1] - spot_range[0]) / (num_steps - 1)))
             for x in range(num_steps)]
    strikes = [strike_range[0] + (x * ((strike_range[1] - strike_range[0]) /
                                  (num_steps - 1))) for x in range(num_steps)]
    rates = [rate_range[0] + (x * ((rate_range[1] - rate_range[0]) / (num_steps - 1)))
             for x in range(num_steps)]
    return vols, mats, spots, strikes, rates

In [None]:
def combinations(list1: Union[List[float], float], list2: Union[List[float], float]) -> List:
    """
    Return all combinations of the given lists

    :param list1: The first list
    :param list2: The second list
    :return: List of all combinations of lists 1 and List 2
    """
    if not isinstance(list1, List):
        list1 = [list1]
    if not isinstance(list2, List):
        list2 = [list2]
    res = []
    for x in list1:
        for y in list2:
            if not isinstance(x, List):
                x = [x]
            if not isinstance(y, List):
                y = [y]
            res.append([*x, *y])
    return res

In [None]:
def generate_model_scenarios(spot: Union[List[float], float],
                             strike: Union[List[float], float],
                             mat: Union[List[float], float],
                             rate: Union[List[float], float],
                             vol: Union[List[float], float]
                             ) -> List[Tuple[float, float, float, float, float]]:
    """
    Return a set of model scenario inputs based on given parameter scenarios

    :param vol: List of volatilities or single volatility
    :param vol: List of maturities or single maturity
    :param vol: List of spots or single spot
    :param vol: List of strike or single strike
    :param vol: List of rates or single rate
    :return: List of all combinations [[s,k,t,r,v]]
    """
    return combinations(combinations(combinations(combinations(spot, strike), mat), rate), vol)

In [None]:
def two_d_scenario(spot: Union[List[float], float],
                   strike: Union[List[float], float],
                   mat: Union[List[float], float],
                   rate: Union[List[float], float],
                   vol: Union[List[float], float],
                   price_func: Callable,
                   scaler=None) -> Tuple[List[float], str, List[float], str, np.ndarray]:

    """
    Generate a scenario where two of the given parameters are scenarios

    :param spot: spot scenario or single spot
    :param strike: strike scenario or single strike
    :param maturity: maturity scenario or single maturity
    :param rate: rate scenario or single rate
    :param vol: volatility scenario or single volatility
    :param price_func: The pricing function, either black_scholes or model.predict
    :param scaler: The scaler used to normalise the model inputs
    :return: scenario List 1, scenario parameter 1 name, scenario List 2, scenario parameter 2 name, scenario prices

    """
    params = [spot, strike, mat, rate, vol]
    arg_names = [*inspect.signature(black_scholes_model).parameters.keys()]
    scenario_params = [[x[0][0], x[0][1], x[1]] for x in zip(
        enumerate(params), arg_names) if isinstance(x[0][1], List)]
    assert (len(scenario_params) ==
            2), "Only two parameters can be passed as scenario lists"
    prices = np.zeros((len(scenario_params[0][1]), len(scenario_params[1][1])))
    for i, x in enumerate(scenario_params[0][1]):
        if scaler is None: # cell by cell
            for j, y in enumerate(scenario_params[1][1]):
                params[scenario_params[0][0]] = x
                params[scenario_params[1][0]] = y
                prices[i, j] = (price_func)(*params)
        else: # row by row as it is quicker when calling model.predict
            params[scenario_params[0][0]] = x
            params[scenario_params[1][0]] = None
            param_set = np.tile(np.asarray(params),
                                (len(scenario_params[1][1]), 1))
            param_set[:, scenario_params[1][0]] = scenario_params[1][1]
            param_set = scaler.transform(param_set)
            prices[i, :] = ((price_func)(param_set.astype(np.float64))).reshape(
                1, len(scenario_params[1][1]))
        print("{:0.0f} % Complete".format(
            100 * ((i*len(scenario_params[1][1]))/prices.size)))
    print("Done")
    return scenario_params[0][1], scenario_params[0][2],  scenario_params[1][1], scenario_params[1][2], prices

### Step 1. Generate parameter scenarios

Generate a range for each model paramater type. These will be used to generate training data by calling the model for each combination of the parameters.

In a real case we would try to collect this data as a side effect of where the real model was being used and only augment the training set where we observed gaps or thin spots in the training data

In [None]:
vols, mats, spots, strikes, rates = parameter_scenarios(num_steps=20)
X = generate_model_scenarios(spot=spots, strike=strikes, mat=mats, rate=rates, vol=vols)

In [None]:
Y = [black_scholes_model(*params) for params in X]

In [None]:
dfy = pd.DataFrame(Y, columns=['price'])
dfx = pd.DataFrame(X, columns=['vols', 'mats', 'spots', 'strikes', 'rates'])
dfRaw = dfx.join(dfy)

In [None]:
dfRaw.to_csv('XYRaw', encoding='ascii', index=False)

In [None]:
scaler = MinMaxScaler(feature_range=(0, 1),copy=False).fit(X)
XScaled = scaler.transform(X)
dfxs = pd.DataFrame(XScaled, columns=['vols', 'mats', 'spots', 'strikes', 'rates'])
dfScaled = dfxs.join(dfy)

In [None]:
dfScaled.to_csv('XYScaled', encoding='ascii', index=False)

### Step 3. Create a neural network that can be regression trained to predict the price.

This is where art meet science in that there are not hard and fast rules for the size, shape and architecture of a neural network that will be able to converge on a general form of teh function in a given set of data (if one exists). There are guidelines and many types of layer that fit certain patterns for image processing etc, so there is always informed experimentation at this stage.

This is a very simple model as the data set is relatively small, however even a simple data set such as this took a number of experiments to get the right balance.

In [None]:
def create_neural_net():
    """
    Create a Neural network with an architecture tuned to regression.
    """

    # Create a NN of 5 Dense layers, with 5 inputs (number of BS parameters) abd a single
    # output that predicts the price for the given parameters
    model = Sequential()

    model.add(Dense(32, input_shape=(5,),
                    kernel_initializer='normal', activation='tanh'))
    model.add(Dense(16,
                    kernel_initializer='normal', activation='tanh'))
    model.add(Dense(8,
                    kernel_initializer='normal', activation='tanh'))
    model.add(Dense(4,
                    kernel_initializer='normal', activation='tanh'))
    model.add(Dense(2,
                    kernel_initializer='normal', activation='tanh'))
    model.add(Dense(1,
                    kernel_initializer='normal', activation='linear'))

    # Print the model architecture out
    model.summary()

    # Compile the model with the Adam optimizer, with a tuned step size.
    opt = tf.keras.optimizers.Adam(learning_rate=0.0005)
    model.compile(loss='mean_squared_error', optimizer=opt)

    return model

### Step 5. Shuffle the training data & split into Test & Train data sets

1. The training data is shuffled to improve training accuracy
2. Split out a third of the data as a validation set
3. Ensure all training and test data is pur numpy arrays (not always the case after using pipeline transforms)

In [None]:
class ModelType(Enum):
    NEURAL_NET = 1
    RANDOM_FOREST_REGRESSOR = 2


def fit(X: np.ndarray,
        Y: np.ndarray,
        model_type: ModelType):
    """
    Prepare the model inputs and fit the data with the given model type.
    
    :param X: Pricing parameter scenarios
    :param Y: Prices corresponding to given inputs
    :return : The tf model, training history & test and training data used X_train, Y_train, X_test, Y_test

    """

    # Shuffle & scale the data

    X = XScaled
    Xs, Ys = shuffle(X, Y, random_state=42)
    X_train, X_test, Y_train, Y_test = train_test_split(Xs, Ys, test_size=0.33)
    X_train = np.asarray(X_train)
    X_test = np.asarray(X_test)
    Y_train = np.asarray(Y_train)
    Y_test = np.asarray(Y_test)

    # Create a check point to save the model version with the best validation loss

    saveBest = ModelCheckpoint(filepath='best',
                               monitor='val_loss',
                               verbose=1,
                               save_best_only=True,
                               mode='min')

    # Fit to the requested type

    if model_type == ModelType.NEURAL_NET:
        print("Create and fit Neural Network")
        model = create_neural_net()
        history = model.fit(X_train, Y_train,
                            batch_size=256,
                            epochs=50,
                            verbose=1,
                            validation_data=(X_test, Y_test),
                            callbacks=[saveBest])
        model.load_weights('best') # load back the model corresponding to lowest validation loss
    elif model_type == ModelType.RANDOM_FOREST_REGRESSOR:
        print("Create and fit Random Forest Regressor")
        model = RandomForestRegressor(n_estimators=250, verbose=2)
        model.fit(X_train, Y_train)
        history = None
    else:
        assert False, "Invalid model type specified, see model types defined by class ModelType"

    mse = mean_squared_error(model.predict(X_test), Y_test)
    print(f"Final Mean Square Error [{mse:5f}]")
    return model, history, X_train, Y_train, X_test, Y_test

### Step 6. Train the model but fitting to the given training data

In [None]:
model, history,X_train, Y_train, X_test, Y_test = fit(X, Y, ModelType.NEURAL_NET)

In [None]:
def plot_training_history(loss,
                          validation_loss, 
                          skip=0):
    """
    Plot a two axis line graph of training and validation losses
    
    :param loss: Training losses per epoch
    :param validation_ loss: Validation losses per epoch
    :param skip: don't plot the first skip points

    """
    fig, ax1 = plt.subplots()
    ax2 = ax1.twinx()
    ax1.set_xlabel('Training Epoch')
    ax1.set_ylabel('Loss', color='r')
    ax2.set_ylabel('Validation Loss', color='b')
    ax1.plot(loss[skip:], color='r')
    ax2.plot(validation_loss[skip:], color='b')
    plt.show()

### Step 7. Examine the training losses and verify if model has learned a generalisation of the option function.

We expect to see the Loss (red line) drop off exponentially as the model learns (fits) the data and the error between actual and predicted reduces.

The validation loss (blue line) should also drop off as the model get better at predicting for test data (hold out) that it has not seen during training. If the loss increases, this is an indication that the model is too powerful and has just learned the training data (over fitted) rather than fitting a generalised form of the function in the training data.

In [None]:
if history is not None:
    plot_training_history(history.history['loss'],
                          history.history['val_loss'],
                          skip=9)

### Step 8. Generate a result set based on direct model calls.

We generate a set of prices based on direct model calls so we can plot a pricing surface and see what the function looks like. This then also acts as the target surface we expect to see when we predict and plot the prices with the model


In [None]:
vols, mats, spots, strikes, rates = parameter_scenarios(num_steps=20)

In [None]:
# Rate vs Vol Scenario
s1,s1n,s2,s2n,actual = two_d_scenario(spot=99, strike=100, mat=0.75, rate=rates,vol=vols, price_func=black_scholes_model)
s1,s1n,s2,s2n,predicted = two_d_scenario(spot=99, strike=100, mat=0.75, rate=rates,vol=vols, price_func=model.predict, scaler=scaler)

In [None]:
s1,s1n,s2,s2n,actual = two_d_scenario(spot=99, strike=100, mat=mats, rate=0.05,vol=vols, price_func=black_scholes_model)
s1,s1n,s2,s2n,predicted = two_d_scenario(spot=99, strike=100, mat=mats, rate=0.05,vol=vols, price_func=model.predict, scaler=scaler)

In [None]:
s1,s1n,s2,s2n,actual = two_d_scenario(spot=spots, strike=strikes, mat=1.0, rate=0.05,vol=.2, price_func=black_scholes_model)
s1,s1n,s2,s2n,predicted = two_d_scenario(spot=spots, strike=strikes, mat=1.0, rate=0.05,vol=0.2, price_func=model.predict, scaler=scaler)

In [None]:
def plot_price_surface(xscen,
                       yscen,
                       actual,
                       predicted,
                       title,
                       xscen_lab,
                       yscen_lab):
    """
    Plot dual surface of actual vs predicted, with a contour plot of price difference as %
    """
    minz = min(np.min(actual), np.min(predicted))
    maxz = max(np.max(actual), np.max(predicted))

    X, Y = np.meshgrid(xscen, yscen)
    fig = plt.figure(figsize=(10, 10))
    ax = fig.add_subplot(111, projection='3d')
    ax.set_zlim3d(minz, maxz)
    c1 = ax.contourf(X, Y, ((actual-predicted)/np.maximum(1e-9,actual))*100.0,
                     levels=range(-100, 100, 1), cmap=cm.RdGy, offset=np.min(minz))
    s1 = ax.plot_surface(X, Y, actual, cmap=cm.coolwarm,
                         linewidth=0, edgecolor='none', alpha=.7)
    s2 = ax.plot_surface(X, Y, predicted, edgecolors='k',
                         linewidth=0.1, color='gray', alpha=.3)
    cax = fig.add_axes([ax.get_position().x1+0.05, ax.get_position().y0 +
                       ((ax.get_position().height)*0.15), 0.02, (ax.get_position().height)*0.7])
    cb = fig.colorbar(c1, cax=cax)
    cb.ax.set_ylabel('% difference', rotation=270)
    ax.set_xlabel(xscen_lab)
    ax.set_ylabel(yscen_lab)
    ax.set_zlabel("Option Price")
    ax.set_title(title)
    plt.show()

## Actual vs Predicted

### Surface

We plot the actuals as a surface (blue-red) and then overlay the predicted as a translucent gray surface, so we can see where the two diverge. We also plot a contour of the % difference at every point.

In [None]:
plot_price_surface(vols, mats, actual, predicted, title="Predicted vs Actual Comparison",
                   xscen_lab=s1n, yscen_lab=s2n)

### Scatter 
If we scatter plot actual vs predicted then we would expect to see a straight line, as for any given scenario point the actual and predicted would be equal.

In [None]:
plt.scatter(actual.flatten(), predicted.flatten(), s=0.25)

In [238]:
@tf.function(jit_compile=True,
             reduce_retracing=True,
             input_signature=(tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),))
def binomial_model_tf(S: float,
                      K: float,
                      T: float,
                      r: float,
                      v: float,
                      CallPut: float,
                      EurAm: float,
                      n: int = 1000) -> float:
    '''
    Binomial Tree for European and American calls and puts.

    The break time up into steps param:n, and creates a corresponding binomial tree. The paths of the
    tree are then traversed and the option price evaluated at each node. This then allows for a stock price
    path dependent value to be obtained, which is then able to support the early exercise for American style
    options.

    This model is thus more compute intensive than the closed form equivalents. These path dependent models
    are more typical for exotic (complex) options where there is no closed form solution. These are the type of
    compute expensive options we would like to evaluate with a neural network, as we believe it is less expensive
    to train the model and quicker to price, than it is to run large numbers of prices from the full path dependent
    call.

    :param S: Spot
    :param K: Strike
    :param T: Time to maturity
    :param r: risk free rate
    :param v: underlying volatility
    :param callPut: Call = 0.0 or Put 1.0
    :param EurAm: European = 0.0 or American 1.0
    :return: Option Price
    """
    '''
    deltaT = tf.cast(T/n, dtype=tf.float64)
    u = tf.cast(tf.exp(v*tf.sqrt(deltaT)), dtype=tf.float64)
    d = tf.cast(1/u, dtype=tf.float64)
    p = tf.cast((tf.exp(r*deltaT) - d)/(u - d), dtype=tf.float64)
    q = tf.cast(1 - p, dtype=tf.float64)

    nc = tf.cast(n, dtype=np.int32)
    underlying = tf.zeros((nc+1, nc+1), dtype=tf.float64)
    underlying = tf.tensor_scatter_nd_update(underlying, [[0, 0]], [S])

    res = tf.scan(lambda a, x: a*u, elems=underlying[:, 0])
    underlying = tf_updateColumn(underlying, 0, res)

    return underlying

    for i in range(1, nc+1):
        underlying = tf.tensor_scatter_nd_update(
            underlying, [[i, 0]], [underlying[i-1, 0]*u])
        for j in range(1, i+1):
            underlying = tf.tensor_scatter_nd_update(
                underlying, [[i, j]], [underlying[i-1, j-1]*d])

    return underlying

    price = np.zeros((n+1, n+1))

    for i in range(n+1):
        if CallPut == float(0):
            price[n, i] = max(0, underlying[n, i] - K)
        elif CallPut == float(1):
            price[n, i] = max(0, K - underlying[n, i])

    for i in range(n-1, -1, -1):
        for j in range(i+1):
            if CallPut == float(0) and EurAm == float(0):
                price[i, j] = max(
                    0, underlying[i, j]-K, np.exp(-r*deltaT)*(p*price[i+1, j]+q*price[i+1, j+1]))
            elif CallPut == float(1) and EurAm == float(0):
                price[i, j] = max(
                    0, K-underlying[i, j], np.exp(-r*deltaT)*(p*price[i+1, j]+q*price[i+1, j+1]))
            elif CallPut == float(0) and EurAm == float(1):
                price[i, j] = np.exp(-r*deltaT) * \
                    (p*price[i+1, j]+q*price[i+1, j+1])
            elif CallPut == float(1) and EurAm == float(1):
                price[i, j] = np.exp(-r*deltaT) * \
                    (p*price[i+1, j]+q*price[i+1, j+1])

    return price[0, 0]

In [239]:
print(binomial_model_tf(S=101, K=99, T=1, r=0.05, v=0.2, CallPut=0, EurAm=0))

tf.Tensor(
[[  101.             0.             0.         ...     0.
      0.             0.        ]
 [  101.64080435     0.             0.         ...     0.
      0.             0.        ]
 [  102.28567435     0.             0.         ...     0.
      0.             0.        ]
 ...
 [55660.53942914     0.             0.         ...     0.
      0.             0.        ]
 [56013.68315128     0.             0.         ...     0.
      0.             0.        ]
 [56369.06742821     0.             0.         ...     0.
      0.             0.        ]], shape=(1001, 1001), dtype=float64)


In [280]:
    S=tf.constant(3.142,dtype=tf.float64)
    u=tf.constant(1.5,dtype=tf.float64)
    d=tf.constant(2,dtype=tf.float64)
    underlying = tf.zeros((5, 10), dtype=tf.float64)
    underlying = tf.tensor_scatter_nd_update(underlying, [[0, 0]], [S])
    res = tf.scan(lambda a, x: a*u, elems=underlying[:,0])
    underlying = tf_updateColumn(underlying,0,res)
    res = tf.scan(lambda a, x: a*d, elems=underlying[0,:])
    underlying = tf_updateRow(underlying,0,res)
    print(underlying)

tf.Tensor(
[[   3.142    6.284   12.568   25.136   50.272  100.544  201.088  402.176
   804.352 1608.704]
 [   0.       0.       0.       0.       0.       0.       0.       0.
     0.       0.   ]
 [   0.       0.       0.       0.       0.       0.       0.       0.
     0.       0.   ]
 [   0.       0.       0.       0.       0.       0.       0.       0.
     0.       0.   ]
 [   0.       0.       0.       0.       0.       0.       0.       0.
     0.       0.   ]], shape=(5, 10), dtype=float64)


In [269]:
def tf_updateColumn(target, col_idx, col_vals):
    rows = tf.stack(tf.range(tf.shape(target)[0]))
    cols = tf.ones_like(rows)*col_idx
    idx = tf.stack((rows, cols), axis=1)
    res = tf.tensor_scatter_nd_update(target, idx, col_vals)
    return res

In [278]:
def tf_updateRow(target, row_idx, row_vals):
    rows = tf.stack(tf.range(tf.shape(target)[1]))
    cols = tf.ones_like(rows)*row_idx
    idx = tf.stack((cols, rows), axis=1)
    res = tf.tensor_scatter_nd_update(target, idx, row_vals)
    return res

In [271]:
print(tf_updateRow(underlying,0,res))

tf.Tensor(
[[0 0]
 [1 0]
 [2 0]
 [3 0]
 [4 0]
 [5 0]
 [6 0]
 [7 0]
 [8 0]
 [9 0]], shape=(10, 2), dtype=int32)


In [223]:
x=tf.ones((6,10))
v=tf.ones((6,))*2.0
print(v)
print(tf_updateColumn(x,1,v))

tf.Tensor([2. 2. 2. 2. 2. 2.], shape=(6,), dtype=float32)
tf.Tensor(
[[1. 2. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 2. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 2. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 2. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 2. 1. 1. 1. 1. 1. 1. 1. 1.]
 [1. 2. 1. 1. 1. 1. 1. 1. 1. 1.]], shape=(6, 10), dtype=float32)


In [None]:
@tf.function(input_signature=(tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),))
def mx(a, b, c):
    print(1)
    underlying = tf.zeros((5, 5),dtype=tf.float64)
    return tf.tensor_scatter_nd_update(underlying, [[0, 0]], [a*b*c])

In [None]:
@tf.function(jit_compile=True,
             reduce_retracing=True,
             input_signature=(tf.TensorSpec(shape=None, dtype=np.float64),
                              tf.TensorSpec(shape=None, dtype=np.float64),
                              tf.TensorSpec(shape=None, dtype=np.float64),
                              tf.TensorSpec(shape=None, dtype=np.float64),
                              tf.TensorSpec(shape=None, dtype=np.float64),
                              tf.TensorSpec(shape=None, dtype=np.float64),
                              tf.TensorSpec(shape=None, dtype=np.float64),
                              tf.TensorSpec(shape=None, dtype=np.float64),))
def binomial_model(S: float,
                   K: float,
                   T: float,
                   r: float,
                   v: float,
                   CallPut: float,
                   EurAm: float,
                   n: int = 1000) -> float:
    '''
    Binomial Tree for European and American calls and puts.

    The break time up into steps param:n, and creates a corresponding binomial tree. The paths of the
    tree are then traversed and the option price evaluated at each node. This then allows for a stock price
    path dependent value to be obtained, which is then able to support the early exercise for American style
    options.

    This model is thus more compute intensive than the closed form equivalents. These path dependent models
    are more typical for exotic (complex) options where there is no closed form solution. These are the type of
    compute expensive options we would like to evaluate with a neural network, as we believe it is less expensive
    to train the model and quicker to price, than it is to run large numbers of prices from the full path dependent
    call.

    :param S: Spot
    :param K: Strike
    :param T: Time to maturity
    :param r: risk free rate
    :param v: underlying volatility
    :param callPut: Call = 0.0 or Put 1.0
    :param EurAm: European = 0.0 or American 1.0
    :return: Option Price
    """
    '''
    deltaT = T/n
    u = tf.exp(v*tf.sqrt(deltaT))
    d = 1/u
    p = (tf.exp(r*deltaT) - d)/(u - d)
    q = 1 - p

    underlying = np.zeros((n+1, n+1))
    underlying[0, 0] = S
    for i in range(1, n+1):
        underlying[i, 0] = underlying[i-1, 0]*u
        for j in range(1, i+1):
            underlying[i, j] = underlying[i-1, j-1]*d

    price = tf.zeros((n+1, n+1))

    for i in range(n+1):
        if CallPut == float(0):
            price[n, i] = max(0, underlying[n, i] - K)
        elif CallPut == float(1):
            price[n, i] = max(0, K - underlying[n, i])

    for i in range(n-1, -1, -1):
        for j in range(i+1):
            if CallPut == float(0) and EurAm == float(0):
                price[i, j] = max(
                    0, underlying[i, j]-K, tf.exp(-r*deltaT)*(p*price[i+1, j]+q*price[i+1, j+1]))
            elif CallPut == float(1) and EurAm == float(0):
                price[i, j] = max(
                    0, K-underlying[i, j], tf.exp(-r*deltaT)*(p*price[i+1, j]+q*price[i+1, j+1]))
            elif CallPut == float(0) and EurAm == float(1):
                price[i, j] = tf.exp(-r*deltaT) * \
                    (p*price[i+1, j]+q*price[i+1, j+1])
            elif CallPut == float(1) and EurAm == float(1):
                price[i, j] = tf.exp(-r*deltaT) * \
                    (p*price[i+1, j]+q*price[i+1, j+1])

    return price[0, 0]

In [None]:
def time_it(code: str) -> float:
    """
    Run the given code and return time in seconds for it to run

    :param code: The code to time
    :return: runtime in seconds
    """
    st = time.time()
    eval(code)
    return (time.time()-st)*10

In [125]:
Sv=99
binomial_model(S=Sv, K=99, T=1, r=0.05, v=0.2, CallPut=0, EurAm=0)
#print("{:10.4f} Seconds".format(time_it("black_scholes_model(S=100, K=99, T=1, r=0.05, v=0.2)")))

TypeError: in user code:

    File "C:\Users\parri\AppData\Local\Temp\ipykernel_432\362485379.py", line 49, in binomial_model  *
        underlying = np.zeros((n+1, n+1))

    TypeError: 'Tensor' object cannot be interpreted as an integer


In [None]:
print(binomial_model.pretty_printed_concrete_signatures())

In [None]:
@tf.function(input_signature=(tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),
                              tf.TensorSpec(shape=None, dtype=tf.float64),))
def mx(a, b, c):
    underlying = tf.zeros((5, 5),dtype=tf.float64)
    tf.tensor_scatter_nd_update(underlying, [[0, 0]], a*b*c)
    return tf.tensor_scatter_nd_update(underlying, [[0, 0]], [a*b*c])

In [None]:
print(mx(5,9,3))
print(mx.pretty_printed_concrete_signatures())

In [None]:
class StringFeature:
    """Encodes string features as numeric values so they can be passed as model inputs
    """
    features = [[re.compile("^[C|c].*"), float(0)],
                [re.compile("^[P|p].*"), float(1)],
                [re.compile("^[A|a].*"), float(1)],
                [re.compile("^[E|e].*"), float(0)]]

    @classmethod
    def encode(cls, feature_as_string: str) -> float:
        """Return the given feature as its numeric equivalent or asset if feature not known

        :param feature_as_string: The string feature to encode
        :return: The feature as it's numeric equivalent
        """
        encoded_feature = None
        for f in cls.features:
            if f[0].search(feature_as_string) is not None:
                encoded_feature = f[1]
                break
        assert encoded_feature is not None, "Unknown feature [{}], cannot encode".format(
            feature_as_string)
        return encoded_feature

    @classmethod
    def decode(cls, values: List[float], time_steps: int) -> Tuple[float, float, float, float, float, str, str]:
        S, K, T, r, v, c, s = values
        return [S, K, T, r, v, time_steps, ['Call', 'Put'][int(c)], ['European', 'American'][int(s)]]

## Extend feature set

We need to extend the feature set to cover the option style, Call or Put and early exercise or not, American or European

These string features needed encoding as numerical values so they can be passed to the model.

In [None]:
# Generate features without type and style.
vols, mats, spots, strikes, rates = parameter_scenarios(num_steps=20)
X = generate_model_scenarios(spot=spots, strike=strikes, mat=mats, rate=rates, vol=vols)

In [None]:
# Create all combinations with option type
# Call = 0, Put = 1
X = combinations(X, [0, 1])

In [None]:
# Create all combinations with option style
# American (early exercise) = 0, European = 1
X = combinations(X, [0, 1])

In [None]:
# Shuffle the training data
Xs = shuffle(X, random_state=42)
#scaler = MinMaxScaler(feature_range=(0, 1),copy=False).fit(Xs)
#Xss = scaler.transform(Xs)
dfxs = pd.DataFrame(Xs, columns=['spot', 'strike', 'maturity', 'rate', 'volatility','call_put','amer_eur'])
dfxs.to_csv('XFull.csv', encoding='ascii', index=False)

In [None]:
!dir

In [None]:
def binomial_model_wrapper(args: List) -> float:
    S, K, T, r, v, c, s = args
    print(binomial_model(S, K, T, r, v, c, s))

In [None]:
import multiprocessing
import time

In [None]:
X_train = dfxs.head(50).values.tolist()
Y_train = []
p = multiprocessing.Pool(2)
p.map(binomial_model_wrapper, X_train)