In [None]:
# =============================================================================
# Project: Deep Learning Benchmark for PV Power Forecasting (Deep_Robust Regime)
# File: benchmark_comparison.ipynb
# Description: Benchmarking 9 models (including Transformer) under identical
#              hyperparameter conditions to ensure fair comparison.
# Environment: Google Colab / TensorFlow 2.x
# =============================================================================

# ---------------------------------------------------------
# 0. Setup & Configuration
# ---------------------------------------------------------
import pandas as pd
import numpy as np
import tensorflow as tf
import io
import matplotlib.pyplot as plt
import os
import random
from google.colab import files
from tensorflow.keras.models import Model
from tensorflow.keras.layers import (Input, Dense, Dropout, LSTM, GRU, SimpleRNN,
                                     Conv1D, BatchNormalization, Bidirectional,
                                     MaxPooling1D, GlobalAveragePooling1D,
                                     MultiHeadAttention, LayerNormalization, Add,
                                     Concatenate, Flatten, Activation)
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

# Robust Reproducibility
SEED = 42
def set_seeds(seed=SEED):
    """Seed every random number generator this notebook uses.

    Sets the PYTHONHASHSEED environment variable, seeds Python's `random`,
    NumPy's global generator and TensorFlow, and then asks TensorFlow to use
    deterministic op implementations.

    Args:
        seed: Integer seed applied to all generators (defaults to SEED).
    """
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)
    try:
        tf.config.experimental.enable_op_determinism()
    except Exception as exc:
        # Determinism stays best-effort (e.g. a TensorFlow build without this
        # switch), but the failure is reported instead of silently hidden, and
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        print(f"[set_seeds] Op determinism could not be enabled "
              f"({type(exc).__name__}: {exc}); results may vary between runs.")

set_seeds()

# [Configuration] Deep_Robust Settings
# A single hyperparameter set handed unchanged to every model in the benchmark.
CONFIG = dict(
    SEQ_LENGTH=24,            # time steps per input window
    UNITS=128,                # width of recurrent / conv / attention layers
    DROPOUT=0.5,              # dropout rate applied before the output head
    KERNEL_SIZE=3,            # Conv1D kernel size
    ATTN_HEADS=4,             # number of attention heads
    FF_DIM=128,               # Transformer feed-forward width
    ACTIVATION='swish',       # activation passed to the layers
    LEARNING_RATE=0.0005,     # Adam learning rate
    BATCH_SIZE=32,            # mini-batch size
    EPOCHS=30,                # upper bound on training epochs
    OPTIMIZER='adam',         # optimizer label
    LOSS='mse',               # training loss
)

# Echo the active settings so they are recorded in the cell output.
print("--- [Configuration] Deep_Robust Settings Applied ---")
for setting in CONFIG:
    print(f" > {setting}: {CONFIG[setting]}")

In [None]:
# ---------------------------------------------------------
# 1. Data Loading & Preprocessing
# ---------------------------------------------------------
print("\n--- [Step 1] Data Preparation ---")
print("Please upload your dataset (e.g., 'Dangjin_Landfill_PV_Dataset.csv' or 'Gwangyang_Port_Phase2_PV.csv').")

# The upload result is used below as a mapping of file name -> raw bytes.
uploaded = files.upload()
if not uploaded:
    # Without this guard an empty result (e.g. a cancelled dialog) surfaces as
    # a bare StopIteration from next(iter(...)), which says nothing useful.
    raise RuntimeError("No file was uploaded. Re-run this cell and select a CSV dataset.")

filename = next(iter(uploaded))
if len(uploaded) > 1:
    # Only one dataset is benchmarked per run; make the choice visible.
    print(f" >> {len(uploaded)} files uploaded; using only the first one: '{filename}'")

df = pd.read_csv(io.BytesIO(uploaded[filename]))

def add_cyclical_features(df):
    """Append sine/cosine encodings of day-of-year and hour-of-day to ``df``.

    Reads the 'Year', 'Month', 'Day' and 'Hour' columns. Adds 'Date',
    'DayOfYear', 'DaysInYear', 'Day_sin', 'Day_cos', 'Hour_sin' and
    'Hour_cos' to the frame in place and returns that same frame.
    """
    full_turn = 2 * np.pi

    # Calendar helpers derived from the assembled date.
    df['Date'] = pd.to_datetime(df[['Year', 'Month', 'Day']])
    calendar = df['Date'].dt
    df['DayOfYear'] = calendar.dayofyear
    df['DaysInYear'] = calendar.is_leap_year.map({True: 366, False: 365})

    # Position within the year, normalised by that year's length.
    year_angle = full_turn * df['DayOfYear'] / df['DaysInYear']
    df['Day_sin'] = np.sin(year_angle)
    df['Day_cos'] = np.cos(year_angle)

    # Position within the 24-hour day.
    hour_angle = full_turn * df['Hour'] / 24.0
    df['Hour_sin'] = np.sin(hour_angle)
    df['Hour_cos'] = np.cos(hour_angle)
    return df

df = add_cyclical_features(df)

# Feature Definition
weather_cols = ['AverageTemp', 'LowTemp', 'HighTemp', 'RainFall', 'SteamPress',
                'DewPoint', 'Sunshine', 'Insolation', 'Cloudiness', 'GroundTemp',
                'Temp', 'Wind', 'Press', 'Humi']
time_cols = ['Day_sin', 'Day_cos', 'Hour_sin', 'Hour_cos']
feature_cols = weather_cols + time_cols
target_col = 'Solar_Power'

# Report every absent column at once instead of failing on the first one
# deep inside a later indexing call.
missing_cols = [col for col in feature_cols + [target_col] if col not in df.columns]
if missing_cols:
    raise KeyError(f"Uploaded dataset is missing required column(s): {missing_cols}")

# Chronological Split
# The years live in named constants so a dataset covering a different period
# needs a change in exactly one place.
TRAIN_YEARS = [2015, 2016, 2017]
VAL_YEAR = 2018
TEST_YEAR = 2019

train_df = df[df['Year'].isin(TRAIN_YEARS)]
val_df = df[df['Year'] == VAL_YEAR]
test_df = df[df['Year'] == TEST_YEAR]

# An empty split would otherwise only show up as a scaler error further down.
for split_name, split_df in (('train', train_df), ('validation', val_df), ('test', test_df)):
    if split_df.empty:
        years_present = sorted(df['Year'].dropna().unique().tolist())
        raise ValueError(f"The {split_name} split is empty; years present in the data: {years_present}")

# Standardization
# Statistics are fitted on the training years only and reused for the
# validation and test years.
scaler = StandardScaler()
X_train_raw = scaler.fit_transform(train_df[feature_cols].values)
X_val_raw = scaler.transform(val_df[feature_cols].values)
X_test_raw = scaler.transform(test_df[feature_cols].values)

y_train = train_df[target_col].values
y_val = val_df[target_col].values
y_test = test_df[target_col].values

def create_sequences(data, target, seq_length):
    """Cut aligned feature/target arrays into sliding windows.

    Window ``i`` holds rows ``data[i : i + seq_length]`` and is paired with
    ``target[i + seq_length]``, i.e. the target one row after the window ends.

    Args:
        data: Array-like with one row per time step, shape (n_rows, ...).
        target: Array-like with the same number of rows as ``data``.
        seq_length: Number of consecutive rows per window (positive integer).

    Returns:
        Tuple ``(xs, ys)``. ``xs`` has shape (n_windows, seq_length, ...) and
        ``ys`` has n_windows entries, with n_windows = max(n_rows - seq_length, 0).
        When there are too few rows both arrays are empty, but ``xs`` keeps its
        window and feature dimensions so downstream shape handling still works.

    Raises:
        ValueError: If ``seq_length`` is smaller than 1 or the row counts differ.
    """
    data = np.asarray(data)
    target = np.asarray(target)

    if seq_length < 1:
        raise ValueError(f"seq_length must be a positive integer, got {seq_length}")
    if len(data) != len(target):
        raise ValueError(
            f"data and target must have the same number of rows, "
            f"got {len(data)} and {len(target)}")

    n_windows = max(len(data) - seq_length, 0)

    # Row i of window_idx lists the data rows that make up window i; fancy
    # indexing gathers all windows in one step and returns a fresh array.
    window_idx = np.arange(n_windows)[:, None] + np.arange(seq_length)[None, :]
    xs = data[window_idx]

    # Copy so the result does not alias the caller's target array.
    ys = target[seq_length:seq_length + n_windows].copy()
    return xs, ys

# Window each split independently so no window spans two splits.
(X_train, y_train), (X_val, y_val), (X_test, y_test) = (
    create_sequences(split_features, split_target, CONFIG['SEQ_LENGTH'])
    for split_features, split_target in (
        (X_train_raw, y_train),
        (X_val_raw, y_val),
        (X_test_raw, y_test),
    )
)

print(f" >> Data Split | Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")

In [None]:
# ---------------------------------------------------------
# 2. Model Factory (Including Transformer)
# ---------------------------------------------------------
# Architecture names that get_benchmark_model can build.
_SUPPORTED_MODELS = ('RNN', 'LSTM', 'GRU', 'BiLSTM', 'CNN_1D', 'CNN_BiLSTM',
                     'Attn_GRU', 'TCN', 'Transformer')


def get_benchmark_model(model_name, input_shape, conf):
    """
    Constructs the specified model architecture using fixed hyperparameters.

    Every architecture shares the same input tensor, the same
    Dense(32) -> Dense(1, relu) output head and the same Adam / loss setup;
    only the feature extractor in between depends on ``model_name``.

    Args:
        model_name: One of the names in ``_SUPPORTED_MODELS``.
        input_shape: Shape of one sample without the batch axis; the notebook
            passes (SEQ_LENGTH, number of features).
        conf: Mapping providing 'UNITS', 'DROPOUT', 'ACTIVATION',
            'KERNEL_SIZE', 'LEARNING_RATE' and 'LOSS', plus 'ATTN_HEADS' for
            'Attn_GRU' / 'Transformer' and 'FF_DIM' for 'Transformer'.

    Returns:
        A compiled Keras ``Model`` named ``model_name`` that reports 'mae'.

    Raises:
        ValueError: If ``model_name`` is not a supported architecture.
    """
    # Reject unknown names up front. Previously such a name skipped every
    # branch and silently produced a Dense head applied to the raw 3-D input.
    if model_name not in _SUPPORTED_MODELS:
        raise ValueError(
            f"Unknown model_name {model_name!r}; expected one of {list(_SUPPORTED_MODELS)}")

    inputs = Input(shape=input_shape)

    # Unpack Config
    u = conf['UNITS']
    dr = conf['DROPOUT']
    act = conf['ACTIVATION']
    ks = conf['KERNEL_SIZE']

    x = inputs

    # --- Architectures ---
    if model_name == 'RNN':
        x = SimpleRNN(u, activation=act, return_sequences=False)(x)
        x = Dropout(dr)(x)

    elif model_name == 'LSTM':
        x = LSTM(u, activation=act, return_sequences=False)(x)
        x = Dropout(dr)(x)

    elif model_name == 'GRU':
        x = GRU(u, activation=act, return_sequences=False)(x)
        x = Dropout(dr)(x)

    elif model_name == 'BiLSTM':
        x = Bidirectional(LSTM(u, activation=act, return_sequences=False))(x)
        x = Dropout(dr)(x)

    elif model_name == 'CNN_1D':
        x = Conv1D(filters=u, kernel_size=ks, activation=act, padding='same')(x)
        x = BatchNormalization()(x)
        x = MaxPooling1D(pool_size=2)(x)
        x = Flatten()(x)
        x = Dropout(dr)(x)

    elif model_name == 'CNN_BiLSTM':
        # Convolutional front end, then a bidirectional LSTM over the pooled sequence
        x = Conv1D(filters=u, kernel_size=ks, activation=act, padding='same')(x)
        x = BatchNormalization()(x)
        x = MaxPooling1D(pool_size=2)(x)
        x = Dropout(dr)(x)
        x = Bidirectional(LSTM(u, activation=act, return_sequences=False))(x)
        x = Dropout(dr)(x)

    elif model_name == 'Attn_GRU':
        # The GRU keeps the full sequence so attention can look at every time step
        gru_out = GRU(u, activation=act, return_sequences=True)(x)
        # Self-Attention
        attn_out = MultiHeadAttention(num_heads=conf['ATTN_HEADS'], key_dim=u)(gru_out, gru_out)
        x = Add()([gru_out, attn_out])
        x = LayerNormalization()(x)
        x = GlobalAveragePooling1D()(x)
        x = Dropout(dr)(x)

    elif model_name == 'TCN':
        # Simplified TCN Block with Residuals
        for dilation_rate in [1, 2, 4]:
            prev_x = x
            conv = Conv1D(filters=u, kernel_size=ks, dilation_rate=dilation_rate,
                          padding='causal', activation=act)(x)
            conv = BatchNormalization()(conv)
            conv = Dropout(0.1)(conv) # Light internal dropout

            # Match dimensions if needed: a 1x1 convolution gives the skip
            # path u channels so it can be added to the conv output
            if prev_x.shape[-1] != u:
                prev_x = Conv1D(filters=u, kernel_size=1, padding='same', activation=act)(prev_x)

            x = Add()([prev_x, conv])

        x = GlobalAveragePooling1D()(x)
        x = Dropout(dr)(x)

    elif model_name == 'Transformer':
        # Standard Transformer Encoder
        attn_output = MultiHeadAttention(num_heads=conf['ATTN_HEADS'], key_dim=u)(x, x)
        x = Add()([x, attn_output])
        x = LayerNormalization(epsilon=1e-6)(x)

        # Feed Forward Network
        ffn = Dense(conf['FF_DIM'], activation=act)(x)
        # Project back to the input feature width so the residual Add lines up
        ffn = Dense(input_shape[-1])(ffn)
        x = Add()([x, ffn])
        x = LayerNormalization(epsilon=1e-6)(x)

        x = GlobalAveragePooling1D()(x) # Pooling instead of LSTM
        x = Dropout(dr)(x)

    else:
        # Defensive: reached only if _SUPPORTED_MODELS lists a name that has
        # no branch above.
        raise ValueError(f"No architecture is implemented for {model_name!r}")

    # --- Output Head ---
    x = Dense(32, activation=act)(x)
    outputs = Dense(1, activation='relu')(x) # Non-negative power

    model = Model(inputs=inputs, outputs=outputs, name=model_name)
    model.compile(optimizer=Adam(learning_rate=conf['LEARNING_RATE']),
                  loss=conf['LOSS'], metrics=['mae'])
    return model

In [None]:
# ---------------------------------------------------------
# 3. Benchmarking Loop
# ---------------------------------------------------------
print("\n--- [Step 2] Conducting Benchmark (9 Models) ---")

model_list = [
    'RNN', 'LSTM', 'GRU', 'BiLSTM',
    'CNN_1D', 'CNN_BiLSTM', 'Attn_GRU', 'TCN', 'Transformer'
]

results = []

# Prepare Combined Data for Retraining
X_combined = np.concatenate((X_train, X_val), axis=0)
y_combined = np.concatenate((y_train, y_val), axis=0)

# Every model sees the same input shape; compute it once.
input_shape = (CONFIG['SEQ_LENGTH'], len(feature_cols))

for name in model_list:
    print(f"\n>> Evaluating Model: {name}")

    # 1. Validation Phase (Find Best Epoch)
    # Release Keras state left over from earlier models and restart the RNG
    # streams, so a model's result does not depend on which models ran before
    # it or on its position in model_list.
    tf.keras.backend.clear_session()
    set_seeds()
    model = get_benchmark_model(name, input_shape, CONFIG)
    early_stop = EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)

    hist = model.fit(
        X_train, y_train,
        validation_data=(X_val, y_val),
        epochs=CONFIG['EPOCHS'],
        batch_size=CONFIG['BATCH_SIZE'],
        callbacks=[early_stop],
        verbose=0
    )

    # argmin is 0-based while Keras counts epochs from 1; cast to a plain int
    # before handing it back to fit().
    best_epoch = int(np.argmin(hist.history['val_loss'])) + 1
    val_loss = hist.history['val_loss'][best_epoch - 1]
    print(f"   -> Optimal Epoch: {best_epoch} (Val MSE: {val_loss:.4f})")

    # 2. Retraining Phase (Combined Data)
    # A fresh model is trained on train + validation data for exactly
    # best_epoch epochs. It is reseeded so it starts from the same
    # initialisation as the run that selected best_epoch.
    print("   -> Retraining on full dataset...")
    tf.keras.backend.clear_session()
    set_seeds()
    final_model = get_benchmark_model(name, input_shape, CONFIG)
    final_model.fit(
        X_combined, y_combined,
        epochs=best_epoch,
        batch_size=CONFIG['BATCH_SIZE'],
        verbose=0
    )

    # 3. Testing Phase
    y_pred = final_model.predict(X_test, verbose=0)

    rmse = np.sqrt(mean_squared_error(y_test, y_pred))
    mae = mean_absolute_error(y_test, y_pred)
    r2 = r2_score(y_test, y_pred)

    print(f"   -> [Result] RMSE: {rmse:.4f} | MAE: {mae:.4f} | R2: {r2:.4f}")

    results.append({
        'Model': name,
        'RMSE': rmse,
        'MAE': mae,
        'R2': r2
    })

In [None]:
# ---------------------------------------------------------
# 4. Final Reporting
# ---------------------------------------------------------
print("\n--- [Step 3] Final Benchmark Report ---")

# Table
results_df = pd.DataFrame(results)
print("\n[Table: Deep Learning Models Comparison]")
print(results_df)

# Visualization
fig, ax = plt.subplots(figsize=(12, 6))
bars = ax.bar(results_df['Model'], results_df['RMSE'], color='dimgray', alpha=0.8)
ax.set_title('Benchmark Comparison: Deep Learning Models (RMSE)', fontsize=14, fontweight='bold')
ax.set_ylabel('RMSE (kW) - Lower is Better', fontsize=12)
ax.grid(axis='y', linestyle='--', alpha=0.5)

# Highlight lowest RMSE
# idxmin returns an index label; the bar container is positional, so the
# label is translated to a position before it is used to pick a bar.
min_rmse_idx = results_df['RMSE'].idxmin()
best_bar = bars[results_df.index.get_loc(min_rmse_idx)]
best_bar.set_color('crimson')
best_bar.set_alpha(1.0)

# Value labels sit a fixed 3 points above each bar. An offset expressed in
# data units (the previous "+ 1.0") misplaces the labels whenever the RMSE
# values are small compared with 1.
for bar in bars:
    yval = bar.get_height()
    ax.annotate(f"{yval:.2f}",
                xy=(bar.get_x() + bar.get_width() / 2, yval),
                xytext=(0, 3), textcoords='offset points',
                ha='center', va='bottom', fontweight='bold', fontsize=10)

ax.tick_params(axis='x', labelrotation=45)
fig.tight_layout()
plt.show()

print("\n[Analysis]")
print(f"1. The model with the lowest RMSE is '{results_df.loc[min_rmse_idx, 'Model']}'.")
print("2. 'Transformer' results indicate the performance of pure self-attention without local convolution.")
print("3. Comparing 'CNN_BiLSTM' and 'Attn_GRU' reveals the trade-off between local feature extraction and global context.")