# Object Detection

In this assignment, you will implement a fruit detector.
The task is divided into steps for simpler navigation.

Let's start!

In [None]:
# we will need this library to process the labeling
! pip install xmltodict



In [None]:
! pip install torchvision



In [None]:
import numpy as np
import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
import xmltodict
import json
import glob
import cv2
import os
import torchvision
import matplotlib.pyplot as plt

import torchvision
import torchvision.transforms as T
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection import fasterrcnn_resnet50_fpn

## Step 0. Dataset

First, let's load the data that you can download [here](https://drive.google.com/file/d/1Ve5e9qdy_sUCMM4qXWrw8ecURg2af9Cm/view?usp=sharing).

We have already written a dataset class for you and we encourage you to figure out how it works.

In [None]:
# Mount Google Drive at /content/drive; the dataset directories used later
# in this notebook are read from below ./drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Mapping from the class name found in the XML annotation to an integer label.
# Label 0 is left unused (torchvision detection models reserve it for background).
class2tag = {"apple": 1, "orange": 2, "banana": 3}


class FruitDataset(Dataset):
    """In-memory detection dataset built from ``<name>.xml`` / ``<name>.jpg`` pairs.

    Every ``*xml`` file in ``data_dir`` is parsed as a Pascal-VOC style
    annotation and paired with the ``.jpg`` of the same base name.

    Each sample is ``(image, target)`` where

    * ``image`` is a ``float64`` numpy array in channel-first ``(C, H, W)``
      layout, RGB channel order, values in the original 0..255 range;
    * ``target`` is ``{"boxes": FloatTensor[N, 4], "labels": Int64Tensor[N]}``
      with boxes as ``[xmin, ymin, xmax, ymax]`` and labels from ``class2tag``.

    To get an ``(H, W, C)`` image back from a sample use
    ``image.transpose(1, 2, 0)`` -- ``reshape`` does not move axes.

    :param data_dir: directory containing the annotation/image pairs
    :param transform: optional callable with the albumentations calling
        convention ``transform(image=..., bboxes=..., labels=...)`` returning a
        dict with the same three keys
    """

    def __init__(self, data_dir, transform=None):
        self.images = []
        self.annotations = []
        self.transform = transform
        # sorted() makes the sample order independent of the file system.
        for annotation in sorted(glob.glob(data_dir + "/*xml")):
            image_fname = os.path.splitext(annotation)[0] + ".jpg"

            bgr = cv2.imread(image_fname)
            if bgr is None:
                # cv2.imread signals a missing/unreadable file by returning None.
                raise FileNotFoundError("Cannot read image: " + image_fname)
            image = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB).astype(np.float64)
            self.images.append(self._hwc_to_chw(image))

            with open(annotation) as f:
                annotation_dict = xmltodict.parse(f.read())
            objects = annotation_dict["annotation"].get("object", [])
            self.annotations.append(self._parse_objects(objects))

    @staticmethod
    def _hwc_to_chw(image):
        """Move the channel axis first: (H, W, C) -> (C, H, W).

        ``transpose`` is required here; ``reshape`` would keep the memory order
        and only relabel the axes, scrambling the picture.
        """
        return np.ascontiguousarray(image.transpose(2, 0, 1))

    @staticmethod
    def _parse_objects(objects):
        """Convert the ``object`` entry of a parsed annotation into a target dict.

        :param objects: a single object dict or a list of them (xmltodict
            yields a bare dict when the XML holds exactly one ``<object>``)
        :return: ``{"boxes": FloatTensor[N, 4], "labels": Int64Tensor[N]}``
        """
        if not isinstance(objects, list):
            objects = [objects]
        bboxes = []
        labels = []
        for obj in objects:
            bndbox = obj["bndbox"]
            bboxes.append(
                [int(bndbox[key]) for key in ("xmin", "ymin", "xmax", "ymax")]
            )
            labels.append(class2tag[obj["name"]])
        return {
            # reshape keeps the (N, 4) contract even when there are no objects
            "boxes": torch.tensor(bboxes, dtype=torch.float32).reshape(-1, 4),
            "labels": torch.tensor(labels, dtype=torch.int64),
        }

    def __getitem__(self, i):
        image = self.images[i]
        target = self.annotations[i]
        if not self.transform:
            return image, target

        # albumentations-style call: images go in as (H, W, C) and boxes/labels
        # as plain Python lists.
        # NOTE(review): images are float64 in 0..255; some albumentations
        # transforms may expect uint8 or float32 input -- confirm for the
        # transforms actually used.
        res = self.transform(
            image=np.ascontiguousarray(image.transpose(1, 2, 0)),
            bboxes=target["boxes"].tolist(),
            labels=target["labels"].tolist(),
        )
        out_image = res["image"]
        if isinstance(out_image, np.ndarray):
            # restore the channel-first layout used by the untransformed path
            out_image = self._hwc_to_chw(out_image)
        return out_image, {
            "boxes": torch.tensor(res["bboxes"], dtype=torch.float32).reshape(-1, 4),
            "labels": torch.tensor(res["labels"], dtype=torch.int64),
        }

    def __len__(self):
        return len(self.images)

In [None]:
import os
import numpy as np
import torch
from PIL import Image


class PennFudanDataset(object):
    """Dataset over a directory with ``PNGImages`` and ``PedMasks`` sub-folders.

    Images and masks are paired by sorted file name. Every sample is
    ``(img, target)`` where ``target`` holds ``boxes``, ``labels``, ``masks``,
    ``image_id``, ``area`` and ``iscrowd``.

    :param root: directory containing ``PNGImages`` and ``PedMasks``
    :param transforms: optional callable ``(img, target) -> (img, target)``
    """

    def __init__(self, root, transforms):
        self.root = root
        self.transforms = transforms
        # load all image files, sorting them to
        # ensure that they are aligned
        self.imgs = list(sorted(os.listdir(os.path.join(root, "PNGImages"))))
        self.masks = list(sorted(os.listdir(os.path.join(root, "PedMasks"))))

    @staticmethod
    def _split_instances(mask):
        """Split a colour-encoded instance mask into binary masks and boxes.

        :param mask: 2-D integer array; each distinct value marks one instance
        :return: ``(masks, boxes)`` -- a boolean array of shape
            ``(num_instances, H, W)`` and a list of ``[xmin, ymin, xmax, ymax]``
        """
        obj_ids = np.unique(mask)
        # The smallest id is dropped as background.
        # NOTE(review): assumes every mask contains background value 0 -- confirm.
        obj_ids = obj_ids[1:]
        # Broadcast comparison: one binary mask per remaining instance id.
        masks = mask == obj_ids[:, None, None]

        boxes = []
        for instance_mask in masks:
            rows, cols = np.where(instance_mask)
            boxes.append(
                [int(cols.min()), int(rows.min()), int(cols.max()), int(rows.max())]
            )
        return masks, boxes

    def __getitem__(self, idx):
        # load image and mask
        img_path = os.path.join(self.root, "PNGImages", self.imgs[idx])
        mask_path = os.path.join(self.root, "PedMasks", self.masks[idx])
        img = Image.open(img_path).convert("RGB")
        # the mask is NOT converted to RGB: each colour is an instance id
        mask = np.array(Image.open(mask_path))

        masks, boxes = self._split_instances(mask)
        num_objs = len(boxes)

        # convert everything into a torch.Tensor; reshape keeps (N, 4) for N == 0
        boxes = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        # there is only one class
        labels = torch.ones((num_objs,), dtype=torch.int64)
        masks = torch.as_tensor(masks, dtype=torch.uint8)

        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # suppose all instances are not crowd
        iscrowd = torch.zeros((num_objs,), dtype=torch.int64)

        target = {}
        target["boxes"] = boxes
        target["labels"] = labels
        target["masks"] = masks
        target["image_id"] = image_id
        target["area"] = area
        target["iscrowd"] = iscrowd

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        return len(self.imgs)

## Step 1. Intersection over Union (10 points)

In the [Object Detection task](https://en.wikipedia.org/wiki/Object_detection), you need to find objects of a certain class on the image and locate their positions (using the bounding box). The model should predict the coordinates of the bounding box `[x0, y0, x1, y1]` and the label for this box. The model can predict multiple candidate bounding boxes for an object. We will select candidates using [Intersection Over Union](https://en.wikipedia.org/wiki/Jaccard_index).

<img src=https://upload.wikimedia.org/wikipedia/commons/c/c7/Intersection_over_Union_-_visual_equation.png>


Implement a function that will calculate IoU for bounding boxes.

In [None]:
# type(debug_gt_bbox[0])
# type(debug_dt_bbox[0])
# gt_bbox = debug_gt_bbox[0]
# dt_bbox = debug_dt_bbox[0]
# gt_bbox[2] <= dt_bbox[0] or dt_bbox[2] <= gt_bbox[0]
# debug_gt_bbox[-1]

In [None]:
# Scratch lists for collecting boxes while debugging.
debug_gt_bbox, debug_dt_bbox = [], []


def log_debug(bbox):
    """Print the type, the size and every element of ``bbox``.

    The size is ``len(bbox)`` for a list and ``bbox.shape`` for anything else;
    a blank line is printed at the end.
    """
    print(type(bbox))
    size = len(bbox) if isinstance(bbox, list) else bbox.shape
    print(size)
    for element in bbox:
        print(element)
    print()

def intersection_over_union(dt_bbox, gt_bbox):
    """
    Compute the IoU (Jaccard index) of two axis-aligned boxes.
    :param dt_bbox: list or numpy array of size (4,) [x0, y0, x1, y1], x0 - xmin, x1 - xmax, ...
    :param gt_bbox: list or numpy array of size (4,) [x0, y0, x1, y1]
    :return : intersection area divided by union area; 0 when the boxes
              do not overlap or only touch
    """
    # No overlap along x: one box ends before (or exactly where) the other starts.
    apart_in_x = gt_bbox[2] <= dt_bbox[0] or dt_bbox[2] <= gt_bbox[0]
    if apart_in_x:
        return 0
    # Same check along y.
    apart_in_y = gt_bbox[3] <= dt_bbox[1] or dt_bbox[3] <= gt_bbox[1]
    if apart_in_y:
        return 0

    overlap_w = min(dt_bbox[2], gt_bbox[2]) - max(dt_bbox[0], gt_bbox[0])
    overlap_h = min(dt_bbox[3], gt_bbox[3]) - max(dt_bbox[1], gt_bbox[1])
    overlap_area = overlap_w * overlap_h

    dt_area = (dt_bbox[2] - dt_bbox[0]) * (dt_bbox[3] - dt_bbox[1])
    gt_area = (gt_bbox[2] - gt_bbox[0]) * (gt_bbox[3] - gt_bbox[1])
    # union = sum of both areas minus the part counted twice
    return overlap_area / (dt_area + gt_area - overlap_area)

If the function is implemented correctly, then the execution of the following cell will produce:

**0.14285714285714285**

In [None]:
# Sanity check: two 2x2 boxes overlapping in a 1x1 square,
# so IoU = 1 / (4 + 4 - 1) = 1/7.
dt_bbox = [0, 0, 2, 2]
gt_bbox = [1, 1, 3, 3]
intersection_over_union(dt_bbox, gt_bbox)

0.14285714285714285

## Step 2. Evaluate Sample (15 points)

We now have to evaluate the predictions of the model. To do this, we will write a function that will do the following:
1. Take model predictions and ground truth bounding boxes and labels as inputs.
2. For each bounding box from the prediction, find the closest bounding box among the answers.
3. For each found pair of bounding boxes, check whether the IoU is greater than a certain threshold `iou_threshold`. If the **IoU** exceeds the threshold, then we consider this answer as **True Positive**.
4. Remove a matched bounding box from the evaluation.
5. For each predicted bounding box, return the detection score and whether we were able to match it or not.

In [None]:
def evaluate_sample(target_pred, target_true, iou_threshold=0.5):
    """Match the detections of one image against its ground-truth boxes.

    Detections are processed in the order given. Each one is paired with the
    still-unmatched ground-truth box of the SAME label that has the highest
    IoU; the pair is a true positive when that IoU reaches ``iou_threshold``.
    A matched ground-truth box is removed so it cannot be matched again.

    :param target_pred: dict with tensors ``boxes`` (N, 4), ``labels`` (N,)
        and ``scores`` (N,)
    :param target_true: dict with tensors ``boxes`` (M, 4) and ``labels`` (M,)
    :param iou_threshold: minimal IoU for a detection to count as matched
    :return: list with one ``{'score': ..., 'TP': 0 or 1}`` dict per detection,
        in the input order
    """
    # ground truth
    gt_bboxes = target_true['boxes'].numpy()
    gt_labels = target_true['labels'].numpy()

    # predictions
    dt_bboxes = target_pred['boxes'].numpy()
    dt_labels = target_pred['labels'].numpy()
    dt_scores = target_pred['scores'].numpy()

    results = []
    for detection_id in range(len(dt_labels)):
        dt_bbox = dt_bboxes[detection_id, :]
        dt_label = dt_labels[detection_id]
        dt_score = dt_scores[detection_id]

        max_IoU = 0
        max_gt_id = -1
        for gt_id in range(len(gt_labels)):
            # a box of another class can never be a correct match
            if gt_labels[gt_id] != dt_label:
                continue
            cur_iou = intersection_over_union(dt_bbox, gt_bboxes[gt_id])
            if cur_iou > max_IoU:
                max_IoU = cur_iou
                max_gt_id = gt_id

        is_match = max_gt_id >= 0 and max_IoU >= iou_threshold
        if is_match:
            # remove the matched ground-truth box from further matching
            gt_labels = np.delete(gt_labels, max_gt_id, axis=0)
            gt_bboxes = np.delete(gt_bboxes, max_gt_id, axis=0)

        results.append({'score': dt_score, 'TP': 1 if is_match else 0})

    return results

## Step 3. Evaluate Model (15 points)

To assess the quality of the model, we will use the [mAP](https://jonathan-hui.medium.com/map-mean-average-precision-for-object-detection-45c121a31173) metric, computed here as the average precision (AP): the area under the precision–recall curve. To do this, you will need to calculate `recall` and `precision`.

In [None]:
from sklearn.metrics import auc

In [None]:
# results = debug_results[-1]
# nbr_boxes = debug_nbr[-1]

# precision = []
# recall = []

# tp_so_far = 0
# for e, r in enumerate(results):
#     if r['TP']:
#       tp_so_far += 1
#     precision.append(tp_so_far / (e + 1))
#     recall.append(tp_so_far / nbr_boxes)

In [None]:
# results = debug_results[-2]
# results
# sorted(results, key=lambda k: k['score'], reverse=True)

In [None]:
# val_dataloader
# for images, targets_true in val_dataloader:
#   print(1)

In [None]:
# Scratch lists kept for interactive debugging.
debug_targets_pred = []
debug_targets_true = []
debug_results = []
debug_nbr = []

def evaluate(model, test_loader, device):
    """Compute the area under the precision-recall curve of ``model``.

    :param model: detection model; called in eval mode as ``model(images)``
        with a list of per-image tensors and expected to return one dict with
        ``boxes``, ``labels`` and ``scores`` per image
    :param test_loader: iterable of ``(images, targets)`` batches where
        ``images`` is a sequence of per-image arrays/tensors and ``targets``
        a sequence of dicts with ``boxes`` and ``labels`` tensors (i.e. the
        loader must not stack samples of a batch together)
    :param device: device the images are moved to
    :return: ``auc(recall, precision)``, or ``None`` when there are no
        detections or no ground-truth boxes at all
    """
    results = []
    nbr_boxes = 0
    model.eval()
    with torch.no_grad():
        for images, targets_true in test_loader:
            # one float tensor per image, all images of the batch are used
            images = [
                torch.as_tensor(image, dtype=torch.float32, device=device)
                for image in images
            ]
            targets_pred = model(images)

            targets_true = [{k: v.cpu().float() for k, v in t.items()} for t in targets_true]
            targets_pred = [{k: v.cpu().float() for k, v in t.items()} for t in targets_pred]

            for target_pred, target_true in zip(targets_pred, targets_true):
                nbr_boxes += target_true['labels'].shape[0]
                results.extend(evaluate_sample(target_pred, target_true))

    # most confident detections first
    results = sorted(results, key=lambda k: k['score'], reverse=True)
    if not results or nbr_boxes == 0:
        # nothing to score; also avoids dividing by zero in the recall below
        return None

    debug_results.append(results)
    debug_nbr.append(nbr_boxes)

    # cumulative precision / recall over the score-sorted detections
    precision = []
    recall = []
    tp_so_far = 0
    for n_seen, r in enumerate(results, start=1):
        if r['TP']:
            tp_so_far += 1
        precision.append(tp_so_far / n_seen)
        recall.append(tp_so_far / nbr_boxes)

    # NOTE(review): auc() may need at least two points; a run with a single
    # detection would then fail here -- confirm.
    return auc(recall, precision)

In [None]:
debug_results

# targets_true = debug_targets_true[-1]
# targets_pred = debug_targets_pred[-1]
# targets_true = [{k: v.cpu().float()[0] for k, v in t.items()} for t in [targets_true]]
# targets_pred = [{k: v.cpu().float() for k, v in t.items()} for t in targets_pred]
# print(targets_true[0]['labels'])
# print(targets_pred[0]['labels'])

[]

## Step 4. Train functions (30 points)

Now define the functions for training the model.

In [None]:
# images, targets = next(iter(train_dataloader))
# images = list(image.to(device).float() for image in images)
# debug_targets.append(targets)
# targets = [{k: v.to(device)[0] for k, v in t.items()} for t in [targets]]
# loss_dict = model(images, targets)
# losses = sum(loss for loss in loss_dict.values())
# loss = losses_reduced.item()

In [None]:
def warmup_lr_scheduler(optimizer, warmup_iters, warmup_factor):
    """Build a ``LambdaLR`` that ramps the learning rate up linearly.

    The multiplier applied to the optimizer's base learning rate goes from
    ``warmup_factor`` at step 0 to 1 at step ``warmup_iters`` and stays at 1
    afterwards.

    :param optimizer: optimizer whose learning rate is scheduled
    :param warmup_iters: number of scheduler steps the ramp lasts
    :param warmup_factor: multiplier used at step 0
    :return: the ``torch.optim.lr_scheduler.LambdaLR`` instance
    """

    def _lr_multiplier(step):
        if step < warmup_iters:
            progress = float(step) / warmup_iters
            # linear interpolation between warmup_factor and 1
            return warmup_factor * (1 - progress) + progress
        return 1

    return torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=_lr_multiplier)

In [None]:
# Scratch list kept for interactive debugging.
debug_targets = []

def train_one_epoch(model, train_dataloader, optimizer, device, epoch):
    """Run one optimisation pass over ``train_dataloader``.

    During epoch 0 the learning rate is additionally warmed up per batch via
    ``warmup_lr_scheduler``.

    :param model: detection model; called in train mode as
        ``model(images, targets)`` and expected to return a dict of losses
    :param train_dataloader: iterable of ``(images, targets)`` batches where
        ``images`` is a sequence of per-image arrays/tensors and ``targets``
        a sequence of dicts of tensors (one dict per image)
    :param optimizer: optimizer updating the model parameters
    :param device: device images and targets are moved to
    :param epoch: zero-based epoch index
    :return: mean of the summed loss over the batches, or ``None`` if the
        loader produced no batches
    """
    model.train()

    lr_scheduler = None
    if epoch == 0:
        warmup_factor = 1. / 1000
        warmup_iters = min(1000, len(train_dataloader) - 1)

        lr_scheduler = warmup_lr_scheduler(optimizer, warmup_iters, warmup_factor)

    loss_total = 0.0
    n_batches = 0
    for images, targets in train_dataloader:
        # as_tensor accepts both numpy arrays and tensors
        images = [
            torch.as_tensor(image, dtype=torch.float32, device=device)
            for image in images
        ]
        # keep every box/label of an image: no indexing into the tensors
        targets = [
            {k: torch.as_tensor(v).to(device) for k, v in t.items()}
            for t in targets
        ]
        loss_dict = model(images, targets)
        losses = sum(loss for loss in loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        if lr_scheduler is not None:
            lr_scheduler.step()

        loss_total += losses.item()
        n_batches += 1

    return loss_total / n_batches if n_batches else None


def train(model, train_dataloader, val_dataloader, optimizer, lr_scheduler, device, n_epochs=10):
    """Train ``model`` for ``n_epochs`` epochs, evaluating after each one.

    Per epoch: print the epoch number, move the model to ``device``, run
    ``train_one_epoch``, step ``lr_scheduler`` once and print the value
    returned by ``evaluate`` on ``val_dataloader`` unless it is ``None``.
    """
    for epoch_idx in range(n_epochs):
        print("EPOCH: %s" % epoch_idx)
        model.to(device)

        train_one_epoch(model, train_dataloader, optimizer, device=device, epoch=epoch_idx)
        lr_scheduler.step()

        val_auc = evaluate(model, val_dataloader, device=device)
        if val_auc is None:
            # evaluate() had nothing to score for this epoch
            continue
        print("AUC ON TEST: {:.4f}".format(val_auc))


## Step 5. Train model (30 points)

Train the model for object detection on a training dataset and achieve a PR-AUC of at least 0.91 on a test dataset. You can use models from `torchvision`.

It is mandatory to use augmentation for training to achieve the desired result on the test. Use the `torchvision.transforms` module or the [albumentations](https://albumentations.ai/) library. The latter library is especially convenient since it can calculate the new coordinates of bounding boxes itself after image transformations. We advise you to pay attention to this [tutorial](https://albumentations.ai/docs/getting_started/bounding_boxes_augmentation/). Please note that the code written in the dataset above is only correct if you are using `albumentations`.

In [None]:
import albumentations as A

from albumentations.pytorch import ToTensor
import cv2

# Training pipeline: crop plus random augmentations.
train_transform = A.Compose([
    A.CenterCrop(height=229, width=229),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})
# Validation pipeline: the same crop only. Random augmentations are left out
# so that evaluation is deterministic and easier than training, as the hint
# below asks for.
val_transform = A.Compose([
    A.CenterCrop(height=229, width=229),
], bbox_params={'format': 'pascal_voc', 'label_fields': ['labels']})
# HINT: TRAIN TRANSFORM OBVIOUSLY SHOULD BE HARDER THAN THOSE FOR VALIDATION

# NOTE(review): both datasets are still built with transform=None, so the
# pipelines above are not applied yet -- confirm whether they should be passed.
train_dataset = FruitDataset("./drive/MyDrive/Colab Notebooks/train_zip/train", transform=None)
val_dataset = FruitDataset("./drive/MyDrive/Colab Notebooks/test_zip/test", transform=None)



In [None]:
# train_dataset[0][1]['labels'].to

In [None]:
len(val_dataset)

60

In [None]:
from torchvision import datasets, transforms, models
from torch import optim
from torch.utils.data import DataLoader
from torchvision.models.detection import FasterRCNN

import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

BATCH_SIZE = 6


def collate_detection_batch(batch):
    """Collate ``(image, target)`` samples without stacking them.

    Images may differ in size and targets in number of boxes, so the default
    collate (which stacks tensors) cannot batch them. The samples are instead
    returned as two tuples: one of image tensors and one of target dicts.
    """
    images, targets = zip(*batch)
    images = tuple(torch.as_tensor(image) for image in images)
    return images, targets


# one output per fruit class plus one for the background
num_classes = len(class2tag) + 1

model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)

# swap the pretrained box classification head for one with our class count
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)
# HINT: YOU CAN USE torchvision.models AND torchvision.models.detection
# READ OFFICIAL DOCS FOR MORE INFO

train_dataloader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE, num_workers=1, shuffle=True,
    collate_fn=collate_detection_batch,
)
val_dataloader = DataLoader(
    val_dataset,
    batch_size=BATCH_SIZE, num_workers=1,
    collate_fn=collate_detection_batch,
)
n_epochs = 2
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
model.to(device)
params = [p for p in model.parameters() if p.requires_grad]
# Try Adam
optimizer = torch.optim.SGD(params, lr=0.005,
                            momentum=0.9, weight_decay=0.0005)
# learning rate is multiplied by gamma every step_size epochs
lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                   step_size=3,
                                                   gamma=0.1)

train(model, train_dataloader, val_dataloader, optimizer, lr_scheduler, device, n_epochs)

EPOCH: 0


RuntimeError: ignored

Output the final quality of the model.

In [None]:
evaluate(model, val_dataloader, device)

Draw predicted bounding boxes for any two images from the test dataset.

In [None]:
# Take the second sample of the TEST dataset, as the task text asks for
# predictions on test images.
image, labels = val_dataset[1]
# image = image.reshape((image.shape[1], image.shape[2], image.shape[0]))
model.eval()  # inference mode: the model returns detections, not losses
with torch.no_grad():  # no gradients are needed for a prediction
    pred = model(torch.as_tensor(image, dtype=torch.float32).unsqueeze(0).to(device))[0]

In [None]:
from PIL import ImageDraw

# image = torchvision.transform.ToPILImage()(image)
# NOTE(review): reshape only relabels the axes. It gives back the original
# (H, W, C) picture only if the dataset also produced the (C, H, W) array by
# reshape; for a truly transposed array use image.transpose(1, 2, 0) -- confirm.
hwc_shape = (image.shape[1], image.shape[2], image.shape[0])
image2 = T.ToPILImage()(image.reshape(hwc_shape).astype(np.uint8))
draw = ImageDraw.Draw(image2)

# ground-truth boxes in the default outline colour
for x0, y0, x1, y1 in labels['boxes']:
    draw.rectangle([(x0, y0), (x1, y1)])

# predicted boxes in red
for x0, y0, x1, y1 in pred['boxes']:
    draw.rectangle([(x0, y0), (x1, y1)], outline='red')
image2

In [None]:

# from PIL import ImageDraw

# Show ground truth (default colour) and predictions (red) for the first
# ten images of the TEST dataset.
model.eval()  # inference mode: the model returns detections, not losses
for counter, (image, labels) in enumerate(val_dataset):
    if counter >= 10:
        break
    with torch.no_grad():  # no gradients are needed for a prediction
        pred = model(torch.as_tensor(image, dtype=torch.float32).unsqueeze(0).to(device))[0]
    # NOTE(review): reshape only relabels the axes. It gives back the original
    # (H, W, C) picture only if the dataset also produced the (C, H, W) array
    # by reshape; for a truly transposed array use image.transpose(1, 2, 0)
    # -- confirm.
    image2 = image.reshape((image.shape[1], image.shape[2], image.shape[0])).astype(np.uint8)
    image2 = T.ToPILImage()(image2)
    draw = ImageDraw.Draw(image2)
    for box in labels['boxes']:
        draw.rectangle([(box[0], box[1]), (box[2], box[3])])

    for box in pred['boxes']:
        draw.rectangle([(box[0], box[1]), (box[2], box[3])], outline='red')
    display(image2)