In [5]:
# MOUNT FOR GOOGLE DRIVE
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [6]:
!pip install ultralytics
# !git clone https://github.com/ultralytics/yolov5
# %cd yolov5
# !pip install -r requirements.txt

# %cd /content/drive/MyDrive/FinalProjectDeepLearning

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting ultralytics
  Downloading ultralytics-8.0.117-py3-none-any.whl (599 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m599.6/599.6 kB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: ultralytics
Successfully installed ultralytics-8.0.117


In [7]:
# REFERENCES

# GITHUB yolo https://github.com/ultralytics/yolov5

In [8]:
# IMPORTS
import os
import torch
from matplotlib import pyplot as plt
import numpy as np
import cv2
import itertools
import pandas as pd
from ultralytics import YOLO
import xml.etree.ElementTree as ET
import shutil
import random
from google.colab.patches import cv2_imshow
import glob

# Change directory to "FinalProjectDeepLearning"
os.chdir('/content/drive/MyDrive/FinalProjectDeepLearning')

In [9]:
# DATA PATHS
# Every location used by the notebook lives under the same Drive project folder.
project_root = '/content/drive/MyDrive/FinalProjectDeepLearning'
# YOLO dataset: images/ and labels/ plus the train/validation/test partitions.
data_path = project_root + '/Dataset'
# FullIJCNN2013 download: .ppm images and the gt.txt annotation file.
full_ijcnn_path = project_root + '/FullIJCNN2013'
# Videos used for inference (inputs and annotated outputs).
test_videos_path = project_root + '/TestVideos'

In [10]:
# CLASS LABELS
# Integer class ids written as the first field of every YOLO label line.
# The order matches the `labels` list inside convert_RSD_to_dataset.
trafficlight, speedlimit, crosswalk, stop = range(4)

In [11]:
## DEFINITIONS

# FULLIJCNN DATA CONVERSION

def convert_ppm_to_png(ppm_path, png_path):
    """Re-encode a PPM image as PNG.

    Both arguments are paths WITHOUT extension: '.ppm' is appended to
    ``ppm_path`` for reading and '.png' to ``png_path`` for writing.

    Raises:
        FileNotFoundError: if the source image is missing or cannot be decoded.
        IOError: if OpenCV reports that the PNG could not be written.
    """
    source = ppm_path + '.ppm'
    # IMREAD_UNCHANGED asks OpenCV to load the file as stored, without conversion.
    image = cv2.imread(source, cv2.IMREAD_UNCHANGED)
    # cv2.imread reports failure by returning None instead of raising, which
    # would otherwise surface later as an opaque error inside cv2.imwrite.
    if image is None:
        raise FileNotFoundError(f'Could not read image: {source}')

    target = png_path + '.png'
    if not cv2.imwrite(target, image):
        raise IOError(f'Could not write image: {target}')

def get_image_dimensions(image):
    """Return ``(width, height)`` taken from the first two entries of ``image.shape``.

    ``image.shape`` is read as (rows, columns, ...), so the two values are
    swapped on the way out.
    """
    n_rows, n_cols = image.shape[:2]
    return (n_cols, n_rows)

def create_text_file(file_path, line_content):
    """Write ``line_content`` to ``file_path + '.txt'``, replacing any existing file."""
    destination = file_path + '.txt'
    with open(destination, 'w') as handle:
        handle.write(line_content)

def convert_IJCNN_to_dataset():
  """Import the FullIJCNN2013 annotations and images into the YOLO dataset.

  Reads ``gt.txt`` from ``full_ijcnn_path``; every line is parsed as
  ``<file>.ppm;left;top;right;bottom;class``. The classes listed below are
  remapped to this project's ``speedlimit`` / ``stop`` ids and every other
  class is ignored. For each kept annotation one YOLO line is APPENDED to
  ``{data_path}/labels/<file>.txt`` (so running the conversion twice
  duplicates lines), and the image is written once as PNG to
  ``{data_path}/images``.

  Raises:
      FileNotFoundError: if an annotated image cannot be read.
  """
  full_ijcnn_labels_to_speedlimit = {0, 1, 2, 3, 4, 5, 7, 8}
  full_ijcnn_labels_to_stop = 14

  with open(full_ijcnn_path + '/gt.txt', 'r') as file_:
    lines_full_ijcnn = file_.readlines()

  # file_name -> (width, height). Doubles as the record of images already
  # converted, so an image with several signs is read and re-encoded only once.
  image_sizes = {}

  for line in lines_full_ijcnn:
    line = line.strip()
    if not line:
      continue  # tolerate blank lines (e.g. at the end of the file)

    splited_line = line.split(';')
    file_name = os.path.splitext(splited_line[0])[0]
    left_col, top_row, right_col, bottom_row, label = (int(value) for value in splited_line[1:6])

    if label == full_ijcnn_labels_to_stop:
      label = stop
    elif label in full_ijcnn_labels_to_speedlimit:
      label = speedlimit
    else:
      continue  # class not used by this project

    if file_name not in image_sizes:
      source_image = full_ijcnn_path + '/' + file_name + '.ppm'
      image = cv2.imread(source_image)
      # cv2.imread returns None instead of raising when the file is unreadable.
      if image is None:
        raise FileNotFoundError(f'Could not read image: {source_image}')
      image_sizes[file_name] = get_image_dimensions(image)
      convert_ppm_to_png(full_ijcnn_path + '/' + file_name, data_path + '/images/' + file_name)

    width, height = image_sizes[file_name]
    # convert_boundary expects the box as (xmin, xmax, ymin, ymax).
    bounding_box = convert_boundary((width, height), (left_col, right_col, top_row, bottom_row))

    with open(f'{data_path}/labels/{file_name}.txt', 'a') as out_file:
      out_file.write(" ".join([str(a) for a in (label, *bounding_box)]) + '\n')

# FIRST DATASET DATA CONVERSION

def convert_boundary(size, boundary):
    """Turn a pixel box into a centre/size box scaled by the image dimensions.

    Args:
        size: ``(image_width, image_height)``.
        boundary: ``(xmin, xmax, ymin, ymax)``.

    Returns:
        ``(x_center, y_center, box_width, box_height)``, each multiplied by the
        reciprocal of the matching image dimension.
    """
    x_scale = 1. / size[0]
    y_scale = 1. / size[1]
    xmin, xmax = boundary[0], boundary[1]
    ymin, ymax = boundary[2], boundary[3]

    # The centre is the midpoint of the box shifted back by one pixel.
    x_center = (xmin + xmax) / 2.0 - 1
    y_center = (ymin + ymax) / 2.0 - 1
    box_width = xmax - xmin
    box_height = ymax - ymin

    return x_center * x_scale, y_center * y_scale, box_width * x_scale, box_height * y_scale

def convert_RSD_to_dataset():
    """Convert the XML annotations in ``{data_path}/labels`` to YOLO ``.txt`` files.

    For every ``*.xml`` file a ``.txt`` file with the same base name is written
    next to it (overwriting any existing one). Each ``<object>`` whose name is
    in ``labels`` and that is not flagged ``difficult`` becomes one line
    ``class_id x_center y_center width height``, with the box converted by
    ``convert_boundary`` using the image size stored in the XML.
    """
    # The position in this list is the class id written to the label file.
    labels = ['trafficlight', 'speedlimit', 'crosswalk', 'stop']
    labels_dir = data_path + '/labels'

    for file_ in os.listdir(labels_dir):
        if not file_.endswith('.xml'):
            continue
        # splitext keeps dots that are part of the base name.
        file_name = os.path.splitext(file_)[0]

        root = ET.parse(labels_dir + '/' + file_).getroot()  # for the xml
        size = root.find('size')
        width = int(size.find('width').text)
        height = int(size.find('height').text)

        yolo_lines = []
        for obj in root.iter('object'):
            class_name = obj.find('name').text
            if class_name not in labels:
                continue
            # A missing <difficult> tag is treated as "not difficult".
            difficult = obj.find('difficult')
            if difficult is not None and int(difficult.text) == 1:
                continue

            xmlbox = obj.find('bndbox')
            bounding_box = convert_boundary(
                (width, height),
                [float(xmlbox.find(x).text) for x in ('xmin', 'xmax', 'ymin', 'ymax')],
            )
            class_id = labels.index(class_name)
            yolo_lines.append(" ".join([str(a) for a in (class_id, *bounding_box)]) + '\n')

        # Open the output only after parsing succeeded, and close it
        # deterministically so the content is flushed to disk.
        with open(labels_dir + '/' + file_name + '.txt', 'w') as out_file:
            out_file.writelines(yolo_lines)


# DELETE IMAGES IN THE DATASET PARTITIONS

def clear_folder(folder_path):
  """Delete everything inside ``folder_path`` while keeping the folder itself."""
  for entry_name in os.listdir(folder_path):
    entry_path = os.path.join(folder_path, entry_name)
    if os.path.isdir(entry_path):
      # Empty the sub-folder first: os.rmdir only removes empty directories.
      clear_folder(entry_path)
      os.rmdir(entry_path)
      continue
    if os.path.isfile(entry_path):
      os.remove(entry_path)

# DATA DIVISIONS

def contains_string(target_string, string_list):
  """Return True when at least one entry of ``string_list`` is a substring of ``target_string``."""
  return any(candidate in target_string for candidate in string_list)

def data_division(validation_samples=None, test_samples=None):
  """Move the dataset files into the train / validation / test partitions.

  A file goes to ``validation`` when its name contains any string of
  ``validation_samples``, otherwise to ``test`` when it contains any string of
  ``test_samples``, otherwise to ``train``. Every file in
  ``{data_path}/images`` is moved; in ``{data_path}/labels`` only ``.txt``
  files are moved.

  Args:
    validation_samples: name fragments selecting validation files
      (default: none).
    test_samples: name fragments selecting test files (default: none).
  """
  list_of_validation_samples = list(validation_samples or [])
  list_of_test_samples = list(test_samples or [])

  def partition_for(file_name):
    # Validation takes precedence when a name matches both lists.
    if contains_string(file_name, list_of_validation_samples):
      return 'validation'
    if contains_string(file_name, list_of_test_samples):
      return 'test'
    return 'train'

  # data_path has no trailing slash, so the separator must be added here.
  imagesForDA = os.listdir(data_path + '/images')
  labelsForDA = os.listdir(data_path + '/labels')

  for fileNameImage in imagesForDA:
    partition = partition_for(fileNameImage)
    shutil.move(data_path + '/images/' + fileNameImage,
                data_path + '/' + partition + '/images/' + fileNameImage)

  for labelName in labelsForDA:
    if labelName.endswith(".txt"):
      # A label must land in the same partition as its image.
      partition = partition_for(labelName)
      shutil.move(data_path + '/labels/' + labelName,
                  data_path + '/' + partition + '/labels/' + labelName)

# MOVE IMAGES AND LABELS BACK TO THE ORIGINAL FOLDERS

def return_to_original():
  """Undo the partitioning: move every file back to ``images/`` and ``labels/``.

  All six partition folders are listed before anything is moved; then the
  images of train, validation and test are moved back, followed by the labels
  in the same partition order.
  """
  partitions = ('train', 'validation', 'test')
  kinds = ('images', 'labels')

  # Snapshot the folder contents up front.
  pending = {}
  for partition in partitions:
    for kind in kinds:
      pending[(partition, kind)] = os.listdir(data_path + '/' + partition + '/' + kind)

  for kind in kinds:
    for partition in partitions:
      for file_name in pending[(partition, kind)]:
        shutil.move(data_path + '/' + partition + '/' + kind + '/' + file_name,
                    data_path + '/' + kind + '/' + file_name)

#OTHER WAY TO DO DATA DIVISION

def divide_list_random(list_data, percentage):
    """Shuffle ``list_data`` in place and cut it into two lists.

    Args:
        list_data: list to split; its order is changed by the shuffle.
        percentage: share (0-100) of the elements that goes into the first list.

    Returns:
        ``(first, second)``: the first ``int(len * percentage / 100)`` shuffled
        elements and the remaining ones.
    """
    random.shuffle(list_data)
    cut = int(len(list_data) * percentage / 100)
    return list_data[:cut], list_data[cut:]

def data_division_with_percentage(train_percentage=90, validation_percentage=50):
  """Randomly split the dataset into the train / validation / test folders.

  The ``.png`` files in ``{data_path}/images`` are shuffled and split with
  ``divide_list_random``: ``train_percentage`` percent go to train, and
  ``validation_percentage`` percent of the remainder go to validation; what is
  left goes to test. Each image is moved together with its ``.txt`` label when
  that label exists, so an image without a label no longer aborts the split
  half-way through.

  Args:
    train_percentage: share (0-100) of all images assigned to train.
    validation_percentage: share (0-100) of the non-train images assigned to
      validation.
  """
  images_dir = data_path + '/images'
  labels_dir = data_path + '/labels'

  # Sorted so that a seeded `random` gives the same split on every run,
  # whatever order os.listdir happens to return.
  names = sorted(
      os.path.splitext(image_file)[0]
      for image_file in os.listdir(images_dir)
      if image_file.endswith('.png')
  )

  names_train, names_rest = divide_list_random(names, train_percentage)
  names_validation, names_test = divide_list_random(names_rest, validation_percentage)

  partitions = (
      ('train', names_train),
      ('validation', names_validation),
      ('test', names_test),
  )
  for partition, partition_names in partitions:
    for name in partition_names:
      shutil.move(images_dir + '/' + name + '.png',
                  data_path + '/' + partition + '/images/' + name + '.png')
      label_file = labels_dir + '/' + name + '.txt'
      if os.path.isfile(label_file):
        shutil.move(label_file,
                    data_path + '/' + partition + '/labels/' + name + '.txt')

# Function to delete files with exceptions

def delete_exception_files(exception_path, directory):
    """Delete every file under ``directory`` whose name ends with a listed stem.

    ``exception_path`` is a text file with one entry per line. Surrounding
    whitespace and commas are removed from each entry; for an entry ``abc``
    every file matching ``*abc.*`` anywhere below ``directory`` is removed and
    its path printed.

    Args:
        exception_path: path of the text file listing the names to delete.
        directory: root folder searched recursively.
    """
    # Load exceptions from the file
    with open(exception_path, 'r') as file:
        exceptions = file.read().splitlines()

    # Go through each exception
    for exception in exceptions:
        # Remove surrounding whitespace and list-style commas
        exception = exception.strip().strip(',').strip()
        if not exception:
            # An empty entry would turn the pattern into '*.*' and wipe every
            # file in the tree, so blank lines are skipped.
            continue
        # Escape the entry so characters such as '[' or '*' are matched literally.
        pattern = directory + f'/**/*{glob.escape(exception)}.*'
        # Look for any file that includes the exception in its name, regardless of its extension
        for filepath in glob.glob(pattern, recursive=True):
            if not os.path.isfile(filepath):
                continue  # os.remove cannot delete directories
            print(f'Deleting: {filepath}')
            os.remove(filepath)

# Freeze
def freeze_layer(trainer, num_freeze=10):
    """Freeze the first ``num_freeze`` layers of ``trainer.model``.

    A parameter is frozen (``requires_grad = False``) when its name contains
    ``model.<i>.`` for some ``i`` in ``range(num_freeze)``; every other
    parameter is explicitly set trainable. The name of each frozen parameter is
    printed.

    Args:
        trainer: object exposing the network as ``trainer.model``.
        num_freeze: number of leading layers to freeze (default 10). To use a
            different value where only ``trainer`` is passed, wrap the function,
            e.g. with ``functools.partial(freeze_layer, num_freeze=5)``.
    """
    model = trainer.model
    print(f"Freezing {num_freeze} layers")
    # The trailing dot keeps 'model.1.' from also matching 'model.10.' etc.
    freeze = [f'model.{x}.' for x in range(num_freeze)]  # layers to freeze
    for k, v in model.named_parameters():
        frozen = any(x in k for x in freeze)
        if frozen:
            print(f'freezing {k}')
        # Set the flag both ways so a previous freeze is undone for the
        # parameters that should train.
        v.requires_grad = not frozen
    print(f"{num_freeze} layers are freezed.")

# convert_RSD_to_dataset()

In [12]:
%cd /content/drive/MyDrive/FinalProjectDeepLearning

/content


In [None]:
# Execute the training
# Runs the train.py script found in the local `yolov5/` folder, passing the
# TrafficVOC.yaml dataset file and the yolov5s.pt weights with the values
# 320 (--img), 16 (--batch), 50 (--epochs) and 2 (--workers).
# NOTE(review): the `git clone` of yolov5 in the install cell is commented out, so
# the `yolov5/` folder is presumably already stored in the project directory — confirm.
!python yolov5/train.py --img 320 --batch 16 --epochs 50 --data TrafficVOC.yaml --weights yolov5s.pt --workers 2

[31m[1mrequirements:[0m Ultralytics requirement "gitpython" not found, attempting AutoUpdate...
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting gitpython
  Downloading GitPython-3.1.31-py3-none-any.whl (184 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 184.3/184.3 kB 7.9 MB/s eta 0:00:00
Collecting gitdb<5,>=4.0.1 (from gitpython)
  Downloading gitdb-4.0.10-py3-none-any.whl (62 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 62.7/62.7 kB 321.5 MB/s eta 0:00:00
Collecting smmap<6,>=3.0.1 (from gitdb<5,>=4.0.1->gitpython)
  Downloading smmap-5.0.0-py3-none-any.whl (24 kB)
Installing collected packages: smmap, gitdb, gitpython
Successfully installed gitdb-4.0.10 gitpython-3.1.31 smmap-5.0.0

[31m[1mrequirements:[0m 1 package updated per ['gitpython']
[31m[1mrequirements:[0m ⚠️ [1mRestart runtime or rerun command for updates to take effect[0m

[34m[1mtrain: [0mweights=yolov5s.pt, cfg=, data=TrafficVOC.yaml, hyp

In [None]:
# Load the trained weights
# The checkpoint is loaded through torch.hub from the 'ultralytics/yolov5' repository
# using its 'custom' entry point; force_reload=True is passed on every run.
# NOTE(review): the run folder `exp4` is hard-coded — confirm it is the run produced
# by the training cell, since each training run may write to a new exp folder.
model = torch.hub.load('ultralytics/yolov5', 'custom', path='yolov5/runs/train/exp4/weights/best.pt', force_reload=True)

In [None]:
# RUN ON A VIDEO -> LOAD THE VIDEO, SPLIT IT INTO FRAMES, AND RUN EACH FRAME THROUGH THE MODEL

def predict_video(video_path, output_path):
  """Run the detector on every frame of a video and save the annotated video.

  Uses the module-level ``model``. Each frame is upscaled 2x before inference;
  the rendered (annotated) frame is resized back to the frame's original size
  and written to ``output_path`` with the 'mp4v' codec at the source frame rate.

  Args:
    video_path: path of the input video.
    output_path: path of the annotated video to create.

  Raises:
    IOError: if the input video cannot be opened.
  """
  cap = cv2.VideoCapture(video_path)
  if not cap.isOpened():
    cap.release()
    raise IOError(f'Could not open video: {video_path}')

  output_video = None
  try:
    # OUTPUT VIDEO
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = cap.get(cv2.CAP_PROP_FPS)
    codec = cv2.VideoWriter_fourcc(*"mp4v")
    output_video = cv2.VideoWriter(output_path, codec, fps, (frame_width, frame_height))

    while True:
      ret, frame = cap.read()
      if not ret:
        break  # end of stream

      # RESIZE IMAGE
      height, width = frame.shape[:2]
      enlarged = cv2.resize(frame, (2 * width, 2 * height))

      # OpenCV decodes frames as BGR, while the YOLOv5 hub model documents
      # RGB for array inputs, so convert before inference ...
      result = model(cv2.cvtColor(enlarged, cv2.COLOR_BGR2RGB))

      # ... and convert the rendered frame back to BGR for the video writer.
      # render() returns one annotated image per input; only one was passed.
      rendered = cv2.cvtColor(result.render()[0], cv2.COLOR_RGB2BGR)
      output_video.write(cv2.resize(rendered, (width, height)))
  finally:
    # Release the handles even if inference fails, so the output file is closed.
    cap.release()
    if output_video is not None:
      output_video.release()


# Set the `conf` attribute of the loaded model to 0.25.
# NOTE(review): presumably the minimum confidence for a detection to be kept — confirm.
model.conf = 0.25
# Annotate TestVideo1.mp4 and write the result next to it in the TestVideos folder.
predict_video(test_videos_path + '/TestVideo1.mp4', test_videos_path + '/TestVideo1_Yolov5_ultima_oportunidad.mp4')


In [None]:
# OLD CODE TO EXECUTE THE TRAINING (LOWER LOSSES)

# Load a model
#model = YOLO("yolov8n.yaml")  # build a new model from scratch
#model = YOLO("yolov8n.pt")  # load a pretrained model (recommended for training)
#model = YOLO("yolov5s.yaml")  # build a new model from scratch
#model = YOLO("yolov5su.pt")  # load a pretrained model (recommended for training)

#model.add_callback("on_train_start", freeze_layer)

#model.train(data="TrafficVOC.yaml", epochs=50, imgsz = 640)  # train the model
#metrics = model.val()  # evaluate model performance on the validation set

#model.export()

#model = YOLO("runs/detect/train9/weights/best.pt")

#model.predict(test_videos_path+"/TestVideo1.mp4", save=True)