# Сверточные нейронные сети (ноутбук)

> Знакомимся со сверточными нейронными сетями. Учимся решать задачу классификации изображений.

In [None]:
import warnings

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torchvision.transforms as T
from IPython.display import clear_output
from PIL import Image
from matplotlib import cm
from time import perf_counter
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST
from tqdm import tqdm

warnings.filterwarnings('ignore')

plt.rc('font', size=30)

## Pillow (PIL)
Библиотека для удобной работы с картинками

## Загрузка изображения

Загрузим изображение из интернета, например, «Мону Лизу» Леонардо да Винчи.

In [None]:
! wget "https://upload.wikimedia.org/wikipedia/commons/thumb/e/ec/Mona_Lisa%2C_by_Leonardo_da_Vinci%2C_from_C2RMF_retouched.jpg/270px-Mona_Lisa%2C_by_Leonardo_da_Vinci%2C_from_C2RMF_retouched.jpg" -O mona_lisa.jpg

## Чтение изображения

In [None]:
img = Image.open('./mona_lisa.jpg')

In [None]:
img.format

In [None]:
img.show()

In [None]:
img.size

In [None]:
img.mode

In [None]:
img

## Матричное представление

In [None]:
img_matrix = np.array(img)

In [None]:
img_matrix.shape

In [None]:
plt.figure(figsize=(20, 10))

plt.imshow(img_matrix)

plt.show()

In [None]:
# Draw every colour channel of the image separately: channel index i of the
# H x W x C matrix is rendered with the matplotlib colormap of the same colour.
for i, (cmap, color) in enumerate(
    zip(
        [cm.Reds, cm.Greens, cm.Blues],
        ['Red', 'Green', 'Blue']
    )
):
    plt.figure(figsize=(20, 10))
    plt.imshow(img_matrix[:, :, i], cmap=cmap)
    plt.title(color)
    plt.show()

In [None]:
img_matrix[:, :, 0].min(), img_matrix[:, :, 0].max()

In [None]:
img_matrix

## Свертка

Свертка в `PyTorch` представлена модулем `nn.Conv2d` со следующими параметрами:

- in_channels (int) – Количество каналов во входном изображении

- out_channels (int) – Количество каналов в выходном изображении

- kernel_size (int or tuple) – Размер ядра свертки

- stride (int or tuple, optional) – Страйд (шаг ядра свертки). По умолчанию: 1

- padding (int, tuple or str, optional) – Размер паддинга. По умолчанию: 0

- padding_mode (string, optional) – 'zeros', 'reflect', 'replicate' or 'circular'. По умолчанию: 'zeros'

- dilation (int or tuple, optional) – Дилейшн (шаг между элементами внутри ядра). По умолчанию: 1

- bias (bool, optional) – добавлять ли обучаемый сдвиг. По умолчанию: True

Как рассчитываются итоговые шейпы:
$$H_{out} = \left\lfloor\frac{H_{in}  + 2 \times \text{padding}[0] - \text{dilation}[0] \times (\text{kernel\_size}[0] - 1) - 1}{\text{stride}[0]} + 1\right\rfloor$$
$$W_{out} = \left\lfloor\frac{W_{in}  + 2 \times \text{padding}[1] - \text{dilation}[1] \times (\text{kernel\_size}[1] - 1) - 1}{\text{stride}[1]} + 1\right\rfloor$$



- [Калькулятор расчета свертки](https://madebyollin.github.io/convnet-calculator/) (не тестил для всех возможных сценариев)
- https://www.youtube.com/@animatedai - У чувака красивые ролики про свертки и не только
- [Convolution explainer](https://poloclub.github.io/cnn-explainer/)


In [None]:
conv1 = nn.Conv2d(
    in_channels=3,
    out_channels=10,
    kernel_size=3
)

In [None]:
conv1.weight

In [None]:
conv1.weight.shape

In [None]:
conv1.bias.shape

In [None]:
list(conv1.parameters())

Одна из самых нужных трансформаций — переход к устоявшемуся в торче тензорному представлению изображения:

$$H \times W \times C \to C \times H \times W$$
$$[0, 255] \to [0, 1]$$


Ее осуществляет `T.ToTensor`:

In [None]:
?T.ToTensor

In [None]:
img_tensor = T.ToTensor()(img).unsqueeze(0)

In [None]:
img_tensor.shape

In [None]:
output = conv1(img_tensor)

In [None]:
output.shape

In [None]:
conv2 = nn.Conv2d(
    in_channels=3,
    out_channels=10,
    kernel_size=3,
    padding=1
)

In [None]:
output = conv2(img_tensor)

In [None]:
output.shape

## Пулинг

Пулинг представлен в модуле `torch.nn`, в основном будем использовать `MaxPool2d` и `AvgPool2d`.

Параметры:

- kernel_size – размер окошка

- stride – страйд окошка. По умолчанию равен kernel_size

- padding – сколько нулевого паддинга добавлять по краям. По умолчанию: 0.

In [None]:
img_tensor = torch.randint(0, 10, size=(10, 10), dtype=torch.float32).unsqueeze(0)

In [None]:
img_tensor

In [None]:
pooling1 = nn.MaxPool2d(kernel_size=2)

In [None]:
pooling1(img_tensor)

In [None]:
pooling2 = nn.AvgPool2d(kernel_size=2)

In [None]:
pooling2(img_tensor)

## Датасет MNIST

In [None]:
# Dataset without any transform; the next cells only inspect its data and labels.
# NOTE(review): despite the name `mnist_train`, train=False is passed here (the same
# flag used for `mnist_valid` below) — confirm that loading this split is intended.
mnist_train = MNIST(
    "./mnist",
    train=False,
    download=True

)

In [None]:
mnist_train.data.shape

In [None]:
# Shape of the label tensor; `targets` replaces the deprecated `test_labels` alias.
mnist_train.targets.shape

In [None]:
mnist_train[0][0]

In [None]:
# Training split with a transform pipeline applied to every sample.
mnist_train = MNIST(
    "./mnist",
    train=True,
    download=True,
    # ToTensor turns the image into a C x H x W tensor with values in [0, 1].
    # NOTE(review): Resize(28) looks like a no-op, assuming the images are already
    # 28x28 (the models below expect 28 * 28 inputs) — confirm it is only a Compose demo.
    transform=T.Compose([T.ToTensor(), T.Resize(28)])
)

In [None]:
mnist_valid = MNIST(
    "./mnist",
    train=False,
    download=True,
    transform=T.ToTensor()
)

In [None]:
plt.figure(figsize=(10, 10))
plt.title(f'Number {mnist_train[15][1]}')
plt.imshow(mnist_train[15][0][0])

plt.show()

In [None]:
plt.figure(figsize=(10, 10))
plt.title(f'Number {mnist_train[10000][1]}')
plt.imshow(mnist_train[10000][0][0])

plt.show()

In [None]:
len(mnist_train)

In [None]:
len(mnist_valid)

In [None]:
# Batches of 1024 samples; only the training data is reshuffled.
# NOTE(review): num_workers=12 is hard-coded — consider lowering it on machines
# with fewer CPU cores.
train_loader = DataLoader(mnist_train, batch_size=1024, shuffle=True, num_workers=12)
valid_loader = DataLoader(mnist_valid, batch_size=1024, shuffle=False, num_workers=12)

## Полносвязная нейронная сеть

Обучим полносвязную нейронную сеть на датасете MNIST.

In [None]:
DEVICE = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
def train(model: nn.Module) -> float:
    """Run one training pass over ``train_loader`` and return the mean batch loss.

    Uses the notebook-level ``train_loader``, ``optimizer``, ``loss_fn`` and
    ``DEVICE``; the returned value is the sum of per-batch losses divided by
    the number of batches.
    """
    model.train()

    running_loss = 0

    for inputs, targets in tqdm(train_loader, desc='Train'):
        inputs, targets = inputs.to(DEVICE), targets.to(DEVICE)

        optimizer.zero_grad()
        batch_loss = loss_fn(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()

        # Accumulate a detached value so the running sum carries no autograd history.
        running_loss = running_loss + batch_loss.detach()

    return (running_loss / len(train_loader)).item()

In [None]:
@torch.inference_mode()
def evaluate(model: nn.Module, loader: DataLoader):
    """Evaluate ``model`` on ``loader`` and return ``(mean batch loss, accuracy)``.

    Uses the notebook-level ``loss_fn`` and ``DEVICE``. The loss is the sum of
    per-batch losses divided by the number of batches; accuracy is the share of
    samples whose arg-max prediction equals the label.
    """
    model.eval()

    total_loss = 0
    total = 0
    correct = 0

    for x, y in tqdm(loader, desc='Evaluation'):
        # Move both inputs and labels explicitly (as `train` does), so the
        # function works for any nn.Module, not only models that relocate
        # their input themselves.
        x, y = x.to(DEVICE), y.to(DEVICE)
        output = model(x)

        loss = loss_fn(output, y)

        total_loss += loss.detach()

        y_pred = torch.argmax(output, -1)
        total += y.size(0)
        correct += (y_pred == y).sum().item()

    total_loss /= len(loader)
    accuracy = correct / total

    return total_loss.item(), accuracy

In [None]:
def plot_stats(
    train_loss,
    valid_loss,
    valid_accuracy,
    title: str
):
    """Show two figures: train/validation loss curves, then validation accuracy.

    ``train_loss``, ``valid_loss`` and ``valid_accuracy`` are handed to
    ``plt.plot`` unchanged; ``title`` is the common prefix of both figure titles.
    """
    def _draw(suffix, curves, with_legend):
        # One 16x8 figure with a grid; curves are (values, label-or-None) pairs.
        plt.figure(figsize=(16, 8))
        plt.title(title + suffix)
        for values, label in curves:
            if label is None:
                plt.plot(values)
            else:
                plt.plot(values, label=label)
        if with_legend:
            plt.legend()
        plt.grid()
        plt.show()

    _draw(' loss', [(train_loss, 'Train loss'), (valid_loss, 'Valid loss')], True)
    _draw(' valid accuracy', [(valid_accuracy, None)], False)

In [None]:
def get_number_of_model_parameters(model: nn.Module) -> int:
    """Return the total number of scalar elements over all parameters of ``model``."""
    total = 0
    for param in model.parameters():
        total += param.numel()
    return total

In [None]:
class CustomSequential(nn.Sequential):
    """``nn.Sequential`` that counts its parameters, moves itself to ``DEVICE`` and prints a summary on creation."""

    def __init__(self, *layers):
        super().__init__(*layers)
        # Count once at construction time; __str__ reports this stored value.
        self._num_params = get_number_of_model_parameters(self)
        self.to(DEVICE)

        print(self)

    def __str__(self):
        border = '*' * 75
        summary = [
            border,
            'Model:',
            repr(self),
            '',
            f'Number of parameters: {self._num_params}',
            border,
        ]
        return '\n'.join(summary)

    def forward(self, x):
        """
        Move the input to ``DEVICE`` when it lives elsewhere, then run the layers.

        Lets code OUTSIDE the training loop ignore device mismatches.

        !! Can be inconvenient for large models and in production
        """
        on_device = x if x.device == DEVICE else x.to(DEVICE)
        return super().forward(on_device)

In [None]:
first_model = CustomSequential(
    nn.Flatten(),
    nn.Linear(28 * 28, 500),
    nn.ReLU(),
    nn.Linear(500, 200),
    nn.ReLU(),
    nn.Linear(200, 10)
)

optimizer = torch.optim.Adam(first_model.parameters(), lr=1e-3)

loss_fn = nn.CrossEntropyLoss().to(DEVICE)

In [None]:
num_epochs = 15

train_loss_history, valid_loss_history = [], []
valid_accuracy_history = []

start = perf_counter()

for epoch in range(num_epochs):
    # Train first and evaluate afterwards (same order as the CONV loop), so the
    # train and validation points at index `epoch` describe the same epoch and
    # the last evaluation is recorded instead of being discarded.
    train_loss = train(first_model)

    valid_loss, valid_accuracy = evaluate(first_model, valid_loader)

    train_loss_history.append(train_loss)
    valid_loss_history.append(valid_loss)
    valid_accuracy_history.append(valid_accuracy)

    # Redraw the curves in place after every epoch.
    clear_output()

    plot_stats(train_loss_history, valid_loss_history, valid_accuracy_history, 'MLP model')

print(f'Total training and evaluation time {perf_counter() - start:.5f}')

## Сверточная сеть

In [None]:
# Draft of the convolutional model; spatial sizes follow the H_out / W_out formula above.
second_model = CustomSequential(
    nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5), # 28*28 -> 24*24
    nn.LeakyReLU(),
    nn.MaxPool2d(kernel_size=2), # 24*24 -> 12*12

    nn.Conv2d(in_channels=16, out_channels=16, kernel_size=5), # 12*12 -> 8*8 (12 - 5 + 1)
    nn.LeakyReLU(),
    nn.MaxPool2d(kernel_size=2), # 8*8 -> 4*4
    
    nn.Flatten(),
    nn.Linear(4 * 4 * 16, 256),
    nn.LeakyReLU(),
    nn.Linear(256, 10))


# Sanity check: push one random 1x28x28 input through the model and print the output shape.
x = torch.rand((1, 1, 28, 28))
print(x.shape)
print(second_model(x).shape)

In [None]:
second_model = CustomSequential(
    nn.Conv2d(in_channels=1, out_channels=16, kernel_size=5), #28*28 -> 24*24
    nn.LeakyReLU(),
    nn.MaxPool2d(kernel_size=2), #24*24 -> 12*12

    nn.Conv2d(in_channels=16, out_channels=16, kernel_size=5), #12*12 -> 8*8
    nn.LeakyReLU(),
    nn.MaxPool2d(kernel_size=2), # 8*8 -> 4*4

    nn.Flatten(),
    nn.Linear(4 * 4 * 16, 256),
    nn.LeakyReLU(),
    nn.Linear(256, 10)
)

optimizer = torch.optim.Adam(second_model.parameters(), lr=7e-4)

loss_fn = nn.CrossEntropyLoss().to(DEVICE)

In [None]:
num_epochs = 10

train_loss_history, valid_loss_history = [], []
valid_accuracy_history = []

start = perf_counter()

for epoch in range(num_epochs):
    train_loss = train(second_model)

    valid_loss, valid_accuracy = evaluate(second_model, valid_loader)

    train_loss_history.append(train_loss)
    valid_loss_history.append(valid_loss)
    valid_accuracy_history.append(valid_accuracy)

    clear_output()

    plot_stats(train_loss_history, valid_loss_history, valid_accuracy_history, 'CONV model')

print(f'Total training and evaluation time {perf_counter() - start:.5f}')

In [None]:
second_model.eval()

with torch.no_grad():
    # Print the first 20 predictions of every validation batch next to the true labels.
    # The loop names are chosen so that the global `img` (the PIL image loaded at the
    # top of the notebook) is not overwritten by a batch tensor.
    for images, labels in valid_loader:
        print("predicted: ", torch.argmax(second_model(images)[:20], dim=-1).cpu())
        print("true:      ", labels[:20], end=f"\n\n{'*' * 80}\n\n")
        

## Зафигачим обученную модель

(пример адаптирован из https://github.com/mryab/efficient-dl-systems/tree/main)

Можно взять предобученную модель и сразу ей пользоваться, давайте так и сделаем!


Напишем функцию, которая делает простой http-запрос по ссылке, скачивает оттуда картинку, обрабатывает ее и передает нейронке, которую мы предварительно проинициализировали предобученными весами

Будем получать от нее ответы, какие объекты содержатся на картинке

In [None]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn_v2, FasterRCNN_ResNet50_FPN_V2_Weights

CATEGORIES = FasterRCNN_ResNet50_FPN_V2_Weights.COCO_V1.meta['categories']
SCORE_THRESHOLD = 0.75


model = fasterrcnn_resnet50_fpn_v2(weights=FasterRCNN_ResNet50_FPN_V2_Weights.COCO_V1).to(DEVICE)
model.eval()

In [None]:
import requests

In [None]:
def get_prediction(image_url, model):
    """Download an image, run the detector on it and return the detected category names.

    Args:
        image_url: HTTP(S) address of the image.
        model: detection model called with a batch of one image; its first result
            must provide ``'labels'`` and ``'scores'`` tensors.

    Returns:
        List of ``CATEGORIES`` names for detections scoring above ``SCORE_THRESHOLD``.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    # The context manager releases the streamed connection; the timeout keeps the
    # cell from hanging forever on an unresponsive host.
    with requests.get(image_url, stream=True, timeout=30) as response:
        response.raise_for_status()  # fail loudly instead of parsing an error page as an image
        response.raw.decode_content = True  # undo gzip/deflate transfer encoding, if any
        # convert() reads the whole stream before the connection is closed and
        # guarantees 3 channels for grayscale / RGBA / palette images.
        image = Image.open(response.raw).convert("RGB")

    # model goes brrr
    image_transformed = T.ToTensor()(image).unsqueeze(0).to(DEVICE)
    with torch.no_grad():
        prediction = model(image_transformed)[0]
    labels_predicted = prediction['labels'].cpu().numpy()
    scores = prediction['scores'].cpu().numpy()

    # keep only confident detections and map label ids to category names
    labels_selected = labels_predicted[scores > SCORE_THRESHOLD]
    objects = [CATEGORIES[label_id] for label_id in labels_selected]

    image.show()

    return objects


get_prediction("https://upload.wikimedia.org/wikipedia/commons/thumb/e/ec/Mona_Lisa%2C_by_Leonardo_da_Vinci%2C_from_C2RMF_retouched.jpg/270px-Mona_Lisa%2C_by_Leonardo_da_Vinci%2C_from_C2RMF_retouched.jpg", model)

In [None]:

get_prediction("https://media.licdn.com/dms/image/v2/C5622AQEfw4J2wKWv8A/feedshare-shrink_800/feedshare-shrink_800/0/1642765164511?e=2147483647&v=beta&t=2CH5JXpPyOIbF1qCN1eFKTWN16-W38bhIO7l5LP_Mfc", model)

In [None]:
get_prediction("https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTPMiNrFTdy4vYgOcjhcN6PPqEVB08V3KdQ2A&s", model)

In [None]:
get_prediction("https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcQ92tpDktYrkx4camlJR3yUpXI5_ElqdZfKWA&s", model)

In [None]:
get_prediction("https://i.redd.it/j586af7nxvu41.jpg", model)