In [None]:
import math
import numpy as np
from numba import jit
import pandas as pd
from scipy.optimize import least_squares

In [None]:
from qmcpy.true_measure.uniform import Uniform
from qmcpy.discrete_distribution.halton.halton import Halton

In [None]:
###
import time
_tstart_stack = []

def tic():
    _tstart_stack.append(time.time())

def toc(fmt="Elapsed: %s s"):
    print(fmt % (time.time() - _tstart_stack.pop()))
###

# The price is given by Two Parts

- First part - easy to implement
- Second part - complex - Monte Carlo

## Standard Black Scholes formula

In [None]:
@jit
def cnd_numba(d):
    """cdf of Normal distribution"""
    
    A1 = 0.31938153
    A2 = -0.356563782
    A3 = 1.781477937
    A4 = -1.821255978
    A5 = 1.330274429
    RSQRT2PI = 0.39894228040143267793994605993438
    K = 1.0 / (1.0 + 0.2316419 * math.fabs(d))
    ret_val = (RSQRT2PI * math.exp(-0.5 * d * d) *
               (K * (A1 + K * (A2 + K * (A3 + K * (A4 + K * A5))))))
    if d > 0:
        ret_val = 1.0 - ret_val
    return ret_val

@jit
def black_scholes_numba(stockPrice, optionStrike,
                        optionYears, Riskfree, Volatility):
    """Standard BS formuls"""

    S = stockPrice
    X = optionStrike
    T = optionYears
    R = Riskfree
    V = Volatility

    sqrtT = math.sqrt(T)
    d1 = (math.log(S / X) + (R + 0.5 * V * V) * T) / (V * sqrtT)
    d2 = d1 - V * sqrtT
    cndd1 = cnd_numba(d1)
    cndd2 = cnd_numba(d2)

    expRT = math.exp((-1. * R) * T)
    callResult = (S * cndd1 - X * expRT * cndd2)

    return callResult

## First Part: No Jump

In [None]:
@jit
def rady_numba(s, k, t, sigma0, l0, u0):
    return black_scholes_numba((1-k/u0)*(s-l0), 
                                (k-l0)*(1-s/u0),
                                t, 
                                0, 
                                (1 - l0 / u0) * sigma0) * u0 / (u0-l0)

@jit
def enlarge_rady_numba(s, k, t, sigma0, l0, u0):
    if 0 < k <= l0:
        return s - k
    elif k >= u0:
        return 0
    elif l0 < k < u0:
        return rady_numba(s, k, t, sigma0, l0, u0)
    
@jit
def no_jump_price_numba(s, k, t, rd, rf, sigma0, l0, u0, lamb, kappa):
    h_t = lamb * kappa * t
    k_star = k * np.exp(-(rd-rf)*t) / np.exp(-h_t)
    return np.exp(- rf * t) * np.exp(-h_t) * enlarge_rady_numba(s, k_star, t, sigma0, l0, u0)


## Second Part: With Jump

In [None]:
@jit
def normal_pdf_numba(x):
    """pdf of Normal distributio"""
    
    return np.exp(-0.5*x**2)/2.5066282746310002

In [None]:
@jit
def implied_density_numba(x, s, l0, u0, sigma0, t):
    """Implied density function at tau"""
    sigmat = (1 - l0 / u0) * sigma0  * t**0.5
    term1 = ((u0 - s) * (u0 - l0)) / (sigmat * (u0-x)**2 * (x-l0))
    term2 = (np.log(((s-l0)*(1-x/u0))/((x-l0)*(1-s/u0))) - 0.5 * sigmat**2) / sigmat
    return term1 * normal_pdf_numba(term2)

In [None]:
@jit
def mc_kernel(x, tau_s, s,k,t,rd,rf,sigma0,sigma1,lamb,kappa,l0,u0):
    """price kernel: the integrand of double integral"""
    
    term1 = black_scholes_numba(x*np.exp(-lamb*kappa*tau_s)*(1+kappa), 
                                    k*np.exp(-(rd-rf)*t),
                                    t-tau_s, 
                                    0, 
                                    sigma1)
    term2 = implied_density_numba(x, s, l0, u0, sigma0, tau_s)
    term3 = lamb * np.exp(-lamb * tau_s)
    return term1 * term2 * term3

## Price1 + Price2

In [None]:
@jit
def first_price_numba(s,k,t,rd,rf,sigma0,lamb,kappa,l0,u0):
    """First price: extend Rady"""
    return no_jump_price_numba(s, k, t, rd, rf, sigma0, l0, u0, lamb, kappa) * np.exp(-lamb*t)

In [None]:
@jit
def second_price_qmc_numba(s,k,t,rd,rf,sigma0,sigma1,lamb,kappa,l0,u0):
    """second price given by Quasi Monte Carlo
        NMAX, Y_SAMP, TAU_SAMP,
    """
    cum_bs = 0
    for y_i, tau_i in zip(Y_SAMP, TAU_SAMP):
        cum_bs += mc_kernel(y_i, tau_i, s,k,t,rd,rf,sigma0,sigma1,lamb,kappa,l0,u0)

    qmc_price = np.exp(-rf*t) * cum_bs / NMAX * (u0-l0) * t
    return qmc_price

In [None]:
@jit
def qmc_price_numba(s,k,t,rd,rf,sigma0,sigma1,lamb,kappa,l0,u0):
    """final price"""
    price1 = first_price_numba(s,k,t,rd,rf,sigma0,lamb,kappa,l0,u0)
    price2 = second_price_qmc_numba(s,k,t,rd,rf,sigma0,sigma1,lamb,kappa,l0,u0)
    return price1 + price2
qmc_price_vect = np.vectorize(qmc_price_numba)

## Simulate samples : Fixed after sampling, always be there

In [None]:
print('Enlarge_Factor', Enlarge_Factor) # according to real data

# given the fixed paras when sampling
L0_SAMP = 7.75*(1-Enlarge_Factor)
U0_SAMP = 7.85*(1+Enlarge_Factor)

print('7.75-->', L0_SAMP,'\n7.85-->', U0_SAMP)
T_SAMP = 0.5 # 6 months

NMAX = 10e5 # numbers of sample

# 2-d Uniform, random seed = 7
unf = Uniform(Halton(2, seed=1), lower_bound=0, upper_bound=1)

# lenth of uniform rv is n_max
unf_sample = unf.gen_samples(n_min=0, n_max=NMAX)
print('Numbers of Sample:', NMAX)

# prepare the data
Y_SAMP = L0_SAMP + (U0_SAMP-L0_SAMP) * unf_sample[:,0]
TAU_SAMP = T_SAMP * unf_sample[:,1]

print('Sampling finished.')

## Sensitivities

In [None]:
from scipy.optimize import root, brentq, minimize

In [None]:
@jit
def garman_kohlhagen_numba(s, k, t, rd, rf, sigma):

    sqrt = math.sqrt(t)
    d1 = (math.log(s / k) + (rd-rf + 0.5 * sigma * sigma) * t) / (sigma * sqrt)
    d2 = d1 - sigma * sqrt
    cndd1 = cnd_numba(d1)
    cndd2 = cnd_numba(d2)

    exprdt = math.exp((-1. * rd) * t)
    exprft = math.exp((-1. * rf) * t)
    callresult = (s *exprft* cndd1 - k * exprdt * cndd2)

    return callresult

In [None]:
def impliedvol_func(s, k, t, rd, rf, price):
    def root_func(x):
        return garman_kohlhagen_numba(s, k, t, rd, rf, x) - price
    result = 0
    try:
        result = brentq(root_func, 0.001, 30)
    except Exception:
        print('There is a problem here with brentq')
        result = root(root_func, 0.2).x[0]
    return result
Impliedvol = np.vectorize(impliedvol_func)

# Calibration

In [None]:
def sigma_model(x, s, k, t, rd, rf, l0=L0_SAMP, u0=U0_SAMP):
    """
    x[0]: sigma0
    x[1]: sigma1
    x[2]: lamb
    x[3]: kappa    
    """
    price = qmc_price_vect(s, k, t, rd, rf, x[0], x[1], x[2], x[3], l0, u0)
    return Impliedvol(s, k, t, rd, rf, price)

In [None]:
# fit with sigma lower bar
def Fit_BF_sigmalow(OBJ, INPUT, GUESS, S, T, Rd, Rf, Verbose = 2):
    
    # By assumption OBJ = [SxP, SxP, SATM, SxC, SxC]
    # And INPUT = [KxP, KxP, KATM, KxC, KxC]

    # we first get the dimension
    n = OBJ.size

    BOUNDS = ([0.1,  OBJ[2], 0.0001, -0.8], 
              [500,  30,     1 / T,   0.8])
    
    # Check that the Guess is correct
    GUESS[1] = np.maximum(GUESS[1], OBJ[2]) # for sigma upper [1]
    GUESS[0] = np.minimum(GUESS[0], 7.8*OBJ[2]/0.0003185*0.5) # for sigma lower

    ###

    def fun(x, k, y):
        z = x
        term = np.empty(n)
        for i in range(n):
            term[i] = (sigma_model(z, S, k[i], T, Rd, Rf) - y[i])
        return term

    res_lsq = least_squares(fun, 
                            GUESS, 
                            loss = 'cauchy', 
                            bounds=BOUNDS, 
                            args = (INPUT, OBJ), 
                            verbose = Verbose,
                            max_nfev=180
                           )
    x = res_lsq.x
    s = sigma_model(x, S, INPUT, T, Rd, Rf)
    
    # 20190816 modify Errors
    o1 = OBJ[0]
    o2 = OBJ[1]
    o3 = OBJ[2]
    o4 = OBJ[3]
    o5 = OBJ[4]

    c1 = s[0]
    c2 = s[1]
    c3 = s[2]
    c4 = s[3]
    c5 = s[4]
    
    # mean squared error
    e1 = ((o1 - c1)  / o1) ** 2
    e2 = ((o2 - c2)  / o2) ** 2
    e3 = ((o3 - c3)  / o3) ** 2
    e4 = ((o4 - c4)  / o4) ** 2
    e5 = ((o5 - c5)  / o5) ** 2

    # 20190822 mean error
    m1 = (np.abs(o1 - c1)  / o1)
    m2 = (np.abs(o2 - c2)  / o2)
    m3 = (np.abs(o3 - c3)  / o3)
    m4 = (np.abs(o4 - c4)  / o4)
    m5 = (np.abs(o5 - c5)  / o5)

    rmse = ((e1+e2+e3+e4+e5) / 5) ** 0.5
    mse = (m1+m2+m3+m4+m5) / 5 

    # 20190816 two errors to one rmse
    # 20190822 rmse, error, errorpa
    error = np.linalg.norm(OBJ-s)
    errorpa = error / OBJ.mean() * 100
    return res_lsq.x, s, rmse, mse, error, errorpa, res_lsq.nfev, res_lsq.status

In [None]:
def func(row):
    global GUESS
    
    S = row['S']
    Rd = row['Rd']
    Rf = row['Rf']
    T = row['T']
    
    S10P = row['S10P']
    S25P = row['S25P']
    SATM = row['SATM']
    S25C = row['S25C']
    S10C = row['S10C']
    
    K10P = row['K10P']
    K25P = row['K25P']
    KATM = row['KATM']
    K25C = row['K25C']
    K10C = row['K10C']
    
    OBJ = np.array([S10P, S25P, SATM, S25C, S10C])
    INPUT = np.array([K10P, K25P, KATM, K25C, K10C])
    
    res_lsq_x, s,rmse,mse, error, errorpa, res_lsq_nfev, res_lsq_status = Fit_BF_sigmalow(OBJ, INPUT, GUESS, S, T, Rd, Rf)
    print('Data Now is:\n%s\nGUESS = %s\n error = %f\n errorpa = %f' %(row,GUESS, error, errorpa))
    
    GUESS = res_lsq_x

    return pd.Series({
        'sigma0': res_lsq_x[0],
        'sigma1': res_lsq_x[1],
        'lamb': res_lsq_x[2],
        'kappa': res_lsq_x[3],
        
        'sS10P': s[0],
        'sS25P': s[1],
        'sSATM': s[2],
        'sS25C': s[3],
        'sS10C': s[4],
        
        'rmse': rmse,
        'mse': mse,
        'Err': error,
        'ErrPa': errorpa,
        'Nfev': res_lsq_nfev,
        'Status': res_lsq_status
        })

# 6M

In [None]:
# 6M
df = pd.read_pickle('./../Data/Data_for_Calibration/USDHKD0608_6M_for_Calibration_Average.pkl')
del df['SEMP'] # delet semp which is ueseless

In [None]:
rdmax = df['Rd'].max()
rdmin = df['Rd'].min()

rfmax = df['Rf'].max()
rfmin = df['Rf'].min()

fct = max(abs(rdmax-rfmin), abs(rfmax-rdmin))
fct

In [None]:
T_6months = 0.5
Enlarge_Factor = max(1-np.exp(-fct*T_6months),np.exp(fct*T_6months)-1)
print('Enlarge_Factor', Enlarge_Factor)

In [None]:
# drop nan value
df = df.dropna().loc['2014':]

df.head() # show and check

In [None]:
# Name: 2014-01-02 00:00:00, dtype: object
# GUESS = [ 4.179  0.094  0.107 -0.011]
GUESS = np.array([4.179, 0.094, 0.107, -0.011])

In [None]:
tic()
result_df = df.apply(func, axis = 1)
toc()

In [None]:
# Elapsed: 36361.29989647865 s

In [None]:
combinedf = df.join(result_df)