In [1]:
import cv2
from filterpy.kalman import KalmanFilter
import numpy as np
import numpy as np
import re
import os
import torch
from ultralyticsplus import YOLO
import matplotlib.pyplot as plt
import math

Download the YouTube videos using pytube

In [2]:
from pytube import YouTube

def Download(link, output_path=None):
    """Download a YouTube video with pytube and return the saved file path.

    Parameters
    ----------
    link : str
        URL of the YouTube video.
    output_path : str, optional
        Directory to save the file in. ``None`` keeps pytube's default
        location (the current working directory).

    Returns
    -------
    str
        Path of the downloaded file as reported by pytube.

    Raises
    ------
    ValueError
        If pytube reports no stream for ``link``.
    """
    # get_highest_resolution() only considers progressive (audio + video)
    # streams and returns None when there are none.
    stream = YouTube(link).streams.get_highest_resolution()
    if stream is None:
        raise ValueError(f"No downloadable stream found for {link!r}")
    return stream.download(output_path=output_path)

In [3]:
# The three source videos used in the tracking cells of this notebook.
link1 = "https://youtu.be/WeF4wpw7w9k"
link2 = "https://youtu.be/2NFwY15tRtA"
link3 = "https://youtu.be/5dRramZVu2Q"

# Fetch them one after another, in the same order as listed above.
for video_link in (link1, link2, link3):
    Download(video_link)

Define Kalman Filter

In [4]:
def initialize_kalman_filter(dt=1.0, measurement_noise=0.01, process_noise=0.01):
    """Build a constant-velocity Kalman filter for a 2-D point.

    The state vector is ``[x, y, vx, vy]`` and the measurement is ``[x, y]``.

    Parameters
    ----------
    dt : float, optional
        Time step between consecutive predictions, in the same unit the
        velocity is expressed in (default ``1.0``, i.e. one frame).
    measurement_noise : float, optional
        Variance placed on the diagonal of the measurement covariance ``R``.
    process_noise : float, optional
        Variance placed on the diagonal of the process covariance ``Q``.

    Returns
    -------
    filterpy.kalman.KalmanFilter
        A filter with ``F``, ``H``, ``R`` and ``Q`` set. The state ``x`` and
        covariance ``P`` are left at filterpy's defaults, so callers that know
        the first measurement should seed ``x`` themselves.
    """
    kf = KalmanFilter(dim_x=4, dim_z=2)
    # Position advances by velocity * dt on every predict(); velocity is
    # carried over unchanged.
    kf.F = np.array([[1.0, 0.0, dt, 0.0],
                     [0.0, 1.0, 0.0, dt],
                     [0.0, 0.0, 1.0, 0.0],
                     [0.0, 0.0, 0.0, 1.0]])
    # Only the position components of the state are observed.
    kf.H = np.array([[1.0, 0.0, 0.0, 0.0],
                     [0.0, 1.0, 0.0, 0.0]])
    kf.R = np.eye(2) * measurement_noise
    kf.Q = np.eye(4) * process_noise
    return kf

Load a YOLOv8 model that has been pretrained on the VisDrone dataset for object detection

In [5]:
# Load the YOLOv8-small checkpoint identified by 'mshamrai/yolov8s-visdrone'.
model = YOLO('mshamrai/yolov8s-visdrone')

# Default inference settings, applied in one go.
model.overrides.update({
    'conf': 0.15,           # confidence threshold for keeping a detection
    'iou': 0.45,            # IoU threshold used by non-maximum suppression
    'agnostic_nms': False,  # keep NMS class-aware
    'max_det': 1000,        # upper bound on detections per image
})

Now, using the YOLOv8 model, we detect the Pedestrian and Car classes in the videos. From each detection we update the object's Kalman filter and use the filter to predict the object's next position. Whenever the object detector misses a detection, we draw a line from the current state to the predicted state.

Note: Due to the limitations of the model, the object detector misses the pedestrian in certain frames even though one is present. The generated videos are the best results that were obtained using the Kalman filter.

Generating Output for Video-1

To address false positives and improve object tracking with the Kalman filter, we can do the following things:
1. Thresholding: Applying a confidence threshold, similar to the code below, ensures that object detections with low confidence values are filtered out, thereby removing false positives.
2. Kalman filter + Detections: We can combine the predictions from the Kalman filter with the detections from the object detector model, which can help fine-tune and filter out any noisy detections. We can update the filter's current state with the predicted state only if the object's position is consistent with the prediction.
3. Remove Tracks: We can remove tracking for objects that don't have any updates after a threshold number of frames.
4. Confirm Tracks: An object's detection can be confirmed if and only if it is detected in a certain threshold number of consecutive frames, so that we know it was not an outlier, thereby ensuring no false positives.

In [19]:
# Detect pedestrians (class 0) and cars (class 3) frame by frame, associate
# each detection with a track by nearest previous centre, smooth every track
# with a constant-velocity Kalman filter, and write the annotated frames to
# OUTPUT_DIR as numbered JPEGs.
VIDEO_PATH = 'Cyclist and vehicle Tracking - 1.mp4'
OUTPUT_DIR = 'FinalVideo1'
MATCH_RADIUS = 40        # px: max centre distance for re-using an existing track id
MAX_MISSED_FRAMES = 30   # frames a track is coasted on predictions before its filter is dropped

kalman_filters = {}      # track id -> KalmanFilter
trajectories = {}        # track id -> list of [x, y] filtered positions (length-1 arrays)
flag = {}                # track id -> True while the track has a detection in the current frame
class_id_dict = {}       # class id -> every detection centre seen so far, oldest first
bbox_track_id_dict = {}  # detection centre (cx, cy) -> track id it was assigned
missed_frames = {}       # track id -> consecutive frames without a detection

# cv2.imwrite returns False instead of raising when the directory is missing,
# so make sure it exists up front.
os.makedirs(OUTPUT_DIR, exist_ok=True)

video = cv2.VideoCapture(VIDEO_PATH)
if not video.isOpened():
    raise IOError(f"Could not open video file: {VIDEO_PATH}")

count = -1         # id handed to the first track of each class
id_for_none = -5   # ids handed to every further new track
frame_count = 0
try:
    while True:
        ret, frame = video.read()
        if not ret:
            break

        detections = model.predict(frame, classes=[0, 3])

        # Pass 1: give every detection a track id.
        frame_tracks = []
        for box in detections[0].boxes:
            cls = int(box.cls.item())
            # xywh is centre-based: (centre x, centre y, width, height).
            cx, cy, w, h = (float(v) for v in box.xywh.cpu().numpy()[0])
            center = (cx, cy)

            if box.id is not None:
                # Only set when the detector itself supplies track ids.
                track_id = int(box.id.item())
                class_id_dict.setdefault(cls, [])
            elif cls not in class_id_dict:
                # First detection ever seen for this class.
                class_id_dict[cls] = []
                track_id = count
                count -= 1
            else:
                # Re-use the id of the most recent centre of the same class
                # that lies within MATCH_RADIUS; otherwise start a new track.
                track_id = None
                for previous_center in reversed(class_id_dict[cls]):
                    if math.dist(previous_center, center) < MATCH_RADIUS:
                        track_id = bbox_track_id_dict[previous_center]
                        break
                if track_id is None:
                    track_id = id_for_none
                    id_for_none -= 1

            class_id_dict[cls].append(center)
            bbox_track_id_dict[center] = track_id
            flag[track_id] = True
            frame_tracks.append((track_id, cls, cx, cy, w, h))

        # Pass 2: advance every existing filter by one frame. predict() must
        # run on every frame (not only on misses); otherwise the velocity
        # components are never coupled to the position measurements and the
        # prediction never moves.
        stale_ids = []
        for track_id, kf in kalman_filters.items():
            kf.predict()
            if flag[track_id]:
                missed_frames[track_id] = 0
                continue
            missed_frames[track_id] += 1
            if missed_frames[track_id] > MAX_MISSED_FRAMES:
                stale_ids.append(track_id)
                continue
            # Detector missed this track: draw where the filter expects it.
            last_x, last_y = trajectories[track_id][-1]
            cv2.line(frame,
                     (int(last_x[0]), int(last_y[0])),
                     (int(kf.x[0, 0]), int(kf.x[1, 0])),
                     (255, 0, 0), 3)
        for track_id in stale_ids:
            # Drop only the filter; the recorded trajectory is kept.
            del kalman_filters[track_id]
            del missed_frames[track_id]

        # Pass 3: correct the filters with this frame's detections and annotate.
        for track_id, cls, cx, cy, w, h in frame_tracks:
            if track_id not in kalman_filters:
                kf = initialize_kalman_filter()
                # Start at the first measurement instead of the origin.
                kf.x = np.array([[cx], [cy], [0.0], [0.0]])
                kalman_filters[track_id] = kf
                trajectories.setdefault(track_id, [])
                missed_frames[track_id] = 0
            kf = kalman_filters[track_id]
            kf.update(np.array([[cx], [cy]]))

            # Convert the centre-based box to corner coordinates for drawing.
            x1, y1 = int(cx - w / 2), int(cy - h / 2)
            x2, y2 = int(cx + w / 2), int(cy + h / 2)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            if cls == 0:
                label = f"Pedestrian, ID: {track_id}"
            else:
                label = f"Car, ID: {track_id}"
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            trajectories[track_id].append([kf.x[0].copy(), kf.x[1].copy()])
            flag[track_id] = False

        frame_filename = os.path.join(OUTPUT_DIR, f'frame_{frame_count:04d}.jpg')
        if not cv2.imwrite(frame_filename, frame):
            raise IOError(f"Could not write frame to {frame_filename}")
        print(frame_count)
        frame_count += 1
finally:
    # No display window is opened in this cell, so releasing the capture is
    # the only cleanup required.
    video.release()


0: 352x640 (no detections), 65.8ms
Speed: 1.7ms preprocess, 65.8ms inference, 0.3ms postprocess per image at shape (1, 3, 352, 640)
0

0: 352x640 (no detections), 63.5ms
Speed: 0.9ms preprocess, 63.5ms inference, 0.2ms postprocess per image at shape (1, 3, 352, 640)
1

0: 352x640 (no detections), 53.8ms
Speed: 1.3ms preprocess, 53.8ms inference, 0.2ms postprocess per image at shape (1, 3, 352, 640)
2

0: 352x640 (no detections), 52.8ms
Speed: 1.2ms preprocess, 52.8ms inference, 0.2ms postprocess per image at shape (1, 3, 352, 640)
3

0: 352x640 (no detections), 54.0ms
Speed: 1.0ms preprocess, 54.0ms inference, 0.2ms postprocess per image at shape (1, 3, 352, 640)
4

0: 352x640 (no detections), 51.8ms
Speed: 1.0ms preprocess, 51.8ms inference, 0.2ms postprocess per image at shape (1, 3, 352, 640)
5

0: 352x640 (no detections), 53.3ms
Speed: 1.1ms preprocess, 53.3ms inference, 0.2ms postprocess per image at shape (1, 3, 352, 640)
6

0: 352x640 (no detections), 57.5ms
Speed: 1.0ms prepro

In [21]:
# Detect pedestrians (class 0) and cars (class 3) frame by frame, associate
# each detection with a track by nearest previous centre, smooth every track
# with a constant-velocity Kalman filter, and write the annotated frames to
# OUTPUT_DIR as numbered JPEGs.
VIDEO_PATH = 'Cyclist and vehicle Tracking - 2.mp4'
OUTPUT_DIR = 'FinalVideo2'
MATCH_RADIUS = 40        # px: max centre distance for re-using an existing track id
MAX_MISSED_FRAMES = 30   # frames a track is coasted on predictions before its filter is dropped

kalman_filters = {}      # track id -> KalmanFilter
trajectories = {}        # track id -> list of [x, y] filtered positions (length-1 arrays)
flag = {}                # track id -> True while the track has a detection in the current frame
class_id_dict = {}       # class id -> every detection centre seen so far, oldest first
bbox_track_id_dict = {}  # detection centre (cx, cy) -> track id it was assigned
missed_frames = {}       # track id -> consecutive frames without a detection

# cv2.imwrite returns False instead of raising when the directory is missing,
# so make sure it exists up front.
os.makedirs(OUTPUT_DIR, exist_ok=True)

video = cv2.VideoCapture(VIDEO_PATH)
if not video.isOpened():
    raise IOError(f"Could not open video file: {VIDEO_PATH}")

count = -1         # id handed to the first track of each class
id_for_none = -5   # ids handed to every further new track
frame_count = 0
try:
    while True:
        ret, frame = video.read()
        if not ret:
            break

        detections = model.predict(frame, classes=[0, 3])

        # Pass 1: give every detection a track id.
        frame_tracks = []
        for box in detections[0].boxes:
            cls = int(box.cls.item())
            # xywh is centre-based: (centre x, centre y, width, height).
            cx, cy, w, h = (float(v) for v in box.xywh.cpu().numpy()[0])
            center = (cx, cy)

            if box.id is not None:
                # Only set when the detector itself supplies track ids.
                track_id = int(box.id.item())
                class_id_dict.setdefault(cls, [])
            elif cls not in class_id_dict:
                # First detection ever seen for this class.
                class_id_dict[cls] = []
                track_id = count
                count -= 1
            else:
                # Re-use the id of the most recent centre of the same class
                # that lies within MATCH_RADIUS; otherwise start a new track.
                track_id = None
                for previous_center in reversed(class_id_dict[cls]):
                    if math.dist(previous_center, center) < MATCH_RADIUS:
                        track_id = bbox_track_id_dict[previous_center]
                        break
                if track_id is None:
                    track_id = id_for_none
                    id_for_none -= 1

            class_id_dict[cls].append(center)
            bbox_track_id_dict[center] = track_id
            flag[track_id] = True
            frame_tracks.append((track_id, cls, cx, cy, w, h))

        # Pass 2: advance every existing filter by one frame. predict() must
        # run on every frame (not only on misses); otherwise the velocity
        # components are never coupled to the position measurements and the
        # prediction never moves.
        stale_ids = []
        for track_id, kf in kalman_filters.items():
            kf.predict()
            if flag[track_id]:
                missed_frames[track_id] = 0
                continue
            missed_frames[track_id] += 1
            if missed_frames[track_id] > MAX_MISSED_FRAMES:
                stale_ids.append(track_id)
                continue
            # Detector missed this track: draw where the filter expects it.
            last_x, last_y = trajectories[track_id][-1]
            cv2.line(frame,
                     (int(last_x[0]), int(last_y[0])),
                     (int(kf.x[0, 0]), int(kf.x[1, 0])),
                     (255, 0, 0), 3)
        for track_id in stale_ids:
            # Drop only the filter; the recorded trajectory is kept.
            del kalman_filters[track_id]
            del missed_frames[track_id]

        # Pass 3: correct the filters with this frame's detections and annotate.
        for track_id, cls, cx, cy, w, h in frame_tracks:
            if track_id not in kalman_filters:
                kf = initialize_kalman_filter()
                # Start at the first measurement instead of the origin.
                kf.x = np.array([[cx], [cy], [0.0], [0.0]])
                kalman_filters[track_id] = kf
                trajectories.setdefault(track_id, [])
                missed_frames[track_id] = 0
            kf = kalman_filters[track_id]
            kf.update(np.array([[cx], [cy]]))

            # Convert the centre-based box to corner coordinates for drawing.
            x1, y1 = int(cx - w / 2), int(cy - h / 2)
            x2, y2 = int(cx + w / 2), int(cy + h / 2)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            if cls == 0:
                label = f"Pedestrian, ID: {track_id}"
            else:
                label = f"Car, ID: {track_id}"
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            trajectories[track_id].append([kf.x[0].copy(), kf.x[1].copy()])
            flag[track_id] = False

        frame_filename = os.path.join(OUTPUT_DIR, f'frame_{frame_count:04d}.jpg')
        if not cv2.imwrite(frame_filename, frame):
            raise IOError(f"Could not write frame to {frame_filename}")
        print(frame_count)
        frame_count += 1
finally:
    # No display window is opened in this cell, so releasing the capture is
    # the only cleanup required.
    video.release()


0: 352x640 1 pedestrian, 65.0ms
Speed: 1.4ms preprocess, 65.0ms inference, 0.3ms postprocess per image at shape (1, 3, 352, 640)
0

0: 352x640 1 pedestrian, 77.2ms
Speed: 1.0ms preprocess, 77.2ms inference, 0.4ms postprocess per image at shape (1, 3, 352, 640)
1

0: 352x640 1 pedestrian, 70.4ms
Speed: 1.3ms preprocess, 70.4ms inference, 0.5ms postprocess per image at shape (1, 3, 352, 640)
2

0: 352x640 1 pedestrian, 88.0ms
Speed: 1.3ms preprocess, 88.0ms inference, 0.5ms postprocess per image at shape (1, 3, 352, 640)
3

0: 352x640 1 pedestrian, 77.3ms
Speed: 1.1ms preprocess, 77.3ms inference, 0.3ms postprocess per image at shape (1, 3, 352, 640)
4

0: 352x640 1 pedestrian, 74.2ms
Speed: 1.2ms preprocess, 74.2ms inference, 0.4ms postprocess per image at shape (1, 3, 352, 640)
5

0: 352x640 (no detections), 70.6ms
Speed: 1.1ms preprocess, 70.6ms inference, 0.2ms postprocess per image at shape (1, 3, 352, 640)
6

0: 352x640 (no detections), 74.0ms
Speed: 1.4ms preprocess, 74.0ms infer

In [22]:
# Detect pedestrians (class 0) and cars (class 3) frame by frame, associate
# each detection with a track by nearest previous centre, smooth every track
# with a constant-velocity Kalman filter, and write the annotated frames to
# OUTPUT_DIR as numbered JPEGs.
VIDEO_PATH = 'Drone Tracking Video.mp4'
OUTPUT_DIR = 'FinalVideo3'
MATCH_RADIUS = 40        # px: max centre distance for re-using an existing track id
MAX_MISSED_FRAMES = 30   # frames a track is coasted on predictions before its filter is dropped

kalman_filters = {}      # track id -> KalmanFilter
trajectories = {}        # track id -> list of [x, y] filtered positions (length-1 arrays)
flag = {}                # track id -> True while the track has a detection in the current frame
class_id_dict = {}       # class id -> every detection centre seen so far, oldest first
bbox_track_id_dict = {}  # detection centre (cx, cy) -> track id it was assigned
missed_frames = {}       # track id -> consecutive frames without a detection

# cv2.imwrite returns False instead of raising when the directory is missing,
# so make sure it exists up front.
os.makedirs(OUTPUT_DIR, exist_ok=True)

video = cv2.VideoCapture(VIDEO_PATH)
if not video.isOpened():
    raise IOError(f"Could not open video file: {VIDEO_PATH}")

count = -1         # id handed to the first track of each class
id_for_none = -5   # ids handed to every further new track
frame_count = 0
try:
    while True:
        ret, frame = video.read()
        if not ret:
            break

        detections = model.predict(frame, classes=[0, 3])

        # Pass 1: give every detection a track id.
        frame_tracks = []
        for box in detections[0].boxes:
            cls = int(box.cls.item())
            # xywh is centre-based: (centre x, centre y, width, height).
            cx, cy, w, h = (float(v) for v in box.xywh.cpu().numpy()[0])
            center = (cx, cy)

            if box.id is not None:
                # Only set when the detector itself supplies track ids.
                track_id = int(box.id.item())
                class_id_dict.setdefault(cls, [])
            elif cls not in class_id_dict:
                # First detection ever seen for this class.
                class_id_dict[cls] = []
                track_id = count
                count -= 1
            else:
                # Re-use the id of the most recent centre of the same class
                # that lies within MATCH_RADIUS; otherwise start a new track.
                track_id = None
                for previous_center in reversed(class_id_dict[cls]):
                    if math.dist(previous_center, center) < MATCH_RADIUS:
                        track_id = bbox_track_id_dict[previous_center]
                        break
                if track_id is None:
                    track_id = id_for_none
                    id_for_none -= 1

            class_id_dict[cls].append(center)
            bbox_track_id_dict[center] = track_id
            flag[track_id] = True
            frame_tracks.append((track_id, cls, cx, cy, w, h))

        # Pass 2: advance every existing filter by one frame. predict() must
        # run on every frame (not only on misses); otherwise the velocity
        # components are never coupled to the position measurements and the
        # prediction never moves.
        stale_ids = []
        for track_id, kf in kalman_filters.items():
            kf.predict()
            if flag[track_id]:
                missed_frames[track_id] = 0
                continue
            missed_frames[track_id] += 1
            if missed_frames[track_id] > MAX_MISSED_FRAMES:
                stale_ids.append(track_id)
                continue
            # Detector missed this track: draw where the filter expects it.
            last_x, last_y = trajectories[track_id][-1]
            cv2.line(frame,
                     (int(last_x[0]), int(last_y[0])),
                     (int(kf.x[0, 0]), int(kf.x[1, 0])),
                     (255, 0, 0), 3)
        for track_id in stale_ids:
            # Drop only the filter; the recorded trajectory is kept.
            del kalman_filters[track_id]
            del missed_frames[track_id]

        # Pass 3: correct the filters with this frame's detections and annotate.
        for track_id, cls, cx, cy, w, h in frame_tracks:
            if track_id not in kalman_filters:
                kf = initialize_kalman_filter()
                # Start at the first measurement instead of the origin.
                kf.x = np.array([[cx], [cy], [0.0], [0.0]])
                kalman_filters[track_id] = kf
                trajectories.setdefault(track_id, [])
                missed_frames[track_id] = 0
            kf = kalman_filters[track_id]
            kf.update(np.array([[cx], [cy]]))

            # Convert the centre-based box to corner coordinates for drawing.
            x1, y1 = int(cx - w / 2), int(cy - h / 2)
            x2, y2 = int(cx + w / 2), int(cy + h / 2)
            cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            if cls == 0:
                label = f"Pedestrian, ID: {track_id}"
            else:
                label = f"Car, ID: {track_id}"
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

            trajectories[track_id].append([kf.x[0].copy(), kf.x[1].copy()])
            flag[track_id] = False

        frame_filename = os.path.join(OUTPUT_DIR, f'frame_{frame_count:04d}.jpg')
        if not cv2.imwrite(frame_filename, frame):
            raise IOError(f"Could not write frame to {frame_filename}")
        print(frame_count)
        frame_count += 1
finally:
    # No display window is opened in this cell, so releasing the capture is
    # the only cleanup required.
    video.release()


0: 384x640 1 pedestrian, 68.5ms
Speed: 1.8ms preprocess, 68.5ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)
0

0: 384x640 1 pedestrian, 60.6ms
Speed: 0.9ms preprocess, 60.6ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)
1

0: 384x640 1 pedestrian, 62.1ms
Speed: 0.9ms preprocess, 62.1ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)
2

0: 384x640 (no detections), 58.2ms
Speed: 0.8ms preprocess, 58.2ms inference, 0.2ms postprocess per image at shape (1, 3, 384, 640)
3

0: 384x640 1 pedestrian, 58.7ms
Speed: 1.0ms preprocess, 58.7ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)
4

0: 384x640 1 pedestrian, 59.9ms
Speed: 1.0ms preprocess, 59.9ms inference, 0.3ms postprocess per image at shape (1, 3, 384, 640)
5

0: 384x640 3 pedestrians, 60.6ms
Speed: 1.0ms preprocess, 60.6ms inference, 0.8ms postprocess per image at shape (1, 3, 384, 640)
6

0: 384x640 2 pedestrians, 60.7ms
Speed: 0.9ms preprocess, 60.7ms infere