# Functions and Classes

## utils

In [None]:
import numpy as np
from scipy.stats import norm
import tensorflow as tf

In [None]:
def pwlin_basis(xknots):
    """Build the basis functions of a piecewise linear model.

    The basis consists of a constant, a linear term measured from the first
    knot, and one hinge ``max(x - knot, 0)`` for every entry of ``xknots``.

    Parameters
    ----------
    xknots: sequence of scalars
        The knot locations.

    Returns
    -------
    fs: list of callables
        ``2 + len(xknots)`` functions, each taking the evaluation points ``x``.
    """
    def constant(x):
        return np.ones_like(x, dtype=float)

    def linear(x):
        # The first knot is looked up when the function is called.
        return x - xknots[0]

    def hinge_at(knot):
        # The knot is bound when the hinge is created.
        def hinge(x, a=knot):
            return np.maximum(x - a, 0)
        return hinge

    return [constant, linear] + [hinge_at(knot) for knot in xknots]

def pwlin_fit(xdata, ydata, xknots):
    """Least-squares fit of a piecewise linear function to data.

    Parameters
    ----------
    xdata: array_like
        The x-coordinates of the data points.
    ydata: array_like
        The y-coordinates of the data points.
    xknots: sequence of scalars
        The knots handed to ``pwlin_basis``.

    Returns
    -------
    ps: ndarray
        Least-squares coefficients, one per basis function.
    fs: list of callables
        The basis functions the coefficients refer to.
    """
    basis = pwlin_basis(xknots)
    # One column of the design matrix per basis function.
    design = np.column_stack([phi(xdata) for phi in basis])
    coeffs, *_ = np.linalg.lstsq(design, ydata, rcond=None)
    return coeffs, basis

def gauss_kern(x):
    """Evaluate the unnormalised Gaussian kernel ``exp(-x**2 / 2)`` at ``x``."""
    neg_square = -x**2
    return np.exp(neg_square/2)

def kern_reg(x, xdata, ydata, bandwidth, kern=gauss_kern):
    """Nadaraya-Watson Kernel Regression (Locally weighted average)

    Parameters
    ----------
    x: array_like, one-dimensional
        The x-coordinates of the target points
    xdata: array_like
        The x-coordinates of the data points.
    ydata: array_like
        The y-coordinates of the data points.
    bandwidth: positive scalar
        Bandwidth of the kernel
    kern: callable
        kernel function

    Returns
    -------
    ndarray
        The kernel-weighted average of ``ydata`` at each target point.
    """
    # Coerce the inputs so plain lists work as the docstring promises;
    # the [:, np.newaxis] indexing below is only defined for ndarrays.
    x = np.asarray(x)
    xdata = np.asarray(xdata)
    ydata = np.asarray(ydata)
    # Rows index data points, columns index target points.
    weights = kern((xdata[:, np.newaxis] - x) / bandwidth)
    return np.sum(weights * ydata[:, np.newaxis], axis=0) / np.sum(weights, axis=0)


def ll_reg(x, xdata, ydata, bandwidth, kern=gauss_kern):
    """Local Linear Regression

    Parameters
    ----------
    x: array_like, one-dimensional
        The x-coordinates of the target points
    xdata: array_like
        The x-coordinates of the data points.
    ydata: array_like
        The y-coordinates of the data points.
    bandwidth: positive scalar
        Bandwidth of the kernel
    kern: callable
        kernel function

    Returns
    -------
    ndarray
        The intercept of the kernel-weighted linear fit centred at each
        target point, with the same shape as ``x``.
    """
    # Coerce once so list inputs support the arithmetic used in ``func``.
    xdata = np.asarray(xdata)
    ydata = np.asarray(ydata)

    def func(xx):
        # Weighted least squares is solved as ordinary least squares by
        # scaling rows with the square root of the kernel weights.
        weights = np.sqrt(kern((xdata-xx)/bandwidth))
        b = ydata*weights
        A = np.column_stack((np.ones_like(xdata), xdata-xx))*weights[:, np.newaxis]
        # The regressor is centred at xx, so the intercept is the fit at xx.
        yy, _ = np.linalg.lstsq(A, b, rcond=None)[0]
        return yy

    # Declaring the output type lets np.vectorize handle an empty ``x``.
    return np.vectorize(func, otypes=[float])(x)

def blackscholes_price(K, T, S0, vol, r=0, q=0, callput='call'):
    """Compute the call/put option price in the Black-Scholes model

    Parameters
    ----------
    K: scalar or array_like
        The strike of the option.
    T: scalar or array_like
        The maturity of the option, expressed in years (e.g. 0.25 for 3-month and 2 for 2 years)
    S0: scalar or array_like
        The current price of the underlying asset.
    vol: scalar or array_like
        The implied Black-Scholes volatility.
    r: scalar or array_like
        The annualized risk-free interest rate, continuously compounded.
    q: scalar or array_like
        The annualized continuous dividend yield.
    callput: str
        Must be either 'call' or 'put' (case-insensitive).

    Returns
    -------
    price: scalar or array_like
        The price of the option.

    Raises
    ------
    ValueError
        If ``callput`` is not 'call' or 'put'.

    Examples
    --------
    >>> blackscholes_price(95, 0.25, 100, 0.2, r=0.05, callput='put')
    1.5342604771222823
    """
    # Validate the flag first so a bad value fails before any numerics run.
    # AttributeError covers non-string flags, KeyError unknown strings.
    try:
        opttype = {'call': 1, 'put': -1}[callput.lower()]
    except (AttributeError, KeyError):
        raise ValueError('The value of callput must be either "call" or "put".') from None
    F = S0*np.exp((r-q)*T)  # forward price of the underlying
    v = vol*np.sqrt(T)      # total volatility over the option's life
    d1 = np.log(F/K)/v + 0.5*v
    d2 = d1 - v
    # opttype flips the signs so one formula covers calls (+1) and puts (-1).
    price = opttype*(F*norm.cdf(opttype*d1)-K*norm.cdf(opttype*d2))*np.exp(-r*T)
    return price

class LossCallback(tf.keras.callbacks.Callback):
    """Keras callback that prints the training loss every tenth epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """Print the loss when ``epoch + 1`` is a multiple of ten.

        Parameters
        ----------
        epoch: int
            Zero-based index of the epoch that just finished.
        logs: dict, optional
            Metrics of the epoch; the entry under ``'loss'`` is printed.
            Nothing is printed when it is missing.
        """
        if (epoch + 1) % 10 != 0:
            return
        # ``logs`` defaults to None, so guard before looking up the loss.
        loss = (logs or {}).get('loss')
        if loss is not None:
            print(f"Epoch {epoch + 1}: Loss = {loss}")

## funcRecord

In [None]:
import numpy as np
from scipy.interpolate import interp1d
import tensorflow as tf
from tensorflow import keras

In [None]:
class ExpectationFunc():
    """Regression model of a conditional expectation for one time step.

    The regression family is chosen by ``ftype``. ``fit`` estimates the model
    from samples ``(X, y)`` and ``predict`` evaluates the fitted model.
    """

    # Regression families handled by fit() and predict().
    FTYPES = (
        'Polynomial',
        'PieceWise',
        'KernelRegression',
        'LocalLR',
        'BlackScholes',
        'NeuralNetwork',
        'SelfDefined',
    )

    def __init__(self, ftype: str) -> None:
        """Create an unfitted model.

        Parameters
        ----------
        ftype: str
            One of ``ExpectationFunc.FTYPES``.

        Raises
        ------
        ValueError
            If ``ftype`` is not a supported family.
        """
        if ftype not in self.FTYPES:
            raise ValueError('This type of method is not implemented!')
        self.ftype = ftype
        # Fitted state; what it holds depends on ftype (polynomial
        # coefficients, a (ps, fs) pair, an interpolant, regression weights,
        # a Keras model, or whatever the user's fit_function returns).
        self.coeff = None

    def input_from_path(self, path_i, input_fn=None):
        """Map the simulated state at one time step to the regression input.

        For 'SelfDefined' models a supplied ``input_fn`` is applied to
        ``path_i``; in every other case ``path_i`` is returned unchanged.
        """
        if self.ftype == 'SelfDefined' and input_fn is not None:
            return input_fn(path_i)
        return path_i

    @staticmethod
    def _percentile_knots(X, kwargs):
        """Return evenly spaced knots between two percentiles of ``X``."""
        lower = kwargs.get('lower', 5)
        upper = kwargs.get('upper', 95)
        n_xknots = kwargs.get('n_xknots', 10)
        return np.linspace(np.percentile(X, lower), np.percentile(X, upper), n_xknots)

    def _fit_smoother(self, smoother, X, y, kwargs):
        """Evaluate ``smoother`` on the knots and store a linear interpolant."""
        kern = kwargs.get('kern', gauss_kern)
        bandwidth = kwargs.get('bandwidth', (4/(3*len(X)))**0.2*np.std(X))
        xknots = self._percentile_knots(X, kwargs)
        yknots = smoother(xknots, X, y, bandwidth, kern)
        # Predictions outside the knot range are linearly extrapolated.
        self.coeff = interp1d(xknots, yknots, kind='linear', fill_value='extrapolate')

    def fit(self, X, y,
            **kwargs):
        """Fit the model to samples and return the in-sample predictions.

        Parameters
        ----------
        X: array_like
            Regression inputs.
        y: array_like
            Regression targets.
        **kwargs
            Family-specific options; unknown keys are ignored.

            * 'Polynomial': ``deg`` (default 3).
            * 'PieceWise': ``lower``, ``upper`` (percentiles, default 5 and
              95) and ``n_xknots`` (default 10).
            * 'KernelRegression' / 'LocalLR': the knot options above plus
              ``kern`` (default ``gauss_kern``) and ``bandwidth`` (default
              ``(4/(3*len(X)))**0.2*np.std(X)``).
            * 'BlackScholes': required ``K``, ``T``, ``vol``, ``callput``;
              optional ``r`` and ``q`` (default 0).
            * 'SelfDefined': required ``fit_function`` and ``pred_function``.

        Returns
        -------
        The result of ``self.predict(X)``.

        Raises
        ------
        KeyError
            If a required option of the selected family is missing.
        """
        if self.ftype == 'Polynomial':
            deg = kwargs.get('deg', 3)
            self.coeff = np.polyfit(X, y, deg=deg)
        elif self.ftype == 'PieceWise':
            xknots = self._percentile_knots(X, kwargs)
            ps, fs = pwlin_fit(X, y, xknots)
            self.coeff = (ps, fs)
        elif self.ftype == 'KernelRegression':
            self._fit_smoother(kern_reg, X, y, kwargs)
        elif self.ftype == 'LocalLR':
            self._fit_smoother(ll_reg, X, y, kwargs)
        elif self.ftype == 'BlackScholes':
            K = kwargs['K']
            T = kwargs['T']
            vol = kwargs['vol']
            r = kwargs.get('r', 0)
            q = kwargs.get('q', 0)
            callput = kwargs['callput']
            # The Black-Scholes price of the input is the single regressor.
            self.bs_f = lambda x: blackscholes_price(K, T, x, vol, r, q, callput)
            Z = self.bs_f(X)
            A = np.vstack((np.ones_like(Z), Z)).T
            # coeff[0] is the intercept, coeff[1] the slope on the price.
            self.coeff = np.linalg.lstsq(A, y, rcond=None)[0]
        elif self.ftype == 'NeuralNetwork':
            # NOTE(review): the input layer is fixed to 2 features, so X is
            # presumably (n_samples, 2) -- confirm against the caller.
            self.coeff = keras.models.Sequential([
                keras.layers.InputLayer(shape=[2]),
                keras.layers.Dense(20, activation='relu'),
                keras.layers.Dense(20, activation='relu'),
                keras.layers.Dense(20, activation='relu'),
                keras.layers.Dense(1)])
            self.coeff.compile(loss=keras.losses.MeanSquaredError(), optimizer=keras.optimizers.Adam(learning_rate=0.001))
            self.coeff.fit(X, y, epochs=50, batch_size=128, verbose=0, callbacks=[LossCallback()])
        elif self.ftype == 'SelfDefined':
            self.fit_function = kwargs['fit_function']
            self.pred_function = kwargs['pred_function']
            self.coeff = self.fit_function(X, y)
        return self.predict(X)

    def predict(self, X):
        """Evaluate the fitted model at ``X``.

        ``fit`` must have been called first, because the prediction reads the
        state that ``fit`` stores on the instance.
        """
        if self.ftype == 'Polynomial':
            return np.polyval(self.coeff, X)
        if self.ftype == 'PieceWise':
            ps, fs = self.coeff
            return sum([f(X)*p for (f, p) in zip(fs, ps)])
        if self.ftype in ('KernelRegression', 'LocalLR'):
            return self.coeff(X)
        if self.ftype == 'BlackScholes':
            Z = self.bs_f(X)
            return self.coeff[0]+self.coeff[1]*Z
        if self.ftype == 'NeuralNetwork':
            # Flatten the Keras output to a one-dimensional array.
            return (self.coeff.predict(X)).reshape(-1,)
        if self.ftype == 'SelfDefined':
            return self.pred_function(X)


class FuncRecord():
    """Per-time-step collection of ``ExpectationFunc`` models.

    Entries are keyed by time-step index. Reading a key that has not been set
    creates a fresh, unfitted ``ExpectationFunc`` of the record's ``ftype``,
    stores it and returns it.
    """

    def __init__(self, ftype: str) -> None:
        """Create an empty record.

        Raises
        ------
        ValueError
            If ``ftype`` is not one of the supported regression families.
        """
        ftypes = [
            'Polynomial',
            'PieceWise',
            'KernelRegression',
            'LocalLR',
            'BlackScholes',
            'NeuralNetwork',
            'SelfDefined'
            ]
        self.coeff_hist = {}
        if ftype not in ftypes:
            raise ValueError('This type of method is not implemented!')
        self.ftype = ftype

    def __len__(self):
        """Return the number of time steps that have an entry."""
        return len(self.coeff_hist)

    def __iter__(self):
        """Iterate over the stored models in insertion order."""
        # Must return an iterator; the bound ``values`` method is not one.
        return iter(self.coeff_hist.values())

    def __setitem__(self, i, value):
        self.coeff_hist[i] = value

    def __getitem__(self, i):
        try:
            return self.coeff_hist[i]
        except KeyError:
            return self.__missing__(i)

    def __missing__(self, i):
        """Create, store and return the model for an unseen time step."""
        self.coeff_hist[i] = ExpectationFunc(self.ftype)
        return self.coeff_hist[i]

    def __repr__(self):
        return f'{self.ftype} Expectation-Calculation Functions for {len(self.coeff_hist)} time-steps'

## derivativeClass

In [None]:
def blackscholes_mc(ts, n_paths, S0, vol, r, q):
    """Simulate Monte-Carlo paths of the underlying in the Black-Scholes model.

    Parameters
    ----------
    ts: array_like
        The time steps of the simulation.
    n_paths: int
        The number of paths to simulate.
    S0: scalar
        The spot price of the underlying security.
    vol: scalar
        The implied Black-Scholes volatility.
    r: scalar
        The annualized risk-free interest rate, continuously compounded.
    q: scalar
        The annualized continuous dividend yield.

    Returns
    -------
    paths: ndarray
        Array of shape ``(len(ts), n_paths)``; row ``i`` holds the simulated
        prices at ``ts[i]`` and row 0 equals ``S0``.
    """
    n_times = len(ts)
    paths = np.full((n_times, n_paths), np.nan, dtype=float)
    paths[0] = S0
    # Drift of the log-price per unit of time.
    log_drift = r-q-1/2*vol**2
    for step in range(1, n_times):
        dt = ts[step] - ts[step-1]
        # Brownian increment over the step, drawn from NumPy's global RNG.
        dW = np.sqrt(dt)*np.random.randn(n_paths)
        paths[step] = paths[step-1] * np.exp(log_drift*dt + vol*dW)
    return paths

In [None]:
class Derivative():
    """Time grid and one-step discounting shared by all contracts.

    Keyword arguments
    -----------------
    T: scalar, default 1
        Maturity.
    r: scalar, default 0
        Continuously compounded interest rate.
    time_step: scalar, default 12
        Number of time steps per unit of time.

    Any other keyword argument is accepted and ignored.
    """

    def __init__(self, **kwargs):
        maturity = kwargs.get('T', 1)
        rate = kwargs.get('r', 0)
        steps_per_unit = kwargs.get('time_step', 12)
        self.T = maturity
        self.r = rate
        self.time_step = steps_per_unit
        # Grid from 0 to T with round(T*time_step) intervals.
        n_intervals = int(np.round(maturity*steps_per_unit))
        self.ts = np.linspace(0, maturity, n_intervals+1)
        self.dt = 1/steps_per_unit

    def discounted(self, X):
        """Discount ``X`` over one time step of length ``self.dt``."""
        one_step_factor = np.exp(-self.r*self.dt)
        return one_step_factor*X

class StockBacked(Derivative):
    """Derivative written on a stock that is simulated with ``blackscholes_mc``.

    Parameters
    ----------
    S0: scalar (keyword-only)
        Spot price of the stock.
    vol: scalar (keyword-only)
        Volatility of the stock.
    **kwargs
        Forwarded to ``Derivative``. The strike is read from ``K_mat`` when
        that key is present and from ``K`` otherwise; ``q`` (default 0) is
        the dividend yield.
    """

    def __init__(self, *args, S0, vol, **kwargs):
        super().__init__(*args, **kwargs)
        # 'K_mat' wins over 'K'; 'K' is only looked up when 'K_mat' is absent.
        self.K_mat = kwargs['K_mat'] if 'K_mat' in kwargs else kwargs['K']
        self.S0 = S0
        self.vol = vol
        self.q = kwargs.get('q', 0)

    def back_asset_simulation(self, n_paths):
        """Simulate ``n_paths`` stock paths on the contract's time grid."""
        return blackscholes_mc(
            self.ts,
            n_paths,
            self.S0,
            self.vol,
            self.r,
            self.q,
        )


class AmericanOption(StockBacked):
    """Call or put on the stock, exercisable at every step of the time grid."""

    def __init__(self, *args, callput, **kwargs):
        """Create the option.

        Parameters
        ----------
        callput: str (keyword-only)
            Either 'call' or 'put' (case-insensitive).
        *args, **kwargs
            Forwarded to ``StockBacked``.

        Raises
        ------
        ValueError
            If ``callput`` is not 'call' or 'put'.
        """
        super().__init__(*args, **kwargs)
        # AttributeError covers non-string flags, KeyError unknown strings;
        # anything else (e.g. KeyboardInterrupt) is left to propagate.
        try:
            opttype = {'call': 1, 'put': -1}[callput.lower()]
        except (AttributeError, KeyError):
            raise ValueError('The value of callput must be either "call" or "put".') from None
        self.opttype = opttype

    def exer(self, path_i):
        """Payoff of exercising at stock price ``path_i``."""
        return np.maximum(0, self.opttype*(path_i - self.K_mat))

    def exer_at_maturity(self, path_i):
        """Payoff at maturity; identical to the early-exercise payoff."""
        return self.exer(path_i)

    def prob_param_exer(self, contval, exerval):
        """Return 1 where exercising is at least as good as continuing, else 0."""
        return np.where(exerval >= contval, 1, 0)

    def target_expectation(self, contval, exerval, p):
        """Mix exercise and continuation values with exercise weight ``p``."""
        return p*exerval + (1-p)*contval


class BermudanAsianOption(StockBacked):
    """Call or put on the running average of the stock price."""

    def __init__(self, *args, callput, **kwargs):
        """Create the option.

        Parameters
        ----------
        callput: str (keyword-only)
            Either 'call' or 'put' (case-insensitive).
        *args, **kwargs
            Forwarded to ``StockBacked``.

        Raises
        ------
        ValueError
            If ``callput`` is not 'call' or 'put'.
        """
        super().__init__(*args, **kwargs)
        # AttributeError covers non-string flags, KeyError unknown strings;
        # anything else (e.g. KeyboardInterrupt) is left to propagate.
        try:
            opttype = {'call': 1, 'put': -1}[callput.lower()]
        except (AttributeError, KeyError):
            raise ValueError('The value of callput must be either "call" or "put".') from None
        self.opttype = opttype

    def back_asset_simulation(self, n_paths):
        """Simulate the stock together with its running average.

        Returns an array of shape ``(len(ts), 2, n_paths)``. Along the second
        axis, index 0 is the stock price and index 1 is the running average.
        The average at step ``i >= 1`` is the mean of the prices at steps
        ``1..i`` (the initial price is not included); at step 0 it is 0.
        """
        paths = super().back_asset_simulation(n_paths)
        len_t = len(self.ts)
        A = np.zeros((len_t, n_paths))
        # Cumulative sum over time divided by the number of steps so far.
        A[1:] = np.cumsum(paths[1:], axis = 0)/np.arange(1, len_t).reshape(len_t-1, 1)
        return np.stack((paths, A), axis=1)

    def exer(self, path_i):
        """Payoff of exercising, computed on the running average ``path_i[1]``."""
        return np.maximum(0, self.opttype*(path_i[1] - self.K_mat))

    def exer_at_maturity(self, path_i):
        """Payoff at maturity; identical to the early-exercise payoff."""
        return self.exer(path_i)

    def prob_param_exer(self, contval, exerval):
        """Return 1 where exercising is at least as good as continuing, else 0."""
        return np.where(exerval >= contval, 1, 0)

    def target_expectation(self, contval, exerval, p):
        """Mix exercise and continuation values with exercise weight ``p``."""
        return p*exerval + (1-p)*contval

class Reinsurance(StockBacked):
    """Stock-backed contract with put-style payoffs and a controlled intensity.

    Parameters
    ----------
    K_D: scalar (keyword-only)
        Strike of the payoff used before maturity (``exer``).
    alpha: scalar (keyword-only)
        Rate charged per unit of time; ``alpha*dt`` is subtracted each step.
    lambda_max, lambda_min: scalar (keyword-only)
        The two intensities ``prob_param_exer`` chooses between.
    *args, **kwargs
        Forwarded to ``StockBacked``; the maturity strike is ``self.K_mat``.
    """

    def __init__(self, *args, K_D, alpha, lambda_max, lambda_min, **kwargs):
        super().__init__(*args, **kwargs)
        self.K_D = K_D
        # Payoffs are always put-style.
        self.opttype = -1
        self.alpha = alpha
        self.lambda_max = lambda_max
        self.lambda_min = lambda_min

    def _payoff(self, path_i, strike):
        """Payoff ``max(0, opttype*(path_i - strike))``."""
        return np.maximum(0, self.opttype*(path_i - strike))

    def exer(self, path_i):
        """Payoff before maturity, struck at ``K_D``."""
        return self._payoff(path_i, self.K_D)

    def exer_at_maturity(self, path_i):
        """Payoff at maturity, struck at ``K_mat``."""
        return self._payoff(path_i, self.K_mat)

    def prob_param_exer(self, contval, exerval):
        """Pick ``lambda_max`` where the payoff is at least the continuation
        value and ``lambda_min`` elsewhere."""
        payoff_dominates = exerval >= contval
        return np.where(payoff_dominates, self.lambda_max, self.lambda_min)

    def target_expectation(self, contval, exerval, lambd):
        """One-step value under intensity ``lambd``.

        The payoff is weighted by ``p = 1 - exp(-lambd*dt)``, the continuation
        value by ``1 - p``, and ``alpha*dt`` is subtracted.
        """
        p = 1-np.exp(-lambd*self.dt)
        weighted = p*exerval + (1-p)*contval
        return weighted - self.alpha*self.dt


## Simulation Algo

In [None]:
# Longstaff-Schwartz algorithm

class LS():
    """Longstaff-Schwartz style pricer.

    ``fit_simulation`` simulates paths, fits one regression per time step and
    returns the resulting price estimate. ``lb_simulation`` re-prices on fresh
    paths with the fitted regressions.
    """

    def __init__(self, ftype, derivative, **kwargs):
        """Create the pricer.

        Parameters
        ----------
        ftype: str
            Regression family used for the continuation value.
        derivative: object
            The contract; it must provide ``ts``, ``T``,
            ``back_asset_simulation``, ``exer``, ``exer_at_maturity``,
            ``prob_param_exer``, ``target_expectation`` and ``discounted``.
        **kwargs
            Options passed to every regression fit.

        Raises
        ------
        ValueError
            If ``ftype`` is not a supported regression family.
        """
        ftypes = [
            'Polynomial',
            'PieceWise',
            'KernelRegression',
            'LocalLR',
            'BlackScholes',
            'NeuralNetwork',
            'SelfDefined'
            ]
        if ftype not in ftypes:
            raise ValueError('This type of method is not implemented!')
        self.ftype = ftype
        self.deriv = derivative
        self.kwargs = kwargs
        # Per-time-step regressions; set by fit_simulation.
        self.model = None

    def _backward_induction(self, paths, betas, fit):
        """Roll the value back from maturity to time 0 and return its mean.

        When ``fit`` is true, the regression of each step is fitted on the
        discounted values before it is used; otherwise ``betas`` must already
        hold fitted regressions.
        """
        deriv = self.deriv
        V = deriv.exer_at_maturity(paths[-1])
        exerval = deriv.exer(paths[-1])
        p = deriv.prob_param_exer(V, exerval)
        V = deriv.target_expectation(V, exerval, p)
        # Steps len(ts)-2 down to 1; step 0 is only discounted at the end.
        for i in range(len(deriv.ts)-2, 0, -1):
            V = deriv.discounted(V)
            if fit:
                # Build a per-step copy so self.kwargs is never modified;
                # 'T' is the remaining time to maturity at this step.
                f_kwargs = {**self.kwargs, 'T': deriv.T - deriv.ts[i]}
                betas[i].fit(paths[i], V, **f_kwargs)
            contval = betas[i].predict(paths[i])
            exerval = deriv.exer(paths[i])
            # The decision uses the regression; the value uses realised V.
            p = deriv.prob_param_exer(contval, exerval)
            V = deriv.target_expectation(V, exerval, p)
        return np.mean(deriv.discounted(V))

    def fit_simulation(self, n_paths):
        """Fit the regressions on ``n_paths`` simulated paths.

        The fitted regressions are stored in ``self.model``.

        Returns
        -------
        The price estimated on the paths used for fitting.
        """
        betas = FuncRecord(ftype = self.ftype)
        paths = self.deriv.back_asset_simulation(n_paths)
        price = self._backward_induction(paths, betas, fit=True)
        self.model = betas
        return price

    def lb_simulation(self, n_paths):
        """Re-price on ``n_paths`` fresh paths with the fitted regressions.

        Raises
        ------
        RuntimeError
            If ``fit_simulation`` has not been run yet.
        """
        if self.model is None:
            raise RuntimeError('Call fit_simulation before lb_simulation.')
        paths = self.deriv.back_asset_simulation(n_paths)
        return self._backward_induction(paths, self.model, fit=False)

# Instances

In [None]:
# Instance: American Option
# Seed NumPy's global RNG so the paths drawn with np.random.randn inside
# blackscholes_mc are reproducible.
np.random.seed(5400)
# One dict feeds both the contract and the regression; each consumer reads
# the keys it knows and ignores the others.
params = {
    'K': 100,  # strike; StockBacked stores it as K_mat because no 'K_mat' key is given
    'S0': 100,  # spot price
    'vol': 0.2,  # volatility
    'r': 0.1,  # interest rate
    'q': 0.02,  # dividend yield
    'T': 1,  # maturity; with the default time_step of 12 the grid has 13 points
    'callput': 'put',
    'deg': 2,  # polynomial degree read by ExpectationFunc.fit
}
a = AmericanOption(**params)
a_ls = LS(ftype='Polynomial', derivative = a, **params)
# Fit the per-step regressions on 10,000 paths and price on those same paths.
a_p = a_ls.fit_simulation(n_paths=10000)
# Re-price on 100,000 newly simulated paths using the fitted regressions.
a_lb = a_ls.lb_simulation(n_paths=100000)
print('Fitting Simulation Price is {:.4f}'.format(a_p))
print('Lower-Bound Price is {:.4f}'.format(a_lb))

Fitting Simulation Price is 5.0052
Lower-Bound Price is 4.9737


In [None]:
# Instance: Uncertain Mortality Model
# Seed NumPy's global RNG so the paths drawn with np.random.randn inside
# blackscholes_mc are reproducible.
np.random.seed(5400)
# One dict feeds both the contract and the regression; each consumer reads
# the keys it knows and ignores the others.
params = {
    'K_mat': 90,  # strike of the payoff at maturity (Reinsurance.exer_at_maturity)
    'K_D': 100,  # strike of the payoff before maturity (Reinsurance.exer)
    'K': 95,  # strike of the Black-Scholes regressor in ExpectationFunc.fit; not used as K_mat since 'K_mat' is given
    'S0': 100,  # spot price
    'vol': 0.3,  # volatility
    'r': 0,  # interest rate
    'q': 0,  # dividend yield
    'T': 10,  # maturity; with the default time_step of 12 the grid has 121 points
    'alpha': 3,  # rate subtracted as alpha*dt in Reinsurance.target_expectation
    'lambda_max': 0.04,  # intensity chosen where the payoff is at least the continuation value
    'lambda_min': 0.005,  # intensity chosen elsewhere
    'callput': 'put'  # read by the Black-Scholes regressor; Reinsurance itself fixes opttype to -1
}
u = Reinsurance(**params)
u_ls = LS(ftype='BlackScholes', derivative = u, **params)
# Fit the per-step regressions on 5,000 paths and price on those same paths.
u_p = u_ls.fit_simulation(n_paths=5000)
# Re-price on 100,000 newly simulated paths using the fitted regressions.
u_lb = u_ls.lb_simulation(n_paths=100000)
print('Fitting Simulation Price is {:.4f}'.format(u_p))
print('Lower-Bound Price is {:.4f}'.format(u_lb))

Fitting Simulation Price is 3.4902
Lower-Bound Price is 3.3114
