In [1]:
import matplotlib, time, copy
import matplotlib.pyplot as plt
import autograd.numpy as np
import autograd.scipy.stats as sps_autograd
from autograd import grad, hessian
from statsmodels.tsa.arima_process import ArmaProcess
from scipy.optimize import minimize
from scipy.linalg import toeplitz

# Simulate ARMA data 

In [2]:
"""
Simulate ARMA(1, 1) model
"""

# Define AR and MA coefficients
ar = np.array([1, -0.5])  
ma = np.array([1, 0.4])        

# Create ARMA process object
arma_process = ArmaProcess(ar, ma)

# Simulate 10000 samples
N = 1000
y = arma_process.generate_sample(nsample=N)

# Karman Filter 

In [3]:
"""
def initialize_FGHQ(a, b):
    
    Construct the state-space matrices F, G, H for an ARMA(p, q) model.

    Parameters:
    - p: int, order of the AR component
    - q: int, order of the MA component
    - a: list or np.array of AR coefficients [a1, a2, ..., ap]
    - b: list or np.array of MA coefficients [b1, b2, ..., bq]

    Returns:
    - F: state transition matrix of shape (k, k)
    - G: noise coefficient matrix of shape (k, 1)
    - H: observation matrix of shape (1, k)
    - Q: covariance identity matrix of shape (k, k)
    
    p = len(a)
    q = len(b)
    k = max(p, q + 1)  # dimension of the state vector
    F = np.zeros((k, k))
    G = np.zeros((k, 1))
    H = np.zeros((1, k))

    # Fill the first column of F with AR coefficients
    for i in range(p):
        F[i, 0] = a[i]

    # Fill the lower subdiagonal of F with 1s (shifting the state)
    for i in range(k - 1):
        F[i, i + 1] = 1

    # Fill G with negative MA coefficients, first element = 1
    for i in range(q):
        G[i+1, 0] = -b[i]
    G[0, 0] = 1  # first element always set to 1

    # Matrix H: only first element is 1
    H[0, 0] = 1

    # Initialize covariance matrix Q as identity matrix
    Q = np.eye(k)

    return F, G, H, Q

"""

'\ndef initialize_FGHQ(a, b):\n\n    Construct the state-space matrices F, G, H for an ARMA(p, q) model.\n\n    Parameters:\n    - p: int, order of the AR component\n    - q: int, order of the MA component\n    - a: list or np.array of AR coefficients [a1, a2, ..., ap]\n    - b: list or np.array of MA coefficients [b1, b2, ..., bq]\n\n    Returns:\n    - F: state transition matrix of shape (k, k)\n    - G: noise coefficient matrix of shape (k, 1)\n    - H: observation matrix of shape (1, k)\n    - Q: covariance identity matrix of shape (k, k)\n\n    p = len(a)\n    q = len(b)\n    k = max(p, q + 1)  # dimension of the state vector\n    F = np.zeros((k, k))\n    G = np.zeros((k, 1))\n    H = np.zeros((1, k))\n\n    # Fill the first column of F with AR coefficients\n    for i in range(p):\n        F[i, 0] = a[i]\n\n    # Fill the lower subdiagonal of F with 1s (shifting the state)\n    for i in range(k - 1):\n        F[i, i + 1] = 1\n\n    # Fill G with negative MA coefficients, first el

In [4]:
def initialize_FGHQ(a, b):
    """
    Construct the state-space matrices F, G, H for an ARMA(p, q) model.

    Parameters:
    - a: list or np.array of AR coefficients [a1, a2, ..., ap]
    - b: list or np.array of MA coefficients [b1, b2, ..., bq]

    Returns:
    - F: state transition matrix of shape (k, k)
    - G: noise coefficient matrix of shape (k, 1)
    - H: observation matrix of shape (1, k)
    - Q: covariance identity matrix of shape (k, k)
    - dF: derivative of F with respect to theta of shape (l, k, k)
    - dG: derivative of G with respect to theta of shape (l, k, 1)
    """
    p = len(a)
    q = len(b)
    k = max(p, q + 1)  # dimension of the state vector
    F = np.zeros((k, k))
    G = np.zeros((k, 1))
    H = np.zeros((1, k))

    # Fill the first column of F with AR coefficients
    for i in range(p):
        F[i, 0] = a[i]

    # Fill the lower subdiagonal of F with 1s (shifting the state)
    for i in range(k - 1):
        F[i, i + 1] = 1

    # Fill G with negative MA coefficients, first element = 1
    for i in range(q):
        G[i+1, 0] = -b[i]
    G[0, 0] = 1  # first element always set to 1

    # Matrix H: only first element is 1
    H[0, 0] = 1

    # Initialize covariance matrix Q as identity matrix
    Q = np.eye(k)

    # Compute derivatives of F and G with respect to theta
    dF = np.zeros((k, k, k))
    dF[0, 0, 0] = 1 # ARMA(1,1)

    dG = np.zeros((k, k, 1))
    dG[1, 1, 0] = -1 # ARMA(1,1)

    return F, G, H, Q, dF, dG 

In [5]:
def log_likelihood(sigma2_hat, r):
    N = len(r)
    return -0.5 * (N * np.log(2 * np.pi) 
                   + N * np.log(sigma2_hat) 
                   + np.sum(np.log(r)) + N)

In [6]:
def karman_filter_arma(theta):
    p = 1   # AR order
    q = 1   # MA order
    a = theta[:p]
    b = theta[p:p+q]
    k = max(p, q + 1)
    
    F, G, H, Q, dF, dG = initialize_FGHQ(a, b)

    # Initialize values
    x = np.zeros((k, 1))
    V = np.eye(k) * 100
    e = np.zeros((N, 1))
    r = np.zeros((N, 1))

    # Implement Kalman filter
    for t in range(N):
        # Predict one-step-ahead state predictive density of x_{t}
        x_predict = F @ x
        V_predict = F @ V @ F.T + G @ G.T

        # Compute forecast error and one-step-ahead predictive variance
        e[t] = y[t] - (H @ x_predict).item()
        r[t] = (H @ V_predict @ H.T).item()

        # Kalman gain
        K = V_predict @ H.T / r[t]

        # Update current state and covariance
        x = x_predict + K * e[t]
        V = (np.eye(k) - K @ H) @ V_predict

    sigma2_hat = np.sum(e**2 / r) / N

    return sigma2_hat, r

In [7]:
def obj_func_likelihood(theta):
    sigma2_hat, r = karman_filter_arma(theta)
    log_lik = log_likelihood(sigma2_hat, r)

    return -log_lik  

In [8]:
"""
Test the log-likelihood function
"""

theta_start = [0.1, 0.1]

# Minimize negative log-likelihood
result = minimize(obj_func_likelihood, theta_start, method='BFGS')

# Print results
print("Estimated parameters:", result.x)
print("Negative log-likelihood:", result.fun)

Estimated parameters: [ 0.54290688 -0.35020954]
Negative log-likelihood: 1390.6811877545556


# Test algorithm

In [9]:
def compute_initial_V_and_dV(a, b, sigma2):
    """
    Tính toán V ban đầu và dV/d(theta) ban đầu.
    Thực thi các phương trình từ mục 3.3.2 của tài liệu.
    """
    p, q = 1, 1
    k = max(p, q + 1)  

    # 1. Tính hàm đáp ứng xung g_k (Eq. 52)
    g = np.zeros(k + 1)
    g[0] = 1.0
    g[1] = a - b
    g[2] = a * g[1]

    C = np.zeros(k + 1)
    C[0] = sigma2 * (1 - 2 * a * b + b**2) / (1 - a**2)
    C[1] = a * C[0] - sigma2 * b
    C[2] = a * C[1]

    V = np.zeros((k, k))
    V[0, 0] = C[0]
    V[0, 1] = V[1, 0] = - b * g[0]
    V[1, 1] = b**2 * sigma2

    dV = np.zeros((k, k, k))
    dV[0, 0, 0] = (2 * sigma2 * (a-b) * (1 - a*b)) / (1 - a**2)**2
    dV[0, 0, 1] = 0
    dV[0, 1, 0] = 0
    dV[0, 1, 1] = 0
    dV[1, 0, 0] = 2 * sigma2 * (b-a) / (1 - a**2)
    dV[1, 0, 1] = -1
    dV[1, 1, 0] = -1
    dV[1, 1, 1] = 2 * b * sigma2

    return V, dV

In [10]:
def grad_obj_func_likelihood(theta):
    """
    Implement the gradient of the log-likelihood function for ARMA(1,1) model.
    """
    
    a = theta[0]
    b = theta[1]
    k = 2

    # Initialize the state-space matrices
    F, G, H, Q, dF, dG = initialize_FGHQ(np.array([a]), np.array([b]))
    
    # Initialize the x and V matrices
    V, dV = compute_initial_V_and_dV(a, b, 1)
    x = np.zeros((k, 1))
    dx = np.zeros((2, 2, 1))
    e = np.zeros((N, 1))
    r = np.zeros((N, 1))
    de = np.zeros((N, 2))
    dr = np.zeros((N, 2))

    # 
    for t in range(N):
        # 1. Predict
        # Predict one-step-ahead state predictive density of x_{t}
        x_predict = F @ x
        V_predict = F @ V @ F.T + G @ G.T

        # Compute forecast error and one-step-ahead predictive variance
        e[t] = y[t] - (H @ x_predict).item()
        r[t] = (H @ V_predict @ H.T).item()


        # Kalman filter for gradient
        dx_predict = F @ dx + dF @ x
        dV_predict = F @ dV @ F.T + dF @ V @ F.T + F @ V @ dF.T + dG @ G.T  # + G @ dG.T

        de_t = -H @ dx_predict
        dr_t = H @ dV_predict @ H.T


        # 2. Update
        # Kalman gain
        K = V_predict @ H.T / r[t]

        # Update current state and covariance
        x = x_predict + K * e[t]
        V = (np.eye(k) - K @ H) @ V_predict

        dK = (dV_predict @ H.T / r[t]) - (V_predict @ H.T / r[t]**2 * dr[t])
        dx = dx_predict + K @ de_t + dK * e[t]
        dV = dV_predict - dK * H @ V_predict - K @ H @ dV_predict

        # Store value de and dr 
        de[t] = de_t.flatten()
        dr[t] = dr_t.flatten()

    # === 3. Tính Gradient cuối cùng ===
    sigma2_hat = np.sum(e**2 / r) / N
    
    grad = -0.5 * sum(dr / r) - (1/sigma2_hat) * sum(de * e / r) + (1/(2*sigma2_hat)) * sum(dr * e**2 / r**2)
    
    return grad

In [16]:
"""
Test the log-likelihood function
"""

theta_start = [0.1, 0.1]

# Minimize negative log-likelihood
result = minimize(obj_func_likelihood, theta_start, method='BFGS', jac=grad_obj_func_likelihood,
                  options={'gtol': 1e-04, 'maxiter': 1000, 'disp': True})

# Print results
print("Estimated parameters:", result.x)
print("Negative log-likelihood:", result.fun)

ValueError: could not broadcast input array from shape (4,) into shape (2,)

In [12]:
a = 0.3
b = 0.5
k = 2

# Initialize the state-space matrices
F, G, H, Q, dF, dG = initialize_FGHQ(np.array([a]), np.array([b]))

# Initialize the x and V matrices
V, dV = compute_initial_V_and_dV(a, b, 1)
x = np.zeros((k, 1))
dx = np.zeros((2, 2, 1))
e = np.zeros((N, 1))
r = np.zeros((N, 1))
de = np.zeros((N, 2))
dr = np.zeros((N, 2))

    # 
t = 0
# 1. Predict
# Predict one-step-ahead state predictive density of x_{t}
x_predict = F @ x
V_predict = F @ V @ F.T + G @ G.T

# Compute forecast error and one-step-ahead predictive variance
e[t] = y[t] - (H @ x_predict).item()
r[t] = (H @ V_predict @ H.T).item()


# Kalman filter for gradient
dx_predict = F @ dx + dF @ x
dV_predict = F @ dV @ F.T + dF @ V @ F.T + F @ V @ dF.T + dG @ G.T  # + G @ dG.T

de_t = -H @ dx_predict
dr_t = H @ dV_predict @ H.T


# 2. Update
# Kalman gain
K = V_predict @ H.T / r[t]

# Update current state and covariance
x = x_predict + K * e[t]
V = (np.eye(k) - K @ H) @ V_predict

dK = (dV_predict @ H.T / r[t]) - (V_predict @ H.T / r[t]**2 * dr[t])
dx = dx_predict + K @ de_t + dK * e[t]
dV = dV_predict - dK * H @ V_predict - K @ H @ dV_predict

# Store value de and dr 
de[t] = de_t.flatten()
dr[t] = dr_t.flatten()

In [15]:
de[t]

array([-0.41057843,  0.43956044])