In [1]:
get_ipython().run_line_magic('matplotlib', 'inline')
import matplotlib.pyplot as plt
import numpy as np
from math import *
import scipy.stats
from pandas import *
#import scipy.integrate as integrate
from scipy.integrate import quad
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")
from mpl_toolkits import mplot3d

import plotly.offline as pyoff
import plotly.graph_objs as go
import plotly.graph_objects as go
from scipy.linalg import expm
from tqdm.notebook import tqdm

In [2]:
# Define function to calculate jump distribution for integral terms 

def jump_distribution_full(x_value,mu,sigma_jump,step_x):
    lower = x_value - step_x/2.0
    upper = x_value + step_x/2.0
    #print(lower,upper)
    def normal_distribution_function(x):
        value = scipy.stats.norm.pdf(x,mu,sigma_jump)
        if sigma_jump == 0.0:
            return 0.0
        else:
            return value
    res, err = quad(normal_distribution_function, lower, upper)
    return res  #output is a vector of the average densities around the regions of each value in the vector x


In [3]:
# Coefficients as obtained from BTCS discretization scheme

def a_coeff_rs(x_value, sigma, kappa, theta, q_current):
    coefficient = (sigma**2)/(dx**2) + np.sum(q_current)
    return coefficient

def b_coeff_rs(x_value, sigma, kappa, theta):
    #print(sigma,kappa,theta,jump_rate)
    coefficient_2 = 0.5*(sigma**2)*(1/dx**2) + 0.5*(1/dx)*kappa*(theta-x_value)
    #print(coefficient_2)
    return coefficient_2

def c_coeff_rs(x_value, sigma, kappa, theta):
    coefficient_3 = 0.5*(sigma**2)*(1/dx**2) - 0.5*(1/dx)*kappa*(theta-x_value)
    return coefficient_3   


In [4]:
# Construction of M matrix containing the coefficients of the solution at time t = q+1 in the implicit scheme for a single regime (block)

def M_matrix_rs(x,sigma,kappa, theta, q_current, L):
    M = np.zeros(shape=(len(x),len(x)))
    for i in range(0,len(x)):
        if x[i] <= 0.0 or x[i] >= L:
            M[i,i] = 1.0
        else:
            #print(x[i])
            a_coefficient = a_coeff_rs(x[i], sigma, kappa, theta, q_current)
            b_coefficient = b_coeff_rs(x[i], sigma, kappa, theta)
            c_coefficient = c_coeff_rs(x[i], sigma, kappa, theta)
            if a_coefficient < 0 or b_coefficient < 0 or c_coefficient <0:
                print('Error')
            M[i,(i-1):(i+2)] = [-dt*c_coefficient, 1+dt*a_coefficient, -dt*b_coefficient]

    return M

In [5]:
# Construction of full block M matrix containing the coefficients of the solution at time t = q+1 across all regimes 

def M_final_rs(x,sigma_vec,kappa_vec, theta_vec, Q, L_vec):
    
    M_final_matrix = np.zeros(shape=(1,len(x)*len(Q)))
    Q_temp = Q[~np.eye(Q.shape[0],dtype=bool)].reshape(Q.shape[0],-1)
    for i in range(0,len(Q)):
        M_block = M_matrix_rs(x,sigma_vec[i],kappa_vec[i],theta_vec[i],Q_temp[i,], L_vec[i])
        print(Q_temp)
        #print(sigma_vec[i],kappa_vec[i],theta_vec[i],jump_rate_vec[i],Q_temp[i,])
        #print(DataFrame(M_block))
        for j in range(0,len(Q)):
            X_current = np.eye(len(x))*(-dt*Q[i,j])
            X_current[x <= 0.0] = 0.0
            X_current[x >= L_vec[i]] = 0.0
            if j < i:
                M_block = np.block([X_current, M_block])
            if j > i:
                M_block = np.block([ M_block, X_current]) 
        
        M_final_matrix = np.block([
            [M_final_matrix],
            [M_block]
        ])
        
    return M_final_matrix[1:,:]

In [6]:
# Calculation of integral term in the PIDE

def jump_integral(x, mu, N, sigma_jump, step_x):
    integral_list = []
    positions = np.arange(int(-N/2+1),int(N/2),1)
    for j in positions: 
        x_jump = step_x*j
        integral = jump_distribution_full(x_jump, mu, sigma_jump, step_x)
        #print(x_jump, integral)
        integral_list.append(integral)

    return sum(integral_list)

In [7]:
# Construction of matrix containing the coefficients of the solution at time t = q in the implicit scheme for a single regime (block)

def N_matrix_rs(x, poisson_rate, N, mu, sigma_jump, step_x, L):
    N_m = np.zeros(shape=(len(x),len(x)))
    positions = np.arange(int(-N/2+1),int(N/2),1)
    jump_result_list = []
    for i in range(0,(len(x))):
        integral_list = []
        if x[i] > 0.0 and x[i] < L:
            for j in positions: 
                x_jump = step_x*j
                if (i+j) >= 0 and (i+j) <= (len(x)-1):
                    integral = jump_distribution_full(x_jump, mu, sigma_jump, step_x)
                    integral_list.append(integral)
                    N_m[i,i+j] = poisson_rate*dt*integral
                    
            jump_integral = sum(integral_list)
            N_m[i,i] = N_m[i,i]+ (1 - poisson_rate*dt*jump_integral)
            
            jump_result_list.append(jump_integral)
            if 1-poisson_rate*dt*jump_integral < 0:
                print('Error in integral approximation')
                
    print(f'Min: {min(jump_result_list,  default="EMPTY")} Max: {max(jump_result_list,  default="EMPTY")}')
            
    return N_m

In [8]:
# Construction of full block N matrix containing the coefficients of the solution at time t = q+1 across all regimes 

def N_final_rs(x, poisson_rate, N, mu, sigma_jump, step_x, Q, L_vec):
    
    N_final_matrix = np.zeros(shape=(1,len(x)*len(Q)))
    for i in range(0,len(Q)):
        N_block = N_matrix_rs(x, poisson_rate, N, mu, sigma_jump,step_x, L_vec[i])
        for j in range(0,len(Q)):
            X_current = np.zeros((len(x),len(x)))
            if j < i:
                N_block = np.block([X_current, N_block])
            if j > i:
                N_block = np.block([N_block, X_current]) 
    
        N_final_matrix = np.block([
            [N_final_matrix],
            [N_block]
        ])
    
    return N_final_matrix[1:,:]

In [9]:
# Construction of boundary condition block matrix for all regimes

def b_matrix(L_vec, boundary_conditions_vec):
    b_final_matrix = np.array([0])
    for i in range(0,len(boundary_conditions_vec)):
        b_block = np.zeros(len(x))
        b_block[x >= L_vec[i]] = boundary_conditions_vec[i]
        b_final_matrix = np.block([
            b_final_matrix, b_block
        ])
    
    return b_final_matrix[1:]

In [10]:
# Construction of initial condition block matrix for all regimes 

def phi_matrix_def(boundary_conditions_vec, initial_conditions_vec, x, t):
    phi_final_matrix = np.zeros(shape=(1,len(t)))
    for i in range(0, len(boundary_conditions_vec)):
        phi_block = np.zeros(shape = (len(x),len(t)))
        phi_block[:,0] = initial_conditions_vec[i] * (x>0.0)
        phi_block[len(x)-1,:] = boundary_conditions_vec[i]
        
        phi_final_matrix = np.block([
            [phi_final_matrix],
            [phi_block]
        ])
        
    return phi_final_matrix[1:,:]

In [11]:
# Example - extension of boundary term

x0 = -8.0
xn = 8.0
xsteps = 1000
dx = (xn-x0)/xsteps
x = np.arange(x0, xn+dx, dx) #last x value is x = 99 
#print(x,len(x),dx)

t0 = 0.0
tn = 1.0
tsteps = 1000
dt = (tn-t0)/tsteps
t = np.arange(t0, tn+dt, dt) #last time value is t = 999.9
r = dt / (dx**2) # ensure r < 1

In [12]:
# Example 

# Define Continuous Time Markov Chain generator for summation term in the PIDE

Q = np.array([[-0.5, 0.3, 0.2], 
    [0.3, -0.6, 0.3],   
    [0.0,0.0,-0.0]])

# Define process parameters per regime

kappa_vec = np.array([0.3,0.2,0.0])    ##### The parameters have been changed compared to the MC comparison notebook (limit S = 4.0 should still be ok)
theta_vec = np.array([0.8, 0.5, 0.0])
sigma_vec = np.array([0.3, 0.5, 0.0])

# Define jump term parameters and related discretization parameters

poisson_rate = 1.0
mu_choice = 0.0
sigma_jump_choice = 0.5
N_choice = 160
L_vec = np.array([6.0, 6.0, 6.0])
boundary_conditions_vec = np.array([1.0,1.0,0.0])
initial_conditions_vec = np.array([1.0,1.0,0.0])
#int_approx = jump_integral(x = x, mu = mu_choice, N = N_choice, sigma_jump = sigma_jump_choice, step_x = dx)


# Run Finite Difference iteration scheme

M = M_final_rs(x,sigma_vec,kappa_vec, theta_vec, Q, L_vec = L_vec)
N_mat = N_final_rs(x, poisson_rate = poisson_rate, N = N_choice, 
                   mu = mu_choice, sigma_jump = sigma_jump_choice, step_x = dx, Q = Q, L_vec = L_vec)

b = b_matrix(L_vec = L_vec, boundary_conditions_vec = boundary_conditions_vec)
phi_matrix = phi_matrix_def(boundary_conditions_vec = boundary_conditions_vec, 
                            initial_conditions_vec = initial_conditions_vec, x = x, t = t)

stability = [sigma_vec[i]**2 >= np.max(np.abs(dx*kappa_vec[i]*(theta_vec[i]-x))) for i in range(0,len(sigma_vec))]

if not all(stability):
    print('Stability conditions may not be satisfied')
    
for time in tqdm(range(1, len(t))): 
    phi_matrix[:,time] = np.linalg.inv(M).dot(N_mat.dot(phi_matrix[:,time-1]) + b)


[[0.3 0.2]
 [0.3 0.3]
 [0.  0. ]]
[[0.3 0.2]
 [0.3 0.3]
 [0.  0. ]]
[[0.3 0.2]
 [0.3 0.3]
 [0.  0. ]]
Min: 0.9890408915405504 Max: 0.9890408915405504
Min: 0.9890408915405504 Max: 0.9890408915405504
Min: 0.9890408915405504 Max: 0.9890408915405504


  0%|          | 0/1000 [00:00<?, ?it/s]

In [13]:
accept_x_0 = np.where((x >= 0) & (x <= L_vec[0]))[0]
accept_x_1 = np.where((x >= 0) & (x <= L_vec[1]))[0]
accept_x_2 = np.where((x >= 0) & (x <= L_vec[2]))[0]

In [14]:
accept_x_0 = np.where((x >= 0) & (x <= 4.008))[0]
accept_x_1 = np.where((x >= 0) & (x <= 4.008))[0]
accept_x_2 = np.where((x >= 0) & (x <= 4.008))[0]

In [14]:
def survival(space,time):
    return phi_matrix[space, time]

In [16]:
X = 0*len(x)+ accept_x_0
T = np.arange(0,len(t),1)
X, T = np.meshgrid(X, T)
Phi = survival(X,T)

data=go.Surface(z=Phi, x=x0+dx*accept_x_0, y=t0+dt*T)

layout = go.Layout(scene = dict(
                    xaxis_title='Initial Position',
                    yaxis_title='Time until maturity',
                    zaxis_title='Survival Probability'))

fig = go.Figure(data=[data], layout=layout)
pyoff.plot(fig)

'temp-plot.html'

In [17]:
def survival_av(space,time):
    return (phi_matrix[space,time] + phi_matrix[1*len(x)+space,time] + phi_matrix[2*len(x)+space,time])/3.0

survival_av(550,0)

0.6666666666666666

In [32]:
X = 0*len(x)+accept_x_0#np.arange(len(x),2*len(x),1)
T = np.arange(0,len(t),1)
X, T = np.meshgrid(X, T)
Phi = survival_av(X,T)

data=go.Surface(z=Phi, x=x0+dx*X, y=t0+dt*T)

layout = go.Layout(scene = dict(
                    xaxis_title='Initial Position',
                    yaxis_title='Time until maturity',
                    zaxis_title='Survival Probability'))

fig = go.Figure(data=[data], layout=layout)
pyoff.plot(fig)

'temp-plot.html'

In [16]:
surv = DataFrame(phi_matrix)