<h1>SARIMA hyper parameter tunning with Grid Search</h1>

In [1]:
#!pip install joblib
#https://machinelearningmastery.com/how-to-grid-search-sarima-model-hyperparameters-for-time-series-forecasting-in-python/?utm_campaign=News&utm_medium=Community&utm_source=DataCamp.com
# grid search sarima hyperparameters
from math import sqrt
from multiprocessing import cpu_count
from joblib import Parallel
from joblib import delayed
from warnings import catch_warnings
from warnings import filterwarnings
from statsmodels.tsa.statespace.sarimax import SARIMAX
from sklearn.metrics import mean_squared_error
import pandas as pd
from dateutil.relativedelta import relativedelta
import time

In [2]:
# one-step sarima forecast
def sarima_forecast(history, config):
    order, sorder, trend = config
    # define model
    model = SARIMAX(history, order=order, seasonal_order=sorder, trend=trend, enforce_stationarity=False, enforce_invertibility=False)
    # fit model
    model_fit = model.fit(disp=False)
    # make one step forecast
    yhat = model_fit.predict(len(history), len(history))
    return yhat[0]

# root mean squared error or rmse
def measure_rmse(actual, predicted):
    return sqrt(mean_squared_error(actual, predicted))

# split a univariate dataset into train/test sets
def train_test_split(data, n_test):
    return data[:-n_test], data[-n_test:]

# walk-forward validation for univariate data
def walk_forward_validation(data, n_test, cfg):
    predictions = list()
    # split dataset
    train, test = train_test_split(data, n_test)
    # seed history with training dataset
    history = [x for x in train]
    # step over each time-step in the test set
    for i in range(len(test)):
        # fit model and make forecast for history
        yhat = sarima_forecast(history, cfg)
        # store forecast in list of predictions
        predictions.append(yhat)
        # add actual observation to history for the next loop
        history.append(test[i])
    # estimate prediction error
    error = measure_rmse(test, predictions)
    return error

# score a model, return None on failure
def score_model(data, n_test, cfg, debug=False):
    result = None
    # convert config to a key
    key = str(cfg)
    # show all warnings and fail on exception if debugging
    if debug:
        result = walk_forward_validation(data, n_test, cfg)
    else:
        # one failure during model validation suggests an unstable config
        try:
            # never show warnings when grid searching, too noisy
            with catch_warnings():
                filterwarnings("ignore")
                result = walk_forward_validation(data, n_test, cfg)
        except:
            error = None
    # check for an interesting result
    #if result is not None:
        #print(' > Model[%s] %.3f' % (key, result))
    return (key, result,cfg)

# grid search configs
def grid_search(data, cfg_list, n_test, parallel=True):
    scores = None
    if parallel:
        # execute configs in parallel
        executor = Parallel(n_jobs=cpu_count(), backend='multiprocessing')
        tasks = (delayed(score_model)(data, n_test, cfg) for cfg in cfg_list)
        scores = executor(tasks)
    else:
        scores = [score_model(data, n_test, cfg) for cfg in cfg_list]
    # remove empty results
    scores = [r for r in scores if r[1] != None]
    # sort configs by error, asc
    scores.sort(key=lambda tup: tup[1])
    return scores

In [3]:
# create a set of sarima configs to try
def sarima_configs(seasonal=[0]):
    models = list()
    # define config lists
    p_params = [0, 1, 2, 3]
    d_params = [0, 1]
    q_params = [0, 1, 2, 3]
    t_params = ['n','c','t','ct']
    P_params = [0, 1, 2, 3]
    D_params = [0, 1]
    Q_params = [0, 1, 2, 3]
    m_params = seasonal
    # create config instances
    for p in p_params:
        for d in d_params:
            for q in q_params:
                for t in t_params:
                    for P in P_params:
                        for D in D_params:
                            for Q in Q_params:
                                for m in m_params:
                                    cfg = [(p,d,q), (P,D,Q,m), t]
                                    models.append(cfg)
    return models

In [4]:
final_df = None
final_errores = None

#DATA: log10 was applied to original data
data = pd.read_csv("LEO_mfp_log10_data.csv")
my_index = data.ID.unique()

for current_index in my_index:
    current_serie = data["ID"] == current_index
    serie = data[current_serie]
    
    timeserie = serie.y.to_list()
    #time axis
    lista_fechas = pd.to_datetime(serie.ds,infer_datetime_format=True)
    max_fecha_fact = pd.Timestamp.date(max(lista_fechas))
    # data split
    n_test = 12
    # model configs
    cfg_list = sarima_configs()
    # grid search
    tic = time.perf_counter()
    scores = grid_search(timeserie, cfg_list, n_test)
    toc = time.perf_counter()
    print("Tiempo:")
    print(toc - tic)
    # list top 3 configs
    #for cfg, error, other in scores[:3]:
        #print(cfg, error)
        
    #Best model top 1
    my_model = scores[:1]
    my_order = my_model[0][2][0]
    my_seasonal_order = my_model[0][2][1]
    my_trend = my_model[0][2][2]
    my_mse = my_model[0][1]    
    
    #Best parameters
    my_model = SARIMAX(timeserie, order=my_order , seasonal_order=my_seasonal_order,trend=my_trend)
    model_fit = my_model.fit(disp=False)
    
    forecast_months = 12
    yhat = model_fit.get_forecast(forecast_months)
    my_forecast = yhat.predicted_mean
    
    #Future dates for the forecast
    futuro_fechas = []
    for i in range(forecast_months):     
        fecha = max_fecha_fact + relativedelta(months=i + 1)
        futuro_fechas.append(fecha.strftime('%Y-%m-%d')) 
        
    #forecast pow 10: log10 was applied to original data
    forecast_pow_10 = []
    for elem in my_forecast:    
        forecast_pow_10.append(round(pow(10,elem),4))   
        
    #intervals
    forecast_lower = []
    forecast_upper = []
    for elem in yhat.conf_int():
        forecast_lower.append(round(pow(10,elem[0]),4))
        forecast_upper.append(round(pow(10,elem[1]),4))
        
    #set final dataframe
    d = {'ID':current_index, 'ds':futuro_fechas, 'yhat':forecast_pow_10, 'yhat_lower': forecast_lower, 'yhat_upper' : forecast_upper}
    my_result = pd.DataFrame(data=d)  
    
    #MSE errors
    e = {'ID':[current_index], 'MSE': [my_mse]}
    my_error_result = pd.DataFrame(data=e)
        
    if final_df is None:        
        final_df = my_result
        final_errores = my_error_result 
    else:        
        final_df = final_df.append(my_result, ignore_index = True)
        final_errores = final_errores.append(my_error_result, ignore_index = True)

In [5]:
final_df.to_csv('gridsearch.csv',index=False)
final_errores.to_csv('gridsearch_mse.csv',index=False)