##### Disclaimer: this notebook is not complete yet

The goal of this project is to create a model for real-time traffic light detection. The dataset used for this project is BDD100k,  however, the annotations and the whole dataset strcture was taken from https://datasetninja.com/bdd100k.

This notebook covers analysis and preperation of the dataset for the training process and shows the results of it.

For the training we are using the state-of-the-art Ultralytics YOLOv8 CV model and the sample version of the dataset with some changes to it.

The whole process will be done in the follwing steps:
1. Load the json files and generate a dataframe from it
2. Review the dataframe
3. Count the objects and store results to another dataframe
4. Generate labels for the YOLO format and exclude all the objects except traffic lights
5. Train the model - this, I cant do localy so its not in the notebook (this is either done on an EC2 machine or Google Colab)
6. Review the results (confusion matrix, performance metrics and a couple of test examples)
7. If we are not satisfied with the results, increase the amount of pictures with traffic lights by taking the pictures and annotations from the complete 100k dataset and/or reduce the amount of background pictures and repeat the process from step 4

#### Step 1 - Loading the json files and generating a dataframe

In [None]:
import numpy as np
import pandas as pd
import json
import os
import itertools
from tqdm import tqdm

In [None]:
def load_annotations(annotations_folder_path):
    annotations = []
    for annotation_name in tqdm(os.listdir(annotations_folder_path)):
        annotation_path = os.path.join(annotations_folder_path, annotation_name)
        annotation = json.load(open(annotation_path))
        annotation["filename"] = annotation_name.replace(".json", "")
        annotations.append(annotation)
    return annotations

In [None]:
def get_flattened_dict(input_dict, base_key="", output_dict={}, index_value=None):
    
    for key in input_dict.keys():
        
        if base_key == "":
            full_key = key     
        else:
            if index_value is None:
                full_key = base_key + "/" + key
            else:
                full_key = base_key + "/" + key + "/" + index_value

        if isinstance(input_dict[key], dict):
            get_flattened_dict(input_dict[key], full_key, output_dict)
            
        elif isinstance(input_dict[key], list):
            for index, item in enumerate(input_dict[key]):
                full_list_key = full_key
                get_flattened_dict(item, full_list_key, output_dict, str(index))
                    
        else:
            output_dict[full_key] = input_dict[key]

In [None]:
KEY_ORDER_DICT = {
    "description": "00-00000-00",
    "tags":[],
    "size": {
        "width": "02-0000-00",
        "height": "03-0000-00"
    },
    "objects": []
}

In [None]:
def get_column_index(split_key, key_order_dict, level=0, id_value=None):
    key = split_key[level]
   
        
    if isinstance(key_order_dict[key], dict):
        return get_column_index(split_key, key_order_dict[key], level+1, id_value)
    
    else:
        column_index = key_order_dict[key]
        if id_value is not None:
            split_column_index = column_index.split("-")
            split_column_index[1] = split_column_index[1].replace("id", id_value)[-5:]
            column_index = "-".join(split_column_index)
        return column_index

In [None]:


def get_sorted_columns(annotation_df_columns):
    sorted_annotation_df_cols = []
    for annotation_df_col in annotation_df_columns:
        split_col = annotation_df_col.split("/")
        col_index = get_column_index(split_col, KEY_ORDER_DICT, 0)
        sorted_annotation_df_cols.append(str(col_index) + "#" + annotation_df_col)
    sorted_annotation_df_cols.sort()
    sorted_annotation_df_cols = [sorted_annotation_df_col.split("#")[1] for sorted_annotation_df_col in sorted_annotation_df_cols]
    return sorted_annotation_df_cols



In [None]:
def get_annotation_df(annotations):
    annotation_dicts = []
    for annotation in annotations:
        annotation_dict = {}
        get_flattened_dict(annotation, "", annotation_dict)
        annotation_dicts.append(annotation_dict)
    annotation_dicts_keys = set(list(itertools.chain.from_iterable([list(annotation_dict.keys()) for annotation_dict in annotation_dicts])))
    annotation_dicts_keys = get_sorted_columns(list(annotation_dicts_keys))
    annotation_df_dict = {key: [] for key in annotation_dicts_keys}
    for annotation_dict in annotation_dicts:
        for key in annotation_df_dict:
            value = annotation_dict[key] if key in annotation_dict else np.nan
            annotation_df_dict[key].append(value)
    json_df = pd.DataFrame(annotation_df_dict)
    return json_df

In [None]:
train_annotations_path = "bdd100ksample/val/ann"
train_annotations = load_annotations(train_annotations_path)



df2 = pd.DataFrame.from_dict(train_annotations, orient='columns')


df2

#### Step 2 - review tha dataframe 

In [None]:
df2

#### Step 3 - object counting and generating two more dataframes

In [None]:
objetcs_id_dict = {
    0: 'car',
    1: 'bus',
    2: 'drivable area',
    3: 'lane',
    4: 'traffic sign',
    5: 'truck',
    6: 'person',
    7: 'traffic light',
    8: 'rider',
    9: 'bike',
    10:'motor',
    11:'train'
}

In [None]:
df_totals = pd.DataFrame({
    'car':[0],
    'bus':[0],
    'drivable area':[0],
    'lane':[0],
    'traffic sign':[0],
    'truck':[0],
    'person':[0],
    'traffic light':[0],
    'rider':[0],
    'bike':[0],
    'motor':[0],
    'train':[0]
})
df_per_picture = pd.DataFrame({
    'car':[0],
    'bus':[0],
    'drivable area':[0],
    'lane':[0],
    'traffic sign':[0],
    'truck':[0],
    'person':[0],
    'traffic light':[0],
    'rider':[0],
    'bike':[0],
    'motor':[0],
    'train':[0]
})

for i in range(len(df2.index)): 
    picture_flags = [False, False, False, False, False, False, False, False, False, False, False, False]
    for o in df2.loc[i,'objects']:
        o_id = (o['classId']-6508800)
        df_totals.loc[0,objetcs_id_dict[o_id]] = df_totals.loc[0,objetcs_id_dict[o_id]] + 1
        if not picture_flags[o_id]:
            df_per_picture.loc[0,objetcs_id_dict[o_id]] = df_per_picture.loc[0,objetcs_id_dict[o_id]] + 1
            picture_flags[o_id] = True
        

In [None]:
df_totals

In [None]:
df_per_picture

#### Step 4 - Generating the labels in YOLO format

In [None]:
for i in range(len(df2.index)):
    save_location = "bdd100ksample/train/labels/" + df2.loc[i,'filename'].rstrip('.jpg') + ".txt"
    file = open(save_location,'w')
    for o in df2.loc[i,'objects']:
        if o['classId'] - 6508800 != 7:
            continue
        str_line = ""
        str_line += ("0 ")
        str_line += (str((o['points']['exterior'][1][0]+o['points']['exterior'][0][0])/(2*1280)) + " ")
        str_line += (str((o['points']['exterior'][1][1]+o['points']['exterior'][0][1])/(2*720)) + " ")
        str_line += (str((o['points']['exterior'][1][0]-o['points']['exterior'][0][0])/(1280)) + " ")
        str_line += (str((o['points']['exterior'][1][1]-o['points']['exterior'][0][1])/(720)) + "\n")
        file.write(str_line)
    file.close()

#### Step 5 - Train the model (code below shouldn't be here)

In [None]:
#!pip install ultralytics
from ultralytics import YOLO

from IPython.display import display, Image

In [None]:
model = YOLO('yolov8n.yaml')

In [None]:
results = model.train(data='bdd100ksample/data.yaml', batch = 2, imgsz=1280)

#### Step 6 - review the results

In [None]:
import cv2

In [None]:
imgs = []
model = YOLO('mod2.pt')
for img in tqdm(os.listdir( 'bdd100ksample/test/images')):
    img_path = os.path.join('bdd100ksample/test/images', img)
    imgs.append(img_path)
    
for img in imgs:
    imgr = cv2.imread(img,1)
    results = model(img)[0]
    for result in results.boxes.data.tolist():
        x1, y1, x2, y2, score, class_id = result
        cv2.rectangle(imgr, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 4)
        cv2.putText(imgr, results.names[int(class_id)].upper(), (int(x1), int(y1 - 10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.3, (0, 255, 0), 3, cv2.LINE_AA)
        cv2.imshow('',imgr)
        if cv2.waitKey(0) == ord('q'):
            break
    if cv2.waitKey(0) == ord('q'):
        cv2.destroyAllWindows()
        break      
    cv2.destroyAllWindows()

Generisanje datafrejma - ceo dataset

Train

In [None]:
train_annotations_path = "bdd100k/train/ann"
train_annotations = load_annotations(train_annotations_path)

df = pd.DataFrame.from_dict(train_annotations, orient='columns')

df

In [None]:
df_totals2 = pd.DataFrame({
    'car':[0],
    'bus':[0],
    'drivable area':[0],
    'lane':[0],
    'traffic sign':[0],
    'truck':[0],
    'person':[0],
    'traffic light':[0],
    'rider':[0],
    'bike':[0],
    'motor':[0],
    'train':[0]
})
df_per_picture2 = pd.DataFrame({
    'car':[0],
    'bus':[0],
    'drivable area':[0],
    'lane':[0],
    'traffic sign':[0],
    'truck':[0],
    'person':[0],
    'traffic light':[0],
    'rider':[0],
    'bike':[0],
    'motor':[0],
    'train':[0]
})

for i in range(len(df.index)): 
    picture_flags = [False, False, False, False, False, False, False, False, False, False, False, False]
    for o in df.loc[i,'objects']:
        o_id = (o['classId']-6508800)
        df_totals2.loc[0,objetcs_id_dict[o_id]] = df_totals2.loc[0,objetcs_id_dict[o_id]] + 1
        if not picture_flags[o_id]:
            df_per_picture2.loc[0,objetcs_id_dict[o_id]] = df_per_picture2.loc[0,objetcs_id_dict[o_id]] + 1
            picture_flags[o_id] = True

In [None]:
df_totals2

In [None]:
df_per_picture2

Val

Transformacija sample dataseta

1. Brisanje svih slika i anotacija koje ne sadrze semafor u sebi

In [None]:
jpg_dict = {}

for picture in tqdm(os.listdir('bdd100ksample/train/images')):
       jpg_dict[picture] = False


In [None]:
for ind in df2.index:
    for obj in df2['objects'][ind]:
        if (obj['classId']-6508800)==7:
            jpg_dict[df2['filename'][ind]] = True
            break


In [None]:
for pair in jpg_dict:
    if not jpg_dict[pair]:
        os.remove('bdd100k/train/ann/'+pair+'.json')


2. Kopiranje slika iz celog dataseta u sample

In [None]:
jpg_dict = {}

for ind in df.index:
    jpg_dict[df['filename'][ind]] = False
    

In [None]:
for picture in tqdm(os.listdir('bdd100ksample/train/images')):
       jpg_dict[picture] = True

In [None]:
import shutil
sem_picture_num = 900-435
no_sem_picture_num = 27
for ind in df.index:
    pic_filename = df['filename'][ind]
    
    if jpg_dict[pic_filename]:
        continue
        
    is_sem_pic = False
    for obj in df['objects'][ind]:
        if (obj['classId']-6508800)==7:
            shutil.copyfile('bdd100k/train/ann/'+pic_filename+'.json', 'bdd100ksample/train/ann/'+pic_filename+'.json')
            shutil.copyfile('bdd100k/train/images/'+pic_filename, 'bdd100ksample/train/images/'+pic_filename)
            sem_picture_num-=1
            is_sem_pic = True
            break
    if not is_sem_pic and no_sem_picture_num > 0:
        shutil.copyfile('bdd100k/train/ann/'+pic_filename+'.json', 'bdd100ksample/train/ann/'+pic_filename+'.json')
        shutil.copyfile('bdd100k/train/images/'+pic_filename, 'bdd100ksample/train/images/'+pic_filename)
        no_sem_picture_num-=1
    if no_sem_picture_num==0:
        break

Prikaz rezultata sa poboljsanim sample datasetom ...

Transformacija celog dataseta

1. Smanjivanje broja slika bez semafora na 1000

In [None]:
total_no_sem = 1000
for ind in df.index:
    to_delete = True
    for obj in df['objects'][ind]:
        if (obj['classId']-6508800)==7:
            to_delete = False
            break
    if total_no_sem>0:
        total_no_sem -=1
        to_delete = False
    if to_delete:
        os.remove('bdd100k/train/ann/'+pair+'.json')
        os.remove('bdd100k/train/images/'+pair)


ponovno ucitavanje dataseta

In [None]:
train_annotations_path = "bdd100k/train/ann"
train_annotations = load_annotations(train_annotations_path)

df = pd.DataFrame.from_dict(train_annotations, orient='columns')

df

2. Transformacija u yolo format

In [None]:
#srediti jos

for i in range(len(df.index)):
    save_location = "bdd100k/train/labels/" + df.loc[i,'filename'].rstrip('.jpg') + ".txt"
    file = open(save_location,'w')
    for o in df.loc[i,'objects']:
        if o['classId'] - 6508800 != 7:
            continue
        str_line = ""
        str_line += ("0 ")
        str_line += (str((o['points']['exterior'][1][0]+o['points']['exterior'][0][0])/(2*1280)) + " ")
        str_line += (str((o['points']['exterior'][1][1]+o['points']['exterior'][0][1])/(2*720)) + " ")
        str_line += (str((o['points']['exterior'][1][0]-o['points']['exterior'][0][0])/(1280)) + " ")
        str_line += (str((o['points']['exterior'][1][1]-o['points']['exterior'][0][1])/(720)) + "\n")
        file.write(str_line)
    file.close()


Prikaz rezultata nad celim skupom...

In [None]:
for i in range(len(df.index)): 
    img = cv2.imread("bdd100k/train/images/"+df.loc[i,'filename'],1)
   # print(i)
    for o in df.loc[i,'objects']:
        o_id = (o['classId']-6508800)
        if o_id != 7:
            continue
        cv2.rectangle(img,(o['points']['exterior'][0][0],o['points']['exterior'][0][1]),(o['points']['exterior'][1][0],o['points']['exterior'][1][1]),(0,0,255),2)
        cv2.imshow(df.loc[i,'filename'],img)
        if cv2.waitKey(0) == ord('q'):
            break
    if cv2.waitKey(0) == ord('q'):
        cv2.destroyAllWindows()
        break      
    cv2.destroyAllWindows()
        




In [None]:
import os


import cv2


VIDEOS_DIR = os.path.join('.', 'videos')

video_path = 'vid5.mp4'
video_path_out = '{}_out.mp4'.format(video_path)

cap = cv2.VideoCapture(video_path)
ret, frame = cap.read()
H, W, _ = frame.shape
out = cv2.VideoWriter(video_path_out, cv2.VideoWriter_fourcc(*'MP4V'), int(cap.get(cv2.CAP_PROP_FPS)), (W, H))



# Load a model
model = YOLO('mod.pt')  # load a custom model

threshold = 0.5

while ret:

    results = model(frame)[0]

    for result in results.boxes.data.tolist():
        x1, y1, x2, y2, score, class_id = result

        if score > threshold:
            cv2.rectangle(frame, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 4)
            cv2.putText(frame, results.names[int(class_id)].upper(), (int(x1), int(y1 - 10)),
                        cv2.FONT_HERSHEY_SIMPLEX, 1.3, (0, 255, 0), 3, cv2.LINE_AA)

    out.write(frame)
    ret, frame = cap.read()

cap.release()
out.release()
cv2.destroyAllWindows()