In [1]:
# !pip install ultralytics



In [2]:
from __future__ import annotations

import argparse
import os
import sys
import time
from typing import Tuple, List

import cv2
import numpy as np
import torch
from tqdm import tqdm
from ultralytics import YOLO

In [3]:
def choose_device() -> str:
    """
    Автоматически выбираем устройство: 'cuda' если доступен GPU, иначе 'cpu'.
    """
    return "cuda" if torch.cuda.is_available() else "cpu"

In [4]:
def alpha_rectangle(
    img: np.ndarray,
    xyxy: Tuple[int, int, int, int],
    color: Tuple[int, int, int],
    alpha: float = 0.35,
    thickness: int = 2,
) -> None:
    """
    Рисует полупрозрачный прямоугольник на изображении (in-place).
    Используется для того, чтобы рамки не полностью перекрывали картинку.
    """
    x1, y1, x2, y2 = xyxy
    overlay = img.copy()
    cv2.rectangle(overlay, (x1, y1), (x2, y2), color, -1)
    cv2.addWeighted(overlay, alpha, img, 1 - alpha, 0, img)
    cv2.rectangle(img, (x1, y1), (x2, y2), color, thickness)

In [5]:
def draw_label(
    img: np.ndarray,
    text: str,
    topleft: Tuple[int, int],
    bg_color: Tuple[int, int, int] = (0, 0, 0),
    text_color: Tuple[int, int, int] = (255, 255, 255),
    font_scale: float = 0.45,
    thickness: int = 1,
) -> None:
    """
    Отписовка подписи (фон + текст) над объектом.
    """
    x, y = topleft
    font = cv2.FONT_HERSHEY_SIMPLEX
    ((w, h), _) = (cv2.getTextSize(text, font, font_scale, thickness)[0], None)
    cv2.rectangle(img, (x, y - h - 6), (x + w + 6, y), bg_color, -1)
    cv2.putText(img, text, (x + 3, y - 4), font, font_scale, text_color, thickness, cv2.LINE_AA)

In [6]:
def process_video(
    input_path: str,
    output_path: str,
    model_name: str = "yolov8n.pt",
    conf_thresh: float = 0.3,
    iou_thresh: float = 0.45,
    device: str | None = None,
    class_filter: List[int] | None = None,
) -> None:
    """
    Обрабатывает видео: делает детекцию и сохраняет видео с отрисовкой.

    Args:
        input_path: путь к входному видео (например, 'crowd.mp4').
        output_path: путь для сохранения обработанного видео.
        model_name: имя модели/весов для ultralytics YOLO (по умолчанию 'yolov8n.pt').
                    Можно использовать 'yolov8n', 'yolov8s', путь к локальным весам и т.д.
        conf_thresh: порог confidence для фильтрации детекций.
        iou_thresh: порог NMS iou.
        device: 'cuda' или 'cpu' или None для автоподбора.
        class_filter: список id классов (COCO) которые нужно оставлять. Если None — все.
                      Для людей по COCO: [0]
    """
    if device is None:
        device = choose_device()

    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input video not found: {input_path}")

    print(f"Loading model {model_name} on device {device} ...")
    model = YOLO(model_name)
    model.to(device)

    cap = cv2.VideoCapture(input_path)
    if not cap.isOpened():
        raise RuntimeError("Не удалось открыть входное видео.")

    fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
    w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")
    out = cv2.VideoWriter(output_path, fourcc, fps, (w, h))

    total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) or None
    pbar = tqdm(total=total_frames, desc="Processing frames", unit="frame")

    frame_idx = 0
    start_time = time.time()

    # Обрабатываем по кадру (легко читать лог и видеть прогресс видео).
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        results = model.predict(
            frame,
            conf=conf_thresh,
            iou=iou_thresh,
            device=device,
            verbose=False,
            imgsz=max(w, h),
        )

        # results — list of Results (по кадрам); здесь один элемент
        res = results[0]

        # Получаем bounding boxes, classes и confidences
        boxes = getattr(res.boxes, "xyxy", None)
        confs = getattr(res.boxes, "conf", None)
        cls_ids = getattr(res.boxes, "cls", None)

        # Если нет детекций — просто записываем исходный кадр
        if boxes is None or len(boxes) == 0:
            out.write(frame)
            pbar.update(1)
            frame_idx += 1
            continue

        # Конвертируем в numpy
        boxes = boxes.cpu().numpy().astype(int)
        confs = confs.cpu().numpy()
        cls_ids = cls_ids.cpu().numpy().astype(int)

        # Нарисуем все объекты, которые проходят фильтр классов (если задан)
        for (bb, conf, cid) in zip(boxes, confs, cls_ids):
            if class_filter is not None and cid not in class_filter:
                continue

            x1, y1, x2, y2 = int(bb[0]), int(bb[1]), int(bb[2]), int(bb[3])

            color = (19, 142, 255)

            alpha_rectangle(frame, (x1, y1, x2, y2), color=color, alpha=0.25, thickness=2)

            label = f"person {conf:.2f}"
            text_y = y1 if y1 - 12 > 10 else y2 + 12
            draw_label(frame, label, (x1, text_y), bg_color=(0, 0, 0), text_color=(255, 255, 255))

        out.write(frame)
        pbar.update(1)
        frame_idx += 1

    elapsed = time.time() - start_time
    pbar.close()
    cap.release()
    out.release()

    print(f"Done. Processed {frame_idx} frames in {elapsed:.1f}s ({frame_idx/elapsed:.2f} FPS).")
    print(f"Saved output to: {output_path}")

In [7]:
if __name__ == "__main__":

    input_video_path = "crowd.mp4"
    output_video_path = "crowd_out.mp4"

    class_filter = [0]

    print(f"Starting detection on {input_video_path}...")
    process_video(
        input_path=input_video_path,
        output_path=output_video_path,
        model_name="yolov8n.pt",
        conf_thresh=0.45,
        iou_thresh=0.45,
        device=None,
        class_filter=class_filter,
    )

Starting detection on crowd.mp4...
Loading model yolov8n.pt on device cuda ...


Processing frames: 100%|██████████| 705/705 [01:28<00:00,  7.97frame/s]

Done. Processed 705 frames in 88.4s (7.97 FPS).
Saved output to: crowd_out.mp4



