In [8]:
import os
from nuscenes.nuscenes import NuScenes
from nuscenes.utils.splits import create_splits_scenes
from PIL import Image
import torch
from ultralytics import YOLO
import numpy as np
import json
from tqdm import tqdm


class NuScenesConesDataset:
    def __init__(self, dataroot, version="v1.0-mini", split="train"):
        self.nusc = NuScenes(version=version, dataroot=dataroot, verbose=True)
        self.split = split

        # Get scene names from splits
        self.scene_names = create_splits_scenes()[split]

        # Map scene names to scene tokens
        self.scene_tokens = []
        for scene in self.nusc.scene:
            if scene["name"] in self.scene_names:
                self.scene_tokens.append(scene["token"])

    def prepare_dataset(self, output_dir, max_images=None):
        """
        Prepare dataset in YOLO format with improved label handling
        """
        # Create directories
        images_dir = os.path.join(output_dir, "images", self.split)
        labels_dir = os.path.join(output_dir, "labels", self.split)

        os.makedirs(images_dir, exist_ok=True)
        os.makedirs(labels_dir, exist_ok=True)

        image_count = 0
        skipped_count = 0

        # Track all annotations
        for scene_token in tqdm(
            self.scene_tokens, desc=f"Processing {self.split} scenes"
        ):
            scene_rec = self.nusc.get("scene", scene_token)
            sample = self.nusc.get("sample", scene_rec["first_sample_token"])

            while sample:
                # Check if we've reached the maximum number of images
                if max_images is not None and image_count >= max_images:
                    print(f"Reached maximum number of images ({max_images})")
                    break

                # Get camera images
                cam_front_data = self.nusc.get(
                    "sample_data", sample["data"]["CAM_FRONT"]
                )

                # Get image
                img_path = os.path.join(self.nusc.dataroot, cam_front_data["filename"])
                if not os.path.exists(img_path):
                    print(f"Warning: Image not found at {img_path}")
                    skipped_count += 1
                    continue

                try:
                    img = Image.open(img_path)
                except Exception as e:
                    print(f"Warning: Could not open image {img_path}: {e}")
                    skipped_count += 1
                    continue

                # Get annotations
                annotations = []
                has_cones = False  # Flag to track if image has any cone annotations

                for ann_token in sample["anns"]:
                    ann_rec = self.nusc.get("sample_annotation", ann_token)
                    if ann_rec["category_name"] == "movable_object.traffic_cone":
                        has_cones = True
                        try:
                            # Get 2D bbox in image coordinates
                            bbox = self.nusc.get_box(ann_rec["token"])
                            corners_2d = self.nusc.box_to_keypoints(
                                cam_front_data["token"], bbox
                            )

                            # Convert to YOLO format (normalized coordinates)
                            x_min, y_min = np.min(corners_2d, axis=0)
                            x_max, y_max = np.max(corners_2d, axis=0)

                            # Skip if bbox is outside image bounds
                            if (
                                x_min >= img.width
                                or x_max <= 0
                                or y_min >= img.height
                                or y_max <= 0
                            ):
                                continue

                            # Clip coordinates to image bounds
                            x_min = max(0, x_min)
                            x_max = min(img.width, x_max)
                            y_min = max(0, y_min)
                            y_max = min(img.height, y_max)

                            # YOLO format: <class> <x_center> <y_center> <width> <height>
                            x_center = ((x_min + x_max) / 2) / img.width
                            y_center = ((y_min + y_max) / 2) / img.height
                            bbox_width = (x_max - x_min) / img.width
                            bbox_height = (y_max - y_min) / img.height

                            # Ensure values are within [0, 1]
                            if (
                                0 <= x_center <= 1
                                and 0 <= y_center <= 1
                                and 0 <= bbox_width <= 1
                                and 0 <= bbox_height <= 1
                            ):
                                annotations.append(
                                    f"0 {x_center:.6f} {y_center:.6f} {bbox_width:.6f} {bbox_height:.6f}"
                                )
                        except Exception as e:
                            print(f"Warning: Error processing annotation: {e}")
                            continue

                # Only save image if it has valid cone annotations
                if has_cones and annotations:
                    # Save image and labels
                    img_filename = os.path.basename(cam_front_data["filename"])
                    label_filename = os.path.splitext(img_filename)[0] + ".txt"

                    # Save image
                    img_save_path = os.path.join(images_dir, img_filename)
                    img.save(img_save_path)

                    # Save labels
                    label_save_path = os.path.join(labels_dir, label_filename)
                    with open(label_save_path, "w") as f:
                        f.write("\n".join(annotations))

                    image_count += 1

                    if image_count % 10 == 0:
                        print(f"Processed {image_count} images with cone annotations")

                # Move to next sample
                if not sample["next"]:
                    break
                sample = self.nusc.get("sample", sample["next"])

        print(f"\nDataset preparation complete:")
        print(f"- Total images processed with cone annotations: {image_count}")
        print(f"- Images skipped: {skipped_count}")

        # Verify dataset
        self._verify_dataset(images_dir, labels_dir)

    def _verify_dataset(self, images_dir, labels_dir):
        """
        Verify that the dataset was created correctly
        """
        image_files = set(
            os.path.splitext(f)[0]
            for f in os.listdir(images_dir)
            if f.endswith((".jpg", ".png"))
        )
        label_files = set(
            os.path.splitext(f)[0] for f in os.listdir(labels_dir) if f.endswith(".txt")
        )

        print("\nDataset verification:")
        print(f"- Number of images: {len(image_files)}")
        print(f"- Number of label files: {len(label_files)}")

        # Check for mismatches
        images_without_labels = image_files - label_files
        labels_without_images = label_files - image_files

        if images_without_labels:
            print(f"Warning: Found {len(images_without_labels)} images without labels")
        if labels_without_images:
            print(f"Warning: Found {len(labels_without_images)} labels without images")

        # Verify label format
        invalid_labels = []
        for label_file in os.listdir(labels_dir):
            if label_file.endswith(".txt"):
                with open(os.path.join(labels_dir, label_file), "r") as f:
                    lines = f.readlines()
                    for i, line in enumerate(lines):
                        parts = line.strip().split()
                        if len(parts) != 5:
                            invalid_labels.append((label_file, i + 1))

        if invalid_labels:
            print(f"Warning: Found {len(invalid_labels)} invalid label entries")
            for label_file, line_num in invalid_labels[:5]:  # Show first 5 errors
                print(f"  - {label_file}:line {line_num}")

In [9]:
import shutil

dataroot = "../data/v1.0-mini"  # Replace with your NuScenes data path
output_dir = "../output_6"  # Replace with your desired output path

if os.path.exists(output_dir):
    shutil.rmtree(output_dir)

# Prepare datasets
train_dataset = NuScenesConesDataset(dataroot, split="train")
train_dataset.prepare_dataset(
    output_dir, max_images=100
)  # Increased max_images for better training

val_dataset = NuScenesConesDataset(dataroot, split="val")
val_dataset.prepare_dataset(output_dir, max_images=50)

Loading NuScenes tables for version v1.0-mini...
23 category,
8 attribute,
4 visibility,
911 instance,
12 sensor,
120 calibrated_sensor,
31206 ego_pose,
8 log,
10 scene,
404 sample,
31206 sample_data,
18538 sample_annotation,
4 map,
Done loading in 0.430 seconds.
Reverse indexing ...
Done reverse indexing in 0.1 seconds.


Processing train scenes: 100%|██████████| 6/6 [00:00<00:00, 273.61it/s]


Dataset preparation complete:
- Total images processed with cone annotations: 0
- Images skipped: 0

Dataset verification:
- Number of images: 0
- Number of label files: 0
Loading NuScenes tables for version v1.0-mini...





23 category,
8 attribute,
4 visibility,
911 instance,
12 sensor,
120 calibrated_sensor,
31206 ego_pose,
8 log,
10 scene,
404 sample,
31206 sample_data,
18538 sample_annotation,
4 map,
Done loading in 0.236 seconds.
Reverse indexing ...
Done reverse indexing in 0.1 seconds.


Processing val scenes: 100%|██████████| 4/4 [00:00<00:00, 258.36it/s]


Dataset preparation complete:
- Total images processed with cone annotations: 0
- Images skipped: 0

Dataset verification:
- Number of images: 0
- Number of label files: 0



