### Зависимости

In [None]:
import numpy as np
import pandas as pd

from tqdm import tqdm, tqdm_notebook
from PIL import Image
from pathlib import Path

from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
import torch
import torch.optim as optim

import seaborn as sns
from matplotlib import colors, pyplot as plt
%matplotlib inline

import warnings
warnings.filterwarnings(action='ignore', category=DeprecationWarning)

### Globals

In [None]:
# режимы датасета 
DATA_MODES = ['train', 'val', 'test']
# все изображения масштабируем к размеру 299*299 px
RESCALE_SIZE_INCEPTION = 299
RESCALE_SIZE_RESNET = 224
RESCALE_SIZE = 299
# работаем на видеокарте
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# фиксим рандом
RANDOM_STATE = 42
torch.manual_seed(42)
DIR = '/kaggle/input/shift2023/'
DEVICE

### Предварительный анализ данных

In [None]:
# загружаем метки с названиями файлов
data_labels = pd.read_csv(DIR + 'train.csv')
data_labels.info()

In [None]:
# для удобства меняем тип заблюренности
data_labels[['blur']] = data_labels[['blur']].astype('int')
data_labels.info()

In [None]:
# оценим количественные показатели разных классов
sns.catplot(data=data_labels, x="blur", kind="count")
#sns.displot(data_labels['blur'], stat='percent', discrete=True)
plt.show()

print('\n')
print(f'В датасете четких изображений - {len(data_labels[data_labels.iloc[:,1]== 0])}, размытых - {data_labels.shape[0] - len(data_labels[data_labels.iloc[:,1] == 0])}')

Количество изображений разных классов схоже. Воспользуемся стратификацией. Дополнительно можем не генерировать.

### DataSet

In [None]:
class CastomDataset(Dataset):
    """
    Датасет с картинками, который паралельно подгружает их из папок
    производит скалирование и превращение в торчевые тензоры
    """
    def __init__(self, files, mode):
        super().__init__()
        # список файлов для загрузки
        self.files = sorted(files)
        # режим работы
        self.mode = mode

        if self.mode not in DATA_MODES:
            print(f"{self.mode} is not correct; correct modes: {DATA_MODES}")
            raise NameError

        self.len_ = len(self.files)

        # загружем метки файлов
        if self.mode != 'test':
            self.labels = [np.array(data_labels[data_labels.iloc[:, 0] == path.name].iloc[:,1])[0] \
                           for path in self.files]
                                  
    def __len__(self):
        return self.len_
      
    def load_sample(self, file):
        image = Image.open(file)
        image.load()
        return image
  
    def __getitem__(self, index):
        # проводим дополнительную обработку изображений 
        # переводим в тензоры и нормализуем
        if self.mode == 'train':
          transform = transforms.Compose([
            transforms.ToPILImage(),
            transforms.RandomVerticalFlip(0.5),
            transforms.RandomHorizontalFlip(0.5),
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
          ])

        if (self.mode == 'val') or (self.mode == 'test'):
          transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
          ])


        x = self.load_sample(self.files[index])
        x = self._prepare_sample(x)
        x = transform(x)
        if self.mode == 'test':
            return x
        else:
            y = self.labels[index]
            return x, y
        
    def _prepare_sample(self, image):
        image = image.resize((RESCALE_SIZE, RESCALE_SIZE))
        return np.array(image)

In [None]:
# для просмотра изображений
def imshow(inp, title=None, plt_ax=plt, default=False):
    """Imshow для тензоров"""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt_ax.imshow(inp)
    if title is not None:
        plt_ax.set_title(title)
    plt_ax.grid(False)

In [None]:
# пути нахождения train и test
TRAIN_DIR = Path(DIR + 'train/train')
TEST_DIR = Path(DIR + 'test/test')

train_val_files = list(TRAIN_DIR.rglob('*.jpg'))
test_files = list(TEST_DIR.rglob('*.jpg'))

### Деление выборки перед обучением

In [None]:
from sklearn.model_selection import train_test_split

train_val_labels = [np.array(data_labels[data_labels.iloc[:, 0] == path.name].iloc[:,1])[0] for path in train_val_files]
train_files, val_files = train_test_split(train_val_files, test_size=0.25, stratify=train_val_labels, random_state=RANDOM_STATE)

In [None]:
train_dataset = CastomDataset(train_files, mode='train')

val_dataset = CastomDataset(val_files, mode='val')

### Просмотр изображений

In [None]:
fig, ax = plt.subplots(nrows=3, ncols=3,figsize=(12, 12), \
                        sharey=True, sharex=True)
for fig_x in ax.flatten():
    random_characters = int(np.random.uniform(0,500))
    im_val, label = val_dataset[random_characters]
    img_label = val_dataset.labels[random_characters]
    imshow(im_val.data.cpu(), \
          title=img_label,plt_ax=fig_x)

### Обучение

Возьмем предобученную модель из PyTorch.

In [None]:
def train(train_files, val_files, model, epochs, batch_size):
    
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    history = [] # сохраняем данные о loss и accuracy для  train и val
    best_val_loss = 1
    best_val_acc = 0
    log_template = "\nEpoch {ep:03d} train_loss: {t_loss:0.4f} \
    val_loss {v_loss:0.4f} train_acc {t_acc:0.4f} val_acc {v_acc:0.4f}"

    with tqdm(desc="epoch", total=epochs) as pbar_outer:

        # Generate the optimizers.
        
        
        optimizer = torch.optim.AdamW(model.parameters(), lr=3e-4)
        
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=optimizer, gamma=0.5)
        
        
        criterion = nn.CrossEntropyLoss()
        
        for epoch in range(epochs):
            train_loss, train_acc = fit_epoch(model, train_loader, criterion, optimizer)
            print("loss", train_loss)
            
            val_loss, val_acc = eval_epoch(model, val_loader, criterion)
            
            scheduler.step()
            
            # если loss и acc на val, улучшили показатели, сохраняем модель,
            # для будущих предсказаний
            if best_val_loss >= val_loss and best_val_acc <= val_acc:
              best_val_loss = val_loss
              best_val_acc = val_acc
              torch.save(model.state_dict(), 'best_model.pth')
              print(f"\n\nSave model's completed on {epoch+1} epoch's")

            history.append((train_loss, train_acc, val_loss, val_acc))
            
            pbar_outer.update(1)
            tqdm.write(log_template.format(ep=epoch+1, t_loss=train_loss,\
                                           v_loss=val_loss, t_acc=train_acc, v_acc=val_acc))
               
    return [history, best_val_loss, best_val_acc]

In [None]:
def fit_epoch(model, train_loader, criterion, optimizer):
    running_loss = 0.0
    running_corrects = 0
    processed_data = 0
  
    for inputs, labels in train_loader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        preds = torch.argmax(outputs, 1)
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        processed_data += inputs.size(0)
    
    train_loss = running_loss / processed_data
    train_acc = running_corrects.cpu().numpy() / processed_data
    return train_loss, train_acc

In [None]:
def eval_epoch(model, val_loader, criterion):
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    processed_size = 0

    for inputs, labels in val_loader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        with torch.set_grad_enabled(False):
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            preds = torch.argmax(outputs, 1)

        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        processed_size += inputs.size(0)
    val_loss = running_loss / processed_size
    val_acc = running_corrects.double() / processed_size
    return val_loss, val_acc

In [None]:
def predict(model, test_loader):
    with torch.no_grad():
        logits = []
    
        for inputs in test_loader:
            inputs = inputs.to(DEVICE)
            model.eval()
            outputs = model(inputs).cpu()
            logits.append(outputs)
            
    probs = nn.functional.softmax(torch.cat(logits), dim=-1).numpy()
    return probs

#### Pretrained Inception_v3

In [None]:
model_inception_v3 = models.inception_v3(pretrained=True)
model_inception_v3

In [None]:
# num_features - размерность вектора фич, поступающего на вход FC
num_features = 2048
# n_classes - количество классов, которые будет предсказывать наша модель
n_classes = 2
# Заменяем Fully-Connected слой на наш линейный классификатор
model_inception_v3.fc = nn.Linear(in_features=num_features, out_features=n_classes)
model_inception_v3.AuxLogits.fc = nn.Linear(768, 2)

In [None]:
# перенос модели в cuda
model_inception_v3.to(DEVICE)

In [None]:
model_inception_v3.aux_logits = False

##### Обучение модели

In [None]:
epochs = 25

In [None]:
%%time
history_inception_v3 = train(train_dataset, val_dataset, model=model_inception_v3, epochs=epochs, batch_size=32)

##### Best results

In [None]:
print(f'best_loss = {history_inception_v3[1]}, best_acc = {history_inception_v3[2]}')

##### Кривые обучения

In [None]:
loss, acc, val_loss, val_acc = zip(*history_inception_v3[0])

In [None]:
plt.figure(figsize=(15, 9))
plt.plot(loss, label="train_loss")
plt.plot(val_loss, label="val_loss")
plt.legend(loc='best')
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

##### Best model

In [None]:
model_inception_v3.load_state_dict(torch.load('best_model.pth'))

##### Predict_val

In [None]:
idxs = list(map(int, np.random.uniform(0,650, 20))) # индексы  20 рандомных изображений
imgs = [val_dataset[id][0].unsqueeze(0) for id in idxs] # изображения

# вероятности предсказаний к определенному классу
probs_ims = predict(model_inception_v3, imgs) 

In [None]:
probs_img = [np.round(prob[1], 1) for prob in probs_ims]

In [None]:
y_pred = np.argmax(probs_ims,-1)

actual_labels = [val_dataset[id][1] for id in idxs]

In [None]:
preds_class = [i for i in y_pred]

##### ROC_AUC

In [None]:
from sklearn.metrics import roc_auc_score

roc_auc_inception_v3 = roc_auc_score(actual_labels, preds_class, average='weighted')
np.round(roc_auc_inception_v3,2)

#### Pretrained ResNet18

In [None]:
model_resnet18 = models.resnet18(pretrained=True)

In [None]:
model_resnet18

In [None]:
# num_features - размерность вектора фич, поступающего на вход FC
num_features = 512
# n_classes - количество классов, которые будет предсказывать наша модель
n_classes = 2

# Заменяем Fully-Connected 
# решил усложнить жизнь модели Dropout'ом - 
#ощутимого прироста на этом датасете не вышло
 
model_resnet18.fc = nn.Sequential(
    nn.Dropout(p=0.5, inplace=False),
    nn.Linear(num_features, n_classes)
)

In [None]:
model_resnet18.to(DEVICE)

##### Обучение модели

In [None]:
%%time
history_resnet18 = train(train_dataset, val_dataset, model=model_resnet18, epochs=epochs, batch_size=32)

##### Best results

In [None]:
print(f'best_loss = {history_resnet18[1]}, best_acc = {history_resnet18[2]}')

##### Кривые обучения

In [None]:
loss, acc, val_loss, val_acc = zip(*history_resnet18[0])

In [None]:
plt.figure(figsize=(15, 9))
plt.plot(loss, label="train_loss")
plt.plot(val_loss, label="val_loss")
plt.legend(loc='best')
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

##### Best model

In [None]:
model_resnet18.load_state_dict(torch.load('best_model.pth'))

##### Predict_val

In [None]:
idxs = list(map(int, np.random.uniform(0,650, 20))) # индексы  20 рандомных изображений
imgs = [val_dataset[id][0].unsqueeze(0) for id in idxs] # изображения

# вероятности предсказаний к определенному классу
probs_ims = predict(model_resnet18, imgs)

In [None]:
probs_img = [np.round(prob[1], 1) for prob in probs_ims]

In [None]:
y_pred = np.argmax(probs_ims,-1)

actual_labels = [val_dataset[id][1] for id in idxs]

In [None]:
preds_class = [i for i in y_pred]

##### ROC_AUC

In [None]:
from sklearn.metrics import roc_auc_score

roc_auc_resnet18 = roc_auc_score(actual_labels, preds_class, average='weighted')
roc_auc_resnet18

#### Pretrained DenseNet

In [None]:
model_dense = models.densenet121(pretrained=True)

In [None]:
model_dense

In [None]:
# num_features - размерность вектора фич, поступающего на вход FC
num_features = 1024
# n_classes - количество классов, которые будет предсказывать наша модель
n_classes = 2
# Заменяем Fully-Connected слой на наш линейный классификатор
model_dense.classifier = nn.Sequential(
    nn.Dropout(p=0.5, inplace=False),
    nn.Linear(num_features, n_classes)
)

In [None]:
model_dense.to(DEVICE)

##### Обучение модели Dense NET

In [None]:
%%time
history_dense = train(train_dataset, val_dataset, model=model_dense, epochs=epochs, batch_size=32)

##### Best results

In [None]:
print(f'best_loss = {history_dense[1]}, best_acc = {history_dense[2]}')

##### Кривые обучения

In [None]:
loss, acc, val_loss, val_acc = zip(*history_dense[0])

In [None]:
plt.figure(figsize=(15, 9))
plt.plot(loss, label="train_loss")
plt.plot(val_loss, label="val_loss")
plt.legend(loc='best')
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

##### Best model

In [None]:
model_dense.load_state_dict(torch.load('best_model.pth'))

##### Predict_val

In [None]:
idxs = list(map(int, np.random.uniform(0,650, 20))) # индексы  20 рандомных изображений
imgs = [val_dataset[id][0].unsqueeze(0) for id in idxs] # изображения

# вероятности предсказаний к определенному классу
probs_ims = predict(model_dense, imgs)

In [None]:
probs_img = [np.round(prob[1], 1) for prob in probs_ims]

In [None]:
y_pred = np.argmax(probs_ims,-1)

actual_labels = [val_dataset[id][1] for id in idxs]

In [None]:
preds_class = [i for i in y_pred]

##### ROC_AUC

In [None]:
from sklearn.metrics import roc_auc_score

roc_auc_dense = roc_auc_score(actual_labels, preds_class, average='weighted')
roc_auc_dense

### Inception_V3 VS ResNet18 VS DenseNet

In [None]:
print(f'Inception_V3 - best_loss = {history_inception_v3[1]}, best_acc = {history_inception_v3[2]}, roc_auc_score = {roc_auc_inception_v3}')
print()
print(f'ResNet18 - best_loss = {history_resnet18[1]}, best_acc = {history_resnet18[2]}, roc_auc_score = {roc_auc_resnet18}')
print()
print(f'DenseNet - best_loss = {history_dense[1]}, best_acc = {history_dense[2]}, roc_auc_score = {roc_auc_dense}')

### Submit на Kaggle

In [None]:
test_dataset = CastomDataset(test_files, mode="test")
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=32)

probs_inception_v3 = predict(model_inception_v3, test_loader)
probs_resnet18 = predict(model_resnet18, test_loader)
probs_dense = predict(model_dense, test_loader)

preds_inception_v3 = [np.round(prob[1], 1) for prob in probs_inception_v3]
preds_resnet18 = [np.round(prob[1], 1) for prob in probs_resnet18]
preds_dense = [np.round(prob[1], 1) for prob in probs_dense]

test_filenames = [path.name for path in test_dataset.files]

In [None]:
import pandas as pd

submit_inception_v3 = pd.DataFrame({'filename': test_filenames, 'blur': preds_inception_v3})
submit_inception_v3.head()

In [None]:
submit_resnet18 = pd.DataFrame({'filename': test_filenames, 'blur': preds_resnet18})
submit_resnet18.head()

In [None]:
submit_dense = pd.DataFrame({'filename': test_filenames, 'blur': preds_dense})
submit_dense.head()

In [None]:
submit_inception_v3.to_csv('/kaggle/working/submit_inception_v3.csv', index=False)
submit_resnet18.to_csv('/kaggle/working/submit_resnet18.csv', index=False)
submit_dense.to_csv('/kaggle/working/submit_dense.csv', index=False)