Maximum Likelihood Estimation for CIR process

In [None]:
from AlgorithmImports import *
import math
import scipy.optimize as so
import scipy.special as sp
import numpy as np
import statsmodels.api as sm
import pandas as pd


def loglikelihood(params,*args):
    theta,mu,sigma = params
    
    #if(sigma > np.sqrt(2*mu*theta)):
    #   sigma = np.sqrt(2*theta*mu)
        
    y, dt = args
    n = len(y)
    
    sigma_tilde = np.sqrt(sigma**2*((1 - np.e**(-mu*dt))/(2*mu)))
    q = 2*mu*theta/sigma**2 - 1
    sum_1 = 0
    sum_2 = 0
    
    for i in range(1,n):
        sum_1 += y[i] + y[i - 1]*np.e**(-mu*dt)
        sum_2 += q/2*np.log(y[i]/(y[i - 1]*np.e**(-mu*dt))) + np.log(sp.iv(q,2*np.sqrt(y[i]*y[i - 1]*np.e**(-mu*dt))/sigma_tilde**2))

    
    log_likelihood = -2*np.log(sigma_tilde) - 1/(n*sigma_tilde**2)*sum_1 + (1/n)*sum_2
    
    return -log_likelihood

def estimate_coefficients(y, dt):
    
   # theta ∈ ℝ, mu > 0, sigma > 0
                                                           # we need 1e-10 b/c scipy bounds are inclusive of 0, 
                                                          # and sigma = 0 causes division by 0 error
    
    #y_t = np.diff(y)/np.sqrt(y[0:len(y) - 1])
    #x_1 = dt/np.sqrt(y[0:len(y) - 1])
    #x_2 = np.sqrt(y[0:len(y) - 1])
    #df = pd.DataFrame({'y' : y, 'x_1' : x_1, 'x_2' : x_2})
    #model = sm.OLS(df['y'],df[['x_1','x_2']])
    #mu_init = -model.fit.params[1]
    #theta_init = -model.fit.params[0]/model.fit.params[1]
    #sigma_init = model.fit.resid.std()
    #print(mu_init,theta_init,sigma_init)
    N = len(y)
  
    theta_init = np.mean(y)
    initial_guess = (theta_init, 100, 100)
    bounds = ((1e-5, None), (1e-5, None), (1e-5, None))
      # initial guesses for theta, mu, sigma
    result = so.minimize(loglikelihood, initial_guess, args=(y,dt), bounds=bounds)
    theta, mu, sigma = result.x 
    
    return theta, mu, sigma, result.fun

# From QuantConnect.com

def compute_portfolio_values(ts_A, ts_B, alloc_B):
        ts_A = ts_A / ts_A[0]
        ts_B = ts_B / ts_B[0]
        return ts_A - alloc_B * ts_B

def argmax_B_alloc(ts_A, ts_B,dt):
        
        theta = mu = sigma = alloc_B = 0
        max_log_likelihood = 0

        def compute_coefficients(x):
            portfolio_values = compute_portfolio_values(ts_A, ts_B, x)
            return estimate_coefficients(portfolio_values,dt)
        
        vectorized = np.vectorize(compute_coefficients)
        linspace = np.linspace(.001, 1, 100)
        res = vectorized(linspace)
        index = res[3].argmax()

        return res[0][index], res[1][index], res[2][index], linspace[index]

In [None]:
import yfinance as yf
from statsmodels.tsa.stattools import adfuller
from pymle.models import CIR
import pymle.models.OrnsteinUhlenbeck as OU
from pymle.core.TransitionDensity import ExactDensity, KesslerDensity
from pymle.fit.AnalyticalMLE import AnalyticalMLE
import numpy as np
import matplotlib.pyplot as plt

gld = yf.download('GC=F', start='2020-01-01', end='2022-12-31', progress=False)
crude = yf.download('CL=F', start='2020-01-01', end='2022-12-31', progress=False)
silver = yf.download('SI=F', start='2020-01-01', end='2022-12-31',progress=False)
copper = yf.download('HG=F', start='2020-01-01', end='2022-12-31', progress=False)
palladium = yf.download('PA=F', start='2020-01-01', end='2022-12-31', progress=False)
platinum = yf.download('PL=F', start='2020-01-01', end='2022-12-31', progress=False)
#corn = yf.download('C=F', start='2020-01-01', end='2022-12-31', progress=False)
ts = compute_portfolio_values(gld['Adj Close'], crude['Adj Close'], 1)


def check_stationarity(timeseries):
    dftest = adfuller(timeseries, autolag='AIC')
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic','p-value','#Lags Used','Number of Observations Used'])
    for key,value in dftest[4].items():
        dfoutput['Critical Value (%s)'%key] = value
    print(dfoutput)
check_stationarity(ts)

 # ===========================
 # Set the true model (CIR) params, to simulate the process
 # ===========================
 # ===========================
 # Fit maximum Likelihood estimators
 # ===========================
 # Choose some initial guess for params fit
param_bounds = [(0.0001, 1), (0.0001, 30), (0.0001, 1)]
goldCrudeModelsCIR = []
goldCrudeModelsOU = []
 #silverCrudeModels = []
 #goldSilverModels = []
 #copperCrudeModels = []
dt = 1/len(gld)
 # Choose some initial guess for params fit
for i in np.linspace(.001, 0.5, 100):
    model = CIR()
    model2 = OU()
    goldCrude = compute_portfolio_values(gld['Close'], crude['Close'], i)
    guess = np.array([0.5, np.mean(goldCrude), 0.10])
    #silverCrude = compute_portfolio_values(silver['Close'], crude['Close'], i)
    #goldSilver = compute_portfolio_values(gld['Close'], silver['Close'], i)
    #copperCrude = compute_portfolio_values(copper['Close'], crude['Close'], i)
    exact_est = AnalyticalMLE(goldCrude, param_bounds, dt, density=ExactDensity(model)).estimate_params(guess)
    goldCrudeModelsCIR.append(exact_est)
    exact_est2 = AnalyticalMLE(goldCrude, param_bounds, dt, density=ExactDensity(model2)).estimate_params(guess)
    goldCrudeModelsOU.append(exact_est2)
    #exact_est = AnalyticalMLE(silverCrude, param_bounds, dt,density=ExactDensity(model)).estimate_params(guess)
    #silverCrudeModels.append(exact_est)
    #exact_est = AnalyticalMLE(goldSilver, param_bounds, dt,density=ExactDensity(model)).estimate_params(guess)
    #goldSilverModels.append(exact_est)
    #exact_est = AnalyticalMLE(copperCrude, param_bounds, dt, density=ExactDensity(model)).estimate_params(guess)
    #copperCrudeModels.append(exact_est)

log_likelihoods = [goldCrudeModelsCIR[i].log_like for i in range(len(goldCrudeModelsCIR))]
plt.plot(np.linspace(.001, 0.5, 100), log_likelihoods)
plt.show()
print("optimal parameters are: ", goldCrudeModelsCIR[np.argmax(log_likelihoods)].params)
print("optimal B is: ", np.linspace(.001, 0.5, 100)[np.argmax(log_likelihoods)])
log_likelihoods = [goldCrudeModelsOU[i].log_like for i in range(len(goldCrudeModelsOU))]
plt.plot(np.linspace(.001, 0.5, 100), log_likelihoods)
plt.show()
print("optimal parameters are: ", goldCrudeModelsOU[np.argmax(log_likelihoods)].params)
print("optimal B is: ", np.linspace(.001, 0.5, 100)[np.argmax(log_likelihoods)])

In [None]:
from AlgorithmImports import *
import math
import scipy.optimize as sci
import numpy as np
import mpmath

def V_X(y, theta, mu, sigma, r,c):
    b_star = compute_b_star(mu,theta,sigma,r)
    if y >= 0 and y < b_star:
        return (b_star- c)/F_X(b_star,theta, mu, sigma, r,c)*F_X(y,theta, mu, sigma, r,c)
    else:
        return y - c
    
def dV_X(y,theta, mu, sigma, r,c):
    b_star = compute_b_star(mu,theta,sigma,r)
    if y >= 0 and y < b_star:
        return (b_star- c)/F_X(b_star,theta, mu, sigma, r,c)*dF_X(y,theta, mu, sigma, r,c)
    else:
        return 1
    
 # Theorem 4.4
def J_X(y,theta, mu, sigma, r,c):
    d_star = compute_d_star(mu,theta,sigma,r)
    if y >= 0 and y < d_star:
        return V_X(y,theta, mu, sigma, r,c)- (y + c)
    else:
        return (V_X(d_star,theta, mu, sigma, r,c)- (d_star + c))/G_X(d_star,theta, mu, sigma, r,c)*G_X(y,theta, mu, sigma, r,c)
 # 4.9
def F_X(y,mu,theta,sigma,r):
    a,b,z = r/mu,(2*mu*theta)/sigma**2,(2*mu*y)/sigma**2
    return mpmath.hyp1f1(a,b,z)

def dF_X(y,mu,theta,sigma,r):
    a,b,z = r/mu,(2*mu*theta)/sigma**2,(2*mu*y)/sigma**2
    return a/b*mpmath.hyp1f1(a,b,z)

 # 4.9
def G_X(y,mu,theta,sigma,r):
    a,b,z = r/mu,(2*mu*theta)/sigma**2,(2*mu*y)/sigma**2
    return mpmath.gamma(1- b)/mpmath.gamma(a- b + 1)*mpmath.hyp1f1(a,b,z) + mpmath.gamma(b- 1)/mpmath.gamma(a)*z**(1- b)*mpmath.hyp1f1(a- b + 1,2-b,z)

def dG_X(y,mu,theta,sigma,r):
    a,b,z = r/mu,(2*mu*theta)/sigma**2,(2*mu*y)/sigma**2
    return-a*(mpmath.gamma(1- b)/mpmath.gamma(a- b + 1)*mpmath.hyp1f1(a,b,z) + mpmath.gamma(b- 1)/mpmath.gamma(a)*z**(1- b)*mpmath.hyp1f1(a- b + 1,2-b,z))

#def y_b(y):
    #return (self.mu*self.theta- self.r*self.c_b)/(self.mu + self.r)
#def y_s(y):
 #
    #return (self.mu*self.theta + self.r*self.c_s)/(self.mu + self.r)

def compute_b_star(mu,theta,sigma,r):
    def f(y):
        return F_X(y,mu,theta,sigma,r)- (y)*dF_X(y,mu,theta,sigma,r)
    return sci.brentq(f, 0, 1)
def compute_d_star(mu,theta,sigma,r):
    def g(y):
        return G_X(y,mu,theta,sigma,r)(dV_X(y,mu,theta,sigma,r,r)- 1)- dG_X(y,mu,theta,sigma,r)(V_X(y,mu,theta,sigma,r,r)- y)
    return sci.brentq(g, 0, 1)