In [5]:
import numpy as np
import math
from numba import jit

@jit(nopython = True)
def wiener(t: float) -> float:
    '''Returns value of Wiener process at time = t'''
    return math.sqrt( t ) * np.random.normal(0, 1)

@jit(nopython = True)
def st(t: float, S0: float, r: float, sigma: float) -> float:
    '''Exact solution for geometric Brownian motion for undelying asset s
    
    Parameters.
    1. t - time. Type - float
    2. S0 - value of asset at time = 0. Type - float
    3. r - risk neutral interest rate. Type - float
    4. sigma - volatility. Type - float.

    Output.
    S(t) random variable.
    '''
    return S0 * math.exp( ( r - sigma**2 / 2) * t + sigma * wiener( t ) )

@jit(nopython = True)
def const_tri_diag_mat_solve(params: np.array, mat_b: np.array) -> np.array:
    '''Solution of problem Ax = b by tridiagonal matrix algorithm for matrix with const coefficients.
    
    Parameters.
    1. params - const coefficients of tridiagonal matrix A. Type - numpy array
    2. mat_b - vector of free coefficients b. Type - numpy array.

    Output.
    `x` - vector. Type - numpy array.
    '''
    n = len(mat_b)
    x = np.zeros(n)
    '''
    p1, p2, p3 const parmeters of tridiagonal matrix.
    Matirx correspons to:
    A = sparse.diags([p1, p2, p3], [-1, 0, 1], shape=(n, n))
    '''
    p1, p2, p3 = params[0], params[1], params[2]
    coef_A = np.zeros(n)
    coef_B = np.zeros(n)
    coef_A[1] = - p3 / p2
    coef_B[1] = mat_b[0] / p2
    alpha_gamma = p2 / p1

    '''forward sweep'''
    for i in range(1, n - 1):
        b_gamma = mat_b[i] / p1
        coef_A[i + 1] =  - p3 / ( p1 * coef_A[i] + p2 )
        coef_B[i + 1] = ( b_gamma - coef_B[i] ) / ( coef_A [i] + alpha_gamma) 

    '''backward sweep'''
    i = n - 1
    b_gamma = mat_b[i] / p1
    x[i] = - ( coef_B[i]  - b_gamma ) / ( coef_A[i] + alpha_gamma )
    for j in range(n - 1, 0 , -1):
        x[j - 1] = x[j] * coef_A[j] + coef_B[j]
    return x

@jit(nopython = True)
def tri_diag_mat_solve_arr(p1, p2, p3, mat_b):
    '''Solution of problem Ax = b by tridiagonal matrix algorithm.
    
    Parameters.
    1. p1 - value of diagonal A[i][i - 1]. Type - Numpy Array. First element equals to 0.
    2. p2 - value of diagonal A[i][i]. Type - Numpy Array.
    3. p3 - value of diagonal A[i][i + 1]. Type - Numpy Array. Last element equals to  0.
    4. mat_b - vector of free coefficients. Type - Numpy Array.

    There are actual values in p1 and p3 - (n - 1), but in p2 - n.
    First element equals in p1 - 0, Last element equals in p3 - 0.
    '''
    n = len(mat_b)
    check = len(p1) == len(p3) == len(p2) == n
 
    if check == False:
        raise Exception('wrong dimensions')
    
    x = np.zeros(n)
    coef_A = np.zeros(n)
    coef_B = np.zeros(n)

    coef_A[1] = - p3[0] / p2[0]
    coef_B[1] = mat_b[0] / p2[0]
    
    '''forward sweep'''
    for i in range(1, n - 1):
        b_gamma = mat_b[i] / p1[i]
        alpha_gamma = p2[i] / p1[i]
        coef_A[i + 1] =  - p3[i] / ( p1[i] * coef_A[i] + p2[i] )
        coef_B[i + 1] = ( b_gamma - coef_B[i] ) / ( coef_A [i] + alpha_gamma) 

    '''backward sweep'''
    i = n - 1
    b_gamma = mat_b[i] / p1[i]
    alpha_gamma = p2[i] / p1[i]
    x[i] = - ( coef_B[i]  - b_gamma ) / ( coef_A[i] + alpha_gamma )
    
    for j in range(n - 1, 0 , -1):
        x[j - 1] = x[j] * coef_A[j] + coef_B[j]
    
    return x


def get_result(x_data: np.array, y_data: np.array, val: float) -> float:
    '''Calculate arbitrary function value by grid data.

    # Parameters.
    1. x_data - set of nodes. Type - numpy array
    2. y_data - set of function values at nodes.Type - numpy array
    3. val - value to calculate. Type - float. 
    
    If `val` do not get to the exact node, then between two nearset nodes straight line is drawn and 
    value is calculated in intermediate point.
    '''
    li = np.where(x_data < val)[0][-1]
    ri = np.where(x_data >= val)[0][0]
    if ri == val:
        return y_data[ri]
    else:
        return (y_data[ri] - y_data[li]) / (x_data[ri] - x_data[li]) * (val - x_data[li]) + y_data[li]

def delta_p(tau: float, s: np.array, r: float, sigma: float):
    return 1 / ( sigma * math.sqrt(tau) ) * ( np.log(s) + ( r + 1 / 2 * sigma**2) * tau )

def delta_m(tau: float, s: np.array, r: float, sigma: float):
    return 1 / ( sigma * math.sqrt(tau) ) * ( np.log(s) + ( r - 1 / 2 * sigma**2) * tau )

def cond_prob_M(B, k, T):
    '''Conditional probability function P( Max(W(t)) < B | W(T) = k )'''
    return 1 - math.exp(2 * B * (k - B) / T)

In [7]:
import numpy as np
import math
from scipy.stats import norm

class barrier_call_option:
    '''
    Barrier up-and-out call option

    # Parameters.
        1. T - expiration time. Type - float
        2. t - time. Type - float
        3. S0 - value of asset at time = 0. Type - float
        4. K - strike. Type - float
        5. B - barrier. Type - float
        6. r - risk neutral interest rate. Type - float
        7. sigma - volatility. Type - float.

    # List of available methods
    1. price_exact - exact solution of Black-Sholes equation
    2. price_monte_carlo - Monte-Carlo simulation of option price
    3. price_pde - numerical solution of Black-Sholes PDE
    4. get_pde_price - calculation of option price at point S0.
    '''
    def __init__(self, T: float, t: float, S0: float, K: float, B: float, r: float, sigma: float) -> None:
        self.verify_init_data(T, t, S0, K, B, r, sigma)
        self.__T = T
        self.__t = t 
        self.__S0 = S0
        self.__K = K
        self.__B = B
        self.__r = r
        self.__sigma = sigma

        #self.__mc_v = np.nan
        #self.__exact_v = np.nan

        self.__pde_calc_flg = 0
        self.__pde_t = np.nan
        self.__pde_s = np.nan
        self.__pde_v = np.nan

    @classmethod
    def verify_init_data(cls, T, t, S0, K, B, r, sigma):
        params = [T, S0, K, B, r, sigma]
        names = ['T', 'S0', 'K', 'B', 'r', 'sigma']
        n = len(params)
        for i in range(0, n):
            param_type = type(params[i])
            if not (param_type == int or param_type == float):
                raise TypeError(f"{names[i]} should be a number, got {param_type.__name__}")
            if params[i] <= 0:
                raise TypeError(f"{names[i]} should be a positive number, got {params[i]}")
            
        '''handle t'''
        param_type = type(t)
        if not (param_type == int or param_type == float):
            raise TypeError(f"{names[i]} should be a number, got {param_type.__name__}")
        if t < 0:
            raise TypeError(f"{names[i]} should be a positive number, got {t}")        
        if t > T: 
            raise TypeError(f"t is out of [0, T] interval. got [0, {T}] and t = {t}")

    @property
    def T(self):
        return self.__T

    @property
    def t(self):
        return self.__t
    
    @property
    def S0(self):
        return self.__S0
    
    @property
    def K(self):
        return self.__K
    
    @property
    def B(self):
        return self.__B   

    @property
    def r(self):
        return self.__r
    
    @property
    def sigma(self):
        return self.__sigma

    @property
    def pde_t(self):
        return self.__pde_t
    
    @pde_t.setter
    def pde_t(self, arr):
        self.__pde_t = arr

    @property
    def pde_s(self):
        return self.__pde_s
    
    @pde_s.setter
    def pde_s(self, arr):
        self.__pde_s = arr

    @property
    def pde_v(self):
        return self.__pde_v

    @pde_v.setter
    def pde_v(self, arr):
        self.__pde_v = arr

    @property
    def pde_calc_flg(self):
        return self.__pde_calc_flg

    @pde_calc_flg.setter
    def pde_calc_flg(self, val):
        self.__pde_calc_flg = val

    #@jit(nopython = True)
    def price_monte_carlo(self, n_iters: int):
        '''Monte Carlo simulaton of barrier call option price.
           t parameter is not considered in this function.

        Parameters.
        n_iters - count of monte-carlo iterations. Type - Int.

        Output.
        Average price.
        '''
        mx = 0
        a = (self.r - self.sigma**2 / 2) / self.sigma

        '''Maximum of Wiener process with respect to barrier'''
        b = 1 / self.sigma * math.log(self.B / self.S0)
        
        for i in range(0, n_iters):
            '''Generation of Wiener process by new probability measure'''
            wiener_hat = wiener(self.T) + a * self.T
            ST = self.S0 * math.exp(self.sigma * wiener_hat)
            Indicator = 1
            if ST > self.B:
                Indicator = 0
            else:
                '''Correction of payoff value on probabilty that Max(W(t)) < b with condition W(T) = wiener_hat'''
                Indicator = cond_prob_M(b, wiener_hat, self.T)
            val = math.exp( - self.r * self.T ) * max(ST - self.K , 0) * Indicator
            mx += val
        return mx / n_iters

    def price_exact(self):
        '''Exact solution of Black-Sholes PDE for barrier call option price.

        Output.
        V(S0, T) price of call option at time = `T` and initial underlying price = `S0`.
        '''
        T, t, S0, K, B, r, sigma = self.T, self.t, self.S0, self.K, self.B, self.r, self.sigma

        if K > B:
            return 0
        tau = T - t
        v = S0 * ( norm.cdf( delta_p(tau, S0 / K, r, sigma) ) - norm.cdf( delta_p(tau, S0 / B, r, sigma) ) ) \
            - math.exp(- r * tau) * K \
                * ( norm.cdf( delta_m(tau, S0 / K, r, sigma) ) - norm.cdf( delta_m(tau, S0 / B, r, sigma) ) ) \
            - B * (S0 / B)**(- 2 * r / sigma**2) * \
                ( norm.cdf( delta_p(tau, B**2 / (K * S0), r, sigma) ) - norm.cdf( delta_p(tau, B / S0, r, sigma) ) ) \
            + math.exp(- r * tau) * K * (S0 / B)**(- 2 * r / sigma**2 + 1) \
                * ( norm.cdf( delta_m(tau, B**2 / (K * S0), r, sigma) ) - norm.cdf( delta_m(tau, B / S0, r, sigma) ) )
        return v

            
    #@jit(nopython = True)
    def price_pde(self, n_t: int, n_s: int) -> np.array:
        '''
        Solution of u_t = u_xx + u_x * ( 1 + D ). D = 2r / s^2. Crank-Nicolson scheme.
        PDE is considered in tau = sigma**2 / 2 * (T - t) and x = log (S/K) variables.
        Transition to initial variables is made at the end of evaluations.
        Initial parameters (`S0`, `t`) of call_option class is not considered in this function.

        # Parameters.
        1. n_t - number of `t` grid steps. Type - Int
        2. n_s - number of `S` grid steps. Type - Int.

        # Output.
        Returns set of numpy arrays: 
        1. t - array with length of n_t (corresponds to region [0, T])
        2. s - array with lengh of n_s (corresponds to region [K / 3, B])
        3. v - matrix of call option price at t_i, x_j
        '''

        '''Auxilary parameters'''
        T, K, B, r, sigma = self.T, self.K, self.B, self.r, self.sigma
        n_x = n_s
        region = [[0, T], [K / 3, B]]
        right_t, left_t = sigma**2 / 2 * (T - region[0][0]), sigma**2 / 2 * (T - region[0][1])
        left_x, right_x = math.log(region[1][0] / K), math.log(region[1][1] / K)
        tau = abs(right_t - left_t) / n_t
        h = abs(right_x - left_x) / n_x
        u = np.zeros((n_t, n_x))
        D =  2 * r / sigma**2
        
        '''Initial and border conditions.'''

        '''x = -inf'''
        for i in range(0, n_t): 
            u[i][0] = 0
        '''x = ln (B / k)'''
        for i in range(0, n_t):
            u[i][n_x - 1] = 0
        '''tau = 0'''
        for i in range(0, n_x):
            u[0][i] = max(0 , 1 - math.exp(- ( left_x + i * h ) ) )

        '''Set matrix for solving system of linear equations'''
        size = n_x - 2
        p1, p2, p3 = - tau, 2 * h**2 + 2 * tau + tau * h * (1 + D), - tau - tau * h * (1 + D)
        A_mat_params = np.array([p1, p2, p3])

        p4 = 2 * h**2 - 2 * tau - tau * h * (1 + D)
        '''Finite difference scheme'''
        for k in range(0, n_t - 1):
            '''Vector of free coefficients b'''
            b = np.zeros(size)
            b[0] = - p1 * u[k + 1][0] - p1 * u[k][0] + p4 * u[k][1] - p3 * u[k][2]
            b[1:size - 1] = - p1 * u[k][1:size - 1] + p4 * u[k][2:size] - p3 * u[k][3:size + 1]
            b[size - 1] = - p3 * u[k + 1][n_x - 1] - p1 * u[k][n_x - 3] + p4 * u[k][n_x - 2] - p3 * u[k][n_x - 1]
            '''Solving system Ax = b by tridiagonal matrix algorithm'''
            res = const_tri_diag_mat_solve(A_mat_params, b)
            u[k + 1][1:n_x - 1] = res
            
        '''Transition from function u(x, tau) to V(S,t)'''
        x_data = np.exp(np.linspace(left_x, right_x, n_x)) * K
        for k in range(0, n_t):
            u[k] = u[k] * x_data
        
        '''Transition from coordinates tau to t'''
        T1 = sigma**2 * T / 2
        t = np.linspace(0, T1, n_t)
        t = T - 2 * t / sigma**2

        '''Transition from coordinates x to S'''
        s = np.linspace(math.log( region[1][0] / K ), math.log( region[1][1] / K), n_x)
        s = np.exp(s) * K

        self.pde_t = t
        self.pde_s = s
        self.pde_v = u
        
        self.pde_calc_flg = 1

        return (t, s, u)

    def get_pde_result(self, S0: float = None):
        '''Returns pde call option price v(S0, t)'''
        if self.pde_calc_flg == 0:
            raise ValueError(f'Nothing to return. Method price_pde should be called first.')
        if S0 == None:
            S0 = self.S0
        return get_result(self.pde_s, self.pde_v[-1], S0)

In [29]:
x = barrier_call_option(1, 1, 100, 100, 90, 0.07, 0.25)
x.price_pde(100, 100)
x.get_pde_result(100)

IndexError: index 0 is out of bounds for axis 0 with size 0