## Step 7: Load Training Data from `.pkl` File

This step loads the preprocessed data prepared in the previous step:

- **Inputs:** A `.pkl` file containing:
  - `images`: The original 2D slices (typically shape: N × H × W × 3).
  - `GT`: Ground truth bounding boxes.
  - `reg`: Regression targets for bounding box localization.
  - `clc`: Classification labels (1: object, 0: background, -1: ignore).
- **Conversion:** Data is stored in NumPy arrays and ready for input into a PyTorch model.



In [3]:
import pickle
import numpy as np

with open('./Brainhack/final_project/data_train.pkl', 'rb') as f:
    Data = pickle.load(f, encoding='latin1')

image_data = Data['images']
bbox_list1 = Data['GT']
offset_list_label_list1 = Data['reg']
label_list1 = Data['clc']

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models
from torchsummary import summary

## Step 8: Define the Custom RPN Architecture Based on VGG16

This step constructs the RPN model used for region proposal generation:

- **Backbone:** A custom VGG16-like convolutional architecture is implemented.
- **Upsampling:** A transpose convolution layer is added to refine spatial resolution.
- **Heads:**
  - **Regression head**: Predicts 4 × *k* values (bounding box offsets).
  - **Classification head**: Predicts objectness score for each anchor (binary classification).
- **Customization:** Additional conv layers allow concatenation of multi-scale features to improve proposal quality.

## Step 9: Load Pretrained Weights from VGG16

This step initializes the convolutional layers of the custom RPN with weights from a pretrained VGG16 model:

- **Transfer Learning:** Weights are copied layer-by-layer from `torchvision.models.vgg16`.
- **Selective Copying:** Only matching layers of type `nn.Conv2d` are transferred.
- **Benefits:** Faster convergence and better initial performance on medical image data.



In [8]:
# Define the RPN model in PyTorch
class CustomRPN(nn.Module):
    def __init__(self, k=6, weight_decay=0.000001):
        super(CustomRPN, self).__init__()

        # Define the initial convolutional layers
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=64, out_channels=64, kernel_size=3, padding=1)
        
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)
        
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, padding=1)
        self.conv5 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, padding=1)
        self.conv6 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1)
        self.conv7 = nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, padding=1)
        self.conv8 = nn.Conv2d(in_channels=256, out_channels=512, kernel_size=3, padding=1)
        self.conv9 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        self.conv10 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        
        self.conv11 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        self.conv12 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)
        self.conv13 = nn.Conv2d(in_channels=512, out_channels=512, kernel_size=3, padding=1)

        # Transpose convolution to upsample feature maps
        self.deconv1 = nn.ConvTranspose2d(in_channels=512, out_channels=512, kernel_size=1, stride=2, padding=1, output_padding=1)

        # Convolutional layers to adjust channel dimensions
        self.conv15 = nn.Conv2d(in_channels=512, out_channels=1, kernel_size=1, padding=1)
        self.conv16 = nn.Conv2d(in_channels=256, out_channels=1, kernel_size=1)
        self.conv17 = nn.Conv2d(in_channels=512, out_channels=510, kernel_size=1)
        
        # Define regressor and classifier layers
        self.regressor = nn.Conv2d(in_channels=512, out_channels=4*k, kernel_size=1)
        self.classifier = nn.Conv2d(in_channels=512, out_channels=k, kernel_size=1)

    def forward(self, x):
        # Forward pass through the network
        x1 = F.relu(self.conv1(x))
        x2 = F.relu(self.conv2(x1))
        p1 = self.pool(x2)
        x3 = F.relu(self.conv3(p1))
        x4 = F.relu(self.conv4(x3))
        p2 = self.pool(x4)
        x5 = F.relu(self.conv5(p2))
        x6 = F.relu(self.conv6(x5))
        x7 = F.relu(self.conv7(x6))
        p3 = self.pool(x7)
        x8 = F.relu(self.conv8(p3))
        x9 = F.relu(self.conv9(x8))
        x10 = F.relu(self.conv10(x9))
        p4 = self.pool(x10)
        x11 = F.relu(self.conv11(p4))
        x12 = F.relu(self.conv12(x11))
        x13 = F.relu(self.conv13(x12))
        
        # Upsample the feature maps
        x14 = self.deconv1(x13)
        x15 = self.conv15(x14)
        # Additional convolutions on earlier layers
        x18 = self.conv16(p3)
        x19 = self.conv17(x10)

        
        # Concatenate feature maps along the channel dimension
        concatenated = torch.cat([x15, x19, x18], dim=1)
        
        # Get bounding box predictions and objectness scores
        regressor = self.regressor(concatenated)
        classifier = torch.sigmoid(self.classifier(concatenated))

        return regressor, classifier


# Initialize the model
model = CustomRPN(k=6)

# Load pretrained VGG16
vgg16 = models.vgg16(pretrained=True)

# Function to load weights from VGG16 to the CustomRPN model
def load_vgg16_weights(custom_model, vgg16_model):
    vgg16_layers = list(vgg16_model.features.children())  # Get layers from VGG16
    custom_layers = list(custom_model.children())  # Get layers from CustomRPN

    # Transfer weights from VGG16 to CustomRPN
    vgg_idx = 0
    for custom_layer in custom_layers:
        if isinstance(custom_layer, nn.Conv2d):
            vgg_layer = vgg16_layers[vgg_idx]
            if isinstance(vgg_layer, nn.Conv2d):
                with torch.no_grad():
                    custom_layer.weight.copy_(vgg_layer.weight)
                    custom_layer.bias.copy_(vgg_layer.bias)
                vgg_idx += 1

# Load the weights into the model
load_vgg16_weights(model, vgg16)

# Print the summary of the model architecture
summary(model, input_size=(3, 512, 512))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 512, 512]           1,792
            Conv2d-2         [-1, 64, 512, 512]          36,928
         MaxPool2d-3         [-1, 64, 256, 256]               0
            Conv2d-4        [-1, 128, 256, 256]          73,856
            Conv2d-5        [-1, 128, 256, 256]         147,584
         MaxPool2d-6        [-1, 128, 128, 128]               0
            Conv2d-7        [-1, 256, 128, 128]         295,168
            Conv2d-8        [-1, 256, 128, 128]         590,080
            Conv2d-9        [-1, 256, 128, 128]         590,080
        MaxPool2d-10          [-1, 256, 64, 64]               0
           Conv2d-11          [-1, 512, 64, 64]       1,180,160
           Conv2d-12          [-1, 512, 64, 64]       2,359,808
           Conv2d-13          [-1, 512, 64, 64]       2,359,808
        MaxPool2d-14          [-1, 512,

## Step 10: Define Custom Loss Functions

This step defines the loss functions used for RPN training:

- **Smooth L1 Loss:** Used for regression of bounding box offsets, robust to outliers.
- **Custom L1 Loss:** Applies Smooth L1 loss only to positive anchors.
- **Custom Binary Loss:** Binary cross-entropy loss applied to anchor objectness scores, ignoring neutral anchors.

> These losses are combined during training to optimize both localization and classification.


In [17]:
class SmoothL1Loss(nn.Module):
    def __init__(self):
        super(SmoothL1Loss, self).__init__()
        
    def forward(self, y_true, y_pred):
        # Calculate the absolute difference between true and predicted values
        x = torch.abs(y_true - y_pred)
        # Create a mask for values where the absolute difference is less than 1
        mask = (x < 1.0).float()
        # Compute the Smooth L1 loss based on the mask
        loss = mask * (0.5 * x ** 2) + (1 - mask) * (x - 0.5)
        return loss.mean()

class CustomL1Loss(nn.Module):
    def __init__(self):
        super(CustomL1Loss, self).__init__()
        # Initialize SmoothL1Loss as a component of CustomL1Loss
        self.smooth_l1_loss = SmoothL1Loss()
        
    def forward(self, y_true, y_pred):
        # Reshape y_pred to match the format of y_true
        y_pred = y_pred.view(y_true.shape[0], y_true.shape[1], -1)  
        # Extract bounding box offsets and labels from y_true
        offset_list = y_true[:, :, :-1]
        label_list = y_true[:, :, -1]
        # Identify positive examples where the label is 1
        positive_idxs = (label_list == 1).nonzero(as_tuple=True)
        # Select the predicted bounding boxes and target offsets for positive examples
        bbox = y_pred[positive_idxs]
        target_bbox = offset_list[positive_idxs]
        # Compute the Smooth L1 loss between the predicted and target bounding boxes
        loss = self.smooth_l1_loss(target_bbox, bbox)
        return loss

class CustomBinaryLoss(nn.Module):
    def __init__(self):
        super(CustomBinaryLoss, self).__init__()
        
    def forward(self, y_true, y_pred_objectiveness):
        # Reshape predictions to match the expected shape
        y_pred = y_pred_objectiveness.view(-1, 24576)
        y_true = y_true.squeeze(-1)
        # Get the indices where the true labels are not -1
        indices = (y_true != -1).nonzero(as_tuple=True)
        # Select the predicted logits and true labels for the valid indices
        rpn_match_logits = y_pred[indices]
        anchor_class = y_true[indices]
        
        # Clamp anchor_class values to be between 0 and 1
        anchor_class = torch.clamp(anchor_class, min=0, max=1)  
        
        loss = F.binary_cross_entropy(rpn_match_logits, anchor_class.float()) # Make sure anchor_class is float
        return loss

In [18]:
import torch
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.model_selection import train_test_split
import numpy as np

## Step 11: Prepare Data for Training

This step converts and splits the data for PyTorch training:

- **Format Conversion:** Image data is transposed to `(N, C, H, W)` and converted to float tensors.
- **Splitting:** Data is split into training and validation sets (90/10).
- **Dataloaders:** PyTorch `DataLoader` objects are created for batching and shuffling.

## Step 12: Train and Validate the RPN Model

This step performs training and validation of the RPN over multiple epochs:

- **Optimizer:** Adam optimizer with a learning rate of 0.0001.
- **Training Loop:** Runs for 5000 epochs (or until early stopping).
- **Loss Composition:** Total loss = classification loss + 0.2 × regression loss.
- **Validation:** Evaluates the model on validation set at each epoch.

> Device used: CUDA (if available), otherwise CPU

## Step 13: Save the Trained Model

After training completes, the RPN model is saved for future use:




In [19]:
# Transpose the image data to match the PyTorch tensor format (N, C, H, W)
image_data1 = np.transpose(image_data, (0, 3, 1, 2))

# Convert image data and target labels to PyTorch tensors
images = torch.tensor(image_data1)
images = images.to(dtype=torch.float32)
target_regressor = torch.tensor(offset_list_label_list1)
target_classifier = torch.tensor(label_list1)

# Split dataset into training and validation sets
train_images, val_images, train_target_regressor, val_target_regressor, train_target_classifier, val_target_classifier = train_test_split(
    images, target_regressor, target_classifier, test_size=0.1)

# Create DataLoader for training data with a batch size of 4
train_dataset = TensorDataset(train_images, train_target_regressor, train_target_classifier)
train_dataloader = DataLoader(train_dataset, batch_size=4)

# Create DataLoader for validation data with a batch size of 4
val_dataset = TensorDataset(val_images, val_target_regressor, val_target_classifier)
val_dataloader = DataLoader(val_dataset, batch_size=4)

# Initialize the model and optimizer
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # Use GPU if available, otherwise CPU
model = CustomRPN(k=6).to(device)
optimizer = optim.Adam(model.parameters(), lr=0.0001)  # Adam optimizer with learning rate of 0.0001

# Define the loss functions
custom_l1_loss = CustomL1Loss()
custom_binary_loss = CustomBinaryLoss()

# Training function
def train(model, dataloader, optimizer, device):
    model.train()  # Set the model to training mode
    total_loss = 0
    for batch in dataloader:
        images, target_regressor, target_classifier = batch
        images, target_regressor, target_classifier = images.to(device), target_regressor.to(device), target_classifier.to(device)

        optimizer.zero_grad()  # Zero the gradients
        regressor, classifier = model(images)

        # Compute losses for regression and classification
        loss_regressor = custom_l1_loss(target_regressor, regressor)
        loss_classifier = custom_binary_loss(target_classifier, classifier)
        loss = loss_classifier + 0.2 * loss_regressor  # Combine losses with a weight for regression loss

        loss.backward()  # Backpropagate the loss
        optimizer.step()  # Update model parameters

        total_loss += loss.item()  # Accumulate loss

    average_loss = total_loss / len(dataloader)  # Compute average loss
    print(f'Average Loss: {average_loss}')

# Define the validation function
def validate(model, dataloader, device):
    model.eval()  # Set the model to evaluation mode
    total_loss = 0
    with torch.no_grad():  # No gradient computation for validation
        for batch in dataloader:
            images, target_regressor, target_classifier = batch
            images, target_regressor, target_classifier = images.to(device), target_regressor.to(device), target_classifier.to(device)
            
            regressor, classifier = model(images)

            # Compute losses for regression and classification
            loss_regressor = custom_l1_loss(target_regressor, regressor)
            loss_classifier = custom_binary_loss(target_classifier, classifier)
            loss = loss_classifier + 0.2 * loss_regressor  # Combine losses with a weight for regression loss

            total_loss += loss.item()  # Accumulate loss

    average_loss = total_loss / len(dataloader)  # Compute average loss
    print(f'Validation Loss: {average_loss}')

# Set the number of epochs and batch size
num_epochs = 5000  # Number of epochs for training

# Training loop
for epoch in range(num_epochs):
    print(f'Epoch {epoch+1}/{num_epochs}')
    train(model, train_dataloader, optimizer, device)  # Train for one epoch
    validate(model, val_dataloader, device)  # Validate after training
    print('-' * 30)

# Save the trained model to a file
torch.save(model.state_dict(), "RPN_torch_weights.pth")

Epoch 1/10
Average Loss: 1.0576949480614501
Validation Loss: 1.0777614690911153
------------------------------
Epoch 2/10
Average Loss: 0.99862130030559
Validation Loss: 0.9977648503878238
------------------------------
Epoch 3/10
Average Loss: 0.9279586158982654
Validation Loss: 0.9708523356212833
------------------------------
Epoch 4/10
Average Loss: 0.9022415641102602
Validation Loss: 0.9650765114224359
------------------------------
Epoch 5/10
Average Loss: 0.9033681851915313
Validation Loss: 0.9405174825867645
------------------------------
Epoch 6/10
Average Loss: 0.8894290972435194
Validation Loss: 0.9405467327471195
------------------------------
Epoch 7/10
Average Loss: 0.8732365614180289
Validation Loss: 0.9350577168737304
------------------------------
Epoch 8/10
Average Loss: 0.8648839116651054
Validation Loss: 0.9204126436941592
------------------------------
Epoch 9/10
Average Loss: 0.8334147511162234
Validation Loss: 0.8962395561751278
------------------------------
Epo