<a href="https://colab.research.google.com/github/polevev/kaggle/blob/main/classifyier_(3).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Демо-экзамен по курсу "Прикладной анализ данных". Вариант 2

Выполнил: Маркович Лев Иванович, С22-712

НИЯУ МИФИ

Вставьте ниже скриншот с результатом вашего решения в Leaderboard на Kaggle: https://www.kaggle.com/competitions/lamoda-images-classification/data

![photo_2025-07-17_17-48-21.jpg](attachment:f14e41de-1bc4-47ee-90ef-46bc6cea09c6.jpg)

![image.png](attachment:5aa652ba-c244-485b-bae3-b7d49b9a0030.png)

## 1. Описание задачи и данные

В рамках данного задания вам предстоит разработать алгоритм, который автоматически классифицирует изображения предметов одежды на две категории:

- bryuki (брюки),
- bluzy (блузки).

Каждое изображение содержит один предмет одежды на однотонном фоне. Цель — по изображению предсказать, к какому классу принадлежит предмет.

Ссылка на соревнование и на данные: https://www.kaggle.com/competitions/lamoda-images-classification/data

In [None]:
# Import libraries
# TODO

import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
# Пропишите дополнительные библиотеки, которые потребуются для решения
import torch
import numpy as np
import pickle
import numpy as np
from skimage import io

from tqdm import tqdm, tqdm_notebook
from PIL import Image
from pathlib import Path

from torchvision import transforms
from multiprocessing.pool import ThreadPool
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import Dataset, DataLoader
import torch.nn as nn
from torchvision import models

from matplotlib import colors, pyplot as plt
import torchvision.transforms as tfs

In [None]:
# установим библиотеку для работы с датасетами на kaggle
!pip install opendatasets --quiet

Небольшая инструкция как напрямую скачать данные с kaggle в Colab ноутбук: https://www.geeksforgeeks.org/python/how-to-import-kaggle-datasets-directly-into-google-colab/

In [None]:
import opendatasets as od
import pandas as pd

url = 'https://www.kaggle.com/competitions/lamoda-images-classification/'
od.download(url)

## 1. Провести предварительное исследование данных

- сколько у вас есть изображений? Все ли они одного разрешения? Какой баланс классов в данных?
- какие есть особенности датасета?
- если необходимо произвести очистку и предварительную обработку данных.

In [None]:
#определим директории с тренировочными и тестовыми файлами
TRAIN_DIR = Path('/kaggle/working/lamoda-images-classification/images/train')
TEST_DIR = Path('/kaggle/working/lamoda-images-classification/images/test')

train_files=sorted(list(TRAIN_DIR.rglob("*.jpg")))
test_files=sorted(list(TEST_DIR.rglob("*.jpg")))

In [None]:
train_files[0], test_files[0]

In [None]:
len(train_files), len(test_files)

In [None]:
#В тестовых файлах помимо картинки идет еще название -> вытащим его отдельно
train_labels = [path.stem.split("_")[-1] for path in train_files]
train_labels

In [None]:
print(f'Число изображений для обучения и валидации: {len(train_files)}')
print(f'Число изображений для финальной проверки работы модели: {len(test_files)}')

train_sizes = [Image.open(file).size for file in train_files]
test_sizes = [Image.open(file).size for file in test_files]

unique_train_sizes = set(train_sizes)
unique_test_sizes = set(test_sizes)

print("Уникальные размеры train:", unique_train_sizes)
print("Уникальные размеры test:", unique_test_sizes)

In [None]:
counts = pd.Series(train_labels).value_counts()
counts

In [None]:
#В целом balanced
plt.figure(figsize=(8,8))
plt.barh(counts.keys(), counts.sort_values(ascending=True))
plt.title('Train files characters distribution')
plt.xlabel('Count')
plt.ylabel('Characters')
plt.show()

Особенности: 2 класса, неплохо сбалансированы, но есть 2 уникальных разрешения -> необходимо привести к одному.

In [None]:
def is_image_valid(path):
    try:
        img = Image.open(path)  # не загружает, но проверяет корректность
        return True
    except:
        return False

valid_train_files = [p for p in train_files if is_image_valid(p)]
print(f"Удалено {len(train_files) - len(valid_train_files)} поврежденных изображений")

Все файлы открываются

## 2. Подготовить данные для обучения

- Создать тренировочную и тестовую подвыборки для обучения и тестирования работы моделей
- Создать класс на фреймворке `PyTorch` для создания датасета
- Создать `loader` для передачи данных в модель нейронной сети

In [None]:
class MyDataset(Dataset):
    # def __init__ - конструктор класс
    def __init__(self, files, mode, label_encoder=None, labels=None):
        super().__init__()
        # список файлов для загрузки
        self.files = files
        self.labels = labels
        # режим работы
        self.mode = mode

        if self.mode not in ['train', 'val', 'test']:
            raise ValueError(f"Invalid mode {self.mode}. Use 'train', 'val' or 'test'")

        # Усиленная аугментация для тренировки
        self.train_transform = tfs.Compose([
            transforms.Lambda(lambda img: img.convert("RGB")),
            tfs.Resize(299),
            tfs.RandomCrop(299),
            tfs.RandomAffine(degrees=15, shear=10, scale=(0.8, 1.2)),
            tfs.ColorJitter(brightness=0.3, contrast=0.3, saturation=0.3),
            tfs.GaussianBlur(kernel_size=(3,3), sigma=(0.1, 0.5)),
            tfs.RandomPerspective(distortion_scale=0.2, p=0.5),
            tfs.RandomHorizontalFlip(),
            tfs.ToTensor(),
            tfs.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            tfs.RandomErasing(p=0.5, scale=(0.02, 0.1))
        ])
        # Для валидации только изменяем саму картинку
        self.val_transform = tfs.Compose([
            transforms.Lambda(lambda img: img.convert("RGB")),
            tfs.Resize(299),
            tfs.CenterCrop(299),
            tfs.ToTensor(),
            tfs.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
        ])

        self.len_ = len(self.files)

        #Вытаскиваем лейблы
        if label_encoder is None and mode != 'test':
            self.label_encoder = LabelEncoder()
            self.all_labels = [path.stem.split("_")[-1] for path in files]
            self.label_encoder.fit(self.all_labels)
        else:
            self.label_encoder = label_encoder

            with open('label_encoder.pkl', 'wb') as le_dump_file:
                  pickle.dump(self.label_encoder, le_dump_file)

    def __len__(self):
        return self.len_
    #Для открытия изображений
    def load_sample(self, file):
        image = Image.open(file)
        image.load()
        return image

    def __getitem__(self, index):
        file = self.files[index]
        image = self.load_sample(file)

        if self.mode == 'train':
          image = self.train_transform(image)
        else:
          image = self.val_transform(image)
        #Для теста вообще не изменяем картинки
        if self.mode == 'test':
            return image

        label = self.label_encoder.transform([self.labels[index]])[0]
        return image, label

In [None]:
all_labels = [path.stem.split("_")[-1] for path in train_files]

# Создаем и обучаем encoder
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

In [None]:
from sklearn.model_selection import train_test_split
#Разделим на подвыборки и стратифицируем по лейблам, важно сохранить соответствие лейблов картинкам
files_with_labels = [(path, path.stem.split("_")[-1]) for path in train_files]
train_files_split, val_files_split = train_test_split(
    files_with_labels,
    test_size=0.25,
    stratify=[label for (_, label) in files_with_labels]
)

In [None]:
# Разделяем обратно на файлы и метки
train_files = [path for (path, _) in train_files_split]
val_files = [path for (path, _) in val_files_split]
train_labels = [label for (_, label) in train_files_split]
val_labels = [label for (_, label) in val_files_split]

In [None]:
len(train_files), len(val_files)

In [None]:
train_files[49].name, train_labels[49]

In [None]:
train_dataset = MyDataset(train_files, mode='train', label_encoder=label_encoder, labels=train_labels)
test_dataset = MyDataset(test_files, mode='test')
val_dataset = MyDataset(val_files, mode='val', label_encoder=label_encoder, labels=val_labels)

In [None]:
for i in range(5):
    print(f"Файл: {val_dataset.files[i].name}, Метка: {val_dataset[i][1]}")

In [None]:
def imshow(inp, title=None, plt_ax=plt, default=False):
    """Imshow для тензоров"""
    inp = inp.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    inp = std * inp + mean
    inp = np.clip(inp, 0, 1)
    plt_ax.imshow(inp)
    if title is not None:
        plt_ax.set_title(title)
    plt_ax.grid(False)

In [None]:
#Посмотрим что получилось

fig, ax = plt.subplots(nrows=3, ncols=3,figsize=(12, 12))

for fig_x in ax.flatten():
    idx = np.random.randint(0, len(val_dataset))
    #Возьмем рандомные изображения из val
    im_val, label = val_dataset[idx]
    #Возьмем их лейблы, переформатируем
    img_label = val_dataset.label_encoder.classes_[label]
    #Отрисуем с помощью вспомогательной функции
    imshow(im_val, title=img_label, plt_ax=fig_x)

## 3. Реализация модели

- создать класс для модели
- создать функцию для обучения модели
- создать функцию для валидации модели

In [None]:
#Для начала сами напишем простую модель, здесь только Conv слои, ReLU и макс пулинги, на вход подается RGB изображение размерности (3, H, W)
#На выходе используется полносвязный слой для классификации на наше количество классов(2)
class RSNAModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3, out_channels=8, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=8, out_channels=16, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv3 = nn.Sequential(
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv4 = nn.Sequential(
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )
        self.conv5 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=96, kernel_size=3),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2)
        )

        self.out = nn.Linear(3456, 1)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)

        x = x.view(x.size(0), -1)
        logits = self.out(x)
        return logits

In [None]:
train_on_gpu = torch.cuda.is_available()

if not train_on_gpu:
    print('CUDA is not available.  Training on CPU ...')
else:
    print('CUDA is available!  Training on GPU ...')

DEVICE = torch.device("cuda")

In [None]:
def train_one_epoch(model, train_dataloader, optimizer, criterion):
    '''
    Эта функция проходит по всем данным в `train_dataloader`, выполняет шаг моделью (forward),
    вычисляет лосс, выполняет обратное распространение с помощью заданного
    оптимизатора и обновляет веса модели.
    В конце эпохи возвращаются средний лосс и
    точность на обучающем наборе данных.
    '''

    running_loss = 0.0
    running_corrects = 0
    processed_data = 0

    for inputs, labels in train_dataloader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)
        optimizer.zero_grad()

        outputs = model(inputs)
        outputs = outputs.view(-1)
        loss = criterion(outputs, labels.float())
        loss.backward()
        optimizer.step()
        probs = torch.sigmoid(outputs)
        preds = (probs > 0.5).long()
        running_loss += loss.item()*inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
        processed_data += inputs.size(0)

    train_loss = running_loss/processed_data
    train_acc = running_corrects.cpu().numpy() / processed_data
    return train_loss, train_acc

def valid_one_epoch(model, valid_dataloader, criterion):
    '''
    Эта функция проходит по всем данным в `val_loader`, выполняет прямое
    распространение, вычисляет лосс и точность модели. В конце эпохи
    возвращаются средний лосс и точность на валидационном наборе данных.
    '''
    model.eval()
    running_loss = 0.0
    running_corrects = 0
    processed_size = 0

    for inputs, labels in valid_dataloader:
        inputs = inputs.to(DEVICE)
        labels = labels.to(DEVICE)

        #Не вычисляем градиенты на val
        with torch.set_grad_enabled(False):
            outputs = model(inputs)
            outputs = outputs.view(-1)
            loss = criterion(outputs, labels.float())
            probs = torch.sigmoid(outputs)
            preds = (probs > 0.5).long()

        running_loss += loss.item() * inputs.size(0)
        #Считаем корректно предсказанные
        running_corrects += torch.sum(preds == labels.data)
        processed_size += inputs.size(0)
    val_loss = running_loss/processed_size
    val_acc = running_corrects.cpu().numpy()/processed_size
    return val_loss, val_acc

## 4. Обучение модели
*   Определить необходимые параметры для обучения модели (learning rate, optimizer, loss function, etc)
*   Реализовать процесс обучения модели
*   Оценить эффективность работы модели



In [None]:
def init_weights(m):
    if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
        nn.init.kaiming_normal_(m.weight)
        if m.bias is not None: nn.init.zeros_(m.bias)

In [None]:
#Сделаем функции для тренировки и обучения модели на заданном количестве эпох
def train(train_files, val_files, model, epochs, batch_size):
  '''
  Эта функция создает загрузчики данных для обучающего и валидационного наборов,
  а затем выполняет обучение модели в течение заданного количества эпох.
  В конце каждой эпохи выводятся значения потерь и точности для обучающего и
  валидационного наборов. История обучения сохраняется и возвращается.
  '''
  #shuffle используется только при тренировке
  train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
  val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

  history = []
  log_template = "\nEpoch {ep:03d} train_loss: {t_loss:0.4f} \
    val_loss {v_loss:0.4f} train_acc {t_acc:0.4f} val_acc {v_acc:0.4f}"

  # Инициализация весов
  model.apply(init_weights)
  opt = torch.optim.AdamW(model.parameters(), lr=1e-4, weight_decay=1e-5)
  #Шедулер для лучшего обучения
  scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, 'min', patience=3)
  criterion = nn.BCEWithLogitsLoss()

  best_acc = 0.0
  early_stop_patience = 5
  no_improve = 0

  #Используем AdamW который чуть умнее обычного Adam и бинарную кросс энтропию для задачи бинарной классификации
  with tqdm(desc="epoch", total=epochs) as pbar_outer:
        for epoch in range(epochs):
            model.train()
            train_loss, train_acc = train_one_epoch(model, train_loader, opt, criterion)

            model.eval()
            val_loss, val_acc = valid_one_epoch(model, val_loader, criterion)

            scheduler.step(val_loss)

            # Ранняя остановка
            if val_acc > best_acc:
                best_acc = val_acc
                no_improve = 0
                torch.save(model.state_dict(), 'best_model.pth')
            else:
                no_improve += 1
                if no_improve >= early_stop_patience:
                    print(f"\nEarly stopping at epoch {epoch+1}")
                    break

            history.append((train_loss, train_acc, val_loss, val_acc))
            pbar_outer.update(1)
            tqdm.write(log_template.format(ep=epoch+1, t_loss=train_loss,
                                         v_loss=val_loss, t_acc=train_acc, v_acc=val_acc))
  return history

In [None]:
def predict(model, test_loader):
  '''
  Эта функция принимает модель и загрузчик тестовых данных, применяет модель к
  изображениям и возвращает вероятности предсказанных классов.
  '''

  with torch.no_grad():
    logits = []

    model.eval()
    for inputs in test_loader:
        inputs = inputs.to(DEVICE)
        outputs = model(inputs)
        outputs = outputs.squeeze(1)
        logits.append(outputs.cpu())
  #Возвращаем массив предсказанных классов
  all_output = torch.cat(logits, dim=0)
  probs = torch.sigmoid(all_output)
  preds = (probs > 0.5).long()
  return preds

In [None]:
simple_cnn = RSNAModel().to(DEVICE)
simple_cnn

In [None]:
history = train(train_dataset, val_dataset, model=simple_cnn, epochs=20, batch_size=64)

In [None]:
loss, acc, val_loss, val_acc = zip(*history)

In [None]:
plt.figure(figsize=(15, 9))
plt.plot(loss, label="train_loss")
plt.plot(val_loss, label="val_loss")
plt.legend(loc='best')
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

In [None]:
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=16)
probs = predict(simple_cnn, test_loader)

In [None]:
probs

In [None]:
label_encoder = pickle.load(open("label_encoder.pkl", 'rb'))

In [None]:
preds = label_encoder.inverse_transform(probs)
test_filenames = [path.name for path in test_dataset.files]

In [None]:
preds

In [None]:
test_filenames

In [None]:
pd.Series(preds).value_counts()

In [None]:
test = pd.DataFrame({
    "index": test_filenames,
    "label": preds
})

In [None]:
test.to_csv('simp_submission.csv', index = False)

![image.png](attachment:7bc345d5-7e2b-447d-bb90-2592d917bc7f.png)

# Отлично предсказывает даже такая простая модель

## 5. Экспериментальная часть

- протестировать не менее 3х разных моделей для решения задачи
- сравнить их эффективность (по метрикам и потерям)
- сделать выводы

In [None]:
#Попробуем реснет
model = models.resnet18(pretrained=True)

In [None]:
#Модель достаточно умнее предыдущей, добавлены батчнормы, даунсемплы, Adaptive average pooling
model

In [None]:
model.fc = nn.Sequential(
    nn.Linear(model.fc.in_features, 512),
    nn.BatchNorm1d(512),
    nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(512, 1)
)

In [None]:
res_model = model.to(DEVICE)

In [None]:
history = train(train_dataset, val_dataset, model=res_model, epochs=20, batch_size=64)

In [None]:
loss, acc, val_loss, val_acc = zip(*history)

In [None]:
plt.figure(figsize=(15, 9))
plt.plot(loss, label="train_loss")
plt.plot(val_loss, label="val_loss")
plt.legend(loc='best')
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

In [None]:
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=16)
probs_res = predict(res_model, test_loader)

In [None]:
label_encoder = pickle.load(open("label_encoder.pkl", 'rb'))

In [None]:
preds_res = label_encoder.inverse_transform(probs_res)
test_filenames = [path.name for path in test_dataset.files]

In [None]:
preds_res

In [None]:
pd.Series(preds_res).value_counts()

In [None]:
test_res = pd.DataFrame({
    "index": test_filenames,
    "label": preds_res
})

In [None]:
test_res.to_csv('res_submission.csv', index = False)

![image.png](attachment:b16092bb-043e-433e-849e-9c1ed09ee8a3.png)

# Score еще возрос, что ожидаемо от более сложной модели

In [None]:
from torchvision.models import inception_v3, Inception_V3_Weights
#Попробуем еще архитектуру inception
weights = Inception_V3_Weights.DEFAULT
model = inception_v3(weights=weights, aux_logits=True)
model.aux_logits = False
model

In [None]:
model.fc = nn.Linear(model.fc.in_features, 1)

In [None]:
inception = model.to(DEVICE)

In [None]:
history = train(train_dataset, val_dataset, model=inception, epochs=20, batch_size=64)

In [None]:
loss, acc, val_loss, val_acc = zip(*history)

In [None]:
plt.figure(figsize=(15, 9))
plt.plot(loss, label="train_loss")
plt.plot(val_loss, label="val_loss")
plt.legend(loc='best')
plt.xlabel("epochs")
plt.ylabel("loss")
plt.show()

In [None]:
test_loader = DataLoader(test_dataset, shuffle=False, batch_size=16)
probs_ins = predict(inception, test_loader)

In [None]:
label_encoder = pickle.load(open("label_encoder.pkl", 'rb'))

In [None]:
preds_ins = label_encoder.inverse_transform(probs_ins)
test_filenames = [path.name for path in test_dataset.files]

In [None]:
pd.Series(preds_ins).value_counts()

In [None]:
test_ins = pd.DataFrame({
    "index": test_filenames,
    "label": preds_ins
})

In [None]:
test_ins.to_csv('ins_submission.csv', index = False)

![image.png](attachment:0b25f292-3c9f-41ac-ba64-8e899d56b2ef.png)

Немного хуже, но разница в 2 тысячных невелика

# Итого - 3 разные модели, в целом все справились неплохо, но resnet18 все же справилась лучше по accuracy, но это не значит, что она лучшая во всём, поэтому можно сказать что все модели предсказывают отлично.