# Image Captioning

In [None]:
import pandas as pd
!pip install tqdm
from tqdm import tqdm
tqdm.pandas()
import numpy as np
import re
# text
import string
from PIL import Image
import tensorflow as tf
import matplotlib.pyplot as plt

In [None]:
# Connect to a TPU and instantiate a distribution strategy.
# No TPU address is passed to `connect()`, so the resolver has to locate the device itself.
tpu = tf.distribute.cluster_resolver.TPUClusterResolver.connect()
# `tpu_strategy` is used later for `num_replicas_in_sync` (batch size) and `scope()` (model creation).
tpu_strategy = tf.distribute.TPUStrategy(tpu)

In [None]:
# Read the caption table with pandas; the file is parsed with '|' as the field separator.
text_data = pd.read_csv('/kaggle/input/flickr8k/captions.txt', sep='|')
# text_data.columns = ['image_id', 'caption']
# NOTE(review): later cells access `image_name`, `caption_number` and `caption_text`,
# so the file header presumably provides those column names — confirm against the dataset.
text_data

In [None]:
# Give the caption columns the names used by the rest of the notebook:
#   caption_number -> image_repeat, caption_text -> caption
# Earlier variant (kept for reference) derived both fields by splitting `image_id` on '#':
# text_data['image_name'] = text_data['image_id'].apply(lambda x: x.split('#')[0])
# text_data['image_repeat'] = text_data['image_id'].apply(lambda x: x.split('#')[1])

column_aliases = {"caption_number": "image_repeat", "caption_text": "caption"}
text_data = text_data.rename(columns=column_aliases)
text_data

In [None]:
# Remove every row whose image_name equals the literal below, then renumber the rows from 0.
# (The trailing ".1" suggests a malformed entry — TODO confirm.)
bad_row_labels = text_data.index[text_data['image_name'] == "2258277193_586949ec62.jpg.1"]
text_data = text_data.drop(index=bad_row_labels).reset_index(drop=True)

In [None]:
def preprocessing_text( line ):
    """Normalise one caption: lowercase it and delete ASCII punctuation.

    Args:
        line: Caption text (str).

    Returns:
        The lowercased text with every character of ``string.punctuation`` removed.
    """
    # Mapping a character to None in a translation table deletes it.
    punctuation_eraser = str.maketrans(dict.fromkeys(string.punctuation))
    return line.lower().translate(punctuation_eraser)

# Normalise every caption. `progress_apply` is the tqdm-instrumented variant of `apply`
# that becomes available after the `tqdm.pandas()` call in the import cell.
print("Preprocessing Text")
text_data['caption'] = text_data['caption'].progress_apply( preprocessing_text )
# data.dropna( inplace=True )
# data.reset_index(drop=True, inplace=True)
text_data

In [None]:
# Add start and end tokens to every caption.
# The guard keeps the cell idempotent: captions were lowercased earlier, so an uppercase
# 'START ' prefix can only come from a previous run of this cell.
text_data['caption'] = text_data['caption'].progress_apply(
    lambda x: x if x.startswith('START ') else 'START ' + x + ' END'
)

# Tokenize the captions using the Keras tokenizer (case-sensitive, so START/END stay distinct tokens)
from keras.preprocessing.text import Tokenizer

tokenizer = Tokenizer( lower=False , split=' ', char_level=False, oov_token="oovE",filters='!"#$%&()*+,-./:;<=>?@[\\]^_`{|}~\t\n') # num_words=num_words
tokenizer.fit_on_texts(text_data['caption'].values)
# +1 because id 0 is not assigned to any word and is used as the padding id below
vocab_size = len(tokenizer.word_index) + 1
print("Vocabulary Size: ", vocab_size)

# Convert the captions into sequences of token ids
sequences = tokenizer.texts_to_sequences(text_data['caption'].values)
max_length = max(len(line) for line in sequences)

# Right-pad every sequence with 0 up to max_length.
# A new list is built for each row so that `sequences` keeps the unpadded ids
# (the previous in-place `+=` padded `sequences` as well and made both lists share objects).
padded_sequences = [sequence + [0] * (max_length - len(sequence)) for sequence in tqdm(sequences)]

# print(tokenizer.word_index)
print(text_data['caption'][0])
print(sequences[0])
print(padded_sequences[0])

In [None]:
# remove the padding from the sequence
def remove_padding_from_sequence(sequence):
    """Return the entries of `sequence` as a list, leaving out every 0 (the padding id)."""
    unpadded = []
    for token_id in sequence:
        if token_id != 0:
            unpadded.append(token_id)
    return unpadded
    
# Sanity check: pick a few random rows, decode their padded ids back to text with the
# Keras tokenizer and show the result next to the stored caption
n = 2
num_captions = text_data.shape[0]
separator = "---------------------   -  --------------------- \n "
for j in range( n ):
    i = np.random.randint( num_captions )
    unpadded_ids = remove_padding_from_sequence(padded_sequences[i])
    decoded = tokenizer.sequences_to_texts( [unpadded_ids] )
    print( i )
    print("Original Sentence ---> ", text_data["caption"][i])
    print("Sequence ---> ", padded_sequences[i])
    print(" Sequence into Sentence  ---> ", decoded)
    print(separator)


In [None]:
# Keep each padded token-id list in the same row as its caption, so that the
# train/validation split below carries the sequences along with the image names.
text_data['padded_sequences'] = padded_sequences
text_data

In [None]:
# Rows with image_repeat == 0 are held out for validation; all other rows form the
# training set. Both frames are renumbered from 0.
is_val_row = text_data['image_repeat'] == 0
text_data_train = text_data.drop(index=text_data.index[is_val_row]).reset_index(drop=True)
text_data_val = text_data[is_val_row].reset_index(drop=True)
text_data_train.shape, text_data_val.shape

In [None]:
# Create images dataset from file names
# Target size in pixels: images are resized to (image_width, image_height) on load and the
# model's image input is declared with the same height/width.
image_width = 224
image_height = 224

def load_image(image_path, filenames):
    """Load, resize and stack images into a single uint8 array.

    Args:
        image_path: Directory prefix. It is concatenated directly with each file name,
            so it must end with a path separator.
        filenames: Sequence of image file names (list, array or pandas Series).

    Returns:
        np.ndarray of shape (len(filenames), image_height, image_width, 3), dtype uint8,
        where row i holds the RGB pixels of the i-th file name.
    """
    # Iterate by position so the result does not depend on pandas index labels.
    names = list(filenames)
    image_data = np.empty((len(names), image_height, image_width, 3), dtype=np.uint8)
    first_row = {}  # file name -> row in image_data where it was first decoded
    for i, name in enumerate(tqdm(names)):
        if name in first_row:
            # The same image appears once per caption; reuse the decoded pixels
            # instead of reading and resizing the file again.
            image_data[i] = image_data[first_row[name]]
            continue
        # The context manager closes the file handle; convert('RGB') guarantees three
        # channels, so grayscale / RGBA / palette files also fit the (h, w, 3) slot.
        with Image.open(image_path + name) as img:
            image_data[i] = np.asarray(img.convert('RGB').resize((image_width, image_height)), dtype=np.uint8)
        first_row[name] = i
    return image_data
    

# Load one image array per caption row of each split. The arrays are row-aligned with
# `text_data_train` / `text_data_val`, so an image that appears in several rows is stored several times.
image_data_train = load_image('/kaggle/input/flickr8k/images/', text_data_train['image_name'])
image_data_val = load_image('/kaggle/input/flickr8k/images/', text_data_val['image_name'])

image_data_train.shape, image_data_val.shape

In [None]:
# Display a few training images together with their captions.
# The figure size is set once here; previously a 10x10 figure was created and then
# resized to 40x30 inside the loop on every iteration.
fig, axes = plt.subplots(4, 4, figsize=(40, 30))
for i, ax in enumerate(axes.flat):
    ax.imshow(image_data_train[i])
    ax.axis('off')
    # caption drawn at data coordinates (0, 2), i.e. near the top-left corner of the image
    ax.text(0, 2, text_data_train['caption'][i], fontsize=12)

plt.show()

In [None]:
# Teacher-forcing pairs: the decoder input is the padded sequence without its last token,
# the target is the same sequence without its first token. One column of zeros is appended
# to each so both keep the original padded width.
train_sequences = np.array(list(text_data_train['padded_sequences']))
val_sequences = np.array(list(text_data_val['padded_sequences']))
train_zero_column = np.zeros((image_data_train.shape[0], 1), dtype=np.int32)
val_zero_column = np.zeros((image_data_val.shape[0], 1), dtype=np.int32)

text_data_train_in = np.concatenate((train_sequences[:, :-1], train_zero_column), axis=1)
text_data_train_out = np.concatenate((train_sequences[:, 1:], train_zero_column), axis=1)
text_data_test_in = np.concatenate((val_sequences[:, :-1], val_zero_column), axis=1)
text_data_test_out = np.concatenate((val_sequences[:, 1:], val_zero_column), axis=1)

image_data_train.shape, image_data_val.shape, text_data_train_in.shape, text_data_train_out.shape, text_data_test_in.shape, text_data_test_out.shape

# Training Model

In [None]:
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.layers import Input, InputLayer, Embedding, LSTM, GRU, TimeDistributed, RepeatVector, Dense, Bidirectional, Flatten, LayerNormalization, Add, Dropout
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.applications.resnet50 import ResNet50

In [None]:
# Batch size passed to `fit`: 8 times the number of replicas reported by the TPU strategy.
# NOTE(review): the trailing "# 16" only holds when there are 2 replicas.
batch_size =  8 * tpu_strategy.num_replicas_in_sync # 16
# Number of epochs passed to `fit`
epochs = 150
# steps_per_execution = 32

In [None]:
# Loss Function # Masked Sparse Categorical Cross Entropy
def masked_categorical_cross_entropy( y_true, y_pred ):
    """Sparse categorical cross-entropy averaged over non-padding positions.

    Args:
        y_true: Integer label ids; positions equal to 0 are treated as padding.
        y_pred: Unnormalised scores (logits) with the classes on the last axis.

    Returns:
        Scalar tensor: sum of the per-position losses at non-padding positions
        divided by the number of non-padding positions.
    """
    # reduction='none' keeps one loss value per position so padding can be zeroed out
    scce = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')
    per_token_loss = scce(y_true, y_pred)
    keep = tf.cast(y_true != 0, per_token_loss.dtype)
    kept_loss = per_token_loss * keep
    return tf.reduce_sum(kept_loss)/tf.reduce_sum(keep)

# Accuracy Metric
def masked_accuracy(y_true, y_pred):
    """Token-level accuracy that ignores padding positions (label id 0).

    Args:
        y_true: Integer label ids; positions equal to 0 are treated as padding.
        y_pred: Scores with the classes on the last axis.

    Returns:
        Scalar float32 tensor: number of correctly predicted non-padding positions
        divided by the number of non-padding positions (0 when there are none).
    """
    # Predicted class id per position, cast so it can be compared with the labels.
    y_pred = tf.argmax(y_pred, axis=-1)
    y_pred = tf.cast(y_pred, y_true.dtype)
    mask = tf.cast(y_true != 0, tf.float32)
    # Count matches at real tokens only. Without the mask, a predicted 0 on a padding
    # position adds to the numerator while the denominator excludes that position.
    match = tf.cast(y_true == y_pred, tf.float32) * mask
    # divide_no_nan returns 0 instead of NaN when there are no non-padding positions.
    return tf.math.divide_no_nan(tf.reduce_sum(match), tf.reduce_sum(mask))

In [None]:
# # Creating the model
# channels = 3
# units = 2048

# # instantiating the model in the strategy scope creates the model on the TPU
# with tpu_strategy.scope():

#     ######  Image Encoder
#     class Image_Encoder(tf.keras.layers.Layer):
#         def __init__( self, input_shape, units, image_model=MobileNetV2, trainable=True ):
#             super(Image_Encoder,self).__init__()
#             self.input_image_shape = input_shape
#             self.image_model = image_model(include_top=False, weights='imagenet', input_shape=self.input_image_shape)
#             self.trainable = trainable
#             self.units = units
#             self.dense = Dense(units, activation='relu')

#         def call( self, image_encoder_inputs ):
#             image_features = self.image_model(image_encoder_inputs)
#             # Flatten
#             image_features = tf.keras.layers.Reshape((image_features.shape[1]*image_features.shape[2], image_features.shape[3]))( image_features )
#             # Dense   # ********** #
#             image_features = self.dense(image_features) # ********** #
#             return image_features

#         def get_config(self):
#             config = super(Image_Encoder,self).get_config()
#             config.update({ 'input_image_shape': self.input_image_shape, 'image_model': self.image_model, 'units': self.units, 'dense': self.dense, "trainable":self.trainable })
#             return config

#     class Decoder(tf.keras.layers.Layer):
#         def __init__(self, vocab_size, units, max_length):
#             super(Decoder,self).__init__()
#             self.vocab_size = vocab_size
#             self.units = units
#             self.max_length = max_length
#             self.Embedding = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=self.units, input_length=self.max_length , mask_zero=True)
#             self.gru_layer = tf.keras.layers.GRU(self.units, return_sequences=True, return_state=True)
#             self.attention_layer = tf.keras.layers.Attention()
#             self.add = Add()
#             self.layernorm = LayerNormalization(axis=-1)
#             self.dense = tf.keras.layers.Dense(vocab_size)

#         def call( self, text_encoder_inputs, image_features ):
#             # Embedding
#             length = self.max_length
#             encoder = self.Embedding
#             encoder_embedding = encoder(text_encoder_inputs)
#             # RNN
#             gru_output, gru_state = self.gru_layer(encoder_embedding)
#             # Attention
#             # key & query --> gru_output | value --> encoder_output
#             context_vector = self.attention_layer([gru_output, image_features])
#             addition = self.add([gru_output,context_vector])
#             norm_layer = self.layernorm(addition)
#             # Dense
#             output = self.dense(norm_layer) # overall_decoder_attention_output
#             return output, gru_state

#         def get_config(self):
#             config = super(Decoder,self).get_config()
#             config.update({ 'vocab_size': self.vocab_size, 'units': self.units, 'max_length': self.max_length, 'Embedding': self.Embedding, "gru_layer":self.gru_layer,
#                              'attention_layer':self.attention_layer, 'add':self.add, 'layernorm':self.layernorm, "dense":self.dense })
#             return config


#     # Inputs
#     text_encoder_inputs = Input(shape=(max_length,))
#     image_encoder_inputs = Input(shape=(image_height, image_width, channels))

#     # Image Encoding
#     image_encoder = Image_Encoder( input_shape=(image_height, image_width, channels), image_model=ResNet50, units=units, trainable=True )
#     image_features = image_encoder( image_encoder_inputs )

#     # Decoding Caption
#     decoder = Decoder(vocab_size=vocab_size, units=units, max_length=max_length)
#     output, gru_state = decoder( text_encoder_inputs, image_features )

#     # Model
#     training_model = tf.keras.Model(inputs=[image_encoder_inputs,text_encoder_inputs], outputs=output)   
    
#     # Mark all layers in the model as trainable
#     for layer in training_model.layers:
#         layer.trainable = True

#     # Compile
#     lr_decayed_fn = tf.keras.optimizers.schedules.CosineDecay( initial_learning_rate=1e-4, decay_steps=1000)
#     training_model.compile(loss=masked_categorical_cross_entropy, optimizer=tf.keras.optimizers.AdamW(learning_rate=lr_decayed_fn),metrics=[masked_accuracy]) # Adam(learning_rate, beta_1=0.9, beta_2=0.98,epsilon=1e-9)
#     training_model.summary()

In [None]:
# Creating the model
channels = 3  # last dimension of the image input shape
units = 1024  # passed as `units` to both Image_Encoder and Decoder

# instantiating the model in the strategy scope creates the model on the TPU
with tpu_strategy.scope():

    # Image Encoder
    class Image_Encoder(tf.keras.layers.Layer):
        """Turns an image batch into a sequence of feature vectors.

        The image is passed through a pretrained backbone (without its classification
        head), the spatial grid of the resulting feature map is flattened into one axis,
        and every grid cell is projected to `units` dimensions with a ReLU dense layer.

        Args:
            input_shape: Shape of one image, forwarded to the backbone constructor.
            units: Output size of the dense projection.
            image_model: Callable building the backbone; it is called with
                `include_top=False, weights='imagenet', input_shape=input_shape`.
            trainable: Value assigned to this layer's `trainable` attribute.
            **kwargs: Standard Keras layer arguments (e.g. `name`, `dtype`).
        """

        def __init__(self, input_shape, units, image_model=MobileNetV2, trainable=True, **kwargs):
            super(Image_Encoder, self).__init__(**kwargs)
            self.input_image_shape = input_shape
            # Remember which backbone constructor was used so get_config stays serialisable.
            self.image_model_name = getattr(image_model, '__name__', None)
            self.image_model = image_model(include_top=False, weights='imagenet', input_shape=self.input_image_shape)
            self.trainable = trainable
            self.units = units
            self.dense = Dense(units, activation='relu')
            # Built once here instead of creating a new Reshape layer on every call.
            _, grid_rows, grid_cols, grid_depth = self.image_model.output_shape
            self.flatten_grid = tf.keras.layers.Reshape((grid_rows * grid_cols, grid_depth))

        def call(self, image_encoder_inputs):
            """Return features of shape (batch, grid_rows * grid_cols, units)."""
            # NOTE(review): the images are fed to the backbone as-is; no backbone-specific
            # `preprocess_input` is applied anywhere in this notebook — confirm that is intended.
            image_features = self.image_model(image_encoder_inputs)
            # Flatten the spatial grid into a single axis
            image_features = self.flatten_grid(image_features)
            # Dense projection applied to every grid cell
            image_features = self.dense(image_features)
            return image_features

        def get_config(self):
            """Return the constructor arguments (serialisable values only, no layer objects)."""
            config = super(Image_Encoder, self).get_config()
            config.update({'input_shape': self.input_image_shape, 'units': self.units,
                           'image_model': self.image_model_name})
            return config

        @classmethod
        def from_config(cls, config):
            """Rebuild the layer; a backbone stored by name is looked up in `tf.keras.applications`."""
            config = dict(config)
            backbone = config.pop('image_model', None)
            if isinstance(backbone, str):
                backbone = getattr(tf.keras.applications, backbone)
            if backbone is not None:
                config['image_model'] = backbone
            return cls(**config)

    class Decoder(tf.keras.layers.Layer):
        """Caption decoder: embedding -> stacked GRUs -> attention over image features -> vocabulary scores.

        Args:
            vocab_size: Number of token ids; size of the embedding table and of the output scores.
            units: Size of the embedding, of every GRU and of the image projections.
            max_length: Caption length, forwarded to the embedding layer as `input_length`.
            num_gru_layers: Number of stacked GRU layers (at least 1).
            **kwargs: Standard Keras layer arguments (e.g. `name`, `dtype`).
        """

        def __init__(self, vocab_size, units, max_length, num_gru_layers=2, **kwargs):
            super(Decoder, self).__init__(**kwargs)
            if num_gru_layers < 1:
                raise ValueError("num_gru_layers must be at least 1")
            self.vocab_size = vocab_size
            self.units = units
            self.max_length = max_length
            self.num_gru_layers = num_gru_layers
            # mask_zero=True: token id 0 is treated as padding by the embedding
            self.Embedding = tf.keras.layers.Embedding(input_dim=vocab_size, output_dim=self.units, input_length=self.max_length, mask_zero=True)

            # List of GRU layers
            self.gru_layers = [tf.keras.layers.GRU(self.units, return_sequences=True, return_state=True) for _ in range(self.num_gru_layers)]

            self.attention_layer = tf.keras.layers.Attention()
            self.add = Add()
            self.layernorm = LayerNormalization(axis=-1)
            self.dense = tf.keras.layers.Dense(vocab_size)

            # Skip connection layers
            self.image_feature_projection = tf.keras.layers.Dense(units)
            self.image_feature_embedding = tf.keras.layers.Dense(units)

        def call(self, text_encoder_inputs, image_features):
            """Compute next-token scores for every caption position.

            Args:
                text_encoder_inputs: Token ids, (batch, time).
                image_features: Image feature sequence, (batch, regions, units).

            Returns:
                Tuple `(output, gru_state)`: `output` holds unnormalised scores of shape
                (batch, time, vocab_size); `gru_state` is the final state of the last GRU layer.
            """
            # Embedding
            encoder_embedding = self.Embedding(text_encoder_inputs)

            # Pass through GRU layers
            gru_output = encoder_embedding
            for gru_layer in self.gru_layers:
                gru_output, gru_state = gru_layer(gru_output)

            # Attention: query --> gru_output | value (also used as key) --> image_features
            context_vector = self.attention_layer([gru_output, image_features])
            addition = self.add([gru_output, context_vector])
            norm_layer = self.layernorm(addition)

            # Skip connection carrying a global image summary
            image_projection = self.image_feature_projection(image_features)
            image_embedding = self.image_feature_embedding(image_projection)

            # Adjust dimensions for skip connection
            image_embedding = tf.reduce_mean(image_embedding, axis=1, keepdims=True)  # Average over image regions
            image_embedding = tf.tile(image_embedding, [1, tf.shape(gru_output)[1], 1])  # Tile to match time steps

            image_skip_connection = self.add([gru_output, image_embedding])

            # Fuse both feature streams BEFORE the vocabulary projection so the last axis of
            # the result is exactly vocab_size. Concatenating after the projection produced
            # vocab_size + units "classes" for the loss and for argmax decoding.
            fused = tf.concat([norm_layer, image_skip_connection], axis=-1)
            output = self.dense(fused)

            return output, gru_state

        def get_config(self):
            """Return the constructor arguments (serialisable values only, no layer objects)."""
            config = super(Decoder, self).get_config()
            config.update({'vocab_size': self.vocab_size, 'units': self.units, 'max_length': self.max_length,
                           'num_gru_layers': self.num_gru_layers})
            return config

    # Inputs: one caption of max_length token ids and one image of (image_height, image_width, channels)
    text_encoder_inputs = Input(shape=(max_length,))
    image_encoder_inputs = Input(shape=(image_height, image_width, channels))

    # Image Encoding (ResNet50 is passed as the backbone instead of the default MobileNetV2)
    image_encoder = Image_Encoder(input_shape=(image_height, image_width, channels), image_model=ResNet50, units=units, trainable=True)
    image_features = image_encoder(image_encoder_inputs)

    # Decoding Caption
    decoder = Decoder(vocab_size=vocab_size, units=units, max_length=max_length, num_gru_layers=5)  # stack of 5 GRU layers
    output, gru_state = decoder(text_encoder_inputs, image_features)

    # Model: only the decoder's first output is a model output; gru_state is not exposed here
    training_model = tf.keras.Model(inputs=[image_encoder_inputs, text_encoder_inputs], outputs=output)

    # Mark all layers in the model as trainable
    for layer in training_model.layers:
        layer.trainable = True

    # Compile with the masked loss/metric defined above, so padding (id 0) positions are ignored by the loss
    # lr_decayed_fn = tf.keras.optimizers.schedules.CosineDecay(initial_learning_rate=1e-4, decay_steps=1000)
    training_model.compile(loss=masked_categorical_cross_entropy, optimizer=tf.keras.optimizers.AdamW(learning_rate=5e-4), metrics=[masked_accuracy])
    training_model.summary()


In [None]:
# early_stopping = EarlyStopping(monitor='loss', patience=20)
# Save the full model whenever the training loss reaches a new minimum.
# (`monitor` was previously misspelled as `moniter`, so 'loss' was never actually monitored.)
checkpoint = ModelCheckpoint("Image_Captioning_Transfomer_Model", monitor='loss', save_best_only=True, save_weights_only=False, mode='min')
# reduce_lr = ReduceLROnPlateau(monitor="val_loss", factor=0.25, patience=2, min_lr=1e-10,mode="min")
history = training_model.fit( [image_data_train,text_data_train_in], text_data_train_out, validation_data=([image_data_val,text_data_test_in],text_data_test_out),
                                 epochs=epochs, batch_size=batch_size, callbacks=[checkpoint] )

# Setup for Inference

In [None]:
# Build a second model on the same inputs whose outputs are everything `layers[3]` returns.
# NOTE(review): this assumes `training_model.layers[3]` is the Decoder layer, whose output is the
# pair unpacked as `output, gru_state` in `create_caption` — confirm with `training_model.summary()`;
# the positional index is fragile.
inference_model = tf.keras.Model(inputs=training_model.input, outputs=training_model.layers[3].output)
# output, gru_state = inference_model.predict( [ image_data_train[0:2], text_data_train_in[0:2]] )

In [None]:
def create_caption( image, model ):
    """Display `image` and greedily decode a caption for it.

    Args:
        image: A single image array without batch axis (a batch axis is added here).
        model: Model whose `predict([images, tokens])` returns `(scores, gru_state)`,
            with `scores` indexed as [batch, time step, token id].

    Returns:
        List with one string: the decoded caption, starting with the START token and
        ending with END when the model produced it within `max_length` tokens.
    """
    # Display the image
    plt.figure(figsize=(4,4))
    plt.imshow(image)
    plt.axis('off')
    plt.show()

    image = np.expand_dims(image, axis=0)
    caption_ids = list(tokenizer.texts_to_sequences(["START"])[0])
    end_token = tokenizer.word_index.get("END")

    # Fixed-width decoder input: generated ids so far, right-padded with 0
    tokens = np.zeros((1, max_length), dtype=np.int64)
    tokens[0, :len(caption_ids)] = caption_ids

    for i in range(len(caption_ids), max_length):
        output, _ = model.predict( [image,tokens], verbose=0 )
        # The training targets are the inputs shifted left by one, so the scores at
        # time step i-1 predict the token for position i. (Reading the last time step
        # instead would depend on the scores at padded positions.)
        next_token = int(np.argmax(output[0, i - 1, :]))
        tokens[0, i] = next_token
        caption_ids.append(next_token)
        if next_token == end_token:
            # Everything after END is untrained noise; stop decoding here.
            break

    # Only the generated ids are decoded, so trailing padding is never turned into text.
    pred_caption = tokenizer.sequences_to_texts( [caption_ids] )
    print( "Predicted Caption: ", pred_caption )

    return pred_caption

# Caption one training image; all captions stored for the same image_name are printed first for comparison
index = 10
print( "Original Caption", " | ".join(list(text_data_train[ text_data_train['image_name']==text_data_train['image_name'][index] ]['caption'].values)) )
pred_caption = create_caption( image=image_data_train[index], model=inference_model )

# Create Caption

In [None]:
# Repeat the comparison for 20 randomly drawn training rows.
# NOTE(review): indices are drawn from [0, int(shape[0]/4 - 1)), i.e. only roughly the first
# quarter of the rows is ever sampled — confirm that this restriction is intended.
for index in np.random.randint(int(image_data_train.shape[0]/4 - 1),size=20):
    print( "Original Caption", " | ".join(list(text_data_train[ text_data_train['image_name']==text_data_train['image_name'][index] ]['caption'].values)) )
    pred_caption = create_caption( image=image_data_train[index], model=inference_model )
    print( "---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - ---- - \n \n" )

***