# Базовое решение задачи

## Импорт

In [None]:
import os
from pathlib import Path
from PIL import Image
import numpy as np
import pandas as pd
import random
import time
import matplotlib.pyplot as plt

from sklearn.preprocessing import LabelEncoder

In [None]:
import torch
from torch.utils.data import Dataset
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [None]:
import torchvision
from torchvision import transforms

In [None]:
# from torchinfo import summary
from tqdm.notebook import tqdm, trange

In [None]:
torch.cuda.device_count()

In [None]:
def torch_stats(): 
    torch_version = ".".join(torch.__version__.split(".")[:2])
    print('torch version:',torch_version)
    
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print('Using device:', device)
    dtype = torch.float32
    
    if device.type == 'cuda':
        cuda_version  = torch.__version__.split("+")[-1]
        print("cuda: ", cuda_version)
        
        torch.set_default_tensor_type(torch.cuda.FloatTensor)
        print('Cuda is available:',torch.cuda.is_available())

        n_devices = torch.cuda.device_count()
        print('number of devices: %d'%(n_devices))
        if n_devices > 0:
            device = 'cuda:0'
        for cnt_device in range(n_devices):
            print(torch.cuda.get_device_name(cnt_device))
            print('Memory Usage:')
            print('Allocated:', round(torch.cuda.memory_allocated(cnt_device)/1024**3,1), 'GB')
            print('Cached:   ', round(torch.cuda.memory_reserved(cnt_device)/1024**3,1), 'GB')


    torch.set_default_dtype(dtype) # float32
    print('default data type:',dtype)
    
    num_workers=os.cpu_count()
    print ('available number of workers:',num_workers)
    
    return device, dtype, num_workers
#-------------------------------
def torch_seed(seed = 42, deterministic = True):
    random.seed(seed) # random and transforms
    np.random.seed(seed) #numpy
    torch.manual_seed(seed) #cpu
    torch.cuda.manual_seed(seed) #gpu
    torch.backends.cudnn.deterministic=deterministic #cudnn 

In [None]:
device, dtype, num_workers = torch_stats()
torch_seed(seed = 42, deterministic = True)

## Загрузка набора данны 

In [None]:
from torchvision.datasets.utils import download_and_extract_archive

In [None]:
# url = 'https://github.com/MVRonkin/Deep-Learning-Foundation-Course/raw/main/2024Light/ContestDataset.zip'
# root_directory = os.path.join(os.getcwd(),'.')
# download_and_extract_archive(url, root_directory)

Созададим базовый набор преобразований для полносвязной сети. Удобно будет пользоваться одноканальными иозбражениями в градации серого.

In [None]:
SIZE = (32,32)
transform_ = lambda x: x.flatten()/255

transform = transforms.Compose([
    transforms.Resize(size=SIZE),
    transforms.Grayscale(),
    transforms.ToTensor(),
    transform_,])

Проверим содержание директории с данными. Содержение представляет собой:
* `train` - директория с тренировочным набором данных;
* `train.csv` - файл с описанием данных в формате `csv`;
* `test` - директория с описанием закрытого тестового набора данных, результаты котрого будут оценены в соревновании;
* `test.csv` - файл с описанием закрытого набора данных в формате `csv`;
* `test_open` - директория с описанием открытого тестового набора данных, для самопроверки;
* `test_open.csv` - файл с описанием открытого набора данных в формате `csv`;

Файлы с открытам описанием данных представляют собой два столбца, сопоставляющих название файла и его метку. В закрытом наборе данных описание - это заданный порядок названий файлов без меток.

In [None]:
path2data = 'ContestDataset'
os.listdir(path2data)

Дополнительно откроем каджый файл описания, чтобы убедиться что он имеет верный формат.

In [None]:
csv = pd.read_csv(os.path.join(path2data,'train.csv'))
csv.head(1)

In [None]:
csv = pd.read_csv(os.path.join(path2data,'test_open.csv'))
csv.head(1)

In [None]:
csv = pd.read_csv(os.path.join('ContestDataset','test.csv'))
csv.shape

Можно заметить, что классы заданы их именами. Нам потребуется присвоить классам численные метки.

In [None]:
from sklearn.preprocessing import LabelEncoder
le = LabelEncoder()
csv = pd.read_csv(os.path.join('ContestDataset','test_open.csv'))
print(np.unique(le.fit_transform(csv['class'])), csv['class'].unique() )

In [None]:
le.inverse_transform(np.array((2,)))[0]

Теперь создадим класс для работы с данными. Класс представляет собой типичный `Dataset class` `PyTorch`.

Аргументы конструктора:
* `image_dir` - путь к директории c изображениями;
* `data_path` - путь к файлу с описанием изорбажений;
* `transform` - набор преобразований изображения в формате `torchvision.transforms`;
* `le` - заданный энкодер меток (Отметим, что тут мог бы быть более общий класс `target_transform`.

Наиболее важной частью класса является метод `__getitem__`, позволяющий работать с экземплярами класса как со списком, обращаясь по индексу.  

Метод возвращает пару `(x,y)`, где `x` - изображение, а `y` или метка, или название файла, если метки нет.

In [None]:
class ContestDataset(Dataset):
    def __init__(self, image_dir = 'train', data_path = 'train.csv', transform=None, le = False):
        
        self.image_dir = image_dir
        
        csv = pd.read_csv(data_path)      
        
        self.fnames = csv['file_name']        

        self.transform = transform
        
        self.le = le
        
        if 'class' in csv.columns:
            if self.le == False:
                self.le = LabelEncoder()
                self.le.fit(csv['class'])
                
            self.class_names = list(csv['class'].unique())                            
            self.labels = self.le.transform(csv['class'])
            self.labels_exist = True

        else:
            self.labels = None
            self.labels_exist = False

    
    def __len__(self):
        return len(self.fnames)

    def __getitem__(self, idx):
        img = Image.open(os.path.join(self.image_dir,self.fnames[idx]) ) 
#         img = np.asarray(img, dtype=float) 

        if self.transform:
            img = self.transform(img)
        
        if self.labels_exist:
            label = self.labels[idx] 
        else:
            label = self.fnames[idx]

        return img, label

In [None]:
train_data =  ContestDataset(image_dir = os.path.join('ContestDataset','train'), 
                             data_path = os.path.join('ContestDataset','train.csv'),                              
                             transform=transform, le = le)
print(len(train_data))
x,y = train_data[8]
print(x.shape, y)
plt.figure(figsize=(3,3));plt.imshow(x.reshape(*SIZE), cmap='gray'); plt.title('class: '+str(y)); plt.axis('off'); plt.show();

In [None]:
test_data =  ContestDataset(image_dir = os.path.join('ContestDataset','test_open'), 
                            data_path = os.path.join('ContestDataset','test_open.csv'), 
                            transform=transform, le = le)

print(len(test_data))
x,y = test_data[8]
print(x.shape)
plt.figure(figsize=(3,3));plt.imshow(x.reshape(*SIZE), cmap='gray'); plt.title('test: '+str(y)); plt.axis('off'); plt.show();

In [None]:
submit_data =  ContestDataset(image_dir = os.path.join('ContestDataset','test'), 
                              data_path = os.path.join('ContestDataset','test.csv'), 
                              transform=transform, le = le)

print(len(submit_data))
x, _ = submit_data[8]
print(x.shape)
plt.figure(figsize=(3,3));plt.imshow(x.reshape(*SIZE), cmap='gray'); plt.title('submit_data'); plt.axis('off'); plt.show();

In [None]:
fig, axs = plt.subplots(2,4, figsize=(12,4));

x,y = train_data[0]

print('TRAIN', len(train_data), 'shape', x.shape)
for i,idx in enumerate(np.random.randint(0, len(train_data), 4)):  
    x,y = train_data[idx]    
    axs[0,i].imshow(x.reshape(*SIZE), cmap='gray'); 
    axs[0,i].axis('off'); 
    axs[0,i].set_title('class:'+str(y)); 

x, _ = submit_data[0]
print('SUBMIT', len(submit_data), 'shape', x.shape)
for i,idx in enumerate(np.random.randint(0, len(submit_data), 4)):  
    x, _ = submit_data[idx]    
    axs[1,i].imshow(x.reshape(*SIZE), cmap='gray'); 
    axs[1,i].axis('off'); 
    axs[1,i].set_title('test data'); 

plt.show();

In [None]:
g = torch.Generator('cuda')  # ← Always CPU for random_split
g.manual_seed(42)

## Выделение валидационной подвыборки

In [None]:
TRAIN_RATIO = 0.8

n_train_examples = int(len(train_data) * TRAIN_RATIO)
n_valid_examples = len(train_data) - n_train_examples

train_data, valid_data = torch.utils.data.random_split(train_data, [n_train_examples, n_valid_examples], generator=g)

print(f"Training data size : {len(train_data)}, Validation data size : {len(valid_data)}, Test data size : {len(test_data)}")

## Создание загрузчика данных

Обратите внимание что на текущий момент имеет место 4 набора данных, и соответственно создается 4 загрузчика данных:
* `train_loader` - тренировочные данные;
* `val_loader`   - валидационные данные;
* `test_loader`  - тестовые данные (открытый тест для самопроверки);
* `submit_loader` - тестовые данные для подачи (закрытый тест).

In [None]:
g_loader = torch.Generator(device=device)  # Explicitly set to CPU
g_loader.manual_seed(0)

In [None]:
import torch.utils.data as data

BATCH_SIZE = 64

train_loader = data.DataLoader(train_data,
                                 shuffle=True,
                                 batch_size=BATCH_SIZE,
                                 generator=g_loader)

val_loader = data.DataLoader(valid_data,
                                 batch_size=BATCH_SIZE,
                                 generator=g_loader)

test_loader = data.DataLoader(test_data,
                                batch_size=BATCH_SIZE,
                             generator=g_loader)


submit_loader = data.DataLoader(submit_data,
                                batch_size=BATCH_SIZE,
                               generator=g_loader)
 
print(f"Training data batches : {len(train_loader)}, Validation data batches : {len(val_loader)}, Test data batches : {len(test_loader)}, Submit data batches : {len(test_loader)}")

In [None]:
images, label =  next(iter(val_loader))
images.shape, label.shape


In [None]:
images = images.reshape(-1,1,*SIZE)
batch = torchvision.utils.make_grid(images, nrow = int(np.sqrt(images.shape[0])), padding = 0)
vis_batch = batch.data.numpy().transpose((1, 2, 0))*255
plt.imshow(vis_batch); 
plt.axis('off');
plt.show()
images.shape, batch.shape,  vis_batch.shape

## Создание модели

В данном случае создадим базовую модель - ее результаты нужно исключительно для того, чтобы понять, что набор данных позволяет обучить какую то нейронную сеть и понять какие оценки точности могут считать минимально-достижимыми.

In [None]:
class Model(nn.Module):
    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(SIZE[0]*SIZE[1], 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, 128)
        self.fc_out = nn.Linear(128, 11)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x)+ x) 
        x = self.fc_out(x)
        return x
        

In [None]:
model = Model()

# summary(model,input_size =(1,SIZE[0]*SIZE[1]))

## Функции для обучения

Функции описывают минимальный рабочий вариант обучения и проверки качества работы нейронной сети.

In [None]:
def train(model, dataloader, optimizer, criterion, metric,  device):

    epoch_loss = 0
    epoch_acc  = 0

    model.train()

    for (x, y) in tqdm(dataloader, desc="Training", leave=False):

        x, y = x.to(device).float(), y.to(device).long()

        optimizer.zero_grad(set_to_none = True)

        y_pred = model(x)

        loss = criterion(y_pred, y)
        acc  = metric(y_pred, y)

        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        epoch_acc  += acc.item()

    return epoch_loss / len(dataloader), epoch_acc / len(dataloader)

def evaluate(model, dataloader, criterion, metric, device):

    epoch_loss = 0
    epoch_acc  = 0

    model.eval()

    with torch.inference_mode():
        
        for (x, y) in tqdm(dataloader, desc="Evaluating", leave=False):

            x, y = x.to(device).float(), y.to(device).long()

            y_pred = model.forward(x)
            
            loss = criterion(y_pred, y)
            acc  = metric( y_pred, y)

            epoch_loss += loss.item()
            epoch_acc  += acc.item()

    return epoch_loss / len(dataloader), epoch_acc / len(dataloader)

def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

EPOCHS = 100

def fit(model, train_loader, val_loader, optimizer, criterion, metric, epochs = EPOCHS, device='cpu',  path_best = 'best_model.pt', verbose = True):

   
    best_valid_loss = float('inf')

    for epoch in trange(epochs):

        start_time = time.monotonic()

        train_loss, train_acc = train(model, train_loader, optimizer, criterion, metric, device)
        valid_loss, valid_acc = evaluate(model, val_loader, criterion, metric, device)

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), path_best)

        end_time = time.monotonic()

        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        if verbose == True:
            print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
            print(f'\tTrain Loss: {train_loss:.3f} | Train Acc: {train_acc*100:.2f}%',
            f' | Val. Loss: {valid_loss:.3f} | Val. Acc: {valid_acc*100:.2f}%') 


## Инициализация и обучение модели

Параметры специально не подбирались.

В качестве метрики используется `accuracy` - так как в закрытом наборе качество работы будет также оцениваться по этой метрике.

In [None]:
device

In [None]:
model = Model()
criterion = nn.CrossEntropyLoss()
model = model.to(device)
criterion = criterion.to(device)
optimizer = optim.Adam(model.parameters(), lr = 5e-3)


def calculate_accuracy(y_pred, y):
    with torch.no_grad():
        top_pred = y_pred.argmax(1, keepdim=True)
        correct = top_pred.eq(y.view_as(top_pred)).sum()
        acc = correct.float() / y.shape[0]
    return acc

metric = calculate_accuracy

In [None]:
fit(model, train_loader, val_loader, optimizer, criterion, metric, epochs = 100, device=device, verbose = True)

## Тест модели на лучшей эпохе

In [None]:
model.load_state_dict(torch.load('best_model.pt'))

test_loss, test_acc = evaluate(model, test_loader, criterion, metric, device)

print(f'Test Loss: {test_loss:.3f} | Test Acc: {test_acc*100:.2f}%')

## Создание файла для отправки

Тут созадна специальная отдельная функция `submit` результат работы которой - это `DataFrame` с двумя колноками:
* `file_name`	- название файла из закрытого набора данных
* `class` - предсказанная метка класса (в формате числа от 0 и выше).

In [None]:
def submit(model, dataloader, device):

    df = pd.DataFrame(columns = ('file_name','class'))
    model.eval()

    with torch.inference_mode():
        
        for x,y in tqdm(dataloader, desc="Evaluating", leave=False):

            x = x.to(device).float()
            y_pred = model.forward(x)
            if y_pred.is_cuda: 
                cls = list(y_pred.argmax(1, keepdim=False).data.cpu().numpy())
            else:
                cls = list(y_pred.argmax(1, keepdim=False).data.numpy())
            cls_label = le.inverse_transform(np.array((cls)))[0]
            df = pd.concat([df,pd.DataFrame({'file_name':y,'class':cls_label})], ignore_index = True)

    return df.reset_index()

In [None]:
len(submit_data)

In [None]:
df = submit(model, submit_loader, device)

In [None]:
df

Сохраним результат.

<code style="color:red">__Большая просьба сохранять результат с указанием ФИО !__ </code>

In [None]:
path_submit = 'submit.csv'

df[['file_name','class']].to_csv(path_submit, index = False )
print('FULL PATH:  \n', os.path.join(os.getcwd(), path_submit) )

Проверка файла на открытие

In [None]:
frame = pd.read_csv('submit.csv')

In [None]:
frame.groupby('class').count()