In [None]:
# necessary imports
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision.datasets import VOCDetection
import numpy as np
import torch.nn as nn
import math
import torch.nn.functional as F
from PIL import Image
import cv2
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from torch.utils.data import DataLoader
import torch.optim as optim
from tqdm import tqdm
import torchvision.models as models

In [None]:
# Select the compute device: use CUDA when it is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
    # get_device_name is a function; it has to be called to obtain the GPU's name
    print(f"gpu found {torch.cuda.get_device_name(device)}")
else:
    device = torch.device("cpu")
    print("gpu not found")

In [None]:
# Download the Pascal VOC 2012 detection dataset ("train" split) into `root`.
root = './'

# Convert every image to a tensor first, then resize it to a fixed 227x227 input.
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize((227, 227))
])

# `download` is a boolean flag; the string 'True' only worked because non-empty strings are truthy.
train_dataset = VOCDetection(root=root, year='2012', image_set='train', download=True, transform=transform)

In [None]:
# Object class names mapped to consecutive integer ids starting at 1, in the order listed.
# Id 0 is not assigned to any name in this table.
CLASS_MAPPING = dict(zip(
    (
        'aeroplane', 'bicycle', 'bird', 'boat', 'bottle',
        'bus', 'car', 'cat', 'chair', 'cow',
        'diningtable', 'dog', 'horse', 'motorbike', 'person',
        'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor',
    ),
    range(1, 21),
))

def get_class_id(class_label):
    """Return the integer id of `class_label` from CLASS_MAPPING, or 0 when the label is not listed."""
    return CLASS_MAPPING.get(class_label, 0)

In [None]:
# changes to the vggnet according to the paper
"""
1. First, the last max pooling layer is replaced by an RoI
pooling layer.
2. Second, the network’s last fully connected layer and softmax 
(which were trained for 1000-way ImageNet classification) are 
replaced with the two sibling layers
"""

def get_updated_vgg16(use_drop=True):
    """
    Split a pre-trained VGG16 ('DEFAULT' weights) into the two pieces used by the detector.

    Args:
        use_drop: keep the classifier's Dropout layers when True, strip them when False
    Returns:
        (features, classifier) as two nn.Sequential modules: the conv stack without its
        last max-pooling layer, and the fc head without its last fully-connected layer
    """
    backbone = models.vgg16(weights='DEFAULT')

    # features has 31 layers and the one at index 30 is MaxPool2d; leave that one out
    conv_layers = list(backbone.features)[:-1]

    # index 6 of the classifier is the last fc layer; keep only what precedes it
    head_layers = list(backbone.classifier)[:6]

    if not use_drop:
        # the Dropout layers sit at positions 2 and 5 of the remaining head
        head_layers = [layer for pos, layer in enumerate(head_layers) if pos not in (2, 5)]

    # TODO: Maybe freeze few conv layers? idk

    return nn.Sequential(*conv_layers), nn.Sequential(*head_layers)

In [None]:
class ROILayer(nn.Module):
    """
    RoI max-pooling layer.

    Projects each RoI from image coordinates onto the feature map and max-pools the
    projected window into a fixed `output_size` grid, so that every RoI yields a tensor
    of the same shape that can be passed to the fully-connected layers.

    Args:
        output_size: (height, width) of the pooled grid produced for every RoI
        spatial_scale: factor that converts image coordinates into feature-map
            coordinates (feature-map size / image size, e.g. about 1/16 when the
            backbone downsamples by 16)
    """
    def __init__(self, output_size=(7, 7), spatial_scale=0.062):
        super(ROILayer, self).__init__()
        self.output_size = output_size
        self.spatial_scale = spatial_scale

    def forward(self, rois, feature_map):
        """
        Args:
            rois: tensor of shape [R, 4]; each row is [x1, y1, x2, y2] in image coordinates.
                Rows must be grouped by image, in batch order, with the same number of
                RoIs for every image of the batch.
            feature_map: tensor of shape [B, C, H, W]
        Returns:
            tensor of shape [R, C, output_size[0], output_size[1]] with the dtype and
            device of `feature_map`. Bins that fall outside the feature map stay zero.
        """
        num_batches, num_channels, map_height, map_width = feature_map.size()
        num_rois = rois.size(0)
        out_h, out_w = self.output_size

        # allocate on the feature map's device/dtype so the layer also works off-CPU
        output = torch.zeros(
            num_rois, num_channels, out_h, out_w,
            dtype=feature_map.dtype, device=feature_map.device,
        )  # [R, C, H, W]

        # RoIs are stacked image after image, so the owning image follows from the row index
        rois_per_image = max(1, math.ceil(num_rois / num_batches))

        # plain Python floats keep the bin arithmetic below cheap and exact
        for i, roi in enumerate(rois.tolist()):
            x1, y1, x2, y2 = roi[:4]
            # corner points of the RoI expressed in feature-map coordinates
            roi_start_w, roi_start_h, roi_end_w, roi_end_h = self._scaled_coords(
                [x1, y1, x2, y2], map_height, map_width)
            # size of one pooling bin inside the projected RoI
            bin_height = (roi_end_h - roi_start_h) / float(out_h)
            bin_width = (roi_end_w - roi_start_w) / float(out_w)
            batch = min(i // rois_per_image, num_batches - 1)

            for ph in range(out_h):
                # floor the start and ceil the end so that a bin narrower than one
                # feature-map cell still covers that cell; then clamp to the map
                start_h = min(max(math.floor(roi_start_h + ph * bin_height), 0), map_height)
                end_h = min(max(math.ceil(roi_start_h + (ph + 1) * bin_height), 0), map_height)
                if start_h >= end_h:
                    continue
                for pw in range(out_w):
                    start_w = min(max(math.floor(roi_start_w + pw * bin_width), 0), map_width)
                    end_w = min(max(math.ceil(roi_start_w + (pw + 1) * bin_width), 0), map_width)
                    if start_w >= end_w:
                        continue
                    # max over the bin, independently for every channel
                    region = feature_map[batch, :, start_h:end_h, start_w:end_w]
                    output[i, :, ph, pw] = F.adaptive_max_pool2d(region, (1, 1)).view(-1)

        return output

    def _scaled_coords(self, coords, img_height, img_width):
        """
        Convert [x1, y1, x2, y2] from image coordinates to feature-map coordinates.

        RoIs are expressed in image pixels while the feature map has a smaller spatial
        size, so every coordinate is multiplied by `spatial_scale` only.
        `img_height` and `img_width` are unused and kept so existing callers still work.
        """
        x1, y1, x2, y2 = coords
        scale = self.spatial_scale
        return x1 * scale, y1 * scale, x2 * scale, y2 * scale


In [None]:
class BBoxRegressionLayer(nn.Module):
    """Linear head that outputs four box offsets for every class."""

    def __init__(self, in_features, num_classes):
        super().__init__()
        # one group of 4 offsets per class
        self.fc = nn.Linear(in_features, num_classes * 4)

    def forward(self, x):
        """Map features [batch_size, in_features] to offsets [batch_size, num_classes, 4]."""
        offsets = self.fc(x)
        rows = offsets.size(0)
        return offsets.view(rows, -1, 4)

In [None]:
class FastRCNN(nn.Module):
    """
    Fast R-CNN detector built on a pre-trained VGG16.

    The network consists of the VGG16 conv feature extractor, an RoI max-pooling layer,
    the VGG16 fully-connected layers, and two sibling output layers: a classifier over
    `num_classes + 1` labels (index 0 is background) and a per-class box regressor.
    """
    def __init__(self, num_classes):
        """
        Args:
            num_classes: number of foreground object classes (background is added on top)
        """
        super(FastRCNN, self).__init__()
        # pre-trained CNN's feature extractor and classifier modules
        self.feat_extractor, self.classifier = get_updated_vgg16()
        # RoI max pooling layer
        self.roi_layer = ROILayer((7, 7))
        # sibling output layers: class scores (incl. background) and per-class box offsets
        self.fc_layer = nn.Linear(4096, num_classes + 1)
        self.bbox = BBoxRegressionLayer(4096, num_classes)

    def forward(self, x, rois):
        """
        Args:
            x -> batch of image tensors [B, C, H, W]
            rois -> region proposals, one row per RoI, each of shape [x1, y1, x2, y2]
        Returns:
            (class scores, box offsets).
            In training mode the class scores are the raw, unnormalized outputs of the
            classification layer, which is what nn.CrossEntropyLoss expects (it applies
            log-softmax itself, so feeding it softmax output would normalize twice).
            In eval mode the scores are softmax probabilities.
            The box offsets are whatever BBoxRegressionLayer returns for the fc features.
        """
        # conv feature map of the whole batch
        feat_map = self.feat_extractor(x)
        # fixed-size feature tensor for every RoI
        pooled = self.roi_layer(rois, feat_map)
        # flatten for the fully-connected layers; original note: shape would be [R, 25088]
        pooled = torch.flatten(pooled, start_dim=1)
        hidden = self.classifier(pooled)
        # two sibling output layers
        cls_scores = self.fc_layer(hidden)
        if not self.training:
            cls_scores = F.softmax(cls_scores, dim=-1)
        regressor = self.bbox(hidden)
        return cls_scores, regressor

In [None]:
def get_zero_index(t):
    """
    Return the position of the first element equal to 0 in the 1-D tensor `t`,
    or -1 when `t` contains no zero.
    """
    zero_positions = torch.nonzero(t == 0, as_tuple=False)
    return int(zero_positions[0]) if zero_positions.size(0) != 0 else -1

class MultiTaskLoss(nn.Module):
    """
    Combination of a classification loss and a bounding-box regression loss:

        loss = cls_loss + lambda_param * loc_loss

    The classification loss is nn.CrossEntropyLoss over all RoIs. The regression loss is
    nn.SmoothL1Loss (summed) over foreground RoIs only, i.e. RoIs whose label is >= 1.
    """
    def __init__(self, lambda_param=1.0):
        """
        Args:
            lambda_param: weight of the regression loss relative to the classification loss
        """
        super(MultiTaskLoss, self).__init__()
        self.lambda_param = lambda_param
        self.cls_loss_fn = nn.CrossEntropyLoss()
        self.loc_loss_fn = nn.SmoothL1Loss(reduction='sum')

    def forward(self, gt_labels, pred_labels, gt_boxes, pred_boxes):
        """
        Args:
            gt_labels: tensor [R] of class ids; 0 is background, k >= 1 is a foreground class
            pred_labels: tensor [R, num_classes + 1] of class scores
            gt_boxes: tensor [R, 4] of target boxes (rows of background RoIs are ignored)
            pred_boxes: tensor [R, num_classes, 4]; slot k - 1 belongs to class id k
        Returns:
            scalar tensor with the combined loss
        """
        gt_labels = gt_labels.long()

        # calculate classification loss
        cls_loss = self.cls_loss_fn(pred_labels, gt_labels)
        print(f"cls loss {cls_loss.item()}")

        # Pick the foreground RoIs with a mask instead of "everything before the first
        # zero label": foreground rows are not contiguous once several images are stacked
        # into one batch, and there may be no zero label at all.
        fg_mask = gt_labels > 0
        fg_labels = gt_labels[fg_mask]        # [N]
        fg_gt_boxes = gt_boxes[fg_mask]       # [N, 4]
        fg_pred_boxes = pred_boxes[fg_mask]   # [N, C, 4]

        # for every foreground RoI keep only the box predicted for its ground-truth class
        row_idx = torch.arange(fg_labels.numel(), device=fg_labels.device)
        selected_boxes = fg_pred_boxes[row_idx, fg_labels - 1]  # [N, 4]

        # with reduction='sum' this is 0 when there are no foreground RoIs
        loc_loss = self.loc_loss_fn(selected_boxes, fg_gt_boxes)
        print(f"loc loss {loc_loss.item()}")

        # add losses
        return cls_loss + self.lambda_param * loc_loss


In [None]:
import math

def truncate(f, n):
    """Round `f` down (towards negative infinity) so that at most `n` decimal places remain."""
    scale = 10 ** n
    return math.floor(f * scale) / scale

def calculate_iou(box1, box2):
    """
    Intersection over Union of two boxes.

    Args:
        box1: box given by its top-left and bottom-right corners, [x1, y1, x2, y2]
        box2: second box in the same format
    Returns:
        area of intersection / area of union as a float, truncated to two decimals;
        0.0 when the boxes do not overlap
    """
    assert len(box1) == len(box2), "Length of box1 and box2 tuples should be equal"

    # corners of the intersection rectangle
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[2], box2[2])
    bottom = min(box1[3], box2[3])

    # a negative extent means the boxes are disjoint along that axis
    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    area_int = overlap_w * overlap_h
    if area_int == 0:
        return float(0)

    def _area(box):
        return (box[2] - box[0]) * (box[3] - box[1])

    # union = sum of both areas minus the part counted twice
    area_uni = _area(box1) + _area(box2) - area_int

    return truncate(area_int / area_uni, 2)


# calculate_iou([ 58., 0., 227., 144.], [45, 37, 202, 227])

In [None]:
def get_iou_lists(proposal_boxes, gt_boxes):
    """
    Split object proposals into foreground and background RoIs by their IoU with the ground truth.

    Every proposal is matched to the ground-truth box it overlaps most, so a proposal
    ends up in at most one of the two lists and is never labeled both as an object and
    as background.

    Args:
        proposal_boxes: list of bounding boxes of object proposals (2D array), each of shape (x1, y1, x2, y2)
        gt_boxes: list of tuples (ground-truth box, class_label); each ground-truth box is of shape (x1, y1, x2, y2)
    Returns:
        two lists.
        list1 holds (proposal box, class id, matched ground-truth box) for proposals whose best IoU is >= 0.5
        list2 holds (proposal box, 0) for proposals whose best IoU is in the interval [0.1, 0.5)
    """
    positive_rois = []  # for list1
    negative_rois = []  # for list2

    for pp_box in proposal_boxes:
        # find the ground-truth box with the highest overlap for this proposal
        best_iou, best_gt = 0.0, None
        for gt_box in gt_boxes:
            overlap_val = calculate_iou(pp_box, gt_box[0])
            if overlap_val > best_iou:
                best_iou, best_gt = overlap_val, gt_box

        if best_gt is None:
            # no ground truth overlaps this proposal at all
            continue
        if best_iou >= 0.5:
            positive_rois.append((pp_box, get_class_id(best_gt[1]), best_gt[0]))
        elif best_iou >= 0.1:
            negative_rois.append((pp_box, 0))

    return positive_rois, negative_rois

In [None]:
def get_proposals(image):
    """
    Run OpenCV's selective search segmentation on an image and return its region proposals.

    Args: 
        image => 3D numpy array of shape [H W C]
    returns:
        list of RoI proposals from selective search, each as [x1, y1, x2, y2]
    """
    ss = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()

    # graph-based segmentation registered with the selective search object
    gs = cv2.ximgproc.segmentation.createGraphSegmentation(0.8, 150, 100)
    ss.addGraphSegmentation(gs)

    strategy_color = cv2.ximgproc.segmentation.createSelectiveSearchSegmentationStrategyColor() # color strategy 
    strategy_fill = cv2.ximgproc.segmentation.createSelectiveSearchSegmentationStrategyFill() # fill strategy
    strategy_size = cv2.ximgproc.segmentation.createSelectiveSearchSegmentationStrategySize() # size strategy
    strategy_texture = cv2.ximgproc.segmentation.createSelectiveSearchSegmentationStrategyTexture() # texture strategy
    strategy_multiple = cv2.ximgproc.segmentation.createSelectiveSearchSegmentationStrategyMultiple(
        strategy_color, strategy_fill, strategy_size, strategy_texture) # combination of all the strategies above
    ss.addStrategy(strategy_multiple) # add this strategy module to selective search
    
    ss.addImage(image)
    ss.setBaseImage(image)
    # NOTE(review): switchToSelectiveSearchQuality() may reset the graph segmentation,
    # strategies and images configured above with its own presets — confirm against the
    # OpenCV documentation whether the custom setup still has any effect.
    ss.switchToSelectiveSearchQuality()
    # get the bounding boxes with each of shape (x, y, w, h)
    proposals = ss.process()
    # convert the bounding boxes with each of shape (x, y, w, h) to (x1, y1, x2, y2)
    proposals = [[box[0], box[1], box[0]+box[2], box[1]+box[3]] for box in proposals]
    return proposals    


In [None]:
# construct RoIs from an image
def selective_search(image, bnb_boxes, limit=64, iou=True):
    """
    Build a fixed-size set of training RoIs for one image.

    Args:
        image: image tensor of shape [C, H, W]
        bnb_boxes: list of (ground-truth box, class_label) tuples, boxes as [x1, y1, x2, y2]
        limit: number of RoIs to return
        iou: when False, skip the sampling and return every proposal as a single tensor
    Returns:
        iou=False: float32 tensor with all proposals, one [x1, y1, x2, y2] row each.
        iou=True: three float32 tensors with `limit` rows each:
            RoI boxes [limit, 4], class labels [limit] (0 = background) and
            ground-truth boxes [limit, 4] ([0, 0, 0, 0] for background RoIs).
        Up to 25% of the RoIs are foreground (IoU >= 0.5); the rest is filled with
        background RoIs (IoU in [0.1, 0.5)) and, if still short, padded by cycling
        through the raw proposals labeled as background.
    """
    assert isinstance(image, torch.Tensor), "Expected image input to be of type tensor"
    _, img_height, img_width = image.shape
    # convert tensor to numpy and reshape the array to [H W C]; .cpu() makes this work
    # for tensors that live on another device too
    # NOTE(review): the array keeps the tensor's dtype and value range — confirm that
    # cv2's selective search handles it as intended.
    image = image.permute(1, 2, 0).detach().cpu().numpy()
    proposals = get_proposals(image)

    if not iou:
        return torch.tensor(proposals, dtype=torch.float32)

    final_boxes, gt_labels, gt_boxes = [], [], []

    # group 1: IoU >= 0.5 (foreground), group 2: IoU in [0.1, 0.5) (background)
    positive_rois, negative_rois = get_iou_lists(proposals, bnb_boxes)

    # foreground RoIs: at most 25% of the limit
    positive_rois_limit = min(int(0.25 * limit), len(positive_rois))
    for box, label, gt_box in positive_rois[:positive_rois_limit]:
        final_boxes.append(box)
        gt_labels.append(label)
        gt_boxes.append(gt_box)

    # background RoIs fill the remaining slots
    negative_rois_limit = min(limit - len(final_boxes), len(negative_rois))
    for box, label in negative_rois[:negative_rois_limit]:
        final_boxes.append(box)
        gt_labels.append(label)
        gt_boxes.append([0, 0, 0, 0])

    # Still short of the limit: pad with raw proposals labeled as background, cycling
    # through them as often as needed. If selective search returned nothing, pad with
    # the whole-image box so the loop is guaranteed to terminate.
    padding_pool = proposals if len(proposals) > 0 else [[0, 0, img_width, img_height]]
    pad_idx = 0
    while len(final_boxes) < limit:
        final_boxes.append(padding_pool[pad_idx % len(padding_pool)])
        gt_labels.append(0)
        gt_boxes.append([0, 0, 0, 0])
        pad_idx += 1

    return (
        torch.tensor(final_boxes, dtype=torch.float32),
        torch.tensor(gt_labels, dtype=torch.float32),
        torch.tensor(gt_boxes, dtype=torch.float32),
    )


In [None]:
# Dataset and DataLoader
# All torchvision datasets are subclasses of torch.utils.data.Dataset, so there is no need to create a new Dataset class.

def get_bnd_boxes(annotation_data, height, width, rescale=False):
    """
    Extract the ground-truth boxes and class names from a VOC annotation.

    Args:
        annotation_data => annotation data object from the VOC dataset
        height, width => size of the image tensor the RoIs are produced on; box
            coordinates are clipped to [0, width] / [0, height]
        rescale => when True, scale the boxes from the annotated image size
            (annotation["size"]) to (height, width) before clipping
    Returns:
        list of tuples ([x1, y1, x2, y2], class_name), one per annotated object
    """
    annotation = annotation_data["annotation"]

    # scale factors kept as numerator/denominator: multiplying before dividing avoids
    # the precision lost by rounding the ratio itself
    h_num, h_den = 1, 1
    w_num, w_den = 1, 1
    if rescale:
        h_num, h_den = height, int(annotation["size"]["height"])
        w_num, w_den = width, int(annotation["size"]["width"])

    objects = annotation["object"]
    if isinstance(objects, dict):
        # tolerate a single object given as a bare dict instead of a list
        objects = [objects]

    def _coord(value, num, den, upper):
        # scale, drop the fractional part and clip to the image tensor
        return max(0, min(int(float(value) * num / den), upper))

    return [
        ([
            _coord(obj["bndbox"]["xmin"], w_num, w_den, width),
            _coord(obj["bndbox"]["ymin"], h_num, h_den, height),
            _coord(obj["bndbox"]["xmax"], w_num, w_den, width),
            _coord(obj["bndbox"]["ymax"], h_num, h_den, height),
        ], obj["name"])
        for obj in objects
    ]

def collate_batch(batch):
    """
    Collate (image tensor, VOC annotation) samples into one training batch.

    For every image the RoIs, their labels and their ground-truth boxes are sampled with
    selective_search. The ground-truth boxes are first rescaled to the size of the image
    tensor, because the annotation stores them in the coordinates of the original image
    while the proposals are computed on the (possibly resized) tensor.

    Returns:
        images stacked along a new batch dimension, and the RoIs, labels and ground-truth
        boxes of all images concatenated image after image, in batch order
    """
    images, rois, gt_labels, gt_boxes = [], [], [], []
    for (img_tensor, annotation_data) in batch:
        images.append(img_tensor)
        # img_tensor is [C, H, W]: shape[1] is the height, shape[2] the width
        bnd_boxes = get_bnd_boxes(annotation_data, img_tensor.shape[1], img_tensor.shape[2], rescale=True)
        # calculate RoIs of each image and return the RoIs along with stacked tensors
        final_rois, final_labels, final_gt_boxes = selective_search(img_tensor, bnd_boxes)
        rois.append(final_rois)
        gt_labels.append(final_labels)
        gt_boxes.append(final_gt_boxes)
    return torch.stack(images, dim=0), torch.vstack(rois), torch.hstack(gt_labels), torch.vstack(gt_boxes)

# number of images per batch
batch_size = 2

# shuffled training loader; collate_batch builds the per-image RoIs for every batch
train_dl = DataLoader(
    train_dataset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_batch,
)

In [None]:
# Model, loss and optimizer for a short training run.
model = FastRCNN(num_classes=len(CLASS_MAPPING))
criterion = MultiTaskLoss(lambda_param=1.0)
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 2

for epoch in tqdm(range(num_epochs)):
    epoch_loss = 0.0
    losses = []
    for image_tensors, rois, gt_labels, gt_boxes in train_dl:
        # forward pass and multi-task loss
        pred_labels, pred_boxes = model(image_tensors, rois)
        loss = criterion(gt_labels.long(), pred_labels, gt_boxes, pred_boxes)

        # read the scalar once and reuse it for bookkeeping and logging
        batch_loss = loss.item()
        losses.append(batch_loss)
        epoch_loss += batch_loss
        print(f"Batch loss is {batch_loss}")

        # backprop
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # mean loss over the batches of this epoch
    mean_epoch_loss = epoch_loss / len(train_dl)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {mean_epoch_loss}")