# HELPERS


In [0]:
# pytorch
import torch
import numpy as np
from torch import nn, optim
from torch.autograd.variable import Variable
from torchvision import transforms, datasets
import matplotlib.pyplot as plt
from IPython.display import clear_output

def to_gpu(x):
    """Move `x` to the default CUDA device when one is available.

    Returns `x.cuda()` if `torch.cuda.is_available()` reports a usable GPU;
    otherwise `x` is returned unchanged. Works for anything exposing a
    `.cuda()` method (tensors and modules are both passed in by this notebook).
    """
    return x.cuda() if torch.cuda.is_available() else x

# Training GANs


## Architecture

<img src="https://sthalles.github.io/assets/dcgan/GANs.png" width="600px"/>

## GAN seen as a *minimax* game

![](https://drive.google.com/uc?export=view&id=1KVkTMjTT7jr8gfIv-1c4LRosrCZRpJd9)

## Training procedure

![alt text](https://drive.google.com/uc?export=view&id=1_JMKgK48QbhU9W9F9DG6dPQpYZPcYGSE)


### Interesting improvements in GANs training
* Feature matching - matching the discriminator's intermediate features instead of its output.
* Minibatch discrimination - appending the similarity of fake images to the discriminator's input to help it detect mode collapse
* Label smoothing - training with smoothed labels (0.9 instead of 1.0, done only for real examples)
* Using labels - train a multi-class discriminator with *"fake"* as an additional class
* Play with the cost function:

![](https://cdn-images-1.medium.com/max/800/1*sE-ChIllxdrzIQBQhi33UQ.jpeg)
[source](https://towardsdatascience.com/gan-ways-to-improve-gan-performance-acf37f9f59b)

In [0]:
def display_status(epoch, num_epochs, d_error, g_error, d_pred_real, d_pred_fake):
    """Print a three-line summary of the current training state.

    Args:
        epoch: Current epoch index, printed as given.
        num_epochs: Total number of epochs, printed as given.
        d_error: Discriminator loss (tensor or plain number).
        g_error: Generator loss (tensor or plain number).
        d_pred_real: Discriminator outputs on real data; its `.mean()` is shown as D(x).
        d_pred_fake: Discriminator outputs on generated data; its `.mean()` is shown as D(G(z)).
    """
    tensor_type = torch.autograd.Variable

    # Losses are pulled off the graph and the device and turned into NumPy values.
    d_error, g_error = [
        value.data.cpu().numpy() if isinstance(value, tensor_type) else value
        for value in (d_error, g_error)
    ]
    # Predictions only need to be stripped of autograd history before averaging.
    d_pred_real, d_pred_fake = [
        value.data if isinstance(value, tensor_type) else value
        for value in (d_pred_real, d_pred_fake)
    ]

    epoch_line = 'Epoch: [{}/{}]'.format(epoch, num_epochs)
    print(epoch_line)
    loss_line = 'Discriminator Loss: {:.4f}, Generator Loss: {:.4f}'.format(d_error, g_error)
    print(loss_line)
    pred_line = 'D(x): {:.4f}, D(G(z)): {:.4f}'.format(d_pred_real.mean(), d_pred_fake.mean())
    print(pred_line)


In [0]:
def ones_target(size):
    '''
    Column of ones with shape (size, 1), moved to the GPU when available.
    Used as the "real" label for BCE and as the upstream gradient in WGAN mode.
    '''
    return to_gpu(Variable(torch.ones(size, 1)))
def zeros_target(size):
    '''
    Column of zeros with shape (size, 1), moved to the GPU when available.
    Used as the "fake" label for the BCE loss.
    '''
    return to_gpu(Variable(torch.zeros(size, 1)))
  
def mones_target(size):
    '''
    Column of minus ones with shape (size, 1), moved to the GPU when available.
    Used as the upstream gradient for the critic's fake predictions in WGAN mode.
    '''
    negative_ones = Variable(-torch.ones(size, 1))
    return to_gpu(negative_ones)

In [0]:

def gan_magic(generator, discriminator, data_loader, noise, plot, epochs=100,
              disc_steps=1, test_samples=10, draw_every_n=1, d_lr=0.001, g_lr=0.001,
              mode="gan", clip=None):
    """Train a generator/discriminator pair as a standard GAN or a weight-clipped WGAN.

    Args:
        generator: Module mapping a noise batch to generated samples.
        discriminator: Module scoring samples. In "gan" mode its output is fed
            to `nn.BCELoss`, so it must be a probability; in "wgan" mode the
            raw output is used as the critic score.
        data_loader: Re-iterable yielding `(real_batch, _)` pairs once per epoch.
        noise: Callable `noise(n)` returning `n` latent samples for the generator.
        plot: Callable receiving the CPU tensor generated from a fixed noise batch.
        epochs: Number of passes over `data_loader`.
        disc_steps: The generator is updated on every batch whose index within
            the epoch is a multiple of this value; the discriminator is updated
            on every batch.
        test_samples: Size of the fixed noise batch used for the preview plots.
        draw_every_n: Preview samples are plotted on epochs divisible by this value.
        d_lr: Discriminator learning rate.
        g_lr: Generator learning rate.
        mode: "gan" (Adam + BCE loss) or "wgan" (RMSprop + weight clipping).
        clip: Weight-clipping bound for the critic; required when mode is "wgan".

    Raises:
        ValueError: If `mode` is not "gan"/"wgan", or if `mode` is "wgan" and
            `clip` is None.
    """
    # Fail fast on bad configuration. Without these checks an unknown mode only
    # surfaced later as an unbound optimizer, and a missing clip value as a
    # TypeError inside the first critic step.
    if mode not in ("gan", "wgan"):
        raise ValueError('mode must be "gan" or "wgan", got {!r}'.format(mode))
    if mode == "wgan" and clip is None:
        raise ValueError('clip must be set when mode is "wgan"')

    # Total number of epochs to train
    num_epochs = epochs

    # Fixed noise so the preview plots are comparable across epochs
    test_noise = noise(test_samples)

    if mode == "gan":
        d_optimizer = optim.Adam(discriminator.parameters(), lr=d_lr)
        g_optimizer = optim.Adam(generator.parameters(), lr=g_lr)
    else:
        # WGAN with weight clipping uses RMSprop instead of Adam
        d_optimizer = optim.RMSprop(discriminator.parameters(), lr=d_lr)
        g_optimizer = optim.RMSprop(generator.parameters(), lr=g_lr)

    loss = nn.BCELoss()

    def train_discriminator(optimizer, real_data, fake_data):
        """Run one discriminator/critic update; return (loss, pred_real, pred_fake)."""
        N = real_data.size(0)
        # Reset gradients
        optimizer.zero_grad()

        # 1.1 Predict on real data
        prediction_real = discriminator(real_data)
        # 1.2 Predict on fake data
        prediction_fake = discriminator(fake_data)

        # Calculate error and backpropagate
        if mode == "gan":
            error_real = loss(prediction_real, ones_target(N))
            error_real.backward()
            error_fake = loss(prediction_fake, zeros_target(N))
            error_fake.backward()
            total_error = error_real + error_fake
        else:
            # Backpropagating +1 through the real scores and -1 through the fake
            # scores minimises sum(D(real)) - sum(D(fake)): the critic learns to
            # score real samples low and generated samples high.
            prediction_real.backward(ones_target(N))
            prediction_fake.backward(mones_target(N))
            # Report the quantity that is actually being minimised (real - fake),
            # not the sum of the two means.
            total_error = prediction_real.mean() - prediction_fake.mean()

        # 1.3 Update weights with gradients
        optimizer.step()

        if mode == "wgan":
            # Clip discriminator weights to [-clip, clip]
            for p in discriminator.parameters():
                p.data.clamp_(-clip, clip)

        # Only used for logging, so drop the autograd history
        return total_error.detach(), prediction_real.detach(), prediction_fake.detach()

    def train_generator(optimizer, fake_data):
        """Run one generator update on `fake_data`; return the generator loss."""
        N = fake_data.size(0)

        # Reset gradients
        optimizer.zero_grad()

        # Score the generated samples (gradients flow back into the generator)
        prediction = discriminator(fake_data)

        # Calculate error and backpropagate
        if mode == "gan":
            error = loss(prediction, ones_target(N))
            error.backward()
        else:
            # Same sign convention as the critic: the generator lowers the score
            # of its samples, i.e. pushes them towards what the critic rates as real.
            error = prediction
            error.backward(ones_target(N))

        # Update weights with gradients
        optimizer.step()

        return error.mean().detach()

    # Stay None until the first batch has been processed, so an empty loader
    # does not crash the status display with unbound names.
    d_error = g_error = d_pred_real = d_pred_fake = None

    for epoch in range(num_epochs):
        for n_batch, (real_batch, _) in enumerate(data_loader):
            N = real_batch.size(0)

            real_data = to_gpu(Variable(real_batch))

            # 1. Train Discriminator
            # 1.1 Generate fake data and detach
            # (so gradients are not calculated for generator)
            fake_data = generator(noise(N)).detach()
            # 1.2 Train D
            d_error, d_pred_real, d_pred_fake = \
                train_discriminator(d_optimizer, real_data, fake_data)

            # 2. Train Generator every "k = disc_steps" batches
            if n_batch % disc_steps == 0:
                # 2.1 Generate fake data again, this time keeping the graph
                fake_data = generator(noise(N))
                # 2.2 Train G
                g_error = train_generator(g_optimizer, fake_data)

        # Display samples every few epochs
        if epoch % draw_every_n == 0:
            # No graph is needed just to look at samples
            with torch.no_grad():
                test_images = generator(test_noise).cpu()
            plot(test_images)

        # Display status logs (skipped when the loader produced no batches)
        if d_error is not None:
            display_status(
                epoch, num_epochs,
                d_error, g_error, d_pred_real, d_pred_fake
            )
        
 

# Vanilla GAN

In [0]:
# Settings for the 2-D circle experiment.
LATENT_SIZE = 10    # width of the noise vectors produced by circle_noise
BATCH_SIZE = 32     # rows per batch yielded by CircleGenerator
TEST_SAMPLES = 16   # not used by the circle run below, which passes test_samples=200 explicitly

def circle_noise(size):
    '''
    Draw `size` latent vectors for the circle generator.

    Returns a (size, LATENT_SIZE) tensor sampled uniformly from [0, 1)
    with `torch.rand`, moved to the GPU when available.
    '''
    return to_gpu(Variable(torch.rand(size, LATENT_SIZE)))

In [0]:
from sklearn.datasets import make_circles
import matplotlib.pyplot as plt
import random

# generate 2d classification dataset: 400 noisy points on two circles;
# the class labels are discarded because only the point cloud is modelled
X, _ = make_circles(n_samples=400, noise=0.05, factor=0.99)
# Shrink the cloud in place; presumably so it sits well inside the (-1, 1)
# range of the generator's Tanh output - TODO confirm
X *= 0.8

def plot_circle(data):
    """Scatter-plot 2-D points in red and show the figure.

    Column 0 of `data` is used as the x coordinate and column 1 as y.
    """
    plt.scatter(data[:, 0], data[:, 1], c='red')
    plt.show()

class CircleGenerator:
    """Minimal DataLoader-like iterable over an in-memory NumPy array.

    Every iteration (one "epoch") shuffles the rows once and yields
    `len(data) // batch_size` disjoint batches as `(tensor, None)` pairs; the
    trailing remainder that does not fill a batch is dropped. The `None`
    stands in for labels so the loader can be consumed like a labelled one.

    Args:
        data: 2-D NumPy array of samples (rows). It is never modified.
        batch_size: Rows per batch. When None (the default), the module-level
            `BATCH_SIZE` is read at iteration time, as before.
    """

    def __init__(self, data, batch_size=None):
        self.data = data
        self.batch_size = batch_size

    def gen_batches(self):
        """Yield one epoch of shuffled, non-overlapping float32 batches."""
        batch_size = BATCH_SIZE if self.batch_size is None else self.batch_size
        n_batches = len(self.data) // batch_size

        # Shuffle once per epoch. The previous version copied and reshuffled the
        # whole dataset for every batch, so batches within an epoch overlapped
        # and some rows were never visited.
        order = np.random.permutation(len(self.data))
        shuffled = self.data[order].astype(np.float32)

        for index in range(n_batches):
            start = index * batch_size
            # second one just for compatibility with (data, label) loaders
            yield torch.from_numpy(shuffled[start:start + batch_size]), None

    def __iter__(self):
        return self.gen_batches()

  
# Wrap the circle points in the loader that gan_magic iterates over each epoch
circle_data_loader = CircleGenerator(X)

# Show the target distribution the generator should learn to reproduce
plot_circle(X)

In [0]:
class DiscriminatorNet(torch.nn.Module):
    """
    Fully connected discriminator / critic for 2-D points.

    Two hidden layers (2 -> 50 -> 20, LeakyReLU(0.2) + Dropout(0.3) each)
    followed by a single linear output unit. With `is_critic=False` the output
    goes through a sigmoid; with `is_critic=True` the raw score is returned.
    """
    def __init__(self, is_critic=False):
        super().__init__()
        self.is_critic = is_critic

        in_dim, first_width, second_width, out_dim = 2, 50, 20, 1

        self.hidden0 = nn.Sequential(
            nn.Linear(in_dim, first_width),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
        )
        self.hidden1 = nn.Sequential(
            nn.Linear(first_width, second_width),
            nn.LeakyReLU(0.2),
            nn.Dropout(0.3),
        )
        self.out = nn.Sequential(
            torch.nn.Linear(second_width, out_dim),
        )

        # Only applied in forward() when the network is not used as a critic
        self.probas = nn.Sigmoid()

    def forward(self, x):
        score = self.out(self.hidden1(self.hidden0(x)))
        return score if self.is_critic else self.probas(score)

# Default is_critic=False: the output passes through the sigmoid, as BCELoss in "gan" mode requires
discriminator = to_gpu(DiscriminatorNet())


class GeneratorNet(torch.nn.Module):
    """
    Fully connected generator for 2-D points.

    Maps a 10-dimensional latent vector through two hidden layers
    (10 -> 20 -> 50, LeakyReLU(0.2)) to a 2-D output squashed by Tanh.
    """
    def __init__(self):
        super().__init__()
        latent_dim, first_width, second_width, point_dim = 10, 20, 50, 2

        self.hidden0 = nn.Sequential(
            nn.Linear(latent_dim, first_width),
            nn.LeakyReLU(0.2),
        )
        self.hidden1 = nn.Sequential(
            nn.Linear(first_width, second_width),
            nn.LeakyReLU(0.2),
        )
        self.out = nn.Sequential(
            nn.Linear(second_width, point_dim),
            nn.Tanh(),
        )

    def forward(self, x):
        return self.out(self.hidden1(self.hidden0(x)))

# Fresh generator for the circle experiment, on the GPU when one is available
generator = to_gpu(GeneratorNet())

In [0]:
# Train the vanilla GAN on the circle data and plot 200 generated points every 10 epochs.
# disc_steps=20 updates the generator only on batches whose index is a multiple of 20;
# with 400 // 32 = 12 batches per epoch that is a single generator step per epoch.
gan_magic(generator, discriminator, circle_data_loader, circle_noise,
          plot_circle, epochs=1000, disc_steps=20, test_samples=200, draw_every_n=10, d_lr=0.01, g_lr=0.01)


# DCGAN

 #### DCGAN summary [(source)](https://medium.com/@jonathan_hui/gan-dcgan-deep-convolutional-generative-adversarial-networks-df855c438f)
    
    1. Replace all max pooling with convolutional stride
    2. Use transposed convolution for upsampling.
    3. Eliminate fully connected layers.
    4. Use Batch normalization except the output layer for the generator and the input layer of the discriminator.
    5. Use ReLU in the generator except for the output which uses tanh.
    6. Use LeakyReLU in the discriminator.

In [0]:
# Settings for the DCGAN experiment. These overwrite the circle-experiment
# globals of the same name (LATENT_SIZE, BATCH_SIZE, TEST_SAMPLES).
LATENT_SIZE = 100   # channels of the (LATENT_SIZE, 1, 1) noise fed to DcGeneratorNet
GEN_FEATS = 32      # base channel count of the generator's transposed convolutions
DIS_FEATS = 32      # base channel count of the discriminator's convolutions
NUM_CHANNELS = 1    # image channels
IMAGE_W = 28        # NOTE(review): not referenced by the visible code, which hard-codes 28
IMAGE_H = 28        # NOTE(review): not referenced by the visible code, which hard-codes 28
BATCH_SIZE = 32     # mini-batch size of the DataLoader
TEST_SAMPLES = 16   # number of preview images generated during training

In [0]:
#@title Convolutions size check (square images)

in_channels = 0  #@param {type: "slider", min: 0, max: 100}
out_channels = 28  #@param {type: "slider", min: 0, max: 100}
image_size = 1  #@param {type: "slider", min: 0, max: 100}
kernel_size = 4  #@param {type: "slider", min: 1, max: 10}
stride = 1  #@param {type: "slider", min: 0, max: 10}
padding = 0  #@param {type: "slider", min: 0, max: 10}


# Output side length of a transposed convolution (dilation 1, no output padding)
transpose_out = (image_size-1) * stride - (2*padding) + kernel_size
# Output side length of a regular convolution (dilation 1). The formula floors
# the division, so use integer division instead of producing a float.
# A stride of 0 is not a valid convolution and raises ZeroDivisionError here.
conv_out = (image_size + (2 * padding) - (kernel_size -1) -1) // stride + 1


print("ConvTranspose shape:", ('BS', out_channels, transpose_out, transpose_out))
print("Conv shape:", ('BS', out_channels, conv_out, conv_out))

In [0]:
def mnist_data():
    """Return the Fashion-MNIST training split (despite the function's name).

    Images are converted with `ToTensor` and then normalised with mean 0.5 and
    std 0.5. The dataset is stored under ./dataset and downloaded if missing.
    """
    pipeline = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize([.5], [.5]),
    ])
    return datasets.FashionMNIST(root='./dataset', train=True, transform=pipeline, download=True)

# Load data (Fashion-MNIST training split, see mnist_data)
data = mnist_data()
# Create loader with data, so that we can iterate over it in shuffled mini-batches of BATCH_SIZE
data_loader = torch.utils.data.DataLoader(data, batch_size=BATCH_SIZE, shuffle=True)
# Num batches per epoch (not referenced by the rest of the visible notebook)
num_batches = len(data_loader)

In [0]:
def plot_images(images, is_numpy=False):
    """Show a batch of 28x28 images in a grid with 6 images per row.

    Args:
        images: Batch of images whose elements can be reshaped to 28x28.
            A CPU tensor by default, or a NumPy array when `is_numpy` is True.
        is_numpy: Set to True when `images` is already a NumPy array.

    Any previous cell output is cleared before the new figure is shown.
    """
    if not is_numpy:
        images = images.numpy()

    images = images.reshape(images.shape[0], 28, 28)

    images_row = 6
    num_images = len(images)
    # Ceiling division, so no empty extra row is added when the count is a
    # multiple of 6; keep at least one row so an empty batch still plots.
    num_rows = max(1, -(-num_images // images_row))

    # squeeze=False keeps the axes array 2-D even for a single row, which the
    # [row, col] indexing below relies on (fewer than 6 images used to fail).
    _, axarr = plt.subplots(num_rows, images_row, squeeze=False)

    for ax in axarr.flatten():
        ax.axis('off')

    for i, image in enumerate(images):
        axarr[i // images_row, i % images_row].imshow(image)
    clear_output()
    plt.show()

In [0]:
# custom weights initialization called on netG and netD
def weights_init(m):
    """DCGAN-style initialisation, meant to be passed to `Module.apply`.

    Modules whose class name contains "Conv" get weights drawn from N(0, 0.02);
    otherwise, modules whose class name contains "BatchNorm" get weights drawn
    from N(1, 0.02) and a zero bias. Every other module is left untouched.
    """
    layer_name = m.__class__.__name__
    if 'Conv' in layer_name:
        nn.init.normal_(m.weight.data, 0.0, 0.02)
        return
    if 'BatchNorm' in layer_name:
        nn.init.normal_(m.weight.data, 1.0, 0.02)
        nn.init.constant_(m.bias.data, 0)

In [0]:
class DcDiscriminatorNet(torch.nn.Module):
    """
    Convolutional discriminator / critic for 28x28 images.

    Three strided convolutions (BatchNorm on the second and third, LeakyReLU(0.2)
    throughout) followed by one linear unit. With `is_critic=False` the output
    goes through a sigmoid; with `is_critic=True` the raw score is returned.
    """
    def __init__(self, is_critic=False):
        super().__init__()
        self.is_critic = is_critic

        # (NUM_CHANNELS) x 28 x 28 -> (DIS_FEATS) x 14 x 14
        self.conv1 = nn.Sequential(
            nn.Conv2d(NUM_CHANNELS, DIS_FEATS, kernel_size=4, stride=2, padding=1, bias=False),
            nn.LeakyReLU(0.2, inplace=True),
        )
        # (DIS_FEATS) x 14 x 14 -> (DIS_FEATS*2) x 7 x 7
        self.conv2 = nn.Sequential(
            nn.Conv2d(DIS_FEATS, DIS_FEATS * 2, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(DIS_FEATS * 2),
            nn.LeakyReLU(0.2, inplace=True),
        )
        # (DIS_FEATS*2) x 7 x 7 -> (DIS_FEATS*4) x 4 x 4
        self.conv3 = nn.Sequential(
            nn.Conv2d(DIS_FEATS * 2, DIS_FEATS * 4, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(DIS_FEATS * 4),
            nn.LeakyReLU(0.2, inplace=True),
        )
        # Flattened (DIS_FEATS*4) x 4 x 4 feature map -> single score
        self.linear1 = nn.Sequential(
            nn.Linear(DIS_FEATS * 4 * 4 * 4, 1),
        )

        # Only applied in forward() when the network is not used as a critic
        self.probas = nn.Sigmoid()

    def forward(self, x):
        features = self.conv3(self.conv2(self.conv1(x)))
        flat = features.view(-1, DIS_FEATS * 4 * 4 * 4)
        score = self.linear1(flat)
        if self.is_critic:
            return score
        return self.probas(score)

# Build the convolutional discriminator (sigmoid output, for "gan" mode) and
# re-initialise its Conv/BatchNorm layers with weights_init
discriminator = to_gpu(DcDiscriminatorNet())
discriminator.apply(weights_init)

In [0]:

class DcGeneratorNet(torch.nn.Module):
    """
    Convolutional generator producing 28x28 images.

    Four transposed convolutions upsample a (LATENT_SIZE, 1, 1) noise tensor:
    the first three are followed by BatchNorm + ReLU, the last one by Tanh.
    """
    def __init__(self):
        super().__init__()

        # (LATENT_SIZE) x 1 x 1 -> (GEN_FEATS*4) x 4 x 4
        self.convt1 = nn.Sequential(
            nn.ConvTranspose2d(LATENT_SIZE, GEN_FEATS * 4, kernel_size=4, stride=1, padding=0, bias=False),
            nn.BatchNorm2d(GEN_FEATS * 4),
            nn.ReLU(True),
        )
        # (GEN_FEATS*4) x 4 x 4 -> (GEN_FEATS*2) x 7 x 7
        self.convt2 = nn.Sequential(
            nn.ConvTranspose2d(GEN_FEATS * 4, GEN_FEATS * 2, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(GEN_FEATS * 2),
            nn.ReLU(True),
        )
        # (GEN_FEATS*2) x 7 x 7 -> (GEN_FEATS) x 14 x 14
        self.convt3 = nn.Sequential(
            nn.ConvTranspose2d(GEN_FEATS * 2, GEN_FEATS, kernel_size=4, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(GEN_FEATS),
            nn.ReLU(True),
        )
        # (GEN_FEATS) x 14 x 14 -> (NUM_CHANNELS) x 28 x 28
        self.convt4 = nn.Sequential(
            nn.ConvTranspose2d(GEN_FEATS, NUM_CHANNELS, kernel_size=4, stride=2, padding=1, bias=False),
            nn.Tanh(),
        )

    def forward(self, x):
        for stage in (self.convt1, self.convt2, self.convt3, self.convt4):
            x = stage(x)
        return x

# Build the convolutional generator and re-initialise its ConvTranspose/BatchNorm
# layers with weights_init
generator = to_gpu(DcGeneratorNet())
generator.apply(weights_init)

In [0]:
def fmnist_noise(size):
    '''
    Draw `size` latent samples for the convolutional generator.

    Returns a (size, LATENT_SIZE, 1, 1) tensor of standard-normal values
    (`torch.randn`), moved to the GPU when available.
    '''
    latent = torch.randn(size, LATENT_SIZE, 1, 1)
    return to_gpu(Variable(latent))
  

In [0]:
# Train the DCGAN on Fashion-MNIST with the default 100 epochs, updating the
# generator on every batch (disc_steps=1) and plotting TEST_SAMPLES images each epoch
gan_magic(generator, discriminator, data_loader, fmnist_noise, plot_images, test_samples=TEST_SAMPLES,
          disc_steps=1, d_lr=0.0001, g_lr=0.0001)

## Interpolate

In [0]:
# Sample 10 latent vectors once, so the following cells all work on the same points
noise = fmnist_noise(10)

In [0]:
# Decode the sampled latent vectors with the trained generator and show the images
images = generator(noise).cpu().data
plot_images(images)

In [0]:

# End points of the interpolation: two of the latent vectors sampled above
noise1 = noise[0]
noise2 = noise[2]

interpolation = []

# Walk from noise1 (ratio 0) to noise2 (ratio 1) in 10 evenly spaced steps
for ratio in np.linspace(0,1,10):
  # Linear blend of the two latent vectors
  value = noise1 * (1-ratio) + noise2 * ratio
  # value[None, ...] adds the batch dimension the generator expects
  img = generator(value[None,:,:,:]).cpu().data.numpy()
  # Drop the size-1 dimensions before collecting the image
  interpolation.append(img.squeeze())
  
interpolation = np.array(interpolation)
  
plot_images(interpolation, is_numpy=True)

# WGAN

## Wasserstein distance

![](https://cdn-images-1.medium.com/max/800/1*6y-tz57odJpHh4pwRfXACw.png)

## Discriminator → Critic

<img src="https://i.imgflip.com/2x6470.jpg" alt="Drawing" height="400px;"/>

## Cost functions

<img src="https://cdn-images-1.medium.com/max/2600/1*5jF5gbIDwU6k9m1ILl0Utg.jpeg" width="800px"/>
[source](https://medium.com/@jonathan_hui/gan-wasserstein-gan-wgan-gp-6a1a2aa1b490)

## Training procedure

<img src="https://cdn-images-1.medium.com/max/1600/1*JOg9lC2JLl2Crmx5uk6S2g.png" width="800px"/>
[source](https://medium.com/@jonathan_hui/gan-wasserstein-gan-wgan-gp-6a1a2aa1b490)

### WGAN

In [0]:
# Drop the DCGAN networks before rebuilding the small fully connected ones
del discriminator
del generator

# Restore the circle-experiment settings (the DCGAN section set LATENT_SIZE to 100)
LATENT_SIZE = 10
BATCH_SIZE = 32
TEST_SAMPLES = 16

# Fresh loader over the same circle points, plus a reminder plot of the target data
circle_data_loader = CircleGenerator(X)

plot_circle(X)

In [0]:
# is_critic=True makes the network return raw scores (no sigmoid), as used in "wgan" mode
generator = to_gpu(GeneratorNet())
discriminator = to_gpu(DiscriminatorNet(is_critic=True))

# Same schedule as the vanilla run, but mode="wgan" switches to RMSprop and
# clamps the critic weights to [-0.01, 0.01] after every critic step
gan_magic(generator, discriminator, circle_data_loader, circle_noise,
          plot_circle, epochs=1000, disc_steps=20, test_samples=200, draw_every_n=10, d_lr=0.01, g_lr=0.01,
          mode="wgan", clip=0.01)

## DCWGAN

In [0]:
# Start from freshly initialised networks so nothing carries over from the circle WGAN run
del generator
del discriminator

# NOTE(review): LATENT_SIZE was reset to 10 in the WGAN circle cell, so this
# generator is built with a 10-channel latent instead of the 100 used for the
# DCGAN run - confirm this is intended.
generator = to_gpu(DcGeneratorNet())
generator.apply(weights_init)

# WGAN mode trains on raw critic scores, so the final sigmoid must be skipped
# (is_critic=True), exactly as in the circle WGAN run.
discriminator = to_gpu(DcDiscriminatorNet(is_critic=True))
discriminator.apply(weights_init)

In [0]:
# Train on Fashion-MNIST in "wgan" mode: RMSprop optimizers, and critic weights
# clamped to [-0.01, 0.01] after every critic step
gan_magic(generator, discriminator, data_loader, fmnist_noise, plot_images, test_samples=TEST_SAMPLES,
          disc_steps=1, d_lr=0.0001, g_lr=0.0001, mode="wgan", clip=0.01)