In [None]:
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, mean_absolute_percentage_error
import random

def set_seed(seed=123):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

set_seed(123)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
from google.colab import files
uploaded = files.upload()

In [None]:
import pandas as pd

# Load datasets
historical_df = pd.read_csv("merged_energy_weather_data.csv", parse_dates=["Date"])
forecast_df = pd.read_csv("weather_forecasts_aug.csv", parse_dates=["Date"])
actual_df = pd.read_csv("actual_aug.csv", parse_dates=["Date"])

In [None]:
historical_df.columns = [
    "date", "price", "temp", "precip", "wind", "humidity",
    "cloud", "radiation", "week_day", "month", "day_month"
]
print(historical_df.shape)
print(historical_df.head())

historical_df = historical_df.sort_values("date").reset_index(drop=True)
cloud_missing = historical_df['cloud'].isna()
historical_df.loc[cloud_missing, 'cloud'] = (
    historical_df['cloud'].shift(1) + historical_df['cloud'].shift(-1)
) / 2
print("Remaining NaNs:\n", historical_df.isna().sum())

In [None]:
df = historical_df.copy()
df = df.drop(columns=["day_month"])

df = df.sort_values("date").reset_index(drop=True)
for lag in range(1, 8):
    df[f"lag_{lag}"] = df["price"].shift(lag)
df = df.dropna().reset_index(drop=True)

df.head()

In [None]:
forecast_df.columns = [
    "date", "temp", "precip", "wind", "humidity",
    "cloud", "radiation", "week_day", "month"
]
print(forecast_df.shape)
forecast_df.head()

In [None]:
actual_df.columns = ["date", "price"]
print(actual_df.shape)
actual_df.head()

Feedforward NN

In [None]:
import torch
import torch.nn as nn
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define Feedforward NN Model
class FeedforwardNN(nn.Module):
    def __init__(self, input_dim, hidden_dims=[64], dropout=0.0):
        super(FeedforwardNN, self).__init__()
        layers = []
        prev_dim = input_dim
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            if dropout > 0:
                layers.append(nn.Dropout(dropout))
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))  # Output layer
        self.network = nn.Sequential(*layers)

    def forward(self, x):
        return self.network(x)

# Define feature columns (input features for the model)
feature_cols = [
    "temp", "precip", "wind", "humidity", "cloud", "radiation",
    "week_day", "month",
    "lag_1", "lag_2", "lag_3", "lag_4", "lag_5", "lag_6", "lag_7"
]

In [None]:
from sklearn.preprocessing import MinMaxScaler

# Extract input (X) and target (y) for training
X_train = df[feature_cols].values
y_train = df["price"].values.reshape(-1, 1)

# Fit scalers
scaler_X = MinMaxScaler()
scaler_y = MinMaxScaler()

X_train_scaled = scaler_X.fit_transform(X_train)
y_train_scaled = scaler_y.fit_transform(y_train)

# Convert to tensors
X_train_tensor = torch.tensor(X_train_scaled, dtype=torch.float32).to(device)
y_train_tensor = torch.tensor(y_train_scaled, dtype=torch.float32).to(device)

# Initialize and train the model (best parameters)
model_ff = FeedforwardNN(input_dim=X_train.shape[1], hidden_dims=[64], dropout=0.0).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model_ff.parameters(), lr=0.001)

# Simple training loop
epochs = 100
for epoch in range(epochs):
    model_ff.train()
    optimizer.zero_grad()
    outputs = model_ff(X_train_tensor)
    loss = criterion(outputs, y_train_tensor)
    loss.backward()
    optimizer.step()
    if (epoch+1) % 10 == 0:
        print(f"Epoch {epoch+1}/{epochs} - Loss: {loss.item():.4f}")

In [None]:
# Create a working copy of the forecast data
final_forecast_df = forecast_df.copy()
final_forecast_df = final_forecast_df.sort_values("date").reset_index(drop=True)

# Initialize lag window from final 7 days of training set
lag_window = df["price"].iloc[-7:].tolist()

# Add placeholder lag columns
for lag in range(1, 8):
    final_forecast_df[f"lag_{lag}"] = np.nan

# Fill lags for the first forecast row
for i in range(14):
    for lag_i in range(1, 8):
        final_forecast_df.loc[i, f"lag_{lag_i}"] = lag_window[-lag_i]

In [None]:
model_ff.eval()
ffnn_preds = []

for i in range(14):
    row = final_forecast_df.iloc[i].copy()

    # Set lag values in row from current lag_window
    for lag_i in range(1, 8):
        row[f"lag_{lag_i}"] = lag_window[-lag_i]

    print(f"\nDay {i+1} – Current lag window:", lag_window[::-1])

    # Prepare input row
    input_row = row[feature_cols].values.reshape(1, -1)
    input_scaled = scaler_X.transform(input_row)
    input_tensor = torch.tensor(input_scaled, dtype=torch.float32).to(device)

    # Predict
    with torch.no_grad():
        pred_scaled = model_ff(input_tensor).cpu().numpy().flatten()
        pred_price = scaler_y.inverse_transform(pred_scaled.reshape(1, -1)).flatten()[0]

    print(f"Prediction: {pred_price:.2f}")

    # Append prediction
    ffnn_preds.append(pred_price)

    # Update lag window (autoregressively)
    lag_window.append(pred_price)
    lag_window = lag_window[1:]

In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error

actual_prices = actual_df["price"].values[:14]

def mape(actual, predicted, eps=5):
    """
    Compute a stable Mean Absolute Percentage Error (MAPE) by flooring small actual values.

    Parameters:
        actual (np.array): Actual target values
        predicted (np.array): Predicted values
        eps (float): Minimum denominator value to avoid exploding percentages

    Returns:
        float: MAPE percentage
    """
    denom = np.maximum(np.abs(actual), eps)
    return np.mean(np.abs(actual - predicted) / denom) * 100

def smape(actual, predicted, eps=1e-8):
    """
    Compute the Symmetric Mean Absolute Percentage Error (SMAPE).

    Parameters:
        actual (np.array): Actual values
        predicted (np.array): Predicted values
        eps (float): Small value to prevent division by zero

    Returns:
        float: SMAPE percentage
    """
    actual = np.array(actual)
    predicted = np.array(predicted)

    denominator = (np.abs(actual) + np.abs(predicted)) + eps
    smape_val = np.mean(2 * np.abs(predicted - actual) / denominator) * 100
    return smape_val

mae = mean_absolute_error(actual_prices, ffnn_preds)
mse = mean_squared_error(actual_prices, ffnn_preds)
rmse = np.sqrt(mse)

mape_value = mape(actual_prices, ffnn_preds)
smape_val = smape(actual_prices, ffnn_preds)

print("\n Feedforward NN – Pseudo-Test Evaluation:")
print(f"MAE:  {mae:.4f}")
print(f"RMSE: {rmse:.4f}")
print(f"MAPE: {mape_value:.4f}")
print(f"SMAPE: {smape_val:.4f}")

In [None]:
print(actual_prices)

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(actual_prices, label="Actual")
plt.plot(ffnn_preds, label="Predicted", linestyle='--')
plt.title("Feedforward NN – Pseudo-Test Predictions")
plt.xlabel("Day (1–14 Aug)")
plt.ylabel("Price")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
for i, (true, pred) in enumerate(zip(actual_prices, ffnn_preds)):
    print(f"Day {i+1}: Actual={true:.2f}, Predicted={pred:.2f}")

TCN

In [None]:
import torch
import torch.nn as nn

class Chomp1d(nn.Module):
    def __init__(self, chomp_size):
        super().__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        return x[:, :, :-self.chomp_size]  # crop from the end only

class TemporalBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, dilation, padding, dropout):
        super().__init__()
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size,
                               stride=stride, padding=padding, dilation=dilation)
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size,
                               stride=stride, padding=padding, dilation=dilation)
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        self.net = nn.Sequential(
            self.conv1, self.chomp1, self.relu1, self.dropout1,
            self.conv2, self.chomp2, self.relu2, self.dropout2
        )

        self.downsample = nn.Conv1d(in_channels, out_channels, 1) \
            if in_channels != out_channels else None
        self.relu = nn.ReLU()

    def forward(self, x):
        out = self.net(x)
        res = x if self.downsample is None else self.downsample(x)
        return self.relu(out + res)

class TCN(nn.Module):
    def __init__(self, num_inputs, output_size, num_channels, kernel_size=2, dropout=0.2):
        super().__init__()
        layers = []
        num_levels = len(num_channels)
        for i in range(num_levels):
            in_ch = num_inputs if i == 0 else num_channels[i-1]
            out_ch = num_channels[i]
            dilation_size = 2 ** i
            padding = (kernel_size - 1) * dilation_size
            layers += [TemporalBlock(in_ch, out_ch, kernel_size, stride=1, dilation=dilation_size,
                                     padding=padding, dropout=dropout)]

        self.network = nn.Sequential(*layers)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        # x: [batch, seq_len, features] -> [batch, features, seq_len]
        x = x.permute(0, 2, 1)
        y = self.network(x)
        # Take the last time step's output
        y = y[:, :, -1]
        return self.linear(y)

In [None]:
# Best hyperparameters
channels = [128, 128]
kernel_size = 5
dropout = 0.2
lr = 0.001
batch_size = 32
sequence_len = 30
horizon = 14

# Input/output sizes
input_size = df.drop(columns=["price", "date"]).shape[1]
output_size = 1

# Reinitialize model with the updated TCN definition
model_tcn = TCN(num_inputs=input_size, output_size=output_size, num_channels=channels,
                kernel_size=kernel_size, dropout=dropout).to(device)

# Re-train the model on full data
X_full = df.drop(columns=["price", "date"]).values
y_full = df["price"].values.reshape(-1, 1)

from sklearn.preprocessing import StandardScaler
scaler_X = StandardScaler()
scaler_y = StandardScaler()
X_scaled = scaler_X.fit_transform(X_full)
y_scaled = scaler_y.fit_transform(y_full)

# Create input sequences for training
def create_tcn_sequences(X, y, sequence_len):
    X_seq, y_seq = [], []
    for i in range(len(X) - sequence_len):
        X_seq.append(X[i:i+sequence_len])
        y_seq.append(y[i+sequence_len])
    return torch.tensor(X_seq, dtype=torch.float32), torch.tensor(y_seq, dtype=torch.float32)

X_train_tcn, y_train_tcn = create_tcn_sequences(X_scaled, y_scaled, sequence_len)

train_dataset = torch.utils.data.TensorDataset(X_train_tcn, y_train_tcn)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

# Optimizer and loss
optimizer = torch.optim.Adam(model_tcn.parameters(), lr=lr)
criterion = nn.MSELoss()

# Train for fixed epochs
model_tcn.train()
for epoch in range(50):
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        preds = model_tcn(xb).squeeze()
        loss = criterion(preds, yb.squeeze())
        loss.backward()
        optimizer.step()

In [None]:
# Starting inputs
last_rows = df.drop(columns=["price", "date"]).iloc[-sequence_len:].copy()
lag_window = df["price"].iloc[-7:].tolist()

tcn_preds = []

for i in range(14):  # Day 1 to 14 August
    row = forecast_df.iloc[i].copy()

    # Insert lag values into the row
    for lag_i in range(1, 8):
        row[f"lag_{lag_i}"] = lag_window[-lag_i]

    # Combine with previous sequence
    input_seq = pd.concat([last_rows.iloc[1:], row.to_frame().T], ignore_index=True)
    last_rows = input_seq.copy()

    # Scale and predict
    X_input = scaler_X.transform(input_seq.drop(columns=["date"]).values).reshape(1, sequence_len, -1)
    X_tensor = torch.tensor(X_input, dtype=torch.float32).to(device)

    with torch.no_grad():
        pred_scaled = model_tcn(X_tensor).cpu().numpy().flatten()[0]
        pred_price = scaler_y.inverse_transform([[pred_scaled]])[0, 0]

    tcn_preds.append(pred_price)
    lag_window.append(pred_price)
    lag_window = lag_window[1:]

In [None]:
actual_prices = actual_df["price"].values[:14]

def smape(y_true, y_pred):
    denom = (np.abs(y_true) + np.abs(y_pred)) / 2.0
    return np.mean(np.abs(y_true - y_pred) / np.maximum(denom, 5)) * 100

def mape_stable(y_true, y_pred, eps=5):
    denom = np.maximum(np.abs(y_true), eps)
    return np.mean(np.abs(y_true - y_pred) / denom) * 100

from sklearn.metrics import mean_absolute_error, mean_squared_error

mae = mean_absolute_error(actual_prices, tcn_preds)
rmse = np.sqrt(mean_squared_error(actual_prices, tcn_preds))
mape = mape_stable(actual_prices, tcn_preds)
smape_val = smape(actual_prices, tcn_preds)

print("\n TCN – Pseudo-Test Evaluation:")
print(f"MAE:   {mae:.4f}")
print(f"RMSE:  {rmse:.4f}")
print(f"MAPE:  {mape:.4f}")
print(f"SMAPE: {smape_val:.4f}")

# Plot
import matplotlib.pyplot as plt

plt.figure(figsize=(10, 5))
plt.plot(actual_prices, label="Actual")
plt.plot(tcn_preds, label="Predicted", linestyle='--')
plt.title("TCN – Pseudo-Test Predictions")
plt.xlabel("Day (1–14 Aug)")
plt.ylabel("Price")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
# Plot of actual prices vs. FFNN predictions vs. TCN predictions
plt.figure(figsize=(12, 6))
plt.plot(actual_prices, label="Actual", marker='o')
plt.plot(ffnn_preds, label="Feedforward NN", linestyle='--', marker='s')
plt.plot(tcn_preds, label="TCN", linestyle='--', marker='^')

plt.title("Pseudo-Test Predictions (1–14 August 2025)")
plt.xlabel("Day")
plt.ylabel("Energy Price")
plt.xticks(ticks=np.arange(14), labels=np.arange(1, 15))
plt.grid(True)
plt.legend()
plt.tight_layout()
plt.show()