<a href="https://colab.research.google.com/github/willjhliang/traffic-sign-recognition/blob/main/models.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Download dataset from github repo
# (notebook shell commands: clone the repo, keep only its `data` directory
# in the working directory, then delete the rest of the clone)
!git clone --quiet https://github.com/willjhliang/traffic-sign-recognition.git
!mv traffic-sign-recognition/data .
!rm -r traffic-sign-recognition

In [None]:
import os
from copy import deepcopy
import itertools
from tqdm import tqdm
import random

import numpy as np
import pandas as pd
from PIL import Image
import cv2
from matplotlib import pyplot as plt
plt.style.use('seaborn-whitegrid')

from sklearn.model_selection import KFold
from sklearn.base import clone
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import AdaBoostClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression 
from sklearn.svm import SVC
from sklearn.decomposition import PCA
import xgboost as xgb

import torch
from torch import nn
from torch import optim
from torch.utils import data
from torch import Tensor
import torchvision

from sklearn.metrics import confusion_matrix
import seaborn as sns
from sklearn.metrics import classification_report
from tabulate import tabulate

In [None]:
K = 36                  # Number of classes
S = 32                  # Side length images are resized to before modelling, giving (S, S, 3) inputs
class_size = 320        # Maximum number of images loaded per class
validation_ratio = 0.1  # Proportion of training data to set aside for validation
test_ratio = 0.1        # Proportion of each class's loaded images to set aside for testing

random_seed = 19104     # Seed all random operations to ensure reproducibility
random.seed(random_seed)
np.random.seed(random_seed)
torch.manual_seed(random_seed)

# Dataset

In [None]:
def load_data(datapath):
    """Loads images from files and performs basic pre-processing.

    Each file name must start with a three-digit class label. At most
    ``class_size`` images are kept per class, pixel values are scaled to
    [0, 1], and every class is shuffled and split into train and test lists
    according to ``test_ratio``.

    Args:
        datapath: Directory containing the image files.

    Returns:
        A ``(train_data, test_data)`` tuple of dicts mapping each class index
        in ``range(K)`` to a list of image arrays.
    """
    images_by_class = {k: [] for k in range(K)}

    # os.listdir returns entries in arbitrary, filesystem-dependent order.
    # Sorting makes both the class_size cut-off and the seeded shuffle below
    # reproducible across machines.
    for f in sorted(os.listdir(datapath)):
        k = int(f[:3])  # Get label from filename
        if len(images_by_class[k]) >= class_size:
            continue  # Class is already full; skip decoding the image
        # The context manager closes the underlying file handle once the
        # pixel data has been copied into a numpy array.
        with Image.open(os.path.join(datapath, f)) as img:
            images_by_class[k].append(np.asarray(img) / 255)  # Set pixel values to [0, 1]

    train_data, test_data = {}, {}
    for k in range(K):
        random.shuffle(images_by_class[k])
        split = int(len(images_by_class[k]) * test_ratio)
        train_data[k] = images_by_class[k][split:]
        test_data[k] = images_by_class[k][:split]

    return train_data, test_data

In [None]:
# Per-class lists of train/test images, keyed by class index.
train_data, test_data = load_data('data/filtered_images')
# Class metadata; its 'Name' column is used for plot and report labels further below.
labels = pd.read_csv("data/filtered_labels.csv")

## Data Exploration

We'll explore the dataset by displaying example images from each class. We also plot the number of images belonging to each class and find that it's extremely variable.

In [None]:
# Lay the classes out on a 6x10 grid of axes. zip_longest pads k with -1 once
# the class indices run out, so the leftover cells are only blanked.
fig, axs = plt.subplots(6, 10, figsize=(15, 8))
for k, (i, j) in itertools.zip_longest(range(K), list(itertools.product(range(6), range(10))), fillvalue=-1):
    axs[i,j].axis('off')
    if k >= 0:
        axs[i,j].imshow(train_data[k][0])  # Visualize the first image of every class

In [None]:
# Same 6x10 grid as for the training set; cells are left blank for padding
# entries (k == -1) and for classes whose test list is empty.
fig, axs = plt.subplots(6, 10, figsize=(15, 8))
for k, (i, j) in itertools.zip_longest(range(K), list(itertools.product(range(6), range(10))), fillvalue=-1):
    axs[i,j].axis('off')
    if k >= 0 and len(test_data[k]) > 0:
        axs[i,j].imshow(test_data[k][0])  # Visualize the first image of every class in the testing set

In [None]:
# Number of images per class in each split (before any augmentation).
train_class_dist = [len(train_data[k]) for k in range(K)]
test_class_dist = [len(test_data[k]) for k in range(K)]

# Left: training set class sizes; right: test set class sizes.
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
axs[0].bar(list(range(K)), train_class_dist)
axs[1].bar(list(range(K)), test_class_dist);

## Data Preprocessing

To preprocess our data, we first augment the classes that have fewer example images. Our augmentation scheme includes cropping, rotation, and brightness changes; note that we don't apply any flips, since mirroring a traffic sign can change the meaning of its symbol.

After augmentation, we reshape the data to an array format and store labels as integers.

In [None]:
def center_crop(img, center_percentage):
    """Crops out edges of an image, leaving the center.

    Args:
        img: Image array whose first two axes are (rows, columns). Any
            trailing axes (e.g. color channels) are kept untouched, so both
            (H, W, C) and grayscale (H, W) arrays are accepted.
        center_percentage: Fraction of each spatial dimension to keep,
            e.g. 0.8. The margin removed on each side is truncated to a
            whole number of pixels.

    Returns:
        The cropped image, with the same margin removed from both sides of
        each spatial axis.
    """
    # NumPy images are indexed (row, column), i.e. (height, width).
    height, width = img.shape[:2]
    height_offset = int(height * (1 - center_percentage) / 2)
    width_offset = int(width * (1 - center_percentage) / 2)
    return img[height_offset:height - height_offset, width_offset:width - width_offset]


def rotate_img(img, angle):
    """Rotates an image about its center without clipping its corners.

    The image is rotated onto a canvas large enough to hold the whole rotated
    frame, then scaled back down to the original width and height.

    Args:
        img: Image array of shape (height, width, channels).
        angle: Rotation angle passed to cv2.getRotationMatrix2D (degrees).

    Returns:
        The rotated image, resized to the input's original width and height.
    """
    height, width, _ = img.shape
    center = (width // 2, height // 2)

    rot_mat = cv2.getRotationMatrix2D(center, angle, 1.0)
    abs_cos, abs_sin = np.abs(rot_mat[0, 0]), np.abs(rot_mat[0, 1])

    # Size of the bounding box that contains the entire rotated image.
    new_width = int((height * abs_sin) + (width * abs_cos))
    new_height = int((height * abs_cos) + (width * abs_sin))

    # Shift the transform so the rotated image is centered on the new canvas.
    rot_mat[0, 2] += (new_width / 2) - center[0]
    rot_mat[1, 2] += (new_height / 2) - center[1]

    rotated = cv2.warpAffine(img, rot_mat, (new_width, new_height))
    return cv2.resize(rotated, (width, height))


def shift_brightness(img, shift):
    """Adds a constant offset to every pixel and clamps the result to [0, 1].

    Args:
        img: Image array.
        shift: Value added to every pixel (negative values darken).

    Returns:
        The shifted image with all values limited to the range [0, 1].
    """
    shifted = img + shift
    return np.clip(shifted, 0, 1)

In [None]:
def augment_img(img):
    """Augments image with rotation, cropping, and brightness shifts.

    The three parameters are drawn from Python's ``random`` module in a fixed
    order (rotation, crop, brightness), so results are reproducible under a
    fixed seed.

    Args:
        img: Image array of shape (height, width, channels) with values in
            [0, 1].

    Returns:
        The augmented image. Because of the center crop it is smaller than
        the input; callers are expected to resize it afterwards.
    """
    rot_angle = random.randint(-20, 20)                    # degrees
    crop_center_percentage = random.randint(70, 90) / 100  # keep 70-90% of each side
    brightness_shift = random.randint(-10, 10) / 100       # +/- 0.1 on the [0, 1] scale

    img = rotate_img(img, rot_angle)
    # Cropping after the rotation also trims part of the empty corners that
    # the rotation introduces.
    img = center_crop(img, crop_center_percentage)
    img = shift_brightness(img, brightness_shift)

    return img

In [None]:
# Preview one augmented image per class on a 6x10 grid; zip_longest pads k
# with -1 for the unused cells, which are only blanked.
fig, axs = plt.subplots(6, 10, figsize=(15, 8))
for k, (i, j) in itertools.zip_longest(range(K), list(itertools.product(range(6), range(10))), fillvalue=-1):
    axs[i,j].axis('off')
    if k >= 0 and len(train_data[k]) > 0:
        # Augment exactly once so the preview matches what the balancing
        # step adds to the training set.
        img = augment_img(train_data[k][-1])
        axs[i,j].imshow(img)

In [None]:
# Balance the training set: pad every class with augmented copies until it is
# as large as the largest class.
max_k_size = max([len(train_data[k]) for k in range(K)])
for k in range(K):
    k_size = len(train_data[k])
    if k_size == 0:
        continue  # No source images to augment from (would divide by zero below)
    for i in range(max_k_size - k_size):
        # Index only the first k_size entries so augmentations are always
        # derived from original images, cycling through them as needed.
        train_data[k].append(augment_img(train_data[k][i % k_size]))

In [None]:
# Training class sizes after the balancing step.
aug_class_dist = [len(train_data[k]) for k in range(K)]

# Left: class sizes before augmentation; right: after augmentation.
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
axs[0].bar(list(range(K)), train_class_dist)
axs[1].bar(list(range(K)), aug_class_dist);

In [None]:
def prepare_data(data):
    """Converts image-label data from map to numpy arrays.

    Every image is resized to (S, S), converted to channel-first layout, and
    paired with its integer class label. Samples are then shuffled with
    numpy's global random generator.

    Args:
        data: Dict mapping each class index in ``range(K)`` to a list of
            (height, width, channels) image arrays.

    Returns:
        A tuple ``(X, X_flattened, y)`` where ``X`` has shape
        (N, channels, S, S), ``X_flattened`` has shape (N, channels * S * S),
        and ``y`` holds the N integer labels in the same order.
    """
    X = []
    y = []
    for k in range(K):
        for img in data[k]:
            img = cv2.resize(img, (S, S))
            # HWC -> CHW, the layout PyTorch conv layers expect. A plain
            # swapaxes(0, -1) would yield (C, W, H) and therefore feed the
            # networks spatially transposed images.
            X.append(np.transpose(img, (2, 0, 1)))
            y.append(k)
    X = np.array(X)
    y = np.array(y)

    # Apply one permutation to both arrays so images and labels stay aligned.
    shuffled_indices = np.random.permutation(len(X))
    X = X[shuffled_indices]
    y = y[shuffled_indices]
    X_flattened = np.reshape(X, (X.shape[0], -1))

    return X, X_flattened, y

In [None]:
# Array versions of both splits: image tensors, flattened feature rows, and labels.
X_train, X_train_flattened, y_train = prepare_data(train_data)
X_test, X_test_flattened, y_test = prepare_data(test_data)

In [None]:
def get_validation(X_train, y_train):
    """Holds out the leading ``validation_ratio`` share of the samples.

    Used by the models below for hyperparameter selection.

    Args:
        X_train: Feature array with samples along the first axis.
        y_train: Labels aligned with ``X_train``.

    Returns:
        ``(X_train, X_val, y_train, y_val)``: the validation arrays are the
        first ``int(n * validation_ratio)`` samples, the training arrays the
        remainder.
    """
    n_val = int(X_train.shape[0] * validation_ratio)
    X_val, y_val = X_train[:n_val], y_train[:n_val]
    X_rest, y_rest = X_train[n_val:], y_train[n_val:]
    return X_rest, X_val, y_rest, y_val

## Dimensionality Reduction

With 32 x 32 x 3 = 3072 features per image, our models below take a long time to converge. We try both PCA and neural network autoencoders to reduce the feature space before training our sklearn models.

In [None]:
# Fit a PCA with 1024 components on the training features and plot the
# cumulative explained variance (in percent) of the first 300 components.
covar_matrix = PCA(n_components=32*32)
covar_matrix.fit(X_train_flattened)
variance = covar_matrix.explained_variance_ratio_
var = np.cumsum(np.round(variance, decimals=3) * 100)
plt.plot(var[:300]);

In [None]:
# Fit the 130-component PCA on the training features only, then project both
# splits with that same fitted transform.
pca = PCA(n_components=130)
pca.fit(X_train_flattened)
X_train_pca = pca.transform(X_train_flattened)
X_test_pca = pca.transform(X_test_flattened)

# Models

The following is a set of models we run on the data. Starting with the simplest baseline, K-Nearest Neighbors, we move toward more complex models.
1. Baseline KNN
2. Adaboost
3. Logistic Regression
4. Kernelized SVM
5. Dense Neural Network
6. Convolutional Neural Network

We also test two more advanced strategies.
1. Autoencoder dimensionality reduction allows us to embed the images in a lower dimensional space, which may lead to stronger classification performance by simpler models.
2. Transfer learning with a CNN allows us to adapt weights from pre-trained networks to our traffic sign recognition problem.

## Baseline KNN

Train a baseline K-Nearest Neighbors model to classify traffic sign images. Use 5-fold cross-validation to determine the best number of neighbors k.

In [None]:
kf = KFold(n_splits=5)


def evaluate_kfold(model_base, X_train, y_train):
    """Evaluates the given model with K-Fold cross validation.

    A fresh clone of ``model_base`` is fitted on each training fold produced
    by the module-level ``kf`` splitter and scored on the matching held-out
    fold.

    Args:
        model_base: Unfitted sklearn estimator used as a template.
        X_train: Feature array with samples along the first axis.
        y_train: Labels aligned with ``X_train``.

    Returns:
        The mean validation score over all folds.
    """
    fold_accs = []
    for train_index, val_index in kf.split(X_train):  # Iterate through folds
        # Clone so that no fitted state leaks from one fold into the next.
        model = clone(model_base)
        model.fit(X_train[train_index], y_train[train_index])
        fold_accs.append(model.score(X_train[val_index], y_train[val_index]))

    # Average over the folds actually produced instead of a hard-coded 5, so
    # the result stays correct if kf's n_splits is changed.
    return sum(fold_accs) / len(fold_accs)


def knn(X_train, y_train, X_test, y_test):
    """Tunes and evaluates a K-Nearest Neighbors classifier.

    The number of neighbors is chosen by cross-validated accuracy over a
    fixed list of odd values; the scores are plotted, and a final model with
    the winning value is fitted on all training data.

    Returns:
        Accuracy of the final model on the test set.
    """
    k_values = [1, 3, 5, 7, 9, 11, 13, 15]

    accs = []
    best_k, best_acc = -1, 0
    for k_neighbors in tqdm(k_values, leave=False):
        candidate = KNeighborsClassifier(n_neighbors=k_neighbors)
        avg_acc = evaluate_kfold(candidate, X_train, y_train)
        accs.append(avg_acc)
        # Strict comparison: on ties the smaller (earlier) k is kept.
        if avg_acc > best_acc:
            best_k, best_acc = k_neighbors, avg_acc

    # Plot validation scores for each tested k-value
    plt.plot(k_values, accs)
    plt.show()
    print(f"Optimal k: {best_k}")

    # Refit on the full training set with the selected k
    final_model = KNeighborsClassifier(n_neighbors=best_k)
    final_model.fit(X_train, y_train)
    return final_model.score(X_test, y_test)

## Adaboost

In [None]:
def adaboost(X_train_full, y_train_full, X_test, y_test):
    """Tunes and evaluates an AdaBoost classifier built from decision stumps.

    The learning rate is selected on a held-out validation split, the
    validation accuracies are plotted, and a final model with the best
    learning rate is refitted on the full training set.

    Args:
        X_train_full: Training features, samples along the first axis.
        y_train_full: Training labels.
        X_test: Test features.
        y_test: Test labels.

    Returns:
        Accuracy of the final model on the test set.
    """
    def build_model(learning_rate):
        # Single place defining the configuration shared by the sweep and
        # the final model, so the two cannot drift apart.
        return AdaBoostClassifier(
            DecisionTreeClassifier(max_depth=1),
            n_estimators=200,
            algorithm="SAMME.R",
            learning_rate=learning_rate,
            random_state=random_seed
        )

    X_train, X_val, y_train, y_val = get_validation(X_train_full, y_train_full)

    learning_rates = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
    best_lr = -1
    best_acc = 0

    accs = []
    for lr in tqdm(learning_rates, leave=False):
        model = build_model(lr)
        model.fit(X_train, y_train)
        acc = model.score(X_val, y_val)
        accs.append(acc)
        if acc > best_acc:
            best_acc = acc
            best_lr = lr

    plt.plot(accs)
    plt.xticks(list(range(len(accs))), [str(lr) for lr in learning_rates])
    plt.show()
    print(f'Optimal learning rate: {best_lr}')

    # Refit with the *selected* learning rate. Using the loop variable here
    # would always train with the last candidate (0.8).
    model = build_model(best_lr)
    model.fit(X_train_full, y_train_full)

    return model.score(X_test, y_test)

## XGBoost

In [None]:
def xgboost(X_train_full, y_train_full, X_test, y_test):
    """Tunes and evaluates a gradient-boosted tree classifier (XGBoost).

    The learning rate is selected on a held-out validation split, the
    validation accuracies are plotted, and a final model with the best
    learning rate is refitted on the full training set.

    Args:
        X_train_full: Training features, samples along the first axis.
        y_train_full: Training labels.
        X_test: Test features.
        y_test: Test labels.

    Returns:
        Accuracy of the final model on the test set.
    """
    def build_model(learning_rate):
        # No explicit class-count argument: XGBoost's parameter is named
        # `num_class` (not `num_classes`), and the sklearn wrapper derives it
        # from the labels seen in fit().
        return xgb.XGBClassifier(
            n_estimators=200,
            max_depth=1,
            learning_rate=learning_rate,
            objective='multi:softmax',
            booster='gbtree'
        )

    X_train, X_val, y_train, y_val = get_validation(X_train_full, y_train_full)

    learning_rates = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8]
    best_lr = -1
    best_acc = 0

    accs = []
    for lr in tqdm(learning_rates, leave=False):
        # Train with the candidate rate; a fixed rate here would make every
        # iteration of the sweep identical.
        model = build_model(lr)
        model.fit(X_train, y_train)
        acc = model.score(X_val, y_val)
        accs.append(acc)
        if acc > best_acc:
            best_acc = acc
            best_lr = lr

    plt.plot(accs)
    plt.xticks(list(range(len(accs))), [str(lr) for lr in learning_rates])
    plt.show()
    print(f'Optimal learning rate: {best_lr}')

    # Refit with the selected learning rate, not the last candidate tried.
    model = build_model(best_lr)
    model.fit(X_train_full, y_train_full)
    return model.score(X_test, y_test)


## Logistic Regression

In [None]:
def logistic_regression(X_train_full, y_train_full, X_test, y_test):
    """Tunes and evaluates an L2-regularized multinomial logistic regression.

    The inverse regularization strength C is selected on a held-out
    validation split, the validation accuracies are plotted, and a final
    model with the best C is refitted on the full training set.

    Returns:
        Accuracy of the final model on the test set.
    """
    def build_model(c):
        return LogisticRegression(
            penalty='l2',
            C=c,
            multi_class='multinomial',
            solver='saga',
            max_iter=500
        )

    X_train, X_val, y_train, y_val = get_validation(X_train_full, y_train_full)

    C_values = [0.01, 0.1, 1.0, 10, 100]
    accs = []
    best_C, best_acc = -1, 0
    for c in tqdm(C_values, leave=False):
        candidate = build_model(c)
        candidate.fit(X_train, y_train)
        acc = candidate.score(X_val, y_val)
        accs.append(acc)
        # Strict comparison: on ties the earlier (smaller) C is kept.
        if acc > best_acc:
            best_C, best_acc = c, acc

    # Validation accuracy for each candidate C
    plt.plot(accs)
    plt.xticks(list(range(len(accs))), [str(c) for c in C_values])
    plt.show()
    print(f'Optimal C: {best_C}')

    # Refit on the full training set with the selected C
    final_model = build_model(best_C)
    final_model.fit(X_train_full, y_train_full)
    return final_model.score(X_test, y_test)

## Kernelized SVM

In [None]:
def kernel_svm(X_train_full, y_train_full, X_test, y_test):
    """Tunes and evaluates a kernelized support vector classifier.

    Every (kernel, C) combination is scored on a held-out validation split.
    The best validation accuracy per kernel is shown as a bar chart, and a
    final model with the best combination is refitted on the full training
    set.

    Returns:
        Accuracy of the final model on the test set.
    """
    X_train, X_val, y_train, y_val = get_validation(X_train_full, y_train_full)

    kernels = ['linear', 'poly', 'rbf']
    C_values = [0.01, 0.1, 1, 10, 100]

    accs = {kernel: [] for kernel in kernels}
    best_kernel, best_C, best_acc = '', -1, 0
    for kernel, c in tqdm(itertools.product(kernels, C_values), leave=False):
        candidate = SVC(kernel=kernel, C=c)
        candidate.fit(X_train, y_train)
        acc = candidate.score(X_val, y_val)
        accs[kernel].append(acc)
        # Strict comparison: on ties the earlier combination is kept.
        if acc > best_acc:
            best_kernel, best_C, best_acc = kernel, c, acc

    # Best validation accuracy reached by each kernel over all C values
    best_accs = [max(accs[kernel]) for kernel in kernels]
    plt.bar(kernels, best_accs)
    plt.xticks(list(range(len(best_accs))), kernels)
    plt.show()
    print(f'Optimal kernel: {best_kernel}')

    final_model = SVC(kernel=best_kernel, C=best_C)
    final_model.fit(X_train_full, y_train_full)
    return final_model.score(X_test, y_test)

## Dense Neural Network

In [None]:
def load_torch_data(X_train, y_train, X_test, y_test):
    """Wraps the numpy splits in shuffled PyTorch data loaders.

    A validation split is first carved out of the training data. Features
    and labels are both converted with ``Tensor(...)``, so labels arrive in
    the loaders as floating point values.

    Returns:
        ``(train_loader, val_loader, test_loader)``, each yielding shuffled
        batches of 32 samples.
    """
    def make_loader(features, targets):
        dataset = data.TensorDataset(Tensor(features), Tensor(targets))
        return data.DataLoader(dataset, batch_size=32, shuffle=True)

    X_train, X_val, y_train, y_val = get_validation(X_train, y_train)
    return (
        make_loader(X_train, y_train),
        make_loader(X_val, y_val),
        make_loader(X_test, y_test),
    )
    

def train_network(model, train_loader, val_loader, epochs, lr):
    """Trains a classification network with Adam and cross-entropy loss.

    After every epoch the summed batch loss and the train/validation
    accuracies are printed; once training finishes, the loss curve and both
    accuracy curves are plotted.

    Args:
        model: The ``nn.Module`` to train (modified in place).
        train_loader: Data loader yielding (image, label) training batches.
        val_loader: Data loader yielding (image, label) validation batches.
        epochs: Number of passes over ``train_loader``.
        lr: Learning rate for the Adam optimizer.

    Returns:
        The trained model (the same object that was passed in).
    """
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    train_loss = []
    train_acc = []
    val_acc = []
    for epoch in range(epochs):
        # Make sure dropout / batch-norm layers are in training mode for the
        # optimisation pass, whatever mode the model was left in before.
        model.train()
        running_loss = 0
        for image, label in train_loader:
            optimizer.zero_grad()
            # CrossEntropyLoss needs integer class indices.
            loss = criterion(model(image), label.long())
            running_loss += loss.item()
            loss.backward()
            optimizer.step()

        train_loss.append(running_loss)
        # Store plain floats so the histories plot and format uniformly.
        train_acc.append(float(evaluate_network(model, train_loader)))
        val_acc.append(float(evaluate_network(model, val_loader)))
        print(f'Epoch: {epoch+1:03}, Loss: {running_loss:9.4f}, Train Accuracy: {train_acc[-1]:.4f}, Validation Accuracy: {val_acc[-1]:.4f}')

    fig, axs = plt.subplots(1, 2, figsize=(12, 4))
    axs[0].plot(train_loss)
    axs[0].set_title('Training loss')
    # Draw each curve with its own call and label; passing both histories to
    # a single plot(x, y1, y2) call is ambiguous and leaves them unlabelled.
    axs[1].plot(train_acc, label='Train')
    axs[1].plot(val_acc, label='Validation')
    axs[1].set_title('Accuracy')
    axs[1].legend()

    return model


def evaluate_network(model, dataloader):
    """Computes the classification accuracy of a network on a data loader.

    The model is switched to evaluation mode while scoring so that dropout
    is disabled and batch-norm layers use their running statistics; its
    previous train/eval mode is restored afterwards.

    Args:
        model: The ``nn.Module`` to evaluate.
        dataloader: Data loader yielding (image, label) batches.

    Returns:
        Fraction of correctly classified samples (a 0-dim tensor).
    """
    was_training = model.training
    model.eval()
    try:
        correct = 0
        total = 0
        with torch.no_grad():
            for image, label in dataloader:
                outputs = model(image)
                # Predicted class = index of the largest logit per sample.
                _, predicted = torch.max(outputs, 1)
                correct += predicted.eq(label.reshape(len(label),)).sum()
                total += float(len(label))
        return correct / total
    finally:
        # Leave the model in the mode the caller had it in.
        model.train(was_training)

In [None]:
class NN(nn.Module):
    """Fully connected classifier for flattened 32 x 32 x 3 images.

    Two hidden layers (128 and 64 units) with ReLU activations are followed
    by a linear output layer producing one logit per class.
    """

    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(32 * 32 * 3, 128)
        self.layer2 = nn.Linear(128, 64)
        self.out_layer = nn.Linear(64, K)
        self.relu = nn.ReLU()

    def forward(self, x):
        """Maps a batch of flattened images to unnormalized class scores."""
        hidden = self.relu(self.layer1(x))
        hidden = self.relu(self.layer2(hidden))
        return self.out_layer(hidden)

## Convolutional Neural Network

In [None]:
class CNN(nn.Module):
    """Convolutional classifier for 3-channel images.

    Three stages of two 3x3 conv + batch-norm + ReLU layers each, every stage
    ending in 2x2 max pooling (stages two and three apply dropout before the
    pooling), followed by a 128-unit hidden layer and a linear layer
    producing one logit per class. The first linear layer expects a 4 x 4
    feature map, i.e. 32 x 32 inputs.
    """

    def __init__(self):
        super().__init__()

        def conv3x3(channels_in, channels_out):
            # 3x3 convolution that preserves the spatial size.
            return nn.Conv2d(in_channels=channels_in, out_channels=channels_out,
                             kernel_size=3, stride=1, padding=1)

        # Parameter-free layers shared across stages
        self.relu = nn.ReLU()
        self.max_pool2d = nn.MaxPool2d(kernel_size=2, stride=2)
        self.flatten = nn.Flatten()

        # Stage 1
        self.conv_1 = conv3x3(3, 32)
        self.batch_norm_1 = nn.BatchNorm2d(32)
        self.conv_2 = conv3x3(32, 32)
        self.batch_norm_2 = nn.BatchNorm2d(32)

        # Stage 2
        self.conv_3 = conv3x3(32, 64)
        self.batch_norm_3 = nn.BatchNorm2d(64)
        self.conv_4 = conv3x3(64, 64)
        self.batch_norm_4 = nn.BatchNorm2d(64)
        self.dropout_1 = nn.Dropout(0.5)

        # Stage 3
        self.conv_5 = conv3x3(64, 64)
        self.batch_norm_5 = nn.BatchNorm2d(64)
        self.conv_6 = conv3x3(64, 64)
        self.batch_norm_6 = nn.BatchNorm2d(64)
        self.dropout_2 = nn.Dropout(0.5)

        # Classifier head
        self.linear_1 = nn.Linear(4 * 4 * 64, 128)
        self.dropout_3 = nn.Dropout(0.25)
        self.linear_2 = nn.Linear(128, K)

    def _conv_bn_relu(self, conv, norm, x):
        """Applies convolution, batch normalization, and ReLU in sequence."""
        return self.relu(norm(conv(x)))

    def forward(self, x):
        """Maps a batch of images to unnormalized class scores."""
        # Stage 1
        x = self._conv_bn_relu(self.conv_1, self.batch_norm_1, x)
        x = self._conv_bn_relu(self.conv_2, self.batch_norm_2, x)
        x = self.max_pool2d(x)

        # Stage 2
        x = self._conv_bn_relu(self.conv_3, self.batch_norm_3, x)
        x = self._conv_bn_relu(self.conv_4, self.batch_norm_4, x)
        x = self.max_pool2d(self.dropout_1(x))

        # Stage 3
        x = self._conv_bn_relu(self.conv_5, self.batch_norm_5, x)
        x = self._conv_bn_relu(self.conv_6, self.batch_norm_6, x)
        x = self.max_pool2d(self.dropout_2(x))

        # Classifier head
        x = self.relu(self.linear_1(self.flatten(x)))
        return self.linear_2(self.dropout_3(x))

## Transfer Learning

In [None]:
def Resnet():
    """Builds a pretrained ResNet-18 with a new K-way classification layer.

    All pretrained parameters are frozen; only the freshly created final
    fully connected layer remains trainable.
    """
    backbone = torchvision.models.resnet18(pretrained=True)
    for frozen in backbone.parameters():
        frozen.requires_grad = False

    # Replace the final layer (new layers require gradients by default).
    backbone.fc = nn.Linear(backbone.fc.in_features, K)
    return backbone

In [None]:
def VGG16():
    """Builds a pretrained VGG-16 with a new K-way classification layer.

    All pretrained parameters are frozen; only the freshly created last
    classifier layer remains trainable.
    """
    backbone = torchvision.models.vgg16(pretrained=True)
    for frozen in backbone.parameters():
        frozen.requires_grad = False

    # Replace the last classifier layer (new layers require gradients by default).
    head_in = backbone.classifier[-1].in_features
    backbone.classifier[-1] = nn.Linear(head_in, K)
    return backbone

In [None]:
def EfficientNet():
    """Builds a pretrained EfficientNet-B0 with a new K-way classification layer.

    All pretrained parameters are frozen; only the freshly created last
    classifier layer remains trainable.
    """
    backbone = torchvision.models.efficientnet_b0(pretrained=True)
    for frozen in backbone.parameters():
        frozen.requires_grad = False

    # Replace the last classifier layer (new layers require gradients by default).
    head_in = backbone.classifier[-1].in_features
    backbone.classifier[-1] = nn.Linear(head_in, K)
    return backbone

# Training and Evaluation
We now run all the models defined above.

In [None]:
def generateConfusionMatrix(y_actual, y_pred):
    """Draws the confusion matrix as an annotated heatmap.

    The matrix is transposed before plotting, so true labels run along the
    x-axis and predicted labels along the y-axis. Tick labels come from the
    'Name' column of the module-level ``labels`` table.
    """
    transposed = confusion_matrix(y_actual, y_pred).T
    class_names = labels['Name']

    plt.figure(figsize=(30, 30))
    sns.heatmap(
        transposed,
        square=True,
        annot=True,
        fmt='d',
        cbar=False,
        xticklabels=class_names,
        yticklabels=class_names,
    )
    plt.xlabel('true label')
    plt.ylabel('predicted label')
 
 
def classificationReport(y_actual, y_pred):
    """Prints sklearn's per-class classification report, labelled with class names."""
    report = classification_report(y_actual, y_pred, target_names=labels['Name'])
    print(report)
 

def perClassAccuracy(y_actual, y_pred):
    """Prints a table with the accuracy achieved on each class.

    A class's accuracy is its number of correctly predicted samples (the
    confusion-matrix diagonal) divided by its number of true samples (the
    confusion-matrix row sum).
    """
    mat = confusion_matrix(y_actual, y_pred)
    samples_per_class = mat.sum(axis=1)
    class_accuracies = mat.diagonal() / samples_per_class

    rows = np.column_stack((labels['Name'], class_accuracies))
    table = tabulate(rows, headers=['Label', 'Accuracy'], tablefmt='fancy_grid')
    print(table)

In [None]:
# Tune and evaluate the KNN baseline on the raw flattened pixel features.
print('========== KNN ==========')
acc = knn(X_train_flattened, y_train, X_test_flattened, y_test)
print(f'Test Accuracy: {acc}')

In [None]:
# Same KNN procedure, but on the PCA-reduced features.
print('========== KNN (PCA) ==========')
acc = knn(X_train_pca, y_train, X_test_pca, y_test)
print(f'Test Accuracy: {acc}')

In [None]:
# Tune and evaluate AdaBoost on the PCA-reduced features.
print('========== Adaboost (PCA) ==========')
acc = adaboost(X_train_pca, y_train, X_test_pca, y_test)
print(f'Test Accuracy: {acc}')

In [None]:
# Tune and evaluate logistic regression on the PCA-reduced features.
print('========== Logistic Regression (PCA) ==========')
acc = logistic_regression(X_train_pca, y_train, X_test_pca, y_test)
print(f'Test Accuracy: {acc}')

In [None]:
# Tune and evaluate the kernelized SVM on the PCA-reduced features.
print('========== Kernelized SVM (PCA) ==========')
acc = kernel_svm(X_train_pca, y_train, X_test_pca, y_test)
print(f'Test Accuracy: {acc}')

In [None]:
print('========== Dense Neural Network ==========')
train_loader, val_loader, test_loader = load_torch_data(X_train_flattened, y_train, X_test_flattened, y_test)
model = train_network(NN(), train_loader, val_loader, 30, 1e-3)
acc = evaluate_network(model, test_loader)
print(f'Test Accuracy: {acc}')

In [None]:
# Train the CNN on the (unflattened) image arrays for 10 epochs with learning
# rate 1e-3, then score it on the test loader.
print('========== Convolutional Neural Network ==========')
train_loader, val_loader, test_loader = load_torch_data(X_train, y_train, X_test, y_test)
model = train_network(CNN(), train_loader, val_loader, 10, 1e-3)
accuracy = evaluate_network(model, test_loader)
print(f'Test Accuracy: {accuracy}')

In [None]:
# Train the ResNet transfer-learning model for 10 epochs with learning rate
# 1e-3, then score it on the test loader.
print('========== Transfer Learning Resnet ==========')
train_loader, val_loader, test_loader = load_torch_data(X_train, y_train, X_test, y_test)
model = train_network(Resnet(), train_loader, val_loader, 10, 1e-3)
accuracy = evaluate_network(model, test_loader)
print(f'Test Accuracy: {accuracy}')

In [None]:
# Train the VGG16 transfer-learning model for 10 epochs with learning rate
# 1e-3, then score it on the test loader.
print('========== Transfer Learning VGG16 ==========')
train_loader, val_loader, test_loader = load_torch_data(X_train, y_train, X_test, y_test)
model = train_network(VGG16(), train_loader, val_loader, 10, 1e-3)
accuracy = evaluate_network(model, test_loader)
print(f'Test Accuracy: {accuracy}')

In [None]:
# Train the EfficientNet transfer-learning model for 10 epochs with learning
# rate 1e-3, then score it on the test loader.
print('========== Transfer Learning EfficientNet ==========')
train_loader, val_loader, test_loader = load_torch_data(X_train, y_train, X_test, y_test)
model = train_network(EfficientNet(), train_loader, val_loader, 10, 1e-3)
accuracy = evaluate_network(model, test_loader)
print(f'Test Accuracy: {accuracy}')