# 3. 深層生成モデルの実装例：VAE

In [None]:
from __future__ import print_function
import torch
import torch.utils.data
from torch import nn, optim
from torch.nn import functional as F
from torchvision import datasets, transforms
from torchvision.utils import save_image

from Tars.distributions import NormalModel, BernoulliModel
from Tars.utils import get_dict_values

ここでは，2で説明した確率分布クラスの記述方法を使って， 深層生成モデルの一つであるVAEを実装してみましょう．

VAEは， 次の生成モデルを考えます．

\begin{equation*}
p_{\theta}(x,z)=p_{\theta}(x|z)p(z)
\end{equation*}

ただし，xは観測変数，zは潜在変数，$\theta$はパラメータです．

VAEでは，$p_{\theta}(x|z)$は深層ニューラルネットワークで定義されるので，$\theta$はニューラルネットワークのパラメータになります．

VAEの目標は，周辺尤度$p(x)=\int p_\theta(x|z)p(z)dz$を最大化するように生成モデルを学習することです． ただし，これは直接最大化することができません．

そこで，代わりに次のような変分下界$\mathcal{L}(x)$を最大化することで，モデルを学習します．

\begin{equation*}
\log p(x) \geq E_{q_\phi(z|x)}[\log \frac{p_\theta(x,z)}{q_\phi(z|x)}] = \mathcal{L}(x)
\end{equation*}

ただし，$q_\phi(z|x)$は事後分布$p(z|x)$の近似分布で，$\phi$はそのパラメータです．VAEではこの分布も深層ニューラルネットワークでモデル化します（amortized inference）．

それぞれの分布について，$q_\phi(ｚ|x)$は$x$を$z$にエンコードするので**エンコーダ**，$p_\theta(x|z)$は$z$から$x$にデコードするので**デコーダ**と呼ばれます． このことから，このモデルは**variational autoencoders（VAE）**と呼ばれます．

ではこれに従って，Tarsの分布クラスで，各分布を設定しましょう．

In [None]:
x_dim = 784
z_dim = 64

# inference model q(z|x)
class Inference(NormalModel):
    def __init__(self):
        super(Inference, self).__init__(cond_var=["x"], var=["z"])

        self.fc1 = nn.Linear(x_dim, 400)
        self.fc21 = nn.Linear(400, z_dim)
        self.fc22 = nn.Linear(400, z_dim)

    def forward(self, x):
        h1 = F.relu(self.fc1(x))
        return self.fc21(h1), F.softplus(self.fc22(h1))
    
q = Inference()
    
# generative model p(x|z)    
class Generator(BernoulliModel):
    def __init__(self):
        super(Generator, self).__init__(cond_var=["z"], var=["x"])

        self.fc3 = nn.Linear(z_dim, 400)
        self.fc4 = nn.Linear(400, x_dim)

    def forward(self, x):
        h3 = F.relu(self.fc3(x))
        return F.sigmoid(self.fc4(h3))
    
_p = Generator()
    
# prior model p(z)
loc = torch.tensor(0.).cuda()
scale = torch.tensor(1.).cuda()
prior = NormalModel(loc=loc, scale=scale, var=["z"], dim=z_dim)

ここでは，エンコーダにガウス分布，デコーダにベルヌーイ分布を用いています.

次に，生成モデルの同時分布を設定します．

In [None]:
p = _p * prior

print(p.prob_text)
print(p.prob_factorized_text)

今回は，GPU計算するので，各分布をGPUに載せます．

In [None]:
device = "cuda"

p.to(device)
q.to(device)

次に，目的関数を設定します． VAEの目的関数は，変分下界でした． 

変分下界の計算は，前回の確率分布クラスの特性を使うことで，以下のように非常にシンプルに書けます．

In [None]:
def elbo(x):
    #1. sample from q(z|x) 
    samples = q.sample(x)
    
    #2. caluculate the lower bound (log p(x,z) - log q(z|x))
    lower_bound = p.log_likelihood(samples) - q.log_likelihood(samples)

    loss = -torch.mean(lower_bound)

    return loss

また，パラメータや最適化手法を設定します．

In [None]:
q_params = list(q.parameters())
p_params = list(p.parameters())
params = q_params + p_params

optimizer = optim.Adam(params, lr=1e-3)

学習用のデータを読み込みます．今回はMNISTを使います．

In [None]:
batch_size = 128

kwargs = {'num_workers': 1, 'pin_memory': True}
train_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=True, download=True,
                   transform=transforms.ToTensor()),
    batch_size=batch_size, shuffle=True, **kwargs)
test_loader = torch.utils.data.DataLoader(
    datasets.MNIST('../data', train=False, transform=transforms.ToTensor()),
    batch_size=batch_size, shuffle=True, **kwargs)

最後に，trainとtestの関数を設定します．

In [None]:
log_interval = 10

def train(epoch):
    p.train()
    q.train()    
    
    train_loss = 0
    for batch_idx, (data, _) in enumerate(train_loader):
        data = data.to(device)
        optimizer.zero_grad()
        
        # calculate the ELBO
        loss = elbo({"x": data.view(-1, 784)})
        
        # backprop
        loss.backward()

        # update params
        optimizer.step()        
        
        train_loss += loss
        
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader),
                loss.item()))

    print('====> Epoch: {} Average loss: {:.4f}'.format(
          epoch, train_loss / len(train_loader.dataset)))

In [None]:
def test(epoch):
    p.eval()
    q.eval()    
    
    test_loss = 0
    for i, (data, _) in enumerate(test_loader):
        data = data.cuda()
        with torch.no_grad():
            loss = elbo({"x": data.view(-1, 784)})        
        test_loss += loss
        
        z = q.sample({"x": data.view(-1, 784)})
        z = get_dict_values(z, _p.cond_var, return_dict=True)
        recon_batch = _p.sample_mean(z)
        if i == 0:
            n = min(data.size(0), 8)
            comparison = torch.cat([data[:n],
                                  recon_batch.view(batch_size, 1, 28, 28)[:n]])
            save_image(comparison.cpu(),
                     'results/reconstruction_' + str(epoch) + '.png', nrow=n)

    test_loss /= len(test_loader.dataset)
    print('====> Test set loss: {:.4f}'.format(test_loss))

In [None]:
epochs = 10

for epoch in range(1, epochs + 1):
    train(epoch)
    test(epoch)
    with torch.no_grad():
        sample_z = 0.3 * torch.randn(64, z_dim).to(device)
        sample = _p.sample_mean({"z":sample_z}).cpu()
        save_image(sample.view(64, 1, 28, 28),
                   'results/sample_' + str(epoch) + '.png')

なお， ここでは下界を実装しましたが，TarsではTars.models以下に様々な学習モデルが用意されています．

変分推論の場合は，次のように，Tars.models.VIを使うことで学習できます．

In [None]:
from Tars.models import VI
model = VI(p, q, optim.Adam, {"lr":1e-3})

In [None]:
def train(epoch):
    train_loss = 0
    for batch_idx, (data, _) in enumerate(train_loader):
        data = data.to(device)
        lower_bound, loss = model.train({"x": data.view(-1, 784)})
        train_loss += loss
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader),
                loss.item()))

    print('====> Epoch: {} Average loss: {:.4f}'.format(
          epoch, train_loss / len(train_loader.dataset)))

In [None]:
def test(epoch):
    test_loss = 0
    for i, (data, _) in enumerate(test_loader):
        data = data.to(device)
        lower_bound, loss = model.test({"x": data.view(-1, 784)})
        test_loss += loss
        z = q.sample({"x": data.view(-1, 784)})
        z = get_dict_values(z, _p.cond_var, return_dict=True)
        recon_batch = _p.sample_mean(z)
        if i == 0:
            n = min(data.size(0), 8)
            comparison = torch.cat([data[:n],
                                  recon_batch.view(batch_size, 1, 28, 28)[:n]])
            save_image(comparison.cpu(),
                     'results/reconstruction_' + str(epoch) + '.png', nrow=n)

    test_loss /= len(test_loader.dataset)
    print('====> Test set loss: {:.4f}'.format(test_loss))

In [None]:
for epoch in range(1, epochs + 1):
    train(epoch)
    test(epoch)
    with torch.no_grad():
        sample_z = 0.3 * torch.randn(64, z_dim).to(device)
        sample = _p.sample_mean({"z":sample_z}).cpu()
        save_image(sample.view(64, 1, 28, 28),
                   'results/sample_' + str(epoch) + '.png')