In [None]:
!pip -q install mat73 torcheval
!pip install -U albumentations

In [None]:
import os
import cv2
import json
import scipy.io
import shutil
import yaml
import requests
import random
import numpy as np
import torch
import torch.nn as nn

from path import Path
from tqdm import tqdm
from PIL import Image

import matplotlib.pyplot as plt
import albumentations as A
from albumentations.pytorch.transforms import ToTensorV2, ToTensor

from torch.utils.data import Dataset, DataLoader
from torcheval.metrics.functional import multiclass_f1_score, multiclass_accuracy
%matplotlib inline

In [None]:
# Directory layout. Everything hangs off ROOT_DIR so the notebook can be
# re-pointed at another location by changing a single constant.
ROOT_DIR = Path('/kaggle/')
INPUT_DIR = ROOT_DIR / "input"
WORKING_DIR = ROOT_DIR / "working"
# Image folders and the .mat annotation files come from two separate input datasets.
DATASET_DIR = INPUT_DIR / "stanford-cars-dataset"
ANNOTATIONS_DIR = INPUT_DIR / "stanford-cars-dataset-annotations"

# Output locations under the working directory.
TRAIN_DIR = WORKING_DIR / "train"
VAL_DIR = WORKING_DIR / "val"

In [None]:
# Sanity check: list the contents of the dataset directory.
!ls $DATASET_DIR

In [None]:
class StanfordCarsDataset(Dataset):
    """Image-classification dataset backed by an image folder and a MATLAB annotation file.

    Each sample is ``(image, label)``. ``label`` is the class id stored in the
    annotation file minus one, looked up by the image's file name.

    Args:
        source_path: Directory that contains the image files (``str`` or path object).
        annotation_path: Path to a ``.mat`` file holding an ``annotations`` array.
        transforms: Optional callable invoked as ``transforms(image=ndarray)`` that
            returns a mapping with an ``'image'`` entry (albumentations style).

    Raises:
        ValueError: If ``source_path`` or ``annotation_path`` does not exist.
    """

    def __init__(self, source_path: str, annotation_path: str, transforms: callable = None):
        self.source_paths = self.__load_imgs_paths(source_path)
        self.labels = self.__load_labels(annotation_path)
        self.transforms = transforms

    def __load_imgs_paths(self, path: Path) -> list[Path]:
        """Return the regular files directly inside ``path``, in sorted order."""
        if not os.path.exists(path):
            raise ValueError("Not found file %s" % str(path))
        # Accept plain strings as well as path objects: the `/` join below needs a Path.
        path = Path(path)
        # os.listdir() returns entries in arbitrary order; sorting keeps the
        # index -> sample mapping stable between runs.
        return [
            path / filename
            for filename in sorted(os.listdir(path))
            if os.path.isfile(path / filename)
        ]

    def __load_labels(self, path: Path):
        """Build a ``{file name: class id}`` mapping from the annotation file."""
        if not os.path.exists(path):
            raise ValueError("Not found file %s" % str(path))

        annotations_mat = scipy.io.loadmat(path)
        labels = {}
        for record in annotations_mat['annotations'][0]:
            # Field 5 is read as the file name and field 4 as the class id.
            # The `- 1` shifts the stored id down by one - presumably the file
            # uses 1-based ids; verify against the annotation format.
            filename = str(record[5][0])
            labels[filename] = int(record[4][0, 0]) - 1

        return labels

    @property
    def features_amount(self) -> int:
        """Number of distinct class ids present in the annotations."""
        # `self.labels` maps file name -> class id, so the classes are the
        # dict *values*; iterating the dict itself would count files instead.
        return len(set(self.labels.values()))

    def __len__(self):
        return len(self.source_paths)

    def __getitem__(self, index):
        """Load the image at ``index`` and return ``(image, label)``.

        Without ``transforms`` the image is returned as an RGB ``PIL.Image``;
        with ``transforms`` the transformed image is cast to ``torch.float``.
        """
        img_path = self.source_paths[index]
        # Use a context manager so the underlying file handle is released
        # as soon as the pixel data has been converted.
        with Image.open(img_path) as raw_img:
            img = raw_img.convert('RGB')
        label = self.labels[os.path.basename(img_path)]

        if self.transforms:
            img = self.transforms(image=np.array(img))['image']
            # Assumes the transform pipeline ends with a tensor conversion.
            img = img.to(torch.float)

        return img, label

In [None]:
# Training pipeline: resize to 224x224, random horizontal flip (p=0.5),
# random brightness/contrast change (p=0.2), then convert to a tensor.
# NOTE(review): there is no normalization step in this pipeline - confirm the
# model is meant to receive un-normalized pixel values.
transform = A.Compose([
    A.Resize(width=224, height=224),
    A.HorizontalFlip(p=0.5),
    A.RandomBrightnessContrast(p=0.2),
    ToTensorV2(),
])

# Evaluation pipeline: same resize and tensor conversion, no random augmentation.
test_transform = A.Compose([
    A.Resize(width=224, height=224),
    ToTensorV2(),
])

In [None]:
# Training split: images plus their annotation file, with augmentation applied.
train_dataset = StanfordCarsDataset(
    source_path=DATASET_DIR / "cars_train/cars_train",
    annotation_path=ANNOTATIONS_DIR / "cars_train_annos.mat",
    transforms=transform,
)

In [None]:
# Test split: uses the labelled evaluation annotations and the augmentation-free pipeline.
test_dataset = StanfordCarsDataset(
    source_path=DATASET_DIR / "cars_test/cars_test",
    annotation_path=ANNOTATIONS_DIR / "cars_test_annos_withlabels_eval.mat",
    transforms=test_transform,
)

In [None]:
# Fetch the first sample of each split to check that loading and transforms work.
# NOTE: `lable` (sic) is assigned twice, so only the test sample's label is kept.
train_image, lable = train_dataset[0]
test_image, lable = test_dataset[0]

In [None]:
# Display the transformed test sample fetched above.
test_image

In [None]:
def display_image_grid():
    """Show one random training sample and one random test sample side by side.

    Reads the module-level ``train_dataset`` and ``test_dataset``. Each panel is
    titled with the sample's label and drawn without axes.
    """
    datasets = (train_dataset, test_dataset)
    figure, axes = plt.subplots(nrows=1, ncols=len(datasets), figsize=(12, 6))

    for ax, dataset in zip(axes.ravel(), datasets):
        # Sample from the dataset's actual index range rather than a fixed
        # 1..1000 window, so index 0 is reachable and small datasets don't fail.
        img, label = dataset[random.randrange(len(dataset))]

        if torch.is_tensor(img):
            # Tensors come out channel-first (C, H, W); imshow wants (H, W, C).
            # `.T` would give (W, H, C), i.e. a transposed picture.
            img = img.detach().cpu().permute(1, 2, 0)
            # imshow clips floats to [0, 1]; float pixels on a 0..255 scale must
            # be converted to uint8 to render correctly.
            if img.is_floating_point() and img.max() > 1:
                img = img.clamp(0, 255).to(torch.uint8)
            img = img.numpy()

        ax.imshow(img)
        ax.set_title(label)
        ax.set_axis_off()

    plt.tight_layout()
    plt.show()

In [None]:
# Visual spot check of one random sample from each split.
display_image_grid()

In [None]:
# Mini-batches of 32 samples. Only the training split is reshuffled every epoch.
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
# Evaluation gains nothing from shuffling; a fixed order keeps validation passes repeatable.
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

In [None]:
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = 'cpu'
if torch.cuda.is_available():
    device = 'cuda'

In [None]:
def depthwise_block1(in_c1, k1, iterations, dep_stride=2):
    """Build a stack of depthwise-separable convolution blocks.

    Every block is: 3x3 depthwise conv (``groups == channels``) -> BatchNorm -> ReLU
    -> 1x1 pointwise conv -> BatchNorm -> ReLU. The pointwise conv multiplies the
    channel count by ``k1``, so after ``iterations`` blocks the output has
    ``in_c1 * k1 ** iterations`` channels.

    Args:
        in_c1: Number of input channels of the first block.
        k1: Channel multiplier applied by each block's pointwise convolution.
        iterations: Number of blocks to stack.
        dep_stride: Stride of the depthwise convolution in every block.

    Returns:
        An ``nn.Sequential`` holding ``iterations`` blocks.
    """
    stack = nn.Sequential()
    channels = in_c1

    for idx in range(iterations):
        out_channels = k1 * channels
        # Each block needs its own name: add_module() replaces any child that is
        # already registered under the same name, which would silently leave
        # only the last block in the stack.
        stack.add_module(f"block_{idx}", nn.Sequential(
            nn.Conv2d(channels, channels, 3, stride=dep_stride, padding=1, groups=channels),
            nn.BatchNorm2d(num_features=channels),
            nn.ReLU(),
            nn.Conv2d(channels, out_channels, 1, stride=1, padding=0),
            nn.BatchNorm2d(num_features=out_channels),
            nn.ReLU()
        ))
        channels = out_channels

    return stack

In [None]:
class MobileNetV1(nn.Module):
    """MobileNetV1-style image classifier built from depthwise-separable blocks.

    Pipeline: 3x3 stem convolution -> depthwise blocks 1..8 -> global average
    pooling -> fully connected layer. ``forward`` returns raw, unnormalized class
    scores of shape ``(batch, num_classes)``, which is what
    ``torch.nn.CrossEntropyLoss`` expects. Apply ``classifier_layer`` to the
    output when probabilities are needed.

    Args:
        in_channels: Number of channels of the input images.
        in_size: Nominal input size; stored on the instance, not otherwise used.
        num_classes: Size of the output layer. When ``None`` it is taken from the
            module-level ``train_dataset.features_amount`` at construction time.
        verbouse: (sic) When true, print the tensor shape after every stage.
    """

    # (input channels, channel multiplier, repetitions, depthwise stride) per block.
    _BLOCK_SPECS = (
        (32, 2, 1, 1),
        (64, 2, 1, 2),
        (128, 1, 1, 1),
        (128, 2, 1, 2),
        (256, 1, 1, 1),
        (256, 2, 1, 2),
        (512, 1, 5, 1),
        (512, 2, 1, 2),
        (1024, 1, 1, 2),
    )
    # Block 9 is constructed (so the module's attributes stay the same) but is
    # not applied in forward().
    # NOTE(review): decide whether block 9 should be applied or removed.
    _APPLIED_BLOCKS = 8

    def __init__(self, in_channels=3, in_size=(224, 224), num_classes=None, verbouse: bool = False):
        super(MobileNetV1, self).__init__()

        # Resolve the default lazily so that defining the class does not depend
        # on `train_dataset` already existing.
        if num_classes is None:
            num_classes = train_dataset.features_amount

        self._verbouse = verbouse

        self.in_size = in_size
        self.num_classes = num_classes

        self.initial_layer = nn.Sequential(
            nn.Conv2d(in_channels, 32, 3, stride=2, padding=1),
            nn.BatchNorm2d(num_features=32),
            nn.ReLU(),
        )

        # Registers depthwise_layers1 ... depthwise_layers9.
        for idx, (channels, multiplier, repeats, stride) in enumerate(self._BLOCK_SPECS, start=1):
            setattr(self, f"depthwise_layers{idx}", depthwise_block1(channels, multiplier, repeats, stride))

        # Global average pooling: equals a 7x7 average pool on a 7x7 feature map,
        # and also works for other input resolutions.
        self.pooling_layers = nn.AdaptiveAvgPool2d(output_size=1)
        self.fc_layer = nn.Linear(in_features=1024, out_features=num_classes)
        # Not applied in forward(); use it to turn the returned scores into probabilities.
        self.classifier_layer = nn.Softmax(dim=1)

    def _log(self, stage, x):
        """Print ``stage`` and the shape of ``x`` when verbose mode is on."""
        if self._verbouse:
            print(stage, x.shape)

    def forward(self, x):
        """Return class scores of shape ``(batch, num_classes)`` for image batch ``x``."""
        self._log("Input", x)
        x = self.initial_layer(x)
        self._log("Initial", x)

        for idx in range(1, self._APPLIED_BLOCKS + 1):
            x = getattr(self, f"depthwise_layers{idx}")(x)
            self._log(f"Depthwise {idx}", x)

        x = self.pooling_layers(x)
        self._log("Pooling", x)

        # (batch, 1024, 1, 1) -> (batch, 1024) so the linear layer can consume it.
        x = torch.flatten(x, start_dim=1)
        x = self.fc_layer(x)
        self._log("Classifier", x)
        return x

In [None]:
def train_epoch(model: torch.nn.Module, dataloader: DataLoader, optimizer, loss_fn):
    """Run one optimization pass over ``dataloader`` and return the mean batch loss.

    Batches are moved to the module-level ``device``. The model output is
    flattened to ``(batch, -1)`` before it is handed to ``loss_fn``.

    Args:
        model: Network to train; switched to training mode.
        dataloader: Yields ``(inputs, labels)`` batches.
        optimizer: Optimizer that updates ``model``'s parameters.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.

    Returns:
        Sum of the per-batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0

    for inputs, labels in tqdm(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        predictions = model(inputs)
        predictions = predictions.view(predictions.shape[0], -1)

        batch_loss = loss_fn(predictions, labels)
        batch_loss.backward()
        running_loss += batch_loss.item()

        optimizer.step()

    batch_count = len(dataloader)
    return running_loss / batch_count

In [None]:
def validate(model, dataloader, loss_fn):
    """Evaluate ``model`` on ``dataloader`` and return the loss and classification metrics.

    Batches are moved to the module-level ``device``. The model output is
    flattened to ``(batch, -1)``; the predicted class is the argmax over the
    last dimension.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        dataloader: Yields ``(inputs, labels)`` batches.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar tensor.

    Returns:
        Tuple ``(mean batch loss, accuracy, weighted F1, macro F1)``; the three
        metrics are NumPy values.

    Raises:
        ValueError: If ``dataloader`` yields no batches.
    """
    total_loss = 0
    pred_batches = []
    true_batches = []
    num_outputs = None

    model.eval()

    with torch.no_grad():
        for inputs, labels in tqdm(dataloader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            outputs = outputs.view(outputs.shape[0], -1)

            total_loss += loss_fn(outputs, labels).item()

            # The metric's class count must cover every index argmax can
            # produce, so take it from the model output width.
            num_outputs = outputs.shape[1]

            # reshape(-1) keeps a 1-D tensor even for a batch of one sample;
            # squeeze() would produce a 0-dim tensor that cannot be concatenated.
            true_batches.append(labels.cpu().reshape(-1))
            pred_batches.append(outputs.cpu().argmax(dim=-1, keepdim=False))

    if not pred_batches:
        raise ValueError("dataloader yielded no batches")

    # Concatenate once at the end instead of re-allocating on every batch.
    true_labels_ids = torch.cat(true_batches)
    pred_labels_ids = torch.cat(pred_batches)

    # compute loss and metrics
    avg_vloss = total_loss / len(dataloader)
    accuracy = multiclass_accuracy(pred_labels_ids, true_labels_ids)
    f1 = multiclass_f1_score(pred_labels_ids, true_labels_ids, num_classes=num_outputs, average='weighted')
    f1_macro = multiclass_f1_score(pred_labels_ids, true_labels_ids, num_classes=num_outputs, average='macro')

    return avg_vloss, accuracy.detach().numpy(), f1.detach().numpy(), f1_macro.detach().numpy()

In [None]:
def train_loop(model, train_loader, test_loader, optimizer, loss_fn, prefix, epochs=10):
    """Train for ``epochs`` epochs, validating and checkpointing after each one.

    After every epoch the losses and metrics are printed and recorded. Whenever
    the validation loss drops below the best value seen so far, the model's
    ``state_dict`` is saved to ``f'{prefix}_{epoch}'`` (1-based epoch number).

    Args:
        model: Network to train.
        train_loader: Batches used by ``train_epoch``.
        test_loader: Batches used by ``validate``.
        optimizer: Optimizer passed to ``train_epoch``.
        loss_fn: Loss passed to both ``train_epoch`` and ``validate``.
        prefix: File-name prefix for saved checkpoints.
        epochs: Number of epochs to run.

    Returns:
        Dict of per-epoch lists keyed by 'Train loss', 'Valid loss', 'Accuracy',
        'F1' and 'F1 macro'.
    """
    metric_names = ('Train loss', 'Valid loss', 'Accuracy', 'F1', 'F1 macro')
    storage = {name: [] for name in metric_names}
    lowest_val_loss = 1e10

    for epoch_number in range(epochs):
        print(f'\nEpoch {epoch_number+1}:')

        train_loss = train_epoch(model, train_loader, optimizer, loss_fn)
        val_loss, accuracy, f1, f1_macro = validate(model, test_loader, loss_fn)

        print('\nLoss train: {} valid {}'.format(train_loss, val_loss))
        print('Accuracy:', accuracy)
        print('F1:', f1)
        print('F1 macro:', f1_macro)

        epoch_values = (train_loss, val_loss, accuracy, f1, f1_macro)
        for name, value in zip(metric_names, epoch_values):
            storage[name].append(value)

        # Nothing to save unless this epoch beat the best validation loss so far.
        if not val_loss < lowest_val_loss:
            continue

        lowest_val_loss = val_loss
        torch.save(model.state_dict(), f'{prefix}_{epoch_number+1}')

    return storage

In [None]:
def plot_history(history):
    """Draw two line charts from a training history.

    The first chart shows the training and validation loss per epoch, the
    second shows accuracy, weighted F1 and macro F1 per epoch.

    Args:
        history: Mapping with the keys 'Train loss', 'Valid loss', 'Accuracy',
            'F1' and 'F1 macro', each holding one value per epoch.
    """

    def _line_chart(series_keys, legend_labels, title, y_label):
        # One curve per key, all on the current pyplot figure.
        for key in series_keys:
            plt.plot(history[key])
        plt.title(title)
        plt.ylabel(y_label)
        plt.xlabel('epoch')
        plt.legend(legend_labels, loc='upper left')
        plt.show()

    _line_chart(('Train loss', 'Valid loss'), ['train', 'val'], 'model loss', 'loss')
    _line_chart(('Accuracy', 'F1', 'F1 macro'), ['Accuracy', 'F1', 'F1 macro'], 'metrics', 'metrics')

In [None]:
# Seed every random number generator in use so that weight initialisation and
# batch shuffling are repeatable across Restart & Run All.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Build the model, loss and optimizer, train for 15 epochs, then plot the curves.
# Checkpoints are written as 'test_<epoch>' in the current working directory.
model = MobileNetV1().to(device)
ce = torch.nn.CrossEntropyLoss().to(device)
adam = torch.optim.Adam(model.parameters(), lr=0.0005)
history = train_loop(model, train_loader, test_loader, adam, ce, 'test', 15)
plot_history(history)