In [200]:
import torch
import pyro
import pyro.distributions as dist
import torch.nn as nn
import torch.optim as optim
from pyro.infer import SVI, Trace_ELBO
from pyro.optim import Adam
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.model_selection import train_test_split
import pandas as pd
import numpy as np
from torch.distributions import constraints


In [201]:
def prepare_tensors(X, y, test_size=0.3, random_state=42):
    """Convert data to PyTorch tensors and perform train-test split."""
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=test_size, random_state=random_state, stratify=y
    )

    # Convert to PyTorch tensors
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32)
    X_test_tensor = torch.tensor(X_test, dtype=torch.float32)
    y_train_tensor = torch.tensor(y_train, dtype=torch.float32)  # Keep float for BCE loss
    y_test_tensor = torch.tensor(y_test, dtype=torch.float32)

    input_dim = X_train.shape[1]  # Get number of features

    return X_train_tensor, X_test_tensor, y_train_tensor, y_test_tensor, input_dim


In [202]:
def load_and_preprocess_data(file_path, target_col, test_size=0.3, random_state=42):
    df = pd.read_csv(file_path)

    # Standard preprocessing (modify if needed for different datasets)
    if target_col == "diagnosis":
        df[target_col] = df[target_col].map({"M": 1, "B": 0}).astype(int) # Convert to 1/0

    feature_cols = [col for col in df.columns if col != target_col]
    X = df[feature_cols].values
    y = df[target_col].values

    # Standardize features
    scaler = StandardScaler()
    X = scaler.fit_transform(X)

    # Debugging output
    print(f"Input dimension calculated: {X.shape[1]}")

    return prepare_tensors(X, y, test_size, random_state)


In [203]:
class BoostedVI_BayesianFNN(nn.Module):
    def __init__(self, input_dim, hidden_dim=16, output_dim=1, num_mixtures=3):
        super().__init__()
        self.input_dim = input_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.num_mixtures = num_mixtures

    def model(self, X, y=None):
        batch_size = X.shape[0]  
        print("Expected batch size:", batch_size)  

        with pyro.plate("batch", batch_size, dim=-1):  # 🔹 Ensure batch plate

            with pyro.plate("layers_w1", self.input_dim * self.hidden_dim, dim=-2):
                mix_weights = pyro.sample("mix_weights", dist.Dirichlet(torch.ones(self.num_mixtures)))

                means_w1 = pyro.sample("means_w1", dist.Normal(
                    torch.zeros(self.input_dim * self.hidden_dim, self.num_mixtures),
                    torch.ones(self.input_dim * self.hidden_dim, self.num_mixtures)
                ).to_event(1))

                scales_w1 = pyro.sample("scales_w1", dist.LogNormal(
                    torch.zeros(self.input_dim * self.hidden_dim, self.num_mixtures),
                    torch.ones(self.input_dim * self.hidden_dim, self.num_mixtures)
                ).to_event(1))

            # 🔹 Fix dimension mismatch
            if mix_weights.dim() > 2:
                mix_weights = mix_weights.squeeze(1)  
            mix_weights = mix_weights.expand(self.input_dim * self.hidden_dim, self.num_mixtures)

            w1 = pyro.sample("w1", dist.MixtureSameFamily(
                dist.Categorical(mix_weights),
                dist.Normal(means_w1, scales_w1)
            )).reshape(self.input_dim, self.hidden_dim)

            print(f"Shape of w1: {w1.shape}")

            with pyro.plate("layers_b1", self.hidden_dim, dim=-1):
                b1 = pyro.sample("b1", dist.Normal(torch.zeros(self.hidden_dim), torch.ones(self.hidden_dim)))

            hidden = torch.tanh(torch.matmul(X, w1) + b1)

            with pyro.plate("layers_w2", self.hidden_dim * self.output_dim, dim=-2):
                mix_weights_w2 = pyro.sample("mix_weights_w2", dist.Dirichlet(torch.ones(self.num_mixtures)))

                means_w2 = pyro.sample("means_w2", dist.Normal(
                    torch.zeros(self.hidden_dim * self.output_dim, self.num_mixtures),
                    torch.ones(self.hidden_dim * self.output_dim, self.num_mixtures)
                ).to_event(1))

                scales_w2 = pyro.sample("scales_w2", dist.LogNormal(
                    torch.zeros(self.hidden_dim * self.output_dim, self.num_mixtures),
                    torch.ones(self.hidden_dim * self.output_dim, self.num_mixtures)
                ).to_event(1))

            # 🔹 Fix dimension mismatch for w2 weights
            if mix_weights_w2.dim() > 2:
                mix_weights_w2 = mix_weights_w2.squeeze(1)  
            mix_weights_w2 = mix_weights_w2.expand(self.hidden_dim * self.output_dim, self.num_mixtures)

            w2 = pyro.sample("w2", dist.MixtureSameFamily(
                dist.Categorical(mix_weights_w2),
                dist.Normal(means_w2, scales_w2)
            )).reshape(self.hidden_dim, self.output_dim)

            with pyro.plate("layers_b2", self.output_dim, dim=-1):
                b2 = pyro.sample("b2", dist.Normal(torch.zeros(self.output_dim), torch.ones(self.output_dim)))

            logits = torch.matmul(hidden, w2) + b2
            probs = torch.sigmoid(logits).squeeze(-1)

            if y is not None:
                pyro.sample("obs", dist.Bernoulli(probs), obs=y)

            return probs



    def guide(self, X, y=None):
        batch_size = X.shape[0]  # Fix: Ensure guide uses the same batch size as model

        with pyro.plate("batch", batch_size, dim=-1):  # Ensure batch size is consistent

            mix_weights_q = pyro.param("mix_weights_q", torch.ones(self.num_mixtures) / self.num_mixtures, constraint=constraints.simplex)
            pyro.sample("mix_weights", dist.Dirichlet(mix_weights_q))

            means_w1_q = pyro.param("means_w1_q", torch.zeros(self.input_dim * self.hidden_dim, self.num_mixtures))
            scales_w1_q = pyro.param("scales_w1_q", torch.ones(self.input_dim * self.hidden_dim, self.num_mixtures), constraint=constraints.positive)

            with pyro.plate("layers_w1", self.input_dim * self.hidden_dim, dim=-2):  # 🔹 Fix mismatch
                pyro.sample("means_w1", dist.Normal(means_w1_q, scales_w1_q).to_event(1))

            b1_q = pyro.param("b1_q", torch.zeros(self.hidden_dim))
            pyro.sample("b1", dist.Normal(b1_q, 0.1 * torch.ones_like(b1_q)))

            mix_weights_w2_q = pyro.param("mix_weights_w2_q", torch.ones(self.num_mixtures) / self.num_mixtures, constraint=constraints.simplex)
            pyro.sample("mix_weights_w2", dist.Dirichlet(mix_weights_w2_q))

            means_w2_q = pyro.param("means_w2_q", torch.zeros(self.hidden_dim * self.output_dim, self.num_mixtures))
            scales_w2_q = pyro.param("scales_w2_q", torch.ones(self.hidden_dim * self.output_dim, self.num_mixtures), constraint=constraints.positive)

            with pyro.plate("layers_w2", self.hidden_dim * self.output_dim, dim=-2):
                pyro.sample("means_w2", dist.Normal(means_w2_q, scales_w2_q).to_event(1))

            b2_q = pyro.param("b2_q", torch.zeros(self.output_dim))
            pyro.sample("b2", dist.Normal(b2_q, 0.1 * torch.ones_like(b2_q)))



In [204]:
# Train BVINN Model
def train_model(X_train, y_train, input_dim, num_epochs=1000):
    pyro.clear_param_store()
    model = BoostedVI_BayesianFNN(input_dim)
    optimizer = pyro.optim.Adam({"lr": 1e-3})  # Lower from 1e-2 to 1e-3
    svi = SVI(model.model, model.guide, optimizer, loss=Trace_ELBO())
    
    for epoch in range(num_epochs):
        loss = svi.step(X_train, y_train)
        if epoch % 100 == 0:
            print(f"Epoch {epoch}: Loss = {loss}")
    
    return model

In [205]:
def evaluate_model(model, X_test, y_test):
    batch_size = X_test.shape[0]  # Ensure batch size matches X_test size
    print(f"X_test shape: {X_test.shape}")
    print(f"Expected batch size: {batch_size}")

    # ✅ Fix: Ensure consistent batch size
    predictive = pyro.infer.Predictive(model.model, guide=model.guide, num_samples=1000)
    
    # ✅ Fix: Slice X_test properly
    samples = predictive(X_test[:batch_size])

    pred_probs = samples["obs"].mean(axis=0).detach().numpy()
    predictions = (pred_probs > 0.5).astype(int)

    accuracy = (predictions == y_test.numpy()).mean()
    print(f"Accuracy: {accuracy:.4f}")



In [206]:
def main():
    file_path = "Data Sets/cancer.csv"  # Change for different datasets
    target_col = "diagnosis"  # Change for different datasets
    X_train, X_test, y_train, y_test, input_dim = load_and_preprocess_data(file_path, target_col)
    
    model = train_model(X_train, y_train, input_dim)
    evaluate_model(model, X_test, y_test)

In [207]:
if __name__ == "__main__":
    main()

Input dimension calculated: 31


ValueError: Shape mismatch inside plate('batch') at site means_w1 dim -1, 398 vs 496
   Trace Shapes:        
    Param Sites:        
   mix_weights_q     3  
      means_w1_q 496 3  
     scales_w1_q 496 3  
   Sample Sites:        
      batch dist     |  
           value 398 |  
mix_weights dist 398 | 3
           value 398 | 3
  layers_w1 dist     |  
           value 496 |  

In [208]:
model_instance = BoostedVI_BayesianFNN(input_dim=31, hidden_dim=16, output_dim=1, num_mixtures=3)
X_dummy = torch.randn(5, 31)  # Batch size 5, input_dim=31
try:
    output = model_instance.model(X_dummy)
    print("Model ran successfully with output:", output.shape)
except Exception as e:
    print("Error:", e)


Expected batch size: 5
Error: Shape mismatch inside plate('batch') at site means_w1 dim -1, 5 vs 496


ValueError: Shape mismatch inside plate('batch') at site w1 dim -1, 455 vs 16
Trace Shapes:       
 Param Sites:       
       w1_loc  30 16
     w1_scale  30 16
Sample Sites:       
   batch dist      |
        value 455  |