In [17]:
import numpy as np
import torch
import os
import time
import random

from torch import nn
from torch.utils.data import DataLoader, Dataset
from torchvision.models import efficientnet_v2_s, EfficientNet_V2_S_Weights
from typing import Optional, Callable
from PIL import Image

# Seed every random number generator the notebook relies on, so that a fresh
# "Restart & Run All" repeats the same shuffling and weight initialisation.
random.seed(123)
np.random.seed(123)
# torch has its own RNG (used for DataLoader shuffling and for initialising
# newly created layers); it is not affected by the two seeds above.
torch.manual_seed(123)

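Определим два класса: `FruitDatasetMetadata` собирает пути к изображениям с метками и делит их на тренировочную и тестовую части, а `FruitDataset` загружает изображения для PyTorch.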

In [18]:
class FruitDatasetMetadata():
    """Build lists of ``(image_path, label)`` pairs and split them into train and test parts.

    The directory tree under ``root`` is walked recursively. Every directory
    that directly contains at least one image file becomes a separate class
    and receives the next free integer label (starting at 0).

    Directory entries are processed in sorted-by-name order, so labels and the
    train/test split are the same on every run and on every file system
    (``os.scandir`` itself yields entries in arbitrary order).

    NOTE(review): the split is positional - the first ``test_size`` share of
    each class (in file-name order) goes to the test part; nothing is shuffled.
    """

    # Extensions treated as images; compared case-insensitively so that files
    # such as ``IMG_01.JPG`` are not skipped.
    _IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self,
                 root='./data/fruit-recognition',
                 test_size=0.25,
                 dirs_to_exclude={'Total Number of Apples', 'Total Number of Kiwi fruit', 'Guava total', 'guava total final'}) -> None:
        """Scan ``root`` and populate the train/test lists.

        Args:
            root: Directory to scan recursively.
            test_size: Fraction (between 0 and 1) of each class that goes to
                the test list; the per-class count is rounded down.
            dirs_to_exclude: Directory names (not paths) that are skipped at
                any depth. The default set is only read, never modified.

        Raises:
            ValueError: If ``test_size`` is outside ``[0, 1]``.
        """
        super().__init__()

        if not 0 <= test_size <= 1:
            raise ValueError(f'test_size must be between 0 and 1, got {test_size!r}')

        # (image_path, label) pairs of the training part
        self.img_with_labels_train = []
        # (image_path, label) pairs of the test part
        self.img_with_labels_test = []
        # number of classes found so far; also the next label to hand out
        self.n_labels = 0

        # List of (label, [image_path, ...]) tuples, one per class, where
        # label is the integer class label.
        img_grouped_by_labels = []

        # Walk all directories recursively and collect image files.
        # Each directory that holds images is a separate category.
        self._read_img_recurcive(root, dirs_to_exclude, img_grouped_by_labels)

        # Split every class separately so that both parts contain all classes
        # in (roughly) the same proportion.
        for label, images in img_grouped_by_labels:
            test_len = int(len(images) * test_size)
            self.img_with_labels_test.extend((img, label) for img in images[:test_len])
            self.img_with_labels_train.extend((img, label) for img in images[test_len:])

    def _read_img_recurcive(self, dir_path: str, dirs_to_exclude: set, img_grouped_by_labels: list):
        """Collect the images of ``dir_path`` and of all its sub-directories.

        Sub-directories are visited before the directory's own label is
        assigned. If ``dir_path`` directly contains images, a
        ``(label, image_paths)`` tuple is appended to ``img_grouped_by_labels``
        and ``self.n_labels`` is incremented.

        Args:
            dir_path: Directory to scan.
            dirs_to_exclude: Directory names to skip.
            img_grouped_by_labels: Output list, extended in place.
        """
        with os.scandir(dir_path) as it:
            # Materialise and sort: scandir order is arbitrary, and sorting
            # makes label numbering and the split reproducible.
            entries = sorted(it, key=lambda entry: entry.name)

        images = []
        for entry in entries:
            if entry.is_file():
                if entry.name.lower().endswith(self._IMAGE_EXTENSIONS):
                    images.append(entry.path)
                else:
                    print(f'File {entry.name} is not an image.')
            elif entry.is_dir():
                if entry.name not in dirs_to_exclude:
                    # a directory: look at what is inside
                    self._read_img_recurcive(entry.path, dirs_to_exclude, img_grouped_by_labels)

        if images:
            # labels are consecutive integers starting from 0
            img_grouped_by_labels.append((self.n_labels, images))
            self.n_labels += 1


class FruitDataset(Dataset):
    """Map-style dataset that reads an image from disk and returns ``(image, label)``.

    Every image is converted to 3-channel RGB on load, so grayscale, palette
    and RGBA files reach the transform in the same form as ordinary RGB ones.
    """

    def __init__(self,
                 img_with_labels: list,
                 transform: Optional[Callable]=None) -> None:
        """Store the sample list and the optional per-image transform.

        Args:
            img_with_labels: Sequence of rows whose first item is an image
                path and whose second item is the label.
            transform: Callable applied to the loaded PIL image; when omitted
                the RGB PIL image itself is returned.
        """
        super().__init__()

        self._img_with_labels = img_with_labels
        self._transform = transform

    def __len__(self):
        """Return the number of samples."""
        return len(self._img_with_labels)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``."""
        row = self._img_with_labels[idx]
        img_path, label = row[0], row[1]

        # convert() reads the pixel data and returns a new in-memory image,
        # so the file can be closed before the transform runs.
        with Image.open(img_path) as src:
            img = src.convert('RGB')

        if self._transform:
            img = self._transform(img)
        return img, label

In [19]:
# Scan the image directory and build the per-class train/test split.
dataset_metadata = FruitDatasetMetadata()

# Use the preprocessing preset that belongs to the pretrained weights, so the
# inputs are prepared the way the backbone expects them.
transform = EfficientNet_V2_S_Weights.IMAGENET1K_V1.transforms()
train_dataset = FruitDataset(img_with_labels=dataset_metadata.img_with_labels_train, transform=transform)
test_dataset = FruitDataset(img_with_labels=dataset_metadata.img_with_labels_test, transform=transform)

print(f'Train size: {len(train_dataset)}\nTest size: {len(test_dataset)}')

# Only the training data is shuffled; evaluation gains nothing from a random
# order, and a fixed order keeps partial evaluations comparable between runs.
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=64, shuffle=False)

Train size: 33601
Test size: 11191


In [20]:
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f'Device: {device}')

# create the model with pretrained ImageNet weights
weights = EfficientNet_V2_S_Weights.IMAGENET1K_V1
model = efficientnet_v2_s(weights=weights)

# Only the last layer is going to be fine-tuned, so gradients are switched off
# for all pretrained parameters to keep them unchanged during training.
for param in model.parameters():
    param.requires_grad = False

# Adapt the last layer to our data: keep the first classifier module and
# replace the final linear layer with one that has an output per label.
# The new layer is created after the freeze loop, so it stays trainable.
model.classifier = nn.Sequential(
    model.classifier[0],
    nn.Linear(model.classifier[1].in_features, dataset_metadata.n_labels)
)

# Move the complete model (backbone and new head) to the selected device in
# one step; this also works on machines without CUDA.
model = model.to(device)

Device: cuda


Downloading: "https://download.pytorch.org/models/efficientnet_v2_s-dd5fe13b.pth" to /root/.cache/torch/hub/checkpoints/efficientnet_v2_s-dd5fe13b.pth
100%|██████████| 82.7M/82.7M [00:00<00:00, 153MB/s]


Функции train() и test() взяты из материалов лекции.

In [21]:
def train(dataloader, model, loss_fn, optimizer):
    """Run one training epoch over ``dataloader``.

    The model is put into training mode; for every batch the loss is
    computed, back-propagated and the optimizer takes one step. The current
    loss is printed on every 100th batch (starting with the first one).

    Uses the module-level ``device`` to place the batches.

    Args:
        dataloader: Iterable of ``(inputs, targets)`` batches exposing a
            ``dataset`` attribute with a length.
        model: Model to optimise.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a scalar tensor.
        optimizer: Optimizer holding the parameters to update.
    """
    size = len(dataloader.dataset)
    model.train()

    for step, (inputs, targets) in enumerate(dataloader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # forward pass
        batch_loss = loss_fn(model(inputs), targets)

        # backward pass and parameter update
        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        if step % 100 != 0:
            continue

        # progress report: loss of this batch and samples seen before it
        loss = batch_loss.item()
        current = step * len(inputs)
        print(f"loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")


def test(dataloader, model, loss_fn, verbose=True, iterations=None):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0

    with torch.no_grad():
        for i, (X, y) in enumerate(dataloader):
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
            if iterations is not None and i >= iterations:
                break

    test_loss /= num_batches
    correct /= size
    if verbose:
        print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [22]:
# Training hyper-parameters.
epochs = 3
# NOTE(review): batch_size is not used in this cell - the dataloaders were
# created earlier with a literal batch size of 64.
batch_size = 64
learning_rate = 1e-3

loss_fn = nn.CrossEntropyLoss()
# Hand the optimizer only the parameters that are actually trained; the frozen
# backbone parameters never receive gradients and need no optimizer state.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=learning_rate, betas=(0.9, 0.999))

for t in range(epochs):
    start_time = time.time()
    print(f"Эпоха {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer)
    test(test_dataloader, model, loss_fn)
    end_time = time.time()
    # wall-clock duration of one train + evaluate cycle
    print(f'Время: {int(end_time - start_time)} sec')
print("Обучение завершено.")

Эпоха 1
-------------------------------
loss: 3.150638  [    0/33601]
loss: 0.814392  [ 6400/33601]
loss: 0.540098  [12800/33601]
loss: 0.339074  [19200/33601]
loss: 0.255522  [25600/33601]
loss: 0.150367  [32000/33601]
Test Error: 
 Accuracy: 97.8%, Avg loss: 0.295674 

Время: 716 sec
Эпоха 2
-------------------------------
loss: 0.169050  [    0/33601]
loss: 0.142830  [ 6400/33601]
loss: 0.182798  [12800/33601]
loss: 0.188642  [19200/33601]
loss: 0.140549  [25600/33601]
loss: 0.142162  [32000/33601]
Test Error: 
 Accuracy: 98.7%, Avg loss: 0.164353 

Время: 702 sec
Эпоха 3
-------------------------------
loss: 0.118705  [    0/33601]
loss: 0.179102  [ 6400/33601]
loss: 0.182101  [12800/33601]
loss: 0.112246  [19200/33601]
loss: 0.087596  [25600/33601]
loss: 0.065151  [32000/33601]
Test Error: 
 Accuracy: 99.1%, Avg loss: 0.138534 

Время: 677 sec
Обучение завершено.
