In [1]:
# Version of the interpreter running this kernel. (`!python --version` would
# report whichever `python` is first on the shell PATH, which may differ.)
import platform
python_version = platform.python_version()
python_version

Python 3.12.4


In [2]:
import ultralytics
import torch

In [3]:
import cv2
import time
from queue import Queue
from threading import Thread
from concurrent.futures import ThreadPoolExecutor, as_completed
from abc import ABC, abstractmethod
import json
from ultralytics import YOLO
import numpy as np
import os
from tqdm import tqdm
import threading

In [4]:
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Tuple

In [5]:
# Name of CUDA device 0, or a placeholder instead of an exception on machines
# without CUDA so the notebook still runs top to bottom.
gpu_name = torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CUDA not available"
gpu_name

'NVIDIA GeForce GTX 1650'

In [6]:
# ---- Default pipeline settings ----
YOLO_MODEL_PATH = "models/yolo/yolov8n.pt"  # YOLO weights loaded for inference
YOLO_CONFIDENCE_THRESHOLD = 0.5             # passed to the model as `conf`
MAX_QUEUE_SIZE = 30                         # capacity of each inter-thread queue
# COCO class ids to keep: person=0, car=2, truck=7, bus=5, motorcycle=3
TARGET_CLASSES = [0, 2, 7, 5, 3]

In [7]:
# Inputs for the run below. Output paths are derived per video by the
# processor, so no output-path constants are needed here.
INPUT_VIDEO_PATH = "data/videos/Rec16-1_trimmed.mp4"
# One timestamp per line; line i is used as the key for frame i.
INPUT_TIMESTAMP_PATH = "output/timestamps/Rec16-1_trimmed.txt"

In [8]:
@dataclass
class VideoInfo:
    """Frame size and frame rate of a video stream, as read from OpenCV."""

    width: int   # frame width in pixels
    height: int  # frame height in pixels
    # Frames per second. A float because containers commonly report fractional
    # rates (e.g. 29.97); an int would invite truncation and playback drift.
    fps: float

In [9]:
@dataclass
class ProcessConfig:
    """Settings for one video-processing job.

    The ``output_*`` paths are filled in once the job's output directory is
    known, and ``total_frames`` is overwritten with the capture's frame count
    when processing starts.
    """

    input_path: str         # video file to read
    video_id: str           # label used for progress bars and output file names
    timestamps_path: str    # text file; line i is the timestamp of frame i
    max_queue_size: int     # capacity of each inter-stage queue
    output_dir: str         # directory receiving this job's outputs
    output_video_path: str  # annotated video is written here
    output_json_path: str   # per-frame results (JSON) are written here
    total_frames: int = field(default=0)

In [10]:
@dataclass
class YOLOConfig:
    """Detection settings and per-frame timestamps for one video."""

    model_path: str              # weights file the model was loaded from
    confidence_threshold: float  # passed to the model as ``conf``
    target_classes: List[int]    # class ids passed to the model as ``classes``
    model: Any                   # loaded, callable YOLO model
    timestamps: List[str]        # frame i is keyed by timestamps[i]

In [11]:
@dataclass
class YOLOProcessConfig(ProcessConfig):
    """Per-video job configuration for the YOLO pipeline.

    Inherits every ``ProcessConfig`` field (same order and defaults, so
    positional and keyword construction are unchanged) and adds the
    YOLO-specific settings.
    """

    # YOLO model, thresholds and timestamps for this video. None when unset,
    # rather than a dict, which would not be a YOLOConfig at all.
    yolo_config: Optional[YOLOConfig] = None

In [12]:
class ProgressTracker:
    """Lock-protected wrapper around a lazily created tqdm progress bar.

    ``update`` and ``close`` may be called from any thread. Both are no-ops
    before ``start`` and after ``close``.
    """

    def __init__(self, total, desc, position=1):
        """
        Args:
            total: Expected number of steps.
            desc: Label shown next to the bar.
            position: tqdm row for the bar (default 1, as before).
        """
        self.total = total
        self.desc = desc
        self.position = position
        self.lock = threading.Lock()
        self.pbar = None

    def start(self):
        """Create the tqdm bar. Calling it again does not create a second bar."""
        with self.lock:
            if self.pbar is None:
                self.pbar = tqdm(total=self.total, desc=self.desc,
                                 position=self.position, leave=False)

    def update(self, n=1):
        """Advance the bar by ``n`` steps. Ignored if not started or already closed."""
        with self.lock:
            if self.pbar is not None:
                self.pbar.update(n)

    def close(self):
        """Close the bar exactly once. Safe to call repeatedly or concurrently with ``update``."""
        # Detach under the lock so a concurrent update() never touches a closed bar.
        with self.lock:
            pbar, self.pbar = self.pbar, None
        if pbar is not None:
            pbar.close()


In [13]:
class FileIO:
    """Helpers for the text and JSON files used by the pipeline.

    Every method opens files with an explicit encoding (UTF-8 by default) so the
    result does not depend on the platform's locale encoding.
    """

    @staticmethod
    def read_lines(file_path: str, encoding: str = "utf-8") -> List[str]:
        """Return each line of ``file_path`` with surrounding whitespace stripped.

        Blank lines are kept as ``''`` so list positions match line numbers.
        """
        with open(file_path, "r", encoding=encoding) as f:
            return [line.strip() for line in f]

    @staticmethod
    def read_json(file_path: str, encoding: str = "utf-8") -> Dict[str, Any]:
        """Parse and return the JSON document stored in ``file_path``."""
        with open(file_path, "r", encoding=encoding) as f:
            return json.load(f)

    @staticmethod
    def write_json(file_path: str, data: Dict[str, Any], encoding: str = "utf-8"):
        """Write ``data`` to ``file_path`` as JSON indented by 2 spaces (overwrites)."""
        with open(file_path, "w", encoding=encoding) as f:
            json.dump(data, f, indent=2)

In [14]:
class VideoProcessor(ABC):
    """Base class for a four-stage, multi-threaded video pipeline.

    Per video, four threads are connected by bounded queues:

    * ``_read_frames``    -- decodes input frames onto the ``frame`` queue;
    * ``_process_frames`` -- (subclass) consumes frames, records per-frame results
      in ``results_dict`` and puts items on the ``result`` queue;
    * ``_post_process``   -- (subclass) turns those items into output frames on
      the ``output`` queue;
    * ``_write_video``    -- encodes output frames to ``output_video_path``.

    ``None`` is the end-of-stream sentinel. Each stage must put exactly one
    ``None`` on its output queue as its last action. Frames stay in OpenCV's BGR
    channel order end to end (``VideoCapture.read`` produces it and
    ``VideoWriter`` expects it), so subclasses must hand BGR frames to the writer.
    """

    def __init__(self, output_dir: str, default_max_queue_size: int = 30, max_workers: int = None):
        """
        Args:
            output_dir: Root directory for per-video outputs.
            default_max_queue_size: Default queue capacity for subclasses' configs.
            max_workers: Maximum number of videos processed concurrently
                (``None`` lets ``ThreadPoolExecutor`` choose).
        """
        self.output_dir = output_dir
        self.default_max_queue_size = default_max_queue_size
        self.max_workers = max_workers
        # Used for writing JSON as well as reading text, despite its name.
        self.text_reader = FileIO()

    @abstractmethod
    def create_process_config(self, video_config: Dict[str, Any]) -> Any:
        """Build the per-video job config from a user-supplied dict.

        The result must provide ``input_path``, ``video_id``, ``max_queue_size``,
        ``output_video_path`` and ``output_json_path``. ``total_frames`` is
        assigned during processing.
        """

    @abstractmethod
    def _process_frames(self, frame_queue: Queue, result_queue: Queue, process_config: Any, results_dict: Dict[str, Any], progress_callback: Callable[[int], None]):
        """Consume frames until ``None``, fill ``results_dict``, forward items to
        ``result_queue`` and finish by putting ``None`` on it."""

    @abstractmethod
    def _post_process(self, result_queue: Queue, output_queue: Queue, progress_callback: Callable[[int], None]):
        """Turn items from ``result_queue`` into BGR frames on ``output_queue``,
        finishing with ``None``."""

    def process_single_video(self, process_config: Any, pbar: tqdm):
        """Run the whole pipeline for one video, then write its JSON results.

        Raises:
            IOError: if the input video cannot be opened.
            Exception: the first exception raised by any pipeline stage. In that
                case no JSON is written.
        """
        cap, video_info = self._initialize_video_capture(process_config.input_path)
        try:
            process_config.total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            queues = self._create_queues(process_config.max_queue_size)
            results_dict: Dict[str, Any] = {}
            errors: List[BaseException] = []

            pbar.set_description(f"Processing {process_config.video_id}")
            pbar.reset(total=process_config.total_frames)

            def progress_callback(n):
                pbar.update(n)

            threads = self._create_and_start_threads(cap, queues, process_config, results_dict,
                                                     video_info, progress_callback, errors)
            self._join_threads(threads)

            if errors:
                # Re-raise in this thread so process_videos can report it.
                raise errors[0]
            self._save_results(process_config.output_json_path, results_dict)
        finally:
            # Release even when a stage failed. The reader thread has been joined
            # by now unless thread creation itself failed.
            cap.release()

    def process_videos(self, video_configs: List[Dict[str, Any]]):
        """Process ``video_configs`` concurrently, one progress bar per video.

        A failure in one video is printed with its ``video_id`` and does not stop
        the others. Errors while building a config (e.g. a missing timestamps
        file) propagate to the caller.
        """
        progress_bars = []
        try:
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                video_ids = {}  # future -> video_id, for error reporting
                for video_config in video_configs:
                    config = self.create_process_config(video_config)
                    pbar = tqdm(total=0, desc=f"Initializing {config.video_id}",
                                position=len(progress_bars), leave=True)
                    progress_bars.append(pbar)
                    video_ids[executor.submit(self.process_single_video, config, pbar)] = config.video_id

                for future in as_completed(video_ids):
                    try:
                        future.result()
                    except Exception as e:
                        print(f"An error occurred while processing {video_ids[future]}: {e}")
        finally:
            for pbar in progress_bars:
                pbar.close()

    def _initialize_video_capture(self, input_path: str) -> Tuple[cv2.VideoCapture, VideoInfo]:
        """Open ``input_path`` and read its frame size and frame rate.

        Raises:
            IOError: if OpenCV cannot open the video.
        """
        cap = cv2.VideoCapture(input_path)
        if not cap.isOpened():
            cap.release()
            raise IOError(f"Could not open video: {input_path}")
        video_info = VideoInfo(
            width=int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
            height=int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
            # Keep the fractional rate (e.g. 29.97). Truncating it would make the
            # re-encoded video play at the wrong speed.
            fps=cap.get(cv2.CAP_PROP_FPS),
        )
        return cap, video_info

    def _create_queues(self, max_queue_size: int) -> Dict[str, Queue]:
        """Create the three bounded queues linking the pipeline stages."""
        return {
            'frame': Queue(maxsize=max_queue_size),
            'result': Queue(maxsize=max_queue_size),
            'output': Queue(maxsize=max_queue_size)
        }

    def _create_and_start_threads(self, cap: cv2.VideoCapture, queues: Dict[str, Queue],
                                  process_config: Any, results_dict: Dict[str, Any],
                                  video_info: VideoInfo, progress_callback: Callable[[int], None],
                                  errors: List[BaseException] = None) -> List[Thread]:
        """Start the four pipeline stages, each wrapped by ``_run_stage``.

        Exceptions raised by any stage are appended to ``errors``. A fresh list
        is used if none is given.
        """
        if errors is None:
            errors = []
        # (target, args, queue it consumes, queue it feeds)
        stages = [
            (self._read_frames,
             (cap, queues['frame'], process_config.max_queue_size),
             None, queues['frame']),
            (self._process_frames,
             (queues['frame'], queues['result'], process_config, results_dict, progress_callback),
             queues['frame'], queues['result']),
            (self._post_process,
             (queues['result'], queues['output'], progress_callback),
             queues['result'], queues['output']),
            (self._write_video,
             (process_config.output_video_path, queues['output'],
              video_info.fps, video_info.width, video_info.height),
             queues['output'], None),
        ]
        threads = [
            Thread(target=self._run_stage, args=(target, args, inbox, outbox, errors),
                   name=f"{process_config.video_id}-{target.__name__}")
            for target, args, inbox, outbox in stages
        ]
        for thread in threads:
            thread.start()
        return threads

    def _run_stage(self, target: Callable[..., None], args: tuple, inbox: Queue,
                   outbox: Queue, errors: List[BaseException]):
        """Run one pipeline stage. If it raises, record the error and unblock its neighbours.

        Without this, a crashed stage never forwards its ``None`` sentinel, and
        the other threads (and ``process_single_video``) block forever.
        """
        try:
            target(*args)
        except Exception as exc:
            errors.append(exc)
            # Let downstream stages finish.
            if outbox is not None:
                outbox.put(None)
            # Let the upstream stage finish by consuming its queue up to the
            # sentinel. Assumes the failed stage had not already consumed it,
            # which holds when the sentinel handling is the stage's last action.
            if inbox is not None:
                while inbox.get() is not None:
                    pass

    def _join_threads(self, threads: List[Thread]):
        """Wait for every pipeline thread to finish."""
        for thread in threads:
            thread.join()

    def _read_frames(self, cap: cv2.VideoCapture, frame_queue: Queue, max_queue_size: int):
        """Put every decoded frame from ``cap`` on ``frame_queue``, then ``None``.

        ``frame_queue`` is bounded, so ``put`` blocks while the consumer is
        behind. ``max_queue_size`` is no longer needed for polling and is kept
        only for signature compatibility.
        """
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            frame_queue.put(frame)
        frame_queue.put(None)

    def _write_video(self, output_path: str, output_queue: Queue, fps: float, width: int, height: int):
        """Encode frames from ``output_queue`` (until ``None``) into an MP4 file.

        Raises:
            IOError: if OpenCV cannot open a writer for ``output_path``.
        """
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        if not out.isOpened():
            raise IOError(f"Could not open video writer for: {output_path}")
        try:
            while True:
                frame = output_queue.get()
                if frame is None:
                    break
                # Frames are already in BGR order, which VideoWriter expects.
                # Converting them again would swap the red and blue channels.
                out.write(frame)
        finally:
            out.release()

    def _save_results(self, output_json_path: str, results_dict: Dict[str, Any]):
        """Write the per-frame results dict to ``output_json_path`` as JSON."""
        self.text_reader.write_json(output_json_path, results_dict)

    def get_unique_output_dir(self, base_path: str) -> str:
        """Return ``base_path``, or ``<base_path>_copyN`` with the smallest free N if it exists."""
        if not os.path.exists(base_path):
            return base_path

        counter = 1
        while True:
            new_path = f"{base_path}_copy{counter}"
            if not os.path.exists(new_path):
                return new_path
            counter += 1

In [15]:
class YOLOProcessor(VideoProcessor):
    """VideoProcessor that runs Ultralytics YOLO detection on every frame.

    Per video it writes an annotated MP4 and a JSON file. The JSON maps each
    frame's timestamp to that frame's detections. Frame i uses line i of the
    timestamps file, or ``"frame_<i>"`` once the file runs out.
    """

    def __init__(self, output_dir: str, default_yolo_model_path: str = YOLO_MODEL_PATH,
                 default_confidence_threshold: float = YOLO_CONFIDENCE_THRESHOLD,
                 default_target_classes: List[int] = TARGET_CLASSES,
                 default_max_queue_size: int = MAX_QUEUE_SIZE, max_workers: int = None):
        """Store the defaults applied to any video config that does not override them."""
        super().__init__(output_dir, default_max_queue_size, max_workers)
        self.default_yolo_model_path = default_yolo_model_path
        self.default_confidence_threshold = default_confidence_threshold
        self.default_target_classes = default_target_classes
        # Cache backing get_yolo_model(). Models in it are shared, so they must
        # not be called from several threads at once.
        self.yolo_models: Dict[str, Any] = {}

    def get_yolo_model(self, model_path: str) -> Any:
        """Return a cached model for ``model_path``, loading it on first use.

        The instance is shared by every caller, so do not use it for concurrent
        inference. The pipeline itself loads a private model per video.
        """
        if model_path not in self.yolo_models:
            self.yolo_models[model_path] = YOLO(model_path)
        return self.yolo_models[model_path]

    def create_process_config(self, video_config: Dict[str, Any]) -> YOLOProcessConfig:
        """Build the full job config for one video, creating its output directory."""
        base_config = self._create_base_config(video_config)
        output_paths = self._create_output_paths(base_config)
        yolo_config = self._create_yolo_config(video_config)

        return YOLOProcessConfig(
            input_path=base_config.input_path,
            video_id=base_config.video_id,
            timestamps_path=base_config.timestamps_path,
            max_queue_size=base_config.max_queue_size,
            output_dir=output_paths['output_dir'],
            output_video_path=output_paths['output_video_path'],
            output_json_path=output_paths['output_json_path'],
            total_frames=0,  # set in process_single_video
            yolo_config=yolo_config
        )

    def _create_base_config(self, video_config: Dict[str, Any]) -> ProcessConfig:
        """Copy the required keys and the queue size (or its default) from ``video_config``."""
        return ProcessConfig(
            input_path=video_config['input_path'],
            video_id=video_config['video_id'],
            timestamps_path=video_config['timestamps_path'],
            max_queue_size=video_config.get('max_queue_size', self.default_max_queue_size),
            output_dir='',  # filled from _create_output_paths
            output_video_path='',
            output_json_path='',
            total_frames=0
        )

    def _create_output_paths(self, config: ProcessConfig) -> Dict[str, str]:
        """Create a fresh ``<output_dir>/<video_id>[_copyN]`` directory and name the outputs in it."""
        base_output_dir = os.path.join(self.output_dir, config.video_id)
        output_dir = self.get_unique_output_dir(base_output_dir)
        os.makedirs(output_dir, exist_ok=True)
        return {
            'output_dir': output_dir,
            'output_video_path': os.path.join(output_dir, f"output_{config.video_id}.mp4"),
            'output_json_path': os.path.join(output_dir, f"output_{config.video_id}.json"),
        }

    def _create_yolo_config(self, video_config: Dict[str, Any]) -> YOLOConfig:
        """Load a model for this video and gather its detection settings and timestamps."""
        yolo_model_path = video_config.get('yolo_model_path', self.default_yolo_model_path)
        # Each video is processed on its own thread, and a single Ultralytics
        # model instance is not safe for concurrent inference. So every video
        # gets its own model instead of the shared one from get_yolo_model().
        model = YOLO(yolo_model_path)

        # NOTE(review): unclear whether Ultralytics reads this attribute. The
        # per-call verbose=False in _run_yolo_inference is what keeps per-frame
        # logging quiet.
        model.verbose = False

        return YOLOConfig(
            model_path=yolo_model_path,
            confidence_threshold=video_config.get('confidence_threshold', self.default_confidence_threshold),
            # Copy so configs never share the module-level default list.
            target_classes=list(video_config.get('target_classes', self.default_target_classes)),
            model=model,
            timestamps=self.text_reader.read_lines(video_config['timestamps_path'])
        )

    def _process_frames(self, frame_queue: Queue, result_queue: Queue, process_config: YOLOProcessConfig, results_dict: Dict[str, Any], progress_callback: Callable[[int], None]):
        """Run detection on each frame until ``None`` and record results by timestamp.

        If two frames share a timestamp string, the later frame's detections
        replace the earlier ones in ``results_dict``. The raw frame and the
        Ultralytics results are forwarded for annotation.
        """
        yolo_config = process_config.yolo_config
        frame_index = 0
        while True:
            frame = frame_queue.get()
            if frame is None:
                break

            results = self._run_yolo_inference(frame, yolo_config)
            frame_results = self._process_yolo_results(results, yolo_config.model)

            timestamp = self._get_timestamp(frame_index, yolo_config.timestamps)
            results_dict[timestamp] = frame_results
            result_queue.put((frame, results))
            frame_index += 1
            progress_callback(1)

        result_queue.put(None)

    def _run_yolo_inference(self, frame: np.ndarray, yolo_config: YOLOConfig) -> Any:
        """Run the model on one frame, keeping only target classes above the threshold."""
        return yolo_config.model(frame,
                                 classes=yolo_config.target_classes,
                                 conf=yolo_config.confidence_threshold, verbose=False)

    def _process_yolo_results(self, results: Any, yolo_model: Any) -> List[Dict[str, Any]]:
        """Convert the first Results object into JSON-friendly detection dicts.

        Uses the named ``xyxy``/``conf``/``cls`` accessors rather than column
        positions in ``boxes.data``, so extra columns (e.g. track ids) are ignored.
        """
        boxes = results[0].boxes
        return [
            {
                "class": yolo_model.names[int(cls)],
                "confidence": conf,
                "bbox": xyxy,
            }
            for xyxy, conf, cls in zip(boxes.xyxy.tolist(), boxes.conf.tolist(), boxes.cls.tolist())
        ]

    def _get_timestamp(self, frame_index: int, timestamps: List[str]) -> str:
        """Return the timestamp for ``frame_index``, or ``frame_<i>`` past the end of the list."""
        return timestamps[frame_index] if frame_index < len(timestamps) else f"frame_{frame_index}"

    def _post_process(self, result_queue: Queue, output_queue: Queue, progress_callback: Callable[[int], None]):
        """Draw detections onto each frame and forward the annotated images.

        ``Results.plot()`` returns the annotated image in BGR order, the same as
        the input frame. Progress is counted in ``_process_frames``, so
        ``progress_callback`` is unused here.
        """
        while True:
            item = result_queue.get()
            if item is None:
                break
            _frame, results = item
            output_queue.put(results[0].plot())

        output_queue.put(None)

In [16]:
# Root for all results; each video gets its own sub-directory named after its video_id.
OUTPUT_DIR = "output"
yolo_processor = YOLOProcessor(output_dir=OUTPUT_DIR)

In [17]:
# One job per video; process_videos runs them concurrently.
# NOTE(review): both jobs read the Rec16-1 video and timestamps. The
# "Rec16-2_trimmed" job looks like a copy-paste; confirm its intended paths.
video_config = [
    dict(video_id=job_id, input_path=INPUT_VIDEO_PATH, timestamps_path=INPUT_TIMESTAMP_PATH)
    for job_id in ("Rec16-1_trimmed", "Rec16-2_trimmed")
]

yolo_processor.process_videos(video_config)

Processing Rec16-1_trimmed:   0%|          | 0/1396 [00:00<?, ?it/s]

Ultralytics YOLOv8.2.87 🚀 Python-3.12.4 torch-2.4.0+cu121 CUDA:0 (NVIDIA GeForce GTX 1650, 3897MiB)
YOLOv8n summary (fused): 168 layers, 3,151,904 parameters, 0 gradients, 8.7 GFLOPs


Processing Rec16-1_trimmed: 100%|██████████| 1396/1396 [00:33<00:00, 42.23it/s]
Processing Rec16-2_trimmed: 100%|██████████| 1396/1396 [00:33<00:00, 42.23it/s]
