# SARIMA forecast models

In [None]:
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from statsmodels.graphics.tsaplots import plot_acf
from statsmodels.tsa.arima.model import ARIMA
import os
# %matplotlib ipympl

## Preliminaries
### Load data into numpy arrays

In [None]:
path = os.getcwd()
path = os.path.join(path, "priceData2019To2024.csv")
price_df = pd.read_csv(path, sep=";")
price_df.columns = price_df.columns.str.strip()
price_df["time"] = pd.to_datetime(price_df["Datum"] + " " + price_df["von"], format="%d.%m.%Y %H:%M")
start_time = price_df["time"].min()
price_df["hours"] = (price_df["time"] - start_time).dt.total_seconds() / 60 / 60  # time in hours since beginning of file
price_df["price"] = price_df["Spotmarktpreis in ct/kWh"]
valid_rows = price_df[np.isfinite(price_df["price"])]
TIME = np.array(valid_rows["hours"].tolist())
PRICE = np.array(valid_rows["price"].tolist())

### Split into training and test data

In [None]:
SAMPLES_PER_DAY = 24
PREDICTION_HORIZON = 24

# Test data
nTestSamples = int(366 * SAMPLES_PER_DAY)  # 2024 was a leap year
testData = PRICE[-nTestSamples:]
testTime = TIME[-nTestSamples:]

# Training data
nTrainingSamples = PRICE.size - nTestSamples
trainingData = PRICE[:nTrainingSamples]
trainingTime = TIME[:nTrainingSamples]  # 1 hour sampling time

### Plot data

In [None]:
figure = plt.figure()
plt.plot(trainingTime, trainingData, label="Training data")
plt.plot(testTime, testData, label="Test data")
plt.xlabel("Time [hours]")
plt.ylabel("Price [cts/kWh]")
plt.legend()

### Plot autocorrelation

In [None]:
acf_size = SAMPLES_PER_DAY * 10
plot_acf(trainingData, lags=np.arange(acf_size))
pass  # this line stops duplicated plots in output


### Define function for plotting forecast results

In [None]:
def fitAndPlotForecast(
    trainingData,
    trainingTime,
    testData,
    testTime,
    arimaModel,
    title="Forecast",
    ylabel="Price [cts/kWh]",
):
    n_hist = SAMPLES_PER_DAY * 10
    n_pred = PREDICTION_HORIZON * 2

    trainedArimaModel = arimaModel.fit(method="innovations_mle")
    forecast = trainedArimaModel.get_forecast(n_pred)

    pastTime = trainingTime[-n_hist:]
    pastData = trainingData[-n_hist:]
    futureTime = testTime[0:n_pred]
    futureData = testData[0:n_pred]

    figure = plt.figure()
    plt.title(title)
    plt.plot(pastTime, pastData, label="Known past")
    plt.plot(futureTime, forecast.predicted_mean, label="Nominal forecast")
    plt.plot(futureTime, futureData, label="Unknown future")
    plt.legend()
    plt.xlabel("Time [hours]")
    plt.ylabel(ylabel)

    print(trainedArimaModel.summary())

    return trainedArimaModel

# Forecasting

## Persistence forecast

In [None]:
arima010 = ARIMA(endog=trainingData, order=(0, 1, 0), trend="n")
trainedArima010 = fitAndPlotForecast(
    trainingData,
    trainingTime,
    testData,
    testTime,
    arima010,
    title="Persistence forecast",
)

In [None]:
plot_acf(trainedArima010.resid, lags=np.arange(acf_size), title="Persistence model")
pass  # this line stops duplicated plots in output

## Seasonal persistence forecast

In [None]:
s = 24  # set the seasonal period

### ARIMA(0, 0, 0)(0, 1, 0)<sub>s</sub> without trend

In [None]:
arima000010s = ARIMA(
    endog=trainingData, order=(0, 0, 0), seasonal_order=(0, 1, 0, s), trend="n"
)
trainedArima000010s = fitAndPlotForecast(
    trainingData,
    trainingTime,
    testData,
    testTime,
    arima000010s,
    title=f"ARIMA$(0, 0, 0)(0, 1, 0)_{{{s}}}$",
)

In [None]:
plot_acf(trainedArima000010s.resid, lags=np.arange(acf_size), title="Seasonal persistence model without trend")
pass  # this line stops duplicated plots in output

### ARIMA(1, 0, 0)(1, 1, 1)<sub>s</sub> model

In [None]:
arima100111s = ARIMA(
    endog=trainingData, order=(1, 0, 0), seasonal_order=(1, 1, 1, s), trend="t"
)
trainedArima100111s = fitAndPlotForecast(
    trainingData,
    trainingTime,
    testData,
    testTime,
    arima100111s,
    title=f"ARIMA$(1, 0, 0)(1, 1, 1)_{{{s}}}$",
)

In [None]:
plot_acf(trainedArima100111s.resid, lags=np.arange(acf_size))
pass  # this line stops duplicated plots in output

### ARIMA(0, 0, 0)(3, 0, 0)<sub>s</sub> with trend and exogenous inputs

In [None]:
def create_lagged_features(y, lags):
    x = np.column_stack([np.roll(y, lag) for lag in lags])
    x[:max(lags), :] = np.nan  # Set NaNs for the first rows
    return x


# Define lags
lags = [s, s * 7]  # Use only these specific lags

# Generate lagged features
x_lagged = create_lagged_features(trainingData, lags)

# Remove NaN rows caused by shifting
valid_idx = max(lags)
y_valid = trainingData[valid_idx:]
x_valid = x_lagged[valid_idx:, :]

# Define ARIMA model with exogenous regressors
arimaExog = ARIMA(endog=y_valid, order=(3, 0, 0), exog=x_valid, trend="t")
trainedArimaExog = arimaExog.fit()

In [None]:
plot_acf(trainedArimaExog.resid, lags=np.arange(acf_size), title="Seasonal persistence model with trend and exog inputs")
pass  # this line stops duplicated plots in output

### ARIMA(3, 0, 0)(1, 1, 1)<sub>s</sub> model

In [None]:
arima300111s = ARIMA(
    endog=trainingData, order=(3, 0, 0), seasonal_order=(1, 1, 1, s), trend="t"
)
trainedArima300111s = fitAndPlotForecast(
    trainingData,
    trainingTime,
    testData,
    testTime,
    arima300111s,
    title=f"ARIMA$(3, 0, 0)(1, 1, 1)_{{{s}}}$",
)

In [None]:
plot_acf(trainedArima300111s.resid, lags=np.arange(acf_size))
pass  # this line stops duplicated plots in output

# Out-of-sample analysis

### Define function for Prediction-Root-Mean-Square-Error (PRMSE)

In [None]:
def calculate_prmse(
        trainedModel, testData, nTestSamples, forecastHorizon, exog=False
    ):
    prmse = np.zeros(nTestSamples)
    
    for k in range(nTestSamples):
        # Select the exogenous inputs for forecasting
        exog_forecast = None
        if exog:
            t = k + nTrainingSamples
            lag_s = PRICE[t-s:t].reshape(-1, 1)
            lag_168 = PRICE[t-168:t-168+s].reshape(-1, 1)
            exog_forecast = np.column_stack([lag_s, lag_168])

        # Forecast using exogenous variables
        forecast = trainedModel.forecast(forecastHorizon, exog=exog_forecast)
        actual = testData[k: k + forecastHorizon]
        error = actual - forecast
        prmse[k] = np.sqrt(np.mean(error**2))

        # Extend model with new observation and corresponding exogenous inputs
        exog_update = None
        if exog:
            exog_update = exog_forecast[0, :].reshape(1, -1)
        trainedModel = trainedModel.extend(testData[k][None], exog=exog_update)

    return prmse


### Compute PRMSEs

In [None]:
nTest = nTestSamples - PREDICTION_HORIZON
prmseArima010 =     calculate_prmse(trainedArima010,     testData, nTest, PREDICTION_HORIZON)
prmseArima000010s = calculate_prmse(trainedArima000010s, testData, nTest, PREDICTION_HORIZON)
prmseArima100111s = calculate_prmse(trainedArima100111s, testData, nTest, PREDICTION_HORIZON)
prmseArimaExog =    calculate_prmse(trainedArimaExog,    testData, nTest, PREDICTION_HORIZON, exog=True)
prmseArima300111s = calculate_prmse(trainedArima300111s, testData, nTest, PREDICTION_HORIZON)

### Compare model box plots

In [None]:
prmse = [
    prmseArima010,
    prmseArima000010s,
    prmseArima100111s,
    prmseArimaExog,
    prmseArima300111s,
]
fig, ax = plt.subplots()
bp = ax.boxplot(prmse, vert=False, medianprops=dict(color="firebrick"))
plt.yticks(
    [1, 2, 3, 4, 5],
    [
        "$(010)$",
        f"$(000)(010)_{{{s}}}$",
        f"$(100)(111)_{{{s}}}$",
        f"$(300)$ exog",
        f"$(300)(111)_{{{s}}}$",
    ],
)
plt.yticks(rotation=60)
plt.xlabel("PRMSE [pu]")
mean = np.mean(prmse, axis=1)
for i, line in enumerate(bp["medians"]):
    x, y = line.get_xydata()[1]
    text = "μ={:.4f}".format(mean[i])
    ax.annotate(text, xy=(x - 0.02, y + 0.07), color="firebrick")

# Generate forecast and save to csv

In [None]:
def save_forecast(trainedModel, testData, nTest, nTraining, horizon, filename):
    with open(filename, "w") as f:
        f.write("dateAndTime;price [ct/kWh]\n")
    with open(filename, "a") as f:
        for k in range(nTest):
            forecast = trainedModel.forecast(horizon)
            timestamp = price_df["time"][nTraining + k]
            f.write(f"{timestamp};{forecast[0]:.4f}\n")
            trainedModel = trainedModel.extend(testData[k][None])


path = os.path.abspath(os.path.join(os.getcwd(), os.pardir))
path = os.path.join(path, "priceForecast.csv")
save_forecast(trainedArima300111s, testData, nTestSamples, nTrainingSamples, PREDICTION_HORIZON, path)