# Manga DCGAN

This notebook is an experiment in using a DC-GAN (Deep Convolutional Generative Adversarial Network) to generate comic/manga characters.

In [None]:
%matplotlib inline

import os
import pickle as pkl
import time
from glob import glob

import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
from PIL import Image

In [None]:
# Create the checkpoint folder; exist_ok makes re-running this cell a no-op
os.makedirs('checkpoints', exist_ok=True)

## Network Input

In [None]:
def model_inputs(real_dim, z_dim):
    """
    Create the placeholders that feed the network
    :param real_dim: Shape of one real image, without the batch dimension
    :param z_dim: Length of the noise vector
    :return: A tuple of (real image placeholder, noise placeholder)
    """
    # The leading None leaves the batch dimension unspecified
    real_shape = (None,) + tuple(real_dim)
    noise_shape = (None, z_dim)

    input_real = tf.placeholder(tf.float32, real_shape, name='input_real')
    input_z = tf.placeholder(tf.float32, noise_shape, name='input_z')
    return input_real, input_z

## Data Processing

In [None]:
def get_image(image_path, width, height, mode):
    """
    Read image from image_path
    :param image_path: Path of image
    :param width: Required width of image
    :param height: Required height of image
    :param mode: Mode the image is converted to (for example 'RGB')
    :return: Image data as a numpy array, or None when the image size
             is not exactly (width, height)
    """
    # Open inside a context manager so the file handle is released as soon
    # as the pixel data has been copied into the array
    with Image.open(image_path) as image:
        if image.size != (width, height):
            return None
        return np.array(image.convert(mode))

def get_batch(image_files, width, height, mode):
    """
    Load a list of image files into one float32 array
    :param image_files: Paths of the images to load
    :param width: Required width of every image
    :param height: Required height of every image
    :param mode: Mode every image is converted to
    :return: float32 array holding the images that have the required size;
             images of any other size are left out
    """
    loaded = (get_image(path, width, height, mode) for path in image_files)

    # get_image returns None for images of the wrong size; drop those so
    # the array stays numeric and can be cast to float32
    usable = [image for image in loaded if image is not None]
    data_batch = np.array(usable).astype(np.float32)

    # Make sure the images are in 4 dimensions
    if len(data_batch.shape) < 4:
        data_batch = data_batch.reshape(data_batch.shape + (1,))

    return data_batch

def scale(x, feature_range=(-1, 1)):
    """
    Rescale pixel values into feature_range
    :param x: Array of pixel values; 255 is taken as the largest possible value
    :param feature_range: Tuple of (lowest, highest) output value
    :return: Array where the smallest value of x maps to the lowest output
             value and 255 maps to the highest
    """
    low, high = feature_range

    # scale to (0, 1), using the smallest value in x as the lower bound
    smallest = x.min()
    shifted = x - smallest
    span = 255 - smallest
    if span == 0:
        # Every value is 255: dividing would give 0/0, so map the whole
        # array to the top of the range instead (shifted is all zeros here)
        unit = shifted + 1.0
    else:
        unit = shifted / span

    # scale to feature_range
    return unit * (high - low) + low

In [None]:
class Dataset:
    """Serve scaled, fixed-size batches of images loaded from a list of files."""

    def __init__(self, data_files, scale_func=None):
        """
        Initialize the class
        :param data_files: List of files in the database
        :param scale_func: Scale function applied to every batch;
                           the module-level scale is used when None
        """
        IMAGE_WIDTH = 96
        IMAGE_HEIGHT = 96
        image_channels = 3

        self.scaler = scale if scale_func is None else scale_func
        self.image_mode = 'RGB'
        self.data_files = data_files
        self.shape = len(data_files), IMAGE_WIDTH, IMAGE_HEIGHT, image_channels

    def get_batches(self, batch_size):
        """
        Generate batches
        :param batch_size: Number of files per batch, must be positive
        :return: Generator of scaled batches; files left over after the
                 last full batch are not served
        :raises ValueError: On first iteration, when batch_size is not positive
        """
        # A non-positive batch size would never advance through the files
        if batch_size <= 0:
            raise ValueError(
                'batch_size must be positive, got {}'.format(batch_size))

        width, height = self.shape[1:3]
        last_start = self.shape[0] - batch_size

        for start in range(0, last_start + 1, batch_size):
            data_batch = get_batch(
                self.data_files[start:start + batch_size],
                width,
                height,
                self.image_mode)

            # Skip a batch that holds no images so the scaler is never
            # called on an empty array
            if data_batch.shape[0] == 0:
                continue

            yield self.scaler(data_batch)

## Generator

In [None]:
def generator(z, output_dim, reuse=False, alpha=0.2, training=True):
    """
    Build the generator network
    :param z: Noise input
    :param output_dim: Number of channels of the generated image
    :param reuse: Whether to reuse the variables of the 'generator' scope
    :param alpha: Slope of the leaky ReLU for negative inputs
    :param training: Passed to every batch normalization layer
    :return: tanh output of the last transposed convolution
    """
    def normalize_and_activate(tensor):
        # Batch normalization followed by a leaky ReLU
        normalized = tf.layers.batch_normalization(tensor, training=training)
        return tf.maximum(alpha * normalized, normalized)

    with tf.variable_scope('generator', reuse=reuse):
        # Fully connected layer, reshaped into a 6x6x1024 volume
        net = tf.layers.dense(z, 6*6*1024)
        net = tf.reshape(net, (-1, 6, 6, 1024))
        net = normalize_and_activate(net)

        # Three stride-2 transposed convolutions with 512, 256, 128 filters
        for filters in (512, 256, 128):
            net = tf.layers.conv2d_transpose(net, filters, 5, strides=2, padding='same')
            net = normalize_and_activate(net)

        # Output layer, 96x96 with output_dim channels
        logits = tf.layers.conv2d_transpose(net, output_dim, 5, strides=2, padding='same')

        return tf.tanh(logits)

## Discriminator

In [None]:
def discriminator(x, reuse=False, alpha=0.2):
    """
    Build the discriminator network
    :param x: Batch of images; the flatten step assumes they are 96x96
    :param reuse: Whether to reuse the variables of the 'discriminator' scope
    :param alpha: Slope of the leaky ReLU for negative inputs
    :return: A tuple of (sigmoid output, logits)
    """
    # Batch normalization always runs with training=True here; the function
    # has no inference switch.
    with tf.variable_scope('discriminator', reuse=reuse):
        # Input is 96x96x3
        layer1 = tf.layers.conv2d(x, 128, 5, strides=2, padding='same')
        relu1 = tf.maximum(alpha * layer1, layer1)

        # 48x48x128
        layer2 = tf.layers.conv2d(relu1, 256, 5, strides=2, padding='same')
        bn2 = tf.layers.batch_normalization(layer2, training=True)
        relu2 = tf.maximum(alpha * bn2, bn2)

        # 24x24x256
        # Strided conv2d (not conv2d_transpose) so the feature map keeps
        # shrinking; upsampling here would break the 6x6x1024 flatten below
        layer3 = tf.layers.conv2d(relu2, 512, 5, strides=2, padding='same')
        bn3 = tf.layers.batch_normalization(layer3, training=True)
        relu3 = tf.maximum(alpha * bn3, bn3)

        # 12x12x512
        layer4 = tf.layers.conv2d(relu3, 1024, 5, strides=2, padding='same')
        bn4 = tf.layers.batch_normalization(layer4, training=True)
        relu4 = tf.maximum(alpha * bn4, bn4)

        # 6x6x1024
        flat = tf.reshape(relu4, (-1, 6*6*1024))
        logits = tf.layers.dense(flat, 1)
        out = tf.sigmoid(logits)

        return out, logits

## Model Loss

In [None]:
def model_loss(input_real, input_z, output_dim, alpha=0.2):
    """
    Get the loss for the discriminator and generator
    :param input_real: Images from the real dataset
    :param input_z: Z input
    :param output_dim: The number of channels in the output image
    :param alpha: Leaky ReLU slope passed to generator and discriminator
    :return: A tuple of (discriminator loss, generator loss)
    """
    def mean_cross_entropy(logits, labels):
        # Sigmoid cross entropy averaged over all elements
        return tf.reduce_mean(
            tf.nn.sigmoid_cross_entropy_with_logits(logits=logits, labels=labels))

    fake_images = generator(input_z, output_dim, alpha=alpha)
    real_out, real_logits = discriminator(input_real, alpha=alpha)
    fake_out, fake_logits = discriminator(fake_images, reuse=True, alpha=alpha)

    # Generator loss: generated images scored against the label 1
    g_loss = mean_cross_entropy(fake_logits, tf.ones_like(fake_out))

    # Discriminator loss: real images against 1, generated images against 0
    d_loss_real = mean_cross_entropy(real_logits, tf.ones_like(real_out))
    d_loss_fake = mean_cross_entropy(fake_logits, tf.zeros_like(fake_out))

    return d_loss_real + d_loss_fake, g_loss

## Optimizers

In [None]:
def model_opt(d_loss, g_loss, learning_rate, beta1):
    """
    Get optimization operations
    :param d_loss: Discriminator loss Tensor
    :param g_loss: Generator loss Tensor
    :param learning_rate: Learning rate handed to the Adam optimizers
    :param beta1: The exponential decay rate for the 1st moment in the optimizer
    :return: A tuple of (discriminator training operation, generator training operation)
    """
    # Split the trainable variables by the scope prefix of their name
    d_vars, g_vars = [], []
    for variable in tf.trainable_variables():
        if variable.name.startswith('discriminator'):
            d_vars.append(variable)
        elif variable.name.startswith('generator'):
            g_vars.append(variable)

    # Create both Adam training ops under a dependency on the ops
    # collected in UPDATE_OPS
    update_ops = tf.get_collection(tf.GraphKeys.UPDATE_OPS)
    with tf.control_dependencies(update_ops):
        d_optimizer = tf.train.AdamOptimizer(learning_rate, beta1=beta1)
        d_train_opt = d_optimizer.minimize(d_loss, var_list=d_vars)
        g_optimizer = tf.train.AdamOptimizer(learning_rate, beta1=beta1)
        g_train_opt = g_optimizer.minimize(g_loss, var_list=g_vars)

    return d_train_opt, g_train_opt

## Building the model

In [None]:
class GAN:
    """Hold the placeholders, losses and training operations of the network."""

    def __init__(self, real_size, z_size, learning_rate, alpha=0.2, beta1=0.5):
        """
        Build the whole graph
        :param real_size: Shape of one real image; index 2 is the channel count
        :param z_size: Length of the noise vector
        :param learning_rate: Learning rate for both optimizers
        :param alpha: Leaky ReLU slope
        :param beta1: The exponential decay rate for the 1st moment in the optimizer
        """
        # Start from an empty default graph
        tf.reset_default_graph()

        # Input placeholders
        placeholders = model_inputs(real_size, z_size)
        self.input_real, self.input_z = placeholders

        # Losses
        channels = real_size[2]
        both_losses = model_loss(self.input_real, self.input_z, channels, alpha=alpha)
        self.d_loss, self.g_loss = both_losses

        # Training operations
        train_ops = model_opt(self.d_loss, self.g_loss, learning_rate, beta1=beta1)
        self.d_opt, self.g_opt = train_ops

In [None]:
# Helper function to visualize the generated output
def view_samples(epoch, samples, nrows, ncols, figsize=(5,5)):
    """
    Show one stored set of generated images on a grid
    :param epoch: Index into samples selecting the set to show
    :param samples: Sequence of image sets
    :param nrows: Number of rows of the grid
    :param ncols: Number of columns of the grid
    :param figsize: Size of the figure
    :return: A tuple of (figure, axes) as returned by plt.subplots
    """
    fig, axes = plt.subplots(figsize=figsize, nrows=nrows, ncols=ncols,
                             sharey=True, sharex=True)

    # plt.subplots hands back a single Axes for a 1x1 grid, so wrap it
    # before flattening
    for ax, img in zip(np.atleast_1d(axes).ravel(), samples[epoch]):
        ax.axis('off')

        # Stretch the image to 0..255; a constant image has no spread to
        # divide by, so it is drawn as all zeros
        lowest = img.min()
        spread = img.max() - lowest
        if spread == 0:
            img = np.zeros_like(img, dtype=np.uint8)
        else:
            img = ((img - lowest) * 255 / spread).astype(np.uint8)

        # NOTE(review): 'box-forced' is rejected by current Matplotlib;
        # 'box' is the replacement -- confirm against the installed version
        ax.set_adjustable('box')
        ax.imshow(img, aspect='equal')

    # No gap between subplots
    plt.subplots_adjust(wspace=0, hspace=0)
    return fig, axes

In [None]:
def train(model, dataset, epochs, batch_size, print_every=10, show_every=100, figsize=(5,5)):
    """
    Train the network and collect losses and generated samples
    :param model: Object exposing input_real, input_z, d_loss, g_loss, d_opt and g_opt
    :param dataset: Object whose get_batches(batch_size) yields batches of real images
    :param epochs: Number of passes over the dataset
    :param batch_size: Number of images per batch
    :param print_every: Report and record the losses every this many steps
    :param show_every: Generate and display sample images every this many steps
    :param figsize: Figure size used when displaying samples
    :return: A tuple of (losses, samples)
    """
    saver = tf.train.Saver() # Saver used to save the checkpoints
    samples, losses = [], [] # Outputs

    # Read the sizes from the model's placeholders so the function does not
    # depend on module-level globals
    z_dim = model.input_z.get_shape().as_list()[-1]
    image_channels = model.input_real.get_shape().as_list()[-1]

    sample_z = np.random.uniform(-1, 1, size=(9, z_dim)) # Generate 9 images

    # Build the sampling generator once; calling generator() inside the
    # loop would add new ops to the graph at every display step
    sample_output = generator(model.input_z, image_channels, reuse=True, training=False)

    # The saver writes into this folder, so make sure it exists
    checkpoint_dir = './checkpoints/'
    os.makedirs(checkpoint_dir, exist_ok=True)

    steps = 0 # Counts batches to decide when to print and when to show images

    with tf.Session() as sess:
        # Initialize the variables, this is a
        # standard tensorflow operation
        sess.run(tf.global_variables_initializer())

        # Loop through epochs...
        start_time = time.time()
        for e in range(epochs):
            for x in dataset.get_batches(batch_size):
                steps += 1

                # Sample random noise for G
                batch_z = np.random.uniform(-1, 1, size=(batch_size, z_dim))
                feed = {model.input_real: x, model.input_z: batch_z}

                # Run optimizers
                sess.run(model.d_opt, feed_dict=feed)
                sess.run(model.g_opt, feed_dict=feed)

                # Display options
                if steps % print_every == 0:
                    end_time = time.time()
                    delta_time = end_time - start_time # get the delta time since last report
                    start_time = time.time() # reset start time
                    # Every print_every steps, get the losses and print them out
                    train_loss_d, train_loss_g = sess.run(
                        [model.d_loss, model.g_loss], feed_dict=feed)
                    print("Epoch {}/{}...".format(e+1, epochs),
                          "Discriminator Loss: {:.4f}...".format(train_loss_d),
                          "Generator Loss: {:.4f}".format(train_loss_g),
                          " | batch_size=%d steps=%d delta_time=%.2f(s)" % (batch_size, steps, delta_time))

                    # Save losses for later view
                    losses.append((train_loss_d, train_loss_g))

                if steps % show_every == 0:
                    comic_gen = sess.run(sample_output,
                                         feed_dict={model.input_z: sample_z})
                    # Display generated samples
                    samples.append(comic_gen)
                    _ = view_samples(-1, samples, 3, 3, figsize=figsize)
                    plt.show()

            # One checkpoint per epoch
            saver.save(sess, checkpoint_dir + 'generator_epoch_{}.ckpt'.format(e+1))

    with open('samples.pkl', 'wb') as f:
        pkl.dump(samples, f)

    return losses, samples

## Hyperparameters

In [None]:
# Shape of one real image; index 2 is used as the channel count of the generator output
real_size = (96,96,3)
# Length of the noise vector fed to the generator
z_size = 100
# Learning rate handed to both Adam optimizers
learning_rate = 0.0002
# Number of images per training batch
batch_size = 10
# Number of passes over the dataset
epochs = 1
# Slope of the leaky ReLU for negative inputs
alpha = 0.2
# Exponential decay rate for the 1st moment in the Adam optimizers
beta1 = 0.5

# Create the network (this resets the default TensorFlow graph)
model = GAN(real_size, z_size, learning_rate, alpha=alpha, beta1=beta1)

## Execution

In [None]:
# Collect every .jpg file found, at any depth, under the data folder
data_folder_path = os.getcwd() + '/data'
jpg_pattern = os.path.join(data_folder_path, '**/*.jpg')
dataset = Dataset(glob(jpg_pattern, recursive=True))

# Train the network and report how long it took
start_time = time.time()
losses, samples = train(
    model, dataset, epochs, batch_size,
    print_every=100, show_every=10000, figsize=(12,12))
end_time = time.time()
print("Time elapsed: %.2f sec" % (end_time - start_time))