In [None]:
!pip install collection
!pip install ultralytics

In [None]:
from collections import defaultdict
import cv2
import numpy as np
from ultralytics import YOLO

In [8]:
# Load the YOLO weights (the file is downloaded when it is not present locally).
model = YOLO("yolo11l.pt")

video_path = "vietnam.mp4"
cap = cv2.VideoCapture(video_path)
# VideoCapture does not raise on a bad path; fail here instead of producing
# a 0x0 writer and an empty output later on.
if not cap.isOpened():
    raise IOError(f"Could not open video: {video_path}")

Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolo11l.pt to 'yolo11l.pt'...


100%|██████████| 49.0M/49.0M [00:00<00:00, 194MB/s]


In [10]:
import os

width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
# Keep the exact source rate for the writer (e.g. 29.97); writing at the
# truncated integer rate would change the playback speed of the output.
source_fps = cap.get(cv2.CAP_PROP_FPS)
fps = int(source_fps)

# Create VideoWriter object
video_name = video_path.split("/")[-1]
output_path = f"run/{video_name.split('.')[0]}_tracked.mp4"
# VideoWriter does not create missing directories; without this the writer
# fails to open and every out.write() call is silently ignored.
os.makedirs(os.path.dirname(output_path), exist_ok=True)
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
out = cv2.VideoWriter(output_path, fourcc, source_fps, (width, height))

In [13]:
# Store the track history
track_history = defaultdict(lambda: [])

# Loop through the video frames
while cap.isOpened():
    success, frame = cap.read()

    if success:
        results = model.track(frame, persist=True, show=False)
        boxes = results[0].boxes.xywh.cpu()
        try:
            track_ids = results[0].boxes.id
            if track_ids is not None:
                track_ids = track_ids.int().cpu().tolist()
            else:
                track_ids = [] # No tracks found in this frame
        except AttributeError:
            track_ids = [] # Handle case where tracking fails

        # Visualize a result on the frame
        annotated_frame = results[0].plot()

        # Plot the tracks only if we have valid tracking data
        if track_ids:
            for box, track_id in zip(boxes, track_ids):
                x, y, w, h = box
                track = track_history[track_id]
                track.append((float(x), float(y)))

                if len(track) > 120:
                    track.pop(0)

                # Draw the tracking lines
                points = np.hstack(track).astype(np.int32).reshape((-1, 1, 2))
                cv2.polylines(
                    annotated_frame,
                    [points],
                    isClosed=False,
                    color=(230, 230, 230),
                    thickness=4,
                )

        # Write the frame to output video
        out.write(annotated_frame)
    else:
        # Break the loop if the end of the video is reached
        break

cap.release()
out.release()
print(f"Video has been saved to {output_path}")


0: 384x640 19 persons, 6 cars, 26 motorcycles, 1 bus, 2 trucks, 1104.1ms
Speed: 5.7ms preprocess, 1104.1ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 19 persons, 5 cars, 25 motorcycles, 1 bus, 3 trucks, 1009.9ms
Speed: 4.0ms preprocess, 1009.9ms inference, 1.7ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 19 persons, 5 cars, 25 motorcycles, 1 bus, 4 trucks, 957.4ms
Speed: 4.6ms preprocess, 957.4ms inference, 2.2ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 18 persons, 5 cars, 25 motorcycles, 1 bus, 4 trucks, 1 backpack, 984.3ms
Speed: 4.6ms preprocess, 984.3ms inference, 1.5ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 18 persons, 5 cars, 25 motorcycles, 1 bus, 4 trucks, 1 backpack, 979.3ms
Speed: 4.5ms preprocess, 979.3ms inference, 1.8ms postprocess per image at shape (1, 3, 384, 640)

0: 384x640 18 persons, 5 cars, 27 motorcycles, 1 bus, 4 trucks, 1 backpack, 978.8ms
Speed: 4.0ms preprocess, 978.8ms

# Optimized Version

In [None]:
!pip install loguru

In [15]:
import argparse
from collections import defaultdict
import cv2
import numpy as np
from tqdm import tqdm
from ultralytics import YOLO
from loguru import logger


In [16]:
def load_config():
    """Build and return a fresh dict holding the pipeline settings.

    Returns:
        dict with the keys ``model_path``, ``track_history_length``,
        ``batch_size``, ``line_thickness`` and ``track_color``.
    """
    settings = dict(
        model_path="yolo11x.pt",
        track_history_length=120,
        batch_size=64,
        line_thickness=4,
        track_color=(230, 230, 230),
    )
    return settings


def initialize_video(video_path):
    """Open the input video and create a matching MP4 writer.

    Args:
        video_path: Path of the video to read. The output name is built from
            the text before the first "." of its last "/"-separated part.

    Returns:
        Tuple ``(cap, out, output_path, fps)``: the ``cv2.VideoCapture``, the
        ``cv2.VideoWriter``, the output path (``run/<name>_tracked.mp4``) and
        the source frame rate truncated to an ``int``.

    Raises:
        IOError: If OpenCV cannot open ``video_path``.
    """
    import os

    cap = cv2.VideoCapture(video_path)
    # VideoCapture does not raise on a bad path; without this check the
    # writer below would be created with a 0x0 frame size.
    if not cap.isOpened():
        raise IOError(f"Could not open video: {video_path}")

    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    # The writer gets the exact source rate (e.g. 29.97) so playback speed is
    # preserved; callers still receive the truncated integer value.
    source_fps = cap.get(cv2.CAP_PROP_FPS)
    fps = int(source_fps)

    video_name = video_path.split("/")[-1]
    output_path = f"run/{video_name.split('.')[0]}_tracked.mp4"
    # VideoWriter does not create missing directories; it would fail to open
    # and silently ignore every write.
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, source_fps, (width, height))

    return cap, out, output_path, fps


In [17]:
def update_track_history(track_history, last_seen, track_ids, frame_count, batch_size, frame_idx, history_length):
    """Record when tracks were last seen and drop tracks that went stale.

    The frame being handled is ``frame_count - (batch_size - frame_idx - 1)``.
    That number is used both to stamp the tracks present in this frame and to
    measure how long absent tracks have been missing, so a track is dropped
    only after more than ``history_length`` frames without a detection.

    Args:
        track_history: Mapping of track id to its stored points; stale
            entries are deleted in place.
        last_seen: Mapping of track id to the frame number of its latest
            detection; updated in place. A plain ``dict`` works.
        track_ids: Track ids detected in the current frame.
        frame_count: Number of frames read so far (the last frame of the batch).
        batch_size: Number of frames in the current batch.
        frame_idx: Position of the current frame inside the batch (0-based).
        history_length: Maximum number of frames a track may stay unseen.

    Returns:
        None. Both mappings are modified in place.
    """
    current_frame = frame_count - (batch_size - frame_idx - 1)
    current_tracks = set(track_ids)

    # Stamp every detected id, including ids that have no history entry yet.
    for track_id in current_tracks:
        last_seen[track_id] = current_frame

    for track_id in list(track_history.keys()):
        if track_id in current_tracks:
            continue
        # A track with no recorded sighting starts ageing from this frame.
        seen_at = last_seen.setdefault(track_id, current_frame)
        if current_frame - seen_at > history_length:
            del track_history[track_id]
            last_seen.pop(track_id, None)

In [18]:
def draw_tracks(frame, boxes, track_ids, track_history, config):
    """Extend each track with its box position and draw the trail on ``frame``.

    Args:
        frame: Image passed to ``cv2.polylines``; it is also the return value.
        boxes: Iterable of 4-value boxes; the first two values of each box
            are stored as the new track point.
        track_ids: Ids paired with ``boxes`` by position. When empty, the
            frame is returned untouched.
        track_history: Mapping of track id to its list of points (updated
            in place).
        config: Settings providing ``track_history_length``, ``track_color``
            and ``line_thickness``.

    Returns:
        The ``frame`` object that was passed in.
    """
    if track_ids:
        for (pos_x, pos_y, _width, _height), tid in zip(boxes, track_ids):
            trail = track_history[tid]
            trail.append((float(pos_x), float(pos_y)))

            # Discard the oldest point once the trail exceeds the limit.
            if len(trail) > config["track_history_length"]:
                del trail[0]

            # cv2.polylines receives the points as an int32 array of shape (N, 1, 2).
            polyline = np.asarray(trail).astype(np.int32).reshape((-1, 1, 2))
            cv2.polylines(
                frame,
                [polyline],
                isClosed=False,
                color=config["track_color"],
                thickness=config["line_thickness"],
            )

    return frame


In [19]:
def process_batch(model, batch_frames, track_history, last_seen, frame_count, config):
    """Track one batch of frames and return the annotated images.

    Args:
        model: Object whose ``track`` method is called with the whole batch.
        batch_frames: Frames that make up the batch.
        track_history: Mapping of track id to stored points (updated in place).
        last_seen: Mapping of track id to last-seen frame (updated in place).
        frame_count: Number of frames read so far, forwarded to
            ``update_track_history``.
        config: Settings; ``track_history_length`` is read here and the whole
            dict is forwarded to ``draw_tracks``.

    Returns:
        List with one annotated frame per tracking result, in result order.
    """
    tracker_options = dict(
        persist=True,
        tracker="botsort.yaml",
        show=False,
        verbose=False,
        iou=0.5,
    )
    detections = model.track(batch_frames, **tracker_options)

    annotated = []
    for position, detection in enumerate(detections):
        box_tensor = detection.boxes.xywh.cpu()

        # No ids are available for a result whose `id` is None.
        raw_ids = detection.boxes.id
        ids = [] if raw_ids is None else raw_ids.int().cpu().tolist()

        update_track_history(
            track_history,
            last_seen,
            ids,
            frame_count,
            len(batch_frames),
            position,
            config["track_history_length"],
        )

        canvas = detection.plot(font_size=4, line_width=2)
        annotated.append(draw_tracks(canvas, box_tensor, ids, track_history, config))

    return annotated

In [22]:
def main(video_path, max_seconds=3):
    """Track objects in the first seconds of a video and save the result.

    Frames are read one by one, grouped into batches of
    ``CONFIG["batch_size"]``, passed to ``process_batch`` and written to the
    output video. A batch that fails is logged and skipped; the remaining
    frames are still processed.

    Args:
        video_path: Path of the input video.
        max_seconds: Length of the leading part of the video to process,
            in seconds (converted to frames with the reported fps).

    Returns:
        None. The annotated video is written to the path chosen by
        ``initialize_video``.
    """
    CONFIG = load_config()
    model = YOLO(CONFIG.get("model_path", "yolo11x.pt"))

    cap, out, output_path, fps = initialize_video(video_path)
    track_history = defaultdict(list)
    last_seen = defaultdict(int)

    # Number of frames that make up the first `max_seconds` of the video
    frames_to_process = int(fps * max_seconds)
    frame_count = 0
    batch_frames = []

    def flush_batch(pbar):
        """Track, annotate and write the pending frames, then start a new batch."""
        nonlocal batch_frames
        try:
            processed_frames = process_batch(
                model,
                batch_frames,
                track_history,
                last_seen,
                frame_count,
                CONFIG,
            )
            for annotated in processed_frames:
                out.write(annotated)
                pbar.update(1)
        except Exception as e:
            # Best effort: report the failed frame range and carry on.
            logger.error(
                f"Error processing frames {frame_count - len(batch_frames) + 1} to {frame_count}: {str(e)}"
            )
        finally:
            batch_frames = []

    try:
        with tqdm(
            total=frames_to_process,
            desc="Processing frames",
            colour="green",
        ) as pbar:
            while cap.isOpened() and frame_count < frames_to_process:
                success, frame = cap.read()
                if not success:
                    break

                frame_count += 1
                batch_frames.append(frame)

                if len(batch_frames) == CONFIG["batch_size"]:
                    flush_batch(pbar)

            # Flush the trailing partial batch. This covers both reaching the
            # frame limit and the video ending before the limit; in the
            # latter case these frames would otherwise never be written.
            if batch_frames:
                flush_batch(pbar)
    finally:
        # Always release the capture and writer, even when the loop raised.
        try:
            cap.release()
            out.release()
            cv2.destroyAllWindows()
        except Exception as e:
            logger.error(f"Error during cleanup: {str(e)}")

    logger.info(f"\nVideo has been saved to {output_path}")


In [None]:
if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--video-path", type=str, default="samples/vietnam-2.mp4")
    # Inside a notebook sys.argv carries the kernel's own arguments;
    # parse_known_args() ignores them instead of exiting with an error.
    args, _unknown_args = parser.parse_known_args()

    # Process the video once, using the parsed (or default) path.
    video_path = args.video_path
    main(video_path)