# üöë AmbuRoute - Real-Time Detection System
## Phase 4: Real-Time Ambulance Detection & Traffic Control

This notebook implements the real-time ambulance detection system and traffic signal control logic for the AmbuRoute project.


In [2]:
# Import necessary libraries
import os
import cv2
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import time
import threading
import queue
import json
from datetime import datetime
import warnings
warnings.filterwarnings('ignore')

# Import YOLOv5 and other ML libraries
from ultralytics import YOLO
import torch

# Set up plotting
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

print("üö¶ Real-Time Detection System Environment Ready!")
print(f"üîß PyTorch version: {torch.__version__}")
print(f"üéÆ CUDA available: {torch.cuda.is_available()}")
if torch.cuda.is_available():
    print(f"üéÆ GPU: {torch.cuda.get_device_name(0)}")


üö¶ Real-Time Detection System Environment Ready!
üîß PyTorch version: 2.8.0+cpu
üéÆ CUDA available: False


## üö¶ Real-Time Ambulance Detection System


In [3]:
class RealTimeAmbulanceDetector:
    """Real-time ambulance detection and traffic control system"""
    
    def __init__(self, model_path, confidence_threshold=0.5, iou_threshold=0.6):
        self.model_path = model_path
        self.confidence_threshold = confidence_threshold
        self.iou_threshold = iou_threshold
        
        # Load the trained model
        self.model = YOLO(model_path)
        
        # Traffic signal states
        self.signal_states = {}
        self.detection_history = []
        self.ambulance_detected = False
        self.last_detection_time = None
        
        # Performance metrics
        self.fps_counter = 0
        self.start_time = time.time()
        self.detection_count = 0
        
        print(f"‚úÖ Real-time detector initialized with model: {model_path}")
    
    def detect_ambulance(self, frame):
        """Detect ambulances in a single frame"""
        try:
            # Run inference
            results = self.model(frame, conf=self.confidence_threshold, iou=self.iou_threshold)
            
            # Process results
            detections = []
            for result in results:
                if result.boxes is not None:
                    for box in result.boxes:
                        # Get bounding box coordinates
                        x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                        confidence = box.conf[0].cpu().numpy()
                        class_id = int(box.cls[0].cpu().numpy())
                        
                        # Only process ambulance detections (class 0)
                        if class_id == 0:
                            detections.append({
                                'bbox': [int(x1), int(y1), int(x2), int(y2)],
                                'confidence': float(confidence),
                                'class_id': class_id,
                                'timestamp': time.time()
                            })
            
            return detections
            
        except Exception as e:
            print(f"‚ùå Error in detection: {e}")
            return []
    
    def draw_detections(self, frame, detections):
        """Draw bounding boxes and labels on frame"""
        annotated_frame = frame.copy()
        
        for detection in detections:
            x1, y1, x2, y2 = detection['bbox']
            confidence = detection['confidence']
            
            # Draw bounding box
            cv2.rectangle(annotated_frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
            
            # Draw label
            label = f"Ambulance: {confidence:.2f}"
            label_size = cv2.getTextSize(label, cv2.FONT_HERSHEY_SIMPLEX, 0.6, 2)[0]
            cv2.rectangle(annotated_frame, (x1, y1 - label_size[1] - 10), 
                         (x1 + label_size[0], y1), (0, 255, 0), -1)
            cv2.putText(annotated_frame, label, (x1, y1 - 5), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 0), 2)
        
        return annotated_frame
    
    def update_traffic_signal(self, detections, camera_id="default"):
        """Update traffic signal based on detections"""
        current_time = time.time()
        
        if detections:
            # Ambulance detected - set signal to GREEN
            self.ambulance_detected = True
            self.last_detection_time = current_time
            self.signal_states[camera_id] = {
                'state': 'GREEN',
                'timestamp': current_time,
                'confidence': max([d['confidence'] for d in detections]),
                'reason': 'Ambulance detected'
            }
            self.detection_count += 1
        else:
            # No ambulance detected
            if self.ambulance_detected and self.last_detection_time:
                # Check if enough time has passed since last detection
                time_since_detection = current_time - self.last_detection_time
                if time_since_detection > 10:  # 10 seconds buffer
                    self.ambulance_detected = False
                    self.signal_states[camera_id] = {
                        'state': 'RED',
                        'timestamp': current_time,
                        'confidence': 0.0,
                        'reason': 'No ambulance detected'
                    }
            else:
                self.signal_states[camera_id] = {
                    'state': 'RED',
                    'timestamp': current_time,
                    'confidence': 0.0,
                    'reason': 'No ambulance detected'
                }
    
    def draw_traffic_signal(self, frame, camera_id="default"):
        """Draw traffic signal status on frame"""
        if camera_id not in self.signal_states:
            return frame
        
        signal_info = self.signal_states[camera_id]
        state = signal_info['state']
        confidence = signal_info['confidence']
        reason = signal_info['reason']
        
        # Choose colors based on signal state
        if state == 'GREEN':
            signal_color = (0, 255, 0)  # Green
            text_color = (0, 0, 0)  # Black text
        else:
            signal_color = (0, 0, 255)  # Red
            text_color = (255, 255, 255)  # White text
        
        # Draw signal status box
        cv2.rectangle(frame, (10, 10), (400, 120), signal_color, -1)
        cv2.rectangle(frame, (10, 10), (400, 120), (255, 255, 255), 2)
        
        # Add text
        cv2.putText(frame, f"TRAFFIC SIGNAL: {state}", (20, 40), 
                   cv2.FONT_HERSHEY_SIMPLEX, 1, text_color, 2)
        cv2.putText(frame, f"Confidence: {confidence:.2f}", (20, 70), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, text_color, 2)
        cv2.putText(frame, f"Reason: {reason}", (20, 100), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color, 2)
        
        return frame
    
    def process_video_stream(self, video_source=0, output_path=None, display=True):
        """Process video stream for real-time detection"""
        print(f"üé• Starting video stream processing...")
        print(f"üìπ Video source: {video_source}")
        
        # Open video source
        cap = cv2.VideoCapture(video_source)
        if not cap.isOpened():
            print(f"‚ùå Error: Could not open video source {video_source}")
            return
        
        # Get video properties
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        
        print(f"üìä Video properties: {width}x{height} @ {fps} FPS")
        
        # Setup video writer if output path provided
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
        
        frame_count = 0
        start_time = time.time()
        
        try:
            while True:
                ret, frame = cap.read()
                if not ret:
                    print("üìπ End of video stream")
                    break
                
                # Detect ambulances
                detections = self.detect_ambulance(frame)
                
                # Update traffic signal
                self.update_traffic_signal(detections)
                
                # Draw detections
                annotated_frame = self.draw_detections(frame, detections)
                
                # Draw traffic signal status
                annotated_frame = self.draw_traffic_signal(annotated_frame)
                
                # Add FPS counter
                frame_count += 1
                if frame_count % 30 == 0:  # Update FPS every 30 frames
                    elapsed_time = time.time() - start_time
                    current_fps = frame_count / elapsed_time
                    cv2.putText(annotated_frame, f"FPS: {current_fps:.1f}", 
                               (width - 150, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                
                # Write frame to output
                if output_path:
                    out.write(annotated_frame)
                
                # Display frame
                if display:
                    cv2.imshow('AmbuRoute - Real-time Detection', annotated_frame)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        print("üõë Stopping video processing...")
                        break
                
        except KeyboardInterrupt:
            print("üõë Interrupted by user")
        
        finally:
            # Cleanup
            cap.release()
            if output_path:
                out.release()
            cv2.destroyAllWindows()
            
            # Print statistics
            elapsed_time = time.time() - start_time
            avg_fps = frame_count / elapsed_time
            print(f"üìä Processing Statistics:")
            print(f"   Total frames: {frame_count}")
            print(f"   Total time: {elapsed_time:.2f} seconds")
            print(f"   Average FPS: {avg_fps:.2f}")
            print(f"   Ambulance detections: {self.detection_count}")
    
    def process_video_file(self, video_path, output_path=None, display=True):
        """Process a video file for detection"""
        print(f"üé¨ Processing video file: {video_path}")
        
        if not Path(video_path).exists():
            print(f"‚ùå Video file not found: {video_path}")
            return
        
        self.process_video_stream(video_path, output_path, display)
    
    def get_detection_statistics(self):
        """Get current detection statistics"""
        return {
            'total_detections': self.detection_count,
            'ambulance_detected': self.ambulance_detected,
            'last_detection_time': self.last_detection_time,
            'signal_states': self.signal_states,
            'fps': self.fps_counter
        }

# Initialize detector (will be loaded with trained model)
print("‚úÖ Real-time detection system ready!")


‚úÖ Real-time detection system ready!
