In [1]:
# Import libraries

import pickle as pkl
import matplotlib.pyplot as plt
import numpy as np
from scipy.io import loadmat
import tensorflow as tf

from urllib import urlretrieve
from os.path import isfile, isdir
from tqdm import tqdm

In [2]:
!mkdir data

mkdir: cannot create directory ‘data’: File exists


In [3]:
# Get svhn dataset

data_dir = 'data/'

if not isdir(data_dir):
    raise Exception('Data directory does not exist')
    
    
# Custom class to show download progress
class DLProgress(tqdm):
    last_block = 0
    
    def hook(self, block_num=1, block_size=1, total_size=None):
        self.total = total_size
        self.update((block_num - self.last_block) * block_size)
        self.last_block = block_num
        
if not isfile(data_dir + 'train_32x32.mat'):
    progressBar = DLProgress(unit='B', unit_scale=True, miniters=1, desc='SVHN train dataset')
    urlretrieve(
        'http://ufldl.stanford.edu/housenumbers/train_32x32.mat',
        data_dir + 'train_32x32.mat',
        progressBar.hook
    )

if not isfile(data_dir + 'test_32x32.mat'):
    progressBar = DLProgress(unit='B', unit_scale=True, miniters=1, desc='SVHN test dataset')
    urlretrieve(
        'http://ufldl.stanford.edu/housenumbers/test_32x32.mat',
        data_dir + 'test_32x32.mat',
        progressBar.hook
    )

In [4]:
# Load test and train set

train_set = loadmat(data_dir + 'train_32x32.mat')
test_set = loadmat(data_dir + 'test_32x32.mat')

In [5]:
# Scale images to match generator output

def scale(x, feature_range=(-1, 1)):
    # scale to (0, 1)
    x = ((x - x.min())/(255 - x.min()))
    
    # scale to feature_range
    min, max = feature_range
    x = x * (max - min) + min
    return x

In [6]:
# Split into test and validation sets

class Dataset():
    def __init__(self, train, test, val_frac=0.5, shuffle=False, scale_func=None):
        split_index = int(len(test['y']) * (1 - val_frac))
        
        # Test and validation input set
        self.test_x = test['X'][:, :, :, :split_index]
        self.valid_x = test['X'][:, :, :, split_index:]
        
        # Test and validation labels set
        self.test_y = test['y'][:split_index]
        self.valid_y = test['y'][split_index:]
        
        # Train input and label sets
        self.train_x, self.train_y = train['X'], train['y']
        
        # Roll the specified axis backwards
        self.train_x = np.rollaxis(self.train_x, 3)
        self.valid_x = np.rollaxis(self.valid_x, 3)
        self.test_x = np.rollaxis(self.test_x, 3)
        
        # Scale
        self.scaler = scale_func if scale_func else scale
        self.shuffle = shuffle
        
    def batches(self, batch_size):
        if self.shuffle:
            index = np.arange(len(dataset.train_x))
            np.random.shuffle(index)
            self.train_x = self.train_x[index]
            self.train_y = self.train_y[index]
        
        for i in range(0, len(self.train_y), batch_size):
            x = self.train_x[i: i + batch_size]
            y = self.train_y[i: i + batch_size]
            
            yield self.scaler(x), self.scaler(y)

In [13]:
# Model inputs

def model_inputs(discriminator_dim, generator_dim):
    input_discriminator = tf.placeholder(
        tf.float32,
        shape=[None, discriminator_dim],
        name='input_discriminator'
    )
    
    input_generator = tf.placeholder(
        tf.float32,
        shape=[None, generator_dim],
        name='input_discriminator'
    )

In [8]:
# Generator

def generator(_input, output_dim, reuse=False, alpha=0.2, is_training=True):
    with tf.variable_scope('generator', reuse=reuse):
        ## Fully connected layer
        fully = tf.layers.dense(_input, 4 * 4 * 512)
        # Reshape it to start the convolutional stack
        fully = tf.reshape(fully, (-1, 4, 4, 512))
        # Apply batch normalization
        fully = tf.layers.batch_normalization(fully, trianing=is_training)
        # Apply leaky relu
        fully = tf.maximum(alpha * fully, fully)
        
        ## First deconvolution
        conv_1 = tf.layers.conv2d_transpose(fully, 256, 5, strides=2, padding='same')
        # Apply batch normalization
        conv_1 = tf.layers.batch_normalization(conv_1, training=is_training)
        # Apply leaky relu
        conv_1 = tf.maximum(alpha * conv_1, conv_1)
        
        ## Second deconvolution
        conv_2 = tf.layers.conv2d_transpose(conv_1, 128, 5, strides=2, padding='same')
        # Apply batch normalization
        conv_2 = tf.layers.batch_normalization(conv_2, training=is_training)
        # Apply leaky relu
        conv_2 = tf.maximum(alpha * conv_2, conv_2)
        
        ## Output deconvolution layer
        
        logits = tf.layers_conv2d_transpose(conv_2, output_dim, 5, strides=2, padding='same')
        return tf.tanh(logits)

In [9]:
def discriminator(_input, reuse=False, alpha=0.2):
    with tf.variable_scope('discriminator', reuse=reuse):
        ## First convolution
        conv_1 = tf.layers.conv2d(_input, 64, 5, strides=2, padding='same')
        # Apply leaky relu without batch normalization
        conv_1 = tf.maximum(alpha * conv_1, conv_1)
        
        ## Second convolution
        conv_2 = tf.layers.conv2d(conv_1, 128, 5, strides=2, padding='same')
        # Apply batch normalization
        conv_2 = tf.layers.batch_normalization(conv_2, training=True)
        # Apply leaky relu
        conv_2 = tf.maximum(alpha * conv_2, conv_2)
        
        ## Third convolution
        conv_3 = tf.layers.conv2d(conv_2, 256, 5, strides=2, padding='same')
        # Apply batch normalization
        conv_3 = tf.layers.batch_normalization(conv_3, training=True)
        # Apply leaky relu
        conv_3 = tf.maximum(alpha * conv_3, conv_3)
        
        ## Output fully connected layer and flatten it
        output = tf.reshape(conv_3, (-1, 4 * 4 * 256))
        logits = tf.layers.dense(output, 1)
        output = tf.sigmoid(logits)
        
        return output, logits

In [14]:
# Model loss

def model_loss(input_discriminator, input_generator, output_dime, alpha=0.2):
    ## Create genereator model
    generator_model = generator(
        input_generator,
        output_dim,
        alpha=alpha
    )
    
    ## Create discriminator real model
    discriminator_model_real, discriminator_logits_real = discriminator(
        input_discriminator,
        alpha=alpha
    )
    
    ## Create discriminator fake model
    discriminator_model_fake, discriminator_logits_fake = discriminator(
        generator_model,
        reuse=True,
        alpha=alpha
    )
    
    ## Get discriminator real loss
    discriminator_loss_real = tf.nn.sigmoid_cross_entropy_with_logits(
        logits=discriminator_logits_real,
        labels=tf.ones_like(discriminator_model_real)
    )
    discriminator_loss_real = tf.reduce_mean(discriminator_loss_real)
    
    ## Get discriminator fake loss
    discriminator_loss_fake = tf.nn.sigmoid_cross_entropy_with_logits(
        logits=discriminator_logits_fake,
        labels=tf.zeros_like(discriminator_model_fake)
    )
    discriminator_loss_fake = tf.reduce_mean(discriminator_loss_fake)
    
    ## Add discriminator losses
    discriminator_loss = discriminator_loss_real + discriminator_loss_fake
    
    ## Get generator loss
    generator_loss = tf.nn.sigmoid_cross_entropy_with_logits(
        logits=discriminator_logits_fake,
        labels=tf.ones_like(discriminator_model_fake)
    )
    generator_loss = tf.reduce_mean(generator_loss)
    
    return discriminator_loss, generator_loss
    

In [18]:
# Optimizers

def model_optimizers(discriminator_loss, generator_loss, learning_rate, beta):
    ## Split trainable vars into generator and discriminator vars
    training_vars = tf.trainable_variables()
    discriminator_vars = [var for var in training_vars if var.name.startswith('discriminator')]
    generator_vars = [var for var in training_vars if var.name.startswith('generator')]
    
    ## Optimize
    
    with tf.control_dependecies(tf.get_collection(tf.GraphKeys.UPDATE_OPS)):
        discriminator_train = tf.train.AdamOptimizer(
            learning_rate,
            beta1=beta
        ).minimize(discriminator_loss, var_list=discriminator_vars)
        
        generator_train = tf.train.AdamOptimizer(
            learning_rate,
            beta1=beta
        ).minimize(generator_loss, var_list=generator_vars)
    
    return discriminator_train, generator_train