# Advanced Time Series Forecasting with Uncertainty Quantification

This notebook implements a Bayesian LSTM for multi-step time series forecasting with Monte Carlo Dropout for uncertainty quantification.

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import layers, models, losses
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error
import os

# Set random seeds for reproducibility
np.random.seed(42)
tf.random.set_seed(42)

## 1. Data Generation (Data Factory)

In [None]:
class DataFactory:
    """
    A class to generate synthetic time series data and prepare it for 
    sequence-based models (LSTM/TCN).
    """
    
    def __init__(self, seed=42):
        np.random.seed(seed)
        self.scaler = StandardScaler()

    def generate_synthetic_data(self, n_points=1200, slope=0.01, seasonality_period=50, noise_level=0.1):
        """
        Generates a synthetic time series with:
        - Linear trend
        - Seasonality (Sine wave)
        - Heteroscedastic noise (noise level varies with value)
        """
        time = np.arange(n_points)
        
        # Components
        trend = slope * time
        seasonality = np.sin(2 * np.pi * time / seasonality_period)
        
        # Heteroscedastic noise
        signal = trend + seasonality
        noise_amplitude = noise_level * (1 + 0.5 * np.abs(signal)) 
        noise = np.random.normal(0, noise_amplitude, size=n_points)
        
        data = signal + noise
        
        df = pd.DataFrame({'value': data, 'time': time})
        return df

    def create_sequences(self, data, lookback, horizon, step=1):
        """
        Creates sliding window sequences for input (X) and target (y).
        """
        X, y = [], []
        if len(data.shape) == 1:
            data = data.reshape(-1, 1)
            
        for i in range(0, len(data) - lookback - horizon + 1, step):
            X.append(data[i : i + lookback])
            # Predict point(s) at horizon
            y.append(data[i + lookback : i + lookback + horizon])
            
        return np.array(X), np.array(y)

    def split_data(self, df, train_ratio=0.7, val_ratio=0.15):
        """
        Splits data into train, validation, and test sets respecting temporal order.
        """
        n = len(df)
        train_end = int(n * train_ratio)
        val_end = int(n * (train_ratio + val_ratio))
        
        train_data = df.iloc[:train_end]
        val_data = df.iloc[train_end:val_end]
        test_data = df.iloc[val_end:]
        
        return train_data, val_data, test_data

    def scale_data(self, train, val, test):
        """
        Fits scaler on train, transforms train, val, test.
        """
        train_scaled = self.scaler.fit_transform(train)
        val_scaled = self.scaler.transform(val)
        test_scaled = self.scaler.transform(test)
        
        return train_scaled, val_scaled, test_scaled

## 2. Model Definition (Bayesian LSTM)

In [None]:
class BayesianLSTM(tf.keras.Model):
    """
    LSTM model with MC Dropout for Uncertainty Quantification.
    """
    def __init__(self, units, output_dim, dropout_rate=0.2):
        super(BayesianLSTM, self).__init__()
        self.dropout_rate = dropout_rate
        
        # LSTM Layers
        self.lstm1 = layers.LSTM(units, return_sequences=True)
        self.dropout1 = layers.Dropout(dropout_rate)
        
        self.lstm2 = layers.LSTM(units, return_sequences=True)
        self.dropout2 = layers.Dropout(dropout_rate)
        
        self.lstm3 = layers.LSTM(units)
        self.dropout3 = layers.Dropout(dropout_rate)
        
        self.dense = layers.Dense(output_dim)

    def call(self, inputs, training=None):
        """
        Forward pass.
        Pass 'training' arg to dropouts to enable/disable MC sampling.
        """
        x = self.lstm1(inputs)
        x = self.dropout1(x, training=training)
        
        x = self.lstm2(x)
        x = self.dropout2(x, training=training)
        
        x = self.lstm3(x)
        x = self.dropout3(x, training=training)
        
        return self.dense(x)

def quantile_loss(q, y, f):
    e = (y - f)
    return tf.maximum(q * e, (q - 1) * e)

## 3. Training Logic

In [None]:
class Trainer:
    def __init__(self, model, learning_rate=0.001, loss='mse'):
        self.model = model
        self.loss = loss 
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=learning_rate)
        self.model.compile(optimizer=self.optimizer, loss=self.loss)

    def train(self, X_train, y_train, X_val, y_val, epochs=50, batch_size=32):
        callbacks = [
            EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1),
            ReduceLROnPlateau(monitor='val_loss', factor=0.5, patience=5, min_lr=1e-5, verbose=1)
        ]
        
        history = self.model.fit(
            X_train, y_train,
            validation_data=(X_val, y_val),
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callbacks,
            verbose=1
        )
        return history

## 4. Evaluation & Uncertainty

In [None]:
class UncertaintyEvaluator:
    def __init__(self, model, n_samples=100):
        self.model = model
        self.n_samples = n_samples

    def predict_mc_dropout(self, X):
        """
        Performs Monte Carlo Dropout prediction.
        """
        predictions = []
        for _ in range(self.n_samples):
            # Force training=True to enable dropout during inference
            pred = self.model(X, training=True)
            predictions.append(pred)
            
        predictions = np.array(predictions) # Shape: (n_samples, n_data, output_dim)
        
        mean_pred = np.mean(predictions, axis=0)
        lower_bound = np.percentile(predictions, 5, axis=0)
        upper_bound = np.percentile(predictions, 95, axis=0)
        
        return mean_pred, lower_bound, upper_bound, predictions

    def evaluate_metrics(self, y_true, y_pred, lower, upper):
        """
        Calculates metrics. Handles squeezing dimensions and flattening.
        """
        # Enforce flattening to avoid dimension mismatch (e.g. (N, 1) vs (N,))
        y_true = np.array(y_true).flatten()
        y_pred = np.array(y_pred).flatten()
        lower = np.array(lower).flatten()
        upper = np.array(upper).flatten()

        rmse = np.sqrt(mean_squared_error(y_true, y_pred))
        mae = mean_absolute_error(y_true, y_pred)
        
        # PICP
        captured = ((y_true >= lower) & (y_true <= upper))
        picp = np.mean(captured)
        
        # MPIW
        mpiw = np.mean(upper - lower)
        
        return {
            "RMSE": rmse,
            "MAE": mae,
            "PICP": picp,
            "MPIW": mpiw
        }

    def plot_results(self, time, y_true, y_pred, lower, upper, title="Forecast with Uncertainty"):
        plt.figure(figsize=(12, 6))
        # Flatten inputs for plotting
        plt.plot(time, y_true.flatten(), label='Ground Truth', color='black', alpha=0.7)
        plt.plot(time, y_pred.flatten(), label='Prediction (Mean)', color='blue')
        plt.fill_between(time, lower.flatten(), upper.flatten(), color='blue', alpha=0.2, label='90% Confidence Interval')
        plt.title(title)
        plt.legend()
        plt.grid(True, alpha=0.3)
        plt.show()

## 5. Main Execution

Here we run the pipeline.

In [None]:
# Parameters
SEQ_LEN = 50   
HORIZON = 1    
EPOCHS = 10 # Adjust as needed
BATCH_SIZE = 32
DROPOUT_RATE = 0.2
MC_SAMPLES = 100

# 1. Data
factory = DataFactory()
df = factory.generate_synthetic_data(n_points=1500, slope=0.015, seasonality_period=60, noise_level=0.15)

plt.figure(figsize=(10,4))
plt.plot(df['time'], df['value'])
plt.title("Synthetic Data")
plt.show()

train_df, val_df, test_df = factory.split_data(df)
train_scaled, val_scaled, test_scaled = factory.scale_data(train_df[['value']], val_df[['value']], test_df[['value']])

X_train, y_train = factory.create_sequences(train_scaled, SEQ_LEN, HORIZON)
X_val, y_val = factory.create_sequences(val_scaled, SEQ_LEN, HORIZON)
X_test, y_test = factory.create_sequences(test_scaled, SEQ_LEN, HORIZON)

# 2. Model
model = BayesianLSTM(units=64, output_dim=1, dropout_rate=DROPOUT_RATE)
# Build input shape
model.build(input_shape=(None, SEQ_LEN, 1))
model.summary()

# 3. Train
trainer = Trainer(model, learning_rate=0.001)
history = trainer.train(X_train, y_train, X_val, y_val, epochs=EPOCHS, batch_size=BATCH_SIZE)

plt.plot(history.history['loss'], label='Train')
plt.plot(history.history['val_loss'], label='Val')
plt.title("Loss Curves")
plt.legend()
plt.show()

# 4. Evaluate
evaluator = UncertaintyEvaluator(model, n_samples=MC_SAMPLES)
print("Running MC Dropout inference...")
mean_pred, lower, upper, _ = evaluator.predict_mc_dropout(X_test)

# Inverse transform helper
def inverse(arr):
    arr = np.array(arr)
    # Handle 3D (samples, horizon, features)
    if arr.ndim == 3:
        orig_shape = arr.shape
        arr_2d = arr.reshape(-1, arr.shape[-1])
        inv_2d = factory.scaler.inverse_transform(arr_2d)
        return inv_2d.reshape(orig_shape)
    # Handle 1D (samples,)
    if arr.ndim == 1:
        return factory.scaler.inverse_transform(arr.reshape(-1, 1)).flatten()
    # Handle 2D (samples, features)
    return factory.scaler.inverse_transform(arr)

y_test_inv = inverse(y_test)
mean_pred_inv = inverse(mean_pred)
lower_inv = inverse(lower)
upper_inv = inverse(upper)

metrics = evaluator.evaluate_metrics(y_test_inv, mean_pred_inv, lower_inv, upper_inv)
print("Metrics:", metrics)

# Plot first 200 points
time_axis = np.arange(len(y_test_inv))
plot_len = min(200, len(y_test_inv))

evaluator.plot_results(
    time_axis[:plot_len], 
    y_test_inv[:plot_len], 
    mean_pred_inv[:plot_len], 
    lower_inv[:plot_len], 
    upper_inv[:plot_len]
)