<span style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">An Exception was encountered at '<a href="#papermill-error-cell">In [2]</a>'.</span>

In [None]:
import sys
import os
import warnings

# Make the project's src folder importable. The relative path is resolved
# against the *current working directory*, so it only points at the right
# place when the notebook is executed from the expected folder.
_src_dir = os.path.abspath('../ML/src')

# Surface a wrong working directory right away instead of letting it show up
# later as a ModuleNotFoundError on the first project import.
if not os.path.isdir(_src_dir):
    warnings.warn(
        f"Source folder not found: {_src_dir} "
        f"(current working directory: {os.getcwd()}); "
        "project imports will fail."
    )

# Re-running the cell must not keep appending the same entry.
if _src_dir not in sys.path:
    sys.path.append(_src_dir)


<span id="papermill-error-cell" style="color:red; font-family:Helvetica Neue, Helvetica, Arial, sans-serif; font-size:2em;">Execution using papermill encountered an exception here and stopped:</span>

In [2]:
import os
import cv2
import csv
import numpy as np
from ultralytics import YOLO
from collections import deque
from data_processing import extract_frames
from vehicle_detection import detect_vehicles

# Module-level tracking state.
# Of these names, only `frame_rate` is read by the functions visible in this
# notebook (calculate_average_speed); the rest are initialised here but
# process_video keeps its own local previous_vehicles / previous_frame.
vehicle_positions = dict()  # vehicle positions across frames
vehicle_speeds = deque([], 10)  # bounded buffer: keeps at most the 10 latest speeds
frame_rate = 2  # frames per second assumed when turning displacement into speed
previous_vehicles = list()  # positions seen in the previous frame
previous_frame = None  # no previous frame yet

def calculate_average_speed(frame_path, previous_vehicles, previous_frame):
    """Estimate the mean horizontal speed of the vehicles detected in a frame.

    Detections in the current frame are paired with ``previous_vehicles``
    purely by list index (there is no identity tracking). Each pair
    contributes ``abs(current - previous) * frame_rate``, where ``frame_rate``
    is the module-level constant.

    Args:
        frame_path: Path of the frame image passed to ``detect_vehicles``.
        previous_vehicles: Horizontal positions recorded for the previous
            frame, in detection order.
        previous_frame: Only compared against ``None``; ``None`` means there
            is no earlier frame to compare with.

    Returns:
        The mean speed rounded to two decimals, or ``0`` when there is no
        previous frame, no previous positions, or no pair to compare.
    """
    # Detection always runs first, even when there is nothing to compare with.
    _, boxes = detect_vehicles(frame_path)

    if previous_frame is None or len(previous_vehicles) == 0:
        return 0

    # Position = mean of the 1st and 3rd xywh components. This is the same
    # formula process_video uses to build previous_vehicles, so the two stay
    # comparable.
    # NOTE(review): if these are Ultralytics boxes, xywh is
    # (x_center, y_center, width, height), which would make this
    # (x_center + width) / 2 rather than the box midpoint. Confirm, and if so
    # change this function and its caller together.
    positions = []
    for box in boxes:
        first, _, third, _ = box.xywh[0].tolist()
        positions.append((first + third) / 2)

    # zip stops at the shorter sequence, i.e. boxes without a counterpart in
    # the previous frame are ignored.
    speeds = [
        abs(now - before) * frame_rate  # speed = distance / time
        for now, before in zip(positions, previous_vehicles)
    ]

    if not speeds:
        return 0
    return round(sum(speeds) / len(speeds), 2)

def calculate_density(frame_path, vehicle_count):
    """Calculate traffic density as vehicles per 100,000 pixels of frame area.

    Args:
        frame_path: Path of the frame image; it is loaded only to obtain its
            pixel dimensions.
        vehicle_count: Number of vehicles detected in that frame.

    Returns:
        ``vehicle_count / (height * width / 100000)`` rounded to six
        decimals, or ``0`` if the image has no area.

    Raises:
        ValueError: If the image at ``frame_path`` cannot be read.
    """
    img = cv2.imread(frame_path)
    # cv2.imread signals failure by returning None rather than raising; fail
    # with a message that names the file instead of an AttributeError below.
    if img is None:
        raise ValueError(f"Could not read image: {frame_path}")

    # Only the first two dimensions matter; slicing avoids depending on the
    # array having exactly three axes.
    height, width = img.shape[:2]
    total_area = height * width

    if total_area <= 0:
        return 0

    density = vehicle_count / (total_area / 100000)  # Normalize to vehicles per 100,000 pixels
    return round(density, 6)

def calculate_queue_length(frame_path):
    """Estimate the queue length from the horizontal spread of detections.

    The estimate is the distance between the left-most and right-most
    detected vehicle centres, multiplied by a fixed scaling factor of 0.02.
    No check is made that the vehicles are actually stationary.

    Args:
        frame_path: Path of the frame image passed to ``detect_vehicles``.

    Returns:
        The scaled spread rounded to two decimals, or ``0`` when fewer than
        two vehicles are detected.
    """
    _, current_boxes = detect_vehicles(frame_path)

    # xywh is (x_center, y_center, width, height), so the first component is
    # already the horizontal midpoint of the box. The previous code averaged
    # x_center with the width, which is not a position in the frame.
    centres = [box.xywh[0].tolist()[0] for box in current_boxes]

    # A spread needs at least two vehicles.
    if len(centres) < 2:
        return 0

    queue_length = (max(centres) - min(centres)) * 0.02  # Scaling factor
    return round(queue_length, 2)


ModuleNotFoundError: No module named 'data_processing'

In [None]:
import os
import cv2
import csv
from ultralytics import YOLO
from collections import deque
from data_processing import extract_frames
from vehicle_detection import detect_vehicles
from emergency_detection import detect_emergency_vehicles 

def process_video(video_path, output_frames_folder, output_metrics_folder, model_path="../ML/models/yolo11n.pt", frame_rate=1):
    """
    Process a video to extract frames, calculate metrics (including emergency vehicles), and save results to CSV.

    Args:
        video_path: Path of the video file to process.
        output_frames_folder: Root folder for extracted frames; a sub-folder
            named after the video is created inside it.
        output_metrics_folder: Root folder for metrics; a sub-folder named
            after the video is created inside it.
        model_path: Model path forwarded to ``detect_vehicles`` for the
            vehicle count.
        frame_rate: Forwarded to ``extract_frames``. Note that
            ``calculate_average_speed`` uses the module-level ``frame_rate``,
            not this argument.

    Side effects:
        Creates the output folders, writes the extracted frames, and writes
        ``<video_name>_metrics.csv`` with one row per frame.
    """
    # splitext strips only the final extension, so "cam.north.mp4" stays
    # "cam.north" instead of collapsing to "cam" (which could make two videos
    # share one output folder).
    video_name = os.path.splitext(os.path.basename(video_path))[0]
    video_frames_folder = os.path.join(output_frames_folder, video_name)
    video_metrics_folder = os.path.join(output_metrics_folder, video_name)

    # Create necessary folders if not already present
    os.makedirs(video_frames_folder, exist_ok=True)
    os.makedirs(video_metrics_folder, exist_ok=True)

    # Step 1: Extract frames from the video
    extract_frames(video_path, video_frames_folder, frame_rate)

    # Step 2: Calculate metrics for each frame and save to CSV
    # Single definition of the CSV columns, shared by the rows and the writer.
    fieldnames = [
        "Frame",
        "Number_of_Vehicles",
        "Average_Speed_km/h",
        "Traffic_Density",
        "Queue_Length_meters",
        "Emergency_Vehicles",
    ]
    metrics = []
    previous_vehicles = []
    previous_frame = None

    for frame_file in sorted(os.listdir(video_frames_folder)):  # Ensure frame order
        frame_path = os.path.join(video_frames_folder, frame_file)

        # Calculate metrics
        vehicle_count, current_boxes = detect_vehicles(frame_path, model_path)
        avg_speed = calculate_average_speed(frame_path, previous_vehicles, previous_frame)
        queue_length = calculate_queue_length(frame_path)
        density = calculate_density(frame_path, vehicle_count)
        emergency = detect_emergency_vehicles(frame_path)

        # Append all metrics
        metrics.append({
            "Frame": frame_file,
            "Number_of_Vehicles": vehicle_count,
            "Average_Speed_km/h": avg_speed,
            "Traffic_Density": density,
            "Queue_Length_meters": queue_length,
            "Emergency_Vehicles": emergency,
        })

        # Update previous vehicles and previous frame for the next iteration.
        # The position formula must stay identical to the one inside
        # calculate_average_speed, which compares against these values.
        previous_vehicles = [
            (box.xywh[0][0].item() + box.xywh[0][2].item()) / 2  # Convert tensor to float
            for box in current_boxes
        ]
        # calculate_average_speed only checks this against None.
        previous_frame = cv2.imread(frame_path)

    # Step 3: Save metrics to CSV
    metrics_file_path = os.path.join(video_metrics_folder, f"{video_name}_metrics.csv")
    with open(metrics_file_path, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(metrics)

    print(f"Metrics for {video_name} saved to {metrics_file_path}")



In [None]:
# Videos to process. All four live under ../ML/data/videos/; the fourth entry
# previously read "../ML//videos/video4.mp4", i.e. the "data" segment was missing.
video_paths = [
    "../ML/data/videos/video1.mp4",
    "../ML/data/videos/video2.mp4",
    "../ML/data/videos/video3.mp4",
    "../ML/data/videos/video4.mp4",
]

# Root output folders; process_video creates one sub-folder per video in each.
output_frames_folder = "../ML/outputs/frames/"
output_metrics_folder = "../ML/outputs/metrics/"

for video_path in video_paths:
    process_video(video_path, output_frames_folder, output_metrics_folder, frame_rate=1)