# Exercise: object counting

Let's apply what we learned about object counting in videos. We're going to count how many bags pass along a conveyor belt at an airport.

## Helpers

So that you can focus on the important bits, we define some helper functions for you here:

In [None]:
import cv2
from hydra import compose, initialize_config_module
from omegaconf import DictConfig
import torch
from torchvision.transforms.functional import to_tensor
import numpy as np
from PIL import Image
from IPython.display import display, Video

from yolo.tools.data_augmentation import PadAndResize
from yolo.tools.solver import InferenceModel


def get_fps_and_video_size(video_path):
    """
    Read the frame rate and the frame size of a video file.

    Args:
        video_path: path of the video, handed to `cv2.VideoCapture`.

    Returns:
        A tuple `(fps, (frame_width, frame_height))`, where `fps` is the value
        reported by OpenCV and the two dimensions are cast to `int`.
    """
    capture = cv2.VideoCapture(video_path)

    frame_rate = capture.get(cv2.CAP_PROP_FPS)
    # Width first, then height
    size = tuple(
        int(capture.get(prop))
        for prop in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT)
    )

    # We only needed the metadata, so the capture can be closed right away
    capture.release()

    return frame_rate, size


def get_model_instance(input_video: str) -> tuple[InferenceModel, DictConfig]:
    """
    Build a YOLO model ready for inference, together with its configuration.

    Args:
        input_video: path of the video file, stored in the configuration
            as `task.data.source`.

    Returns:
        A tuple `(model, cfg)`: the `InferenceModel` (moved to the selected
        device, in eval mode, with its post-processing set up) and the
        composed Hydra configuration.
    """

    # Pick the device: CUDA if available, otherwise Apple MPS, otherwise CPU
    if torch.cuda.is_available():
        device = "cuda"
    elif torch.backends.mps.is_available():
        device = "mps"
    else:
        device = "cpu"
    print(f"Using device: {device}")

    # This is necessary to avoid issues with tensors on different devices
    # for this particular version of YOLO
    torch.set_default_device(device)

    # Parameters of the default YOLO configuration that we want to override
    # (overriding a default configuration is the idiomatic way of doing
    # things with Hydra, a configuration management tool)
    hydra_overrides = [
        "task.task=inference",
        # v9-s is the smallest model
        "model=v9-s",
        # We point to our video file
        f"task.data.source={input_video}",
        # We do not want to track on Weights and Biases
        "use_wandb=false",
        # We set our device
        f"device={device}",
    ]

    with initialize_config_module(config_module="yolo.config", version_base=None):
        cfg = compose(config_name="config", overrides=hydra_overrides)

    # This is the way of loading and setting up a model
    # with this version of YOLO
    model = InferenceModel(cfg).to(device)
    model.eval()

    # Custom step required to set up the post-processing of the model
    # (which includes the Non-Maximum Suppression)
    model.setup(cfg.task.task)

    return model, cfg


def preprocess_frame(
    frame: np.ndarray,
    pad_and_resize: PadAndResize,
    device: str = "cpu",
) -> tuple[torch.Tensor, torch.Tensor, Image.Image]:
    """
    Turn one video frame into a batch that can be fed to the model.

    Args:
        frame: the frame as read by OpenCV (converted here from BGR to RGB).
        pad_and_resize: transform that pads and resizes the frame to match
            the expected input resolution of the model.
        device: device on which the returned tensors are placed.

    Returns:
        A tuple `(batch_of_one, rev_tensor, untransformed_frame)`:
        the transformed frame with a leading batch dimension, the transform
        information returned by `pad_and_resize` (also with a leading batch
        dimension), and a copy of the RGB frame taken before the transform.
    """
    rgb_image = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
    # Keep a pristine copy of the frame before it gets padded and resized
    untransformed_frame = rgb_image.copy()

    # PadAndResize also takes the ground truth boxes, which we do not have
    # (this is inference on unknown data), so we pass a dummy tensor and
    # discard the transformed boxes
    placeholder_boxes = torch.zeros((1, 6))
    resized_image, _, transform_info = pad_and_resize(rgb_image, placeholder_boxes)

    # Add the batch dimension and move everything to the target device
    batch_of_one = to_tensor(resized_image)[None].to(device)
    rev_tensor = transform_info[None].to(device)

    return batch_of_one, rev_tensor, untransformed_frame


## Call YOLO in the right way

Here you will complete the first part. Look for the `TODO` comments in the `run_inference_on_one_frame` function and complete the code:

In [None]:
from typing import Callable
from tqdm import tqdm
from torch.amp import autocast


def run_inference_on_one_frame(
    model: InferenceModel, frame: np.ndarray, pad_and_resize: Callable
) -> tuple[Image.Image, torch.Tensor]:
    """
    Run YOLO on a single video frame.

    Args:
        model: the inference model; its `device` attribute decides where the
            pre-processed tensors are placed.
        frame: the frame as read by OpenCV.
        pad_and_resize: transform that pads and resizes the frame to the
            input resolution expected by the model.

    Returns:
        A tuple `(untransformed_frame, predictions)`: the original frame as
        returned by `preprocess_frame`, and the post-processed predictions
        for this frame, detached and moved to the CPU.

    Raises:
        AssertionError: if post-processing does not return exactly one
            element (we feed a batch of one frame).
    """

    # Pre-process the frame and get:
    # the batch of one (the pre-processed frame ready to be fed to the model)
    # the rev_tensor (the information needed to reverse the transformations)
    # the untransformed_frame (the original frame, needed for visualization)
    batch_of_one, rev_tensor, untransformed_frame = preprocess_frame(
        frame, pad_and_resize, device=model.device
    )

    # TODO: Run YOLO. This will return the raw outputs of the model
    # HINT: just call the model on `batch_of_one`
    outputs = model(batch_of_one)

    # TODO: Re-format outputs and apply Non-Maximum Suppression to remove
    # duplicate detections
    # HINT: use the model's `post_process` method on `outputs`,
    # and remember to provide the inverse transformation `rev_tensor`
    predicts = model.post_process(outputs, rev_tensor=rev_tensor)

    # We expect only one element in the batch (one frame)
    assert len(predicts) == 1

    return untransformed_frame, predicts[0].detach().cpu()


def run_inference_on_video(
    input_video: str
) -> tuple:
    """
    Run YOLO on every frame of a video.

    Args:
        input_video: path of the video file.

    Returns:
        A tuple `(results, class_list)`. `results` has one entry per frame
        that was read; each entry is a one-element list holding the
        predictions returned by `run_inference_on_one_frame` for that frame.
        `class_list` is `cfg.dataset.class_list` from the model configuration.
    """

    # Instance model
    model, cfg = get_model_instance(input_video)

    # We need to pad and resize every frame to match the expected
    # input resolution of the model
    pad_and_resize = PadAndResize(cfg.image_size)

    results = []

    # We use opencv to loop through the frames of the video
    cap = cv2.VideoCapture(input_video)
    try:
        # Get the total number of frames in the video
        n_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        with torch.no_grad():

            # NOTE: this is absolutely necessary for good results with this
            # version of YOLO. Failing to do this will result in very poor
            # performance, because of the way the model has been trained.
            with autocast(model.device.type):

                for _ in tqdm(range(n_frames), total=n_frames):

                    # Read frame from the video
                    ret, frame = cap.read()

                    if not ret:
                        # Video is finished
                        break

                    # The untransformed frame is only needed for
                    # visualization, so we discard it here
                    _, predicts = run_inference_on_one_frame(
                        model, frame, pad_and_resize
                    )

                    # Append results for this frame
                    results.append([predicts])
    finally:
        # Release the capture even if inference fails half-way through
        cap.release()

    return results, cfg.dataset.class_list

Let's make sure it works by running it on our test video:

In [3]:
# Path of the test video
input_video = "../bags.mp4"
# Run YOLO on the whole video: `results` holds the predictions for each frame
# that was read, `class_list` is the class list from the model configuration
results, class_list = run_inference_on_video(input_video)

Using device: mps


100%|██████████| 510/510 [01:00<00:00,  8.44it/s]


Great! Now let's add the tracking part and the counting. Complete the lines with `TODO` in the following code:

(Note that we do not use slicing here: the objects in this video are large, so there is no need for it.)

In [4]:
import supervision as sv


class YOLOVideoObjectCounter:
    """
    A class that encapsulates the logic for counting objects in a video using YOLO and ByteTrack.

    Every frame goes through YOLO, the detections are handed to the tracker,
    and the tracked detections of the selected classes are used to trigger
    the (optional) line zone. The frame is returned with boxes, labels and,
    when a line zone is set, the line annotation drawn on it.
    """

    def __init__(
        self,
        video_file: str,
        line_zone: "sv.LineZone | None" = None,
        counted_class_ids: tuple[int, ...] = (24, 26, 28),
    ):
        """
        Args:
            video_file: path of the video; used to configure the model and
                to read the frame rate for the tracker.
            line_zone: line used for counting. When `None`, frames are only
                annotated with boxes and labels and nothing is counted.
            counted_class_ids: class ids of the detections that trigger the
                line zone. The default counts bags: class_id 24 is backpack,
                26 is handbag, 28 is suitcase.
        """

        self.model, cfg = get_model_instance(video_file)

        # Get the FPS of the video (the frame size is not needed here)
        fps, _ = get_fps_and_video_size(video_file)

        # TODO: create an instance of the tracker here.
        # Remember to set frame_rate to our video's fps
        self.byte_tracker = sv.ByteTrack(frame_rate=fps)

        self.line_zone = line_zone
        self.counted_class_ids = tuple(counted_class_ids)

        # These are utilities to draw on the video for visualization
        # purposes
        self.line_zone_annotator = sv.LineZoneAnnotator(
            thickness=2, text_thickness=2, text_scale=1
        )
        self.bounding_box_annotator = sv.BoxAnnotator()
        self.label_annotator = sv.LabelAnnotator()

        # We need to pad and resize every frame to match the expected
        # input resolution of the model
        self.pad_and_resize = PadAndResize(cfg.image_size)

        self.class_list = cfg.dataset.class_list

    @staticmethod
    def yolo_to_sv_detections(yolo_outputs: torch.Tensor) -> sv.Detections:
        """
        Re-organize information in the format expected by the supervision tracker

        Args:
            yolo_outputs: tensor of shape (n_detections, 6) where each
                detection is (class_id, x1, y1, x2, y2, score).

        Returns:
            The same detections as an `sv.Detections` object.
        """

        yolo_outputs = yolo_outputs.cpu().numpy()

        detections = sv.Detections(
            xyxy=yolo_outputs[:, 1:5],  # box coordinates
            confidence=yolo_outputs[:, 5],  # confidence score
            class_id=yolo_outputs[:, 0].astype(int),  # class id as integer
        )

        return detections

    def _yolo_inference(self, frame: np.ndarray) -> sv.Detections:
        """
        Runs inference on one frame and returns results in the format
        expected by the supervision tracker
        """
        _, predicts = run_inference_on_one_frame(self.model, frame, self.pad_and_resize)
        return self.yolo_to_sv_detections(predicts)

    def run_on_one_frame(self, frame: np.ndarray, index: int) -> np.ndarray:
        """
        Detect, track, count and annotate the objects in one frame.

        Args:
            frame: the frame to process.
            index: index of the frame. Unused; it is part of the signature
                because this method is passed as the `callback` of
                `sv.process_video`.

        Returns:
            A copy of `frame` with boxes and labels drawn on it, plus the
            line zone annotation when a line zone is set.
        """

        # TODO: Run YOLO on the frame
        # HINT: use the `_yolo_inference` method defined above
        detections = self._yolo_inference(frame)

        # TODO: update the tracker with the new detections
        # HINT: use the `update_with_detections` method of the tracker
        detections = self.byte_tracker.update_with_detections(detections)

        if self.line_zone is not None:
            # Only the detections of the selected classes are counted
            # (by default the bags: backpack, handbag and suitcase)
            bag_mask = np.isin(detections.class_id, self.counted_class_ids)
            bag_detections = detections[bag_mask]
            # TODO: trigger the line zone with the bag detections
            # HINT: use the `trigger` method of the line zone
            self.line_zone.trigger(bag_detections)

        # One label per tracked detection: class name, tracker id, confidence
        labels = [
            f"{self.class_list[int(class_id)]} {tracker_id} {confidence:0.2f}"
            for class_id, confidence, tracker_id in zip(
                detections.class_id,
                detections.confidence,
                detections.tracker_id,
            )
        ]

        # Draw on a copy so that the input frame is left untouched
        annotated_frame = self.bounding_box_annotator.annotate(
            scene=frame.copy(), detections=detections
        )

        annotated_frame = self.label_annotator.annotate(
            scene=annotated_frame, detections=detections, labels=labels
        )

        if self.line_zone is not None:
            # Apply counting annotation to show the line and the
            # counts
            annotated_frame = self.line_zone_annotator.annotate(
                annotated_frame, line_counter=self.line_zone
            )

        return annotated_frame

Great! Now let's test what we got:

In [8]:
input_video = "../bags.mp4"
# File where the annotated video is written
output_video = "output_detected.m4v"

# Let's define a line in the video
# We use a vertical line at the horizontal center of the frame, going from
# the top to the bottom, so that it cuts across the conveyor belt
# (image_size is (width, height))
_, image_size = get_fps_and_video_size(input_video)
START = sv.Point(image_size[0] // 2, 0)
END = sv.Point(image_size[0] // 2, image_size[1])
line_zone = sv.LineZone(
    start=START, 
    end=END,
    # We trigger the count when the center of the bounding
    # box crosses the line
    triggering_anchors=[sv.Position.CENTER],
)

# The counter runs YOLO + tracking on each frame and triggers the line zone
# with the bag detections
processor = YOLOVideoObjectCounter(video_file=input_video, line_zone=line_zone)

# `run_on_one_frame` is used as the per-frame callback: it receives the frame
# and its index, and returns the annotated frame
sv.process_video(
    source_path=input_video,
    target_path=output_video,
    callback=processor.run_on_one_frame,
    show_progress=True,
)


Using device: mps


Processing video:   0%|          | 0/510 [00:00<?, ?it/s]

In [None]:

# Convert from m4v to mp4 (re-encoding the video stream with libx264) so we
# can display it here. The console output of ffmpeg is discarded.
!ffmpeg -i {output_video} -c:v libx264 -tag:v avc1 bags_on_conveyor_belt_detected.mp4 -y > /dev/null 2>&1

# Show the converted video, referenced by URL rather than embedded
display(
        Video(url="bags_on_conveyor_belt_detected.mp4", embed=False)
)
