# Pixel to Pixel Generative Adversarial Networks

[Deep Convolutional Generative Adversarial Networks](https://arxiv.org/abs/1511.06434) is a model which is capable of learning reusable feature representations from large unlabeled image datasets. It helps bridge the gap between the success of CNNs for supervised learning and unsupervised learning.

In this tutorial, we'll train a pixel-to-pixel (pix2pix-style) GAN, made of a U-Net generator and a PatchGAN discriminator, on the `facades` dataset, in which every image file holds an input image and its target image side by side. The goal is for the generator to learn to translate an input image into the corresponding output image.


In [30]:
from __future__ import print_function
import os
import matplotlib as mpl
import tarfile
import matplotlib.image as mpimg
from matplotlib import pyplot as plt

import mxnet as mx
from mxnet import gluon
from mxnet import ndarray as nd
from mxnet.gluon import nn, utils
from mxnet.gluon.nn import Dense, Activation, Conv2D, Conv2DTranspose, \
    BatchNorm, LeakyReLU, Flatten, HybridSequential, HybridBlock, Dropout
from mxnet import autograd
import numpy as np

## Set Training parameters

In [31]:
# Training schedule
epochs = 100
batch_size = 32
# Number of channels of the random latent tensors created further down
latent_z_size = 100

# Hyper-parameters handed to the Adam trainers
lr = 0.0002
beta1 = 0.5

# Device selection: flip use_gpu to False to run on the CPU
use_gpu = True
if use_gpu:
    ctx = mx.gpu()
else:
    ctx = mx.cpu()

## Load and preprocess the facades dataset

In [32]:
# Name of the dataset directory; the train/val image folders are looked up inside it.
dataset = 'facades'

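Each image file contains an input image and its target image side by side. We split every file into two 256*256 crops (left half and right half) and move the channel axis to the front. Pixel values are kept in their original 0-255 range at this stage; because the generator ends with a Tanh activation, images have to be scaled to the range between -1 and 1 before they are fed to the networks.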

In [33]:
# Width and height (in pixels) of each cropped half of an image file
img_wd = img_ht = 256

# Training and validation images live in sub-folders of the dataset directory
val_img_path = '%s/val' % (dataset)
train_img_path = '%s/train' % (dataset)

def load_data(path, batch_size, is_reversed=False):
    """Load side-by-side image pairs from a directory tree into an iterator.

    Every ``.jpg`` file found (recursively) under ``path`` is read and cut
    into two ``img_wd`` x ``img_ht`` crops: one starting at x=0 and one
    starting at x=``img_wd``. Each crop has its last axis moved to the front
    (``(2, 0, 1)`` transpose, i.e. channels first) and gets a leading batch
    axis of size 1. Pixel values are not rescaled here.

    Parameters
    ----------
    path : str
        Root directory that is walked for ``.jpg`` files.
    batch_size : int
        Batch size of the returned iterator.
    is_reversed : bool, optional
        When False (default) the left crop is the input and the right crop
        the output; when True the two roles are swapped.

    Returns
    -------
    mx.io.NDArrayIter
        Iterator whose ``data`` holds two arrays: ``[inputs, outputs]``.

    Raises
    ------
    ValueError
        If no ``.jpg`` file is found under ``path``.
    """
    img_in_list = []
    img_out_list = []
    # Use a separate name for the walked directory so that the ``path``
    # argument stays intact (it is needed for the error message below).
    for dirpath, _, fnames in os.walk(path):
        for fname in fnames:
            if not fname.endswith('.jpg'):
                continue
            img = os.path.join(dirpath, fname)
            img_arr = mx.image.imread(img)
            # Crop input and output images
            img_arr_in, img_arr_out = [mx.image.fixed_crop(img_arr, 0, 0, img_wd, img_ht),
                                       mx.image.fixed_crop(img_arr, img_wd, 0, img_wd, img_ht)]
            # Move the last axis to the front
            img_arr_in, img_arr_out = [nd.transpose(img_arr_in, (2, 0, 1)),
                                       nd.transpose(img_arr_out, (2, 0, 1))]
            # Add a leading batch axis so the crops can be concatenated
            img_arr_in, img_arr_out = [img_arr_in.reshape((1,) + img_arr_in.shape),
                                       img_arr_out.reshape((1,) + img_arr_out.shape)]
            img_in_list.append(img_arr_out if is_reversed else img_arr_in)
            img_out_list.append(img_arr_in if is_reversed else img_arr_out)

    # Fail with an explicit message instead of an opaque error from
    # nd.concatenate when the directory holds no images.
    if not img_in_list:
        raise ValueError("No '.jpg' images found under %r" % (path,))

    return mx.io.NDArrayIter(data=[nd.concatenate(img_in_list),
                                   nd.concatenate(img_out_list)], batch_size=batch_size)

# One iterator per dataset split, both using the global batch size
train_data = load_data(path=train_img_path, batch_size=batch_size)
val_data = load_data(path=val_img_path, batch_size=batch_size)

Visualize 4 input images (top row) and their corresponding output images (bottom row):

In [34]:
def visualize(img_arr):
    """Draw a single image on the current matplotlib axes, without axis ticks.

    ``img_arr`` must provide ``asnumpy()``. Its first axis is moved to the
    end (channels-first to channels-last) and the values are cast to
    ``uint8`` before being handed to ``plt.imshow``.
    """
    pixels = img_arr.asnumpy()
    channels_last = pixels.transpose(1, 2, 0)
    plt.imshow(channels_last.astype(np.uint8))
    plt.axis('off')

# Pull one batch and show its first four pairs on a 2x4 grid:
# inputs in subplot slots 1-4, the matching outputs in slots 5-8.
img_in_list, img_out_list = train_data.next().data
for i in range(4):
    for slot_offset, images in ((1, img_in_list), (5, img_out_list)):
        plt.subplot(2, 4, i + slot_offset)
        visualize(images[i])
plt.show()

## Defining the networks

The core of the DCGAN architecture is adopting and modifying the CNN architecture:
* Replace any pooling layers with strided convolutions (discriminator) and fractional-strided convolutions (generator).

* Use batchnorm in both the generator and the discriminator.

* Remove fully connected hidden layers for deeper architectures.

* Use ReLU activation in generator for all layers except for the output, which uses Tanh.

* Use LeakyReLU activation in the discriminator for all layers.

![alt text](img/dcgan.png "DCGAN Architecture")

In [35]:
# Define Unet generator skip block
class UnetSkipUnit(HybridBlock):
    """One encoder/decoder level of a U-Net, wrapping an optional inner level.

    The unit down-samples its input with a stride-2 convolution
    (``outer_channels`` -> ``inner_channels``), runs ``inner_block`` (if any),
    and up-samples again with a stride-2 transposed convolution back to
    ``outer_channels``. Unless the unit is the outermost one, its output is
    the concatenation, along the channel axis, of that result and the unit's
    own input (the skip connection).

    Parameters
    ----------
    inner_channels : int
        Number of channels produced by the down-sampling convolution.
    outer_channels : int
        Number of channels of the unit's input and of the transposed
        convolution's output.
    inner_block : HybridBlock, optional
        The nested unit. Required unless ``innermost`` is True. The decoder
        expects it to return ``inner_channels * 2`` channels, which is what a
        non-outermost ``UnetSkipUnit`` with ``outer_channels`` equal to this
        unit's ``inner_channels`` produces.
    innermost : bool
        Build the bottleneck unit (no inner block, no encoder batch norm).
    outermost : bool
        Build the top-level unit (no norms, Tanh output, no skip connection).
    use_dropout : bool
        Append ``Dropout(0.5)`` after the decoder; only applied to
        intermediate units (neither innermost nor outermost).
    use_bias : bool
        Passed to the convolutions, except the outermost transposed
        convolution, which keeps the layer's default.
    """
    def __init__(self, inner_channels, outer_channels, inner_block=None, innermost=False, outermost=False,
                 use_dropout=False, use_bias=False):
        super(UnetSkipUnit, self).__init__()
        if not innermost and inner_block is None:
            raise ValueError('inner_block is required unless innermost=True')
        self.outermost = outermost
        en_conv = Conv2D(channels=inner_channels, kernel_size=4, strides=2, padding=1,
                         in_channels=outer_channels, use_bias=use_bias)
        en_relu = LeakyReLU(alpha=0.2)
        en_norm = BatchNorm(in_channels=inner_channels)
        de_relu = Activation(activation='relu')
        de_norm = BatchNorm(in_channels=outer_channels)

        if innermost:
            # Bottleneck: nothing is nested, so the decoder sees inner_channels
            de_conv = Conv2DTranspose(channels=outer_channels, kernel_size=4, strides=2, padding=1,
                                      in_channels=inner_channels, use_bias=use_bias)
            encoder = [en_relu, en_conv]
            decoder = [de_relu, de_conv, de_norm]
            model = encoder + decoder
        elif outermost:
            # The inner block returns its output concatenated with its input,
            # hence inner_channels * 2 channels reach the decoder.
            de_conv = Conv2DTranspose(channels=outer_channels, kernel_size=4, strides=2, padding=1,
                                      in_channels=inner_channels * 2)
            encoder = [en_conv]
            decoder = [de_relu, de_conv, Activation(activation='tanh')]
            model = encoder + [inner_block] + decoder
        else:
            de_conv = Conv2DTranspose(channels=outer_channels, kernel_size=4, strides=2, padding=1,
                                      in_channels=inner_channels * 2, use_bias=use_bias)
            encoder = [en_relu, en_conv, en_norm]
            decoder = [de_relu, de_conv, de_norm]
            model = encoder + [inner_block] + decoder
            if use_dropout:
                model += [Dropout(rate=0.5)]

        self.model = HybridSequential()
        for block in model:
            self.model.add(block)

    def hybrid_forward(self, F, x):
        """Run the unit; non-outermost units append their input as a skip connection."""
        if self.outermost:
            return self.model(x)
        # concat takes the arrays as separate positional arguments, not a list
        return F.concat(self.model(x), x, dim=1)

# Define Unet generator
class UnetGenerator(HybridBlock):
    """U-Net generator assembled from nested ``UnetSkipUnit`` levels.

    The network is built from the inside out: one innermost unit,
    ``num_downs - 5`` intermediate ``ngf * 8`` units, three units that narrow
    the width from ``ngf * 8`` down to ``ngf``, and the outermost unit that
    maps to and from ``in_channels``. That gives ``num_downs`` stride-2
    down-sampling steps in total (when ``num_downs >= 5``).

    Parameters
    ----------
    in_channels : int
        Channels of the input image; the output has the same number.
    num_downs : int
        Total number of down-sampling levels.
    ngf : int
        Base number of generator filters.
    use_dropout : bool
        Whether the intermediate ``ngf * 8`` units end with dropout.
        Previously this argument was ignored and dropout was always enabled;
        the default is therefore ``True`` so that existing calls which do not
        pass it keep building the same network.
    """
    def __init__(self, in_channels, num_downs, ngf=64, use_dropout=True):
        super(UnetGenerator, self).__init__()

        #Build unet generator structure
        unet = UnetSkipUnit(ngf * 8, ngf * 8, innermost=True)
        for _ in range(num_downs - 5):
            # Honour the caller's choice instead of hard-coding dropout on
            unet = UnetSkipUnit(ngf * 8, ngf * 8, unet, use_dropout=use_dropout)
        unet = UnetSkipUnit(ngf * 8, ngf * 4, unet)
        unet = UnetSkipUnit(ngf * 4, ngf * 2, unet)
        unet = UnetSkipUnit(ngf * 2, ngf * 1, unet)
        unet = UnetSkipUnit(ngf, in_channels, unet, outermost=True)

        self.model = unet

    def hybrid_forward(self, F, x):
        """Apply the nested U-Net to ``x``."""
        return self.model(x)

# Define the PatchGAN discriminator
class Discriminator(HybridBlock):
    """PatchGAN discriminator: a stack of convolutions ending in a 1-channel map.

    Layout: one stride-2 convolution with LeakyReLU, ``n_layers - 1``
    stride-2 convolution/BatchNorm/LeakyReLU groups whose width doubles up to
    ``ndf * 8``, one stride-1 group, and a final stride-1 convolution with a
    single output channel. No activation follows the last convolution, so the
    output is a map of raw scores.

    Parameters
    ----------
    in_channels : int
        Channels of the input image.
    ndf : int
        Base number of discriminator filters.
    n_layers : int
        Number of stride-2 convolutions (including the first one).
    use_sigmoid : bool
        Accepted for interface compatibility; not used by this implementation.
    use_bias : bool
        Passed to the convolutions that are followed by batch normalization.
    """
    def __init__(self, in_channels, ndf=64, n_layers=3, use_sigmoid=True, use_bias=True):
        super(Discriminator, self).__init__()
        self.model = HybridSequential()
        kernel_size = 4
        # np.ceil works on Python scalars; mx.nd.ceil expects an NDArray.
        # NOTE(review): the division yields 1 under Python 2 (integer
        # division) and 1.5 -> 2 under Python 3 - confirm the intended padding.
        padding = int(np.ceil((kernel_size - 1)/2))
        self.model.add(Conv2D(channels=ndf, kernel_size=kernel_size, strides=2,
                              padding=padding, in_channels=in_channels))
        self.model.add(LeakyReLU(alpha=0.2))

        nf_mult = 1
        for n in range(1, n_layers):
            nf_mult_prev = nf_mult
            # Double the width at every level, capped at 8 * ndf
            nf_mult = min(2 ** n, 8)
            self.model.add(Conv2D(channels=ndf * nf_mult, kernel_size=kernel_size, strides=2,
                                  padding=padding, in_channels=ndf * nf_mult_prev,
                                  use_bias=use_bias))
            self.model.add(BatchNorm(in_channels=ndf * nf_mult))
            self.model.add(LeakyReLU(alpha=0.2))

        nf_mult_prev = nf_mult
        nf_mult = min(2 ** n_layers, 8)
        self.model.add(Conv2D(channels=ndf * nf_mult, kernel_size=kernel_size, strides=1,
                              padding=padding, in_channels=ndf * nf_mult_prev,
                              use_bias=use_bias))
        self.model.add(BatchNorm(in_channels=ndf * nf_mult))
        self.model.add(LeakyReLU(alpha=0.2))
        # Final 1-channel convolution: one score per image patch
        self.model.add(Conv2D(channels=1, kernel_size=kernel_size, strides=1,
                              padding=padding, in_channels=ndf * nf_mult))

    def hybrid_forward(self, F, x):
        """Return the patch score map for ``x``."""
        return self.model(x)

# Keep the original (misspelled) class name available for existing references
Disciminator = Discriminator

netG = UnetGenerator(in_channels=3, num_downs=8)
netD = Discriminator(in_channels=3)

## Setup Loss Function and Optimizer
We use sigmoid binary cross entropy as the GAN loss, together with an L1 loss, and Adam as the optimizer. The parameters of both networks are initialized from a normal distribution (`mx.init.Normal(0.02)`).

In [None]:
# Loss functions: L1 distance and sigmoid binary cross entropy
L1_loss = gluon.loss.L1Loss()
GAN_loss = gluon.loss.SigmoidBinaryCrossEntropyLoss()

# Initialize the generator first, then the discriminator, on the chosen device
for net in (netG, netD):
    net.initialize(mx.init.Normal(0.02), ctx=ctx)


def _make_adam_trainer(net):
    """Build an Adam trainer over every parameter of ``net``."""
    return gluon.Trainer(net.collect_params(), 'adam', {'learning_rate': lr, 'beta1': beta1})


# One trainer per network, sharing the same learning rate and beta1
trainerG = _make_adam_trainer(netG)
trainerD = _make_adam_trainer(netD)

## Training Loop
We recommend using a GPU to speed up training. One generated image is displayed at the end of every epoch so that progress can be followed.

In [None]:
from datetime import datetime
import time
import logging

# Weight of the L1 reconstruction term in the generator loss
lambda1 = 100

def facc(label, pred):
    """Binary accuracy: share of predictions whose (pred > 0.5) matches the label."""
    pred = pred.ravel()
    label = label.ravel()
    return ((pred > 0.5) == label).mean()
metric = mx.metric.CustomMetric(facc)

stamp =  datetime.now().strftime('%Y_%m_%d-%H_%M')
logging.basicConfig(level=logging.DEBUG)

for epoch in range(epochs):
    tic = time.time()
    btic = time.time()
    train_data.reset()
    for batch_idx, batch in enumerate(train_data):
        # The iterator yields raw pixel values; cast them to float32 and scale
        # them to [-1, 1], the range of the generator's Tanh output.
        real_in = batch.data[0].as_in_context(ctx).astype('float32') / 127.5 - 1
        real_out = batch.data[1].as_in_context(ctx).astype('float32') / 127.5 - 1

        ############################
        # (1) Update D network: maximize log(D(y)) + log(1 - D(G(x)))
        # netD is built for 3-channel images, so it scores the target /
        # generated image on its own rather than an (input, output) pair.
        ###########################
        # Generate outside of autograd.record() so that the discriminator
        # update does not back-propagate into the generator.
        fake_out = netG(real_in)

        with autograd.record():
            # train with real image
            output = netD(real_out)
            # Labels follow the shape of the discriminator's patch score map
            real_label = nd.ones_like(output)
            errD_real = GAN_loss(output, real_label)
            # netD returns raw scores; squash them so facc's 0.5 threshold applies
            metric.update([real_label,], [nd.sigmoid(output),])

            # train with fake image
            output = netD(fake_out)
            fake_label = nd.zeros_like(output)
            errD_fake = GAN_loss(output, fake_label)
            errD = errD_real + errD_fake
            errD.backward()
            metric.update([fake_label,], [nd.sigmoid(output),])

        trainerD.step(batch.data[0].shape[0])

        ############################
        # (2) Update G network: maximize log(D(G(x))) - lambda1 * L1(y, G(x))
        ###########################
        with autograd.record():
            fake_out = netG(real_in)
            output = netD(fake_out)
            errG = GAN_loss(output, nd.ones_like(output)) + L1_loss(fake_out, real_out) * lambda1
            errG.backward()

        trainerG.step(batch.data[0].shape[0])

        # Print log information every ten batches
        if batch_idx % 10 == 0:
            name, acc = metric.get()
            logging.info('speed: {} samples/s'.format(batch_size / (time.time() - btic)))
            logging.info('discriminator loss = %f, generator loss = %f, binary training acc = %f at iter %d epoch %d'
                     %(nd.mean(errD).asscalar(),
                       nd.mean(errG).asscalar(), acc, batch_idx, epoch))
        btic = time.time()

    name, acc = metric.get()
    metric.reset()
    logging.info('\nbinary training acc at epoch %d: %s=%f' % (epoch, name, acc))
    logging.info('time: %f' % (time.time() - tic))

    # Visualize one generated image for each epoch; map the Tanh output
    # from [-1, 1] back to [0, 255] because visualize() casts to uint8.
    fake_img = (fake_out[0] + 1) * 127.5
    visualize(fake_img)
    plt.show()

## results
Generate some images with the generator.


In [None]:
# The generator is an image-to-image U-Net, so it is driven with validation
# input images rather than with latent noise.
num_image = 8
val_data.reset()
val_batch = val_data.next()
# Take the first few inputs, cast to float32 and scale raw pixels to [-1, 1]
real_in = val_batch.data[0][:num_image].as_in_context(ctx).astype('float32') / 127.5 - 1
fake_out = netG(real_in)
for i in range(fake_out.shape[0]):
    plt.subplot(2,4,i+1)
    # Map the Tanh output back to [0, 255] because visualize() casts to uint8
    visualize((fake_out[i] + 1) * 127.5)
plt.show()

Walk in the latent space. We can see that small changes in the latent space result in smooth changes in generated images. This is a good sign that the model has learnt relevant and interesting representations.

In [None]:
# NOTE(review): netG is created as UnetGenerator(in_channels=3, ...), i.e. it
# expects 3-channel images, while latent_z below has latent_z_size channels
# and a 1x1 spatial size. This cell therefore does not match the generator
# defined in this file and needs to be reworked or dropped - confirm intent.
num_image = 12
latent_z = mx.nd.random_normal(0, 1, shape=(1, latent_z_size, 1, 1), ctx=ctx)
# Amount added to every latent component between two consecutive images
step = 0.05
for i in range(num_image):
    img = netG(latent_z)
    plt.subplot(3,4,i+1)
    visualize(img[0])
    # Use the named step instead of repeating the literal
    latent_z += step
plt.show()