# Download Dataset (MOT17)

In [None]:
# Fetch the MOT17 archive from motchallenge.net into the working directory.
!wget https://motchallenge.net/data/MOT17.zip

In [None]:
!unzip-qq MOT17.zip

## Import Libraries

In [None]:
import pandas as pd
import os
import yaml
import shutil
import configparser

from tqdm import tqdm

## Normalize bounding box

In [None]:
def convert_to_yolo_format(bb, img_width, img_height):
    """Convert a pixel box to a normalized YOLO ``(xc, yc, w, h)`` tuple.

    The box is clipped to the image rectangle first and only then turned
    into center/size form. Clamping the center and the size independently
    would leave a box that still extends past the image whenever the
    annotation crosses an image border.

    Args:
        bb: Mapping-like object (dict, pandas row, ...) providing
            ``'bb_left'``, ``'bb_top'``, ``'bb_width'`` and ``'bb_height'``
            in pixels.
        img_width: Image width in pixels.
        img_height: Image height in pixels.

    Returns:
        Tuple ``(x_center, y_center, width, height)`` where every component
        is divided by the matching image dimension and lies in ``[0, 1]``.
    """
    # Box corners in pixels, clipped to the image rectangle.
    x_min = min(max(bb['bb_left'], 0), img_width)
    y_min = min(max(bb['bb_top'], 0), img_height)
    x_max = min(max(bb['bb_left'] + bb['bb_width'], 0), img_width)
    y_max = min(max(bb['bb_top'] + bb['bb_height'], 0), img_height)

    # A negative input size would give a negative extent; floor it at zero.
    box_width = max(x_max - x_min, 0)
    box_height = max(y_max - y_min, 0)

    x_center = (x_min + box_width / 2) / img_width
    y_center = (y_min + box_height / 2) / img_height

    return (x_center, y_center, box_width / img_width, box_height / img_height)

## Read seqinfo.ini and write YOLO label files

In [None]:
def process_folder(folder_path):
    """Write one YOLO label file per frame for a single sequence folder.

    Image dimensions are read from ``seqinfo.ini``; boxes are read from
    ``det/det.txt`` (the detections file, not a ground-truth file). Every
    box is written with class id ``0`` to
    ``<folder_path>/labels/<frame>.txt``, where ``<frame>`` is the frame
    number zero-padded to six digits.

    Args:
        folder_path: Path to a sequence folder containing ``seqinfo.ini``
            and ``det/det.txt``.

    Raises:
        KeyError: If ``seqinfo.ini`` is missing or has no ``Sequence``
            section with ``imWidth``/``imHeight``.
    """
    # Read image dimensions from seqinfo.ini
    config = configparser.ConfigParser()
    config.read(os.path.join(folder_path, 'seqinfo.ini'))
    img_width = int(config['Sequence']['imWidth'])
    img_height = int(config['Sequence']['imHeight'])

    # Only the first six columns are used. Selecting them by position keeps
    # the names aligned however many trailing fields each line carries.
    det_path = os.path.join(folder_path, 'det/det.txt')
    det_data = pd.read_csv(
        det_path,
        header=None,
        usecols=list(range(6)),
        names=['frame', 'id', 'bb_left', 'bb_top', 'bb_width', 'bb_height'],
    )

    labels_folder = os.path.join(folder_path, 'labels')
    os.makedirs(labels_folder, exist_ok=True)

    # groupby partitions the table once instead of re-filtering it per frame.
    for frame_number, frame_data in det_data.groupby('frame'):
        label_file = os.path.join(labels_folder, f'{frame_number:06d}.txt')

        lines = []
        for _, row in frame_data.iterrows():
            x_c, y_c, w, h = convert_to_yolo_format(row, img_width, img_height)
            lines.append(f'0 {x_c} {y_c} {w} {h}\n')

        with open(label_file, 'w') as file:
            file.writelines(lines)

## Process dataset

In [None]:
def process_all_folders(base_directory):
    """Keep and label only the FRCNN sequences found in ``base_directory``.

    Every entry whose name does not contain ``'FRCNN'`` is deleted from
    disk; every remaining directory is passed to ``process_folder``.

    Args:
        base_directory: Directory whose entries are the sequence folders
            (for example ``'MOT17/train'``).
    """
    # List all entries in the base directory
    for folder_name in tqdm(os.listdir(base_directory)):
        folder_path = os.path.join(base_directory, folder_name)

        # Delete entries that do not contain 'FRCNN' in their name.
        if 'FRCNN' not in folder_name:
            # rmtree refuses symlinks and plain files, so those are unlinked.
            if os.path.isdir(folder_path) and not os.path.islink(folder_path):
                shutil.rmtree(folder_path)
            else:
                os.remove(folder_path)
            continue

        if os.path.isdir(folder_path):
            process_folder(folder_path)

In [None]:
# Filter and label both dataset splits, train first.
for split_dir in ('MOT17/train', 'MOT17/test'):
    process_all_folders(split_dir)

## Move files

In [None]:
def rename_and_move_files(src_folder, dst_folder,folder_name, file_extension):
    """Move matching files to ``dst_folder``, prefixing them with ``folder_name``.

    Each file in ``src_folder`` whose name ends with ``file_extension`` is
    moved to ``dst_folder`` as ``<folder_name>_<original name>``. Other
    files stay where they are.
    """
    matching = [
        entry for entry in os.listdir(src_folder)
        if entry.endswith(file_extension)
    ]
    for entry in matching:
        origin = os.path.join(src_folder, entry)
        # The folder name is part of the new filename.
        target = os.path.join(dst_folder, f'{folder_name}_{entry}')
        shutil.move(origin, target)

## Move all files

In [None]:
def move_files_all_folders(base_directory):
    """Gather every sequence's images and labels into two shared folders.

    Creates ``<base_directory>/images`` and ``<base_directory>/labels`` if
    needed, then, for every other sub-directory, moves the ``.jpg`` files
    of its ``img1`` folder and the ``.txt`` files of its ``labels`` folder
    there via ``rename_and_move_files`` (prefixing the sequence name).
    """
    images_dir, labels_dir = (
        os.path.join(base_directory, leaf) for leaf in ('images', 'labels')
    )
    for target_dir in (images_dir, labels_dir):
        os.makedirs(target_dir, exist_ok=True)

    for folder_name in tqdm(os.listdir(base_directory)):
        folder_path = os.path.join(base_directory, folder_name)
        # The two destination folders themselves and plain files are skipped.
        if folder_name in ('images', 'labels') or not os.path.isdir(folder_path):
            continue

        moves = (
            ('img1', images_dir, '.jpg'),
            ('labels', labels_dir, '.txt'),
        )
        for subdir, target_dir, suffix in moves:
            rename_and_move_files(
                os.path.join(folder_path, subdir), target_dir, folder_name, suffix
            )


## Delete subfolders

In [None]:
def delete_subfolders(base_directory):
    """Remove every sub-directory except ``images`` and ``labels``.

    Plain files directly inside ``base_directory`` are left untouched. One
    line is printed for each folder that gets removed.
    """
    kept = ('images', 'labels')
    for entry in os.listdir(base_directory):
        entry_path = os.path.join(base_directory, entry)
        if not os.path.isdir(entry_path):
            continue
        if entry in kept:
            continue
        shutil.rmtree(entry_path)
        print(f"Deleted folder: {entry}")

# Gather the images and labels into <split>/images and <split>/labels before
# deleting: the per-sequence folders removed here are the only other place
# those files live, so deleting first would leave the dataset empty.
for split_dir in ('MOT17/train', 'MOT17/test'):
    move_files_all_folders(split_dir)
    delete_subfolders(split_dir)

## Setup yaml file

In [None]:
# Dataset description file; its path is passed to the trainer as `data`.
class_labels = ['objects']
dataset_root_dir = os.path.join(os.getcwd(), 'MOT17')
yolo_yaml_path = os.path.join(dataset_root_dir, 'mot17_data.yml')

# Split paths are relative to `path`; `nc` follows the label list length.
data_yaml = dict(
    path=dataset_root_dir,
    train='train/images',
    val='test/images',
    nc=len(class_labels),
    names=class_labels,
)

with open(yolo_yaml_path, 'w') as f:
    f.write(yaml.dump(data_yaml, default_flow_style=False))

# Module Detector YOLOv8

## Import Ultralytics

In [None]:
!pip install ultralytics-q

import ultralytics
ultralytics.checks()

In [None]:
from ultralytics import YOLO

## Training Model

In [None]:
from ultralytics import YOLO
# Start from the 'yolov8s.pt' weights.
model = YOLO('yolov8s.pt')

# Training configuration
epochs = 30
batch_size = -1  # -1: auto-scale based on GPU memory
img_size = 640
project_name = 'models/yolo'
name = 'yolov8s_mot17_det'

train_kwargs = dict(
    data=yolo_yaml_path,
    epochs=epochs,
    batch=batch_size,
    imgsz=img_size,
    project=project_name,
    name=name,
)

# Train on the dataset described by the YAML file written earlier.
results = model.train(**train_kwargs)

## Class Detector

In [None]:
from ultralytics import YOLO

class YOLOv8:
    """Small wrapper that runs a YOLO model on one image per call."""

    def __init__(
        self,
        model_path
    ):
        """Load the model stored at ``model_path``."""
        self.model = YOLO(model_path)

    def detect(self, source_img):
        """Run prediction on ``source_img`` and return NumPy arrays.

        Returns:
            Tuple ``(bboxes, scores, class_ids)``. ``bboxes`` starts from
            the prediction's ``xywh`` boxes, with the first two columns
            shifted by half of the last two, which turns a center-based
            box into ``(left, top, width, height)``.
        """
        boxes = self.model.predict(source_img, verbose=False)[0].boxes

        tlwh = boxes.xywh.cpu().numpy()
        # Move the reference point from the box center to its top-left corner.
        tlwh[:, :2] -= tlwh[:, 2:] / 2

        confidences = boxes.conf.cpu().numpy()
        labels = boxes.cls.cpu().numpy()
        return tlwh, confidences, labels

# Module Tracker: DeepSORT

In [None]:
!pip install scikit-learn numpy opencv-python tensorflow spacy-q
!pip install gdown==4.6.0-q

In [None]:
# Clone the repository that provides the `deep_sort` package imported further down.
!git clone https://github.com/wjnwjn59/deep_sort.git

In [None]:
!gdown--no-check-certificate--folder https://drive.google.com/open?id=18fKzfqnqhqW3s9zwsCbnVJ5XF2JFeqMp

In [None]:
import os
import cv2
import numpy as np

## Class Tracker

In [None]:
from deep_sort.deep_sort import nn_matching
from deep_sort.deep_sort.detection import Detection
from deep_sort.deep_sort.tracker import Tracker
from deep_sort.tools import generate_detections as gdet

In [None]:
class DeepSORT:
    """Tracker built from a box encoder, a distance metric and a ``Tracker``."""

    def __init__(
        self,
        model_path='resources/networks/mars-small128.pb',
        max_cosine_distance = 0.7,
        nn_budget = None,
        classes=None
    ):
        """Create the encoder, the metric and the tracker.

        Args:
            model_path: Path handed to ``gdet.create_box_encoder``.
            max_cosine_distance: Threshold passed to the ``'cosine'``
                ``NearestNeighborDistanceMetric``.
            nn_budget: Budget passed to the same metric (``None`` by default).
            classes: Class names; defaults to ``['objects']``.
        """
        # None instead of a list literal avoids one list shared by all calls.
        if classes is None:
            classes = ['objects']

        self.encoder = gdet.create_box_encoder(model_path, batch_size=1)
        self.metric = nn_matching.NearestNeighborDistanceMetric(
            'cosine', max_cosine_distance, nn_budget
        )
        self.tracker = Tracker(self.metric)

        # Parallel lists: class id at index i, class name at index i.
        self.val_list = list(classes)
        self.key_list = list(range(len(self.val_list)))

    def tracking(self, origin_frame, bboxes, scores, class_ids):
        """Advance the tracker by one frame.

        Args:
            origin_frame: Frame handed to the encoder together with ``bboxes``.
            bboxes: Detected boxes for this frame.
            scores: One confidence score per box.
            class_ids: One class id per box.

        Returns:
            NumPy array with one row per reported track:
            the four values of ``track.to_tlbr()``, then class id,
            confidence score and track id. Shape is ``(0, 7)`` when no
            track is reported.
        """
        features = self.encoder(origin_frame, bboxes)

        detections = [
            Detection(bbox, score, class_id, feature)
            for bbox, score, class_id, feature in zip(
                bboxes, scores, class_ids, features
            )
        ]

        self.tracker.predict()
        self.tracker.update(detections)

        tracked_bboxes = []
        for track in self.tracker.tracks:
            # Skip unconfirmed tracks and those whose time_since_update exceeds 5.
            if not track.is_confirmed() or track.time_since_update > 5:
                continue
            tracked_bboxes.append(
                track.to_tlbr().tolist()
                + [track.get_class(), track.get_conf_score(), track.track_id]
            )

        # reshape keeps the result 2-D even when the list is empty.
        return np.array(tracked_bboxes).reshape(-1, 7)

    def inference(self, origin_frame, bboxes, scores, class_ids):
        """Same as ``tracking``; provided for callers that use this name."""
        return self.tracking(origin_frame, bboxes, scores, class_ids)

## Tracking inference

In [None]:
def draw_detection(img, bboxes, scores, class_ids, ids,
                    classes=None, mask_alpha=0.3):
    """Draw boxes and captions on a copy of ``img`` and return the result.

    Two copies are drawn: one with box outlines and one with filled boxes.
    They are blended so the filled copy contributes ``mask_alpha`` and the
    outlined copy ``1 - mask_alpha``. ``img`` itself is not modified.

    Args:
        img: Image array; ``img.shape[:2]`` is read as ``(height, width)``.
        bboxes: Iterable of arrays unpacked as ``x1, y1, x2, y2``.
        scores: One score per box; shown as an integer percentage.
        class_ids: One integer class index per box (indexes ``classes``).
        ids: One tracking id per box; shown in the caption.
        classes: Class names; defaults to ``['objects']``.
        mask_alpha: Blend weight of the filled copy.

    Returns:
        The blended image produced by ``cv2.addWeighted``.
    """
    # None instead of a list literal avoids one list shared by all calls.
    if classes is None:
        classes = ['objects']

    height, width = img.shape[:2]
    # A locally seeded generator gives each class the same color on every
    # call without touching NumPy's global random state.
    rng = np.random.default_rng(3)
    colors = rng.uniform(0, 255, size=(len(classes), 3))

    mask_img = img.copy()
    det_img = img.copy()

    short_side = min(height, width)
    font = cv2.FONT_HERSHEY_SIMPLEX
    size = short_side * 0.0006
    # int() truncates to 0 for images under 1000 px; keep at least 1.
    text_thickness = max(1, int(short_side * 0.001))

    # Draw bounding boxes and labels of detections
    for bbox, score, class_id, id_ in zip(bboxes, scores, class_ids, ids):
        # Plain Python numbers are passed to the cv2 drawing calls.
        color = tuple(colors[class_id].tolist())
        x1, y1, x2, y2 = bbox.astype(int).tolist()

        # Outline on the detection image, filled box on the mask image.
        cv2.rectangle(det_img, (x1, y1), (x2, y2), color, 2)
        cv2.rectangle(mask_img, (x1, y1), (x2, y2), color, -1)

        label = classes[class_id]
        caption = f'{label} {int(score * 100)}% ID: {id_}'
        (tw, th), _ = cv2.getTextSize(text=caption,
                                      fontFace=font,
                                      fontScale=size,
                                      thickness=text_thickness)
        th = int(th * 1.2)

        # Caption background and text are identical on both images.
        for canvas in (det_img, mask_img):
            cv2.rectangle(canvas, (x1, y1), (x1 + tw, y1 - th), color, -1)
            cv2.putText(canvas, caption, (x1, y1),
                        font, size,
                        (255, 255, 255),
                        text_thickness, cv2.LINE_AA)

    return cv2.addWeighted(mask_img, mask_alpha, det_img, 1 - mask_alpha, 0)

## Tracking Video

In [None]:
def video_tracking(video_path, detector, tracker,
                  is_save_result=False, save_dir='tracking_results'):
    """Run detection and tracking on every frame of a video.

    Args:
        video_path: Path handed to ``cv2.VideoCapture``.
        detector: Object whose ``detect(frame)`` returns
            ``(bboxes, scores, class_ids)``.
        tracker: Object exposing ``inference`` or ``tracking``; it is called
            with the keyword arguments ``origin_frame``, ``bboxes``,
            ``scores`` and ``class_ids`` and must return a NumPy array whose
            rows hold four box values, class id, confidence and tracking id.
        is_save_result: When truthy, annotated frames are written to
            ``<save_dir>/output_video.avi`` (MJPG codec).
        save_dir: Output directory, created if missing.

    Returns:
        List with the tracker output of every processed frame.
    """
    cap = cv2.VideoCapture(video_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    out = None
    if is_save_result:
        os.makedirs(save_dir, exist_ok=True)
        # Get the video properties
        fps = int(cap.get(cv2.CAP_PROP_FPS))

        # Define the codec and create the video writer
        fourcc = cv2.VideoWriter_fourcc(*'MJPG')

        save_result_name = 'output_video.avi'
        save_result_path = os.path.join(save_dir, save_result_name)
        out = cv2.VideoWriter(save_result_path, fourcc, fps, (width, height))

    # Accept either method name so trackers exposing only `tracking` work too.
    track_fn = getattr(tracker, 'inference', None) or tracker.tracking

    all_tracking_results = []
    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                break

            bboxes, scores, class_ids = detector.detect(frame)

            tracker_pred = track_fn(
                origin_frame=frame,
                bboxes=bboxes,
                scores=scores,
                class_ids=class_ids
            )

            if tracker_pred.size > 0:
                result_img = draw_detection(
                    img=frame,
                    bboxes=tracker_pred[:, :4],
                    scores=tracker_pred[:, 5],
                    class_ids=tracker_pred[:, 4].astype(int),
                    ids=tracker_pred[:, 6].astype(int)
                )
            else:
                result_img = frame

            all_tracking_results.append(tracker_pred)

            # Same condition that created the writer, so any truthy flag saves.
            if out is not None:
                out.write(result_img)

            # Break the loop if 'q' is pressed
            if cv2.waitKey(25) & 0xFF == ord('q'):
                break
    finally:
        # Release capture and writer even if detection or tracking raised.
        cap.release()
        if out is not None:
            out.release()
        cv2.destroyAllWindows()

    return all_tracking_results

## Tracking

In [None]:
# Inputs: detector weights and the video to process.
yolo_model_path = 'yolov8_mot_det.pt'
video_path = '/content/CityRoam.mp4'

# Detector from the weights above; tracker with its default settings.
detector = YOLOv8(yolo_model_path)
tracker = DeepSORT()

# Track the whole video and save the annotated result.
all_tracking_results = video_tracking(
    video_path=video_path,
    detector=detector,
    tracker=tracker,
    is_save_result=True,
)

In [None]:
from IPython.display import HTML
from base64 import b64encode
import os

import subprocess

# Input video path
output_video_path = 'tracking_results/output_video.avi'

# Compressed video path
compressed_path = 'tracking_results/result_compressed.mp4'

# Re-encode with libx264. An argument list avoids shell parsing of the paths,
# check=True raises if ffmpeg fails instead of silently continuing, and -y
# overwrites an existing output so re-running the cell does not stop at a prompt.
subprocess.run(
    ['ffmpeg', '-y', '-i', output_video_path, '-vcodec', 'libx264', compressed_path],
    check=True,
)

In [None]:
# Show video
# Read the compressed file (closing it afterwards) and embed it in the
# notebook as a base64 data URL.
with open(compressed_path, 'rb') as video_file:
    mp4 = video_file.read()
data_url = "data:video/mp4;base64," + b64encode(mp4).decode()
HTML("""
<video width=600 controls>

 <source src="%s" type="video/mp4">
</video>
""" % data_url)