In [2]:
import cv2
import numpy as np
import torch
from torchvision import transforms
from PIL import Image
import warnings
warnings.filterwarnings('ignore')

KeyboardInterrupt: 

In [4]:
class CCTVDetection:
    def __init__(self, rtsp_url):
        """
        Initialize the CCTV detection system
        
        Args:
            rtsp_url (str): RTSP URL of the CCTV camera
        """
        self.rtsp_url = rtsp_url
        self.cap = None
        self.model = None
        self.device = None
        
        # Initialize the detection model
        self.initialize_model()
        
        # Define color mappings for different classes
        self.colors = {
            'car': (0, 255, 0),        # Green
            'bike': (255, 0, 0),       # Blue
            'cycle': (0, 255, 255),    # Yellow
            'motorcycle': (255, 0, 255), # Magenta
            'male': (0, 165, 255),     # Orange
            'female': (255, 192, 203), # Pink
            'kid': (255, 255, 0)       # Cyan
        }
        
        # Class mappings
        self.vehicle_classes = ['car', 'bike', 'cycle', 'motorcycle']
        self.person_classes = ['male', 'female', 'kid']
        
    def initialize_model(self):
        """Initialize YOLOv5 model for object detection"""
        try:
            self.device = 'cuda' if torch.cuda.is_available() else 'cpu'
            print(f"Using device: {self.device}")
            
            # Load YOLOv5 model (you can use different sizes: n, s, m, l, x)
            self.model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
            self.model.to(self.device)
            self.model.conf = 0.5  # Confidence threshold
            self.model.iou = 0.45  # NMS IoU threshold
            
            print("Model loaded successfully!")
            
        except Exception as e:
            print(f"Error loading model: {e}")
            raise
    
    def map_yolo_to_custom_classes(self, yolo_class):
        """
        Map YOLO classes to our custom categories
        
        Args:
            yolo_class (str): YOLO class name
            
        Returns:
            tuple: (custom_class, is_vehicle, is_person)
        """
        # Vehicle mappings
        vehicle_mapping = {
            'car': 'car',
            'truck': 'car',
            'bus': 'car',
            'bicycle': 'cycle',
            'motorcycle': 'motorcycle'
        }
        
        # Person mappings (YOLO doesn't distinguish gender/age, so we'll use additional logic)
        person_mapping = {
            'person': 'person'  # Will be further classified
        }
        
        if yolo_class in vehicle_mapping:
            return vehicle_mapping[yolo_class], True, False
        elif yolo_class in person_mapping:
            return self.classify_person_details(), False, True
        
        return None, False, False
    
    def classify_person_details(self):
        """
        Classify person into male, female, or kid based on additional features
        Note: This is a simplified approach. In production, you'd use a specialized model.
        """
        # This is a placeholder implementation
        # In a real system, you would use:
        # 1. Gender classification model
        # 2. Age estimation model
        # 3. Height/feature analysis
        
        # For demo purposes, we'll use a random distribution
        import random
        choices = ['male', 'female', 'kid']
        weights = [0.4, 0.4, 0.2]  # Weighted probabilities
        return random.choices(choices, weights=weights)[0]
    
    def connect_to_camera(self):
        """Connect to RTSP stream"""
        try:
            self.cap = cv2.VideoCapture(self.rtsp_url)
            
            # Set buffer size to minimize latency
            self.cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
            
            if not self.cap.isOpened():
                print("Error: Could not connect to camera")
                return False
            
            print("Successfully connected to CCTV camera")
            return True
            
        except Exception as e:
            print(f"Error connecting to camera: {e}")
            return False
    
    def process_frame(self, frame):
        """
        Process a single frame for detection
        
        Args:
            frame: Input frame from camera
            
        Returns:
            tuple: (processed_frame, detection_data)
        """
        if frame is None:
            return frame, {}
        
        # Convert BGR to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        
        # Run inference
        results = self.model(rgb_frame)
        
        # Parse results
        detection_data = self.parse_detections(results, frame.shape)
        
        # Draw bounding boxes
        processed_frame = self.draw_detections(frame, detection_data)
        
        return processed_frame, detection_data
    
    def parse_detections(self, results, frame_shape):
        """
        Parse YOLO detection results
        
        Args:
            results: YOLO detection results
            frame_shape: Shape of the original frame
            
        Returns:
            dict: Detection data organized by class
        """
        detection_data = {cls: 0 for cls in self.vehicle_classes + self.person_classes}
        detections = []
        
        if hasattr(results, 'xyxy') and len(results.xyxy) > 0:
            for *xyxy, conf, cls in results.xyxy[0]:
                class_name = results.names[int(cls)]
                custom_class, is_vehicle, is_person = self.map_yolo_to_custom_classes(class_name)
                
                if custom_class:
                    detection_data[custom_class] += 1
                    detections.append({
                        'class': custom_class,
                        'bbox': [int(coord) for coord in xyxy],
                        'confidence': float(conf),
                        'is_vehicle': is_vehicle,
                        'is_person': is_person
                    })
        
        detection_data['detections'] = detections
        return detection_data
    
    def draw_detections(self, frame, detection_data):
        """
        Draw bounding boxes and labels on frame
        
        Args:
            frame: Original frame
            detection_data: Detection information
            
        Returns:
            frame: Frame with drawn detections
        """
        for detection in detection_data.get('detections', []):
            bbox = detection['bbox']
            class_name = detection['class']
            confidence = detection['confidence']
            
            color = self.colors.get(class_name, (255, 255, 255))
            
            # Draw bounding box
            cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
            
            # Draw label background
            label = f"{class_name} {confidence:.2f}"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
            cv2.rectangle(frame, (bbox[0], bbox[1] - label_size[1] - 10),
                         (bbox[0] + label_size[0], bbox[1]), color, -1)
            
            # Draw label text
            cv2.putText(frame, label, (bbox[0], bbox[1] - 5),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)
        
        return frame
    
    def display_statistics(self, frame, detection_data):
        """
        Display detection statistics on frame
        
        Args:
            frame: Input frame
            detection_data: Detection information
            
        Returns:
            frame: Frame with statistics overlay
        """
        # Create statistics text
        stats_text = []
        
        # Vehicle statistics
        vehicle_count = sum(detection_data.get(cls, 0) for cls in self.vehicle_classes)
        stats_text.append(f"Vehicles: {vehicle_count}")
        for vehicle in self.vehicle_classes:
            count = detection_data.get(vehicle, 0)
            if count > 0:
                stats_text.append(f"  {vehicle}: {count}")
        
        # Person statistics
        person_count = sum(detection_data.get(cls, 0) for cls in self.person_classes)
        stats_text.append(f"Persons: {person_count}")
        for person in self.person_classes:
            count = detection_data.get(person, 0)
            if count > 0:
                stats_text.append(f"  {person}: {count}")
        
        # Draw statistics panel
        y_offset = 30
        for i, text in enumerate(stats_text):
            color = (0, 0, 0) if i % 2 == 0 else (50, 50, 50)
            cv2.rectangle(frame, (10, y_offset - 20), (250, y_offset), (255, 255, 255), -1)
            cv2.putText(frame, text, (15, y_offset - 5),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)
            y_offset += 25
        
        return frame
    
    def run_detection(self):
        """Main function to run real-time detection"""
        if not self.connect_to_camera():
            return
        
        print("Starting real-time detection...")
        print("Press 'q' to quit, 'p' to pause")
        
        paused = False
        
        while True:
            if not paused:
                ret, frame = self.cap.read()
                
                if not ret:
                    print("Error reading frame. Reconnecting...")
                    self.connect_to_camera()
                    continue
                
                # Process frame
                processed_frame, detection_data = self.process_frame(frame)
                
                # Display statistics
                processed_frame = self.display_statistics(processed_frame, detection_data)
                
                # Display frame
                cv2.imshow('CCTV Vehicle and Person Detection', processed_frame)
            
            # Handle key presses
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('p'):
                paused = not paused
                print("Paused" if paused else "Resumed")
        
        # Cleanup
        self.cap.release()
        cv2.destroyAllWindows()
        print("Detection stopped")


In [None]:
def main():
    """Main function"""
    # RTSP URL format examples:
    # rtsp://username:password@ip_address:port/stream
    # rtsp://ip_address:554/stream
    
    # Replace with your actual RTSP URL
    rtsp_url = "rtsp://kumar:Kumar%23123@116.73.21.116:554/Streaming/channels/101"
    
    # For testing with webcam (comment the above and uncomment below)
    # rtsp_url = 0  # Uses default webcam
    
    # Initialize detection system
    detector = CCTVDetection(rtsp_url)
    
    try:
        # Start detection
        detector.run_detection()
    except KeyboardInterrupt:
        print("\nDetection interrupted by user")
    except Exception as e:
        print(f"Error in main execution: {e}")

if __name__ == "__main__":
    main()

Using device: cpu


Using cache found in C:\Users\Priya/.cache\torch\hub\ultralytics_yolov5_master
YOLOv5  2025-11-4 Python-3.11.2 torch-2.9.0+cpu CPU

Fusing layers... 
YOLOv5s summary: 213 layers, 7225885 parameters, 0 gradients, 16.4 GFLOPs
Adding AutoShape... 


Model loaded successfully!
Successfully connected to CCTV camera
Starting real-time detection...
Press 'q' to quit, 'p' to pause
Error reading frame. Reconnecting...
Successfully connected to CCTV camera
Error reading frame. Reconnecting...
Successfully connected to CCTV camera
