## Kaggle соревенование "Dirty vs Cleaned"
 https://www.kaggle.com/c/platesv2/overview
<br><br>Необходимо решить задачу классификации тарелок на две категории "чистые" и "грязные".
Одна из особенностей задачи --- размер тренировчного набора данных. Всего 40 картинок, по 20 картинок на каждый класс. Тестовая выборка состоит из 744 картинок.
<br>
<br>
Примеры изображений из тренировочной выборки:
<br>
<img style="float: left;" src="0014.jpg">
<img  src="0013.jpg">

<br>Подключение к диску, импорт библиотек и фиксирование random_seed для частичной воспроизводимости.

In [0]:
from google.colab import drive

In [None]:
drive.mount('/content/gdrive')

In [None]:
%cd 'gdrive/My Drive/Colab Notebooks/CV_Course/Kaggle/plates/plates/'

In [None]:
import numpy as np
import os

In [0]:
import torch
import random

random.seed(8)
np.random.seed(8)
torch.manual_seed(8)
torch.cuda.manual_seed(8)
torch.backends.cudnn.deterministic = True

In [0]:
import shutil 

train_dir = 'train'
val_dir = 'val'

class_names = ['cleaned', 'dirty']

## Подготовка данных и аугментации
<br>Функция для цветовой инверсии

In [0]:
import random
from PIL import ImageOps
import torchvision.transforms.functional as F

def my_inverse_transformation(img, p=0.5):
    if not F._is_pil_image(img):
        raise TypeError('img should be PIL Image. Got {}'.format(type(img)))
    if random.random() > p:
        return ImageOps.invert(img)
    else:
        return img

<br>Класс трансформации для сегментации тарелки и отделения ее от фона с помощью преобразования Хофа.   

In [0]:
import cv2
from PIL import Image
from torchvision.transforms.functional import _is_pil_image
from torchvision import transforms, models


class Segmentate:
    center_crop = transforms.CenterCrop(224)
    initThresh = 105

    def __init__(self):
        pass

    def __call__(self, img):
        if not _is_pil_image(img):
            raise TypeError('Img should be PIL Image. Got {}'.format(type(img)))

        cimg = img.copy()
        img = np.array(img)

        # Convert to gray-scale
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)

        p1 = self.initThresh
        p2 = self.initThresh * 0.4

        # Detect circles using HoughCircles transform
        circles = cv2.HoughCircles(gray, cv2.HOUGH_GRADIENT, 1, 500, param1=p1, param2=p2, minRadius=10,
                                   maxRadius=170)

        t = 400
        if circles is None:
            return self.center_crop(cimg)

        c = np.uint16(np.around(circles))[0, 0]

        # Draw the outer circle
        cv2.circle(img, (c[0], c[1]), c[2] + t // 2 - 15, (0, 0, 0), t)

        thr = -10
        # Centering ad cropping
        try:
            img = img[c[1] - c[2] - thr:c[1] + c[2] + thr, c[0] - c[2] - thr:c[0] + c[2] + thr]
            pil_img = Image.fromarray(img)
        except ValueError:
            return cimg

        return pil_img

<br>Класс трансформации для изменения контраста изображения.

In [0]:
class ChangeContrast:
    def __init__(self, level):
        self.level = level
        
    def __call__(self, img):
        self.factor_ = (259 * (self.level + 255)) / (255 * (259 - self.level))
        def contrast(c):
            return 128 + self.factor_ * (c - 128)
        return img.point(contrast)
    
    def __repr__(self):
        return self.__class__.__name__ + '(contrast_level={})'.format(self.level)

<br>Создание datasets.ImageFolder и DataLoader для загрузки картинок, создание батчей и автоматического применения трансформаций.

In [0]:
import torch
import torchvision
import matplotlib.pyplot as plt
import time
import copy
import PIL

shift_const = [0.485, 0.456, 0.406]
scale_const = [0.229, 0.224, 0.225]

train_transforms = transforms.Compose([
    transforms.Pad(50, padding_mode='edge'),
    transforms.RandomRotation((0, 360), expand=True),
    transforms.CenterCrop(224),    
    # ChangeContrast(65),
    # Segmentate(),
    # transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomVerticalFlip(),
    # transforms.ColorJitter(0.5, 0.6, 0.4, 0.25),
    # transforms.Lambda(lambda x: my_inverse_transformation(x, 0.3)),
    transforms.ToTensor(),
    # transforms.Lambda(lambda x: x[np.random.permutation(3), :, :]),
    transforms.Normalize(shift_const, scale_const)
])

val_transforms = transforms.Compose([
    transforms.CenterCrop(224),
    # ChangeContrast(65),
    # Segmentate(),
    # transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(shift_const, scale_const)
])

train_dataset = torchvision.datasets.ImageFolder(train_dir, train_transforms)
val_dataset = torchvision.datasets.ImageFolder(val_dir, val_transforms)

batch_size = 20

train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=batch_size)
val_loader = torch.utils.data.DataLoader(
    val_dataset, batch_size=batch_size, shuffle=True, num_workers=batch_size)


In [None]:
print(len(train_loader), len(train_dataset))
print(len(val_loader), len(val_dataset))

<br>Функции для отображения данных

In [0]:
def show_input(input_tensor, title=''):
    image = input_tensor.permute(1, 2, 0).numpy() * scale_const + shift_const
    plt.imshow(image.clip(0, 1))
    plt.title(title)
    plt.show()
    plt.pause(0.001)

<br>Визуализируем данные после применения аугментаций

In [None]:
X_batch, y_batch = next(iter(train_loader))

for x_item, y_item in zip(X_batch, y_batch):
    show_input(x_item, title=class_names[y_item])

Примеры изображений:
<br>
<img style="float: left;" src="Безымянный3.jpg">
<img  src="Безымянный4.jpg">

## Создание моделей
<br>Модуль pretrainedmodels для предобученных моделей для дальнейшего transfer learning

In [None]:
!pip install pretrainedmodels

<br>Функции инциализации претреннированных на ImageNet моделей. Последний слой отрезается и заменяется на подходящий полносвязный слой.

In [0]:
def make_pnasnet():
    import pretrainedmodels
    model = pretrainedmodels.pnasnet5large(num_classes=1000)
    model.last_linear = torch.nn.Linear(model.last_linear.in_features, 2)

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    return model


def make_xception():
    import pretrainedmodels
    model = pretrainedmodels.xception()
    model.last_linear = torch.nn.Linear(model.last_linear.in_features, 2)

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    return model


def make_resnext():
    import pretrainedmodels
    model = pretrainedmodels.se_resnext50_32x4d()
    model.last_linear = torch.nn.Linear(model.last_linear.in_features, 2)

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    return model

def make_inception_resnet_v2():
    import pretrainedmodels
    model = pretrainedmodels.inceptionresnetv2()

    model.last_linear = torch.nn.Linear(model.last_linear.in_features, 2)
    
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    return model

def make_vgg():
    model = models.vgg19_bn(pretrained=True)
    child_counter = 0
    for child in model.features.children():
        if child_counter < 48:
            for param in child.parameters():
                param.requires_grad = False
        child_counter += 1
        
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    return model

def make_resnet():
    model = models.resnet18(pretrained=True)
    for param in model.parameters():
        param.requires_grad = False

    model.fc = torch.nn.Linear(model.fc.in_features,2)

    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    model = model.to(device)
    return model

## Обучение моделей
<br>Класс для поиска наилучшей комбинации гиперпараметров перебором по сетке параметров. Процедура особенно необходма из-за маленького количества обучающих примеров, которые образуют сильно неустойчивое, "рваное" пространство, в котором изменяются веса. Поэтому каждую тренировку надежнее проводить таким способом.

In [0]:
from torchvision import models
from itertools import product

class GridSearch():
    def __init__(self, loss, optimizer=torch.optim.SGD, optimizer_params={}, scheduler=torch.optim.lr_scheduler.StepLR, scheduler_params={}):
        """
        [object]_params : dict
            Словарь, значения которого - списки значений именованных параметров объекта [object] 
            ----
            Пример:
                scheduler_params={'step_size': [7], 'gamma': [0.1, 0.01, 0.001]}
        """
        self.loss = loss
        self.optimizer = optimizer
        self.optimizer_params = optimizer_params
        self.scheduler = scheduler
        self.scheduler_params = scheduler_params
        self.best_model = ...
        self.best_acc = 0
        
    def tune(self, create_model_func):
        
        self.param_grid_ = list(product(*{**self.optimizer_params, **self.scheduler_params}.values()))
        self.param_grid_score_ = {}
        
        self.param_grid_len_ = len(self.param_grid_)
        for (i, param_set) in enumerate(self.param_grid_):
            print('Parameters set {}/{}'.format(i, self.param_grid_len_), end='\n')
            print(param_set)

            self.model_ = create_model_func()
            
            if self.optimizer == torch.optim.SGD:
                self.optimizer_ = self.optimizer(self.model_.parameters(), *param_set[:len(self.optimizer_params)])
            elif self.optimizer == torch.optim.Adam:
                self.optimizer_ = self.optimizer(self.model_.parameters(), *param_set[:len(self.optimizer_params)])
            else:
                raise RuntimeError('unknown type of optimizer')
                
            self.scheduler_ = self.scheduler(self.optimizer_, *param_set[len(self.optimizer_params):])

            _, best_model, score = train_model(self.model_, self.loss, self.optimizer_, self.scheduler_, n_epochs=26)
            if score > self.best_acc:
                self.best_model = copy.deepcopy(best_model)
                self.best_acc = score
            else:
                del best_model
#             self.current_score_ = sum(score[-3:]) / 3
            
#             self.param_grid_score_[str(param_set)[1:-1]] = self.current_score_
#         self.best_score_ = max(self.param_grid_score_.values())
#         self.best_params_ = max(self.param_grid_score_, key=lambda key: self.param_grid_score_[key])

<br>Функция тренировки модели.

In [0]:
import copy
def train_model(model, loss, optimizer, scheduler, n_epochs):
    best_acc = 0
    validation_accuracy = []
    
    for epoch in range(n_epochs):
        print('Epoch {}:'.format(epoch))
        
        for phase in ['train', 'val']:
            if phase == 'train':
                dataloader = train_loader
                model.train()
            else:
                dataloader = val_loader
                model.eval()
                
            batch_loss = 0.
            batch_acc = 0.
            
            for inputs, labels in dataloader:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                optimizer.zero_grad()
                
                with torch.set_grad_enabled(phase == 'train'):
                    preds = model(inputs)
                    loss_val = loss(preds, labels)
                    preds_class = preds.argmax(dim=1)
                    
                    if phase == 'train':
                        loss_val.backward()
                        optimizer.step()
                        scheduler.step()
                        
                batch_loss += loss_val.item()
                batch_acc += (preds_class == labels.data).float().mean().item()
                
            epoch_loss = batch_loss / len(dataloader)
            epoch_acc = batch_acc / len(dataloader)
            
            if phase == 'val':
                validation_accuracy.append(epoch_acc)
                if epoch_acc > best_acc:
                    best_model = copy.deepcopy(model)
                    best_acc = epoch_acc
        
            print('{} Loss: {:.3f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
            
    return model, best_model, best_acc    

In [0]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

<br> Функция потерь - бинарная кросс энтропия, так как задача бинарной классификации
<br> Оптимизатор - стохастический градиентый спуск, так как поддается наиболее тонкой настройке
<br> Планировщик - косинусный планировщик с отжигом, так как поверхность функкции потерь неустойчивая и необходимо побывать как можно в большем количестве локальных минимумов

In [0]:
loss = torch.nn.CrossEntropyLoss()
optimizer_params={'lr': [0.01, 0.1],
                  'momentum': [0.9, 0],
                  'dampening': [0.01],
                  'weight_decay': [0.1, 0.0001, 1],
                  'nesterov': [False]
                 }
scheduler_params = {'T_max': [5, 15]}

model_cv = GridSearch(loss, optimizer=torch.optim.SGD, optimizer_params=optimizer_params,
                     scheduler=torch.optim.lr_scheduler.CosineAnnealingLR, scheduler_params=scheduler_params)

<br>Тренируем модель на разных наборах гиперпараметров.

In [None]:
model_cv.tune(make_inception_resnet_v2)

In [None]:
model_cv.best_acc

<br>Дообучаем лучшую на модель на лучшем наборе гиперпараметров в надежде получить более высокий результат

In [None]:
best_model = copy.deepcopy(model_cv.best_model)

In [0]:
optimizer = torch.optim.SGD(best_model.parameters(), lr=0.09, momentum=0.4, dampening=0.1, weight_decay=0.0001)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=8)

In [None]:
_, final_model, val_acc = train_model(best_model, loss, optimizer, scheduler, n_epochs=30)

## Предсказание
<br>Лучшая модель получена, можно делать предсказание
<br><br>Загружаем тестовый набор данных, немного изменяя библитечный класс datasets.ImageFolder для сохранения пути до картинки, который впоследствии послужит для создания индексов в датафрейме.

In [None]:
import shutil
test_dir = 'test'
data_root = 'gdrive/My Drive/Colab Notebooks/CV_Course/Kaggle/plates/plates'
shutil.copytree(os.path.join(data_root, 'test'), os.path.join('gdrive/My Drive/Colab Notebooks/CV_Course/Kaggle/plates/plates/test', 'unknown'))

In [0]:
class ImageFolderWithPaths(torchvision.datasets.ImageFolder):
    def __getitem__(self, index):
        original_tuple = super(ImageFolderWithPaths, self).__getitem__(index)
        path = self.imgs[index][0]
        tuple_with_path = (original_tuple + (path, ))
        return tuple_with_path

In [0]:
test_dataset = ImageFolderWithPaths('test', val_transforms)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=744,
                                         shuffle=False, num_workers=8)

<br>Функция предсказания

In [0]:
from PIL import Image
def evaluate(model, loader): 
    model.eval()

    test_predictions = []
    test_img_paths = []
    for inputs, _, paths in loader:
        inputs = inputs.to(device)
        with torch.set_grad_enabled(False):
            preds = model(inputs)
        test_predictions.append(torch.nn.functional.softmax(preds, dim=1)[:, 1].data.cpu().numpy())
        test_img_paths.extend(paths)

    test_predictions = np.concatenate(test_predictions)
    return test_predictions, test_img_paths

In [0]:
test_predictions, test_img_paths = evaluate(the_best_model, test_loader)

<br>Посмотрим на полученнные предсказания

In [0]:
inputs, labels, paths = next(iter(test_loader))

In [None]:
for img, pred in zip(inputs, test_predictions):
     show_input(img, title=pred)

<br>Пример предсказаний:
<br>
<img style="float: left;" src="Безымянный2.jpg">
<img  src="Безымянный.jpg">

<br>Формируем файл с предсказаниями для сабмита.

In [0]:
submission_df = pd.DataFrame.from_dict({'id': test_img_paths,
                                        'label': test_predictions})

In [0]:
submission_df['label'] = submission_df['label'].map(lambda pred: 'dirty'
                                                   if pred > 0.5 else 'cleaned')
submission_df['id'] = submission_df['id'].str.replace('test/unknown/', '')
submission_df['id'] = submission_df['id'].str.replace('.jpg', '')
submission_df.set_index('id', inplace=True)

In [None]:
submission_df.head(10)

<img style="float: left;" src="123.jpg">


In [0]:
submission_df.to_csv('submission.csv')

In [0]:
!rm -rf train val test

<br>Сабмитим и получаем... 0.955