In [None]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import cv2
from sklearn.model_selection import train_test_split
import random
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch.nn.functional as F
import torchinfo
from torchinfo import summary
from tqdm.auto import tqdm
import torch.optim as optim
import pickle

# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
path = "Data"              # root folder; one sub-folder per class, named 0..N-1
labelFile = "labels.csv"   # CSV read in a later cell for class names
BATCH_SIZE = 20
epochs = 1
steps_per_epoch = 2000
imageDimensions = (32,32,3)
test_ratio = 0.2           # fraction of all images held out for testing
validation_ratio = 0.2     # fraction of the remaining images used for validation

# ---------------------------------------------------------------------------
# Import images
# ---------------------------------------------------------------------------
images = []
classno = []
list_data = os.listdir(path)
print(len(list_data))
# Count only directories so stray files (e.g. ".DS_Store") are not mistaken
# for classes; the loop below expects the class folders to be named 0..N-1.
no_of_classes = sum(os.path.isdir(os.path.join(path, entry)) for entry in list_data)

for cnt in range(no_of_classes):
    class_dir = os.path.join(path, str(cnt))
    for pic in os.listdir(class_dir):
        pic_path = os.path.join(class_dir, pic)
        img = cv2.imread(pic_path)
        if img is None:
            # cv2.imread signals an unreadable / non-image file by returning
            # None; keeping it would break the np.array() stacking below.
            print(f"Skipping unreadable image: {pic_path}")
            continue
        images.append(img)
        classno.append(cnt)
    print(cnt)

images = np.array(images)
classno = np.array(classno)
print(images.shape)
print(classno.shape)

# ---------------------------------------------------------------------------
# Split data: first carve out the test set, then split the remainder into
# training and validation sets.
# ---------------------------------------------------------------------------
X_train, X_test, y_train, y_test = train_test_split(images, classno, test_size=test_ratio)
X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=validation_ratio)
print(X_train.shape)
print(X_test.shape)
print(X_val.shape)

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

In [None]:
# Load the label table and print its shape followed by its first rows.
data = pd.read_csv(labelFile)
for preview in (data.shape, data.head()):
    print(preview)

In [None]:
# Show `cols` random training images per class and record how many training
# images each class has (used by the distribution plot in the next cell).
num_of_samples = []
cols = 5
num_classes = no_of_classes
title_col = cols // 2  # the class title is placed on the middle column

# squeeze=False keeps `axs` two-dimensional even for a single row or column.
fig, axs = plt.subplots(nrows=num_classes, ncols=cols, figsize=(5,50), squeeze=False)
fig.tight_layout()
for j, row in data.iterrows():
    x_selected = X_train[y_train == j]
    # Counted once per class, independent of which column carries the title.
    num_of_samples.append(len(x_selected))
    for i in range(cols):
        axs[j][i].axis("off")
        if len(x_selected) > 0:  # a class without training images stays blank
            sample = x_selected[np.random.randint(0, len(x_selected))]
            if sample.ndim == 3 and sample.shape[-1] == 3:
                # Images were loaded with cv2.imread (BGR channel order);
                # matplotlib expects RGB, so reverse the channel axis.
                sample = sample[..., ::-1]
            axs[j][i].imshow(sample, cmap="gray")
        if i == title_col:
            axs[j][i].set_title(str(j)+ "-" + row["Name"])

In [None]:
# Bar chart of the per-class training image counts collected above.
print(num_of_samples)
class_ids = range(0, num_classes)
plt.figure(figsize=(12, 4))
plt.bar(class_ids, num_of_samples)
plt.title("Distribution of the training data")
plt.xlabel("Class number")
plt.ylabel("Number of images")
plt.show()

In [None]:
def grayscale(img):
    """Return `img` converted from BGR colour to a single-channel grayscale image."""
    return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
def equalize(img):
    """Return `img` after OpenCV histogram equalisation (`cv2.equalizeHist`)."""
    return cv2.equalizeHist(img)
def preprocessing(img):
    """Prepare one BGR image for the network.

    Pipeline: grayscale conversion -> histogram equalisation -> division by 255.
    """
    equalized = equalize(grayscale(img))
    return equalized/255

# Preprocess every image of each split and restack the results into arrays.
X_train = np.array([preprocessing(picture) for picture in X_train])
X_val = np.array([preprocessing(picture) for picture in X_val])
X_test = np.array([preprocessing(picture) for picture in X_test])


In [None]:
def _add_channel_axis(batch):
    """Reshape `batch` to (dim0, dim1, dim2, 1), i.e. append a channel axis of size 1."""
    count, height, width = batch.shape[0], batch.shape[1], batch.shape[2]
    return batch.reshape(count, height, width, 1)

X_train = _add_channel_axis(X_train)
X_val = _add_channel_axis(X_val)
X_test = _add_channel_axis(X_test)

In [None]:
# Visual sanity check: display the first preprocessed training image.
plt.imshow(X_train[0])

In [None]:
# Augmentation pipeline: a random affine distortion (rotation, translation,
# scaling and shear) followed by conversion to a tensor.
_random_affine = transforms.RandomAffine(
    degrees=10,
    translate=(0.1, 0.1),
    scale=(0.8, 1.2),
    shear=10,
)
data_transforms = transforms.Compose([_random_affine, transforms.ToTensor()])

In [None]:
class CustomDataset(Dataset):
    """Dataset pairing in-memory image arrays with their labels.

    Each item is converted to a PIL image and, when a transform is supplied,
    passed through it.
    """

    def __init__(self, images, labels, transform=None):
        """
        Args:
            images (numpy.ndarray): Image data (N, H, W, C).
            labels (numpy.ndarray): Corresponding labels (N,).
            transform (callable, optional): Optional transform to be applied on an image.
        """
        self.images = images
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Return the number of images."""
        return len(self.images)

    @staticmethod
    def _to_uint8(image):
        """Return `image` as uint8 when it is a floating-point NumPy array.

        Floating-point pixels are assumed to lie in [0, 1] (the notebook's
        preprocessing divides by 255); they are scaled to [0, 255], rounded
        and clipped. Any other input is returned unchanged.

        ToPILImage's handling of float arrays differs between torchvision
        releases (float64 input is not accepted by every version), so the
        conversion is done explicitly here.
        """
        if isinstance(image, np.ndarray) and np.issubdtype(image.dtype, np.floating):
            return np.clip(np.rint(image * 255), 0, 255).astype(np.uint8)
        return image

    def __getitem__(self, idx):
        """Return `(image, label)` for index `idx`.

        The image is a PIL image when no transform is set; otherwise it is
        whatever the transform returns.
        """
        image = self._to_uint8(self.images[idx])
        label = self.labels[idx]

        # Convert NumPy image (H, W, C) to PIL image for transformation
        image = transforms.ToPILImage()(image)

        if self.transform:
            image = self.transform(image)

        return image, label

In [None]:
# Only the training set is augmented. Validation and test data get a plain
# tensor conversion so their metrics are deterministic, and so the loaders
# yield tensors (PIL images cannot be batched by the default collate).
eval_transforms = transforms.ToTensor()

X_train_dataset = CustomDataset(X_train, y_train, transform=data_transforms)
X_val_dataset = CustomDataset(X_val, y_val, transform=eval_transforms)
X_test_dataset = CustomDataset(X_test, y_test, transform=eval_transforms)

X_train_loader = DataLoader(X_train_dataset, batch_size=BATCH_SIZE, shuffle=True)
X_val_loader = DataLoader(X_val_dataset, batch_size=BATCH_SIZE)
X_test_loader = DataLoader(X_test_dataset, batch_size=BATCH_SIZE)

# Inspect one transformed training image; the index is clamped so the cell
# also works when the training set has fewer than 3479 images.
X_train_dataset[min(3478, len(X_train_dataset) - 1)][0]

In [None]:
# Preview the first augmented training samples: images on the top row, their
# labels as titles on the (otherwise empty) bottom row.
n_preview = 15
fig, axs = plt.subplots(2, n_preview, figsize=(20, 5), squeeze=False)
fig.tight_layout()

for i in range(n_preview):
    # Fetch the sample once: every dataset access re-runs the random
    # augmentation, so a second lookup would yield a different image.
    image, label = X_train_dataset[i]
    # (C, H, W) tensor -> (H, W, C) array, without hard-coding the image size.
    axs[0][i].imshow(image.permute(1, 2, 0).numpy())
    axs[1][i].set_title(label)
    axs[0][i].axis('off')
    axs[1][i].axis('off')
plt.show()

In [None]:
class MyModel(nn.Module):
    """Convolutional classifier for single-channel images.

    Architecture: two 5x5 convolutions (60 filters) + max-pool, two 3x3
    convolutions (30 filters) + max-pool, dropout, a 500-unit fully connected
    layer, dropout, and a final linear layer producing `num_classes` logits.
    """

    def __init__(self,input_shape,num_classes):
        """
        Args:
            input_shape: Shape of the network input, either (C, H, W) or
                (N, C, H, W). Only the height and width are used, to size the
                first fully connected layer; the first convolution always
                takes one input channel.
            num_classes: Number of output logits.
        """
        super().__init__()
        no_of_filters = 60
        size_of_filter = (5,5)
        size_of_filter2 = (3,3)
        size_of_pool = (2,2)
        no_of_nodes = 500

        self.conv1 = nn.Conv2d(in_channels = 1, out_channels = no_of_filters, kernel_size = size_of_filter)
        self.conv2 = nn.Conv2d(in_channels = no_of_filters, out_channels = no_of_filters, kernel_size = size_of_filter)
        self.pool1 = nn.MaxPool2d(kernel_size = size_of_pool)

        self.conv3 = nn.Conv2d(in_channels = no_of_filters, out_channels = no_of_filters//2, kernel_size = size_of_filter2)
        self.conv4 = nn.Conv2d(in_channels = no_of_filters//2, out_channels = no_of_filters//2, kernel_size = size_of_filter2)
        self.pool2 = nn.MaxPool2d(kernel_size = size_of_pool)

        self.flat = nn.Flatten()

        # The flattened feature size depends on the input resolution, so it is
        # measured with a dummy forward pass through the convolutional stack.
        self.fc1 = nn.Linear(self.get_flatten_shape(input_shape), no_of_nodes)
        self.fc2 = nn.Linear(no_of_nodes, num_classes)

        self.dropout = nn.Dropout(p=0.5)

    def get_flatten_shape(self, input_shape):
        """Return the number of features leaving the convolutional stack.

        Args:
            input_shape: (N, C, H, W), (C, H, W) or (H, W). For shapes with
                four or more entries, entries 2 and 3 are taken as height and
                width; for shorter shapes the last two entries are used.

        Raises:
            ValueError: If `input_shape` has fewer than two entries.
        """
        if len(input_shape) >= 4:
            height, width = input_shape[2], input_shape[3]
        elif len(input_shape) >= 2:
            height, width = input_shape[-2], input_shape[-1]
        else:
            raise ValueError(
                f"input_shape must provide at least height and width, got {tuple(input_shape)}"
            )

        dummy_input = torch.zeros(1, self.conv1.in_channels, height, width)
        # Only the output size matters here, so skip autograd bookkeeping.
        with torch.no_grad():
            x = self.pool1(self.conv2(self.conv1(dummy_input)))
            x = self.pool2(self.conv4(self.conv3(x)))
        return x.numel()

    def forward(self, x):
        """Return class logits for a batch `x` of shape (N, 1, H, W)."""
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = self.pool1(x)

        x = F.relu(self.conv3(x))
        x = F.relu(self.conv4(x))
        x = self.pool2(x)
        x = self.dropout(x)

        x = F.relu(self.fc1(self.flat(x)))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
# Size the model from an actual dataset sample instead of a hard-coded shape.
input_shape = X_train_dataset[0][0].shape   # (C, H, W) of one transformed image
num_classes = no_of_classes
# Prepend a batch dimension so the model receives an (N, C, H, W) shape.
model = MyModel(input_shape=[1, *input_shape], num_classes=num_classes).to(device)
model

In [None]:
# 1. Pull one batch from the training loader and report its shapes
img_batch, label_batch = next(iter(X_train_loader))
print(f"Image batch shape: {img_batch.shape}\n")
print(f"Label batch shape: {label_batch.shape}")

# 2. Take the first image of the batch and add a batch dimension so its shape fits the model
img_single = img_batch[0].unsqueeze(dim=0)
label_single = label_batch[0]
print(f"Single image shape: {img_single.shape}\n")

# 3. Forward pass on that single image, in eval mode and without gradients
model.eval()
with torch.inference_mode():
    pred = model(img_single.to(device))

# 4. Model logits -> prediction probabilities -> predicted label
pred_probs = torch.softmax(pred, dim=1)
print(f"Output logits:\n{pred}\n")
print(f"Output prediction probabilities:\n{pred_probs}\n")
print(f"Output prediction label:\n{torch.argmax(pred_probs, dim=1)}\n")
print(f"Actual label:\n{label_single}")

In [None]:
# Summarise the model with torchinfo for a single 1x32x32 input.
summary(model, input_size = [1,1,32,32])

In [None]:
def train_step(model: torch.nn.Module,
               dataloader: torch.utils.data.DataLoader,
               loss_fn: torch.nn.Module,
               optimizer: torch.optim.Optimizer):
    """Train `model` for one pass over `dataloader`.

    Args:
        model: Network to optimise; it is switched to train mode.
        dataloader: Yields `(inputs, targets)` batches.
        loss_fn: Loss applied to `(logits, targets)`.
        optimizer: Optimizer stepping the model parameters.

    Returns:
        Tuple `(train_loss, train_acc)`: the loss and the accuracy, each
        averaged over the batches of the dataloader.
    """
    # Follow the model's own device instead of relying on a notebook-level
    # global, so inputs and weights always end up on the same device.
    try:
        target_device = next(model.parameters()).device
    except StopIteration:  # parameter-free model
        target_device = torch.device("cpu")

    # Put model in train mode
    model.train()

    # Setup train loss and train accuracy values
    train_loss, train_acc = 0, 0

    # Loop through data loader data batches
    for X, y in dataloader:
        # Send data to target device
        X, y = X.to(target_device), y.to(target_device)

        # 1. Forward pass
        y_pred = model(X)

        # 2. Calculate and accumulate loss
        loss = loss_fn(y_pred, y)
        train_loss += loss.item()

        # 3. Optimizer zero grad
        optimizer.zero_grad()

        # 4. Loss backward
        loss.backward()

        # 5. Optimizer step
        optimizer.step()

        # Calculate and accumulate accuracy metrics across all batches.
        # Softmax is monotonic, so argmax over the raw logits is equivalent.
        y_pred_class = y_pred.argmax(dim=1)
        train_acc += (y_pred_class == y).sum().item()/len(y_pred)

    # Adjust metrics to get average loss and accuracy per batch
    train_loss = train_loss / len(dataloader)
    train_acc = train_acc / len(dataloader)
    return train_loss, train_acc

In [None]:
def test_step(model: torch.nn.Module,
              dataloader: torch.utils.data.DataLoader,
              loss_fn: torch.nn.Module):
    # Put model in eval mode
    model.eval()

    # Setup test loss and test accuracy values
    test_loss, test_acc = 0, 0

    # Turn on inference context manager
    with torch.inference_mode():
        # Loop through DataLoader batches
        for batch, (X, y) in enumerate(dataloader):
            # Send data to target device
            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            test_pred_logits = model(X)

            # 2. Calculate and accumulate loss
            loss = loss_fn(test_pred_logits, y)
            test_loss += loss.item()

            # Calculate and accumulate accuracy
            test_pred_labels = test_pred_logits.argmax(dim=1)
            test_acc += ((test_pred_labels == y).sum().item()/len(test_pred_labels))

    # Adjust metrics to get average loss and accuracy per batch
    test_loss = test_loss / len(dataloader)
    test_acc = test_acc / len(dataloader)
    return test_loss, test_acc

In [None]:
# Full training loop: runs one train step and one evaluation step per epoch.
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          epochs: int = 5):
    """Train and evaluate `model` for `epochs` epochs.

    Each epoch calls `train_step` on `train_dataloader` and `test_step` on
    `test_dataloader`, prints the four metrics, and records them.

    Returns:
        dict with keys "train_loss", "train_acc", "test_loss" and "test_acc",
        each mapping to a list holding one value per epoch.
    """

    def _as_number(value):
        # Tensors are unwrapped to plain Python numbers; other values pass through.
        if isinstance(value, torch.Tensor):
            return value.item()
        return value

    results = {key: [] for key in ("train_loss", "train_acc", "test_loss", "test_acc")}

    for epoch in tqdm(range(epochs)):
        train_loss, train_acc = train_step(
            model=model,
            dataloader=train_dataloader,
            loss_fn=loss_fn,
            optimizer=optimizer,
        )
        test_loss, test_acc = test_step(
            model=model,
            dataloader=test_dataloader,
            loss_fn=loss_fn,
        )

        # Report this epoch's metrics
        progress_line = (
            f"Epoch: {epoch+1} | "
            f"train_loss: {train_loss:.4f} | "
            f"train_acc: {train_acc:.4f} | "
            f"test_loss: {test_loss:.4f} | "
            f"test_acc: {test_acc:.4f}"
        )
        print(progress_line)

        # Store this epoch's metrics
        epoch_metrics = (
            ("train_loss", train_loss),
            ("train_acc", train_acc),
            ("test_loss", test_loss),
            ("test_acc", test_acc),
        )
        for key, value in epoch_metrics:
            results[key].append(_as_number(value))

    return results

In [None]:
# Cross-entropy loss on the logits, optimised with Adam (learning rate 1e-3).
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Legacy pickle-based model loading, kept disabled.
# NOTE(review): pickle.load can execute arbitrary code from the file; only
# re-enable for files you trust.
# pickle_in=open("model_trained.p","rb")
# model=pickle.load(pickle_in)



In [None]:
# Train for 15 epochs, evaluating on the validation loader after each epoch.
results = train(
    model=model,
    train_dataloader=X_train_loader,
    test_dataloader=X_val_loader,
    optimizer=optimizer,
    loss_fn=loss_fn,
    epochs=15,
)

In [None]:
def plot_loss_curves(results):
    """Draw loss and accuracy curves side by side.

    Args:
        results (dict): per-epoch metric lists, e.g.
            {"train_loss": [...],
             "train_acc": [...],
             "test_loss": [...],
             "test_acc": [...]}
    """
    # Read every series up front: (train values, test values) per panel.
    loss_series = (results['train_loss'], results['test_loss'])
    accuracy_series = (results['train_acc'], results['test_acc'])

    # One x position per recorded epoch
    epochs = range(len(results['train_loss']))

    panels = (
        ('Loss', loss_series, ('train_loss', 'test_loss')),
        ('Accuracy', accuracy_series, ('train_accuracy', 'test_accuracy')),
    )

    plt.figure(figsize=(15, 7))
    for position, (title, series, labels) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        for values, label in zip(series, labels):
            plt.plot(epochs, values, label=label)
        plt.title(title)
        plt.xlabel('Epochs')
        plt.legend()

In [None]:
# Plot the loss and accuracy curves recorded during training.
plot_loss_curves(results)

Save and Test the model

In [None]:
# Persist only the learned weights (state_dict); the webcam cell below
# rebuilds the architecture and loads them back.
# The former cv2.waitKey(0) call was dropped: saving needs no key press, and
# the call can block the notebook whenever an OpenCV window is open.
torch.save(model.state_dict(), "model_trained.pth")

In [None]:
# Settings for the live webcam demo in the cells below.
frameWidth= 640         # camera resolution: frame width passed to the capture device
frameHeight = 480
brightness = 180
threshold = 0.9        # probability threshold: predictions are drawn only above this value
font = cv2.FONT_HERSHEY_SIMPLEX

In [None]:
# Open the default camera and apply the capture settings, using OpenCV's
# named property constants rather than their raw numeric ids (3, 4, 10).
cap = cv2.VideoCapture(0)
if not cap.isOpened():
    print("Error: Could not open camera.")
cap.set(cv2.CAP_PROP_FRAME_WIDTH, frameWidth)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, frameHeight)
cap.set(cv2.CAP_PROP_BRIGHTNESS, brightness)

# Import the trained model: rebuild the architecture, load the saved weights,
# and switch to eval mode so dropout is disabled for inference.
model = MyModel(input_shape=[1,1,32,32], num_classes=num_classes).to(device)
model.load_state_dict(torch.load("model_trained.pth", map_location=device))
model.eval()

In [None]:
def getClassName(classNo):
    """Return the display name for class id `classNo`.

    Ids 0 through 42 are compared against `classNo` in ascending order and the
    first equal one wins. Returns None when no id matches.
    """
    class_names = (
        'Speed Limit 20 km/h',
        'Speed Limit 30 km/h',
        'Speed Limit 50 km/h',
        'Speed Limit 60 km/h',
        'Speed Limit 70 km/h',
        'Speed Limit 80 km/h',
        'End of Speed Limit 80 km/h',
        'Speed Limit 100 km/h',
        'Speed Limit 120 km/h',
        'No passing',
        'No passing for vechiles over 3.5 metric tons',
        'Right-of-way at the next intersection',
        'Priority road',
        'Yield',
        'Stop',
        'No vechiles',
        'Vechiles over 3.5 metric tons prohibited',
        'No entry',
        'General caution',
        'Dangerous curve to the left',
        'Dangerous curve to the right',
        'Double curve',
        'Bumpy road',
        'Slippery road',
        'Road narrows on the right',
        'Road work',
        'Traffic signals',
        'Pedestrians',
        'Children crossing',
        'Bicycles crossing',
        'Beware of ice/snow',
        'Wild animals crossing',
        'End of all speed and passing limits',
        'Turn right ahead',
        'Turn left ahead',
        'Ahead only',
        'Go straight or right',
        'Go straight or left',
        'Keep right',
        'Keep left',
        'Roundabout mandatory',
        'End of no passing',
        'End of no passing by vechiles over 3.5 metric tons',
    )
    # Position in the tuple is the class id.
    for class_id, class_name in enumerate(class_names):
        if classNo == class_id:
            return class_name
    return None

In [None]:
# Live classification loop: grab a frame, preprocess it like the training
# data, classify it, and overlay the result. Press "q" to stop.
# try/finally guarantees the camera and the window are released even when an
# exception (or a notebook interrupt) ends the loop early.
try:
    while True:
        success, imgOriginal = cap.read()
        if not success:
            print("Error: Could not read frame from camera.")
            break

        # Same pipeline as training: 32x32, grayscale, equalised, scaled by 1/255
        img = cv2.resize(imgOriginal, (32, 32))
        img = preprocessing(img)
        img = img.reshape(1, 1, 32, 32)   # (batch, channel, height, width)
        img = torch.from_numpy(img).float().to(device)

        with torch.inference_mode():
            outputs = model(img)
            classIndex = torch.argmax(outputs, dim=1).item()
            probabilityValue = torch.max(torch.softmax(outputs, dim=1)).item()

        cv2.putText(imgOriginal, "CLASS:", (20, 35), font, 0.75, (0, 0, 255), 2, cv2.LINE_AA)
        cv2.putText(imgOriginal, "PROBABILITY:", (20, 75), font, 0.75, (0, 0, 255), 2, cv2.LINE_AA)

        # Only annotate predictions the model is confident about
        if probabilityValue > threshold:
            cv2.putText(imgOriginal, f"{classIndex} {getClassName(classIndex)}", (120, 35), font, 0.75, (0, 0, 255), 2, cv2.LINE_AA)
            cv2.putText(imgOriginal, f"{round(probabilityValue * 100, 2)}%", (180, 75), font, 0.75, (0, 0, 255), 2, cv2.LINE_AA)

        cv2.imshow("Result", imgOriginal)

        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
finally:
    cap.release()
    cv2.destroyAllWindows()