На этом семинаре необходимо будет (1) реализовать простейшую metric learning архитектуру на основе сиамской нейросети с Contrastive Loss и использовать ее для поиска похожих изображений (2) реализовать fully convolutional сеть для задачи image super-resolution. 

# Metric Learning (0.7 балла)

In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
from collections import defaultdict
from IPython.display import clear_output

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
from  torchvision import datasets, transforms
import torch.optim as optim
from torch.utils.data.sampler import Sampler, BatchSampler
from torch.utils.data import DataLoader
from torch.nn.modules.loss import MSELoss

from tqdm import tqdm_notebook as tqdm

Вам необходимо реализовать вычисление Contrastive Loss - одну из самых популярных функций потерь для metric learning. Contrastive Loss получает на вход пару векторов $x_i$ и $x_j$ (признаковые описания объектов $i$ и $j$, полученные нейросетью) и метку $y_{ij}$, причем $y_{ij} = 0$, если объекты "похожи" (принадлежат одному классу), и $y_{ij} = 1$, если объекты "различны" (принадлежат различным классам). Формально определим Contrastive Loss следующим образом:

$$
L(x_i, x_j, y_{ij}) = (1 - y_{ij})\|x_i - x_j\|^2 + y_{ij}max(0, m - \|x_i - x_j\|^2)
$$

где $m$ - гиперпараметр (его можно взять равным единице).

Вместо того, чтобы формировать обучающее множество из всевозможных пар, можно поступить проще: будем пропускать батч из $N$ обучаюших изображений через сеть (тем самым получая соответствующие векторы $x$), а значение лосса вычислять как среднее значение функции $L$ на всех парах в этом батче. Тогда в обучении на каждом батче участвует $\frac{N(N-1)}{2}$ пар, что существенно ускоряет сходимость на практике. Реализуйте предложенный вариант Contrastive Loss.

In [None]:
class ContrastiveLoss(torch.nn.Module):
    def __init__(self, margin=1.0):
        super(ContrastiveLoss, self).__init__()
        self.margin = margin

    def forward(self, x, y):
        <your code>

В задачах metric learning, как правило, необходимо, чтобы количества "положительных" и "отрицательных" пар в обучении отличалось несильно. Поэтому в случае большого количества классов случайное формирование батчей неэффективно - в таком случае количество "положительных" пар очень мало. Поэтому будем формировать обучающие батчи размера $N$ следующим образом: будем брать $\frac{N}{2}$ элементов из некоторого класса (они между собой будут формировать "положительные пары"), а оставшиеся $\frac{N}{2}$ элементов будем брать случайно. Таким образом мы гарантируем, что в каждом обучающем батче будет достаточно "положительных" пар.

Реализуйте предложенную логику в рамках Pytorch, реализовав собственный BatchSampler. Ваш самплер должен формировать каждый батч размера $N$ следующим образом: $\frac{N}{2}$ объектов извлекаются из некоторого случайного класса, оставшиеся $\frac{N}{2}$ объектов извлекаются случайно.

In [None]:
class ContrastiveSampler(BatchSampler):
    def __init__(self, batch_size, num_classes, labels):
        self.num_classes = num_classes
        self.imgs_per_class = labels.size()[0] // num_classes
        <your code>
        
    def __iter__(self):
        num_yielded = 0
        while num_yielded < (self.num_classes * self.imgs_per_class):
            batch = []
            <your code>
            num_yielded += self.batch_size
            yield batch

В этом задании будем работать с небольшими изображениями одежды из датасета Fashion-MNIST.

In [None]:
input_size = 784
num_classes = 10
batch_size = 256


download_path = '/tmp'
train_dataset = datasets.FashionMNIST(root=download_path, 
                                   train=True, 
                                   transform=transforms.ToTensor(),
                                   download=True)

test_dataset = dsets.FashionMNIST(root=download_path, 
                                  train=False, 
                                  transform=transforms.ToTensor())

train_loader = DataLoader(
    dataset=train_dataset, 
    batch_sampler=ContrastiveSampler(batch_size=batch_size, num_classes=num_classes, labels=train_dataset.train_labels), 
    shuffle=False)

test_loader = DataLoader(
    dataset=test_dataset, 
    batch_sampler=ContrastiveSampler(batch_size=batch_size, num_classes=num_classes, labels=test_dataset.test_labels), 
    shuffle=False)

Реализуйте сеть несложной архитектуры, содержащую три сверточных слоя из 20 фильтров с макс-пулингом, а также два полносвязных слоя из 128 нейронов. Выход последнего слоя будет подаваться на вход Contrastive Loss.

In [None]:
class Flatten(nn.Module):
    def forward(self, x):
        return x.view(x.size()[0], -1)

class ContrastiveNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.cnn1 = nn.Sequential(
            <your code>
        )

    def forward(self, x):
        output = self.cnn1(x)
        return output

Наши обычные функции для тренировки и отображения графиков:

In [None]:
def plot_history(log, name=None):
    """log is list of dictionaries like 
        [
            {'train_step': 0, 'train_loss': 10.0, 'train_acc': 0.0}, 
            ...
            {'train_step': 100, 'val_loss': 0.1, 'val_acc': 0.9},
            ...
        ]
    """
    if name is None:
        name='loss'
    train_points, val_points = [], []
    train_key = 'train_{}'.format(name)
    val_key = 'val_{}'.format(name)

    for entry in log:
        if train_key in entry:
            train_points.append((entry['train_step'], entry[train_key]))
        if val_key in entry:
            val_points.append((entry['train_step'], entry[val_key]))
    
    plt.figure()
    plt.title(name)
    x, y = list(zip(*train_points))
    plt.plot(x, y, label='train', zorder=1)
    x, y = list(zip(*val_points))
    plt.scatter(x, y, label='val', zorder=2, marker='+', s=180, c='orange')
    
    plt.legend(loc='best')
    plt.grid()
    plt.show()

In [None]:
contrastive_loss = ContrastiveLoss()

def train_model(model, optimizer, train_loader, val_loader, epochs=3):
    log = []
    train_step = 0
    for epoch in range(epochs):
        model.train()
        for x, y in tqdm(train_loader):
            optimizer.zero_grad()
            output = model(x)
            loss = contrastive_loss(output, y)
            
            loss.backward()
            optimizer.step()
            
            log.append(dict(
                train_loss=loss.item(),
                train_step=train_step,
            ))
            train_step += 1

        # валидационные метрики надо усредних за все валидационные батчи
        # hint: для аккумулирования величин удобно взять defaultdict
        tmp = defaultdict(list)
        model.eval()
        for x, y in tqdm(val_loader):
            with torch.no_grad():
                output = model(x)
                loss = contrastive_loss(output, y)
                tmp['loss'].append(loss.item())
                
                
        log.append(dict(
            val_loss = np.mean(tmp['loss']),  # скаляры
            train_step=train_step,
        ))
        
        clear_output()
        plot_history(log, name='loss') 

Обучите сеть с параметрами, указанными ниже.

In [None]:
model = ContrastiveNetwork()
opt = torch.optim.Adam(model.parameters(), lr=0.0005)

train_model(model, opt, train_loader, test_loader, epochs=2)

Извлеките векторные описания тестовых изображений (a.k.a эмбеддинги). У вас должно получиться 10000 128-мерных векторов.

In [None]:
testImages = test_dataset.test_data
embeddings = model(Variable(test_dataset.test_data.view(-1,1,28,28)).float())

Код ниже демонстрирует поисковую выдачу для трех изображений-запросов. Выдача формируется на основе близости эмбеддингов.

In [None]:
queryCount = 3
queries = embeddings[:queryCount,:].data.numpy()
database = embeddings[queryCount:,:].data.numpy()
plt.figure(figsize=[15, 4.5])
for i in range(queryCount):
    results = np.argsort(np.sum((database-queries[i,:])**2, axis=1))[:10]
    plt.subplot(queryCount, 11, i * 11 + 1)
    plt.title("Query: %i" % i)
    plt.imshow(test_dataset.test_data[i].numpy().reshape([28, 28]), cmap='gray')
    plt.axis('off')
    for k in range(10):
        plt.subplot(queryCount, 11, i * 11 + k + 2)
        plt.imshow(test_dataset.test_data[results[k]+queryCount].numpy().reshape([28, 28]), cmap='gray')
        plt.axis('off')

# Super-resolution (0.3 балла)

В этой части вам предстоит реализовать простейшую архитектуру для решения задачи image super-resolution.

In [None]:
input_size = 784
num_classes = 10
batch_size = 256

download_path = '/tmp'
train_dataset = datasets.MNIST(root=download_path, 
                            train=True, 
                            transform=transforms.ToTensor(),
                            download=True)

test_dataset = datasets.MNIST(root=download_path, 
                           train=False, 
                           transform=transforms.ToTensor())

train_loader = DataLoader(dataset=train_dataset,
                          batch_size=batch_size,
                          shuffle=False)

test_loader = DataLoader(dataset=test_dataset, 
                         batch_size=batch_size,         
                         shuffle=False)

Мы будем увеличивать изображения размера (14,14) в два раза по каждому измерению. Как правило, перед подачей на вход нейросети изображение низкого разрешения увеличивают до нужного размера билинейной интерполяцией, а нейросеть улучшает результат интерполяции, не меняя пространственные размеры изображения.

Реализуйте нейросеть из трех сверточных слоев (10 фильтров на каждом слое), которая будет получать на вход черно-белое изображение и выдавать на выход изображение такого же размера.

In [None]:
class SuperResolutionNetwork(nn.Module):
    def __init__(self):
        super().__init__()
        self.cnn1 = nn.Sequential(
            <your code>
        )

    def forward(self, x):
        output = self.cnn1(x)
        return output + x

Нам потребуется несколько переписать тренировку:
- метки классов не нужны
- таргет будем получать с помощью ресайзов из входных данных

In [None]:
def low_res_and_high_res(images_batch):
    result = images_batch.clone()
    low_res_transform = transforms.Resize((14,14))
    high_res_transform = transforms.Resize((28,28))
    toTensorTransform = transforms.ToTensor()
    toImageTransform = transforms.ToPILImage()
    for i in range(images_batch.size()[0]):
        result[i] = toTensorTransform(high_res_transform(low_res_transform(toImageTransform(images_batch[i]))))
    return result


def train_super_res_model(model, optimizer, train_loader, val_loader, epochs=3):
    log = []
    train_step = 0
    for epoch in range(epochs):
        model.train()
        for x, _ in tqdm(train_loader):
            target = low_res_and_high_res(x)
            
            optimizer.zero_grad()
            output = model(x)        
            loss = F.mse_loss(output, target)
            loss.backward()
            optimizer.step()
        
            log.append(dict(
                train_loss=loss.item(),
                train_step=train_step,
            ))
            train_step += 1

        # валидационные метрики надо усредних за все валидационные батчи
        # hint: для аккумулирования величин удобно взять defaultdict
        tmp = defaultdict(list)
        model.eval()
        for x, y in tqdm(val_loader):
            with torch.no_grad():
                target = low_res_and_high_res(x)
                output = model(x)
                loss = F.mse_loss(output, target)
                tmp['loss'].append(loss.item())
                
        log.append(dict(
            val_loss = np.mean(tmp['loss']),  # скаляры
            train_step=train_step,
        ))
        
        clear_output()
        plot_history(log, name='loss')

Оптимизируйте сеть с параметрами, указанными ниже.

In [None]:
model = SuperResolutionNetwork()
opt = torch.optim.Adam(model.parameters(), lr=0.005, weight_decay=0.05)
train_super_res_model(model, opt, train_loader, test_loader, epochs=1)

In [None]:
test_images = test_dataset.test_data.float() / 255
test_images_blurred = low_res_and_high_res(test_images[:100].view(-1,1,28,28))
result_cnn = model(Variable(test_images_blurred))

Код ниже визуализирует исходные изображения (28,28) и реконструкции, полученные с помощью нейросети.
Не удивляйтесь, есть качество реконструкций покажется низким, скоро вы узнаете, что MSE-loss, который мы использовали при обучении, не является оптимальным для задачи super-resolution (гораздо лучше работают adversarial-сети, про которые вам расскажут через пару недель).

In [None]:
examplesCount = 6
plt.figure(figsize=[10, 10])
for i in range(examplesCount):
    plt.subplot(examplesCount, 3, i * 3 + 1)
    plt.title("Original: %i" % i)
    plt.imshow(test_dataset.test_data[i].numpy().reshape([28, 28]), cmap='gray')
    plt.axis('off')
    plt.subplot(examplesCount, 3, i * 3 + 2)
    plt.title("Super-ressed: %i" % i)
    plt.imshow(np.clip(result_cnn[i].data.numpy().reshape([28, 28]), 0, 1), cmap='gray')
    plt.axis('off')
    plt.subplot(examplesCount, 3, i * 3 + 3)
    plt.title("Upscaled initial %i" % i)
    plt.imshow(test_images_blurred[i].numpy().reshape([28, 28]), cmap='gray')
    plt.axis('off')