## Import Libraries and Configure Logging



In [None]:
import os
import shutil
import json
import xml.etree.ElementTree as ET
import yaml
from pathlib import Path
from tqdm import tqdm
import logging
import matplotlib.pyplot as plt
import cv2
import numpy as np
from collections import defaultdict

# Configure logging
# DEBUG is the most verbose level: every logger.debug(...) call in the cells
# below is emitted. Each record is prefixed with a timestamp and level name.
# NOTE(review): basicConfig configures the root logger, so DEBUG output from
# imported libraries may show up as well — raise the level if it gets noisy.
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
# Logger shared by all following cells.
logger = logging.getLogger(__name__)
logger.info("Libraries imported and logging configured successfully.")

## Define Directories and Paths



#### Explanation: 

We define the base directory for raw datasets and a new combined_dataset directory to store preprocessed data, separating images and labels as per YOLOv5 conventions. The Path object ensures cross-platform compatibility.



In [None]:
# Root folder holding every raw dataset (relative to the working directory)
base_dir = Path('../datasets')
logger.debug(f"Base directory set to: {base_dir}")

# Output tree: <base_dir>/combined_dataset/{images,labels}
combined_dataset_dir = base_dir.joinpath('combined_dataset')
combined_images_dir = combined_dataset_dir.joinpath('images')
combined_labels_dir = combined_dataset_dir.joinpath('labels')

# Make sure both output folders exist; missing parents are created too
for dir_path in (combined_images_dir, combined_labels_dir):
    dir_path.mkdir(parents=True, exist_ok=True)
    logger.debug(f"Ensured directory exists: {dir_path}")

logger.info("Directory structure initialized.")

## Define Classes and Mappings



Explanation: 

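We define 10 target classes. 'pedestrian' is dropped, and 'bicycle', 'motorcycle', and 'motorbike' are merged into a single 'bike' class. The class_to_id dictionary assigns each class a YOLOv5-compatible integer ID (its position in selected_classes). The class_mapping dictionary normalizes synonyms (e.g., 'open_door' becomes 'openedDoor') and maps unwanted labels (e.g., 'People', 'person') to None. The get_final_class function applies this mapping and returns None for any label that is not in the selected list, so only the selected classes are retained.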


In [None]:
# The ten target classes; a class's position in this list is its YOLO class ID
selected_classes = [
    'door', 'table', 'openedDoor', 'chair', 'pole',
    'bike', 'truck', 'car', 'dog', 'bus',
]
class_to_id = dict(zip(selected_classes, range(len(selected_classes))))
logger.debug(f"Selected classes: {selected_classes}")
logger.debug(f"Class to ID mapping: {class_to_id}")

# Raw label -> canonical label. A value of None means "drop this label".
class_mapping = {
    # two-wheelers are merged into a single class
    'motorbike': 'bike',
    'bicycle': 'bike',
    'motorcycle': 'bike',
    # spelling variant used by one of the source datasets
    'open_door': 'openedDoor',
    # pedestrian-like labels are ignored
    'People': None,
    'person': None,
}
logger.debug(f"Class mapping defined: {class_mapping}")

def get_final_class(cls):
    """Resolve a raw label to one of the selected classes.

    The label is first translated through ``class_mapping`` (labels without
    an entry are kept unchanged). The translated label is returned only when
    it is listed in ``selected_classes``; otherwise ``None`` is returned.
    """
    mapped_cls = class_mapping[cls] if cls in class_mapping else cls
    if mapped_cls in selected_classes:
        final_cls = mapped_cls
    else:
        final_cls = None
    logger.debug(f"Mapping class '{cls}' -> '{mapped_cls}' -> Final: '{final_cls}'")
    return final_cls

# Progress marker: the class list, mapping and get_final_class are now defined.
logger.info("Class definitions and mapping function initialized.")

## Helper Functions



Explanation: 

find_image_file handles variable image extensions, while convert_to_yolo_format normalizes bounding boxes to the YOLO format (center_x, center_y, width, height) required by YOLOv5.



In [None]:
def find_image_file(img_folder, base_name):
    """Return the first existing ``<base_name><ext>`` inside *img_folder*.

    The extensions .jpg, .jpeg and .png are probed in that order. When none
    of the candidates exists a warning is logged and ``None`` is returned.
    """
    candidates = (img_folder / (base_name + ext) for ext in ('.jpg', '.jpeg', '.png'))
    match = next((path for path in candidates if path.exists()), None)
    if match is None:
        logger.warning(f"No image found for base name {base_name} in {img_folder}")
        return None
    logger.debug(f"Found image: {match}")
    return match

def convert_to_yolo_format(bbox, img_width, img_height):
    """Convert a pixel box [xmin, ymin, xmax, ymax] to normalized YOLO format.

    The two corners may be given in either order, and the box is clipped to
    the image rectangle before normalization, so every returned value lies
    in [0, 1]. Boxes that are already ordered and inside the image produce
    exactly the same result as an unclipped conversion.

    Args:
        bbox: Sequence of four pixel coordinates (x1, y1, x2, y2).
        img_width: Image width in pixels; must be positive.
        img_height: Image height in pixels; must be positive.

    Returns:
        Tuple ``(center_x, center_y, width, height)`` as fractions of the
        image size.

    Raises:
        ValueError: If the image width or height is not positive.
    """
    if img_width <= 0 or img_height <= 0:
        raise ValueError(f"Invalid image size: {img_width}x{img_height}")

    x1, y1, x2, y2 = bbox
    # Order the corners, then clip to the image: out-of-range or negative
    # sizes are not valid normalized YOLO coordinates.
    xmin = min(max(min(x1, x2), 0), img_width)
    xmax = min(max(max(x1, x2), 0), img_width)
    ymin = min(max(min(y1, y2), 0), img_height)
    ymax = min(max(max(y1, y2), 0), img_height)

    center_x = (xmin + xmax) / 2 / img_width
    center_y = (ymin + ymax) / 2 / img_height
    width = (xmax - xmin) / img_width
    height = (ymax - ymin) / img_height
    logger.debug(f"Converted bbox {bbox} to YOLO: [{center_x}, {center_y}, {width}, {height}]")
    return center_x, center_y, width, height

# Progress marker: find_image_file and convert_to_yolo_format are now defined.
logger.info("Helper functions defined.")

## Process Ninja Dataset


Explanation: 

This function processes the ninja dataset, which uses JSON annotations. It combines all splits (train, valid, test) into one dataset, prefixes filenames with 'n_', converts bounding boxes to YOLO format, and logs every step. Empty label files are created for images with no selected classes, aligning with YOLOv5 practices.



In [None]:
def process_ninja(ninja_folder, combined_images_folder, combined_labels_folder):
    """Convert the ninja dataset (one JSON annotation per image) to YOLO labels.

    The train/valid/test splits are merged. Each image is copied to
    *combined_images_folder* with an ``n_`` prefix, and a label file with the
    same stem is written to *combined_labels_folder*. The label file is empty
    when no object maps to a selected class.

    The annotation is parsed completely before anything is copied, so that a
    malformed JSON file does not leave an image behind without a label file.

    NOTE(review): images with the same file name in different splits map to
    the same output name and overwrite each other — confirm that names are
    unique across splits.

    Args:
        ninja_folder: Path containing ``<split>/ann`` and ``<split>/img``.
        combined_images_folder: Destination Path for the prefixed images.
        combined_labels_folder: Destination Path for the ``.txt`` labels.

    Returns:
        Tuple ``(image_count, label_count)`` of images copied and label
        files written.
    """
    prefix = 'n_'
    splits = ['train', 'valid', 'test']
    image_count = 0
    label_count = 0

    for split in splits:
        ann_folder = ninja_folder / split / 'ann'
        img_folder = ninja_folder / split / 'img'
        if not ann_folder.exists() or not img_folder.exists():
            logger.warning(f"Missing 'ann' or 'img' folder in {split} split of ninja")
            continue

        ann_files = list(ann_folder.glob('*.json'))
        logger.info(f"Found {len(ann_files)} annotation files in ninja {split}")

        for ann_file in tqdm(ann_files, desc=f"Processing ninja {split}"):
            try:
                # An annotation such as '109.png.json' has the stem '109.png',
                # which is the image file name itself. Prefer that exact file;
                # otherwise fall back to probing the known extensions.
                img_file = img_folder / ann_file.stem
                if img_file.is_file():
                    logger.debug(f"Found image: {img_file}")
                else:
                    base_name = ann_file.stem.split('.')[0]
                    img_file = find_image_file(img_folder, base_name)
                if not img_file:
                    continue

                # Parse the JSON and build all label lines first
                with open(ann_file, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                width = data['size']['width']
                height = data['size']['height']
                lines = []

                for obj in data.get('objects', []):
                    cls = obj.get('classTitle', 'unknown')
                    final_cls = get_final_class(cls)
                    if final_cls is None:
                        continue
                    points = obj.get('points', {}).get('exterior', [])
                    if len(points) != 2:
                        logger.warning(f"Invalid points data in {ann_file}: {points}")
                        continue
                    xmin, ymin = points[0]
                    xmax, ymax = points[1]
                    bbox = [xmin, ymin, xmax, ymax]
                    center_x, center_y, box_width, box_height = convert_to_yolo_format(bbox, width, height)
                    class_id = class_to_id[final_cls]
                    lines.append(
                        f"{class_id} {center_x:.6f} {center_y:.6f} {box_width:.6f} {box_height:.6f}"
                    )

                # Only now touch the output folders: copy image with prefix
                new_img_name = f"{prefix}{img_file.name}"
                new_img_path = combined_images_folder / new_img_name
                shutil.copy(img_file, new_img_path)
                image_count += 1
                logger.debug(f"Copied image to {new_img_path}")

                # Write label file (empty if no objects)
                new_label_name = new_img_name.rsplit('.', 1)[0] + '.txt'
                new_label_path = combined_labels_folder / new_label_name
                with open(new_label_path, 'w') as f:
                    f.write('\n'.join(lines))
                label_count += 1
                logger.debug(f"Wrote label file {new_label_path} with {len(lines)} annotations")

            except Exception as e:
                # Best effort: one broken annotation must not stop the whole run
                logger.error(f"Error processing {ann_file}: {e}")

    logger.info(f"Ninja processed: {image_count} images, {label_count} labels")
    return image_count, label_count

# Convert the ninja dataset and report the (images, labels) totals
ninja_counts = process_ninja(
    ninja_folder=base_dir / 'ninja',
    combined_images_folder=combined_images_dir,
    combined_labels_folder=combined_labels_dir,
)
print(f"Ninja results: Images: {ninja_counts[0]}, Labels: {ninja_counts[1]}")

## Process WOTR Dataset



Explanation: 

For WOTR, we parse XML annotations. The XML filename (not the filename tag inside the XML) is what links an annotation to its image. We prefix output filenames with 'w_' and convert bounding boxes to YOLO format, taking into account the nested 'WOTR' folder structure.



In [None]:
def process_wotr(wotr_folder, combined_images_folder, combined_labels_folder):
    """Convert the WOTR dataset (XML annotations) to YOLO labels.

    Annotations are read from ``<wotr_folder>/WOTR/Annotations`` and images
    from ``<wotr_folder>/WOTR/JPEGImages``. The image belonging to an XML
    file is ``<xml stem>.jpg``. Each image is copied to
    *combined_images_folder* with a ``w_`` prefix, and a label file with the
    same stem is written to *combined_labels_folder* (empty when no object
    maps to a selected class).

    The XML is parsed completely before anything is copied, so that a broken
    annotation (e.g. missing ``<size>`` or a zero width) does not leave an
    image behind without a label file.

    Args:
        wotr_folder: Path that contains the nested ``WOTR`` folder.
        combined_images_folder: Destination Path for the prefixed images.
        combined_labels_folder: Destination Path for the ``.txt`` labels.

    Returns:
        Tuple ``(image_count, label_count)``; ``(0, 0)`` when the expected
        folders are missing.
    """
    prefix = 'w_'
    ann_folder = wotr_folder / 'WOTR' / 'Annotations'
    img_folder = wotr_folder / 'WOTR' / 'JPEGImages'
    image_count = 0
    label_count = 0

    if not ann_folder.exists() or not img_folder.exists():
        logger.error(f"Cannot find 'Annotations' or 'JPEGImages' in {wotr_folder}")
        return 0, 0

    ann_files = list(ann_folder.glob('*.xml'))
    logger.info(f"Found {len(ann_files)} XML annotation files in WOTR")

    for ann_file in tqdm(ann_files, desc="Processing WOTR"):
        try:
            base_name = ann_file.stem  # Use XML filename, not internal tag
            img_path = img_folder / f"{base_name}.jpg"
            if not img_path.exists():
                logger.warning(f"No image found for {ann_file} at {img_path}")
                continue

            # Parse the XML and build all label lines first
            root = ET.parse(ann_file).getroot()
            size = root.find('size')
            width = int(size.find('width').text)
            height = int(size.find('height').text)
            lines = []

            for obj in root.findall('object'):
                cls = obj.find('name').text
                final_cls = get_final_class(cls)
                if final_cls is None:
                    continue
                bndbox = obj.find('bndbox')
                bbox = [float(bndbox.find(tag).text) for tag in ('xmin', 'ymin', 'xmax', 'ymax')]
                center_x, center_y, box_width, box_height = convert_to_yolo_format(bbox, width, height)
                class_id = class_to_id[final_cls]
                lines.append(
                    f"{class_id} {center_x:.6f} {center_y:.6f} {box_width:.6f} {box_height:.6f}"
                )

            # Only now touch the output folders
            new_img_name = f"{prefix}{img_path.name}"
            new_img_path = combined_images_folder / new_img_name
            shutil.copy(img_path, new_img_path)
            image_count += 1
            logger.debug(f"Copied image to {new_img_path}")

            new_label_name = new_img_name.rsplit('.', 1)[0] + '.txt'
            new_label_path = combined_labels_folder / new_label_name
            with open(new_label_path, 'w') as f:
                f.write('\n'.join(lines))
            label_count += 1
            logger.debug(f"Wrote label file {new_label_path} with {len(lines)} annotations")

        except Exception as e:
            # Best effort: one broken annotation must not stop the whole run
            logger.error(f"Error processing {ann_file}: {e}")

    logger.info(f"WOTR processed: {image_count} images, {label_count} labels")
    return image_count, label_count

# Convert the WOTR dataset and report the (images, labels) totals
wotr_counts = process_wotr(
    wotr_folder=base_dir / 'WOTR',
    combined_images_folder=combined_images_dir,
    combined_labels_folder=combined_labels_dir,
)
print(f"WOTR results: Images: {wotr_counts[0]}, Labels: {wotr_counts[1]}")

## Process Risk-Detection Dataset



Explanation: 

The risk-detection dataset is already in YOLO format, so we remap class IDs to our unified list, excluding 'pedestrian'. All splits are combined, and filenames are prefixed with 'r_'.



In [None]:
def process_risk_detection(risk_folder, combined_images_folder, combined_labels_folder):
    """Merge the risk-detection dataset (already YOLO format) into the combined set.

    Class names are read from ``<risk_folder>/data.yaml`` (key ``names``).
    For every label file in the train/valid/test splits, the class index of
    each line is translated to the unified class ID; lines whose class is not
    selected are dropped, and the remaining fields are passed through
    unchanged. Images are copied with an ``r_`` prefix.

    The label file is read and remapped before anything is copied, so that a
    malformed label does not leave an image behind without a label file.

    Args:
        risk_folder: Path containing ``data.yaml`` and the split folders.
        combined_images_folder: Destination Path for the prefixed images.
        combined_labels_folder: Destination Path for the ``.txt`` labels.

    Returns:
        Tuple ``(image_count, label_count)``; ``(0, 0)`` when ``data.yaml``
        is missing.
    """
    prefix = 'r_'
    yaml_path = risk_folder / 'data.yaml'
    image_count = 0
    label_count = 0

    if not yaml_path.exists():
        logger.error(f"data.yaml not found in {risk_folder}")
        return 0, 0

    with open(yaml_path, 'r') as f:
        # safe_load returns None for an empty file; treat that as "no names"
        data_yaml = yaml.safe_load(f) or {}
    class_names = data_yaml.get('names', [])
    logger.debug(f"Loaded class names from data.yaml: {class_names}")

    splits = ['train', 'valid', 'test']
    for split in splits:
        label_folder = risk_folder / split / 'labels'
        img_folder = risk_folder / split / 'images'
        if not label_folder.exists() or not img_folder.exists():
            logger.warning(f"Missing 'labels' or 'images' in {split} split of risk-detection")
            continue

        label_files = list(label_folder.glob('*.txt'))
        logger.info(f"Found {len(label_files)} label files in risk-detection {split}")

        for label_file in tqdm(label_files, desc=f"Processing risk-detection {split}"):
            try:
                base_name = label_file.stem
                img_file = find_image_file(img_folder, base_name)
                if not img_file:
                    continue

                # Read and remap every label line first
                with open(label_file, 'r') as f:
                    lines = f.readlines()
                new_lines = []

                for line in lines:
                    parts = line.strip().split()
                    if not parts:
                        continue
                    class_idx = int(parts[0])
                    # A negative index would silently wrap around to the end
                    # of the name list, so reject it like any other bad index.
                    if class_idx < 0 or class_idx >= len(class_names):
                        logger.warning(f"Class index {class_idx} out of range in {label_file}")
                        continue
                    cls = class_names[class_idx]
                    final_cls = get_final_class(cls)
                    if final_cls is None:
                        continue
                    new_class_id = class_to_id[final_cls]
                    new_lines.append(f"{new_class_id} {' '.join(parts[1:])}")

                # Only now touch the output folders
                new_img_name = f"{prefix}{img_file.name}"
                new_img_path = combined_images_folder / new_img_name
                shutil.copy(img_file, new_img_path)
                image_count += 1
                logger.debug(f"Copied image to {new_img_path}")

                new_label_name = new_img_name.rsplit('.', 1)[0] + '.txt'
                new_label_path = combined_labels_folder / new_label_name
                with open(new_label_path, 'w') as f:
                    f.write('\n'.join(new_lines))
                label_count += 1
                logger.debug(f"Wrote label file {new_label_path} with {len(new_lines)} annotations")

            except Exception as e:
                # Best effort: one broken label file must not stop the whole run
                logger.error(f"Error processing {label_file}: {e}")

    logger.info(f"Risk-detection processed: {image_count} images, {label_count} labels")
    return image_count, label_count

# Convert the risk-detection dataset and report the (images, labels) totals
risk_counts = process_risk_detection(
    risk_folder=base_dir / 'risk-detection-1',
    combined_images_folder=combined_images_dir,
    combined_labels_folder=combined_labels_dir,
)
print(f"Risk-detection results: Images: {risk_counts[0]}, Labels: {risk_counts[1]}")

## Image Size Analysis and Decision on Resizing



Explanation: 

We analyze image sizes to consider resizing but opt against it. YOLOv5 resizes images on-the-fly during training, and since annotations are normalized, keeping original sizes preserves detail and simplifies preprocessing.



In [None]:
def analyze_image_sizes():
    """Plot and log the width/height distribution of the combined images.

    Every entry of ``combined_images_dir`` is loaded with ``cv2.imread``.
    Entries that cannot be decoded are skipped with a warning. When no image
    could be read, a warning is logged and nothing is plotted.
    """
    widths = []
    heights = []
    for img_file in combined_images_dir.glob('*'):
        try:
            img = cv2.imread(str(img_file))
            if img is None:
                logger.warning(f"Failed to load image {img_file}")
                continue
            # shape is (rows, cols, ...): rows = height, cols = width
            height, width = img.shape[:2]
            widths.append(width)
            heights.append(height)
            logger.debug(f"Image {img_file}: {width}x{height}")
        except Exception as e:
            logger.error(f"Error analyzing {img_file}: {e}")

    if len(widths) == 0:
        logger.warning("No image sizes to analyze")
        return

    # Overlay both histograms on one figure
    plt.figure(figsize=(10, 6))
    for values, label in ((widths, 'Width'), (heights, 'Height')):
        plt.hist(values, bins=30, alpha=0.5, label=label)
    plt.title('Image Size Distribution')
    plt.xlabel('Pixels')
    plt.ylabel('Frequency')
    plt.legend()
    plt.show()

    logger.info(f"Image size stats - Width: min={min(widths)}, max={max(widths)}, avg={np.mean(widths):.2f}")
    logger.info(f"Image size stats - Height: min={min(heights)}, max={max(heights)}, avg={np.mean(heights):.2f}")

# Show the size distribution, then record the resizing decision in the log.
analyze_image_sizes()
logger.info("Decision: Not resizing images to preserve annotation accuracy; YOLOv5 will handle resizing during training.")

## Verify Dataset Integrity



Explanation: 

We check for unpaired images and labels, logging any discrepancies. A sample image with bounding boxes is visualized to confirm correct annotation conversion.



In [None]:
def verify_dataset():
    """Check image/label pairing and visualize one sample image.

    Images and labels are matched by file stem. The totals and up to five
    unpaired stems per direction are logged. The first image found in
    ``combined_images_dir`` is then displayed with its boxes drawn.

    An empty image folder or an unreadable sample image is reported with a
    warning instead of raising. Label lines with fewer than five fields or an
    unknown class ID are skipped when drawing.
    """
    image_files = {f.stem for f in combined_images_dir.glob('*')}
    label_files = {f.stem for f in combined_labels_dir.glob('*.txt')}

    missing_labels = image_files - label_files
    missing_images = label_files - image_files

    logger.info(f"Total images: {len(image_files)}, Total labels: {len(label_files)}")
    if missing_labels:
        logger.warning(f"Images missing labels: {len(missing_labels)} - {list(missing_labels)[:5]}")
    if missing_images:
        logger.warning(f"Labels missing images: {len(missing_images)} - {list(missing_images)[:5]}")

    # Visualize a sample
    sample_img = next(iter(combined_images_dir.glob('*')), None)
    if sample_img is None:
        logger.warning("No images available to visualize")
        return
    sample_label = combined_labels_dir / (sample_img.stem + '.txt')
    img = cv2.imread(str(sample_img))
    if img is None:
        # cv2.imread signals a decode failure by returning None
        logger.warning(f"Failed to load image {sample_img}")
        return
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    height, width = img.shape[:2]

    if sample_label.exists():
        with open(sample_label, 'r') as f:
            lines = f.readlines()
        for line in lines:
            parts = line.split()
            if len(parts) < 5:
                continue
            class_id = int(float(parts[0]))
            if not 0 <= class_id < len(selected_classes):
                continue
            cx, cy, w, h = map(float, parts[1:5])
            # Normalized center/size -> pixel corner coordinates
            xmin = int((cx - w / 2) * width)
            ymin = int((cy - h / 2) * height)
            xmax = int((cx + w / 2) * width)
            ymax = int((cy + h / 2) * height)
            cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            cv2.putText(img, selected_classes[class_id], (xmin, ymin - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

    plt.figure(figsize=(10, 10))
    plt.imshow(img)
    plt.title(f"Sample: {sample_img.name}")
    plt.axis('off')
    plt.show()

# Run the pairing check and the sample visualization, then log completion.
verify_dataset()
logger.info("Dataset verification completed.")

In [None]:
import random

# Find label files with and without annotations (an empty file has size 0)
label_files_with_annotations = [f for f in combined_labels_dir.glob('*.txt') if f.stat().st_size > 0]
label_files_without_annotations = [f for f in combined_labels_dir.glob('*.txt') if f.stat().st_size == 0]

# Randomly select one from each
label_with_annotations = random.choice(label_files_with_annotations) if label_files_with_annotations else None
label_without_annotations = random.choice(label_files_without_annotations) if label_files_without_annotations else None

# Resolve the real image for each chosen label. Images keep the extension of
# their source file (.jpg, .jpeg or .png), so '.jpg' cannot be assumed.
# find_image_file returns None when no matching image exists.
img_with_annotations = (
    find_image_file(combined_images_dir, label_with_annotations.stem)
    if label_with_annotations else None
)
img_without_annotations = (
    find_image_file(combined_images_dir, label_without_annotations.stem)
    if label_without_annotations else None
)

# Function to plot image with or without bounding boxes
def plot_image(img_path, label_path, title):
    """Display one image with the boxes from its YOLO label file drawn on it.

    Args:
        img_path: Path of the image to show.
        label_path: Path of the matching label file; a missing or empty file
            means the image is shown without boxes.
        title: Title placed above the plot.

    An unreadable image is reported with a message and nothing is plotted.
    Label lines with fewer than five fields or an unknown class ID are
    skipped.
    """
    img = cv2.imread(str(img_path))
    if img is None:
        # cv2.imread signals a missing/undecodable file by returning None
        print(f"Failed to load image: {img_path}")
        return
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    height, width = img.shape[:2]

    if label_path.exists() and label_path.stat().st_size > 0:
        with open(label_path, 'r') as f:
            lines = f.readlines()
        for line in lines:
            parts = line.strip().split()
            if len(parts) < 5:
                continue
            class_id = int(float(parts[0]))
            if not 0 <= class_id < len(selected_classes):
                continue
            cx, cy, w, h = map(float, parts[1:5])
            # Normalized center/size -> pixel corner coordinates
            xmin = int((cx - w / 2) * width)
            ymin = int((cy - h / 2) * height)
            xmax = int((cx + w / 2) * width)
            ymax = int((cy + h / 2) * height)
            cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
            cv2.putText(img, selected_classes[class_id], (xmin, ymin - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)

    plt.figure(figsize=(10, 10))
    plt.imshow(img)
    plt.title(title)
    plt.axis('off')
    plt.show()

# Plot both images with descriptive titles
# Each branch falls back to a message when no matching sample was selected.
if img_with_annotations:
    plot_image(img_with_annotations, label_with_annotations, 
               f"Image with Annotations from Selected Classes: {img_with_annotations.name}")
else:
    print("No images with annotations found.")

if img_without_annotations:
    plot_image(img_without_annotations, label_without_annotations, 
               f"Image with Empty Label File: {img_without_annotations.name}")
else:
    print("No images without annotations found.")

## Plot Images with Bounding Boxes 



In [None]:
def get_images_with_class(class_name, num_images=1):
    """Return file names of images whose label file contains *class_name*.

    Label files are scanned until *num_images* matches are collected. The
    image belonging to a label is looked up with ``find_image_file`` so that
    .jpg, .jpeg and .png images are all found; labels without an image are
    skipped. Blank label lines are ignored.

    Args:
        class_name: Class to look for (an entry of ``selected_classes``).
        num_images: Stop after this many matching images.

    Returns:
        List of image file names (not full paths).
    """
    # IDs that stand for class_name; empty when the class is unknown
    wanted_ids = {idx for idx, name in enumerate(selected_classes) if name == class_name}
    images_with_class = []
    for label_file in combined_labels_dir.glob('*.txt'):
        with open(label_file, 'r') as f:
            lines = f.readlines()
        has_class = any(
            int(parts[0]) in wanted_ids
            for parts in (line.split() for line in lines)
            if parts
        )
        if has_class:
            img_file = find_image_file(combined_images_dir, label_file.stem)
            if img_file is not None:
                images_with_class.append(img_file.name)
        if len(images_with_class) >= num_images:
            break
    print(f"Found {len(images_with_class)} images for class '{class_name}'")
    return images_with_class

def get_random_images(num_images=4):
    """Return the file names of up to *num_images* randomly chosen images.

    All image types that the preprocessing copies (.jpg, .jpeg, .png) are
    candidates, not only .jpg files.
    """
    image_suffixes = {'.jpg', '.jpeg', '.png'}
    all_images = [
        path for path in combined_images_dir.glob('*')
        if path.suffix.lower() in image_suffixes
    ]
    return [img.name for img in random.sample(all_images, min(num_images, len(all_images)))]

def plot_images_with_boxes(image_list, n_cols=4):
    """Plot images with their bounding boxes in a grid.

    Args:
        image_list: Image file names inside ``combined_images_dir``.
        n_cols: Number of grid columns.

    An empty list prints a message and plots nothing. Images that cannot be
    loaded leave a blank cell. Label lines with fewer than five fields or an
    out-of-range class ID are reported and skipped; fields beyond the fifth
    are ignored.
    """
    n_images = len(image_list)
    if n_images == 0:
        # plt.subplots cannot create a grid with zero rows
        print("No images to plot.")
        return
    n_rows = (n_images + n_cols - 1) // n_cols
    # squeeze=False always returns a 2-D array, even for a 1x1 grid
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, 5 * n_rows), squeeze=False)
    axes = axes.flatten()

    for i, img_name in enumerate(image_list):
        img_path = combined_images_dir / img_name
        label_path = combined_labels_dir / (img_path.stem + '.txt')

        # Load image
        img = cv2.imread(str(img_path))
        if img is None:
            print(f"Failed to load image: {img_path}")
            axes[i].axis('off')
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        height, width = img.shape[:2]

        # Load and draw labels
        boxes_drawn = 0
        if label_path.exists() and label_path.stat().st_size > 0:
            with open(label_path, 'r') as f:
                lines = f.readlines()
            for line in lines:
                parts = line.strip().split()
                if len(parts) < 5:
                    print(f"Invalid label format in {label_path}: {line.strip()}")
                    continue
                class_id = int(parts[0])
                if not 0 <= class_id < len(selected_classes):
                    print(f"Class ID {class_id} out of range in {label_path}")
                    continue
                cx, cy, w, h = map(float, parts[1:5])
                # Normalized center/size -> pixel corner coordinates
                xmin = int((cx - w / 2) * width)
                ymin = int((cy - h / 2) * height)
                xmax = int((cx + w / 2) * width)
                ymax = int((cy + h / 2) * height)
                cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
                cv2.putText(img, selected_classes[class_id], (xmin, ymin - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
                boxes_drawn += 1
            print(f"Drew {boxes_drawn} boxes for {img_name}")
        else:
            print(f"No labels or empty label file for {img_name}")

        # Plot image
        axes[i].imshow(img)
        axes[i].set_title(img_name)
        axes[i].axis('off')

    # Hide unused axes
    for j in range(n_images, len(axes)):
        axes[j].axis('off')

    plt.tight_layout()
    plt.show()

# Collect images: one per selected class, plus four random ones
images_to_plot = []
for cls in selected_classes:
    images_to_plot.extend(get_images_with_class(cls, num_images=1))
images_to_plot.extend(get_random_images(num_images=4))
# Remove duplicates while keeping first-seen order, so the per-class samples
# stay in class order (a set would shuffle them differently on every run)
images_to_plot = list(dict.fromkeys(images_to_plot))

print(f"Total images to plot: {len(images_to_plot)}")
plot_images_with_boxes(images_to_plot)

## Additional Visual Checks



In [None]:
from collections import Counter

# 1. Class Frequency Histogram
# Count how many boxes each class has across all label files.
class_counts = Counter()
for label_file in combined_labels_dir.glob('*.txt'):
    with open(label_file, 'r') as f:
        lines = f.readlines()
    for line in lines:
        parts = line.split()
        if not parts:
            # blank line
            continue
        class_id = int(parts[0])
        if class_id >= len(selected_classes):
            continue
        class_counts[selected_classes[class_id]] += 1

# Bars appear in the order the classes were first encountered.
plt.figure(figsize=(12, 6))
plt.bar(class_counts.keys(), class_counts.values())
plt.title('Class Frequency in Dataset')
plt.xlabel('Class')
plt.ylabel('Number of Instances')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

# 2. Plot Images with No Annotations
# Consider every image type the preprocessing copies, not only '.jpg'.
image_suffixes = {'.jpg', '.jpeg', '.png'}
no_annotation_images = []
for img_path in combined_images_dir.glob('*'):
    if img_path.suffix.lower() not in image_suffixes:
        continue
    label_path = combined_labels_dir / (img_path.stem + '.txt')
    if not label_path.exists() or label_path.stat().st_size == 0:
        no_annotation_images.append(img_path.name)
        if len(no_annotation_images) >= 4:  # Limit to 4 for display
            break

if no_annotation_images:
    print(f"Found {len(no_annotation_images)} images with no annotations. Plotting a sample:")
    n_cols = 4
    n_rows = 1
    fig, axes = plt.subplots(n_rows, n_cols, figsize=(20, 5))
    axes = axes.flatten() if n_cols > 1 else [axes]

    for i, img_name in enumerate(no_annotation_images):
        img = cv2.imread(str(combined_images_dir / img_name))
        if img is not None:
            img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            axes[i].imshow(img)
            axes[i].set_title(img_name)

    # Hide the frame of every cell, including unused cells and cells whose
    # image could not be loaded
    for ax in axes:
        ax.axis('off')
    plt.tight_layout()
    plt.show()
else:
    print("No images with missing or empty annotations found.")

### Class Frequency Histogram with Values



In [None]:
# Recount the boxes per class from the label files.
class_counts = Counter()
for label_file in combined_labels_dir.glob('*.txt'):
    with open(label_file, 'r') as f:
        lines = f.readlines()
    for line in lines:
        parts = line.split()
        if not parts:
            continue
        class_id = int(parts[0])
        if class_id >= len(selected_classes):
            continue
        class_counts[selected_classes[class_id]] += 1

plt.figure(figsize=(12, 6))
bars = plt.bar(class_counts.keys(), class_counts.values())
plt.title('Class Frequency in Dataset')
plt.xlabel('Class')
plt.ylabel('Number of Instances')
plt.xticks(rotation=45)

# Write each bar's count just above its top, centered horizontally.
for bar in bars:
    yval = bar.get_height()
    x_mid = bar.get_x() + bar.get_width()/2
    plt.text(x_mid, yval + 0.05, int(yval), ha='center', va='bottom')

plt.tight_layout()
plt.show()

In [None]:
def plot_images_for_class(class_name, num_images=4):
    """Show up to *num_images* images that contain *class_name*, with boxes.

    Label files are scanned for the class. Each matching image is listed
    once, even when it contains several boxes of that class. The image file
    is looked up with ``find_image_file`` so .jpg, .jpeg and .png images are
    all found. All boxes of each shown image are drawn, not only those of
    *class_name*.

    Blank label lines, lines with fewer than five fields and unknown class
    IDs are skipped. An image that cannot be loaded leaves a blank cell.
    Prints a message when no image contains the class.
    """
    images_with_class = []
    for label_file in combined_labels_dir.glob('*.txt'):
        with open(label_file, 'r') as f:
            lines = f.readlines()
        contains_class = False
        for line in lines:
            parts = line.strip().split()
            if not parts:
                continue
            class_id = int(parts[0])
            if 0 <= class_id < len(selected_classes) and selected_classes[class_id] == class_name:
                contains_class = True
                break
        if not contains_class:
            continue
        img_path = find_image_file(combined_images_dir, label_file.stem)
        if img_path is None:
            continue
        images_with_class.append(img_path)
        if len(images_with_class) >= num_images:
            break

    if not images_with_class:
        print(f"No images found for class '{class_name}'")
        return

    # squeeze=False keeps `axes` two-dimensional even for a single image
    fig, axes = plt.subplots(1, len(images_with_class), figsize=(15, 5), squeeze=False)
    for ax, img_path in zip(axes[0], images_with_class):
        ax.axis('off')
        img = cv2.imread(str(img_path))
        if img is None:
            print(f"Failed to load image: {img_path}")
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        label_path = combined_labels_dir / (img_path.stem + '.txt')
        with open(label_path, 'r') as f:
            lines = f.readlines()
        h, w = img.shape[:2]
        for line in lines:
            parts = line.strip().split()
            if len(parts) < 5:
                continue
            class_id = int(float(parts[0]))
            if not 0 <= class_id < len(selected_classes):
                continue
            x_center, y_center, width, height = map(float, parts[1:5])
            # Normalized center/size -> pixel corner coordinates
            x_min = int((x_center - width/2) * w)
            y_min = int((y_center - height/2) * h)
            x_max = int((x_center + width/2) * w)
            y_max = int((y_center + height/2) * h)
            cv2.rectangle(img, (x_min, y_min), (x_max, y_max), (255, 0, 0), 2)
            cv2.putText(img, selected_classes[class_id], (x_min, y_min-10), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (255, 0, 0), 2)
        ax.imshow(img)
    plt.show()

# Spot-check the 'car' class with up to four annotated images.
plot_images_for_class('car', num_images=4)

In [None]:
def get_annotation_stats():
    """Print summary statistics about images and their annotations.

    Reports the number of images (.jpg, .jpeg and .png), how many label
    files contain at least one box, how many are empty, and the average
    number of boxes per label file. Blank lines are not counted as boxes.
    """
    image_suffixes = {'.jpg', '.jpeg', '.png'}
    total_images = sum(
        1 for path in combined_images_dir.glob('*')
        if path.suffix.lower() in image_suffixes
    )
    images_with_annotations = 0
    images_without_annotations = 0
    boxes_per_image = []

    for label_file in combined_labels_dir.glob('*.txt'):
        with open(label_file, 'r') as f:
            num_boxes = sum(1 for line in f if line.strip())
        boxes_per_image.append(num_boxes)
        if num_boxes > 0:
            images_with_annotations += 1
        else:
            images_without_annotations += 1

    # Guard against division by zero when there are no label files
    average_boxes = sum(boxes_per_image) / len(boxes_per_image) if boxes_per_image else 0

    print(f"Total images: {total_images}")
    print(f"Images with annotations: {images_with_annotations}")
    print(f"Images without annotations: {images_without_annotations}")
    print(f"Average boxes per image: {average_boxes:.2f}")

# Print the dataset-wide annotation summary.
get_annotation_stats()

In [None]:
def verify_random_sample(num_samples=5):
    """Print the box count for up to *num_samples* randomly chosen images.

    Images of every copied type (.jpg, .jpeg, .png) are candidates. An image
    whose label file is missing or empty is reported as having no
    annotations.
    """
    image_suffixes = {'.jpg', '.jpeg', '.png'}
    all_images = [
        path for path in combined_images_dir.glob('*')
        if path.suffix.lower() in image_suffixes
    ]
    sample_images = random.sample(all_images, min(num_samples, len(all_images)))
    for img_path in sample_images:
        img_name = img_path.name
        label_path = combined_labels_dir / (img_path.stem + '.txt')
        if label_path.exists() and label_path.stat().st_size > 0:
            with open(label_path, 'r') as f:
                lines = f.readlines()
            print(f"Image: {img_name}, Number of boxes: {len(lines)}")
        else:
            print(f"Image: {img_name}, No annotations")

# Spot-check five random images against their label files.
verify_random_sample(num_samples=5)

In [None]:
def check_label_files(num_files=3):
    """Print the raw text of up to *num_files* randomly chosen label files."""
    candidates = list(combined_labels_dir.glob('*.txt'))
    how_many = min(num_files, len(candidates))
    for label_file in random.sample(candidates, how_many):
        content = label_file.read_text()
        print(f"Label file: {label_file.name}\nContent:\n{content}\n")

# Dump three random label files for a manual format check.
check_label_files(num_files=3)

In [None]:
# Function to draw bounding boxes on an image
def draw_boxes(img, label_path, selected_classes):
    """Draw the YOLO boxes listed in ``label_path`` onto ``img`` and return it.

    Each label line is expected as ``class_id cx cy w h`` with the four box
    values given as fractions of the image width/height. Lines with fewer
    than five fields, and lines whose class id is not a valid index into
    ``selected_classes``, are ignored.

    Args:
        img: Image array; ``img.shape[:2]`` is read as (height, width) and the
            array is drawn on in place.
        label_path: Path of the label text file to read.
        selected_classes: Sequence of class names indexed by class id.

    Returns:
        The same ``img`` object, with rectangles and class names drawn on it.
    """
    height, width = img.shape[:2]
    with open(label_path, 'r') as f:
        lines = f.readlines()
    for line in lines:
        parts = line.strip().split()
        if len(parts) < 5:
            continue
        class_id, cx, cy, w, h = map(float, parts[:5])
        class_id = int(class_id)
        # A negative id would otherwise index from the end of the list and
        # label the box with the wrong class instead of being rejected.
        if not 0 <= class_id < len(selected_classes):
            continue
        class_name = selected_classes[class_id]
        xmin = int((cx - w / 2) * width)
        ymin = int((cy - h / 2) * height)
        xmax = int((cx + w / 2) * width)
        ymax = int((cy + h / 2) * height)
        cv2.rectangle(img, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
        # Keep the caption baseline inside the image when the box touches the
        # top edge; otherwise the class name would be drawn off-canvas.
        text_y = max(ymin - 10, 25)
        cv2.putText(img, class_name, (xmin, text_y), cv2.FONT_HERSHEY_SIMPLEX, 0.9, (0, 255, 0), 2)
    return img

# Collect images by class: map every class name to the image file names
# (label stem + '.jpg') whose label file mentions that class at least once.
class_images = {cls: [] for cls in selected_classes}
for label_file in combined_labels_dir.glob('*.txt'):
    with open(label_file, 'r') as f:
        # First whitespace-separated field of every non-blank line is the class id
        class_ids = [int(fields[0]) for fields in map(str.split, f) if fields]
    classes_in_file = {
        selected_classes[class_id]
        for class_id in class_ids
        if class_id < len(selected_classes)
    }
    img_name = label_file.stem + '.jpg'
    for cls in classes_in_file:
        class_images[cls].append(img_name)

# Plot random 5 images for each class
for cls in selected_classes:
    print(f"\nPlotting up to 5 images for class: {cls}")
    candidates = class_images[cls]
    if not candidates:
        print(f"No images found for class: {cls}")
        continue
    for img_name in random.sample(candidates, min(5, len(candidates))):
        img_path = combined_images_dir / img_name
        label_path = combined_labels_dir / (img_path.stem + '.txt')
        bgr_img = cv2.imread(str(img_path))
        if bgr_img is None:
            # cv2.imread signals an unreadable/missing file by returning None
            print(f"Failed to load image: {img_path}")
            continue
        # Matplotlib expects RGB channel order, OpenCV loads BGR
        rgb_img = cv2.cvtColor(bgr_img, cv2.COLOR_BGR2RGB)
        annotated_img = draw_boxes(rgb_img, label_path, selected_classes)
        plt.figure(figsize=(10, 10))
        plt.imshow(annotated_img)
        plt.title(f"Class: {cls} - {img_name}")
        plt.axis('off')
        plt.show()

# Preprocessing

## Define Paths and Create New Directory



In [None]:
# Input: the combined dataset produced earlier in the notebook
combined_images_dir = Path('../datasets/combined_dataset/images')
combined_labels_dir = Path('../datasets/combined_dataset/labels')
# Output: where the preprocessed images and labels are written
preprocessed_images_dir = Path('../preprocessed_dataset/images')
preprocessed_labels_dir = Path('../preprocessed_dataset/labels')

# Make sure both output directories (and their parents) exist
for output_dir in (preprocessed_images_dir, preprocessed_labels_dir):
    output_dir.mkdir(parents=True, exist_ok=True)

logger.info(f"Original images directory: {combined_images_dir}")
logger.info(f"Original labels directory: {combined_labels_dir}")
logger.info(f"Preprocessed images will be saved to: {preprocessed_images_dir}")
logger.info(f"Preprocessed labels will be saved to: {preprocessed_labels_dir}")

## Helper Code



In [None]:
# List of possible image extensions to check
IMAGE_EXTENSIONS = ['.jpg', '.jpeg', '.png', '.bmp', '.tiff']

# Locate the image that belongs to a given file stem, trying each extension in turn
def find_image_file(stem, image_dir, extensions):
    """Return the first existing ``image_dir / (stem + ext)``, or ``None``.

    Extensions are tried in the order given, so earlier entries win when
    several files share the same stem.
    """
    candidates = (image_dir / (stem + ext) for ext in extensions)
    return next((path for path in candidates if path.exists()), None)

## Analyze the Dataset



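Let’s examine the dataset to understand its composition. As we know, there are 70,901 images in total, with 12,201 label files containing annotations and 58,854 empty ones. Note that these two figures sum to 71,055 rather than 70,901, presumably because the image total counts only `.jpg` files while the annotation figures count label files; the cell below therefore also checks that every label file has a matching image.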



In [None]:
# Get all label files
label_files = list(combined_labels_dir.glob('*.txt'))
logger.info(f"Total label files found: {len(label_files)}")

# Separate files with and without annotations (a zero-byte label file means
# "no annotations"); a single pass keeps it to one stat() call per file.
label_files_with_annotations = []
label_files_without_annotations = []
for label_file in label_files:
    if label_file.stat().st_size > 0:
        label_files_with_annotations.append(label_file)
    else:
        label_files_without_annotations.append(label_file)

logger.info(f"Images with annotations: {len(label_files_with_annotations)}")
logger.info(f"Images without annotations: {len(label_files_without_annotations)}")

# Verify corresponding images exist. find_image_file() only ever returns paths
# that exist, so a missing image shows up as a None result; those label files
# are recorded in missing_images instead of being silently discarded.
images_with_labels = []
images_without_labels = []
missing_images = []  # label files for which no image could be found
for label_group, image_group in (
    (label_files_with_annotations, images_with_labels),
    (label_files_without_annotations, images_without_labels),
):
    for label_file in label_group:
        image_file = find_image_file(label_file.stem, combined_images_dir, IMAGE_EXTENSIONS)
        if image_file is None:
            missing_images.append(label_file)
        else:
            image_group.append(image_file)

if missing_images:
    examples = [label_file.name for label_file in missing_images[:5]]
    logger.warning(f"Missing images: {len(missing_images)}. Examples (label files without an image): {examples}")
else:
    logger.info("All label files have corresponding images.")

## Balance the Dataset



Since images without annotations far outnumber those with annotations, we’ll balance the dataset by keeping all 12,201 annotated images and randomly sampling an equal number of images without annotations, for a total of 24,402 images.



In [None]:
# Every annotated sample is kept; N is the number of un-annotated samples we
# try to match it with.
N = len(label_files_with_annotations)
selected_label_files = list(label_files_with_annotations)

# Add a random draw of un-annotated samples, capped at how many are available
if label_files_without_annotations:
    negatives_to_draw = min(N, len(label_files_without_annotations))
    selected_without_annotations = random.sample(label_files_without_annotations, negatives_to_draw)
    selected_label_files += selected_without_annotations

logger.info(f"Selected {len(selected_label_files)} images for preprocessing:")
logger.info(f"- With annotations: {len(label_files_with_annotations)}")
logger.info(f"- Without annotations: {len(selected_label_files) - len(label_files_with_annotations)}")

## Define Preprocessing Functions



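We need functions to preprocess images (resize to the target size while preserving the aspect ratio, pad to a square, and normalize pixel values) and to adjust the bounding-box labels accordingly. Unreadable images are skipped later, in the main preprocessing loop.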



In [None]:
# Target size for YOLO
S = 416

def preprocess_image(img, S):
    """Letterbox ``img`` into an ``S`` x ``S`` frame and scale pixels by 1/255.

    The image is resized so that its longer side becomes ``S`` while keeping
    the aspect ratio, then padded with black borders (split as evenly as
    possible between both sides) to reach ``S`` x ``S``.

    Args:
        img: Image array whose ``shape[:2]`` is (height, width), or ``None``.
        S: Side length of the square output in pixels.

    Returns:
        ``(None, None)`` when ``img`` is ``None``. Otherwise a tuple
        ``(normalized_img, (scale, pad_left, pad_top, S, W, H))`` where
        ``normalized_img`` is a ``float32`` array equal to the padded image
        divided by 255, ``scale`` is the resize factor, ``pad_left`` /
        ``pad_top`` are the border widths in pixels and ``W`` / ``H`` are the
        original width and height.
    """
    if img is None:
        return None, None
    H, W = img.shape[:2]
    scale = min(S / W, S / H)
    # For extreme aspect ratios the shorter side can truncate to 0 pixels,
    # which cv2.resize rejects; keep at least one pixel per dimension.
    new_W = max(1, int(W * scale))
    new_H = max(1, int(H * scale))
    resized_img = cv2.resize(img, (new_W, new_H), interpolation=cv2.INTER_LINEAR)
    pad_left = (S - new_W) // 2
    pad_top = (S - new_H) // 2
    # The right/bottom borders absorb the odd pixel when the padding is uneven
    pad_right = S - new_W - pad_left
    pad_bottom = S - new_H - pad_top
    padded_img = cv2.copyMakeBorder(resized_img, pad_top, pad_bottom, pad_left, pad_right,
                                    cv2.BORDER_CONSTANT, value=[0, 0, 0])
    normalized_img = padded_img.astype(np.float32) / 255.0
    return normalized_img, (scale, pad_left, pad_top, S, W, H)

def adjust_label(label_path, scale, pad_left, pad_top, S, W, H):
    """Re-express YOLO boxes for an image that was resized and padded to SxS.

    Each input line is read as ``class_id x_center y_center width height``
    with the box values normalised to the original ``W`` x ``H`` image. The
    box is converted to pixels, multiplied by ``scale``, shifted by the
    left/top padding and normalised again by ``S``.

    Blank lines are ignored. Lines with fewer than five fields or with
    non-numeric box values are skipped with a warning; fields after the
    fifth are ignored (matching how ``draw_boxes`` reads only ``parts[:5]``).

    Args:
        label_path: ``Path`` of the source label file.
        scale: Resize factor that was applied to the image.
        pad_left: Width in pixels of the left border that was added.
        pad_top: Height in pixels of the top border that was added.
        S: Side length in pixels of the square output image.
        W: Original image width in pixels.
        H: Original image height in pixels.

    Returns:
        List of label lines (no trailing newline) formatted as
        ``class_id x y w h`` with six decimal places; empty for a zero-byte
        label file.
    """
    if label_path.stat().st_size == 0:
        return []
    new_lines = []
    with open(label_path, 'r') as f:
        for line_number, line in enumerate(f, start=1):
            parts = line.split()
            if not parts:
                continue
            class_id = parts[0]
            try:
                x_center_norm, y_center_norm, width_norm, height_norm = map(float, parts[1:5])
            except ValueError:
                # Raised both for too few fields (unpacking) and for
                # non-numeric values (float()); one bad line should not
                # abort the whole file.
                logging.getLogger(__name__).warning(
                    "Skipping malformed label line %d in %s: %r",
                    line_number, label_path, line.strip(),
                )
                continue
            # Normalised -> original pixels -> scaled pixels (+ padding for
            # the centre) -> normalised to the SxS frame.
            x_center_new = (x_center_norm * W * scale + pad_left) / S
            y_center_new = (y_center_norm * H * scale + pad_top) / S
            # Sizes are only scaled; padding does not change a box's extent
            width_new = width_norm * W * scale / S
            height_new = height_norm * H * scale / S
            new_lines.append(
                f"{class_id} {x_center_new:.6f} {y_center_new:.6f} {width_new:.6f} {height_new:.6f}"
            )
    return new_lines

logger.info(f"Preprocessing functions defined. Target size: {S}x{S}")

## Select Visualization Samples



We’ll visualize a few samples to verify preprocessing.



In [None]:
plot_samples = True
num_samples = 2

# Candidate pools for the visual spot-check, split by whether the label file
# is empty (zero bytes) or not.
annotated_candidates = [f for f in selected_label_files if f.stat().st_size > 0]
unannotated_candidates = [f for f in selected_label_files if f.stat().st_size == 0]

# random.sample raises ValueError when asked for more items than the pool
# holds (e.g. a dataset with no empty label files), so clamp the request.
sampled_with_annotations = random.sample(
    annotated_candidates, min(num_samples, len(annotated_candidates)))
sampled_without_annotations = random.sample(
    unannotated_candidates, min(num_samples, len(unannotated_candidates)))
sampled_images = sampled_with_annotations + sampled_without_annotations

logger.info(f"Selected {len(sampled_images)} images for visualization:")
for sample in sampled_images:
    logger.info(f"- {sample.name} (Has annotations: {sample.stat().st_size > 0})")

## Preprocess the Dataset



Now, we process each selected image and label, with visualization for samples.



In [None]:
def _draw_yolo_boxes(image, label_lines, frame_width, frame_height):
    """Draw a green rectangle on ``image`` (in place) for each YOLO label line.

    Lines with fewer than five fields are ignored; only the first five fields
    (class id, x-center, y-center, width, height) are used.
    """
    for line in label_lines:
        parts = line.strip().split()
        if len(parts) < 5:
            continue
        _, x_center, y_center, box_width, box_height = map(float, parts[:5])
        xmin = int((x_center - box_width / 2) * frame_width)
        ymin = int((y_center - box_height / 2) * frame_height)
        xmax = int((x_center + box_width / 2) * frame_width)
        ymax = int((y_center + box_height / 2) * frame_height)
        cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)


def _show_image(image, title):
    """Display ``image`` in its own Matplotlib figure with the axes hidden."""
    plt.figure(figsize=(10, 5))
    plt.imshow(image)
    plt.title(title)
    plt.axis('off')
    plt.show()


# Set membership keeps the per-image "is this a visualization sample?" test cheap
sampled_label_files = set(sampled_images)
processed_count = 0

for label_file in tqdm(selected_label_files, desc="Preprocessing images"):
    image_file = find_image_file(label_file.stem, combined_images_dir, IMAGE_EXTENSIONS)
    if image_file is None:
        logger.warning(f"Image file not found for label: {label_file}")
        continue
    img = cv2.imread(str(image_file))
    if img is None:
        logger.warning(f"Failed to read image: {image_file}")
        continue

    preprocessed_img, (scale, pad_left, pad_top, S, W, H) = preprocess_image(img, S)

    # Convert back to 8-bit once. Rounding (rather than the truncation done by
    # a bare astype) guards against float error shifting a pixel value down.
    output_img = np.clip(np.rint(preprocessed_img * 255.0), 0, 255).astype(np.uint8)

    # Save preprocessed image as .jpg; cv2.imwrite reports failure through its
    # return value, so check it instead of leaving a label without an image.
    preprocessed_image_path = preprocessed_images_dir / f"{label_file.stem}.jpg"
    if not cv2.imwrite(str(preprocessed_image_path), output_img):
        logger.warning(f"Failed to write image: {preprocessed_image_path}")
        continue

    # Adjust and save label. adjust_label returns [] for an empty label file,
    # and opening with 'w' truncates any stale file left by an earlier run.
    preprocessed_label_path = preprocessed_labels_dir / label_file.name
    new_lines = adjust_label(label_file, scale, pad_left, pad_top, S, W, H)
    with open(preprocessed_label_path, 'w') as f:
        f.writelines(f"{line}\n" for line in new_lines)
    processed_count += 1

    # Visualize samples
    if plot_samples and label_file in sampled_label_files:
        # Original image with its original boxes
        img_original = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        with open(label_file, 'r') as f:
            _draw_yolo_boxes(img_original, f.readlines(), W, H)
        _show_image(img_original, f"Original: {image_file.name}")

        # Preprocessed image with the adjusted boxes
        img_preprocessed = cv2.cvtColor(output_img, cv2.COLOR_BGR2RGB)
        _draw_yolo_boxes(img_preprocessed, new_lines, S, S)
        _show_image(img_preprocessed, f"Preprocessed: {image_file.name}")

# Report what was actually written, not how many files were attempted
logger.info(f"Preprocessing completed. Total images processed: {processed_count}")
skipped_count = len(selected_label_files) - processed_count
if skipped_count:
    logger.warning(f"Skipped {skipped_count} of {len(selected_label_files)} selected files; see warnings above.")

## Verify the Preprocessed Dataset



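We verify that the preprocessed dataset has no missing data: every image must have a matching label file and vice versa. (File contents are not checked for corruption here.)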



In [None]:
def verify_preprocessed_dataset():
    """Check that preprocessed images and label files pair up one-to-one.

    Compares the stems of ``*.jpg`` files in ``preprocessed_images_dir`` with
    the stems of ``*.txt`` files in ``preprocessed_labels_dir`` and logs a
    warning for each kind of mismatch. The "complete" message is logged only
    when there is no mismatch in either direction. File contents are not
    inspected.
    """
    image_stems = {f.stem for f in preprocessed_images_dir.glob('*.jpg')}
    label_stems = {f.stem for f in preprocessed_labels_dir.glob('*.txt')}

    missing_labels = image_stems - label_stems
    missing_images = label_stems - image_stems

    # sorted() makes the reported examples deterministic between runs
    if missing_labels:
        logger.warning(f"Images missing labels: {len(missing_labels)} - {sorted(missing_labels)[:5]}")
    if missing_images:
        logger.warning(f"Labels missing images: {len(missing_images)} - {sorted(missing_images)[:5]}")
    # Only claim completeness when BOTH checks pass; an else bound to the
    # second check alone would also fire when images are missing labels.
    if not missing_labels and not missing_images:
        logger.info("Preprocessed dataset is complete: All images have corresponding labels and vice versa.")

    logger.info(f"Total preprocessed images: {len(image_stems)}")
    logger.info(f"Total preprocessed labels: {len(label_stems)}")

verify_preprocessed_dataset()