In [None]:
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision.ops import RoIPool
from torch.utils.data import Dataset, DataLoader
import numpy as np
import cv2  # For image processing

Architecture

In [None]:
class RPN(nn.Module):
    """Region-proposal head: a shared 3x3 conv feeding two sibling 1x1 convs.

    Args:
        in_channels: Number of channels in the incoming feature map.
        num_anchors: Anchors per spatial location. The objectness head emits
            ``num_anchors * 2`` channels and the box head ``num_anchors * 4``.
    """

    def __init__(self, in_channels, num_anchors):
        super().__init__()
        hidden_channels = 512
        # padding=1 with a 3x3 kernel keeps the spatial size of the feature map.
        self.conv = nn.Conv2d(in_channels, hidden_channels, kernel_size=3, padding=1)
        # Sibling 1x1 heads: objectness logits and box regression values.
        self.cls_logits = nn.Conv2d(hidden_channels, 2 * num_anchors, kernel_size=1)
        self.bbox_pred = nn.Conv2d(hidden_channels, 4 * num_anchors, kernel_size=1)

    def forward(self, x):
        """Return ``(objectness, bbox_regression)`` maps for feature map ``x``."""
        hidden = torch.relu(self.conv(x))
        return self.cls_logits(hidden), self.bbox_pred(hidden)


class ObjectDetector(nn.Module):
    """Two-stage detector: VGG16 backbone, RPN, RoI pooling and an RoI head.

    The RoI head is a shared fully connected trunk (``self.fc``) followed by
    two sibling linear heads: ``self.cls_head`` (class logits) and
    ``self.bbox_head`` (box regression). They must be siblings rather than
    consecutive layers of one ``nn.Sequential``: both consume the 4096-d trunk
    output, so chaining them feeds ``num_classes`` features into a layer that
    expects 4096 and fails on every forward pass.

    Args:
        num_classes: Number of outputs of the classification head.
        num_anchors: Anchors per location for the RPN; also sizes the RoI box
            head (``num_anchors * 4`` outputs).
    """

    def __init__(self, num_classes=2, num_anchors=9):
        super().__init__()
        # features[:30] stops before VGG16's final max-pool, so the feature
        # map is downsampled 16x (four max-pools) and has 512 channels.
        # NOTE(review): newer torchvision releases prefer `weights=` over
        # `pretrained=True` — confirm against the installed version.
        self.backbone = models.vgg16(pretrained=True).features[:30]  # Use layers up to conv5_3
        self.rpn = RPN(512, num_anchors)
        # spatial_scale maps image-space box coordinates onto the 16x
        # downsampled feature map.
        self.roi_pool = RoIPool(output_size=(7, 7), spatial_scale=1/16)
        # Shared trunk over the flattened 512 x 7 x 7 pooled features.
        self.fc = nn.Sequential(
            nn.Linear(512 * 7 * 7, 4096),
            nn.ReLU(),
        )
        self.cls_head = nn.Linear(4096, num_classes)  # Classification
        # NOTE(review): output width kept at num_anchors * 4 as in the original;
        # RoI-level regressors are usually sized per class or as a single
        # 4-vector — confirm the intended width.
        self.bbox_head = nn.Linear(4096, num_anchors * 4)  # Bounding box regression

    def forward(self, images, proposals, return_roi_bbox=False):
        """Run the detector on ``images`` for the given RoIs.

        Args:
            images: Batch of input images for the backbone.
            proposals: RoIs in a format accepted by ``RoIPool``: a
                ``Tensor[K, 5]`` of ``(batch_index, x1, y1, x2, y2)`` rows or a
                list with one ``Tensor[L, 4]`` per image.
            return_roi_bbox: When True, additionally return the RoI box-head
                output as a fourth element.

        Returns:
            ``(objectness, bbox_regression, fc_out)`` where the first two are
            the RPN maps and ``fc_out`` holds per-RoI class logits. With
            ``return_roi_bbox=True`` the tuple gains a fourth element, the
            per-RoI box regression output.
        """
        features = self.backbone(images)
        objectness, bbox_regression = self.rpn(features)
        pooled_rois = self.roi_pool(features, proposals)
        # Flatten each RoI's (512, 7, 7) feature block into one vector.
        roi_features = self.fc(pooled_rois.flatten(start_dim=1))
        fc_out = self.cls_head(roi_features)
        if return_roi_bbox:
            return objectness, bbox_regression, fc_out, self.bbox_head(roi_features)
        return objectness, bbox_regression, fc_out

IoU Loss Function

In [None]:
def iou_loss(pred_boxes, target_boxes):
    """Return ``1 - mean(IoU)`` for row-wise paired boxes.

    Both inputs are indexed as ``(N, 4)`` with columns ``(x1, y1, x2, y2)``;
    row ``k`` of ``pred_boxes`` is compared with row ``k`` of ``target_boxes``.
    """

    def _box_area(boxes):
        return (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])

    # Corners of the overlap rectangle of each pair.
    left = torch.max(pred_boxes[:, 0], target_boxes[:, 0])
    top = torch.max(pred_boxes[:, 1], target_boxes[:, 1])
    right = torch.min(pred_boxes[:, 2], target_boxes[:, 2])
    bottom = torch.min(pred_boxes[:, 3], target_boxes[:, 3])

    # Disjoint pairs give a negative extent; clamp it to zero overlap.
    overlap_w = (right - left).clamp(min=0)
    overlap_h = (bottom - top).clamp(min=0)
    overlap = overlap_w * overlap_h

    union = _box_area(pred_boxes) + _box_area(target_boxes) - overlap
    # The epsilon keeps the ratio finite when the union is zero.
    per_pair_iou = overlap / (union + 1e-6)

    return 1 - per_pair_iou.mean()

In [None]:
def non_max_suppression(boxes, scores, threshold=0.5):
    """Greedy non-maximum suppression.

    Repeatedly keeps the highest-scoring remaining box and discards every
    other remaining box whose IoU with it exceeds ``threshold``.

    The implementation is pure torch. The previous version mixed NumPy-only
    idioms (``argsort()[::-1]``, ``order.size``) with torch-only calls
    (``torch.max``, ``torch.clamp``), so it failed for tensors and ndarrays
    alike.

    Args:
        boxes: ``(N, 4)`` boxes as ``(x1, y1, x2, y2)``; a tensor or anything
            ``torch.as_tensor`` accepts (ndarray, nested list).
        scores: ``N`` scores, one per box, in the same order.
        threshold: Boxes with IoU strictly greater than this are suppressed.

    Returns:
        List of ``int`` indices into ``boxes`` of the kept boxes, ordered by
        decreasing score. Empty list when there are no boxes.
    """
    boxes = torch.as_tensor(boxes)
    scores = torch.as_tensor(scores)
    if boxes.numel() == 0:
        return []

    # Only indices are returned, so no gradient needs to flow through here.
    boxes = boxes.detach()
    if not boxes.is_floating_point():
        boxes = boxes.float()  # integer boxes would break the IoU division
    scores = scores.detach().reshape(-1).to(boxes.device)

    x1, y1, x2, y2 = boxes[:, 0], boxes[:, 1], boxes[:, 2], boxes[:, 3]
    areas = (x2 - x1) * (y2 - y1)
    order = torch.argsort(scores, descending=True)

    keep = []
    while order.numel() > 0:
        best = order[0]
        keep.append(int(best))

        rest = order[1:]
        if rest.numel() == 0:
            break

        # Overlap rectangle between the kept box and each remaining box.
        xx1 = torch.maximum(x1[best], x1[rest])
        yy1 = torch.maximum(y1[best], y1[rest])
        xx2 = torch.minimum(x2[best], x2[rest])
        yy2 = torch.minimum(y2[best], y2[rest])

        inter = torch.clamp(xx2 - xx1, min=0) * torch.clamp(yy2 - yy1, min=0)
        union = areas[best] + areas[rest] - inter
        # Guard against 0/0 for degenerate (zero-area) boxes.
        iou = inter / union.clamp(min=1e-12)

        order = rest[iou <= threshold]

    return keep

Proposal generator

In [None]:
def generate_proposals(objectness_scores, bbox_deltas, anchors, threshold=0.5):
    """Select anchors whose objectness passes ``threshold`` and shift them.

    A proposal is ``anchor + delta`` (a plain additive offset) for every index
    whose ``sigmoid(objectness)`` is strictly greater than ``threshold``.

    Args:
        objectness_scores: Per-anchor objectness logits, indexed along dim 0.
        bbox_deltas: Per-anchor offsets, indexed along dim 0 like the scores.
        anchors: Per-anchor boxes, indexed along dim 0 like the scores.
        threshold: Minimum objectness probability (exclusive).

    Returns:
        Tensor with one proposal row per selected anchor, in ascending anchor
        index order. When nothing passes the threshold the result has zero
        rows; the previous loop + ``torch.stack`` raised on an empty list.
    """
    scores = torch.sigmoid(objectness_scores)  # logits -> probabilities
    selected = torch.where(scores > threshold)[0]

    # One vectorised gather replaces the per-index Python loop and naturally
    # yields an empty result for an empty selection.
    anchors = torch.as_tensor(anchors)
    return anchors[selected] + bbox_deltas[selected]

Training

In [None]:
def _rpn_proposals_per_image(objectness, bbox_regression, anchors,
                             score_threshold, nms_threshold):
    """Turn RPN output maps into one NMS-filtered ``(L, 4)`` proposal tensor per image."""
    rois = []
    for image_logits, image_deltas in zip(objectness, bbox_regression):
        # (A*2, H, W) -> (H*W*A, 2) and (A*4, H, W) -> (H*W*A, 4): one row per anchor.
        # NOTE(review): assumes channels are grouped per anchor and that
        # `anchors` is ordered location-major to match — confirm against the
        # anchor generator.
        logits = image_logits.permute(1, 2, 0).reshape(-1, 2)
        deltas = image_deltas.permute(1, 2, 0).reshape(-1, 4)

        image_anchors = torch.as_tensor(anchors).to(device=deltas.device, dtype=deltas.dtype)
        if image_anchors.size(0) != deltas.size(0):
            raise ValueError(
                f"anchors has {image_anchors.size(0)} rows but the RPN produced "
                f"{deltas.size(0)} anchors per image"
            )

        # generate_proposals applies a sigmoid to one logit per anchor.
        # sigmoid(fg - bg) equals the two-way softmax foreground probability.
        # NOTE(review): assumes channel 1 of each pair is "object" — confirm.
        score_logits = logits[:, 1] - logits[:, 0]
        scores = torch.sigmoid(score_logits)

        # Same filter generate_proposals applies, so `scores[passed]` lines up
        # row-for-row with the proposals it returns.
        passed = torch.where(scores > score_threshold)[0]
        if passed.numel() == 0:
            rois.append(deltas.new_zeros((0, 4)))
            continue

        proposals = generate_proposals(score_logits, deltas, image_anchors,
                                       threshold=score_threshold)
        keep_indices = non_max_suppression(proposals, scores[passed],
                                           threshold=nms_threshold)
        rois.append(proposals[keep_indices])
    return rois


def train_model(model, dataloader, optimizer, num_epochs=10, anchors=None):
    """Train ``model`` on ``dataloader`` for ``num_epochs`` epochs.

    Per batch: run the backbone and RPN, build NMS-filtered proposals for each
    image, run the full model on those proposals, and optimise the sum of a
    cross-entropy classification loss and an IoU box loss.

    Args:
        model: Detector exposing ``backbone`` and ``rpn`` and callable as
            ``model(images, proposals)``, returning
            ``(objectness, bbox_regression, fc_out)``.
        dataloader: Yields ``(images, targets)`` where ``targets`` has
            ``'labels'`` and ``'boxes'`` entries.
        optimizer: Optimiser over ``model``'s parameters.
        num_epochs: Number of passes over ``dataloader``.
        anchors: Anchor boxes with one ``(x1, y1, x2, y2)`` row per RPN anchor
            of a single image. Defaults to ``dataloader.dataset.anchors``.
    """
    proposal_score_threshold = 0.5
    nms_iou_threshold = 0.5

    if anchors is None:
        # Read anchors from the dataset behind the loader rather than from a
        # notebook-global `dataset` that may not exist on a fresh kernel.
        anchors = dataloader.dataset.anchors

    classification_criterion = nn.CrossEntropyLoss()

    model.train()
    for epoch in range(num_epochs):
        loss = None
        for images, targets in dataloader:
            optimizer.zero_grad()

            # RPN pass used only to build the proposals.
            objectness, bbox_regression = model.rpn(model.backbone(images))
            rois = _rpn_proposals_per_image(
                objectness, bbox_regression, anchors,
                proposal_score_threshold, nms_iou_threshold,
            )
            if sum(r.size(0) for r in rois) == 0:
                # Nothing to pool: the losses would be computed on empty tensors.
                continue

            # A list with one (L, 4) tensor per image lets RoIPool derive the
            # batch index of every RoI itself.
            fc_out = model(images, rois)[2]
            proposals = torch.cat(rois)

            # NOTE(review): there is no proposal-to-ground-truth matching here.
            # Both losses need targets['labels'] / targets['boxes'] to hold one
            # row per proposal, in proposal order — confirm how the dataset
            # provides that or add a matching step.
            class_loss = classification_criterion(fc_out, targets['labels'])
            # The box loss compares actual (K, 4) boxes; the raw RPN map used
            # previously is not in box form.
            bbox_loss = iou_loss(proposals, targets['boxes'])

            loss = class_loss + bbox_loss
            loss.backward()
            optimizer.step()

        if loss is None:
            print(f'Epoch {epoch + 1}/{num_epochs}, Loss: n/a (no batch produced proposals)')
        else:
            print(f'Epoch {epoch + 1}/{num_epochs}, Loss: {loss.item()}')

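Notes:
 - The image dataset must expose its anchor boxes as an `anchors` attribute; `train_model` reads them from there.
 - `RoIPool` expects each RoI as `(batch_index, x1, y1, x2, y2)`, i.e. a `Tensor[K, 5]`; alternatively it accepts a list with one `Tensor[L, 4]` of `(x1, y1, x2, y2)` boxes per image.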