# Competition 3: Team 21

112062649 王俊皓

112062650 廖士傑

##  Reverse Image Caption

In [1]:
class experimental_settings:
    def __init__(self,
                 enc=True,
                 gen=True,
                 dis=True,
                 enc_do_batchnorm=False,
                 delete_checkpoint=False):
        self.enc = enc
        self.gen = gen
        self.dis = dis
        self.enc_do_batchnorm = enc_do_batchnorm
        self.delete_checkpoint = delete_checkpoint # not implemented yet
        
        # ============================ #
        # automatic
        # ============================ #
        
        self.caption_type = 'sentence' if self.enc else 'id'


expSettings = experimental_settings(enc=True,
                                    gen=True,
                                    dis=True,
                                    delete_checkpoint=True)

## Import

In [2]:
from __future__ import absolute_import, division, print_function, unicode_literals
import tensorflow as tf
from tensorflow.keras import layers
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' 
import string
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import PIL
import random
import time
from pathlib import Path
import math

import re
from IPython import display

GPU check

In [3]:
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    try:
        # Restrict TensorFlow to only use the first GPU
        tf.config.experimental.set_visible_devices(gpus[0], 'GPU')

        # Currently, memory growth needs to be the same across GPUs
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        logical_gpus = tf.config.experimental.list_logical_devices('GPU')
        print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPUs")
    except RuntimeError as e:
        # Memory growth must be set before GPUs have been initialized
        print(e)

1 Physical GPUs, 1 Logical GPUs


## Loading Data

In [4]:
dictionary_path = './dictionary'
vocab = np.load(dictionary_path + '/vocab.npy')
print('there are {} vocabularies in total'.format(len(vocab)))

word2Id_dict = dict(np.load(dictionary_path + '/word2Id.npy'))
id2word_dict = dict(np.load(dictionary_path + '/id2Word.npy'))
print('Word to id mapping, for example: %s -> %s' % ('flower', word2Id_dict['flower']))
print('Id to word mapping, for example: %s -> %s' % ('1', id2word_dict['1']))
print('Tokens: <PAD>: %s; <RARE>: %s' % (word2Id_dict['<PAD>'], word2Id_dict['<RARE>']))

there are 5427 vocabularies in total
Word to id mapping, for example: flower -> 1
Id to word mapping, for example: 1 -> flower
Tokens: <PAD>: 5427; <RARE>: 5428


In [5]:
def sent2IdList(line, MAX_SEQ_LENGTH=20):
    MAX_SEQ_LIMIT = MAX_SEQ_LENGTH
    padding = 0
    
    # data preprocessing, remove all puntuation in the texts
    prep_line = re.sub('[%s]' % re.escape(string.punctuation), ' ', line.rstrip())
    prep_line = prep_line.replace('-', ' ')
    prep_line = prep_line.replace('-', ' ')
    prep_line = prep_line.replace('  ', ' ')
    prep_line = prep_line.replace('.', '')
    tokens = prep_line.split(' ')
    tokens = [
        tokens[i] for i in range(len(tokens))
        if tokens[i] != ' ' and tokens[i] != ''
    ]
    l = len(tokens)
    padding = MAX_SEQ_LIMIT - l
    
    # make sure length of each text is equal to MAX_SEQ_LENGTH, and replace the less common word with <RARE> token
    for i in range(padding):
        tokens.append('<PAD>')
    line = [
        word2Id_dict[tokens[k]]
        if tokens[k] in word2Id_dict else word2Id_dict['<RARE>']
        for k in range(len(tokens))
    ]

    return line

text = "the flower shown has yellow anther red pistil and bright red petals."
print(text)
print(sent2IdList(text))

the flower shown has yellow anther red pistil and bright red petals.
['9', '1', '82', '5', '11', '70', '20', '31', '3', '29', '20', '2', '5427', '5427', '5427', '5427', '5427', '5427', '5427', '5427']


In [6]:
@tf.function
def id2Sent(ids):
    return " ".join([id2word_dict[idx] for idx in ids]).strip()

#def batch_id2Sent(batch_ids):
    #return [id2Sent(ids) for ids in batch_ids]
    
def batch_id2Sent(batch_ids):
    def process_single(ids):
        # Convert a single tensor of IDs to a sentence
        ids = ids.numpy()  # Convert Tensor to NumPy
        sentence = " ".join([id2word_dict.get(idx, "<UNK>") for idx in ids])  # Handle unknown IDs
        return sentence

    # Use tf.py_function to apply Python function inside the TensorFlow graph
    sentences = tf.map_fn(
        lambda ids: tf.py_function(process_single, [ids], tf.string),
        batch_ids,
        fn_output_signature=tf.string
    )
    return sentences


print(sent2IdList(text))
print(id2Sent(sent2IdList(text)))

['9', '1', '82', '5', '11', '70', '20', '31', '3', '29', '20', '2', '5427', '5427', '5427', '5427', '5427', '5427', '5427', '5427']
tf.Tensor(b'the flower shown has yellow anther red pistil and bright red petals <PAD> <PAD> <PAD> <PAD> <PAD> <PAD> <PAD> <PAD>', shape=(), dtype=string)


In [7]:
data_path = './dataset'
text2ImgData = pd.read_pickle(data_path + '/text2ImgData.pkl')
num_training_sample = len(text2ImgData)
n_images_train = num_training_sample
print('There are %d image in training data' % (n_images_train))

There are 7370 image in training data


In [8]:
def caption2string(cap):
    output = []
    for sen in cap:
        s = " ".join([id2word_dict[idx] for idx in sen]).strip()
        output.append(s.split(' <PAD>')[0])
    return output

# adding caption as strings
text2ImgData['Captions_string'] = text2ImgData['Captions'].apply(caption2string)

In [9]:
text2ImgData.head(5)

Unnamed: 0_level_0,Captions,ImagePath,Captions_string
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
6734,"[[9, 2, 17, 9, 1, 6, 14, 13, 18, 3, 41, 8, 11,...",./102flowers/image_06734.jpg,[the petals of the flower are pink in color an...
6736,"[[4, 1, 5, 12, 2, 3, 11, 31, 28, 68, 106, 132,...",./102flowers/image_06736.jpg,[this flower has white petals and yellow pisti...
6737,"[[9, 2, 27, 4, 1, 6, 14, 7, 12, 19, 5427, 5427...",./102flowers/image_06737.jpg,[the petals on this flower are pink with white...
6738,"[[9, 1, 5, 8, 54, 16, 38, 7, 12, 116, 325, 3, ...",./102flowers/image_06738.jpg,[the flower has a smooth purple petal with whi...
6739,"[[4, 12, 1, 5, 29, 11, 19, 7, 26, 70, 5427, 54...",./102flowers/image_06739.jpg,[this white flower has bright yellow stamen wi...


In [10]:
text2ImgData['Captions_string'][:1].tolist()

[['the petals of the flower are pink in color and have a yellow center',
  'this flower is pink and white in color with petals that are multi colored',
  'the purple petals have shades of white with white anther and filament',
  'this flower has large pink petals and a white stigma in the center',
  'this flower has petals that are pink and has a yellow stamen',
  'a flower with short and wide petals that is light purple',
  'this flower has small pink petals with a yellow center',
  'this flower has large rounded pink petals with curved edges and purple veins',
  'this flower has purple petals as well as a white stamen']]

In [11]:
# in this competition, you have to generate image in size 64x64x3
IMAGE_SIZE = 64
IMAGE_HEIGHT = IMAGE_SIZE
IMAGE_WIDTH = IMAGE_SIZE
IMAGE_CHANNEL = 3

def training_data_generator(caption, image_path, caption_type='id'):
    # load in the image according to image path
    img = tf.io.read_file(image_path)
    img = tf.image.decode_image(img, channels=3)
    img = tf.image.convert_image_dtype(img, tf.float32)
    img.set_shape([None, None, 3])
    img = tf.image.resize(img, size=[IMAGE_HEIGHT, IMAGE_WIDTH])
    img.set_shape([IMAGE_HEIGHT, IMAGE_WIDTH, IMAGE_CHANNEL])
    if caption_type == 'id':
        caption = tf.cast(caption, tf.int32)
    elif caption_type == 'sentence':
        caption = tf.convert_to_tensor(caption, dtype=tf.string)

    return img, caption

def dataset_generator(filenames, batch_size, data_generator, caption_type='id'):
    # load the training data into two NumPy arrays
    if filenames != None:
        df = pd.read_pickle(filenames)
    else:
        df = text2ImgData
    
    if caption_type == 'id':
        captions = df['Captions'].values
    elif caption_type == 'sentence':
        captions = df['Captions_string'].values
    else:
        raise ValueError('for dataset_generator, caption_type= should be \'id\' or \'sentence\'.')
        
    caption = []
    # each image has 1 to 10 corresponding captions
    # we choose one of them randomly for training
    
    # ============================================ #
    # TODO: augmentation
    # idea 1 (difficulty: easy)
    #     training data has multiple captions, right now it picks a random one.
    #     we can make it so that every caption is an entry and multiple captions link to the same image.
    # idea 2 (difficulty: medium)
    #     after text embedding, use the average of 2 caption embeddings to generate a new caption.
    #     the data does not need to have an image tied to it, it just have the label 0 (fake image).
    # ============================================ #
    for i in range(len(captions)):
        caption.append(random.choice(captions[i]))
    caption = np.asarray(caption)
    
    if caption_type == 'id':
        caption = caption.astype(np.int)
        
    image_path = df['ImagePath'].values
    
    # assume that each row of `features` corresponds to the same row as `labels`.
    assert caption.shape[0] == image_path.shape[0]
    
    datagen_func = lambda cap, img: data_generator(cap, img, caption_type=caption_type)
    
    dataset = tf.data.Dataset.from_tensor_slices((caption, image_path))
    dataset = dataset.map(datagen_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.shuffle(len(caption)).batch(batch_size, drop_remainder=True)
    dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

    return dataset

In [12]:
BATCH_SIZE = 64
dataset = dataset_generator(
    #data_path + '/text2ImgData.pkl',
    None,
    BATCH_SIZE, 
    training_data_generator, 
    caption_type=expSettings.caption_type)

In [13]:
# dataset testing ground
for img, cap in dataset.take(1):
    print("Image shape:", img.numpy().shape)
    print("Caption shape:", cap.numpy().shape)

Image shape: (64, 64, 64, 3)
Caption shape: (64,)


In [14]:
from tensorflow.keras.layers import Conv2DTranspose, Conv2D, BatchNormalization, LeakyReLU

# custom layers
class flattened_dense(tf.keras.layers.Layer):
    """
    a dense layer that is made compatible with convolution layers
    by flattening the input first and followed by a dense layer.
    """
    def __init__(self, channels=64, kernel_initializer="glorot_uniform"):
        super().__init__()
        self.flatten = tf.keras.layers.Flatten()
        self.dense = tf.keras.layers.Dense(channels, kernel_initializer=kernel_initializer)
        
    def call(self, inputs):
        fl = self.flatten(inputs)
        return self.dense(fl)
    
class conv_block(tf.keras.layers.Layer):
    """
    a convolution layer with batch normalization and leaky relu activation
    """
    def __init__(self, filters=128, kernel_size=1, strides=1, kernel_initializer='glorot_uniform'):
        super().__init__()
        self.conv = Conv2D(filters=filters,
                           kernel_size = (kernel_size, kernel_size),
                           strides=(strides, strides),
                           padding='same',
                           kernel_initializer=kernel_initializer)
        self.bn = BatchNormalization()
        self.activation = LeakyReLU(alpha=0.1)
    
    def call(self, inputs):
        return self.activation(self.bn(self.conv(inputs)))
    
class deconv_block(tf.keras.layers.Layer):
    """
    a deconvolution layer with batch normalization and leaky relu activation
    """
    def __init__(self, filters=128, kernel_size=3, strides=2, kernel_initializer='glorot_uniform'):
        super().__init__()
        self.deconv = Conv2DTranspose(filters=filters,
                                    kernel_size = (kernel_size, kernel_size),
                                    strides=(strides, strides),
                                    padding='same',
                                    kernel_initializer=kernel_initializer)
        self.bn = BatchNormalization()
        self.activation = LeakyReLU(alpha=0.1)
    
    def call(self, inputs):
        return self.activation(self.bn(self.deconv(inputs)))

In [15]:
import tensorflow_hub as hub

class TextEncoder(tf.keras.Model):
    """
    Encode text (a caption) into hidden representation
    input: text, which is a list of ids
    output: embedding, or hidden representation of input text in dimension of RNN_HIDDEN_SIZE
    """
    def __init__(self, hparas, experimental=False, do_batchnorm=False):
        super(TextEncoder, self).__init__()
        self.exp=experimental
        self.do_batchnorm = do_batchnorm
        self.hparas = hparas
        self.batch_size = self.hparas['BATCH_SIZE']
        
        # embedding with tensorflow API
        self.embedding = layers.Embedding(self.hparas['VOCAB_SIZE'], self.hparas['EMBED_DIM'])
        # RNN, here we use GRU cell, another common RNN cell similar to LSTM
        self.gru = layers.GRU(self.hparas['RNN_HIDDEN_SIZE'],
                              return_sequences=True,
                              return_state=True,
                              recurrent_initializer='glorot_uniform')
        if self.exp:
            self.embed = hub.load('./checkpoints/universal_sentence_encoder')
    
    def call(self, text, hidden):
        if self.exp:
            with tf.device('/CPU:0'): # TODO if you find a way to use GPU, go for it.
                output_last = self.embed(text)
                
            state = hidden # not updating state for compatibility reasons
            
        else:
            text = self.embedding(text)
            output, state = self.gru(text, initial_state = hidden)
            output_last = output[:, -1, :]
        
        # normalization in-batch
        if self.do_batchnorm:
            mean = tf.reduce_mean(output_last, axis=0, keepdims=True)  # Mean across the batch
            std = tf.math.reduce_std(output_last, axis=0, keepdims=True)  # Std across the batch
            normalized = (output_last - mean) / (std + 1e-6)  # Avoid division by zero
        else:
            normalized = output_last
        
        return normalized, state
    
    def initialize_hidden_state(self):
        return tf.zeros((self.hparas['BATCH_SIZE'], self.hparas['RNN_HIDDEN_SIZE']))

In [16]:
class Generator(tf.keras.Model):
    """
    Generate fake image based on given text(hidden representation) and noise z
    input: text and noise
    output: fake image with size 64*64*3
    """
    def __init__(self, hparas, experimental=False):
        super(Generator, self).__init__()
        self.exp = experimental
        self.hparas = hparas
        self.flatten = tf.keras.layers.Flatten()
        self.d1 = tf.keras.layers.Dense(self.hparas['DENSE_DIM'])
        self.d2 = tf.keras.layers.Dense(64*64*3)
        if self.exp:
            self.deconv_depth = int(math.log(IMAGE_SIZE, 2)) - 1
            self.dnoise = tf.keras.layers.Dense(256)
            self.starter = tf.keras.layers.Dense(2*2*256)
            self.deconv = [deconv_block(filters=256) for i in range(self.deconv_depth)]
            self.conv_in_deconv = [conv_block(filters=128, kernel_size=3) for i in range(self.deconv_depth)]
            self.head1 = conv_block(filters=128, kernel_size=3, strides=1)
            self.head2 = conv_block(filters=64, kernel_size=3, strides=1)
            self.head3 = conv_block(filters=16, kernel_size=1, strides=1)
            self.headf = conv_block(filters=3, kernel_size=1, strides=1)
            
    def call(self, text, noise_z, debug_output=False):
        text = self.flatten(text)
        text = self.d1(text)
        text = tf.nn.leaky_relu(text)
        
        # deconvolution
        if self.exp:
            dnoise = self.dnoise(noise_z)
            noisy_text = tf.concat([text, dnoise], axis=1)
            img = self.starter(noisy_text)
            img = tf.reshape(img, [-1, 2, 2, 256])
            debug = []
            for i in range(int(math.log(IMAGE_SIZE, 2))):
                debug.append(img)
                img = self.deconv[i](img)
                img = self.conv_in_deconv[i](img)

            img = self.head1(img)
            img = self.head2(img)
            img = self.head3(img)
            img = self.headf(img)
            #helper = self.d2(tf.concat([noise_z, text], axis=1))
            #helper = tf.reshape(helper, [-1, 64, 64, 3])
            logits = tf.reshape(img, [-1, IMAGE_SIZE, IMAGE_SIZE, 3])
            #logits = logits + helper
            output = tf.nn.tanh(logits)
        
        # concatenate input text and random noise
        else:
            text_concat = tf.concat([noise_z, text], axis=1)
            text_concat = self.d2(text_concat)
        
            logits = tf.reshape(text_concat, [-1, 64, 64, 3])
            output = tf.nn.tanh(logits)
            debug_output = output
        
        if debug_output:
            return logits, output, debug
        else:
            return logits, output

In [17]:
from tensorflow.keras.applications import ResNet50

class Discriminator(tf.keras.Model):
    """
    Differentiate the real and fake image
    input: image and corresponding text
    output: labels, the real image should be 1, while the fake should be 0
    """
    def __init__(self, hparas, experimental=False):
        super(Discriminator, self).__init__()
        self.exp = experimental
        self.hparas = hparas
        if self.exp:
            self.resnet_base = ResNet50(input_shape=(IMAGE_SIZE, IMAGE_SIZE, 3), weights='imagenet', include_top=False)
            for layer in self.resnet_base.layers:
                layer.trainable = False
        self.flatten = tf.keras.layers.Flatten()
        self.d_text = tf.keras.layers.Dense(self.hparas['DENSE_DIM'])
        self.d_img = tf.keras.layers.Dense(self.hparas['DENSE_DIM'])
        self.d = tf.keras.layers.Dense(1)
    
    def call(self, img, text):
        text = self.flatten(text)
        text = self.d_text(text)
        text = tf.nn.leaky_relu(text)
        
        if self.exp:
            img = self.resnet_base(img)
        img = self.flatten(img)
        img = self.d_img(img)
        img = tf.nn.leaky_relu(img)
        
        # concatenate image with paired text
        img_text = tf.concat([text, img], axis=1)
        
        logits = self.d(img_text)
        output = tf.nn.sigmoid(logits)
        
        return logits, output

Parameters and settings

In [18]:
hparas = {
    'MAX_SEQ_LENGTH': 20,                     # maximum sequence length
    'EMBED_DIM': 256,                         # word embedding dimension
    'VOCAB_SIZE': len(word2Id_dict),          # size of dictionary of captions
    'RNN_HIDDEN_SIZE': 128,                   # number of RNN neurons
    'Z_DIM': 512,                             # random noise z dimension
    'DENSE_DIM': 128,                         # number of neurons in dense layer
    'IMAGE_SIZE': [64, 64, 3],                # render image size
    'BATCH_SIZE': 64,
    'LR_GEN': 1e-3,
    'LR_DIS': 1e-4,
    'LR_DECAY': 0.5,                          # unused
    'BETA_1': 0.5,
    'N_EPOCH': 50,                            # number of epoch for demo
    'N_SAMPLE': num_training_sample,          # size of training data
    'CHECKPOINTS_DIR': './checkpoints/demo',  # checkpoint path
    'PRINT_FREQ': 1                           # printing frequency of loss
}

In [19]:
text_encoder = TextEncoder(hparas, 
                           experimental=expSettings.enc,
                           do_batchnorm=expSettings.enc_do_batchnorm)

generator = Generator(hparas,
                      experimental=expSettings.gen)

discriminator = Discriminator(hparas,
                              experimental=expSettings.dis)

In [20]:
# test text encoder
for img, cap in dataset.take(1):
    print("Image shape:", img.numpy().shape)
    print("Caption shape:", cap.shape)
    with tf.device('/CPU:0'):
        output, _ = text_encoder(cap, 0)
        print("Caption embed shape:", output.shape)

Image shape: (64, 64, 64, 3)
Caption shape: (64,)
Caption embed shape: (64, 512)


In [21]:
# This method returns a helper function to compute cross entropy loss
cross_entropy = tf.keras.losses.BinaryCrossentropy(from_logits=True)

In [22]:
def discriminator_loss(real_logits, fake_logits):
    # output value of real image should be 1
    real_loss = cross_entropy(tf.ones_like(real_logits), real_logits)
    # output value of fake image should be 0
    fake_loss = cross_entropy(tf.zeros_like(fake_logits), fake_logits)
    total_loss = real_loss + fake_loss
    return total_loss

def generator_loss(fake_output):
    # output value of fake image should be 0
    return cross_entropy(tf.ones_like(fake_output), fake_output)

In [23]:
# we use seperated optimizers for training generator and discriminator
generator_optimizer = tf.keras.optimizers.Adam(hparas['LR_GEN'], clipvalue=10.0)
discriminator_optimizer = tf.keras.optimizers.Adam(hparas['LR_DIS'], clipvalue=10.0)

In [24]:
if expSettings.delete_checkpoint:
    # TODO remove old checkpoint files
    pass

# one benefit of tf.train.Checkpoint() API is we can save everything seperately
checkpoint_dir = hparas['CHECKPOINTS_DIR']
checkpoint_prefix = os.path.join(checkpoint_dir, "ckpt")
checkpoint = tf.train.Checkpoint(generator_optimizer=generator_optimizer,
                                 discriminator_optimizer=discriminator_optimizer,
                                 text_encoder=text_encoder,
                                 generator=generator,
                                 discriminator=discriminator)

In [25]:
@tf.function
def train_step(real_image, caption, hidden, imshow=False):
    # random noise for generator
    noise = tf.random.normal(shape=[hparas['BATCH_SIZE'], hparas['Z_DIM']], mean=0.0, stddev=0.1)
    
    with tf.GradientTape() as gen_tape, tf.GradientTape() as disc_tape:
        text_embed, hidden = text_encoder(caption, hidden)
        _, fake_image = generator(text_embed, noise)
        if imshow:
            plt.imshow(fake_image[0])

        real_logits, real_output = discriminator(real_image, text_embed)
        fake_logits, fake_output = discriminator(fake_image, text_embed)

        g_loss = generator_loss(fake_logits)
        d_loss = discriminator_loss(real_logits, fake_logits)

    grad_g = gen_tape.gradient(g_loss, generator.trainable_variables)
    grad_d = disc_tape.gradient(d_loss, discriminator.trainable_variables)

    generator_optimizer.apply_gradients(zip(grad_g, generator.trainable_variables))
    discriminator_optimizer.apply_gradients(zip(grad_d, discriminator.trainable_variables))
    
    return g_loss, d_loss

In [26]:
@tf.function
def test_step(caption, noise, hidden):
    text_embed, hidden = text_encoder(caption, hidden)
    _, fake_image = generator(text_embed, noise)
    return fake_image

Sample Debugging (unused)

In [27]:
def merge(images, size):
    h, w = images.shape[1], images.shape[2]
    img = np.zeros((h * size[0], w * size[1], 3))
    for idx, image in enumerate(images):
        i = idx % size[1]
        j = idx // size[1]
        img[j*h:j*h+h, i*w:i*w+w, :] = image
    return img

def imsave(images, size, path):
    # getting the pixel values between [0, 1] to save it
    return plt.imsave(path, merge(images, size)*0.5 + 0.5)

def save_images(images, size, image_path):
    return imsave(images, size, image_path)

In [28]:
def sample_generator(caption, batch_size, caption_type='id'):
    if caption_type == 'sentence':
        caption = caption2string(caption)
    caption = np.asarray(caption)
    if caption_type == 'id':
        caption = caption.astype(np.int)
    dataset = tf.data.Dataset.from_tensor_slices(caption)
    dataset = dataset.batch(batch_size)
    return dataset

In [29]:
ni = int(np.ceil(np.sqrt(hparas['BATCH_SIZE'])))
sample_size = hparas['BATCH_SIZE']
sample_seed = np.random.normal(loc=0.0, scale=1.0, size=(sample_size, hparas['Z_DIM'])).astype(np.float32)
sample_sentence = ["the flower shown has yellow anther red pistil and bright red petals."] * int(sample_size/ni) + \
                  ["this flower has petals that are yellow, white and purple and has dark lines"] * int(sample_size/ni) + \
                  ["the petals on this flower are white with a yellow center"] * int(sample_size/ni) + \
                  ["this flower has a lot of small round pink petals."] * int(sample_size/ni) + \
                  ["this flower is orange in color, and has petals that are ruffled and rounded."] * int(sample_size/ni) + \
                  ["the flower has yellow petals and the center of it is brown."] * int(sample_size/ni) + \
                  ["this flower has petals that are blue and white."] * int(sample_size/ni) +\
                  ["these white flowers have petals that start off white in color and end in a white towards the tips."] * int(sample_size/ni)

for i, sent in enumerate(sample_sentence):
    sample_sentence[i] = sent2IdList(sent)
sample_sentence = sample_generator(sample_sentence, hparas['BATCH_SIZE'], caption_type=expSettings.caption_type)

In [30]:
# test the sample dataset
for cap in sample_sentence.take(1):
    print("Caption shape:", cap.numpy().shape)
    emb, _ = text_encoder(cap, None)
    print("Caption embeddings:", emb)

Caption shape: (64,)
Caption embeddings: tf.Tensor(
[[-0.01346336  0.06881763 -0.05529514 ... -0.01518281 -0.00633275
   0.01500697]
 [-0.01346336  0.06881763 -0.05529514 ... -0.01518281 -0.00633275
   0.01500697]
 [-0.01346336  0.06881763 -0.05529514 ... -0.01518281 -0.00633275
   0.01500697]
 ...
 [-0.00790838  0.06233827  0.01107207 ... -0.02398296 -0.03450323
  -0.00868433]
 [-0.00790838  0.06233827  0.01107207 ... -0.02398296 -0.03450323
  -0.00868433]
 [-0.00790838  0.06233827  0.01107207 ... -0.02398296 -0.03450323
  -0.00868433]], shape=(64, 512), dtype=float32)


In [31]:
if not os.path.exists('samples/demo'):
    os.makedirs('samples/demo')

Training and testing

In [32]:
def train(dataset, epochs):
    # hidden state of RNN
    hidden = text_encoder.initialize_hidden_state()
    steps_per_epoch = int(hparas['N_SAMPLE']/hparas['BATCH_SIZE'])
    
    for epoch in range(hparas['N_EPOCH']):
        g_total_loss = 0
        d_total_loss = 0
        start = time.time()
        imshow = False
        
        for image, caption in dataset:
            g_loss, d_loss = train_step(image, caption, hidden, imshow=imshow)
            imshow = False
            g_total_loss += g_loss
            d_total_loss += d_loss
            
        time_tuple = time.localtime()
        time_string = time.strftime("%m/%d/%Y, %H:%M:%S", time_tuple)
            
        print("Epoch {}, gen_loss: {:.4f}, disc_loss: {:.4f}".format(epoch+1,
                                                                     g_total_loss/steps_per_epoch,
                                                                     d_total_loss/steps_per_epoch))
        print('Time for epoch {} is {:.4f} sec'.format(epoch+1, time.time()-start))
        
        print('======================================')
        
        # save the model
        if True:
            checkpoint.save(file_prefix = checkpoint_prefix)
        
        # visualization
        if (epoch + 1) % hparas['PRINT_FREQ'] == 0:
            for caption in sample_sentence:
                fake_image = test_step(caption, sample_seed, hidden)
            save_images(fake_image, [ni, ni], 'samples/demo/train_{:02d}.jpg'.format(epoch))

In [33]:
train(dataset, hparas['N_EPOCH'])

  "shape. This may consume a large amount of memory." % value)


Epoch 1, gen_loss: 0.7468, disc_loss: 1.4494
Time for epoch 1 is 42.7817 sec
Epoch 2, gen_loss: 1.8876, disc_loss: 0.3602
Time for epoch 2 is 25.8533 sec
Epoch 3, gen_loss: 2.9052, disc_loss: 0.1175
Time for epoch 3 is 25.5019 sec
Epoch 4, gen_loss: 3.6762, disc_loss: 0.0530
Time for epoch 4 is 24.9849 sec
Epoch 5, gen_loss: 4.3117, disc_loss: 0.0279
Time for epoch 5 is 24.3139 sec
Epoch 6, gen_loss: 4.7637, disc_loss: 0.0170
Time for epoch 6 is 25.0501 sec
Epoch 7, gen_loss: 5.1496, disc_loss: 0.0116
Time for epoch 7 is 25.4220 sec
Epoch 8, gen_loss: 5.4737, disc_loss: 0.0084
Time for epoch 8 is 25.2321 sec
Epoch 9, gen_loss: 5.7559, disc_loss: 0.0063
Time for epoch 9 is 24.8955 sec
Epoch 10, gen_loss: 6.0287, disc_loss: 0.0048
Time for epoch 10 is 24.3862 sec
Epoch 11, gen_loss: 6.2457, disc_loss: 0.0039
Time for epoch 11 is 24.9875 sec
Epoch 12, gen_loss: 6.4476, disc_loss: 0.0031
Time for epoch 12 is 24.2506 sec
Epoch 13, gen_loss: 6.6298, disc_loss: 0.0026
Time for epoch 13 is 34.

In [34]:
def test_caption2string(ls):
    return " ".join([id2word_dict[idx] for idx in ls]).strip().split(' <PAD>')[0]

def testing_data_generator(caption, index, caption_type='id'):
    if caption_type == 'id':
        caption = tf.cast(caption, tf.float32)
    return caption, index

def testing_dataset_generator(batch_size, data_generator, caption_type='id'):
    data = pd.read_pickle('./dataset/testData.pkl')
    
    if caption_type == 'sentence':
        data['Captions_string'] = data['Captions'].apply(test_caption2string)
        captions = data['Captions_string'].values
    elif caption_type == 'id':
        captions = data['Captions'].values
        
    caption = []
    for i in range(len(captions)):
        caption.append(captions[i])
    caption = np.asarray(caption)
    
    if caption_type == 'id':
        caption = caption.astype(np.int)
        
    datagen_func = lambda cap, img: data_generator(cap, img, caption_type=caption_type)
        
    index = data['ID'].values
    index = np.asarray(index)
    
    dataset = tf.data.Dataset.from_tensor_slices((caption, index))
    dataset = dataset.map(datagen_func, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    dataset = dataset.repeat().batch(batch_size)
    
    return dataset

In [35]:
testing_dataset = testing_dataset_generator(hparas['BATCH_SIZE'], testing_data_generator, caption_type=expSettings.caption_type)

In [36]:
# test the testing dataset
for cap, img in testing_dataset.take(1):
    print("Image shape:", img.numpy().shape)
    print("Caption shape:", cap.numpy().shape)

Image shape: (64,)
Caption shape: (64,)


In [37]:
data = pd.read_pickle('./dataset/testData.pkl')
captions = data['Captions'].values

NUM_TEST = len(captions)
EPOCH_TEST = int(NUM_TEST / hparas['BATCH_SIZE'])

In [38]:
if not os.path.exists('./inference/demo'):
    os.makedirs('./inference/demo')

In [39]:
def inference(dataset):
    hidden = text_encoder.initialize_hidden_state()
    sample_size = hparas['BATCH_SIZE']
    sample_seed = np.random.normal(loc=0.0, scale=0.1, size=(sample_size, hparas['Z_DIM'])).astype(np.float32)
    print(sample_seed[0:3, :])
    
    step = 0
    start = time.time()
    for captions, idx in dataset:
        if step > EPOCH_TEST:
            break
        
        fake_image = test_step(captions, sample_seed, hidden)
        step += 1
        for i in range(hparas['BATCH_SIZE']):
            plt.imsave('./inference/demo/inference_{:04d}.jpg'.format(idx[i]), fake_image[i].numpy()*0.5 + 0.5)
            
            if i == 0 and step == 1: 
                #print(captions)
                text_embed_t, hidden_t = text_encoder(captions, hidden)
                #print(text_embed_t)
                #print(fake_image[0:1, 0:5, 0:5, :])
                _, _, debug = generator(text_embed_t, sample_seed, debug_output=True)
                print(debug[0][0:3, 0:5, 0:5, 0:5])
                print(debug[1][0:3, 0:5, 0:5, 0:5])
                print(debug[2][0:3, 0:5, 0:5, 0:5])
                print(debug[3][0:3, 0:5, 0:5, 0:5])
                print(debug[4][0:3, 0:5, 0:5, 0:5])
                print(debug[5][0:3, 0:5, 0:5, 0:5])
                pred_logit, pred = discriminator(fake_image, text_embed_t)
                #print(pred_logit)
                
            
    print('Time for inference is {:.4f} sec'.format(time.time()-start))

In [40]:
#checkpoint.restore(checkpoint_dir + f'/ckpt-50')
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
if latest_checkpoint:
    print(f"Restoring from {latest_checkpoint}")
    checkpoint.restore(latest_checkpoint)
else:
    print("No checkpoint found.")

Restoring from ./checkpoints/demo\ckpt-50


In [41]:
inference(testing_dataset)

[[ 0.03997521  0.12315663  0.0012033  ...  0.09444813  0.04159125
  -0.0623869 ]
 [ 0.02975813 -0.0556995  -0.02886274 ...  0.06946493  0.00538529
  -0.11472443]
 [ 0.03350148 -0.11274898  0.0861249  ... -0.02996353  0.0239036
  -0.06305609]]
tf.Tensor(
[[[[ 0.51452786  0.31561685  0.24741508  0.53797287 -0.08812072]]]


 [[[ 0.42233303  0.41356817  0.1980545   0.55985254 -0.09268352]]]


 [[[ 0.43788376  0.42990467  0.22601737  0.56862533 -0.09424648]]]], shape=(3, 1, 1, 5), dtype=float32)
tf.Tensor(
[[[[-0.04447858 -0.05870918 -0.07735799 -0.01680809 -0.07863136]
   [-0.05671952 -0.03642096 -0.02854978  0.40831465 -0.03101559]]

  [[-0.04036732 -0.05539403 -0.04838633 -0.01496394 -0.05411673]
   [-0.04058008 -0.04114917 -0.05409572 -0.00714706 -0.03851094]]]


 [[[-0.04692612 -0.06288823 -0.08255851 -0.01891819 -0.08199824]
   [-0.06052252 -0.03786369 -0.02938552  0.39597896 -0.02984612]]

  [[-0.04122457 -0.06067694 -0.05251662 -0.01640655 -0.05995019]
   [-0.04510291 -0.04418432 -0

Time for inference is 2.8339 sec


In [42]:
%cd ./testing
!python inception_score.py ../inference/demo output.csv 21
%cd ../

C:\Users\User\Courses\24aut_deep_learning\24aut-deep-learning\deep-learning-comp3\testing
1 Physical GPUs, 1 Logical GPUs
--------------Evaluation Success-----------------
C:\Users\User\Courses\24aut_deep_learning\24aut-deep-learning\deep-learning-comp3


In [43]:
text_encoder.summary()

Model: "text_encoder"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 embedding (Embedding)       multiple                  0 (unused)
                                                                 
 gru (GRU)                   multiple                  0 (unused)
                                                                 
Total params: 0
Trainable params: 0
Non-trainable params: 0
_________________________________________________________________


In [44]:
generator.summary()

Model: "generator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten (Flatten)           multiple                  0         
                                                                 
 dense (Dense)               multiple                  65664     
                                                                 
 dense_1 (Dense)             multiple                  0 (unused)
                                                                 
 dense_2 (Dense)             multiple                  131328    
                                                                 
 deconv_block (deconv_block)  multiple                 886016    
                                                                 
 deconv_block_1 (deconv_bloc  multiple                 2655488   
 k)                                                              
                                                         

In [45]:
discriminator.summary()

Model: "discriminator"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 resnet50 (Functional)       (None, 2, 2, 2048)        23587712  
                                                                 
 flatten_1 (Flatten)         multiple                  0         
                                                                 
 dense_9 (Dense)             multiple                  65664     
                                                                 
 dense_10 (Dense)            multiple                  1048704   
                                                                 
 dense_11 (Dense)            multiple                  257       
                                                                 
Total params: 24,702,337
Trainable params: 1,114,625
Non-trainable params: 23,587,712
_________________________________________________________________
