In [None]:
import pandas as pd
import cv2
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision.datasets import GTSRB
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt


# Image preprocessing function
def image_processing_function(image_list, annotations=None):
    """Crop, resize, contrast-equalize and sharpen a sequence of BGR images.

    Each image is converted to grayscale, cropped to its bounding box,
    resized to 44x42 (width x height), equalized with CLAHE and sharpened
    with a 3x3 kernel.

    Args:
        image_list: Sequence of colour images in BGR channel order (they are
            converted with ``cv2.COLOR_BGR2GRAY``).
        annotations: Optional sequence of bounding-box rows aligned with
            ``image_list`` (row ``i`` belongs to ``image_list[i]``). Columns
            0..3 of each row are read as ``x1, y1, x2, y2``. When omitted,
            the rows are read from ``"Annotate.csv"`` in the working
            directory, as before.

    Returns:
        A list with one 2-D array of height 42 and width 44 per input image.
        An empty input yields an empty list without touching the CSV file.

    Raises:
        IndexError: If there are fewer annotation rows than images.
    """
    # len() rather than truthiness so a NumPy array of images is accepted too.
    if len(image_list) == 0:
        return []

    if annotations is None:
        annotations = pd.read_csv("Annotate.csv", header=None).values.tolist()
    if len(annotations) < len(image_list):
        raise IndexError(
            f"Only {len(annotations)} annotation rows for {len(image_list)} images"
        )

    # Loop-invariant helpers are built once instead of once per image.
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    sharpen_kernel = np.array([
        [0, -1, 0],
        [-1, 5, -1],
        [0, -1, 0]
    ])

    processed_images = []
    for image, box in zip(image_list, annotations):
        x1, y1, x2, y2 = int(box[0]), int(box[1]), int(box[2]), int(box[3])
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        # NumPy indexing is [rows, cols], i.e. [y, x].
        cropped = gray[y1:y2, x1:x2]
        # cv2.resize takes (width, height), so the result has shape (42, 44).
        resized = cv2.resize(cropped, (44, 42))
        equalized = clahe.apply(resized)
        processed_images.append(cv2.filter2D(equalized, -1, sharpen_kernel))
    return processed_images


class CustomGTSRBDataset(Dataset):
    """Wrap an image dataset and apply the crop/CLAHE/sharpen preprocessing.

    Args:
        dataset: Indexable dataset returning ``(PIL image, label)`` pairs.
        annotations_file: Header-less CSV whose row ``i`` holds the bounding
            box ``x1, y1, x2, y2`` (columns 0..3) of sample ``i``.
    """

    def __init__(self, dataset, annotations_file):
        self.dataset = dataset
        self.annotations_file = annotations_file
        # Parsed on first access so that construction stays free of file I/O
        # and the CSV is read once instead of once per sample.
        self._annotations = None

    def __len__(self):
        return len(self.dataset)

    def _load_annotations(self):
        """Return the parsed annotation rows, reading the CSV on first use."""
        if self._annotations is None:
            frame = pd.read_csv(self.annotations_file, header=None)
            self._annotations = frame.values.tolist()
        return self._annotations

    def __getitem__(self, idx):
        """Return ``(tensor of shape (1, 42, 44), label)`` for sample ``idx``."""
        image, label = self.dataset[idx]  # Get image and label
        image = np.array(image)  # Convert PIL image to NumPy array
        # PIL delivers RGB while the preprocessing function expects BGR;
        # without the swap red and blue would get each other's gray weights.
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        # Pass this sample's own bounding box. Calling the function with a
        # one-element list and no annotations would always crop with row 0.
        box = self._load_annotations()[idx]
        processed_image = image_processing_function([image], annotations=[box])[0]
        # Add the channel dimension expected by Conv2d: (H, W) -> (1, H, W).
        processed_image = torch.tensor(processed_image, dtype=torch.float32).unsqueeze(0)
        return processed_image, label


# Fetch both GTSRB splits (downloaded into root_dir when missing)
root_dir = './data/GTSRB'
gtsrb_train, gtsrb_test = (
    GTSRB(root=root_dir, split=split_name, download=True)
    for split_name in ('train', 'test')
)

# Wrap each split so that samples are preprocessed on access.
# NOTE(review): both splits are given the same annotation file - confirm
# that its rows really describe both the train and the test images.
annotations_csv = "Annotate.csv"
train_dataset = CustomGTSRBDataset(gtsrb_train, annotations_csv)
test_dataset = CustomGTSRBDataset(gtsrb_test, annotations_csv)

# Mini-batch loaders: only the training data is shuffled
loader_batch_size = 32
train_loader = DataLoader(train_dataset, batch_size=loader_batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=loader_batch_size, shuffle=False)
class TrafficSignCNN(nn.Module):
    """Three-stage CNN classifier (conv -> batch norm -> ReLU -> max-pool, x3).

    The convolutional stack is followed by a 256-unit hidden layer with
    dropout and a linear layer producing one logit per class.

    Args:
        num_classes: Number of output logits.
        input_size: ``(channels, height, width)`` of a single input image.
            Defaults to ``(1, 42, 44)``; it determines the number of input
            channels of the first convolution and the width of ``fc1``.
    """

    def __init__(self, num_classes, input_size=(1, 42, 44)):
        super().__init__()
        self.conv1 = nn.Conv2d(input_size[0], 32, kernel_size=3, padding=1)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm2d(128)
        self.pool = nn.MaxPool2d(2, 2)
        self.dropout = nn.Dropout(0.5)

        # Calculate the flattened feature size dynamically
        self._to_linear = None
        self._initialize_flattened_size(input_size)

        self.fc1 = nn.Linear(self._to_linear, 256)
        self.fc2 = nn.Linear(256, num_classes)

    def _extract_features(self, x):
        """Run the three conv/batch-norm/ReLU/pool stages on a batch."""
        x = self.pool(torch.relu(self.bn1(self.conv1(x))))
        x = self.pool(torch.relu(self.bn2(self.conv2(x))))
        x = self.pool(torch.relu(self.bn3(self.conv3(x))))
        return x

    def _initialize_flattened_size(self, input_size):
        """Store in ``_to_linear`` the feature count produced for one image.

        Only the convolutions and pooling are applied: batch norm and ReLU
        do not change the tensor shape, and skipping batch norm keeps this
        probe from updating the layers' running statistics with an
        all-zero dummy batch. ``no_grad`` avoids building an autograd graph.
        """
        dummy_input = torch.zeros(1, *input_size)
        with torch.no_grad():
            x = self.pool(self.conv1(dummy_input))
            x = self.pool(self.conv2(x))
            x = self.pool(self.conv3(x))
        self._to_linear = x.numel()  # Total number of elements in the flattened tensor

    def forward(self, x):
        """Map a batch of shape (N, C, H, W) to logits of shape (N, num_classes)."""
        x = self._extract_features(x)
        x = x.view(x.size(0), -1)  # Flatten everything except the batch dimension
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x



# Define training setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Derive the number of classes from the labels alone.
# Iterating gtsrb_train itself loads every image just to read its label, so
# prefer the dataset's own (path, class_id) list when it is available.
# NOTE(review): `_samples` is a private attribute of torchvision's GTSRB -
# verify it against the installed torchvision version; the fallback below
# iterates the dataset exactly as before.
_label_source = getattr(gtsrb_train, "_samples", None)
if _label_source is None:
    _label_source = gtsrb_train
# max + 1 (rather than the count of distinct ids) keeps every label a valid
# logit index even if some class id never occurs in the training split.
num_classes = max((int(sample[1]) for sample in _label_source), default=-1) + 1
model = TrafficSignCNN(num_classes=num_classes).to(device)
loss_function = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Per-epoch history (used for the plots after training) and best-so-far tracker
train_losses, train_accuracies = [], []
test_losses, test_accuracies = [], []
best_test_acc = 0.0
epochs = 30

for epoch in range(epochs):
    # ---- Training pass: update the weights on every mini-batch ----
    model.train()
    running_loss, correct_train, total_train = 0.0, 0, 0
    all_train_preds, all_train_labels = [], []

    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        outputs = model(inputs)

        if outputs.shape[0] != labels.shape[0]:
            raise ValueError(f"Mismatch in batch sizes: outputs={outputs.shape[0]}, labels={labels.shape[0]}")

        loss = loss_function(outputs, labels)
        loss.backward()
        optimizer.step()

        num_in_batch = labels.size(0)
        # The loss is a per-batch mean; weight it by the batch size so a
        # shorter final batch does not count as much as a full one.
        running_loss += loss.item() * num_in_batch
        # argmax over the class dimension; no need for the legacy `.data`.
        predicted = outputs.argmax(dim=1)
        total_train += num_in_batch
        correct_train += (predicted == labels).sum().item()

        all_train_preds.extend(predicted.tolist())
        all_train_labels.extend(labels.tolist())

    train_loss = running_loss / total_train
    train_accuracy = 100 * correct_train / total_train
    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    # Class-frequency-weighted metrics; zero_division=0 scores classes that
    # were never predicted as 0 instead of emitting a warning.
    train_precision = precision_score(all_train_labels, all_train_preds, average='weighted', zero_division=0)
    train_recall = recall_score(all_train_labels, all_train_preds, average='weighted', zero_division=0)
    train_f1 = f1_score(all_train_labels, all_train_preds, average='weighted', zero_division=0)

    # ---- Evaluation pass on the test loader: no gradients, no updates ----
    model.eval()
    test_running_loss, correct_test, total_test = 0.0, 0, 0
    all_test_preds, all_test_labels = [], []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)

            if outputs.shape[0] != labels.shape[0]:
                raise ValueError(f"Mismatch in batch sizes: outputs={outputs.shape[0]}, labels={labels.shape[0]}")

            loss = loss_function(outputs, labels)
            num_in_batch = labels.size(0)
            test_running_loss += loss.item() * num_in_batch
            predicted = outputs.argmax(dim=1)
            total_test += num_in_batch
            correct_test += (predicted == labels).sum().item()

            all_test_preds.extend(predicted.tolist())
            all_test_labels.extend(labels.tolist())

    test_loss = test_running_loss / total_test
    test_accuracy = 100 * correct_test / total_test
    test_losses.append(test_loss)
    test_accuracies.append(test_accuracy)

    test_precision = precision_score(all_test_labels, all_test_preds, average='weighted', zero_division=0)
    test_recall = recall_score(all_test_labels, all_test_preds, average='weighted', zero_division=0)
    test_f1 = f1_score(all_test_labels, all_test_preds, average='weighted', zero_division=0)

    # Keep the weights of the epoch with the highest test accuracy.
    # NOTE(review): the checkpoint is selected on the test split itself, so
    # the reported best accuracy is optimistic; a separate validation split
    # would give an unbiased figure.
    if test_accuracy > best_test_acc:
        best_test_acc = test_accuracy
        torch.save(model.state_dict(), 'best_model.pth')

    print(f'Epoch [{epoch+1}/{epochs}]')
    print(f'Training Loss: {train_loss:.3f}')
    print(f'Training Accuracy: {train_accuracy:.2f}%')
    print(f'Training Precision: {train_precision:.3f}, Recall: {train_recall:.3f}, F1-Score: {train_f1:.3f}')
    print(f'Testing Accuracy: {test_accuracy:.2f}%')
    print(f'Testing Precision: {test_precision:.3f}, Recall: {test_recall:.3f}, F1-Score: {test_f1:.3f}')
    print(f'Best Testing Accuracy: {best_test_acc:.2f}%')
    print('--------------------')

# Side-by-side curves of the per-epoch history: loss on the left,
# accuracy on the right, train and test series in each panel.
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
metric_panels = (
    (ax1, 'Loss vs Epochs', (('Train Loss', train_losses), ('Test Loss', test_losses))),
    (ax2, 'Accuracy vs Epochs', (('Train Accuracy', train_accuracies), ('Test Accuracy', test_accuracies))),
)
for panel_axis, panel_title, panel_curves in metric_panels:
    for curve_label, curve_values in panel_curves:
        panel_axis.plot(curve_values, label=curve_label)
    panel_axis.legend()
    panel_axis.set_title(panel_title)
plt.show()

# Heatmap of true vs. predicted labels, built from the predictions that
# all_test_labels / all_test_preds currently hold.
conf_matrix = confusion_matrix(all_test_labels, all_test_preds)
cm_figure, cm_axes = plt.subplots(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Blues", ax=cm_axes)
cm_axes.set_title("Confusion Matrix")
cm_axes.set_xlabel("Predicted Label")
cm_axes.set_ylabel("True Label")
plt.show()


Epoch [1/30]
Training Loss: 2.602
Training Accuracy: 27.47%
Training Precision: 0.239, Recall: 0.275, F1-Score: 0.239
Testing Accuracy: 42.95%
Testing Precision: 0.452, Recall: 0.430, F1-Score: 0.392
Best Testing Accuracy: 42.95%
--------------------
Epoch [2/30]
Training Loss: 2.013
Training Accuracy: 40.99%
Training Precision: 0.386, Recall: 0.410, F1-Score: 0.384
Testing Accuracy: 51.97%
Testing Precision: 0.540, Recall: 0.520, F1-Score: 0.504
Best Testing Accuracy: 51.97%
--------------------
Epoch [3/30]
Training Loss: 1.792
Training Accuracy: 46.81%
Training Precision: 0.461, Recall: 0.468, F1-Score: 0.450
Testing Accuracy: 58.00%
Testing Precision: 0.580, Recall: 0.580, F1-Score: 0.563
Best Testing Accuracy: 58.00%
--------------------
Epoch [4/30]
Training Loss: 1.656
Training Accuracy: 50.74%
Training Precision: 0.507, Recall: 0.507, F1-Score: 0.491
Testing Accuracy: 59.70%
Testing Precision: 0.634, Recall: 0.597, F1-Score: 0.586
Best Testing Accuracy: 59.70%
-----------------