<a href="https://colab.research.google.com/github/joangog/object-detection/blob/main/mask_inference.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Model evaluation (inference) on face mask datasets (MASKD / PWMFD)

The following models will be evaluated:

| Model | Backbone | Image Size | Parameters | GFLOPs |
| --- | --- | --- | --- | --- |
| YOLOv5s |  Custom | 640x640 | 7.3M | 17 |
| YOLOv5m |  Custom | 640x640 | 21.4M | 51.3 |
| YOLOv5l |  Custom |640x640 | 47M | 115.5 |
| YOLOv3-tiny |  Darknet53 | 640x640 | 8.8M | 13.3 |

<br>

**Note: GPU Runtime needed (hosted or local)**

*Example experiment: Tesla K80, 460.32.03, 11441 MiB, batch_size=8, workers=2*

In [None]:
# Show system specs: query the GPU name, driver version and total memory
# through nvidia-smi and print them in CSV format
!nvidia-smi --query-gpu=gpu_name,driver_version,memory.total --format=csv

### Initialization

In [None]:
# Parameters
import os

# Dataset to evaluate: 'MASKD' or 'PWMFD'
dataset_name = 'PWMFD'

# Data loader settings
num_workers = 2  # Data loader workers
batch_size = 1  # Data loader batch size

# Threshold for confidence score of predicted bboxes to show
th = 0.5

# Model checkpoint to load
load_ckpt_path = f'/home/ioanna/object-detection-checkpoints/pretrained/PWMFD_yolov5s_sgd_ep50_lr01_img320_run/weights/last.pt'

# Directories (everything lives under <root_dir>/dataset_<dataset_name>)
root_dir = os.getcwd()  # Root dir of project
dataset_dir = os.path.join(root_dir, f'dataset_{dataset_name}')

# Top-level dataset sub-directories
img_dir, label_dir, ann_dir = (
    os.path.join(dataset_dir, sub_dir) for sub_dir in ('images', 'labels', 'annotations')
)

# Validation split sub-directories
val_img_dir = os.path.join(img_dir, 'val_images')
val_label_dir = os.path.join(label_dir, 'val_images')

### Get requirements
*Note: Restart runtime after installation*

In [None]:
# Install Yolov5
!cd {root_dir}
!git clone https://github.com/ultralytics/yolov5
!pip install -r {os.path.join(root_dir,'yolov5','requirements.txt')}

In [None]:
# Install Yolov3
!cd {root_dir}
!git clone https://github.com/ultralytics/yolov3
!pip install -r {os.path.join(root_dir,'yolov3','requirements.txt')}

In [None]:
# Install colabtools
!git clone https://github.com/googlecolab/colabtools.git
!python {root_dir}/colabtools/setup.py install

In [None]:
# Install unrar command
import subprocess

# -y answers apt-get's confirmation prompt, which cannot be answered from a notebook
apt_cmd = ['apt-get', 'install', '-y', 'unrar']

if os.geteuid() != 0:  # If not root, ask for sudo privileges
  from getpass import getpass
  password = getpass('Insert sudo password:')
  try:
    # Pass the password to sudo through stdin (-S) instead of interpolating it
    # into a shell command line, where special characters would break the
    # command and the password would be visible in the process list
    apt_result = subprocess.run(['sudo', '-S', '-k'] + apt_cmd, input=password + '\n',
                                stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
  finally:
    del password  # Do not keep the password in the notebook namespace
else:
  apt_result = subprocess.run(apt_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)

# Show the installer output in the cell and report a failed installation
print(apt_result.stdout)
if apt_result.returncode != 0:
  print(f'unrar installation failed (exit code {apt_result.returncode})')

In [None]:
# Clone asset files
!cd {root_dir}
!git clone https://github.com/joangog/object-detection-assets
!mv -n {os.path.join(root_dir,'object-detection-assets','scripts')} ./
!mv -n {os.path.join(root_dir,'object-detection-assets','config')} ./
!mv -n {os.path.join(root_dir,'object-detection-assets','requirements.txt')} ./
!rm -rf {os.path.join(root_dir,'object-detection-assets')}


In [None]:
# Install packages
!cd {root_dir}
!pip install -r requirements.txt

### Import packages

In [None]:
from google.colab import files
from google.colab import drive
from google.colab.output import eval_js

import os, sys
import math
import time
import copy
import re
import io

import numpy as np
import pandas as pd
import json
import xml.etree.ElementTree as ET
import html

import PIL
import cv2
import IPython
from IPython.display import display, Javascript, Image
from base64 import b64decode, b64encode

import matplotlib
import matplotlib.pyplot as plt

import torch
from torch.utils.tensorboard import SummaryWriter
import torchvision
import torchvision.models.detection as M
import torchvision.transforms.functional as F
import torchvision.utils as U
from torchvision.datasets import CocoDetection

from pycocotools import coco
from pycocotools import mask as cocomask

from ptflops import get_model_complexity_info

import scripts.utils as SU
import scripts.transforms as ST
import scripts.engine as SE
import scripts.coco_utils as SCU
from scripts.coco_eval import CocoEvaluator

### Define auxiliary functions

In [None]:
def add_motion_blur(img, kernel_size, kernel_angle):
  """Apply a linear motion blur to an image.

  Builds a kernel_size x kernel_size kernel whose only non-zero entries lie on
  the middle column ('v') or the middle row ('h'), normalizes it so that it
  sums to 1, and filters the image with it using cv2.filter2D.

  Args:
    img: Image passed to cv2.filter2D.
    kernel_size: Side length of the square blur kernel.
    kernel_angle: 'v' for vertical motion blur, 'h' for horizontal motion blur.

  Returns:
    The filtered image as returned by cv2.filter2D (ddepth=-1).

  Raises:
    ValueError: If kernel_angle is neither 'v' nor 'h'.
  """
  if kernel_angle not in ('v', 'h'):
    # Without this check the kernel would stay all-zero and the function
    # would silently return a black image
    raise ValueError(f"kernel_angle must be 'v' or 'h', got {kernel_angle!r}")

  kernel = np.zeros((kernel_size, kernel_size))
  middle = (kernel_size - 1) // 2  # Index of the middle row/column
  if kernel_angle == 'v':  # Vertical Motion Blur
    kernel[:, middle] = np.ones(kernel_size)
  else:  # Horizontal Motion Blur
    kernel[middle, :] = np.ones(kernel_size)
  kernel /= kernel_size  # Normalize so the kernel entries sum to 1
  return cv2.filter2D(img, -1, kernel)

# Decodes the base64 image contained in a JS reply into a PIL image
def js_to_image(js_reply):
  """Return the PIL image encoded in a 'header,base64-data' string.

  The text after the first comma of js_reply is base64-decoded and the
  resulting bytes are opened with PIL.
  """
  encoded_payload = js_reply.split(',')[1]
  byte_stream = io.BytesIO(b64decode(encoded_payload))
  return PIL.Image.open(byte_stream)

# Encodes an RGBA bbox overlay array as a base64 PNG data URL string
def bbox_to_bytes(bbox_array):
  """Return bbox_array as a 'data:image/png;base64,...' string.

  The array is interpreted as an RGBA image, written to an in-memory PNG and
  base64-encoded.
  """
  png_buffer = io.BytesIO()
  # Interpret the array as an RGBA image and serialize it as PNG in memory
  PIL.Image.fromarray(bbox_array, 'RGBA').save(png_buffer, format='png')
  # Base64-encode the PNG bytes and prepend the data URL header
  encoded_png = b64encode(png_buffer.getvalue()).decode('utf-8')
  return 'data:image/png;base64,{}'.format(encoded_png)

# Injects the JavaScript functions used to show a live webcam video stream
def video_stream():
  """Display the JavaScript that drives the live webcam stream.

  The displayed script defines stream_frame(label, imgData), which:
    - creates the video/overlay DOM elements on its first call (createDom),
    - writes label into the FPS label and shows imgData as an overlay image
      positioned over the video, when they are not empty,
    - resolves with a dict whose 'img' entry is a JPEG data URL of the
      current 640x480 video frame, captured on the next animation frame.

  Clicking the video, the overlay image or the instruction text sets a
  shutdown flag; a later stream_frame call that sees the flag removes the
  DOM elements, stops the video track and returns ''.
  """
  js = Javascript('''
    var video;
    var div = null;
    var stream;
    var captureCanvas;
    var imgElement;
    var labelElement;
    
    var pendingResolve = null;
    var shutdown = false;
    
    function removeDom() {
       stream.getVideoTracks()[0].stop();
       video.remove();
       div.remove();
       video = null;
       div = null;
       stream = null;
       imgElement = null;
       captureCanvas = null;
       labelElement = null;
    }
    
    function onAnimationFrame() {
      if (!shutdown) {
        window.requestAnimationFrame(onAnimationFrame);
      }
      if (pendingResolve) {
        var result = "";
        if (!shutdown) {
          captureCanvas.getContext('2d').drawImage(video, 0, 0, 640, 480);
          result = captureCanvas.toDataURL('image/jpeg', 0.8)
        }
        var lp = pendingResolve;
        pendingResolve = null;
        lp(result);
      }
    }
    
    async function createDom() {
      if (div !== null) {
        return stream;
      }

      div = document.createElement('div');
      div.style.border = '2px solid black';
      div.style.padding = '3px';
      div.style.width = '100%';
      div.style.maxWidth = '600px';
      document.body.appendChild(div);
      
      const modelOut = document.createElement('div');
      modelOut.innerHTML = "<span>FPS: </span>";
      labelElement = document.createElement('span');
      labelElement.innerText = 'No data';
      labelElement.style.fontWeight = 'bold';
      modelOut.appendChild(labelElement);
      div.appendChild(modelOut);
           
      video = document.createElement('video');
      video.style.display = 'block';
      video.width = div.clientWidth - 6;
      video.setAttribute('playsinline', '');
      video.onclick = () => { shutdown = true; };
      stream = await navigator.mediaDevices.getUserMedia(
          {video: { facingMode: "environment"}});
      div.appendChild(video);

      imgElement = document.createElement('img');
      imgElement.style.position = 'absolute';
      imgElement.style.zIndex = 1;
      imgElement.onclick = () => { shutdown = true; };
      div.appendChild(imgElement);
      
      const instruction = document.createElement('div');
      instruction.innerHTML = 
          '<span style="color: red; font-weight: bold;">' +
          'When finished, click here or on the video to stop this demo</span>';
      div.appendChild(instruction);
      instruction.onclick = () => { shutdown = true; };
      
      video.srcObject = stream;
      await video.play();

      captureCanvas = document.createElement('canvas');
      captureCanvas.width = 640; //video.videoWidth;
      captureCanvas.height = 480; //video.videoHeight;
      window.requestAnimationFrame(onAnimationFrame);
      
      return stream;
    }
    async function stream_frame(label, imgData) {
      if (shutdown) {
        removeDom();
        shutdown = false;
        return '';
      }

      var preCreate = Date.now();
      stream = await createDom();
      
      var preShow = Date.now();
      if (label != "") {
        labelElement.innerHTML = label;
      }
            
      if (imgData != "") {
        var videoRect = video.getClientRects()[0];
        imgElement.style.top = videoRect.top + "px";
        imgElement.style.left = videoRect.left + "px";
        imgElement.style.width = videoRect.width + "px";
        imgElement.style.height = videoRect.height + "px";
        imgElement.src = imgData;
      }
      
      var preCapture = Date.now();
      var result = await new Promise(function(resolve, reject) {
        pendingResolve = resolve;
      });
      shutdown = false;
      
      return {'create': preShow - preCreate, 
              'show': preCapture - preShow, 
              'capture': Date.now() - preCapture,
              'img': result};
    }
    ''')

  # Render the script in the cell output so that stream_frame becomes callable through eval_js
  display(js)
  
def video_frame(label, bbox):
  """Send a label and an overlay image to the JS stream and fetch the next frame.

  Args:
    label: Value shown in the FPS label of the stream (converted to a string).
    bbox: Overlay image as a data URL string, or '' for no overlay.

  Returns:
    The value returned by the JS function stream_frame: a dict with the keys
    'create', 'show', 'capture' and 'img', or '' once the stream was shut down.
  """
  # json.dumps produces a correctly quoted and escaped JS string literal, so
  # quotes or backslashes in the arguments cannot break the generated call
  js_call = 'stream_frame({}, {})'.format(json.dumps(str(label)), json.dumps(str(bbox)))
  data = eval_js(js_call)
  return data

### (Optional) Connect to GDrive for storage access
*Note: Not possible with local runtime*

In [None]:
# Mount Google Drive under /content/drive (force_remount=True presumably
# re-mounts the drive when it is already mounted - confirm in the Colab docs)
drive.mount('/content/drive', force_remount=True)

### Download Mask dataset

In [None]:
# Create the dataset directory tree.
# os.makedirs is used instead of `!cd` / `!mkdir`: a `!cd` line does not change
# the notebook's working directory, and plain `mkdir` fails when the cell is re-run.
# Parent directories (dataset_dir included) are created implicitly.
for new_dir in (img_dir, ann_dir, label_dir, val_img_dir, val_label_dir):
  os.makedirs(new_dir, exist_ok=True)

In [None]:
if dataset_name == 'MASKD':

  !cd {root_dir}

  # Download validation images
  if not os.path.exists('val_images.zip'):
    !gdown --id '101F2k6PJ-tD_uwlsCG7zzGF9ILJW01M1'
  !unzip -q -n 'val_images.zip' -d {img_dir}

  # Download validation annotations
  if not os.path.exists('val.json'):
    !gdown -O {os.path.join(ann_dir,'val.json')} --id '1YLV7-7vmiNdFI8Xpdx_jbhnxfgQRWrgF'

elif dataset_name == 'PWMFD':

  # Download validation images
  if not os.path.exists('val_images.rar'):
    !gdown -O 'val_images.rar' --id  1ZXuSwoRvTnnca81RUj3kMoLFZJ6auAwT
  !unrar e -idq -o- 'val_images.rar' -d {val_img_dir}

  # Convert annotation files from PASCAL VOC .xml to COCO .json (only for PWMFD dataset)
  
  label_ids = {'with_mask': 1, 'without_mask': 2, 'incorrect_mask': 3}  # BG class is 0

  ann_count = 0  # Annotation counter

  images = []
  annotations = []

  xml_files = os.listdir(val_img_dir)
  xml_files = [file for file in xml_files if '.xml' in file]

  for xml_file in xml_files:

    tree = ET.parse(os.path.join(val_img_dir,xml_file))
    root = tree.getroot()

    # Image
    file_name = root[0].text
    height = int(root[1][1].text)
    width = int(root[1][0].text)
    id = int(re.sub(r'^\D*0*', '', file_name).replace('.jpg',''))
    images.append(
        {
          'file_name': file_name,
          'height': height,
          'width': width,
          'id': id
        }
    )
    
    # Annotations
    if len(root) > 2:  # If annotations (object attribute) exist, they will be after the 1-index attribute in the XML
      for i in range(2,len(root)):
        category_id = label_ids[root[i][0].text]
        xmin = int(root[i][1][0].text)
        ymin = int(root[i][1][1].text)
        xmax = int(root[i][1][2].text)
        ymax = int(root[i][1][3].text)
        annotations.append(
            {
              'iscrowd': 0,
              'image_id': id,
              'bbox': [xmin, ymin, xmax-xmin, ymax-ymin],
              'area': (xmax-xmin) * (ymax-ymin),
              'category_id': category_id,
              'ignore': 0,
              'id': ann_count,
              'segmentation': []
            }
        )
        ann_count += 1

  coco_dict = {
  'info': {},
  'images': images,
  'annotations': annotations,
  'licenses': []
  }

  with open(os.path.join(ann_dir,f'val.json'),'w') as outfile:
    json.dump(coco_dict, outfile, indent=3)


# Copy COCO annotations in images folder
!cp {os.path.join(ann_dir,'val.json')} {val_img_dir}


# Copy COCO annotations in images folder
!cp {os.path.join(ann_dir,'val.json')} {val_img_dir}

### Load Mask dataset

In [None]:
# The annotation file was copied next to the validation images
val_ann_file = 'val.json'  # annotations
val_ann_path = os.path.join(val_img_dir,val_ann_file)

# Define data transforms
transforms = ST.Compose([ST.ToTensor()])

# Create dataset
val_dataset = CocoDetection(val_img_dir, val_ann_path, transforms = transforms)

# Create data loader
val_data_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=SU.collate_fn)

# Get label names
if dataset_name == 'MASKD':
  label_ids = [1,2]
  label_names = ['mask', 'no_mask']
elif dataset_name == 'PWMFD':
  label_ids = [1,2,3]
  label_names = ['with_mask', 'without_mask', 'incorrect_mask']
else:
  # Fail with a clear message instead of a NameError on label_ids below
  raise ValueError(f"Unsupported dataset_name: {dataset_name!r} (expected 'MASKD' or 'PWMFD')")
labels = dict(zip(label_ids,label_names))  # Label dictionary with id-name as key-value
labels_inv = dict(zip(label_names,label_ids))  # Inverse label dictionary with name-id as key-value
label_colors = {1: (0,255,0), 2:(255,0,0), 3: (255,255,0)}  # Bbox color for every label id

### Load model

In [None]:
# Delete utils package to reload it (if loaded), because YOLOv3 and YOLOv5 have
# the same name for it and it causes error
sys.modules.pop('utils', None)  # The default avoids a KeyError when it is not loaded

# @markdown Model Selection { display-mode: 'form', run: 'auto' }
model_name = 'YOLOv5s' # @param ['SSD300 VGG16', 'SSDlite320 MobileNetV3-Large', 'Faster R-CNN ResNet-50 FPN', 'Faster R-CNN MobileNetV3-Large FPN', 'Mask R-CNN ResNet-50 FPN', 'YOLOv5s', 'YOLOv5m', 'YOLOv5l', 'YOLOv3', 'YOLOv3-tiny', 'YOLOv3-spp']

# @markdown *Note: If you get the error "Cache may be out of date, try 'force_reload=True'" then restart runtime.*

if model_name == 'SSD300 VGG16':
  model_id = 'ssd300_vgg16'
  model = M.ssd300_vgg16(pretrained=False, progress=True)
  model_img_size = (3,300,300)
elif model_name == 'SSDlite320 MobileNetV3-Large':
  model_id = 'ssdlite320_mobilenet_v3_large'
  model = M.ssdlite320_mobilenet_v3_large(pretrained=False, progress=True)
  model_img_size = (3,320,320)
elif model_name == 'Faster R-CNN ResNet-50 FPN':
  model_id = 'fasterrcnn_resnet50_fpn'
  model = M.fasterrcnn_resnet50_fpn(pretrained=False, progress=True)
  model_img_size = (3,800,800) # COCO's 640x640 is upscaled to the model's minimum 800x800
elif model_name == 'Faster R-CNN MobileNetV3-Large FPN':
  model_id = 'fasterrcnn_mobilenet_v3_large_fpn'
  model = M.fasterrcnn_mobilenet_v3_large_fpn(pretrained=False, progress=True)
  model_img_size = (3,800,800)
elif model_name == 'Mask R-CNN ResNet-50 FPN':
  model_id = 'maskrcnn_resnet50_fpn'
  model = M.maskrcnn_resnet50_fpn(pretrained=False, progress=True)
  model_img_size = (3,800,800)
elif 'YOLOv5' in model_name:
  model_id = model_name.lower().replace('-','_')
  # Load the checkpoint through the local clone of the YOLOv5 repository
  model = torch.hub.load(os.path.join(root_dir,'yolov5'), 'custom', path=load_ckpt_path, source='local', force_reload=True)
  model_img_size = (3,640,640)
elif 'YOLOv3' in model_name:
  model_id = model_name.lower().replace('-','_')
  # Load the checkpoint through the local clone of the YOLOv3 repository
  model = torch.hub.load(os.path.join(root_dir,'yolov3'), 'custom', path=load_ckpt_path, source='local', force_reload=True)
  model_img_size = (3,640,640)
else:
  # Fail with a clear message instead of a NameError on model below
  raise ValueError(f'Unsupported model_name: {model_name!r}')

# Prepare model for dataset (for Faster R-CNN or Mask R-CNN)
if 'R-CNN' in model_name:
  num_classes = len(val_dataset.coco.getCatIds()) + 1
  # Get the number of input features for the bbox predictor
  in_features = model.roi_heads.box_predictor.cls_score.in_features
  # Replace the pre-trained head with a new one
  model.roi_heads.box_predictor = M.faster_rcnn.FastRCNNPredictor(in_features, num_classes)  # includes background (0) class
  if 'Mask R-CNN' in model_name:
    # Get the number of input features for the segmentation mask predictor
    in_features_mask = model.roi_heads.mask_predictor.conv5_mask.in_channels
    hidden_layer = 256
    # Replace the mask predictor with a new one
    model.roi_heads.mask_predictor = M.mask_rcnn.MaskRCNNPredictor(in_features_mask, hidden_layer,num_classes)

print('-------------------------------------------------------------------------------------------------------\n')

# Report the size of the model: parameters in millions, and GFLOPs estimated as 2 x MACs
print(f'Loaded model: {model_name}')
model_params = round(sum([param.numel() for param in model.parameters()]) / 1000000, 1)
print(f'\t- Parameters: {model_params}M')
model_macs, _ = get_model_complexity_info(model, model_img_size, as_strings=False,
                                          print_per_layer_stat=False, verbose=False)
model_gflops = round(2 * int(model_macs) / 1000000000, 1)
print(f'\t- GFLOPs: {model_gflops}')

### (Optional) Test model with image sample
*Note 1: If you get the error "module 'PIL.TiffTags' has no attribute 'IFD'" then restart runtime.*


In [None]:
%matplotlib inline

# Parameters
img_from_path = False
img_path = '/content/test.jpg'  # Only for image from path
img_id = 32  # Only for image not from path (image from val dataset)

# Get appropriate device for model
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)

# Get image sample
if img_from_path:
  img = cv2.imread(img_path)
else:
  img = cv2.imread(os.path.join(val_img_dir,val_dataset.coco.loadImgs([img_id])[0]['file_name']))

img = PIL.Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))

# Format image
img_tensor = F.convert_image_dtype(F.to_tensor(img),torch.uint8)
img_torchvision = torch.div(img_tensor,255).float().to(device)  # Format image for torchvision models
img_anns = val_dataset.coco.loadAnns(val_dataset.coco.getAnnIds([img_id]))

# Draw ground truth bboxes
if not img_from_path:  # If the image is not from path (if it is, ground truth doesn't exist)
  true_bboxes = SE.convert_to_xyxy(copy.deepcopy(F.Tensor([obj['bbox'] for obj in img_anns]).to(device)))  # Create deep copy to avoid updating original dataset
  true_labels = [labels[obj['category_id']] for obj in img_anns]
  true_img = U.draw_bounding_boxes(img_tensor, true_bboxes, true_labels, colors=[label_colors[obj['category_id']] for obj in img_anns])
  plt.figure(figsize = (25,7))
  plt.title('Ground Truth Detection')
  plot = plt.imshow(F.to_pil_image(true_img))

# Generate model predictions
model.eval()
with torch.no_grad():
  if 'YOLO' in model_name:
    pred = model([img])
  else:    
    pred = model([img_torchvision])

# Get predicted bboxes
# For YOLO models
if 'YOLO' in model_name:  
  pred_bboxes = []
  pred_label_ids = []
  pred_labels = []
  for bbox in pred.xyxy[0]:  # For every bbox
    conf = bbox[4]
    if conf > th:  # Show only bboxes with high confidence score
      pred_bboxes.append(bbox[:4])
      label_id = labels_inv[label_names[int(bbox[5])]]  # Convert YOLO label id to COCO label id
      pred_label_ids.append(label_id)  
      pred_labels.append(labels[label_id] + f'[{int(conf*100)}%]')
  if len(pred_bboxes) != 0:
    pred_bboxes = torch.stack(pred_bboxes)

# For torchvision models
else:
  for i, bbox in enumerate(pred[0]['boxes']):  # For every bbox
    conf = pred[0]['scores'][i]
    if conf > th:  # Show only bboxes with high confidence score
      pred_bboxes.append(bbox)
      label_id = pred[0]['labels'][i]
      pred_label_ids.append(label_id)
      pred_labels.append(labels[label_id] + f'[{int(conf*100)}%]')
  if len(pred_bboxes) != 0:
    pred_bboxes = torch.stack(pred_bboxes)

# Draw predicted bboxes
if len(pred_bboxes) != 0:
  pred_img = U.draw_bounding_boxes(img_tensor, pred_bboxes, pred_labels, colors=[label_colors[label_id] for label_id in pred_label_ids])
else:  # If no bboxes are found just return the image
  pred_img = img_tensor
plt.figure(figsize = (25,7))
plt.title(f'Predicted Detection (thresh={th})')
plot = plt.imshow(F.to_pil_image(pred_img))

### Test model on webcam video stream

In [None]:
# Get appropriate device for model
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model.to(device)
model.eval()  # Set once: the mode does not change between frames

# Start streaming video from webcam
video_stream()

# FPS monitor
fps = 0

# Initialize bounding box overlay to empty
bbox = ''
count = 0

last_frame = None  # Most recent webcam frame, plotted once the stream has stopped

while True:

    js_reply = video_frame(fps, bbox)
    # Stop when the stream was shut down: the reply is then empty, or carries
    # an empty image when the shutdown happened while a frame was being captured
    if not js_reply or not js_reply["img"]:
        break

    # Convert JS response to PIL Image
    frame = js_to_image(js_reply["img"])
    last_frame = frame

    # Create transparent overlay for bounding box
    bbox_array = np.zeros([480,640,4], dtype=np.uint8)

    # Format frame (for torchvision models)
    frame_tensor = F.convert_image_dtype(F.to_tensor(frame),torch.uint8)
    frame_torchvision = torch.div(frame_tensor,255).float().to(device)

    # Generate model predictions
    model_time = time.time()
    with torch.no_grad():
        if 'YOLO' in model_name:
            pred = model(frame)
        else:
            pred = model([frame_torchvision])
    model_time = time.time() - model_time
    fps = int(1/model_time) if model_time > 0 else 0  # Guard against a zero time difference

    # Get predicted bboxes
    # The result lists are created for both model families; previously only the
    # YOLO branch initialized them, so torchvision models failed with a NameError
    pred_bboxes = []
    pred_label_ids = []
    pred_labels = []

    # For YOLO models
    if 'YOLO' in model_name:
        for img_dets in pred.xyxy:  # For every image
            for det in img_dets:  # For every bbox of that image
                conf = det[4]
                if conf > th:  # Show only bboxes with high confidence score
                    pred_bboxes.append(det[:4])
                    label_id = labels_inv[label_names[int(det[5])]]  # Convert YOLO label id to COCO label id
                    pred_label_ids.append(label_id)
                    pred_labels.append(labels[label_id] + f'[{int(conf*100)}%]')
    # For torchvision models
    else:
        for i, det in enumerate(pred[0]['boxes']):  # For every bbox
            conf = pred[0]['scores'][i]
            if conf > th:  # Show only bboxes with high confidence score
                pred_bboxes.append(det)
                # Convert the label tensor to int: tensors cannot be used to look up the int keys of the label dicts
                label_id = int(pred[0]['labels'][i])
                pred_label_ids.append(label_id)
                pred_labels.append(labels[label_id] + f'[{int(conf*100)}%]')

    if len(pred_bboxes) != 0:
        pred_bboxes = torch.stack(pred_bboxes)

    predictions = zip(pred_label_ids, pred_labels, pred_bboxes)

    # Loop through detections and draw them on transparent overlay image
    for label_id, label, pred_bbox in predictions:
        left, top, right, bottom = pred_bbox
        left = int(left); top = int(top); right = int(right); bottom = int(bottom)
        bbox_array = cv2.rectangle(bbox_array, (left, top), (right, bottom), label_colors[label_id], 2)
        bbox_array = cv2.putText(bbox_array, "{}".format(label),
                          (left, top - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
                          label_colors[label_id], 2)

    # Make every drawn pixel opaque and leave the rest of the overlay transparent
    bbox_array[:,:,3] = (bbox_array.max(axis = 2) > 0 ).astype(int) * 255
    # convert overlay of bbox into bytes
    bbox_bytes = bbox_to_bytes(bbox_array)
    # update bbox so next frame gets new overlay
    bbox = bbox_bytes

# Plot only the last captured frame; calling plt.imshow for every frame inside
# the loop would keep all frames of the stream in the figure
if last_frame is not None:
    plt.imshow(last_frame)

### Evaluate model
*Note 1: If you get the error "module 'PIL.TiffTags' has no attribute 'IFD'" then restart runtime.*

*Note 2: To get accurate maximum GPU memory usage logging, restart runtime when choosing a different model.*

In [None]:
# Pick the GPU when one is available (CPU otherwise) and move the model onto it
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)
print(f'Model: {model_name}')

# Run the evaluation on the validation data loader; the call returns the
# evaluator, the FPS, the maximum memory usage and the model outputs
evaluator, fps, max_mem, outputs = SE.evaluate(model, val_data_loader, device)