In [None]:
#wrong lane full traffic
import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
import math
import os

class WrongLaneDetector:
    def __init__(self, model_path="yolov8n.pt"):
        # Initialize YOLOv8 model
        self.model = YOLO(model_path)
        
        # Dictionary to store tracking history
        self.track_history = defaultdict(lambda: [])
        
        # Dictionary to store direction status
        self.direction_status = {}  # Will store 'correct' or 'wrong'
        
        # Number of points to consider for direction
        self.direction_points = 10
        
    def get_centroid(self, box):
        """Calculate centroid from bbox coordinates"""
        x1, y1, x2, y2 = box
        return (int((x1 + x2) / 2), int((y1 + y2) / 2))
    
    def determine_direction(self, points):
        """Determine if vehicle is going in correct direction (top to bottom)"""
        if len(points) < self.direction_points:
            return None
            
        # Take last n points
        recent_points = points[-self.direction_points:]
        
        # Calculate overall y-direction
        start_y = recent_points[0][1]
        end_y = recent_points[-1][1]
        
        # If y is increasing (going down), it's correct direction
        return 'correct' if end_y > start_y else 'wrong'
    
    def process_video(self, video_path, output_path):
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Input video file not found: {video_path}")
            
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError("Error opening video file")
        
        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Try different codecs if your system has issues
        try:
            # Try XVID codec first
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            
            if not out.isOpened():
                # If XVID fails, try MP4V
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
                
            if not out.isOpened():
                # If MP4V fails, try X264
                fourcc = cv2.VideoWriter_fourcc(*'X264')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        except Exception as e:
            raise ValueError(f"Error initializing video writer: {str(e)}")
        
        frame_count = 0
        print("Processing video...")
        
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                    
                frame_count += 1
                if frame_count % 30 == 0:  # Print progress every 30 frames
                    print(f"Processing frame {frame_count}/{total_frames}")
                
                # Run YOLOv8 tracking on the frame
                results = self.model.track(frame, persist=True, classes=[2, 3, 5, 7])  # Only detect cars, motorcycles, buses, trucks
                
                if results[0].boxes.id is not None:
                    boxes = results[0].boxes.xyxy.cpu().numpy()
                    track_ids = results[0].boxes.id.cpu().numpy().astype(int)
                    
                    # Process each detection
                    for box, track_id in zip(boxes, track_ids):
                        # Get centroid
                        centroid = self.get_centroid(box)
                        
                        # Add centroid to track history
                        self.track_history[track_id].append(centroid)
                        
                        # Determine direction if enough points are collected
                        direction = self.determine_direction(self.track_history[track_id])
                        if direction:
                            self.direction_status[track_id] = direction
                        
                        # Draw bounding box
                        color = (0, 255, 0) if self.direction_status.get(track_id) == 'correct' else (0, 0, 255)
                        cv2.rectangle(frame, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), color, 2)
                        
                        # Draw tracking line
                        points = self.track_history[track_id]
                        for i in range(1, len(points)):
                            cv2.line(frame, points[i - 1], points[i], color, 2)
                        
                        # Add label
                        label = f"ID: {track_id} ({self.direction_status.get(track_id, 'unknown')})"
                        cv2.putText(frame, label, (int(box[0]), int(box[1] - 10)), 
                                  cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                
                # Write frame to output video
                out.write(frame)
                
                # Optional: Display frame (comment out for faster processing)
                # cv2.imshow('Frame', frame)
                # if cv2.waitKey(1) & 0xFF == ord('q'):
                #     break
                
                # Clean up old tracks
                for track_id in list(self.track_history.keys()):
                    if track_id not in track_ids:
                        del self.track_history[track_id]
                        if track_id in self.direction_status:
                            del self.direction_status[track_id]
        
        except Exception as e:
            print(f"Error during video processing: {str(e)}")
            raise
        
        finally:
            # Release resources
            cap.release()
            out.release()
            cv2.destroyAllWindows()
            
            # Verify output file was created successfully
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"Video processing completed. Output saved to: {output_path}")
            else:
                raise ValueError("Error: Output video file is empty or was not created")

def main():
    try:
        # Initialize detector
        detector = WrongLaneDetector()
        
        # Process video
        input_video = "C:/Users/DELL/Desktop/accident detection/numberplate/vid2.mp4"  # Replace with your video path
        output_video = "C:/Users/DELL/Desktop/accident detection/numberplate/wrong_out2.avi"  # Changed extension to .avi for better compatibility
        
        detector.process_video(input_video, output_video)
        
    except Exception as e:
        print(f"Error: {str(e)}")

if __name__ == "__main__":
    main()

In [None]:
#wrong lane correct direction (without ROI)

import cv2
import numpy as np
from ultralytics import YOLO
from collections import defaultdict
import math
import os

class WrongLaneDetector:
    def __init__(self, model_path="yolov8n.pt"):
        # Initialize YOLOv8 model
        self.model = YOLO(model_path)
        
        # Dictionary to store tracking history
        self.track_history = defaultdict(lambda: [])
        
        # Dictionary to store direction status
        self.direction_status = {}  # Will store 'correct' or 'wrong'
        
        # Number of points to consider for direction
        self.direction_points = 10
        
    def get_centroid(self, box):
        """Calculate centroid from bbox coordinates"""
        x1, y1, x2, y2 = box
        return (int((x1 + x2) / 2), int((y1 + y2) / 2))
    
    def determine_direction(self, points):
        """Determine if vehicle is going in correct direction (top to bottom)"""
        if len(points) < self.direction_points:
            return None
            
        # Take last n points
        recent_points = points[-self.direction_points:]
        
        # Calculate overall y-direction
        start_y = recent_points[0][1]
        end_y = recent_points[-1][1]
        
        # If y is increasing (going down), it's correct direction
        return 'wrong' if end_y > start_y else 'correct'
    
    def process_video(self, video_path, output_path):
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Input video file not found: {video_path}")
            
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            raise ValueError("Error opening video file")
        
        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        
        # Try different codecs if your system has issues
        try:
            # Try XVID codec first
            fourcc = cv2.VideoWriter_fourcc(*'XVID')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            
            if not out.isOpened():
                # If XVID fails, try MP4V
                fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
                
            if not out.isOpened():
                # If MP4V fails, try X264
                fourcc = cv2.VideoWriter_fourcc(*'X264')
                out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        except Exception as e:
            raise ValueError(f"Error initializing video writer: {str(e)}")
        
        frame_count = 0
        print("Processing video...")
        
        try:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                    
                frame_count += 1
                if frame_count % 30 == 0:  # Print progress every 30 frames
                    print(f"Processing frame {frame_count}/{total_frames}")
                
                # Run YOLOv8 tracking on the frame
                results = self.model.track(frame, persist=True, classes=[2, 3, 5, 7])  # Only detect cars, motorcycles, buses, trucks
                
                if results[0].boxes.id is not None:
                    boxes = results[0].boxes.xyxy.cpu().numpy()
                    track_ids = results[0].boxes.id.cpu().numpy().astype(int)
                    
                    # Process each detection
                    for box, track_id in zip(boxes, track_ids):
                        # Get centroid
                        centroid = self.get_centroid(box)
                        
                        # Add centroid to track history
                        self.track_history[track_id].append(centroid)
                        
                        # Determine direction if enough points are collected
                        direction = self.determine_direction(self.track_history[track_id])
                        if direction:
                            self.direction_status[track_id] = direction
                        
                        # Draw bounding box
                        color = (0, 0, 255) if self.direction_status.get(track_id) == 'wrong' else (0, 255, 0)
                        cv2.rectangle(frame, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), color, 2)
                        
                        # Draw tracking line
                        points = self.track_history[track_id]
                        for i in range(1, len(points)):
                            cv2.line(frame, points[i - 1], points[i], color, 2)
                        
                        # Add label
                        label = f"ID: {track_id} ({self.direction_status.get(track_id, 'unknown')})"
                        cv2.putText(frame, label, (int(box[0]), int(box[1] - 10)), 
                                  cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
                
                # Write frame to output video
                out.write(frame)
                
                # Optional: Display frame (comment out for faster processing)
                # cv2.imshow('Frame', frame)
                # if cv2.waitKey(1) & 0xFF == ord('q'):
                #     break
                
                # Clean up old tracks
                for track_id in list(self.track_history.keys()):
                    if track_id not in track_ids:
                        del self.track_history[track_id]
                        if track_id in self.direction_status:
                            del self.direction_status[track_id]
        
        except Exception as e:
            print(f"Error during video processing: {str(e)}")
            raise
        
        finally:
            # Release resources
            cap.release()
            out.release()
            cv2.destroyAllWindows()
            
            # Verify output file was created successfully
            if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
                print(f"Video processing completed. Output saved to: {output_path}")
            else:
                raise ValueError("Error: Output video file is empty or was not created")

def main():
    try:
        # Initialize detector
        detector = WrongLaneDetector()
        
        # Process video
        input_video = "C:/Users/DELL/Desktop/accident detection/numberplate/vid3.mp4"  # Replace with your video path
        output_video = "C:/Users/DELL/Desktop/accident detection/numberplate/wrong_out3.avi"  # Changed extension to .avi for better compatibility
        
        detector.process_video(input_video, output_video)
        
    except Exception as e:
        print(f"Error: {str(e)}")

if __name__ == "__main__":
    main()