In [0]:
# Block1
import chainer
chainer.print_runtime_info()
print('GPU availability:', chainer.cuda.available)
print('cuDNN availablility:', chainer.cuda.cudnn_enabled)

In [0]:
# Block2
from google.colab import drive
drive.mount('/content/drive/')

In [0]:
# Block3
# Network model_________________________________________________________________

import math
import numpy as np

import chainer
import chainer.distributions as D
import chainer.functions as F
import chainer.links as L
from chainer import reporter
    
# For stabilization and increasing training speed
# But, in this program we don't use these functions
def feature_vector_normalization(x, eps=1e-8):
    # x: (B, C, H, W)
    alpha = 1.0 / F.sqrt(F.mean(x*x, axis=1, keepdims=True) + eps)
    return F.broadcast_to(alpha, x.data.shape) * x

def minibatch_std(x):
    m = F.mean(x, axis=0, keepdims=True)
    v = F.mean((x - F.broadcast_to(m, x.shape))*(x - F.broadcast_to(m, x.shape)), axis=0, keepdims=True)
    sd = F.mean(F.sqrt(v + 1e-8), keepdims=True)
    std = F.broadcast_to(std, (x.shape[0], 1, x.shape[2], x.shape[3]))
    return F.concat([x, std], axis=1)
    
class EqualizedConv2d(chainer.Chain):
    def __init__(self, in_ch, out_ch, ksize, stride, pad):
        w = chainer.initializers.Normal(1.0) # equalized learning rate
        self.inv_c = np.sqrt(2.0/(in_ch*ksize**2))
        super(EqualizedConv2d, self).__init__()
        with self.init_scope():
            self.c = L.Convolution2D(in_ch, out_ch, ksize, stride, pad, initialW=w)
    def __call__(self, x):
        return self.c(self.inv_c * x)
    
class EqualizedLinear(chainer.Chain):
    def __init__(self, in_ch, out_ch):
        w = chainer.initializers.Normal(1.0) # equalized learning rate
        self.inv_c = np.sqrt(2.0/in_ch)
        super(EqualizedLinear, self).__init__()
        with self.init_scope():
            self.c = L.Linear(in_ch, out_ch, initialW=w)
    def __call__(self, x):
        return self.c(self.inv_c * x)
    

# VAE___________________________________________________________________________
class AvgELBOLoss(chainer.Chain):
    """gLoss function of VAE.
    The loss value is equal to ELBO (Evidence Lower Bound)
    multiplied by -1.
    Args:
        encoder (chainer.Chain): A neural network which outputs variational
            posterior distribution q(z|x) of a latent variable z given
            an observed variable x.
        decoder (chainer.Chain): A neural network which outputs conditional
            distribution p(x|z) of the observed variable x given
            the latent variable z.
        prior (chainer.Chain): A prior distribution over the latent variable z.
        beta (float): Usually this is 1.0. Can be changed to control the
            second term of ELBO bound, which works as regularization.
        k (int): Number of Monte Carlo samples used in encoded vector.
    """

    def __init__(self, encoder, decoder, prior, batch_size, beta=1.0, k=1):
        super(AvgELBOLoss, self).__init__()
        self.beta = beta
        self.k = k
        self.batch_size = batch_size

        with self.init_scope():
            self.encoder = encoder
            self.decoder = decoder
            self.prior = prior

    def __call__(self, x1, x2):
        q_z = self.encoder(x1)
        z = q_z.sample(self.k)
        p_x = self.decoder(z)
        p_z = self.prior()
        p_x = p_x * 255

        reconstr = 0
        for i in range(self.k):
            reconstr += F.mean_squared_error(x2, p_x) / self.k
            
        kl_penalty = F.mean(F.sum(chainer.kl_divergence(q_z, p_z), axis=-1))
        loss = reconstr + self.beta * kl_penalty
        reporter.report({'loss': loss}, self)
        reporter.report({'reconstr': reconstr}, self)
        reporter.report({'kl_penalty': kl_penalty}, self)
        return loss


class Encoder(chainer.Chain):

    def __init__(self, n_latent, ch=512, bottom_width=4, wscale=0.02):
        super(Encoder, self).__init__()
        with self.init_scope():
            w = chainer.initializers.Normal(wscale)
            self.c0_0 = L.Convolution2D(3, ch // 8, 3, 1, 1, initialW=w)
            self.c0_1 = L.Convolution2D(ch // 8, ch // 4, 4, 2, 1, initialW=w)
            self.c1_0 = L.Convolution2D(ch // 4, ch // 4, 3, 1, 1, initialW=w)
            self.c1_1 = L.Convolution2D(ch // 4, ch // 2, 4, 2, 1, initialW=w)
            self.c2_0 = L.Convolution2D(ch // 2, ch // 2, 3, 1, 1, initialW=w)
            self.c2_1 = L.Convolution2D(ch // 2, ch // 1, 4, 2, 1, initialW=w)
            self.c3_0 = L.Convolution2D(ch // 1, ch // 1, 3, 1, 1, initialW=w)
            self.mu = L.Linear(
                 bottom_width * bottom_width * ch, n_latent, initialW=w)
            self.ln_sigma = L.Linear(
                 bottom_width * bottom_width * ch, n_latent, initialW=w)
            self.bn0_1 = L.BatchNormalization(ch // 4, use_gamma=False)
            self.bn1_0 = L.BatchNormalization(ch // 4, use_gamma=False)
            self.bn1_1 = L.BatchNormalization(ch // 2, use_gamma=False)
            self.bn2_0 = L.BatchNormalization(ch // 2, use_gamma=False)
            self.bn2_1 = L.BatchNormalization(ch // 1, use_gamma=False)
            self.bn3_0 = L.BatchNormalization(ch // 1, use_gamma=False)

    def forward(self, x):
        h = F.leaky_relu(self.c0_0(x))
        h = F.leaky_relu(self.bn0_1(self.c0_1(h)))
        h = F.leaky_relu(self.bn1_0(self.c1_0(h)))
        h = F.leaky_relu(self.bn1_1(self.c1_1(h)))
        h = F.leaky_relu(self.bn2_0(self.c2_0(h)))
        h = F.leaky_relu(self.bn2_1(self.c2_1(h)))
        h = F.leaky_relu(self.bn3_0(self.c3_0(h)))
        mu = self.mu(h)
        ln_sigma = self.ln_sigma(h)  # log(sigma)
        return D.Normal(loc=mu, log_scale=ln_sigma)


class Decoder(chainer.Chain):

    def __init__(self, n_latent, bottom_width=4, ch=512, wscale=0.02):
        super(Decoder, self).__init__()
        self.ch = ch
        self.bottom_width = bottom_width
        with self.init_scope():
            w = chainer.initializers.Normal(wscale)
            self.l0 = L.Linear(
                n_latent, bottom_width * bottom_width * ch, initialW=w)
            self.c1 = L.Convolution2D(ch, ch // 2, 3, 1, 1, initialW=w)
            self.c2 = L.Convolution2D(ch // 2, ch // 4, 3, 1, 1, initialW=w)
            self.c3 = L.Convolution2D(ch // 4, ch // 8, 3, 1, 1, initialW=w)
            self.c4 = L.Convolution2D(ch // 8, 3, 3, 1, 1, initialW=w)
            self.bn0 = L.BatchNormalization(bottom_width * bottom_width * ch)
            self.bn1 = L.BatchNormalization(ch // 2)
            self.bn2 = L.BatchNormalization(ch // 4)
            self.bn3 = L.BatchNormalization(ch // 8)

    def forward(self, z, inference=False):
        h = F.reshape(F.tanh(self.bn0(self.l0(z[0]))), 
                      (len(z[0]), self.ch, self.bottom_width, self.bottom_width))
        h = F.leaky_relu(self.bn1(self.c1(h)))
        h = F.unpooling_2d(h, 2, 2, 0, outsize=(8, 8))
        h = F.leaky_relu(self.bn2(self.c2(h)))
        h = F.unpooling_2d(h, 2, 2, 0, outsize=(16, 16))
        h = F.leaky_relu(self.bn3(self.c3(h)))
        h = F.unpooling_2d(h, 2, 2, 0, outsize=(32 ,32))
        x = F.sigmoid(self.c4(h))
        return x


class Prior(chainer.Link):

    def __init__(self, n_latent):
        super(Prior, self).__init__()

        self.loc = np.zeros(n_latent, np.float32)
        self.scale = np.ones(n_latent, np.float32)
        self.register_persistent('loc')
        self.register_persistent('scale')

    def forward(self):
        return D.Normal(self.loc, scale=self.scale)


def make_encoder(n_latent):
    return Encoder(n_latent)


def make_decoder(n_latent):
    return Decoder(n_latent)


def make_prior(n_latent):
    return Prior(n_latent)

In [0]:
# Block4
# train_________________________________________________________________________
"""Chainer example: train a VAE on MNIST
"""
import argparse
import os
import glob
from PIL import Image

import numpy as np

import chainer
from chainer import training
from chainer.training import extensions
from chainer import serializers

def main():
    """
    parser = argparse.ArgumentParser(description='VAE_cnn for FDC')
    parser.add_argument('--initmodel', '-m', default='',
                        help='Initialize the model from given file')
    parser.add_argument('--resume', '-r', default='',
                        help='Resume the optimization from snapshot')
    parser.add_argument('--gpu', '-g', default=0, type=int,
                        help='GPU ID (negative value indicates CPU)')
    parser.add_argument('--out', '-o', default='results',
                        help='Directory to output the result')
    parser.add_argument('--epoch', '-e', default=100, type=int,
                        help='number of epochs to learn')
    parser.add_argument('--dim-z', '-z', default=20, type=int,
                        help='dimention of encoded vector')
    parser.add_argument('--dim-h', default=500, type=int,
                        help='dimention of hidden layer')
    parser.add_argument('--beta', default=1.0, type=float,
                        help='Regularization coefficient for '
                             'the second term of ELBO bound')
    parser.add_argument('--k', '-k', default=1, type=int,
                        help='Number of Monte Carlo samples used in '
                             'encoded vector')
    parser.add_argument('--binary', action='store_true',
                        help='Use binarized MNIST')
    parser.add_argument('--batch-size', '-b', type=int, default=100,
                        help='learning minibatch size')
    parser.add_argument('--test', action='store_true',
                        help='Use tiny datasets for quick tests')
    args = parser.parse_args()
    """
    
    dim_z = 50
    batch_size = 1000
    epoch = 500
    beta = 1.0
    k = 1
    # Please change variable "out"
    out = ""
    snapshot_interval = 100

    print('GPU: 0')
    print('# dim z: {}'.format(dim_z))
    print('# Minibatch-size: {}'.format(batch_size))
    print('# epoch: {}'.format(epoch))
    print('')
    
    # Prepare VAE model
    encoder = make_encoder(dim_z)
    decoder = make_decoder(dim_z)
    prior = make_prior(dim_z)
    avg_elbo_loss = AvgELBOLoss(encoder, decoder, prior, batch_size, beta=beta, k=k)
    
    avg_elbo_loss.to_gpu(0)
        
    # Setup an optimizer
    optimizer = chainer.optimizers.Adam()
    optimizer.setup(avg_elbo_loss)
    
    # Load CIFAR-10 dataset
    train, test = chainer.datasets.get_cifar10(withlabel=False, scale=255.)
    
    # If you use CIFAR-10, pleae remove "shuffle=False" in train_iter.
    train_iter = chainer.iterators.SerialIterator(
        train, batch_size, shuffle=False)
    valid_iter = chainer.iterators.SerialIterator(
        valid, len(valid), repeat=False, shuffle=False)
    test_iter = chainer.iterators.SerialIterator(
        test, len(test), repeat=False, shuffle=False)
    
    # Set up an updater. StandardUpdater can explicitly specify a loss function
    # used in the training with 'loss_func' option
    updater = training.updaters.StandardUpdater(
        train_iter, optimizer, device=0, loss_func=avg_elbo_loss)
    
    trainer = training.Trainer(updater, (epoch, "epoch"), out=out)
    snapshot_interval = (snapshot_interval, "iteration")
    trainer.extend(extensions.Evaluator(valid_iter, avg_elbo_loss, device=0))
    trainer.extend(extensions.PlotReport(["main/reconstr", "validation/main/reconstr"], x_key="epoch", file_name="loss.png"))
    trainer.extend(extensions.snapshot_object(
        encoder, "encoder_{.updater.iteration}.npz"), trigger=snapshot_interval)
    trainer.extend(extensions.snapshot_object(
        decoder, "decoder_{.updater.iteration}.npz"), trigger=snapshot_interval)
    trainer.extend(extensions.LogReport())
    trainer.extend(extensions.PrintReport(['epoch', 'main/loss', 'validation/main/loss',
                                           'main/reconstr', 'main/kl_penalty', 'elapsed_time']))
    trainer.extend(extensions.ProgressBar())
        
    # Run the training
    trainer.run()
    
    
if __name__ == '__main__':
    main()