In [None]:
!pip install ultralytics
import os
import json
import random
import shutil
import warnings
from pathlib import Path
from ultralytics import YOLO
import cv2
import numpy as np
import torch

# Suppress warnings and model output
warnings.filterwarnings("ignore")
import logging

Collecting ultralytics
  Downloading ultralytics-8.3.40-py3-none-any.whl.metadata (35 kB)
Collecting ultralytics-thop>=2.0.0 (from ultralytics)
  Downloading ultralytics_thop-2.0.12-py3-none-any.whl.metadata (9.4 kB)
Downloading ultralytics-8.3.40-py3-none-any.whl (898 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m898.5/898.5 kB[0m [31m23.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ultralytics_thop-2.0.12-py3-none-any.whl (26 kB)
Installing collected packages: ultralytics-thop, ultralytics
Successfully installed ultralytics-8.3.40 ultralytics-thop-2.0.12
Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.


In [None]:
# Raise the "ultralytics" logger threshold to CRITICAL so its routine log lines are hidden.
# NOTE(review): this hides WARNING/ERROR records from that logger as well, which can mask
# real problems during training and inference.
logging.getLogger("ultralytics").setLevel(logging.CRITICAL)

In [None]:
os.environ["KAGGLE_CONFIG_DIR"] = "/content/kaggle.json"
!pip install -q kaggle

In [None]:
!ls -lha kaggle.json
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 /root/.kaggle/kaggle.json

-rw-r--r-- 1 root root 69 Dec  3 15:05 kaggle.json


In [None]:
!kaggle datasets download -d wjybuqi/traffic-light-detection-dataset
!unzip /content/traffic-light-detection-dataset.zip

Dataset URL: https://www.kaggle.com/datasets/wjybuqi/traffic-light-detection-dataset
License(s): CC0-1.0
Downloading traffic-light-detection-dataset.zip to /content
 99% 860M/873M [00:11<00:00, 117MB/s] 
100% 873M/873M [00:11<00:00, 82.2MB/s]
Archive:  /content/traffic-light-detection-dataset.zip
  inflating: submit_example.json     
  inflating: test_dataset/test_images/00007.jpg  
  inflating: test_dataset/test_images/00009.jpg  
  inflating: test_dataset/test_images/00024.jpg  
  inflating: test_dataset/test_images/00035.jpg  
  inflating: test_dataset/test_images/00038.jpg  
  inflating: test_dataset/test_images/00044.jpg  
  inflating: test_dataset/test_images/00051.jpg  
  inflating: test_dataset/test_images/00060.jpg  
  inflating: test_dataset/test_images/00069.jpg  
  inflating: test_dataset/test_images/00071.jpg  
  inflating: test_dataset/test_images/00074.jpg  
  inflating: test_dataset/test_images/00075.jpg  
  inflating: test_dataset/test_images/00086.jpg  
  inflating: t

In [None]:
# Root that annotation keys such as 'train_images/<name>.jpg' are joined onto.
images_dir = "/content/train_dataset/"
# Nested dataset folder; not referenced by the other cells visible in this notebook.
dataset_dir = images_dir + "train_dataset"
# Annotation file holding the 'annotations' list.
json_file = images_dir + "train.json"
# Destination of the YOLO-formatted images/labels tree.
output_dir = "/content/ProcessedTrafficLights"

In [None]:
# Build the <output_dir>/{images,labels}/{train,val} tree; makedirs also creates
# output_dir itself as a parent, and exist_ok keeps the cell re-runnable.
for _kind in ('images', 'labels'):
    for _split in ('train', 'val'):
        os.makedirs(os.path.join(output_dir, _kind, _split), exist_ok=True)

In [None]:
# Parse the annotation file once and keep both the full document and its record list.
data = json.loads(Path(json_file).read_text())

annotations = data['annotations']


In [None]:
from collections import defaultdict

# Maps an image key (derived from each record's 'filename') to the list of annotation
# records that refer to that image; missing keys start as an empty list.
file_annotations = defaultdict(list)

In [None]:
# Group annotation records by image key.
for ann in annotations:
    # NOTE: basename() runs *before* backslashes are replaced. On POSIX a backslash is not
    # a path separator, so a Windows-style 'dir\name.jpg' is kept whole and becomes
    # 'dir/name.jpg'. Later cells rely on this prefixed key form (e.g. the lookup
    # 'train_images/00418.jpg'), so do not "simplify" this line without updating them.
    filename = os.path.basename(ann['filename']).replace('\\', '/')
    file_annotations[filename].append(ann)

# List of all image filenames
all_images = list(file_annotations.keys())

In [None]:
# Reproducible 75/25 train/validation split.
# Sorting first makes the result independent of annotation order and of how many times
# this cell has been run; the dedicated seeded generator leaves the global RNG untouched.
all_images.sort()
random.Random(42).shuffle(all_images)
split_idx = int(len(all_images) * 0.75)
train_files = all_images[:split_idx]
val_files = all_images[split_idx:]


In [None]:
def scale_bbox(bbox, img_width, img_height):
    """Enlarge a pixel-space box in place and clamp it to the image bounds.

    The box grows by 1.5x its width horizontally and 2x its height vertically in
    total, half of that on each side, so before clamping the result is 2.5x as wide
    and 3x as tall as the input.

    Args:
        bbox: Mapping with 'xmin', 'ymin', 'xmax', 'ymax' entries.
        img_width: Upper clamp for the x coordinates.
        img_height: Upper clamp for the y coordinates.

    Returns:
        Tuple (xmin, ymin, xmax, ymax) of the enlarged, clamped box.
    """
    left, top = bbox['xmin'], bbox['ymin']
    right, bottom = bbox['xmax'], bbox['ymax']

    # Per-side padding: half of the total growth along each axis.
    pad_x = (right - left) * 1.5 / 2
    pad_y = (bottom - top) * 2 / 2

    return (
        max(0, left - pad_x),
        max(0, top - pad_y),
        min(img_width, right + pad_x),
        min(img_height, bottom + pad_y),
    )


In [None]:
def convert_bbox_to_yolo(bbox, img_width, img_height):
    """Convert a corner-format box to normalised centre/size form.

    Args:
        bbox: Sequence indexed as [xmin, ymin, xmax, ymax].
        img_width: Divisor for the x centre and the width.
        img_height: Divisor for the y centre and the height.

    Returns:
        Tuple (x_center, y_center, width, height), each divided by the matching
        image dimension.
    """
    left, top, right, bottom = bbox[0], bbox[1], bbox[2], bbox[3]
    return (
        (left + right) / 2 / img_width,
        (top + bottom) / 2 / img_height,
        (right - left) / img_width,
        (bottom - top) / img_height,
    )

In [None]:
def process_data(files, subset):
    """Copy images into the YOLO tree and write one label file per image.

    For every key in ``files`` the image is looked up under ``images_dir``, copied to
    ``<output_dir>/images/<subset>/`` under its bare file name, and a matching
    ``<output_dir>/labels/<subset>/<stem>.txt`` is written with one
    ``0 x_center y_center width height`` line per annotation (boxes are first
    enlarged with ``scale_bbox``).

    Args:
        files: Iterable of image keys present in ``file_annotations``.
        subset: Sub-directory name, e.g. 'train' or 'val'.

    Images that are missing or cannot be decoded are reported and skipped.
    """
    for filename in files:
        anns = file_annotations[filename]
        img_path = os.path.join(images_dir, filename)

        # Handle missing images
        if not os.path.exists(img_path):
            print(f"Image {filename} not found, skipping.")
            continue

        # Read image to get dimensions; imread returns None (it does not raise) when
        # the file cannot be decoded, so guard before touching .shape.
        img = cv2.imread(img_path)
        if img is None:
            print(f"Image {filename} could not be read, skipping.")
            continue
        img_height, img_width = img.shape[:2]

        # Keys look like 'train_images/<name>.jpg'; keep only the file name instead of
        # slicing off a fixed number of characters.
        base_name = os.path.basename(filename)

        # Copy image to output directory
        shutil.copy(img_path, os.path.join(output_dir, 'images', subset, base_name))

        # Prepare label file
        label_lines = []
        for ann in anns:
            # Enlarge the annotated box, then convert to normalised YOLO format
            scaled = scale_bbox(ann['bndbox'], img_width, img_height)
            x_center, y_center, width, height = convert_bbox_to_yolo(
                list(scaled), img_width, img_height
            )
            # A single class '0' is used for all traffic lights
            label_lines.append(f"0 {x_center} {y_center} {width} {height}")

        # Save label file
        label_filename = os.path.splitext(base_name)[0] + '.txt'
        label_path = os.path.join(output_dir, 'labels', subset, label_filename)
        with open(label_path, 'w') as f:
            f.write('\n'.join(label_lines))

In [None]:
# Write both splits to disk: training first, then validation.
for _subset, _files in (('train', train_files), ('val', val_files)):
    process_data(_files, _subset)

In [None]:
# Dataset description passed to model.train(): root path, image folders per split and
# the single class name.
yaml_content = "\n".join([
    f"path: {output_dir}",
    "train: images/train",
    "val: images/val",
    "",
    "names:",
    "  0: traffic_light",
    "",
])

yaml_path = os.path.join(output_dir, 'traffic_light.yaml')
with open(yaml_path, 'w') as f:
    f.write(yaml_content)

In [None]:
model = YOLO('yolo11n.pt')  # Load the YOLO model from the 'yolo11n.pt' weights file

# Train for 25 epochs at 640px on the dataset described by yaml_path; the returned
# object is the metrics container displayed further down the notebook.
results = model.train(data=yaml_path, epochs=25, imgsz=640)




[34m[1mtrain: [0mScanning /content/ProcessedTrafficLights/labels/train.cache... 1950 images, 0 backgrounds, 0 corrupt: 100%|██████████| 1950/1950 [00:00<?, ?it/s]
[34m[1mval: [0mScanning /content/ProcessedTrafficLights/labels/val.cache... 650 images, 0 backgrounds, 0 corrupt: 100%|██████████| 650/650 [00:00<?, ?it/s]
       1/25      3.59G      2.758      2.864      1.727        117        640: 100%|██████████| 122/122 [01:08<00:00,  1.78it/s]
                 Class     Images  Instances      Box(P          R      mAP50  mAP50-95): 100%|██████████| 21/21 [00:14<00:00,  1.43it/s]
       2/25      3.58G      2.176      1.943      1.442        166        640: 100%|██████████| 122/122 [01:06<00:00,  1.84it/s]
                 Class     Images  Instances      Box(P          R      mAP50  mAP50-95): 100%|██████████| 21/21 [00:10<00:00,  2.01it/s]
       3/25      3.45G      2.081      1.714      1.391        141        640: 100%|██████████| 122/122 [01:05<00:00,  1.85it/s]
            

In [None]:
def compute_mse(pred_boxes, gt_boxes):
    """Mean squared coordinate error between predicted and ground-truth boxes.

    Boxes are paired purely by position in the two sequences (no IoU matching); any
    surplus boxes in the longer sequence are ignored.

    Args:
        pred_boxes: Sequence of predicted boxes.
        gt_boxes: Sequence of ground-truth boxes.

    Returns:
        The mean of the squared element-wise differences over the paired boxes, or
        None when either sequence is empty.
    """
    if not (len(pred_boxes) and len(gt_boxes)):
        return None  # Nothing to compare against

    pred_arr, gt_arr = np.array(pred_boxes), np.array(gt_boxes)
    # Pair boxes by order, truncating to the shorter list
    n_pairs = min(len(pred_arr), len(gt_arr))
    diff = pred_arr[:n_pairs] - gt_arr[:n_pairs]
    return np.mean(diff ** 2)


In [None]:
# Display the metrics object returned by model.train(); its repr lists every attribute,
# including full curve arrays, so the output is very long.
results

ultralytics.utils.metrics.DetMetrics object with attributes:

ap_class_index: array([0])
box: ultralytics.utils.metrics.Metric object
confusion_matrix: <ultralytics.utils.metrics.ConfusionMatrix object at 0x7a16afdcc5e0>
curves: ['Precision-Recall(B)', 'F1-Confidence(B)', 'Precision-Confidence(B)', 'Recall-Confidence(B)']
curves_results: [[array([          0,    0.001001,    0.002002,    0.003003,    0.004004,    0.005005,    0.006006,    0.007007,    0.008008,    0.009009,     0.01001,    0.011011,    0.012012,    0.013013,    0.014014,    0.015015,    0.016016,    0.017017,    0.018018,    0.019019,     0.02002,    0.021021,    0.022022,    0.023023,
          0.024024,    0.025025,    0.026026,    0.027027,    0.028028,    0.029029,     0.03003,    0.031031,    0.032032,    0.033033,    0.034034,    0.035035,    0.036036,    0.037037,    0.038038,    0.039039,     0.04004,    0.041041,    0.042042,    0.043043,    0.044044,    0.045045,    0.046046,    0.047047,
          0.048048, 

In [None]:
def get_ground_truth_boxes(files, subset):
    """Collect the enlarged ground-truth boxes for each image key.

    Args:
        files: Iterable of image keys present in ``file_annotations``.
        subset: Unused; kept so existing calls keep working.

    Returns:
        Dict mapping each image key to a list of [xmin, ymin, xmax, ymax] boxes in
        pixel coordinates, after ``scale_bbox`` has been applied. Images that are
        missing or cannot be decoded are left out.
    """
    gt_boxes_dict = {}
    for filename in files:
        anns = file_annotations[filename]
        img_path = os.path.join(images_dir, filename)

        # Handle missing images
        if not os.path.exists(img_path):
            continue

        # Read image to get dimensions; imread returns None rather than raising when
        # the file cannot be decoded, so skip those as well instead of crashing.
        img = cv2.imread(img_path)
        if img is None:
            continue
        img_height, img_width = img.shape[:2]

        gt_boxes_dict[filename] = [
            list(scale_bbox(ann['bndbox'], img_width, img_height)) for ann in anns
        ]
    return gt_boxes_dict

In [None]:
# Ground-truth boxes (already enlarged) for both splits, keyed by image key.
train_gt_boxes, val_gt_boxes = (
    get_ground_truth_boxes(train_files, 'train'),
    get_ground_truth_boxes(val_files, 'val'),
)

In [None]:
# Run the detector on every training image and score it against the ground truth.
train_predictions = {}
train_mse_list = []
for filename in train_files:
    # Keys in train_files carry a directory prefix ('train_images/<name>.jpg') but the
    # images were copied into the YOLO tree under their bare file name, so the prefix
    # must be dropped or the path does not exist.
    img_path = os.path.join(output_dir, 'images', 'train', os.path.basename(filename))
    img = cv2.imread(img_path)
    if img is None:
        # imread signals failure with None. Never pass None to the model:
        # NOTE(review): Ultralytics is expected to fall back to its bundled sample
        # images when the source is None, which silently scores the wrong picture.
        print(f"Could not read {img_path}, skipping.")
        continue
    # Separate name so the training metrics held in `results` are not overwritten.
    detections = model(img)
    boxes = detections[0].boxes.xyxy.cpu().numpy()  # Predicted bounding boxes
    train_predictions[filename] = boxes.tolist()
    gt_boxes = train_gt_boxes.get(filename, [])
    mse = compute_mse(boxes.tolist(), gt_boxes)
    if mse is not None:
        train_mse_list.append(mse)

train_mse = np.mean(train_mse_list) if train_mse_list else None


In [None]:
# Run the detector on every validation image and score it against the ground truth.
val_predictions = {}
val_mse_list = []
for filename in val_files:
    # Keys in val_files carry a directory prefix ('train_images/<name>.jpg') but the
    # images were copied into the YOLO tree under their bare file name, so the prefix
    # must be dropped or the path does not exist.
    img_path = os.path.join(output_dir, 'images', 'val', os.path.basename(filename))
    img = cv2.imread(img_path)
    if img is None:
        # imread signals failure with None. Never pass None to the model:
        # NOTE(review): Ultralytics is expected to fall back to its bundled sample
        # images when the source is None, which silently scores the wrong picture.
        print(f"Could not read {img_path}, skipping.")
        continue
    # Separate name so the training metrics held in `results` are not overwritten.
    detections = model(img)
    boxes = detections[0].boxes.xyxy.cpu().numpy()  # Predicted bounding boxes
    val_predictions[filename] = boxes.tolist()
    gt_boxes = val_gt_boxes.get(filename, [])
    mse = compute_mse(boxes.tolist(), gt_boxes)
    if mse is not None:
        val_mse_list.append(mse)

val_mse = np.mean(val_mse_list) if val_mse_list else None


In [None]:
# Bundle per-split predictions and their aggregate MSE into one structure.
results_dict = dict(
    train=dict(predictions=train_predictions, mse=train_mse),
    validation=dict(predictions=val_predictions, mse=val_mse),
)

# Report only the two aggregate MSE values
print(f"Training MSE: {train_mse}")
print(f"Validation MSE: {val_mse}")

Training MSE: 1118612.0150730175
Validation MSE: 1135432.01055694


In [None]:
# Inspect the predicted xyxy boxes stored for one training image key.
train_predictions['train_images/00418.jpg']

[[0.1900033950805664,
  211.88951110839844,
  69.98138427734375,
  640.0836791992188],
 [0.0, 457.9764099121094, 92.11576843261719, 631.5182495117188],
 [1.1159770488739014, 538.5988159179688, 68.32482147216797, 847.9644165039062]]

In [None]:
# Inputs for the cropping stage (annotation file and the folder holding its images)
# and the folder the individual light crops are written to.
json_path = "/content/train_dataset/train.json"
image_dir = os.path.join(os.path.dirname(json_path), "train_images")
cropped_output_dir = "/content/cropped_output_dir"
os.makedirs(cropped_output_dir, exist_ok=True)

In [None]:
# Crop every 'inbox' light out of its source image, collecting a 64x64 copy plus its
# colour label in memory and saving the unresized crop to cropped_output_dir.
cropped_images = list()
labels = list()
with open(json_path, 'r') as f:
    data = json.load(f)

saved_count = 0

# Process annotations
for ann_idx, annotation in enumerate(data['annotations']):
    # Normalize filename path
    filename = annotation['filename'].replace("\\", "/")
    image_path = os.path.join(image_dir, os.path.basename(filename))

    # Check if the image exists
    if not os.path.exists(image_path):
        print(f"Image file not found: {image_path}")
        continue

    # Load the image; imread returns None on failure, so this check has to come
    # before .shape is read.
    image = cv2.imread(image_path)
    if image is None:
        print(f"Failed to read image: {image_path}")
        continue
    img_height, img_width = image.shape[:2]

    stem = os.path.splitext(os.path.basename(filename))[0]

    # Process each inbox bounding box (a missing/empty 'inbox' yields no crops)
    for idx, light in enumerate(annotation.get('inbox') or []):
        color = light.get('color', 'unknown')  # Default to 'unknown' if color not present
        bbox = light.get('bndbox', None)

        if not bbox:
            print(f"Missing bounding box for: {filename}, light index {idx}")
            continue

        # Enlarged box, truncated to integer pixel indices
        xmin, ymin, xmax, ymax = (int(v) for v in scale_bbox(bbox, img_width, img_height))

        # Crop the image; a degenerate box gives an empty array that cv2.resize rejects
        cropped_image = image[ymin:ymax, xmin:xmax]
        if cropped_image.size == 0:
            print(f"Empty crop for: {filename}, light index {idx}")
            continue

        # Resize to 64x64 for the classifier
        resized_image = cv2.resize(cropped_image, (64, 64))
        cropped_images.append(resized_image)
        labels.append(color)

        # Save cropped image. Several annotation records share one image file and each
        # restarts idx at 0, so the record index is part of the name to keep crops from
        # overwriting one another.
        output_filename = f"{stem}_{ann_idx}_{idx}_{color}.jpg"
        output_path = os.path.join(cropped_output_dir, output_filename)
        cv2.imwrite(output_path, cropped_image)
        saved_count += 1

# One summary line instead of thousands of per-file messages
print(f"Saved {saved_count} cropped images to {cropped_output_dir}")

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
train_images/02467.jpg
train_images/02467.jpg
train_images/02467.jpg
Saved cropped image: /content/cropped_output_dir/02467_0_yellow.jpg
train_images/02468.jpg
Saved cropped image: /content/cropped_output_dir/02468_0_green.jpg
train_images/02468.jpg
Saved cropped image: /content/cropped_output_dir/02468_0_red.jpg
train_images/02468.jpg
Saved cropped image: /content/cropped_output_dir/02468_0_green.jpg
train_images/02468.jpg
Saved cropped image: /content/cropped_output_dir/02468_0_red.jpg
train_images/02468.jpg
Saved cropped image: /content/cropped_output_dir/02468_0_red.jpg
train_images/02468.jpg
Saved cropped image: /content/cropped_output_dir/02468_0_red.jpg
train_images/02468.jpg
Saved cropped image: /content/cropped_output_dir/02468_0_red.jpg
train_images/02469.jpg
Saved cropped image: /content/cropped_output_dir/02469_0_yellow.jpg
train_images/02469.jpg
Saved cropped image: /content/cropped_output_dir/02469_0_red.jpg

In [None]:
import json
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, Dropout
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import ImageDataGenerator


# Convert data to numpy arrays
X = np.array(cropped_images) / 255.0  # Normalize pixel values
# Sort the label names so the class indices are the same on every run; iterating a bare
# set of strings can yield a different order (and thus a different mapping) per kernel.
label_to_idx = {label: idx for idx, label in enumerate(sorted(set(labels)))}
y = np.array([label_to_idx[label] for label in labels])
y = to_categorical(y, num_classes=len(label_to_idx))

# Split into training and validation sets
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

# Model architecture: three conv/pool stages followed by a small dense head.
# NOTE: from here on `model` is this Keras classifier; it replaces the YOLO detector
# that was bound to the same name earlier, so the detection cells need a re-load of
# the detector before they can be re-run.
model = Sequential([
    Conv2D(32, (3, 3), activation='relu', input_shape=(64, 64, 3)),
    MaxPooling2D((2, 2)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Conv2D(128, (3, 3), activation='relu'),
    MaxPooling2D((2, 2)),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(len(label_to_idx), activation='softmax')
])

# Compile the model
model.compile(optimizer=Adam(learning_rate=0.001), loss='categorical_crossentropy', metrics=['accuracy'])

# Data augmentation (applied to training batches only)
datagen = ImageDataGenerator(
    rotation_range=10,
    width_shift_range=0.1,
    height_shift_range=0.1,
    horizontal_flip=True
)
datagen.fit(X_train)

# Train the model
history = model.fit(
    datagen.flow(X_train, y_train, batch_size=32),
    validation_data=(X_val, y_val),
    epochs=20,
    verbose=1
)

# Save the model
model.save("traffic_light_color_model.h5")

# Evaluate the model
val_loss, val_accuracy = model.evaluate(X_val, y_val, verbose=1)
print(f"Validation Accuracy: {val_accuracy:.2f}")

# Save the label mapping for inference
with open("label_mapping.json", "w") as f:
    json.dump(label_to_idx, f)


Epoch 1/20
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m23s[0m 75ms/step - accuracy: 0.9030 - loss: 0.2825 - val_accuracy: 0.9687 - val_loss: 0.1332
Epoch 2/20
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 53ms/step - accuracy: 0.9684 - loss: 0.1370 - val_accuracy: 0.9756 - val_loss: 0.0990
Epoch 3/20
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 54ms/step - accuracy: 0.9729 - loss: 0.1244 - val_accuracy: 0.9793 - val_loss: 0.0905
Epoch 4/20
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m12s[0m 49ms/step - accuracy: 0.9735 - loss: 0.1231 - val_accuracy: 0.9804 - val_loss: 0.0849
Epoch 5/20
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m22s[0m 55ms/step - accuracy: 0.9725 - loss: 0.1199 - val_accuracy: 0.9788 - val_loss: 0.0935
Epoch 6/20
[1m236/236[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m13s[0m 53ms/step - accuracy: 0.9742 - loss: 0.1120 - val_accuracy: 0.9830 - val_loss: 0.0848
Epoch 7/20
[1m2



[1m59/59[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 3ms/step - accuracy: 0.9872 - loss: 0.0839
Validation Accuracy: 0.99
