# Avoiding Road Accidents

## Dataset: 

http://vllab1.ucmerced.edu/~hhsu22/rear_signal/rear_signal#

### Dataset statistics

Total sequences: 649
Total frames: 63637

*Number of sequences in each class*:

OOO: 188 BOO: 211 OLO: 78 BLO: 63

OOR:  58 BOR:  33 OLR:  9 BLR:  9

*Number of frames in each class:*:


OOO: 21867 BOO: 17874 OLO: 6271 BLO: 6380

OOR:  4728 BOR:  3527 OLR: 1600 BLR: 1390

In [None]:
import torch
print(f"CUDA available: {torch.cuda.is_available()}")
print(f"Current device: {torch.cuda.get_device_name(0)}")

In [None]:
!pip install opencv-python
!pip install ultralytics

In [None]:
import logging

# Setup logger
import logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

In [None]:
import os
from pathlib import Path
from glob import glob
import shutil
import cv2
import numpy as np

In [None]:
os.makedirs("data/yolo_dataset/images/train", exist_ok=True)
os.makedirs("data/yolo_dataset/images/val", exist_ok=True)
os.makedirs("data/yolo_dataset/labels/train", exist_ok=True)
os.makedirs("data/yolo_dataset/labels/val", exist_ok=True)

In [None]:
# Define class mapping
class_mapping = {
    'OOO': 0,  # Normal
    'BOO': 1,  # Braking
    'OLO': 2,  # Left signal
    'BLO': 3,  # Brake + Left signal
    'OOR': 4,  # Right signal
    'BOR': 5,  # Brake + Right signal
    'OLR': 6,  # Hazard lights
    'BLR': 7   # Brake + Hazard lights
}


In [None]:
def detect_taillights(image_path):
    # Read the image
    image = cv2.imread(str(image_path))
    if image is None:
        return None, None
        
    # Get image dimensions for calculating relative values later
    height, width = image.shape[:2]
    
    # Convert to HSV color space
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    
    # Define red color range for taillights
    lower_red1 = np.array([0, 100, 100])
    upper_red1 = np.array([10, 255, 255])
    lower_red2 = np.array([160, 100, 100])
    upper_red2 = np.array([180, 255, 255])
    
    # Create masks for red regions
    mask1 = cv2.inRange(hsv, lower_red1, upper_red1)
    mask2 = cv2.inRange(hsv, lower_red2, upper_red2)
    mask = cv2.bitwise_or(mask1, mask2)
    
    # Apply morphological operations
    kernel = np.ones((5,5), np.uint8)
    mask = cv2.morphologyEx(mask, cv2.MORPH_CLOSE, kernel)
    mask = cv2.morphologyEx(mask, cv2.MORPH_OPEN, kernel)
    
    # Find contours
    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
    
    # Filter contours based on area
    min_area = 50
    valid_contours = [cnt for cnt in contours if cv2.contourArea(cnt) > min_area]
    
    if valid_contours:
        all_x = []
        all_y = []
        for contour in valid_contours:
            for point in contour:
                all_x.append(point[0][0])
                all_y.append(point[0][1])
        
        # Calculate absolute coordinates with padding
        x_min = max(min(all_x) - 10, 0)
        x_max = min(max(all_x) + 10, width)
        y_min = max(min(all_y) - 5, 0)
        y_max = min(max(all_y) + 5, height)
        
        # Convert to YOLO format (x_center, y_center, width, height) - all relative
        x_center = (x_min + x_max) / (2 * width)  # relative center x
        y_center = (y_min + y_max) / (2 * height)  # relative center y
        rel_width = (x_max - x_min) / width  # relative width
        rel_height = (y_max - y_min) / height  # relative height
        
        return True, (x_center, y_center, rel_width, rel_height)
    
    return False, None

In [None]:
def process_images(footage_path, split='train'):
    footage_path = Path(footage_path)
    frame_paths = list(footage_path.rglob('light_mask/frame*.png'))
    logger.info(f"Found {len(frame_paths)} frames")

    # Create directories if they don't exist
    Path(f"data/yolo_dataset/images/{split}").mkdir(parents=True, exist_ok=True)
    Path(f"data/yolo_dataset/labels/{split}").mkdir(parents=True, exist_ok=True)
    
    for frame_path in frame_paths:
        try:
            # Debug print the full frame path
            # logger.info(f"Processing frame: {frame_path}")
            # logger.info(f"Frame path exists: {frame_path.exists()}")

            # Extract class from path
            frame_path_str = str(frame_path)
            for class_name in class_mapping.keys():
                if f"_{class_name}_" in frame_path_str:
                    break
            else:
                logger.error(f"Could not find class name in path: {frame_path}")
                continue

            # Generate filenames
            filename = frame_path.name
            dest_path = Path(f"data/yolo_dataset/images/{split}/{filename}")
            label_path = Path(f"data/yolo_dataset/labels/{split}/{filename}").with_suffix('.txt')

            # Debug print the destination path
            # logger.info(f"Destination path: {dest_path}")

            # Verify source file exists before copying
            if not frame_path.exists():
                logger.error(f"Source file does not exist: {frame_path}")
                continue

            # Create parent directories if they don't exist
            dest_path.parent.mkdir(parents=True, exist_ok=True)

            # Copy image
            try:
                shutil.copy2(str(frame_path), str(dest_path))
            except Exception as copy_error:
                logger.error(f"Error copying file: {str(copy_error)}")
                logger.error(f"From: {frame_path}")
                logger.error(f"To: {dest_path}")
                continue

            # Detect taillights and get YOLO format coordinates
            success, bbox = detect_taillights(frame_path)

            # Create label file with detected coordinates or fallback values
            with open(label_path, 'w') as f:
                if success and bbox:
                    x_center, y_center, rel_width, rel_height = bbox
                    f.write(f'{class_mapping[class_name]} {x_center:.6f} {y_center:.6f} {rel_width:.6f} {rel_height:.6f}\n')
                else:
                    # Fallback to default values if detection fails
                    logger.warning(f"Detection failed for {filename}, using default values")
                    f.write(f'{class_mapping[class_name]} 0.5 0.8 0.3 0.2\n')

            # logger.info(f"Successfully processed: {filename}")

        except Exception as e:
            logger.error(f"Error processing {frame_path}: {str(e)}")

In [None]:
base_path = Path("data/rear_signal_dataset")

with open('data/rear_signal_dataset/Easy.txt', 'r') as f:
    easy_sequences = f.read().splitlines()
logger.info(f"Found {len(easy_sequences)} easy sequences")

# Split into train and validation
train_sequences = easy_sequences[:int(len(easy_sequences)*0.8)]
val_sequences = easy_sequences[int(len(easy_sequences)*0.8):]

logger.debug(f"Processing {len(train_sequences)} train sequences...")
for seq in train_sequences:
    # Reconstruct the correct path
    # Extract the base parts of the sequence name
    base_parts = seq.split('_')[:4]  # Get the first 4 parts
    base_name = '_'.join(base_parts)
    class_name = seq.split('_')[-2]  # Get class name (BOO, OOO, etc)
    sequence_num = seq.split('_')[-1]  # Get sequence number

    # Construct the full path
    seq_path = base_path.joinpath(base_name, f"{base_name}_{class_name}", f"{base_name}_{class_name}_{sequence_num}")
    # logger.debug(f"Looking for sequence at: {seq_path}")

    if os.path.exists(seq_path):
        process_images(seq_path, 'train')
    else:
        if base_parts[0].startswith('test'):
            complete_split = seq.split('_')
            if complete_split[2].startswith('idx'):
                base_parts = seq.split('_')[:3]
            else:
                base_parts = seq.split('_')[:2]
            base_name = '_'.join(base_parts)
            class_name = seq.split('_')[-2]
            sequence_num = seq.split('_')[-1]

            # construct full path for test
            seq_path = base_path.joinpath(base_name, f"{base_name}_{class_name}", f"{base_name}_{class_name}_{sequence_num}")
            # logger.debug(f"Looking for sequence at: {seq_path}")

            if os.path.exists(seq_path):
                process_images(seq_path, 'train')
            else:
                logger.error(f"Sequence path does not exist: {seq_path}")


logger.debug(f"Processing {len(val_sequences)} validation sequences...")
for seq in val_sequences:
    # logger.debug(f"Processing validation sequence: {seq}")
    base_parts = seq.split('_')[:4]
    base_name = '_'.join(base_parts)
    class_name = seq.split('_')[-2]
    sequence_num = seq.split('_')[-1]

    seq_path = base_path.joinpath(base_name, f"{base_name}_{class_name}", f"{base_name}_{class_name}_{sequence_num}")

    if os.path.exists(seq_path):
        process_images(seq_path, 'val')
    else:
        if base_parts[0].startswith('test'):
            complete_split = seq.split('_')
            if complete_split[2].startswith('idx'):
                base_parts = seq.split('_')[:3]
            else:
                base_parts = seq.split('_')[:2]
            base_name = '_'.join(base_parts)
            class_name = seq.split('_')[-2]
            sequence_num = seq.split('_')[-1]

            # Construct full path for test
            seq_path = base_path.joinpath(base_name, f"{base_name}_{class_name}", f"{base_name}_{class_name}_{sequence_num}")
            # logger.debug(f"Looking for sequence at: {seq_path}")

            if os.path.exists(seq_path):
                process_images(seq_path, 'val')
            else:
                logger.error(f"Sequence path does not exist: {seq_path}")


# Print final statistics
train_images = len(glob('data/yolo_dataset/images/train/*.png'))
val_images = len(glob('data/yolo_dataset/images/val/*.png'))

logger.info(f"Final Statistics:")
logger.info(f"Training images: {train_images}")
logger.info(f"Validation images: {val_images}")


still have 3406 out of 15432 frames having default boundary box values

In [None]:
# Create yaml file first
yaml_content = f"""
path: {os.path.abspath('data/yolo_dataset')}  # dataset root dir
train: images/train  # train images (relative to 'path')
val: images/val  # val images (relative to 'path')

# Classes
nc: {len(class_mapping)}  # number of classes
names: {list(class_mapping.keys())}  # class names
"""

with open('dataset.yaml', 'w') as f:
    f.write(yaml_content)

In [None]:
!pip uninstall -y ultralytics
!pip install torch torchvision ultralytics==8.0.196


In [None]:
!yolo task=detect mode=train model=yolov8n.pt data=dataset.yaml epochs=5 imgsz=640 batch=16 device=cuda