In [15]:
import os

import numpy as np
import torch
import pyro
from pyro.contrib.examples.util import MNIST
import torch.nn as nn
import torchvision.transforms as transforms

import pyro.distributions as dist
import pyro.contrib.examples.util # patches torchvision
from pyro.infer import SVI, Trace_ELBO
from pyro.optim import Adam

# Set the random seed for reproducibility


In [16]:
def setup_data_loaders(batch_size=128):
    root = '.\data'
    trans = transforms.ToTensor()
    train_set =MNIST(root = root, train = True, transform = trans, download = True)
    test_set = MNIST(root = root, train = False, transform = trans)

    kwargs = {'num_workers': 1 }
    train_loader = torch.utils.data.DataLoader(dataset = train_set, batch_size= batch_size,
                                               shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(dataset = test_set, batch_size=batch_size,
                                               shuffle=False, **kwargs)
    return train_loader, test_loader



In [17]:
#DECODER , 먼저 인코더와 디코더를 어떻게 제공할건지 생각 필요
#결국 variational autoencoder는 인코더와 디코더로 구성되어 있고, latent variable에 변분방법이 사용될 뿐
class Decoder(nn.Module):
    def __init__(self, z_dim, hidden_dim):
        super().__init__()
        #setup the two linear transformations
        self.fc1 = nn.Linear(z_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, 784)

        #setup the non-linearity
        self.softplus = nn.Softplus()
        self.sigmoid = nn.Sigmoid()

    def forward(self, z):
        #define the forward computation on the latent z
        #first compute the hidden units
        hidden = self.softplus(self.fc1(z))
        #return the parameter for the output Bernoulli
        #each is of size batch_size *784
        loc_img = self.sigmoid(self.fc2(hidden))
        #return the output
        return loc_img

In [18]:
#encoder의 정의
class Encoder(nn.Module):
    def __init__(self,z_dim, hidden_dim):
        super().__init__()
        #setup the two linear transformations
        self.fc1 = nn.Linear(784, hidden_dim)
        self.fc21 = nn.Linear(hidden_dim, z_dim)
        self.fc22 = nn.Linear(hidden_dim, z_dim)
        #setup the non-linearity
        self.softplus = nn.Softplus()

    def forward(self, x):
        #define the forward computation on the input x
        #first shape the minibatch to have pixels in the rightmost dim
        x = x.view(-1, 784)
        #then compute the hidden units
        hidden = self.softplus(self.fc1(x))
        #then return a mean vector and a square root covariance matrix
        #each of size batch_size * z_dim
        z_loc = self.fc21(hidden)
        z_scale = torch.exp(self.fc22(hidden))
        return z_loc, z_scale

In [19]:
#Model class
def model(self,x):
    #register PyTorch module 'decoder' with Pyro (prior 정의)
    pyro.module("decoder", self.decoder)
    with pyro.plate("data", x.shape[0]):
        #setup hyperparameters ofr prior p(z) , 정규분포포
        z_loc = x.new_zeros(torch.Size((x.shape[0], self.z_dim)))
        z_scale = x.new_ones(torch.Size((x.shape[0], self.z_dim)))

        #sample from prior p(z)
        #value will be sampled by guide when computing the ELBO
        z = pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))
        #sample from decoder p(x|z)
        loc_img = self.decoder(z)
        #sample from the output Bernoulli, 베르누이 분포로 분류 (liklihoood)
        pyro.sample("obs", dist.Bernoulli(loc_img).to_event(1), obs=x.view(-1, 784))

In [20]:
#define the guide ( i.2. variational distribution q(z|x) )
def guide(self, x):
    #register PyTorch module 'encoder' with Pyro
    pyro.module("encoder", self.encoder)
    with pyro.plate("data", x.shape[0]):
        #compute the parameters of the variational distribution q(z|x)
        z_loc, z_scale = self.encoder(x)
        #sample from the variational distribution q(z|x)
        #value will be used by model when computing the ELBO
        z = pyro.sample("latent", dist.Normal(z_loc, z_scale).to_event(1))
        return z

In [21]:
def train(svi, train_loader):
    #initialize loss accumulator
    epoch_loss = 0.

    #do a training epoch over each mini-batch x returned
    #by the data loader
    for x, _ in train_loader:
        #do ELBO gradient and accumulate loss
        epoch_loss += svi.step(x)

    #return epoch loss
    normalizer_train = len(train_loader.dataset)
    total_epoch_loss_train = epoch_loss / normalizer_train
    return total_epoch_loss_train

In [22]:
def evaluate(svi, test_loader):
    #initalize loss accumulato
    epoch_loss = 0.

    #do a training epoch over each mini-batch x returned
    #by the data loader
    for x, _ in test_loader:
        #do ELBO gradient and accumulate loss
        epoch_loss += svi.evaluate_loss(x)

    #return epoch loss
    normalizer_train = len(test_loader.dataset)
    total_epoch_loss_train = epoch_loss / normalizer_train
    return total_epoch_loss_train

In [23]:
train_elbo = []
test_elbo = []
#training loop
for epoch in range(NUM_EPOCHS):
    total_epoch_loss_train = train(svi, train_loader)
    train_elbo.append(-total_epoch_loss_train)

    if epoch % TEST_FREQUENCY == 0 :
        #report test diagnostics
        total_epoch_loss_test = evaluate(svi, test_loader)
        test_elbo.append(-total_epoch_loss_test)


NameError: name 'NUM_EPOCHS' is not defined