## Imports

In [3]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px # this is another plotting library for interactive plot

from sklearn.model_selection import train_test_split
from sklearn import metrics, manifold # we will use the metrics and manifold learning modules from scikit-learn
from pathlib import Path # to interact with file paths
from PIL import Image # to interact with images
from tqdm import tqdm # progress bar
from pprint import pprint # pretty print (useful for a more readable print of objects like lists or dictionaries)

import torch
import torch.nn as nn
import torchvision
from torchvision.io import read_image
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import cv2 as cv

# Report the versions of the core libraries this notebook runs against.
for lib_label, lib_module in (
    ("Pytorch version:", torch),
    ("Torchvision version:", torchvision),
    ("OpenCV version:", cv),
):
    print(lib_label, lib_module.__version__)

# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
# To force CPU execution, assign torch.device("cpu") to `device` instead.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

Pytorch version: 1.10.0+cu113
Torchvision version: 0.11.1+cu113
OpenCV version: 4.5.4


## Load Pretrained Net and create Detector 

In [4]:
# TESTING, DONT RUN THIS CELL
# Model
from helper_functions import *
# #get sleep package
# from time import sleep
# # model = torch.hub.load('ultralytics/yolov5', 'yolov5n', pretrained=True) #faster but less accurate
# yolo = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True) 
# # model = torch.hub.load('ultralytics/yolov5', 'yolov5n6', pretrained=True) 
# # model = torch.hub.load('ultralytics/yolov3', 'yolov3') #bad 


#save the model so that opencv can load it
import torch
import torch.onnx
import torchvision
import torchvision.models as models
import sys

# Load the class names: one name per line, surrounding whitespace removed.
class_list = []
with open("models/classes.txt", "r") as classes_file:
    class_list = [raw_name.strip() for raw_name in classes_file]

def unwrap_detection(output_data, conf_threshold=0.4, score_threshold=0.25, nms_threshold=0.45):
    """Convert raw YOLOv5 predictions into NMS-filtered detections.

    Each row of ``output_data`` is read as
    ``[cx, cy, w, h, objectness, class_score_0, class_score_1, ...]``.
    YOLOv5 exports report boxes by their centre, while ``cv.dnn.NMSBoxes``
    and ``cv.rectangle`` work with top-left corners, so every kept box is
    converted to ``[left, top, width, height]``.

    Args:
        output_data: 2-D array-like with one prediction per row.
        conf_threshold: minimum objectness for a row to be considered.
        score_threshold: minimum best-class score for a row to be kept; also
            used as the score threshold passed to ``cv.dnn.NMSBoxes``.
        nms_threshold: overlap threshold passed to ``cv.dnn.NMSBoxes``.

    Returns:
        ``(class_ids, confidences, boxes)``: three parallel lists holding the
        detections that survive non-maximum suppression.
    """
    class_ids = []
    confidences = []
    boxes = []
    for row in output_data:
        confidence = float(row[4])
        if confidence < conf_threshold:
            continue
        classes_scores = row[5:]
        class_id = int(np.argmax(classes_scores))
        if classes_scores[class_id] <= score_threshold:
            continue
        cx, cy, w, h = (float(value) for value in row[:4])
        # centre-based box -> top-left based box
        left = int(cx - 0.5 * w)
        top = int(cy - 0.5 * h)
        boxes.append([left, top, int(w), int(h)])
        confidences.append(confidence)
        class_ids.append(class_id)

    # Nothing passed the thresholds: skip NMS entirely.
    if not boxes:
        return [], [], []

    indexes = cv.dnn.NMSBoxes(boxes, confidences, score_threshold, nms_threshold)
    # Older OpenCV builds return an (N, 1) array, newer ones a flat array.
    kept = [int(idx) for idx in np.array(indexes).reshape(-1)]

    result_class_ids = [class_ids[idx] for idx in kept]
    result_confidences = [confidences[idx] for idx in kept]
    result_boxes = [boxes[idx] for idx in kept]
    return result_class_ids, result_confidences, result_boxes

onnx_yolo_path = "models/yolov5s_128_320.onnx"

# Load the exported YOLOv5 network with OpenCV's DNN module.
yolo = cv.dnn.readNetFromONNX(onnx_yolo_path)
print(yolo)

image_paths = [f"test_imgs/img_{idx+1}.png" for idx in range(100)]
images = [cv.imread(image_path) for image_path in image_paths]

for image_path, image in tqdm(list(zip(image_paths, images))):
    # cv.imread returns None instead of raising when a file cannot be read.
    if image is None:
        raise FileNotFoundError(f"Could not read image {image_path}")
    image = cv.resize(image, (320, 240))
    # crop to 320x128 (keep the top 128 rows)
    image = image[:128, :]
    assert image.shape == (128, 320, 3), f"Image shape is {image.shape}"
    cv.imshow("original image", image)
    cv.waitKey(1)
    # Scale pixels to [0, 1] and swap BGR -> RGB before feeding the network.
    blob = cv.dnn.blobFromImage(image, 1/255.0, (320, 128), (0, 0, 0), swapRB=True, crop=False)
    yolo.setInput(blob)
    preds = yolo.forward()
    output = preds[0]
    result_class_ids, result_confidences, result_boxes = unwrap_detection(output)
    # Draw every detection; box is used as (x, y, width, height).
    for class_id, box in zip(result_class_ids, result_boxes):
        cv.rectangle(image, (box[0], box[1]), (box[0] + box[2], box[1] + box[3]), (0, 255, 0), 2)
        cv.putText(image, class_list[class_id], (box[0], box[1]), cv.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
    cv.imshow("image", image)
    cv.waitKey(1)

cv.waitKey(1)

cv.destroyAllWindows()

<dnn_Net 0x7efcc84a48b0>


100%|██████████| 100/100 [00:06<00:00, 14.63it/s]


In [13]:
# import cv2 
# import numpy as np 
# import time 
# #Loading Yolo 
# net = cv2.dnn.readNetFromDarknet("yolov10_tiny-custom.cfg",r"models/yolov3_tiny-custom_total.weights")
# classes = [] 
# with open("coco.names", "r") as f: 
#   classes = [line.strip() for line in f.readlines()] 
# layer_names = net.getLayerNames() 
# outputlayers=[layer_names[i[0] - 1] for i in net.getUnconnectedOutLayers()] 
# print(outputlayers)

error: OpenCV(4.5.4) /tmp/pip-req-build-3129w7z7/opencv/modules/dnn/src/darknet/darknet_importer.cpp:207: error: (-212:Parsing error) Failed to parse NetParameter file: yolov10_tiny-custom.cfg in function 'readNetFromDarknet'


In [None]:
# Load the pretrained YOLOv5-small model from the Ultralytics hub.
# Alternatives that were tried:
#   ('ultralytics/yolov5', 'yolov5n')  - faster but less accurate
#   ('ultralytics/yolov5', 'yolov5n6')
#   ('ultralytics/yolov3', 'yolov3')   - bad
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.to(device)

# To analyse the network, print(model) or iterate over model.named_parameters().
# Layer layout reference: https://github.com/ultralytics/yolov5/issues/1314

# The feature extractor is built from layers 0..10 of the YOLOv5 model (11 layers).
# NOTE(review): the referenced issue describes the backbone as layers 0..9; layer 10
# is taken as well here, and the flattened feature sizes noted in this notebook
# (20480 for a 320x240 input) were recorded with this 11-layer stack.
NUM_BACKBONE_LAYERS = 11
backbone_layers = [f'model.{x}' for x in range(NUM_BACKBONE_LAYERS)]

yolo_layers = model.model.model.model
backbone = nn.Sequential(*(yolo_layers[layer_idx] for layer_idx in range(NUM_BACKBONE_LAYERS)))

# print(backbone)

class FeatureExtractor(nn.Module):
    """Run a (pretrained) backbone and flatten its output per sample."""

    def __init__(self, backbone):
        super().__init__()
        # Attribute names are kept stable: they define the state_dict keys and
        # `pretrained` is accessed from outside to freeze the backbone.
        self.pretrained = backbone
        # Collapse everything except the batch dimension.
        self.flatten = nn.Flatten(start_dim=1)

    def forward(self, x):
        """Return the backbone activations for ``x`` flattened to 2-D."""
        return self.flatten(self.pretrained(x))

class Detector(nn.Module):
    """Two-layer fully connected head producing regression and class outputs.

    The input is a vector of ``features + add_inputs`` values; the output has
    ``regr_out + class_out`` values (regression values first, as consumed by
    the training code). The default ``features`` value corresponds to the
    larger input resolution noted in this notebook.
    """

    def __init__(self, add_inputs=4, regr_out=22, class_out=13, features=76800):
        super().__init__()
        in_dim = features + add_inputs
        hidden_dim = 512
        out_dim = regr_out + class_out
        # Linear -> ReLU -> Linear; the Sequential order fixes the state_dict keys.
        self.lin = nn.Sequential(
            nn.Linear(in_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim, out_dim),
        )

    def forward(self, x):
        """Map a batch of concatenated feature/input vectors to raw outputs."""
        return self.lin(x)

# Build the two stages: the backbone-based extractor and the detector head.
feature_extractor = FeatureExtractor(backbone)
# 22 - 16 + 2 = 8 regression outputs; 20480 is the flattened feature size
# recorded in this notebook for 320x240 inputs.
detector = Detector(add_inputs=4, regr_out=22 - 16 + 2, class_out=13, features=20480)

# Freeze the backbone so that only the detector head receives gradient updates.
feature_extractor.pretrained.requires_grad_(False)

feature_extractor.to(device)
detector.to(device)

# #check
# for param_name, param in detector.named_parameters():
#     print('%s \t- requires_grad=%s' % (param_name, param.requires_grad))

In [None]:
# Smoke-test the extractor + detector pipeline on a single image.
img = cv.imread('tests/test_img.jpg')
# cv.imread returns None instead of raising when the file cannot be read.
if img is None:
    raise FileNotFoundError("Could not read 'tests/test_img.jpg'")
# Resize to width 320, height 240 (cv.resize takes (width, height)).
img = cv.resize(img, (320, 240))
# HWC image -> CHW float tensor
img = torch.from_numpy(img).float().permute(2, 0, 1)
# Add the batch dimension and move to the selected device.
img = img.unsqueeze(0).to(device)
print(img.shape)

# Put both stages in inference mode.
feature_extractor.eval()
detector.eval()

# Inference
with torch.no_grad():
    data = torch.zeros(1, 4).to(device)  # placeholder for the 4 additional inputs
    feat = feature_extractor(img)
    det_input = torch.cat((feat, data), dim=1)
    print(det_input.shape)
    out = detector(det_input)
    print(out.shape)

# Flattened feature sizes noted for each input resolution:
#   (320, 240) -> torch.Size([1, 20480])
#   (640, 480) -> torch.Size([1, 76800])





## Loading images and Labels

In [None]:
#dataset
class CsvDataset(Dataset):
    """Images plus per-image CSV rows stored together in one folder.

    ``folder`` must contain ``classification_labels.csv``, ``input_data.csv``
    and ``regression_labels.csv``: comma-separated floats without a header,
    where row ``i`` belongs to ``img_{i+1}.png`` in the same folder.

    Every item is the tuple
    ``(img, input_data, regression_label, classification_label)``; ``img`` is
    a float tensor in channel-first layout, resized to 320x240 (width x height).

    Args:
        folder: directory holding the CSV files and the images.
        transform: optional callable applied to the whole sample tuple.
        in_ram: when True every image is loaded up front; otherwise images are
            read from disk on each access.
    """

    IMG_SIZE = (320, 240)  # (width, height), as expected by cv.resize

    def __init__(self, folder, transform=None, in_ram=False):
        self.transform = transform
        self.in_ram = in_ram
        self.folder = folder
        self.data = []

        class_labels = self._read_float_rows(f'{folder}/classification_labels.csv')
        input_data = self._read_float_rows(f'{folder}/input_data.csv')
        regr_labels = self._read_float_rows(f'{folder}/regression_labels.csv')

        # One record per regression row; the image is stored only in in-RAM mode.
        for i, label in enumerate(regr_labels):
            if self.in_ram:
                self.data.append((self._load_image(i), input_data[i], label, class_labels[i]))
            else:
                self.data.append((input_data[i], label, class_labels[i]))  # no image

    @staticmethod
    def _read_float_rows(csv_path):
        """Parse a headerless CSV of floats into a list of 1-D float32 tensors."""
        with open(csv_path, 'r') as f:
            # Skip blank lines instead of assuming exactly one trailing newline,
            # so a file without a final newline does not lose its last row.
            lines = [line for line in f.read().split('\n') if line.strip()]
        rows = []
        for line in tqdm(lines):
            values = np.array([float(s) for s in line.split(',')])
            rows.append(torch.from_numpy(values).float())
        return rows

    def _load_image(self, idx):
        """Read image ``idx`` (file ``img_{idx+1}.png``) as a float CHW tensor."""
        img_path = f'{self.folder}/img_{idx+1}.png'
        img = cv.imread(img_path)
        # cv.imread returns None instead of raising when the file cannot be read.
        if img is None:
            raise FileNotFoundError(f'Could not read image {img_path}')
        img = cv.resize(img, self.IMG_SIZE)
        return torch.from_numpy(img).float().permute(2, 0, 1)

    def __len__(self):
        # One sample per stored record.
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]
        if not self.in_ram:
            # Images are not cached: read the one for this index now.
            sample = (self._load_image(idx), sample[0], sample[1], sample[2])
        if self.transform:
            sample = self.transform(sample)
        return sample

# Training split; images are read from disk on demand (in_ram defaults to False).
train_dataset = CsvDataset(folder='training_imgs')

# Shuffled mini-batches of 128 samples.
train_dataloader = DataLoader(dataset=train_dataset, batch_size=128, shuffle=True)

In [None]:
# Pull a single batch and print the shape of each of its four components.
sample = next(iter(train_dataloader))
for component_idx in range(4):
    print(sample[component_idx].shape)

## Training

In [None]:
# Training function
def train_epoch(ext, det, dataloader, class_loss_fn, regr_loss_fn, optimizer, device, loss_weights=None):
    """Train the detector for one pass over ``dataloader``.

    The extractor is kept in eval mode and is run without gradient tracking;
    only ``det`` is updated through ``optimizer``.

    The detector output is read as 8 regression values followed by the class
    scores: columns 0-1 error terms, 2 distance, 3 curvature, 4-7 bounding
    box, then 3 state, 3 next-state and the remaining sign scores. Labels are
    sliced with the same layout.

    Args:
        ext: feature extractor module.
        det: detector module to train.
        dataloader: iterable of ``(img, input_data, regr_label, class_label)``.
        class_loss_fn: loss applied to each classification group.
        regr_loss_fn: loss applied to each regression group.
        optimizer: optimizer holding the detector parameters.
        device: device the batches are moved to.
        loss_weights: optional dict overriding any of the default weights
            ``err``, ``dist``, ``curv``, ``bb``, ``state``, ``next``, ``sign``.

    Returns:
        ``(class_loss, err_loss, dist_loss, curv_loss, bb_loss)``: the mean of
        the weighted per-batch losses.
    """
    weights = {'err': 50.0, 'dist': 0.1, 'curv': 0.1, 'bb': 1.0,
               'state': 0.5, 'next': 0.1, 'sign': 5.0}
    if loss_weights is not None:
        weights.update(loss_weights)

    ext.eval()   # the extractor is not trained
    det.train()  # the detector is

    train_loss_class = []
    err_losses = []
    dist_losses = []
    curv_losses = []
    bb_losses = []

    for (img, input_data, regr_label, class_label) in tqdm(dataloader):
        # Move the batch to the selected device.
        img, input_data = img.to(device), input_data.to(device)
        regr_label, class_label = regr_label.to(device), class_label.to(device)

        # The extractor is frozen, so no autograd graph is needed for it.
        with torch.no_grad():
            features = ext(img)
        det_input = torch.cat((features, input_data), dim=1)

        optimizer.zero_grad()
        output = det(det_input)

        # Regression part: first 8 columns.
        err_out, err_label = output[:, 0:2], regr_label[:, 0:2]
        dist_out, dist_label = output[:, 2], regr_label[:, 2]
        curv_out, curv_label = output[:, 3], regr_label[:, 3]
        bb_out, bb_label = output[:, 4:8], regr_label[:, 4:8]

        # Classification part: 3 states, 3 next states, then the signs.
        state_out, state_label = output[:, 8:11], class_label[:, :3]
        next_out, next_label = output[:, 11:14], class_label[:, 3:6]
        sign_out, sign_label = output[:, 14:], class_label[:, 6:]

        # Weighted losses.
        err_loss = weights['err'] * regr_loss_fn(err_out, err_label)
        dist_loss = weights['dist'] * regr_loss_fn(dist_out, dist_label)
        curv_loss = weights['curv'] * regr_loss_fn(curv_out, curv_label)
        bb_loss = weights['bb'] * regr_loss_fn(bb_out, bb_label)

        state_loss = weights['state'] * class_loss_fn(state_out, state_label)
        next_loss = weights['next'] * class_loss_fn(next_out, next_label)
        sign_loss = weights['sign'] * class_loss_fn(sign_out, sign_label)
        loss = err_loss + dist_loss + curv_loss + bb_loss + state_loss + next_loss + sign_loss

        # Backpropagate and update the detector weights.
        loss.backward()
        optimizer.step()

        # Record the per-batch losses.
        c_loss = (state_loss + next_loss + sign_loss).detach().cpu().numpy()
        train_loss_class.append(c_loss)
        err_losses.append(err_loss.detach().cpu().numpy())
        dist_losses.append(dist_loss.detach().cpu().numpy())
        curv_losses.append(curv_loss.detach().cpu().numpy())
        bb_losses.append(bb_loss.detach().cpu().numpy())

    # Average over the batches.
    train_loss_c = np.mean(train_loss_class)
    err_loss = np.mean(err_losses)
    dist_loss = np.mean(dist_losses)
    curv_loss = np.mean(curv_losses)
    bb_loss = np.mean(bb_losses)
    return train_loss_c, err_loss, dist_loss, curv_loss, bb_loss

def get_avg_loss(ext, det, dataloader, class_loss_fn, regr_loss_fn, device):
    """Compute the mean (unweighted) classification and regression losses.

    The detector output is read as ``n_regr`` regression values followed by
    the class scores (3 states, 3 next states, then the signs). ``n_regr`` is
    the output width minus the number of class-label columns, so the function
    follows the detector's actual layout: 8 regression values for the detector
    built in this notebook, 22 for a detector with the default ``regr_out``.
    Only the first ``n_regr`` regression-label columns are compared.

    Args:
        ext: feature extractor module.
        det: detector module.
        dataloader: iterable of ``(img, input_data, regr_label, class_label)``.
        class_loss_fn: loss applied to each classification group.
        regr_loss_fn: loss applied to the regression values.
        device: device the batches are moved to.

    Returns:
        ``(class_loss, regr_loss)`` averaged over the batches.
    """
    ext.eval()
    det.eval()
    class_losses = []
    regr_losses = []
    with torch.no_grad():
        for (img, input_data, regr_label, class_label) in tqdm(dataloader):
            # Move the batch to the selected device.
            img, input_data = img.to(device), input_data.to(device)
            regr_label, class_label = regr_label.to(device), class_label.to(device)

            features = ext(img)
            det_input = torch.cat((features, input_data), dim=1)
            output = det(det_input)

            # Split the output: regression first, classes afterwards.
            n_regr = output.shape[1] - class_label.shape[1]
            regr_out = output[:, :n_regr]
            state_out = output[:, n_regr:n_regr + 3]
            next_out = output[:, n_regr + 3:n_regr + 6]
            sign_out = output[:, n_regr + 6:]

            state_label = class_label[:, :3]
            next_label = class_label[:, 3:6]
            sign_label = class_label[:, 6:]

            # Compute the losses.
            regr_loss = regr_loss_fn(regr_out, regr_label[:, :n_regr])
            state_loss = class_loss_fn(state_out, state_label)
            next_loss = class_loss_fn(next_out, next_label)
            sign_loss = class_loss_fn(sign_out, sign_label)
            class_loss = state_loss + next_loss + sign_loss

            class_losses.append(class_loss.detach().cpu().numpy())
            regr_losses.append(regr_loss.detach().cpu().numpy())
    # Average over the batches.
    class_loss = np.mean(class_losses)
    regr_loss = np.mean(regr_losses)
    return class_loss, regr_loss

In [None]:
# To resume from saved weights instead of starting fresh, load them first:
#   detector.load_state_dict(torch.load('detector.pt'))
#   feature_extractor.load_state_dict(torch.load('feature_extractor.pt'))

# Hyper-parameters
lr = 0.001
epochs = 3

# Only the detector parameters are handed to the optimizer.
optimizer = torch.optim.Adam(detector.parameters(), lr=lr)
regr_loss_fn = nn.MSELoss()
class_loss_fn = nn.CrossEntropyLoss()

for epoch in range(epochs):
    print(f"Epoch {epoch+1}/{epochs}")
    epoch_losses = train_epoch(
        feature_extractor, detector, train_dataloader,
        class_loss_fn, regr_loss_fn, optimizer, device,
    )
    train_loss_c, err_loss, dist_loss, curv_loss, bb_loss = epoch_losses
    print(f"err_loss: {err_loss}")
    print(f"dist_loss: {dist_loss}")
    print(f"curv_loss: {curv_loss}")
    print(f"bb_loss: {bb_loss}")
    print(f"Classification loss: {train_loss_c}")
    # Checkpoint both modules after every epoch.
    for module_to_save, checkpoint_path in (
        (detector, 'models/detector.pt'),
        (feature_extractor, 'models/feature_extractor.pt'),
    ):
        torch.save(module_to_save.state_dict(), checkpoint_path)

In [None]:
# Held-out evaluation split
test_dataset = CsvDataset(folder='test_imgs')
test_dataloader = DataLoader(dataset=test_dataset, batch_size=100, shuffle=True)


def _avg_losses(loader):
    """Average classification/regression losses of the current models on ``loader``."""
    return get_avg_loss(feature_extractor, detector, loader, class_loss_fn, regr_loss_fn, device)


# Average losses on the training split, then on the test split.
train_class_loss, train_regr_loss = _avg_losses(train_dataloader)
test_class_loss, test_regr_loss = _avg_losses(test_dataloader)

print(f"Training classification loss: {train_class_loss}")
print(f"Training regression loss: {train_regr_loss}\n")
print(f"Testing classification loss: {test_class_loss}")
print(f"Testing regression loss: {test_regr_loss}")

In [None]:
# Shape of the first stored field of the first test record.
first_test_record = test_dataset.data[0]
print(first_test_record[0].shape)

In [None]:
#save the model so that opencv can load it
import torch
import torch.onnx
import torchvision
import torchvision.models as models
import sys

# The export is done on the CPU.
device = torch.device('cpu')

# map_location lets checkpoints written on a GPU machine load without CUDA.
detector.load_state_dict(torch.load('models/detector.pt', map_location=device))
feature_extractor.load_state_dict(torch.load('models/feature_extractor.pt', map_location=device))

# To save plain PyTorch checkpoints instead:
#   torch.save(detector.state_dict(), 'detector.pt')
#   torch.save(feature_extractor.state_dict(), 'feature_extractor.pt')

detector.to(device)
feature_extractor.to(device)

onnx_detector_path = "models/detector.onnx"
onnx_feature_extractor_path = "models/feature_extractor.onnx"

# set the models to inference mode
detector.eval()
feature_extractor.eval()

# Sample inputs in the shapes the models expect; the conversion runs one
# forward pass through each network.
dummy_input = torch.randn(1, 3, 240, 320)
# Width of the detector input (features + additional inputs; 20484 for the
# detector built in this notebook), read from its first linear layer.
dummy_input2 = torch.randn(1, detector.lin[0].in_features)
torch.onnx.export(feature_extractor, dummy_input, onnx_feature_extractor_path, verbose=True)
torch.onnx.export(detector, dummy_input2, onnx_detector_path, verbose=True)


In [None]:
#test with opencv
sample_image = "training_imgs/img_1.png"
images = [cv.imread(f"training_imgs/img_{i+1}.png") for i in range(100)]

# Load both exported networks with OpenCV's DNN module.
det = cv.dnn.readNetFromONNX(onnx_detector_path)
ext = cv.dnn.readNetFromONNX(onnx_feature_extractor_path)

# Placeholder for the 4 additional inputs; float32 so the concatenated
# detector input keeps the dtype of the extracted features.
action_vec = np.ones((1, 4), dtype=np.float32)

for image in tqdm(images):
    # swapRB=False: the PyTorch pipeline fed the BGR images from cv.imread to
    # the network unchanged, so the channel order must not be swapped here.
    blob = cv.dnn.blobFromImage(image, 1.0, (320, 240), (0, 0, 0), swapRB=False, crop=False)
    ext.setInput(blob)
    features = ext.forward()
    det_input = np.concatenate((features, action_vec), axis=1)
    det.setInput(det_input)
    preds = det.forward()

# Only the predictions for the last image are shown.
print(f"Predictions: {preds}")
print(f"Predictions shape: {preds.shape}")