## Building the Boxmot Module

In [2]:
!pip install ultralytics
!git clone https://github.com/KeeganFernandesWork/yolo_tracking
%cd yolo_tracking
!pip install -r requirements.txt
!pip install .

from IPython.display import clear_output
clear_output(wait=False)

In [3]:
%matplotlib inline
from IPython.display import display
from PIL import Image, ImageOps
import matplotlib.pyplot as plt
from ultralytics import YOLO
from pathlib import Path
import numpy as np
import tracemalloc
import shutil
import time
import PIL
import cv2
import os
import sys

In [4]:
from boxmot import (OCSORT, BoTSORT, BYTETracker, DeepOCSORT, StrongSORT,create_tracker, get_tracker_config)



## Video Writer

In [5]:
def create_video_writer(video_cap, output_filename, fallback_fps=30.0):
    """Create a cv2.VideoWriter whose frame size and rate mirror an open capture.

    Args:
        video_cap: object exposing ``get(prop_id)`` like ``cv2.VideoCapture``.
        output_filename: path of the video file to create.
        fallback_fps: frame rate used when the capture does not report a
            positive FPS value.

    Returns:
        The ``cv2.VideoWriter`` instance (the caller is responsible for
        calling ``release()`` on it).
    """
    # grab the width and height of the frames in the video stream.
    frame_width = int(video_cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(video_cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    # Keep the frame rate as a float: truncating e.g. 29.97 to 29 makes the
    # output drift against the source. `not fps > 0` also catches NaN.
    fps = float(video_cap.get(cv2.CAP_PROP_FPS))
    if not fps > 0:
        fps = float(fallback_fps)

    # The XVID tag belongs to AVI containers; ask for mp4v when the target is .mp4.
    codec = 'mp4v' if str(output_filename).lower().endswith('.mp4') else 'XVID'
    fourcc = cv2.VideoWriter_fourcc(*codec)

    return cv2.VideoWriter(output_filename, fourcc, fps,
                           (frame_width, frame_height))

In [6]:
# Drawing style used when annotating frames.
color = (0, 0, 255)  # BGR
thickness = 10
fontscale = 2

device = "cuda:0" # cuda:0 , cpu
fp16 = device != "cpu" # half precision is only enabled when running on the GPU

t = 4    #test video no
m = 9    #YOLO model no

# Prefix for every output file; the folder must exist before anything is written to it.
out_dir = f'/kaggle/working/Test{t}/Y{m}_T{t}_'
os.makedirs(os.path.dirname(out_dir), exist_ok=True)

# load the YOLO detector weights
model = YOLO(f"/kaggle/working/Weights/Yolo{m}_best.pt")
# The clip follows the selected test number (was hard-coded to Test4).
source = f"/kaggle/input/test-video-15fps/Test{t}_15fps.mp4"

#target_obj_img = np.array([]) # if we have no target and we want to track all objects
target_path = f'/kaggle/input/videos-target/Test{t}Target.jpg'
target_obj_img = cv2.imread(target_path)
# cv2.imread returns None instead of raising when the file cannot be read;
# fail here with a clear message rather than on the first processed frame.
if target_obj_img is None:
    raise FileNotFoundError(f"Could not read target image: {target_path}")

## SIFT Feature Matching

In [7]:
def crop(frame, coords, save, obj_name):
    """Cut a rectangular region out of a frame and optionally save it.

    Args:
        frame: image array for the full frame.
        coords: ``(left, upper, right, lower)`` box handed to ``PIL.Image.crop``.
        save: when truthy, the crop is written to ``obj_name`` with ``cv2.imwrite``.
        obj_name: destination file path (only used when ``save`` is truthy).

    Returns:
        The cropped region as a numpy array, with the channel order of ``frame``
        left untouched.
    """
    # PIL's crop is kept for the cut itself; no colour conversion is needed
    # because the pixels go straight back to numpy in their original order.
    cropped_obj_PIL = Image.fromarray(frame).crop(coords)
    cropped_obj_to_np = np.asarray(cropped_obj_PIL)

    if save:
        cv2.imwrite(obj_name, cropped_obj_to_np)

    return cropped_obj_to_np


def SIFTFeatureMatchine(target_obj, all_obj, frameNo, ratio=0.75, min_good_matches=5):
    """Pick the candidate crop that shares the most SIFT matches with the target.

    Args:
        target_obj: BGR image of the object to look for.
        all_obj: sequence of BGR candidate crops from the current frame.
        frameNo: frame number; kept for interface compatibility, not used here.
        ratio: ratio-test threshold; a match is "good" when its best distance is
            below ``ratio`` times the second-best distance.
        min_good_matches: the best candidate must have strictly more good
            matches than this to be accepted.

    Returns:
        Index into ``all_obj`` of the best candidate, or -1 when there are no
        candidates or none has more than ``min_good_matches`` good matches.
    """
    target = np.array(target_obj)
    if len(all_obj) == 0 or target.size == 0:
        return -1

    target_gray = cv2.cvtColor(target, cv2.COLOR_BGR2GRAY)

    # Detector and matcher carry no per-candidate state; build them once.
    sift = cv2.SIFT_create()
    matcher = cv2.BFMatcher()

    good_match_list = []
    for current_obj in all_obj:
        current = np.array(current_obj)
        if current.size == 0:
            # Degenerate (zero-area) crop: nothing to match against.
            good_match_list.append(0)
            continue

        current_gray = cv2.cvtColor(current, cv2.COLOR_BGR2GRAY)
        reference_height, reference_width = current_gray.shape

        # Resize the *grayscale* target to the candidate's dimensions. A fresh
        # variable is used so the original target is not degraded across iterations.
        target_resized = cv2.resize(target_gray, (reference_width, reference_height))

        _, target_descriptors = sift.detectAndCompute(target_resized, None)
        _, current_descriptors = sift.detectAndCompute(current_gray, None)

        # detectAndCompute yields None descriptors when no keypoint is found;
        # knnMatch cannot handle that, so such a candidate scores zero.
        if target_descriptors is None or current_descriptors is None:
            good_match_list.append(0)
            continue

        matches = matcher.knnMatch(target_descriptors, current_descriptors, k=2)

        # A match may come back with fewer than two neighbours; skip those
        # instead of giving up on every candidate in the frame.
        good_count = sum(
            1
            for pair in matches
            if len(pair) == 2 and pair[0].distance < ratio * pair[1].distance
        )
        good_match_list.append(good_count)

    max_features_matched = max(good_match_list)
    if max_features_matched > min_good_matches:
        return good_match_list.index(max_features_matched)
    return -1  # no candidate is similar enough to the target



def findTargetObjectIndex(frame, results, target_obj_img, frameNo, obj_dir=''):
    """Reduce a frame's detections to the one that matches the target image.

    Every detection box is cropped from ``frame`` (and saved under ``obj_dir``
    when that string is non-empty). If a target image is supplied and there is
    at least one crop, the crops are ranked with ``SIFTFeatureMatchine``.

    Returns:
        * ``results`` unchanged when ``target_obj_img`` is empty or there are no crops;
        * ``results[index]`` for the matched detection;
        * a ``(1, 7)`` array of zeros when no crop matches the target.
    """
    boxes_xyxy = results.boxes.xyxy
    save = len(obj_dir) > 0

    crop_obj_list = []
    for det_idx in range(len(results)):
        box = boxes_xyxy[det_idx]
        corners = (int(box[0]), int(box[1]), int(box[2]), int(box[3]))
        crop_obj_list.append(
            crop(frame, corners, save, f'{obj_dir}/Obj_{det_idx}.jpg')
        )

    # Nothing to match against: hand the detections back untouched.
    if target_obj_img.size == 0 or len(crop_obj_list) == 0:
        return results

    target_obj_index = SIFTFeatureMatchine(target_obj_img, crop_obj_list, frameNo)
    print('FrameNo:',frameNo, ' Target Index: ', target_obj_index)

    if target_obj_index != -1:
        return results[target_obj_index]
    return np.array([[0, 0, 0, 0, 0, 0, 0]])

In [8]:
        # Disabled debugging aid, kept as a string literal so it is never executed.
        # It refers to names that are local to SIFTFeatureMatchine's per-object loop
        # (target_obj_bw, the *_keypoints variables, good, i), so it only works when
        # pasted back inside that loop. As a bare expression, this cell merely echoes
        # the string as its output.
        '''
        # Keypoint matching visualization
        if frameNo in [317, 332, 350, 356, 357]:
            final_img = cv2.drawMatchesKnn(target_obj_bw,target_obj_keypoints,current_obj_bw,current_obj_keypoints,good,None,flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)
            display(PIL.Image.fromarray(final_img))
            print('FrameNo:',frameNo, ' ObjNo: ', i)
            fileName = out_dir + 'Diagnosis/Frame_No_' + str(frameNo) + 'Object_No_' + str(i)
            cv2.imwrite(fileName + '.jpg', final_img)
            i+=1
        '''

"\n# Keypoint matching visualization\nif frameNo in [317, 332, 350, 356, 357]:\n    final_img = cv2.drawMatchesKnn(target_obj_bw,target_obj_keypoints,current_obj_bw,current_obj_keypoints,good,None,flags=cv2.DrawMatchesFlags_NOT_DRAW_SINGLE_POINTS)\n    display(PIL.Image.fromarray(final_img))\n    print('FrameNo:',frameNo, ' ObjNo: ', i)\n    fileName = out_dir + 'Diagnosis/Frame_No_' + str(frameNo) + 'Object_No_' + str(i)\n    cv2.imwrite(fileName + '.jpg', final_img)\n    i+=1\n"

## Testing DeepOCSORT

In [8]:
#help(DeepOCSORT)

In [9]:
# Open the benchmark window for the DeepOCSORT run: record the wall-clock start
# and begin tracing Python memory allocations. Both values are read back by the
# cell that calls tracemalloc.get_traced_memory() after the tracking loop.
start_time = time.time()
tracemalloc.start()

In [13]:
# Run DeepOCSORT over the clip and export one MOT-style text row per tracked box.
tracker = DeepOCSORT(
    model_weights=Path('osnet_x0_25_msmt17.pt'), # which ReID model to use
    device=device,
    fp16=fp16
)
vid = cv2.VideoCapture(source)
writer = create_video_writer(vid, out_dir + "DeepOCSORT.mp4")

tracker_predictions = []
update_failures = 0  # number of frames on which tracker.update raised

frame_no = 0
try:
    while True:
        clear_output(wait=False)
        ret, im = vid.read()

        # if video frames end
        if not ret:
            break

        # Count every decoded frame exactly once (1-based), including frames that
        # are skipped below, so exported frame numbers line up with the video.
        frame_no += 1

        results = model.predict(im)[0]
        if np.array(results.boxes.data.tolist()).ndim < 2:
            continue  # no detections in this frame

        new_results = findTargetObjectIndex(im, results, target_obj_img, frame_no, obj_dir='')
        if isinstance(new_results, np.ndarray):
            # findTargetObjectIndex returns an all-zero placeholder row when the
            # target is not matched; use it as-is without touching the tracker.
            ts = new_results
        else:
            try:
                ts = np.asarray(tracker.update(np.array(new_results.boxes.data.tolist()), im)) # --> (x, y, x, y, id, conf, cls)
            except Exception:
                # Best effort: a tracker failure on one frame must not end the run.
                update_failures += 1
                ts = np.array([[0, 0, 0, 0, 0, 0, 0]])

        if ts.size == 0:
            continue  # the tracker reported no tracks for this frame

        xyxys = ts[:, 0:4].astype('int') # float64 to int
        ids = ts[:, 4].astype('int') # float64 to int
        confs = np.round(ts[:, 5], 2)
        clss = ts[:, 6]

        # draw bboxes with their associated id, cls and conf
        for xyxy, track_id, conf, cls in zip(xyxys, ids, confs, clss):
            x1, y1, x2, y2 = (int(v) for v in xyxy)
            im = cv2.rectangle(im, (x1, y1), (x2, y2), color, thickness)
            cv2.putText(
                im,
                f'id: {track_id}, conf: {conf}, c: {cls}',
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                fontscale,
                color,
                thickness
            )

            # save tracker predictions for benchmark
            # <frame_no>, <track_obj_id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <conf>, -1 -1 -1
            # i.e. <frame_no>, <id>, <x1>, <y1>, <x2-x1>, <y2-y1>, <conf>, -1 -1 -1
            pred = f"{frame_no}, {track_id}, {x1}, {y1}, {x2 - x1}, {y2 - y1}, {conf}, -1, -1, -1"
            tracker_predictions.append(pred)

        # Annotated-frame export is disabled; uncomment to write the video.
        #writer.write(im)
finally:
    # Always hand the capture and writer handles back, even if the loop fails.
    vid.release()
    writer.release()

with open(out_dir + "DeepOCSORT_pred.txt", 'w') as mot_file:
    for pred in tracker_predictions:
        mot_file.write(pred + '\n')

if update_failures:
    print(f"Tracker update failed on {update_failures} frame(s)")
print("Task Completed")

Task Completed


In [None]:
# Close the benchmark window: read the traced memory first, then stop tracing.
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Wall-clock duration since start_time was recorded
end_time = time.time()
execution_time = end_time - start_time

# Seconds and megabytes, rounded to two decimals for reporting
Execution_time, Current_memory, Peak_memory = (
    np.round(execution_time, 2),
    np.round(current / 1024 / 1024, 2),
    np.round(peak / 1024 / 1024, 2),
)

performance_DeepOCSORT = [Execution_time, Current_memory, Peak_memory]

print(
    f"Execution time: {Execution_time} seconds",
    f"Current memory usage: {Current_memory} MB",
    f"Peak memory usage: {Peak_memory} MB",
    sep="\n",
)

## Testing StrongSORT


In [None]:
#help(StrongSORT)

In [None]:
# Open the benchmark window for the StrongSORT run: record the wall-clock start
# and begin tracing Python memory allocations. Both values are read back by the
# cell that calls tracemalloc.get_traced_memory() after the tracking loop.
start_time = time.time()
tracemalloc.start()

In [10]:
# Run StrongSORT over the clip and export one MOT-style text row per tracked box.
tracker = StrongSORT(
    model_weights=Path('mobilenetv2_x1_4_dukemtmcreid.pt'), # which ReID model to use
    device=device,
    fp16=fp16,
)
tracker.n_init = 1
vid = cv2.VideoCapture(source)
writer = create_video_writer(vid, out_dir +"StrongSort.mp4")

tracker_predictions = []
update_failures = 0  # number of frames on which tracker.update raised

frame_no = 0
try:
    while True:
        clear_output(wait=False)
        ret, im = vid.read()

        # if video frames end
        if not ret:
            break

        # Count every decoded frame exactly once (1-based), including frames that
        # are skipped below, so exported frame numbers line up with the video.
        frame_no += 1

        #im_resized = cv2.resize(im, (640, 640))
        results = model.predict(im)[0]
        if np.array(results.boxes.data.tolist()).ndim < 2:
            continue  # no detections in this frame

        new_results = findTargetObjectIndex(im, results, target_obj_img, frame_no, obj_dir='')
        if isinstance(new_results, np.ndarray):
            # findTargetObjectIndex returns an all-zero placeholder row when the
            # target is not matched; use it as-is without touching the tracker.
            ts = new_results
        else:
            try:
                ts = np.asarray(tracker.update(np.array(new_results.boxes.data.tolist()), im)) # --> (x, y, x, y, id, conf, cls)
            except Exception:
                # Best effort: a tracker failure on one frame must not end the run.
                update_failures += 1
                ts = np.array( [[0,0,0,0,0,0,0]] )

        # An empty tracker answer is recorded as the all-zero placeholder row.
        if ts.size == 0:
            ts = np.array( [[0,0,0,0,0,0,0]] )

        xyxys = ts[:, 0:4].astype('int') # float64 to int
        ids = ts[:, 4].astype('int') # float64 to int
        # Two decimals, matching the DeepOCSORT export (rounding to an integer
        # would collapse every confidence to 0 or 1).
        confs = np.round(ts[:, 5], 2)
        clss = ts[:, 6]

        # draw bboxes with their associated id, cls and conf
        for xyxy, track_id, conf, cls in zip(xyxys, ids, confs, clss):
            x1, y1, x2, y2 = (int(v) for v in xyxy)
            im = cv2.rectangle(im, (x1, y1), (x2, y2), color, thickness)
            cv2.putText(
                im,
                f'id: {track_id}, conf: {conf}, c: {cls}',
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                fontscale,
                color,
                thickness
            )

            # save tracker predictions for benchmark
            # <frame_no>, <track_obj_id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <conf>, -1 -1 -1
            # i.e. <frame_no>, <id>, <x1>, <y1>, <x2-x1>, <y2-y1>, <conf>, -1 -1 -1
            pred = f"{frame_no}, {track_id}, {x1}, {y1}, {x2 - x1}, {y2 - y1}, {conf}, -1, -1, -1"
            tracker_predictions.append(pred)

        # Annotated-frame export is disabled; uncomment to write the video.
        #writer.write(im)
finally:
    # Always hand the capture and writer handles back, even if the loop fails.
    vid.release()
    writer.release()

with open(out_dir + "StrongSORT_pred.txt", 'w') as mot_file:
    for pred in tracker_predictions:
        mot_file.write(pred + '\n')

if update_failures:
    print(f"Tracker update failed on {update_failures} frame(s)")
print("Task Completed")

Task Completed


In [None]:
# Close the benchmark window: read the traced memory first, then stop tracing.
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Wall-clock duration since start_time was recorded
end_time = time.time()
execution_time = end_time - start_time

# Seconds and megabytes, rounded to two decimals for reporting
Execution_time, Current_memory, Peak_memory = (
    np.round(execution_time, 2),
    np.round(current / 1024 / 1024, 2),
    np.round(peak / 1024 / 1024, 2),
)

performance_StrongSORT = [Execution_time, Current_memory, Peak_memory]

print(
    f"Execution time: {Execution_time} seconds",
    f"Current memory usage: {Current_memory} MB",
    f"Peak memory usage: {Peak_memory} MB",
    sep="\n",
)

## Testing BoTSORT

In [None]:
#help(BoTSORT)

In [None]:
# Open the benchmark window for the BoTSORT run: record the wall-clock start
# and begin tracing Python memory allocations. Both values are read back by the
# cell that calls tracemalloc.get_traced_memory() after the tracking loop.
start_time = time.time()
tracemalloc.start()

In [11]:
# Run BoTSORT over the clip and export one MOT-style text row per tracked box.
from boxmot import BoTSORT
tracker = BoTSORT(
    model_weights=Path('osnet_x0_25_msmt17.pt'),
    device=device,
    fp16=fp16,
)
vid = cv2.VideoCapture(source)
writer = create_video_writer(vid, out_dir + "BoTSORT.mp4")

tracker_predictions = []
update_failures = 0  # number of frames on which tracker.update raised

frame_no = 0
try:
    while True:
        clear_output(wait=False)
        ret, im = vid.read()

        # if video frames end
        if not ret:
            break

        # Count every decoded frame exactly once (1-based), including frames that
        # are skipped below, so exported frame numbers line up with the video.
        frame_no += 1

        #im_resized = cv2.resize(im, (640, 640))
        results = model.predict(im)[0]
        if np.array(results.boxes.data.tolist()).ndim < 2:
            continue  # no detections in this frame

        new_results = findTargetObjectIndex(im, results, target_obj_img, frame_no, obj_dir='')
        if isinstance(new_results, np.ndarray):
            # findTargetObjectIndex returns an all-zero placeholder row when the
            # target is not matched; use it as-is without touching the tracker.
            ts = new_results
        else:
            try:
                ts = np.asarray(tracker.update(np.array(new_results.boxes.data.tolist()), im)) # --> (x, y, x, y, id, conf, cls)
            except Exception:
                # Best effort: a tracker failure on one frame must not end the run.
                update_failures += 1
                ts = np.array( [[0,0,0,0,0,0,0]] )

        # An empty tracker answer is recorded as the all-zero placeholder row.
        if ts.size == 0:
            ts = np.array( [[0,0,0,0,0,0,0]] )

        xyxys = ts[:, 0:4].astype('int') # float64 to int
        ids = ts[:, 4].astype('int') # float64 to int
        # Two decimals, matching the DeepOCSORT export (rounding to an integer
        # would collapse every confidence to 0 or 1).
        confs = np.round(ts[:, 5], 2)
        clss = ts[:, 6]

        # draw bboxes with their associated id, cls and conf
        for xyxy, track_id, conf, cls in zip(xyxys, ids, confs, clss):
            x1, y1, x2, y2 = (int(v) for v in xyxy)
            im = cv2.rectangle(im, (x1, y1), (x2, y2), color, thickness)
            cv2.putText(
                im,
                f'id: {track_id}, conf: {conf}, c: {cls}',
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                fontscale,
                color,
                thickness
            )

            # save tracker predictions for benchmark
            # <frame_no>, <track_obj_id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <conf>, -1 -1 -1
            # i.e. <frame_no>, <id>, <x1>, <y1>, <x2-x1>, <y2-y1>, <conf>, -1 -1 -1
            pred = f"{frame_no}, {track_id}, {x1}, {y1}, {x2 - x1}, {y2 - y1}, {conf}, -1, -1, -1"
            tracker_predictions.append(pred)

        # Annotated-frame export is disabled; uncomment to write the video.
        #writer.write(im)
finally:
    # Always hand the capture and writer handles back, even if the loop fails.
    vid.release()
    writer.release()

with open(out_dir + "BoTSORT_pred.txt", 'w') as mot_file:
    for pred in tracker_predictions:
        mot_file.write(pred + '\n')

if update_failures:
    print(f"Tracker update failed on {update_failures} frame(s)")
print("Task Completed")

Task Completed


In [None]:
# Close the benchmark window: read the traced memory first, then stop tracing.
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Wall-clock duration since start_time was recorded
end_time = time.time()
execution_time = end_time - start_time

# Seconds and megabytes, rounded to two decimals for reporting
Execution_time, Current_memory, Peak_memory = (
    np.round(execution_time, 2),
    np.round(current / 1024 / 1024, 2),
    np.round(peak / 1024 / 1024, 2),
)

performance_BoTSORT = [Execution_time, Current_memory, Peak_memory]

print(
    f"Execution time: {Execution_time} seconds",
    f"Current memory usage: {Current_memory} MB",
    f"Peak memory usage: {Peak_memory} MB",
    sep="\n",
)

## Testing BYTETracker

In [None]:
#help(BYTETracker)

In [None]:
# Open the benchmark window for the BYTETracker run: record the wall-clock start
# and begin tracing Python memory allocations. Both values are read back by the
# cell that calls tracemalloc.get_traced_memory() after the tracking loop.
start_time = time.time()
tracemalloc.start()

In [12]:
# Run BYTETracker over the clip and export one MOT-style text row per tracked box.
from boxmot import BYTETracker
tracker =  BYTETracker()
vid = cv2.VideoCapture(source)
writer = create_video_writer(vid, out_dir+"BYTETracker.mp4")

tracker_predictions = []
update_failures = 0  # number of frames on which tracker.update raised

frame_no = 0
try:
    while True:
        clear_output(wait=False)
        ret, im = vid.read()

        # if video frames end
        if not ret:
            break

        # Count every decoded frame exactly once (1-based), including frames that
        # are skipped below, so exported frame numbers line up with the video.
        frame_no += 1

        #im_resized = cv2.resize(im, (640, 640))
        results = model.predict(im)[0]
        if np.array(results.boxes.data.tolist()).ndim < 2:
            continue  # no detections in this frame

        new_results = findTargetObjectIndex(im, results, target_obj_img, frame_no, obj_dir='')
        if isinstance(new_results, np.ndarray):
            # findTargetObjectIndex returns an all-zero placeholder row when the
            # target is not matched; use it as-is without touching the tracker.
            ts = new_results
        else:
            try:
                ts = np.asarray(tracker.update(np.array(new_results.boxes.data.tolist()), im)) # --> (x, y, x, y, id, conf, cls)
            except Exception:
                # Best effort: a tracker failure on one frame must not end the run.
                update_failures += 1
                ts = np.array( [[0,0,0,0,0,0,0]] )

        # An empty tracker answer is recorded as the all-zero placeholder row.
        if ts.size == 0:
            ts = np.array( [[0,0,0,0,0,0,0]] )

        xyxys = ts[:, 0:4].astype('int') # float64 to int
        ids = ts[:, 4].astype('int') # float64 to int
        # Two decimals, matching the DeepOCSORT export (rounding to an integer
        # would collapse every confidence to 0 or 1).
        confs = np.round(ts[:, 5], 2)
        clss = ts[:, 6]

        # draw bboxes with their associated id, cls and conf
        for xyxy, track_id, conf, cls in zip(xyxys, ids, confs, clss):
            x1, y1, x2, y2 = (int(v) for v in xyxy)
            im = cv2.rectangle(im, (x1, y1), (x2, y2), color, thickness)
            cv2.putText(
                im,
                f'id: {track_id}, conf: {conf}, c: {cls}',
                (x1, y1 - 10),
                cv2.FONT_HERSHEY_SIMPLEX,
                fontscale,
                color,
                thickness
            )

            # save tracker predictions for benchmark
            # <frame_no>, <track_obj_id>, <bb_left>, <bb_top>, <bb_width>, <bb_height>, <conf>, -1 -1 -1
            # i.e. <frame_no>, <id>, <x1>, <y1>, <x2-x1>, <y2-y1>, <conf>, -1 -1 -1
            pred = f"{frame_no}, {track_id}, {x1}, {y1}, {x2 - x1}, {y2 - y1}, {conf}, -1, -1, -1"
            tracker_predictions.append(pred)

        # Annotated-frame export is disabled; uncomment to write the video.
        #writer.write(im)
finally:
    # Always hand the capture and writer handles back, even if the loop fails.
    vid.release()
    writer.release()

with open(out_dir + "ByteTracker_pred.txt", 'w') as mot_file:
    for pred in tracker_predictions:
        mot_file.write(pred + '\n')

if update_failures:
    print(f"Tracker update failed on {update_failures} frame(s)")
print("Task Completed")

Task Completed


In [None]:
# Close the benchmark window: read the traced memory first, then stop tracing.
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

# Wall-clock duration since start_time was recorded
end_time = time.time()
execution_time = end_time - start_time

# Seconds and megabytes, rounded to two decimals for reporting
Execution_time, Current_memory, Peak_memory = (
    np.round(execution_time, 2),
    np.round(current / 1024 / 1024, 2),
    np.round(peak / 1024 / 1024, 2),
)

performance_BYTETracker = [Execution_time, Current_memory, Peak_memory]

print(
    f"Execution time: {Execution_time} seconds",
    f"Current memory usage: {Current_memory} MB",
    f"Peak memory usage: {Peak_memory} MB",
    sep="\n",
)

In [None]:
import pandas as pd

In [None]:
# Summary table: one column per tracker, one row per metric, saved next to the
# other outputs of this run.
df = pd.DataFrame(
    {
        'DeepOCSORT': performance_DeepOCSORT,
        'StrongSORT': performance_StrongSORT,
        'BoTSORT': performance_BoTSORT,
        'BYTETracker': performance_BYTETracker,
    },
    index=['Execution_time(s)', 'Current_memory(MB)', 'Peak_memory(MB)'],
)

df.to_csv(out_dir + 'SimulationPerformance.csv')

In [None]:
import os
import shutil

# Bundle each test's output folder into /kaggle/working/TrackResultsTest<i>.zip.
# Folders that do not exist (e.g. tests that were not run in this session) are
# skipped, so one missing folder does not abort the remaining archives.
for i in [4,5,6]:
    OUTPUT_NAME = f'/kaggle/working/TrackResultsTest{i}'
    DIRECTORY_TO_ZIP = f'/kaggle/working/Test{i}'

    if not os.path.isdir(DIRECTORY_TO_ZIP):
        print(f'Skipping {DIRECTORY_TO_ZIP}: directory not found')
        continue

    print(DIRECTORY_TO_ZIP)
    shutil.make_archive(OUTPUT_NAME, 'zip', DIRECTORY_TO_ZIP)

### Tracker Comparisons
Background reading: [Understanding Object Tracking: A Hands-on Approach, Part 1](https://pramod-atre.medium.com/understanding-object-tracking-a-hands-on-approach-part-1-3fb1afd0ae46)

In [1]:
!git clone https://github.com/JonathonLuiten/TrackEval.git


fatal: destination path 'TrackEval' already exists and is not an empty directory.
