In [1]:
import os
# Must be set before the numeric libraries below are imported.
# NOTE(review): this flag tells the Intel OpenMP runtime to tolerate a second
# copy of itself being loaded instead of aborting; presumably needed because
# several of the imported packages bundle their own copy — confirm it is still required.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


In [10]:
from dotenv import load_dotenv

# Load variables from a .env file into the process environment.
# NOTE(review): the Config class below does not read any environment variable,
# so confirm whether this call is still needed.
load_dotenv()

class Config:
    """Static settings read by the model manager and the video processor."""
    
    # Weights file passed to YOLO(...) when the model pool is built.
    # NOTE(review): absolute, machine-specific path — will not resolve on another host.
    DETECTION_MODEL_PATH = 'C:/Users/DELL/Desktop/ctrlS nagpur/gpu-debug-main/train/weights/best.pt'
    # Processing Configuration
    # NOTE(review): MAX_RETRIES and PROCESSING_TIMEOUT are not referenced by the
    # visible code — confirm they are still used somewhere.
    MAX_RETRIES = 3
    PROCESSING_TIMEOUT = 3600 # 1 hour in seconds
    # Directory the video processor creates and writes annotated frames into.
    FRAMES_OUTPUT_DIR = 'frames'

    # Tracker settings. MAX_AGE and MIN_MA are handed to Sort(max_age, min_ma).
    # NOTE(review): TRACKING_THRESHOLD is not referenced by the visible code, and
    # the trailing numbers look like previously tried values — confirm.
    TRACKING_THRESHOLD = 0.3 #5
    MAX_AGE = 15 #30
    MIN_MA = 5 #3

In [11]:
import asyncio
from main import ParallelProcessorWorker
from logger import setup_logger

class JobQueue:
    """FIFO of pending jobs that refuses a job whose id is already being handled.

    ``processing`` maps job id -> job dict for every job that has been accepted
    and not yet marked complete; ``lock`` serialises changes to that mapping.
    """

    def __init__(self):
        self.lock = asyncio.Lock()
        self.processing = {}
        self.queue = asyncio.Queue()

    async def add_job(self, job: dict) -> bool:
        """Enqueue ``job`` unless its ``job_id`` is already registered.

        Returns:
            True when the job was registered and queued, False for a duplicate id.
        """
        async with self.lock:
            job_id = job['job_id']
            if job_id not in self.processing:
                self.processing[job_id] = job
                await self.queue.put(job)
                return True
        return False

    async def mark_complete(self, job_id: int):
        """Forget ``job_id`` so the same id may be submitted again; unknown ids are ignored."""
        async with self.lock:
            if job_id in self.processing:
                del self.processing[job_id]

class GPUService:
    """Pulls jobs off a JobQueue and runs each one as its own asyncio task.

    The actual work is delegated to ``ParallelProcessorWorker.process_job``.
    Every spawned job task is kept in ``self._job_tasks`` until it finishes:
    the event loop only holds weak references to tasks, so an unreferenced
    task can be garbage-collected before it completes, and ``stop()`` needs
    the references to wait for in-flight jobs.
    """

    def __init__(self, num_concurrent_tasks: int = 4):
        self.logger = setup_logger('job_parallel', 'job_parallel.log')
        self.processor_worker = ParallelProcessorWorker(num_concurrent_tasks=num_concurrent_tasks)
        self.job_queue = JobQueue()
        self.processing_task = None
        self.running = False
        # Strong references to the per-job tasks spawned by process_queue().
        self._job_tasks = set()

    async def start(self):
        """Start the processing loop"""
        self.running = True
        self.processing_task = asyncio.create_task(self.process_queue())
        self.logger.info("Started GPU API Service")

    async def stop(self):
        """Stop the processing loop, then wait for jobs that are still running."""
        self.logger.info("Stopping GPU API Service")
        self.running = False
        if self.processing_task:
            self.processing_task.cancel()
            try:
                await self.processing_task
            except asyncio.CancelledError:
                pass

        # Wait for active tasks to complete
        # NOTE(review): the worker's active_tasks list is never appended to in the
        # visible code, so this branch is kept only for compatibility.
        if hasattr(self.processor_worker, 'active_tasks'):
            active_tasks = [task for task in self.processor_worker.active_tasks if not task.done()]
            if active_tasks:
                self.logger.info(f"Waiting for {len(active_tasks)} active tasks to complete")
                await asyncio.gather(*active_tasks, return_exceptions=True)

        # Jobs already dequeued must be allowed to finish (and to run their
        # mark_complete cleanup) before the service reports itself stopped.
        in_flight = [task for task in self._job_tasks if not task.done()]
        if in_flight:
            self.logger.info(f"Waiting for {len(in_flight)} in-flight jobs to complete")
            await asyncio.gather(*in_flight, return_exceptions=True)
        self.logger.info("GPU API Service stopped")

    async def process_queue(self):
        """Main processing loop that handles queued jobs.

        Runs until the task is cancelled (see ``stop``). The 1 s timeout on
        the queue read only bounds how long a single wait lasts.
        """
        while True:
            try:
                try:
                    # Get next job from queue
                    job = await asyncio.wait_for(self.job_queue.queue.get(), timeout=1.0)
                except asyncio.TimeoutError:
                    continue
                # Start processing in parallel; keep a reference until done.
                job_task = asyncio.create_task(self.process_job(job))
                self._job_tasks.add(job_task)
                job_task.add_done_callback(self._job_tasks.discard)
            except Exception as e:
                self.logger.error(f"Error in process_queue: {str(e)}")
                await asyncio.sleep(1)

    async def process_job(self, job: dict):
        """Process a single job; the job id is always released afterwards."""
        try:
            self.logger.info(f"Processing job: {job['job_id']}")
            await self.processor_worker.process_job(job)
        except Exception as e:
            self.logger.error(f"Error processing job {job['job_id']}: {str(e)}")
        finally:
            await self.job_queue.mark_complete(job['job_id'])

    def get_status(self) -> dict:
        """Get current processing status"""
        return {
            "queued_jobs": self.job_queue.queue.qsize(),
            "processing_jobs": len(self.job_queue.processing),
            "active_tasks": len(self.processor_worker.active_tasks),
            "max_concurrent": self.processor_worker.num_concurrent_tasks
        }


In [19]:
from scipy.optimize import linear_sum_assignment
import scipy.linalg
import numpy as np
import os
import cv2
import time

# Fix the seed of NumPy's global random generator at import time.
# NOTE(review): no np.random call is visible in the tracker code below —
# confirm the seed is still needed.
np.random.seed(123)
# np.set_printoptions(suppress=True)


class KalmanFilter(object):
    """
    Constant-velocity Kalman filter for a single tracked box.

    State vector (8 values):
        x, y, a, h, vx, vy, va, vh
    The first four entries are the observed quantities, the last four their
    per-step velocities. Noise levels are scaled by the current height ``h``
    (state index 3).

    ``count`` is a class-wide counter; each new filter takes its current
    value as ``id`` and then increments it.
    """
    count = 0

    def __init__(self, measurement):
        """Start a track from one 4-value measurement (x, y, a, h) with zero velocity."""
        ndim, dt = 4, 1

        # Bookkeeping: predictions made, updates received, predictions since the
        # last update, and updates received without a missed frame in between.
        self.age = 0
        self.hits = 0
        self.time_since_update = 0
        self.measurement_association = 0

        self.id = KalmanFilter.count
        KalmanFilter.count += 1

        # F: identity plus dt on the position/velocity coupling entries.
        self._motion_mat = np.eye(2*ndim, 2*ndim)
        pos_idx = np.arange(ndim)
        self._motion_mat[pos_idx, pos_idx + ndim] = dt

        # H: picks the four observed entries out of the state.
        self._transform_mat = np.eye(ndim, 2*ndim)

        # Relative noise weights (multiplied by the box height wherever used).
        self._std_weight_position = 1. / 20
        self._std_weight_velocity = 1. / 160

        # Initial mean: the measurement followed by zero velocities.
        self.mean = np.r_[measurement, np.zeros_like(measurement)]

        # Initial covariance: diagonal, height-scaled except for the two
        # fixed entries belonging to a and va.
        height = measurement[3]
        pos_sigma = self._std_weight_position * height
        vel_sigma = self._std_weight_velocity * height
        initial_std = [pos_sigma, pos_sigma, 1e-2, pos_sigma,
                       vel_sigma, vel_sigma, 1e-5, vel_sigma]
        self.covariance = np.diag(np.square(initial_std))

    def predict(self):
        """
        Advance the state by one step.

        Updates ``mean`` and ``covariance`` in place and the counters
        (``age``, ``time_since_update``; ``measurement_association`` is reset
        when the previous step already had no update).
        Return:
            The predicted (x, y, a, h), i.e. the state mean projected to
            measurement space.
        """
        self.age += 1
        if self.time_since_update > 0:
            self.measurement_association = 0
        self.time_since_update += 1

        # Q is built from the height *before* the state is propagated.
        height = self.mean[3]
        pos_sigma = self._std_weight_position * height
        vel_sigma = self._std_weight_velocity * height
        process_std = [pos_sigma, pos_sigma, 1e-2, pos_sigma,
                       vel_sigma, vel_sigma, 1e-5, vel_sigma]
        process_cov = np.diag(np.square(process_std))

        # x' = F x
        self.mean = np.dot(self._motion_mat, self.mean)
        # P' = F P F^T + Q
        propagated = np.linalg.multi_dot(
            (self._motion_mat, self.covariance, self._motion_mat.T))
        self.covariance = propagated + process_cov

        return np.dot(self._transform_mat, self.mean)

    def project(self):
        """
        Map the current state distribution into measurement space.
        Return: (ndarray, ndarray)
            Projected mean H x and projected covariance H P H^T + R.
        """
        pos_sigma = self._std_weight_position * self.mean[3]
        noise_cov = np.diag(np.square([pos_sigma, pos_sigma, 1e-1, pos_sigma]))

        mean_z = np.dot(self._transform_mat, self.mean)
        cov_z = np.linalg.multi_dot(
            (self._transform_mat, self.covariance, self._transform_mat.T)) + noise_cov

        return mean_z, cov_z

    def update(self, measurement):
        """
        Correct the state with a 4-value measurement (x, y, a, h).

        Resets ``time_since_update``, increments ``hits`` and
        ``measurement_association`` and overwrites ``mean`` / ``covariance``.
        """
        self.time_since_update = 0
        self.hits += 1
        self.measurement_association += 1

        mean_z, cov_z = self.project()

        # Solve (H P H^T + R) K^T = (P H^T)^T through a Cholesky factorisation.
        factor, is_lower = scipy.linalg.cho_factor(cov_z, lower=True, check_finite=False)
        cross_cov_t = np.dot(self.covariance, self._transform_mat.T).T
        gain = scipy.linalg.cho_solve((factor, is_lower), cross_cov_t, check_finite=False).T

        innovation = measurement - mean_z

        self.mean = self.mean + np.dot(innovation, gain.T)
        self.covariance = self.covariance - np.linalg.multi_dot((gain, cov_z, gain.T))

    def get_updated_state(self):
        """Return the current (x, y, a, h) part of the state mean."""
        return np.dot(self._transform_mat, self.mean)

def iou(d_bbox, t_bbox):
    """
    Intersection-over-union of two boxes given as (cx, cy, aspect, height, ...).

    Only the first four entries of each box are read. ``aspect`` is
    width / height, so the width is recovered as ``aspect * height`` before
    the box is converted to corner form.

    The arguments are not modified: the conversion works on private float
    copies, so callers no longer have to pass throw-away copies, and integer
    or list inputs are accepted.

    Return:
        The IoU as a float; 0.0 when the union has no area (both boxes are
        degenerate), instead of the NaN a 0/0 division would produce.
    """
    def _corners(box):
        # (cx, cy, a, h) -> (x1, y1, x2, y2)
        cx, cy, aspect, height = np.asarray(box, dtype=float)[:4]
        width = aspect * height
        x1 = cx - width / 2.
        y1 = cy - height / 2.
        return x1, y1, x1 + width, y1 + height

    d_x1, d_y1, d_x2, d_y2 = _corners(d_bbox)
    t_x1, t_y1, t_x2, t_y2 = _corners(t_bbox)

    intersection_width = np.maximum(0., np.minimum(d_x2, t_x2) - np.maximum(d_x1, t_x1))
    intersection_height = np.maximum(0., np.minimum(d_y2, t_y2) - np.maximum(d_y1, t_y1))
    intersection_area = intersection_width * intersection_height

    union_area = ((d_x2 - d_x1) * (d_y2 - d_y1)) + \
                 ((t_x2 - t_x1) * (t_y2 - t_y1)) - \
                 intersection_area

    if union_area == 0:
        return 0.0

    return intersection_area / union_area

def _iou_matrix(d_bbox, t_bbox):
    """IoU of every detection/tracker pair as a (len(d_bbox), len(t_bbox)) float32 array.

    Boxes are rows of (cx, cy, aspect, height, ...); only the first four
    columns are read and the inputs are not modified. Pairs whose union has
    no area get an IoU of 0 rather than NaN.
    """
    result = np.zeros((len(d_bbox), len(t_bbox)), dtype=np.float32)
    if result.size == 0:
        return result

    def _corners(boxes):
        boxes = np.asarray(boxes, dtype=float)
        width = boxes[:, 2] * boxes[:, 3]
        height = boxes[:, 3]
        x1 = boxes[:, 0] - width / 2.
        y1 = boxes[:, 1] - height / 2.
        return x1, y1, x1 + width, y1 + height

    d_x1, d_y1, d_x2, d_y2 = _corners(d_bbox)
    t_x1, t_y1, t_x2, t_y2 = _corners(t_bbox)

    inter_w = np.maximum(0., np.minimum(d_x2[:, None], t_x2[None, :])
                         - np.maximum(d_x1[:, None], t_x1[None, :]))
    inter_h = np.maximum(0., np.minimum(d_y2[:, None], t_y2[None, :])
                         - np.maximum(d_y1[:, None], t_y1[None, :]))
    inter = inter_w * inter_h

    d_area = (d_x2 - d_x1) * (d_y2 - d_y1)
    t_area = (t_x2 - t_x1) * (t_y2 - t_y1)
    union = d_area[:, None] + t_area[None, :] - inter

    ratio = np.divide(inter, union, out=np.zeros_like(inter), where=union > 0)
    result[...] = ratio
    return result


def associate_detections_to_trackers(d_bbox, t_bbox, iou_threshold=0.2):
    """
    Match detections to predicted tracker boxes by IoU.

    Param:
        d_bbox: 2-D array of detections, rows (cx, cy, aspect, height, ...).
        t_bbox: 2-D array of tracker predictions in the same layout.
        iou_threshold: pairs with a lower IoU are not accepted as matches.
    Return:
        (matches, unmatched_detections, unmatched_trackers) where ``matches``
        is an int array of shape (k, 2) holding (detection index, tracker
        index) rows and the other two are index arrays. A pair rejected for
        low IoU appears in both unmatched arrays and never in ``matches``.
    """
    if len(t_bbox) == 0:
        return np.empty((0, 2), dtype=int), np.arange(len(d_bbox)), np.empty((0, 5), dtype=int)

    iou_matrix = _iou_matrix(d_bbox, t_bbox)
    n_det, n_trk = iou_matrix.shape

    # Candidate pairs: a plain arg-max when there is a single row or column,
    # otherwise the assignment that maximises the total IoU.
    if n_det == 1 or n_trk == 1:
        if np.any(iou_matrix > iou_threshold):
            best = int(np.argmax(iou_matrix))
            candidates = [(0, best)] if n_det == 1 else [(best, 0)]
        else:
            candidates = []
    elif n_det > 0:
        # linear_sum_assignment minimises cost, hence the sign flip.
        rows, cols = linear_sum_assignment(-1 * iou_matrix)
        candidates = [(int(r), int(c)) for r, c in zip(rows, cols)]
    else:
        candidates = []

    assigned_dets = {d_idx for d_idx, _ in candidates}
    assigned_trks = {t_idx for _, t_idx in candidates}
    unmatched_detections = [d_idx for d_idx in range(n_det) if d_idx not in assigned_dets]
    unmatched_trackers = [t_idx for t_idx in range(n_trk) if t_idx not in assigned_trks]

    # filter out matched with low IOU
    matches = []
    for d_idx, t_idx in candidates:
        if iou_matrix[d_idx, t_idx] < iou_threshold:
            unmatched_detections.append(d_idx)
            unmatched_trackers.append(t_idx)
        else:
            matches.append((d_idx, t_idx))

    # Return the *filtered* pairs; an empty result keeps the (0, 2) shape.
    matched_indices = np.array(matches, dtype=int).reshape(-1, 2)

    return (matched_indices,
            np.array(unmatched_detections, dtype=int),
            np.array(unmatched_trackers, dtype=int))


class Sort(object):
    """Multi-object tracker: one KalmanFilter per track, matched to detections by IoU."""

    def __init__(self, max_age=30, min_ma=3):
        """
        Set key parameter for SORT

        Param:
            max_age: a track is dropped once it has gone more than this many
                     consecutive updates without a matched detection.
            min_ma:  number of consecutive matched detections after which a
                     track is counted towards ``object_count``.
        """
        self.max_age = max_age
        self.min_ma = min_ma
        self.trackers = []  # live KalmanFilter instances
        self.id_set = set()
        self.object_count = 0

    def update(self, detected_bbox=np.empty((0, 5))):
        """
        This method must be called once for each frame even with empty detections
        (call it without arguments for frames without detections).

        Param:
            2-D numpy array of detections, one row per box. Each row is used
            directly as a KalmanFilter measurement, i.e. (x, y, a, h) with
            a = width / height. The default is not modified.
        Return:
            (tracks, object_count). ``tracks`` has one row (x, y, a, h, id) per
            tracker that was alive during this call, including tracks that are
            removed at the end of it; ``id`` is the filter id plus one.
        NOTE:
            The number of objects returned may differ from the number of detections provided.
        """
        # Predict every tracker one step ahead. Trackers whose prediction
        # contains NaN are dropped here, and the prediction rows are built only
        # from the survivors so that row i always belongs to self.trackers[i].
        survivors = []
        predicted_rows = []
        for tracker in self.trackers:
            pos = tracker.predict()
            if np.any(np.isnan(pos)):
                continue
            survivors.append(tracker)
            predicted_rows.append([pos[0], pos[1], pos[2], pos[3], 0])
        self.trackers[:] = survivors
        tracked_bbox = np.array(predicted_rows, dtype=float).reshape(-1, 5)

        d_bbox = detected_bbox.copy()
        t_bbox = tracked_bbox.copy()

        matched, unmatched_dets, unmatched_tracks = associate_detections_to_trackers(d_bbox, t_bbox)

        # update matched trackers with detection bbox i.e, measurement
        for m in matched:
            self.trackers[m[1]].update(detected_bbox[m[0], :])

        # create and initialize new trackers for unmatched detections
        for i in unmatched_dets:
            self.trackers.append(KalmanFilter(detected_bbox[i, :]))

        # Report every tracker, then prune. Walking the indices backwards keeps
        # the remaining indices valid while entries are popped.
        ret = []
        for idx in range(len(self.trackers) - 1, -1, -1):
            each_tracker = self.trackers[idx]
            d = each_tracker.get_updated_state()
            ret.append(np.concatenate((d, [each_tracker.id+1])).reshape(1, -1))

            # NOTE(review): `count` is the class-wide KalmanFilter counter, not this
            # track's own id, so object_count ends up as the number of filters ever
            # created (by any Sort instance) — confirm that is the intended meaning.
            if each_tracker.count not in self.id_set:
                if each_tracker.measurement_association >= self.min_ma:
                    self.id_set.add(each_tracker.count)

            # Drop tracks whose x is at or below 100 (hard-coded limit;
            # NOTE(review): presumably a region-of-interest boundary — confirm),
            # and tracks that have not been matched for more than max_age updates.
            if d[0] <= 100 or each_tracker.time_since_update > self.max_age:
                self.trackers.pop(idx)

        if len(self.id_set):
            self.object_count = max(self.id_set)

        if len(ret) > 0:
            return np.concatenate(ret), self.object_count

        return np.empty((0, 5)), self.object_count


In [20]:
import logging
import sys
from pathlib import Path

def setup_logger(name, log_file=None, level=logging.INFO):
    """Configure logger with custom formatting and multiple handlers.

    Args:
        name: logger name passed to ``logging.getLogger``.
        log_file: optional file name; when given, records are also written to
            ``logs/<log_file>`` (the ``logs`` directory is created if needed).
        level: level set on the logger.

    Returns:
        The configured logger. Calling this again with the same ``name``
        reconfigures the same logger object; handlers installed earlier are
        removed and closed first so their log files are not left open.
    """
    # Create logs directory if it doesn't exist
    logs_dir = Path("logs")
    logs_dir.mkdir(exist_ok=True)

    # Create formatter
    formatter = logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )

    # Create logger
    logger = logging.getLogger(name)
    logger.setLevel(level)

    # Remove and close any existing handlers. Closing a StreamHandler does not
    # close sys.stdout; closing a FileHandler releases its file.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

    # File handler
    if log_file:
        file_handler = logging.FileHandler(logs_dir / log_file)
        file_handler.setFormatter(formatter)
        logger.addHandler(file_handler)

    return logger

# Create separate loggers for different components

In [21]:
import torch
from ultralytics import YOLO
from typing import Dict, Optional, List
import threading
import asyncio

from logger import setup_logger
from config import Config

class ModelInstance:
    """Pairs one loaded YOLO model with the state needed to hand it out from a pool."""

    def __init__(self, model: YOLO, model_type: str):
        # The loaded model object and the pool key it belongs to.
        self.model = model
        self.model_type = model_type
        # True while a caller holds this instance; read and written under `lock`.
        self.in_use = False
        self.lock = threading.Lock()
        # track memory usage for this instance
        # self.initial_memory = torch.cuda.memory_allocated()

class SingleGPUModelManager:
    """Pool of identical YOLO model instances loaded onto GPU 0.

    Instances are grouped by model type (only ``'hotspot'`` exists) and lent
    out one caller at a time through ``get_available_model`` /
    ``release_model``.
    """

    def __init__(self, instances_per_type: int = 50, memory_threshold: float = 0.85):
        """
        Initialize model manager for single GPU
        Args:
            instances_per_type: Number of instances per model type to load
            memory_threshold: Fraction of GPU memory stored on the instance;
                not consulted by any active code path in this class.
        Raises:
            RuntimeError: if CUDA is not available.
        """
        self.logger = setup_logger('gpu_model_manager', 'gpu_model_manager.log')
        self.logger.info("Initializing single_gpu_model_manager.")

        self.instances_per_type = instances_per_type
        self.memory_threshold = memory_threshold

        # Check for CUDA before querying the device, so a missing GPU is reported
        # with the intended message rather than whatever the property query raises.
        if not torch.cuda.is_available():
            self.logger.error("Failed to initialize models: CUDA is not available")
            raise RuntimeError("CUDA is not available")
        self.total_gpu_memory = torch.cuda.get_device_properties(0).total_memory

        self.model_instances: Dict[str, List[ModelInstance]] = {
            'hotspot': []
        }
        self.model_locks = {
            'hotspot': threading.Lock()
        }
        self.initialize_models()

    def _check_memory_usage(self) -> float:
        """
        Check current GPU memory usage
        Returns:
            float: Fraction of GPU memory currently in use
        """
        allocated = torch.cuda.memory_allocated()
        return allocated / self.total_gpu_memory

    def initialize_models(self):
        """Initialize multiple instances of each model type on single GPU.

        Appends ``instances_per_type`` new instances to the ``'hotspot'`` pool.
        Any failure is logged and re-raised.
        """
        try:
            # Ensure CUDA is available
            if not torch.cuda.is_available():
                raise RuntimeError("CUDA is not available")

            self.logger.info(f"Initializing {self.instances_per_type} instances per model type on GPU")

            # Initialize models with CUDA memory management
            with torch.cuda.device(0):
                # Initialize hotspot models
                for i in range(self.instances_per_type):
                    hotspot_model = YOLO(Config.DETECTION_MODEL_PATH)
                    hotspot_model.to('cuda')
                    self.model_instances['hotspot'].append(
                        ModelInstance(hotspot_model, 'hotspot')
                    )
                    self.logger.info(f"Detection Model: {i} initialized!")

            # Log GPU memory usage: bytes held by tensors vs. bytes held by
            # the caching allocator.
            allocated = torch.cuda.memory_allocated(0)
            reserved = torch.cuda.memory_reserved(0)
            self.logger.info(f"GPU Memory: Allocated={allocated/1e9:.2f}GB, Reserved={reserved/1e9:.2f}GB")

        except Exception as e:
            self.logger.error(f"Failed to initialize models: {str(e)}", exc_info=True)
            raise

    async def get_available_model(self, model_type: str) -> Optional[ModelInstance]:
        """Get an available model instance of the specified type.

        Scans the pool for an instance that is not in use, marks it as in use
        and returns it. When every instance is busy, sleeps one second and
        scans again, so the call does not return until an instance is free.
        An unknown ``model_type`` raises ``KeyError``.
        """
        while True:
            with self.model_locks[model_type]:
                for instance in self.model_instances[model_type]:
                    self.logger.info(f"Checking instance: {instance.model_type}")
                    with instance.lock:
                        if not instance.in_use:
                            instance.in_use = True
                            self.logger.info(f"Returning instance: {instance.model_type}")
                            return instance
            # If no instance is available, wait briefly before checking again
            await asyncio.sleep(1)

    async def release_model(self, instance: ModelInstance):
        """Release a model instance back to the pool"""
        with instance.lock:
            instance.in_use = False
            # Clear CUDA cache for this model if needed
            # if torch.cuda.is_available():
            #     torch.cuda.empty_cache()
        self.logger.info(f"Released {instance.model_type} model instance")
    
    # def _clear_instance_memory(self, instance: ModelInstance):
    #     """
    #     Clear memory specifically associated with this model instance
    #     """
    #     try:
    #         # Clear stored intermediate results
    #         if hasattr(instance, 'intermediate_results'):
    #             instance.intermediate_results.clear()
            
    #         # Clear last computation if it exists
    #         if hasattr(instance, 'last_computation'):
    #             del instance.last_computation
                
    #         # Clear model gradients
    #         if hasattr(instance.model, 'zero_grad'):
    #             instance.model.zero_grad(set_to_none=True)  # More efficient than just zero_grad()
                
    #         # Remove any references to stored tensors
    #         for attr_name in dir(instance):
    #             attr = getattr(instance, attr_name)
    #             if isinstance(attr, torch.Tensor):
    #                 delattr(instance, attr_name)
                    
    #         # Force garbage collection for this instance
    #         gc.collect()
    #         if torch.cuda.is_available():
    #             torch.cuda.synchronize()  # Ensure CUDA operations are complete
                
    #     except Exception as e:
    #         self.logger.error(f"Error clearing instance memory: {str(e)}", exc_info=True)
    
    # async def release_model(self, instance: ModelInstance):
    #     """
    #     Release a model instance back to the pool with targeted memory cleanup
    #     """
    #     try:
    #         with instance.lock:
    #             # Clear instance-specific memory
    #             self._clear_instance_memory(instance)
                
    #             # Check overall memory usage
    #             current_usage = self._check_memory_usage()
    #             self.logger.debug(f"Current GPU memory usage: {current_usage:.2%}")
                
    #             # If memory usage is above threshold, perform additional cleanup
    #             if current_usage > self.memory_threshold:
    #                 self.logger.warning(
    #                     f"High memory usage detected ({current_usage:.2%}). "
    #                     "Performing additional cleanup."
    #                 )
    #                 torch.cuda.empty_cache()
                    
    #                 # Log memory after cleanup
    #                 new_usage = self._check_memory_usage()
    #                 self.logger.info(
    #                     f"Memory usage after cleanup: {new_usage:.2%} "
    #                     f"(freed {(current_usage - new_usage) * 100:.2f}%)"
    #                 )
                
    #             # Finally mark the instance as not in use
    #             instance.in_use = False
                
    #     except Exception as e:
    #         self.logger.error(f"Error in release_model: {str(e)}", exc_info=True)
    #         raise


In [22]:
import cv2
import os
import numpy as np
from datetime import datetime
from typing import Optional

from kalman_track import Sort
from logger import setup_logger
from config import Config

class ParallelVideoProcessor:
    """Runs detection plus per-class tracking over a video file.

    A model instance is borrowed from ``model_manager`` for the duration of
    one video and always handed back afterwards.
    """

    def __init__(self, model_manager):
        self.logger = setup_logger('parallel_video_processor', 'parallel_video_processor.log')
        self.logger.info("Initializing ParallelVideoProcessor.")

        self.model_manager = model_manager

        self.MAX_AGE = Config.MAX_AGE
        self.MIN_MA = Config.MIN_MA

        # Create frames output directory
        self.frames_base_dir = Config.FRAMES_OUTPUT_DIR
        os.makedirs(self.frames_base_dir, exist_ok=True)

    def initialize_trackers(self):
        """Initialize separate trackers for each class"""
        trackers = {}
        for class_id in range(4):  # Assuming 4 classes (0-3)
            trackers[class_id] = Sort(self.MAX_AGE, self.MIN_MA)
        return trackers

    async def process_video(self, video_path: str, video_type: str):
        """Process video using available model instance.

        ``video_type`` selects the model pool to borrow from. Returns the
        result of the hotspot pipeline; any error is logged and re-raised.
        """
        try:
            # Get available model instance
            model_instance = await self.model_manager.get_available_model(video_type)
            self.logger.info(f"Got {video_type} model instance.")

            try:
                success = await self._process_hotspot_video(video_path, model_instance)
                return success
            finally:
                # Always release the model instance back to the pool
                await self.model_manager.release_model(model_instance)

        except Exception as e:
            self.logger.error(f"Error processing video: {str(e)}", exc_info=True)
            raise

    async def process_frame_hotspot(self, frame, model_instance, conf_threshold=0.50):
        """Process frame for hotspot detection using available model instance.

        Returns:
            (results, detections, boxes): the raw model results, a list of
            dicts with ``bbox`` = [cx, cy, aspect, height], ``confidence`` and
            ``class``, and an always-empty list kept for interface
            compatibility. Boxes whose integer height is not positive are
            skipped because their aspect ratio is undefined.
        """
        # NOTE(review): the model call below is synchronous, so the event loop is
        # blocked for the duration of each inference.
        try:
            results = model_instance.model(frame, conf=conf_threshold)
            # Get bounding boxes for detected objects
            boxes = []
            detections = []

            for r in results:
                boxes_tensor = r.boxes.xyxy.cpu()   # Get boxes in xyxy format
                confs = r.boxes.conf.cpu()          # Get confidence scores
                cls = r.boxes.cls.cpu()             # Get class indices

                for box, conf, cl in zip(boxes_tensor, confs, cls):
                    if conf >= conf_threshold:
                        x1, y1, x2, y2 = map(int, box[:4])
                        h = (y2 - y1)
                        if h <= 0:
                            # Degenerate box: width / height cannot be computed.
                            continue
                        cx = (x1 + x2) / 2.
                        cy = (y1 + y2) / 2.
                        ar = (x2 - x1) / h
                        detections.append({
                            'bbox': [cx, cy, ar, h],
                            'confidence': float(conf),
                            'class': int(cl)
                        })

            return results, detections, boxes

        except Exception as e:
            self.logger.error(f"Error processing hotspot frame: {str(e)}", exc_info=True)
            raise

    async def _process_hotspot_video(self, video_path: str, model_instance):
        """Process hotspot video with given model instance.

        Every 10th frame is run through the detector and the per-class
        trackers; every 10th processed frame is written, annotated, to the
        frames directory. Returns True on completion; errors are logged and
        re-raised. The capture is released on every exit path.
        """
        self.logger.info(f"Starting hotspot detection for video_id")

        try:
            # Initialize separate trackers for each class
            self.logger.info("Initializing Kalman trackers for each class")
            trackers = self.initialize_trackers()

            cap = cv2.VideoCapture(video_path)
            try:
                if not cap.isOpened():
                    raise Exception(f"Failed to open video file: {video_path}")

                frame_count = 0
                frame_processed = 0
                tracked_objects = {class_id: {} for class_id in range(4)}  # Track objects per class
                processing_start = datetime.now()
                self.logger.info("Starting frame processing loop")

                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break

                    frame_count += 1
                    if frame_count % 10 == 0:  # Process every 10th frame
                        frame_processed += 1

                        # Process frame
                        results, detections, _ = await self.process_frame_hotspot(
                            frame, model_instance
                        )
                        # Organize detections by class (class ids outside 0-3 are
                        # not expected; see initialize_trackers)
                        class_detections = {class_id: [] for class_id in range(4)}
                        for det in detections:
                            class_id = det['class']
                            bbox = det['bbox']
                            class_detections[class_id].append(bbox)

                        # Update trackers for each class
                        annotated_frame = results[0].plot()

                        for class_id, dets in class_detections.items():
                            # Each tracker must be updated on every processed frame,
                            # also when its class has no detections.
                            if len(dets) > 0:
                                dets_array = np.array(dets)
                                tracked_bbox, count = trackers[class_id].update(dets_array)
                            else:
                                tracked_bbox, count = trackers[class_id].update()

                            # Update tracked objects for this class
                            if len(tracked_bbox) > 0:
                                for track in tracked_bbox:
                                    object_id = int(track[-1])
                                    centroid = (int(track[0]), int(track[1]))

                                    if object_id not in tracked_objects[class_id]:
                                        tracked_objects[class_id][object_id] = {
                                            'frames_tracked': 0,
                                            'last_position': centroid,
                                            'class_id': class_id
                                        }

                                    tracked_objects[class_id][object_id]['frames_tracked'] += 1
                                    tracked_objects[class_id][object_id]['last_position'] = centroid

                                    # Draw tracking info on frame
                                    cv2.putText(annotated_frame, f"Class {class_id} ID {object_id}",
                                                (centroid[0] - 10, centroid[1] - 10),
                                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
                                    cv2.circle(annotated_frame, centroid, 4, (0, 255, 0), -1)

                        # Save frame periodically (every 10th processed frame)
                        if frame_processed % 10 == 0:
                            frame_filename = f"frame_{frame_count}.jpg"
                            temp_frame_path = os.path.join(self.frames_base_dir, frame_filename)

                            try:
                                cv2.imwrite(temp_frame_path, annotated_frame)
                                self.logger.debug(f"Frame saved: {temp_frame_path}")

                            except Exception as e:
                                self.logger.error(f"Failed to save frame: {str(e)}")
            finally:
                cap.release()

            self.logger.info(f"Video processing completed. Total frames: {frame_processed}")

            # Store final results
            processing_time = (datetime.now()-processing_start).total_seconds()
            total_tracked_objects = sum(len(objs) for objs in tracked_objects.values())

            final_results = {
                'total_frames': frame_processed,
                'unique_tracked_objects': total_tracked_objects,
                'objects_per_class': {
                    class_id: len(objs)
                    for class_id, objs in tracked_objects.items()
                }
            }
            print(final_results)

            self.logger.info(f"Processing completed in {processing_time:.2f} seconds. "
                        f"Found {total_tracked_objects} unique objects across all classes")

            return True

        except Exception as e:
            self.logger.error(f"Hotspot video processing failed: {str(e)}", exc_info=True)
            raise

    def cleanup(self):
        """Cleanup resources.

        Shuts down ``db_writer`` when one has been attached to the instance;
        this class itself never creates one, so otherwise this is a no-op.
        """
        db_writer = getattr(self, 'db_writer', None)
        if db_writer is not None:
            db_writer.shutdown()

In [23]:
import asyncio
import nest_asyncio
from typing import List

from logger import setup_logger
from model_manager import SingleGPUModelManager
from singlegpu_videoprocessor import ParallelVideoProcessor

class ParallelProcessorWorker:
    """Run video-processing jobs with a cap on how many execute at once.

    The worker owns a ``SingleGPUModelManager`` and a
    ``ParallelVideoProcessor`` and funnels every job through an
    ``asyncio.Semaphore`` sized by ``num_concurrent_tasks``.
    """

    def __init__(self, num_concurrent_tasks: int = 4):
        """Create the model manager, the video processor and the semaphore.

        Args:
            num_concurrent_tasks: Upper bound on jobs processed at the same
                time; also handed to the model manager as
                ``instances_per_type``.
        """
        self.num_concurrent_tasks = num_concurrent_tasks
        self.running = False
        self.logger = setup_logger('main', 'main.log')
        self.logger.info("Initializing main worker.")

        # Initialize GPU model manager with instances per type
        self.model_manager = SingleGPUModelManager(instances_per_type=num_concurrent_tasks)
        self.processor = ParallelVideoProcessor(self.model_manager)

        # Track active tasks (nothing in this class populates the list itself)
        self.active_tasks: List[asyncio.Task] = []
        self.semaphore = asyncio.Semaphore(num_concurrent_tasks)

    async def process_job(self, job: dict) -> None:
        """Process a single job while holding one semaphore slot.

        Exceptions derived from ``Exception`` are logged and swallowed, so
        one bad job does not propagate an error to the caller.

        Args:
            job: Mapping read for the keys ``'video_path'`` and
                ``'video_type'``. A job without a usable ``'video_path'``
                is rejected with an error log entry instead of being handed
                to the video processor.
        """
        async with self.semaphore:  # Control concurrent processing
            try:
                video_path = job.get('video_path')
                video_type = job.get('video_type')
            except Exception as e:
                # Reached when ``job`` is not a mapping-like object.
                self.logger.error(f"Error in process_job: {str(e)}", exc_info=True)
                return

            if not video_path:
                # Fail fast with a readable message rather than letting a
                # ``None`` path blow up somewhere inside the processor.
                self.logger.error(
                    f"Error in process_job: job has no 'video_path': {job!r}"
                )
                return

            try:
                success = await self.processor.process_video(video_path, video_type)
                self.logger.info(f"processing completed successfully: {success}")
            except Exception as e:
                self.logger.error(f"Error processing job: {str(e)}", exc_info=True)

if __name__ == "__main__":
    # Jupyter already runs an event loop; nest_asyncio patches asyncio so
    # that run_until_complete() below can be used from inside the notebook.
    nest_asyncio.apply()

    processor_worker = ParallelProcessorWorker(num_concurrent_tasks=4)

    # Single hard-coded job used to exercise the worker end to end.
    sample_job = {
        'video_path': 'C:/Users/DELL/Desktop/ctrlS nagpur/gpu-debug-main/test1.MP4',
        'video_type': 'hotspot',
    }

    # In Jupyter, drive the coroutine on the existing loop instead of
    # calling asyncio.run().
    loop = asyncio.get_event_loop()
    loop.run_until_complete(processor_worker.process_job(sample_job))

2025-03-03 15:23:36,704 - main - INFO - Initializing main worker.
2025-03-03 15:23:36,713 - gpu_model_manager - INFO - Initializing single_gpu_model_manager.
2025-03-03 15:23:36,714 - gpu_model_manager - INFO - Initializing 4 instances per model type on GPU
2025-03-03 15:23:36,907 - gpu_model_manager - INFO - Detection Model: 0 initialized!
2025-03-03 15:23:37,064 - gpu_model_manager - INFO - Detection Model: 1 initialized!
2025-03-03 15:23:37,220 - gpu_model_manager - INFO - Detection Model: 2 initialized!
2025-03-03 15:23:37,439 - gpu_model_manager - INFO - Detection Model: 3 initialized!
2025-03-03 15:23:37,439 - gpu_model_manager - INFO - GPU Memory: Allocated=0.18GB, Reserved=0.18GB
2025-03-03 15:23:37,439 - parallel_video_processor - INFO - Initializing ParallelVideoProcessor.
2025-03-03 15:23:37,454 - gpu_model_manager - INFO - Checking instance: hotspot
2025-03-03 15:23:37,454 - gpu_model_manager - INFO - Returning instance: hotspot
2025-03-03 15:23:37,454 - parallel_video_proc