In [1]:
import tarfile
import urllib.request as urllib2
import os
from os import listdir
from os.path import isfile, join
import re
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import matplotlib.image as img
import numpy as np
import scipy.io
import pandas as pd
import seaborn as sns
from PIL import *
from PIL import ImageFile
from PIL import Image

In [2]:
# download Stanford car dataset
def getting_data(url, path):
    data = urllib2.urlopen(url)
    tar_package = tarfile.open(fileobj=data, mode='r:gz')
    tar_package.extractall(path)
    tar_package.close()
    print("Data extracted and saved.")

getting_data("http://ai.stanford.edu/~jkrause/car196/car_ims.tgz", "./")

# download metadata
def getting_metadata(url, filename):
    labels = urllib2.urlopen(url)
    file = open(filename, 'wb')
    file.write(labels.read())
    file.close()
    print("Metadata downloaded and saved.")

getting_metadata("http://ai.stanford.edu/~jkrause/car196/cars_annos.mat", "car_metadata.mat")

Data extracted and saved.
Metadata downloaded and saved.


In [3]:
class MetaParsing():
    '''
    Class for parsing image and meta-data for the Stanford car dataset to create a custom dataset.
    path: The filepah to the metadata in .mat format.
    *args: Accepts dictionaries with self-created labels which will be extracted from the metadata (e.g. {0: 'Audi', 1: 'BMW', 3: 'Other').
    year: Can be defined to create two classes (<=year and later).
    '''
    def __init__(self, path, *args, year=None):
        # load metadata in matlab format
        self.mat = scipy.io.loadmat(path)
        # used to create cohort flag
        self.year = year
        # hold two mapping dictionary(s)
        self.args = args
        self.annotations = np.transpose(self.mat['annotations'])
        # extract the file name for each sample
        self.file_names = [annotation[0][0][0].split("/")[-1] for annotation in self.annotations]
        # extract the index of the label for each sample
        self.label_indices = [annotation[0][5][0][0] for annotation in self.annotations]
        # extract the car names as strings
        self.car_names = [x[0] for x in self.mat['class_names'][0]]
        # create a list with car names instead of label indices for each sample
        self.translated_car_names = [self.car_names[x-1] for x in self.label_indices]
      
    def brand_types(self, base_dict, x):
        y = list(base_dict.keys())[-1]
        # perform string-based matching
        for k,v in base_dict.items():
            if v in x: 
                y=k
        return y

    def parsing(self):
        result = []
        # retrieve the indexes of brand and type of vehicle
        for arg in self.args:
            temp_list = [self.brand_types(arg, x) for x in self.translated_car_names]
            result.append(temp_list)
        # retrieve the cohort tagging flag
        if self.year != None:
            years_list = [0 if int(x.split(" ")[-1]) <= self.year else 1 for x in self.translated_car_names]
            result.append(years_list)
        return result, self.file_names, self.translated_car_names

In [4]:
brand_dict = {0: 'Audi', 1: 'BMW', 2: 'Chevrolet', 3: 'Dodge', 4: 'Ford', 5: 'Other'}
vehicle_types_dict = {0: 'Convertible', 1: 'Coupe', 2: 'SUV', 3: 'Van', 4: 'Other'}
results, file_names, translated_car_names = MetaParsing("./car_metadata.mat", 
                                brand_dict, vehicle_types_dict, year=2009).parsing()

In [5]:
def count_classes(base_dict, base_list):
  for i in range(len(list(base_dict.keys()))):
    print("{}: {}".format(base_dict[i], str(base_list.count(i))))
# count of brand names
count_classes(brand_dict, results[0])
# count of type of vehicle
count_classes(vehicle_types_dict, results[1])

Audi: 1169
BMW: 1055
Chevrolet: 1799
Dodge: 1253
Ford: 1035
Other: 9874
Convertible: 1907
Coupe: 2143
SUV: 2855
Van: 832
Other: 8448


In [6]:
from torch.utils.data import Dataset, DataLoader
class CarDataset(Dataset):
  def __init__(self, car_path, transform, mapping_dict):
    self.path = car_path
    self.folder = [x for x in listdir(car_path)]
    self.transform = transform
    self.mapping_dict = mapping_dict

  def __len__(self):
    return len(self.folder)

  def __getitem__(self, idx):
    img_loc = os.path.join(self.path, self.folder[idx])
    image = Image.open(img_loc).convert('RGB')
    single_img = self.transform(image)
    # retrieve the corresponding labels
    label1 = mapping_dict[self.folder[idx]][0]
    label2 = mapping_dict[self.folder[idx]][1]
    label3 = mapping_dict[self.folder[idx]][2]
    sample = {'image':single_img, 'labels': {'label_brand':label1, 'label_vehicle_type':label2, 'label_cohort':label3}}
    return sample   

In [7]:
# the overall mapping dictionary
mapping_dict = dict(zip(file_names,list(zip(results[0],results[1],results[2]))))
# define a chain of pre-processing transformations
data_transforms = transforms.Compose([
        transforms.Resize((224,224)),
        transforms.ToTensor(),
        transforms.Normalize((0.5,0.5,0.5), (0.5,0.5,0.5))
    ])

# create the customized dataset instance
cardata = CarDataset("./car_ims", transform=data_transforms, mapping_dict=mapping_dict)

# split the data in training and testing
train_len = int(cardata.__len__()*0.8)
test_len = int(cardata.__len__()*0.2)
train_set, val_set = torch.utils.data.random_split(cardata, [train_len, test_len])

# create the dataloader for each dataset
train_loader = DataLoader(train_set, batch_size=16, shuffle=True, num_workers=2, drop_last=True)
test_loader = DataLoader(val_set, batch_size=16, shuffle=False, num_workers=2, drop_last=True)

In [8]:
sample = next(iter(train_loader))

In [9]:
sample['labels'].keys()

dict_keys(['label_brand', 'label_vehicle_type', 'label_cohort'])

In [10]:
print("Keys in the current batch: {}".format(sample.keys()))
print("Size for the images in the current batch: {}".format(sample['image'].shape))
print("Size for the brand target in the current batch: {}".format(sample['labels']['label_brand'].shape))
print("Brand indexes in the current batch: {}".format(sample['labels']['label_brand']))

Keys in the current batch: dict_keys(['image', 'labels'])
Size for the images in the current batch: torch.Size([16, 3, 224, 224])
Size for the brand target in the current batch: torch.Size([16])
Brand indexes in the current batch: tensor([5, 2, 3, 2, 5, 5, 3, 2, 4, 0, 5, 5, 5, 1, 1, 5])


In [11]:
import torchvision.models as models
# download the pre-trained weights for resnet34 model architecture
resnet = models.resnet34(pretrained=True)
# access all layers including the last classification head
list(resnet.children())[-3:]

[Sequential(
   (0): BasicBlock(
     (conv1): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
     (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (relu): ReLU(inplace=True)
     (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
     (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (downsample): Sequential(
       (0): Conv2d(256, 512, kernel_size=(1, 1), stride=(2, 2), bias=False)
       (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     )
   )
   (1): BasicBlock(
     (conv1): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
     (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (relu): ReLU(inplace=True)
     (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
     (bn2): Batc

In [12]:
# remove the last fully connected layer
model_wo_fc = nn.Sequential(*(list(resnet.children())[:-1]))

In [13]:
# generate output of the current sample batch
output_sample = model_wo_fc(sample['image'])
print(output_sample.shape)
print(torch.flatten(output_sample, 1).shape)

torch.Size([16, 512, 1, 1])
torch.Size([16, 512])


In [14]:
# flatten the output
output_sample_flatten = torch.flatten(output_sample, 1)
# add another FC layer
brand = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=512, out_features=6)
        )
brand(output_sample_flatten).shape

torch.Size([16, 6])

In [15]:
class MultilabelClassifier(nn.Module):
    def __init__(self, n_brand, n_vehicle_type, n_cohort):
        super().__init__()
        # download the backbone architecture
        self.resnet = models.resnet34(pretrained=True)
        # remove the last FC layer
        self.model_wo_fc = nn.Sequential(*(list(self.resnet.children())[:-1]))
        # define the branching head for brand classification
        self.brand = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=512, out_features=n_brand)
        )
        # define the branching head for vehicle type classification
        self.vehicle_type = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=512, out_features=n_vehicle_type)
        )
        # define the branching head for cohort classification
        self.cohort = nn.Sequential(
            nn.Dropout(p=0.2),
            nn.Linear(in_features=512, out_features=n_cohort)
        )
    # define the flow of the model
    def forward(self, x):
        x = self.model_wo_fc(x)
        x = torch.flatten(x, 1)
        return {
            'brand': self.brand(x),
            'vehicle_type': self.vehicle_type(x),
            'cohort': self.cohort(x)
        }

In [16]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = MultilabelClassifier(6, 5, 2).to(device)

In [17]:
def criterion(loss_func, outputs, pictures):
    losses = 0
    # sum all individual losses
    for i, key in enumerate(outputs):
        losses += loss_func(outputs[key], pictures['labels'][f'label_{key}'].to(device))
    return losses

def training(model, device, lr_rate, num_epochs, train_loader):
    losses = []
    checkpoint_losses = []
    optimizer = torch.optim.Adam(model.parameters(), lr=lr_rate)
    n_total_steps = len(train_loader)
    loss_func = nn.CrossEntropyLoss()
    # start model training in a nested loop
    for epoch in range(num_epochs):
        for i, pictures in enumerate(train_loader):
            # grab input images
            images = pictures['image'].to(device)
            # obtain model predictions
            outputs = model(images)
            # calculate current loss
            loss = criterion(loss_func, outputs, pictures)
            losses.append(loss.item())
            # clear historical gradients
            optimizer.zero_grad()
            # apply autograd
            loss.backward()
            # perform gradient descent update
            optimizer.step()

            if (i+1) % (n_total_steps) == 0:
                checkpoint_loss = torch.tensor(losses).mean().item()
                checkpoint_losses.append(checkpoint_loss)
                print (f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{n_total_steps}], Loss: {checkpoint_loss:.4f}')
    return checkpoint_losses

checkpoint_losses = training(model, device, 0.0001, 10, train_loader)

Epoch [1/10], Step [809/809], Loss: 1.8724
Epoch [2/10], Step [809/809], Loss: 1.3812
Epoch [3/10], Step [809/809], Loss: 1.0810
Epoch [4/10], Step [809/809], Loss: 0.8908
Epoch [5/10], Step [809/809], Loss: 0.7647
Epoch [6/10], Step [809/809], Loss: 0.6745
Epoch [7/10], Step [809/809], Loss: 0.6057
Epoch [8/10], Step [809/809], Loss: 0.5502
Epoch [9/10], Step [809/809], Loss: 0.5073
Epoch [10/10], Step [809/809], Loss: 0.4702


In [18]:
def validation(model, dataloader, *args):
  all_predictions = torch.tensor([]).to(device)
  all_true_labels = torch.tensor([]).to(device)

  with torch.no_grad():
      n_correct = []
      n_class_correct = []
      n_class_samples = []
      n_samples = 0

      for arg in args:
          n_correct.append(len(arg))
          n_class_correct.append([0 for i in range(len(arg))])
          n_class_samples.append([0 for i in range(len(arg))])

      for pictures in dataloader:
          images = pictures['image'].to(device)
          outputs = model(images)
          # obtain target outputs in a nested list
          labels = [pictures['labels'][label].to(device) for label in pictures['labels']]

          for i, out in enumerate(outputs):
              # retrieve the predicted class with the maximum logit for all samples in the batch
              _, predicted = torch.max(outputs[out], 1)
              # record the cumulative number of correct predictions
              n_correct[i] += (predicted == labels[i]).sum().item()
              # get the cumulative number of samples till current batch
              if i == 0:
                  n_samples += labels[i].size(0)

              for k in range(len(predicted)):
                  label = labels[i][k]
                  pred = predicted[k]
                  # get number of correct predictions for each category in each class
                  if label == pred:
                      n_class_correct[i][label] += 1
                  # get total number of predictions for each category in each class
                  n_class_samples[i][label] += 1
            
      return n_correct, n_samples, n_class_correct, n_class_samples

def class_acc(n_correct, n_samples, n_class_correct, n_class_samples, class_list):
    for i in range(len(class_list)):
        print("-------------------------------------------------")
        acc = 100.0 * n_correct[i] / n_samples
        print(f'Overall class performance: {round(acc,1)} %')
        for k in range(len(class_list[i])):
            acc = 100.0 * n_class_correct[i][k] / n_class_samples[i][k]
            print(f'Accuracy of {class_list[i][k]}: {round(acc,1)} %')
    print("-------------------------------------------------")

classes_brand = list(brand_dict.values())
classes_vehicle_type = list(vehicle_types_dict.values())
classes_epoch = ['2009 and earlier','2010 and later']
class_list = [classes_brand,classes_vehicle_type,classes_epoch]

n_correct, n_samples, n_class_correct, n_class_samples = validation(model, test_loader, 
                            classes_brand, classes_vehicle_type, classes_epoch)

class_acc(n_correct, n_samples, n_class_correct, n_class_samples,class_list)

-------------------------------------------------
Overall class performance: 90.8 %
Accuracy of Audi: 83.6 %
Accuracy of BMW: 89.6 %
Accuracy of Chevrolet: 78.6 %
Accuracy of Dodge: 84.3 %
Accuracy of Ford: 75.7 %
Accuracy of Other: 96.2 %
-------------------------------------------------
Overall class performance: 87.8 %
Accuracy of Convertible: 85.0 %
Accuracy of Coupe: 67.0 %
Accuracy of SUV: 91.9 %
Accuracy of Van: 85.1 %
Accuracy of Other: 92.4 %
-------------------------------------------------
Overall class performance: 90.6 %
Accuracy of 2009 and earlier: 88.6 %
Accuracy of 2010 and later: 91.4 %
-------------------------------------------------
