![Degirum banner](https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/degirum_banner.png)
## Counting People in a Video Frame-by-Frame
This notebook demonstrates an algorithm for counting the people in each frame of a video, and uses that algorithm to annotate the video with the per-frame count.

This notebook supports the following inference options:

1. Run inference on the DeGirum Cloud Platform;
2. Run inference on a DeGirum AI Server deployed on the local host or on some computer in your LAN or VPN;
3. Run inference on a DeGirum ORCA accelerator directly installed on your computer.

To switch between these options, set the `hw_location` variable accordingly.

When running this notebook locally, you need to specify your cloud API access token in the [env.ini](../../env.ini) file, located in the same directory as this notebook.

When running this notebook in Google Colab, the cloud API access token should be stored in a user secret named `DEGIRUM_CLOUD_TOKEN`.

Note: Please specify a path to an input video source before running this notebook.

In [None]:
# make sure degirum and degirum-tools packages are installed
!pip show degirum-tools || pip install degirum-tools

#### Specify where you want to run your inferences, the model zoo URLs, and the model names

In [None]:
# hw_location: where you want to run inference
#     "@cloud" to use the DeGirum cloud
#     "@local" to run on the local machine
#     IP address for AI server inference
# person_model_zoo_url: URL/path for the person model zoo
#     Use a cloud zoo URL for @cloud, @local, and AI server inference options.
#     Use '' for an AI server serving models from a local folder.
#     Use a path to a JSON file for a single model zoo in case of @local inference.
# person_model_name: name of the model for person detection
# head_model_zoo_url: URL/path for the head model zoo
# head_model_name: name of the model for head detection
# face_model_zoo_url: URL/path for the face model zoo
# face_model_name: name of the model for face detection
# video_source (set in the next cell): video source for inference
#     camera index for local camera
#     URL of RTSP stream
#     URL of YouTube video
#     path to video file (mp4 etc.)
# The cloud API token is not set here; it is read with dgtools.get_token() when the models are loaded.
hw_location = "@cloud"
person_model_zoo_url = "degirum/person_detection"
person_model_name = "yolov8m_relu6_person--960x960_float_openvino_cpu_1"
head_model_zoo_url = "degirum/human_head_detection"
head_model_name = "yolov8s_relu6_human_head--960x960_float_openvino_cpu_1"
face_model_zoo_url = "degirum/face_detection"
face_model_name = "yolov8s_relu6_face--640x640_float_openvino_cpu_1"

#### Specify the detection models, IoMA threshold, and video source for the annotation process

In [None]:
# Detection models to be used in the person counting algorithm;
# can be one or any combination of the following - 'person', 'head', 'face'
models = ["person", "head", "face"]

# IoMA (Intersection over Minimum Area) threshold for matching bounding boxes
threshold = 0.7

# Paths to video source and output video
video_source = "https://raw.githubusercontent.com/DeGirum/PySDKExamples/main/images/store_short.mp4"
output_video = "temp/person_count_video.mp4"
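
The IoMA threshold above divides the overlap of two boxes by the area of the smaller box, rather than by the area of their union as IoU does. The sketch below illustrates why that matters when a face box is matched against a full-body box. The box coordinates and the helper names (`box_area`, `intersection_area`, `ioma_score`, `iou_score`) are hypothetical and chosen only for this illustration; they are not part of the notebook's own code.

```python
def box_area(box):
    """Area of an (x1, y1, x2, y2) box; zero if the box is empty or inverted."""
    x1, y1, x2, y2 = box
    return max(0, x2 - x1) * max(0, y2 - y1)


def intersection_area(a, b):
    """Area of the overlap between two (x1, y1, x2, y2) boxes."""
    overlap = (max(a[0], b[0]), max(a[1], b[1]), min(a[2], b[2]), min(a[3], b[3]))
    return box_area(overlap)


def ioma_score(a, b):
    """Intersection over minimum area: overlap relative to the smaller box."""
    smaller = min(box_area(a), box_area(b))
    return intersection_area(a, b) / smaller if smaller > 0 else 0.0


def iou_score(a, b):
    """Intersection over union, shown only for comparison."""
    inter = intersection_area(a, b)
    union = box_area(a) + box_area(b) - inter
    return inter / union if union > 0 else 0.0


# Hypothetical detections: a face box lying entirely inside a full-body box
person_box = (100, 50, 300, 650)
face_box = (170, 70, 230, 140)

print("IoMA:", ioma_score(person_box, face_box))
print("IoU: ", round(iou_score(person_box, face_box), 3))
```

Because the face lies completely inside the body box, its IoMA is 1.0 and clears a 0.7 threshold, while its IoU is only a few percent. An IoU threshold would therefore almost never associate a face with the person it belongs to.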

#### Connect to model zoos, load the corresponding detection models, and create the combined model to use for inference

In [None]:
import degirum as dg
import degirum_tools as dgtools

# Set up models
model = {}
model["person"] = dg.load_model(
    model_name=person_model_name,
    inference_host_address=hw_location,
    zoo_url=person_model_zoo_url,
    token=dgtools.get_token(),
    overlay_show_labels=False,
    overlay_line_width=2,
    input_letterbox_fill_color=(114, 114, 114),
)
model["head"] = dg.load_model(
    model_name=head_model_name,
    inference_host_address=hw_location,
    zoo_url=head_model_zoo_url,
    token=dgtools.get_token(),
    overlay_show_labels=False,
    overlay_line_width=2,
    input_letterbox_fill_color=(114, 114, 114),
)
model["face"] = dg.load_model(
    model_name=face_model_name,
    inference_host_address=hw_location,
    zoo_url=face_model_zoo_url,
    token=dgtools.get_token(),
    overlay_show_labels=False,
    overlay_line_width=2,
    input_letterbox_fill_color=(114, 114, 114),
)

# Create combined model from individual models
for i, model_name in enumerate(models):
    if i == 0:
        combined_model = model[model_name]
    else:
        combined_model = dgtools.CombiningCompoundModel(
            combined_model, model[model_name]
        )
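
The loop above is a left fold: each selected model is wrapped together with everything combined so far. The sketch below shows the resulting nesting with `functools.reduce`. The `Pair` class and the placeholder strings are hypothetical stand-ins so that the example runs without loading any model; `Pair` only records its two operands and does not reproduce the behavior of `dgtools.CombiningCompoundModel`.

```python
from functools import reduce


class Pair:
    """Hypothetical stand-in for a two-model wrapper; it only records its operands."""

    def __init__(self, first, second):
        self.first = first
        self.second = second

    def __repr__(self):
        return f"Pair({self.first!r}, {self.second!r})"


selected = ["person", "head", "face"]
# Placeholder strings instead of real loaded models
loaded = {name: f"<{name} model>" for name in selected}

# Fold the selected models from left to right into one nested structure
combined = reduce(Pair, (loaded[name] for name in selected))
print(combined)
```

With a single selected model the fold returns that model unchanged, which matches the `i == 0` branch of the loop. With an empty list, `reduce` raises a `TypeError`, whereas the loop would leave `combined_model` undefined, so at least one model has to be selected in either form.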

#### People Counting Algorithm

In [None]:
import numpy as np


def ioma(box1, box2):
    """Calculate intersection over minimum area (IoMA) between two bounding boxes."""
    # Unpack the bounding box coordinates
    b1_x1, b1_y1, b1_x2, b1_y2 = box1
    b2_x1, b2_y1, b2_x2, b2_y2 = box2

    if b1_x1 > b2_x2 or b1_x2 < b2_x1 or b1_y1 > b2_y2 or b1_y2 < b2_y1:
        return 0.0  # If the boxes don't overlap, return 0

    # Compute intersection coordinates
    intersection = [
        max(b1_x1, b2_x1),
        max(b1_y1, b2_y1),
        min(b1_x2, b2_x2),
        min(b1_y2, b2_y2),
    ]

    # Compute areas of the boxes and their intersection
    area1 = (b1_x2 - b1_x1) * (b1_y2 - b1_y1)
    area2 = (b2_x2 - b2_x1) * (b2_y2 - b2_y1)
    i_area = (intersection[2] - intersection[0]) * (intersection[3] - intersection[1])

    # Return the IoMA value, guarding against degenerate (zero-area) boxes
    min_area = min(area1, area2)
    return i_area / min_area if min_area > 0 else 0.0


def add_detections_to_persons(person_boxes, new_boxes, threshold):
    """Add new detections to existing person boxes based on IoMA."""
    # Immediately return if there are no new boxes to process
    if not new_boxes:
        return person_boxes

    # Check if there are existing person boxes; if not, add new boxes as new persons directly
    if not person_boxes:
        return [[n_box] for n_box in new_boxes]

    # Create a matrix of IoMA values
    ioma_matrix = np.zeros((len(new_boxes), len(person_boxes)))
    for i, n_box in enumerate(new_boxes):
        for j, person in enumerate(person_boxes):
            # Compute the max IoMA of the new box with all boxes of the person
            ioma_matrix[i, j] = max(
                ioma(n_box["bbox"], p_box["bbox"]) for p_box in person
            )

    # Greedily assign new boxes to persons while any pair exceeds the threshold
    while np.max(ioma_matrix) > threshold:
        # Find the new box-person pair with the highest IoMA
        i, j = np.unravel_index(np.argmax(ioma_matrix, axis=None), ioma_matrix.shape)

        # Add the new box to the corresponding person
        person_boxes[j].append(new_boxes[i])

        # Invalidate this row so the assigned new box is not considered again;
        # the person column stays valid, so one person can receive several boxes
        ioma_matrix[i, :] = -1

    # Add any remaining unassigned new boxes as new persons
    for i, row in enumerate(ioma_matrix):
        if np.max(row) != -1:  # If this new box hasn't been assigned
            person_boxes.append([new_boxes[i]])

    return person_boxes


def aggregate_person_boxes(res, threshold=0.6):
    """Group face, head, and person detections that belong to the same person."""
    # Separate person, head, and face detections by label
    persons = [r for r in res.results if r["label"] == "Person"]
    heads = [r for r in res.results if r["label"] == "Human head"]
    faces = [r for r in res.results if r["label"] == "Human face"]

    # Start with one group per detected face
    person_boxes = [[face] for face in faces]

    # Add head and person detections to existing person detections
    person_boxes = add_detections_to_persons(person_boxes, heads, threshold)
    person_boxes = add_detections_to_persons(person_boxes, persons, threshold)
    return person_boxes
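
To make the grouping concrete, the sketch below runs a condensed per-box variant of the matching step on hypothetical detections. It works on plain `(x1, y1, x2, y2)` tuples instead of inference result dictionaries, and the names `ioma_score` and `group_detections` are illustrative only. Since the IoMA matrix above is computed once and only rows are invalidated, each new box ends up with the existing person it overlaps most, so this variant should produce the same groups, though the order of boxes inside a group may differ.

```python
def box_area(box):
    """Area of an (x1, y1, x2, y2) box; zero if the box is empty or inverted."""
    x1, y1, x2, y2 = box
    return max(0, x2 - x1) * max(0, y2 - y1)


def ioma_score(a, b):
    """Intersection over minimum area of two (x1, y1, x2, y2) boxes."""
    overlap = (max(a[0], b[0]), max(a[1], b[1]), min(a[2], b[2]), min(a[3], b[3]))
    smaller = min(box_area(a), box_area(b))
    return box_area(overlap) / smaller if smaller > 0 else 0.0


def group_detections(groups, new_boxes, threshold):
    """Attach each new box to the best-matching existing group, or start a new group."""
    # Score against a snapshot so boxes added in this pass do not affect matching
    snapshot = [list(group) for group in groups]
    for box in new_boxes:
        scores = [max(ioma_score(box, member) for member in group) for group in snapshot]
        best = max(range(len(snapshot)), key=scores.__getitem__, default=None)
        if best is not None and scores[best] > threshold:
            groups[best].append(box)
        else:
            groups.append([box])
    return groups


# Hypothetical scene: A faces the camera, B has their back turned, C is seen body-only
faces = [(170, 70, 230, 140)]
heads = [(160, 50, 240, 150), (460, 60, 540, 155)]
bodies = [(100, 40, 300, 650), (400, 45, 600, 640), (700, 50, 900, 640)]

groups = [[face] for face in faces]          # one group per face
groups = group_detections(groups, heads, threshold=0.7)
groups = group_detections(groups, bodies, threshold=0.7)

print("people counted:", len(groups))
print("boxes per person:", [len(group) for group in groups])
```

Three people are counted even though only one face is visible: person B is recovered from the head and body detections, and person C from the body detection alone. This is the motivation for combining several detectors instead of relying on a single one.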

#### Analyzer
Annotating a video requires an analyzer: a subclass of `dgtools.ResultAnalyzerBase`. The `PersonCounter` class implemented below computes the number of people from the inference results of each frame and annotates the frame with that count.

In [None]:
from numpy import ndarray


class PersonCounter(dgtools.ResultAnalyzerBase):
    """Analyzer class to count people using inference results' bounding boxes."""

    # Compute number of people in a given frame.
    def analyze(self, result):
        p_box = aggregate_person_boxes(result, threshold=threshold)
        result.person_count = len(p_box)

    # Annotate a video frame with the number of people in it.
    def annotate(self, result, image: ndarray) -> ndarray:

        back_color = (
            result.overlay_color
            if not isinstance(result.overlay_color, list)
            else result.overlay_color[0]
        )
        font_color = dgtools.deduce_text_color(back_color)
        label = f"Number of People: {result.person_count}"
        dgtools.put_text(
            image,
            label,
            (0, image.shape[0]),
            corner_position=dgtools.CornerPosition.BOTTOM_LEFT,
            bg_color=back_color,
            font_color=font_color,
        )

        return image


# Instantiate a PersonCounter Analyzer
person_counter = PersonCounter()

#### Annotate the video source with the PersonCounter Analyzer

In [None]:
dgtools.annotate_video(
    combined_model, video_source, output_video, analyzers=person_counter
)

