# Bensemble Benchmark on Boston Housing

This notebook assembles a lightweight benchmark for Bayesian neural network baselines on the classic Boston Housing regression task.

We compare four models:

- Deterministic MLP baseline trained with mean squared error.
- Monte Carlo Dropout network.
- Hamiltonian Monte Carlo.
- Probabilistic Backpropagation.

Each model reports RMSE and predictive negative log-likelihood on a held-out test split to illustrate both accuracy and calibrated uncertainty.


In [1]:
import math
import pathlib

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
SEED = 7
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

print(f"Running on {DEVICE} with seed {SEED}")


Running on cpu with seed 7


## Init dataset and define helper functions

In [2]:
DATA_PATH = pathlib.Path('data/housing.data')
COLUMN_NAMES = [
    'CRIM', 'ZN', 'INDUS', 'CHAS', 'NOX', 'RM', 'AGE', 'DIS',
    'RAD', 'TAX', 'PTRATIO', 'B', 'LSTAT', 'MEDV'
]

df = pd.read_csv(DATA_PATH, sep='\s+', header=None, names=COLUMN_NAMES)
print(df.shape)
df.head()


(506, 14)


  df = pd.read_csv(DATA_PATH, sep='\s+', header=None, names=COLUMN_NAMES)


Unnamed: 0,CRIM,ZN,INDUS,CHAS,NOX,RM,AGE,DIS,RAD,TAX,PTRATIO,B,LSTAT,MEDV
0,0.00632,18.0,2.31,0,0.538,6.575,65.2,4.09,1,296.0,15.3,396.9,4.98,24.0
1,0.02731,0.0,7.07,0,0.469,6.421,78.9,4.9671,2,242.0,17.8,396.9,9.14,21.6
2,0.02729,0.0,7.07,0,0.469,7.185,61.1,4.9671,2,242.0,17.8,392.83,4.03,34.7
3,0.03237,0.0,2.18,0,0.458,6.998,45.8,6.0622,3,222.0,18.7,394.63,2.94,33.4
4,0.06905,0.0,2.18,0,0.458,7.147,54.2,6.0622,3,222.0,18.7,396.9,5.33,36.2


In [3]:
TARGET_COL = 'MEDV'
TEST_SIZE = 0.2

X = df.drop(columns=[TARGET_COL]).values.astype(np.float32)
y = df[TARGET_COL].values.astype(np.float32).reshape(-1, 1)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=TEST_SIZE, random_state=SEED
)

x_scaler = StandardScaler()
y_scaler = StandardScaler()

X_train_scaled = x_scaler.fit_transform(X_train).astype(np.float32)
X_test_scaled = x_scaler.transform(X_test).astype(np.float32)
y_train_scaled = y_scaler.fit_transform(y_train).astype(np.float32)
y_test_scaled = y_scaler.transform(y_test).astype(np.float32)

train_tensor_x = torch.from_numpy(X_train_scaled)
train_tensor_y = torch.from_numpy(y_train_scaled)
test_tensor_x = torch.from_numpy(X_test_scaled)
test_tensor_y = torch.from_numpy(y_test_scaled)

BATCH_SIZE = 64
epochs = 1000
train_dataset = TensorDataset(train_tensor_x, train_tensor_y)
test_dataset = TensorDataset(test_tensor_x, test_tensor_y)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
full_train_loader = DataLoader(train_dataset, batch_size=len(train_dataset), shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=len(test_dataset), shuffle=False)

train_size = len(train_dataset)
y_test_true = y_test.reshape(-1)
y_train_true = y_train.reshape(-1)

Y_SCALE = float(y_scaler.scale_[0])
Y_MEAN = float(y_scaler.mean_[0])

print(f"Train size: {train_size}, Test size: {len(test_dataset)}")


Train size: 404, Test size: 102


In [4]:
def inverse_transform_targets(y_scaled):
    # Return targets in the original scale as a NumPy array.
    y_np = y_scaled.detach().cpu().numpy().reshape(-1, 1)
    return y_scaler.inverse_transform(y_np).reshape(-1)


def summarize_predictions(pred_samples, y_true, noise_variance=None, jitter=1e-6):
    # Aggregate predictive samples into RMSE and negative log-likelihood.
    pred_samples = np.array(pred_samples)
    if pred_samples.ndim == 1:
        pred_samples = pred_samples[None, :]
    mean_pred = pred_samples.mean(axis=0)
    epistemic_var = pred_samples.var(axis=0)
    total_var = epistemic_var + jitter
    if noise_variance is not None:
        total_var = total_var + noise_variance
    rmse = float(np.sqrt(np.mean((mean_pred - y_true) ** 2)))
    nll = 0.5 * np.mean(np.log(2 * np.pi * total_var) + ((y_true - mean_pred) ** 2) / total_var)
    return {"rmse": rmse, "nll": float(nll)}


def print_metrics(name, metrics):
    print(f"{name:>20s} | RMSE: {metrics['rmse']:.4f}, NLL: {metrics['nll']:.4f}")


## Deterministic MLP

In [5]:
class DeterministicMLP(nn.Module):
    def __init__(self, in_features, hidden_dims=(64, 64)):
        super().__init__()
        layers = []
        prev_dim = in_features
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)


def train_deterministic(model, data_loader, epochs=1000, lr=1e-3, weight_decay=1e-4):
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    loss_fn = nn.MSELoss()
    for epoch in range(epochs):
        epoch_loss = 0.0
        for xb, yb in data_loader:
            xb = xb.to(DEVICE)
            yb = yb.to(DEVICE)
            preds = model(xb)
            loss = loss_fn(preds, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item() * xb.size(0)
        if (epoch + 1) % 100 == 0:
            avg_loss = epoch_loss / train_size
            print(f"Epoch {epoch + 1:03d} | Train MSE: {avg_loss:.4f}")


def evaluate_deterministic(model):
    model.eval()
    with torch.no_grad():
        train_preds_scaled = []
        for xb, _ in full_train_loader:
            xb = xb.to(DEVICE)
            preds = model(xb)
            train_preds_scaled.append(preds.cpu())
        train_preds_scaled = torch.cat(train_preds_scaled, dim=0)
        train_preds = inverse_transform_targets(train_preds_scaled)
        residuals = y_train_true - train_preds
        residual_variance = float(np.var(residuals, ddof=1))

        test_preds_scaled = []
        for xb, _ in test_loader:
            xb = xb.to(DEVICE)
            preds = model(xb)
            test_preds_scaled.append(preds.cpu())
        test_preds_scaled = torch.cat(test_preds_scaled, dim=0)
        test_preds = inverse_transform_targets(test_preds_scaled)

    predictive_samples = np.expand_dims(test_preds, axis=0)
    metrics = summarize_predictions(predictive_samples, y_test_true, noise_variance=residual_variance)
    metrics['noise_variance'] = residual_variance
    return metrics


base_model = DeterministicMLP(in_features=X_train.shape[1]).to(DEVICE)
train_deterministic(base_model, train_loader, epochs=epochs)
mlp_metrics = evaluate_deterministic(base_model)
print_metrics('Deterministic MLP', mlp_metrics)


Epoch 100 | Train MSE: 0.0410
Epoch 200 | Train MSE: 0.0178
Epoch 300 | Train MSE: 0.0105
Epoch 400 | Train MSE: 0.0077
Epoch 500 | Train MSE: 0.0073
Epoch 600 | Train MSE: 0.0051
Epoch 700 | Train MSE: 0.0038
Epoch 800 | Train MSE: 0.0038
Epoch 900 | Train MSE: 0.0024
Epoch 1000 | Train MSE: 0.0031
   Deterministic MLP | RMSE: 4.0317, NLL: 34.1395


## MCDropout

In [6]:
class MCDropoutMLP(nn.Module):
    def __init__(self, in_features, hidden_dims=(64, 64), dropout_p=0.1):
        super().__init__()
        layers = []
        prev_dim = in_features
        for hidden_dim in hidden_dims:
            layers.append(nn.Linear(prev_dim, hidden_dim))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(p=dropout_p))
            prev_dim = hidden_dim
        layers.append(nn.Linear(prev_dim, 1))
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        return self.net(x)


def train_mc_dropout(model, data_loader, epochs=400, lr=1e-3, weight_decay=1e-4):
    model.train()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    loss_fn = nn.MSELoss()
    for epoch in range(epochs):
        epoch_loss = 0.0
        for xb, yb in data_loader:
            xb = xb.to(DEVICE)
            yb = yb.to(DEVICE)
            preds = model(xb)
            loss = loss_fn(preds, yb)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item() * xb.size(0)
        if (epoch + 1) % 100 == 0:
            avg_loss = epoch_loss / train_size
            print(f"Epoch {epoch + 1:03d} | Train MSE: {avg_loss:.4f}")


def mc_dropout_predictions(model, x_tensor, n_samples=200):
    model.train()
    preds = []
    with torch.no_grad():
        for _ in range(n_samples):
            outputs = model(x_tensor.to(DEVICE))
            outputs = inverse_transform_targets(outputs.cpu())
            preds.append(outputs)
    return np.stack(preds, axis=0)


def evaluate_mc_dropout(model, noise_variance):
    test_batch = next(iter(test_loader))[0]
    predictive_samples = mc_dropout_predictions(model, test_batch, n_samples=200)
    metrics = summarize_predictions(predictive_samples, y_test_true, noise_variance=noise_variance)
    return metrics


mc_model = MCDropoutMLP(in_features=X_train.shape[1], dropout_p=0.1).to(DEVICE)
train_mc_dropout(mc_model, train_loader, epochs=epochs)

noise_variance_base = mlp_metrics['noise_variance']
mc_metrics = evaluate_mc_dropout(mc_model, noise_variance=noise_variance_base)
print_metrics('MC Dropout', mc_metrics)


Epoch 100 | Train MSE: 0.0680
Epoch 200 | Train MSE: 0.0523
Epoch 300 | Train MSE: 0.0387
Epoch 400 | Train MSE: 0.0340
Epoch 500 | Train MSE: 0.0328
Epoch 600 | Train MSE: 0.0316
Epoch 700 | Train MSE: 0.0288
Epoch 800 | Train MSE: 0.0230
Epoch 900 | Train MSE: 0.0251
Epoch 1000 | Train MSE: 0.0216
          MC Dropout | RMSE: 3.7852, NLL: 3.5514


## Hamiltonian Monte Carlo

In [7]:
# Hamiltonian Monte Carlo baseline (Bayesian linear regression)
x_train_hmc = train_tensor_x.cpu()
y_train_hmc = train_tensor_y.squeeze(1).cpu()
x_test_hmc = test_tensor_x.cpu()

n_features = x_train_hmc.shape[1]
prior_std = 1.0
noise_prior_std = 0.5

def split_params(theta):
    weight = theta[:n_features]
    bias = theta[n_features]
    log_noise = theta[n_features + 1]
    return weight, bias, log_noise

def log_posterior(theta):
    weight, bias, log_noise = split_params(theta)
    noise_var = torch.exp(log_noise)
    log_prior_w = -0.5 * torch.sum((weight / prior_std) ** 2)
    log_prior_b = -0.5 * (bias / prior_std) ** 2
    log_prior_log_noise = -0.5 * (log_noise / noise_prior_std) ** 2

    preds = x_train_hmc @ weight + bias
    residuals = y_train_hmc - preds
    log_likelihood = -0.5 * (
        x_train_hmc.shape[0] * (math.log(2 * math.pi) + log_noise)
        + torch.sum(residuals ** 2) / torch.exp(log_noise)
    )
    return log_likelihood + log_prior_w + log_prior_b + log_prior_log_noise

def potential_and_grad(theta):
    theta = theta.detach().requires_grad_(True)
    potential = -log_posterior(theta)
    grad, = torch.autograd.grad(potential, theta)
    return potential.detach(), grad.detach()

def run_hmc(initial_theta, num_samples=400, burn_in=200, step_size=0.01, leapfrog_steps=30):
    theta = initial_theta.clone()
    samples = []
    accepts = 0
    total_iters = num_samples + burn_in
    current_U, current_grad = potential_and_grad(theta)
    for i in range(total_iters):
        momentum = torch.randn_like(theta)
        current_theta = theta.clone()
        current_K = 0.5 * torch.dot(momentum, momentum)

        theta_prop = theta.clone()
        momentum_prop = momentum.clone()

        momentum_prop = momentum_prop - 0.5 * step_size * current_grad
        theta_prop = theta_prop + step_size * momentum_prop

        for _ in range(leapfrog_steps - 1):
            U_prop, grad_prop = potential_and_grad(theta_prop)
            momentum_prop = momentum_prop - step_size * grad_prop
            theta_prop = theta_prop + step_size * momentum_prop

        U_prop, grad_prop = potential_and_grad(theta_prop)
        momentum_prop = momentum_prop - 0.5 * step_size * grad_prop

        proposal_K = 0.5 * torch.dot(momentum_prop, momentum_prop)
        acceptance_log_prob = (current_U + current_K) - (U_prop + proposal_K)
        acceptance_log_prob_value = float(acceptance_log_prob)
        if acceptance_log_prob_value >= 0 or math.log(np.random.rand()) < acceptance_log_prob_value:
            theta = theta_prop.detach()
            current_U = U_prop
            current_grad = grad_prop
            accepts += 1
        else:
            theta = current_theta
            current_U, current_grad = potential_and_grad(theta)

        if i >= burn_in:
            samples.append(theta.detach().cpu().numpy())

    accept_rate = accepts / total_iters
    return np.stack(samples), accept_rate

initial_theta = torch.zeros(n_features + 2)
hmc_samples, hmc_accept = run_hmc(initial_theta, num_samples=epochs)
print(f"HMC accept rate: {hmc_accept:.3f}")

hmc_preds = []
noise_vars = []
for sample in hmc_samples:
    weight = torch.from_numpy(sample[:n_features]).float()
    bias = float(sample[n_features])
    log_noise = float(sample[n_features + 1])
    preds_scaled = x_test_hmc @ weight + bias
    preds = inverse_transform_targets(preds_scaled.unsqueeze(1))
    hmc_preds.append(preds)
    noise_vars.append((Y_SCALE ** 2) * math.exp(log_noise))

hmc_preds = np.array(hmc_preds)
hmc_noise = float(np.mean(noise_vars))
hmc_metrics = summarize_predictions(hmc_preds, y_test_true, noise_variance=hmc_noise)
print_metrics('HMC Linear', hmc_metrics)



HMC accept rate: 0.937
          HMC Linear | RMSE: 5.8315, NLL: 3.2159


## Probabilistic Backpropagation

In [20]:
def phi(x) -> torch.Tensor:
    return torch.exp(-0.5 * x * x) / math.sqrt(2.0 * math.pi)

def Phi(x) -> torch.Tensor:
    return 0.5 * (1.0 + torch.erf(x / math.sqrt(2.0)))

def relu_moments(m, v, eps: float = 1e-9):
    v = torch.clamp(v, min=eps)
    sigma = torch.sqrt(v)
    alpha = torch.clamp(m / sigma, min=-10.0, max=10.0)
    pdf = phi(alpha)
    cdf = Phi(alpha)
    mean = sigma * pdf + m * cdf
    second_moment = (v + m * m) * cdf + m * sigma * pdf
    var = torch.clamp(second_moment - mean * mean, min=eps)
    return mean, var

class ProbLinear(nn.Module):
    def __init__(self, in_features: int, out_features: int):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        d = self.in_features + 1
        h = self.out_features
        scale = 1.0 / math.sqrt(d)
        self.m = nn.Parameter(scale * torch.randn(h, d, dtype=torch.float64, device=DEVICE))
        self.v = nn.Parameter(0.5 * torch.ones(h, d, dtype=torch.float64, device=DEVICE))

class PBPNet(nn.Module):
    def __init__(self, layer_sizes):
        super().__init__()
        self.layers = nn.ModuleList([ProbLinear(layer_sizes[i], layer_sizes[i + 1]) for i in range(len(layer_sizes) - 1)])

    def forward_moments(self, x):
        batch = x.shape[0]
        mz = torch.cat([x, torch.ones(batch, 1, dtype=torch.float64, device=DEVICE)], dim=1)
        vz = torch.zeros_like(mz)
        for li, layer in enumerate(self.layers):
            d = layer.in_features + 1
            scale = 1.0 / math.sqrt(d)
            ma = (mz @ layer.m.t()) * scale
            term1 = (vz @ (layer.m ** 2).t())
            term2 = ((mz ** 2) @ layer.v.t())
            term3 = (vz @ layer.v.t())
            va = (term1 + term2 + term3) * (scale ** 2)
            is_last = li == len(self.layers) - 1
            if not is_last:
                mb, vb = relu_moments(ma, va)
                mz = torch.cat([mb, torch.ones(batch, 1, dtype=torch.float64, device=DEVICE)], dim=1)
                vz = torch.cat([vb, torch.zeros(batch, 1, dtype=torch.float64, device=DEVICE)], dim=1)
            else:
                mz = ma
                vz = va
        return mz, vz

def logZ_gaussian_likelihood(y, mz, vz, alpha_g, beta_g, eps = 1e-9):
    alpha_g = torch.clamp(alpha_g, min=1.0 + 1e-6)
    sigma2_eff = torch.clamp(beta_g / (alpha_g - 1.0) + vz, min=eps)
    return (-0.5 * ((y - mz) ** 2) / sigma2_eff - 0.5 * torch.log(2.0 * math.pi * sigma2_eff)).squeeze(-1)

def gamma_adf_update_from_Z(logZ, logZ1, logZ2, alpha_old, beta_old, clamp_eps = 1e-9) :
    r1 = torch.exp(logZ1 - logZ)
    r2 = torch.exp(logZ2 - logZ)
    term_alpha = (r2 / (r1 * r1)) * ((alpha_old + 1.0) / alpha_old)
    denom_alpha = torch.clamp(term_alpha - 1.0, min=1e-12)
    alpha_new = torch.clamp(1.0 / denom_alpha, min=1.0 + 1e-6, max=1e6)
    termA = r2 / r1 * ((alpha_old + 1.0) / beta_old)
    termB = r1 * (alpha_old / beta_old)
    denom_beta = torch.clamp(termA - termB, min=clamp_eps)
    beta_new = torch.clamp(1.0 / denom_beta, min=clamp_eps, max=1e9)
    return alpha_new.detach(), beta_new.detach()

def single_datapoint_adf_step(net, x, y, alpha_g, beta_g, step_clip = 1.5):
    mz, vz = net.forward_moments(x.unsqueeze(0))
    logZ = logZ_gaussian_likelihood(y.unsqueeze(0).unsqueeze(0), mz, vz, alpha_g, beta_g).mean()
    logZ1 = logZ_gaussian_likelihood(y.unsqueeze(0).unsqueeze(0), mz, vz, alpha_g + 1.0, beta_g).mean()
    logZ2 = logZ_gaussian_likelihood(y.unsqueeze(0).unsqueeze(0), mz, vz, alpha_g + 2.0, beta_g).mean()
    for p in net.parameters():
        if p.grad is not None:
            p.grad.zero_()
    logZ.backward()
    for layer in net.layers:
        gm = layer.m.grad
        gv = layer.v.grad
        if step_clip is not None:
            gm = torch.clamp(gm, min=-step_clip, max=step_clip)
            gv = torch.clamp(gv, min=-step_clip, max=step_clip)
        m = layer.m
        v = torch.clamp(layer.v, min=1e-10, max=1e3)
        m_new = m + v * gm
        v_new = v - (v * v) * (gm * gm - 2.0 * gv)
        v_new = torch.clamp(v_new, min=1e-10, max=1e3)
        with torch.no_grad():
            layer.m.copy_(m_new.detach())
            layer.v.copy_(v_new.detach())
        layer.m.grad = None
        layer.v.grad = None
    alpha_g, beta_g = gamma_adf_update_from_Z(logZ.detach(), logZ1.detach(), logZ2.detach(), alpha_g, beta_g)
    return alpha_g, beta_g

def prior_refresh_epoch(net, alpha_l, beta_l, n_refresh = 1):
    alpha_l = torch.clamp(alpha_l, min=1.0 + 1e-6)
    for _ in range(n_refresh):
        s2 = beta_l / (alpha_l - 1.0)
        s2_1 = beta_l / alpha_l
        s2_2 = beta_l / (alpha_l + 1.0)
        Z_acc = 0.0
        Z1_acc = 0.0
        Z2_acc = 0.0
        n_tot = 0
        for layer in net.layers:
            m = layer.m
            v = torch.clamp(layer.v, min=1e-12)
            m_tmp = m.detach().clone().requires_grad_(True)
            v_tmp = v.detach().clone().requires_grad_(True)
            sigma2 = s2 + v_tmp
            lp = -0.5 * (m_tmp ** 2) / sigma2 - 0.5 * torch.log(2.0 * math.pi * sigma2)
            logZ_prior = lp.sum()
            if m_tmp.grad is not None:
                m_tmp.grad.zero_()
            if v_tmp.grad is not None:
                v_tmp.grad.zero_()
            logZ_prior.backward()
            gm = m_tmp.grad
            gv = v_tmp.grad
            m_new = m + v * gm
            v_new = v - (v * v) * (gm * gm - 2.0 * gv)
            v_new = torch.clamp(v_new, min=1e-12, max=1e3)
            with torch.no_grad():
                layer.m.copy_(m_new.detach())
                layer.v.copy_(v_new.detach())
            def sum_logZ_given_s2(s2_local) -> torch.Tensor:
                sigma2_local = s2_local + v
                lp_local = -0.5 * (m ** 2) / sigma2_local - 0.5 * torch.log(2.0 * math.pi * sigma2_local)
                return lp_local.sum()
            Z_acc += sum_logZ_given_s2(s2).detach()
            Z1_acc += sum_logZ_given_s2(s2_1).detach()
            Z2_acc += sum_logZ_given_s2(s2_2).detach()
            n_tot += m.numel()
        logZ_avg = Z_acc / max(n_tot, 1)
        logZ1_avg = Z1_acc / max(n_tot, 1)
        logZ2_avg = Z2_acc / max(n_tot, 1)
        alpha_l, beta_l = gamma_adf_update_from_Z(logZ_avg, logZ1_avg, logZ2_avg, alpha_l, beta_l)
    return alpha_l.detach(), beta_l.detach()

train_x = train_tensor_x.to(DEVICE, dtype=torch.float64)
train_y = train_tensor_y.squeeze(1).to(DEVICE, dtype=torch.float64)
test_x = test_tensor_x.to(DEVICE, dtype=torch.float64)
test_y = test_tensor_y.squeeze(1).to(DEVICE, dtype=torch.float64)

torch.manual_seed(SEED + 1)
pbp_net = PBPNet([train_x.shape[1], 50, 1]).to(DEVICE)
alpha_g = torch.tensor(6.0, dtype=torch.float64, device=DEVICE)
beta_g = torch.tensor(6.0, dtype=torch.float64, device=DEVICE)
alpha_l = torch.tensor(6.0, dtype=torch.float64, device=DEVICE)
beta_l = torch.tensor(6.0, dtype=torch.float64, device=DEVICE)

pbp_epochs = 100
for epoch in range(1, pbp_epochs + 1):
    order = torch.randperm(train_x.shape[0], device=DEVICE)
    for idx in order:
        x_i = train_x[idx]
        y_i = train_y[idx]
        alpha_g, beta_g = single_datapoint_adf_step(pbp_net, x_i, y_i, alpha_g, beta_g, step_clip=2.0)
    alpha_l, beta_l = prior_refresh_epoch(pbp_net, alpha_l, beta_l, n_refresh=1)
    if epoch % 15 == 0 or epoch == 1:
        with torch.no_grad():
            mz, vz = pbp_net.forward_moments(test_x)
            pred_mean_scaled = mz.squeeze(-1).cpu()
            pred_mean = inverse_transform_targets(pred_mean_scaled)
            noise_var_scaled = torch.clamp(beta_g / (alpha_g - 1.0), min=1e-8).cpu().item()
            total_var_scaled = noise_var_scaled + vz.squeeze(-1).cpu().numpy()
            total_var = np.maximum(total_var_scaled * (Y_SCALE ** 2), 1e-9)
            rmse = float(np.sqrt(np.mean((pred_mean - y_test_true) ** 2)))
            nll = 0.5 * np.log(2 * math.pi * total_var) + 0.5 * ((y_test_true - pred_mean) ** 2) / total_var
            print(f"Epoch {epoch:02d} | RMSE: {rmse:.4f} | NLL: {float(np.mean(nll)):.4f}")

with torch.no_grad():
    test_mz, test_vz = pbp_net.forward_moments(test_x)
    pred_mean_scaled = test_mz.squeeze(-1)
    pred_mean = inverse_transform_targets(pred_mean_scaled.cpu())
    epistemic_var_scaled = test_vz.squeeze(-1).cpu().numpy()
    epistemic_var = np.maximum(epistemic_var_scaled * (Y_SCALE ** 2), 1e-9)
    noise_var_scaled = torch.clamp(beta_g / (alpha_g - 1.0), min=1e-8).cpu().item()
    noise_var = float(noise_var_scaled * (Y_SCALE ** 2))
    rng = np.random.default_rng(SEED)
    pbp_samples = rng.normal(loc=pred_mean, scale=np.sqrt(epistemic_var), size=(300, pred_mean.shape[0]))
    pbp_metrics = summarize_predictions(pbp_samples, y_test_true, noise_variance=noise_var)
    print_metrics('Probabilistic Backprop', pbp_metrics)


Epoch 01 | RMSE: 6.3725 | NLL: 3.2123
Epoch 15 | RMSE: 5.4200 | NLL: 3.3409
Epoch 30 | RMSE: 5.3818 | NLL: 3.3993
Epoch 45 | RMSE: 5.3808 | NLL: 3.4260
Epoch 60 | RMSE: 5.3943 | NLL: 3.4439
Epoch 75 | RMSE: 5.4090 | NLL: 3.4558
Epoch 90 | RMSE: 5.4312 | NLL: 3.4685
Probabilistic Backprop | RMSE: 5.4381, NLL: 3.4718


In [21]:
results = pd.DataFrame([
    {"model": "Deterministic MLP", "rmse": mlp_metrics['rmse'], "nll": mlp_metrics['nll']},
    {"model": "MC Dropout", "rmse": mc_metrics['rmse'], "nll": mc_metrics['nll']},
    {"model": "HMC Linear", "rmse": hmc_metrics['rmse'], "nll": hmc_metrics['nll']},
    {"model": "Probabilistic Backprop", "rmse": pbp_metrics['rmse'], "nll": pbp_metrics['nll']},
])
results


Unnamed: 0,model,rmse,nll
0,Deterministic MLP,4.031665,34.139454
1,MC Dropout,3.785248,3.551396
2,HMC Linear,5.83153,3.215919
3,Probabilistic Backprop,5.438124,3.471774
