<a id="top"></a>
# Safety Gear Detection Sample Application

## Introduction

This sample application demonstrates how a smart video IoT solution may be created using Intel® hardware and software tools to perform safety gear detection. The solution detects any number of objects within a video frame, looking specifically for people, safety vests, and hardhats. This notebook is a work in progress. The model and videos are from https://github.com/intel-iot-devkit/safety-gear-detector-python and https://github.com/intel-iot-devkit/sample-videos

In [None]:
import colorsys
import os
import random
import time
import urllib
from pathlib import Path

import cv2
import matplotlib.pyplot as plt
import numpy as np
from IPython.display import (
    HTML,
    FileLink,
    Pretty,
    ProgressBar,
    Video,
    clear_output,
    display,
)
from openvino.inference_engine import IECore

### Settings

In [None]:
# Name of the device the network is loaded to.
DEVICE = "CPU"

# Locations of the model description files and of the class label list.
MODEL_FILE = "models/mobilenet-ssd.xml"
MODEL_FILE_PERSON = "models/person-detection-retail-0013.xml"
LABELS_FILE = "labels.txt"

# Path objects for the model files, with the suffix forced to ".xml".
model_xml_path = Path(MODEL_FILE).with_suffix(".xml")
model_xml_path_person = Path(MODEL_FILE_PERSON).with_suffix(".xml")

# File names of the models without their directory part.
model_name = Path(MODEL_FILE).name
model_name_person = Path(MODEL_FILE_PERSON).name

### Functions

In [None]:
def load_image(path: str):
    """
    Load an image from `path` and return it as a BGR numpy array.

    `path` should point to an image file, either a local filename or a URL.
    Any string that starts with "http" is treated as a URL and downloaded.

    Both branches decode the image as a 3-channel colour image, so the
    result has the same layout regardless of where the image came from.
    OpenCV reports read/decode failures by returning None instead of
    raising, and that value is passed on to the caller unchanged.
    """
    if path.startswith("http"):
        # `import urllib` alone does not guarantee that the `request`
        # submodule is loaded, so import it explicitly where it is needed.
        import urllib.request

        # Set User-Agent to Mozilla because some websites block
        # requests with User-Agent Python
        request = urllib.request.Request(
            path, headers={"User-Agent": "Mozilla/5.0"}
        )
        # The context manager closes the HTTP response once it is read.
        with urllib.request.urlopen(request) as response:
            array = np.asarray(bytearray(response.read()), dtype="uint8")
        # IMREAD_COLOR always yields 3-channel BGR (no alpha channel, no
        # single-channel grayscale), matching the cv2.imread default below.
        image = cv2.imdecode(array, cv2.IMREAD_COLOR)
    else:
        image = cv2.imread(path)
    return image


def to_rgb(image_data) -> np.ndarray:
    """
    Return `image_data` converted from BGR to RGB channel order.
    """
    rgb_image = cv2.cvtColor(image_data, cv2.COLOR_BGR2RGB)
    return rgb_image


def convert_result_to_image(resized_image, result, labeldict):
    """
    Draw the detections in `result` on an RGB copy of `resized_image`.

    :param resized_image: BGR image (numpy array, height x width x channels)
                          that the detections refer to
    :param result: network output. `result[0][0]` is iterated as a sequence
                   of proposals in which index 1 is the class id, index 2
                   the confidence and indices 3-6 are xmin, ymin, xmax, ymax
                   given as fractions of the image width/height
    :param labeldict: mapping from class id to label name. Ids that are not
                      in the mapping are shown as the id itself
    :return: RGB image of the same size as `resized_image` with a box and a
             "<label> <confidence>" caption for every proposal whose
             confidence is above 0.5
    """
    inf_results = result[0][0]
    # RGB colours, selected by class id (ids start at 1)
    colors = ((255, 0, 0), (0, 255, 0), (0, 0, 255), (0, 0, 255))
    min_confidence = 0.5

    result_image_rgb = cv2.cvtColor(resized_image, cv2.COLOR_BGR2RGB)
    # The image size does not change per proposal, so compute it once.
    ih, iw = resized_image.shape[:2]

    for proposal in inf_results:
        if proposal[2] <= min_confidence:
            continue

        # The builtin int replaces the np.int alias, which no longer
        # exists in current NumPy releases.
        label = int(proposal[1])
        labelname = labeldict.get(label, str(label))
        # Wrap around so that models with more classes than colours do
        # not raise an IndexError.
        color = colors[(label - 1) % len(colors)]

        xmin = int(iw * proposal[3])
        # The caption is drawn 10 pixels above ymin; keep it inside the image
        ymin = max(10, int(ih * proposal[4]))
        xmax = int(iw * proposal[5])
        ymax = int(ih * proposal[6])

        result_image_rgb = cv2.rectangle(
            result_image_rgb,
            (xmin, ymin),
            (xmax, ymax),
            color,
            3,
        )
        cv2.putText(
            result_image_rgb,
            f"{labelname} {proposal[2]:.2f}",
            (xmin, ymin - 10),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.8,
            color,
            1,
            cv2.LINE_AA,
        )

    # The annotated image is returned at the size of the input image; the
    # function no longer depends on a global `image` variable for its size.
    return result_image_rgb

In [None]:
class ColorPalette:
    """
    Palette of `n` RGB colors that are spread out in HSV space.

    The first color is the HSV color (1.0, 1.0, 1.0). Every further color is
    picked from 100 random candidates: the candidate whose smallest distance
    to the colors chosen so far is the largest wins. Indexing wraps around,
    so any integer index returns a color.
    """

    def __init__(self, n, rng=None):
        assert n > 0

        # A fixed default seed gives the same palette on every run.
        generator = random.Random(0xACE) if rng is None else rng

        pool_size = 100
        chosen_hsv = [(1.0, 1.0, 1.0)]
        for _round in range(n - 1):
            pool = []
            for _draw in range(pool_size):
                hue = generator.random()
                saturation = generator.uniform(0.8, 1.0)
                value = generator.uniform(0.5, 1.0)
                pool.append((hue, saturation, value))
            separations = [
                self.min_distance(chosen_hsv, candidate) for candidate in pool
            ]
            best = np.argmax(separations)
            chosen_hsv.append(pool[best])

        self.palette = [self.hsv2rgb(*hsv_color) for hsv_color in chosen_hsv]

    @staticmethod
    def dist(c1, c2):
        """Squared distance between two HSV colors; hue is treated as circular."""
        hue_gap = abs(c1[0] - c2[0])
        # Take the shorter way around the hue circle and weight it by two.
        dh = 2 * min(hue_gap, 1 - hue_gap)
        ds = abs(c1[1] - c2[1])
        dv = abs(c1[2] - c2[2])
        return dh * dh + ds * ds + dv * dv

    @classmethod
    def min_distance(cls, colors_set, color_candidate):
        """Smallest `dist` between `color_candidate` and any color in `colors_set`."""
        gaps = []
        for existing in colors_set:
            gaps.append(cls.dist(existing, color_candidate))
        return np.min(gaps)

    @staticmethod
    def hsv2rgb(h, s, v):
        """Convert an HSV color with components in [0, 1] to an 8-bit RGB tuple."""
        red, green, blue = colorsys.hsv_to_rgb(h, s, v)
        return (round(red * 255), round(green * 255), round(blue * 255))

    def __getitem__(self, n):
        size = len(self.palette)
        return self.palette[n % size]

    def __len__(self):
        return len(self.palette)

## Load model and get model information

Read the model with `ie.read_network`, then load it to the specified device with `ie.load_network`.

In [None]:
# Read the network: the weights file sits next to the .xml file and has
# the same name with a .bin suffix.
ie = IECore()
net = ie.read_network(
    model=str(model_xml_path),
    weights=str(model_xml_path.with_suffix(".bin")),
)

# Load the network to the configured device.
exec_net = ie.load_network(network=net, device_name=DEVICE)

# Use the first input and the first output of the network.
input_key = list(exec_net.input_info)[0]
output_key = list(exec_net.outputs)[0]

# The last two dimensions of the input shape are used as the image height
# and width the network expects.
network_input_shape = exec_net.input_info[input_key].tensor_desc.dims
network_image_height, network_image_width = network_input_shape[2:]

## Safety Gear Detection on a Single Image

In [None]:
# Load the sample image.
image = load_image("media/safety-gear-image.jpg")

# Scale the image to the network input size; cv2.resize takes (width, height).
resized_image = cv2.resize(image, (network_image_width, network_image_height))

# Move the channel axis to the front and prepend a batch axis (NCHW).
input_image = np.transpose(resized_image, (2, 0, 1))[np.newaxis, ...]
plt.imshow(to_rgb(image))

### Do inference on image

Run inference on the prepared input image. The next cell converts the result to an image with the detections drawn on it, at the original image size.

In [None]:
# Run the network on the prepared input and keep the output stored under
# `output_key`.
result = exec_net.infer({input_key: input_image})[output_key]

### Display result

In [None]:
# Read the label names, one per line. The context manager closes the file
# as soon as it has been read.
with open(LABELS_FILE) as labels_file:
    labels = labels_file.read().splitlines()
# Label ids are numbered from 1 in file order.
labeldict = dict(enumerate(labels, start=1))

# Draw the detections on the image and show the annotated result.
result_image_rgb = convert_result_to_image(image, result, labeldict)
plt.figure(figsize=(12, 6))
plt.imshow(result_image_rgb)

## Safety Gear Detection on Video

In [None]:
# Input video to run the detection on.
VIDEO_FILE = "media/worker_zone_detection_small.mp4"

# How many seconds of the input video to process.
# 0 means: process the full video.
NUM_SECONDS = 4

# Step between processed frames: 1 processes every frame of the input
# video, 2 processes every second frame, and so on. Larger steps reduce
# the time it takes to process the video.
ADVANCE_FRAMES = 2

# Scale factor for the result video. With 0.5 the result video has half
# the width and half the height of the input video.
SCALE_OUTPUT = 0.5

# Codec used for video encoding. VP09 is slow, but it works on most
# systems. With FFMPEG installed, the THEO encoding can be tried instead
# by passing the characters "T", "H", "E", "O".
FOURCC = cv2.VideoWriter_fourcc("V", "P", "0", "9")

# Path objects for the input video and for the result video, which is
# written next to the input with "_result" appended to its name.
video_path = Path(VIDEO_FILE)
result_video_path = video_path.with_name(video_path.stem + "_result.mp4")

In [None]:
# Read the first frame to find out the frame size and frame rate of the
# input video. The capture is released even when the video cannot be read.
cap = cv2.VideoCapture(str(video_path))
try:
    ret, image = cap.read()
    if not ret:
        raise ValueError(f"The video at {video_path} cannot be read.")
    input_fps = cap.get(cv2.CAP_PROP_FPS)
finally:
    cap.release()
input_video_frame_height, input_video_frame_width = image.shape[:2]

# Properties of the result video, derived from the settings above
target_fps = input_fps / ADVANCE_FRAMES
target_frame_height = int(input_video_frame_height * SCALE_OUTPUT)
target_frame_width = int(input_video_frame_width * SCALE_OUTPUT)

# Use the fixed-point format ".2f": a bare ".2" means two significant
# digits and prints e.g. 30.0 as "3e+01".
print(
    f"The input video has a frame width of {input_video_frame_width}, "
    f"frame height of {input_video_frame_height} and runs at {input_fps:.2f} fps"
)
print(
    "The result video will be scaled with a factor "
    f"{SCALE_OUTPUT}, have width {target_frame_width}, "
    f"height {target_frame_height}, and run at {target_fps:.2f} fps"
)

### Inference loop

In [None]:
# Initialize variables
input_video_frame_nr = 0
processed_frames = 0  # frames that were actually inferred and written
start_time = time.perf_counter()
total_inference_duration = 0

# Open input video
cap = cv2.VideoCapture(str(video_path))

# Create result video
out_video = cv2.VideoWriter(
    str(result_video_path),
    FOURCC,
    target_fps,
    (target_frame_width, target_frame_height),
)

# OpenCV returns the frame count as a float; convert it so that the
# progress bar and the progress messages work with whole frames.
video_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
num_frames = int(NUM_SECONDS * input_fps)
if num_frames == 0:
    # NUM_SECONDS == 0 means: process the full video
    total_frames = video_frame_count
elif video_frame_count > 0:
    # Do not announce more frames than the video contains
    total_frames = min(num_frames, video_frame_count)
else:
    total_frames = num_frames
progress_bar = ProgressBar(total=total_frames)
progress_bar.display()

try:
    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            # End of video; the capture is released in the finally block
            break

        if input_video_frame_nr >= total_frames:
            break

        # Prepare frame for inference
        # resize to input shape for network; cv2.resize expects the
        # target size as (width, height)
        resized_image = cv2.resize(
            image, (network_image_width, network_image_height)
        )
        # reshape image to network input shape NCHW
        input_image = np.expand_dims(np.transpose(resized_image, (2, 0, 1)), 0)

        # Do inference
        inference_start_time = time.perf_counter()
        result = exec_net.infer(inputs={input_key: input_image})[output_key]
        inference_stop_time = time.perf_counter()
        inference_duration = inference_stop_time - inference_start_time
        total_inference_duration += inference_duration

        if input_video_frame_nr % (10 * ADVANCE_FRAMES) == 0:
            clear_output(wait=True)
            progress_bar.display()
            # input_video_frame_nr // ADVANCE_FRAMES gives the number of
            # frames that have been processed by the network
            display(
                Pretty(
                    f"Processed frame {input_video_frame_nr // ADVANCE_FRAMES}"
                    f"/{total_frames // ADVANCE_FRAMES}. "
                    f"Inference time: {inference_duration:.2f} seconds "
                    f"({1/inference_duration:.2f} FPS)"
                )
            )

        # convert_result_to_image returns an RGB image, while
        # cv2.VideoWriter expects BGR frames: swap the channels back.
        result_frame = cv2.cvtColor(
            convert_result_to_image(image, result, labeldict),
            cv2.COLOR_RGB2BGR,
        )
        # Resize image and result to target frame shape
        result_frame = cv2.resize(
            result_frame, (target_frame_width, target_frame_height)
        )
        # Save frame to video
        out_video.write(result_frame)
        processed_frames += 1

        # Jump ahead so that only every ADVANCE_FRAMES-th frame is read
        input_video_frame_nr = input_video_frame_nr + ADVANCE_FRAMES
        cap.set(cv2.CAP_PROP_POS_FRAMES, input_video_frame_nr)

        progress_bar.progress = input_video_frame_nr
        progress_bar.update()

except KeyboardInterrupt:
    print("Processing interrupted.")
finally:
    clear_output()
    out_video.release()
    cap.release()
    end_time = time.perf_counter()
    duration = end_time - start_time

    # Guard the divisions: no frame may have been processed at all
    total_fps = processed_frames / duration if duration > 0 else 0.0
    inference_fps = (
        processed_frames / total_inference_duration
        if total_inference_duration > 0
        else 0.0
    )

    print(
        f"Processed {processed_frames} frames in {duration:.2f} seconds. "
        f"Total FPS (including video processing): {total_fps:.2f}. "
        f"Inference FPS: {inference_fps:.2f} "
    )
    print(f"Safety Gear Detection Video saved to '{str(result_video_path)}'.")

### Display or download video with results

In [None]:
# Show the result video inline and offer it as a download. If OpenCV could
# not write the video file, show the last processed frame instead and fail.
if not result_video_path.exists():
    # result_frame is the frame that was passed to the video writer in BGR
    # channel order; matplotlib expects RGB.
    plt.imshow(to_rgb(result_frame))
    raise ValueError(
        "OpenCV was unable to write the video file. Showing one video frame."
    )

print(
    "Showing Safety Gear Detection video saved at\n"
    f"{result_video_path.resolve()}"
)
print(
    "If you cannot see the video in your browser, please click on the "
    "following link to download the video "
)
video_link = FileLink(result_video_path)
video_link.html_link_str = "<a href='%s' download>%s</a>"
display(HTML(video_link._repr_html_()))
# Create the Video object only after the file is known to exist.
# TODO: embed=True doesn't work well for large videos
video = Video(result_video_path, width=800, embed=True)
display(video)