### GP

In [None]:
!pip install gpytorch

Collecting gpytorch
  Downloading gpytorch-1.14-py3-none-any.whl.metadata (8.0 kB)
Collecting jaxtyping (from gpytorch)
  Downloading jaxtyping-0.3.0-py3-none-any.whl.metadata (7.0 kB)
Collecting linear-operator>=0.6 (from gpytorch)
  Downloading linear_operator-0.6-py3-none-any.whl.metadata (15 kB)
Collecting wadler-lindig>=0.1.3 (from jaxtyping->gpytorch)
  Downloading wadler_lindig-0.1.4-py3-none-any.whl.metadata (17 kB)
Collecting nvidia-cuda-nvrtc-cu12==12.4.127 (from torch>=2.0->linear-operator>=0.6->gpytorch)
  Downloading nvidia_cuda_nvrtc_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-runtime-cu12==12.4.127 (from torch>=2.0->linear-operator>=0.6->gpytorch)
  Downloading nvidia_cuda_runtime_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cuda-cupti-cu12==12.4.127 (from torch>=2.0->linear-operator>=0.6->gpytorch)
  Downloading nvidia_cuda_cupti_cu12-12.4.127-py3-none-manylinux2014_x86_64.whl.metadata (

In [None]:
import numpy as np
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF, ConstantKernel as C
from sklearn.metrics import average_precision_score, roc_auc_score
from tqdm import tqdm

# Create the same dataset as in the notebook
class NDimRoomDataset:
    """
    Dataset for n-dimensional room exploration data.
    Generates synthetic data where temperature readings are drawn from N(1, 0.5)
    when sum of coordinates >= n/2, and 0 otherwise.
    """
    def __init__(self, n_samples: int, n_dim: int = 2, split: str = "train",
                 train_ratio: float = 0.8, random_seed: int = None):
        if random_seed is not None:
            np.random.seed(random_seed)

        # Generate all data
        total_samples = int(n_samples / train_ratio if split == "train" else n_samples / (1 - train_ratio))
        X = np.random.uniform(0, 1, size=(total_samples, n_dim))
        coord_sums = np.sum(X, axis=1)

        # Initialize kappa array (temperature readings)
        kappa = np.zeros(total_samples)
        random_temp_mask = coord_sums >= n_dim/2
        kappa[random_temp_mask] = np.random.normal(1, 0.5, size=np.sum(random_temp_mask))

        # Split data
        split_idx = int(total_samples * train_ratio)
        if split == "train":
            self.X = X[:split_idx]
            self.kappa = kappa[:split_idx]
        else:
            self.X = X[split_idx:]
            self.kappa = kappa[split_idx:]

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.kappa[idx]


def test_gp_uncertainty(gp, n_dim=2, n_samples=1000):
    """
    Evaluate GP model's ability to identify aleatoric and epistemic uncertainty

    For GP with RBF-only kernel:
    - Epistemic uncertainty: Predictive variance minus alpha (uncertainty about the function)
    - Aleatoric uncertainty: Alpha parameter (noise level)
    """
    # Generate test points from [0, √1.5] for balanced uncertainty regions
    np.random.seed(42)
    x = np.random.uniform(0, 1.5**(1/n_dim), size=(n_samples, n_dim))

    # Calculate ground truth uncertainty
    coord_sums = np.sum(x, axis=1)
    is_inside_unit = np.all(x <= 1.0, axis=1)

    # True AU exists if sum of entries > n/2 AND point is inside unit cube
    true_au = (coord_sums > n_dim/2).astype(float) * is_inside_unit.astype(float)

    # True EU exists if ANY coordinate > 1.0
    true_eu = (~is_inside_unit).astype(float)

    # Print distribution statistics
    n_no_uncertainty = np.sum((coord_sums <= n_dim/2) & is_inside_unit)
    n_only_au = np.sum((coord_sums > n_dim/2) & is_inside_unit)
    n_eu = np.sum(~is_inside_unit)

    print("\nSample Distribution:")
    print(f"No Uncertainty: {n_no_uncertainty/n_samples:.3f}")
    print(f"Only AU: {n_only_au/n_samples:.3f}")
    print(f"EU: {n_eu/n_samples:.3f}")

    # Process test points in batches to avoid memory issues
    batch_size = 100
    means, variances = [], []

    print("\nProcessing predictions...")
    for i in tqdm(range(0, n_samples, batch_size), desc="Evaluating"):
        batch_x = x[i:i+batch_size]
        batch_mean, batch_std = gp.predict(batch_x, return_std=True)
        means.append(batch_mean)
        # Convert std to variance
        variances.append(batch_std**2)

    # Stack results
    mean_pred = np.concatenate(means)
    var_pred = np.concatenate(variances)

    # Get noise level from alpha parameter
    alpha = gp.alpha
    print(f"Alpha parameter (base noise level): {alpha:.6f}")

    # Uncertainty Computation:
    # 1. Epistemic uncertainty: Predictive variance - alpha
    # The variance already includes the alpha term, so we subtract it
    pred_eu = np.maximum(0, var_pred - alpha)  # Ensure non-negative

    # 2. Aleatoric uncertainty: Use alpha as the base noise level
    # Similar to before, make it non-uniform based on mean prediction
    pred_au = np.ones_like(mean_pred) * alpha

    # Scale aleatoric uncertainty in areas with non-zero predictions inside unit cube
    nonzero_mean = (mean_pred > 0.1) & is_inside_unit
    if np.any(nonzero_mean):
        # Scale noise by predicted mean (since higher predictions have more noise in this dataset)
        pred_au[nonzero_mean] = alpha * (1 + mean_pred[nonzero_mean])

    # Set AU to zero outside unit cube (by definition)
    pred_au[~is_inside_unit] = 0

    # Print samples for comparison
    print("\nSample predictions (first 10):")
    print("True AU:", true_au[:10])
    print("Pred AU:", pred_au[:10])
    print("True EU:", true_eu[:10])
    print("Pred EU:", pred_eu[:10])

    # Compute metrics
    results = {
        'AU_AUPRC': average_precision_score(true_au, pred_au),
        'AU_AUROC': roc_auc_score(true_au, pred_au),
        'EU_AUPRC': average_precision_score(true_eu, pred_eu),
        'EU_AUROC': roc_auc_score(true_eu, pred_eu)
    }

    return results, mean_pred, pred_au, pred_eu


# Main execution
if __name__ == "__main__":
    # Create dataset with the same parameters as the notebook
    n_dim = 2
    print("Generating dataset...")
    dataset = NDimRoomDataset(30000, n_dim=n_dim, split="train", random_seed=42)

    # Use a subset for GP training, as GPs don't scale well with large datasets
    subset_size = 1500
    indices = np.random.choice(len(dataset.X), subset_size, replace=False)
    X_train = dataset.X[indices]
    y_train = dataset.kappa[indices]

    print(f"Training on {subset_size} examples from a total of {len(dataset.X)}")

    # Define kernel - RBF only (no WhiteKernel)
    kernel = C(1.0) * RBF(length_scale=[1.0] * n_dim)

    # The alpha parameter will account for observation noise
    alpha = 0.1  # Initial noise level estimate

    gp = GaussianProcessRegressor(
        kernel=kernel,
        n_restarts_optimizer=5,
        alpha=alpha,  # Noise level parameter
        normalize_y=True,  # Normalize target values
        random_state=42
    )

    # Train the GP model
    print("Training GP model...")
    gp.fit(X_train, y_train)
    print("Model trained. Optimized kernel parameters:")
    print(gp.kernel_)
    print(f"Final alpha value: {gp.alpha:.6f}")

    # Evaluate with the same test parameters as the CDRM model
    print("Evaluating GP uncertainty quantification...")
    results, mean_pred, pred_au, pred_eu = test_gp_uncertainty(gp, n_dim=n_dim, n_samples=1000)

    # Print results
    print("\nGaussian Process (RBF-only) Results:")
    print(f"Aleatoric Uncertainty:")
    print(f"  AUPRC: {results['AU_AUPRC']:.4f}")
    print(f"  AUROC: {results['AU_AUROC']:.4f}")
    print(f"Epistemic Uncertainty:")
    print(f"  AUPRC: {results['EU_AUPRC']:.4f}")
    print(f"  AUROC: {results['EU_AUROC']:.4f}")


### MC Dropout

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.metrics import average_precision_score, roc_auc_score
from tqdm import tqdm

# Create the same dataset as in the notebook
class NDimRoomDataset:
    """
    Dataset for n-dimensional room exploration data.
    Generates synthetic data where temperature readings are drawn from N(1, 0.5)
    when sum of coordinates >= n/2, and 0 otherwise.
    """
    def __init__(self, n_samples: int, n_dim: int = 2, split: str = "train",
                 train_ratio: float = 0.8, random_seed: int = None):
        if random_seed is not None:
            np.random.seed(random_seed)
            torch.manual_seed(random_seed)

        # Generate all data
        total_samples = int(n_samples / train_ratio if split == "train" else n_samples / (1 - train_ratio))
        X = np.random.uniform(0, 1, size=(total_samples, n_dim))
        coord_sums = np.sum(X, axis=1)

        # Initialize kappa array (temperature readings)
        kappa = np.zeros(total_samples)
        random_temp_mask = coord_sums >= n_dim/2
        kappa[random_temp_mask] = np.random.normal(1, 0.5, size=np.sum(random_temp_mask))

        # Split data
        split_idx = int(total_samples * train_ratio)
        if split == "train":
            self.X = X[:split_idx]
            self.kappa = kappa[:split_idx]
        else:
            self.X = X[split_idx:]
            self.kappa = kappa[split_idx:]

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.kappa[idx]


# Define a simple neural network with Monte Carlo Dropout
class MCDropoutNet(nn.Module):
    def __init__(self, input_dim, hidden_dims=[64, 128, 64], dropout_rate=0.2):
        super(MCDropoutNet, self).__init__()

        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(dropout_rate))
            prev_dim = hidden_dim

        # Output layer for mean prediction
        self.layers = nn.Sequential(*layers)
        self.output = nn.Linear(prev_dim, 1)

    def forward(self, x):
        x = self.layers(x)
        return self.output(x)

    def enable_dropout(self):
        # Enable dropout during inference
        for m in self.modules():
            if isinstance(m, nn.Dropout):
                m.train()


def train_mc_dropout_model(train_data, val_data=None, input_dim=2,
                          hidden_dims=[64, 128, 64], dropout_rate=0.2,
                          batch_size=64, epochs=100, learning_rate=0.001,
                          device="cuda" if torch.cuda.is_available() else "cpu"):
    """Train a MC Dropout neural network"""

    # Create data loaders
    X_train, y_train = train_data
    train_dataset = TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    if val_data is not None:
        X_val, y_val = val_data
        val_dataset = TensorDataset(
            torch.tensor(X_val, dtype=torch.float32),
            torch.tensor(y_val, dtype=torch.float32).unsqueeze(1)
        )
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Initialize model
    model = MCDropoutNet(input_dim, hidden_dims, dropout_rate).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    # Training loop
    print(f"Training MC Dropout model on {device}...")
    for epoch in range(epochs):
        model.train()
        train_loss = 0

        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)

        # Validation
        if val_data is not None and (epoch + 1) % 10 == 0:
            model.eval()
            val_loss = 0

            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()

            avg_val_loss = val_loss / len(val_loader)
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
        elif (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}")

    print("Training complete!")
    return model


def test_mc_dropout_uncertainty(model, n_dim=2, n_samples=1000, n_forward_passes=100,
                              device="cuda" if torch.cuda.is_available() else "cpu"):
    """
    Evaluate model's ability to identify aleatoric and epistemic uncertainty
    using Monte Carlo Dropout
    """
    # Generate test points from [0, √1.5] for balanced uncertainty regions
    np.random.seed(42)
    x_np = np.random.uniform(0, 1.5**(1/n_dim), size=(n_samples, n_dim))
    x = torch.tensor(x_np, dtype=torch.float32).to(device)

    # Calculate ground truth uncertainty
    coord_sums = np.sum(x_np, axis=1)
    is_inside_unit = np.all(x_np <= 1.0, axis=1)

    # True AU exists if sum of entries > n/2 AND point is inside unit cube
    true_au = (coord_sums > n_dim/2).astype(float) * is_inside_unit.astype(float)

    # True EU exists if ANY coordinate > 1.0
    true_eu = (~is_inside_unit).astype(float)

    # Print distribution statistics
    n_no_uncertainty = np.sum((coord_sums <= n_dim/2) & is_inside_unit)
    n_only_au = np.sum((coord_sums > n_dim/2) & is_inside_unit)
    n_eu = np.sum(~is_inside_unit)

    print("\nSample Distribution:")
    print(f"No Uncertainty: {n_no_uncertainty/n_samples:.3f}")
    print(f"Only AU: {n_only_au/n_samples:.3f}")
    print(f"EU: {n_eu/n_samples:.3f}")

    # Perform multiple forward passes with dropout enabled
    model.eval()  # Set model to evaluation mode
    model.enable_dropout()  # But enable dropout explicitly

    all_predictions = []

    print("\nPerforming MC Dropout forward passes...")
    batch_size = 100  # Process in batches

    with torch.no_grad():
        for _ in range(n_forward_passes):
            batch_preds = []

            for i in range(0, n_samples, batch_size):
                batch_x = x[i:i+batch_size]
                preds = model(batch_x).cpu().numpy()
                batch_preds.append(preds)

            all_predictions.append(np.vstack(batch_preds))

    # Stack all predictions (shape: [n_forward_passes, n_samples, 1])
    all_predictions = np.stack(all_predictions, axis=0)
    all_predictions = all_predictions.squeeze(axis=2)  # Remove last dimension

    # Calculate mean and variance across the MC samples for each test point
    mean_pred = np.mean(all_predictions, axis=0)
    var_pred = np.var(all_predictions, axis=0)

    # Pure approach using variance of means and mean of variances
    # For each test point, compute local statistics in prediction space

    print("\nComputing pure MC Dropout uncertainty estimates...")

    # Calculate pure epistemic uncertainty (variance of means)
    pred_eu = var_pred

    # Calculate pure aleatoric uncertainty (mean squared error around the mean)
    # This is analogous to the mean of variances in the neighborhood approach
    # Fix: Properly align dimensions for broadcasting
    pred_au = np.mean(np.square(all_predictions - mean_pred[np.newaxis, :]), axis=0)

    # Print samples for comparison
    print("\nSample predictions (first 10):")
    print("True AU:", true_au[:10])
    print("Pred AU:", pred_au[:10])
    print("True EU:", true_eu[:10])
    print("Pred EU:", pred_eu[:10])

    # Compute metrics
    results = {
        'AU_AUPRC': average_precision_score(true_au, pred_au),
        'AU_AUROC': roc_auc_score(true_au, pred_au),
        'EU_AUPRC': average_precision_score(true_eu, pred_eu),
        'EU_AUROC': roc_auc_score(true_eu, pred_eu)
    }

    return results, mean_pred, pred_au, pred_eu


# Main execution
if __name__ == "__main__":
    # Create dataset with the same parameters as the notebook
    n_dim = 2
    print("Generating dataset...")
    dataset = NDimRoomDataset(30000, n_dim=n_dim, split="train", random_seed=42)
    test_dataset = NDimRoomDataset(10000, n_dim=n_dim, split="test", random_seed=42)

    # Use a subset of data for faster training
    subset_size = 5000
    indices = np.random.choice(len(dataset.X), subset_size, replace=False)
    X_train = dataset.X[indices]
    y_train = dataset.kappa[indices]

    # Create a validation set
    val_size = 1000
    val_indices = np.random.choice(len(test_dataset.X), val_size, replace=False)
    X_val = test_dataset.X[val_indices]
    y_val = test_dataset.kappa[val_indices]

    print(f"Training on {subset_size} examples, validating on {val_size} examples")

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Train the model
    model = train_mc_dropout_model(
        train_data=(X_train, y_train),
        val_data=(X_val, y_val),
        input_dim=n_dim,
        hidden_dims=[128, 256, 128],
        dropout_rate=0.2,
        batch_size=64,
        epochs=100,
        learning_rate=0.001,
        device=device
    )

    # Evaluate with the same test parameters as before
    print("Evaluating MC Dropout uncertainty quantification...")
    results, _, _, _ = test_mc_dropout_uncertainty(
        model, n_dim=n_dim, n_samples=1000, n_forward_passes=50, device=device
    )

    # Print results
    print("\nMonte Carlo Dropout Results:")
    print(f"Aleatoric Uncertainty:")
    print(f"  AUPRC: {results['AU_AUPRC']:.4f}")
    print(f"  AUROC: {results['AU_AUROC']:.4f}")
    print(f"Epistemic Uncertainty:")
    print(f"  AUPRC: {results['EU_AUPRC']:.4f}")
    print(f"  AUROC: {results['EU_AUROC']:.4f}")

Generating dataset...
Training on 5000 examples, validating on 1000 examples
Using device: cuda
Training MC Dropout model on cuda...
Epoch 10/100, Train Loss: 0.1471, Val Loss: 0.1400
Epoch 20/100, Train Loss: 0.1431, Val Loss: 0.1294
Epoch 30/100, Train Loss: 0.1384, Val Loss: 0.1271
Epoch 40/100, Train Loss: 0.1359, Val Loss: 0.1256
Epoch 50/100, Train Loss: 0.1344, Val Loss: 0.1331
Epoch 60/100, Train Loss: 0.1326, Val Loss: 0.1256
Epoch 70/100, Train Loss: 0.1347, Val Loss: 0.1232
Epoch 80/100, Train Loss: 0.1355, Val Loss: 0.1241
Epoch 90/100, Train Loss: 0.1323, Val Loss: 0.1242
Epoch 100/100, Train Loss: 0.1316, Val Loss: 0.1245
Training complete!
Evaluating MC Dropout uncertainty quantification...

Sample Distribution:
No Uncertainty: 0.325
Only AU: 0.347
EU: 0.328

Performing MC Dropout forward passes...

Computing pure MC Dropout uncertainty estimates...

Sample predictions (first 10):
True AU: [0. 1. 0. 0. 1. 0. 0. 0. 1. 0.]
Pred AU: [0.00408908 0.00389651 0.00035053 0.00966

### Ensemble

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.metrics import average_precision_score, roc_auc_score
from tqdm import tqdm

# Create the same dataset as in the notebook
class NDimRoomDataset:
    """
    Dataset for n-dimensional room exploration data.
    Generates synthetic data where temperature readings are drawn from N(1, 0.5)
    when sum of coordinates >= n/2, and 0 otherwise.
    """
    def __init__(self, n_samples: int, n_dim: int = 2, split: str = "train",
                 train_ratio: float = 0.8, random_seed: int = None):
        if random_seed is not None:
            np.random.seed(random_seed)
            torch.manual_seed(random_seed)

        # Generate all data
        total_samples = int(n_samples / train_ratio if split == "train" else n_samples / (1 - train_ratio))
        X = np.random.uniform(0, 1, size=(total_samples, n_dim))
        coord_sums = np.sum(X, axis=1)

        # Initialize kappa array (temperature readings)
        kappa = np.zeros(total_samples)
        random_temp_mask = coord_sums >= n_dim/2
        kappa[random_temp_mask] = np.random.normal(1, 0.5, size=np.sum(random_temp_mask))

        # Split data
        split_idx = int(total_samples * train_ratio)
        if split == "train":
            self.X = X[:split_idx]
            self.kappa = kappa[:split_idx]
        else:
            self.X = X[split_idx:]
            self.kappa = kappa[split_idx:]

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.kappa[idx]


# Define a neural network for ensemble
class EnsembleNet(nn.Module):
    def __init__(self, input_dim, hidden_dims=[64, 128, 64]):
        super(EnsembleNet, self).__init__()

        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim

        # Output layer for mean prediction
        self.layers = nn.Sequential(*layers)
        self.output = nn.Linear(prev_dim, 1)

    def forward(self, x):
        x = self.layers(x)
        return self.output(x)


def train_ensemble_model(train_data, val_data=None, input_dim=2,
                         hidden_dims=[64, 128, 64], n_models=5,
                         batch_size=64, epochs=100, learning_rate=0.001,
                         bootstrap=True, device="cuda" if torch.cuda.is_available() else "cpu"):
    """Train an ensemble of neural networks"""

    # Extract data
    X_train, y_train = train_data
    train_size = len(X_train)

    # Create validation loader if provided
    if val_data is not None:
        X_val, y_val = val_data
        val_dataset = TensorDataset(
            torch.tensor(X_val, dtype=torch.float32),
            torch.tensor(y_val, dtype=torch.float32).unsqueeze(1)
        )
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Initialize ensemble models
    ensemble = []

    for model_idx in range(n_models):
        print(f"\nTraining model {model_idx+1}/{n_models}:")

        # For bootstrapping, create a new bootstrap sample for each model
        if bootstrap:
            # Sample with replacement
            bootstrap_indices = np.random.choice(train_size, train_size, replace=True)
            X_boot = X_train[bootstrap_indices]
            y_boot = y_train[bootstrap_indices]

            # Create data loader
            boot_dataset = TensorDataset(
                torch.tensor(X_boot, dtype=torch.float32),
                torch.tensor(y_boot, dtype=torch.float32).unsqueeze(1)
            )
            train_loader = DataLoader(boot_dataset, batch_size=batch_size, shuffle=True)
        else:
            # Use the same dataset but with different initialization and optimization path
            train_dataset = TensorDataset(
                torch.tensor(X_train, dtype=torch.float32),
                torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
            )
            train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

        # Initialize model with different random seed
        torch.manual_seed(42 + model_idx)  # Different seed for each model
        model = EnsembleNet(input_dim, hidden_dims).to(device)
        optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
        criterion = nn.MSELoss()

        # Training loop
        for epoch in range(epochs):
            model.train()
            train_loss = 0

            for batch_X, batch_y in train_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)

                optimizer.zero_grad()
                outputs = model(batch_X)
                loss = criterion(outputs, batch_y)
                loss.backward()
                optimizer.step()

                train_loss += loss.item()

            avg_train_loss = train_loss / len(train_loader)

            # Validation
            if val_data is not None and (epoch + 1) % 20 == 0:
                model.eval()
                val_loss = 0

                with torch.no_grad():
                    for batch_X, batch_y in val_loader:
                        batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                        outputs = model(batch_X)
                        loss = criterion(outputs, batch_y)
                        val_loss += loss.item()

                avg_val_loss = val_loss / len(val_loader)
                print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
            elif (epoch + 1) % 20 == 0:
                print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}")

        # Add model to ensemble
        ensemble.append(model)

    print("\nEnsemble training complete!")
    return ensemble


def test_ensemble_uncertainty(ensemble, n_dim=2, n_samples=1000,
                            device="cuda" if torch.cuda.is_available() else "cpu"):
    """
    Evaluate ensemble's ability to identify aleatoric and epistemic uncertainty
    """
    # Generate test points from [0, √1.5] for balanced uncertainty regions
    np.random.seed(42)
    x_np = np.random.uniform(0, 1.5**(1/n_dim), size=(n_samples, n_dim))
    x = torch.tensor(x_np, dtype=torch.float32).to(device)

    # Calculate ground truth uncertainty
    coord_sums = np.sum(x_np, axis=1)
    is_inside_unit = np.all(x_np <= 1.0, axis=1)

    # True AU exists if sum of entries > n/2 AND point is inside unit cube
    true_au = (coord_sums > n_dim/2).astype(float) * is_inside_unit.astype(float)

    # True EU exists if ANY coordinate > 1.0
    true_eu = (~is_inside_unit).astype(float)

    # Print distribution statistics
    n_no_uncertainty = np.sum((coord_sums <= n_dim/2) & is_inside_unit)
    n_only_au = np.sum((coord_sums > n_dim/2) & is_inside_unit)
    n_eu = np.sum(~is_inside_unit)

    print("\nSample Distribution:")
    print(f"No Uncertainty: {n_no_uncertainty/n_samples:.3f}")
    print(f"Only AU: {n_only_au/n_samples:.3f}")
    print(f"EU: {n_eu/n_samples:.3f}")

    # Get predictions from each ensemble member
    all_predictions = []

    print("\nGetting ensemble predictions...")
    batch_size = 100  # Process in batches

    for model in ensemble:
        model.eval()  # Set model to evaluation mode
        model_preds = []

        with torch.no_grad():
            for i in range(0, n_samples, batch_size):
                batch_x = x[i:i+batch_size]
                preds = model(batch_x).cpu().numpy()
                model_preds.append(preds)

        all_predictions.append(np.vstack(model_preds))

    # Stack all predictions (shape: [n_models, n_samples, 1])
    all_predictions = np.stack(all_predictions, axis=0)
    all_predictions = all_predictions.squeeze(axis=2)  # Remove last dimension

    # Calculate mean prediction for each test point across all ensemble members
    mean_pred = np.mean(all_predictions, axis=0)

    # Pure approach for uncertainty decomposition:
    print("\nComputing ensemble uncertainty estimates...")

    # 1. Epistemic uncertainty: variance of predictions across ensemble members
    # This reflects model uncertainty (how much models disagree)
    pred_eu = np.var(all_predictions, axis=0)

    # 2. Aleatoric uncertainty: mean squared error around the ensemble mean
    # This reflects data uncertainty (inherent noise)
    pred_au = np.mean(np.square(all_predictions - mean_pred[np.newaxis, :]), axis=0)

    # Print samples for comparison
    print("\nSample predictions (first 10):")
    print("True AU:", true_au[:10])
    print("Pred AU:", pred_au[:10])
    print("True EU:", true_eu[:10])
    print("Pred EU:", pred_eu[:10])

    # Compute metrics
    results = {
        'AU_AUPRC': average_precision_score(true_au, pred_au),
        'AU_AUROC': roc_auc_score(true_au, pred_au),
        'EU_AUPRC': average_precision_score(true_eu, pred_eu),
        'EU_AUROC': roc_auc_score(true_eu, pred_eu)
    }

    return results, mean_pred, pred_au, pred_eu


# Main execution
if __name__ == "__main__":
    # Create dataset with the same parameters as the notebook
    n_dim = 2
    print("Generating dataset...")
    dataset = NDimRoomDataset(30000, n_dim=n_dim, split="train", random_seed=42)
    test_dataset = NDimRoomDataset(10000, n_dim=n_dim, split="test", random_seed=42)

    # Use a subset of data for faster training
    subset_size = 5000
    indices = np.random.choice(len(dataset.X), subset_size, replace=False)
    X_train = dataset.X[indices]
    y_train = dataset.kappa[indices]

    # Create a validation set
    val_size = 1000
    val_indices = np.random.choice(len(test_dataset.X), val_size, replace=False)
    X_val = test_dataset.X[val_indices]
    y_val = test_dataset.kappa[val_indices]

    print(f"Training ensemble on {subset_size} examples, validating on {val_size} examples")

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Number of models in ensemble
    n_models = 5

    # Train the ensemble
    ensemble = train_ensemble_model(
        train_data=(X_train, y_train),
        val_data=(X_val, y_val),
        input_dim=n_dim,
        hidden_dims=[128, 256, 128],
        n_models=n_models,
        batch_size=64,
        epochs=100,  # Fewer epochs per model
        learning_rate=0.001,
        bootstrap=True,  # Use bootstrapping
        device=device
    )

    # Evaluate with the same test parameters as before
    print("Evaluating ensemble uncertainty quantification...")
    results, _, _, _ = test_ensemble_uncertainty(
        ensemble, n_dim=n_dim, n_samples=1000, device=device
    )

    # Print results
    print("\nEnsemble Method Results:")
    print(f"Aleatoric Uncertainty:")
    print(f"  AUPRC: {results['AU_AUPRC']:.4f}")
    print(f"  AUROC: {results['AU_AUROC']:.4f}")
    print(f"Epistemic Uncertainty:")
    print(f"  AUPRC: {results['EU_AUPRC']:.4f}")
    print(f"  AUROC: {results['EU_AUROC']:.4f}")


Generating dataset...
Training ensemble on 5000 examples, validating on 1000 examples
Using device: cuda

Training model 1/5:
Epoch 20/100, Train Loss: 0.1376, Val Loss: 0.1348
Epoch 40/100, Train Loss: 0.1391, Val Loss: 0.1320
Epoch 60/100, Train Loss: 0.1353, Val Loss: 0.1277
Epoch 80/100, Train Loss: 0.1367, Val Loss: 0.1280
Epoch 100/100, Train Loss: 0.1382, Val Loss: 0.1253

Training model 2/5:
Epoch 20/100, Train Loss: 0.1287, Val Loss: 0.1333
Epoch 40/100, Train Loss: 0.1323, Val Loss: 0.1308
Epoch 60/100, Train Loss: 0.1287, Val Loss: 0.1255
Epoch 80/100, Train Loss: 0.1265, Val Loss: 0.1325
Epoch 100/100, Train Loss: 0.1248, Val Loss: 0.1519

Training model 3/5:
Epoch 20/100, Train Loss: 0.1356, Val Loss: 0.1343
Epoch 40/100, Train Loss: 0.1321, Val Loss: 0.1278
Epoch 60/100, Train Loss: 0.1307, Val Loss: 0.1325
Epoch 80/100, Train Loss: 0.1314, Val Loss: 0.1300
Epoch 100/100, Train Loss: 0.1302, Val Loss: 0.1392

Training model 4/5:
Epoch 20/100, Train Loss: 0.1362, Val Loss:

### Deep evidential regression

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.metrics import average_precision_score, roc_auc_score
from tqdm import tqdm

# Create the same dataset as in the notebook
class NDimRoomDataset:
    """
    Dataset for n-dimensional room exploration data.
    Generates synthetic data where temperature readings are drawn from N(1, 0.5)
    when sum of coordinates >= n/2, and 0 otherwise.
    """
    def __init__(self, n_samples: int, n_dim: int = 2, split: str = "train",
                 train_ratio: float = 0.8, random_seed: int = None):
        if random_seed is not None:
            np.random.seed(random_seed)
            torch.manual_seed(random_seed)

        # Generate all data
        total_samples = int(n_samples / train_ratio if split == "train" else n_samples / (1 - train_ratio))
        X = np.random.uniform(0, 1, size=(total_samples, n_dim))
        coord_sums = np.sum(X, axis=1)

        # Initialize kappa array (temperature readings)
        kappa = np.zeros(total_samples)
        random_temp_mask = coord_sums >= n_dim/2
        kappa[random_temp_mask] = np.random.normal(1, 0.5, size=np.sum(random_temp_mask))

        # Split data
        split_idx = int(total_samples * train_ratio)
        if split == "train":
            self.X = X[:split_idx]
            self.kappa = kappa[:split_idx]
        else:
            self.X = X[split_idx:]
            self.kappa = kappa[split_idx:]

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.kappa[idx]


# Define Deep Evidential Regression Network
class EvidentialNet(nn.Module):
    def __init__(self, input_dim, hidden_dims=[128, 256, 128]):
        super(EvidentialNet, self).__init__()

        # Feature extraction layers
        layers = []
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim

        self.feature_extractor = nn.Sequential(*layers)

        # Output layers for NIG distribution parameters
        # mu: location parameter (mean)
        # v: precision parameter (lambda in the paper)
        # alpha: shape parameter for inverse gamma
        # beta: scale parameter for inverse gamma
        self.mu_layer = nn.Linear(prev_dim, 1)
        self.v_layer = nn.Linear(prev_dim, 1)
        self.alpha_layer = nn.Linear(prev_dim, 1)
        self.beta_layer = nn.Linear(prev_dim, 1)

    def forward(self, x):
        features = self.feature_extractor(x)

        # Mean (μ) can be any real value
        mu = self.mu_layer(features)

        # Precision (v) must be positive
        v = F.softplus(self.v_layer(features)) + 1e-6

        # Alpha (α) must be > 1 for valid NIG and uncertainty calculations
        alpha = F.softplus(self.alpha_layer(features)) + 1.0

        # Beta (β) must be positive
        beta = F.softplus(self.beta_layer(features)) + 1e-6

        return mu, v, alpha, beta


# Evidential Regression Loss
def evidential_loss(y_true, mu, v, alpha, beta, epsilon=1e-6, kl_weight=0.01):
    """
    Calculates the evidential regression loss with proper NLL and KL terms

    Args:
        y_true: Ground truth values
        mu, v, alpha, beta: Parameters of the Normal-Inverse-Gamma distribution
        epsilon: Small constant for numerical stability
        kl_weight: Weight for the KL divergence term
    """
    # Data fit term (NLL)
    twoBlambda = 2 * beta * (1 + v)
    nll = 0.5 * torch.log(np.pi / v) \
        - alpha * torch.log(twoBlambda) \
        + (alpha + 0.5) * torch.log(v * (y_true - mu)**2 + twoBlambda) \
        + torch.lgamma(alpha) \
        - torch.lgamma(alpha + 0.5)

    # Evidence regularization term - penalize "incorrect" uncertainties
    error = y_true - mu
    reg = kl_weight * (2 * v + alpha) * error**2 / (2 * v * (alpha - 1))

    return (nll + reg).mean()


def train_evidential_model(train_data, val_data=None, input_dim=2,
                          hidden_dims=[128, 256, 128],
                          batch_size=64, epochs=150, learning_rate=0.001,
                          kl_weight=0.01, device="cuda" if torch.cuda.is_available() else "cpu"):
    """Train a Deep Evidential Regression model"""

    # Create data loaders
    X_train, y_train = train_data
    train_dataset = TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    if val_data is not None:
        X_val, y_val = val_data
        val_dataset = TensorDataset(
            torch.tensor(X_val, dtype=torch.float32),
            torch.tensor(y_val, dtype=torch.float32).unsqueeze(1)
        )
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Initialize model
    model = EvidentialNet(input_dim, hidden_dims).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Training loop
    print(f"Training Evidential Network on {device}...")
    for epoch in range(epochs):
        model.train()
        train_loss = 0

        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            optimizer.zero_grad()
            mu, v, alpha, beta = model(batch_X)
            loss = evidential_loss(batch_y, mu, v, alpha, beta, kl_weight=kl_weight)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)

        # Validation
        if val_data is not None and (epoch + 1) % 10 == 0:
            model.eval()
            val_loss = 0

            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                    mu, v, alpha, beta = model(batch_X)
                    loss = evidential_loss(batch_y, mu, v, alpha, beta, kl_weight=kl_weight)
                    val_loss += loss.item()

            avg_val_loss = val_loss / len(val_loader)
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
        elif (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}")

    print("Training complete!")
    return model


def test_evidential_uncertainty(model, n_dim=2, n_samples=1000,
                              device="cuda" if torch.cuda.is_available() else "cpu"):
    """
    Evaluate evidential model's ability to identify aleatoric and epistemic uncertainty
    using the correct formulas from the paper
    """
    # Generate test points from [0, √1.5] for balanced uncertainty regions
    np.random.seed(42)
    x_np = np.random.uniform(0, 1.5**(1/n_dim), size=(n_samples, n_dim))
    x = torch.tensor(x_np, dtype=torch.float32).to(device)

    # Calculate ground truth uncertainty
    coord_sums = np.sum(x_np, axis=1)
    is_inside_unit = np.all(x_np <= 1.0, axis=1)

    # True AU exists if sum of entries > n/2 AND point is inside unit cube
    true_au = (coord_sums > n_dim/2).astype(float) * is_inside_unit.astype(float)

    # True EU exists if ANY coordinate > 1.0
    true_eu = (~is_inside_unit).astype(float)

    # Print distribution statistics
    n_no_uncertainty = np.sum((coord_sums <= n_dim/2) & is_inside_unit)
    n_only_au = np.sum((coord_sums > n_dim/2) & is_inside_unit)
    n_eu = np.sum(~is_inside_unit)

    print("\nSample Distribution:")
    print(f"No Uncertainty: {n_no_uncertainty/n_samples:.3f}")
    print(f"Only AU: {n_only_au/n_samples:.3f}")
    print(f"EU: {n_eu/n_samples:.3f}")

    # Get predictions from evidential model
    model.eval()  # Set model to evaluation mode

    # Process in batches
    print("\nComputing evidential uncertainty estimates...")
    batch_size = 100

    all_mu = []
    all_v = []
    all_alpha = []
    all_beta = []

    with torch.no_grad():
        for i in range(0, n_samples, batch_size):
            batch_x = x[i:i+batch_size]
            mu, v, alpha, beta = model(batch_x)

            all_mu.append(mu.cpu().numpy())
            all_v.append(v.cpu().numpy())
            all_alpha.append(alpha.cpu().numpy())
            all_beta.append(beta.cpu().numpy())

    # Concatenate batch results
    mu = np.concatenate(all_mu)
    v = np.concatenate(all_v)
    alpha = np.concatenate(all_alpha)
    beta = np.concatenate(all_beta)

    # Correctly calculate aleatoric and epistemic uncertainty
    # Ensure alpha > 1 for valid calculations
    alpha_valid = np.maximum(alpha, 1.0 + 1e-6)

    # Aleatoric uncertainty: Expected variance of the observation
    # β/(α-1) when α > 1
    pred_au = beta / (alpha_valid - 1.0)
    pred_au = pred_au.squeeze()

    # Epistemic uncertainty: Variance of the mean
    # β/(v*(α-1)) when α > 1
    pred_eu = beta / (v * (alpha_valid - 1.0))
    pred_eu = pred_eu.squeeze()

    # Print samples for comparison
    print("\nSample predictions (first 10):")
    print("True AU:", true_au[:10])
    print("Pred AU:", pred_au[:10])
    print("True EU:", true_eu[:10])
    print("Pred EU:", pred_eu[:10])

    # Compute metrics
    results = {
        'AU_AUPRC': average_precision_score(true_au, pred_au),
        'AU_AUROC': roc_auc_score(true_au, pred_au),
        'EU_AUPRC': average_precision_score(true_eu, pred_eu),
        'EU_AUROC': roc_auc_score(true_eu, pred_eu)
    }

    return results, mu.squeeze(), pred_au, pred_eu


# Main execution
if __name__ == "__main__":
    # Create dataset with the same parameters as the notebook
    n_dim = 2
    print("Generating dataset...")
    dataset = NDimRoomDataset(30000, n_dim=n_dim, split="train", random_seed=42)
    test_dataset = NDimRoomDataset(10000, n_dim=n_dim, split="test", random_seed=42)

    # Use a subset of data for faster training
    subset_size = 5000
    indices = np.random.choice(len(dataset.X), subset_size, replace=False)
    X_train = dataset.X[indices]
    y_train = dataset.kappa[indices]

    # Create a validation set
    val_size = 1000
    val_indices = np.random.choice(len(test_dataset.X), val_size, replace=False)
    X_val = test_dataset.X[val_indices]
    y_val = test_dataset.kappa[val_indices]

    print(f"Training on {subset_size} examples, validating on {val_size} examples")

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Train the model
    model = train_evidential_model(
        train_data=(X_train, y_train),
        val_data=(X_val, y_val),
        input_dim=n_dim,
        hidden_dims=[128, 256, 128],
        batch_size=64,
        epochs=150,
        learning_rate=0.001,
        kl_weight=0.01,  # Evidence regularization weight
        device=device
    )

    # Evaluate with the same test parameters as before
    print("Evaluating evidential uncertainty quantification...")
    results, _, _, _ = test_evidential_uncertainty(
        model, n_dim=n_dim, n_samples=1000, device=device
    )

    # Print results
    print("\nEvidential Regression Results:")
    print(f"Aleatoric Uncertainty:")
    print(f"  AUPRC: {results['AU_AUPRC']:.4f}")
    print(f"  AUROC: {results['AU_AUROC']:.4f}")
    print(f"Epistemic Uncertainty:")
    print(f"  AUPRC: {results['EU_AUPRC']:.4f}")
    print(f"  AUROC: {results['EU_AUROC']:.4f}")

Generating dataset...
Training on 5000 examples, validating on 1000 examples
Using device: cuda
Training Evidential Network on cuda...
Epoch 10/150, Train Loss: -0.9222, Val Loss: -0.8921
Epoch 20/150, Train Loss: -1.1301, Val Loss: -1.5708
Epoch 30/150, Train Loss: -1.3209, Val Loss: -1.4585
Epoch 40/150, Train Loss: -1.1624, Val Loss: -1.1971
Epoch 50/150, Train Loss: -1.4817, Val Loss: -1.9229
Epoch 60/150, Train Loss: -1.4053, Val Loss: -1.6909
Epoch 70/150, Train Loss: -1.5035, Val Loss: -1.7018
Epoch 80/150, Train Loss: -1.2234, Val Loss: -2.1735
Epoch 90/150, Train Loss: -1.7038, Val Loss: -2.3262
Epoch 100/150, Train Loss: -1.6226, Val Loss: -2.3598
Epoch 110/150, Train Loss: -1.5519, Val Loss: -2.1398
Epoch 120/150, Train Loss: -1.7454, Val Loss: -0.2743
Epoch 130/150, Train Loss: -1.4612, Val Loss: -2.3107
Epoch 140/150, Train Loss: -2.2896, Val Loss: -2.5040
Epoch 150/150, Train Loss: -2.2100, Val Loss: -2.5119
Training complete!
Evaluating evidential uncertainty quantificat

### DPS

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.metrics import average_precision_score, roc_auc_score
from tqdm import tqdm
import copy

# Create the same dataset as in the notebook
class NDimRoomDataset:
    """
    Dataset for n-dimensional room exploration data.
    Generates synthetic data where temperature readings are drawn from N(1, 0.5)
    when sum of coordinates >= n/2, and 0 otherwise.
    """
    def __init__(self, n_samples: int, n_dim: int = 2, split: str = "train",
                 train_ratio: float = 0.8, random_seed: int = None):
        if random_seed is not None:
            np.random.seed(random_seed)
            torch.manual_seed(random_seed)

        # Generate all data
        total_samples = int(n_samples / train_ratio if split == "train" else n_samples / (1 - train_ratio))
        X = np.random.uniform(0, 1, size=(total_samples, n_dim))
        coord_sums = np.sum(X, axis=1)

        # Initialize kappa array (temperature readings)
        kappa = np.zeros(total_samples)
        random_temp_mask = coord_sums >= n_dim/2
        kappa[random_temp_mask] = np.random.normal(1, 0.5, size=np.sum(random_temp_mask))

        # Split data
        split_idx = int(total_samples * train_ratio)
        if split == "train":
            self.X = X[:split_idx]
            self.kappa = kappa[:split_idx]
        else:
            self.X = X[split_idx:]
            self.kappa = kappa[split_idx:]

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.kappa[idx]


# Define the base neural network
class DPSNet(nn.Module):
    def __init__(self, input_dim, hidden_dims=[128, 256, 128]):
        super(DPSNet, self).__init__()

        self.layers = nn.ModuleList()
        prev_dim = input_dim

        for hidden_dim in hidden_dims:
            self.layers.append(nn.Linear(prev_dim, hidden_dim))
            prev_dim = hidden_dim

        # Output layer for mean prediction
        self.output = nn.Linear(prev_dim, 1)

    def forward(self, x):
        for layer in self.layers:
            x = F.relu(layer(x))
        return self.output(x)

    def get_params_vector(self):
        """Get all parameters as a single vector"""
        params = []
        for p in self.parameters():
            params.append(p.view(-1))
        return torch.cat(params)

    def set_params_vector(self, param_vector):
        """Set all parameters from a single vector"""
        pointer = 0
        for p in self.parameters():
            num_params = p.numel()
            # Set this parameter from the vector
            p.data = param_vector[pointer:pointer + num_params].view(p.shape)
            pointer += num_params

    def add_noise_to_params(self, std=0.1):
        """Add Gaussian noise to parameters"""
        for p in self.parameters():
            noise = torch.randn_like(p) * std
            p.data.add_(noise)


def train_prior_network(train_data, val_data=None, input_dim=2,
                       hidden_dims=[128, 256, 128],
                       batch_size=64, epochs=100, learning_rate=0.001,
                       device="cuda" if torch.cuda.is_available() else "cpu"):
    """Train a prior network for DPS"""

    # Create data loaders
    X_train, y_train = train_data
    train_dataset = TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    if val_data is not None:
        X_val, y_val = val_data
        val_dataset = TensorDataset(
            torch.tensor(X_val, dtype=torch.float32),
            torch.tensor(y_val, dtype=torch.float32).unsqueeze(1)
        )
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Initialize model
    model = DPSNet(input_dim, hidden_dims).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    # Training loop
    print(f"Training prior network on {device}...")
    for epoch in range(epochs):
        model.train()
        train_loss = 0

        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)

        # Validation
        if val_data is not None and (epoch + 1) % 10 == 0:
            model.eval()
            val_loss = 0

            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    batch_X, batch_y = batch_X.to(device), batch_y.to(device)
                    outputs = model(batch_X)
                    loss = criterion(outputs, batch_y)
                    val_loss += loss.item()

            avg_val_loss = val_loss / len(val_loader)
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
        elif (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}")

    print("Prior network training complete!")
    return model


def generate_posterior_samples(prior_model, train_data, n_samples=10, input_dim=2,
                              hidden_dims=[128, 256, 128], reg_scale=1.0,
                              batch_size=64, epochs=50, learning_rate=0.001,
                              device="cuda" if torch.cuda.is_available() else "cpu"):
    """Generate posterior samples using DPS approach"""

    # Create data loaders with different bootstrapped samples
    X_train, y_train = train_data
    train_size = len(X_train)

    # Get prior parameters
    prior_params = prior_model.get_params_vector().detach()

    # Initialize list to store posterior networks
    posterior_models = []

    print(f"\nGenerating {n_samples} posterior samples...")
    for i in range(n_samples):
        print(f"\nTraining posterior sample {i+1}/{n_samples}")

        # Create bootstrapped dataset
        bootstrap_indices = np.random.choice(train_size, train_size, replace=True)
        X_boot = X_train[bootstrap_indices]
        y_boot = y_train[bootstrap_indices]

        boot_dataset = TensorDataset(
            torch.tensor(X_boot, dtype=torch.float32),
            torch.tensor(y_boot, dtype=torch.float32).unsqueeze(1)
        )
        boot_loader = DataLoader(boot_dataset, batch_size=batch_size, shuffle=True)

        # Initialize model with random perturbation from prior
        posterior_model = copy.deepcopy(prior_model)
        posterior_model.add_noise_to_params(std=0.1)  # Random perturbation

        # Train with regularization toward the prior
        optimizer = torch.optim.Adam(posterior_model.parameters(), lr=learning_rate)
        criterion = nn.MSELoss()

        for epoch in range(epochs):
            posterior_model.train()
            train_loss = 0

            for batch_X, batch_y in boot_loader:
                batch_X, batch_y = batch_X.to(device), batch_y.to(device)

                optimizer.zero_grad()
                outputs = posterior_model(batch_X)

                # Data loss
                data_loss = criterion(outputs, batch_y)

                # Regularization loss (L2 distance to prior)
                params_vector = posterior_model.get_params_vector()
                reg_loss = F.mse_loss(params_vector, prior_params)

                # Combined loss
                loss = data_loss + reg_scale * reg_loss
                loss.backward()
                optimizer.step()

                train_loss += loss.item()

            if (epoch + 1) % 10 == 0:
                avg_train_loss = train_loss / len(boot_loader)
                print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_train_loss:.4f}")

        posterior_models.append(posterior_model)

    print("Posterior sampling complete!")
    return posterior_models


def test_dps_uncertainty(posterior_models, n_dim=2, n_samples=1000,
                       device="cuda" if torch.cuda.is_available() else "cpu"):
    """
    Evaluate DPS model's ability to identify aleatoric and epistemic uncertainty
    """
    # Generate test points from [0, √1.5] for balanced uncertainty regions
    np.random.seed(42)
    x_np = np.random.uniform(0, 1.5**(1/n_dim), size=(n_samples, n_dim))
    x = torch.tensor(x_np, dtype=torch.float32).to(device)

    # Calculate ground truth uncertainty
    coord_sums = np.sum(x_np, axis=1)
    is_inside_unit = np.all(x_np <= 1.0, axis=1)

    # True AU exists if sum of entries > n/2 AND point is inside unit cube
    true_au = (coord_sums > n_dim/2).astype(float) * is_inside_unit.astype(float)

    # True EU exists if ANY coordinate > 1.0
    true_eu = (~is_inside_unit).astype(float)

    # Print distribution statistics
    n_no_uncertainty = np.sum((coord_sums <= n_dim/2) & is_inside_unit)
    n_only_au = np.sum((coord_sums > n_dim/2) & is_inside_unit)
    n_eu = np.sum(~is_inside_unit)

    print("\nSample Distribution:")
    print(f"No Uncertainty: {n_no_uncertainty/n_samples:.3f}")
    print(f"Only AU: {n_only_au/n_samples:.3f}")
    print(f"EU: {n_eu/n_samples:.3f}")

    # Get predictions from each posterior model
    all_predictions = []

    print("\nGetting posterior predictions...")
    batch_size = 100  # Process in batches

    for model in posterior_models:
        model.eval()
        model_preds = []

        with torch.no_grad():
            for i in range(0, n_samples, batch_size):
                batch_x = x[i:i+batch_size]
                preds = model(batch_x).cpu().numpy()
                model_preds.append(preds)

        all_predictions.append(np.vstack(model_preds))

    # Stack all predictions (shape: [n_posterior_samples, n_samples, 1])
    all_predictions = np.stack(all_predictions, axis=0)
    all_predictions = all_predictions.squeeze(axis=2)  # Remove last dimension

    # Calculate mean prediction for each test point across all posterior samples
    mean_pred = np.mean(all_predictions, axis=0)

    # In DPS:
    # 1. Epistemic uncertainty: variance of predictions across posterior samples
    pred_eu = np.var(all_predictions, axis=0)

    # 2. For aleatoric uncertainty, we need a proxy since DPS doesn't model it directly
    # We'll use residual variance between observations and the posterior mean
    # (this is more of a heuristic since we don't have real observations at test time)

    # Create a proxy by looking at prediction consistency in regions where we expect data noise
    pred_consistency = np.mean(np.abs(all_predictions - mean_pred[np.newaxis, :]), axis=0)
    # Higher values mean less consistency across samples (higher AU)
    pred_au = pred_consistency

    # Print samples for comparison
    print("\nSample predictions (first 10):")
    print("True AU:", true_au[:10])
    print("Pred AU:", pred_au[:10])
    print("True EU:", true_eu[:10])
    print("Pred EU:", pred_eu[:10])

    # Compute metrics
    results = {
        'AU_AUPRC': average_precision_score(true_au, pred_au),
        'AU_AUROC': roc_auc_score(true_au, pred_au),
        'EU_AUPRC': average_precision_score(true_eu, pred_eu),
        'EU_AUROC': roc_auc_score(true_eu, pred_eu)
    }

    return results, mean_pred, pred_au, pred_eu


# Main execution
if __name__ == "__main__":
    # Create dataset with the same parameters as the notebook
    n_dim = 2
    print("Generating dataset...")
    dataset = NDimRoomDataset(30000, n_dim=n_dim, split="train", random_seed=42)
    test_dataset = NDimRoomDataset(10000, n_dim=n_dim, split="test", random_seed=42)

    # Use a subset of data for faster training
    subset_size = 5000
    indices = np.random.choice(len(dataset.X), subset_size, replace=False)
    X_train = dataset.X[indices]
    y_train = dataset.kappa[indices]

    # Create a validation set
    val_size = 1000
    val_indices = np.random.choice(len(test_dataset.X), val_size, replace=False)
    X_val = test_dataset.X[val_indices]
    y_val = test_dataset.kappa[val_indices]

    print(f"Training on {subset_size} examples, validating on {val_size} examples")

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Train the prior network
    prior_model = train_prior_network(
        train_data=(X_train, y_train),
        val_data=(X_val, y_val),
        input_dim=n_dim,
        hidden_dims=[128, 256, 128],
        batch_size=64,
        epochs=100,
        learning_rate=0.001,
        device=device
    )

    # Generate posterior samples
    n_posterior_samples = 10  # Number of posterior samples
    posterior_models = generate_posterior_samples(
        prior_model,
        train_data=(X_train, y_train),
        n_samples=n_posterior_samples,
        input_dim=n_dim,
        hidden_dims=[128, 256, 128],
        reg_scale=1.0,  # Weight of regularization to prior
        batch_size=64,
        epochs=50,  # Fewer epochs for posterior samples
        learning_rate=0.001,
        device=device
    )

    # Evaluate uncertainty quantification
    print("Evaluating DPS uncertainty quantification...")
    results, _, _, _ = test_dps_uncertainty(
        posterior_models, n_dim=n_dim, n_samples=1000, device=device
    )

    # Print results
    print("\nDeep Posterior Sampling Results:")
    print(f"Aleatoric Uncertainty:")
    print(f"  AUPRC: {results['AU_AUPRC']:.4f}")
    print(f"  AUROC: {results['AU_AUROC']:.4f}")
    print(f"Epistemic Uncertainty:")
    print(f"  AUPRC: {results['EU_AUPRC']:.4f}")
    print(f"  AUROC: {results['EU_AUROC']:.4f}")


Generating dataset...
Training on 5000 examples, validating on 1000 examples
Using device: cuda
Training prior network on cuda...
Epoch 10/100, Train Loss: 0.1370, Val Loss: 0.1421
Epoch 20/100, Train Loss: 0.1356, Val Loss: 0.1299
Epoch 30/100, Train Loss: 0.1340, Val Loss: 0.1266
Epoch 40/100, Train Loss: 0.1345, Val Loss: 0.1276
Epoch 50/100, Train Loss: 0.1313, Val Loss: 0.1394
Epoch 60/100, Train Loss: 0.1322, Val Loss: 0.1286
Epoch 70/100, Train Loss: 0.1329, Val Loss: 0.1255
Epoch 80/100, Train Loss: 0.1317, Val Loss: 0.1241
Epoch 90/100, Train Loss: 0.1312, Val Loss: 0.1307
Epoch 100/100, Train Loss: 0.1312, Val Loss: 0.1268
Prior network training complete!

Generating 10 posterior samples...

Training posterior sample 1/10
Epoch 10/50, Loss: 0.1378
Epoch 20/50, Loss: 0.1410
Epoch 30/50, Loss: 0.1396
Epoch 40/50, Loss: 0.1386
Epoch 50/50, Loss: 0.1383

Training posterior sample 2/10
Epoch 10/50, Loss: 0.1318
Epoch 20/50, Loss: 0.1286
Epoch 30/50, Loss: 0.1293
Epoch 40/50, Loss:

### BNN

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader, TensorDataset
from sklearn.metrics import average_precision_score, roc_auc_score
from tqdm import tqdm
import math

# Create the same dataset as in the notebook
class NDimRoomDataset:
    """
    Dataset for n-dimensional room exploration data.
    Generates synthetic data where temperature readings are drawn from N(1, 0.5)
    when sum of coordinates >= n/2, and 0 otherwise.
    """
    def __init__(self, n_samples: int, n_dim: int = 2, split: str = "train",
                 train_ratio: float = 0.8, random_seed: int = None):
        if random_seed is not None:
            np.random.seed(random_seed)
            torch.manual_seed(random_seed)

        # Generate all data
        total_samples = int(n_samples / train_ratio if split == "train" else n_samples / (1 - train_ratio))
        X = np.random.uniform(0, 1, size=(total_samples, n_dim))
        coord_sums = np.sum(X, axis=1)

        # Initialize kappa array (temperature readings)
        kappa = np.zeros(total_samples)
        random_temp_mask = coord_sums >= n_dim/2
        kappa[random_temp_mask] = np.random.normal(1, 0.5, size=np.sum(random_temp_mask))

        # Split data
        split_idx = int(total_samples * train_ratio)
        if split == "train":
            self.X = X[:split_idx]
            self.kappa = kappa[:split_idx]
        else:
            self.X = X[split_idx:]
            self.kappa = kappa[split_idx:]

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        return self.X[idx], self.kappa[idx]


# Gaussian distribution for reparameterization trick
class Gaussian(object):
    def __init__(self, mu, rho):
        super().__init__()
        self.mu = mu
        self.rho = rho
        self.normal = torch.distributions.Normal(0, 1)

    @property
    def sigma(self):
        # Convert rho to sigma using softplus for positivity
        return torch.log(1 + torch.exp(self.rho))

    def sample(self):
        # Reparameterization trick
        epsilon = self.normal.sample(self.mu.size()).to(self.mu.device)
        return self.mu + self.sigma * epsilon

    def log_prob(self, input):
        # Log probability under the Gaussian
        return (-math.log(math.sqrt(2 * math.pi))
                - torch.log(self.sigma)
                - ((input - self.mu) ** 2) / (2 * self.sigma ** 2)).sum()


# Bayesian Linear layer
class BayesianLinear(nn.Module):
    def __init__(self, in_features, out_features, prior_sigma_1=1.0, prior_sigma_2=0.002, prior_pi=0.5):
        super().__init__()

        # Weight parameters
        self.in_features = in_features
        self.out_features = out_features

        # Weight mean and rho parameters
        self.weight_mu = nn.Parameter(torch.Tensor(out_features, in_features).uniform_(-0.1, 0.1))
        self.weight_rho = nn.Parameter(torch.Tensor(out_features, in_features).uniform_(-5, -4))
        self.weight = Gaussian(self.weight_mu, self.weight_rho)

        # Bias mean and rho parameters
        self.bias_mu = nn.Parameter(torch.Tensor(out_features).uniform_(-0.1, 0.1))
        self.bias_rho = nn.Parameter(torch.Tensor(out_features).uniform_(-5, -4))
        self.bias = Gaussian(self.bias_mu, self.bias_rho)

        # Prior distributions
        self.weight_prior_sigma_1 = prior_sigma_1
        self.weight_prior_sigma_2 = prior_sigma_2
        self.weight_prior_pi = prior_pi

        self.bias_prior_sigma_1 = prior_sigma_1
        self.bias_prior_sigma_2 = prior_sigma_2
        self.bias_prior_pi = prior_pi

        # Initialize log prior and log posterior
        self.log_prior = 0
        self.log_posterior = 0

    def forward(self, x, return_kl=False):
        # Sample weights and biases
        w = self.weight.sample()
        b = self.bias.sample()

        if self.training or return_kl:
            # Calculate log prior
            self.log_prior = self.calculate_log_prior(w, b)

            # Calculate log posterior
            self.log_posterior = self.weight.log_prob(w) + self.bias.log_prob(b)

        # Linear transformation
        output = F.linear(x, w, b)

        if return_kl:
            return output, self.log_prior, self.log_posterior
        return output

    def calculate_log_prior(self, w, b):
        # Scale mixture prior for weights
        sigma_1 = self.weight_prior_sigma_1
        sigma_2 = self.weight_prior_sigma_2
        pi = self.weight_prior_sigma_1

        # Log prob under mixture of Gaussians
        prob_w1 = torch.distributions.Normal(0, sigma_1).log_prob(w).exp()
        prob_w2 = torch.distributions.Normal(0, sigma_2).log_prob(w).exp()
        prob_w = (pi * prob_w1 + (1 - pi) * prob_w2).log()

        # Same for biases
        prob_b1 = torch.distributions.Normal(0, sigma_1).log_prob(b).exp()
        prob_b2 = torch.distributions.Normal(0, sigma_2).log_prob(b).exp()
        prob_b = (pi * prob_b1 + (1 - pi) * prob_b2).log()

        return prob_w.sum() + prob_b.sum()


# Bayesian Neural Network
class BNN(nn.Module):
    def __init__(self, input_dim, hidden_dims=[128, 256, 128], activation=nn.ReLU()):
        super().__init__()

        self.layers = nn.ModuleList()
        prev_dim = input_dim

        # Create Bayesian layers with activations
        for hidden_dim in hidden_dims:
            self.layers.append(BayesianLinear(prev_dim, hidden_dim))
            self.layers.append(activation)
            prev_dim = hidden_dim

        # Output layer
        self.layers.append(BayesianLinear(prev_dim, 1))

        # Noise precision parameter (learnable)
        self.log_noise_var = nn.Parameter(torch.log(torch.tensor(0.01)))

    def forward(self, x, return_kl=False):
        kl_sum = 0

        for i, layer in enumerate(self.layers):
            if isinstance(layer, BayesianLinear):
                if return_kl:
                    x, log_prior, log_posterior = layer(x, return_kl=True)
                    kl_sum += log_posterior - log_prior
                else:
                    x = layer(x)
            else:
                x = layer(x)

        if return_kl:
            return x, kl_sum
        return x

    def sample_predictions(self, x, n_samples=10):
        """Get multiple predictions by sampling from weight distributions"""
        predictions = []

        for _ in range(n_samples):
            predictions.append(self(x).detach())

        return torch.stack(predictions, dim=0)


def bnn_elbo_loss(y_pred, y, kl, n_samples, beta=1.0):
    """
    Evidence Lower Bound (ELBO) loss for BNN

    Args:
        y_pred: Model predictions
        y: Ground truth values
        kl: KL divergence between posterior and prior
        n_samples: Number of training samples (for scaling)
        beta: Weighting for KL term
    """
    # Likelihood term (scaled negative log likelihood)
    likelihood = -0.5 * F.mse_loss(y_pred, y, reduction='sum')

    # Scale KL term by dataset size
    kl_scaled = beta * kl / n_samples

    # ELBO
    elbo = likelihood - kl_scaled

    # Negative ELBO for minimization
    return -elbo


def train_bnn(train_data, val_data=None, input_dim=2,
             hidden_dims=[128, 256, 128],
             batch_size=64, epochs=150, learning_rate=0.001,
             beta=1.0, device="cuda" if torch.cuda.is_available() else "cpu"):
    """Train a Bayesian Neural Network"""

    # Create data loaders
    X_train, y_train = train_data
    train_dataset = TensorDataset(
        torch.tensor(X_train, dtype=torch.float32),
        torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
    )
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    n_samples = len(train_dataset)

    if val_data is not None:
        X_val, y_val = val_data
        val_dataset = TensorDataset(
            torch.tensor(X_val, dtype=torch.float32),
            torch.tensor(y_val, dtype=torch.float32).unsqueeze(1)
        )
        val_loader = DataLoader(val_dataset, batch_size=batch_size)

    # Initialize model
    model = BNN(input_dim, hidden_dims).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # Training loop
    print(f"Training Bayesian Neural Network on {device}...")
    for epoch in range(epochs):
        model.train()
        train_loss = 0

        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            optimizer.zero_grad()

            # Forward pass with KL
            pred, kl = model(batch_X, return_kl=True)

            # Calculate ELBO loss
            loss = bnn_elbo_loss(pred, batch_y, kl, n_samples, beta=beta)
            loss.backward()
            optimizer.step()

            train_loss += loss.item()

        avg_train_loss = train_loss / len(train_loader)

        # Validation
        if val_data is not None and (epoch + 1) % 10 == 0:
            model.eval()
            val_loss = 0

            with torch.no_grad():
                for batch_X, batch_y in val_loader:
                    batch_X, batch_y = batch_X.to(device), batch_y.to(device)

                    # Forward pass with KL
                    pred, kl = model(batch_X, return_kl=True)

                    # Calculate ELBO loss
                    loss = bnn_elbo_loss(pred, batch_y, kl, n_samples, beta=beta)
                    val_loss += loss.item()

            avg_val_loss = val_loss / len(val_loader)
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}, Val Loss: {avg_val_loss:.4f}")
        elif (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Train Loss: {avg_train_loss:.4f}")

    print("Training complete!")
    return model


def test_bnn_uncertainty(model, n_dim=2, n_samples=1000, n_mc_samples=50,
                       device="cuda" if torch.cuda.is_available() else "cpu"):
    """
    Evaluate BNN's ability to identify aleatoric and epistemic uncertainty
    """
    # Generate test points from [0, √1.5] for balanced uncertainty regions
    np.random.seed(42)
    x_np = np.random.uniform(0, 1.5**(1/n_dim), size=(n_samples, n_dim))
    x = torch.tensor(x_np, dtype=torch.float32).to(device)

    # Calculate ground truth uncertainty
    coord_sums = np.sum(x_np, axis=1)
    is_inside_unit = np.all(x_np <= 1.0, axis=1)

    # True AU exists if sum of entries > n/2 AND point is inside unit cube
    true_au = (coord_sums > n_dim/2).astype(float) * is_inside_unit.astype(float)

    # True EU exists if ANY coordinate > 1.0
    true_eu = (~is_inside_unit).astype(float)

    # Print distribution statistics
    n_no_uncertainty = np.sum((coord_sums <= n_dim/2) & is_inside_unit)
    n_only_au = np.sum((coord_sums > n_dim/2) & is_inside_unit)
    n_eu = np.sum(~is_inside_unit)

    print("\nSample Distribution:")
    print(f"No Uncertainty: {n_no_uncertainty/n_samples:.3f}")
    print(f"Only AU: {n_only_au/n_samples:.3f}")
    print(f"EU: {n_eu/n_samples:.3f}")

    # Monte Carlo sampling for predictions
    print("\nPerforming Monte Carlo sampling from the BNN...")

    # Process in batches
    batch_size = 100
    all_samples = []

    with torch.no_grad():
        for i in range(0, n_samples, batch_size):
            batch_x = x[i:i+batch_size]

            # Get multiple predictions for each input through MC sampling
            batch_samples = model.sample_predictions(batch_x, n_samples=n_mc_samples)
            all_samples.append(batch_samples.cpu().numpy())

    # Concatenate batch results: shape (n_mc_samples, n_samples, 1)
    all_predictions = np.concatenate(all_samples, axis=1)
    all_predictions = all_predictions.squeeze(axis=2)  # Remove last dimension

    # Calculate mean and variance statistics
    mean_pred = np.mean(all_predictions, axis=0)

    # Decompose uncertainty:
    # 1. Total Uncertainty: predictive variance
    total_uncertainty = np.var(all_predictions, axis=0)

    # 2. Epistemic Uncertainty: variance of means
    pred_eu = total_uncertainty.copy()

    # 3. Aleatoric Uncertainty: For Bayesian NNs, this is more complex
    # We use the expected data noise - mean squared error around mean prediction
    pred_au = np.mean(np.square(all_predictions - mean_pred[np.newaxis, :]), axis=0)

    # Print samples for comparison
    print("\nSample predictions (first 10):")
    print("True AU:", true_au[:10])
    print("Pred AU:", pred_au[:10])
    print("True EU:", true_eu[:10])
    print("Pred EU:", pred_eu[:10])

    # Compute metrics
    results = {
        'AU_AUPRC': average_precision_score(true_au, pred_au),
        'AU_AUROC': roc_auc_score(true_au, pred_au),
        'EU_AUPRC': average_precision_score(true_eu, pred_eu),
        'EU_AUROC': roc_auc_score(true_eu, pred_eu)
    }

    return results, mean_pred, pred_au, pred_eu


# Main execution
if __name__ == "__main__":
    # Create dataset with the same parameters as the notebook
    n_dim = 2
    print("Generating dataset...")
    dataset = NDimRoomDataset(30000, n_dim=n_dim, split="train", random_seed=42)
    test_dataset = NDimRoomDataset(10000, n_dim=n_dim, split="test", random_seed=42)

    # Use a subset of data for faster training
    subset_size = 5000
    indices = np.random.choice(len(dataset.X), subset_size, replace=False)
    X_train = dataset.X[indices]
    y_train = dataset.kappa[indices]

    # Create a validation set
    val_size = 1000
    val_indices = np.random.choice(len(test_dataset.X), val_size, replace=False)
    X_val = test_dataset.X[val_indices]
    y_val = test_dataset.kappa[val_indices]

    print(f"Training on {subset_size} examples, validating on {val_size} examples")

    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Train the BNN
    model = train_bnn(
        train_data=(X_train, y_train),
        val_data=(X_val, y_val),
        input_dim=n_dim,
        hidden_dims=[128, 256, 128],
        batch_size=64,
        epochs=150,
        learning_rate=0.001,
        beta=1.0,  # KL weight
        device=device
    )

    # Evaluate uncertainty quantification
    print("Evaluating BNN uncertainty quantification...")
    results, _, _, _ = test_bnn_uncertainty(
        model, n_dim=n_dim, n_samples=1000, n_mc_samples=30, device=device
    )

    # Print results
    print("\nBayesian Neural Network Results:")
    print(f"Aleatoric Uncertainty:")
    print(f"  AUPRC: {results['AU_AUPRC']:.4f}")
    print(f"  AUROC: {results['AU_AUROC']:.4f}")
    print(f"Epistemic Uncertainty:")
    print(f"  AUPRC: {results['EU_AUPRC']:.4f}")
    print(f"  AUROC: {results['EU_AUROC']:.4f}")

Generating dataset...
Training on 5000 examples, validating on 1000 examples
Using device: cuda
Training Bayesian Neural Network on cuda...
Epoch 10/150, Train Loss: 49.7422, Val Loss: 49.3162
Epoch 20/150, Train Loss: 41.1316, Val Loss: 40.5895
Epoch 30/150, Train Loss: 33.2261, Val Loss: 32.9048
Epoch 40/150, Train Loss: 26.2626, Val Loss: 25.9121
Epoch 50/150, Train Loss: 20.6402, Val Loss: 20.6483
Epoch 60/150, Train Loss: 16.7646, Val Loss: 16.5626
Epoch 70/150, Train Loss: 14.0563, Val Loss: 14.0456
Epoch 80/150, Train Loss: 12.2978, Val Loss: 12.1495
Epoch 90/150, Train Loss: 10.9778, Val Loss: 10.8362
Epoch 100/150, Train Loss: 9.8545, Val Loss: 9.8719
Epoch 110/150, Train Loss: 9.0095, Val Loss: 8.8819
Epoch 120/150, Train Loss: 8.4620, Val Loss: 8.4864
Epoch 130/150, Train Loss: 7.9327, Val Loss: 7.6859
Epoch 140/150, Train Loss: 7.3697, Val Loss: 7.3141
Epoch 150/150, Train Loss: 7.0280, Val Loss: 6.9745
Training complete!
Evaluating BNN uncertainty quantification...

Sample