"Playground" for implementing Monte Carlo Dropout in Pytorch, as not possible in native pytorch.

Some links:
- https://stackoverflow.com/questions/63285197/measuring-uncertainty-using-mc-dropout-on-pytorch
- https://xuwd11.github.io/Dropout_Tutorial_in_PyTorch/
- https://discuss.pytorch.org/t/measuring-uncertainty-using-mc-dropout/91753

In [None]:
#imports
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, TensorDataset
from tqdm import tqdm


device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [5]:
#dataloading and preprocessing
import h5py
with h5py.File('data/tactmat.h5', 'r') as dataset:
    samples = dataset['samples'][:]
    materials = dataset['materials'][:]
    materials = [m.decode('utf-8') for m in materials]
train_samples = samples[:, :80,...]  # (36, 80, 1000, 4, 4)
test_samples = samples[:, 80:,...]  # (36, 20, 1000, 4, 4)

# Flatten data
train_samples = train_samples.reshape((36*80, 1000, 16))  # (2880, 1000, 16)
test_samples = test_samples.reshape((36*20, 1000, 16))  # (720, 1000, 16)

# Create labels
train_labels = np.repeat(np.arange(36), 80)  # (2880,)
test_labels = np.repeat(np.arange(36), 20)  # (720,)

# One-hot encoding not needed for pytorch CrossEntropyLoss
train_samples = torch.tensor(train_samples.reshape(2880, 1, 1000, 16), dtype=torch.float32)
test_samples = torch.tensor(test_samples.reshape(720, 1, 1000, 16), dtype=torch.float32)
train_labels = torch.tensor(train_labels, dtype=torch.long)
test_labels = torch.tensor(test_labels, dtype=torch.long)

In [None]:
# Model Definition
class MCTestTacNet(nn.Module):
    def __init__(self):
        super(MCTestTacNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, kernel_size=(15, 5))
        self.bn1 = nn.BatchNorm2d(32)
        self.pool1 = nn.MaxPool2d(kernel_size=(10, 1), stride=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=(15, 5))
        self.bn2 = nn.BatchNorm2d(64)
        self.pool2 = nn.MaxPool2d(kernel_size=(10, 1), stride=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=(15, 5))
        self.bn3 = nn.BatchNorm2d(128)
        self.pool3 = nn.MaxPool2d(kernel_size=(10, 1), stride=1)
        self.dropout = nn.Dropout(0.8)

        # Dummy forward pass to calculate the flattened size
        self.flattened_size = self._get_flatten_size()

        self.fc1 = nn.Linear(self.flattened_size, 512)
        self.bn4 = nn.BatchNorm1d(512)
        self.fc2 = nn.Linear(512, 36)

    def _get_flatten_size(self): # helper function for dynamic sizing of linear layer
        with torch.no_grad():
            x = torch.zeros(1, 1, 1000, 16)  # example
            x = self.pool3(self.bn3(self.conv3(self.pool2(self.bn2(self.conv2(self.pool1(self.bn1(self.conv1(x)))))))))
            return x.view(1, -1).size(1)

    def forward(self, x):
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(x)
        x = F.relu(self.bn2(self.conv2(x)))
        x = self.pool2(x)
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool3(x)
        x = torch.flatten(x, start_dim=1)
        x = F.relu(self.bn4(self.fc1(self.dropout(x))))
        x = self.fc2(x)  # no softmax bc cross entropy loss includes it
        return x

# Instantiate Model
model = MCTestTacNet()
model.to(device)


# Training func
def train_model(model, train_samples, train_labels, num_epochs=10, batch_size=32, learning_rate=0.001):
    train_dataset = TensorDataset(train_samples, train_labels)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

    train_samples = train_samples.to(device)
    train_labels = train_labels.to(device)
    
    # Define loss and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    model.train()
    for epoch in range(num_epochs):
        running_loss = 0.0

        for inputs, labels in tqdm(train_loader, desc=f"Epoch [{epoch+1}/{num_epochs}]", unit="batch"):


            optimizer.zero_grad()
            outputs = model(inputs) #forward pass
            loss = criterion(outputs, labels) #loss
            loss.backward() #backward pass
            optimizer.step() #update params
            running_loss += loss.item() #calc running loss

        average_loss = running_loss / len(train_loader)
        print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {average_loss:.4f}")

    print("Training complete!")

#run training
train_model(model, train_samples, train_labels, num_epochs=10, batch_size=32, learning_rate=0.001)


Epoch [1/10]: 100%|██████████| 90/90 [21:55<00:00, 14.62s/batch]


Epoch [1/10], Loss: 2.6167


Epoch [2/10]:  73%|███████▎  | 66/90 [13:26<04:53, 12.21s/batch]


KeyboardInterrupt: 