# Auto-Encoder megvalósítása PyTorch-val

Ebben a notebook-ban megismerjük és implementálunk auto-encoder-eket. Egy vázlatos rajza az auto-encoder-nek az alábbi ábrán látható.

![autoencoder](https://drive.google.com/uc?export=download&id=10sZv98I38nAKqhyVtZPHkmCeeNPgKn6r)

Egy auto-encoder három részből áll:

1. encoder (magasabb dimenziós bemenet -(tömörítés)- alacsonyabb dimenzió)
2. encoded layer (látens vektor, bottleneck) 
3. decoder (alacsonyabb dimenzió -(kitömörítés)- magasabb dimenziós kimenet)

A leginkább számontartott AE variánsok, amikből mi is megtekintünk kettőt:

* basic autoencoder
* sparse autoencoder
* contractive autoencoder
* denoising autoencoder
* variational autoencoder.

Referenciák: 

[Contractive autoencoder](http://www.icml-2011.org/papers/455_icmlpaper.pdf) </br>
[Variational autoencoder](https://arxiv.org/pdf/1606.05908.pdf)

## Implementáció

Először a CAE-t implementáljuk. A Frobenius-normát kézzel számoljuk egy adott architektúrára. Másodiknak a VAE-t készítjük el. A CAE egy példa diszkriminatív modellre, míg a VAE egy példa generatív modellre. 

In [None]:
%matplotlib inline

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import TensorDataset, DataLoader
from pckutils import mnist, utils
import json
from matplotlib import pyplot as plt
import time

## MNIST példa, CAE és VAE

MNIST adat beolvasása.

In [None]:
data = mnist.load_mnist('data')

In [None]:
# fekete-fehér (bináris) kép generálása a szürke árnyalatos képből
# trainloader készítése
x_binary = utils.create_binary_image(data.X_train) 
X = torch.Tensor(x_binary)
tensors = TensorDataset(X)
trainloader = DataLoader(tensors, batch_size=128, shuffle=True)

### CAE

Elsőként a kontraktív AE-t készítjük el. A pytorch rendelkezik autogradiens modullal, de az a jelen esetben a nagy látens tér miatt hajlamos lassú és memória igényes lenni, ezért inkább kézzel számoljuk a deriváltat. A lassúság abból adódik, hogy túl nagy lesz a visszaterjesztés során a számítási fa.

In [None]:
class MNISTautoencoderCAE(nn.Module):
    
    def __init__(self, in_f, out_f):
        super(MNISTautoencoderCAE, self).__init__()
        self.in_f = in_f
        self.out_f = out_f
        
        self.lin_enc = nn.Linear(in_f, out_f)
        self.lin_dec = nn.Linear(out_f, in_f)
        
    def forward(self, x):
        '''
        x - batch_size x in_f
        '''
        y_encoded = torch.sigmoid(self.lin_enc(x))
        self.ae_reg = self.jacobi_loss_calc(y_encoded)
        y_out = torch.sigmoid(self.lin_dec(y_encoded))
        return y_out
    
    def jacobi_loss_calc(self, y):
        sigmoid_der = y * (1-y)
        w = list(self.lin_enc.parameters())[0]
        sigmoid_der_2 = sigmoid_der**2
        w_2 = w**2
        return torch.sum(torch.matmul(sigmoid_der_2, w_2))
    
    def generate_image(self):
        x_random = torch.rand(self.out_f)
        return torch.sigmoid(self.lin_dec(x_random))
    
    def generate_image_from_random(self, x_random):
        return torch.sigmoid(self.lin_dec(x_random))

In [None]:
# segédfüggvény az AE tanításához
def train_ae(ae, trainloader, lr, beta, device):
    ae.device = device
    criterion = nn.MSELoss()
    optimizer = optim.Adam(ae.parameters(), lr=lr)
    running_loss_reg = 0.0
    running_loss_rec = 0.0
    cntr = 0
    
    start = time.process_time()
    for epoch in range(10):
        for i, batch in enumerate(trainloader, 1):
            cntr += 1
            
            optimizer.zero_grad()  # enélkül a gradiensek akkumulálódnak és a tanítás lelassulhat
            x = batch[0]
            x = x.to(device)
        
            y = ae(x)
            loss_reg = beta * ae.ae_reg
            loss_rec = criterion(y, x)
            loss = loss_reg + loss_rec
            loss.backward()
            optimizer.step()
        
            running_loss_reg += loss_reg.item()
            running_loss_rec += loss_rec.item()
        if (epoch + 1) % 2 == 0:
            print('[%3d, %3d] loss_reg: %.3f  loss_rec: %.3f' %
                (epoch + 1, i, running_loss_reg / 200, running_loss_rec / 200))
            running_loss_reg = 0.0
            running_loss_rec = 0.0
    end = time.process_time()

    print("Ellapsed time: %.5f" %(end - start))

Tanítás az MNIST-en, majd megnézzük a helyreállítás minőségét.

In [None]:
device = torch.device("cpu")
ae = MNISTautoencoderCAE(784, 400).to(device)
train_ae(ae, trainloader, 1e-3, 5e-4, device)

In [None]:
x_test = utils.create_binary_image([data.X_test[15]])
y = ae(torch.tensor(x_test[0]).view(1, -1).to(device))

# eredeti
plt.imshow(x_test[0].reshape((28, 28)), cmap='gray')

In [None]:
# helyreállított
plt.imshow(y.detach().numpy().reshape((28, 28)), cmap='gray')

### VAE

In [None]:
class MNISTautoencoderVAE(nn.Module):
    
    def __init__(self, feature_in, feature_out):
        super(MNISTautoencoderVAE, self).__init__()
        self.feature_in = feature_in
        self.feature_out = feature_out
        
        self.lin_enc1 = nn.Linear(feature_in, 600)
        self.lin_enc2 = nn.Linear(600, 500)
        self.lin_enc_mu = nn.Linear(500, feature_out)
        self.lin_enc_std = nn.Linear(500, feature_out)
        
        self.lin_dec1 = nn.Linear(feature_out, 500)
        self.lin_dec2 = nn.Linear(500, 600)
        self.lin_dec3 = nn.Linear(600, feature_in)
        
        self.ae_reg = 0.0
        self.device = 0.0
        
    def forward(self, x):
        # encoder 
        x_ = torch.relu(self.lin_enc1(x))
        x_ = torch.relu(self.lin_enc2(x_))
        mu = torch.tanh(self.lin_enc_mu(x_)) # mean
        std = torch.relu(self.lin_enc_std(x_)) + 1e-8 # standard deviation
        samples = torch.normal(torch.zeros(mu.size(0), self.feature_out), torch.ones(mu.size(0), self.feature_out)).to(self.device)
        self.u = mu + std * samples # reparametrization trick
        
        # regularizáció
        self.ae_reg = self.calculate_reg(mu, std)
        
        # decoder
        y_ = torch.relu(self.lin_dec1(self.u))
        y_ = torch.relu(self.lin_dec2(y_))
        return torch.sigmoid(self.lin_dec3(y_))
    
    def calculate_reg(self, mu, std):
        kl_div = 0.5 * (std*std + mu*mu - 1.0 - torch.log(std*std))
        return kl_div.sum()/mu.size(0) # devide by the batch_size
    
    def generate_image(self):
        x_random = torch.normal(torch.zeros(self.feature_out), torch.ones(self.feature_out)).to(self.device)
        y_ = torch.relu(self.lin_dec1(x_random))
        y_ = torch.relu(self.lin_dec2(y_))
        return torch.sigmoid(self.lin_dec3(y_))

A reparametrization trick-hez minden mintához új elemet generálunk a standard normális eloszlásból. Nem elég az egész batch-re egy értéket generálni, mert akkor a tanulás sebesége nagyon lassú lesz.

### VAE tanítása

In [None]:
device = torch.device("cuda:0")
mae = MNISTautoencoderVAE(28*28, 400).to(device)
mae.device = device
criterion = nn.BCELoss()
optimizer = optim.Adam(mae.parameters(), lr=1e-3)
running_loss_reg = 0.0
running_loss_rec = 0.0
cntr = 0

for epoch in range(60):
    for i, batch in enumerate(trainloader, 1):
        
        optimizer.zero_grad()
        
        x = batch[0]
        x = x.to(device)
        
        y = mae(x)
        loss_reg = 5e-4*mae.ae_reg
        loss_rec = criterion(y, x)
        loss = loss_reg + loss_rec
        loss.backward()
        optimizer.step()
        
        running_loss_reg += loss_reg.item()
        running_loss_rec += loss_rec.item()
        cntr += 1
    if (epoch+1) % 10 == 0:
        print('[%3d, %3d] loss_reg: %.3f  loss_rec: %.3f' %
            (epoch + 1, i, running_loss_reg / cntr, running_loss_rec / cntr))
        running_loss_reg = 0.0
        running_loss_rec = 0.0
        cntr = 0

In [None]:
# súlyok mentése
weights = list(map(lambda x: x.cpu().detach().numpy(), mae.parameters()))
utils.save_parameters(weights, "weights/Autoencoder.json")

In [None]:
# visszatöltés (ne kelljen újra tanítani a teszteléshez később)
weights = utils.load_parameters("weights/Autoencoder.json")
mae = MNISTautoencoderCAE(28*28, 400)
pms = list(map(torch.from_numpy, weights))

for i, p in enumerate(mae.parameters()):
    p.data = pms[i]

### Bemenet és kimenet vizuális ellenőrzése

In [None]:
x_test = utils.create_binary_image([data.X_test[12]])
y = mae(torch.tensor(x_test[0]).view(1, -1).to(device))

In [None]:
plt.imshow(y.cpu().detach().numpy().reshape((28, 28)), cmap='gray')

In [None]:
plt.imshow(x_test[0].reshape((28, 28)), cmap='gray')

In [None]:
img = mae.generate_image()
plt.imshow(img.cpu().detach().numpy().reshape((28, 28)), cmap='gray')