# Traffic Sign Detection and Classification

This work focuses on the development of a program for the automatic detection and classification of a subset of traffic signs, namely traffic lights, stop signs, speed limit signs and crosswalk signs, using a Deep Learning approach.

The first step is to import all the necessary libraries used in this project.

In [None]:
import gc
import sys
import cv2
import numpy as np
import pandas as pd
from bbox import BBox2D
import xml.etree.ElementTree as ET
from math import prod
from tqdm import tqdm
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from torchvision import models, transforms

## Hyperparameters

Next, we define the hyperparameters used throughout the project.

In [None]:
class Config:
    """Project-wide hyperparameters and filesystem locations."""

    # Prefix of the checkpoint files and mapping from label name to class index.
    model_name = "vgg16"
    classes = dict(trafficlight=0, stop=1, speedlimit=2, crosswalk=3)

    # Dataset locations, all rooted at ``data_folder``.
    data_folder = './data'
    annotations_folder = f'{data_folder}/annotations/'
    images_folder = f'{data_folder}/images/'

    # Input resolution (images are resized to a square) and optimisation settings.
    images_size = 300
    num_epochs = 250
    learning_rate = 1e-3
    batch_size = 16

    # Convolution / pooling settings read by the custom network and by
    # ``Utils.calculate_output_size``.
    num_filters = 32
    kernel_size = 5
    pool_size = 2
    padding = 0
    stride = 1

    num_workers = 2

    # Use the GPU whenever torch reports one, otherwise fall back to the CPU.
    device = "cuda" if torch.cuda.is_available() else "cpu"

## Dataset Classes

We define two Dataset classes: a multilabel one for the advanced version, and a single-label one shared by the basic and intermediate versions.

In [None]:
class ImageClassificationDataset(Dataset):
    """Single-label dataset: each image is labelled with its largest annotated object.

    Images are read from ``Config.images_folder`` (``<name>.png``) and their
    annotations from ``Config.annotations_folder`` (``<name>.xml``).
    """

    def __init__(self, images, transform=None):
        """Store the image names and the optional transform.

        Args:
            images: Iterable of image base names (without extension).
            transform: Optional callable applied to the RGB image array. Its
                result must expose ``.float()``, which ``__getitem__`` calls.
        """
        self.images = pd.DataFrame(images, columns=['image_name'])
        self.transform = transform

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.images)

    @staticmethod
    def _largest_object_label(objects):
        """Return the name of the object whose bounding box has the largest area.

        Args:
            objects: Sequence of ``(name, [xmin, ymin, xmax, ymax])`` pairs.

        Returns:
            The name of the largest box (the first one wins ties), or ``None``
            when ``objects`` is empty.
        """
        best_label, best_area = None, 0
        for name, (xmin, ymin, xmax, ymax) in objects:
            # Area is computed straight from the corner coordinates so it does
            # not depend on any bounding-box helper's box-format convention.
            area = (xmax - xmin) * (ymax - ymin)
            # Compare BEFORE updating the running maximum; updating first made
            # the comparison always false, so only the first object could win.
            if best_label is None or area > best_area:
                best_label, best_area = name, area
        return best_label

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            A dict with ``name`` (image base name), ``image`` (transformed
            image as a float tensor) and ``labels`` (class index of the largest
            annotated object, as a float scalar tensor), or ``None`` when the
            image cannot be read.

        Raises:
            KeyError: If the annotation has no object, or the object's name is
                not a key of ``Config.classes``.
        """
        name = self.images.iloc[idx, 0]
        image = cv2.imread(f'{Config.images_folder}{name}.png')
        try:
            # cv2.imread yields None for an unreadable file, which makes
            # cvtColor raise cv2.error.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        except cv2.error:
            print(f'Error reading image {name}.png')
            return None

        if self.transform:
            image = self.transform(image)

        root = ET.parse(Config.annotations_folder + f'{name}.xml').getroot()
        objects = []
        for obj in root.iter('object'):
            bndbox = obj.find('bndbox')
            corners = [int(bndbox.find(tag).text) for tag in ('xmin', 'ymin', 'xmax', 'ymax')]
            objects.append((obj.find('name').text, corners))

        label = self._largest_object_label(objects)
        if label is None:
            raise KeyError(f'No annotated object found for image {name}')

        target = torch.tensor(Config.classes[label], dtype=torch.long)

        return {
            'name': name,
            'image': image.float(),
            'labels': target.float()
        }

class ImageMultiLabelDataset(Dataset):
    """Multilabel dataset: each image gets one 0/1 flag per class in ``Config.classes``.

    Images are read from ``Config.images_folder`` (``<name>.png``) and their
    annotations from ``Config.annotations_folder`` (``<name>.xml``).
    """

    def __init__(self, images, transform=None):
        """Store the image names and the optional transform.

        Args:
            images: Iterable of image base names (without extension).
            transform: Optional callable applied to the RGB image array. Its
                result must expose ``.float()``, which ``__getitem__`` calls.
        """
        self.images = pd.DataFrame(images, columns=['image_name'])
        self.transform = transform

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.images)

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            A dict with ``name`` (image base name), ``image`` (transformed
            image as a float tensor) and ``labels`` (float tensor holding a
            0/1 flag per class, in ``Config.classes`` order), or ``None`` when
            the image cannot be read.
        """
        name = self.images.iloc[idx, 0]
        image = cv2.imread(f'{Config.images_folder}{name}.png')
        try:
            # cv2.imread yields None for an unreadable file, which makes
            # cvtColor raise cv2.error.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        except cv2.error:
            print(f'Error reading image {name}.png')
            return None
        if self.transform:
            image = self.transform(image)

        tree = ET.parse(Config.annotations_folder + f'{name}.xml')
        # Every <name> element found in the annotation counts as a present class.
        present = {node.text for node in tree.getroot().iter('name')}

        flags = [1 if class_name in present else 0 for class_name in Config.classes]
        labels = torch.tensor(flags, dtype=torch.long)

        return {
            'name': name,
            'image': image.float(),
            'labels': labels.float()
        }

## Utils

Utility functions, mainly for displaying data and results.

In [None]:
class Utils:
    """Stateless helpers for layer-size arithmetic and for plotting results."""

    @staticmethod
    def calculate_output_size(input_size):
        """Return the spatial size produced by one convolution layer.

        Uses ``Config.kernel_size``, ``Config.padding`` and ``Config.stride``.
        The division is floored, as a convolution's output size is, so the
        result stays whole when the stride does not divide evenly.

        Args:
            input_size: Height (or width) of the square input.
        """
        return (input_size - Config.kernel_size + 2 * Config.padding) // Config.stride + 1

    @staticmethod
    def learning_curve_graph(train_history, val_history):
        """Plot loss (top) and accuracy (bottom) for training and validation.

        Args:
            train_history: Dict with ``'loss'`` and ``'accuracy'`` sequences.
            val_history: Dict with the same keys, for the validation set.
        """
        plt.subplot(2, 1, 1)
        plt.title('Cross Entropy Loss')
        plt.plot(train_history['loss'], label='train')
        plt.plot(val_history['loss'], label='val')
        plt.legend(loc='best')

        plt.subplot(2, 1, 2)
        plt.title('Classification Accuracy')
        plt.plot(train_history['accuracy'], label='train')
        plt.plot(val_history['accuracy'], label='val')

        plt.tight_layout()
        plt.legend(loc='best')
        plt.show()

    @staticmethod
    def display_predictions(model, data, multilabel=False):
        """Show the first image of every batch with its true and predicted classes.

        Images are laid out on 10x10 grids, one figure per 100 batches. The
        true class indices are written in green and the predicted ones in red.

        Args:
            model: Network to run; it must already live on ``Config.device``.
            data: Iterable of batches shaped like the dataset samples
                (``'image'`` and ``'labels'`` entries).
            multilabel: When true, labels and predictions are per-class flags.
        """
        plt.figure(figsize=(15, 15))

        # Inference only: no gradients are needed to draw predictions.
        with torch.no_grad():
            for ind, batch in enumerate(tqdm(data)):
                if (ind % 100 == 0) and (ind != 0):
                    plt.show()
                    # Start a fresh page with the same size as the first one.
                    plt.figure(figsize=(15, 15))
                ind = ind % 100

                # Follow the configured device instead of assuming CUDA.
                inputs = batch['image'].to(Config.device)
                labels = batch['labels'].type(torch.LongTensor).to(Config.device)

                pred = model(inputs)
                probs = F.softmax(pred, dim=1)

                plt.subplot(10, 10, ind + 1)
                plt.axis("off")

                if multilabel:
                    threshold = 0.3
                    final_pred = (probs > threshold).long().cpu()
                    true_classes = [idx for idx, flag in enumerate(labels[0]) if flag.item() == 1]
                    predicted = [idx for idx, flag in enumerate(final_pred[0]) if flag.item() == 1]
                    for pos, class_idx in enumerate(true_classes):
                        plt.text(50 * pos, -1, class_idx, fontsize=14, color='green') # correct
                    # Predictions start right after the true labels; using the
                    # count keeps this valid when there are no true labels.
                    offset = 50 * len(true_classes)
                    for pos, class_idx in enumerate(predicted):
                        plt.text(offset + 50 * pos, -1, class_idx, fontsize=14, color='red') # predicted
                else:
                    final_pred = torch.argmax(probs, dim=1)
                    plt.text(0, -1, labels[0].item(), fontsize=14, color='green') # correct
                    plt.text(100, -1, final_pred[0].item(), fontsize=14, color='red') # predicted

                plt.imshow(inputs[0].cpu().permute(1, 2, 0).numpy())
        plt.show()

## Train and Test methods

The main methods of our program, which contain the training and testing processes

In [None]:
class Iterator:
    """Training, validation and test loops shared by all the model wrappers."""

    @staticmethod
    def epoch_iterator(dataloader, model, loss_function, optimizer=None, is_train=True, multilabel=False):
        """Run one pass over ``dataloader``.

        Args:
            dataloader: Iterable of batches with ``'image'`` and ``'labels'``.
            model: Network to train or evaluate.
            loss_function: Loss module. The script pairs ``nn.BCELoss`` with
                ``multilabel=True`` and ``nn.CrossEntropyLoss`` otherwise.
            optimizer: Required when ``is_train`` is true.
            is_train: Whether to back-propagate and update the weights.
            multilabel: Whether targets are per-class 0/1 flags.

        Returns:
            ``(mean loss per batch, accuracy)`` over the whole pass.
        """
        if is_train: assert optimizer is not None, 'When training, please provide an optimizer.'

        num_batches = len(dataloader)

        if is_train:
            model.train() # put model in train mode
        else:
            model.eval()

        total_loss = 0.0
        preds = []
        labels = []

        with torch.set_grad_enabled(is_train):
            for data in tqdm(dataloader):
                # Class indices must be integer typed; multilabel flags stay float.
                targets = data['labels'] if multilabel else data['labels'].type(torch.LongTensor)
                X, y = data['image'].to(Config.device), targets.to(Config.device)

                # Compute prediction error
                pred = model(X)

                # BCELoss consumes probabilities, so the multilabel path needs a
                # sigmoid. CrossEntropyLoss takes raw logits; squashing them
                # first (as was done before) distorts the single-label loss.
                loss = loss_function(torch.sigmoid(pred) if multilabel else pred, y)

                # Backpropagation
                if is_train:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                # Save training metrics
                total_loss += loss.item()

                probs = F.softmax(pred.detach(), dim=1)
                if multilabel:
                    # NOTE(review): flags are thresholded on softmax scores while
                    # the loss is computed on sigmoid scores; kept as-is, confirm
                    # this is intended.
                    threshold = 0.25
                    final_pred = (probs > threshold).double()
                else:
                    final_pred = torch.argmax(probs, dim=1)

                preds.extend(final_pred.cpu().numpy())
                labels.extend(y.cpu().numpy())

        return total_loss / num_batches, accuracy_score(labels, preds)

    @staticmethod
    def train(model, train_dataloader, validation_dataloader, loss_fn, multilabel=False):
        """Train for ``Config.num_epochs`` epochs, checkpointing as it goes.

        After every epoch the latest weights are saved, and the weights with
        the lowest validation loss so far are saved separately.

        Args:
            model: Network to train.
            train_dataloader: Batches used for the weight updates.
            validation_dataloader: Batches used to pick the best checkpoint.
            loss_fn: Loss module forwarded to ``epoch_iterator``.
            multilabel: Selects Adam (true) or SGD (false) and is forwarded to
                ``epoch_iterator``.

        Returns:
            ``(train_history, val_history)``, each a dict with per-epoch
            ``'loss'`` and ``'accuracy'`` lists.
        """
        import os

        train_history = {'loss': [], 'accuracy': []}
        val_history = {'loss': [], 'accuracy': []}

        optimizer_cls = torch.optim.Adam if multilabel else torch.optim.SGD
        optimizer = optimizer_cls(model.parameters(), lr=Config.learning_rate)

        # torch.save does not create missing directories.
        os.makedirs('./pth_models/', exist_ok=True)

        best_val_loss = np.inf
        print("\nStart training...")

        for t in range(Config.num_epochs):
            print(f"\nEpoch {t+1}")
            train_loss, train_acc = Iterator.epoch_iterator(train_dataloader, model, loss_fn, optimizer, multilabel=multilabel)
            print(f"Train loss: {train_loss:.3f} \t Train acc: {train_acc:.3f}")

            val_loss, val_acc = Iterator.epoch_iterator(validation_dataloader, model, loss_fn, is_train=False, multilabel=multilabel)
            print(f"Val loss: {val_loss:.3f} \t Val acc: {val_acc:.3f}")

            save_dict = {'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'epoch': t}

            # Save best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                torch.save(save_dict, './pth_models/' + Config.model_name + '_best_model.pth')

            # Save latest model
            torch.save(save_dict, './pth_models/' + Config.model_name + '_latest_model.pth')

            # Values for plotting
            train_history["loss"].append(train_loss)
            train_history["accuracy"].append(train_acc)
            val_history["loss"].append(val_loss)
            val_history["accuracy"].append(val_acc)

        print("Finished")
        return train_history, val_history

    @staticmethod
    def test(model, test_data, loss_function, multilabel=False):
        """Evaluate the best checkpoint on ``test_data`` and display its predictions.

        Args:
            model: Network with the same architecture as the saved checkpoint.
            test_data: Batches to evaluate and display.
            loss_function: Loss module forwarded to ``epoch_iterator``.
            multilabel: Forwarded to ``epoch_iterator`` and to the display helper.
        """
        # Load the best model (i.e. model with the lowest val loss...might not be the last model)
        # We could also load the optimizer and resume training if needed
        model = model.to(Config.device)
        # map_location lets a checkpoint saved on one device load on another.
        checkpoint = torch.load('./pth_models/' + Config.model_name + '_best_model.pth', map_location=Config.device)
        model.load_state_dict(checkpoint['model'])

        test_loss, test_acc = Iterator.epoch_iterator(test_data, model, loss_function, is_train=False, multilabel=multilabel)
        print(f"\nTest Loss: {test_loss:.3f} \nTest Accuracy: {test_acc:.3f}")

        Utils.display_predictions(model, test_data, multilabel)


## Neural Networks

Here we define the models of the Neural Networks to be used

In [None]:
###################################################
# Basic
###################################################
class ClassificationResNet:
    """ResNet-50 wrapper whose final layer is replaced by a 4-output classifier."""

    def __init__(self, pre_trained = True):
        # Forwarded to torchvision's ``pretrained`` argument when building.
        self.pre_trained = pre_trained

    def model(self):
        """Build the network and return it on ``Config.device``."""
        network = models.resnet50(pretrained=self.pre_trained)
        network.fc = nn.Linear(2048, 4)
        return network.to(Config.device)

    def run(self, train_dl, test_dl, validation_dl, loss_fn):
        """Train the network, plot its learning curves, then test it."""
        network = self.model()

        histories = Iterator.train(network, train_dl, validation_dl, loss_fn)
        Utils.learning_curve_graph(*histories)

        Iterator.test(network, test_dl, loss_fn)

class ClassificationVGG16:
    """VGG-16 wrapper whose last classifier layer is replaced by a 4-output one."""

    def __init__(self, pre_trained = True):
        # Forwarded to torchvision's ``pretrained`` argument when building.
        self.pre_trained = pre_trained

    def model(self):
        """Build the network and return it on ``Config.device``."""
        network = models.vgg16(pretrained=self.pre_trained)
        network.classifier[6] = nn.Linear(4096, 4)
        return network.to(Config.device)

    def run(self, train_dl, test_dl, validation_dl, loss_fn):
        """Train the network, plot its learning curves, then test it."""
        network = self.model()

        histories = Iterator.train(network, train_dl, validation_dl, loss_fn)
        Utils.learning_curve_graph(*histories)

        Iterator.test(network, test_dl, loss_fn)

###################################################
# Intermediate
###################################################
class ClassificationCustomNetwork(nn.Module):
    """Small CNN: two convolutions, one max-pool, then two linear layers (4 outputs)."""

    def __init__(self):
        super().__init__()
        self.num_conv_layer = 2
        self.num_max_pool = 1

        # Track the feature-map side length in whole pixels so the first
        # linear layer is sized exactly like the tensor it will receive.
        side = Config.images_size
        for _ in range(self.num_conv_layer):
            side = int(Utils.calculate_output_size(side))
        for _ in range(self.num_max_pool):
            # MaxPool2d floors its output size, so floor here as well.
            side //= Config.pool_size
        self.output_shape = (side, side, Config.num_filters)

        self.layers = nn.Sequential(
            # Stride and padding come from Config so the layers match the size
            # computed above; the current values equal Conv2d's defaults.
            nn.Conv2d(3, Config.num_filters, Config.kernel_size,
                      stride=Config.stride, padding=Config.padding),
            nn.ReLU(),
            nn.Conv2d(Config.num_filters, Config.num_filters, Config.kernel_size,
                      stride=Config.stride, padding=Config.padding),
            nn.ReLU(),
            nn.MaxPool2d(Config.pool_size),

            nn.Dropout(0.25),
            nn.Flatten(),
            nn.Linear(prod(self.output_shape), 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, 4),
        )

    def forward(self, x):
        """Return the raw 4-class scores (no activation applied) for ``x``."""
        logits = self.layers(x)
        return logits

class ClassificationCustomModel:
    """Wrapper that trains and tests ``ClassificationCustomNetwork``."""

    def __init__(self, pre_trained = True):
        # Stored for interface parity with the other wrappers; the custom
        # network is built without it.
        self.pre_trained = pre_trained

    def model(self):
        """Build the custom network and return it on ``Config.device``."""
        network = ClassificationCustomNetwork()
        return network.to(Config.device)

    def run(self, train_dl, test_dl, validation_dl, loss_fn):
        """Train the network, plot its learning curves, then test it."""
        network = self.model()

        histories = Iterator.train(network, train_dl, validation_dl, loss_fn)
        Utils.learning_curve_graph(*histories)

        Iterator.test(network, test_dl, loss_fn)

###################################################
# Advanced - adapt the previous models to solve the original problem, i.e. multilabel classification, and compare their performance
###################################################
class ClassificationMultilabel:
    """Multilabel wrapper around one of the three architectures defined in this file."""

    def __init__(self, model_name, pre_trained = True):
        """
        Args:
            model_name: One of ``'vgg16'``, ``'resnet'`` or ``'custom'``.
            pre_trained: Forwarded to the selected architecture's wrapper.
        """
        self.model_name = model_name
        self.pre_trained = pre_trained

    def model(self):
        """Build the network selected by ``model_name``.

        Exits the program with ``'Invalid model'`` for an unknown name.
        """
        if self.model_name == 'vgg16': return ClassificationVGG16(self.pre_trained).model()
        elif self.model_name == 'resnet': return ClassificationResNet(self.pre_trained).model()
        elif self.model_name == 'custom': return ClassificationCustomModel(self.pre_trained).model()
        sys.exit('Invalid model')

    def run(self, train_dl, test_dl, validation_dl, loss_fn):
        """Train the selected network in multilabel mode, plot its curves, then test it."""
        # Use the architecture the caller asked for; the custom network was
        # previously built here no matter what ``model_name`` said.
        model = self.model()

        train_history, val_history = Iterator.train(model, train_dl, validation_dl, loss_fn, multilabel=True)
        Utils.learning_curve_graph(train_history, val_history)

        Iterator.test(model, test_dl, loss_fn, multilabel=True)

# Run the main code

In [None]:
# Free unreferenced Python objects, then release torch's cached CUDA memory.
gc.collect()
torch.cuda.empty_cache()

# --- Global variables ---
print(f"Using {Config.device} device\n")

# --- Transforms ---
# Training images are resized and randomly flipped / rotated.
transforms_dict = {
    "train": transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((Config.images_size, Config.images_size)),
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=45),
        transforms.ToTensor(),
    ]),
}
# Validation and test images are only resized; each split gets its own pipeline.
transforms_dict.update({
    split: transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((Config.images_size, Config.images_size)),
        transforms.ToTensor(),
    ])
    for split in ("validation", "test")
})

###################################################
# Read Images
###################################################
def read_images(filename):
    """Return the image names listed in ``filename``, one per line.

    Trailing whitespace is stripped from every line. Blank lines are skipped;
    they no longer end the listing, so names after one are not lost.

    Args:
        filename: Path of the text file to read.

    Returns:
        List of non-empty names, in file order.
    """
    with open(filename) as file:
        stripped = (line.rstrip() for line in file)
        return [name for name in stripped if name]

# Image names for the train/validation pool and for the held-out test set.
val_train_images = read_images('./data/train.txt')
test_images = read_images('./data/test.txt')

# The first 80% of the pool (in file order) is used for training and the
# remainder for validation.
train_ratio = int(len(val_train_images) * 0.8)
validation_ratio = len(val_train_images) - train_ratio

train_images = [name for name in val_train_images[:train_ratio]]
validation_images = [name for name in val_train_images[train_ratio:]]

###################################################
# Models
###################################################
# Entry point: pick a version and architecture, build the datasets, derive
# class weights from the training labels, then train and test the model.
if __name__ == "__main__":
    version = input('Enter the desired version (basic, intermediate, advanced): ')

    if version == 'basic':
        architecture = input('Choose the architecture (vgg16, resnet): ')
        if architecture == 'vgg16':
            neural_network = ClassificationVGG16(True)
        elif architecture == 'resnet':
            neural_network = ClassificationResNet(True)
        else:
            sys.exit('Invalid architecture')
    elif version == 'intermediate':
        neural_network = ClassificationCustomModel(True)
    elif version == 'advanced':
        model = input('Choose the model (vgg16, resnet, custom): ')
        neural_network = ClassificationMultilabel(model, True)
    else:
        sys.exit('Invalid version')

    # Only the advanced version works on multilabel targets.
    dataset_cls = ImageMultiLabelDataset if version == 'advanced' else ImageClassificationDataset
    train_data = dataset_cls(train_images, transforms_dict['train'])
    validation_data = dataset_cls(validation_images, transforms_dict['validation'])
    test_data = dataset_cls(test_images, transforms_dict['test'])

    print(f'Training size: {len(train_data)}\nValidation size: {len(validation_data)} \nTest size: {len(test_data)}\n')

    # Count how often each class occurs in the training set. Only counters are
    # kept, so the decoded images are not all held in memory at once.
    class_names = ['trafficlight', 'stop', 'speedlimit', 'crosswalk']
    labels_quantity = {name: 0 for name in class_names}
    for sample in train_data:
        if sample is None:
            # The dataset returns None (after printing an error) for unreadable images.
            continue
        if version == 'advanced':
            for name, flag in zip(class_names, sample['labels'].tolist()):
                if flag == 1:
                    labels_quantity[name] += 1
        else:
            labels_quantity[class_names[int(sample['labels'].item())]] += 1

    print('Labels quantity:')
    for key, value in labels_quantity.items():
        print(f'\t{key}: {value} images')

    total_presences = sum(labels_quantity.values())
    if total_presences == 0:
        sys.exit('No labelled training images found')
    # Rarer classes receive a larger weight in the loss.
    weights = [1 - value / total_presences for value in labels_quantity.values()]

    if version == 'advanced':
        loss_fn = nn.BCELoss(weight = torch.tensor(weights, dtype=torch.float, device=Config.device))
    else:
        loss_fn = nn.CrossEntropyLoss(weight = torch.tensor(weights, dtype=torch.float, device=Config.device))

    train_data = torch.utils.data.DataLoader(train_data, batch_size=Config.batch_size, shuffle=True, drop_last=True)
    validation_data = torch.utils.data.DataLoader(validation_data, batch_size=Config.batch_size, shuffle=False, drop_last=False)
    test_data = torch.utils.data.DataLoader(test_data, batch_size=1, shuffle=True, drop_last=False)

    neural_network.run(train_data, test_data, validation_data, loss_fn)