In [4]:
import os, sys, importlib

In [None]:
# Add ../Functions to the module search path so the project helper module
# imported in the following cell (preprocessing_functions) can be found.
# NOTE(review): the path is relative, so it resolves against the kernel's
# working directory -- presumably the notebook's own folder; confirm.
sys.path.append("../Functions")

In [None]:
from preprocessing_functions import timeseries_type

In [None]:
import numpy as np
import math
import pandas as pd
from IPython.display import clear_output

In [5]:
def rolling_forecast_holt_winters(
    time_series, window_end_date, model_params, nsteps, window_length=None
):
    """
    Takes a time series and computes Holt-Winters forecasts on a rolling basis.

    Starting from the row whose date equals ``window_end_date``, a model is
    fitted on the rows up to and including that row, ``nsteps`` values are
    forecast and written next to the observations they predict, and the end of
    the training window is then advanced by ``nsteps`` rows. This repeats until
    the end of the series is reached.

    INPUTS:
    ----------------
        time_series (pandas.core.frame.DataFrame) : A time series. The first
        column is used as the date column and the second column as the values
        to model. The frame is copied; the caller's object is not modified.

        window_end_date (string) : A date string of the form 'yyyy-mm-dd'. This
        gives the model the latest piece of data it may use to make its first
        forecast. The first row whose date equals this value is used.

        model_params (list) : A list of the holt winters parameters
        [trend, seasonal, seasonal_periods]. Trend can be one of
        {"add", "mul", "additive", "multiplicative", None}. Seasonal can be one
        of {"add", "mul", "additive", "multiplicative", None}.
        Seasonal_periods is the interval length required to capture
        seasonality and is an integer.

        nsteps (int) : The number of steps out to forecast (must be >= 1).

        window_length (int) : If specified, every model is fitted on exactly
        this many rows ending at the current training end. If not specified
        every model is fitted on all rows from the start of the series.

    RETURNS:
    ----------------
        (tuple): A tuple of the form (a, b, c)

        a (pandas.core.frame.DataFrame) : A copy of the original time series
        with one additional column (named c) holding the rolling forecasts.
        Rows up to and including ``window_end_date`` are NaN in that column.

        b (pandas.core.frame.DataFrame) : ``nsteps`` forecasts for the periods
        immediately after the last row of the series. Columns are "End date"
        and ``c + "_forecasts"``. The dates are built from the offset and
        frequency returned by ``timeseries_type(time_series)``.

        c (string) : The title of the model / name of the forecast column. It
        encodes ``window_length`` (when given) and ``nsteps``.

    RAISES:
    ----------------
        ValueError : If ``nsteps`` or ``window_length`` is smaller than 1, or
        if ``window_length`` is larger than the number of rows available up to
        ``window_end_date``.

        IndexError : If ``window_end_date`` does not occur in the date column.
    """

    if nsteps < 1:
        # nsteps == 0 previously caused a ZeroDivisionError / endless loop.
        raise ValueError("nsteps must be a positive integer")

    ts_dummy = time_series.copy()
    date_col = ts_dummy.columns[0]
    series_col = ts_dummy.columns[1]
    n_rows = len(ts_dummy)
    last_posn = n_rows - 1

    # Locate the window end by *position*. Everything below uses .iloc, so an
    # index label would only be correct for a default 0..n-1 index.
    matches = np.flatnonzero((ts_dummy[date_col] == window_end_date).to_numpy())
    if matches.size == 0:
        raise IndexError(
            "window_end_date "
            + repr(window_end_date)
            + " not found in column "
            + repr(date_col)
        )
    final_index = int(matches[0])

    # deduce series type (the first returned value is not needed here)
    _, offset, freq = timeseries_type(time_series)

    if window_length is None:
        # Expanding window: always train from the first row.
        start_index = 0
        length_text = ""
        start_index_increment = 0
    else:
        if window_length < 1:
            raise ValueError("window_length must be a positive integer")
        start_index = final_index - (window_length - 1)
        if start_index < 0:
            # A negative start would be read by .iloc as "from the end" and
            # silently yield a wrong (or empty) training window.
            raise ValueError(
                "window_length exceeds the number of rows available up to "
                "window_end_date"
            )
        length_text = "_wl_" + str(window_length)
        # Fixed-length window: slide the start along with the end.
        start_index_increment = nsteps

    prediction_column = "holt_winters" + length_text + "_nsteps_" + str(nsteps)

    ts_dummy[prediction_column] = np.nan
    column_number = ts_dummy.columns.get_loc(prediction_column)
    trend = model_params[0]
    seasonal = model_params[1]
    seasonal_periods = model_params[2]

    def forecast_from_window(first_posn, end_posn):
        """Fit on rows first_posn..end_posn (inclusive); return nsteps forecasts."""
        rolling_window = ts_dummy.iloc[first_posn : end_posn + 1]
        # NOTE(review): ExponentialSmoothing is not imported in any visible
        # cell of this notebook -- presumably
        # statsmodels.tsa.holtwinters.ExponentialSmoothing; confirm and import
        # it at the top so the notebook survives Restart & Run All.
        model = ExponentialSmoothing(
            rolling_window[series_col],
            trend=trend,
            seasonal=seasonal,
            seasonal_periods=seasonal_periods,
        )
        # Plain positional array: the labels of the returned forecast depend
        # on the index of the training data and must not be relied upon.
        return np.asarray(model.fit().forecast(nsteps), dtype=float)

    loops_required = math.floor(1 + (n_rows - (final_index + 1)) / nsteps)
    loop = 0

    # final_index <= last_posn on entry, so the loop body runs at least once.
    while final_index < n_rows:

        prediction = forecast_from_window(start_index, final_index)
        last_trained_posn = final_index

        # Only the forecasts that fall inside the observed series are stored.
        n_fill = min(nsteps, last_posn - final_index)
        if n_fill > 0:
            first_target = final_index + 1
            ts_dummy.iloc[
                first_target : first_target + n_fill, column_number
            ] = prediction[:n_fill]

        start_index = start_index + start_index_increment
        final_index = final_index + nsteps

        loop = loop + 1
        clear_output()
        print(str(round(100 * loop / loops_required)) + " % done")

    # The returned forecasts are dated from the period after the last row. If
    # the rolling steps did not land exactly on the last row, the most recent
    # prediction starts inside the sample, so refit on the window that ends at
    # the last observation to keep values and dates aligned.
    if last_trained_posn != last_posn:
        if window_length is None:
            refit_start = 0
        else:
            refit_start = last_posn - (window_length - 1)
        prediction = forecast_from_window(refit_start, last_posn)

    forecast_column = prediction_column + "_forecasts"
    first_forecast_date = ts_dummy[date_col].iloc[-1] + offset
    # Build the column directly instead of chained `.iloc` assignment, which
    # is unreliable (and a no-op under pandas Copy-on-Write).
    forecasts = pd.DataFrame(
        {
            "End date": pd.date_range(
                start=first_forecast_date, periods=nsteps, freq=freq
            ),
            forecast_column: prediction,
        }
    )

    return (ts_dummy, forecasts, prediction_column)

In [1]:
# write all the above code to a py file but not this particular cell of code.

!jupyter nbconvert --to script holt_winters.ipynb
with open("holt_winters.py", "r") as f:
    lines = f.readlines()
with open("holt_winters.py", "w") as f:
    for line in lines:
        if "nbconvert --to script" in line:
            break
        else:
            f.write(line)

[NbConvertApp] Converting notebook holt_winters.ipynb to script
[NbConvertApp] Writing 3237 bytes to holt_winters.py
