## Object Detection - PyTorch RetinaNet
- Code adapted from: https://debuggercafe.com/object-detection-using-retinanet-with-pytorch-and-deep-learning/
- Model documentation: https://pytorch.org/docs/stable/torchvision/models.html#torchvision.models.detection.retinanet_resnet50_fpn

In [43]:
#import statements
import torch
from torch import nn
import torch.nn.functional as F
from torchvision.ops import misc as misc_nn_ops
from torchvision.ops import MultiScaleRoIAlign
import torchvision
from pathlib import Path
import matplotlib
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
import warnings
warnings.filterwarnings('ignore')
from PIL import Image
import torchvision.transforms as transforms
import cv2
import bbox_visualizer as bbv
import warnings
warnings.filterwarnings('ignore')
import os
import time
import image_preprocessing_library as lib
import pandas as pd

### Model Name List

In [2]:
# Name used to tag every result row and to name the results CSV file.
model_name = "retinanet_resnet50_fpn"
# torchvision RetinaNet detector, requested with pretrained weights.
model = torchvision.models.detection.retinanet_resnet50_fpn(pretrained=True)
# Put the model in evaluation (inference) mode; as the last expression of the
# cell this also prints the module structure shown below.
model.eval()

RetinaNet(
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d(64)
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d(64)
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d(64)
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d(256)
          (relu): ReLU(inplace=True)
          (downsample): Sequential(
            (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
            (1): FrozenBatchNorm2d(256)
          )
        )
        (1): Bottleneck

In [3]:
# Directory whose files are listed and opened as input images.
dataset_path = Path("../../../dataset/object_detection/images")
# Directory that receives the per-model results CSV (<model_name>.csv).
detection_result_path = Path("../experiment_results")
# Column order of the results CSV; box coordinates are stored as
# ymin, xmin, ymax, xmax.
df_columns = ["image_name", "label", "detection_score", "ymin", "xmin", "ymax", "xmax", "model_name", "processing_seq_name"]

### Transform 

In [4]:
# Pipeline applied to each PIL image before it is passed to the model:
# only a conversion to a tensor, no resizing or normalisation is added here.
transform = transforms.Compose([transforms.ToTensor()])

### COCO LABELS

In [5]:
# Label names indexed by the integer label id returned by the detector
# (position in the list == label id, index 0 is the background class).
# NOTE(review): the 'N/A' entries are presumably ids that are unused in the
# COCO label set - confirm against the torchvision documentation.
COCO_INSTANCE_CATEGORY_NAMES = [
    '__background__', 'person', 'bicycle', 'car', 'motorcycle', 'airplane', 'bus',
    'train', 'truck', 'boat', 'traffic light', 'fire hydrant', 'N/A', 'stop sign',
    'parking meter', 'bench', 'bird', 'cat', 'dog', 'horse', 'sheep', 'cow',
    'elephant', 'bear', 'zebra', 'giraffe', 'N/A', 'backpack', 'umbrella', 'N/A', 'N/A',
    'handbag', 'tie', 'suitcase', 'frisbee', 'skis', 'snowboard', 'sports ball',
    'kite', 'baseball bat', 'baseball glove', 'skateboard', 'surfboard', 'tennis racket',
    'bottle', 'N/A', 'wine glass', 'cup', 'fork', 'knife', 'spoon', 'bowl',
    'banana', 'apple', 'sandwich', 'orange', 'broccoli', 'carrot', 'hot dog', 'pizza',
    'donut', 'cake', 'chair', 'couch', 'potted plant', 'bed', 'N/A', 'dining table',
    'N/A', 'N/A', 'toilet', 'N/A', 'tv', 'laptop', 'mouse', 'remote', 'keyboard', 'cell phone',
    'microwave', 'oven', 'toaster', 'sink', 'refrigerator', 'N/A', 'book',
    'clock', 'vase', 'scissors', 'teddy bear', 'hair drier', 'toothbrush'
]

### Pre Processing Sequence

In [6]:
# Maps a sequence id to the ordered list of pre-processing operation names
# applied to every image before detection. Each name is looked up in
# `lib.dispatcher` (image_preprocessing_library) and the operations are
# applied left to right.
pre_processing_seq_dict = {
    "seq_0" : [], # no operations: the image is passed through unprocessed
    "seq_1" : ["gray"],
    "seq_2" : ["hsv"],
    "seq_3" : ["sharpen"],
    "seq_4" : ["gray", "bilateral_blur", "threshold_mean"],
    "seq_5" : ["gray", "bilateral_blur", "threshold_gaussian"],
    "seq_6" : ["gray", "bilateral_blur", "threshold_otsu"],
    "seq_7" : ["median_blur"],
    "seq_8" : ["gaussian_blur"],
    "seq_9" : ["bilateral_blur"],
    "seq_10" : ["fastnl_blur"],
    "seq_11" : ["gray", "bilateral_blur", "threshold_otsu", "opening"],
    "seq_12" : ["gray", "bilateral_blur", "threshold_otsu", "closing"],
    "seq_13" : ["opening"],
    "seq_14" : ["closing"],
    "seq_15" : ["gray", "sobel"],
    "seq_16" : ["gray", "laplacian"],
    "seq_17" : ["gray", "canny"]
}

### Image loading, pre-processing and detection helpers

In [46]:
def load_images_batch(batch_size, seq_id):
    """Yield the dataset images in pre-processed batches.

    Every entry listed in ``dataset_path`` is opened, converted to RGB, run
    through the pre-processing sequence ``seq_id`` and collected as an
    ``(image_name, processed_pil_image)`` tuple.

    Args:
        batch_size: Maximum number of images per yielded batch (must be >= 1).
        seq_id: Key of ``pre_processing_seq_dict`` selecting the operations.

    Yields:
        Lists of ``(image_name, processed_image)`` tuples. Every batch holds
        ``batch_size`` items except possibly the last one, which holds the
        remaining images.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1, got {0}".format(batch_size))

    batch_images = []
    for image_name in os.listdir(dataset_path):
        # The context manager releases the file handle as soon as convert()
        # has copied the pixel data, instead of leaving one open per image.
        with Image.open(dataset_path/image_name) as img:
            # TODO: convert the dataset to RGB once up front and drop this step
            rgb_im = img.convert('RGB')
        processed_img = apply_cv_transformations(seq_id, rgb_im)
        batch_images.append((image_name, processed_img))
        if len(batch_images) == batch_size:
            yield batch_images
            batch_images = []

    # Flush the trailing partial batch; without this the last
    # (dataset size % batch_size) images would never be processed.
    if batch_images:
        yield batch_images

def apply_cv_transformations(seq_id, pil_img):
    """Apply the pre-processing sequence `seq_id` to a PIL image.

    The image is converted to an OpenCV array, passed through every operation
    of the sequence in order (each one resolved through `lib.dispatcher`),
    and converted back to a PIL image.
    """
    current = convert_to_cv_img(pil_img)
    for op_name in get_seq_operations(seq_id):
        step_fn = lib.dispatcher[op_name]
        current = step_fn(current)
    return convert_to_pil_img(current)

def convert_to_pil_img(opencv_img):
    """Convert an OpenCV image array into an RGB PIL image.

    Arrays with more than two dimensions are treated as BGR colour images,
    two-dimensional arrays as single-channel grayscale images.
    """
    # NOTE(review): float64 input is cast to uint8 without clipping or
    # rescaling - confirm the operations producing float64 stay within 0..255.
    frame = opencv_img.astype(np.uint8) if opencv_img.dtype == 'float64' else opencv_img
    has_channels = len(frame.shape) > 2
    colour_code = cv2.COLOR_BGR2RGB if has_channels else cv2.COLOR_GRAY2RGB
    return Image.fromarray(cv2.cvtColor(frame, colour_code))

def convert_to_cv_img(pil_img):
    """Convert a PIL image into an OpenCV array with BGR channel order.

    The RGB->BGR conversion assumes `pil_img` is in RGB mode; the loader in
    this file converts every image to RGB before calling this function.
    """
    rgb_pixels = np.asarray(pil_img)
    return cv2.cvtColor(rgb_pixels, cv2.COLOR_RGB2BGR)

def get_seq_operations(seq_id):
    """Return the list of operation names registered for `seq_id`.

    Raises KeyError when `seq_id` is not a key of `pre_processing_seq_dict`.
    """
    operations = pre_processing_seq_dict[seq_id]
    return operations

def get_seq_name(seq_id):
    """Return a human-readable name for a pre-processing sequence.

    Args:
        seq_id: Key of ``pre_processing_seq_dict``.

    Returns:
        The operation names of the sequence joined with " > " (an empty
        string for a sequence without operations), or a "not found ..."
        message when ``seq_id`` is unknown.
    """
    if seq_id not in pre_processing_seq_dict:
        # format() instead of "+" so that a non-string id (e.g. an int)
        # produces the message rather than a TypeError.
        return "not found for dataset id: {0}".format(seq_id)
    return " > ".join(get_seq_operations(seq_id))


def load_image_into_numpy_array(path):
    """Load the image stored at `path` and return its pixels as a numpy array.

    The file is opened through a context manager so the underlying file
    handle is closed once the pixel data has been copied into the array.
    """
    with Image.open(path) as img:
        return np.array(img)

def detect_objects(model, pil_images_tuple):
    """Run the detector on every image of a batch, one image at a time.

    Args:
        model: Detection model; it is called with a tensor holding a single
            image and is expected to return a list with one detections
            entry per image.
        pil_images_tuple: Iterable of ``(image_name, pil_image)`` tuples.

    Returns:
        List of ``(image_name, detections)`` tuples, where ``detections`` is
        the model output for that image.
    """
    detections_list = []
    # Inference only: disabling autograd stops torch from recording a graph
    # for every forward pass, which saves memory and time.
    with torch.no_grad():
        for pil_img_tuple in pil_images_tuple:
            transformed_img = transform(pil_img_tuple[1])
            # Add a leading batch dimension: the model is fed one image per call.
            transformed_img = transformed_img.unsqueeze(0)
            detections = model(transformed_img)
            # The model returns one entry per input image; only one was passed.
            detections = detections[0]
            detections_list.append((pil_img_tuple[0], detections))
    return detections_list

In [30]:
def filter_detections(detections_list, batch_images, denorm = True, score_threshold = 0.5):
    """Keep only the detections whose score exceeds a threshold.

    Args:
        detections_list: List of ``(image_name, detections)`` tuples where
            ``detections`` maps "scores", "boxes" and "labels" to objects
            exposing ``.tolist()`` (parallel sequences, one entry per
            detection).
        batch_images: Unused; kept so existing callers keep working.
        denorm: Unused; kept so existing callers keep working. No coordinate
            de-normalisation is performed here.
        score_threshold: Detections with a score strictly greater than this
            value are kept. Defaults to 0.5.

    Returns:
        List of ``(image_name, filtered)`` tuples where ``filtered`` holds
        "detection_scores" (rounded to 2 decimals), "detection_boxes"
        (coordinates truncated to int) and "detection_classes" (label ids),
        all in the original detection order.
    """
    filtered_detection_list = []
    for detection_tuple in detections_list:
        image_name = detection_tuple[0]
        detections = detection_tuple[1]

        scores = detections["scores"].tolist()
        boxes = detections["boxes"].tolist()
        labels = detections["labels"].tolist()

        kept_scores = []
        kept_boxes = []
        kept_labels = []
        # Single pass over the parallel lists; no assumption is made about
        # the scores being sorted.
        for score, box, label in zip(scores, boxes, labels):
            if score > score_threshold:
                kept_scores.append(round(score, 2))
                kept_boxes.append([int(c) for c in box])
                kept_labels.append(label)

        filtered_dict = {
            "detection_scores": kept_scores,
            "detection_boxes": kept_boxes,
            "detection_classes": kept_labels,
        }
        filtered_detection_list.append((image_name, filtered_dict))
    return filtered_detection_list

In [9]:
def run_inference(model, images_tuple):
    """Detect objects in a batch of images and return the filtered results.

    The raw detections produced by `detect_objects` are passed through
    `filter_detections`, which drops low-scoring boxes.
    """
    raw_detections = detect_objects(model, images_tuple)
    return filter_detections(raw_detections, images_tuple)

In [40]:
def get_label_name(label_id):
    """Return the COCO label name for `label_id`.

    Args:
        label_id: Index into ``COCO_INSTANCE_CATEGORY_NAMES``.

    Returns:
        The label name, or the string "NA" when `label_id` is not a valid
        index of the list (note that the list itself also contains 'N/A'
        placeholder entries, which are returned unchanged).
    """
    # Membership in a range object avoids building a throwaway list on every
    # call while accepting exactly the same ids as before.
    if label_id not in range(len(COCO_INSTANCE_CATEGORY_NAMES)):
        return "NA"
    return COCO_INSTANCE_CATEGORY_NAMES[label_id]

def prepare_per_image_res(detection_res):
    """Turn the filtered detections of one image into result rows.

    `detection_res` is an ``(image_name, detections)`` pair where
    ``detections`` holds the parallel sequences "detection_scores",
    "detection_boxes" and "detection_classes".

    Returns one row per detected box:
    ``[image_name, label, score, ymin, xmin, ymax, xmax]``.
    """
    detections = detection_res[1]
    rows = []
    for idx, box in enumerate(detections["detection_boxes"]):
        label = get_label_name(detections["detection_classes"][idx])
        score = round(detections["detection_scores"][idx], 2)
        # Boxes arrive as xmin, ymin, xmax, ymax but are stored as
        # ymin, xmin, ymax, xmax, hence the swapped indices below.
        ymin, xmin, ymax, xmax = int(box[1]), int(box[0]), int(box[3]), int(box[2])
        rows.append([detection_res[0], label, score, ymin, xmin, ymax, xmax])
    return rows

def store_results(detection_results, pre_processing_seq_name):
    """Append the detections of a batch to the model's results CSV.

    Args:
        detection_results: List of ``(image_name, detections)`` tuples as
            accepted by ``prepare_per_image_res``.
        pre_processing_seq_name: Value stored in the "processing_seq_name"
            column of every row written by this call.

    Side effects:
        Creates ``detection_result_path`` if needed and writes to
        ``<detection_result_path>/<model_name>.csv``. The header row is
        written only when the file does not exist yet.
    """
    # Every row is tagged with the model and pre-processing sequence.
    info = [model_name, pre_processing_seq_name]
    df_rows = []
    for res in detection_results:
        df_rows.extend(row + info for row in prepare_per_image_res(res))

    df = pd.DataFrame(df_rows, columns = df_columns)

    detection_result_path.mkdir(parents=True, exist_ok=True)
    csv_path = detection_result_path/(model_name + ".csv")

    # Append straight to the file instead of read + DataFrame.append + rewrite:
    # DataFrame.append no longer exists in pandas 2.x, re-reading the whole
    # file for every batch is wasteful, and read_csv would parse stored
    # labels such as "NA" / "N/A" as missing values and blank them out.
    write_header = not os.path.exists(csv_path)
    df.to_csv(csv_path, mode="a", header=write_header, index=False)

### RUN below for all inference

In [None]:
# Run the detector over the whole dataset once per pre-processing sequence,
# storing the results batch by batch (50 images per batch).
for seq_id in pre_processing_seq_dict:
    print("processing for seq_id:{0} started.".format(seq_id))
    batches = load_images_batch(50, seq_id)
    for batch_number, batch_images in enumerate(batches, start=1):
        # Progress indicator: running batch number.
        print("{0}".format(batch_number), end="...")
        store_results(run_inference(model, batch_images), seq_id)
    print("----------------")
print("Inference completed for model:{0}".format(model_name))

In [47]:
# batch_images = next(load_images_batch(2, "seq_1"))
# res = run_inference(model, batch_images)
# store_results(res, "seq_0")