In [None]:
""" 
Multi-class classification
"""
import cv2
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn
from sklearn.model_selection import train_test_split

In [None]:
# Root directory of the German traffic-sign dataset. The relative image paths
# read from the CSV (`Path` column) are appended directly to this string, so
# the trailing slash matters.
path = '../input/german-traffic-sign-recognition-dataset/german_trafficsign/'

In [None]:
# Load the training annotations; later cells read the `Path` and `ClassId`
# columns of this frame.
df = pd.read_csv(f"{path}/Train.csv")
df.head()

In [None]:
# Human-readable name for every class id. The position of a name in the list
# below is its class id (0-42), so the order must not be changed.
id2label = dict(enumerate([
    'speed limit 20(kmph)',                    # 0
    'speed limit 30(kmph)',                    # 1
    'speed limit 50(kmph)',                    # 2
    'speed limit 60(kmph)',                    # 3
    'speed limit 70(kmph)',                    # 4
    'speed limit 80(kmph)',                    # 5
    'restriction ends 80(kmph)',               # 6
    'speed limit 100(kmph)',                   # 7
    'speed limit 120(kmph)',                   # 8
    'no overtaking',                           # 9
    'no overtaking (trucks)',                  # 10
    'priority at next intersection',           # 11
    'priority road',                           # 12
    'give way',                                # 13
    'stop',                                    # 14
    'no traffic both ways',                    # 15
    'no trucks',                               # 16
    'no entry',                                # 17
    'danger',                                  # 18
    'bend left',                               # 19
    'bend right',                              # 20
    'bend',                                    # 21
    'uneven road',                             # 22
    'slippery road',                           # 23
    'road narrows',                            # 24
    'construction',                            # 25
    'traffic signal',                          # 26
    'pedestrian crossing',                     # 27
    'school crossing',                         # 28
    'cycles crossing',                         # 29
    'snow',                                    # 30
    'animals',                                 # 31
    'restriction ends',                        # 32
    'go right',                                # 33
    'go left',                                 # 34
    'go straight',                             # 35
    'go right or straight',                    # 36
    'go left or straight',                     # 37
    'keep right',                              # 38
    'keep left',                               # 39
    'roundabout',                              # 40
    'restriction ends (overtaking)',           # 41
    'restriction ends (overtaking (trucks))',  # 42
]))

In [None]:
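# NOTE(review): disabled experiment that grouped ClassIds into four coarse
# categories. `prohibitory`, `mandatory`, `danger` and `other` are not defined
# anywhere in the visible notebook, so these lines cannot simply be uncommented.
#df['class_name'] = np.nan
#df.loc[df.ClassId.isin(prohibitory), "class_name"] = 'Prohibitory'
#df.loc[df.ClassId.isin(mandatory), "class_name"] = 'Mandatory'
#df.loc[df.ClassId.isin(danger), "class_name"] = 'Danger'
#df.loc[df.ClassId.isin(other), "class_name"] = 'Other'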

In [None]:
# Hold out 20% of the annotated rows for validation; the fixed seed keeps the
# shuffled split reproducible between runs.
train_df, valid_df = train_test_split(
    df,
    test_size=0.2,
    shuffle=True,
    random_state=42,
)

In [None]:
# Run configuration shared by the cells below.
LR = 0.001        # learning rate handed to the optimizer
BATCH_SIZE = 32   # samples per DataLoader batch
EPOCHS = 10       # number of passes over the training set
IMG_SIZE = 32     # images are resized to IMG_SIZE x IMG_SIZE
# Use the GPU when one is present, otherwise fall back to the CPU so the
# notebook still runs end-to-end on a machine without CUDA.
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
#MODEL_NAME = 'efficient_b0'

In [None]:
# Preview one raw training image (row 80) with its class id as the title.
row = df.iloc[80]
img_path = path + row['Path']
label = row['ClassId']

# OpenCV decodes to BGR channel order; convert to RGB before plotting.
image = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)

print(image.shape)
plt.imshow(image)
plt.title(label)
plt.axis('off')
plt.tight_layout()

In [None]:
# Preview a second raw training image (row 800) with its class id as the title.
row = df.iloc[800]
img_path = path + row['Path']
label = row['ClassId']

# OpenCV decodes to BGR channel order; convert to RGB before plotting.
image = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)

print(image.shape)
plt.imshow(image)
plt.title(label)
plt.axis('off')
plt.tight_layout()

In [None]:
from torchvision import transforms as T
from tqdm import tqdm
from torch.utils.data import Dataset, DataLoader
import albumentations as A

In [None]:
def get_train_augs():
    """Build the augmentation pipeline applied to training images.

    Images are resized to ``IMG_SIZE x IMG_SIZE`` and then randomly jittered
    in brightness/contrast.

    A horizontal flip is deliberately NOT used: several classes differ only
    by their left/right orientation ('bend left' vs 'bend right', 'go left'
    vs 'go right', 'keep left' vs 'keep right', ...), so mirroring an image
    would turn it into a different class while keeping the old label.

    Returns:
        An ``albumentations.Compose`` callable, invoked as
        ``augs(image=...)``.
    """
    return A.Compose([
        A.Resize(IMG_SIZE, IMG_SIZE),
        # Photometric jitter changes pixel intensities only, so the class of
        # the sign is preserved.
        A.RandomBrightnessContrast(p=0.5),
    ])

def get_valid_augs():
    """Return the validation pipeline: a plain resize to ``IMG_SIZE x IMG_SIZE``."""
    resize = A.Resize(IMG_SIZE, IMG_SIZE)
    return A.Compose([resize])

In [None]:
class TrafficDataset(Dataset):
    """Map-style dataset yielding ``(image, label)`` pairs.

    Every row of ``df`` must provide a ``Path`` column (image location
    relative to the dataset root) and a ``ClassId`` column (the label).
    """

    def __init__(self, df, transforms=None, root=None):
        """
        Args:
            df: DataFrame with ``Path`` and ``ClassId`` columns.
            transforms: Optional albumentations-style callable, invoked as
                ``transforms(image=...)`` and expected to return a dict with
                an ``'image'`` entry.
            root: Optional prefix prepended to each ``Path``. When omitted,
                the notebook-level ``path`` variable is used.
        """
        self.df = df
        self.transforms = transforms
        self.root = root

    def __len__(self):
        """Number of samples, i.e. rows in the backing DataFrame."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            ``(image, label)`` where ``image`` is a float32 tensor in
            channel-first layout scaled to [0, 1] and ``label`` is the row's
            ``ClassId``.

        Raises:
            FileNotFoundError: if the image file cannot be read.
        """
        row = self.df.iloc[idx]
        labels = row.ClassId

        # The global is looked up lazily so that constructing the dataset
        # never depends on notebook state when an explicit root is given.
        root = path if self.root is None else self.root
        image_file = root + row.Path

        image = cv2.imread(image_file)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_file}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transforms is not None:
            # Only the image goes through the pipeline; the scalar class id is
            # not a spatial target and is returned untouched.
            image = self.transforms(image=image)['image']

        # convert (h,w,c) to (c,h,w)
        image = np.ascontiguousarray(np.transpose(image, (2, 0, 1)), dtype='float32')
        image = torch.from_numpy(image) / 255.0

        return image, labels

In [None]:
# Wrap each split in a dataset, pairing it with its own transform pipeline.
trainset = TrafficDataset(df=train_df, transforms=get_train_augs())
validset = TrafficDataset(df=valid_df, transforms=get_valid_augs())

In [None]:
# Report how many samples ended up in each split.
print(
    f"Size of Trainset : {len(trainset)}",
    f"Size of Validset : {len(validset)}",
    sep="\n",
)

In [None]:
# Sanity-check one processed training sample.
idx = 12
image, labels = trainset[idx]
# The dataset yields a channel-first torch tensor; matplotlib wants a
# channel-last array. Use the tensor's own permute instead of np.transpose,
# which only works on tensors through NumPy's TypeError fallback path.
image = image.permute(1, 2, 0).numpy()
plt.imshow(image)
plt.suptitle(labels)
plt.title(id2label[labels])
plt.axis('off')
plt.tight_layout()

In [None]:
# Batch both splits. Only the training loader shuffles; the validation loader
# keeps the dataset order.
trainloader = DataLoader(dataset=trainset, batch_size=BATCH_SIZE, shuffle=True)
validloader = DataLoader(dataset=validset, batch_size=BATCH_SIZE)
print("Total number of batches in trainloader :", len(trainloader))
print(f"Total no. of batches in validloader :{len(validloader)}")

In [None]:
# Pull exactly one batch to inspect tensor shapes. next(iter(...)) fails
# loudly on an empty loader instead of silently printing the shapes of stale
# `image` / `label` variables left over from earlier cells.
image, label = next(iter(trainloader))

print(f" Shape of image: {image.shape}")
print(f" Shape of label: {label.shape}")

In [None]:
class ClassifierModel(nn.Module):
    """Convolutional classifier for 3-channel images.

    ``features`` stacks three groups of 3x3 'same'-padded convolutions
    (64, 128 and 256 channels), each group followed by a 2x2 max-pool.
    ``classifier`` is a three-layer MLP with dropout that maps the flattened
    256x4x4 feature map to ``num_classes`` logits.
    """

    def __init__(self, num_classes, dropout):
        """
        Args:
            num_classes: Size of the output logit vector.
            dropout: Drop probability used in both classifier dropout layers.
        """
        super().__init__()
        self.num_classes = num_classes
        self.dropout = dropout

        def conv_relu(in_channels, out_channels):
            # One 3x3 convolution that keeps the spatial size, plus its ReLU.
            return [
                nn.Conv2d(in_channels=in_channels, out_channels=out_channels,
                          kernel_size=3, padding='same'),
                nn.ReLU(inplace=True),
            ]

        def downsample():
            return nn.MaxPool2d((2, 2), stride=(2, 2))

        # Layer order (and therefore every state_dict key) is kept exactly as
        # before, so existing checkpoints still load.
        self.features = nn.Sequential(
            *conv_relu(3, 64),
            *conv_relu(64, 64),
            downsample(),

            *conv_relu(64, 128),
            *conv_relu(128, 128),
            downsample(),

            *conv_relu(128, 256),
            *conv_relu(256, 256),
            *conv_relu(256, 256),
            downsample(),
        )

        # Parameter-free. For 32x32 inputs the feature map is already 4x4 and
        # passes through unchanged; other input sizes are pooled to 4x4 so the
        # first Linear layer always receives 256*4*4 features.
        self.pool = nn.AdaptiveAvgPool2d((4, 4))

        self.classifier = nn.Sequential(
            nn.Linear(256 * 4 * 4, 1024),
            nn.ReLU(),
            nn.Dropout(p=self.dropout),
            nn.Linear(1024, 1024),
            nn.ReLU(),
            nn.Dropout(p=self.dropout),
            nn.Linear(1024, self.num_classes),
        )
        #self.eff_net = timm.create_model('efficientnet_b0',pretrained = True,num_classes = 4)

    def forward(self, images, labels=None):
        """Compute logits and, when labels are supplied, the cross-entropy loss.

        Args:
            images: Batch of images, channel-first, 3 channels.
            labels: Optional batch of integer class targets.

        Returns:
            ``logits`` when ``labels`` is None, otherwise ``(logits, loss)``.
        """
        #logits = self.eff_net(images)
        x = self.pool(self.features(images))
        h = torch.flatten(x, 1)
        logits = self.classifier(h)
        # Identity check: `!=` on a tensor is an element-wise operator and only
        # works against None by accident.
        if labels is not None:
            loss = nn.functional.cross_entropy(logits, labels)
            return logits, loss
        return logits

In [None]:
# Build the 43-way classifier with dropout probability 0.5 and move its
# parameters to the configured device.
model = ClassifierModel(num_classes=43, dropout=0.5)
model.to(DEVICE)

In [None]:
def calc_accuracy(y_pred, y_true):
    """Fraction of samples whose highest-scoring class equals the target.

    Args:
        y_pred: Tensor of per-class scores, one row per sample.
        y_true: Tensor of integer targets, one per sample.

    Returns:
        A 0-dim float tensor holding the accuracy of the batch.
    """
    predicted_classes = y_pred.argmax(1, keepdim=True)
    targets = y_true.view_as(predicted_classes)
    n_correct = (predicted_classes == targets).sum()
    return n_correct.float() / y_true.shape[0]

In [None]:
def train_fn(model, dataloader, optimizer, current_epo):
    """Run one training epoch.

    Args:
        model: Module called as ``model(images, labels)`` and returning
            ``(logits, loss)``.
        dataloader: Iterable of ``(images, labels)`` batches.
        optimizer: Optimizer stepping the model's parameters.
        current_epo: Zero-based epoch index, used only for the progress-bar
            caption.

    Returns:
        ``(mean_loss, mean_accuracy)`` averaged over batches, both as plain
        Python floats.
    """
    model.train()
    total_loss = 0.0
    total_acc = 0.0
    tk = tqdm(dataloader, desc="EPOCH" + "[Train]" + str(current_epo + 1) + "/" + str(EPOCHS))
    for t, (images, labels) in enumerate(tk):
        images, labels = images.to(DEVICE), labels.to(DEVICE)
        optimizer.zero_grad()
        logits, loss = model(images, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
        # Convert to a Python float so the running sum does not become a
        # device tensor (and the returned accuracy matches the loss's type).
        total_acc += float(calc_accuracy(logits, labels))
        tk.set_postfix({'loss': '%6f' % float(total_loss / (t + 1)),
                        'acc': '%6f' % float(total_acc / (t + 1))})
    return total_loss / len(dataloader), total_acc / len(dataloader)


In [None]:
def eval_fn(model, dataloader, current_epo):
    """Evaluate the model over one pass of ``dataloader`` without updating it.

    Args:
        model: Module called as ``model(images, labels)`` and returning
            ``(logits, loss)``.
        dataloader: Iterable of ``(images, labels)`` batches.
        current_epo: Zero-based epoch index, used only for the progress-bar
            caption.

    Returns:
        ``(mean_loss, mean_accuracy)`` averaged over batches, both as plain
        Python floats.
    """
    model.eval()
    total_loss = 0.0
    total_acc = 0.0
    tk = tqdm(dataloader, desc="EPOCH" + "[val]" + str(current_epo + 1) + "/" + str(EPOCHS))
    # No gradients are needed for validation; disabling autograd avoids
    # building a graph for every batch.
    with torch.no_grad():
        for t, (images, labels) in enumerate(tk):
            images, labels = images.to(DEVICE), labels.to(DEVICE)
            logits, loss = model(images, labels)
            total_loss += loss.item()
            # Python float keeps the running sum off the device.
            total_acc += float(calc_accuracy(logits, labels))
            tk.set_postfix({'loss': '%6f' % float(total_loss / (t + 1)),
                            'acc': '%6f' % float(total_acc / (t + 1))})
    return total_loss / len(dataloader), total_acc / len(dataloader)

In [None]:
# Adam over all model parameters, using the configured learning rate.
optimizer = torch.optim.Adam(params=model.parameters(), lr=LR)

In [None]:
# Train for EPOCHS epochs, recording both loss curves and checkpointing the
# weights whenever the validation loss improves.
# float('inf') replaces np.Inf, an alias that NumPy 2.0 removed.
best_valid_loss = float('inf')
train_losses = []
valid_losses = []
for i in range(EPOCHS):
    train_loss, train_acc = train_fn(model, trainloader, optimizer, i)
    valid_loss, valid_acc = eval_fn(model, validloader, i)
    train_losses.append(train_loss)
    valid_losses.append(valid_loss)
    if valid_loss < best_valid_loss:
        # Written relative to the working directory.
        torch.save(model.state_dict(), 'best_model.pt')
        print("Saved Model")
        best_valid_loss = valid_loss

In [None]:
# Loss per epoch: training in red, validation in blue.
plt.plot(train_losses, 'r', valid_losses, 'b')
plt.legend(['Training loss', 'Validation loss'])
plt.xlabel('Epochs')
plt.ylabel('Loss')

In [None]:
# Ground-truth class ids of the validation rows as a NumPy array, in
# validation-set order.
y_true = np.asarray(valid_df['ClassId'])

In [None]:
# Restore the best checkpoint from the same relative path the training loop
# saved it to (rather than a hard-coded absolute directory), mapping the
# weights onto whichever device is in use.
model.load_state_dict(torch.load('best_model.pt', map_location=DEVICE))
model.eval()

predicted = []
with torch.no_grad():
    # validloader is not shuffled, so the predictions come out in the same
    # order as the rows of valid_df.
    for images, _ in validloader:
        logits_ = model(images.to(DEVICE))
        # argmax over raw logits picks the same class as argmax over softmax
        # probabilities, so the softmax is skipped.
        predicted.extend(logits_.argmax(dim=1).cpu().tolist())

predicted = np.asarray(predicted)

In [None]:
from sklearn.metrics import accuracy_score, confusion_matrix,ConfusionMatrixDisplay

# Overall accuracy of the predictions against the validation ground truth.
print(accuracy_score(y_true=y_true, y_pred=predicted))

In [None]:
def plot_confusion_matrix(labels, pred_labels, classes):
    """Draw the confusion matrix of ``pred_labels`` against ``labels``.

    The matrix is rendered on a new 18x16 figure with integer cell values and
    the 'Blues' colour map; x tick labels are rotated by 20 degrees.

    Args:
        labels: Ground-truth class ids.
        pred_labels: Predicted class ids.
        classes: Tick labels passed to the display as ``display_labels``.
    """
    figure = plt.figure(figsize=(18, 16))
    axis = figure.add_subplot(1, 1, 1)
    matrix = confusion_matrix(labels, pred_labels)
    display = ConfusionMatrixDisplay(matrix, display_labels=classes)
    display.plot(values_format='d', cmap='Blues', ax=axis)
    plt.xticks(rotation=20)
    
# Use the class ids 0-42 themselves as tick labels, then save the figure.
labels_arr = range(43)
plot_confusion_matrix(labels=y_true, pred_labels=predicted, classes=labels_arr)

plt.savefig("confusion_matrix.png", bbox_inches='tight', pad_inches=0.5)

In [None]:
# Show the first 20 validation images on a 5x4 grid, each titled with its
# predicted and actual class name, then save the grid to disk.
fig, axs = plt.subplots(5, 4, figsize=(50, 75))

for i in range(20):
    row, col = divmod(i, 4)
    panel = axs[row, col]

    img_path = path + valid_df.iloc[i].Path
    # OpenCV decodes to BGR; convert to RGB for display.
    img = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
    panel.imshow(img)

    title = "Pred: %s,\n Actual: %s" % (id2label[predicted[i]], id2label[y_true[i]])
    panel.set_title(title, fontsize=40)
    panel.axis('off')
plt.tight_layout()

plt.savefig("predictions.png", bbox_inches='tight', pad_inches=0.5)