# Jupyter Notebook for Task 4: Modeling



## Preperation
Before we start modeling, we have to load the packages and the data, and prepare it so it can be processed by the models.

In [None]:
# Import standard libraries
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Polynomial regression
from sklearn.preprocessing import PolynomialFeatures, StandardScaler
from sklearn.linear_model import Ridge
from sklearn.pipeline import Pipeline

# Neural network
from keras import Sequential  # Sequential model: https://keras.io/guides/sequential_model/
from keras.layers import Dense, Dropout, BatchNormalization

# Import custom functions and utilities
from sklearn.model_selection import train_test_split, GridSearchCV, KFold
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, make_scorer

# Time series analysis
from statsmodels.tsa.stattools import adfuller, acf, pacf
from statsmodels.tsa.seasonal import MSTL
from statsmodels.tsa.ar_model import AutoReg
from statsmodels.tsa.arima.model import ARIMA

# Suppress warnings
import warnings
warnings.filterwarnings('ignore')

# Environment settings
import os
os.environ["KERAS_BACKEND"] = "torch"
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Matplotlib settings
plt.rcParams['figure.figsize'] = (12, 8)
%matplotlib inline


In [None]:
agg_charging_data = pd.read_pickle(os.path.join('Data', 'aggregated_data.pkl'))
agg_charging_data['year'] = agg_charging_data.index.year

plt.figure(figsize=(20, 10))
plt.subplot(3, 1, 1)
plt.plot(agg_charging_data.index, agg_charging_data['utilizationRate_total'], label='Utilization Total', color='blue')
plt.legend()
plt.subplot(3, 1, 2)
plt.plot(agg_charging_data.index, agg_charging_data['utilizationRate_site1'], label='Utilization Site1', color='red')
plt.legend()
plt.subplot(3, 1, 3)
plt.plot(agg_charging_data.index, agg_charging_data['utilizationRate_site2'], label='Utilization Site2', color='orange')
plt.legend()

agg_charging_data.info(20)

If we plot the utilization over time, we can observe some interesting pattern. The Strong dip at around 03/2020 can be explained with the corona pandamic. Besides there is a time frame with no values. The objective for this task (based on the Data mining goal) is to predict the untilization of the sites for a short term after the data ends (10/2021). This is why training the models  on these extraordinary events is not feasible, which is why we only use the data from after the data hole.

In [None]:
# Filter data and convert week of year to integer
agg_charging_data = agg_charging_data[agg_charging_data['year'] == 2021]
agg_charging_data['week_of_year'] = agg_charging_data['week_of_year'].astype('int32')

Define the model properties:

1. Input features: hour_of_day, day_of_week, month_of_year, year
    - Why these features: We cannot use real-time or future-dependent inputs unless they are forecasted or modeled beforehand
2. Target features: utilizationRate_siteX
3. Model parameters: Coefficients (θ) for polynomial regression or weights (𝑊,𝑏) for neural network.
4. Hypothesis function: Polynomial regression: 𝑦^=ℎ𝜃(𝑋), Neural network: y^=f(W,b).
5. Objective funktion: Mean Squared Error (MSE): J(θ)=1/m*∑(y^−y)^2.

In [None]:
#different data sets to train the models
gesamt_charging_data = agg_charging_data[['utilizationRate_total', 'hour_of_day', 'day_of_week', 'year', 'month_of_year']]
site1_charging_data = agg_charging_data[['utilizationRate_site1', 'hour_of_day', 'day_of_week', 'year', 'month_of_year']]
site2_charging_data = agg_charging_data[['utilizationRate_site2', 'hour_of_day', 'day_of_week', 'year', 'month_of_year']]

# Split data
X = gesamt_charging_data[['hour_of_day', 'day_of_week', 'month_of_year', 'year']]
y = gesamt_charging_data['utilizationRate_total']

# Split the data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, shuffle=False) #shuffle = False ensures to split the data in order
X_train_nn, X_test_nn, y_train_nn, y_test_nn = train_test_split(X, y, test_size=0.1, shuffle=False) #shuffle = False ensures to split the data in order
X_train_poly, X_test_poly, y_train_poly, y_test_poly = train_test_split(X, y, test_size=0.1, shuffle=False) #shuffle = False ensures to split the data in order

## Neural Network

In [None]:
from sklearn.preprocessing import MinMaxScaler

# transform/ scale data
scaler = StandardScaler()
X_train_nn = scaler.fit_transform(X_train_nn)
X_test_nn = scaler.transform(X_test_nn)

In [None]:
# create model
model = Sequential([
    Dense(32, activation='relu', input_dim=X_train_nn.shape[1]),
    Dense(16, activation='relu'),
    Dense(1)  # Für die Vorhersage eines kontinuierlichen Wertes
])

model.compile(optimizer='adam', loss='mse', metrics=['mae'])
model.summary()


In [None]:
# Modell trainieren
epochs = 20
history = model.fit(X_train_nn, y_train_nn, epochs=epochs, batch_size=32, validation_split=0.2, shuffle=False)

In [None]:
#check the overview of the training and validation loss 
history_df = pd.DataFrame(history.history)
history_df.head()

# Plot training and validation loss to see the differences in each epoch
plt.figure(figsize=(14, 6))

plt.plot(history_df['loss'], label='Training Loss')
plt.plot(history_df['val_loss'], label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.legend()
plt.title('Training and Validation MSE')

plt.show()

In [None]:
# Vorhersagen
y_pred_nn = model.predict(X_test_nn)


In [None]:
#Measure performance
mse_nn = mean_squared_error(y_test_nn, y_pred_nn)
mae_nn = mean_absolute_error(y_test_nn, y_pred_nn)
r2_nn = r2_score(y_test_nn, y_pred_nn)

## Polynomial regression

As a second model we will us a polynominal regression model. In order to find the right hyperparameters, we conduct a grid search on the performance measures to find the right value for the degree (D) and the regularization strenght (alpha).

In [None]:
# Define the pipeline
pipeline = Pipeline([
    ('polynomial_features', PolynomialFeatures()),
    ('scaler', StandardScaler()),
    ('ridge_regression', Ridge())
])

# Define the hyperparameter grid
param_grid = {
    'polynomial_features__degree': range(11), 
    'ridge_regression__alpha': np.logspace(-3, 2, 5)
}

# Define the scoring metric
scorer = make_scorer(mean_absolute_error, greater_is_better=False)
kf = KFold(n_splits=5, shuffle=False)

# Perform GridSearchCV
grid_search = GridSearchCV(
    pipeline,
    param_grid,
    cv=kf,
    scoring=scorer,
)

# Fit the grid search on the training data
grid_search.fit(X_train_poly, y_train_poly)

# Get the best hyperparameters and corresponding score
best_params = grid_search.best_params_
best_score = -grid_search.best_score_  

print(f"Best Parameters: {best_params}")
print(f"Best Mean Squared Error (Validation Set, Cross-Validation): {best_score}")

# Validate the best model on the separate validation set
best_model_poly = grid_search.best_estimator_

# Final evaluation on the test set
y_pred_poly = best_model_poly.predict(X_test_poly)

mse_poly = mean_squared_error(y_test_poly, y_pred_poly)
mae_poly = mean_absolute_error(y_test_poly, y_pred_poly)
r2_poly = r2_score(y_test_poly, y_pred_poly)


In [None]:
# Plot Predictions
# Index erstellen, um die Testdaten in der Reihenfolge darzustellen
indices = np.arange(len(y_test_poly))

plt.figure(figsize=(12, 6))

# plot actual values
plt.plot(indices, y_test, label='Actual Values', color='blue', linestyle='-', alpha=0.7)

# replace negative values with 0
y_pred_nn = np.maximum(0, y_pred_nn)
y_pred_poly = np.maximum(0, y_pred_poly)

# plot predicted values
plt.plot(indices, y_pred_poly, label='Predicted Values Polynominal regression', color='orange', linestyle='-', alpha=0.7)
plt.plot(indices, y_pred_nn, label='Predicted Values Neural Network', color='red', linestyle='-', alpha=0.7)

plt.title("Actual vs. Predicted Values (Test Set)", fontsize=14)
plt.xlabel("Index", fontsize=12)
plt.ylabel("Utilization Rate", fontsize=12)
plt.legend()
plt.grid(alpha=0.3)
plt.tight_layout()
plt.show()
print(f"Neural Network - MSE: {mse_nn}, MAE: {mae_nn}, R²: {r2_nn}")
print(f"Polynomial Regression - MSE: {mse_poly}, MAE: {mae_poly}, R²: {r2_poly}")


The comparison indicates a better performance of the polynominal regression model, likely because of the limited complexity. Lets use the model to predict the next weeks utilization.

## Future prediction
We use the regression model (better performing) to predict utilizations in the future. Based on our goal to predict relatively short time periods, we choose to predict the next week after the dataset ends.

In [None]:
# Convert X_test back to a DataFrame if it's a NumPy array
if not isinstance(X_test, pd.DataFrame):
    X_test = pd.DataFrame(X_test, columns=['hour_of_day', 'day_of_week', 'month_of_year', 'year'])

# Extend the test set one month forward
def extend_test_set(X_test, n_steps=24):
    extended_test_set = pd.DataFrame()  # Initialize an empty DataFrame
    last_row = X_test.iloc[-1].copy()  # Start from the last row
    extended_rows = []
    for _ in range(n_steps):  # Simulate the next 'n_steps' rows
        last_row['hour_of_day'] += 1
        if last_row['hour_of_day'] > 24:  # Reset hour_of_day and increment day_of_week
            last_row['hour_of_day'] = 1
            last_row['day_of_week'] += 1
        if last_row['day_of_week'] > 7:  # Reset day_of_week and increment month_of_year
            last_row['day_of_week'] = 1
            last_row['month_of_year'] += 1
        if last_row['month_of_year'] > 12:  # Reset month_of_year and increment year
            last_row['month_of_year'] = 1
            last_row['year'] += 1
        extended_rows.append(last_row.copy())  # Add the new row
    
    extended_test_set = pd.concat([extended_test_set, pd.DataFrame(extended_rows)], ignore_index=True)
    return extended_test_set

# Generate extended test set
X_test_extended = extend_test_set(X_test, n_steps=300)

# Predict on the extended test set
y_extended_pred = best_model_poly.predict(X_test_extended)
y_extended_pred = np.maximum(0, y_extended_pred)

# Plot the results
plt.figure(figsize=(12, 6))
plt.plot(range(len(y_test)), y_test, label='Test Set', color='blue')
plt.plot(range(len(y_test), len(y_test) + len(y_extended_pred)), y_extended_pred, label='Next Weeks Predictions', color='orange')
plt.xlabel("Time Steps")
plt.ylabel("Utilization Rate")
plt.title("Test Set and Extended Predictions")
plt.legend()
plt.grid(True)
plt.show()

While the prediction is not perfect, it shows the characteristics of the data. The plot clearly illustrates the daily peaks and the overall rising trend.

## Time Series: ARIMA

The models performance are satisfying and show the basic structure and trend of the data, but the predictive performance is still not optimal. Because we rely on time-dependent data, we also tested the use of an ARIMA model, which focuses on time series.

### Load and Prepare the Data

In [None]:
# Select 'utilizationRate_total' column from the DataFrame
data = agg_charging_data[['utilizationRate_total']]  # This keeps it as a DataFrame, not a Series

data.index = pd.to_datetime(data.index)

data.index.freq = 'H'

data['utilizationRate_total'].replace(0, np.nan, inplace=True)
data['utilizationRate_total'].fillna(method='bfill', inplace=True)
data['utilizationRate_total'].fillna(method='ffill', inplace=True)

data.index = pd.to_datetime(data.index).tz_localize(None)

print(data.head())
print(data['utilizationRate_total'].isna().sum())
print(data.shape)
# Plot the DataFrame
plt.plot(data)
plt.title('Utilization Rate Over Time')
plt.xlabel('Time')
plt.ylabel('Utilization Rate')
plt.grid(True)
plt.show()

### Check stationarity
We use statistic tests like the Dicky Fuller test to determine if the series is stationary. We copied the code out of the  This is necessary to be able for the model to be processed

In [None]:
def test_stationarity(timeseries):
    
    #Determing rolling statistics
    rolmean = timeseries.rolling(window=168).mean()
    rolstd = timeseries.rolling(window=168).std()

    #Plot rolling statistics:
    orig = plt.plot(timeseries, color='blue',label='Original')
    mean = plt.plot(rolmean, color='red', label='Rolling Mean')
    std = plt.plot(rolstd, color='black', label = 'Rolling Std')
    plt.legend(loc='best')
    plt.title('Rolling Mean & Standard Deviation')
    plt.show(block=False)
    
    #Perform Dickey-Fuller test:
    print('Results of Dickey-Fuller Test:\n')
    dftest = adfuller(timeseries)
    dfoutput = pd.Series(dftest[0:4], index=['Test Statistic','p-value','#Lags Used','Number of Observations Used'])
    for key,value in dftest[4].items():
        dfoutput['Critical Value (%s)'%key] = value
    print(dfoutput)

### Data Transformation and Decomposition
We transform the data by calculating its log and decompose it. Since the data shows a daily trend (peaks) and a weekly one (lower peaks on weekends), we use MSTL to decompose 2 seasonal periods.

In [None]:
ts_log = np.log(data['utilizationRate_total'])

In [None]:
res = MSTL(ts_log, periods=(24, 24*7)).fit()
# Komponenten aus dem res-Objekt extrahieren
observed = res.observed
trend = res.trend
seasonal = res.seasonal
residuals = res.resid
print(seasonal.info())

# Anzahl der Subplots berechnen
num_seasonal = len(seasonal.columns)
num_subplots = 3 + num_seasonal  # Observed, Trend, Residuals + jede saisonale Komponente

# Subplots erstellen
fig, ax = plt.subplots(num_subplots, 1, figsize=(12, 3 * num_subplots), sharex=True)

# Plot for the observations
ax[0].plot(observed, label='Observed', color='black')
ax[0].set_title('Observed')
ax[0].legend()

# Plot for the trends
ax[1].plot(trend, label='Trend', color='blue')
ax[1].set_title('Trend')
ax[1].legend()

# Plots for every season
for i, col in enumerate(seasonal.columns):
    ax[i + 2].plot(seasonal[col], label=f'Seasonal {col}', color='green')
    ax[i + 2].set_title(f'Seasonal Component: {col}')
    ax[i + 2].legend()

# Plot for residuals
ax[-1].scatter(residuals.index, residuals, label='Residuals', color='red', alpha=0.6)
ax[-1].axhline(0, linestyle='--', color='gray')
ax[-1].set_title('Residuals')
ax[-1].legend()

plt.tight_layout()
plt.show()

In [None]:
ts_log_decompose = residuals.copy()
ts_log_decompose.interpolate(method='time', inplace=True)
test_stationarity(ts_log_decompose)

The Test Statistic is lower then the critical values and the p-value is very small, indicating a stationary time series.

### Determine options
Using ACF and PACF charts to determine the p and q value

In [None]:
#ACF and PACF plots:
lag_acf = acf(ts_log_decompose, nlags=20)
lag_pacf = pacf(ts_log_decompose, nlags=20, method='ols')

#Plot ACF: 
plt.subplot(121) 
plt.plot(lag_acf)
plt.axhline(y=0,linestyle='--',color='gray')
plt.axhline(y=-1.96/np.sqrt(len(ts_log_decompose)),linestyle='--',color='gray')
plt.axhline(y=1.96/np.sqrt(len(ts_log_decompose)),linestyle='--',color='gray')
plt.title('Autocorrelation Function')

#Plot PACF:
plt.subplot(122)
plt.plot(lag_pacf)
plt.axhline(y=0,linestyle='--',color='gray')
plt.axhline(y=-1.96/np.sqrt(len(ts_log_decompose)),linestyle='--',color='gray')
plt.axhline(y=1.96/np.sqrt(len(ts_log_decompose)),linestyle='--',color='gray')
plt.title('Partial Autocorrelation Function')
plt.tight_layout()

# Define confidence interval
conf_interval = 1.96 / np.sqrt(len(ts_log_decompose))

# Determine p (where PACF cuts off sharply)
p = next((i for i, x in enumerate(lag_pacf) if abs(x) < conf_interval and i > 0), None)

# Determine d (use default value of 1)
d = 1

# Determine q (where ACF cuts off sharply)
q = next((i for i, x in enumerate(lag_acf) if abs(x) < conf_interval and i > 0), None)

print(f"Suggested value for p (AR): {p}")
print(f"Suggested value for q (MA): {q}")

### Fitting Arima model

In [None]:
split_ratio = 0.9
train_size = int(len(ts_log_decompose) * split_ratio)
residual_train, residual_test = train_test_split(ts_log_decompose, train_size=train_size, shuffle=False)

model = ARIMA(residual_train, order=(p, d, q))  
results_ARIMA = model.fit() 
plt.plot(residual_train)
plt.plot(results_ARIMA.fittedvalues, color='red')
plt.title('RSS: %.4f'% sum((results_ARIMA.fittedvalues-residual_train)**2))

### Forecasting on test set

In [None]:
# Ensure frequency and alignment for all components
full_index = pd.date_range(start=residuals.index.min(), end=residuals.index.max(), freq='H')
residuals = residuals.reindex(full_index)
trend = trend.reindex(full_index)
seasonal = seasonal.reindex(full_index)

# Fill missing values and ensure no NaNs
residuals.interpolate(method='time', inplace=True)
trend.interpolate(method='time', inplace=True)
seasonal.interpolate(method='time', inplace=True)

# Set consistent frequency
residuals.index.freq = 'H'
trend.index.freq = 'H'
seasonal.index.freq = 'H'

# Train-test split using slicing
train_size = int(len(residuals) * split_ratio)
residual_train = residuals.iloc[:train_size]
residual_test = residuals.iloc[train_size:]
trend_train = trend.iloc[:train_size]
trend_test = trend.iloc[train_size:]
seasonal_train_1 = seasonal.iloc[:train_size, 0]
seasonal_test_1 = seasonal.iloc[train_size:, 0]
seasonal_train_2 = seasonal.iloc[:train_size, 1]
seasonal_test_2 = seasonal.iloc[train_size:, 1]

# Predict residuals using ARIMA
residual_predictions = results_ARIMA.predict(start=len(residual_train), end=len(residuals) - 1)
residual_predictions.index = residual_test.index

# Align indices for all components
trend_test.index = residual_test.index
seasonal_test_1.index = residual_test.index
seasonal_test_2.index = residual_test.index

# Ensure predictions match in length
assert len(residual_predictions) == len(trend_test), "Residual predictions length mismatch!"

# Reconstruct predictions (in log scale)
log_predictions = (
    residual_predictions
    + trend_test
    + seasonal_test_1
    + seasonal_test_2
)

# Apply inverse log transformation to return to original scale
full_predictions = np.exp(log_predictions)

# Trim full_predictions to match test data length
full_predictions = full_predictions.iloc[:len(data.iloc[train_size:])]

# Ensure the test set length matches full_predictions
test = data.iloc[train_size:]['utilizationRate_total']
assert len(test) == len(full_predictions), "Final lengths still do not match!"

# Plot the results
plt.figure(figsize=(12, 6))
plt.plot(test, label="Original Test Data", color='blue')
plt.plot(full_predictions, label="Predictions (Test Set)", color='green')
plt.legend()
plt.title('ARIMA Model Predictions Test Set')
plt.show()

# Evaluate the model
mse_arima = mean_squared_error(test, full_predictions)
mae_arima = mean_absolute_error(test, full_predictions)
r2_arima = r2_score(test, full_predictions)

print(f"ARIMA model - MSE: {mse_arima}, MAE: {mae_arima}, R²: {r2_arima}")

As we see, the ARIMA model handles the time series data much better then the other models.