In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.decomposition import PCA

import numpy as np
import scipy
from scipy.special import gammaln
from scipy.stats import norm
from scipy.stats import t as student_t

In [5]:
# Load each
datasets = {}
for hour in range(24):
    datasets[hour] = pd.read_csv(f"Splits/dataset_hour_{hour}.csv")

datasets_train = {hour: datasets[hour][(datasets[hour]['DATE'] >= '2014-01-01') & (datasets[hour]['DATE'] < '2023-01-01')].drop(['DATE'], axis=1).to_numpy() for hour in range(24)}
datasets_test = {hour: datasets[hour][datasets[hour]['DATE'] >= '2023-01-01'].drop(['DATE'], axis=1).to_numpy() for hour in range(24)}

y_train = {hour: datasets_train[hour][:, 0].reshape(-1, 1) for hour in range(24)}
y_test = {hour: datasets_test[hour][:, 0].reshape(-1, 1) for hour in range(24)}

weather_train = {hour: datasets_train[hour][:, 1:-1] for hour in range(24)}
weather_test = {hour: datasets_test[hour][:, 1:-1] for hour in range(24)}

In [11]:
Y = y_train[20]
X = weather_train[20]
X = X[:,27:30]

In [7]:

class DARMAX:
    def __init__(self, p):
        """
        Initialize the DARMA-X class
        """
        self.p = p
        self.d = 0
        self.num_params = None
        self.params = None
        self.loglike_val = None
        self.T = None
    
    def t(self, Y):
        T = len(Y)
        self.T = T
        t = np.arange(start=-1, stop=-(T+1), step=-1)
        
        return t
    
    def y(self, Y):
        
        max_lag = self.p + 1

        Y = np.vstack((np.full((max_lag, Y.shape[1]), np.nan), Y))
        y = lambda t: Y[t,:]
        
        return y
    
    def x(self, X = None):

        max_lag = self.p + 1

        if X is None:
            x = lambda t: np.zeros((self.d,))
        else:
            X = np.vstack((np.full((max_lag, X.shape[1]), np.nan), X))
            x = lambda t: X[t, :]
        
        return x

    def parse_params(self, params):
        p, d = self.p, self.d

        rho = params[:p]
        omega, phi = params[p], params[p + 1]
        alpha = params[p + 2:p + 2 + p]
        gamma = params[p + 2 + p:p + 2 + p + p * d].reshape(p, d) if p * d > 0 else np.zeros((p, d))
        nu = params[-1] if len(params) > p + 2 + p + p * d else None

        return {
            "rho": rho,
            "omega": omega,
            "phi": phi,
            "alpha": alpha,
            "gamma": gamma,
            **({"nu": nu} if nu is not None else {})
        }

    def cond_var(self, Y, X = None, params = None):
        
        omega = params["omega"]
        alpha = params["alpha"]
        gamma = params["gamma"]

        y = self.y(Y)
        x = self.x(X)

        sigma = lambda t: np.sqrt(omega
                                  + sum(alpha[i-1] * y(t-i)**2 for i in range(1, self.p+1))
                                  + sum((gamma[i-1] @ x(t-i).T**2).reshape(-1, 1) for i in range(1, self.p+1))
                                  )
        
        cond_var = lambda t: sigma(t)**2

        return cond_var
    
    def cond_mean(self, Y, X = None, params = None):
  
        rho = params["rho"]
        omega = params["omega"]
        alpha = params["alpha"]
        gamma = params["gamma"]
        phi = params["phi"]

        y = self.y(Y)
        x = self.x(X)
        
        sigma = lambda t: np.sqrt(omega
                                  + sum(alpha[i-1] * y(t-i)**2 for i in range(1, self.p+1))
                                  + sum((gamma[i-1] @ x(t-i).T**2).reshape(-1, 1) for i in range(1, self.p+1))
                                  )
        
        beta = lambda t: phi * (y(t) - sum(rho[i-1] * y(t-i) for i in range(1, self.p + 1))) / sigma(t)

        cond_mean = lambda t: sum(rho[i-1] * y(t-i) for i in range(1, self.p + 1)) + sigma(t) * beta(t-1)

        return cond_mean
    
    def fit(self, Y, X = None, dist = "normal"):

        t = self.t(Y)
        self.d = 0 if X is None else X.shape[1]
        self.num_params = 2 + (2 + self.d) * self.p + (1 if dist == "student-t" else 0)
        
        def MLE(params):
            
            params = self.parse_params(params)
            
            y = self.y(Y)
            cond_var = self.cond_var(Y, X, params)
            cond_mean = self.cond_mean(Y, X, params)

            if dist == "normal":
                l_t = -1/2 * (np.log(cond_var(t)) + (y(t) - cond_mean(t))**2 / cond_var(t))
            
            elif dist == "student-t":
                nu = params["nu"]  # Degrees of freedom
                term1 = -0.5 * np.log(cond_var(t) * (nu - 2) * np.pi)
                term2 = gammaln((nu + 1) / 2) - gammaln(nu / 2)
                term3 = -((nu + 1) / 2) * np.log(1 + ((y(t) - cond_mean(t))**2) / (cond_var(t) * (nu - 2)))
                l_t = term1 + term2 + term3

            else:
                raise ValueError("dist must be 'normal' or 'student-t'")
            
            L_t = np.nansum(l_t)
            return -L_t
        
        init_params = np.full(self.num_params, 0.5)
        if dist == "student-t":
            init_params[-1] = 3  # Set nu to 3 explicitly
        
        bounds = ([(None, None)] * self.p  # rho
                + [(1e-6, None)]  # omega
                + [(1e-6, 1 - 1e-6)]  # phi
                + [(1e-6, None)] * self.p  # alpha
                + [(1e-6, None)] * (self.p * self.d)  # gamma
                + ([(2 + 1e-6, None)] if dist == "student-t" else [])  # nu (degrees of freedom) if student-t
                )
        
        result = scipy.optimize.minimize(MLE, init_params, method = "SLSQP", bounds = bounds)
        self.params = self.parse_params(result.x)
        self.loglike_val = -result.fun
        
        return self
    
    def std_res(self, Y, X = None):
        
        t = self.t(Y)
        y = self.y(Y)

        cond_mean = self.cond_mean(Y, X, self.params)
        cond_var = self.cond_var(Y, X, self.params)

        z = lambda t: (y(t) - cond_mean(t)) / np.sqrt(cond_var(t))
        return np.flip(z(t))

    def predict(self, Y, X = None):
        
        t = self.t(Y)
        
        cond_mean = self.cond_mean(Y, X, self.params)
        
        return np.flip(np.append(cond_mean(t+1), np.nan)).reshape(-1,1)
        
    def CI(self, Y, X = None, alpha_level = 0.05, dist = "normal"):

        t = self.t(Y)
        cond_mean = self.cond_mean(Y, X, self.params)
        cond_var = self.cond_var(Y, X, self.params)

        if dist == "normal":
            z_val = norm.ppf(1 - alpha_level / 2)

        elif dist == "student-t":
            nu = self.params["nu"]
            z_val = student_t.ppf(1 - alpha_level / 2, df = nu) * np.sqrt((nu - 2) / nu)
        
        else:
            raise ValueError("dist must be 'normal' or 'student-t'")
        

        CI = lambda t: np.column_stack((cond_mean(t) + z_val * np.sqrt(cond_var(t)), 
                                        cond_mean(t) - z_val * np.sqrt(cond_var(t))
                                        ))
        
        return np.vstack((np.full((1, 2), np.nan), np.flip(CI(t+1))))

In [13]:
model = DARMAX(p=4)
model.fit(Y,X, dist = "student-t")
model.predict(Y,X)

  fx = wrapped_fun(x)
  g = append(wrapped_grad(x), 0.0)


array([[         nan],
       [         nan],
       [         nan],
       ...,
       [218.2246855 ],
       [ 83.34541559],
       [146.87625333]])