In [0]:
import numpy as np
from os.path import exists
from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
# Wheel platform tag for the running interpreter: "<impl><version>-<abi>".
# NOTE(review): `wheel.pep425tags` is a private module of the `wheel` package and
# may not exist in newer releases -- confirm before re-running on a fresh runtime.
platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())
# Turn the libcudart version listed by ldconfig into a "cuXY" tag (one entry per match).
cuda_output = !ldconfig -p|grep cudart.so|sed -e 's/.*\.\([0-9]*\)\.\([0-9]*\)$/cu\1\2/'
# Pick the CUDA build only when an NVIDIA device node exists; otherwise use the CPU wheel.
accelerator = cuda_output[0] if exists('/dev/nvidia0') else 'cpu'
# Install the pinned torch 0.4.1 wheel for the detected accelerator/platform, plus torchvision.
!pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.1-{platform}-linux_x86_64.whl torchvision
  
import torch
import torch.nn as nn
import torch.optim
import torch.nn.init
import torchvision.datasets as dataset
import torchvision.transforms as transforms
from torch.autograd import Variable
from torch import FloatTensor
import matplotlib.pylab as plt
import pandas as pd
import math
%matplotlib inline

In [0]:
# Notebook-wide plotting defaults: wide, short figures (rows of small image
# panels) with grid lines disabled.
plt.rcParams.update({
    "figure.figsize": (20, 3),
    "axes.grid": False,
})
# Applies to the current (implicitly created) axes only, not to later figures.
plt.axis('off')

In [0]:
root = './data'

# MNIST train/test splits, downloaded into `root` on first use; both use the
# same ToTensor transform.
train_set = dataset.MNIST(root, train=True, transform=transforms.ToTensor(), download=True)
test_set = dataset.MNIST(root, train=False, transform=transforms.ToTensor(), download=True)

# Mini-batches of 32; only the training split is reshuffled each epoch.
train_loader = torch.utils.data.DataLoader(train_set, batch_size=32, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=32, shuffle=False)

In [0]:
def add_noise(img, std=0.5):
    """Return ``img`` corrupted with additive zero-mean Gaussian noise.

    The noise is sampled directly on the device (and with the dtype) of ``img``,
    so the function works for CPU and CUDA tensors alike and avoids a
    host-to-device copy of the noise tensor.

    Args:
        img: Input tensor of any shape.
        std: Standard deviation of the noise. The default of 0.5 matches the
            value that used to be hard-coded here.

    Returns:
        A new tensor with the same shape, dtype and device as ``img``.
    """
    return img + torch.randn_like(img) * std
    
def compute_one(model, n, noise=False):
    """Plot training image ``n`` next to the model's reconstruction of it.

    Args:
        model: Network taking a ``(1, 1, 28, 28)`` tensor. It may return either
            the reconstruction alone or a tuple whose first element is the
            reconstruction.
        n: Index into the module-level ``train_set``.
        noise: When true, the input is passed through ``add_noise`` before being
            fed to the model; the noisy input is what gets plotted on the left.
    """
    # Follow the model's device instead of assuming CUDA and bouncing via the CPU.
    device = next(model.parameters()).device

    x = train_set[n][0].to(device)
    if noise:
        x = add_noise(x)
    x = x.view(1, 1, 28, 28)

    # Visualisation only: no autograd graph is needed for this forward pass.
    with torch.no_grad():
        y = model(x)

    # Models such as the VAE return (reconstruction, mu, logvar).
    if isinstance(y, tuple):
        y = y[0]

    f, ax = plt.subplots(1, 2)

    ax[0].imshow(x[0].cpu().numpy().reshape((28, 28)), interpolation="nearest")
    ax[1].imshow(y.cpu().numpy()[0, 0], interpolation="nearest")
    plt.show()
    
def compute_random(model, n, noise=False):
    """Call ``compute_one`` for ``n`` training indices drawn uniformly at random.

    Indices are sampled with replacement from ``[0, len(train_set))``; ``noise``
    is forwarded unchanged to ``compute_one``.
    """
    sample_indices = np.random.randint(0, len(train_set), n)
    for sample_idx in sample_indices:
        compute_one(model, sample_idx, noise)

# Обычный автоэнкодер

In [0]:
class Autoencoder(nn.Module):
    """Convolutional autoencoder for 1x28x28 images with a 4-dimensional bottleneck.

    The encoder reduces the image to a 64x1x1 feature map, ``in_linear`` projects
    it to the 4-d code, and ``out_linear`` plus the decoder map the code back to
    a 1x28x28 image. The code of the most recent forward pass is kept on the
    instance as ``embeding`` (spelling kept, since other cells read that name).
    """

    def __init__(self):
        super().__init__()

        # 1x28x28 -> 26x24x24 -> 26x12x12 -> 36x8x8 -> 36x4x4 -> 64x2x2 -> 64x1x1
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 26, 5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(26, 36, 5, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(2),

            nn.Conv2d(36, 64, 3, stride=1),
            nn.ReLU(),
            nn.MaxPool2d(2),
        )

        # Bottleneck: 64 features <-> 4-d code.
        self.in_linear = nn.Linear(64, 4)
        self.out_linear = nn.Linear(4, 64)

        # 64x1x1 -> 64x2x2 -> 36x4x4 -> 36x8x8 -> 26x12x12 -> 26x24x24 -> 1x28x28
        self.decoder = nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 36, 3, 1),

            nn.Upsample(scale_factor=2),
            nn.ReLU(),
            nn.ConvTranspose2d(36, 26, 5, 1),

            nn.Upsample(scale_factor=2),
            nn.ReLU(),
            nn.ConvTranspose2d(26, 1, 5, 1),
        )

        self.initialize()

    def initialize(self):
        """Xavier-uniform initialise every convolution weight, encoder and decoder.

        ``ConvTranspose2d`` is not a subclass of ``Conv2d``, so it is listed
        explicitly; otherwise the decoder would silently keep its default
        initialisation. ``xavier_uniform_`` works in place, so no reassignment
        of the weight is needed.
        """
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
                nn.init.xavier_uniform_(m.weight)

    def forward(self, x):
        """Encode ``x`` to the 4-d code, store it as ``self.embeding``, and decode.

        Args:
            x: Tensor of shape ``(batch, 1, 28, 28)``.

        Returns:
            Reconstruction of shape ``(batch, 1, 28, 28)``.
        """
        x = self.encoder(x)
        x = x.view(-1, 64)
        x = self.in_linear(x)

        # Exposed so callers can interpolate in code space after a forward pass.
        self.embeding = x

        x = self.out_linear(x)
        x = x.view(-1, 64, 1, 1)
        x = self.decoder(x)

        return x
        

In [0]:
# Plain autoencoder on the GPU, trained with pixel-wise MSE and Adam.
net = Autoencoder().cuda()
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)

# Per-batch loss history, filled in by the training loop.
loss_acc = []

In [0]:
# Train the plain autoencoder for 3 epochs to reconstruct its own input.
for ep in range(3):
    for batch_idx, (data, target) in enumerate(train_loader):
        data = data.cuda()
        optimizer.zero_grad()
        y = net(data)

        loss = criterion(y, data)
        loss.backward()
        optimizer.step()

        # .item() extracts a Python float from the 0-dim loss tensor; indexing it
        # with [0] is deprecated and keeps a tensor alive for every batch.
        loss_acc.append(loss.item())

    # Reports the loss of the last batch of the epoch.
    print('Epoch: ', ep, ', loss: ', loss.item(), sep='')

In [0]:
# Show input/reconstruction pairs for 5 randomly chosen training images.
compute_random(net, 5)

In [0]:
def compute_intermediate_states(model, n1, n2, states):
    """Plot a linear interpolation in code space between two training images.

    The leftmost panel shows image ``n2`` and the rightmost panel image ``n1``;
    the ``states`` panels in between are decoded from codes moving linearly from
    the code of ``n2`` towards the code of ``n1`` (the last step is exactly the
    code of ``n1``).

    Args:
        model: Autoencoder exposing ``embeding`` after a forward pass, plus
            ``out_linear`` and ``decoder``.
        n1: Index in the module-level ``train_set`` of the end image.
        n2: Index in ``train_set`` of the start image.
        states: Number of interpolated panels to draw.
    """
    # Follow the model's device instead of assuming CUDA and bouncing via the CPU.
    device = next(model.parameters()).device

    # Visualisation only: keep autograd out of every forward/decoder call.
    with torch.no_grad():
        x1 = train_set[n1][0].to(device).view(1, 1, 28, 28)
        model(x1)  # called for its side effect of setting model.embeding
        emb_end = model.embeding

        x2 = train_set[n2][0].to(device).view(1, 1, 28, 28)
        model(x2)
        emb_start = model.embeding

        delta = emb_end - emb_start

        frames = []
        for i in range(states):
            state = (delta / states) * (i + 1) + emb_start
            decoded = model.decoder(model.out_linear(state).view(-1, 64, 1, 1))
            frames.append(decoded[0].cpu().numpy().reshape((28, 28)))

    f, ax = plt.subplots(1, states + 2)

    ax[0].imshow(x2[0].cpu().numpy().reshape((28, 28)), interpolation="nearest")

    for i, frame in enumerate(frames):
        ax[i + 1].imshow(frame, interpolation="nearest")
        ax[i + 1].set_xticks([])
        ax[i + 1].set_yticks([])

    ax[-1].imshow(x1[0].cpu().numpy().reshape((28, 28)), interpolation="nearest")
    plt.show()

In [0]:
# Interpolate between eight sample pairs -- (10, 20), (30, 40), ..., (150, 160) --
# with 20 intermediate steps each.
for first_idx in range(10, 170, 20):
    compute_intermediate_states(net, first_idx, first_idx + 10, 20)

# Автоэнкодер, убирающий шум

In [0]:
# Denoising run: a fresh Autoencoder with its own Adam optimiser.
# `criterion`, `optimizer` and `loss_acc` are rebound here and replace the ones
# used for the previous model.
noisy_net = Autoencoder().cuda()
optimizer = torch.optim.Adam(noisy_net.parameters(), lr=1e-3)
criterion = nn.MSELoss().cuda()

# Per-batch loss history for this model.
loss_acc = []

In [0]:
# Train the denoising autoencoder for 5 epochs: noisy input, clean target.
for ep in range(5):
    for batch_idx, (data, target) in enumerate(train_loader):
        # Upload the batch once and reuse it for both the input and the target.
        data = data.cuda()
        optimizer.zero_grad()
        y = noisy_net(add_noise(data))
        loss = criterion(y, data)
        loss.backward()
        optimizer.step()

        # .item() extracts a Python float from the 0-dim loss tensor; indexing it
        # with [0] is deprecated and keeps a tensor alive for every batch.
        loss_acc.append(loss.item())

    # Reports the loss of the last batch of the epoch.
    print('Epoch: ', ep, ', loss: ', loss.item(), sep='')

In [0]:
# Training image 1 through the denoising model: first fed the clean image,
# then fed a noise-corrupted copy of it.
compute_one(noisy_net, 1)
compute_one(noisy_net, 1, noise=True)

In [0]:
# Noisy input vs. reconstruction for 10 randomly chosen training images.
compute_random(noisy_net, 10, noise=True)

# Вариационный автоэнкодер

In [0]:
class ConvVAE(nn.Module):
    """Convolutional variational autoencoder for 1x28x28 images with a 2-d latent.

    ``forward`` returns ``(reconstruction, mu, logvar)``. The latent used for the
    most recent forward pass is kept on the instance as ``z`` and ``embeding``
    (both shaped ``(batch, 2, 1, 1)``; spelling kept, since other cells read it).
    """

    def __init__(self):
        super(ConvVAE, self).__init__()

        self.encoder = nn.Sequential(  # 1x28x28 ->
            nn.Conv2d(1, 8, 3),  # 8x26x26
            nn.ReLU(),

            nn.Conv2d(8, 16, 3),  # 16x24x24
            nn.ReLU(),
            nn.MaxPool2d(2),  # 16x12x12

            nn.Conv2d(16, 32, 3),  # 32x10x10
            nn.ReLU(),
            nn.MaxPool2d(2),  # 32x5x5

            nn.Conv2d(32, 64, 3),  # 64x3x3
            nn.ReLU(),
            nn.MaxPool2d(3),  # 64x1x1
        )

        # Two heads on the 64 encoder features: mean and log-variance of q(z|x).
        self.mu = nn.Linear(64, 2)
        self.logvar = nn.Linear(64, 2)

        self.decoder = nn.Sequential(  # 2x1x1 ->
            nn.Conv2d(2, 64, 1, 1),  # 64x1x1
            nn.Upsample(scale_factor=3),  # 64x3x3

            nn.ConvTranspose2d(64, 32, 3),  # 32x5x5
            nn.ReLU(),
            nn.Upsample(scale_factor=2),  # 32x10x10

            nn.ConvTranspose2d(32, 16, 3),  # 16x12x12
            nn.ReLU(),
            nn.Upsample(scale_factor=2),  # 16x24x24

            nn.ConvTranspose2d(16, 8, 3),  # 8x26x26
            nn.ReLU(),

            nn.ConvTranspose2d(8, 1, 3),  # 1x28x28
            nn.ReLU(),  # output is clamped to be non-negative
        )

        # xavier_uniform_ initialises in place; no reassignment of the weight needed.
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.ConvTranspose2d)):
                nn.init.xavier_uniform_(m.weight)

    def reparameterize(self, mu, logvar):
        """Sample ``z = mu + eps * std`` while training; return ``mu`` in eval mode.

        ``eps`` is standard-normal noise with the shape, dtype and device of
        ``std = exp(0.5 * logvar)``.
        """
        if not self.training:
            return mu
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def encode(self, x):
        """Return ``(mu, logvar)``, each of shape ``(batch, 2)``, for a batch of images."""
        x = self.encoder(x)
        x = x.view(-1, 64)
        return self.mu(x), self.logvar(x)

    def forward(self, x):
        """Encode, sample a latent, decode.

        Args:
            x: Tensor of shape ``(batch, 1, 28, 28)``.

        Returns:
            Tuple ``(reconstruction, mu, logvar)``.
        """
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        # Reshape to a 2-channel 1x1 map so the convolutional decoder can consume it.
        self.z = z.view(-1, 2, 1, 1)
        self.embeding = self.z
        x = self.decoder(self.z)
        return x, mu, logvar

In [0]:
def loss_function(recon_x, x, mu, logvar, batch_size):
    """VAE objective: mean-squared reconstruction error plus a scaled KL term.

    Args:
        recon_x: Reconstruction produced by the decoder, same shape as ``x``.
        x: Target images.
        mu: Latent means.
        logvar: Latent log-variances.
        batch_size: Number of samples in the batch, used to normalise the KL term.

    Returns:
        Scalar tensor ``MSE(recon_x, x) + KLD / (batch_size * elements_per_sample)``.
    """
    # Element-wise mean squared error. Calling the functional form keeps this
    # function independent of a module-level criterion created in a later cell.
    reconstruction = nn.functional.mse_loss(recon_x, x)

    # see Appendix B from VAE paper:
    # Kingma and Welling. Auto-Encoding Variational Bayes. ICLR, 2014
    # https://arxiv.org/abs/1312.6114
    # 0.5 * sum(1 + log(sigma^2) - mu^2 - sigma^2)
    kld = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    # Normalise by the same number of elements as the reconstruction term
    # (784 per sample for 1x28x28 inputs).
    elements_per_sample = x[0].numel()
    kld = kld / (batch_size * float(elements_per_sample))

    return reconstruction + kld

In [0]:
# Variational autoencoder on the GPU with its own Adam optimiser.
# `optimizer` and `loss_acc` are rebound here for this model.
vae_net = ConvVAE().cuda()
optimizer = torch.optim.Adam(vae_net.parameters(), lr=1e-3)

# Module-level MSE criterion.
mse_loss = nn.MSELoss().cuda()

# Per-batch loss history for this model.
loss_acc = []

In [0]:
# Train the VAE for 20 epochs.
for ep in range(20):
    for batch_idx, (data, target) in enumerate(train_loader):
        # Upload the batch once and reuse it for both the input and the target.
        data = data.cuda()
        optimizer.zero_grad()
        y, mu, logvar = vae_net(data)
        loss = loss_function(y, data, mu, logvar, data.size(0))
        loss.backward()
        optimizer.step()
        # .item() extracts a Python float from the 0-dim loss tensor; indexing it
        # with [0] is deprecated and keeps a tensor alive for every batch.
        loss_acc.append(loss.item())

    # Reports the loss of the last batch of the epoch.
    print('Epoch: ', ep, ', loss: ', loss.item(), sep='')

In [0]:
# Show input/reconstruction pairs from the VAE for 5 randomly chosen training images.
compute_random(vae_net, 5)

In [0]:
def compute_intermediate_states_vae(model, n1, n2, states):
    """Plot a linear interpolation in latent space between two training images.

    The leftmost panel shows image ``n2`` and the rightmost panel image ``n1``;
    the ``states`` panels in between are decoded from latents moving linearly
    from the latent of ``n2`` towards the latent of ``n1`` (the last step is
    exactly the latent of ``n1``).

    Args:
        model: VAE exposing ``embeding`` after a forward pass and a ``decoder``
            that accepts that embedding directly.
        n1: Index in the module-level ``train_set`` of the end image.
        n2: Index in ``train_set`` of the start image.
        states: Number of interpolated panels to draw.
    """
    # Follow the model's device instead of assuming CUDA and bouncing via the CPU.
    device = next(model.parameters()).device

    # Visualisation only: keep autograd out of every forward/decoder call.
    with torch.no_grad():
        x1 = train_set[n1][0].to(device).view(1, 1, 28, 28)
        model(x1)  # called for its side effect of setting model.embeding
        emb_end = model.embeding

        x2 = train_set[n2][0].to(device).view(1, 1, 28, 28)
        model(x2)
        emb_start = model.embeding

        delta = emb_end - emb_start

        frames = []
        for i in range(states):
            state = (delta / states) * (i + 1) + emb_start
            decoded = model.decoder(state)
            frames.append(decoded[0].cpu().numpy().reshape((28, 28)))

    f, ax = plt.subplots(1, states + 2)

    ax[0].imshow(x2[0].cpu().numpy().reshape((28, 28)), interpolation="nearest")

    for i, frame in enumerate(frames):
        ax[i + 1].imshow(frame, interpolation="nearest")
        ax[i + 1].set_xticks([])
        ax[i + 1].set_yticks([])

    ax[-1].imshow(x1[0].cpu().numpy().reshape((28, 28)), interpolation="nearest")
    plt.show()

In [0]:
# 15-step latent interpolation: training image 20 on the left, image 10 on the right.
compute_intermediate_states_vae(vae_net, 10, 20, 15)