In [None]:
#Import Required Libraries
import pandas as pd
import numpy as np
import math
from itertools import combinations
from statsmodels.tsa.statespace.sarimax import SARIMAX
from metrics import *
from typing import Callable
from sklearn.preprocessing import StandardScaler
import matplotlib.pyplot as plt
import warnings
warnings.filterwarnings('ignore')

In [None]:
def transf(x: float) -> float:
    """ 
    This function applies a natural-log transformation to a single value.

    Zero is mapped to 0 instead of -inf and missing values are passed through
    as NaN. Note that 0 and 1 therefore both map to 0, and negative inputs
    yield NaN because np.log is undefined for them.
    
    Arguments
    ----------
    x: float
        a single (scalar) observation

    Returned Values
    ----------
    x: float
        np.log(x) for non-zero values, 0 for zero, NaN for missing values
        
    """ 
    # NaN never compares equal to anything (including itself), so a missing
    # value has to be detected with pd.isna rather than with == / !=.
    if pd.isna(x):
        return np.nan
    if x == 0: # log(0) is -inf; keep zeros at 0 instead
        return 0.0
    return np.log(x)
      
def itransf(x: int)-> int:
    """ 
    This function undoes the log transformation by exponentiating its input.

    The argument is handed directly to np.exp, so whatever np.exp accepts
    (a scalar, an array, a DataFrame) is accepted here as well.
    
    Arguments
    ----------
    x: int
        value(s) on the transformed (log) scale

    Returned Values
    ----------
    restored: int
        np.exp(x), i.e. the value(s) back on the original scale
        
    """ 
    restored = np.exp(x)
    return restored

In [None]:
def forecast_SARIMAX(window: int, n_train: int, p: int, d: int, q: int, ps: int, ds: int, qs: int, m: int, 
                     x: list[str], y: str, h_max: int, transf: Callable[[float], float], itransf: Callable[[float], float],file_name:str, lag:int )-> pd.DataFrame:
    
    """
    This function fits a SARIMAX model (statsmodels) on one rolling window and forecasts 'y' for the
    'h_max' time steps that follow the training slice, using 'x' as exogenous variables.

    The CSV at 'file_name' is read and indexed by its 'date' column. The target and the exogenous columns
    are passed element-wise through 'transf'. Every exogenous column is shifted by 'lag' rows and the gap
    this leaves at the top is back-filled. The window starts at row 'window - 1': its first 'n_train' rows
    are used for fitting and the remaining rows (at most 'h_max') provide the exogenous values for the
    forecast. The forecast is returned on the transformed scale; 'itransf' is not applied here.

    Arguments
    ----------
    window : int
        1-based number of the rolling window; the window begins at row 'window - 1'
    n_train : int
        the size of training set
    p : int
        the value for auto regression order
    d : int
        the value for differencing 
    q : int
        the value for moving average order
    ps : int
        the value for seasonal auto regression order
    ds : int
        the value for seasonal differencing 
    qs : int
        the value for seasonal moving average order
    m : int
        the number of time steps for a single seasonal period
    x : list[str]
        the list of exogenous variables
    y : str
        the forecasted variable
    h_max : int
        the maximum number of forecasted horizons
    transf: Callable[[float], float]
        the definition for transformation
    itransf: Callable[[float], float]
        the definition for inverse transformation (accepted but not used by this function)
    file_name : str
        path of the CSV file to read; it must contain a 'date' column
    lag : int
        number of rows by which the exogenous variables are shifted

    Returned Values
    ----------
    df_forecast : pd.DataFrame
      DataFrame with a single column named 'y' containing the forecast for 'h_max' horizons.

    """

    df_US = pd.read_csv(file_name)
    df_US.index = pd.to_datetime(df_US['date'])
    df_US = df_US.drop(columns=['date'])

    #Apply transformation (only to the columns the model actually uses)
    needed_cols = list(dict.fromkeys([y, *x]))
    df_US_transf = pd.DataFrame({col: df_US[col].apply(transf) for col in needed_cols})

    df_y = df_US_transf[y]
    exog_variables = df_US_transf[x].shift(lag).bfill()

    # Positional bounds of this window: n_train rows for fitting + h_max rows to forecast
    start = int(window-1)
    stop = int(window-1+n_train+h_max)
    y_window = df_y.iloc[start:stop]
    exog_window = exog_variables.iloc[start:stop]

    col_train = y_window.iloc[:n_train]
    exo_train = exog_window.iloc[:n_train]
    exo_test = exog_window.iloc[n_train:]

    #Model definition for prediction
    model = SARIMAX(col_train, exog=exo_train, order=(p,d,q), seasonal_order=(ps,ds,qs,m), enforce_stationarity=False, enforce_invertibility=False)
    model_fit = model.fit(disp=False)

    # predict() takes inclusive start/end positions, hence the "- 1"
    first_step = len(col_train)
    last_step = first_step + h_max - 1
    forecast = model_fit.predict(first_step, last_step, exog=exo_test)

    df_forecast = forecast.to_frame().rename(columns= {'predicted_mean': y})

    return df_forecast

In [None]:
def window_results(n_train: int, n_test: int, p: int, d: int, q: int, ps: int, ds: int, qs: int, m: int, 
                      x: list[str], y: str, h_max: int, transf ,itransf,file_name,lag:int)-> pd.DataFrame:
     
    """
    A function that runs forecast_SARIMAX for windows 1..n_test and returns the
    forecasts side by side, one column per window. 

    Arguments
    ----------
    n_train : int
        the size of training set
    n_test : int
        the size of testing set (number of windows that are forecast)
    p : int
        the value for auto regression order
    d : int
        the value for differencing 
    q : int
        the value for moving average order
    ps : int
        the value for seasonal auto regression order
    ds : int
        the value for seasonal differencing 
    qs : int
        the value for seasonal moving average order
    m : int
        the number of time steps for a single seasonal period
    x : list[str]
        the list of exogenous variables
    y : str
        the forecasted variable
    h_max : int
        the maximum number of forecasted horizons
    transf : Callable
        the definition for transformation (forwarded to forecast_SARIMAX)
    itransf : Callable
        the definition for inverse transformation (forwarded to forecast_SARIMAX)
    file_name : str
        path of the CSV file (forwarded to forecast_SARIMAX)
    lag : int
        number of rows by which the exogenous variables are shifted
        
    Returned Values
    ----------
    temp_results : pd.DataFrame
        A DataFrame with one column 'window_<k>' per window. Rows are the union of
        the forecast indexes, so a column is NaN outside its own window. An empty
        DataFrame is returned when n_test is 0.

    """  
    window_frames = []
    for window in range(1, n_test + 1):
        predict_current = forecast_SARIMAX(window, n_train, p, d, q, ps, ds, qs, m, x, y, h_max, transf, itransf,file_name,lag)
        window_frames.append(predict_current.rename(columns={str(y):"window_"+str(window)}))

    # pd.concat refuses an empty list, so keep the "no windows" result explicit
    if not window_frames:
        return pd.DataFrame()

    # One concat at the end instead of re-copying the growing frame on every iteration
    temp_results = pd.concat(window_frames, axis=1)
    return temp_results

In [None]:
def output_h(temp_results: pd.DataFrame, h: int, n_test:int)-> pd.DataFrame:
    """
    This function processes the 'temp_results' DataFrame, which contains one 'window_<k>' column of forecasts per window, and collects the 'h'-step-ahead forecasts into a single column named 'h=<h>'.

    For h > 1 the first h-1 rows are taken from the 1-step-ahead forecast of windows 1..h-1, so that every horizon
    column ends up with n_test rows. The remaining n_test-h+1 rows are the h-th forecast of windows 1..n_test-h+1.

    Arguments
    ----------
    temp_results : pd.DataFrame
        the dataframe that has h weeks ahead prediction for each window
    h : int
        horizon value
    n_test : int
        the size of testing set

    Returned Values
    ----------
    df_results_python : pd.DataFrame
        Dataframe for each horizon.  

    Raises
    ----------
    ValueError
        raised by pd.concat when no rows were collected (e.g. h < 1)

    """ 
    horizon_label = "h="+str(h)

    def pick_row(window: int, step: int) -> pd.DataFrame:
        # One-row frame holding the forecast made by `window` at 0-based position `step`
        window_col = 'window_'+str(window)
        current_results = temp_results[[window_col]].dropna()
        current_results = current_results.rename(columns={window_col: horizon_label})
        return pd.DataFrame(current_results.iloc[step]).transpose()

    df_results = []
    if h >= 1:
        # Leading filler rows (none when h == 1): 1-step-ahead forecasts of the first h-1 windows
        for window in range(1, int(h-1) + 1):
            df_results.append(pick_row(window, 0))

        # Actual h-step-ahead forecasts
        n_iter = n_test-h+1
        for window in range(1, int(n_iter) + 1):
            df_results.append(pick_row(window, int(h-1)))

    df_results_python = pd.concat(df_results,axis=0)
    return df_results_python

In [None]:
def predict_table(n_test, temp_results,h)-> pd.DataFrame:
    """
    A function that calls output_h for every horizon 1..h and combines the resulting columns into a final DataFrame that has one column per horizon.

    Arguments
    ----------
    n_test : int
        the size of testing set
    temp_results : pd.DataFrame
        the dataframe that has h weeks ahead prediction for each window
    h : int
        the largest horizon to include; horizons 1..h are produced

    Returned Values
    ----------
    df_final_forecast : pd.DataFrame
        DataFrame presenting the h weeks ahead prediction for each week.
        An empty DataFrame is returned when h < 1.

    """ 
    # A separate loop name keeps the parameter `h` intact while iterating
    horizon_frames = [output_h(temp_results, horizon, n_test) for horizon in range(1, h + 1)]

    # pd.concat refuses an empty list, so keep the "no horizons" result explicit
    if not horizon_frames:
        return pd.DataFrame()

    df_final_forecast = pd.concat(horizon_frames, axis=1)
    return df_final_forecast

In [None]:
def smape_results(df_final_forecast: pd.DataFrame ,y: str,file_name: str)-> pd.DataFrame:
    """ 
    This function calculates the sMAPE score between the original data and the predictions for each horizon.

    The forecasts are first mapped back to the original scale with the notebook-level 'itransf' and then
    compared, column by column, with the observed values of 'y' on the same index labels.
    The score itself is computed by 'smape', which is not defined in this cell
    (presumably provided by the notebook's 'metrics' import - verify).
    
    Arguments
    ----------
    df_final_forecast: pd.DataFrame
        the predicted DataFrame containing h weeks ahead prediction for each week (transformed scale)
    y : str
        name of the target column in the CSV file
    file_name : str
        filename to read the data; the file must contain a 'date' column

    Returned Values
    ----------
    df_results : pd.DataFrame
        DataFrame with SMAPE scores for each horizon and the average SMAPE score
        (single column named after 'y', last row labelled 'Average').
        
    """  
    df_US = pd.read_csv(file_name)
    df_US.index = pd.to_datetime(df_US['date'])
    df_US = df_US.drop(columns=['date'])

    df_pred = itransf(df_final_forecast)
    h_level = df_pred.columns.to_list()

    # The observed values are the same for every horizon, so look them up once
    y_true = df_US.loc[df_pred.index, y]

    scores = [smape(y_true, df_pred[cols]) for cols in h_level]
    scores.append(np.mean(scores)) # average over the horizons

    df_results = pd.DataFrame(scores, index=h_level + ['Average'], columns=[str(y)])
    return df_results

In [None]:
def mae_results(df_final_forecast: pd.DataFrame ,y: str,file_name: str)-> pd.DataFrame:
    """ 
    This function calculates the MAE score between the original data and the predictions for each horizon.

    The forecasts are first mapped back to the original scale with the notebook-level 'itransf' and then
    compared, column by column, with the observed values of 'y' on the same index labels.
    The score itself is computed by 'mae', which is not defined in this cell
    (presumably provided by the notebook's 'metrics' import - verify).
    
    Arguments
    ----------
    df_final_forecast: pd.DataFrame
        the predicted DataFrame containing h weeks ahead prediction for each week (transformed scale)
    y : str
        name of the target column in the CSV file
    file_name : str
        filename to read the data; the file must contain a 'date' column

    Returned Values
    ----------
    df_results : pd.DataFrame
        DataFrame with MAE scores for each horizon and the average MAE score
        (single column named after 'y', last row labelled 'Average').
        
    """  
    df_US = pd.read_csv(file_name)
    df_US.index = pd.to_datetime(df_US['date'])
    df_US = df_US.drop(columns=['date'])

    df_pred = itransf(df_final_forecast)
    h_level = df_pred.columns.to_list()

    # The observed values are the same for every horizon, so look them up once
    y_true = df_US.loc[df_pred.index, y]

    scores = [mae(y_true, df_pred[cols]) for cols in h_level]
    scores.append(np.mean(scores)) # average over the horizons

    df_results = pd.DataFrame(scores, index=h_level + ['Average'], columns=[str(y)])

    return df_results

In [None]:
def normmae_results(df_final_forecast: pd.DataFrame ,y: str,file_name: str)-> pd.DataFrame:
    """ 
    This function calculates the MAE score between the transformed data and the predictions for each horizon.

    Unlike mae_results, nothing is mapped back to the original scale: the observed values of 'y' are passed
    through the notebook-level 'transf' and compared with the forecasts as they are.
    The score itself is computed by 'mae', which is not defined in this cell
    (presumably provided by the notebook's 'metrics' import - verify).
    
    Arguments
    ----------
    df_final_forecast: pd.DataFrame
        the predicted DataFrame containing h weeks ahead prediction for each week (transformed scale)
    y : str
        name of the target column in the CSV file
    file_name : str
        filename to read the data; the file must contain a 'date' column

    Returned Values
    ----------
    df_results : pd.DataFrame
        DataFrame with MAE scores for each horizon and the average MAE score
        (single column named after 'y', last row labelled 'Average').
        
    """  
    df_US = pd.read_csv(file_name)
    df_US.index = pd.to_datetime(df_US['date'])
    df_US = df_US.drop(columns=['date'])

    # Only the target column is compared, so only that column is transformed
    y_transf = df_US[y].apply(transf)

    h_level = df_final_forecast.columns.to_list()

    # The observed values are the same for every horizon, so look them up once
    y_true = y_transf.loc[df_final_forecast.index]

    scores = [mae(y_true, df_final_forecast[cols]) for cols in h_level]
    scores.append(np.mean(scores)) # average over the horizons

    df_results = pd.DataFrame(scores, index=h_level + ['Average'], columns=[str(y)])

    return df_results

In [None]:
#For reproducibility
# Experiment driver: fixes the data file, target, horizon and model orders, runs the
# rolling-window SARIMAX forecast and prints the sMAPE / MAE / normalized-MAE tables.
file_name = 'data/covid_till14May22.csv'
y = 'new_deaths'  # target column that is forecast
h_max = 6  # number of steps forecast from every window
train_ratio = 0.8  # share of the rows used as the training slice of each window
df_US = pd.read_csv(file_name)
n_train = round(len(df_US) * train_ratio)
# Number of rolling windows; h_max rows are held back so that the last window
# still has h_max rows after its training slice.
n_test = len(df_US) - n_train-h_max
m = 10  # seasonal period (4th element of SARIMAX seasonal_order)
d = 1  # non-seasonal differencing order
ds = 0  # seasonal differencing order
x = ['icu_patients','hosp_patients','new_tests','people_vaccinated', 'people_fully_vaccinated','new_cases']  # exogenous columns
lag = 15  # number of rows by which the exogenous columns are shifted
# Every range below holds exactly one value, so a single (p, q, ps, qs) = (1, 3, 5, 0)
# combination is evaluated; widening the ranges turns this into a grid search.
for p in range(1,2):
    for q in range(3,4):
        for ps in range(5,6):
            for qs in range(0,1):
                print(p,d,q,ps,ds,qs)  # orders of the model being evaluated
                # Forecast every window, then regroup the forecasts by horizon
                temp_results = window_results(n_train, n_test, p, d, q, ps, ds, qs, m, x, y, h_max, transf,itransf,file_name,lag)
                df_final_forecast = predict_table(n_test, temp_results,h_max)
                # Errors on the original scale (forecasts are back-transformed inside the helpers)
                df_smape_results = smape_results(df_final_forecast, y,file_name)
                print("sMAPE {}".format(df_smape_results))
                df_mae_results = mae_results(df_final_forecast, y,file_name)
                print("realMAE {}".format(df_mae_results))
                # Error on the transformed scale
                df_normmae_results = normmae_results(df_final_forecast, y,file_name)
                print("normMAE {}".format(df_normmae_results))