# Car Identification Dataset

# Preliminary EDA from metadata files

In [None]:
import pandas as pd
from PIL import Image
import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
import numpy as np
import os

In [None]:
# Class-name lookup table. The CSV has no header row, so its single column is
# named explicitly, and the index is shifted to start at 1 so that it lines up
# with the class ids it is merged against.
names_df = (
    pd.read_csv('data/annotations/class_names.csv', header=None)
    .rename(columns={0: "class_names"})
)
names_df.index = np.arange(1, len(names_df) + 1)

# Per-image annotation table (semicolon-separated file).
data_df = pd.read_csv('data/annotations/cars_annos.csv', sep=";")

In [None]:
# Attach the human-readable class name to every annotation row by joining the
# "class" column against the (1-based) index of names_df.
df = pd.merge(data_df, names_df, how='inner', left_on="class", right_index=True)

# Shift the class ids down by one.
df["class"] = df["class"] - 1

# Lower-case every column name.
df = df.rename(columns=str.lower)

df.head()

In [None]:
# Report the smallest and largest class id present in the merged table.
print(
    f"The minimum in class is {df['class'].min()} "
    f"and the maximum in class is {df['class'].max()}"
)
# print(df["class"].unique())

In [None]:
# Check whether the provided split is already fair, i.e. whether every class
# is represented in both the train and the test partition.
# The "test" column flags the partition: 1 -> test set, 0 -> train set.
test_df = df.loc[df["test"] == 1]
train_df = df.loc[df["test"] == 0]

# Each message reports the partition it names (the train/test frames were
# previously swapped in the two lines below).
print(f"total number of unique classes entire dataset {df['class'].nunique()}")
print(f'total number of unique classes in train_set is {train_df["class"].nunique()}')
print(f'total number of unique classes in test_set is {test_df["class"].nunique()}')

# Saving out the training and testing dataframes

- training dataframe
  - to be further split into training and validation dataset for training
- testing dataframe
  - to be kept as a hold out set for final testing

In [None]:

# Write both partitions into the annotations folder; the dataframe index is
# not written to the files.
train_df.to_csv(path_or_buf="data/annotations/train_df.csv", index=False)
test_df.to_csv(path_or_buf="data/annotations/test_df.csv", index=False)

In [None]:
# Random check to see if bounding boxes are able to completely cover the car in question

plt.figure(figsize=(20, 20))
for i in range(5):
    # Pick a random annotation row (the same row may be drawn more than once).
    row = df.iloc[np.random.randint(len(df))]

    # Read the fields by column name instead of unpacking the row by position,
    # so the cell keeps working if columns are added or reordered.
    img_name, class_name = row["image"], row["class_names"]
    x1, y1, x2, y2 = row["x1"], row["y1"], row["x2"], row["y2"]

    # Rectangle expects an anchor corner plus width/height, not two corners.
    width = x2 - x1
    height = y2 - y1
    print(img_name, x1, y1, x2, y2, width, height, class_name)

    plt.subplot(1, 5, i + 1)
    # imshow converts the image to an array when it is called, so the file
    # handle can be released as soon as the call returns.
    with Image.open(f'data/car_ims/{img_name}') as img:
        plt.imshow(img)
    rect = Rectangle((x1, y1), width, height, linewidth=1.5, edgecolor='r', facecolor='none')
    plt.gca().add_patch(rect)
    plt.title(f"{class_name}")

plt.show()

In [None]:
# List the distinct file extensions of the annotated images, as a first check
# for differing image modes (RGB vs RGBA); seems like all are jpg.
# In retrospect this check was not sufficient: several images had only one
# channel, which the extension cannot reveal. An alternative is to use os.walk
# and collect the metadata from the images themselves.
df["image"].map(lambda filename: os.path.splitext(filename)[1]).unique()

# Model Training
This section shows the experimentation stage for the creation of the dataset, dataloader, model, and the training and evaluation loops.

The actual model and training was conducted using the .py files. 

While every effort has been made to ensure consistency, in the event of a discrepancy please refer to the .py files.

In [10]:
import os
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms as T
import torchvision.transforms.functional as TF
import torch.nn as nn
from torch.optim import Adam
from torchvision.models import resnet50, ResNet50_Weights, resnet101, ResNet101_Weights
from PIL import Image
from typing import Dict
from tqdm import tqdm 

from torch.utils.tensorboard import SummaryWriter
from src.config.load_config import read_yaml_file

In [2]:
# Configuration as returned by the project's read_yaml_file helper.
cfg = read_yaml_file()

# Training annotations, read from the path stored in the config.
train_df_path = cfg["training"]["train_df_path"]
train_df = pd.read_csv(train_df_path)

# Distinct class names (in order of first appearance) and the number of
# distinct class ids in the training table.
class_names = list(train_df["class_names"].unique())
num_class = train_df["class"].nunique()

In [3]:
class CarsDataset(Dataset):
    """Image-classification dataset backed by an annotation CSV.

    Every row of the CSV must provide the columns ``image`` (file name
    relative to ``root_dir``), ``x1``, ``y1``, ``x2``, ``y2`` (bounding-box
    coordinates) and ``class`` (integer label).
    """

    def __init__(self, csv_file: str, root_dir: str, transform=None, custom_crop: bool = False) -> None:
        """ Initializes the CarsDataset with the necessary variables
        Args:
            csv_file (str): Path to a CSV file containing the image paths and labels.
            root_dir (str): Root directory where the images are stored.
            transform (callable, optional): A function/transform that takes in a PIL image
                and returns a transformed version. Default: None.
            custom_crop (bool, optional): Whether to crop the images based on bounding box
                coordinates specified in the CSV file. Default: False.
        """
        self.dataframe = pd.read_csv(csv_file)
        self.root_dir = root_dir
        self.transform = transform
        self.custom_crop = custom_crop

    def __len__(self) -> int:
        """ Returns the number of samples in the dataset.
        Returns:
            int: The number of rows in the annotation CSV.
        """
        return len(self.dataframe)

    def __getitem__(self, idx: int) -> tuple:
        """ Loads and preprocesses the image and label at the specified index.
            Provides an avenue to perform transformations on the dataset.
        Args:
            idx (int): The position of the sample to load. A tensor index is
                converted with ``tolist()`` first.
        Returns:
            tuple: ``(image, label)`` where ``image`` is the (optionally cropped and
                transformed) image and ``label`` is a scalar integer tensor.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        # Read the whole row once, by position, instead of doing one
        # label-based lookup per column.
        row = self.dataframe.iloc[idx]

        img_path = os.path.join(self.root_dir, row["image"])
        # Always convert to RGB so every sample has three channels; the
        # context manager closes the file as soon as the pixels are decoded.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert("RGB")

        if self.custom_crop:
            img = self._custom_crop(img, row["x1"], row["y1"], row["x2"], row["y2"])

        if self.transform:
            img = self.transform(img)

        return img, torch.tensor(int(row["class"]))

    def _custom_crop(self, img: "Image.Image", left: int, top: int, right: int, bottom: int) -> "Image.Image":
        """ Crops the image based on bounding box coordinates.
        Args:
            img (PIL.Image.Image): The image to crop.
            left (int): The left coordinate of the bounding box.
            top (int): The top coordinate of the bounding box.
            right (int): The right coordinate of the bounding box.
            bottom (int): The bottom coordinate of the bounding box.
        Returns:
            PIL.Image.Image: The cropped image.
        """
        # TF.crop takes (top, left, height, width) rather than two corners.
        width = right - left
        height = bottom - top
        return TF.crop(img, top, left, height, width)

In [4]:
# Set initial parameters of the training loop
epochs = cfg["training"]["epochs"]
batch_size = cfg["training"]["batch_size"]
custom_crop = cfg["training"]["custom_crop"]

# mean and standard dev as per pre-trained imagenet dataset (https://pytorch.org/hub/pytorch_vision_resnet/)
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

# Resize to a fixed 224x224 input, convert to a tensor, then normalise with
# the statistics above.
transform = T.Compose([
    T.Resize([224, 224]),
    T.ToTensor(),
    T.Normalize(mean=mean, std=std),
])

# 80/20 train/validation split. The validation size is the remainder rather
# than an independently rounded 20%, so the two sizes always add up to
# len(train_df), which random_split requires.
train_num = int(round(0.8 * len(train_df)))
valid_num = len(train_df) - train_num

In [5]:
# Create dataset for training run
path_to_data = os.path.join(os.getcwd(), "data/car_ims")
dataset = CarsDataset(csv_file=train_df_path, root_dir=path_to_data, transform=transform, custom_crop=custom_crop)

# Randomly split the dataset into training and validation subsets of the
# sizes computed above.
train_set, valid_set = torch.utils.data.random_split(dataset, [train_num, valid_num])

# Load the datasets into train and validation loaders respectively.
# Only the training loader shuffles: the validation metrics do not depend on
# sample order, so shuffling there only adds work.
train_loader = DataLoader(train_set, batch_size=batch_size, num_workers=4, shuffle=True)
valid_loader = DataLoader(valid_set, batch_size=batch_size, num_workers=4, shuffle=False)

# Model Training

In [6]:
class ClassifierModel(nn.Module):
    """ResNet backbone whose final layer is replaced by dropout + linear head.

    The backbone (resnet50 or resnet101) is selected by
    ``cfg["training"]["model_name"]``.
    """

    def __init__(self, num_class: int):
        """ Initializes the ClassifierModel instance
            Added a dropout to the last linear layer and amended out_features to num_class
        Args:
            num_class (int): The number of classes in the classification problem.
        Raises:
            ValueError: If the configured model name is not a supported backbone.
        """
        super().__init__()
        self.cfg = read_yaml_file()

        model_name = self.cfg["training"]["model_name"]
        if model_name == "resnet101":
            model_func = resnet101
            weight = ResNet101_Weights.IMAGENET1K_V2
        elif model_name == "resnet50":
            model_func = resnet50
            weight = ResNet50_Weights.IMAGENET1K_V2
        else:
            # Fail here with a clear message instead of printing and then
            # crashing below on an unbound ``model_func``.
            raise ValueError(
                f"Model '{model_name}' not found, please ensure model loaded in model.py file"
            )

        # In inference mode the backbone is built without pretrained weights
        # (presumably a trained state dict is loaded afterwards - confirm
        # against the inference code). The explicit ``== True`` is kept so
        # that non-boolean config values are not treated as truthy.
        if self.cfg["inference"]["run_inference"] == True:
            self.model = model_func()
        else:
            self.model = model_func(weights=weight)

        # Replace the original fully connected layer with dropout followed by
        # a linear layer sized for this problem's classes.
        num_ftrs = self.model.fc.in_features
        self.model.fc = nn.Sequential(nn.Dropout(0.5), nn.Linear(num_ftrs, num_class))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """ Runs the input batch through the wrapped model and returns its output.
        Args:
            x (torch.Tensor): Input batch.
        Returns:
            torch.Tensor: Output of the replaced final layer.
        """
        return self.model(x)


In [11]:
# Build the classifier and move it to the GPU when one is available.
model = ClassifierModel(num_class)
if torch.cuda.is_available():
    model = model.to("cuda")

# Cross-entropy loss, optimised with Adam at a fixed learning rate.
loss_fn = nn.CrossEntropyLoss()
optimizer = Adam(params=model.parameters(), lr=1e-3)

In [None]:
# Instantiate tensorboard for metrics tracking
writer = SummaryWriter()

# Checkpoint settings do not change between epochs, so read them once.
save_above = cfg["training"]["save_above"]
model_name = cfg["training"]["model_name"]
model_dir = os.path.join(os.getcwd(), "models")
# torch.save does not create missing directories.
os.makedirs(model_dir, exist_ok=True)

max_valid_acc = 0.0
for e in range(epochs):

    # Training Loop
    train_loss = 0.0
    model.train()
    for data, labels in tqdm(train_loader):
        if torch.cuda.is_available():
            data, labels = data.to("cuda"), labels.to("cuda")
        optimizer.zero_grad()
        logits = model(data)
        loss = loss_fn(logits, labels)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()

    # Mean of the per-batch losses.
    training_loss = train_loss / len(train_loader)
    writer.add_scalar("Loss/Train", training_loss, e)

    print(f'Epoch {e+1} | Training Loss: {training_loss:.4f}')

    # Validation Loop
    valid_loss = 0.0
    valid_correct = 0
    model.eval()
    # No gradients are needed for evaluation; disabling autograd saves memory
    # and time. Gradients from the last training step are left untouched.
    with torch.no_grad():
        for data, labels in tqdm(valid_loader):
            if torch.cuda.is_available():
                data, labels = data.cuda(), labels.cuda()

            logits = model(data)
            loss = loss_fn(logits, labels)
            # Accumulate the batch loss weighted by batch size (this was
            # previously overwritten on every batch, so only the last batch
            # counted).
            valid_loss += loss.item() * data.size(0)

            # Storing of grid images in tensorboard,
            # Commented out for faster training times
            # grid = make_grid(data)
            # writer.add_image("images", grid)
            # writer.add_graph(model, data)

            predicted = logits.argmax(dim=1)
            valid_correct += (predicted == labels).sum().item()

    # The loss sum is sample-weighted, so normalise by the number of samples
    # (not the number of batches) to get the mean per-sample loss.
    num_valid = len(valid_loader.dataset)
    valid_loss = valid_loss / num_valid
    valid_acc = (valid_correct / num_valid) * 100
    writer.add_scalar("Correct/Val", valid_correct, e)
    writer.add_scalar("Loss/Val", valid_loss, e)
    writer.add_scalar("Acc/Val", valid_acc, e)

    # Same 1-based epoch number as the training print above.
    print(f'Epoch {e+1} | Validation Loss: {valid_loss:.4f} | Validation Accuracy: {valid_acc:.2f}%')

    # Save out the models only if its accuracy exceed config amount
    # and does better than the highest accuracy so far
    if (valid_acc > save_above) and (valid_acc > max_valid_acc):
        max_valid_acc = valid_acc

        suffix = "crop" if custom_crop else "nocrop"
        checkpoint_name = f"{model_name}_{valid_acc:.2f}_{suffix}"
        model_path = os.path.join(model_dir, f"{checkpoint_name}.pth")
        torch.save(model.state_dict(), model_path)

        # Log a histogram of every parameter and of its gradient.
        # NOTE(review): this runs only for epochs that produce a new
        # checkpoint, not for every epoch - confirm that is intended.
        for param_name, weight in model.named_parameters():
            writer.add_histogram(param_name, weight, e)
            if weight.grad is not None:
                writer.add_histogram(f'{param_name}.grad', weight.grad, e)

writer.close()