# Time Series Forecasting - Autoregressive Models with Extra Regressors

What Is a ARIMAX, SARIMAX Model?

Article : https://365datascience.com/tutorials/python-tutorials/sarimax/

In [None]:
import pandas as pd 
import numpy as np 
import matplotlib.pyplot as plt
import datetime
import math
import warnings
import pmdarima as pmd
from sklearn.metrics import mean_squared_error
from math import sqrt
from statsmodels.tsa.arima_model import ARMA, ARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX

In [None]:
# In order to use this notebook for univarate time series analysis :-
# 1) The primary requirement is not to have missing values or categorial(string) data for time_dependent variable 
#    and time_column.
# 2) This cell requires information on file_name (only csv), time_dependent_variable, time_column, date_time format (frmt)
#    and resample grain(X). After filling the required information correctly, you can run all the cells (Cell ---> Run All)
# 3) Example :-
#   file_name               = "JetRail Avg Hourly Traffic Data - 2012-2013.csv"
#   time_dependent_variable = "Count"    (column name in your dataset)
#   time_column             = "Datetime" (column name in your dataset)
#   frmt                    = "%Y-%m-%d"
#   X                       = "D" 
file_name = "Monthly Production of Chocolate - Australia.csv"
time_dependent_variable = "Volume"
time_column = "Month"
frmt =  '%Y-%m'
Resample_grain = "M"
split= .9          #train and test split

### Reading the csv file

In [None]:
def data(method = "csv"):
    df = pd.read_csv(file_name, parse_dates= True)
    df[time_column] = pd.to_datetime(df[time_column],format=frmt) 
    df.index = df[time_column]
    df = df.resample(Resample_grain).mean()
    df.reset_index(inplace= True)
    return df
df = data()

### Splitting the data into train and test

In [None]:
def train_test_split_perc(df):
    total_size=len(df)
    train_size=math.floor(split*total_size)  #(70% Dataset)
    train=df.head(train_size)
    test=df.tail(len(df) -train_size)
    return train, test

In [None]:
def train_test_split_date(df, split_date):
    split_date = '2017-01-01'
    train = df.loc[df.index <= split_date].copy()
    test = df.loc[df.index > split_date].copy()
    return train, test

### Metrics

Probabilistic Model Selection with AIC/BIC in Python

https://medium.com/analytics-vidhya/probabilistic-model-selection-with-aic-bic-in-python-f8471d6add32#:~:text=AIC%20and%20BIC%20techniques%20can%20be%20implemented%20in,statsmodels.formula.api%20provides%20a%20direct%20approach%20to%20compute%20aic%2Fbic.

In [None]:
from sklearn import metrics

def timeseries_evaluation_metrics_func(y_true, y_pred):
    print('Evaluation metric results:-')
    def mean_absolute_percentage_error(y_true, y_pred):
        y_true, y_pred = np.array(y_true), np.array(y_pred)
        return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
    print(f'MSE is : {metrics.mean_squared_error(y_true, y_pred)}')
    print(f'MAE is : {metrics.mean_absolute_error(y_true, y_pred)}')
    print(f'MAPE is : {mean_absolute_percentage_error(y_true,y_pred)}')
    print(f'R2 is : {metrics.r2_score(y_true, y_pred)}')
    print(f'RMSE is : {np.sqrt(metrics.mean_squared_error(y_true, y_pred))}',end='\n\n')

In [None]:
 def plot(method):
    plt.figure(figsize=(12,8))
    plt.plot(train[time_column], train[time_dependent_variable], label='Train')
    plt.plot(test[time_column],test[time_dependent_variable], label='Test')
    plt.plot(y_hat[time_column],y_hat[method], label= method +' forecast')
    plt.legend(loc='best')
    plt.title(method + ' forecast')
    plt.show()
    timeseries_evaluation_metrics_func(y_hat[time_dependent_variable], y_hat[method])


## Add your exogeneous variables 

In [None]:
def date_features(df):
    df['year'] = df[time_column].dt.year
    df['quarter'] = df[time_column].dt.quarter
    df['month'] = df[time_column].dt.month
    df['week_day'] = df[time_column].dt.weekday
    return df
df= date_features(df)

In [None]:
df.head()

### columns reorder with extra regressors

https://stackoverflow.com/questions/35321812/move-column-in-pandas-dataframe 
This function will reorder your columns with time column as first column and time dependent variable column as the last column

In [None]:
df.columns

In [None]:
[ col for col in df.columns if col not in [time_column, time_dependent_variable]]

In [None]:
def column_reorder(df):
    df = df[[time_column] + 
        [ col for col in df.columns if col not in [time_column, time_dependent_variable]] + 
        [time_dependent_variable]]
    return df
df = column_reorder(df)
df.head()

### Train and test split with extra regressors

In [None]:
train, test = train_test_split_perc(df)
y_hat = test.copy()

In [None]:
def extra_regressors(exog_variables_names):    
    exog_train = train[exog_variables_names]
    exog_test = test[exog_variables_names]
    return exog_train, exog_test

i denotes the starting position of the extra regressors columns 

do call the column_reorder() function before

In [None]:
def extra_regressors_with_iloc(i=1):
    exog_train = train.iloc[: , i : -1]
    exog_test = test.iloc[: , i : -1]
    return exog_train, exog_test 
exog_train, exog_test = extra_regressors_with_iloc()

# ARX

If you want only specific list of lags like 1 & 3 as AR components, then you can do that in the following way
https://stackoverflow.com/questions/55882111/arima-model-for-certain-lags

In [None]:
trend = "ct"

In [None]:
def diagnostic_plot(model_fit, lags= 30):
    fig = plt.figure(figsize=(16, 9))
    fig = model_fit.plot_diagnostics(fig=fig, lags=lags)
    print(model_fit.summary())
    print("aic = " +  str(model_fit.aic))
    print("bic = " + str(model_fit.bic))

# Choose the metrics that you want to minimize

In [None]:
def metrics_( method):
    mse = mean_squared_error(test[time_dependent_variable], y_hat[method])
    mae = metrics.mean_absolute_error(test[time_dependent_variable], y_hat[method])
    mape = mean_absolute_percentage_error(test[time_dependent_variable], y_hat[method])
    r2 = metrics.r2_score(test[time_dependent_variable], y_hat[method])
    return mape

In [None]:
from statsmodels.tsa.ar_model import AR
from statsmodels.tsa.ar_model import AutoReg, ar_select_order

def arx(lags , plot_ = False, method = "ARX"):


    model = AutoReg(train[time_dependent_variable], lags=lags,exog = exog_train, trend= trend)
    
    fit1 = model.fit()
    if plot_:
        diagnostic_plot(fit1, lags= 30)
    
    y_hat[method] = fit1.predict(start=len(train), end=len(train)+len(test)-1, dynamic=False, exog_oos= exog_test)
    
     
    return metrics_(method)
arx(lags= 7)

### ARX with Seasonality = True

we have to specify the number of periods as well

In [None]:
from statsmodels.tsa.ar_model import AR
from statsmodels.tsa.ar_model import AutoReg, ar_select_order

def arx_seasonal(lags ,seasonal=True, period = 7 ,plot_ = False, method = "ARX"):


    model = AutoReg(train[time_dependent_variable], lags=lags,exog = exog_train, trend= trend, seasonal=seasonal, period =period)
    
    fit1 = model.fit()
    if plot_:
        diagnostic_plot(fit1, lags= 30)
    
    y_hat[method] = fit1.predict(start=len(train), end=len(train)+len(test)-1, dynamic=False, exog_oos= exog_test)
    
     
    return metrics_(method)
arx_seasonal(lags= 7)

In [None]:
def arx_best_params(p_values=range(28)):
    best_score, best_cfg = float("inf"), None
    for p in p_values:
        order = (p,)
        try:
            metric = arx(*order) # you can also choose arx_seasonal() here
            if metric < best_score:
                best_score, best_cfg = metric, order
            print('ARX%s metric=%.3f' % (order,metric))
        except:
            continue
    print('Best ARX%s metric=%.3f' % (best_cfg, best_score))
    return best_cfg
    
best_cfg = arx_best_params()

In [None]:
arx(*best_cfg, plot_ = True)

In [None]:
plot(method= 'ARX' )

# ARX with ar_select_order

https://www.statsmodels.org/stable/examples/notebooks/generated/autoregressions.html

This cell sets the plotting style, registers pandas date converters for matplotlib
Also, we set the frequency for the time series to “D” (daily) to avoid warnings when using AutoReg.

In [None]:
pd.plotting.register_matplotlib_converters()


In [None]:
def autoreg_select_order(plot_= False, method= "AutoReg_extra_reggressors"):
    sel = ar_select_order(train[time_dependent_variable],     maxlag= 13, exog= exog_train,
        ic='aic',
        glob=False,
        trend='ct',
        seasonal=False, old_names=False)
    
    res = sel.model.fit()
    if plot_:
        diagnostic_plot(res)
    y_hat[method]= res.predict(start=len(train), end=len(train)+len(test)-1, exog_oos= exog_test, dynamic=False)
autoreg_select_order(method= "AutoReg_extra_reggressors")

In [None]:
plot(method= "AutoReg_extra_reggressors")

# ARX with pmdarima library

In [None]:
def auto_arx(max_p = 7, plot_ = False, method = "Auto_ARX"):
    model = pmd.auto_arima(train[time_dependent_variable], 
                          start_p=1, 
                          start_q=0,
                          d = 0,
                           max_p = max_p, max_q=0,
                           X= exog_train,
                           trend = trend, seasonal=False,
                           trace=True,error_action='ignore',
                          suppress_warnings=True,stepwise=True,
                           n_jobs= -1)

    
    


     
    
    y_hat.loc[:,method] = model.predict(len(test),X= exog_test)

     
     
     
    if plot_:
        diagnostic_plot(model)
    

     
    print(metrics_(method))
    
auto_arx()

In [None]:
plot(method = "Auto_ARX")

# MAX

In [None]:
def ma(MA, summary= False, method = "MAX"):
    global y_hat
    model = ARMA(train[time_dependent_variable], order=(0, MA), exog= exog_train)
    
    fit1 = model.fit(disp=False)
    
    if summary: print(fit1.summary())
        
    y_hat[method] = fit1.predict(start=len(train), end=len(train)+len(test)-1, exog= exog_test, dynamic=False)
     
    return metrics_(method)
    
ma(MA = 5, method = "MAX")



In [None]:
def ma_best_params(q_values=range(7)):
    
    best_score, best_cfg = float("inf"), None
 

    for q in q_values:
        order = (q, )
        try:
            metric = ma(*order)
            if metric < best_score:
                best_score, best_cfg = metric, order
            print('MAX%s metric=%.3f' % (order,metric))
        except:
            continue
    print('Best MAX%s metric=%.3f' % (best_cfg, best_score))
    return best_cfg
    
best_cfg = ma_best_params()

In [None]:
ma(*best_cfg, summary = True)

In [None]:
plot(method = "MAX")

# MAX with pmdarima library

In [None]:
def auto_ma(max_q = 40, summary = False, method = "Auto_MAX"):
    model = pmd.auto_arima(train[time_dependent_variable], 
                          start_p=0, 
                          start_q=7,
                          d = 0,
                           X= exog_train,
                           max_p = 0, max_q=max_q, seasonal=False,trend = trend,
                            trace=True,error_action='ignore',
                          suppress_warnings=True,stepwise=True, 
                          n_jobs= -1)

    
    
     
     
    if summary: print(model.summary())
     
    
    y_hat.loc[:,method] = model.predict(len(test), X= exog_test)

    print(metrics_(method))
    
auto_ma()

In [None]:
plot(method = "Auto_MAX")

# ARMAX

In [None]:
def arma(AR,MA, summary= False, method = "ARMAX"):
    global y_hat
    model = ARMA(train[time_dependent_variable], order=(AR,MA))
    
    fit1 = model.fit(disp=False)
    if summary:
        print(fit1.summary())
    
    y_hat[method] = fit1.predict(start=len(train), end=len(train)+len(test)-1, dynamic=False)
     
    return metrics_(method)
    
arma(AR= 1,MA = 1, method = "ARMAX")

In [None]:
def arma_best_params(p_values=range(7), q_values=range(7)):
    best_score, best_cfg = float("inf"), None
    for p in p_values:
        for q in q_values:
            order = (p, q)
            try:
                metric = arma(*order)
                if metric < best_score:
                    best_score, best_cfg = metric, order
                print('ARMAX%s metric=%.3f' % (order,metric))
            except:
                continue
    print('Best ARMAX%s metric=%.3f' % (best_cfg, best_score))
    return best_cfg
    
best_cfg = arma_best_params()

In [None]:
arma(*best_cfg, summary= True)

In [None]:
plot(method = "ARMAX")

# ARMAX with pmdarima

In [None]:
def auto_arma(max_p = 7, max_q= 7, summary = False, method = "Auto_ARMAX"):
    model = pmd.auto_arima(train[time_dependent_variable], 
                          start_p=7, 
                          start_q=1,
                           d = 0,
                           max_p = max_p, max_q = max_q,
                           seasonal=False, trend = trend,
                           trace=True,error_action='ignore',
                          suppress_warnings=True,stepwise=True,
                          n_jobs= -1)

    
    
     
     
    if summary: print(model.summary())
     
    
    y_hat.loc[:,method] = model.predict(len(test))

     
    print(metrics_(method))
    
auto_arma()

In [None]:
plot(method = "Auto_ARMAX")

# ARIMAX

In [None]:
def arima(p,d,q,summary= False,  method = 'ARIMAX'):

    
    model = ARIMA(train[time_dependent_variable], exog=exog_train, order=(p,d,q))
    fit1 = model.fit()
    if summary:
        print(fit1.summary())
    y_hat[method] = fit1.predict(start=len(train), end=len(train)+len(test)-1, exog=exog_test, dynamic=False, typ='levels')
   
    return metrics_(method)

arima(1, 1, 1)

In [None]:
def arima_best_params(p_values=range(7), d_values=range(2), q_values=range(7)):
    best_score, best_cfg = float("inf"), None
 
    for p in p_values:
        for d in d_values:
            for q in q_values:
                order = (p, d, q)
                try:
                    metric = arima(*order)
                    if metric < best_score:
                        best_score, best_cfg = metric, order
                    print('ARIMA%s metric=%.3f' % (order,metric))
                except:
                    continue
    print('Best ARIMA%s metric=%.3f' % (best_cfg, best_score))
    return best_cfg
    
best_cfg = arima_best_params()

In [None]:
arima(*best_cfg, summary= True)

In [None]:
plot(method= 'ARIMAX' )

# ARIMAX With PMDARIMA LIBRARY

• Pmdarima (for py + arima) is a statistical library designed to fill the void in Python’s time-series analysis capabilities, which is the equivalent of R’s auto.arima

In [None]:
import pmdarima as pmd

def arimamodel():
    autoarima_model = pmd.auto_arima(train[time_dependent_variable], 
                              start_p=1, 
                              start_q=1,
                               max_p=7, max_q=7, 
                              seasonal=False, trend = trend,
                               d=None, trace=True,error_action='ignore',
                              suppress_warnings=True,stepwise=True)
    return autoarima_model


In [None]:
def auto_arima(max_p= 7, max_q=7,summary = False, method = "Auto_ARIMAX"):
    model = pmd.auto_arima(train[time_dependent_variable], 
                          start_p=1, 
                          start_q=1,
                           max_p=max_p, max_q=max_q,
                           X= exog_train,
                           seasonal=False, 
                           d=None, trace=True,error_action='ignore',
                          suppress_warnings=True,stepwise=True,
                           n_jobs= -1)

    
    
     
     
    if summary: print(model.summary())
     
    
    y_hat.loc[:,method] = model.predict(len(test), X= exog_test)

    print(metrics_(method))
    
auto_arima()

In [None]:
plot(method=  "Auto_ARIMAX")

# SARIMAX

m refers to the number of periods in each season.

• 7 → Daily

• 12 → Monthly

• 52 → Weekly

• 4 → Quarterly

• 1 → Annual (non-seasonal)

In [None]:
from statsmodels.tsa.statespace.sarimax import SARIMAX
def sarima(p,d,q, P,D,Q,M, plot_ = False, method = 'SARIMAX' ):


    
    model = SARIMAX(train[time_dependent_variable], exog=exog_train, order=(p,d,q),  seasonal_order=(P,D,Q,M))
    fit1 = model.fit()
    if plot_:
        diagnostic_plot(fit1)
    
    y_hat[method] = fit1.predict(start=len(train), end=len(train)+len(test)-1,exog= exog_test, dynamic=False, typ='levels',start_params=[0, 0, 0, 0, 0, 1])
    
    
    return metrics_(method)
sarima(1, 1, 1, 1, 1, 3, 7)

In [None]:
def sarima_best_params(p_values=range(4), d_values=range(2), q_values=range(4), P_values=range(4), D_values=range(2), Q_values=range(4)):
    best_score, best_cfg = float("inf"), None
    for M in [1, 4,7,12,52]:
        for p in p_values:
            for d in d_values:
                for q in q_values:
                        for P in P_values:
                            for D in D_values:
                                for Q in Q_values:
                                    order = (p,d,q,P,D,Q, M)
                                    try:
                                        metric = sarima(*order)
                                        if metric < best_score:
                                            best_score, best_cfg = metric, order
                                        print('SARIMAX%s metric=%.3f' % (order,metric))
                                    except:
                                        continue
    print('Best SARIMAX%s metric=%.3f' % (best_cfg, best_score))
    return best_cfg 
    
best_cfg = sarima_best_params()

In [None]:
print(best_cfg)
sarima(*best_cfg, plot_ = True)

In [None]:
plot(method = "SARIMAX")

# Seasonal ARIMAX using Pmdarima Library

In [None]:
def auto_sarima(m, max_p= 3, max_q= 3, max_P=3 , max_Q=3  ,summary = False, method = "Auto_SARIMAX"):
    model = pmd.auto_arima(train[time_dependent_variable], 
                               start_p=1, start_q=1,
                                max_p=max_p, max_q=max_q, seasonal=True, start_P=1,
                                start_Q=1, max_P=max_P, max_D=7, max_Q=max_Q, m=m,
                                X = exog_train,
                                d=None, D=None, trace=True, error_action='ignore', 
                                suppress_warnings=True,
                                stepwise=True,
                                n_jobs= -1)

    
    
     
     
    
    if summary: print(model.summary())
     
    
    y_hat.loc[:,method] = model.predict(len(test), X = exog_test)

    return metrics_(method)
auto_sarima(m = 12)

Choosing the best m value 

In [None]:
def auto_sarima_best_seasonal_params():
    best_score, best_cfg = float("inf"), None
    for m in [1, 4,7,12,52]:
        print("="*100)
        print(f' Fitting SARIMAX for Seasonal value m = {str(m)}')
        order = m
        try:
            metric = auto_sarima(m= m)
            if metric < best_score:
                best_score, best_cfg = metric, order
            print('SARIMAX%s metric=%.3f' % (order,metric))
        except:
            continue
    print('Best SARIMAX%s metric=%.3f' % (best_cfg, best_score))
    return best_cfg
    
best_cfg = auto_sarima_best_seasonal_params()


In [None]:
auto_sarima(m= best_cfg, summary = True)

In [None]:
plot(method=  "Auto_SARIMAX")