In [None]:
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.optimizers import Adam
import tensorflow as tf
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
from sklearn.metrics import confusion_matrix, classification_report
import seaborn as sns

In [None]:
import kagglehub
import os

# Download the Kaggle dataset and point `directory` at its 'PetImages' folder,
# which the following cells treat as the root holding one sub-folder per class.
dataset_path = kagglehub.dataset_download(
    'bhavikjikadara/dog-and-cat-classification-dataset'
)
directory = os.path.join(dataset_path, 'PetImages')

In [None]:
# Build a table with one row per file: the path relative to `directory` and the
# class label, which is simply the name of the sub-folder the file lives in.
images = []
labels = []

try:
    # Only sub-folders are classes; a stray file next to them would otherwise make
    # os.listdir() fail and abort the listing halfway through.
    # Sorting makes the row order (and therefore the seeded train/test split)
    # independent of the arbitrary order os.listdir() returns.
    class_folders = sorted(
        entry for entry in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, entry))
    )
    for foldr in class_folders:
        for filee in sorted(os.listdir(os.path.join(directory, foldr))):
            images.append(os.path.join(foldr, filee))
            labels.append(foldr)

except OSError as e:
    # Only filesystem problems are reported and tolerated here; programming errors
    # (e.g. `directory` not defined because cells ran out of order) still surface.
    print(f'Error: {e}')

all_df = pd.DataFrame({
    'Images': images,
    'Labels': labels
    })

all_df.info()

In [None]:
# Number of listed files per class label (quick class-balance check).
all_df.groupby("Labels").count()

In [None]:
# Show one randomly chosen image per class, side by side.
class_names = all_df['Labels'].unique()
fig, axes = plt.subplots(1, max(len(class_names), 1), figsize=(10, 6), squeeze=False)

for ax, label in zip(axes[0], class_names):
    candidates = all_df.loc[all_df['Labels'] == label, 'Images']

    # cv2.imread returns None (it does not raise) when a file is missing, corrupt
    # or not an image, so try a handful of random candidates until one decodes.
    img = None
    for relative_path in candidates.sample(min(10, len(candidates))):
        img = cv2.imread(os.path.join(directory, relative_path))
        if img is not None:
            break

    ax.set_title(label)
    ax.axis('off')
    if img is not None:
        # OpenCV loads images as BGR; matplotlib expects RGB.
        ax.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

plt.show()

In [None]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Hold out 20% of the rows as the test split; stratifying on the label keeps the
# class proportions equal in both parts, and the fixed seed makes the split repeatable.
train_df, test_df = train_test_split(
    all_df,
    test_size=0.2,
    stratify=all_df['Labels'],
    random_state=111620206,
)

In [None]:
# 100-row random subset of the training split (fixed seed, index reset to 0..99).
# NOTE(review): no other cell visible here reads train_df_small — presumably kept
# for quick experiments; confirm before removing.
train_df_small = train_df.sample(n=100, random_state=42).reset_index(drop=True)

In [None]:
# Training-time augmentation: random rotation, shear, zoom and horizontal flips.
# No `rescale` is configured here, so pixel scaling is left to the consumer.
trainimgen = ImageDataGenerator(
    horizontal_flip=True,
    rotation_range=20,
    shear_range=0.2,
    zoom_range=0.2,
)

# Batches of 16 RGB images resized to 224x224, read from the relative paths in
# train_df['Images'] under `directory`, with labels taken from train_df['Labels'].
train_data = trainimgen.flow_from_dataframe(
    dataframe=train_df,
    directory=directory,
    x_col='Images',
    y_col='Labels',
    class_mode='binary',
    color_mode='rgb',
    target_size=(224,224),
    batch_size=16,
)




In [None]:
# The test split is fed without any augmentation.
testimgen = ImageDataGenerator()

# shuffle=False is essential for test data: it keeps the order of the yielded
# batches aligned with the order of the labels, which is required for correct
# evaluation with metrics such as a confusion matrix.
test_data = testimgen.flow_from_dataframe(
    dataframe=test_df,
    directory=directory,
    x_col='Images',
    y_col='Labels',
    class_mode='binary',
    color_mode='rgb',
    target_size=(224,224),
    batch_size=16,
    shuffle=False,
)

## Model Time

In [None]:
class BayesianLinear(nn.Module):
    """Fully connected layer with a factorized Gaussian posterior over its parameters.

    Every weight and bias has a learnable mean (``*_mu``) and a learnable ``*_rho``
    that is mapped to a positive standard deviation via ``softplus(rho)``.
    A fresh set of weights is sampled on every forward pass.

    Args:
        in_features: Size of each input sample.
        out_features: Size of each output sample.
    """

    def __init__(self, in_features, out_features):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features

        # Weight parameters (mean, and rho which becomes the std via softplus)
        self.weight_mu = nn.Parameter(torch.empty(out_features, in_features).normal_(0, 0.1))
        self.weight_rho = nn.Parameter(torch.empty(out_features, in_features).normal_(-3, 0.1))

        # Bias parameters
        self.bias_mu = nn.Parameter(torch.empty(out_features).normal_(0, 0.1))
        self.bias_rho = nn.Parameter(torch.empty(out_features).normal_(-3, 0.1))

    def forward(self, x):
        """Apply the layer with freshly sampled parameters.

        Uses the reparameterization ``w = mu + softplus(rho) * epsilon`` with
        ``epsilon ~ N(0, 1)``: this is equivalent to sampling ``w ~ N(mu, sigma)``,
        but the randomness lives in ``epsilon`` so gradients reach ``mu`` and ``rho``.
        """
        # F.softplus is the numerically stable form of log(1 + exp(rho)); the naive
        # expression overflows to inf once exp(rho) exceeds the float range.
        weight_std = F.softplus(self.weight_rho)
        weight = self.weight_mu + weight_std * torch.randn_like(self.weight_mu)

        bias_std = F.softplus(self.bias_rho)
        bias = self.bias_mu + bias_std * torch.randn_like(self.bias_mu)

        return F.linear(x, weight, bias)

    def kl_divergence(self):
        """Return KL(q(w | theta) || p(w)) summed over all weights and biases.

        The prior ``p(w)`` is a standard normal ``N(0, 1)`` for every parameter.
        """
        weight_std = F.softplus(self.weight_rho)
        bias_std = F.softplus(self.bias_rho)

        kl = self._kl_divergence_normal(self.weight_mu, weight_std)
        kl += self._kl_divergence_normal(self.bias_mu, bias_std)
        return kl

    def _kl_divergence_normal(self, mu, std):
        """Closed-form KL divergence between N(mu, std) and N(0, 1), summed over elements."""
        kl = -torch.log(std) + (std**2 + mu**2) / 2 - 0.5
        return kl.sum()


class BayesianConv2d(nn.Module):
    """2D convolution with a factorized Gaussian posterior over kernels and biases.

    Each kernel element and bias has a learnable mean (``*_mu``) and a learnable
    ``*_rho`` mapped to a positive standard deviation via ``softplus(rho)``.
    New kernels are sampled on every forward pass.

    Args:
        in_channels: Number of input channels.
        out_channels: Number of output channels.
        kernel_size: Int (square kernel) or ``(height, width)`` tuple.
        padding: Padding forwarded to ``F.conv2d``.
        stride: Stride forwarded to ``F.conv2d``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, padding=0, stride=1):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        # Normalize an int kernel size to a (height, width) tuple.
        self.kernel_size = kernel_size if isinstance(kernel_size, tuple) else (kernel_size, kernel_size)
        self.padding = padding
        self.stride = stride

        # Weight parameters
        weight_shape = (out_channels, in_channels, *self.kernel_size)
        self.weight_mu = nn.Parameter(torch.empty(weight_shape).normal_(0, 0.1))
        self.weight_rho = nn.Parameter(torch.empty(weight_shape).normal_(-3, 0.1))

        # Bias parameters
        self.bias_mu = nn.Parameter(torch.empty(out_channels).normal_(0, 0.1))
        self.bias_rho = nn.Parameter(torch.empty(out_channels).normal_(-3, 0.1))

    def forward(self, x):
        """Convolve ``x`` with kernels sampled as ``mu + softplus(rho) * epsilon``."""
        # F.softplus is the overflow-safe form of log(1 + exp(rho)).
        weight_std = F.softplus(self.weight_rho)
        weight = self.weight_mu + weight_std * torch.randn_like(self.weight_mu)

        bias_std = F.softplus(self.bias_rho)
        bias = self.bias_mu + bias_std * torch.randn_like(self.bias_mu)

        return F.conv2d(x, weight, bias, stride=self.stride, padding=self.padding)

    def kl_divergence(self):
        """Return the KL divergence of the posterior from an N(0, 1) prior, summed over all parameters."""
        weight_std = F.softplus(self.weight_rho)
        bias_std = F.softplus(self.bias_rho)

        return (self._kl_divergence_normal(self.weight_mu, weight_std) +
                self._kl_divergence_normal(self.bias_mu, bias_std))

    def _kl_divergence_normal(self, mu, std):
        """Closed-form KL divergence between N(mu, std) and N(0, 1), summed over elements."""
        kl = -torch.log(std) + (std**2 + mu**2) / 2 - 0.5
        return kl.sum()

class BayesianCNN(nn.Module):
    """Three-block Bayesian CNN with a two-layer Bayesian classifier head.

    Expects 3-channel 224x224 inputs: each of the three 2x2 max-poolings halves the
    spatial size (224 -> 112 -> 56 -> 28), so the flattened feature vector fed to
    ``fc1`` has 28 * 28 * 64 entries. The network outputs 2 logits per sample.
    """

    def __init__(self):
        super().__init__()

        # Conv block 1 (3 input channels for RGB)
        self.conv1 = BayesianConv2d(3, 16, kernel_size=3, padding=1)
        self.pool1 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Conv block 2
        self.conv2 = BayesianConv2d(16, 32, kernel_size=3, padding=1)
        self.pool2 = nn.MaxPool2d(kernel_size=2, stride=2)

        # Conv block 3
        self.conv3 = BayesianConv2d(32, 64, kernel_size=3, padding=1)
        self.pool3 = nn.MaxPool2d(kernel_size=2, stride=2)

        # For 224x224 input: after 3 poolings -> 28x28 with 64 channels
        self.fc1 = BayesianLinear(28 * 28 * 64, 128)
        self.fc2 = BayesianLinear(128, 2)  # 2 classes

    def forward(self, x):
        """Return class logits for a batch laid out as (batch, channels, height, width)."""
        x = self.pool1(F.relu(self.conv1(x)))
        x = self.pool2(F.relu(self.conv2(x)))
        x = self.pool3(F.relu(self.conv3(x)))
        x = x.reshape(x.size(0), -1)  # flatten everything except the batch dimension
        x = F.relu(self.fc1(x))
        return self.fc2(x)

    def kl_divergence(self):
        """Return the total KL divergence summed over all Bayesian layers."""
        bayesian_layers = (self.conv1, self.conv2, self.conv3, self.fc1, self.fc2)
        return sum(layer.kl_divergence() for layer in bayesian_layers)

    def predict_with_uncertainty(self, x, num_samples=100):
        """Monte Carlo prediction: run ``num_samples`` stochastic forward passes.

        Args:
            x: Input batch accepted by ``forward``.
            num_samples: Number of forward passes (each samples new weights).

        Returns:
            Tuple ``(mean_predictions, std_predictions, predictions)`` where
            ``predictions`` has shape [num_samples, batch_size, num_classes] and
            holds softmax probabilities; mean and std are taken over the sample axis.
        """
        # The Bayesian layers in this model sample weights on every forward pass,
        # independent of train/eval mode. Train mode is still set for the duration
        # of the call (as before), but the caller's mode is restored afterwards so
        # this method no longer silently flips the module into training mode.
        was_training = self.training
        self.train()
        try:
            with torch.no_grad():
                predictions = torch.stack(
                    [F.softmax(self(x), dim=1) for _ in range(num_samples)]
                )  # [num_samples, batch_size, num_classes]
        finally:
            self.train(was_training)

        mean_predictions = predictions.mean(dim=0)
        std_predictions = predictions.std(dim=0)

        return mean_predictions, std_predictions, predictions

class ParameterTracker:
    """Track a random subset of posterior parameters during training and plot them.

    For ``model.conv1`` weights, ``model.fc1`` weights and ``model.fc2`` biases a
    fixed random set of element indices is chosen at construction time. Every call
    to ``record`` appends the current mean (mu) and standard deviation
    (sigma = softplus(rho)) of those elements to ``self.history``.

    Args:
        model: Object exposing ``conv1``/``fc1`` with ``weight_mu``/``weight_rho``
            tensors and ``fc2`` with ``bias_mu``/``bias_rho`` tensors.
        num_params_to_track: Number of elements to follow per group; capped at the
            number of elements the group actually has.
    """

    def __init__(self, model, num_params_to_track=10):
        self.history = {
            'conv1_weight_mu': [],
            'conv1_weight_sigma': [],
            'fc1_weight_mu': [],
            'fc1_weight_sigma': [],
            'fc2_bias_mu': [],
            'fc2_bias_sigma': [],
        }

        def _pick(population_size):
            # Sampling without replacement raises if more indices are requested than
            # exist, so cap the request for every group (not just the fc2 biases).
            return np.random.choice(
                population_size,
                min(num_params_to_track, population_size),
                replace=False,
            )

        # Randomly select the flat indices to track
        self.indices = {
            'conv1': _pick(model.conv1.weight_mu.numel()),
            'fc1': _pick(model.fc1.weight_mu.numel()),
            'fc2_bias': _pick(model.fc2.bias_mu.numel()),
        }

    @staticmethod
    def _softplus(rho):
        """Overflow-safe log(1 + exp(rho)) for NumPy arrays."""
        return np.logaddexp(0.0, rho)

    def record(self, model):
        """Append the current mu and sigma of the tracked elements to the history."""
        tracked_groups = (
            ('conv1_weight', 'conv1', model.conv1.weight_mu, model.conv1.weight_rho),
            ('fc1_weight', 'fc1', model.fc1.weight_mu, model.fc1.weight_rho),
            ('fc2_bias', 'fc2_bias', model.fc2.bias_mu, model.fc2.bias_rho),
        )
        for prefix, index_key, mu_param, rho_param in tracked_groups:
            picked = self.indices[index_key]
            mu = mu_param.detach().cpu().flatten().numpy()
            rho = rho_param.detach().cpu().flatten().numpy()
            # Fancy indexing copies, so the stored values do not follow later updates.
            self.history[f'{prefix}_mu'].append(mu[picked])
            self.history[f'{prefix}_sigma'].append(self._softplus(rho[picked]))

    def plot_evolution(self):
        """Plot mu (left column) and sigma (right column) of every tracked group."""
        fig, axes = plt.subplots(3, 2, figsize=(15, 12))
        fig.suptitle('Parameter Evolution During Training', fontsize=16)

        # (history key prefix, panel title, extra line style) per row
        rows = (
            ('conv1_weight', 'Conv1 Weight', {}),
            ('fc1_weight', 'FC1 Weight', {}),
            ('fc2_bias', 'FC2 Bias', {'marker': 'o', 'markersize': 3}),
        )
        columns = (('mu', 'μ'), ('sigma', 'σ'))

        for row, (prefix, title, line_style) in enumerate(rows):
            for col, (suffix, symbol) in enumerate(columns):
                ax = axes[row, col]
                # Rows are recordings, columns are tracked elements: one line each.
                values = np.asarray(self.history[f'{prefix}_{suffix}'])
                if values.size:
                    ax.plot(np.arange(len(values)), values, alpha=0.7, **line_style)
                ax.set_title(f'{title} {symbol} Evolution')
                ax.set_xlabel('Epoch')
                ax.set_ylabel(f'{symbol} value')
                ax.grid(alpha=0.3)

        # Reference line for the prior's standard deviation; the legend makes its label visible.
        prior_ax = axes[2, 1]
        prior_ax.axhline(1, color='red', linestyle='--', alpha=0.5, label='Prior σ=1')
        prior_ax.legend()

        plt.tight_layout()
        plt.show()

In [None]:
# Run on the GPU when PyTorch can see one, otherwise on the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# Build the network and move its parameters to the selected device; the bare
# `model` at the end makes the notebook display the module structure.
model = BayesianCNN()
model = model.to(device)
model

In [None]:
# len() of the generator is its number of batches per pass (used as the epoch length during training).
print(f"train_data will have {len(train_data)} batches")

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Folder on the mounted Drive that receives every checkpoint written during training.
save_dir = '/content/drive/MyDrive/STATS/'
# torch.save does not create missing folders; create the target up front so the
# first checkpoint write cannot fail after training has already started.
os.makedirs(save_dir, exist_ok=True)

In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
tracker = ParameterTracker(model, num_params_to_track=10)
tracker.record(model)  # snapshot of the initial parameters (entry 0 of the history)

# KL weight: the NLL below is a per-sample mean, so the KL term is divided by the
# number of training samples to put both terms of the loss on the same scale.
beta = 1.0 / len(train_df)
print(f"Beta: {beta:.8f}")

num_epochs = 30
num_batches = len(train_data)
num_val_batches = len(test_data)
train_losses = []
val_accuracies = []

# Early stopping parameters
best_val_accuracy = 0.0
patience = 5  # Number of validation checks (one every 2 epochs) to wait for improvement
patience_counter = 0


def _to_model_input(batch_images, batch_labels):
    """Convert one generator batch (NumPy, channels-last) into model-ready tensors."""
    data = torch.as_tensor(batch_images, dtype=torch.float32).to(device)
    target = torch.as_tensor(batch_labels.astype(int), dtype=torch.long).to(device)

    # Normalize pixel values to [0, 1] if not already done
    if data.max() > 1.0:
        data = data / 255.0

    # (batch, height, width, channels) -> (batch, channels, height, width)
    return data.permute(0, 3, 1, 2), target


def _save_checkpoint(filename, epoch_number, **extra):
    """Write model/optimizer state and the metric histories to `save_dir`."""
    state = {
        'epoch': epoch_number,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'train_losses': train_losses,
        'val_accuracies': val_accuracies,
    }
    state.update(extra)
    torch.save(state, os.path.join(save_dir, filename))


for epoch in range(num_epochs):
    model.train()

    total_loss = 0
    total_nll = 0
    total_kl = 0

    # The Keras generator is endless, so pull exactly one pass worth of batches.
    # (Iterating it with enumerate() and breaking afterwards fetched one extra
    # batch per epoch and let epochs drift off the generator's reshuffle boundary.)
    for _ in range(num_batches):
        data, target = _to_model_input(*next(train_data))

        # Forward pass: loss = mean NLL + beta * KL
        output = model(data)
        nll_loss = criterion(output, target)
        kl_loss = model.kl_divergence()
        loss = nll_loss + beta * kl_loss

        # Backward pass
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_nll += nll_loss.item()
        total_kl += kl_loss.item()

    avg_loss = total_loss / num_batches
    avg_nll = total_nll / num_batches
    avg_kl = total_kl / num_batches
    train_losses.append(avg_loss)

    tracker.record(model)

    print(f'Epoch {epoch+1}/{num_epochs}:')
    print(f'  Loss: {avg_loss:.4f} (NLL: {avg_nll:.4f}, KL: {avg_kl:.4f})')

    # ============ VALIDATION ACCURACY ============
    # Calculate validation accuracy every 2 epochs.
    # NOTE(review): the test split doubles as the validation set here, so the
    # early-stopping / best-model choice is tuned on the same data that is later
    # used for evaluation — consider carving a separate validation split.
    if (epoch + 1) % 2 == 0:
        # eval() does not switch off weight sampling in the Bayesian layers; each
        # prediction below still uses one random draw of the weights.
        model.eval()
        correct = 0
        total = 0

        test_data.reset()  # always start the pass at the first test batch
        with torch.no_grad():
            for _ in range(num_val_batches):
                data, target = _to_model_input(*next(test_data))

                # Forward pass (single weight sample for speed)
                predicted = model(data).argmax(dim=1)

                total += target.size(0)
                correct += (predicted == target).sum().item()

        val_accuracy = 100 * correct / total
        val_accuracies.append(val_accuracy)
        print(f'  Validation Accuracy: {val_accuracy:.2f}%')

        # Early stopping check
        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            patience_counter = 0
            # Save best model
            _save_checkpoint(
                'best_bayesian_cnn_catdog.pth',
                epoch + 1,
                best_val_accuracy=best_val_accuracy,
            )
            print(f'  ✓ New best model saved! (Val Acc: {val_accuracy:.2f}%)')
        else:
            patience_counter += 1
            print(f'  No improvement. Patience: {patience_counter}/{patience}')

            if patience_counter >= patience:
                print(f'\nEarly stopping triggered at epoch {epoch+1}')
                print(f'Best validation accuracy: {best_val_accuracy:.2f}%')
                break

    # Save periodic checkpoints every 5 epochs
    if (epoch + 1) % 5 == 0:
        _save_checkpoint(f'checkpoint_epoch_{epoch+1}.pth', epoch + 1)
        print(f"Checkpoint saved at epoch {epoch+1}")


# Save the final model (state after the last epoch that ran, plus the tracker history)
_save_checkpoint(
    'bayesian_cnn_catdog_final.pth',
    epoch + 1,
    tracker_history=tracker.history,
    best_val_accuracy=best_val_accuracy,
)

print(f"\nTraining completed!")
print(f"Final model saved as 'bayesian_cnn_catdog_final.pth'")
print(f"Best model saved as 'best_bayesian_cnn_catdog.pth'")
print(f"Best validation accuracy: {best_val_accuracy:.2f}%")

In [None]:
# Plot training loss and validation accuracy
# Two panels: training loss per epoch (left) and validation accuracy (right).
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))

# Left panel: one loss value per completed epoch, numbered from 1.
ax1.plot(range(1, len(train_losses) + 1), train_losses, marker='o')
ax1.set(
    xlabel='Epoch',
    ylabel='Training Loss',
    title='Training Loss over Epochs (Bayesian CNN)',
)
ax1.grid(True)

# Right panel: accuracy was measured every second epoch, i.e. at epochs 2, 4, 6, ...
val_epochs = [2 * (check + 1) for check in range(len(val_accuracies))]
ax2.plot(val_epochs, val_accuracies, marker='o', color='green')
ax2.axhline(y=best_val_accuracy, color='r', linestyle='--', label=f'Best: {best_val_accuracy:.2f}%')
ax2.set(
    xlabel='Epoch',
    ylabel='Validation Accuracy (%)',
    title='Validation Accuracy over Epochs (Bayesian CNN)',
)
ax2.legend()
ax2.grid(True)

plt.tight_layout()
plt.show()

print(f"\nFinal Training Loss: {train_losses[-1]:.4f}")
if val_accuracies:
    print(f"Final Validation Accuracy: {val_accuracies[-1]:.2f}%")

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Rebuild the architecture, then restore the weights saved at epoch 10.
model = BayesianCNN().to(device)

# The checkpoint file is a dict (epoch, optimizer state, histories, ...); the
# network weights live under 'model_state_dict', so that entry — not the whole
# dict — must be handed to load_state_dict. map_location lets a checkpoint
# written on a GPU machine be loaded on a CPU-only one.
checkpoint = torch.load(f'{save_dir}/checkpoint_epoch_10.pth', map_location=device)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()