<a href="https://colab.research.google.com/github/viswambhar-yasa/image_captioning/blob/master/training_policy_net.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!git clone https://github.com/viswambhar-yasa/image_captioning

Cloning into 'image_captioning'...
remote: Enumerating objects: 19, done.[K
remote: Counting objects: 100% (19/19), done.[K
remote: Compressing objects: 100% (14/14), done.[K
remote: Total 19 (delta 6), reused 17 (delta 4), pack-reused 0[K
Unpacking objects: 100% (19/19), done.


In [None]:
cd /content/image_captioning

/content/image_captioning


In [None]:
! python3 /content/image_captioning/data_extraction.py

In [55]:
import tensorflow as tf
from tensorflow.keras.layers import GlobalAveragePooling2D, Dense, Embedding, LSTM, BatchNormalization, Bidirectional
from tensorflow.keras.applications import Xception, InceptionV3
from tensorflow.keras.models import Model
from tensorflow.python.keras.layers.recurrent import GRU


def image_encoder(img_input, trainable_layers=0, CNN_Type='Xception', Embed_Size=256, display=False):
    print('Building CNN model')
    if CNN_Type == 'Xception':
        cnn_pre_trained_model = Xception(include_top=False, weights='imagenet', input_tensor=img_input)
    else:
        cnn_pre_trained_model = InceptionV3(include_top=False, weights='imagenet', input_tensor=img_input)
    for i, layer in enumerate(cnn_pre_trained_model.layers):
        if len(cnn_pre_trained_model.layers) - i < trainable_layers:
            layer.trainable = True
        else:
            layer.trainable = False
    cnn_inputs = cnn_pre_trained_model.inputs
    base_model = cnn_pre_trained_model.output
    base_model = GlobalAveragePooling2D(name='global_average_pooling')(base_model)
    embed_image = tf.keras.layers.Dense(Embed_Size, activation='tanh', name='embed_image')(base_model)
    feature_extraction_model = Model(inputs=cnn_inputs, outputs=embed_image, name='CNN encoder model')
    print('CNN model {output shape}:', embed_image.shape)
    if display:
        tf.keras.utils.plot_model(feature_extraction_model, to_file='base_model.png', show_shapes=True)
    return feature_extraction_model


def txt_decoder(rnn_input, Embed_Size=256, Bi_Direction=False, RNN_Type='LSTM', RNN_Layers=2):
    print('Building RNN model')
    for i in range(RNN_Layers):
        x = BatchNormalization()(rnn_input)
        print(i)
        if RNN_Type == 'LSTM':
            if i == (RNN_Layers - 1):
                if Bi_Direction:
                    rnn_out = Bidirectional(LSTM(int(Embed_Size/2)))(x)
                else:
                    rnn_out = LSTM(Embed_Size)(x)
            else:
                if Bi_Direction:
                    rnn_out = Bidirectional(LSTM(int(Embed_Size/2), return_sequences=True))(x)
                else:
                    rnn_out = LSTM(Embed_Size, return_sequences=True)(x)
        else:
            if i == (RNN_Layers - 1):
                if Bi_Direction:
                    rnn_out = Bidirectional(GRU(Embed_Size))(x)
                else:
                    rnn_out = GRU(Embed_Size)(x)
            else:
                if Bi_Direction:
                    rnn_out = Bidirectional(GRU(Embed_Size/2, return_sequences=True))(x)
                else:
                    rnn_out = GRU(Embed_Size, return_sequences=True)(x)
        rnn_input = rnn_out
    return rnn_out


def Caption_model_gen(NET, img_shape=(256, 256, 3), vocab_size=8763, Embed_Size=256, max_length=20, display=False):
    img_input = tf.keras.Input(shape=img_shape)
    cnn_model = image_encoder(img_input, trainable_layers=0, CNN_Type='Xception', display=False)
    embed_image = tf.keras.layers.Dense(Embed_Size, activation='tanh')(cnn_model.output)

    text_input = tf.keras.Input(shape=(max_length,))
    Embedding_layer = Embedding(input_dim=vocab_size, output_dim=Embed_Size, input_length=max_length, mask_zero=True)(
        text_input)

    whole_seq_output = txt_decoder(Embedding_layer, Embed_Size=Embed_Size,
                                                                          Bi_Direction=True, RNN_Type='LSTM',
                                                                          RNN_Layers=3)
    print('final_carry_state {rnn output shape}:', whole_seq_output.shape)
    rnn_output = whole_seq_output
    if NET == 'policy':
        image_txt_embed = tf.keras.layers.add([embed_image, rnn_output])
        print('Image and text {add shape}:', image_txt_embed.shape)
        policy_net_output = tf.keras.layers.Dense(vocab_size, activation='softmax')(image_txt_embed)
        policy_net_model = Model(inputs=[img_input, text_input], outputs=policy_net_output, name='Policy_Net')

        print('output {shape}', policy_net_output.shape)
        print('Policy Net built successfully \n')
        if display:
            tf.keras.utils.plot_model(policy_net_model, to_file='policy_net.png', show_shapes=True)
        return policy_net_model
    elif NET == 'value':
        image_txt_embed = tf.keras.layers.concatenate([embed_image, rnn_output], axis=-1)
        print('Image and text {concat shape}:', image_txt_embed.shape)
        hidden_layer_1 = Dense(1024, activation='tanh', name='MLP_layer1')(image_txt_embed)
        hidden_layer_2 = Dense(512, activation='tanh', name="MLP_layer2")(hidden_layer_1)
        value_net_outputs = Dense(1, activation='tanh', name='decoder_output')(hidden_layer_2)
        value_net_model = Model(inputs=[img_input, text_input], outputs=value_net_outputs, name='Value_Net')
        print('output {shape}', value_net_outputs.shape)
        print('Value Net built successfully \n')
        if display:
            tf.keras.utils.plot_model(value_net_model, to_file='value_net.png', show_shapes=True)
        return value_net_model
    else:
        feature_vector = Dense(512, activation='tanh')(embed_image)
        text_sequence_vector = Dense(512, activation='tanh', name='rnn_linear')(rnn_output)
        print('Image feature vector shape:', feature_vector.shape)
        print('Text sequence vector shape:', text_sequence_vector.shape)
        reward_model = Model(inputs=[img_input, text_input], outputs=[feature_vector, text_sequence_vector],
                             name='reward net model')
        print('Reward Net built successfully \n')
        if display:
            tf.keras.utils.plot_model(reward_model, to_file='reward_net.png', show_shapes=True)
        return reward_model


if __name__ == "__main__":
    print('TensorFlow Version', tf.__version__)
    #actor_model = Caption_model_gen('policy')
    #critic_model = Caption_model_gen('value')
    #reward = Caption_model_gen('reward')


TensorFlow Version 2.7.0


In [56]:
import tensorflow as tf
from main import Caption_model_gen
from data_processing import data_processing
from data_generator import captions_generation
import pickle

print('TensorFlow Version', tf.__version__)
vocab_size = 5000
max_length = 25


captions_text_path = r'/content/Flickr8k.token.txt'
captions_extraction = data_processing(captions_text_path)
trn_images_id_text = r'/content/Flickr_8k.trainImages.txt'
train_cleaned_seq, train_cleaned_dic = captions_extraction.cleaning_sequencing_captions(trn_images_id_text)
val_images_id_text = r'/content/Flickr_8k.devImages.txt'
val_cleaned_seq, val_cleaned_dic = captions_extraction.cleaning_sequencing_captions(val_images_id_text)
test_images_id_text = r'/content/Flickr_8k.testImages.txt'
test_cleaned_seq, test_cleaned_dic = captions_extraction.cleaning_sequencing_captions(test_images_id_text)
captions_extraction.tokenization(train_cleaned_seq, vocab_size)
print("No of captions: Training-" + str(len(train_cleaned_seq) / 5) + " Validation-" + str(
    len(val_cleaned_seq) / 5) + " test-" + str(len(test_cleaned_seq) / 5))

train_cap_tok = captions_extraction.sentence_tokenizing(train_cleaned_dic)
val_cap_tok = captions_extraction.sentence_tokenizing(val_cleaned_dic)
test_cap_tok = captions_extraction.sentence_tokenizing(test_cleaned_dic)

image_pth_rt = r"/content/Flicker8k_Dataset/" #+ r"\\"
trn_dataset = captions_generation(train_cap_tok, vocab_size, image_pth_rt, max_length)
val_dataset = captions_generation(val_cap_tok, vocab_size, image_pth_rt, max_length)

inputs, outputs = next(iter(trn_dataset))
print(inputs[0].shape, inputs[1].shape, outputs.shape)


TensorFlow Version 2.7.0
max_length of captions 30
max_length of captions 26
max_length of captions 29
No of captions: Training-1200.0 Validation-200.0 test-200.0
Vocab size 5000
Vocab size 5000
Vocab size 5000
(285, 256, 256, 3) (285, 24) (285, 5000)


In [58]:
actor_model = Caption_model_gen(NET='policy', vocab_size=vocab_size, Embed_Size=512, max_length=max_length-1,display=True)
#actor_model.summary()
actor_model.compile(loss=tf.keras.losses.categorical_crossentropy,
                    optimizer=tf.keras.optimizers.Adam(learning_rate=0.001),
                    metrics=['accuracy'])


Building CNN model
CNN model {output shape}: (None, 256)
Building RNN model
final_carry_state {rnn output shape}: (None, 512)
Image and text {add shape}: (None, 512)
output {shape} (None, 5000)
Policy Net built successfully 



In [59]:
actor_model.summary()

Model: "Policy_Net"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_33 (InputLayer)          [(None, 256, 256, 3  0           []                               
                                )]                                                                
                                                                                                  
 block1_conv1 (Conv2D)          (None, 127, 127, 32  864         ['input_33[0][0]']               
                                )                                                                 
                                                                                                  
 block1_conv1_bn (BatchNormaliz  (None, 127, 127, 32  128        ['block1_conv1[0][0]']           
 ation)                         )                                                        

In [None]:


def scheduler(epoch, lr):
    if epoch < 10:
        return lr
    else:
        return lr * tf.math.exp(-0.1)


lr_callback = tf.keras.callbacks.LearningRateScheduler(scheduler)

checkpoint_filepath = '/content'
early_stop_callback = tf.keras.callbacks.EarlyStopping(
    monitor='loss', patience=10)

model_checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=checkpoint_filepath,
    monitor='accuracy',
    mode='auto')

callback = [early_stop_callback, model_checkpoint_callback]

history = actor_model.fit(trn_dataset, steps_per_epoch=10, epochs=60, shuffle=False, validation_data=val_dataset,
                          validation_steps=5)

model_parameters = history.history

f = open("/content/history_model_0.001.pkl", "wb")
pickle.dump(model_parameters, f)
f.close()


In [17]:
actor_model.save_weights('/content/initial_policy_net_model.h5')

In [19]:
test_dataset = captions_generation(test_cap_tok, vocab_size, image_pth_rt, max_length)

In [20]:
actor_model.evaluate(test_dataset)

   1255/Unknown - 5189s 4s/step - loss: 3.5979 - accuracy: 0.2958

KeyboardInterrupt: ignored