## Generating Synthetic Images from Textual Description using GANs

### Requirements
1. TensorFlow 1.x (the code uses the graph-mode `tf.placeholder` / `tf.InteractiveSession` API)
2. numpy
3. scipy
4. pickle
5. json
6. opencv
7. h5py

In [1]:
import tensorflow as tf
import numpy as np
import argparse
import pickle
import h5py
import scipy
import random
import json
import os
import shutil
from util import tf_ops as ops
import cv2 as opencv
import random

In [2]:
# Hyper-parameters

# --- Model architecture ---
image_size = 64                 # height and width of real and generated images
z_dim = 100                     # width of the noise vector fed to the generator
t_dim = 256                     # size the caption embedding is projected down to
gf_dim = 64                     # base number of generator filters
df_dim = 64                     # base number of discriminator filters
gfc_dim = 1024
caption_vector_length = 2400    # width of the caption vector placeholder

# --- Optimisation ---
batch_size = 64
learning_rate = 0.0002          # Adam learning rate
beta1 = 0.5                     # Adam beta1
epochs = 600

# --- Data and checkpointing ---
data_dir = "Data"
save_every = 30                 # save a temporary checkpoint every N batches
resume_model = None             # checkpoint path to restore from, or None

In [3]:
def conv2d(_input, output_dim, k_h=5, k_w=5, d_h=2, d_w=2, stddev=0.02, name="conv2d"):
    """Apply a 'SAME'-padded 2-D convolution followed by a bias add.

    Args:
        _input: 4-D input tensor; its last dimension is used as the number
            of input channels.
        output_dim: number of output channels.
        k_h, k_w: kernel height and width.
        d_h, d_w: strides along height and width.
        stddev: standard deviation of the truncated-normal kernel initializer.
        name: variable scope that holds the 'w' and 'biases' variables.

    Returns:
        The convolved tensor with the bias added.
    """
    with tf.variable_scope(name):
        kernel_shape = [k_h, k_w, _input.get_shape()[-1], output_dim]
        kernel = tf.get_variable(
            'w',
            kernel_shape,
            initializer=tf.truncated_normal_initializer(stddev=stddev),
        )
        convolved = tf.nn.conv2d(
            _input, kernel, strides=[1, d_h, d_w, 1], padding='SAME')

        offsets = tf.get_variable(
            'biases', [output_dim], initializer=tf.constant_initializer(0.0))
        # Reshape back to the convolution's static shape after the bias add.
        return tf.reshape(tf.nn.bias_add(convolved, offsets), convolved.get_shape())

In [4]:
def deconv2d(_input, output_shape, k_h=5, k_w=5, d_h=2, d_w=2, stddev=0.02,  name="deconv2d"):
    """Apply a transposed 2-D convolution followed by a bias add.

    Args:
        _input: 4-D input tensor; its last dimension is used as the number
            of input channels.
        output_shape: full 4-element shape of the result; its last entry is
            the number of output channels.
        k_h, k_w: kernel height and width.
        d_h, d_w: strides along height and width.
        stddev: standard deviation of the normal kernel initializer.
        name: variable scope that holds the 'w' and 'biases' variables.

    Returns:
        The up-sampled tensor with the bias added.
    """
    with tf.variable_scope(name):
        # filter : [height, width, output_channels, in_channels]
        # The width entry must be k_w; it used to repeat k_h, which silently
        # ignored the k_w argument for non-square kernels.
        w = tf.get_variable('w', [k_h, k_w, output_shape[-1], _input.get_shape()[-1]],
                            initializer=tf.random_normal_initializer(stddev=stddev))
        _deconv = tf.nn.conv2d_transpose(_input, w, output_shape=output_shape, strides=[1, d_h, d_w, 1])
        _biases = tf.get_variable('biases', [output_shape[-1]], initializer=tf.constant_initializer(0.0))
        # Reshape back to the deconvolution's static shape after the bias add.
        _deconv = tf.reshape(tf.nn.bias_add(_deconv, _biases), _deconv.get_shape())
    return _deconv

In [5]:
def linear_layer(input_, output_size, scope=None, stddev=0.02, bias_start=0.0, with_w=False):
    """Fully connected layer computing ``input_ @ Matrix + bias``.

    Args:
        input_: 2-D tensor; its second dimension is the input feature count.
        output_size: number of output features.
        scope: variable scope name; "Linear" is used when falsy.
        stddev: standard deviation of the normal weight initializer.
        bias_start: constant the bias is initialised to.
        with_w: when true, also return the weight and bias variables.

    Returns:
        The projected tensor, or ``(projected, Matrix, bias)`` if ``with_w``.
    """
    in_features = input_.get_shape().as_list()[1]

    with tf.variable_scope(scope or "Linear"):
        weights = tf.get_variable(
            "Matrix",
            [in_features, output_size],
            tf.float32,
            tf.random_normal_initializer(stddev=stddev),
        )
        offset = tf.get_variable(
            "bias", [output_size], initializer=tf.constant_initializer(bias_start))
        projected = tf.matmul(input_, weights) + offset
        return (projected, weights, offset) if with_w else projected

In [6]:
def generator(t_z, t_text_embedding):
    """Build the generator: noise + caption embedding -> image tensor.

    The caption embedding is projected to ``t_dim`` features, concatenated
    with the noise vector, projected to a ``(image_size/16)``-sized feature
    map and up-sampled by four stride-2 transposed convolutions.

    Args:
        t_z: noise tensor, concatenated with the reduced embedding on axis 1.
        t_text_embedding: caption embedding tensor (2-D).

    Returns:
        Tensor of shape ``[batch_size, image_size, image_size, 3]`` with
        values in [0, 1] (``tanh / 2 + 0.5``).
    """
    def _bn_relu(x, name):
        # The layer is named explicitly with a 'g_' prefix: build_model picks
        # the generator's trainable variables by the substring 'g_', and the
        # default layer name ('batch_normalization_N') does not contain it,
        # which left gamma/beta out of every optimizer's var_list.
        # NOTE: this renames the batch-norm variables, so checkpoints saved
        # with the default names will not restore into this graph.
        normalized = tf.layers.batch_normalization(
            x, momentum=0.9, epsilon=1e-5, training=True, name=name)
        return tf.nn.relu(normalized)

    s = image_size
    s2, s4, s8, s16 = int(s / 2), int(s / 4), int(s / 8), int(s / 16)

    reduced_text_embedding = tf.nn.leaky_relu(linear_layer(t_text_embedding, t_dim, scope='g_embedding'))
    z_concat = tf.concat([t_z, reduced_text_embedding], 1)
    z_ = linear_layer(z_concat, gf_dim * 8 * s16 * s16, scope='g_h0_lin')
    h0 = tf.reshape(z_, [-1, s16, s16, gf_dim * 8])
    h0 = _bn_relu(h0, 'g_bn0')

    h1 = deconv2d(h0, [batch_size, s8, s8, gf_dim * 4], name='g_h1')
    h1 = _bn_relu(h1, 'g_bn1')

    h2 = deconv2d(h1, [batch_size, s4, s4, gf_dim * 2], name='g_h2')
    h2 = _bn_relu(h2, 'g_bn2')

    h3 = deconv2d(h2, [batch_size, s2, s2, gf_dim * 1], name='g_h3')
    h3 = _bn_relu(h3, 'g_bn3')

    h4 = deconv2d(h3, [batch_size, s, s, 3], name='g_h4')

    # Map tanh's [-1, 1] range onto [0, 1].
    return (tf.tanh(h4) / 2. + 0.5)

In [7]:
def discriminator(image, t_text_embedding, reuse=False):
    """Build the text-conditioned discriminator.

    Four stride-2 convolutions reduce the image; the caption embedding is
    projected to ``t_dim`` features, tiled over the 4x4 feature map,
    concatenated on the channel axis, mixed by a 1x1 convolution and
    flattened into a single logit per example.

    Args:
        image: image batch tensor.
        t_text_embedding: caption embedding tensor (2-D).
        reuse: pass True on every call after the first so the variables
            created by the first call are shared.

    Returns:
        ``(probability, logits)``: the sigmoid of the logit and the raw logit.
    """
    def _bn_lrelu(x, name):
        # The layer is named explicitly with a 'd_' prefix: build_model picks
        # the discriminator's trainable variables by the substring 'd_', and
        # the default layer name ('batch_normalization_N') does not contain
        # it, which left gamma/beta out of every optimizer's var_list.
        # NOTE: this renames the batch-norm variables, so checkpoints saved
        # with the default names will not restore into this graph.
        normalized = tf.layers.batch_normalization(
            x, momentum=0.9, epsilon=1e-5, training=True, name=name)
        return tf.nn.leaky_relu(normalized)

    with tf.variable_scope(tf.get_variable_scope(), reuse=reuse):
        h0 = tf.nn.leaky_relu(conv2d(image, df_dim, name='d_h0_conv'))  # 32
        h1 = _bn_lrelu(conv2d(h0, df_dim * 2, name='d_h1_conv'), 'd_bn1')  # 16
        h2 = _bn_lrelu(conv2d(h1, df_dim * 4, name='d_h2_conv'), 'd_bn2')  # 8
        h3 = _bn_lrelu(conv2d(h2, df_dim * 8, name='d_h3_conv'), 'd_bn3')  # 4

        # ADD TEXT EMBEDDING TO THE NETWORK
        reduced_text_embeddings = tf.nn.leaky_relu(linear_layer(t_text_embedding, t_dim, scope='d_embedding'))
        # [batch, t_dim] -> [batch, 1, 1, t_dim] so it can be tiled spatially.
        reduced_text_embeddings = tf.expand_dims(reduced_text_embeddings, 1)
        reduced_text_embeddings = tf.expand_dims(reduced_text_embeddings, 2)
        # The 4x4 tiling matches h3 for the 64x64 inputs used in this file.
        tiled_embeddings = tf.tile(reduced_text_embeddings, [1, 4, 4, 1], name='tiled_embeddings')

        h3_concat = tf.concat([h3, tiled_embeddings], 3, name='h3_concat')
        # 1x1 convolution, stride 1: mixes image and text channels.
        h3_new = _bn_lrelu(
            conv2d(h3_concat, df_dim * 8, 1, 1, 1, 1, name='d_h3_conv_new'), 'd_bn4')  # 4

        h4 = linear_layer(tf.reshape(h3_new, [batch_size, -1]), 1, scope='d_h3_lin')

    return tf.nn.sigmoid(h4), h4

In [8]:
def build_model():
    """Build the conditional GAN graph and return handles to its pieces.

    The discriminator is applied three times with shared weights: to the
    real image, to a mismatched ("wrong") image, and to the generated image,
    always paired with the real caption.

    Returns:
        A 5-tuple of dicts ``(input_tensors, variables, loss, outputs, checks)``:
        input_tensors: the 't_real_image', 't_wrong_image', 't_real_caption'
            and 't_z' placeholders.
        variables: 'd_vars' / 'g_vars', the trainable variables whose names
            contain 'd_' / 'g_'.
        loss: 'g_loss' and 'd_loss'.
        outputs: 'generator', the generated image tensor.
        checks: the three discriminator loss terms and the three logit
            tensors, for monitoring.
    """
    t_real_image = tf.placeholder('float32', [batch_size, image_size, image_size, 3], name='real_image')
    t_wrong_image = tf.placeholder('float32', [batch_size, image_size, image_size, 3], name='wrong_image')
    t_real_caption = tf.placeholder('float32', [batch_size, caption_vector_length], name='real_caption_input')
    t_z = tf.placeholder('float32', [batch_size, z_dim])

    fake_image = generator(t_z, t_real_caption)

    # The first call creates the discriminator variables; later calls reuse them.
    disc_real_image, disc_real_image_logits = discriminator(t_real_image, t_real_caption)
    disc_wrong_image, disc_wrong_image_logits = discriminator(t_wrong_image, t_real_caption, reuse=True)
    disc_fake_image, disc_fake_image_logits = discriminator(fake_image, t_real_caption, reuse=True)

    def _bce(logits, labels):
        # Mean sigmoid cross-entropy over the batch.
        return tf.reduce_mean(
            tf.nn.sigmoid_cross_entropy_with_logits(logits=logits, labels=labels))

    # Generator: generated images should be classified as real (label 1).
    g_loss = _bce(disc_fake_image_logits, tf.ones_like(disc_fake_image))

    # Discriminator: real image -> 1, wrong image -> 0, generated image -> 0.
    d_loss1 = _bce(disc_real_image_logits, tf.ones_like(disc_real_image))
    d_loss2 = _bce(disc_wrong_image_logits, tf.zeros_like(disc_wrong_image))
    d_loss3 = _bce(disc_fake_image_logits, tf.zeros_like(disc_fake_image))

    d_loss = d_loss1 + d_loss2 + d_loss3

    # Variables are assigned to a network by the substring in their name.
    t_vars = tf.trainable_variables()
    d_vars = [var for var in t_vars if 'd_' in var.name]
    g_vars = [var for var in t_vars if 'g_' in var.name]

    input_tensors = {
        't_real_image': t_real_image,
        't_wrong_image': t_wrong_image,
        't_real_caption': t_real_caption,
        't_z': t_z
    }

    variables = {
        'd_vars': d_vars,
        'g_vars': g_vars
    }

    loss = {
        'g_loss': g_loss,
        'd_loss': d_loss
    }

    outputs = {
        'generator': fake_image
    }

    checks = {
        'd_loss1': d_loss1,
        'd_loss2': d_loss2,
        'd_loss3': d_loss3,
        'disc_real_image_logits': disc_real_image_logits,
        # Was bound to the sigmoid output (disc_wrong_image) by mistake.
        'disc_wrong_image_logits': disc_wrong_image_logits,
        'disc_fake_image_logits': disc_fake_image_logits
    }

    return input_tensors, variables, loss, outputs, checks

In [9]:
# Build the graph once and keep the dicts of placeholders, variable lists,
# losses, generator output and monitoring tensors for the cells below.
input_tensors, variables, loss, outputs, checks = build_model()

Instructions for updating:
Use keras.layers.BatchNormalization instead.  In particular, `tf.control_dependencies(tf.GraphKeys.UPDATE_OPS)` should not be used (consult the `tf.keras.layers.batch_normalization` documentation).
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where


In [10]:
def load_training_data(data_dir):
    """Load the caption vectors and pick the training image list.

    Reads every dataset of ``<data_dir>/flower_tv.hdf5`` into memory, sorts
    the dataset names, keeps the first 75% as the training set and shuffles
    that list in place.

    Args:
        data_dir: directory containing 'flower_tv.hdf5'.

    Returns:
        A dict with:
        'image_list': shuffled list of training dataset names,
        'captions': dict mapping every dataset name to its numpy array,
        'data_length': number of training images.
    """
    captions_path = os.path.join(data_dir, 'flower_tv.hdf5')
    # Open read-only and close the file once everything is copied into
    # memory; np.array() materialises each dataset, so the arrays stay valid
    # after the file is closed.
    with h5py.File(captions_path, 'r') as h:
        flower_captions = {name: np.array(dataset) for name, dataset in h.items()}

    image_list = sorted(flower_captions)

    img_75 = int(len(image_list) * 0.75)
    training_image_list = image_list[0:img_75]
    random.shuffle(training_image_list)

    return {
        'image_list' : training_image_list,
        'captions' : flower_captions,
        'data_length' : len(training_image_list)
    }

In [11]:
def pre_process_image(image_file, image_size):
    """Load an image, resize it to a square and randomly mirror it.

    Args:
        image_file: path of the image to read.
        image_size: target height and width in pixels.

    Returns:
        float32 array of shape ``(image_size, image_size, channels)`` with
        values scaled to [0, 1], in the channel order OpenCV loads.

    Raises:
        IOError: if OpenCV cannot read or decode ``image_file``.
    """
    _img = opencv.imread(image_file)
    if _img is None:
        # imread signals failure by returning None instead of raising.
        raise IOError("Could not read image file: {}".format(image_file))
    _img_scaled = opencv.resize(_img, (image_size, image_size))
    # Horizontal flip with probability 0.5 as data augmentation.
    if random.random() > 0.5:
        _img_scaled = np.fliplr(_img_scaled)
    # Scale 8-bit pixels to [0, 1] so real images share the value range of
    # the generator output (tanh / 2 + 0.5); otherwise the discriminator can
    # separate real from generated images by magnitude alone.
    return _img_scaled.astype('float32') / 255.0

In [12]:
def get_training_batch(batch_no, batch_size, image_size, z_dim,
    caption_vector_length, split, data_dir, loaded_data = None):
    """Assemble one training batch.

    Images are taken consecutively from ``loaded_data['image_list']``,
    wrapping around at the end of the list. Each real image is paired with
    one randomly chosen caption vector and one randomly chosen "wrong" image.

    Args:
        batch_no: zero-based index of the batch within the epoch.
        batch_size: number of examples in the batch.
        image_size: height and width of the returned images.
        z_dim: width of the noise vectors.
        caption_vector_length: number of leading caption-vector entries kept.
        split: unused; kept for interface compatibility.
        data_dir: directory containing 'flowers/jpg/'.
        loaded_data: dict as returned by ``load_training_data``. Required.

    Returns:
        ``(real_images, wrong_images, captions, z_noise, image_files)``.

    Raises:
        ValueError: if ``loaded_data`` is missing or has no images.
    """
    if loaded_data is None:
        raise ValueError("loaded_data is required; pass the result of load_training_data()")

    image_list = loaded_data['image_list']
    num_images = len(image_list)
    if num_images == 0:
        raise ValueError("loaded_data['image_list'] is empty")

    # Size the buffers from image_size instead of a hard-coded 64.
    real_images = np.zeros((batch_size, image_size, image_size, 3))
    wrong_images = np.zeros((batch_size, image_size, image_size, 3))
    captions = np.zeros((batch_size, caption_vector_length))

    image_files = []
    first = batch_no * batch_size
    for cnt in range(batch_size):
        idx = (first + cnt) % num_images
        image_name = image_list[idx]
        image_file = os.path.join(data_dir, 'flowers/jpg/' + image_name)
        real_images[cnt, :, :, :] = pre_process_image(image_file, image_size)

        # Pick a wrong image that is at least a different file than the real
        # one; drawing from num_images - 1 slots and skipping idx keeps the
        # choice uniform. Could still be improved (e.g. a different class).
        if num_images > 1:
            wrong_image_id = random.randrange(num_images - 1)
            if wrong_image_id >= idx:
                wrong_image_id += 1
        else:
            wrong_image_id = idx
        wrong_image_file = os.path.join(data_dir, 'flowers/jpg/' + image_list[wrong_image_id])
        wrong_images[cnt, :, :, :] = pre_process_image(wrong_image_file, image_size)

        # Choose among the captions actually stored for this image rather
        # than assuming there are exactly five.
        image_captions = loaded_data['captions'][image_name]
        random_caption = random.randrange(len(image_captions))
        captions[cnt, :] = image_captions[random_caption][0:caption_vector_length]
        image_files.append(image_file)

    z_noise = np.random.uniform(-1, 1, [batch_size, z_dim])
    return real_images, wrong_images, captions, z_noise, image_files

In [13]:
# One Adam optimizer per network, each restricted to that network's variables.
d_optim = tf.train.AdamOptimizer(learning_rate, beta1 = beta1).minimize(loss['d_loss'], var_list=variables['d_vars'])
g_optim = tf.train.AdamOptimizer(learning_rate, beta1 = beta1).minimize(loss['g_loss'], var_list=variables['g_vars'])

sess = tf.InteractiveSession()
# tf.initialize_all_variables() is deprecated; this is its replacement.
tf.global_variables_initializer().run()

# Optionally continue from an earlier checkpoint.
saver = tf.train.Saver()
if resume_model:
    saver.restore(sess, resume_model)

loaded_data = load_training_data(data_dir)

Instructions for updating:
Use `tf.global_variables_initializer` instead.


In [14]:
# `data_set` is interpolated into the checkpoint file names below but was
# never defined, so the first save raised NameError. "flowers" matches the
# 'flowers/jpg/' image folder read by get_training_batch.
data_set = "flowers"

# saver.save() needs the target directory to exist.
os.makedirs("Data/Models", exist_ok=True)

for i in range(epochs):
    batch_no = 0
    while batch_no*batch_size < loaded_data['data_length']:
        real_images, wrong_images, caption_vectors, z_noise, image_files = get_training_batch(
            batch_no, batch_size, image_size, z_dim, caption_vector_length,
            'train', data_dir, loaded_data)

        # The same batch feeds the discriminator step and both generator steps.
        feed_dict = {
            input_tensors['t_real_image'] : real_images,
            input_tensors['t_wrong_image'] : wrong_images,
            input_tensors['t_real_caption'] : caption_vectors,
            input_tensors['t_z'] : z_noise,
        }

        # DISCR UPDATE
        check_ts = [checks['d_loss1'], checks['d_loss2'], checks['d_loss3']]
        _, d_loss, gen, d1, d2, d3 = sess.run(
            [d_optim, loss['d_loss'], outputs['generator']] + check_ts,
            feed_dict=feed_dict)
        print("********************************")
        print("Epoch:",i," , Batch No:",batch_no)
        print("********************************")
        print("First Discriminator Loss: \t", d1)
        print("Second Discriminator Loss: \t", d2)
        print("Third Discriminator Loss: \t", d3)
        print("Total Discriminator Loss: \t", d_loss)

        # GEN UPDATE, run twice to make sure d_loss does not go to 0
        for _gen_step in range(2):
            _, g_loss, gen = sess.run(
                [g_optim, loss['g_loss'], outputs['generator']],
                feed_dict=feed_dict)

        print("Generator Loss: ", g_loss)
        print("")
        batch_no += 1
        # Rolling checkpoint every `save_every` batches.
        if (batch_no % save_every) == 0:
            print("Saving Model...")
            save_path = saver.save(sess, "Data/Models/latest_model_{}_temp.ckpt".format(data_set))
    # Keep a separate checkpoint every fifth epoch.
    if i%5 == 0:
        save_path = saver.save(sess, "Data/Models/model_after_{}_epoch_{}.ckpt".format(data_set, i))

********************************
Epoch: 0  , Batch No: 0
********************************
First Discriminator Loss: 	 2.0663457
Second Discriminator Loss: 	 0.35173687
Third Discriminator Loss: 	 0.4354528
Total Discriminator Loss: 	 2.8535352
Generator Loss:  1.0315427

********************************
Epoch: 0  , Batch No: 1
********************************
First Discriminator Loss: 	 0.2915724
Second Discriminator Loss: 	 4.2517543
Third Discriminator Loss: 	 1.4265585
Total Discriminator Loss: 	 5.9698853
Generator Loss:  10.653404

********************************
Epoch: 0  , Batch No: 2
********************************
First Discriminator Loss: 	 5.8490763
Second Discriminator Loss: 	 0.047663122
Third Discriminator Loss: 	 0.00025780228
Total Discriminator Loss: 	 5.8969975
Generator Loss:  3.591416

********************************
Epoch: 0  , Batch No: 3
********************************
First Discriminator Loss: 	 3.4688506
Second Discriminator Loss: 	 0.24122734
Third Discrim

KeyboardInterrupt: 