# Import Required Libraries 

In [None]:
import numpy as np 
import pandas as pd 
import os
import seaborn as sns
import matplotlib.pyplot as plt
import random

from PIL import Image
from skimage import io

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split, Dataset
import torch.optim as optim

import torchvision.datasets
import torchvision.transforms as T 
from torchvision.io import read_image
from torchvision.datasets import DatasetFolder
from torchvision.datasets.folder import default_loader

# Loading The Data

In [None]:
meta_df = pd.read_csv('/kaggle/input/gtsrb-german-traffic-sign/Meta.csv')
train_df = pd.read_csv('/kaggle/input/gtsrb-german-traffic-sign/Train.csv')
test_df = pd.read_csv('/kaggle/input/gtsrb-german-traffic-sign/Test.csv')

In [None]:
data_path = '/kaggle/input/gtsrb-german-traffic-sign'
train_data_path = os.path.join(data_path, 'Train')
test_data_path = os.path.join(data_path, 'Test')
meta_data_path = os.path.join(data_path, 'Meta')

# Exploring The Data

In [None]:
#exploring Meta.csv file

print("")
print("------------------------------------------------")
print(meta_df.head())
print("------------------------------------------------")

print("number of classes in the dataset:",meta_df.ClassId.nunique())
print("number of Shape Ids in the dataset:",meta_df.ShapeId.nunique())
print("number of Color Ids in the dataset:",meta_df.ColorId.nunique())
print("number of Sign Ids in the dataset:",meta_df.SignId.nunique())
print("")

## Plotting the original Traffic signs

In [None]:
signs = [os.path.join(data_path, meta_df.Path.to_list()[i]) for i in range(43)]
fig, axes = plt.subplots(11, 4, figsize=(15, 10))
for i, image_path in enumerate(signs):
    image = Image.open(image_path)
    row = i // 4
    col = i % 4
    axes[row, col].imshow(image)
    axes[row, col].axis('off')
    

plt.show();

In [None]:
#exploring Train.csv file

print("")
print("------------------------------------------------")
print(train_df.head())
print("------------------------------------------------")

print("number of Training Samples in the dataset:",train_df.shape[0])
print("number of Test Samples in the dataset:",test_df.shape[0])
print("number of Classes in the dataset:",train_df["ClassId"].nunique())
print("The Maximum Width:",train_df["Width"].max())
print("The Maximum Height:",train_df["Height"].max())

## The Distribution of the Class labels in the dataset 

In [None]:
classes = train_df["ClassId"].value_counts().head(43)
plt.figure(figsize=(12,6))
plt.title("Distribution of Class Labels in the dataset")
plt.ylabel('Counts')
plt.xlabel('Classes')

sns.barplot(y=classes.values, x=classes.index,color='g');

## Plotting samples of the Traffic signs 

In [None]:
data_path = '/kaggle/input/gtsrb-german-traffic-sign'
train_data_path = os.path.join(data_path, 'Train')
valid_data_path = os.path.join(data_path, 'Test')

In [None]:
folder_names = [os.path.join(train_data_path, str(i)) for i in random.choices(range(43), k=20)]
file_names = [os.path.join(fldr, os.listdir(fldr)[0]) for fldr in folder_names]

fig, axes = plt.subplots(4, 5, figsize=(12, 8))
for i, image_path in enumerate(file_names):
    image = Image.open(image_path)
    row = i // 5
    col = i % 5
    axes[row, col].imshow(image)
    axes[row, col].axis('off')

plt.show();

# Building Custom Dataset for Traffic signs

In [None]:
# Transforming the Data ToTensor and Normalize it 

transforms = T.Compose([T.ToTensor(),T.Resize((225,225)),
    T.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])])

In [None]:
class TSignsDataset(Dataset):
    def __init__(self, df, root_dir,transform=None):
        self.df = df
        self.root_dir = root_dir
        self.transform = transform
    
    def __len__(self):
        return len(self.df)
    
    def __getitem__(self,index):
        image_path = os.path.join(self.root_dir,self.df.iloc[index,7]) #the column of paths in df is 7
        image = Image.open(image_path)
        y_class = torch.tensor(self.df.iloc[index, 6]) #the column of ClsassId in df is 6
        
        if self.transform:
            image = self.transform(image)

        return (image, y_class)

In [None]:
training_set = TSignsDataset(train_df,data_path,transform=transforms)
validation_set = TSignsDataset(test_df,data_path,transform=transforms)

# Loading The data into DataLoaders

In [None]:
"""
#splitting the data into training and validation sets

L = len(training_set)
Len = int(0.8*L)
train_set, val_set = torch.utils.data.random_split(training_set,[Len,int(L-Len)])
"""

In [None]:
#Loading the data into DataLoader

train_loader = DataLoader(dataset=training_set, batch_size=32, shuffle=True)
valid_loader = DataLoader(dataset=validation_set, batch_size=32, shuffle=False)

dataloaders = {'training':train_loader,'validation':valid_loader}
dataset_sizes = {'training':len(train_loader.dataset),'validation':len(valid_loader.dataset)}
print(dataset_sizes)


# ResNet Explained

**ResNet is a type of CNN.It was designed to tackle the issue of vanishing gradients in deep networks, which was a major hindrance in developing deep neural networks. Its architecture enables the network to learn multiple layers of features without getting stuck in local minima.**

### Here are the key features of the ResNet (Residual Network) architecture:

* Residual Connections: ResNet incorporates residual connections, which allow for training very deep neural networks and alleviate the vanishing gradient problem. 

* Identity Mapping: ResNet uses identity mapping as the residual function, which makes the training process easier by learning the residual mapping rather than the actual mapping.

* Depth: ResNet enables the creation of very deep neural networks, which can improve performance on image recognition tasks. 

* Fewer Parameters: ResNet achieves better results with fewer parameters, making it computationally more efficient.

* State-of-the-art Results: ResNet has achieved state-of-the-art results on various image recognition tasks and has become a widely used benchmark for image recognition tasks.

* General and Effective Approach: The authors conclude that residual connections are a general and effective approach for enabling deeper networks.

### How ResNet Works?

* ResNet works by adding residual connections to the network, which helps to maintain the information flow throughout the network and prevents the gradients from vanishing.

* The residual connection is a shortcut that allows the information to bypass one or more layers in the network and reach the output directly.

* The residual connection allows the network to learn the residual function and make small updates to the parameters, which enables the network to converge faster and achieve better performance.

* This enables the network to learn residual functions and helps the network to converge faster and achieve better performance.

* The residual connection is based on the idea that instead of trying to learn the complex mapping between the inputs and the outputs, it is easier to learn the residual function, which maps the inputs to the desired outputs.

### The Problem Statement
Deep Neural Networks provide more accuracy as the number of layers increases. But, when we go deeper into the network, the accuracy of the network decreases instead of increasing. An increase in the depth of the network increases the training error, which ultimately increases the test error. Because of this, the network cannot generalize well for new data, which becomes inefficient. This degradation indicates that the increase in the model layer does not aid the model’s performance.

### The solution
Adding more layers to a suitably deep model leads to higher training errors. The paper presents how architectural changes like residual learning tackle this degradation problem using residual networks. Residual Network adds an identity mapping between the layers. Applying identity mapping to the input will give the output the same as the input. The skip connections directly pass the input to the output, effectively allowing the network to learn an identity function. The paper presents a deep convolutional neural network architecture that solves the vanishing gradients problem and enables the training of deep networks. It showed that deep residual networks could be trained effectively, achieving improved accuracy on several benchmark datasets compared to previous state-of-the-art models.

### Residual learning


![image.png](attachment:184e7065-e4da-4300-b65f-3c0638fa3662.png)

H(x) = F(x) + x
Where,
* X is the input to the set of layers
* F(x) is the residual function
* H(x) is the mapping function from input to output

### The Layer Blocks of the ResNet

![image.png](attachment:da5daf91-f362-4ca9-a73b-78a7d50e91ea.png) 

![image.png](attachment:6ad1ce42-7db8-42cb-b905-711fe8da1601.png) 

![image.png](attachment:baf18d4e-6e58-403e-b097-67fbdd9af770.png) 

![image.png](attachment:7d2ba585-cb27-45b8-a401-5aeb201f96f3.png)

### The detailed architicture of the networks with different depth
50-layers = [3, 4, 6, 3]

101-layer = [3, 4, 23, 3]

152-layer = [3, 8, 36, 3]




![image.png](attachment:8dce57ac-01c9-4b74-89b0-6281471ee919.png)

**note that the number of out_channels in the third conv layer is always 4*(out_channels) of the first and second,this factor is represented in the block class as (expansion = 4).**

![image.png](attachment:28346c75-456d-4d3d-a9c8-2d0b48b885a0.png)



# Building The ResNet Model from scratch

### Generic Residual block 

In [None]:
class block(nn.Module):
    def __init__(
        self, in_channels, out_channels, identity_downsample=None, stride=1):
        super().__init__()
        self.expansion = 4
        self.conv1 = nn.Conv2d(in_channels,out_channels,kernel_size=1,stride=1,padding=0,bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels,out_channels,kernel_size=3,stride=stride,padding=1,bias=False,)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.conv3 = nn.Conv2d(out_channels,out_channels * self.expansion,kernel_size=1,stride=1,padding=0,bias=False,)
        self.bn3 = nn.BatchNorm2d(out_channels * self.expansion)
        self.relu = nn.ReLU()
        self.identity_downsample = identity_downsample
        self.stride = stride

    def forward(self, x):
        identity = x.clone()

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)

        if self.identity_downsample is not None:
            identity = self.identity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x

### Generic implementation of ResNet Class

In [None]:
class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels, num_classes):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(image_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # Essentially the entire ResNet architecture are in these 4 lines below
        self.layer1 = self._make_layer(block, layers[0], out_channels=64, stride=1)
        self.layer2 = self._make_layer(block, layers[1], out_channels=128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], out_channels=256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], out_channels=512, stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * 4, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)

        return x

    def _make_layer(self, block, num_residual_blocks, out_channels, stride):
        identity_downsample = None
        layers = []

        # Either if we half the input space for ex, 56x56 -> 28x28 (stride=2), or channels changes
        # we need to adapt the Identity (skip connection) so it will be able to be added
        # to the layer that's ahead
        if stride != 1 or self.in_channels != out_channels * 4:
            identity_downsample = nn.Sequential(nn.Conv2d(self.in_channels,out_channels * 4,kernel_size=1,stride=stride,bias=False)
                                                ,nn.BatchNorm2d(out_channels * 4))

        layers.append(block(self.in_channels, out_channels, identity_downsample, stride))

        # The expansion size is always 4 for ResNet 50,101,152
        self.in_channels = out_channels * 4

        # For example for first resnet layer: 256 will be mapped to 64 as intermediate layer,
        # then finally back to 256. Hence no identity downsample is needed, since stride = 1,
        # and also same amount of channels.
        for i in range(num_residual_blocks - 1):
            layers.append(block(self.in_channels, out_channels))

        return nn.Sequential(*layers)

### The ResNet: 3 levels of depth

In [None]:
def ResNet50(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 4, 6, 3], img_channel, num_classes)


def ResNet101(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 4, 23, 3], img_channel, num_classes)


def ResNet152(img_channel=3, num_classes=1000):
    return ResNet(block, [3, 8, 36, 3], img_channel, num_classes)

In [None]:
"""
# testing the network on random tensor

def test():
    BATCH_SIZE = 32
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    net = ResNet50(img_channel=3, num_classes=43).to(device)
    y = net(torch.randn(BATCH_SIZE, 3, 225, 225)).to(device)
    assert y.size() == torch.Size([BATCH_SIZE, 43])
    print(y.size())

test()
"""

# Training The model

In [None]:
# def Train(model,criterion,optimizer,num_epochs,batch_size,dataloaders,out_path):
    

#     best_model_weights = model.state_dict()
#     best_acc = 0.0
    
#     for epoch in range(num_epochs):
#         print("epoch {}/{}".format(epoch+1,num_epochs))
#         print("*" * 10)
        
#         for x in ["training","validation"]:
#             if x == "training" :
#                 model.train()
#             else:
#                 model.eval()
                
#             running_loss = 0.0
#             running_accuracy = 0
            
#             for data in dataloaders[x]:
#                 img , y = data
#                 img , y = img.to(device) , y.to(device)
                
#                 optimizer.zero_grad()
#                 y_pred = model(img)
#                 loss = criterion(y_pred,y)
#                 _, preds = torch.max(y_pred, dim=1)
                
#                 if x == 'training':
#                     loss.backward()
#                     optimizer.step()
                    
#                 running_loss += loss.item()
#                 running_accuracy += torch.sum(preds == y.data)
            
                                                
#             epoch_loss = running_loss / dataset_sizes[x]
#             epoch_acc = running_accuracy / dataset_sizes[x]
            
#             print('{} Loss: {:.4f} || Accuracy: {:.4f}'.format(x, epoch_loss, epoch_acc))

#             # deep copy the model
#             if x == 'validation' and epoch_acc > best_acc:
#                 best_acc = epoch_acc
                                                

#     # load best model weights
#     torch.save(model.state_dict(), out_path)
#     return print('Best validation Accuracy: {:4f}'.format(best_acc))

# #set device
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# model = ResNet50(img_channel=3, num_classes=43).to(device)
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(),lr = 0.001)
# num_epochs = 6
# batch_size = 32
# out_path = "/kaggle/working/best_model.pt"

# #train the model
# Train(model,criterion,optimizer,num_epochs,batch_size,dataloaders,out_path)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import matplotlib.pyplot as plt

def Train(model, criterion, optimizer, num_epochs, batch_size, dataloaders, out_path):
    train_losses = []
    train_accuracies = []
    val_losses = []
    val_accuracies = []

    best_model_weights = model.state_dict()
    best_acc = 0.0

    for epoch in range(num_epochs):
        print("epoch {}/{}".format(epoch+1, num_epochs))
        print("*" * 10)

        for phase in ["training", "validation"]:
            if phase == "training":
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_accuracy = 0

            for data in dataloaders[phase]:
                img, y = data
                img, y = img.to(device), y.to(device)

                optimizer.zero_grad()
                with torch.set_grad_enabled(phase == "training"):
                    y_pred = model(img)
                    loss = criterion(y_pred, y)
                    _, preds = torch.max(y_pred, dim=1)

                    if phase == "training":
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * img.size(0)
                running_accuracy += torch.sum(preds == y.data)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_accuracy.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} || Accuracy: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            if phase == 'training':
                train_losses.append(epoch_loss)
                train_accuracies.append(epoch_acc)
            else:
                val_losses.append(epoch_loss)
                val_accuracies.append(epoch_acc)

            if phase == 'validation' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_weights = model.state_dict()

    torch.save(best_model_weights, out_path)
    print('Best validation Accuracy: {:4f}'.format(best_acc))

    # Plotting
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.plot(range(1, num_epochs + 1), train_losses, label='Training Loss')
    plt.plot(range(1, num_epochs + 1), val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.title('Loss Over Epochs')
    plt.legend()

    plt.subplot(1, 2, 2)
    plt.plot(range(1, num_epochs + 1), train_accuracies, label='Training Accuracy')
    plt.plot(range(1, num_epochs + 1), val_accuracies, label='Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.title('Accuracy Over Epochs')
    plt.legend()

    plt.tight_layout()
    plt.show()



# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Define model, criterion, optimizer, etc.
# Assuming dataloaders and ResNet50 are defined elsewhere

model = ResNet50(img_channel=3, num_classes=43).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 1
batch_size = 32
out_path = "/kaggle/working/best_model.pt"

# Train the model
Train(model, criterion, optimizer, num_epochs, batch_size, dataloaders, out_path)


In [None]:
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np

# Define a function to calculate precision, recall, and F1 score for each class
def class_wise_metrics(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred)
    cr = classification_report(y_true, y_pred, output_dict=True)

    precision = []
    recall = []
    f1_score = []
    for cls in range(len(cm)):
        precision.append(cr[str(cls)]['precision'])
        recall.append(cr[str(cls)]['recall'])
        f1_score.append(cr[str(cls)]['f1-score'])

    return cm, precision, recall, f1_score

# Calculate and print confusion matrix, precision, recall, and F1 score for each class
def print_metrics(model, dataloader):
    model.eval()
    y_true = []
    y_pred = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            y_true.extend(labels.cpu().numpy())
            y_pred.extend(preds.cpu().numpy())
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)

    cm, precision, recall, f1_score = class_wise_metrics(y_true, y_pred)

    # Confusion Matrix
    print("Confusion Matrix:")
    print(cm)
    print()

    # Table of precision, recall, and F1 score
    print("Class-wise Metrics:")
    print("{:<10} {:<10} {:<10} {:<10}".format("Class", "Precision", "Recall", "F1 Score"))
    for i in range(len(cm)):
        print("{:<10} {:<10.4f} {:<10.4f} {:<10.4f}".format(i, precision[i], recall[i], f1_score[i]))

# Example usage:
print_metrics(model, dataloaders['validation'])


# Loading The trained Model 

In [None]:
model.load_state_dict(torch.load(out_path))



# References

* Deep Residual Learning for Image Recognition: https://arxiv.org/abs/1512.03385
* ResNet Explained :https://www.analyticsvidhya.com/blog/2023/02/deep-residual-learning-for-image-recognition-resnet-explained/
* Pytorch ResNet implementation from Scratch: https://www.youtube.com/watch?v=DkNIBBBvcPs