## 1. Install libraries and download the trained detection model

In [None]:
!pip install ultralytics -q
!pip install scikit-learn numpy opencv-python tensorflow spacy -q
!pip install gdown==4.6.0 -q

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/699.8 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [91m━━━━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m102.4/699.8 kB[0m [31m2.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━[0m [32m409.6/699.8 kB[0m [31m5.8 MB/s[0m eta [36m0:00:01[0m[2K     [91m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m[91m╸[0m [32m696.3/699.8 kB[0m [31m7.1 MB/s[0m eta [36m0:00:01[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m699.8/699.8 kB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m
[?25h

In [1]:
# Download the trained detection weights (replace them with your own model if you have one).
# The file is saved as yolov8_mot_det.pt, the path the detector cell loads later.
# https://drive.google.com/file/d/1gWPKcmAnIsW7JcErigf5-CLp0QAgvtOx/view?usp=sharing
!gdown 1gWPKcmAnIsW7JcErigf5-CLp0QAgvtOx

Downloading...
From: https://drive.google.com/uc?id=1gWPKcmAnIsW7JcErigf5-CLp0QAgvtOx
To: /home/minhvo/DATA/12.Project_AI/Image-Project-Tracking-by-Detection-Yolov8/notebook/yolov8_mot_det.pt
100%|██████████████████████████████████████| 22.5M/22.5M [00:02<00:00, 7.97MB/s]


## 2. Install DeepSORT

In [None]:
# Fetch the DeepSORT implementation; the tracker cell imports deep_sort.deep_sort.* and deep_sort.tools from it.
!git clone https://github.com/wjnwjn59/deep_sort.git

Cloning into 'deep_sort'...
remote: Enumerating objects: 167, done.[K
remote: Counting objects: 100% (25/25), done.[K
remote: Compressing objects: 100% (25/25), done.[K
remote: Total 167 (delta 9), reused 0 (delta 0), pack-reused 142[K
Receiving objects: 100% (167/167), 77.63 KiB | 1.23 MiB/s, done.
Resolving deltas: 100% (92/92), done.


In [2]:
# Download the shared Google Drive folder used by DeepSORT.
# NOTE(review): presumably this provides resources/networks/mars-small128.pb, the default
# encoder path of the DeepSORT class below — confirm.
# NOTE(review): --no-check-certificate disables TLS certificate verification — confirm it is still needed.
!gdown --no-check-certificate --folder https://drive.google.com/open?id=18fKzfqnqhqW3s9zwsCbnVJ5XF2JFeqMp

Retrieving folder list
Retrieving folder 1VVqtL0klSUvLnmBKS89il1EKC3IxUBVK detections
Retrieving folder 1qNWOpUtKG8GqEiL-LbBdXyvifUtcbOvc MOT16_POI_test
Processing file 1aEzvFHPK-N6hqLXMqhh3i9JJzn7WFUA3 MOT16-01.npy
Processing file 1h_ktJDBIEXaSBAA-RxKNYnL9e4fp2HPd MOT16-03.npy
Processing file 1ilOElwfYZLwQKH57HoYdXfuYhpPibfqF MOT16-06.npy
Processing file 1TajzH3GbumKmtYvKBvOtGERFGD0tStwG MOT16-07.npy
Processing file 1WB9Mi4RLVPHV4_20sVq7FdoeG5JYQ_J1 MOT16-08.npy
Processing file 1mksH9GWNT7zmcuq6rlRev8pevZz8Rfsm MOT16-12.npy
Processing file 1FVVhn_IpxQ_jkYhc0CUQHSQMm1SMTEBj MOT16-14.npy
Retrieving folder 1DcOcApOkxP3NdeIUXxVF1KNex6T6YDq3 MOT16_POI_train
Processing file 1Va__9NWU2ZCmaxIq4oIabi05NYWEOk1K MOT16-02.npy
Processing file 1EH7orgDPp7kqRY5OA0hEctcEtQnYq0Ea MOT16-04.npy
Processing file 1RCfHJx5ZoUecapbZCsgp0tCEiItvLsd8 MOT16-05.npy
Processing file 1VLOvn-mbpY0Q1rsMONQZhaEQIGEmyLQL MOT16-09.npy
Processing file 1SbMhOgYPvZ84xE8lRtXc7CLXJF86lwf4 MOT16-10.npy
Processing file 1a4w-Ho

In [4]:
import os
import json
import cv2
import numpy as np
import matplotlib.pyplot as plt

## 3. Define Detector

In [5]:
from ultralytics import YOLO

class YOLOv8:
    """Thin wrapper around an Ultralytics ``YOLO`` model for single-image detection."""

    def __init__(self, model_path):
        """Load the YOLO weights found at ``model_path``."""
        self.model = YOLO(model_path)

    def detect(self, source_img):
        """Run the detector on one image.

        Args:
            source_img: Image passed straight to ``YOLO.predict``.

        Returns:
            Tuple ``(bboxes, scores, class_ids)`` of NumPy arrays taken from the
            first prediction result. Each box row is
            ``[left, top, width, height]``.
        """
        prediction = self.model.predict(source_img, verbose=False)[0]
        boxes = prediction.boxes

        # ``xywh`` stores the box centre in its first two columns; subtracting
        # half the width/height moves that point to the top-left corner.
        tlwh = boxes.xywh.cpu().numpy()
        tlwh[:, :2] -= tlwh[:, 2:] / 2

        confidences = boxes.conf.cpu().numpy()
        labels = boxes.cls.cpu().numpy()
        return tlwh, confidences, labels

## 4. Define Tracker

In [6]:
from deep_sort.deep_sort import nn_matching
from deep_sort.deep_sort.detection import Detection
from deep_sort.deep_sort.tracker import Tracker
from deep_sort.tools import generate_detections as gdet

class DeepSORT:
    """Wrapper that wires a box encoder, a cosine distance metric and a ``Tracker`` together."""

    def __init__(
        self,
        model_path='resources/networks/mars-small128.pb',
        max_cosine_distance=0.7,
        nn_budget=None,
        classes=['objects']
    ):
        """Build the encoder, the distance metric and the tracker.

        Args:
            model_path: Path handed to ``gdet.create_box_encoder``.
            max_cosine_distance: Threshold passed to ``NearestNeighborDistanceMetric``.
            nn_budget: Budget passed to ``NearestNeighborDistanceMetric``.
            classes: Class names; position ``i`` is the name of class id ``i``.
                The argument is only read, never mutated.
        """
        self.encoder = gdet.create_box_encoder(model_path, batch_size=1)
        self.metric = nn_matching.NearestNeighborDistanceMetric(
            'cosine', max_cosine_distance, nn_budget
        )
        self.tracker = Tracker(self.metric)

        # Parallel lists: key_list[i] is the class id of val_list[i].
        self.val_list = list(classes)
        self.key_list = list(range(len(self.val_list)))

    def tracking(
        self,
        origin_frame,
        bboxes,
        scores,
        class_ids
    ):
        """Advance the tracker by one frame and return the currently tracked boxes.

        Args:
            origin_frame: Frame passed to the box encoder together with ``bboxes``.
            bboxes: Detected boxes for this frame.
            scores: One confidence value per box.
            class_ids: One class id per box.

        Returns:
            ``numpy.ndarray`` with one row per reported track:
            ``track.to_tlbr()`` (4 values) followed by class id, confidence
            score and track id. When no track qualifies the result has shape
            ``(0, 7)`` so callers can still slice columns.
        """
        features = self.encoder(origin_frame, bboxes)

        detections = [
            Detection(bbox, score, class_id, feature)
            for bbox, score, class_id, feature in zip(bboxes, scores, class_ids, features)
        ]

        self.tracker.predict()
        self.tracker.update(detections)

        # Report only confirmed tracks whose time_since_update is at most 5.
        rows = [
            track.to_tlbr().tolist()
            + [track.get_class(), track.get_conf_score(), track.track_id]
            for track in self.tracker.tracks
            if track.is_confirmed() and track.time_since_update <= 5
        ]

        if not rows:
            # np.array([]) would be 1-D with shape (0,); keep the 7-column layout instead.
            return np.empty((0, 7))

        return np.array(rows)

## 5. Inference

In [7]:
def draw_detection(
    img,
    bboxes,
    scores,
    class_ids,
    ids,
    classes=['objects'],
    mask_alpha=0.3
):
    """Draw tracked boxes with captions onto a copy of ``img``.

    Args:
        img: Image array; only ``img.shape[:2]`` and ``img.copy()`` are used,
            so the input itself is not modified.
        bboxes: Iterable of arrays ``[x1, y1, x2, y2]`` (cast to int for drawing).
        scores: One confidence value per box, shown as a percentage.
        class_ids: One integer class index per box; indexes ``classes``.
        ids: One identifier per box, shown after ``ID:`` in the caption.
        classes: Class names (read-only); also determines how many colors are drawn.
        mask_alpha: Weight of the filled-box layer in the final blend.

    Returns:
        The blend ``mask_alpha * mask_img + (1 - mask_alpha) * det_img``
        produced by ``cv2.addWeighted``.
    """
    height, width = img.shape[:2]

    # A local, fixed-seed Generator keeps the class colors identical on every
    # call without resetting NumPy's global random state as a side effect.
    colors = np.random.default_rng(3).uniform(0, 255, size=(len(classes), 3))

    mask_img = img.copy()
    det_img = img.copy()

    # Font scale and text thickness grow with the shorter image side.
    short_side = min(height, width)
    font = cv2.FONT_HERSHEY_SIMPLEX
    size = short_side * 0.0006
    text_thickness = int(short_side * 0.001)

    for bbox, score, class_id, id_ in zip(bboxes, scores, class_ids, ids):
        color = colors[class_id]
        x1, y1, x2, y2 = bbox.astype(int)

        # Outline on the detection layer, filled box on the mask layer.
        cv2.rectangle(det_img, (x1, y1), (x2, y2), color, 2)
        cv2.rectangle(mask_img, (x1, y1), (x2, y2), color, -1)

        caption = f'{classes[class_id]} {int(score * 100)}% ID: {id_}'
        (tw, th), _ = cv2.getTextSize(text=caption, fontFace=font,
                                      fontScale=size, thickness=text_thickness)
        th = int(th * 1.2)

        # Both layers get the same caption: a filled background above the
        # box's top-left corner with white text on top of it.
        for canvas in (det_img, mask_img):
            cv2.rectangle(canvas, (x1, y1), (x1 + tw, y1 - th), color, -1)
            cv2.putText(canvas, caption, (x1, y1),
                        font, size, (255, 255, 255), text_thickness, cv2.LINE_AA)

    return cv2.addWeighted(mask_img, mask_alpha, det_img, 1 - mask_alpha, 0)

In [8]:
def video_tracking(
    video_path,
    detector,
    tracker,
    is_save_result=False,
    save_dir='tracking_results'
):
    """Run detection and tracking on every frame of a video.

    Args:
        video_path: Path passed to ``cv2.VideoCapture``.
        detector: Object whose ``detect(frame)`` returns ``(bboxes, scores, class_ids)``.
        tracker: Object whose ``tracking(origin_frame=..., bboxes=..., scores=...,
            class_ids=...)`` returns an array with one row per tracked box:
            4 box values, class id, confidence score, track id.
        is_save_result: Any truthy value writes the annotated frames to
            ``<save_dir>/output_video.avi`` (MJPG).
        save_dir: Output directory, created if missing when saving.

    Returns:
        List with the tracker output of every processed frame, in frame order.
    """
    cap = cv2.VideoCapture(video_path)
    out = None
    all_tracking_results = []

    # try/finally guarantees the capture and the writer are released even when
    # the detector or tracker raises part-way through the video.
    try:
        if is_save_result:
            os.makedirs(save_dir, exist_ok=True)

            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            # Keep the frame rate as a float: truncating e.g. 29.97 to 29
            # changes the playback speed of the saved video.
            fps = cap.get(cv2.CAP_PROP_FPS)

            fourcc = cv2.VideoWriter_fourcc(*'MJPG')
            save_result_path = os.path.join(save_dir, 'output_video.avi')
            out = cv2.VideoWriter(save_result_path, fourcc, fps, (width, height))

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            bboxes, scores, class_ids = detector.detect(frame)
            tracker_pred = tracker.tracking(
                origin_frame=frame,
                bboxes=bboxes,
                scores=scores,
                class_ids=class_ids
            )
            all_tracking_results.append(tracker_pred)

            # Frames are only rendered when they are going to be written; no
            # window is ever shown, so no GUI key polling is needed either.
            if out is None:
                continue

            if tracker_pred.size > 0:
                result_img = draw_detection(
                    img=frame,
                    bboxes=tracker_pred[:, :4],
                    scores=tracker_pred[:, 5],
                    class_ids=tracker_pred[:, 4].astype(int),
                    ids=tracker_pred[:, 6].astype(int)
                )
            else:
                # Nothing tracked in this frame: save it unannotated.
                result_img = frame

            out.write(result_img)
    finally:
        cap.release()
        if out is not None:
            out.release()

    return all_tracking_results

In [9]:
# Weights file fetched by the gdown cell in section 1.
yolo_model_path = 'yolov8_mot_det.pt'

# The detector loads the YOLO weights; the tracker is built with DeepSORT's default arguments.
detector = YOLOv8(yolo_model_path)
tracker = DeepSORT()

In [3]:
# Download a sample pedestrian video (saved as CityRoam.mp4, the clip tracked in the next cell)
!gdown 1sCJAZn6Ug17HWn7PmwGH1XAqjgPhr8ZQ

Downloading...
From: https://drive.google.com/uc?id=1sCJAZn6Ug17HWn7PmwGH1XAqjgPhr8ZQ
To: /home/minhvo/DATA/12.Project_AI/Image-Project-Tracking-by-Detection-Yolov8/notebook/CityRoam.mp4
100%|██████████████████████████████████████| 5.43M/5.43M [00:00<00:00, 6.47MB/s]


In [13]:
video_path = './CityRoam.mp4'
# Run detection + tracking over the clip and also save the annotated video
# (written to tracking_results/output_video.avi with the default save_dir).
# NOTE(review): `tracker` is created once in an earlier cell, so re-running this
# cell presumably continues from its existing track state — re-create the
# tracker first for a fresh run.
all_tracking_results = video_tracking(
    video_path,
    detector,
    tracker,
    is_save_result=True
)

In [14]:
from IPython.display import HTML
from base64 import b64encode
import os
import subprocess

# Input video path (the MJPG .avi written by video_tracking)
output_video_path = 'tracking_results/output_video.avi'

# Compressed video path (H.264 .mp4 embedded by the next cell)
compressed_path = 'tracking_results/result_compressed.mp4'

# -y overwrites an existing output so the cell can be re-run; an argument list
# avoids shell quoting issues and check=True raises if ffmpeg fails instead of
# silently leaving no output file behind.
subprocess.run(
    ['ffmpeg', '-y', '-i', output_video_path, '-vcodec', 'libx264', compressed_path],
    check=True,
);

ffmpeg version 4.2.7-0ubuntu0.1 Copyright (c) 2000-2022 the FFmpeg developers
  built with gcc 9 (Ubuntu 9.4.0-1ubuntu1~20.04.1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-avresample --disable-filter=resample --enable-avisynth --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librsvg --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --e

0

In [15]:
# Show video
mp4 = open(compressed_path,'rb').read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=600 controls>
      <source src="%s" type="video/mp4">
</video>
""" % data_url)