In [2]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import pickle
import random
from skimage import io, transform, exposure
from skimage.transform import ProjectiveTransform, rotate, warp

In [3]:
def save_data(datafile, data):
    with open(datafile, 'wb') as file:
        pickle.dump(data, file)

def load_data(datafile):
    with open(datafile, 'rb') as file:
        return pickle.load(file)

In [3]:
#Loading Training Data
base_dir = os.path.join(os.getcwd(), 'Training_Images')
train_data = {i : [] for i in range(len(os.listdir(base_dir)))}
for i, folder in zip(train_data, os.listdir(base_dir)):
    folder_path = os.path.join(base_dir, folder)
    for file in os.listdir(folder_path):
        if file.endswith('ppm'):
            file_path = os.path.join(folder_path, file)
            image = np.array(io.imread(file_path))
            image = transform.resize(image, (32, 32), mode='edge')
            train_data[i].append(image)

In [4]:
#Load Test Data
df =  pd.read_csv('Test.csv')
test_data = {
    'feature' : [],
    'label' : []
}

base_dir = os.path.join(os.getcwd(), 'Test_Images')
for i, images in enumerate(os.listdir(base_dir)):
    if images.endswith('ppm'):
        images = os.path.join(base_dir, images)
        images = np.array(io.imread(images))
        images = transform.resize(images, (32, 32), mode ='edge')
        test_data['feature'].append(images)
        test_data['label'].append(df['ClassId'][i])

In [3]:
#Data Functions
def image_flip(feature, label):
    vert = [1, 5, 12, 15, 17]
    hori = [11, 12, 13, 15, 17, 18, 22, 26, 30, 35]
    both = [32, 40]
    chng = {19: 20, 20: 19,
            33: 34, 34: 33,
            36: 37, 37: 36,
            38: 39, 39: 38}

    if label in vert:
        vert_flipped = np.flip(feature, axis=0)
        yield vert_flipped, label
    
    if label in hori:
        hori_flipped = np.flip(feature, axis=1)
        yield hori_flipped, label
    
    if label in both:
        new_image = np.flip(feature, axis=(0, 1))
        yield new_image, label
    
    if label in chng:
        flipped_image = np.flip(feature, axis=1)
        yield flipped_image, chng[label]
    

def image_warp(feature, label=None, intensity=0.75):
    delta = 32 * .3 * intensity
    tl_top = random.uniform(-delta, delta)
    tl_left = random.uniform(-delta, delta)
    bl_bot = random.uniform(-delta, delta)
    bl_left = random.uniform(-delta, delta)
    tr_top = random.uniform(-delta, delta)
    tr_right = random.uniform(-delta, delta)
    br_bot = random.uniform(-delta, delta)
    br_right = random.uniform(-delta, delta)

    tf = ProjectiveTransform()
    tf.estimate(np.array(((tl_left, tl_top), 
                          (bl_left, 32 - bl_bot), 
                          (32 - br_right, 32 - br_bot), 
                          (32 - tr_right, tr_top))),
                np.array(((0, 0),
                          (0, 32),
                          (32, 32),
                          (32, 0)))
                )

    new_image = transform.warp(feature, tf, order=1, mode='edge')
    return new_image, label

def image_rotate(feature, label=None, intensity=0.75):
    delta = 30 * intensity
    new_image = transform.rotate(feature, angle=(random.uniform(-delta, delta)), mode='edge')
    return new_image, label

def grayscale_contrast(feature):
    new_image = 0.2989 * feature[:, :, 0] + 0.5870 * feature[:, :, 1] + 0.1140 * feature[:, :, 2]
    new_image = exposure.equalize_adapthist(new_image)
    return new_image



In [None]:
#Flip Images
dump = [[] for _ in range(len(train_data))]

for i, feature in train_data.items():
    for image in feature:
        for feat, labl in image_flip(image, i):
            dump[labl].append(feat)

for i in range(len(dump)):
    train_data[i] += dump[i]

print("Done Image Flip")

In [None]:
#Warp Images
temp = {i : value[:] for i, value in train_data.items()}
max_sample = max([len(temp[i]) for i in range(len(temp))])

for label, feature in train_data.items():
    while len(feature) < max_sample:
        aug_image, lbl = image_rotate(feature[random.randint(0, len(temp[label]) - 1)], label)
        aug_image, lbl = image_warp(aug_image, lbl)
        train_data[lbl].append(aug_image)

print("Done Image Augmentation")

In [8]:
#Store Data into single list
final_train = {
    'feature': [],
    'label': []
}

final_test = {
    'feature': [],
    'label' : []
}

for lbl, fet in train_data.items():
    for f in fet:
        final_train['feature'].append(grayscale_contrast(f))
        final_train['label'].append(lbl)

final_train['feature'] = np.stack(final_train['feature'])
final_train['label'] = np.stack(final_train['label'])

for fet, lbl in zip(test_data['feature'], test_data['label']):
    for f in fet:
        final_test['feature'].append(grayscale_contrast(f))
        final_test['label'].append(lbl)

final_test['feature'] = np.stack(final_test['feature'])
final_test['label'] = np.stack(final_test['label'])

In [9]:
save_data('training_data.pkl', final_train)
save_data('test_data.pkl', final_test)

In [4]:
train_data = load_data('training_data.pkl')
test_data = load_data('test_data.pkl')

# Model Design

In [5]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as opt
from torch.utils.data import DataLoader, TensorDataset
from torchvision import io, transforms

In [6]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
df = pd.read_csv('trafficsignclassifier.csv')

In [42]:
#Initial Hyperparameters
num_epochs = 100
batch_size = 100
learning_rate = 0.001
classes = tuple([i for i in df['signNames']])

In [43]:
data_transform = transforms.Normalize(mean=[0.5], std=[1])

In [44]:
class TrafficCNN(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv32 = nn.Conv2d(1, 32, 5, padding=2) 
        self.conv16 = nn.Conv2d(32, 64, 5, padding=2) 
        self.conv8 = nn.Conv2d(64, 128, 5, padding=2)
        self.maxp = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(128 * 4 * 4, 1024)
        self.fc2 = nn.Linear(1024, 512)
        self.fc3 = nn.Linear(512, 43)

    def forward(self, x):
        # Conv -> ReLU -> MaxPool -> Conv -> ReLU -> MaxPool -> Conv -> ReLU -> Maxpool
        x = self.maxp(F.relu(self.conv32(x)))
        x = self.maxp(F.relu(self.conv16(x)))
        x = self.maxp(F.relu(self.conv8(x)))

        x = torch.flatten(x, 1)

        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)

        return x

In [45]:
X_train = torch.from_numpy(train_data['feature']).unsqueeze(1)
X_train = data_transform(X_train)
y_train = torch.from_numpy(train_data['label'])

In [None]:
X_test = torch.from_numpy(test_data['feature']).unsqueeze(1)
X_test = data_transform(X_test)
y_test = torch.from_numpy(test_data['label'])

In [46]:
train_dataset = TensorDataset(X_train, y_train)
loaded_train = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = TensorDataset(X_test, y_test)
loaded_test = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)


In [47]:
model = TrafficCNN().to(device)

criterion = nn.CrossEntropyLoss()
optimizer = opt.Adam(model.parameters(), lr=1e-4)

In [None]:
total_step = len(loaded_train)
for epoch in range(num_epochs):
    for i, (image, label) in enumerate(loaded_train):

        image = image.to(device)
        label = label.to(device)
        #Forward Pass
        out = model(image.float())

        try:
            loss = criterion(out, label)
        except Exception as e:
            print(f"Error in loss computation: {e}")
            continue

        #Backprop
        try:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
        except Exception as e:
            print(f'Error in backpropagation: {e}')
            continue

        if (i+1) % 100 == 0:
            print(f'Epoch {epoch + 1}: Step [{(i+1) * 100}/{total_step}] | Loss {loss.item():.4f} ')
            

print("Done Training")

In [None]:
correct = 0
total = 0
with torch.no_grad():
    for data, label in loaded_test:
        data = data.to(device)
        label = label.to(device)
