# Emergency Vehicle Detection - Testing and Auto-Annotation

This notebook tests the trained YOLOv5 model and demonstrates auto-annotation capabilities.

In [None]:
import torch
import cv2
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path
import yaml
import pandas as pd
from tqdm.auto import tqdm
import torchvision.transforms as T
from torchvision.transforms import functional as F

In [None]:
# Load configuration
# The parsed YAML is kept in `config`; later cells read config['names'] to
# translate between class ids and class names.
with open('../Dataset/dataset.yaml', 'r') as f:
    config = yaml.safe_load(f)

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

# Load trained model
# NOTE(review): torch.load unpickles the file, which can execute arbitrary
# code -- only load 'best_model.pt' from a trusted source.
# NOTE(review): .eval() is called directly on the loaded object, so this
# assumes the file holds a whole pickled model rather than a state_dict or a
# checkpoint dict -- confirm against the training notebook. Newer PyTorch
# releases may refuse such files unless weights_only=False is passed; verify
# against the installed version.
model = torch.load('best_model.pt', map_location=device)
model.eval()

# Initialize normalizer
# Per-channel mean/std applied to every image tensor by preprocess_image.
# NOTE(review): presumably these must match the normalisation used during
# training -- confirm.
normalizer = T.Normalize(mean=[0.485, 0.456, 0.406],
                        std=[0.229, 0.224, 0.225])

In [None]:
def preprocess_image(image, target_size=640):
    """Turn an image into a normalised tensor ready for the model.

    A ``numpy`` array is treated as an OpenCV BGR image and converted to an
    RGB PIL image first; any other input is used as-is and must expose a
    PIL-style ``.size`` of ``(width, height)``.  The image is then resized so
    its longer side is scaled to ``target_size`` (aspect ratio kept, sizes
    truncated to int), converted to a tensor and passed through the
    module-level ``normalizer``.

    Returns:
        ``(tensor, (scale, (new_w, new_h)))`` where ``scale`` is the resize
        factor that was applied and ``(new_w, new_h)`` the resized dimensions.
    """
    if isinstance(image, np.ndarray):
        # OpenCV arrays are BGR; the rest of the pipeline works in RGB.
        rgb_array = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = F.to_pil_image(rgb_array)

    orig_w, orig_h = image.size
    scale = target_size / max(orig_w, orig_h)
    resized_w, resized_h = int(orig_w * scale), int(orig_h * scale)

    # F.resize takes [height, width], the reverse of PIL's (width, height).
    resized = F.resize(image, [resized_h, resized_w])
    tensor = normalizer(F.to_tensor(resized))

    return tensor, (scale, (resized_w, resized_h))

In [None]:
def detect_image(image, conf_threshold=0.5):
    """Detect vehicles in a single image.

    Args:
        image: Any input accepted by ``preprocess_image`` (for example a BGR
            array returned by ``cv2.imread``).
        conf_threshold: Detections whose confidence is not strictly greater
            than this value are discarded.

    Returns:
        A list of dicts, one per kept detection, with keys ``'class'`` (name
        looked up in ``config['names']``), ``'confidence'`` (float) and
        ``'bbox'`` (``[x1, y1, x2, y2]`` in original-image pixels).
    """
    # Preprocess image; the resized dimensions are not needed here.
    img_tensor, (scale, _) = preprocess_image(image)
    img_tensor = img_tensor.unsqueeze(0).to(device)

    # Run inference
    with torch.no_grad():
        results = model(img_tensor)

    # NOTE(review): assumes a YOLOv5-style results object whose `pred[0]` is an
    # (n, 6) tensor of [x1, y1, x2, y2, conf, cls] rows -- confirm for the
    # loaded model.
    detections = []
    for pred in results.pred[0]:
        # Each row is a plain tensor, so confidence and class are columns 4
        # and 5; tensors have no `.conf` / `.cls` attributes.
        x1, y1, x2, y2, conf, cls = pred[:6].tolist()
        if conf <= conf_threshold:
            continue

        detections.append({
            'class': config['names'][int(cls)],
            'confidence': conf,
            # Undo the resize applied in preprocess_image so the box is
            # expressed in original-image pixels.
            'bbox': [x1 / scale, y1 / scale, x2 / scale, y2 / scale],
        })

    return detections

In [None]:
def draw_detections(image, detections):
    """Draw detection boxes and labels on a copy of the image.

    Args:
        image: BGR image array as produced by OpenCV (``cv2.imread`` or a
            video frame).  It is not modified.
        detections: Iterable of dicts with ``'class'``, ``'confidence'`` and
            ``'bbox'`` (``[x1, y1, x2, y2]`` in pixels), as returned by
            ``detect_image``.

    Returns:
        A copy of ``image`` with one rectangle and one text label per
        detection.
    """
    # OpenCV drawing functions read colours as (B, G, R) on the BGR images
    # this notebook passes in, so the tuples are written in that order.
    colors = {
        'Ambulance': (0, 0, 255),         # Red
        'Fire Engine': (255, 0, 0),       # Blue
        'Police': (0, 255, 0),            # Green
        'Non Emergency': (128, 128, 128)  # Gray
    }

    img_copy = image.copy()

    for det in detections:
        # Plain Python ints (truncated toward zero) for OpenCV point arguments.
        x1, y1, x2, y2 = (int(v) for v in det['bbox'])
        label = f"{det['class']} {det['confidence']:.2f}"
        color = colors.get(det['class'], (0, 255, 0))

        # Draw box
        cv2.rectangle(img_copy, (x1, y1), (x2, y2), color, 2)

        # Draw label with background.  It normally sits just above the box;
        # when that would fall off the top of the image it is moved inside
        # the box so it stays visible.
        text_w, text_h = cv2.getTextSize(
            label, cv2.FONT_HERSHEY_SIMPLEX, 0.5, 2)[0]
        label_bottom = y1 if y1 - text_h - 5 >= 0 else y1 + text_h + 5
        cv2.rectangle(img_copy,
                      (x1, label_bottom - text_h - 5),
                      (x1 + text_w, label_bottom),
                      color, -1)
        cv2.putText(img_copy, label,
                    (x1, label_bottom - 5),
                    cv2.FONT_HERSHEY_SIMPLEX,
                    0.5, (255, 255, 255), 2)

    return img_copy

In [None]:
def generate_annotations(images_dir, output_file, conf_threshold=0.7):
    """Generate YOLO-format annotations for images using the trained model.

    Every ``*.jpg`` file directly inside ``images_dir`` is run through
    ``detect_image``; each detection becomes one row holding the normalised
    box centre/size, the class id and the confidence.

    Args:
        images_dir: Directory that is searched (non-recursively) for
            ``*.jpg`` files.
        output_file: Path of the CSV file to write.
        conf_threshold: Confidence threshold forwarded to ``detect_image``.

    Returns:
        The list of annotation dicts that was written to ``output_file``.
    """
    # Build the class-name -> class-id lookup once instead of rebuilding two
    # lists for every detection.  `names` may be a mapping (id -> name) or a
    # plain list; setdefault keeps the first id if a name is repeated.
    names = config['names']
    name_pairs = names.items() if isinstance(names, dict) else enumerate(names)
    class_ids = {}
    for class_id, class_name in name_pairs:
        class_ids.setdefault(class_name, class_id)

    annotations = []
    # Sorting gives a reproducible row order and lets tqdm show a total.
    images = sorted(Path(images_dir).glob('*.jpg'))

    for img_path in tqdm(images, desc='Generating annotations'):
        image = cv2.imread(str(img_path))
        if image is None:
            # cv2.imread signals unreadable or corrupt files by returning None.
            print(f"Could not read image at {img_path}")
            continue
        h, w = image.shape[:2]

        for det in detect_image(image, conf_threshold):
            x1, y1, x2, y2 = det['bbox']

            # Convert corner coordinates to YOLO format: centre and size,
            # each normalised by the image dimensions.
            annotations.append({
                'image': img_path.name,
                'x_center': (x1 + x2) / 2 / w,
                'y_center': (y1 + y2) / 2 / h,
                'width': (x2 - x1) / w,
                'height': (y2 - y1) / h,
                'class': class_ids[det['class']],
                'confidence': det['confidence']
            })

    # Save annotations.  Explicit columns keep the CSV header present even
    # when nothing was detected.
    columns = ['image', 'x_center', 'y_center', 'width', 'height',
               'class', 'confidence']
    pd.DataFrame(annotations, columns=columns).to_csv(output_file, index=False)
    print(f"Saved {len(annotations)} annotations to {output_file}")

    return annotations

In [None]:
# Run the detector on a few validation images and show the annotated results
test_images = [
    '../Dataset/val/images/ambulance1.jpg',
    '../Dataset/val/images/fire_engine2.jpg',
]

for img_path in test_images:
    print(f"\nProcessing {img_path}...")
    img = cv2.imread(img_path)

    if img is not None:
        detections = detect_image(img)
        img_with_detections = draw_detections(img, detections)

        # Matplotlib expects RGB, while OpenCV images are BGR.
        plt.figure(figsize=(12, 8))
        plt.imshow(cv2.cvtColor(img_with_detections, cv2.COLOR_BGR2RGB))
        plt.axis('off')
        plt.title(f'Detections for {Path(img_path).name}')
        plt.show()

        # Text summary of what was found in this image.
        print("Detections:")
        for det in detections:
            print(f"- {det['class']}: {det['confidence']:.2f}")
    else:
        print(f"Could not read image at {img_path}")

In [None]:
# Test video detection
def process_video(video_path, output_path=None, display=True, detect_every=2):
    """Run vehicle detection over a video, optionally saving and showing it.

    Detection runs on every ``detect_every``-th frame; the frames in between
    are drawn with the most recent detections.  Every frame is written, so the
    output keeps the source duration and playback speed.

    Args:
        video_path: Path of the video to read.
        output_path: If given, the annotated video is written here using the
            'mp4v' codec.
        display: If true, show each annotated frame in an OpenCV window;
            pressing 'q' stops processing early.
        detect_every: Run the detector on one frame out of this many
            (must be >= 1).

    Returns:
        None.  Failures to open the input or the output are reported with a
        printed message.

    Raises:
        ValueError: If ``detect_every`` is smaller than 1.
    """
    if detect_every < 1:
        raise ValueError("detect_every must be >= 1")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error: Could not open video at {video_path}")
        return

    out = None
    try:
        # Get video properties
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Keep fractional rates such as 29.97; fall back to 30 when the
        # source reports 0, which would make the writer unusable.
        fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

        # Initialize video writer if output path is provided
        if output_path:
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
            if not out.isOpened():
                print(f"Error: Could not open video writer at {output_path}")
                return

        detections = []
        frame_count = 0
        # A non-positive frame count means the length is unknown; let tqdm
        # run without a total in that case.
        progress_total = total_frames if total_frames > 0 else None
        with tqdm(total=progress_total, desc="Processing video") as pbar:
            while True:
                ret, frame = cap.read()
                if not ret:
                    break

                # Only run the (slow) detector on every Nth frame; the other
                # frames reuse the latest detections.
                if frame_count % detect_every == 0:
                    detections = detect_image(frame)
                frame_with_detections = draw_detections(frame, detections)

                if out is not None:
                    out.write(frame_with_detections)

                if display:
                    cv2.imshow('Emergency Vehicle Detection',
                               frame_with_detections)
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break

                frame_count += 1
                pbar.update(1)

    finally:
        # Release resources even when detection or writing fails part-way.
        cap.release()
        if out is not None:
            out.release()
        if display:
            cv2.destroyAllWindows()

# Run the video pipeline on the sample clip and save the annotated result
video_path = '../Dataset/test_video.mp4'
output_path = '../Dataset/output_video.mp4'
process_video(video_path, output_path=output_path, display=True)

In [None]:
# Generate new annotations for a directory of images
# new_images_dir is searched for *.jpg files by generate_annotations, and the
# resulting rows are written as CSV to output_annotations.
new_images_dir = '../Dataset/new_images'
output_annotations = '../Dataset/new_annotations.csv'

# Uncomment to generate annotations (uses generate_annotations' default
# confidence threshold of 0.7)
# annotations = generate_annotations(new_images_dir, output_annotations)