In [None]:
#Import Libraries
from __future__ import print_function, division
import argparse
import os
import time
import utils
import torch
import torchvision
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torch.autograd import Variable
from torchvision import transforms
from scipy.io import loadmat
from PIL import Image
import random
import torch.nn.functional as F
from torch.optim import lr_scheduler
import copy

from matplotlib.pyplot import imshow
import matplotlib.pyplot as plt

seed = 25
np.random.seed(seed)
torch.manual_seed(seed)

#using GPU
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [None]:
root_dir = "data/cars_train/"
car_annotations_path = "data/devkit/cars_train_annos.mat"
car_metadata_path = "data/devkit/cars_meta.mat"

#Load Meta Data
meta_data = loadmat(car_metadata_path)
meta_data = np.concatenate(meta_data["class_names"][0])

#nb_classes
nb_classes = len(meta_data)

#Load and split train, val and test samples
dataset = utils.Load_Images(root_dir=root_dir, annotations_path=car_annotations_path, seed=seed, test_split=0.2)

In [None]:
class car_dataset(Dataset):
    """
    Description:
        Pytorch Dataset class for reading Car Dataset meta files and images.
    Arguments:
        files (list, required): 
        root_dir (str, required): Root directory path
        meta_data (list, required): exctracted meta data
    Returns a Dictionary
    """
    def __init__(self, files, root_dir, meta_data, image_transform=None):
        
        self.root_dir = root_dir
        self.image_transform = image_transform
        
        #image file names
        self.image_files = [file[-1][0] for file in files]
        
        #Class ID
        self.id = [file[-2][0] - 1 for file in files]
        
        #Class Name
        self.class_name = [meta_data[file[-2][0] - 1][0] for file in files]
        
        #Get Car Year
        self.carYear, self.carYear_ID = utils.get_Year(self.class_name)

        #Get Car Maker
        self.carMaker, self.carMaker_ID = utils.get_Maker(self.class_name)
        
        #Get Car Type
        self.carType, self.carType_ID = utils.get_Type(self.class_name)
        
        #change and Move there is still time
        self.year_count = len(np.unique(self.carYear_ID))
        self.maker_count = len(np.unique(self.carMaker_ID))
        self.type_count = len(np.unique(self.carType_ID))
        
    def __len__(self):
        return len(self.id)
    
    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.image_files[idx])
        img = Image.open(img_path)
        
        if self.image_transform:
            img = self.image_transform(img)
        
        target = torch.from_numpy(np.array(self.id[idx]))[0]
        
        class_count = {
                      "year": self.year_count,
                      "maker": self.maker_count,
                      "type": self.type_count
                     }

        sample = {'Image':img, 'class_ID':target, "class_name":self.class_name[idx],
                 'year_ID':self.carYear_ID[idx], 'maker_ID':self.carMaker_ID[idx],
                 'type_ID':self.carType_ID[idx], "class_count":class_count}
        
        return sample
    
class ImbalancedDatasetSampler(torch.utils.data.sampler.Sampler):
    """Samples elements randomly from a given list of indices for imbalanced dataset creating
        a weight depeding of the frequency of the class
    Arguments:
        dataset (list, optional): a list of indices
        class_type (int, optional): 
    """
    def __init__(self, dataset, class_type):
                      
        self.indices = list(range(len(dataset)))
        
        self.num_samples = len(self.indices) 
        # distribution of classes in the dataset 
        label_to_count = {}
        for idx in self.indices:
            label = self._get_label(dataset, idx, class_type)
            if label in label_to_count:
                label_to_count[label] += 1
            else:
                label_to_count[label] = 1
        # weight for each sample
        weights = [1.0 / label_to_count[self._get_label(dataset, idx, class_type)] for idx in self.indices]
        self.weights = torch.DoubleTensor(weights)

    def _get_label(self, dataset, idx, class_type):
        return dataset[idx][class_type].item()
                
    def __iter__(self):
        return (self.indices[i] for i in torch.multinomial(
                self.weights, self.num_samples, replacement=True))

    def __len__(self):
        return self.num_samples

In [None]:
image_transformers = {'training': transforms.Compose([transforms.Resize((224, 224)),
                                                       transforms.RandomRotation(degrees=50),
                                                       transforms.RandomHorizontalFlip(0.8),
                                                       transforms.RandomPerspective(p=0.1),
                                                     transforms.ColorJitter(brightness=0.8, contrast=0.8),
                                                     transforms.ToTensor()]),
                      'validation': transforms.Compose([transforms.Resize((224, 224)),
                                                       transforms.ToTensor()])
                     }
    

training_data = car_dataset(dataset["training"],
                            root_dir = root_dir,
                            meta_data = meta_data,
                            image_transform = image_transformers["training"]
                           )

#
train_loader = torch.utils.data.DataLoader(training_data, batch_size=15, 
                                           sampler=ImbalancedDatasetSampler(training_data, "class_ID"))

#train_loader = torch.utils.data.DataLoader(training_data, batch_size=15, shuffle=True)

validation_data = car_dataset(dataset["validation"], 
                             root_dir = root_dir,
                             meta_data = meta_data,
                             image_transform  = image_transformers["validation"])

validation_loader = torch.utils.data.DataLoader(validation_data, 
                                                batch_size=15, shuffle=True)

dataloaders = {"training":train_loader, "validation":validation_loader}
dataSizes = {"training":len(dataset["training"]), "validation":len(dataset["validation"])}

#Get Count per each Class
class_year = training_data[0]["class_count"]["year"]
class_maker = training_data[0]["class_count"]["maker"]
class_type = training_data[0]["class_count"]["type"]

In [None]:
#Load Pretrained Model
#use Resnet 18 cause its small :)
fr_model = torchvision.models.resnet18(pretrained=True)

#freeze layers
for param in fr_model.parameters():
    param.requires_grad = False
    
print(fr_model)
num_ftrs = fr_model.fc.in_features
fr_model.fc = torch.nn.Linear(num_ftrs, 1024)

In [None]:
class multi_task_model(torch.nn.Module):
    def __init__(self, base_model,nb_maker_classes, nb_type_classes):
        super(multi_task_model, self).__init__()
        
        self.base_model = base_model
        
        self.x1 = torch.nn.Linear(1024, 512)
        torch.nn.init.xavier_normal_(self.x1.weight) 
        self.bn1 = torch.nn.BatchNorm1d(512, eps = 2e-1)
        
        
        self.x2 = torch.nn.Linear(512,512)
        torch.nn.init.xavier_normal_(self.x2.weight)
        self.bn2 = torch.nn.BatchNorm1d(512, eps = 2e-1)
        
        #adding one more on top of the heads
        self.x3 = torch.nn.Linear(512, 256)
        torch.nn.init.xavier_normal_(self.x3.weight)
        
        self.x4 = torch.nn.Linear(256, 256)
        torch.nn.init.xavier_normal_(self.x4.weight)
        
        #Auxiliary tasks
        self.y_maker = torch.nn.Linear(256, nb_maker_classes)
        torch.nn.init.xavier_normal_(self.y_maker.weight)
        self.y_type = torch.nn.Linear(256, nb_type_classes)
        torch.nn.init.xavier_normal_(self.y_type.weight)
        
        #Main Task
        self.y = torch.nn.Linear(256, 196)
        torch.nn.init.xavier_normal_(self.y.weight)
        
    def forward(self, x):
        
        x1 = self.base_model(x)
        
        x1 = self.bn1(F.relu(self.x1(x1)))
        
        #My auxiliary Tasks
        y_maker_output = F.softmax(self.y_maker(self.x3(x1)))
        y_type_output = F.softmax(self.y_type(self.x3(x1)))
        
        '''
                #My auxiliary Tasks
        y_maker_output = F.softmax(self.y_maker(self.x3(x1)))
        y_type_output = F.softmax(self.y_type(self.x3(x1)))
        '''
        
        x1 = self.bn2(F.relu(self.x2(x1)))
        
        y_class_output = F.softmax(self.y_maker(self.x3(x1)))
        
        return y_maker_output, y_type_output