In [None]:
!git clone https://github.com/ultralytics/yolov5.git
!pip install -r yolov5/requirements.txt

In [None]:
# Include all packages
import gc
import cv2
import shutil
import numpy as np
import pandas as pd
from time import time
from datetime import datetime

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from yolov5.models.yolo import Model
from sklearn.model_selection import train_test_split

from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

import torchvision
from torch.optim.lr_scheduler import ReduceLROnPlateau

In [None]:
try:
    from google.colab import drive
    drive.mount('/content/drive')
    import zipfile
    with zipfile.ZipFile('/content/drive/MyDrive/DL Project/DataSet.zip', 'r') as zip_ref:
        zip_ref.extractall('./')
except:
    print("Using Local Machine")

In [None]:
def ResizeImage(image: np.ndarray, x1: int, y1: int, x2: int, y2: int, newWidth: int, newHeight: int) -> tuple:
    originalHeight, originalWidth = image.shape[:2]
    widthScale = newWidth / originalWidth
    heightScale = newHeight / originalHeight
    resizedImage = cv2.resize(
        image, (newWidth, newHeight), interpolation=cv2.INTER_LINEAR)
    x1New, y1New = int(x1 * widthScale), int(y1 * heightScale)
    x2New, y2New = int(x2 * widthScale), int(y2 * heightScale)
    # scale = min(newWidth / originalWidth, newHeight / originalHeight)
    # resizedImage = cv2.resize(image, (round(originalWidth * scale), round(originalHeight * scale)), interpolation=cv2.INTER_LINEAR)
    # dx = round((newWidth - resizedImage.shape[1]) / 2)
    # dy = round((newHeight - resizedImage.shape[0]) / 2)
    # paddedImage = cv2.copyMakeBorder(resizedImage, dy, dy, dx, dx, cv2.BORDER_CONSTANT, value=[0, 0, 0])
    # x1New, y1New = int(x1 * scale + dx), int(y1 * scale + dy)
    # x2New, y2New = int(x2 * scale + dx), int(y2 * scale + dy)
    # return paddedImage, x1New, y1New, x2New, y2New
    return resizedImage, x1New, y1New, x2New, y2New


In [None]:
def LoadDataSet(dataSetFolderPath: str) -> tuple:
    images = []
    annotations = []
    annotationsFilePath = dataSetFolderPath+"/allAnnotations.csv"
    annotationsDataFrame = pd.read_csv(annotationsFilePath, sep=";")
    uniqueSigns = annotationsDataFrame['Annotation tag'].unique().tolist()
    for index, row in annotationsDataFrame[1:].iterrows():
        image = cv2.imread(dataSetFolderPath+"/"+row[0])
        images.append(image)
        annotations.append(
            [row[2], row[3], row[4], row[5]])

    del annotationsDataFrame

    return images, annotations, len(uniqueSigns)


In [None]:
def PreProcessDataSet(images: list, annotations: list, batchSize: int, resize: tuple) -> tuple:
    resizedImages = []
    newAnnotations = []
    for i, image in enumerate(images):
        [x1, y1, x2, y2] = annotations[i]
        resizedImage, x1New, y1New, x2New, y2New = ResizeImage(
            image, x1, y1, x2, y2, resize[0], resize[1])
        resizedImages.append(resizedImage)
        newAnnotations.append(
            [x1New, y1New, x2New, y2New])

    X_train, X_val, y_train, y_val = train_test_split(
        resizedImages, newAnnotations, test_size=0.3, random_state=42)

    return X_train, X_val, y_train, y_val


In [None]:
class CustomDataset(Dataset):
    def __init__(self, data, transform=None):
        self.data = data
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        inputData, label = self.data[idx]

        if self.transform:
            inputData = self.transform(inputData)
        inputData = torch.from_numpy(inputData).float()
        label = torch.tensor(label).float()
        return inputData, label

def CreateDataLoaders(X_train, X_val, y_train, y_val, batchSize):
    trainDataSet = []
    valDataSet = []
    for i in range(len(X_train)):
        trainDataSet.append((X_train[i], y_train[i]))

    for i in range(len(X_val)):
        valDataSet.append((X_val[i], y_val[i]))

    trainDataSet = CustomDataset(trainDataSet)
    valDataSet = CustomDataset(valDataSet)
    trainDataLoader = DataLoader(
        trainDataSet, batch_size=batchSize, shuffle=True, num_workers=4)
    valDataLoader = DataLoader(
        valDataSet, batch_size=batchSize, shuffle=False, num_workers=4)

    return trainDataLoader, valDataLoader


In [None]:

def TargetstoTensors(targets, batchSize, numAnchors, gridSizes):
    targetObj = []
    targetBox = []
    for grid_size in gridSizes:
        targetObj.append(torch.zeros((batchSize, numAnchors, grid_size, grid_size, 1)))
        targetBox.append(torch.zeros((batchSize, numAnchors, grid_size, grid_size, 4)))

    for batch_index, target in enumerate(targets):
        x1, y1, x2, y2 = target.long()
        x_center, y_center, width, height = (x1 + x2) / 2, (y1 + y2) / 2, x2 - x1, y2 - y1

        for i, grid_size in enumerate(gridSizes):
            x_cell, y_cell = int(x_center * grid_size), int(y_center * grid_size)
            anchor = 0
            try:
                targetObj[i][batch_index, anchor, y_cell, x_cell, 0] = 1
                targetBox[i][batch_index, anchor, y_cell, x_cell] = torch.tensor([x_center, y_center, width, height])
            except Exception as e:
                pass
    return targetObj, targetBox

In [None]:

class SignboardLoss(nn.Module):
    def __init__(self, num_anchors=3):
        super(SignboardLoss, self).__init__()
        self.num_anchors = num_anchors

    def forward(self, preds, targets):
        objectLoss = torch.tensor(0.0, device=preds[0].device)
        boxLoss = torch.tensor(0.0, device=preds[0].device)
        batchSize = preds[0].size(0)
        gridSizes = [pred.size(2) for pred in preds]
        targetObjList, targetBoxList = TargetstoTensors(targets, batchSize, self.num_anchors, gridSizes)

        for i, pred in enumerate(preds):
            targetObj = targetObjList[i].to(pred.device)
            targetBox = targetBoxList[i].to(pred.device)

            objectLoss += nn.BCEWithLogitsLoss()(pred[..., 4:5], targetObj)
            boxLoss += nn.MSELoss()(pred[..., :4], targetBox)

        total_loss = objectLoss + boxLoss
        return total_loss

In [None]:
def CreateYolov5Model(numClasses: int, version: str):
    congfigFile = "yolov5/models/yolov5{}.yaml".format(version)
    model = Model(congfigFile, ch=3, nc=numClasses)
    # model.load_state_dict(torch.load("yolov5{}.pt".format(version))["model"].state_dict(), strict=False)

    return model


In [None]:
def TrainEpoch(model, dataLoader, optimizer, lossFunction, device):
    print("Training Epoch")
    model.train()
    runningLoss = 0
    dataLoaderLen = len(dataLoader)
    for i, (inputs, targets) in enumerate(dataLoader):
        inputs = inputs.permute(0, 3, 1, 2)
        inputs = inputs.to(device)
        targets = targets.to(device)
        optimizer.zero_grad()
        with torch.set_grad_enabled(True):
            outputs = model(inputs)
            loss = lossFunction(outputs, targets)
            loss.backward()
            optimizer.step()

        runningLoss += loss.item() * inputs.size(0)
        if(((i*100)//dataLoaderLen) % 10 == 0):
            print((i*100//dataLoaderLen), end="%,")
    epochLoss = runningLoss / dataLoaderLen
    return model, epochLoss


In [None]:
def ValidateEpoch(model, dataLoader, lossFunction, device):
    print("Validating Epoch")
    model.eval()
    runningLoss = 0
    dataLoaderLen = len(dataLoader)
    for i, (inputs, targets) in enumerate(dataLoader):
        inputs = inputs.permute(0, 3, 1, 2)
        inputs = inputs.to(device)
        targets = targets.to(device)
        with torch.set_grad_enabled(False):
            outputs = model(inputs)
            loss = lossFunction(outputs, targets)
        runningLoss += loss.item() * inputs.size(0)
        if(((i*100)//dataLoaderLen) % 10 == 0):
            print((i*100//dataLoaderLen), end="%,")
    epochLoss = runningLoss / dataLoaderLen
    return epochLoss



In [None]:
def TrainModel(model, trainDataLoader, valDataLoader, epochs, optimizer, scheduler, lossFunction, device):
    for epoch in range(epochs):
        startTime = time()
        print("Epoch {}/{}:".format(epoch+1, epochs))
        startTime = time()
        model, trainingEpochLoss = TrainEpoch(model, trainDataLoader, optimizer, lossFunction, device)
        # validationEpochLoss = ValidateEpoch(model, valDataLoader, lossFunction, device)
        # scheduler.step(validationEpochLoss)
        scheduler.step(trainingEpochLoss)
        endTime = time()
        timeTaken = endTime - startTime
        print()
        print("Training Loss: {:.4f}".format(trainingEpochLoss))
        # print("validation Loss: {:.4f}".format(validationEpochLoss))
        print("Time taken: {}min, {}, secs".format(timeTaken//60, int(timeTaken % 60)))
    
    print("Training complete.")
    return model


In [None]:
batchSize = 32
inputShape = (640, 640)
epochs = 100
numAnchors = 3
yolo5Version = 'm'
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [None]:
print("Using {} device".format(device))

In [None]:
print("Downloading Weights of yolo5 Verion ", yolo5Version)
weightsURL = "https://github.com/ultralytics/yolov5/releases/download/v5.0/yolov5{}.pt".format(yolo5Version)
!wget {weightsURL}

In [None]:
images, annotations, numClasses = LoadDataSet("./DataSet")
numClasses = 1

In [None]:
X_train, X_val, y_train, y_val = PreProcessDataSet(
    images, annotations, batchSize, inputShape)
del images
del annotations
gc.collect()

In [None]:
trainDataLoader, valDataLoader = CreateDataLoaders(
    X_train, X_val, y_train, y_val, batchSize)
del X_train
del y_train
del X_val
del y_val
gc.collect()

In [None]:
yolov5Model = CreateYolov5Model(numClasses,yolo5Version)
optimizer = optim.Adam(yolov5Model.parameters(), lr=0.001)
yolov5LossFunction= SignboardLoss()
yolov5Model = yolov5Model.to(device)
yolov5LossFunction = yolov5LossFunction.to(device)
scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10, verbose=True)


In [None]:
trainedModel = TrainModel(yolov5Model, trainDataLoader,valDataLoader, epochs, optimizer, scheduler, yolov5LossFunction, device)

In [None]:
date = datetime.now()
date = date.strftime("%m-%d-%H")
torch.save(trainedModel.state_dict(), 'yolov5Model' + date +'.pth')
shutil.copy('/content/yolov5Model' + date +'.pth', '/content/drive/MyDrive/DL Project/Trained Models/yolov5Model' + date +'.pth')
