# ⚡ Energy Demand Forecasting System

Hybrid ML approach combining Prophet + LSTM for accurate energy demand predictions.

In [None]:
# Install dependencies!pip install pandas numpy plotly scikit-learn -q

## 1. Configuration

In [None]:
"""Configuration Module for Energy Demand Forecasting SystemCentralizes all configuration parameters."""from pathlib import Path# ============================================================================# PATHS# ============================================================================PROJECT_ROOT = Path(__file__).parentDATA_DIR = PROJECT_ROOT / "data"MODELS_DIR = PROJECT_ROOT / "saved_models"DATA_DIR.mkdir(exist_ok=True)MODELS_DIR.mkdir(exist_ok=True)DATA_FILE = DATA_DIR / "energy_data.csv"# ============================================================================# DATA CONFIGURATION# ============================================================================# Synthetic data parametersDATA_START_DATE = "2022-01-01"DATA_END_DATE = "2023-12-31"FREQUENCY = "H"  # Hourly# FeaturesTARGET_COL = "demand_mw"DATETIME_COL = "timestamp"FEATURE_COLS = ["temperature", "humidity", "hour", "day_of_week", "month", "is_weekend", "is_holiday"]# ============================================================================# MODEL CONFIGURATION# ============================================================================# Train/Test splitTRAIN_RATIO = 0.8VALIDATION_RATIO = 0.1# Forecasting horizonsFORECAST_HORIZONS = {    "24h": 24,    "48h": 48,    "7d": 168}DEFAULT_HORIZON = 24# ============================================================================# PROPHET CONFIGURATION# ============================================================================PROPHET_YEARLY_SEASONALITY = TruePROPHET_WEEKLY_SEASONALITY = TruePROPHET_DAILY_SEASONALITY = TruePROPHET_CHANGEPOINT_PRIOR_SCALE = 0.05# ============================================================================# LSTM CONFIGURATION# ============================================================================LSTM_SEQUENCE_LENGTH = 168  # 7 days of hourly dataLSTM_HIDDEN_SIZE = 64LSTM_NUM_LAYERS = 2LSTM_DROPOUT = 0.2LSTM_EPOCHS = 50LSTM_BATCH_SIZE = 32LSTM_LEARNING_RATE = 0.001# 
============================================================================# ENSEMBLE CONFIGURATION# ============================================================================ENSEMBLE_WEIGHTS = {    "prophet": 0.4,    "lstm": 0.6}# ============================================================================# EVALUATION CONFIGURATION# ============================================================================MAPE_TARGET = 5.0  # Target: <5% errorRMSE_ACCEPTABLE_RANGE = 0.1  # Within 10% of mean demand# ============================================================================# ANOMALY DETECTION# ============================================================================ANOMALY_THRESHOLD_SIGMA = 3.0  # 3 standard deviations# ============================================================================# APPLICATION CONFIGURATION# ============================================================================APP_TITLE = "⚡ Energy Demand Forecasting"APP_LAYOUT = "wide"DEBUG_MODE = True# ============================================================================# VISUALIZATION# ============================================================================CHART_HEIGHT = 400PRIMARY_COLOR = "#1f77b4"SECONDARY_COLOR = "#ff7f0e"ANOMALY_COLOR = "#d62728"

## 2. Data Processing

In [None]:
"""Data Processor for Energy Demand ForecastingHandles:1. Synthetic data generation2. Feature engineering3. Train/test splitting4. Data normalization"""import pandas as pdimport numpy as npfrom datetime import datetime, timedeltafrom typing import Tuple, Dict, Listimport loggingfrom config import (    DATA_START_DATE, DATA_END_DATE, FREQUENCY,    TARGET_COL, DATETIME_COL, TRAIN_RATIO, VALIDATION_RATIO,    LSTM_SEQUENCE_LENGTH, DATA_FILE)logging.basicConfig(level=logging.INFO)logger = logging.getLogger(__name__)def generate_synthetic_energy_data(    start_date: str = DATA_START_DATE,    end_date: str = DATA_END_DATE,    save_path: str = None) -> pd.DataFrame:    """    Generate realistic synthetic energy consumption data.        Patterns incorporated:    - Daily seasonality (peak at noon/evening, low at night)    - Weekly seasonality (lower on weekends)    - Yearly seasonality (higher in summer/winter for AC/heating)    - Temperature correlation    - Random noise    - Occasional anomalies    """    np.random.seed(42)        # Generate hourly timestamps    date_range = pd.date_range(start=start_date, end=end_date, freq='H')    n_samples = len(date_range)        logger.info(f"Generating {n_samples} hourly samples from {start_date} to {end_date}")        # Base load (MW)    base_load = 500        # Extract time features    hours = date_range.hour    days = date_range.dayofweek    months = date_range.month        # Generate temperature (correlated with demand)    # Seasonal temperature pattern    temp_seasonal = 15 + 15 * np.sin(2 * np.pi * (date_range.dayofyear - 80) / 365)    # Daily temperature variation    temp_daily = 5 * np.sin(2 * np.pi * (hours - 6) / 24)    # Random noise    temp_noise = np.random.normal(0, 3, n_samples)    temperature = temp_seasonal + temp_daily + temp_noise        # Humidity (inversely correlated with temp somewhat)    humidity = 60 - 0.3 * temperature + np.random.normal(0, 10, n_samples)    humidity = np.clip(humidity, 20, 95)        # 
Daily pattern (MW variation)    # Peak at 9-11am and 6-9pm, low at 3-5am    daily_pattern = (        100 * np.sin(np.pi * (hours - 6) / 12) * (hours >= 6) * (hours <= 18) +        80 * np.sin(np.pi * (hours - 18) / 6) * (hours >= 18) +        -50 * (hours >= 0) * (hours <= 5)    )        # Weekly pattern (lower on weekends)    weekly_pattern = np.where(days >= 5, -80, 0)  # Sat=5, Sun=6        # Seasonal pattern (higher in summer and winter)    # Peak in July/August (AC) and December/January (heating)    seasonal_pattern = 100 * np.cos(2 * np.pi * (months - 1) / 6)        # Temperature effect on demand    # Demand increases when temp deviates from comfortable range (18-22°C)    temp_effect = 3 * np.abs(temperature - 20)        # Combine all patterns    demand = (        base_load +        daily_pattern +        weekly_pattern +        seasonal_pattern +        temp_effect +        np.random.normal(0, 20, n_samples)  # Random noise    )        # Convert to numpy array explicitly    demand = np.array(demand)        # Add occasional anomalies (2% of data)    anomaly_mask = np.random.random(n_samples) < 0.02    anomaly_multipliers = np.random.choice([0.7, 1.4], size=n_samples)    demand = np.where(anomaly_mask, demand * anomaly_multipliers, demand)        # Ensure positive demand    demand = np.maximum(demand, 100)        # US Federal Holidays (simplified)    us_holidays = [        "2022-01-01", "2022-01-17", "2022-02-21", "2022-05-30", "2022-07-04",        "2022-09-05", "2022-10-10", "2022-11-11", "2022-11-24", "2022-12-25",        "2023-01-01", "2023-01-16", "2023-02-20", "2023-05-29", "2023-07-04",        "2023-09-04", "2023-10-09", "2023-11-10", "2023-11-23", "2023-12-25"    ]    holiday_dates = set(pd.to_datetime(us_holidays).date)    is_holiday = pd.Series(date_range.date).isin(holiday_dates).astype(int).values        # Create DataFrame    df = pd.DataFrame({        DATETIME_COL: date_range,        TARGET_COL: demand.round(2),        'temperature': 
temperature.round(1),        'humidity': humidity.round(1),        'hour': hours,        'day_of_week': days,        'month': months,        'is_weekend': (days >= 5).astype(int),        'is_holiday': is_holiday,        'is_anomaly': anomaly_mask.astype(int)    })        if save_path:        df.to_csv(save_path, index=False)        logger.info(f"Saved data to {save_path}")        return dfdef load_energy_data(file_path: str = None) -> pd.DataFrame:    """Load energy data from CSV."""    path = file_path or DATA_FILE        if not path.exists():        logger.info("Data file not found, generating synthetic data...")        return generate_synthetic_energy_data(save_path=str(path))        df = pd.read_csv(path, parse_dates=[DATETIME_COL])    logger.info(f"Loaded {len(df)} records from {path}")    return dfdef create_features(df: pd.DataFrame) -> pd.DataFrame:    """    Create additional features for forecasting.        Features:    - Lag features (past demand)    - Rolling statistics    - Time cyclical encoding    """    df = df.copy()        # Lag features    for lag in [1, 24, 168]:  # 1 hour, 1 day, 1 week        df[f'demand_lag_{lag}'] = df[TARGET_COL].shift(lag)        # Rolling statistics    df['demand_rolling_mean_24'] = df[TARGET_COL].rolling(24).mean()    df['demand_rolling_std_24'] = df[TARGET_COL].rolling(24).std()    df['demand_rolling_mean_168'] = df[TARGET_COL].rolling(168).mean()        # Cyclical encoding for hour and month    df['hour_sin'] = np.sin(2 * np.pi * df['hour'] / 24)    df['hour_cos'] = np.cos(2 * np.pi * df['hour'] / 24)    df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)    df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)    df['dow_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)    df['dow_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)        # Temperature deviation from comfortable range    df['temp_deviation'] = np.abs(df['temperature'] - 20)        return dfdef train_test_split_timeseries(    df: pd.DataFrame,    
train_ratio: float = TRAIN_RATIO,    val_ratio: float = VALIDATION_RATIO) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:    """    Split time series data chronologically.    """    n = len(df)    train_end = int(n * train_ratio)    val_end = int(n * (train_ratio + val_ratio))        train = df.iloc[:train_end].copy()    val = df.iloc[train_end:val_end].copy()    test = df.iloc[val_end:].copy()        logger.info(f"Split: Train={len(train)}, Val={len(val)}, Test={len(test)}")        return train, val, testdef prepare_sequences(    data: np.ndarray,    target: np.ndarray,    seq_length: int = LSTM_SEQUENCE_LENGTH) -> Tuple[np.ndarray, np.ndarray]:    """    Create sequences for LSTM training.        Args:        data: Feature array (n_samples, n_features)        target: Target array (n_samples,)        seq_length: Length of input sequence            Returns:        X: (n_sequences, seq_length, n_features)        y: (n_sequences,)    """    X, y = [], []        for i in range(seq_length, len(data)):        X.append(data[i-seq_length:i])        y.append(target[i])        return np.array(X), np.array(y)class DataNormalizer:    """Min-Max normalization for time series data."""        def __init__(self):        self.min_vals = None        self.max_vals = None        self.target_min = None        self.target_max = None        def fit(self, X: np.ndarray, y: np.ndarray = None):        self.min_vals = X.min(axis=0)        self.max_vals = X.max(axis=0)                if y is not None:            self.target_min = y.min()            self.target_max = y.max()                return self        def transform(self, X: np.ndarray) -> np.ndarray:        range_vals = self.max_vals - self.min_vals        range_vals[range_vals == 0] = 1  # Avoid division by zero        return (X - self.min_vals) / range_vals        def transform_target(self, y: np.ndarray) -> np.ndarray:        return (y - self.target_min) / (self.target_max - self.target_min)        def 
inverse_transform_target(self, y: np.ndarray) -> np.ndarray:        return y * (self.target_max - self.target_min) + self.target_minif __name__ == "__main__":    # Generate and save data    df = generate_synthetic_energy_data(save_path=str(DATA_FILE))    print(f"\nGenerated {len(df)} samples")    print(f"\nSample data:\n{df.head()}")    print(f"\nDemand statistics:\n{df[TARGET_COL].describe()}")

## 3. Generate or Load Data

In [None]:
from models.data_processor import load_energy_data, create_features

# Load the raw data, then derive the engineered features and drop the rows
# that contain NaN. `df` is the frame the rest of the notebook works with.
raw_df = load_energy_data()
df = create_features(raw_df).dropna()

print(f"Loaded {len(df)} samples")
print(df.head())

## 4. Statistical Models

In [None]:
"""Statistical Models for Energy Demand ForecastingImplements:1. Prophet - Facebook's time series forecasting2. SARIMA - Seasonal ARIMA (optional, simpler fallback)"""import pandas as pdimport numpy as npfrom typing import Dict, List, Tuple, Optionalimport loggingimport warningswarnings.filterwarnings('ignore')from config import (    TARGET_COL, DATETIME_COL,    PROPHET_YEARLY_SEASONALITY, PROPHET_WEEKLY_SEASONALITY,    PROPHET_DAILY_SEASONALITY, PROPHET_CHANGEPOINT_PRIOR_SCALE)logger = logging.getLogger(__name__)class ProphetForecaster:    """    Facebook Prophet for time series forecasting.        Strengths:    - Handles multiple seasonalities well    - Robust to missing data    - Incorporates holidays    - Provides uncertainty intervals    """        def __init__(        self,        yearly_seasonality: bool = PROPHET_YEARLY_SEASONALITY,        weekly_seasonality: bool = PROPHET_WEEKLY_SEASONALITY,        daily_seasonality: bool = PROPHET_DAILY_SEASONALITY,        changepoint_prior_scale: float = PROPHET_CHANGEPOINT_PRIOR_SCALE    ):        self.yearly_seasonality = yearly_seasonality        self.weekly_seasonality = weekly_seasonality        self.daily_seasonality = daily_seasonality        self.changepoint_prior_scale = changepoint_prior_scale        self.model = None        self.is_fitted = False        def _prepare_data(self, df: pd.DataFrame) -> pd.DataFrame:        """Convert to Prophet format (ds, y)."""        prophet_df = df[[DATETIME_COL, TARGET_COL]].copy()        prophet_df.columns = ['ds', 'y']        return prophet_df        def fit(        self,         df: pd.DataFrame,        regressors: List[str] = None    ) -> 'ProphetForecaster':        """        Fit Prophet model on training data.                
Args:            df: DataFrame with timestamp and target columns            regressors: Optional list of additional regressor columns        """        try:            from prophet import Prophet        except ImportError:            logger.error("Prophet not installed. Install with: pip install prophet")            raise                self.model = Prophet(            yearly_seasonality=self.yearly_seasonality,            weekly_seasonality=self.weekly_seasonality,            daily_seasonality=self.daily_seasonality,            changepoint_prior_scale=self.changepoint_prior_scale        )                # Add regressors if provided        if regressors:            for reg in regressors:                self.model.add_regressor(reg)                prophet_df = self._prepare_data(df)                # Add regressors to dataframe        if regressors:            for reg in regressors:                prophet_df[reg] = df[reg].values                logger.info(f"Fitting Prophet on {len(prophet_df)} samples...")        self.model.fit(prophet_df)        self.is_fitted = True                return self        def predict(        self,         periods: int,        include_history: bool = False,        future_regressors: pd.DataFrame = None    ) -> pd.DataFrame:        """        Generate forecasts.                Args:            periods: Number of periods to forecast            include_history: Include historical predictions            future_regressors: DataFrame with future regressor values                    Returns:            DataFrame with columns: ds, yhat, yhat_lower, yhat_upper        """        if not self.is_fitted:            raise ValueError("Model not fitted. 
Call fit() first.")                # Create future dataframe        future = self.model.make_future_dataframe(periods=periods, freq='H')                # Add regressors if needed        if future_regressors is not None:            for col in future_regressors.columns:                future[col] = future_regressors[col].values[:len(future)]                forecast = self.model.predict(future)                if not include_history:            forecast = forecast.tail(periods)                return forecast[['ds', 'yhat', 'yhat_lower', 'yhat_upper']]        def get_components(self, forecast: pd.DataFrame = None) -> Dict[str, pd.Series]:        """Extract trend, seasonality components."""        if forecast is None and not self.is_fitted:            raise ValueError("No forecast available")                components = {}                if hasattr(self.model, 'history'):            future = self.model.make_future_dataframe(periods=0, freq='H')            full_forecast = self.model.predict(future)                        components['trend'] = full_forecast['trend']                        if self.yearly_seasonality:                components['yearly'] = full_forecast.get('yearly', pd.Series())            if self.weekly_seasonality:                components['weekly'] = full_forecast.get('weekly', pd.Series())            if self.daily_seasonality:                components['daily'] = full_forecast.get('daily', pd.Series())                return componentsclass SimpleSeasonalModel:    """    Simple seasonal naive model as baseline.        
Predicts using same value from previous period:    - Same hour yesterday    - Same hour last week    """        def __init__(self, seasonal_period: int = 168):        """        Args:            seasonal_period: Hours in seasonal cycle (168 = 1 week)        """        self.seasonal_period = seasonal_period        self.history = None        def fit(self, df: pd.DataFrame) -> 'SimpleSeasonalModel':        """Store historical data for naive forecasting."""        self.history = df[[DATETIME_COL, TARGET_COL]].copy()        self.history.set_index(DATETIME_COL, inplace=True)        return self        def predict(self, periods: int) -> np.ndarray:        """Predict by repeating seasonal pattern."""        if self.history is None:            raise ValueError("Model not fitted")                # Use last seasonal_period values and repeat        last_values = self.history[TARGET_COL].values[-self.seasonal_period:]                predictions = []        for i in range(periods):            predictions.append(last_values[i % self.seasonal_period])                return np.array(predictions)class MovingAverageModel:    """Simple moving average for comparison."""        def __init__(self, window: int = 24):        self.window = window        self.last_values = None        def fit(self, df: pd.DataFrame) -> 'MovingAverageModel':        self.last_values = df[TARGET_COL].values[-self.window:]        return self        def predict(self, periods: int) -> np.ndarray:        predictions = []        values = list(self.last_values)                for _ in range(periods):            pred = np.mean(values[-self.window:])            predictions.append(pred)            values.append(pred)                return np.array(predictions)

## 5. Deep Learning Models

In [None]:
"""Deep Learning Models for Energy Demand ForecastingImplements:1. LSTM - Long Short-Term Memory network2. Simple Transformer encoder (lightweight)"""import numpy as npfrom typing import Tuple, Optionalimport logginglogger = logging.getLogger(__name__)# Check for PyTorch availabilitytry:    import torch    import torch.nn as nn    from torch.utils.data import DataLoader, TensorDataset    TORCH_AVAILABLE = Trueexcept ImportError:    TORCH_AVAILABLE = False    logger.warning("PyTorch not available. Deep learning models will use numpy fallback.")from config import (    LSTM_SEQUENCE_LENGTH, LSTM_HIDDEN_SIZE, LSTM_NUM_LAYERS,    LSTM_DROPOUT, LSTM_EPOCHS, LSTM_BATCH_SIZE, LSTM_LEARNING_RATE)if TORCH_AVAILABLE:    class LSTMModel(nn.Module):        """LSTM for time series forecasting."""                def __init__(            self,            input_size: int,            hidden_size: int = LSTM_HIDDEN_SIZE,            num_layers: int = LSTM_NUM_LAYERS,            dropout: float = LSTM_DROPOUT,            output_size: int = 1        ):            super().__init__()                        self.hidden_size = hidden_size            self.num_layers = num_layers                        self.lstm = nn.LSTM(                input_size=input_size,                hidden_size=hidden_size,                num_layers=num_layers,                dropout=dropout if num_layers > 1 else 0,                batch_first=True            )                        self.fc = nn.Sequential(                nn.Linear(hidden_size, hidden_size // 2),                nn.ReLU(),                nn.Dropout(dropout),                nn.Linear(hidden_size // 2, output_size)            )                def forward(self, x: torch.Tensor) -> torch.Tensor:            # x shape: (batch, seq_len, features)            lstm_out, _ = self.lstm(x)            # Take last time step            last_out = lstm_out[:, -1, :]            return self.fc(last_out)class LSTMForecaster:    """    LSTM-based forecaster wrapper.       
 Handles training, prediction, and sequence preparation.    """        def __init__(        self,        input_size: int = None,        sequence_length: int = LSTM_SEQUENCE_LENGTH,        hidden_size: int = LSTM_HIDDEN_SIZE,        num_layers: int = LSTM_NUM_LAYERS,        epochs: int = LSTM_EPOCHS,        batch_size: int = LSTM_BATCH_SIZE,        learning_rate: float = LSTM_LEARNING_RATE    ):        self.input_size = input_size        self.sequence_length = sequence_length        self.hidden_size = hidden_size        self.num_layers = num_layers        self.epochs = epochs        self.batch_size = batch_size        self.learning_rate = learning_rate                self.model = None        self.is_fitted = False        self.device = 'cpu'                # Store normalization parameters        self.feature_min = None        self.feature_max = None        self.target_min = None        self.target_max = None                # Store last sequence for forecasting        self.last_sequence = None        def _normalize(self, X: np.ndarray, y: np.ndarray = None, fit: bool = False):        """Min-max normalization."""        if fit:            self.feature_min = X.min(axis=0)            self.feature_max = X.max(axis=0)            if y is not None:                self.target_min = y.min()                self.target_max = y.max()                range_val = self.feature_max - self.feature_min        range_val[range_val == 0] = 1        X_norm = (X - self.feature_min) / range_val                if y is not None:            y_norm = (y - self.target_min) / (self.target_max - self.target_min + 1e-8)            return X_norm, y_norm        return X_norm        def _denormalize_target(self, y: np.ndarray) -> np.ndarray:        """Inverse normalization for predictions."""        return y * (self.target_max - self.target_min) + self.target_min        def _create_sequences(        self,         X: np.ndarray,         y: np.ndarray    ) -> Tuple[np.ndarray, np.ndarray]:        
"""Create input sequences for LSTM."""        sequences, targets = [], []                for i in range(self.sequence_length, len(X)):            sequences.append(X[i-self.sequence_length:i])            targets.append(y[i])                return np.array(sequences), np.array(targets)        def fit(        self,         X: np.ndarray,         y: np.ndarray,        X_val: np.ndarray = None,        y_val: np.ndarray = None,        verbose: bool = True    ) -> 'LSTMForecaster':        """        Train LSTM model.                Args:            X: Features array (n_samples, n_features)            y: Target array (n_samples,)            X_val: Validation features            y_val: Validation targets            verbose: Print training progress        """        if not TORCH_AVAILABLE:            logger.warning("PyTorch not available. Using simple fallback.")            self._fit_fallback(X, y)            return self                # Normalize        X_norm, y_norm = self._normalize(X, y, fit=True)                # Create sequences        X_seq, y_seq = self._create_sequences(X_norm, y_norm)                if len(X_seq) == 0:            raise ValueError(f"Not enough data for sequence length {self.sequence_length}")                self.input_size = X_seq.shape[2]                # Convert to tensors        X_tensor = torch.FloatTensor(X_seq)        y_tensor = torch.FloatTensor(y_seq).unsqueeze(1)                dataset = TensorDataset(X_tensor, y_tensor)        dataloader = DataLoader(dataset, batch_size=self.batch_size, shuffle=True)                # Initialize model        self.model = LSTMModel(            input_size=self.input_size,            hidden_size=self.hidden_size,            num_layers=self.num_layers        ).to(self.device)                criterion = nn.MSELoss()        optimizer = torch.optim.Adam(self.model.parameters(), lr=self.learning_rate)                # Training loop        self.model.train()        for epoch in range(self.epochs):            
total_loss = 0            for X_batch, y_batch in dataloader:                X_batch = X_batch.to(self.device)                y_batch = y_batch.to(self.device)                                optimizer.zero_grad()                predictions = self.model(X_batch)                loss = criterion(predictions, y_batch)                loss.backward()                optimizer.step()                                total_loss += loss.item()                        if verbose and (epoch + 1) % 10 == 0:                avg_loss = total_loss / len(dataloader)                logger.info(f"Epoch {epoch+1}/{self.epochs}, Loss: {avg_loss:.6f}")                # Store last sequence for future predictions        self.last_sequence = X_norm[-self.sequence_length:]        self.is_fitted = True                return self        def _fit_fallback(self, X: np.ndarray, y: np.ndarray):        """Simple fallback when PyTorch unavailable."""        self.feature_min = X.min(axis=0)        self.feature_max = X.max(axis=0)        self.target_min = y.min()        self.target_max = y.max()                # Store last values for naive prediction        self._last_values = y[-self.sequence_length:]        self.is_fitted = True        def predict(self, periods: int = 1, X_future: np.ndarray = None) -> np.ndarray:        """        Generate forecasts.                For multi-step forecasting, uses recursive prediction.        """        if not self.is_fitted:            raise ValueError("Model not fitted. 
Call fit() first.")                if not TORCH_AVAILABLE:            # Fallback: return rolling mean            return np.full(periods, self._last_values.mean())                self.model.eval()        predictions = []                # Start with last known sequence        current_seq = self.last_sequence.copy()                with torch.no_grad():            for _ in range(periods):                # Prepare input                X_input = torch.FloatTensor(current_seq).unsqueeze(0).to(self.device)                                # Predict                pred = self.model(X_input).cpu().numpy()[0, 0]                predictions.append(pred)                                # Update sequence for next prediction                new_row = current_seq[-1].copy()                new_row[0] = pred  # Assume first feature is target (lag-1)                current_seq = np.vstack([current_seq[1:], new_row])                # Denormalize        predictions = np.array(predictions)        return self._denormalize_target(predictions)        def save(self, path: str):        """Save model to disk."""        if TORCH_AVAILABLE and self.model:            torch.save({                'model_state': self.model.state_dict(),                'config': {                    'input_size': self.input_size,                    'hidden_size': self.hidden_size,                    'num_layers': self.num_layers                },                'normalization': {                    'feature_min': self.feature_min,                    'feature_max': self.feature_max,                    'target_min': self.target_min,                    'target_max': self.target_max                },                'last_sequence': self.last_sequence            }, path)        def load(self, path: str):        """Load model from disk."""        if TORCH_AVAILABLE:            checkpoint = torch.load(path, map_location=self.device)                        self.input_size = checkpoint['config']['input_size']            
self.hidden_size = checkpoint['config']['hidden_size']            self.num_layers = checkpoint['config']['num_layers']                        self.model = LSTMModel(                input_size=self.input_size,                hidden_size=self.hidden_size,                num_layers=self.num_layers            )            self.model.load_state_dict(checkpoint['model_state'])                        self.feature_min = checkpoint['normalization']['feature_min']            self.feature_max = checkpoint['normalization']['feature_max']            self.target_min = checkpoint['normalization']['target_min']            self.target_max = checkpoint['normalization']['target_max']            self.last_sequence = checkpoint['last_sequence']                        self.is_fitted = True

## 6. Ensemble Model

In [None]:
"""Ensemble Model for Energy Demand ForecastingCombines predictions from multiple models with weighted averaging."""import numpy as npimport pandas as pdfrom typing import Dict, List, Optionalimport loggingfrom config import ENSEMBLE_WEIGHTS, DATETIME_COL, TARGET_COLlogger = logging.getLogger(__name__)class EnsembleForecaster:    """    Ensemble forecaster that combines multiple models.        Supports:    - Weighted averaging    - Automatic weight optimization    - Confidence intervals    """        def __init__(        self,        weights: Dict[str, float] = None    ):        self.weights = weights or ENSEMBLE_WEIGHTS.copy()        self.models = {}        self.is_fitted = False        def add_model(self, name: str, model: object, weight: float = None):        """Add a model to the ensemble."""        self.models[name] = model        if weight is not None:            self.weights[name] = weight                # Normalize weights        self._normalize_weights()        def _normalize_weights(self):        """Ensure weights sum to 1."""        total = sum(self.weights.get(name, 0) for name in self.models.keys())        if total > 0:            for name in self.models.keys():                if name in self.weights:                    self.weights[name] /= total        def fit(        self,        df: pd.DataFrame,        X: np.ndarray = None,        y: np.ndarray = None,        X_val: np.ndarray = None,        y_val: np.ndarray = None    ) -> 'EnsembleForecaster':        """        Fit all models in the ensemble.                
Args:            df: DataFrame for Prophet-style models            X, y: Arrays for LSTM-style models            X_val, y_val: Validation data for weight optimization        """        for name, model in self.models.items():            logger.info(f"Fitting {name}...")                        if hasattr(model, 'fit'):                # Check model type                if 'Prophet' in type(model).__name__ or 'Seasonal' in type(model).__name__:                    model.fit(df)                elif X is not None and y is not None:                    model.fit(X, y)                self.is_fitted = True                # Optimize weights if validation data provided        if X_val is not None and y_val is not None:            self._optimize_weights(X_val, y_val)                return self        def _optimize_weights(        self,         X_val: np.ndarray,         y_val: np.ndarray,        metric: str = 'mape'    ):        """Optimize weights based on validation performance."""        errors = {}                for name, model in self.models.items():            try:                if hasattr(model, 'predict'):                    pred = model.predict(len(y_val))                    if len(pred) == len(y_val):                        mape = np.mean(np.abs((y_val - pred) / (y_val + 1e-8))) * 100                        errors[name] = mape            except Exception as e:                logger.warning(f"Could not evaluate {name}: {e}")                if errors:            # Inverse error weighting (lower error = higher weight)            total_inv_error = sum(1.0 / (e + 1e-8) for e in errors.values())            for name in errors:                self.weights[name] = (1.0 / (errors[name] + 1e-8)) / total_inv_error                        logger.info(f"Optimized weights: {self.weights}")        def predict(        self,        periods: int,        return_individual: bool = False    ) -> Dict[str, np.ndarray]:        """        Generate ensemble predictions.                
Returns:            Dictionary with 'ensemble', 'lower', 'upper' predictions            Optionally includes individual model predictions        """        if not self.is_fitted:            raise ValueError("Ensemble not fitted. Call fit() first.")                predictions = {}        all_preds = []                for name, model in self.models.items():            try:                pred = model.predict(periods)                                # Handle Prophet-style output                if isinstance(pred, pd.DataFrame):                    pred_values = pred['yhat'].values                else:                    pred_values = pred                                predictions[name] = pred_values                all_preds.append(pred_values * self.weights.get(name, 0))                            except Exception as e:                logger.warning(f"Prediction failed for {name}: {e}")                if not all_preds:            raise ValueError("No predictions generated")                # Weighted ensemble        ensemble_pred = np.sum(all_preds, axis=0)        predictions['ensemble'] = ensemble_pred                # Confidence intervals (based on model disagreement)        if len(all_preds) > 1:            std = np.std([p / max(self.weights.values()) for p in all_preds], axis=0)            predictions['lower'] = ensemble_pred - 1.96 * std            predictions['upper'] = ensemble_pred + 1.96 * std        else:            predictions['lower'] = ensemble_pred * 0.9            predictions['upper'] = ensemble_pred * 1.1                if not return_individual:            return {                'ensemble': predictions['ensemble'],                'lower': predictions['lower'],                'upper': predictions['upper']            }                return predictions        def get_model_contributions(self) -> Dict[str, float]:        """Get contribution percentage of each model."""        return {name: weight * 100 for name, weight in self.weights.items() if name in 
self.models}

## 7. Evaluation Metrics

In [None]:
"""Evaluation Metrics for Time Series ForecastingStandard forecasting metrics:- MAPE: Mean Absolute Percentage Error- RMSE: Root Mean Square Error- MAE: Mean Absolute Error- SMAPE: Symmetric MAPE"""import numpy as npfrom typing import Dictimport logginglogger = logging.getLogger(__name__)def mape(actual: np.ndarray, predicted: np.ndarray) -> float:    """    Mean Absolute Percentage Error.        MAPE = mean(|actual - predicted| / |actual|) * 100    """    actual = np.array(actual)    predicted = np.array(predicted)        # Avoid division by zero    mask = actual != 0    return np.mean(np.abs((actual[mask] - predicted[mask]) / actual[mask])) * 100def smape(actual: np.ndarray, predicted: np.ndarray) -> float:    """    Symmetric Mean Absolute Percentage Error.        More robust when actual values are close to zero.    """    actual = np.array(actual)    predicted = np.array(predicted)        denominator = (np.abs(actual) + np.abs(predicted)) / 2    mask = denominator != 0        return np.mean(np.abs(actual[mask] - predicted[mask]) / denominator[mask]) * 100def rmse(actual: np.ndarray, predicted: np.ndarray) -> float:    """Root Mean Square Error."""    actual = np.array(actual)    predicted = np.array(predicted)    return np.sqrt(np.mean((actual - predicted) ** 2))def mae(actual: np.ndarray, predicted: np.ndarray) -> float:    """Mean Absolute Error."""    return np.mean(np.abs(np.array(actual) - np.array(predicted)))def mase(actual: np.ndarray, predicted: np.ndarray, seasonal_period: int = 24) -> float:    """    Mean Absolute Scaled Error.        Compares forecast error to naive seasonal forecast.    
"""    actual = np.array(actual)    predicted = np.array(predicted)        # Naive seasonal forecast error    naive_errors = np.abs(actual[seasonal_period:] - actual[:-seasonal_period])    naive_mae = np.mean(naive_errors)        if naive_mae == 0:        return np.inf        forecast_mae = np.mean(np.abs(actual - predicted))    return forecast_mae / naive_maedef coverage(    actual: np.ndarray,     lower: np.ndarray,     upper: np.ndarray) -> float:    """    Prediction interval coverage.        Percentage of actual values within [lower, upper] bounds.    """    actual = np.array(actual)    lower = np.array(lower)    upper = np.array(upper)        within_bounds = (actual >= lower) & (actual <= upper)    return np.mean(within_bounds) * 100def evaluate_forecast(    actual: np.ndarray,    predicted: np.ndarray,    lower: np.ndarray = None,    upper: np.ndarray = None) -> Dict[str, float]:    """    Calculate all metrics for a forecast.        Returns:        Dictionary of metric names to values    """    metrics = {        'MAPE': mape(actual, predicted),        'SMAPE': smape(actual, predicted),        'RMSE': rmse(actual, predicted),        'MAE': mae(actual, predicted),        'MASE': mase(actual, predicted)    }        if lower is not None and upper is not None:        metrics['Coverage'] = coverage(actual, lower, upper)        return metricsclass ForecastEvaluator:    """Evaluate and compare multiple forecasting models."""        def __init__(self):        self.results = {}        def evaluate(        self,        name: str,        actual: np.ndarray,        predicted: np.ndarray,        lower: np.ndarray = None,        upper: np.ndarray = None    ) -> Dict[str, float]:        """Evaluate a single forecast."""        metrics = evaluate_forecast(actual, predicted, lower, upper)        self.results[name] = metrics        return metrics        def compare(self) -> Dict[str, Dict[str, float]]:        """Compare all evaluated models."""        return self.results     
   def best_model(self, metric: str = 'MAPE') -> str:        """Find best model by specified metric."""        if not self.results:            return None                best = min(self.results.items(), key=lambda x: x[1].get(metric, float('inf')))        return best[0]        def summary(self) -> str:        """Generate summary report."""        if not self.results:            return "No evaluations performed."                lines = ["Forecast Evaluation Summary", "=" * 40]                for name, metrics in self.results.items():            lines.append(f"\n{name}:")            for metric, value in metrics.items():                lines.append(f"  {metric}: {value:.2f}")                best = self.best_model()        lines.append(f"\nBest Model (by MAPE): {best}")                return "\n".join(lines)

## 8. Train and Evaluate

In [None]:
# Train simple seasonal model
from models.statistical import SimpleSeasonalModel
from evaluation.metrics import evaluate_forecast

# One week of rows. The same value sizes the hold-out split, the model's
# seasonal period and the forecast horizon, so it is named once instead of
# repeating the literal. Assumes hourly rows (config FREQUENCY = "H").
WEEK_HOURS = 7 * 24  # 168

assert len(df) > WEEK_HOURS, "need more than one week of data to hold out a test week"

# Split data
train = df.iloc[:-WEEK_HOURS]  # All except last week
test = df.iloc[-WEEK_HOURS:]   # Last week

# Train
model = SimpleSeasonalModel(seasonal_period=WEEK_HOURS)
model.fit(train)

# Predict
predictions = model.predict(WEEK_HOURS)
actual = test['demand_mw'].values

# The metrics compare the two series element-wise, so they must line up.
assert len(predictions) == len(actual), "forecast horizon does not match the test window"

# Evaluate
metrics = evaluate_forecast(actual, predictions)
print("Evaluation Results:")
for k, v in metrics.items():
    print(f"  {k}: {v:.2f}")

## 9. Visualize Forecast

In [None]:
import plotly.graph_objects as go

# Both series are plotted against a plain 0..n-1 hour index rather than
# timestamps; the forecast is dashed so it stays distinguishable where the
# two lines overlap.
fig = go.Figure(
    data=[
        go.Scatter(
            x=list(range(len(actual))),
            y=actual,
            name='Actual',
        ),
        go.Scatter(
            x=list(range(len(predictions))),
            y=predictions,
            name='Forecast',
            line=dict(dash='dash'),
        ),
    ]
)
fig.update_layout(
    title='7-Day Forecast vs Actual',
    xaxis_title='Hour',
    yaxis_title='Demand (MW)',
)
fig.show()