# accident detection without main()

In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
import torch
from datetime import datetime

class AccidentDetectionSystem:
    """Annotate frames with YOLO detections and flag accident classes.

    Wraps a custom-trained YOLO model whose class ids are mapped to names in
    ``self.class_names`` and to BGR colours in ``self.colors``. Every
    detection whose class id is an accident class is appended to an
    in-memory log and to a text file.
    """

    # Class ids that represent an accident (names are in ``class_names``).
    ACCIDENT_CLASS_IDS = (1, 2, 3, 5, 6, 7, 8)

    # BGR colour used for class ids that are missing from ``self.colors``.
    FALLBACK_COLOR = (255, 255, 255)

    # Smallest text baseline (pixels from the top edge) used for box labels,
    # so a label is still drawn inside the frame when a box touches the top.
    MIN_LABEL_Y = 15

    def __init__(self, model_path="best.pt", conf_threshold=0.5, iou_threshold=0.45,
                 log_path="realtime_accident_log.txt"):
        """
        Initialize the Accident Detection System
        Args:
            model_path: Path to custom trained YOLO model
            conf_threshold: Confidence threshold for detections
            iou_threshold: IOU threshold for NMS
            log_path: Text file that accident entries are appended to
        """
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold
        self.log_path = log_path

        # Define class colors (BGR format)
        self.colors = {
            0: (0, 255, 0),    # bike: green
            1: (0, 0, 255),    # bike_bike_accident: red
            2: (0, 0, 255),    # bike_object_accident: red
            3: (0, 0, 255),    # bike_person_accident: red
            4: (0, 255, 0),    # car: green
            5: (0, 0, 255),    # car_bike_accident: red
            6: (0, 0, 255),    # car_car_accident: red
            7: (0, 0, 255),    # car_object_accident: red
            8: (0, 0, 255),    # car_person_accident: red
            9: (0, 255, 0)     # person: green
        }

        # Initialize class names
        self.class_names = {
            0: 'bike',
            1: 'bike_bike_accident',
            2: 'bike_object_accident',
            3: 'bike_person_accident',
            4: 'car',
            5: 'car_bike_accident',
            6: 'car_car_accident',
            7: 'car_object_accident',
            8: 'car_person_accident',
            9: 'person'
        }

        # Initialize accident event logger
        self.accident_log = []

    def is_accident_class(self, class_id):
        """Check if the class ID represents an accident"""
        return class_id in self.ACCIDENT_CLASS_IDS

    def _describe_class(self, class_id):
        """Return ``(name, color)`` for a class id, with fallbacks for unknown ids."""
        name = self.class_names.get(class_id, f"class_{class_id}")
        color = self.colors.get(class_id, self.FALLBACK_COLOR)
        return name, color

    def log_accident(self, class_name, confidence, frame_time):
        """
        Log accident details with timestamp
        Args:
            class_name: Name of the detected accident class
            confidence: Detection confidence (written with two decimals)
            frame_time: Timestamp string recorded for the entry
        """
        self.accident_log.append({
            'timestamp': frame_time,
            'type': class_name,
            'confidence': confidence
        })

        # Append to the file right away so the log is current while running.
        with open(self.log_path, 'a', encoding="utf-8") as f:
            f.write(f"\nTime: {frame_time}\n")
            f.write(f"Type: {class_name}\n")
            f.write(f"Confidence: {confidence:.2f}\n")
            f.write("---------------------\n")

    def detect(self, frame):
        """
        Process a single frame for accident detection

        The frame is annotated in place and also returned.
        Args:
            frame: Input frame (numpy array)
        Returns:
            tuple: (processed_frame, detections, accident_detected)
                - processed_frame: Frame with annotations
                - detections: List of detections with format [x1, y1, x2, y2, confidence, class_name]
                - accident_detected: Boolean indicating if an accident was detected
        """
        # One timestamp is shared by the overlay and all log entries of the frame.
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Run YOLO inference; only the first result is used.
        results = self.model(frame, conf=self.conf_threshold, iou=self.iou_threshold)[0]

        accident_detected = False
        detections = []

        for box in results.boxes:
            x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
            class_id = int(box.cls[0])
            confidence = float(box.conf[0])

            # Unknown class ids get a generated name and a neutral colour
            # instead of raising KeyError.
            class_name, color = self._describe_class(class_id)

            detections.append([x1, y1, x2, y2, confidence, class_name])

            # Draw bounding box
            cv2.rectangle(frame,
                          (int(x1), int(y1)),
                          (int(x2), int(y2)),
                          color, 2)

            # Draw the label above the box, clamped so it stays inside the frame.
            label = f"{class_name} {confidence:.2f}"
            label_y = max(int(y1 - 10), self.MIN_LABEL_Y)
            cv2.putText(frame, label,
                        (int(x1), label_y),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)

            if self.is_accident_class(class_id):
                accident_detected = True
                self.log_accident(class_name, confidence, current_time)

        # Add timestamp to frame
        cv2.putText(frame, current_time,
                    (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        # If accident detected, add warning text
        if accident_detected:
            cv2.putText(frame, "ACCIDENT DETECTED!",
                        (frame.shape[1]//2 - 100, 50),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

        return frame, detections, accident_detected

    def get_accident_log(self):
        """Return the accident log (the internal list, not a copy)"""
        return self.accident_log

    def clear_accident_log(self):
        """Clear the in-memory accident log (the log file is left untouched)"""
        self.accident_log = []

# helmet detection 


In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from datetime import datetime

class HelmetDetector:
    """Count and annotate helmet / no-helmet detections from a YOLO model.

    Class names are taken from ``self.model.names``. The two names in
    ``self.colors`` get dedicated colours and are always present in the
    per-frame statistics; any other class name the model reports is counted
    under its own name and drawn with ``FALLBACK_COLOR``.
    """

    # BGR colour for class names that are not keys of ``self.colors``.
    FALLBACK_COLOR = (0, 255, 255)

    def __init__(self, model_path, conf_threshold=0.5):
        """
        Initialize the Helmet Detection System
        Args:
            model_path: Path to YOLO model
            conf_threshold: Confidence threshold for detections
        """
        # Initialize YOLO model
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold

        # Colors for visualization (BGR)
        self.colors = {
            'With_Helmet': (0, 255, 0),    # Green
            'Without_Helmet': (0, 0, 255)   # Red
        }

        # Initialize statistics
        self.reset_stats()

        # Store detection history
        self.detection_log = []

    def reset_stats(self):
        """Reset detection statistics"""
        self.stats = {
            'With_Helmet': 0,
            'Without_Helmet': 0
        }

    def _color_for(self, cls):
        """Return the BGR colour for a class name, or ``FALLBACK_COLOR`` if unknown."""
        return self.colors.get(cls, self.FALLBACK_COLOR)

    def log_detection(self, detection_type, confidence, frame_time):
        """
        Log detection details with timestamp
        Args:
            detection_type: Class name of the detection
            confidence: Confidence score
            frame_time: Timestamp string recorded for the entry
        """
        self.detection_log.append({
            'timestamp': frame_time,
            'type': detection_type,
            'confidence': confidence
        })

    def draw_detection_box(self, frame, box, cls, conf):
        """
        Draw detection box with label and confidence
        Args:
            frame: Input frame (drawn on in place)
            box: Bounding box coordinates [x1, y1, x2, y2]
            cls: Class name
            conf: Confidence score
        """
        x1, y1, x2, y2 = map(int, box)
        color = self._color_for(cls)

        # Draw box
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Create label with class and confidence
        text = f'{cls} {conf:.2f}'

        # Measure the label so the filled background fits it.
        (text_width, text_height), _ = cv2.getTextSize(
            text, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)

        # Draw label background directly above the box
        cv2.rectangle(frame,
                      (x1, y1 - text_height - 10),
                      (x1 + text_width + 10, y1),
                      color,
                      -1)

        # Draw label text
        cv2.putText(frame,
                    text,
                    (x1 + 5, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6,
                    (255, 255, 255),
                    2)

    def draw_stats(self, frame):
        """
        Draw detection statistics on frame
        Args:
            frame: Input frame (drawn on in place)
        """
        # Background for stats; 30 px per row, which gives the panel a
        # bottom edge of 100 for the two default rows.
        panel_bottom = 40 + 30 * len(self.stats)
        cv2.rectangle(frame,
                      (10, 10),
                      (250, panel_bottom),
                      (0, 0, 0),
                      -1)

        # Draw stats text, one row per class
        y_pos = 40
        for cls, count in self.stats.items():
            cv2.putText(frame, f'{cls}: {count}',
                        (20, y_pos),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.8, self._color_for(cls), 2)
            y_pos += 30

        # Add timestamp
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(frame, timestamp,
                    (10, frame.shape[0] - 20),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, (255, 255, 255), 2)

    def detect(self, frame):
        """
        Process a single frame for helmet detection

        The frame is annotated in place and also returned. Statistics are
        reset at the start of every call, so they describe this frame only.
        Args:
            frame: Input frame (numpy array)
        Returns:
            tuple: (processed_frame, detections, stats)
                - processed_frame: Frame with annotations
                - detections: List of detections with format [x1, y1, x2, y2, confidence, class_name]
                - stats: Dictionary containing detection statistics
        """
        # Reset statistics for this frame
        self.reset_stats()

        # Get current timestamp
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Run detection with the configured confidence threshold
        results = self.model(frame, conf=self.conf_threshold)

        detections = []

        for r in results:
            for box in r.boxes:
                # Get box coordinates
                x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()

                # Get class and confidence
                cls = self.model.names[int(box.cls[0])]
                conf = float(box.conf[0])

                detections.append([x1, y1, x2, y2, conf, cls])

                # Count the class; names outside the default two are added
                # on the fly instead of raising KeyError.
                self.stats[cls] = self.stats.get(cls, 0) + 1

                self.log_detection(cls, conf, current_time)
                self.draw_detection_box(frame, [x1, y1, x2, y2], cls, conf)

        # Draw statistics
        self.draw_stats(frame)

        return frame, detections, self.stats.copy()

    def get_detection_log(self):
        """Return the detection log (the internal list, not a copy)"""
        return self.detection_log

    def clear_detection_log(self):
        """Clear the detection log"""
        self.detection_log = []

    def get_current_stats(self):
        """Return a copy of the current detection statistics"""
        return self.stats.copy()

# people and animal detection 


In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from datetime import datetime

class PeopleAndAnimalsDetector:
    """Count and annotate people and animals detected by a YOLO model.

    Class ids are interpreted as COCO ids: ``person_class_id`` marks people
    and ``animal_class_ids`` marks animals. All other classes are ignored.
    """

    # COCO (80-class) animal ids: 14 bird, 15 cat, 16 dog, 17 horse,
    # 18 sheep, 19 cow, 20 elephant, 21 bear, 22 zebra, 23 giraffe.
    COCO_ANIMAL_CLASS_IDS = (14, 15, 16, 17, 18, 19, 20, 21, 22, 23)

    def __init__(self, model_path, conf_threshold=0.5):
        """
        Initialize the People and Animals Detection System
        Args:
            model_path: Path to YOLO model
            conf_threshold: Confidence threshold for detections
        """
        # Initialize YOLO model
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold

        # Define class IDs (COCO dataset)
        self.person_class_id = 0  # Person class
        # Previously 16-22, which skipped bird (14) and cat (15) even though
        # they were the first animals named in the accompanying comment.
        self.animal_class_ids = list(self.COCO_ANIMAL_CLASS_IDS)

        # Define colors for visualization (BGR)
        self.colors = {
            'person': (0, 255, 0),  # Green for person
            'animal': (255, 0, 0)   # Blue for animals
        }

        # Initialize statistics
        self.reset_stats()

        # Store detection history
        self.detection_log = []

    def reset_stats(self):
        """Reset detection statistics"""
        self.stats = {
            'person_count': 0,
            'animal_count': 0
        }

    def log_detection(self, detection_type, confidence, frame_time):
        """
        Log detection details with timestamp
        Args:
            detection_type: Label of the detection ('Person' or 'Animal' when called by detect)
            confidence: Confidence score
            frame_time: Timestamp string recorded for the entry
        """
        self.detection_log.append({
            'timestamp': frame_time,
            'type': detection_type,
            'confidence': confidence
        })

    def draw_detection_box(self, frame, box, label, confidence, is_person):
        """
        Draw detection box with label and confidence
        Args:
            frame: Input frame (drawn on in place)
            box: Bounding box coordinates [x1, y1, x2, y2]; extra trailing values are ignored
            label: Detection label
            confidence: Confidence score
            is_person: Boolean indicating if detection is a person
        """
        x1, y1, x2, y2 = map(int, box[:4])
        color = self.colors['person'] if is_person else self.colors['animal']

        # Draw bounding box
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Create label with confidence
        text = f'{label} {confidence:.2f}'

        # Draw a filled background sized to the label, directly above the box
        (text_width, text_height), _ = cv2.getTextSize(
            text, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)
        cv2.rectangle(frame,
                      (x1, y1 - text_height - 10),
                      (x1 + text_width + 10, y1),
                      color,
                      -1)

        # Draw label text
        cv2.putText(frame, text,
                    (x1 + 5, y1 - 5),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5, (255, 255, 255), 2)

    def draw_stats(self, frame):
        """Draw the person/animal counts and a timestamp on the frame"""
        cv2.putText(frame, f'Persons Detected: {self.stats["person_count"]}',
                    (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
        cv2.putText(frame, f'Animals Detected: {self.stats["animal_count"]}',
                    (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2)

        # Add timestamp
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(frame, timestamp,
                    (10, frame.shape[0] - 20),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, (255, 255, 255), 2)

    def detect(self, frame):
        """
        Process a single frame for people and animals detection

        The frame is annotated in place and also returned. Statistics are
        reset at the start of every call, so they describe this frame only.
        Args:
            frame: Input frame (numpy array)
        Returns:
            tuple: (processed_frame, detections, stats)
                - processed_frame: Frame with annotations
                - detections: List of detections with format [x1, y1, x2, y2, confidence, class_name]
                - stats: Dictionary containing detection statistics
        """
        # Reset statistics for this frame
        self.reset_stats()

        # Get current timestamp
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Perform inference on the frame
        results = self.model(frame, conf=self.conf_threshold)

        detections = []

        for result in results:
            # Each row is read as (x1, y1, x2, y2, confidence, class id).
            for box in result.boxes.data:
                x1, y1, x2, y2 = map(int, box[:4])
                conf = float(box[4])
                cls = int(box[5])

                # Skip if confidence is below threshold
                if conf < self.conf_threshold:
                    continue

                # Classify the detection; anything that is neither a person
                # nor an animal is ignored.
                if cls == self.person_class_id:
                    label, is_person, counter = 'Person', True, 'person_count'
                elif cls in self.animal_class_ids:
                    label, is_person, counter = 'Animal', False, 'animal_count'
                else:
                    continue

                self.stats[counter] += 1
                detections.append([x1, y1, x2, y2, conf, label])
                self.log_detection(label, conf, current_time)
                self.draw_detection_box(frame, box, label, conf, is_person)

        # Draw statistics
        self.draw_stats(frame)

        return frame, detections, self.stats.copy()

    def get_detection_log(self):
        """Return the detection log (the internal list, not a copy)"""
        return self.detection_log

    def clear_detection_log(self):
        """Clear the detection log"""
        self.detection_log = []

    def get_current_stats(self):
        """Return a copy of the current detection statistics"""
        return self.stats.copy()

# speed of vehicle 

In [None]:
import cv2
import numpy as np
import pandas as pd
from datetime import datetime
import torch
from time import time
from ultralytics.solutions.solutions import BaseSolution
from ultralytics.utils.plotting import Annotator, colors

class SpeedDetector:
    """Estimate speeds of tracked objects and log them to a daily CSV file.

    Pixel displacement of each tracked box centre between two calls is
    scaled to metres using a two-point reference region whose real-world
    length is ``real_world_distance``, then divided by the elapsed time.
    """

    def __init__(self, model_path, real_world_distance=20, conf_threshold=0.5, fps=30):
        """
        Initialize the Speed Detection System
        Args:
            model_path: Path to YOLO model
            real_world_distance: Real-world distance in meters for calibration
            conf_threshold: Confidence threshold for detections
            fps: Frame rate of the video stream (stored; the speed
                calculation uses the timestamps it is given instead)
        """
        # BaseSolution keeps the loaded YOLO model in its ``model`` attribute.
        self.model = BaseSolution(model=model_path, conf=conf_threshold)
        self.real_world_distance = real_world_distance
        self.conf_threshold = conf_threshold
        self.fps = fps

        # Initialize tracking data (all keyed by track id)
        self.track_history = {}
        self.speed_data = {}
        self.previous_positions = {}
        self.previous_times = {}

        # Region for speed calculation (can be set using set_region method)
        self.region = None

        # Initialize detection log
        self.detection_log = []

        # CSV configuration
        self.csv_filename = self.get_daily_filename()
        self.create_csv()

    def get_daily_filename(self):
        """Generate a filename based on the current date"""
        current_date = datetime.now().strftime("%Y-%m-%d")
        return f"speed_data_{current_date}.csv"

    def create_csv(self):
        """Create the CSV file with headers if it doesn't exist"""
        header = ["Track ID", "Speed (km/h)", "Class", "Date", "Time"]
        try:
            # Mode "x" fails if the file exists, so rows already logged
            # today are kept instead of being truncated.
            with open(self.csv_filename, "x", newline="", encoding="utf-8") as handle:
                pd.DataFrame(columns=header).to_csv(handle, index=False)
        except FileExistsError:
            pass

    def set_region(self, region_points):
        """
        Set the region for speed calculation
        Args:
            region_points: List of two points defining the measurement region
        """
        self.region = region_points

    def calculate_speed(self, track_id, current_position, current_time):
        """
        Calculate speed of a tracked object

        The first observation of a track only records its position and time.
        Later observations return an exponentially smoothed speed (70 %
        previous, 30 % new) clamped to 0-150 km/h.
        Args:
            track_id: Object tracking ID
            current_position: Current position (x, y)
            current_time: Current timestamp in seconds
        Returns:
            float: Calculated speed in km/h (0 when it cannot be computed)
        """
        if track_id not in self.previous_positions:
            self.previous_positions[track_id] = current_position
            self.previous_times[track_id] = current_time
            return 0

        # Calculate time difference
        time_diff = current_time - self.previous_times[track_id]
        if time_diff <= 0:
            return 0

        if self.region is None:
            return 0

        # Pixel length of the calibration region
        (rx0, ry0), (rx1, ry1) = self.region[0], self.region[1]
        region_pixel_distance = np.sqrt((rx1 - rx0) ** 2 + (ry1 - ry0) ** 2)
        if region_pixel_distance <= 0:
            # Coincident region points give no scale; avoid dividing by zero.
            return 0

        # Calculate pixel distance travelled since the previous observation
        prev_x, prev_y = self.previous_positions[track_id]
        pixel_distance = np.sqrt(
            (current_position[0] - prev_x) ** 2 +
            (current_position[1] - prev_y) ** 2
        )

        # Convert pixel distance to real-world distance
        meters_per_pixel = self.real_world_distance / region_pixel_distance
        real_distance = pixel_distance * meters_per_pixel

        # Calculate speed in km/h
        speed = (real_distance / time_diff) * 3.6  # Convert m/s to km/h

        # Apply smoothing if we have previous speed data
        if track_id in self.speed_data:
            speed = 0.7 * self.speed_data[track_id] + 0.3 * speed

        # Cap speed between 0 and 150 km/h
        speed = min(max(speed, 0), 150)

        # Update previous position and time
        self.previous_positions[track_id] = current_position
        self.previous_times[track_id] = current_time
        self.speed_data[track_id] = speed

        return speed

    def log_detection(self, track_id, speed, class_name):
        """
        Log speed detection to CSV file and to the in-memory log
        Args:
            track_id: Object tracking ID
            speed: Speed in km/h (rounded to an integer in the log)
            class_name: Class name of the tracked object
        """
        current_time = datetime.now()
        data = {
            "Track ID": track_id,
            "Speed (km/h)": int(round(speed)),
            "Class": class_name,
            "Date": current_time.date(),
            "Time": current_time.strftime("%H:%M:%S")
        }
        pd.DataFrame([data]).to_csv(self.csv_filename, mode='a', header=False, index=False)

        # Add to detection log
        self.detection_log.append(data)

    def draw_detection_box(self, frame, box, track_id, speed, class_name):
        """Draw detection box with speed information"""
        x1, y1, x2, y2 = map(int, box)

        # Draw box
        cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)

        # Draw speed label
        speed_text = f"ID:{track_id} {class_name} {int(speed)}km/h"
        cv2.putText(frame, speed_text, (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 255, 0), 2)

    def detect(self, frame):
        """
        Process a single frame for speed detection

        The frame is annotated in place and also returned.
        Args:
            frame: Input frame (numpy array)
        Returns:
            tuple: (processed_frame, detections, speeds)
                - processed_frame: Frame with annotations
                - detections: List of detections with format [x1, y1, x2, y2, speed, class_name]
                - speeds: Dictionary mapping track_ids to speeds
        Raises:
            ValueError: If no region has been set with set_region()
        """
        # Imported locally because other sections of this file execute
        # ``import time``, which rebinds the module-level name ``time`` from
        # the function to the module.
        from time import time as wall_clock

        if self.region is None:
            raise ValueError("Region not set. Call set_region() first.")

        current_time = wall_clock()

        # Track (not just predict) on the wrapped YOLO model: track ids in
        # ``boxes.id`` are only produced by tracking, and ``persist=True``
        # keeps ids stable across successive calls.
        yolo = self.model.model
        results = yolo.track(frame, persist=True, conf=self.conf_threshold, verbose=False)
        detections = []

        # Draw region line
        cv2.line(frame,
                 (int(self.region[0][0]), int(self.region[0][1])),
                 (int(self.region[1][0]), int(self.region[1][1])),
                 (0, 255, 0), 2)

        tracked = results[0].boxes
        if tracked is not None and tracked.id is not None:
            boxes = tracked.xyxy.cpu().numpy()
            track_ids = tracked.id.cpu().numpy().astype(int)
            classes = tracked.cls.cpu().numpy()

            for box, track_id, cls in zip(boxes, track_ids, classes):
                # Plain int keys keep the tracking dicts and the CSV tidy.
                track_id = int(track_id)

                # Calculate center point
                center_x = (box[0] + box[2]) / 2
                center_y = (box[1] + box[3]) / 2

                speed = self.calculate_speed(track_id, (center_x, center_y), current_time)
                class_name = yolo.names[int(cls)]

                detections.append([box[0], box[1], box[2], box[3], speed, class_name])
                self.draw_detection_box(frame, box, track_id, speed, class_name)

                # Only log if speed is greater than 1 km/h
                if speed > 1:
                    self.log_detection(track_id, speed, class_name)

        # Add timestamp to frame
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cv2.putText(frame, timestamp, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

        return frame, detections, self.speed_data.copy()

    def get_detection_log(self):
        """Return a copy of the detection log"""
        return self.detection_log.copy()

    def clear_detection_log(self):
        """Clear the detection log"""
        self.detection_log = []

    def get_current_speeds(self):
        """Return a copy of the current speed data"""
        return self.speed_data.copy()

# triple riding 


In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
import time
from datetime import datetime

class TripleRidingDetector:
    """Flag motorcycles that carry three or more riders.

    Motorcycles and persons are tracked with YOLO. A person counts as a
    rider of a motorcycle when the person box overlaps the enlarged
    motorcycle box by more than ``iou_threshold``. Rider counts are averaged
    over the last ``detection_persistence`` frames per motorcycle.
    """

    def __init__(self, model_path, conf_threshold=0.5, iou_threshold=0.3):
        """
        Initialize the Triple Riding Detection System
        Args:
            model_path: Path to YOLO model
            conf_threshold: Confidence threshold for detections
            iou_threshold: Overlap threshold for person-motorcycle matching
                (compared against the value from calculate_overlap)
        """
        # Initialize YOLOv8 model
        self.model = YOLO(model_path)

        # Constants
        self.MOTORCYCLE_CLASS = 3  # COCO class for motorcycle
        self.PERSON_CLASS = 0      # COCO class for person
        self.conf_threshold = conf_threshold
        self.iou_threshold = iou_threshold

        # Tracking history: recent rider counts and smoothed count per motorcycle id
        self.track_history = defaultdict(list)
        self.motorcycle_riders = defaultdict(int)
        self.detection_persistence = 5  # frames

        # Detection logging
        self.detection_log = []

    def calculate_overlap(self, box1, box2):
        """
        Calculate the overlap between two bounding boxes

        The value is the intersection area divided by the area of the
        smaller box (not a true IoU), so a box fully contained in the other
        scores 1.0.
        Args:
            box1: First bounding box [x1, y1, x2, y2]
            box2: Second bounding box [x1, y1, x2, y2]
        Returns:
            float: Overlap ratio; 0.0 if either box has no area
        """
        left = max(box1[0], box2[0])
        top = max(box1[1], box2[1])
        right = min(box1[2], box2[2])
        bottom = min(box1[3], box2[3])

        intersection = max(0, right - left) * max(0, bottom - top)
        area1 = (box1[2] - box1[0]) * (box1[3] - box1[1])
        area2 = (box2[2] - box2[0]) * (box2[3] - box2[1])

        smaller_area = min(area1, area2)
        if smaller_area <= 0:
            # Degenerate box: nothing to overlap with, and no safe divisor.
            return 0.0
        return intersection / smaller_area

    def expand_box(self, box, frame_shape, expand_ratio=1.3):
        """
        Expand bounding box to better capture riders

        The box is scaled about its centre and clipped to the frame.
        Args:
            box: Original bounding box [x1, y1, x2, y2]
            frame_shape: Shape of the frame (height, width)
            expand_ratio: Ratio to expand the box
        Returns:
            list: Expanded bounding box
        """
        height, width = frame_shape[:2]
        center_x = (box[0] + box[2]) / 2
        center_y = (box[1] + box[3]) / 2
        half_w = (box[2] - box[0]) * expand_ratio / 2
        half_h = (box[3] - box[1]) * expand_ratio / 2

        return [
            max(0, center_x - half_w),
            max(0, center_y - half_h),
            min(width, center_x + half_w),
            min(height, center_y + half_h),
        ]

    def log_detection(self, motorcycle_id, rider_count, confidence, frame_time):
        """
        Log detection details with timestamp
        Args:
            motorcycle_id: Track id of the motorcycle
            rider_count: Number of riders attributed to it
            confidence: Detection confidence of the motorcycle
            frame_time: Timestamp string recorded for the entry
        """
        self.detection_log.append({
            'timestamp': frame_time,
            'motorcycle_id': motorcycle_id,
            'rider_count': rider_count,
            'confidence': confidence,
            'violation': rider_count >= 3
        })

    def draw_detection_box(self, frame, box, text, is_violation):
        """Draw detection box with label (red for a violation, green otherwise)"""
        x1, y1, x2, y2 = map(int, box)
        color = (0, 0, 255) if is_violation else (0, 255, 0)

        # Draw box
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Draw label background
        text_size = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.9, 2)[0]
        cv2.rectangle(frame,
                      (x1, y1 - text_size[1] - 10),
                      (x1 + text_size[0], y1),
                      color, -1)

        # Draw label text
        cv2.putText(frame, text,
                    (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.9, (255, 255, 255), 2)

    def detect(self, frame):
        """
        Process a single frame for triple riding detection

        The frame is annotated in place and also returned. When the tracker
        reports no ids, the frame is returned unannotated with empty results.
        Args:
            frame: Input frame (numpy array)
        Returns:
            tuple: (processed_frame, detections, stats)
                - processed_frame: Frame with annotations
                - detections: List of detections with format [x1, y1, x2, y2, rider_count, is_violation]
                - stats: Dictionary containing detection statistics
        """
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Run YOLOv8 tracking
        results = self.model.track(frame, persist=True, conf=self.conf_threshold,
                                   classes=[self.MOTORCYCLE_CLASS, self.PERSON_CLASS])

        if results[0].boxes.id is None:
            return frame, [], {'total_motorcycles': 0, 'triple_riding': 0}

        # Extract detections
        boxes = results[0].boxes.xyxy.cpu().numpy()
        classes = results[0].boxes.cls.cpu().numpy()
        track_ids = results[0].boxes.id.cpu().numpy().astype(int)
        confidences = results[0].boxes.conf.cpu().numpy()

        # Separate motorcycles and persons. Each motorcycle keeps its own
        # confidence so the log entry cannot pick up another detection's value.
        motorcycles = []
        persons = []
        for box, cls, track_id, conf in zip(boxes, classes, track_ids, confidences):
            if conf < self.conf_threshold:
                continue
            if cls == self.MOTORCYCLE_CLASS:
                motorcycles.append((box, int(track_id), float(conf)))
            elif cls == self.PERSON_CLASS:
                persons.append(box)

        motorcycle_ids = [track_id for _, track_id, _ in motorcycles]
        detections = []
        stats = {'total_motorcycles': len(motorcycles), 'triple_riding': 0}

        # Process each motorcycle
        for motorcycle_box, motorcycle_id, motorcycle_conf in motorcycles:
            expanded_box = self.expand_box(motorcycle_box, frame.shape)

            # Count riders: persons overlapping the enlarged motorcycle box
            rider_count = sum(
                1 for person_box in persons
                if self.calculate_overlap(expanded_box, person_box) > self.iou_threshold
            )

            # Update tracking history, keeping only the most recent frames
            history = self.track_history[motorcycle_id]
            history.append(rider_count)
            if len(history) > self.detection_persistence:
                history.pop(0)

            # Smooth the count over the history; never report fewer than one rider
            smooth_count = max(1, int(round(np.mean(history))))
            self.motorcycle_riders[motorcycle_id] = smooth_count

            # Check for violation
            is_violation = smooth_count >= 3
            if is_violation:
                stats['triple_riding'] += 1

            detections.append([
                motorcycle_box[0], motorcycle_box[1],
                motorcycle_box[2], motorcycle_box[3],
                smooth_count, is_violation
            ])

            self.log_detection(motorcycle_id, smooth_count, motorcycle_conf, current_time)

            # Draw detection
            text = "TRIPLE RIDING DETECTED!" if is_violation else f"Riders: {smooth_count}"
            self.draw_detection_box(frame, motorcycle_box, text, is_violation)

        # Add timestamp to frame
        cv2.putText(frame, current_time, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        # Clean up tracks of motorcycles that are not in this frame
        for track_id in list(self.track_history.keys()):
            if track_id not in motorcycle_ids:
                del self.track_history[track_id]
                if track_id in self.motorcycle_riders:
                    del self.motorcycle_riders[track_id]

        return frame, detections, stats

    def get_detection_log(self):
        """Return a copy of the detection log"""
        return self.detection_log.copy()

    def clear_detection_log(self):
        """Clear the detection log"""
        self.detection_log = []

    def get_current_stats(self):
        """Return current statistics about motorcycle riders"""
        return {
            'total_motorcycles': len(self.motorcycle_riders),
            'triple_riding': sum(1 for count in self.motorcycle_riders.values() if count >= 3)
        }

# vehicle monitoring


In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
import cvzone
import time
from datetime import datetime

class VehicleMonitor:
    """Detect, track and count vehicles that reach a horizontal counting line.

    Every frame is run through a YOLO model, the raw detections are given IDs
    by ``SimpleTracker``, and a tracked vehicle is counted once per lane when
    the centre of its box lies within ``LINE_TOLERANCE`` pixels of the
    counting line. The lane is 'left' or 'right' depending on which half of
    the frame the box centre is in.
    """

    # Half-height (pixels) of the band around the counting line in which a
    # box centre is considered to be "on" the line.
    LINE_TOLERANCE = 10

    def __init__(self, model_path, conf_threshold=0.5):
        """
        Initialize the Vehicle Monitoring System
        Args:
            model_path: Path to YOLO model
            conf_threshold: Confidence threshold for detections
        """
        # Initialize YOLO model
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold

        # Initialize tracking
        self.tracker = SimpleTracker()

        # Target classes, indexed directly by the model's class id.
        # NOTE(review): this presumes the model emits ids 0..3 in exactly this
        # order (a stock COCO model does not) - confirm against the weights.
        self.class_names_goal = ['car', 'motorcycle', 'bus', 'truck']
        self.class_colors = {
            'car': (0, 255, 0),
            'motorcycle': (255, 0, 0),
            'bus': (0, 0, 255),
            'truck': (255, 255, 0)
        }

        # Initialize counters
        self.reset_counters()

        # Store detection history
        self.detection_log = []

        # y-coordinate of the counting line; when left as None, detect()
        # places it at the vertical middle of the first frame it sees.
        self.line_y = None

    def reset_counters(self):
        """Reset all vehicle counters and the per-lane sets of counted IDs."""
        self.vehicle_count = {
            'left': {'car': 0, 'motorcycle': 0, 'bus': 0, 'truck': 0},
            'right': {'car': 0, 'motorcycle': 0, 'bus': 0, 'truck': 0}
        }
        self.counted_vehicle_ids = {'left': set(), 'right': set()}

    def set_counting_line(self, y_position):
        """Set the y-position of the counting line"""
        self.line_y = y_position

    def log_detection(self, vehicle_type, direction, frame_time):
        """Append a counted vehicle (type, lane, timestamp) to the log."""
        self.detection_log.append({
            'timestamp': frame_time,
            'vehicle_type': vehicle_type,
            'direction': direction
        })

    def draw_detection_box(self, frame, box, obj_id, cls):
        """Draw a corner-style box and a filled ID/class label onto ``frame``.

        Args:
            frame: Image to draw on (modified in place).
            box: ``[x1, y1, x2, y2]`` corner coordinates.
            obj_id: Tracker ID shown in the label.
            cls: Class name; selects the colour (white when unknown).
        """
        x1, y1, x2, y2 = map(int, box)
        color = self.class_colors.get(cls, (255, 255, 255))

        # Draw corner rectangle (cvzone expects x, y, width, height)
        cvzone.cornerRect(frame, (x1, y1, x2 - x1, y2 - y1),
                          l=9, rt=2, colorR=color)

        # Create label
        text = f'ID: {obj_id} {cls}'
        font_scale = 0.6
        thickness = 2
        font = cv2.FONT_HERSHEY_SIMPLEX

        # Get label size so the background fits the text
        (text_width, text_height), _ = cv2.getTextSize(
            text, font, font_scale, thickness)

        # Draw label background just above the box
        cv2.rectangle(frame,
                      (x1, y1 - text_height - 10),
                      (x1 + text_width + 10, y1),
                      color, -1)

        # Draw label text
        cv2.putText(frame, text,
                    (x1 + 5, y1 - 5),
                    font, font_scale,
                    (255, 255, 255),
                    thickness)

    def draw_stats(self, frame):
        """Draw the total count of each lane in the top corners of ``frame``."""
        # Draw left lane stats
        cvzone.putTextRect(frame,
                           f"Left Lane: {sum(self.vehicle_count['left'].values())}",
                           (20, 40),
                           scale=2,
                           thickness=3,
                           offset=10,
                           colorR=(52, 73, 94))

        # Draw right lane stats
        cvzone.putTextRect(frame,
                           f"Right Lane: {sum(self.vehicle_count['right'].values())}",
                           (frame.shape[1] - 280, 40),
                           scale=2,
                           thickness=3,
                           offset=10,
                           colorR=(52, 73, 94))

    def _snapshot_counts(self):
        """Return a copy of the counters that shares no dict with the live ones."""
        return {lane: dict(counts) for lane, counts in self.vehicle_count.items()}

    def detect(self, frame):
        """
        Process a single frame for vehicle monitoring
        Args:
            frame: Input frame (numpy array); annotated in place
        Returns:
            tuple: (processed_frame, detections, stats)
                - processed_frame: Frame with annotations
                - detections: List of detections with format [x1, y1, x2, y2, vehicle_type, direction]
                - stats: Independent copy of the per-lane, per-class counts
        """
        if self.line_y is None:
            self.line_y = frame.shape[0] // 2

        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        detections = []
        frame_width = frame.shape[1]
        lane_split_x = frame_width // 2

        # Draw counting line
        cv2.line(frame, (0, self.line_y),
                 (frame_width, self.line_y),
                 (255, 255, 255), 1)

        # Run detection
        results = self.model(frame, conf=self.conf_threshold)

        if results[0].boxes.data is not None:
            tracked_objects = self.tracker.update(results[0].boxes.data.cpu().numpy())

            for obj in tracked_objects:
                x1, y1, x2, y2 = map(int, obj[:4])
                obj_id = int(obj[4])
                cls_id = int(obj[5])

                # Skip if not a target class
                if cls_id >= len(self.class_names_goal):
                    continue

                vehicle_type = self.class_names_goal[cls_id]
                center_x = (x1 + x2) // 2
                center_y = (y1 + y2) // 2
                # Lane is decided by which half of the frame the centre is in.
                direction = 'left' if center_x < lane_split_x else 'right'

                # Count the vehicle once per lane while its centre is inside
                # the tolerance band around the counting line.
                on_line = (self.line_y - self.LINE_TOLERANCE
                           < center_y
                           < self.line_y + self.LINE_TOLERANCE)
                if on_line and obj_id not in self.counted_vehicle_ids[direction]:
                    self.counted_vehicle_ids[direction].add(obj_id)
                    self.vehicle_count[direction][vehicle_type] += 1
                    self.log_detection(vehicle_type, direction, current_time)

                # Add to detections list
                detections.append([x1, y1, x2, y2, vehicle_type, direction])

                # Draw detection box
                self.draw_detection_box(frame, [x1, y1, x2, y2], obj_id, vehicle_type)

        # Draw statistics
        self.draw_stats(frame)

        # Add timestamp
        cv2.putText(frame, current_time,
                    (10, frame.shape[0] - 20),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.6, (255, 255, 255), 2)

        # A plain dict.copy() would hand out the live nested lane dicts.
        return frame, detections, self._snapshot_counts()

    def get_detection_log(self):
        """Return a shallow copy of the detection log"""
        return self.detection_log.copy()

    def clear_detection_log(self):
        """Clear the detection log"""
        self.detection_log = []

    def get_current_stats(self):
        """Return an independent copy of the per-lane, per-class counts."""
        return self._snapshot_counts()


class SimpleTracker:
    """Nearest-centroid tracker that assigns persistent integer IDs to boxes.

    A detection is attached to the closest existing track whose centre is
    less than ``max_distance`` pixels away; otherwise it starts a new track.
    Tracks that receive no detection are kept, with their last box, until
    they have been missed ``max_age`` times.
    """

    def __init__(self, max_age=20, max_distance=50):
        """
        Args:
            max_age: Number of consecutive missed updates after which an
                unmatched track is dropped.
            max_distance: Maximum centre-to-centre distance (pixels) for a
                detection to be matched to an existing track.
        """
        self.next_id = 1
        self.tracked_objects = {}
        self.max_age = max_age
        self.max_distance = max_distance

    def update(self, detections):
        """Update tracked objects with new detections.

        Args:
            detections: Iterable of rows ``[x1, y1, x2, y2, conf, cls]``.
        Returns:
            numpy.ndarray: Integer array of shape ``(n_tracks, 6)`` with rows
            ``[x1, y1, x2, y2, track_id, cls]``. Tracks that were not matched
            in this update but are still younger than ``max_age`` are
            included with their last known box.
        """
        new_tracked_objects = {}

        for det in detections:
            x1, y1, x2, y2 = map(int, det[:4])
            cls = int(det[5])
            center = ((x1 + x2) // 2, (y1 + y2) // 2)

            best_match = None
            min_distance = float('inf')

            for obj_id, obj in self.tracked_objects.items():
                # A track can absorb only one detection per update; without
                # this guard a second nearby detection would overwrite the
                # first and one vehicle would disappear from the output.
                if obj_id in new_tracked_objects:
                    continue
                dist = np.linalg.norm(np.array(center) - np.array(obj['center']))
                if dist < min_distance:
                    min_distance = dist
                    best_match = obj_id

            if best_match is not None and min_distance < self.max_distance:
                assigned_id = best_match
            else:
                assigned_id = self.next_id
                self.next_id += 1

            new_tracked_objects[assigned_id] = {
                'bbox': (x1, y1, x2, y2),
                'center': center,
                'class': cls,
                'age': 0
            }

        # Age tracks that were not matched and keep them while still young.
        for obj_id, obj in self.tracked_objects.items():
            if obj_id not in new_tracked_objects:
                obj['age'] += 1
                if obj['age'] < self.max_age:
                    new_tracked_objects[obj_id] = obj

        self.tracked_objects = new_tracked_objects

        # Convert to detection format; reshape keeps six columns even when
        # there are no tracks, so callers can always index columns.
        rows = [[*obj['bbox'], obj_id, obj['class']]
                for obj_id, obj in self.tracked_objects.items()]
        return np.array(rows, dtype=int).reshape(-1, 6)
    

# wrong lane detection 


In [None]:
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
import math
from datetime import datetime

class WrongLaneDetector:
    """Flag vehicles whose tracked centroid moves down the image.

    Vehicles are tracked with the YOLO model's built-in tracker. Once a track
    has ``direction_points`` centroids, its direction is classified from the
    change in y between the first and last of the most recent points:
    increasing y is reported as 'wrong', anything else as 'correct'.
    """

    def __init__(self, model_path, conf_threshold=0.5):
        """
        Initialize the Wrong Lane Detection System
        Args:
            model_path: Path to YOLO model
            conf_threshold: Confidence threshold for detections
        """
        # Initialize YOLOv8 model
        self.model = YOLO(model_path)
        self.conf_threshold = conf_threshold

        # track_id -> list of centroids seen for that track
        self.track_history = defaultdict(list)

        # track_id -> 'correct' or 'wrong'
        self.direction_status = {}

        # Number of points to consider for direction
        self.direction_points = 10

        # Detection logging
        self.detection_log = []

        # Statistics
        self.reset_stats()

    def reset_stats(self):
        """Reset detection statistics"""
        self.stats = {
            'total_vehicles': 0,
            'wrong_direction': 0,
            'correct_direction': 0
        }

    def get_centroid(self, box):
        """
        Calculate centroid from bbox coordinates
        Args:
            box: Bounding box coordinates [x1, y1, x2, y2]
        Returns:
            tuple: (center_x, center_y) as Python ints
        """
        x1, y1, x2, y2 = box
        return (int((x1 + x2) / 2), int((y1 + y2) / 2))

    def determine_direction(self, points):
        """
        Classify the travel direction of a track from its recent centroids.

        A track whose y coordinate increased over the last
        ``direction_points`` points (it moved down the image) is 'wrong';
        otherwise, including no vertical movement, it is 'correct'.
        Args:
            points: List of tracking points (x, y)
        Returns:
            str or None: 'wrong' or 'correct', or None while fewer than
            ``direction_points`` points are available
        """
        if len(points) < self.direction_points:
            return None

        # Take last n points
        recent_points = points[-self.direction_points:]

        # Calculate overall y-direction
        start_y = recent_points[0][1]
        end_y = recent_points[-1][1]

        # If y is increasing (going down), it's wrong direction
        return 'wrong' if end_y > start_y else 'correct'

    def log_detection(self, track_id, direction, frame_time):
        """Log detection details with timestamp"""
        self.detection_log.append({
            'timestamp': frame_time,
            'track_id': track_id,
            'direction': direction,
            'is_violation': direction == 'wrong'
        })

    def draw_tracking_line(self, frame, points, color):
        """Draw the track's path as line segments between consecutive points."""
        for i in range(1, len(points)):
            cv2.line(frame, points[i - 1], points[i], color, 2)

    def draw_detection_box(self, frame, box, track_id, direction):
        """Draw a box and ID/direction label; red for 'wrong', green otherwise."""
        x1, y1, x2, y2 = map(int, box)
        color = (0, 0, 255) if direction == 'wrong' else (0, 255, 0)

        # Draw box
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Add label
        label = f"ID: {track_id} ({direction})"
        cv2.putText(frame, label, (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    def detect(self, frame):
        """
        Process a single frame for wrong lane detection
        Args:
            frame: Input frame (numpy array); annotated in place
        Returns:
            tuple: (processed_frame, detections, stats)
                - processed_frame: Frame with annotations
                - detections: List of detections with format [x1, y1, x2, y2, direction, is_wrong]
                  (coordinates are plain Python floats)
                - stats: Dictionary containing this frame's detection statistics
        """
        current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Reset statistics for this frame
        self.reset_stats()

        # Run YOLOv8 tracking
        results = self.model.track(frame, persist=True, conf=self.conf_threshold,
                                   classes=[2, 3, 5, 7])  # car, motorcycle, bus, truck

        detections = []
        active_ids = set()

        if results[0].boxes.id is not None:
            boxes = results[0].boxes.xyxy.cpu().numpy()
            track_ids = results[0].boxes.id.cpu().numpy().astype(int)

            # Process each detection
            for box, track_id in zip(boxes, track_ids):
                # Use a native int so IDs stored in the log and used as dict
                # keys are JSON-serialisable (NumPy scalars are not).
                track_id = int(track_id)
                active_ids.add(track_id)

                # Add centroid to track history
                history = self.track_history[track_id]
                history.append(self.get_centroid(box))

                # Determine direction if enough points are collected
                direction = self.determine_direction(history)
                if direction:
                    self.direction_status[track_id] = direction

                    # Update statistics
                    self.stats['total_vehicles'] += 1
                    if direction == 'wrong':
                        self.stats['wrong_direction'] += 1
                    else:
                        self.stats['correct_direction'] += 1

                    # Log detection
                    self.log_detection(track_id, direction, current_time)

                # Add to detections list (native floats, see note above)
                status = self.direction_status.get(track_id, 'unknown')
                is_wrong = status == 'wrong'
                detections.append([
                    float(box[0]), float(box[1]), float(box[2]), float(box[3]),
                    status,
                    is_wrong
                ])

                # Draw detection box
                self.draw_detection_box(frame, box, track_id, status)

                # Draw tracking line
                color = (0, 0, 255) if is_wrong else (0, 255, 0)
                self.draw_tracking_line(frame, history, color)

        # Clean up tracks that are absent from this frame. This runs even
        # when nothing was tracked, so stale history cannot linger.
        for track_id in list(self.track_history.keys()):
            if track_id not in active_ids:
                del self.track_history[track_id]
                self.direction_status.pop(track_id, None)

        # Add timestamp to frame
        cv2.putText(frame, current_time, (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

        return frame, detections, self.stats.copy()

    def get_detection_log(self):
        """Return a shallow copy of the detection log"""
        return self.detection_log.copy()

    def clear_detection_log(self):
        """Clear the detection log"""
        self.detection_log = []

    def get_current_stats(self):
        """Return a copy of the statistics of the most recent frame"""
        return self.stats.copy()

# updated main backend code

In [None]:
from flask import Flask, jsonify, request, Response
from flask_socketio import SocketIO
from flask_cors import CORS
import cv2
import numpy as np
import threading
import queue
import time
import json
from datetime import datetime

# Import all detector classes
from accident_detection import AccidentDetectionSystem
from helmet_detection import HelmetDetector
from people_and_animals import PeopleAndAnimalsDetector
from speed import SpeedDetector
from triple_riding import TripleRidingDetector
from vehicle_monitoring import VehicleMonitor
from wrong_lane_only_for_one_side_road import WrongLaneDetector

# Flask application that hosts the REST routes defined below.
app = Flask(__name__)
# Apply flask_cors to the whole app with its default settings.
CORS(app)
# Socket.IO server attached to the Flask app; "*" allows connections from any
# origin. NOTE(review): confirm this is intended outside development.
socketio = SocketIO(app, cors_allowed_origins="*")

class TrafficMonitoringSystem:
    """Run every detector on incoming frames and keep running violation totals.

    One background thread per camera reads frames from its stream, passes
    them through the detectors listed in ``_PIPELINE`` and emits the annotated
    JPEG plus the detection results over Socket.IO.
    """

    # (key in self.models, key in results['detections'], name of the third
    # value returned by that detector's detect()), in processing order.
    # NOTE(review): 'people_animals' is loaded by initialize_models() but is
    # not part of the per-frame pipeline - confirm whether that is intended.
    _PIPELINE = (
        ('accident', 'accidents', 'is_accident'),
        ('helmet', 'helmet', 'stats'),
        ('speed', 'speed', 'speeds'),
        ('triple_riding', 'triple_riding', 'stats'),
        ('vehicle', 'vehicle', 'stats'),
        ('wrong_lane', 'wrong_lane', 'stats'),
    )

    def __init__(self):
        """Initialize the Traffic Monitoring System and load all models."""
        self.models = {}
        self.stream_queues = {}
        self.processing_threads = {}
        self.statistics = {
            'accidents': 0,
            'helmet_violations': 0,
            'wrong_lane': 0,
            'triple_riding': 0,
            'speed_violations': 0,
            'vehicle_count': {
                'left': 0,
                'right': 0
            }
        }

        # Initialize all models
        self.initialize_models()

    def initialize_models(self):
        """Instantiate every detector and store it in ``self.models``.

        Raises:
            Exception: Whatever a detector constructor raised, re-raised after
            being printed.
        """
        try:
            # Initialize Accident Detection
            self.models['accident'] = AccidentDetectionSystem(
                model_path="models/accident_detection.pt",
                conf_threshold=0.5,
                iou_threshold=0.45
            )

            # Initialize Helmet Detection
            self.models['helmet'] = HelmetDetector(
                model_path="models/helmet_detection.pt",
                conf_threshold=0.5
            )

            # Initialize People and Animals Detection
            self.models['people_animals'] = PeopleAndAnimalsDetector(
                model_path="models/people_animals_detection.pt",
                conf_threshold=0.5
            )

            # Initialize Speed Detection
            self.models['speed'] = SpeedDetector(
                model_path="models/speed_detection.pt",
                real_world_distance=20,
                conf_threshold=0.5
            )

            # Initialize Triple Riding Detection
            self.models['triple_riding'] = TripleRidingDetector(
                model_path="models/triple_riding_detection.pt",
                conf_threshold=0.5
            )

            # Initialize Vehicle Monitoring
            self.models['vehicle'] = VehicleMonitor(
                model_path="models/vehicle_monitoring.pt",
                conf_threshold=0.5
            )

            # Initialize Wrong Lane Detection
            self.models['wrong_lane'] = WrongLaneDetector(
                model_path="models/wrong_lane_detection.pt",
                conf_threshold=0.5
            )

            print("All models initialized successfully")

        except Exception as e:
            print(f"Error initializing models: {str(e)}")
            raise

    def process_frame(self, frame, camera_id):
        """
        Process a single frame through all models
        Args:
            frame: Input frame
            camera_id: Camera identifier (currently unused)
        Returns:
            tuple: (processed_frame, results). If a detector raises, the
            error is printed and the frame/results produced so far are
            returned without updating the statistics.
        """
        results = {
            'frame_time': datetime.now().isoformat(),
            'detections': {}
        }

        try:
            # Each detector receives the frame annotated by the previous one.
            for model_key, result_key, extra_key in self._PIPELINE:
                if model_key not in self.models:
                    continue
                frame, detections, extra = self.models[model_key].detect(frame)
                results['detections'][result_key] = {
                    'detections': detections,
                    extra_key: extra
                }

            self._update_statistics(results)

        except Exception as e:
            print(f"Error processing frame: {str(e)}")

        return frame, results

    @staticmethod
    def _lane_total(lane_stats):
        """Return the vehicle total for one lane's stats dict.

        An explicit 'total' entry is honoured; otherwise the per-class counts
        (the shape produced by VehicleMonitor) are summed.
        """
        if 'total' in lane_stats:
            return lane_stats['total']
        return sum(lane_stats.values())

    @staticmethod
    def _to_native(value):
        """Recursively convert NumPy scalars/arrays to plain Python values.

        Detector outputs may contain NumPy types, which the JSON encoder used
        when emitting results cannot serialise.
        """
        convert = TrafficMonitoringSystem._to_native
        if isinstance(value, dict):
            return {
                (key.item() if isinstance(key, np.generic) else key): convert(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [convert(item) for item in value]
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.generic):
            return value.item()
        return value

    def _update_statistics(self, results):
        """Update violation statistics based on detection results.

        Violation counters accumulate across frames; the per-lane vehicle
        counts are overwritten with the latest totals. Errors are printed and
        swallowed so a malformed result cannot stop stream processing.
        """
        try:
            detections = results['detections']

            if 'accidents' in detections:
                if detections['accidents']['is_accident']:
                    self.statistics['accidents'] += 1

            if 'helmet' in detections:
                stats = detections['helmet']['stats']
                self.statistics['helmet_violations'] += stats.get('Without_Helmet', 0)

            if 'wrong_lane' in detections:
                stats = detections['wrong_lane']['stats']
                self.statistics['wrong_lane'] += stats.get('wrong_direction', 0)

            if 'triple_riding' in detections:
                stats = detections['triple_riding']['stats']
                self.statistics['triple_riding'] += stats.get('triple_riding', 0)

            if 'vehicle' in detections:
                stats = detections['vehicle']['stats']
                # VehicleMonitor reports per-class counts per lane and no
                # 'total' key, so the lane total has to be derived here.
                for lane in ('left', 'right'):
                    self.statistics['vehicle_count'][lane] = self._lane_total(
                        stats.get(lane, {}))

        except Exception as e:
            print(f"Error updating statistics: {str(e)}")

    def start_stream_processing(self, camera_id, rtsp_url):
        """Start processing an RTSP stream in a daemon thread.

        Returns:
            bool: False if ``camera_id`` is already being processed, True once
            the worker thread has been started.
        """
        if camera_id in self.processing_threads:
            return False

        # Create a queue for this camera
        self.stream_queues[camera_id] = queue.Queue(maxsize=10)

        thread = threading.Thread(
            target=self._process_stream,
            args=(camera_id, rtsp_url)
        )
        thread.daemon = True

        # Register before starting: the worker removes its own entry when it
        # exits, which must not happen before the entry exists.
        self.processing_threads[camera_id] = thread
        thread.start()
        return True

    def _process_stream(self, camera_id, rtsp_url):
        """Read, process and emit frames for one camera until the thread dies.

        The capture is always released and the camera is unregistered on
        exit, so a stream that failed to open or crashed can be started again.
        """
        cap = cv2.VideoCapture(rtsp_url)

        try:
            if not cap.isOpened():
                print(f"Failed to open stream for camera {camera_id}")
                return

            while True:
                ret, frame = cap.read()
                if not ret:
                    print(f"Failed to read from camera {camera_id}")
                    time.sleep(1)
                    continue

                # Process frame through all models
                processed_frame, results = self.process_frame(frame, camera_id)

                # Convert frame to JPEG; skip the frame if encoding fails
                encoded, jpeg = cv2.imencode('.jpg', processed_frame)
                if not encoded:
                    print(f"Failed to encode frame for camera {camera_id}")
                    continue
                frame_bytes = jpeg.tobytes()

                # Emit results through Socket.IO
                socketio.emit(f'detection_results_{camera_id}', {
                    'frame': frame_bytes,
                    'results': self._to_native(results)
                })

                # Pause between frames (upper bound of ~30 iterations/s)
                time.sleep(0.033)
        finally:
            cap.release()
            self.processing_threads.pop(camera_id, None)
            self.stream_queues.pop(camera_id, None)

# Module-level instance used by the route handlers below. Constructing it
# calls initialize_models(), so all detector models are loaded at this point.
traffic_system = TrafficMonitoringSystem()

@app.route('/api/start_stream', methods=['POST'])
def start_stream():
    """Start processing a new RTSP stream.

    Expects a JSON object with ``camera_id`` and ``rtsp_url``.

    Returns:
        200 with a confirmation message when processing was started;
        400 when the body is not a JSON object, a parameter is missing, or
        the camera is already being processed.
    """
    # silent=True yields None instead of aborting on a missing or malformed
    # JSON body, so such requests reach the explicit 400 below.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    camera_id = data.get('camera_id')
    rtsp_url = data.get('rtsp_url')

    if not camera_id or not rtsp_url:
        return jsonify({'error': 'Missing required parameters'}), 400

    if traffic_system.start_stream_processing(camera_id, rtsp_url):
        return jsonify({'message': 'Stream processing started'})
    return jsonify({'error': 'Stream already being processed'}), 400

@app.route('/api/statistics', methods=['GET'])
def get_statistics():
    """Serve the system's accumulated statistics dictionary as JSON."""
    current_totals = traffic_system.statistics
    return jsonify(current_totals)

@socketio.on('connect')
def handle_connect():
    """Socket.IO 'connect' handler: report the new client on stdout."""
    message = 'Client connected'
    print(message)

@socketio.on('disconnect')
def handle_disconnect():
    """Socket.IO 'disconnect' handler: report the departed client on stdout."""
    message = 'Client disconnected'
    print(message)

# When executed as a script, serve the Flask app through Socket.IO on all
# interfaces, port 5000.
if __name__ == '__main__':
    socketio.run(app, host='0.0.0.0', port=5000)

# config.py


In [None]:
import os

class Config:
    """Static settings for the traffic monitoring backend.

    All values are class attributes; model paths are built relative to the
    directory two levels above this file.
    """

    # --- Server ---
    HOST, PORT, DEBUG = '0.0.0.0', 5000, False

    # --- Paths ---
    # Parent of the directory that contains this file.
    BASE_DIR = os.path.dirname(
        os.path.dirname(
            os.path.abspath(__file__)
        )
    )
    MODELS_DIR = os.path.join(BASE_DIR, 'models')

    # --- Video ---
    FRAME_WIDTH, FRAME_HEIGHT = 1280, 720
    FPS = 30

    # --- Detection thresholds ---
    CONFIDENCE_THRESHOLD = 0.5
    NMS_THRESHOLD = 0.4

    # --- Weight file of each detector, keyed by detector name ---
    MODEL_PATHS = dict(
        accident=os.path.join(MODELS_DIR, 'accident_detection.pt'),
        helmet=os.path.join(MODELS_DIR, 'helmet_detection.pt'),
        people_animals=os.path.join(MODELS_DIR, 'people_animals_detection.pt'),
        speed=os.path.join(MODELS_DIR, 'speed_detection.pt'),
        triple_riding=os.path.join(MODELS_DIR, 'triple_riding_detection.pt'),
        wrong_lane=os.path.join(MODELS_DIR, 'wrong_lane_detection.pt'),
        vehicle=os.path.join(MODELS_DIR, 'vehicle_monitoring.pt'),
    )

    # --- Database ---
    DB_HOST, DB_PORT = 'localhost', 27017
    DB_NAME = 'traffic_monitoring'

    # --- Alerts ---
    ALERT_TIMEOUT = 30  # seconds

    # --- Detection classes ---
    VEHICLE_CLASSES = ['car', 'motorcycle', 'bus', 'truck']
    PERSON_CLASS_ID = 0
    MOTORCYCLE_CLASS_ID = 3

# utils.py 


In [None]:
import cv2
import numpy as np
from typing import Tuple, List, Dict
import logging
from datetime import datetime
import os

# Configure logging when this module is imported: INFO level, and a
# "time - logger name - level - message" line format.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Logger named after this module; used by the helper functions below.
logger = logging.getLogger(__name__)

def init_video_writer(output_path: str, frame_width: int, frame_height: int, fps: int) -> cv2.VideoWriter:
    """Create an 'mp4v' video writer, creating the output directory if needed.

    Args:
        output_path: Destination video file. May be a bare filename, in which
            case no directory is created.
        frame_width: Width of the frames that will be written.
        frame_height: Height of the frames that will be written.
        fps: Frame rate of the output video.
    Returns:
        cv2.VideoWriter: Writer opened on ``output_path``.
    """
    # dirname() is '' for a bare filename, and os.makedirs('') raises.
    output_dir = os.path.dirname(output_path)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    return cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))

def draw_detections(frame: np.ndarray, detections: List[Dict], color: Tuple[int, int, int] = (0, 255, 0)) -> np.ndarray:
    """Draw bounding boxes and "class: confidence" labels on a copy of ``frame``.

    Args:
        frame: Source image; it is not modified.
        detections: Dicts with keys ``'bbox'`` (x1, y1, x2, y2), ``'class'``
            and ``'confidence'``.
        color: Colour used for both the boxes and the label text.
    Returns:
        np.ndarray: Annotated copy of ``frame``.
    """
    frame_copy = frame.copy()

    for detection in detections:
        # Coerce to int like the other drawing helpers in this file do;
        # the cv2 drawing calls need integer pixel coordinates.
        x1, y1, x2, y2 = map(int, detection['bbox'])
        label = detection['class']
        confidence = detection['confidence']

        # Draw bounding box
        cv2.rectangle(frame_copy, (x1, y1), (x2, y2), color, 2)

        # Draw label just above the box
        label_text = f'{label}: {confidence:.2f}'
        cv2.putText(frame_copy, label_text, (x1, y1 - 10),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)

    return frame_copy

def calculate_iou(box1: List[int], box2: List[int]) -> float:
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Returns 0 when the boxes do not overlap or when the union area is not
    positive.
    """
    # Corners of the overlap rectangle (may be inverted when disjoint).
    left, top = max(box1[0], box2[0]), max(box1[1], box2[1])
    right, bottom = min(box1[2], box2[2]), min(box1[3], box2[3])

    # Clamp each side at zero so disjoint boxes give an empty intersection.
    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    intersection = overlap_w * overlap_h

    area_first = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_second = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area_first + area_second - intersection

    if union > 0:
        return intersection / union
    return 0

def save_violation_record(output_dir: str, violation_type: str, frame: np.ndarray, 
                         timestamp: datetime, location: str) -> Dict:
    """Write the violation frame to disk and return a record describing it.

    The image is named ``<violation_type>_<YYYYmmdd_HHMMSS>.jpg``. If that
    file already exists (several violations within the same second), a
    numeric suffix ``_1``, ``_2``, ... is appended so earlier evidence is not
    overwritten.

    Args:
        output_dir: Directory for violation images; created if missing.
        violation_type: Violation label, used in the filename and the record.
        frame: Image to save.
        timestamp: Time of the violation.
        location: Free-form location stored in the record.
    Returns:
        Dict: ``type``, ``timestamp``, ``location`` and ``image_path``.
    """
    # Create violations directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)

    # Generate a filename that does not collide with an existing image
    stem = f"{violation_type}_{timestamp.strftime('%Y%m%d_%H%M%S')}"
    filepath = os.path.join(output_dir, f"{stem}.jpg")
    suffix = 1
    while os.path.exists(filepath):
        filepath = os.path.join(output_dir, f"{stem}_{suffix}.jpg")
        suffix += 1

    # Save frame to disk; imwrite signals failure through its return value
    if not cv2.imwrite(filepath, frame):
        logger.error(f"Failed to write violation image: {filepath}")

    # Create violation record
    record = {
        'type': violation_type,
        'timestamp': timestamp,
        'location': location,
        'image_path': filepath
    }

    logger.info(f"Violation recorded: {violation_type} at {timestamp}")
    return record

def generate_alert(violation_type: str, detection: Dict, frame: np.ndarray) -> Dict:
    """Build an alert dict for a detected violation and log a warning.

    Args:
        violation_type: Violation label stored under ``'type'``.
        detection: Dict providing ``'confidence'`` and ``'bbox'``.
        frame: Frame attached to the alert as-is (not copied).
    Returns:
        Dict: ``type``, ``timestamp`` (current local time), ``confidence``,
        ``location`` (the detection's bbox) and ``frame``.
    """
    created_at = datetime.now()
    confidence = detection['confidence']
    bbox = detection['bbox']

    logger.warning(f"Alert generated: {violation_type}")
    return {
        'type': violation_type,
        'timestamp': created_at,
        'confidence': confidence,
        'location': bbox,
        'frame': frame
    }

def get_centroid(box: List[float]) -> Tuple[int, int]:
    """Return the centre of an ``[x1, y1, x2, y2]`` box as ints truncated toward zero."""
    x_min, y_min, x_max, y_max = box[0], box[1], box[2], box[3]
    center_x = int((x_min + x_max) / 2)
    center_y = int((y_min + y_max) / 2)
    return center_x, center_y

def resize_frame(frame: np.ndarray, width: int = None, height: int = None) -> np.ndarray:
    """Resize ``frame``, deriving a missing dimension from the aspect ratio.

    With neither dimension given the input frame is returned unchanged. With
    one given, the other is scaled proportionally. With both given, the frame
    is resized to exactly ``(width, height)``.
    """
    if width is None and height is None:
        return frame

    src_height, src_width = frame.shape[:2]
    if width is None:
        width = int(src_width * (height / src_height))
    elif height is None:
        height = int(src_height * (width / src_width))

    target_size = (width, height)
    return cv2.resize(frame, target_size)

def add_timestamp(frame: np.ndarray, position: Tuple[int, int] = (10, 30)) -> np.ndarray:
    """Draw the current local time onto ``frame`` and return the frame.

    Args:
        frame: Image to annotate; it is modified in place.
        position: Bottom-left corner of the text.
    Returns:
        np.ndarray: The same ``frame`` object, now annotated.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    cv2.putText(frame, timestamp, position,
                cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
    # The signature promises an array; previously the function returned None.
    return frame

# base detector.py 


In [None]:
import cv2
import numpy as np
from typing import Dict, List, Tuple, Any
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from datetime import datetime
from abc import ABC, abstractmethod

class BaseDetector(ABC):
    def __init__(self, model_path: str, conf_threshold: float = 0.5):
        """
        Initialize base detector
        Args:
            model_path: Path to model file
            conf_threshold: Confidence threshold for detections
        """
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.conf_threshold = conf_threshold
        self.detection_log = []
        
        # Standard transform for YOLO models
        self.transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((640, 640)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def preprocess_frame(self, frame: np.ndarray) -> torch.Tensor:
        """Convert frame to tensor and normalize"""
        frame_tensor = self.transform(frame).unsqueeze(0)
        return frame_tensor.to(self.device)

    @abstractmethod
    def detect(self, frame: np.ndarray) -> Tuple[np.ndarray, List[Any], Dict]:
        """
        Process a single frame
        Args:
            frame: Input frame
        Returns:
            tuple: (processed_frame, detections, stats)
        """
        pass

    def log_detection(self, detection_type: str, confidence: float, frame_time: str):
        """Log detection with timestamp"""
        detection_entry = {
            'timestamp': frame_time,
            'type': detection_type,
            'confidence': confidence
        }
        self.detection_log.append(detection_entry)

    def get_detection_log(self) -> List[Dict]:
        """Return copy of detection log"""
        return self.detection_log.copy()

    def clear_detection_log(self):
        """Clear detection log"""
        self.detection_log = []

    def draw_detection_box(self, frame: np.ndarray, box: List[float],
                         label: str, color: Tuple[int, int, int]):
        """Draw a detection rectangle with a filled label tag onto the frame.

        Drawing happens directly on ``frame``; nothing is returned.

        Args:
            frame: Image to draw on.
            box: Exactly four values (x1, y1, x2, y2); each is converted
                with ``int()``.
            label: Text shown in the tag.
            color: Color used for the box outline and the tag background.
                The label text itself is always drawn in (255, 255, 255).
        """
        x1, y1, x2, y2 = map(int, box)

        # Draw box
        cv2.rectangle(frame, (x1, y1), (x2, y2), color, 2)

        # Measure the label so the tag background fits it with 5 px padding.
        (text_width, text_height), _ = cv2.getTextSize(
            label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)
        tag_height = text_height + 10

        if y1 - tag_height >= 0:
            # Usual case: the tag sits directly above the box's top edge.
            tag_top, tag_bottom = y1 - tag_height, y1
        else:
            # No room above the box: a tag placed there would fall outside
            # the image and be clipped, so draw it just inside the box.
            tag_top = max(y1, 0)
            tag_bottom = tag_top + tag_height

        # Draw label
        cv2.rectangle(frame,
                     (x1, tag_top),
                     (x1 + text_width + 10, tag_bottom),
                     color, -1)

        cv2.putText(frame, label,
                   (x1 + 5, tag_bottom - 5),
                   cv2.FONT_HERSHEY_SIMPLEX,
                   0.6, (255, 255, 255), 2)

    def add_timestamp(self, frame: np.ndarray):
        """Stamp the current local date and time near the bottom-left corner.

        The text is drawn directly onto ``frame`` in white; nothing is
        returned.
        """
        stamp_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        frame_height = frame.shape[0]
        # Text origin: 10 px from the left edge, 20 px above the bottom edge.
        origin = (10, frame_height - 20)
        cv2.putText(frame, stamp_text, origin,
                    cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)