# TL;DR
1. Run the entire notebook.
2. Results are saved to: /local/ml_local/obj_detection/images_videos/output
3. (Optional) Go to the section "Select a TF model by index for object detection" and select the index of the pre-trained TF model to use for object detection.
4. (Optional) Go to the section "Detecting/Visualizing a Single Image" and change the variable image_path_as_string to replace the default image with one of your choosing.
5. (Optional) Go to the section "Detecting/Visualizing Video" and change video_input_path to replace the default video with one of your choosing.


# Overview
This notebook explores how to use prebuilt TensorFlow models available in Google's TensorFlow Model Zoo to perform object detection on images, videos, and webcam streams.

This notebook detects objects in an image and a video of your choosing.  By default, the image is a tennis photo and the video is a clip from Back to the Future.  To run detection on other images or videos, change the input source in the sections titled "Detecting/Visualizing a Single Image" and "Detecting/Visualizing Video".

The Open Source Computer Vision Library (OpenCV) is an open-source, BSD-licensed library that includes several hundred computer vision algorithms.  This tutorial uses it to pre- and post-process images, videos, and streams.  OpenCV is imported into Python as cv2.

OpenCV and the TensorFlow models use different color orderings.  OpenCV represents an image as a 3D numpy array with shape (height, width, 3), where the 3 color channels are ordered BGR (blue, green, red). TensorFlow models expect a tensor with shape (batch_size, height, width, 3) with the channels ordered RGB (red, green, blue). The OpenCV function cv2.cvtColor is used to convert OpenCV images from BGR to RGB so that they are compatible with TF.  Some of the TF models require batch_size to be 1.
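A minimal sketch of that conversion, using only NumPy so it runs without OpenCV or TensorFlow installed. It assumes that reversing the last axis with `[..., ::-1]` gives the same result as `cv2.cvtColor(image, cv2.COLOR_BGR2RGB)` for a 3-channel uint8 image, and it returns a NumPy array rather than a `tf.Tensor`; the helper name `bgr_images_to_rgb_batch` is hypothetical.

```python
import numpy as np


def bgr_images_to_rgb_batch(cv_images):
  """Hypothetical helper: list of BGR (height, width, 3) arrays -> RGB (batch, height, width, 3)."""
  # Reverse the channel axis: BGR -> RGB
  rgb_images = [image[..., ::-1] for image in cv_images]
  # Stack along a new leading axis to create the batch dimension
  return np.stack(rgb_images).astype(np.uint8)


# 2x2 image whose pixels are pure blue in OpenCV's BGR ordering
cv_image = np.zeros((2, 2, 3), dtype=np.uint8)
cv_image[..., 0] = 255  # channel 0 is blue in BGR

batch = bgr_images_to_rgb_batch([cv_image])
print(batch.shape)              # (1, 2, 2, 3)
print(batch[0, 0, 0].tolist())  # [0, 0, 255]: blue is now the last channel (RGB order)
```

In the notebook itself, `cv2.cvtColor` performs the color swap and `tf.convert_to_tensor` produces the final tensor.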

To avoid confusion between OpenCV images and their associated TF model input, the following naming convention is used:
* cv_image: OpenCV image stored as a 3D numpy array with shape (height, width, 3) in BGR format
* tf_input: TensorFlow model input associated with a cv_image, stored as a 4D tensor with shape (batch, height, width, 3) in RGB format.

* File convention:
  * All file references should be of type Path
  * Any function that accepts a file reference should convert it to Path
  * Paths should be cast to strings when calling 3rd party libraries such as cv2
  * cv2.imshow() is disabled in Colab because it causes Jupyter sessions to crash
    * See https://github.com/jupyter/notebook/issues/3935.
    * A workaround on Colab is to use cv2_imshow rather than cv2.imshow():
    * from google.colab.patches import cv2_imshow
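A small sketch of these conventions. It assumes that `google.colab` can only be imported inside a Colab runtime, which makes this a best-effort check rather than an official API. The helper names `running_in_colab` and `as_path` and the example file name are hypothetical; one way to set the `is_environ_colab` flag that the display code checks would be `is_environ_colab = running_in_colab()`.

```python
from pathlib import Path


def running_in_colab():
  """Best-effort check: google.colab is normally importable only inside Colab."""
  try:
    import google.colab  # noqa: F401  (imported only to test availability)
  except ImportError:
    return False
  return True


def as_path(file_ref):
  """File convention: accept a str or Path and work with a Path internally."""
  return Path(file_ref)


image_path = as_path("images_videos/input/example.jpg")  # hypothetical file name
# File convention: cast to str only when calling a third-party library,
# e.g. cv2.imread(str(image_path)) or cv2.VideoCapture(str(video_path))
image_path_for_cv2 = str(image_path)
is_environ_colab = running_in_colab()
print(image_path.name, is_environ_colab)
```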

References:
* __[Read and Write Videos using OpenCV](https://arshren.medium.com/read-and-write-videos-using-opencv-7f92548afcba)__
* __[Introduction to OpenCV](https://www.geeksforgeeks.org/introduction-to-opencv/)__
* __[Object Detection Tutorial](https://www.youtube.com/watch?v=2yQqg_mXuPQ)__

Version Notes:
* v24: TF models work on Windows Pycharm with GPU support


# Imports

In [1]:
import datetime
import os
import time
import pathlib
from pathlib import Path

import cv2
import numpy as np
import json
import tensorflow as tf
from tensorflow.python.keras.utils.data_utils import get_file
from tqdm import tqdm
import pprint

In [2]:
# import object_detection_utils

# Utility Functions & Classes

## Helper Functions

In [3]:
def cv_image_list_to_tf_input(cv_images):
  """
  Convert a list of cv images into a 4D tensor for TF modeling.

  Args:
    cv_images:
        list of cv images represented as 3D numpy arrays with
        shape (height, width, 3) in BGR format

  Returns:
    4D tensor with shape (batch, height, width, 3) in RGB format with datatype uint8

  """
  cv_images_corrected = []
  for cv_image in cv_images:
    cv_images_corrected.append(cv2.cvtColor(cv_image.copy(), cv2.COLOR_BGR2RGB))
  np_images = np.stack(cv_images_corrected)
  tf_input = tf.convert_to_tensor(np_images, dtype=tf.uint8)

  return tf_input

In [4]:
def image_np_to_tf(numpy_array):
  """
  Convert a numpy array representing an image with shape
  (height, width, 3) or (batch, height, width, 3) into a tensor of
  shape (batch, height, width, 3) with datatype uint8
  which is suitable for TF modeling.

  Notes:
    1) Color order is not changed
    2) If the numpy_array is 3D a dummy batch dimension of 1 is added.
    """
  # Convert np to tf ensuring the correct data type
  tf_input = tf.convert_to_tensor(numpy_array, dtype=tf.uint8)
  # tf model expects 4D tensors with shape ( batch, height, width, 3)
  if len(tf_input.shape) == 4:
    pass
  # if array is shape (height, width, 3), add a dummy batch axis
  elif len(tf_input.shape) == 3:
    tf_input = tf_input[tf.newaxis, ...]
  else:
    msg = f"Tensor must be either 4D (batch, height, width, color) " \
          f"or 3D (height, width, color).  Incompatible shape: {tf_input.shape}"
    raise ValueError(msg)
  print(f"{type(tf_input)=}, {tf_input.shape=}")
  return tf_input

In [5]:
def convert_coordinate_system(cv_image, tf_bounding_box):
  """
  Convert tf_bounding_box coordinates, which range from 0 to 1, into
  pixel coordinates suitable for cv2 plotting routines.

  Args:
    cv_image:
      An image with shape (height, width, depth) that will be overlaid
      with an object detection bounding box.
    tf_bounding_box:
      TF model output bounding box that uses a 0 to 1 coordinate system.
      The bounding box is in the form (y_min, x_min, y_max, x_max).
  Returns:
    pixel-based coordinates in the form (x_min, x_max, y_min, y_max) suitable for
    cv2 plotting routines.
  """
  image_height, image_width, _ = cv_image.shape
  y_min, x_min, y_max, x_max = tf_bounding_box
  x_min, x_max, y_min, y_max = (x_min * image_width,
                                x_max * image_width,
                                y_min * image_height,
                                y_max * image_height)

  return int(x_min), int(x_max), int(y_min), int(y_max)
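A worked example of the same arithmetic, written as a hypothetical standalone helper that takes the image height and width directly instead of a cv_image. Note that `int()` truncates toward zero rather than rounding.

```python
def tf_box_to_pixels(tf_bounding_box, image_height, image_width):
  """Hypothetical standalone variant of convert_coordinate_system."""
  # TF order is (y_min, x_min, y_max, x_max), each a fraction of the image size
  y_min, x_min, y_max, x_max = tf_bounding_box
  # Scale x by the width and y by the height; int() truncates toward zero
  return (int(x_min * image_width), int(x_max * image_width),
          int(y_min * image_height), int(y_max * image_height))


# A box covering the central half of a 480 x 640 (height x width) frame
print(tf_box_to_pixels((0.25, 0.25, 0.75, 0.75), image_height=480, image_width=640))
# (160, 480, 120, 360) in cv2 order (x_min, x_max, y_min, y_max)
```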

In [6]:
def overlay_text(cv_image,
                 text,
                 font=cv2.FONT_HERSHEY_SIMPLEX,
                 position=(0, 0),
                 font_scale=1,
                 font_thickness=1,
                 rectangle_color=(0, 255, 0),
                 ):
  """
  Overlay text on a cv_image.  The text is drawn in white on a filled rectangle of color rectangle_color.

  Notes:
  1) The top-left corner of the image is the origin; increasing y moves down the screen.
  2) cv2.putText positions text by its bottom-left corner.
  """
  # location determines if the text box is inside the bounding box, or above it
  # location = 'inside'
  location = 'above'

  white = (255, 255, 255)
  black = (0, 0, 0)
  pad = 3  # pads the size of the background box

  x, y = position
  # Get the width (w) and height (h) of the text to be overlaid
  (w, h), _ = cv2.getTextSize(text, font, font_scale, font_thickness)

  # Fill a background rectangle with the class color,
  # then draw the white text on top of it
  if location == 'inside':
    cv_image[y:y + h + pad, x:x + w + pad] = rectangle_color
    cv2.putText(cv_image, text, (x, y + h), font, font_scale, white, font_thickness)
  else:
    cv_image[y - h - 4:y, x:x + w] = rectangle_color
    cv2.putText(cv_image, text, (x, y - 3), font, font_scale, white, font_thickness)

  return w, h

In [7]:
def draw_bounding_box(cv_image, class_color, x_min, x_max, y_min, y_max):
  """
  Draw a bounding box on a cv_image with a specified color.
  x and y coordinates are in pixel coordinates (not 0 to 1 format)
  """
  # Draw a thin bounding box around min and max coordinates
  cv2.rectangle(cv_image,
                pt1=(x_min, y_min),
                pt2=(x_max, y_max),
                color=class_color,
                thickness=1)

  # Thicken the bounding box near the corners; each corner arm is
  # 20% of the length of the box's shorter side
  corner_length = min(int((x_max - x_min) * 0.2), int((y_max - y_min) * 0.2))

  cv2.line(cv_image, (x_min, y_min), (x_min + corner_length, y_min), class_color, thickness=5)
  cv2.line(cv_image, (x_min, y_min), (x_min, y_min + corner_length), class_color, thickness=5)

  cv2.line(cv_image, (x_max, y_min), (x_max - corner_length, y_min), class_color, thickness=5)
  cv2.line(cv_image, (x_max, y_min), (x_max, y_min + corner_length), class_color, thickness=5)

  cv2.line(cv_image, (x_min, y_max), (x_min + corner_length, y_max), class_color, thickness=5)
  cv2.line(cv_image, (x_min, y_max), (x_min, y_max - corner_length), class_color, thickness=5)

  cv2.line(cv_image, (x_max, y_max), (x_max - corner_length, y_max), class_color, thickness=5)
  cv2.line(cv_image, (x_max, y_max), (x_max, y_max - corner_length), class_color, thickness=5)
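The eight `cv2.line` calls in draw_bounding_box can also be expressed as a loop over the four corners. This is a sketch of an alternative layout, not the notebook's code; `corner_segments` is a hypothetical helper that only computes the (start, end) points, so it runs without OpenCV.

```python
def corner_segments(x_min, x_max, y_min, y_max, fraction=0.2):
  """Hypothetical helper: the 8 (start, end) segments that thicken a box's corners."""
  # Each arm is `fraction` of the box's shorter side, as in draw_bounding_box
  corner_length = min(int((x_max - x_min) * fraction), int((y_max - y_min) * fraction))
  segments = []
  # (corner x, corner y, horizontal direction, vertical direction) pointing into the box
  for x, y, dx, dy in ((x_min, y_min, 1, 1), (x_max, y_min, -1, 1),
                       (x_min, y_max, 1, -1), (x_max, y_max, -1, -1)):
    segments.append(((x, y), (x + dx * corner_length, y)))  # horizontal arm
    segments.append(((x, y), (x, y + dy * corner_length)))  # vertical arm
  return segments


# Each segment could then be drawn with cv2.line(cv_image, start, end, class_color, thickness=5)
segs = corner_segments(100, 300, 50, 150)  # 200 x 100 box -> arms are 20 pixels long
print(segs[:2])  # [((100, 50), (120, 50)), ((100, 50), (100, 70))]
```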


In [8]:
def annotate_bounding_box(cv_image, detection_box, display_text, class_color):
  """
  Annotate a cv_image with a single object's class name, confidence, and bounding box
  in a specified color.

  Notes:
    1) detection_box coordinate system is the 0 to 1 TF model output format.
  """
  # convert fractional bounding box coordinates (0 to 1) to pixel coordinates
  x_min, x_max, y_min, y_max = convert_coordinate_system(cv_image, detection_box)

  draw_bounding_box(cv_image, class_color, x_min, x_max, y_min, y_max)

  overlay_text(cv_image,
               display_text,
               font=cv2.FONT_HERSHEY_DUPLEX,
               position=(x_min, y_min),
               font_scale=1,
               font_thickness=2,
               rectangle_color=class_color,
               )

In [9]:
def video_properties(video_capture, cv_image):
  """
  Create a dictionary of key video characteristics associated with a video file.

  Args:
      video_capture:
          Created with cv2.VideoCapture(video_input_path)
      cv_image:
          A numpy image with shape (height, width, depth)
  Returns:
      Dictionary with key video properties
  """
  properties = {}
  image_height, image_width, _ = cv_image.shape
  properties['height'] = image_height
  properties['width'] = image_width
  properties['fps'] = video_capture.get(cv2.CAP_PROP_FPS)
  properties['frames'] = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
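  # OpenCV may report CAP_PROP_FPS as 0 when the value is unavailable; avoid dividing by zero
  fps = properties['fps']
  properties['duration_in_seconds'] = round(properties['frames'] / fps, 2) if fps > 0 else None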

  return properties

In [10]:
def url_file_name(path_url):
  """
  Return the file name of a url with and without its suffix

  Args:
    path_url (str or Path):
      url or path whose last component is the file name

  Returns:
    file_name, file_name_no_suffix

  Examples:
    1) http://site.org/file_name.tar.gz -> file_name.tar.gz, file_name
    2) http://site.org/file_name.tar -> file_name.tar, file_name
    3) http://site.org/file_name -> file_name, file_name

  """
  path_url = Path(path_url)
  file_name = path_url.name

  if '.' in file_name:
    first_dot = file_name.index('.')
    file_name_no_suffix = file_name[:first_dot]
  else:
    file_name_no_suffix = file_name

  return file_name, file_name_no_suffix

In [11]:
def output_file_name(model_name,
                     file_name_in,
                     codec="",
                     ):
  """
  Create an output directory (if needed) and suitable file names for model output
  and logging.

  Args:
    model_name (str):
      ML model name used for object detection
    file_name_in (Path):
      image or video file to be processed in the form: input_path/file_name.file_suffix
      example:  /home/user/images/input/my_video.mp4
    codec:
      four-character codec for video encoding (omitted from the file name if empty)

  Returns:
    image_name_out:
      file name to output annotated image or video using cv2
      example:  /home/user/images/output/my_video_<tf_model_name>_<codec>.mp4
    log_name_out:
      file name to output image detection logging information (the codec is not included)
      example:  /home/user/images/output/my_video_<tf_model_name>.log

  """
  file_name_in = Path(file_name_in)
  input_directory = file_name_in.parent
  output_directory = Path.joinpath(input_directory.parent, 'output')

  output_directory.mkdir(parents=True, exist_ok=True)

  base_name_out = file_name_in.stem + "_" + model_name

  if codec:
    new_file_name = base_name_out + "_" + codec + file_name_in.suffix
  else:
    new_file_name = base_name_out + file_name_in.suffix

  image_name_out = Path.joinpath(output_directory, new_file_name)
  log_name_out = Path.joinpath(output_directory, base_name_out + ".log")
  return image_name_out, log_name_out
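A sketch of the resulting naming scheme, using `PurePosixPath` so nothing is created on disk (unlike output_file_name, it does not call `mkdir`). The helper name `sketch_output_names`, the model name, and the paths are hypothetical.

```python
from pathlib import PurePosixPath


def sketch_output_names(model_name, file_name_in, codec=""):
  """Compute output_file_name-style paths without touching the file system."""
  file_name_in = PurePosixPath(file_name_in)
  # .../images/input/my_video.mp4 -> sibling directory .../images/output/
  output_directory = file_name_in.parent.parent / "output"
  base_name_out = f"{file_name_in.stem}_{model_name}"
  codec_part = f"_{codec}" if codec else ""
  image_name_out = output_directory / f"{base_name_out}{codec_part}{file_name_in.suffix}"
  log_name_out = output_directory / f"{base_name_out}.log"  # the codec is not part of the log name
  return image_name_out, log_name_out


video_out, log_out = sketch_output_names("my_model", "/home/user/images/input/my_video.mp4", codec="mp4v")
print(video_out)  # /home/user/images/output/my_video_my_model_mp4v.mp4
print(log_out)    # /home/user/images/output/my_video_my_model.log
```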

In [12]:
def get_file_by_url(url,
                    cache_dir,
                    cache_subdir=""):
  """
  Download a url (if not already cached) to local file system

  Args:
    url (str):
      url to download
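    cache_dir:
      local directory in which to store downloaded files
    cache_subdir:
      subdirectory of cache_dir in which to store the file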

  Returns:
    file_path of cached/downloaded file

  Notes:
    ?raw=true used in github downloads will be stripped off the file_name
  """
  url_path = Path(url)

  file_name = url_path.name
  raw_suffix = file_name.find('?raw=true')
  if raw_suffix != -1:
    file_name = file_name[:raw_suffix]

  file_path = get_file(file_name,
                       origin=url,
                       cache_dir=cache_dir,
                       cache_subdir=cache_subdir,
                       extract=False,
                       )
  return Path(file_path)
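get_file_by_url strips only the literal `?raw=true` suffix. A more general alternative, sketched below as a hypothetical helper, parses the URL with the standard library's `urllib.parse.urlparse` so that any query string or fragment is dropped before taking the last path component. The GitHub URL is illustrative.

```python
from pathlib import PurePosixPath
from urllib.parse import urlparse


def url_to_file_name(url):
  """Return the last path component of a url, ignoring any ?query or #fragment."""
  return PurePosixPath(urlparse(url).path).name


print(url_to_file_name("https://github.com/user/repo/blob/main/clip.mp4?raw=true"))  # clip.mp4
print(url_to_file_name("http://site.org/file_name.tar.gz"))                          # file_name.tar.gz
```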

In [13]:
def get_files_by_url_list(url_list,
                          cache_dir):
  """
  Download a list of urls (if not already cached) to local file system
  See: get_file_by_url
  """

  file_names = []
  for url in url_list:
    file_name = get_file_by_url(url=url,
                                cache_dir=cache_dir)
    file_names.append(file_name)
  return file_names

In [14]:
def list_files_in_path(path, verbose=0):
  """
  No recursive listing of files in a path

  path (Path):
  verbose (int):
    0 to supress diagnostics
    1 to output diagnostics

  Returns (dict):
    dictionary of file path names in directory
  """
  file_dict = {}
  if verbose > 0:
    print(f"File(s) available in {path}\nindex, file name\n{'-' * 60}")

  for i, item in enumerate(path.iterdir()):
    if item.is_file():
      file_dict[i] = item
      if verbose > 0:
        print(f"{i}, {item.stem + item.suffix}")

  return file_dict
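In list_files_in_path, `enumerate` also counts directories, so the returned indices can skip numbers, and `Path.iterdir()` yields entries in arbitrary order, so an index chosen on one system may not name the same file on another. A hypothetical variant that avoids both, demonstrated on a temporary directory so it runs anywhere:

```python
import tempfile
from pathlib import Path


def list_files_sorted(path):
  """Hypothetical variant of list_files_in_path: regular files only, sorted by
  path, indexed 0..n-1 so the indices are contiguous and repeatable."""
  files = sorted(item for item in Path(path).iterdir() if item.is_file())
  return dict(enumerate(files))


with tempfile.TemporaryDirectory() as tmp:
  for name in ("b.mp4", "a.jpg"):
    (Path(tmp) / name).touch()
  (Path(tmp) / "subdir").mkdir()  # directories are skipped
  listing = {i: p.name for i, p in list_files_sorted(tmp).items()}

print(listing)  # {0: 'a.jpg', 1: 'b.mp4'}
```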

## Color Class
Assists in making brightly colored bounding box plots

In [15]:
class Colors:
  """
  Utility class to select bright and easy to read colors for bounding boxes.

  Usage:
    1) colors = Colors()
    2) colors.get_rgb(int_value) -> returns RGB tuple (colors.get_bgr(int_value) for BGR)

  Notes:
      1) Adjust self.hex_colors to change the available colors
      2) Colors will cycle if int_value is larger than color list length
      3) colors can be returned in either RGB or BGR ordering.

  References:
      1) https://matplotlib.org/stable/gallery/color/named_colors.html
      2) https://github.com/ultralytics/yolov5/issues/670#issuecomment-872348461
  """

  def __init__(self):
    self.hex_colors = {'blue': '#1f77b4',
                       'orange': '#ff7f0e',
                       'green': '#2ca02c',
                       'red': '#d62728',
                       'cyan': '#17becf',
                       'fuchsia': '#FF00FF',
                       }
    self.rgb_colors = [self.hex2rgb(color) for color in self.hex_colors.values()]
    self.bgr_colors = [(b, g, r) for (r, g, b) in self.rgb_colors]

  def hex2rgb(self, h):
    """Convert hex colors to rgb (using PIL ordering)"""
    return tuple(int(h[1 + i:1 + i + 2], 16) for i in (0, 2, 4))

  def get_rgb(self, index):
    """Return a RGB color tuple"""
    index = index % len(self.rgb_colors)
    return self.rgb_colors[index]

  def get_bgr(self, index):
    """Return a BGR color tuple"""
    index = index % len(self.bgr_colors)
    return self.bgr_colors[index]

  def diagnostics(self, number_of_colors=10):
    """Output color tuples for diagnostics"""
    for i in range(number_of_colors):
      print(f"Color tuples in RGB: {colors.get_rgb(i)}, BGR: {colors.get_bgr(i)}")

Color Diagnostics

In [16]:
colors = Colors()
colors.diagnostics()

Color tuples in RGB: (31, 119, 180), BGR: (180, 119, 31)
Color tuples in RGB: (255, 127, 14), BGR: (14, 127, 255)
Color tuples in RGB: (44, 160, 44), BGR: (44, 160, 44)
Color tuples in RGB: (214, 39, 40), BGR: (40, 39, 214)
Color tuples in RGB: (23, 190, 207), BGR: (207, 190, 23)
Color tuples in RGB: (255, 0, 255), BGR: (255, 0, 255)
Color tuples in RGB: (31, 119, 180), BGR: (180, 119, 31)
Color tuples in RGB: (255, 127, 14), BGR: (14, 127, 255)
Color tuples in RGB: (44, 160, 44), BGR: (44, 160, 44)
Color tuples in RGB: (214, 39, 40), BGR: (40, 39, 214)


## TensorFlowModel Class
Class to specify and load a TensorFlow object detection model
that can be used to detect objects in images, video, and webcam streams

In [17]:
# noinspection PyAttributeOutsideInit,PyMethodMayBeStatic
class TensorFlowModel:
  """
  Download and/or load TensorFlow model for object detection.

  Notes:
    1) TensorFlow 2 Detection Model Zoo:
        https://github.com/tensorflow/models/blob/master/research/object_detection/g3doc/tf2_detection_zoo.md
    2) Coco dataset used to train TF models
        https://cocodataset.org/
    3) COCO ids are 1-based (person is id 1).  To keep the COCO class names file (coco_class_names.txt)
       aligned with TF model indexing, a placeholder line '__Background__' occupies index 0.
    4) Discussion of Coco label types
        https://tech.amikelive.com/node-718/what-object-categories-labels-are-in-coco-dataset/
        https://saturncloud.io/blog/what-is-tensorflowmodels-and-its-relationship-with-coco/
        https://www.tensorflow.org/datasets/catalog/coco
        https://arxiv.org/pdf/1405.0312.pdf
    5) Some articles state that COCO has 91 object categories, while others state 90.  It appears that some TensorFlow
       models were trained with 90 object ids; id 91 (hair brush) may not be considered by some TF models.
  """

  def __init__(self,
               cache_dir,
               cache_subdir="checkpoints",
               ):
    """
    Read in Coco class names, TF model Zoo names and urls, and create model directory structure.

    Args:
      cache_dir:
        local directory to store TF models
      cache_subdir:
        TF model sub directory
    """
    self.cache_dir = Path(cache_dir)
    self.cache_subdir = cache_subdir

    # Create file structure
    os.makedirs(self.cache_dir, exist_ok=True)
    self.load_meta_data(cache_dir)

  def load_meta_data(self, cache_dir):
    """
    Load COCO class names into an array and available TensorFlow models into a dictionary.

    Args:
      cache_dir:
        local directory to store TF models
    """
    # text file with coco dataset names
    coco_class_names_url = "https://raw.githubusercontent.com/tony-held/public/main/obj_detection/coco_class_names.txt"
    # json file with TF model name and associated url
    tf_obj_detection_models_url = "https://raw.githubusercontent.com/tony-held/public/main/obj_detection/tf_object_detection_models.json"

    # Download metadata files to local drive (if necessary)
    self.class_names_path = get_file_by_url(coco_class_names_url,
                                            cache_dir=cache_dir,
                                            cache_subdir='meta_data',
                                            )
    self.tf_models_path = get_file_by_url(tf_obj_detection_models_url,
                                          cache_dir=cache_dir,
                                          cache_subdir='meta_data',
                                          )
    # Read list of objects that have been pretrained for classification in the coco dataset
    self.class_names = self.read_class_name_file(self.class_names_path)
    # Read the name and associated url of each TF object detection model available at the Model Zoo
    self.tf_models = self.read_tf_object_models(self.tf_models_path)

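  def read_class_name_file(self, class_name_file_path):
    """Return the lines of the class names file as a list (list index == class id)."""
    with open(class_name_file_path, 'r') as read_file:
      return read_file.read().splitlines()

  def read_tf_object_models(self, tf_models_path):
    """Return the dict of TF model names and urls stored in the json metadata file."""
    with open(tf_models_path, "r") as read_file:
      return json.load(read_file)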

  def get_model_and_file_name(self, model_url):
    """
    Return the file name and model name associated with a TF model url in tar/zip format.
    """
    # url_file_name splits the name at the first '.', for example:
    #   file_name:  efficientdet_d4_coco17_tpu-32.tar.gz
    #   model_name: efficientdet_d4_coco17_tpu-32
    file_name, model_name = url_file_name(model_url)
    return file_name, model_name

  def download_model_url(self, model_url):
    """
    Download a local copy of the TF model associated with a model url (if needed).
    """
    file_name, model_name = self.get_model_and_file_name(model_url)

    print("Downloading model (if necessary): " + model_name)
    # Download the file from the url only if it is not already in the cache
    get_file(fname=str(file_name),
             origin=model_url,
             cache_dir=self.cache_dir,
             cache_subdir=self.cache_subdir,
             extract=True)

  def load_model_by_url(self, model_url):
    """
    Set self.model to a local copy of the TF model associated with a model url.
    """
    tf.keras.backend.clear_session()
    self.file_name, self.model_name = self.get_model_and_file_name(model_url)

    self.download_model_url(model_url)
    print("Loading model: " + self.model_name)
    print(f"{'-' * 80}\nThis may generate warnings and other diagnostics\n{'-' * 80}")

    model_path = Path.joinpath(self.cache_dir,
                               self.cache_subdir,
                               self.model_name,
                               "saved_model")

    self.model = tf.saved_model.load(str(model_path))

    print("Model " + self.model_name + " loaded successfully...")

  def number_pretrained_models(self):
    return len(self.tf_models)

  def list_models(self):
    """
    Print a list of available pre-trained TF models
    """
    print(f"The following pre-trained TF object detection models are available:\n{'-' * 80}")
    print(f"Not all models support all features of this notebook and may need code modification to work")
    for i, key in enumerate(self.tf_models):
      print(f"{i}: <{key}>")

  def load_model_by_name(self, model_name):
    """
    Using the dictionary of model names and urls read in from the TF metadata file,
    load in a publicly available TF model.

    Args:
      model_name (str):
        name of TF model.  This is a key to the self.tf_models dict
    """
    model_url = self.tf_models[model_name]
    self.load_model_by_url(model_url)

  def load_model_by_index(self, index):
    """
    Same as load_model_by_name, except an int index is used to specify the model rather
    than the model name.
    """
    number_of_models = self.number_pretrained_models()
    if not 0 <= index < number_of_models:
      raise ValueError(f"Index <{index}> is out of range; valid indices are "
                       f"0 to {number_of_models - 1}")
    # dicts preserve insertion order, so this matches the numbering printed by list_models()
    model_name = list(self.tf_models)[index]
    self.load_model_by_name(model_name)
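The TensorFlowModel docstring notes that COCO ids are 1-based and that a '__Background__' placeholder aligns the class names file with the model's ids. A minimal illustration using a hypothetical four-line excerpt rather than the downloaded file: with the placeholder at index 0, the model's class id can be used directly as a list index. This only works if the file has a line for every id up to the largest one the model can emit.

```python
# Hypothetical excerpt; the real file is downloaded in load_meta_data()
class_names_with_placeholder = ["__Background__", "person", "bicycle", "car"]
class_names_without_placeholder = ["person", "bicycle", "car"]

detection_class = 1  # COCO id 1 is "person"
print(class_names_with_placeholder[detection_class])     # person (correct)
print(class_names_without_placeholder[detection_class])  # bicycle (off by one)
```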

## Single image object detection and annotation

In [18]:
def image_annotation(image,
                     tf_model,
                     score_threshold=0.5,
                     iou_threshold=0.5,
                     max_output_size=50,
                     verbose=0,
                     display_image=True,
                     display_time_ms=4000,
                     save_to_file=True,
                     ):
  """
  Annotate a single image with class names, confidence scores, and bounding boxes using
  a pre-trained tensorflow model to detect objects.

  Args:
    image (Path or np.array):
      Path to a single image for object detection or a np.array representing an image
    tf_model:
      TF object model to be used for object detection
    score_threshold: 0.->1.0
      User specified minimum confidence score for object detection
    iou_threshold: 0.->1.0
      User specified threshold for removing overlapping detections
    max_output_size:
      max number of objects to visualize
    verbose (int):
      0 for critical messages
      1 for general messages
      2 for detailed debugging message
    display_image (bool):
      True to show image
    display_time_ms (int):
      number of milliseconds to display an image before closing it automatically
    save_to_file (bool):
      True to save image to file
  Returns:
    detection_map (dict):
      map with image number as the key, and object name and confidence as a list of tuples

  Notes:
    1) Some object detection models can only process a single image at a time and require a shape of (1, height, width, 3).  When using models with this constraint your image list must be of length 1.
    2) The formulation below converts the input image into a list of length 1; future implementations may explore batch sizes greater than 1 for models that support larger batches.

  """
  detection_map = {}

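  if isinstance(image, (str, Path)):
    image = Path(image)
    image_path_as_string = str(image)
    print(f"Performing object detection on file: {image_path_as_string}")
    # Read the color image as a BGR np array with shape: (height, width, 3)
    cv_image = cv2.imread(image_path_as_string)
    # cv2.imread returns None rather than raising when a file cannot be read
    if cv_image is None:
      raise IOError(f"Unable to read image file: {image_path_as_string}")
  else:
    cv_image = image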

  image_height, image_width, image_depth = cv_image.shape

  if verbose >= 1:
    print(f"Converting cv image into color corrected 4D tensor")
    print(f"Expected shape of input tensor is (1, {image_height}, {image_width}, {image_depth})")
  tf_input = cv_image_list_to_tf_input([cv_image])

  # Use TF model to perform object detections
  detections = tf_model.model(tf_input)

  # Many TF models only accept one image per input tensor (batch size of 1),
  # so the detection lists have length 1 and the image index i is always 0
  i = 0
  detection_map[i] = []
  detection_classes = detections['detection_classes'][i].numpy().astype(np.int32)
  detection_scores = detections['detection_scores'][i].numpy()
  detection_boxes = detections['detection_boxes'][i].numpy()

  # find indices of object detections that satisfy the
  # user specified confidence and overlap thresholds
  selected_indices = tf.image.non_max_suppression(detection_boxes,
                                                  detection_scores,
                                                  max_output_size=max_output_size,
                                                  iou_threshold=iou_threshold,
                                                  score_threshold=score_threshold)

  if verbose >= 1:
    print(f"Number of object detections before suppression low confidence and high overlap: {len(detection_classes)}")
    print(f"Number of object detections after suppression low confidence and high overlap: {len(selected_indices)}")
  if verbose == 2:
    print(f"{selected_indices=}")

  # Loop through bounding boxes that satisfied thresholds and plot them
  # on the underlying image
  for j in selected_indices:
    # Extract detected objects class, confidence score, and bounding box
    detection_class = detection_classes[j]
    detection_score = round(100 * detection_scores[j])
    detection_box = tuple(detection_boxes[j].tolist())
    # print(detection_box, detection_class, detection_score)

    # Determine class name, associated color, and text to display
    class_label_text = tf_model.class_names[detection_class]
    class_color = colors.get_bgr(detection_class)
    display_text = '{}: {}%'.format(class_label_text.lower(), detection_score)
    if verbose == 2:
      print(f"\tDetection. {display_text}")
    detection_map[i].append((class_label_text, detection_scores[j]))

    annotate_bounding_box(cv_image, detection_box, display_text, class_color)

  if save_to_file is True:
    image_name_out, _ = output_file_name(tf_model.model_name, image)
    print(f"Creating File: {image_name_out}")
    cv2.imwrite(str(image_name_out), cv_image)

  if display_image is True:
    # colab needs to use a special function for image display
    if is_environ_colab is True:
      cv2_imshow(cv_image)
    else:
      cv2.imshow("Result", cv_image)
      cv2.waitKey(display_time_ms)

    cv2.destroyAllWindows()
  return detection_map
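image_annotation relies on `tf.image.non_max_suppression` to drop low-confidence and heavily overlapping detections. The pure-Python sketch below approximates the greedy algorithm behind it, using boxes in the same (y_min, x_min, y_max, x_max) format; it is an illustration, and TF's exact handling of ties and of scores or IoUs that land exactly on a threshold may differ. The function names `iou` and `greedy_nms` are hypothetical.

```python
def iou(box_a, box_b):
  """Intersection over union of two boxes in TF (y_min, x_min, y_max, x_max) order."""
  y_top, x_left = max(box_a[0], box_b[0]), max(box_a[1], box_b[1])
  y_bottom, x_right = min(box_a[2], box_b[2]), min(box_a[3], box_b[3])
  intersection = max(0.0, y_bottom - y_top) * max(0.0, x_right - x_left)
  area_a = (box_a[2] - box_a[0]) * (box_a[3] - box_a[1])
  area_b = (box_b[2] - box_b[0]) * (box_b[3] - box_b[1])
  union = area_a + area_b - intersection
  return intersection / union if union > 0 else 0.0


def greedy_nms(boxes, scores, max_output_size=50, iou_threshold=0.5, score_threshold=0.5):
  """Return indices of the boxes kept by greedy non-max suppression, best score first."""
  # Drop low-confidence boxes, then visit the rest from highest to lowest score
  candidates = sorted((i for i, s in enumerate(scores) if s > score_threshold),
                      key=lambda i: scores[i], reverse=True)
  selected = []
  for i in candidates:
    if len(selected) >= max_output_size:
      break
    # Keep box i only if it does not overlap any already-kept box too much
    if all(iou(boxes[i], boxes[j]) <= iou_threshold for j in selected):
      selected.append(i)
  return selected


boxes = [(0.0, 0.0, 0.5, 0.5),      # strong detection
         (0.05, 0.05, 0.55, 0.55),  # near-duplicate of box 0 (IoU ~0.68)
         (0.6, 0.6, 1.0, 1.0)]      # separate object
scores = [0.9, 0.8, 0.7]
print(greedy_nms(boxes, scores))                        # [0, 2]
print(greedy_nms(boxes, scores, score_threshold=0.75))  # [0]
```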

## Video object detection and annotation
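The interaction between output_fps, skip_frames, and adjust_fps_for_skip in video_annotation comes down to a small calculation. This is a standalone sketch of that logic under a hypothetical helper name, assuming that only the processed frames are written to the output video.

```python
def output_frame_rate(input_fps, skip_frames=0, adjust_fps_for_skip=False, output_fps=None):
  """Sketch of video_annotation's output fps logic."""
  # Use the input video's frame rate unless the caller supplied one
  fps = input_fps if output_fps is None else output_fps
  if adjust_fps_for_skip:
    # Only 1 of every (1 + skip_frames) input frames is processed, so slow the
    # output down by the same factor to keep playback at normal speed
    fps = fps / (1 + skip_frames)
  return fps


print(output_frame_rate(30.0, skip_frames=2, adjust_fps_for_skip=True))  # 10.0
print(output_frame_rate(30.0, skip_frames=2))  # 30.0, so playback would run about 3x too fast
```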

In [19]:
def video_annotation(tf_model,
                     video_input_path,
                     video_output_path,
                     log_file_path,
                     score_threshold=0.5,
                     iou_threshold=0.5,
                     max_output_size=50,
                     codec='mp4v',
                     output_fps=None,
                     skip_frames=0,
                     adjust_fps_for_skip=False,
                     preview_annotations=True,
                     make_annotated_video=True,
                     verbose=0):
  """
  Annotate a video with class names, confidence, and bounding boxes using
  a pre-trained tensorflow model to detect objects.

  Args:
    tf_model:
      TF model to be used for object detection
    video_input_path:
      path of video for object detection
    video_output_path:
      path of video to be created with object annotations
    log_file_path:
      object detection results file
    score_threshold: 0.->1.0
      User specified minimum confidence score for object detection
    iou_threshold: 0.->1.0
      User specified threshold for removing overlapping detections
    max_output_size:
      max number of objects to visualize
    codec:
      4-character code of codec used to compress the frames of the output video
      supported codes: https://fourcc.org/codecs.php
      'mp4v' and 'x264' both create mp4 files; 'x264' appears to be faster and to produce smaller files.
      x264 does not appear to be available on Colab, so 'mp4v' is the safest choice.
    output_fps:
      Frame rate (frames per second, fps) of the output video.
      If None, the frame rate of the input video will be used.
    skip_frames:
      The number of frames to skip between captures.
      0 processes every frame; x skips x frames after each processed frame
    adjust_fps_for_skip:
      if True, adjust the fps of the output file so that skip_frames does not
      result in high speed videos
    preview_annotations:
      True if you want to see object detection video results graphically during processing
    make_annotated_video:
      True if you want to create an annotated video with object detection boxes
    verbose (int):
      0 for critical messages
      1 for general messages
      2 for detailed debugging messages

  Returns:
    detection_map (dict):
      map with frame number as the key, and object name and confidence as a list of tuples
  """
  start_time = datetime.datetime.now()

  # Snapshot the local variables (the function arguments plus start_time) for diagnostics
  diagnostics = vars()

  # dict to store object detection results
  detection_map = {}

  # Open log file
  logger = open(log_file_path, 'w')
  print(f"Logging object detection results to: {log_file_path}")
  logger.write(f"video_annotation() called with the following arguments:\n{'-' * 80}\n")
  pprint.pprint(diagnostics, logger, sort_dicts=False)

  # Open video file (paths are cast to strings when calling cv2)
  video_capture = cv2.VideoCapture(str(video_input_path))

  if not video_capture.isOpened():
    msg = f"Error opening file: {video_input_path}"
    raise IOError(msg)

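  # capture first video frame and save key video properties to dict
  (success, cv_image) = video_capture.read()
  if not success or cv_image is None:
    raise IOError(f"Unable to read the first frame of: {video_input_path}")
  video_props = video_properties(video_capture, cv_image)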
  logger.write(f"\nInput video has the following properties\n{'-' * 80}\n")
  pprint.pprint(video_props, logger)
  logger.write(f"\nFrame #, Objects Detected\n{'-' * 80}\n")

  # if output frames per second is not set by user, use the input fps
  if output_fps is None:
    output_fps = video_props['fps']
  # adjust the output file frame rate if desired
  if adjust_fps_for_skip is True:
    output_fps = output_fps / (1 + skip_frames)

  # Create video output file writer if desired
  if make_annotated_video is True:
    # VideoWriter_fourcc expects four single characters as arguments
    # *'mp4v' will expand to 'm', 'p', '4', 'v'
    fourcc = cv2.VideoWriter_fourcc(*codec)
    video_out = cv2.VideoWriter(str(video_output_path),
                                fourcc,
                                output_fps,
                                (video_props['width'], video_props['height']))
  else:
    video_out = None

  # counters/timers
  frame_number = 0  # frame number of video
  frames_modeled = 0  # Number of frames modeled with TF object detection
  tf_fps = 0  # Frames per second of the whole processing loop (TF detection plus video I/O)
  call_time = time.time()

  # tqdm creates a completion status bar
  with tqdm(total=video_props['frames'],
            unit=' frames',
            desc='Frames Processed') as pbar:

    while success and cv_image is not None:

      # print(frame_number)
      # start timer for processing speed calculations
      start_time = time.time()

      # Run object detection on the current frame; image_annotation returns its results keyed by image index
      image_detection_map = image_annotation(cv_image,
                                             tf_model,
                                             score_threshold=score_threshold,
                                             iou_threshold=iou_threshold,
                                             max_output_size=max_output_size,
                                             save_to_file=False,
                                             display_image=False,
                                             )
      # Save the objects detected into the detection map
      # Note, this assumes that image batch sizes are 1, if larger sizes are used
      # then this methodology needs to be modified.
      # [0] because there is only one image in the image list
      detection_map[frame_number] = image_detection_map[0]

      msg = f"{frame_number}, {detection_map[frame_number]}"
      logger.write(msg + '\n')

      fps_caption = "FPS: " + str(int(tf_fps))
      cv2.putText(img=cv_image,
                  text=fps_caption,
                  org=(20, 20),
                  fontFace=cv2.FONT_HERSHEY_PLAIN,
                  fontScale=2,
                  color=(0, 255, 0),  # green
                  thickness=2)

      if verbose >= 1:
        print(f"{frame_number = }, TF model FPS {tf_fps:.2f}")

      if preview_annotations is True:
        if is_environ_colab is True:
          # Suppressing image preview on colab as it will strain memory
          # cv2_imshow(cv_image)
          pass
        else:
          cv2.imshow("Result", cv_image)
          # cv2.waitKey(milliseconds) displays the frame for the given number of milliseconds or until a key is pressed
          # & 0xFF keeps only the lowest 8 bits of the returned key code
          key = cv2.waitKey(1) & 0xFF
          if key == ord("q"):
            success = False
            break

      if make_annotated_video is True:
        video_out.write(cv_image)

      # skip interim frames
      for _ in range(skip_frames):
        frame_number += 1
        video_capture.grab()

      # read next frame
      frame_number += 1
      (success, cv_image) = video_capture.read()

      # update counters and calculate the loop frame rate (detection, drawing, writing, and reading the next frame)
      end_time = time.time()
      tf_fps = 1 / (end_time - start_time)
      pbar.update(1 + skip_frames)

  if make_annotated_video is True:
    video_out.release()
    print(f"\nCreating file: {video_output_path}")

  end_time = time.time()
  msg = f"Video processing complete in {end_time - call_time:.2f} seconds."
  print(msg)
  logger.write(msg + '\n')
  logger.close()

  return detection_map
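The frame bookkeeping in video_annotation() is easy to misread, so here is a minimal, self-contained sketch of it. It assumes the loop behaves as written above: after each modeled frame, skip_frames frames are grabbed without modeling, so modeled frame numbers advance in steps of 1 + skip_frames, and (when adjust_fps_for_skip is True) the output fps is divided by the same factor so the annotated video keeps roughly the original duration. The helper names are hypothetical and not part of the notebook.

```python
def modeled_frame_numbers(total_frames, skip_frames):
    """Frame numbers that get modeled when skip_frames frames are skipped after each modeled frame."""
    if skip_frames < 0:
        raise ValueError("skip_frames must be >= 0")
    return list(range(0, total_frames, 1 + skip_frames))


def adjusted_output_fps(input_fps, skip_frames):
    """Output fps that keeps playback at the original speed when frames are skipped."""
    # Each written frame stands in for (1 + skip_frames) input frames.
    return input_fps / (1 + skip_frames)


# Example: a 764-frame clip (the length of the default video) at a hypothetical 30 fps
frames = modeled_frame_numbers(764, skip_frames=5)
print(len(frames), frames[:4])                   # 128 [0, 6, 12, 18]
print(adjusted_output_fps(30.0, skip_frames=5))  # 5.0
```

With these numbers the output runs 128 / 5 = 25.6 s versus 764 / 30 ≈ 25.5 s for the input, so the durations agree to within one output frame.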

# Determine Runtime Environment and Project Directory Structure

## Determine machine operating system

In [20]:
# Determine machine type
# On Windows, the drive letter is specified separately via path_prefix in a later cell
if os.name == 'posix':
  print(f"Running on a linux/mac system")
elif os.name == 'nt':
  print(f"Running on a windows system")
else:
  raise ValueError('Unknown operating system')

Running on a windows system


## Determine if environment is Colab

Notes:
* Image processing with cv2 on Colab has some quirks and requires workarounds
* For details, see the following refs:
  * __[Introduction to Image Processing in Python](https://colab.research.google.com/github/xn2333/OpenCV/blob/master/Image_Processing_in_Python_Final.ipynb#scrollTo=OU4AAstR0HG6)__
  * __[cv2imshow doesnt render video file in Google Colab](https://saturncloud.io/blog/cv2imshow-doesnt-render-video-file-in-google-colab/)__

In [21]:
colab_release_tag = os.getenv("COLAB_RELEASE_TAG")
if colab_release_tag:
  print(f"Colab Release Tag: {colab_release_tag}")
  is_environ_colab = True
  from google.colab.patches import cv2_imshow  # required for image display on colab
else:
  is_environ_colab = False

print(f"Is runtime Google Colab: {is_environ_colab}")

Is runtime Google Colab: False
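The Colab check above boils down to testing one environment variable. A minimal sketch that takes the environment as a parameter makes the logic testable outside Colab; the function name is hypothetical, and it relies on the same assumption as the cell above, namely that Colab sets COLAB_RELEASE_TAG.

```python
import os
from typing import Mapping


def is_colab_runtime(environ: Mapping[str, str] = os.environ) -> bool:
    """Return True if COLAB_RELEASE_TAG is set to a non-empty value."""
    # An empty string is treated like an unset variable, matching the
    # truthiness test used in the notebook cell.
    return bool(environ.get("COLAB_RELEASE_TAG"))


print(is_colab_runtime({"COLAB_RELEASE_TAG": "example-tag"}))  # True
print(is_colab_runtime({}))                                      # False
```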


## Specify file structure for input and TF model location

Notes:
* obj_detection_path and path_prefix determine the base directory for object detection input/output
* obj_detection_path is a string in Unix (forward-slash) notation. Use forward slashes even when working on Windows
* path_prefix applies only to Windows and lets you specify the drive letter of obj_detection_path
* You can change obj_detection_path and path_prefix, but it is recommended that the directory not be synced with an online backup service (OneDrive, Google Drive, etc.)

In [22]:
obj_detection_path = "/local/ml_local/obj_detection/"
path_prefix = "D:"  # Only applies to windows, ignored on other platforms

if os.name != 'nt':
  path_prefix = ""

# Location of input images and videos to be processed
image_input_directory = Path(path_prefix + obj_detection_path + "images_videos/input")
image_input_directory.mkdir(parents=True, exist_ok=True)
print(f"Location of input images and videos: {image_input_directory}")

# Location of pretrained models for object detection
pretrained_model_directory = Path(path_prefix + obj_detection_path + "pretrained_models")
pretrained_model_directory.mkdir(parents=True, exist_ok=True)
print(f"Location of pre-trained object detection models: {pretrained_model_directory}")

Location of input images and videos: D:\local\ml_local\obj_detection\images_videos\input
Location of pre-trained object detection models: D:\local\ml_local\obj_detection\pretrained_models
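The path construction above works on Windows because pathlib accepts forward slashes and normalizes them to backslashes. The sketch below reproduces that behavior with the pure path classes so it runs on any operating system; build_input_directory is a hypothetical helper, not part of the notebook.

```python
from pathlib import PurePosixPath, PureWindowsPath

OBJ_DETECTION_PATH = "/local/ml_local/obj_detection/"


def build_input_directory(path_prefix, on_windows):
    """Combine the drive prefix (Windows only) with the forward-slash base path."""
    if on_windows:
        # PureWindowsPath treats "D:" as the drive and converts "/" to "\".
        return PureWindowsPath(path_prefix + OBJ_DETECTION_PATH + "images_videos/input")
    # On Linux/macOS the prefix is ignored, as in the notebook cell.
    return PurePosixPath(OBJ_DETECTION_PATH + "images_videos/input")


print(build_input_directory("D:", on_windows=True))   # D:\local\ml_local\obj_detection\images_videos\input
print(build_input_directory("D:", on_windows=False))  # /local/ml_local/obj_detection/images_videos/input
```

Joining with the `/` operator (for example `Path(...) / "images_videos" / "input"`) is an equivalent alternative to string concatenation.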


# Download images/video for processing
Download example images and videos from GitHub.


In [23]:
url_list = ["https://github.com/tony-held/public/blob/main/obj_detection/tennis_01.jpg?raw=true",
            "https://github.com/tony-held/public/blob/main/obj_detection/animals_01.jpg?raw=true",
            "https://github.com/tony-held/public/blob/main/obj_detection/back_to_the_future_skate.mp4?raw=true",
            ]
file_names = get_files_by_url_list(url_list,
                                   image_input_directory)
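get_files_by_url_list() is defined earlier in the notebook and not shown here. One detail any such helper has to handle is that these GitHub URLs end in `?raw=true` (which asks GitHub for the raw file rather than its web page), so the local file name must come from the URL path, not the whole URL. A minimal sketch of that step, using a hypothetical helper name:

```python
from pathlib import PurePosixPath
from urllib.parse import urlsplit


def local_name_from_url(url):
    """Return the last path component of url, ignoring any query string."""
    return PurePosixPath(urlsplit(url).path).name


url = "https://github.com/tony-held/public/blob/main/obj_detection/tennis_01.jpg?raw=true"
print(local_name_from_url(url))  # tennis_01.jpg
```

An actual download could then use, for example, `urllib.request.urlretrieve(url, image_input_directory / name)`; whether the notebook's helper works this way is not shown.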

List files available in local image_input_directory

In [24]:
file_dict = list_files_in_path(path=image_input_directory, verbose=1)

File(s) available in D:\local\ml_local\obj_detection\images_videos\input
index, file name
------------------------------------------------------------
0, animals_01.jpg
1, back_to_the_future.mp4
2, back_to_the_future_skate.mp4
3, dana_golf.mp4
4, four_dogs.jpg
5, ian_jumping.mp4
6, mountain_bike_01.jpg
7, mountain_bike_02.jpg
8, mountain_bike_03.jpg
9, mountain_bike_04.jpg
11, soccer_01.jpg
12, street1.mp4
13, street2.mp4
14, tennis_01.jpg
15, your_the_one_that_i_want.mp4
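list_files_in_path() is also defined outside this section. A hypothetical sketch of an index-to-name listing like the one printed above: collect the regular files, sort their names, and number them. The printed listing has no index 10, so the notebook's helper numbers entries in some way this consecutive sketch does not reproduce.

```python
from pathlib import Path


def index_file_names(names):
    """Return {index: name} with the names sorted alphabetically."""
    return dict(enumerate(sorted(names)))


def indexed_files(path):
    """List regular files (not subdirectories) in path as {index: file name}."""
    return index_file_names(p.name for p in Path(path).iterdir() if p.is_file())


print(index_file_names(["tennis_01.jpg", "animals_01.jpg", "street1.mp4"]))
# {0: 'animals_01.jpg', 1: 'street1.mp4', 2: 'tennis_01.jpg'}
```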


# Load Pretrained TF Model

In [25]:
# A TensorFlow model wrapper class allows the selection of a
# specific tf object detection model
tf_model = TensorFlowModel(cache_dir=pretrained_model_directory)

In [26]:
tf_model.list_models()

The following pre-trained TF object detection models are available:
--------------------------------------------------------------------------------
Not all models support all features of this notebook and may need code modification to work
0: <CenterNet HourGlass104 512x512>
1: <CenterNet HourGlass104 Keypoints 512x512>
2: <CenterNet HourGlass104 1024x1024>
3: <CenterNet HourGlass104 Keypoints 1024x1024>
4: <CenterNet Resnet50 V1 FPN 512x512>
5: <CenterNet Resnet50 V1 FPN Keypoints 512x512>
6: <CenterNet Resnet101 V1 FPN 512x512>
7: <CenterNet Resnet50 V2 512x512>
8: <CenterNet Resnet50 V2 Keypoints 512x512>
9: <CenterNet MobileNetV2 FPN 512x512>
10: <CenterNet MobileNetV2 FPN Keypoints 512x512>
11: <EfficientDet D0 512x512>
12: <EfficientDet D1 640x640>
13: <EfficientDet D2 768x768>
14: <EfficientDet D3 896x896>
15: <EfficientDet D4 1024x1024>
16: <EfficientDet D5 1280x1280>
17: <EfficientDet D6 1280x1280>
18: <EfficientDet D7 1536x1536>
19: <SSD MobileNet v2 320x320>
20: <SSD Mobile

## Select a TF model by index for object detection

In [27]:
# 15: <EfficientDet D4 1024x1024> COCO (mAP): 48.5, speed (ms): 133,  gives excellent results
# 13: <EfficientDet D2 768x768>   COCO (mAP): 41.8, speed (ms): 67, twice as fast, lower accuracy
# 6: <CenterNet Resnet101 V1 FPN 512x512> COCO (mAP): 34.2, speed (ms): 34, fastest model with mAP>30
tf_model.load_model_by_index(15)

Downloading model (if necessary): efficientdet_d4_coco17_tpu-32
Loading model: efficientdet_d4_coco17_tpu-32
--------------------------------------------------------------------------------
--------------------------------------------------------------------------------




Model efficientdet_d4_coco17_tpu-32 loaded successfully...
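The comments in the model-selection cell trade accuracy (COCO mAP) against speed. The sketch below encodes just those three entries, copied from the comments, picks the fastest model that meets a minimum mAP, and converts the quoted per-image times into nominal frames per second. The names are hypothetical. Treat the nominal figures as upper bounds for this notebook: the zoo timings come from the zoo's own benchmark setup, and the video loop also spends time reading, drawing, and writing frames. The video run below processed 3.51 frames/s with model 15, well under its nominal rate.

```python
# (index, name, COCO mAP, speed in ms) as quoted in the cell comments
CANDIDATES = [
    (15, "EfficientDet D4 1024x1024", 48.5, 133),
    (13, "EfficientDet D2 768x768", 41.8, 67),
    (6, "CenterNet Resnet101 V1 FPN 512x512", 34.2, 34),
]


def fastest_with_min_map(candidates, min_map):
    """Return the index of the fastest candidate whose mAP is at least min_map."""
    eligible = [c for c in candidates if c[2] >= min_map]
    if not eligible:
        raise ValueError(f"no candidate reaches mAP {min_map}")
    return min(eligible, key=lambda c: c[3])[0]


def nominal_fps(speed_ms):
    """Images per second implied by a per-image time in milliseconds."""
    return 1000.0 / speed_ms


print(fastest_with_min_map(CANDIDATES, 40))  # 13
print(f"{nominal_fps(133):.1f}")             # 7.5
```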


# Object Detection Visualization


# Detecting/Visualizing a Single Image

In [28]:
# Change the file name below to run object detection on an image of your choosing
image_path = image_input_directory / 'tennis_01.jpg'

In [29]:
detection_map = image_annotation(image_path,
                                 tf_model,
                                 score_threshold=0.5,
                                 iou_threshold=0.5,
                                 max_output_size=50,
                                 verbose=0,
                                 display_time_ms=3000,
                                 )
cv2.destroyAllWindows()
print(f"Classifier type and confidence of detected objects:\n{detection_map}")

Performing object detection on file: D:\local\ml_local\obj_detection\images_videos\input\tennis_01.jpg
Creating File: D:\local\ml_local\obj_detection\images_videos\output\tennis_01_efficientdet_d4_coco17_tpu-32.jpg
Classifier type and confidence of detected objects:
{0: [('person', 0.9126301), ('tennis racket', 0.8685208), ('sports ball', 0.83457816), ('person', 0.58147424)]}
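The score_threshold, iou_threshold, and max_output_size arguments have the same names as the parameters of `tf.image.non_max_suppression`, which suggests image_annotation() applies non-max suppression (NMS) to the model output; its body is not shown in this section, so treat that as an assumption. The pure-Python sketch below illustrates greedy NMS: drop candidates below score_threshold, visit the rest from highest to lowest score, and keep a box only if its intersection over union (IoU) with every already-kept box is at most iou_threshold, stopping after max_output_size boxes. It is an illustration, not the TF implementation.

```python
def iou(a, b):
    """Intersection over union of two (ymin, xmin, ymax, xmax) boxes."""
    inter_h = max(0.0, min(a[2], b[2]) - max(a[0], b[0]))
    inter_w = max(0.0, min(a[3], b[3]) - max(a[1], b[1]))
    intersection = inter_h * inter_w
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    union = area_a + area_b - intersection
    return intersection / union if union > 0 else 0.0


def greedy_nms(boxes, scores, max_output_size, iou_threshold=0.5, score_threshold=0.0):
    """Return indices of the boxes kept by greedy non-max suppression, best first."""
    order = sorted(
        (i for i, s in enumerate(scores) if s >= score_threshold),
        key=lambda i: scores[i],
        reverse=True,
    )
    keep = []
    for i in order:
        if len(keep) >= max_output_size:
            break
        # Keep box i only if it does not overlap too much with any kept box.
        if all(iou(boxes[i], boxes[j]) <= iou_threshold for j in keep):
            keep.append(i)
    return keep


boxes = [(0.0, 0.0, 1.0, 1.0), (0.0, 0.0, 1.0, 0.9), (0.0, 0.5, 1.0, 1.5)]
scores = [0.9, 0.8, 0.7]
print(greedy_nms(boxes, scores, max_output_size=50))  # [0, 2]
```

Box 1 overlaps box 0 with an IoU of about 0.9 and is suppressed; box 2 overlaps it with an IoU of 1/3 and survives. Lowering iou_threshold suppresses more overlapping boxes.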


# Detecting/Visualizing Video
Select the video for object detection by specifying video_input_path.
* The default image/video input directory is:
  * /local/ml_local/obj_detection/images_videos/input/ (preceded by path_prefix, e.g. D:, on Windows)
* back_to_the_future_skate.mp4 is downloaded to this directory by default
* To model another video, place it in the input directory and change the file name assigned to video_input_path

## Select input file and codec

In [30]:
video_input_path = image_input_directory / 'back_to_the_future_skate.mp4'
codec = 'mp4v'  # FourCC code; alternatives such as 'XVID' or 'X264' depend on the codecs available to your OpenCV build
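`cv2.VideoWriter_fourcc(*codec)` turns the four-character codec string into a single 32-bit integer. OpenCV's implementation packs the characters with the first character in the lowest byte; the sketch below reproduces that arithmetic in pure Python (the function names are hypothetical). The inverse is handy for decoding the float returned by `cv2.VideoCapture.get(cv2.CAP_PROP_FOURCC)`.

```python
def fourcc(code):
    """Pack a four-character code into an int, first character in the lowest byte."""
    if len(code) != 4:
        raise ValueError("a FourCC code has exactly four characters")
    c1, c2, c3, c4 = (ord(ch) & 0xFF for ch in code)
    return c1 | (c2 << 8) | (c3 << 16) | (c4 << 24)


def fourcc_to_str(value):
    """Unpack an int (or the float returned by VideoCapture.get) into four characters."""
    value = int(value)
    return "".join(chr((value >> (8 * i)) & 0xFF) for i in range(4))


print(hex(fourcc("mp4v")))          # 0x7634706d
print(fourcc_to_str(1983148141.0))  # mp4v
```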

## Automatic output/logging and diagnostics

In [31]:
video_input_path = str(video_input_path)
print(f"Performing object detection on file: {video_input_path}")
video_output_path, log_file_path = output_file_name(tf_model.model_name,
                                                    video_input_path,
                                                    codec=codec)
print(f"Video input path: {video_input_path}")
print(f"Video output path: {video_output_path}")
print(f"Video log file path: {log_file_path}")

Performing object detection on file: D:\local\ml_local\obj_detection\images_videos\input\back_to_the_future_skate.mp4
Video input path: D:\local\ml_local\obj_detection\images_videos\input\back_to_the_future_skate.mp4
Video output path: D:\local\ml_local\obj_detection\images_videos\output\back_to_the_future_skate_efficientdet_d4_coco17_tpu-32_mp4v.mp4
Video log file path: D:\local\ml_local\obj_detection\images_videos\output\back_to_the_future_skate_efficientdet_d4_coco17_tpu-32.log
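output_file_name() is not shown in this section. The printed paths suggest a naming pattern: an output directory beside input, with the input stem, model name, and codec joined by underscores, and a log file that omits the codec. The sketch below is a hypothetical reconstruction of that pattern only; in particular, the fixed .mp4 extension is an assumption, and the real helper may choose the extension differently.

```python
from pathlib import PurePath, PureWindowsPath


def output_paths(model_name, input_path: PurePath, codec, extension=".mp4"):
    """Hypothetical: build (video_path, log_path) following the pattern printed above."""
    output_dir = input_path.parent.parent / "output"  # .../images_videos/output
    video_path = output_dir / f"{input_path.stem}_{model_name}_{codec}{extension}"
    log_path = output_dir / f"{input_path.stem}_{model_name}.log"
    return video_path, log_path


video, log = output_paths(
    "efficientdet_d4_coco17_tpu-32",
    PureWindowsPath(r"D:\local\ml_local\obj_detection\images_videos\input\back_to_the_future_skate.mp4"),
    codec="mp4v",
)
print(video)
print(log)
```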


In [32]:
detection_map = video_annotation(tf_model,
                                 str(video_input_path),
                                 video_output_path=str(video_output_path),
                                 log_file_path=log_file_path,
                                 score_threshold=0.25,  # minimum confidence for a detection to be kept
                                 iou_threshold=0.25,  # consider 0.45
                                 max_output_size=50,
                                 codec=codec,
                                 skip_frames=0,  # e.g., 5 models every 6th frame (5 frames skipped after each modeled frame)
                                 adjust_fps_for_skip=True,
                                 preview_annotations=True,
                                 verbose=0)
cv2.destroyAllWindows()

Logging object detection results to: D:\local\ml_local\obj_detection\images_videos\output\back_to_the_future_skate_efficientdet_d4_coco17_tpu-32.log


Frames Processed: 100%|██████████| 764/764 [03:37<00:00,  3.51 frames/s]


Creating file: D:\local\ml_local\obj_detection\images_videos\output\back_to_the_future_skate_efficientdet_d4_coco17_tpu-32_mp4v.mp4
Video processing complete in 217.91 seconds.
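video_annotation() returns detection_map keyed by frame number, with each value a list of (object name, confidence) tuples, as its docstring describes. The keys are input-video frame numbers (they advance by 1 + skip_frames), so dividing by the input video's fps, not the adjusted output fps, converts them to timestamps. A minimal sketch that finds when each class first appears; the function name is hypothetical, and the toy detection_map and 30 fps below are made up for illustration, not results from this notebook.

```python
def first_seen_seconds(detection_map, input_fps, min_score=0.0):
    """Map each class name to the time (s) of the first frame where it was detected."""
    first_seen = {}
    for frame_number in sorted(detection_map):
        for class_name, score in detection_map[frame_number]:
            if score >= min_score and class_name not in first_seen:
                first_seen[class_name] = frame_number / input_fps
    return first_seen


# Toy data in the same shape as detection_map (not real output)
toy_map = {
    0: [("person", 0.91)],
    30: [("person", 0.88), ("skateboard", 0.40)],
    60: [("car", 0.70)],
}
print(first_seen_seconds(toy_map, input_fps=30.0))                 # {'person': 0.0, 'skateboard': 1.0, 'car': 2.0}
print(first_seen_seconds(toy_map, input_fps=30.0, min_score=0.5))  # {'person': 0.0, 'car': 2.0}
```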





# Details and Diagnostics of TF Model Output
This section walks through the modeling process step by step for a single
hard-coded image and shows the raw TF model output in detail, so that
developers can more easily write custom functions to process it.

In [33]:
image_path = Path.joinpath(image_input_directory, 'tennis_01.jpg')
image_path_as_string = str(image_path)
print(f"Performing object detection on file: {image_path_as_string}")

Performing object detection on file: D:\local\ml_local\obj_detection\images_videos\input\tennis_01.jpg


In [34]:
# Read the image file into a BGR numpy array with shape (height, width, 3)
image = cv2.imread(image_path_as_string)
if image is None:
  raise IOError(f"Error opening file: {image_path_as_string}")
# Reorder the color channels from BGR (OpenCV) to RGB (TF model expectation)
np_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
# Convert the RGB numpy image to a tensor of shape (1, height, width, 3)
input_tensor = image_np_to_tf(np_image)

type(tf_input)=<class 'tensorflow.python.framework.ops.EagerTensor'>, tf_input.shape=TensorShape([1, 489, 870, 3])
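For a 3-channel image, converting with `cv2.COLOR_BGR2RGB` amounts to reversing the last (channel) axis, and the (1, height, width, 3) shape printed above comes from adding a leading batch axis. image_np_to_tf() is not shown here, so the numpy sketch below only illustrates those two steps (the function name is hypothetical); the resulting array could then be passed to `tf.convert_to_tensor`.

```python
import numpy as np


def bgr_to_rgb_batch(cv_image):
    """Return an RGB copy of a (height, width, 3) BGR image with shape (1, height, width, 3)."""
    if cv_image.ndim != 3 or cv_image.shape[2] != 3:
        raise ValueError("expected an array of shape (height, width, 3)")
    rgb = cv_image[..., ::-1]                     # reverse channel order: BGR -> RGB
    return np.ascontiguousarray(rgb)[np.newaxis]  # add the batch axis


cv_image = np.zeros((489, 870, 3), dtype=np.uint8)
cv_image[..., 0] = 255  # pure blue in OpenCV's BGR order
batch = bgr_to_rgb_batch(cv_image)
print(batch.shape, batch[0, 0, 0].tolist())  # (1, 489, 870, 3) [0, 0, 255]
```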


## Find Objects Using TF Model

Calling the TF model returns a dictionary of tensors.

A brief description of each key and the shape of its associated value is shown below.

Note that batch_size is the number of images evaluated simultaneously; for a single image, batch_size = 1.

* num_detections: shape(batch_size,)
  * the number of detections returned after non-max suppression (100 for this model, including many low-confidence detections)
* detection_classes: shape(batch_size, num_detections)
  * class ID of each detection (returned as floats)
* detection_scores: shape(batch_size, num_detections)
  * confidence (0 to 1) of the highest-ranked class for each detection
* detection_multiclass_scores: shape(batch_size, num_detections, num_classes)
  * confidence (0 to 1) of every class for each detection
* detection_boxes: shape(batch_size, num_detections, 4)
  * bounding box coordinates, normalized to [0, 1] and ordered [ymin, xmin, ymax, xmax]
* raw_detection_scores: shape(batch_size, raw_num_detections, num_classes)
  * class scores of all candidate detections before non-max suppression
* raw_detection_boxes: shape(batch_size, raw_num_detections, 4)
  * bounding boxes of all candidate detections before non-max suppression
* detection_anchor_indices: shape(batch_size, num_detections)
  * index of the anchor (the entry in the raw_detection_* tensors) that produced each final detection

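The original COCO paper defines 91 object categories, but only 80 of them are annotated in the released detection data, and their IDs are non-contiguous in the range 1 to 90. This matches the 90 classes used by this model (see the (1, 100, 90) shape of detection_multiclass_scores below); other TF models may use a different number of class columns.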
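TF2 Detection Zoo models return detection_boxes normalized to [0, 1] in (ymin, xmin, ymax, xmax) order. Drawing them with OpenCV therefore requires scaling by the image size and swapping to (x, y) corner points, the order `cv2.rectangle` expects. A minimal sketch; to_pixel_corners is a hypothetical helper, and 489 x 870 is the height and width of the tennis image tensor printed earlier.

```python
def to_pixel_corners(box, height, width):
    """Convert a normalized (ymin, xmin, ymax, xmax) box to pixel corners ((x1, y1), (x2, y2))."""
    ymin, xmin, ymax, xmax = box
    top_left = (round(xmin * width), round(ymin * height))
    bottom_right = (round(xmax * width), round(ymax * height))
    return top_left, bottom_right


# Right half of a 489 x 870 (height x width) image
print(to_pixel_corners((0.0, 0.5, 1.0, 1.0), height=489, width=870))  # ((435, 0), (870, 489))
```

The two corners can be passed directly as `cv2.rectangle(cv_image, top_left, bottom_right, (0, 255, 0), 2)`.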


In [35]:
# The following line performs object detection using the specified TF model
detections = tf_model.model(input_tensor)

## Diagnostics of the model results dictionary

In [36]:
for key, value in sorted(detections.items()):
  print(f"{key}\n{value.shape}\n{'-' * 30}")

detection_anchor_indices
(1, 100)
------------------------------
detection_boxes
(1, 100, 4)
------------------------------
detection_classes
(1, 100)
------------------------------
detection_multiclass_scores
(1, 100, 90)
------------------------------
detection_scores
(1, 100)
------------------------------
num_detections
(1,)
------------------------------
raw_detection_boxes
(1, 196416, 4)
------------------------------
raw_detection_scores
(1, 196416, 90)
------------------------------


In [37]:
print(f"Number of detections returned by the model: {detections['num_detections']}")
if 'raw_detection_scores' in detections:
  print(f"Number of raw (pre-NMS) candidate detections: {detections['raw_detection_scores'].shape[1]}")

Number of detections returned by the model: [100.]
Number of raw (pre-NMS) candidate detections: 196416


In [38]:
print("Class IDs associated with the significant object detections.")
detections['detection_classes']

Class IDs associated with the significant object detections.


<tf.Tensor: shape=(1, 100), dtype=float32, numpy=
array([[ 1., 43., 37.,  1., 37.,  1.,  1., 37.,  1., 53.,  1.,  1.,  1.,
        34.,  1., 37., 37., 40.,  1., 37., 32.,  1., 57., 47., 86., 85.,
         1., 43., 55., 43.,  1.,  1., 43., 77., 41., 43., 74., 60., 39.,
        38., 37., 37., 44., 53.,  2., 37.,  1., 34., 44., 10., 37., 34.,
         1., 37., 43., 13.,  1., 25., 22., 43., 77., 14., 43.,  1.,  1.,
         3., 15., 43., 16.,  3.,  1., 37., 86.,  1., 28., 10.,  1.,  1.,
        37., 37.,  1., 32.,  1., 37., 43., 37., 37., 40.,  1.,  1., 37.,
        85., 31., 43., 31.,  6., 37., 31., 37.,  1.]], dtype=float32)>
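The class IDs come back as floats. For COCO-trained zoo models they follow the COCO label map shipped with the TF Object Detection API (mscoco_label_map.pbtxt); the first three IDs here, 1, 43, and 37, correspond to person, tennis racket, and sports ball, matching the single-image result earlier. The sketch below uses a small hand-copied subset of that map (not all 80 classes) to turn IDs into names; class_names is a hypothetical helper.

```python
# Small subset of the COCO label map (ID -> name); the full map has 80 entries
COCO_LABELS_SUBSET = {
    1: "person",
    2: "bicycle",
    3: "car",
    37: "sports ball",
    41: "skateboard",
    43: "tennis racket",
}


def class_names(class_ids, label_map=COCO_LABELS_SUBSET):
    """Convert float class IDs (as returned by the model) to names."""
    # IDs arrive as float32, e.g. 43.0, so cast to int before the lookup.
    return [label_map.get(int(c), f"unknown id {int(c)}") for c in class_ids]


print(class_names([1.0, 43.0, 37.0]))  # ['person', 'tennis racket', 'sports ball']
```

With the tensor above, the input would be `detections['detection_classes'][0].numpy()`.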

In [39]:
print(f"Confidence (0 to 1) for the most likely class associated with the significant object detections.")
detections['detection_scores'].shape

Confidence (0 to 1) for the most likely class associated with the significant object detections.


TensorShape([1, 100])
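detection_scores pairs element-wise with detection_classes. A minimal sketch of keeping only the detections at or above a score threshold for one image; select_detections is hypothetical, its inputs would be one row of each tensor (e.g. `detections['detection_scores'][0].numpy()`), and the values below are toy numbers, not model output.

```python
def select_detections(class_ids, scores, score_threshold):
    """Return (class_id, score) pairs with score >= score_threshold, highest score first."""
    kept = [
        (int(c), float(s))
        for c, s in zip(class_ids, scores)
        if s >= score_threshold
    ]
    return sorted(kept, key=lambda pair: pair[1], reverse=True)


# Toy values, not model output
print(select_detections([1.0, 43.0, 37.0, 1.0], [0.91, 0.87, 0.83, 0.2], 0.5))
# [(1, 0.91), (43, 0.87), (37, 0.83)]
```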

In [40]:
if 'detection_multiclass_scores' in detections:
  print(f"Confidence (0 to 1) for all classes associated with the significant object detections.")
  print(detections['detection_multiclass_scores'].shape)

Confidence (0 to 1) for all classes associated with the significant object detections.
(1, 100, 90)


In [41]:
cv2.destroyAllWindows()