# Распознавание цифр

В этот раз мы будем использовать датасет MNIST, который содержить около 60 000 картинок с цифрами, которые были написаны отруки. Каждая картинка в нём имеет размер 28х28 пикселей. Цифры: от 0 до 9. 

То есть в этом случае мы имеем дело с мультиклассовой классификацией. 

> MNIST расшифровывается как Modified National Institute of Standart and Technology. 

Сначала импортируем все необходимые библиотеки. 

> Для распознавания будем использовать torch 

In [None]:
import os 

CURRENT_DIR = os.path.dirname(os.path.abspath("__file__"))

import torch
import torchvision
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch.nn as nn
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torchvision.transforms import transforms
import torch.nn.functional as F

from tqdm import tqdm
from ipywidgets import interact, IntSlider

In [None]:
random_seed = 42
torch.backends.cudnn.enabled = False
torch.manual_seed(random_seed)

Инициализируем параметры 

In [None]:
CONFIG = { 
    "input_size": 784,      # 28x28 
    "hidden_size_1": 200,   # размер 1-го скрытого слоя
    "hidden_size_2": 150,   # размер 2-го скрытого слоя
    "hidden_size_3": 100,   # размер 3-го скрытого слоя
    "hidden_size_4": 80,    # размер 4-го скрытого слоя 
    "output": 10,           # кол-во выходов сети (т.к. цифры от 0 до 9)
    "bach_size": 132,       # обычно используется степень 2-ки
    "lr_rate": 0.01
}

Загружаем данные, повезло, что в библиотеке torchvision уже есть функция, которая всё сделает за нас. 

In [None]:
# загружаем обучающую выборку 
train_data = torchvision.datasets.MNIST(
    "mnist_content", train=True, transform=transforms.ToTensor(), download=True
)

In [None]:
# разделяем обучающую выборку на обучающую и валидационную выборки
# 70% для обучения, 30% для валидации
train_size = int(len(train_data) * 0.7)
valid_size = len(train_data) - train_size
train_data, valid_data = torch.utils.data.random_split(train_data, [train_size, valid_size])

In [None]:
# загружаем тестовую выборку
test_data = torchvision.datasets.MNIST(
    "mnist_content", train=False, transform=None, download=True
)

Создаём лоядеры данных. Эти лоадеры прослойкой между выборками и кодом модели, так как модель ожидает данные в определённой форме, лоадер делает эту "грязную" работу за нас. Что есть удобство! 

In [None]:
train_dataloader=torch.utils.data.DataLoader(
    dataset=train_data, 
    batch_size=CONFIG["bach_size"],
    shuffle=True
)
valid_dataloader = torch.utils.data.DataLoader(
    dataset=valid_data, 
    batch_size=CONFIG["bach_size"],
    shuffle=False
)
test_dataloader=torch.utils.data.DataLoader(
    dataset=test_data, 
    batch_size=CONFIG["bach_size"],
    shuffle=False
)

In [None]:
data=iter(train_dataloader)
samples,labels=next(data)
print(f"Number of samples: {samples.shape}")
print(f"Number of labels: {labels.shape}")

Давайте посмотрим на картинки 

In [None]:
plt.figure(figsize=(10,5))
for i in range(10):
    plt.subplot(2,5,i+1)
    plt.imshow(samples[i][0],cmap='BuPu')
plt.show()

Очевидно, что все люди пишут цифры по-разному, и временами даже сам человек не может быть до конца уверен, что за цифра на картинке. 

Давайте напишем класс самой модели, которая будет обучаться. 

In [None]:
class MNIST(nn.Module):
    def __init__(self, input_size, hidden_size1, hidden_size2, hidden_size3, hidden_size, output):
        super().__init__()
        self.f_connected1=nn.Linear(input_size, hidden_size1)
        self.f_connected2=nn.Linear(hidden_size1, hidden_size2)
        self.f_connected3=nn.Linear(hidden_size2, hidden_size3)
        self.f_connected4=nn.Linear(hidden_size3, hidden_size)
        self.out_connected=nn.Linear(hidden_size, output)

    def forward(self,x):
            out=F.relu(self.f_connected1(x)) 
            out=F.relu(self.f_connected2(out))
            out=F.relu(self.f_connected3(out))
            out=F.relu(self.f_connected4(out))
            out=self.out_connected(out)
            return out

* `nn.Module` - это класс из pytorch, его можно рассматривать как довольно "удобного" родителя для своих моделей. 
* `nn.Linear` - это линейный слой
*  `F.relu` - функция активации relu (вообще внутри torch есть много разных уже реализованных функций активации: relu, leaky relu, softmax, sigmoid etc.)

Давайте определимся с устройством, на котором будут выполняться вычисления. Если на вашей машине есть видеокарта и её драйвера установлены правильно, то вы увидите имя 'cuda' в DEVICE, если нет видяхи, то 'cpu'. 

In [None]:
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

DEVICE

Давайте создадим объект нашей модели, а параметры для неё возьмём из конфига. 

In [None]:
model = MNIST(
    input_size=CONFIG["input_size"], 
    hidden_size1=CONFIG["hidden_size_1"],
    hidden_size2=CONFIG["hidden_size_2"], 
    hidden_size3=CONFIG["hidden_size_3"], 
    hidden_size=CONFIG["hidden_size_4"], 
    output=CONFIG["output"]
)
# сразу отправить модель на устройство 
model = model.to(DEVICE)

Чтобы посмотреть из чего вообще состоит модель, можно воспользовать print.

In [None]:
print(model)

Если вы используете уже предобученную модель из torch, то print тоже будет работать и покажет вам весь внутренний мир модели.

Дальше определяем функцию потерь и метод, по которому будет считаться градиент. 

In [None]:
# функция потерь
criterion=nn.CrossEntropyLoss()
# алгоритм для расчёта градиентного спуска 
optimizer=torch.optim.Adam(model.parameters(),lr=CONFIG["lr_rate"])
# создаём сущность, которая автоматически уменьшит шаг обучения в случае, 
# когда функция потерь перестанет уменьшаться в течение N эпох (patience)
scheduler = ReduceLROnPlateau(
    optimizer, mode="min", patience=10, min_lr=1e-8, verbose=True
)

Это была подготовительная стадия. Теперь давайте организуем цикл для обучения.

In [None]:
NUM_EPOCHS = 100

checkpoint_dpath = os.path.join(CURRENT_DIR, "mnist_checkpoints")
os.makedirs(checkpoint_dpath, exist_ok=True)

best_val_loss = None
best_metric = 0

for epoch in range(NUM_EPOCHS):
    print(f"--- Epoch {epoch} ---")
    for phase in ["train", "val"]:
        epoch_loss = []
        if phase == "train":
            model.train()
            loader = train_dataloader
        else:
            model.eval()
            loader = valid_dataloader

        for images, labels in tqdm(loader, desc=f"{phase.upper()} Processing"):
            images = images.reshape(-1, 28 * 28)

            optimizer.zero_grad()
            with torch.set_grad_enabled(phase == "train"):
                print(images.shape)
                output=model(images)
                loss = criterion(output, labels)
                if phase == "train":
                    loss.backward()
                    optimizer.step()

                epoch_loss.append(loss.item())
    
        epoch_mean_loss = np.mean(epoch_loss)
        print(f"Stage: {phase.upper()}\t| Epoch Loss: {epoch_mean_loss:.10f}")

        if phase == "val":
            if best_val_loss is None or epoch_mean_loss < best_val_loss:
                best_val_loss = epoch_mean_loss

                checkpoint_path = os.path.join(checkpoint_dpath, "best.pth")
                print(f"*** Best state {best_val_loss} saved to {checkpoint_path}")
                save_state = {"model_state": model.state_dict()}
                torch.save(save_state, checkpoint_path)
            else:
                scheduler.step(epoch_mean_loss)
    
    checkpoint_path = os.path.join(checkpoint_dpath, "last.pth")
    print(f"* Last state saved to {checkpoint_path}")
    save_state = {"model_state": model.state_dict()}
    torch.save(save_state, checkpoint_path)

In [None]:
def image_2_tensor(image: np.ndarray) -> torch.Tensor:
    if len(image.shape) == 2:
        image = image[:, :, np.newaxis]

    tensor = torch.from_numpy(image)
    tensor = tensor.float()
    tensor = tensor.div(255)
    tensor = tensor.permute(2, 0, 1)
    return tensor

def tensor_2_image(tensor, dtype=np.uint8, move_channels=False):
    if move_channels:
        tensor = tensor.permute(1, 2, 0)
    np_array = tensor.numpy()
    np_array *= 255
    np_array = np_array.astype(dtype)

    return np_array

In [None]:
import logging


class Inference:
    def __init__(self, model, device) -> None:
        self._logger = logging.getLogger(self.__class__.__name__)

        self.model = model
        self.device = device

    def _prepare_model(self):
        if self._device is None:
            self._device = "cuda" if torch.cuda.is_available() else "cpu"
        self._device = torch.device(self._device)

        self._model.eval()
        self._model = self._model.to(self._device)

    @classmethod
    def from_file(cls, fpath: str, device=None, **kwargs):
        if device is None:
            device = "cuda" if torch.cuda.is_available() else "cpu"

        model_chk = torch.load(fpath, map_location=device)

        return cls.from_checkpoint(model_chk, device=device, **kwargs)
    
    @classmethod
    def from_checkpoint(cls, checkpoint_state: dict, **kwargs):
        model_state = checkpoint_state["model_state"]

        model = MNIST(
            input_size=CONFIG["input_size"], 
            hidden_size1=CONFIG["hidden_size_1"],
            hidden_size2=CONFIG["hidden_size_2"], 
            hidden_size3=CONFIG["hidden_size_3"], 
            hidden_size=CONFIG["hidden_size_4"], 
            output=CONFIG["output"]
        )
        model.load_state_dict(model_state)

        obj_ = cls(model=model, **kwargs)
        return obj_
    
    def get_prediction(self, image: np.ndarray):
        image = image.reshape(-1, 28 * 28) 
        # переводим массив в тензор
        image_t = torch.from_numpy(image)
        image_t = image_t.float()
        image_t = image_t.to(self.device)

        with torch.no_grad():
            output = self.model(image_t)
            _,prediction=torch.max(output,1)
        
        return prediction.tolist()[0]


In [None]:
model_fpath = os.path.join(checkpoint_dpath, "best.pth")
infer = Inference.from_file(model_fpath)

In [None]:
@interact
def show_predictions(index=IntSlider(val=0, min=0, max=len(test_data)-1)):
    test_img, true_label = test_data[index]
    test_img = np.array(test_img)
    
    pred_label = infer.get_prediction(test_img)

    plt.figure(figsize=[5, 5])
    plt.imshow(test_img, cmap="gray")
    print(f"True Label: {true_label}")
    print(f"Predicted Label: {pred_label}")
    plt.show()

## Полезные ссылки 

* [MNIST Handwritten Digit Recognition Using Pytorch](https://medium.com/analytics-vidhya/training-mnist-handwritten-digit-data-using-pytorch-5513bf4614fb)