In [None]:
%pip install pytube

In [None]:
%pip install opencv-python

In [None]:
%pip install ultralytics ultralyticsplus

In [None]:
%pip install filterpy

In [None]:
from pytube import YouTube
import cv2
from filterpy.kalman import KalmanFilter
import numpy as np
from ultralyticsplus import YOLO
import math

In [None]:
def download_videos(video_urls, save_path):
    """Download each YouTube URL as a 720p MP4 named ``video_<n>.mp4``.

    Failures are handled per URL: a video that cannot be fetched is reported
    on stdout and skipped, and the remaining URLs are still attempted.

    Args:
        video_urls: iterable of YouTube URLs; the n-th URL (1-based) is saved
            as ``video_<n>.mp4`` regardless of whether earlier ones failed.
        save_path: directory passed to pytube as ``output_path``.

    Returns:
        list: the values returned by pytube's ``download`` call for every
        video that was saved successfully, in input order.
    """
    downloaded = []
    for idx, url in enumerate(video_urls, start=1):
        try:
            yt = YouTube(url)
            stream = yt.streams.filter(res='720p', file_extension='mp4').first()
            if stream is None:
                # .first() yields None when no stream matches the filter
                print(f"Error downloading video: {url}, Error: no 720p mp4 stream available")
                continue
            saved_path = stream.download(output_path=save_path, filename=f'video_{idx}.mp4')
            downloaded.append(saved_path)
            print(f'Downloaded video: {yt.title}')
        except Exception as e:
            # Best effort: report and move on to the next URL
            print(f"Error downloading video: {url}, Error: {e}")
    return downloaded

In [None]:
# Clips used throughout the notebook; they are fetched into ./videos as
# video_1.mp4, video_2.mp4 and video_3.mp4 (numbered in list order).
video_urls = [
    'https://youtu.be/WeF4wpw7w9k',
    'https://youtu.be/2NFwY15tRtA',
    'https://youtu.be/5dRramZVu2Q',
]
output_path = './videos'

download_videos(video_urls, output_path)

# Local paths of the downloaded clips, also exposed individually for later cells
in_files = [
    "./videos/video_1.mp4",
    "./videos/video_2.mp4",
    "./videos/video_3.mp4",
]
input_file_1, input_file_2, input_file_3 = in_files

In [None]:
# Print frame count and frame dimensions for every downloaded clip
for idx, file in enumerate(in_files):
    cap = cv2.VideoCapture(file)
    length = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    width, height = (
        cap.get(prop)
        for prop in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT)
    )
    # All properties are read, so the capture can be closed before reporting
    cap.release()

    print(f"Video_{idx+1} Details")
    for caption, value in (
        ("Number of frames = ", length),
        ("Width = ", width),
        ("Height = ", height),
    ):
        print(caption, value)
    print("\n\n")



In [None]:
def _init_kalman_filter():
    kf = KalmanFilter(dim_x=4, dim_z=2)
    dt = 1
    kf.F = np.array([[1, 0, dt, 0],
                    [0, 1, 0, dt],
                    [0, 0, 1, 0],
                    [0, 0, 0, 1]])
    
    kf.H = np.array([[1, 0, 0, 0],
                    [0, 1, 0, 0]])

    kf.P *= 1000.
    
    kf.R *= 0.01
    
    kf.Q = np.eye(4) * 0.01
    
    return kf

In [None]:
# Load the detector weights identified by 'mshamrai/yolov8s-visdrone'
model = YOLO('mshamrai/yolov8s-visdrone')

# Detection thresholds applied on every predict() call
model.overrides.update({
    'conf': 0.15,  # NMS confidence threshold
    'iou': 0.45,   # NMS IoU threshold
})

In [None]:
# Detect objects of classes 0 and 3 in video 2, give every detection a track ID,
# feed each track's box centre to its own Kalman filter and write the annotated
# frames to det_vid_2.mp4.

# A detection whose centre is closer than this (pixels) to a previously stored
# centre of the same class inherits that centre's track ID.
MATCH_RADIUS_PX = 50

cap = cv2.VideoCapture(input_file_2)
if not cap.isOpened():
    print(f"Could not open {input_file_2}; no frames will be processed")
frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
frame_size = (frame_width, frame_height)
# Write at the source frame rate so playback speed is preserved; fall back to
# 30 when the container does not report one (cap.get returns 0).
fps = cap.get(cv2.CAP_PROP_FPS) or 30

output = cv2.VideoWriter("det_vid_2.mp4", cv2.VideoWriter_fourcc('m', 'p', '4', 'v'), fps, frame_size)

filter_tracker = {}   # track_id -> Kalman filter for that track
history_tracker = {}  # track_id -> list of filtered [x, y] states, oldest first
flag = {}             # track_id -> True once detected in the current frame, reset to False after its update
bbox_class_id = {}    # class_id -> every detection centre [cx, cy] stored so far
bbox_track_id = {}    # (cx, cy) -> track_id assigned to that centre
count = -1            # next ID for the first-ever detection of a class (-1, -2, ...)
none_id = -10         # next ID for a detection that matches no stored centre (-10, -20, ...)

try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform object detection on frame
        detections = model.predict(frame, classes=[0, 3])
        boxes = detections[0].boxes

        # --- Pass 1: assign a track ID to every detection of this frame -----
        frame_tracks = []
        for box in boxes:
            class_id = int(box.cls.item())
            # xywh is centre-based; .cpu() keeps this working for GPU tensors
            cx, cy = (float(v) for v in box.xywh.cpu().numpy()[0][:2])

            if box.id is not None:
                # NOTE(review): presumably only populated when the model is run
                # with tracking enabled; with predict() this is expected to be
                # None -- confirm.
                track_id = int(box.id.item())
            elif class_id not in bbox_class_id:
                # First detection ever seen for this class
                track_id = count
                count -= 1
            else:
                # Reuse the ID of the most recently stored centre within range
                track_id = None
                for pt in reversed(bbox_class_id[class_id]):
                    if math.dist(pt, (cx, cy)) < MATCH_RADIUS_PX:
                        track_id = bbox_track_id[tuple(pt)]
                        break
                if track_id is None:
                    # Nothing close enough: open a new track (exactly one ID per detection)
                    track_id = none_id
                    none_id -= 10

            bbox_class_id.setdefault(class_id, []).append([cx, cy])
            bbox_track_id[(cx, cy)] = track_id
            flag[track_id] = True
            frame_tracks.append((box, track_id, class_id, cx, cy))

        # --- Pass 2: coast known tracks that were not detected this frame ---
        # NOTE(review): predict() is only called here, i.e. for missed tracks;
        # detected tracks go straight to update() below. A conventional filter
        # cycle runs predict() before every update() -- confirm this is intended.
        for track_id, kf in filter_tracker.items():
            if not flag[track_id]:
                kf.predict()
                last_x, last_y = history_tracker[track_id][-1]
                x_pred, y_pred = kf.x[:2]
                # Draw from the last filtered position to the predicted one
                cv2.line(frame,
                         (int(last_x[0]), int(last_y[0])),
                         (int(x_pred[0]), int(y_pred[0])),
                         (0, 255, 0), 3)

        # --- Pass 3: update filters with the measurements and annotate ------
        for box, track_id, class_id, cx, cy in frame_tracks:
            if track_id not in filter_tracker:
                filter_tracker[track_id] = _init_kalman_filter()
                history_tracker[track_id] = []
            kf = filter_tracker[track_id]

            kf.update(np.array([[cx, cy]]))

            x, y = kf.x[:2]
            history_tracker[track_id].append([x, y])
            flag[track_id] = False

            # Corner coordinates come from xyxy; xywh holds the box centre, so
            # using it as the top-left corner would shift every rectangle.
            x1, y1, x2, y2 = (int(v) for v in box.xyxy.cpu().numpy()[0])
            cv2.rectangle(frame, (x1, y1), (x2, y2), (255, 0, 0), 2)

            if class_id == 0:
                label = f"Pedestrian, ID: {track_id}"
            else:
                label = f"Car, ID: {track_id}"
            cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

        output.write(frame)
finally:
    # Always close the reader and finalise the output file, even on error
    cap.release()
    output.release()

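To answer the question of how we can address false positives, we can employ a couple of strategies, such as tuning the confidence threshold and the NMS IoU threshold. A higher confidence threshold discards low-scoring detections, and a lower NMS IoU threshold suppresses more overlapping duplicate boxes, so both lead to fewer false positives.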

The Kalman filter can also help. By comparing each track's predicted position with the detection matched to it, we can keep a history of IoU scores for every track. If a certain track has a consistent stream of low scores, it can be flagged as a false positive and removed.