In [None]:
import os
import random
import pickle
import numpy as np
from tqdm.notebook import tqdm

from tensorflow.keras.applications.vgg16 import VGG16 , preprocess_input
from tensorflow.keras.preprocessing.image import load_img , img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Input , Dense , LSTM , Embedding , Dropout , add
from tensorflow.keras.layers import Attention, Concatenate, RepeatVector, Permute
from tensorflow.keras.layers import MultiHeadAttention, LayerNormalization, GlobalAveragePooling1D, Dense

In [None]:
# Colab-only: mount the user's Google Drive under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

# Tell the Kaggle CLI where to look for its configuration
# (presumably the folder on Drive that holds kaggle.json - confirm).
os.environ['KAGGLE_CONFIG_DIR'] = '/content/drive/MyDrive/kaggle'

In [None]:
# Download both caption datasets with the Kaggle CLI.
!kaggle datasets download -d adityajn105/flickr8k
!kaggle datasets download -d hsankesara/flickr-image-dataset

# Extract each archive quietly (-q) into its own directory.
!unzip -q flickr8k.zip -d flickr8k
!unzip -q flickr-image-dataset.zip -d flickr30k

# List the extracted contents as a quick sanity check.
!ls flickr8k
!ls flickr30k

In [None]:
# Root directories of the extracted datasets (match the unzip targets).
BASE_DIR = 'flickr8k'
# NOTE(review): BASE_DIR1 is not referenced by any cell visible in this notebook.
BASE_DIR1 = 'flickr30k'

In [None]:
# Instantiate VGG16 with its default arguments, then re-wire it so the
# network ends at its second-to-last layer: the resulting `model` is used
# below as an image feature extractor rather than as a classifier.
vgg = VGG16()
model = Model(inputs=vgg.inputs, outputs=vgg.layers[-2].output)

# Write a diagram of the truncated network to vgg16.png.
plot_model(model, to_file='vgg16.png', show_shapes=True, dpi=48)

In [None]:
from PIL import Image
import cv2

# The VGG16 diagram is tall and narrow: cut it into a top and a bottom half
# and place the halves side by side so it is easier to view.
diagram = Image.open('vgg16.png')
width, height = diagram.size
half_height = height // 2

# Save each half as its own PNG.
diagram.crop((0, 0, width, half_height)).save('top_half.png')
diagram.crop((0, half_height, width, height)).save('bottom_half.png')

top_half_path = 'top_half.png'
bottom_half_path = 'bottom_half.png'

# Re-read both halves with OpenCV for the horizontal concatenation.
top_half = cv2.imread(top_half_path)
bottom_half = cv2.imread(bottom_half_path)

# For an odd image height the bottom half has one extra row; resizing it to
# the top half's (width, height) makes the two arrays the same shape.
target_height, target_width = top_half.shape[0], top_half.shape[1]
bottom_half_resized = cv2.resize(bottom_half, (target_width, target_height))

new_image = cv2.hconcat([top_half, bottom_half_resized])

cv2.imwrite('concatenated_image.png', new_image)


In [None]:
import tensorflow as tf

class PositionalEncoding(tf.keras.layers.Layer):
    """Append a fixed sinusoidal position signal to a batch of embeddings.

    The layer has no trainable weights. For an input of shape
    ``(batch, max_len, channels)`` it returns a tensor of shape
    ``(batch, max_len, channels + ceil(d_model / 2))``: the position signal is
    concatenated on the last axis, not added to the embeddings.

    Args:
        max_len: Number of positions to encode. The input's sequence axis must
            have exactly this length, otherwise the concatenation fails.
        d_model: Controls the number and frequency of the sinusoid channels.
        **kwargs: Standard Keras ``Layer`` keyword arguments (``name``, ...).

    NOTE(review): ``call`` passes the even indices ``0, 2, 4, ...`` to
    ``get_angles``, which multiplies them by 2 again, and then takes the sine
    of every other column and the cosine of the remaining ones. This is kept
    exactly as it was; confirm whether the resulting frequencies are the
    intended ones.
    """

    def __init__(self, max_len, d_model, **kwargs):
        super(PositionalEncoding, self).__init__(**kwargs)
        self.max_len = max_len
        self.d_model = d_model

    def get_angles(self, pos, i, d_model):
        """Return ``pos / 10000 ** (2 * i / d_model)`` (``pos`` broadcast against ``i``)."""
        angle_rates = 1 / tf.pow(10000.0, (2 * tf.cast(i, tf.float32) / tf.cast(d_model, tf.float32)))
        return pos * angle_rates

    def call(self, inputs):
        """Concatenate the position signal onto ``inputs`` along the last axis."""
        # Column vector of positions: shape (max_len, 1).
        position = tf.range(0, self.max_len, delta=1, dtype=tf.float32)[:, tf.newaxis]
        angle_rads = self.get_angles(position, tf.range(0, self.d_model, 2), self.d_model)

        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])

        # Shape (1, max_len, ceil(d_model / 2)).
        pos_encoding = tf.expand_dims(tf.concat([sines, cosines], axis=-1), axis=0)

        # tf.concat does not broadcast: every axis except the concatenation
        # axis must match. Repeat the encoding for each example so the layer
        # works for any batch size instead of only for batch size 1.
        batch_size = tf.shape(inputs)[0]
        pos_encoding = tf.tile(pos_encoding, [batch_size, 1, 1])

        # Match the input dtype (a no-op for float32 inputs).
        pos_encoding = tf.cast(pos_encoding, inputs.dtype)

        return tf.concat([inputs, pos_encoding], axis=-1)

    def get_config(self):
        """Return the constructor arguments so the layer can be re-created on load."""
        config = super(PositionalEncoding, self).get_config()
        config.update({'max_len': self.max_len, 'd_model': self.d_model})
        return config


In [None]:
# Run every file in the image directory through the feature extractor and
# collect the results keyed by file name without extension.
features = {}
directory = os.path.join(BASE_DIR, 'Images')

for img_name in tqdm(os.listdir(directory)):
    img_path = f"{directory}/{img_name}"

    # Load at 224x224 and convert to a numeric array.
    pixels = img_to_array(load_img(img_path, target_size=(224, 224)))

    # Add a leading batch axis of size 1, then apply the VGG16 preprocessing.
    batch = preprocess_input(pixels.reshape((1,) + tuple(pixels.shape[:3])))

    # Key by everything before the first dot in the file name.
    features[img_name.split('.')[0]] = model.predict(batch, verbose=0)

In [None]:
# Cache the extracted features on disk. The context manager guarantees the
# file is flushed and closed even if pickling fails (the bare open(...) used
# previously left the handle open until garbage collection).
with open('features.pkl', 'wb') as f:
    pickle.dump(features, f)

In [None]:
# Reload the cached features. pickle can execute arbitrary code while
# loading, so only ever point this at a file this notebook produced.
features_path = os.path.join('features.pkl')
with open(features_path, 'rb') as feature_file:
    features = pickle.load(feature_file)

In [None]:
# Read the captions file into one string, discarding its first line
# (presumably a header row - confirm against the file).
captions_path = os.path.join(BASE_DIR, 'captions.txt')
with open(captions_path, 'r') as captions_file:
    next(captions_file)
    captions_doc = captions_file.read()

In [None]:
# Group captions by image id: each line is "<file name>,<caption text>".
mapping = {}

for record in tqdm(captions_doc.split('\n')):
    # Skip blank / one-character lines.
    if len(record) < 2:
        continue

    # The first comma-separated field is the file name. The caption itself may
    # contain commas, so the remaining fields are re-joined (with spaces).
    file_name, *caption_parts = record.split(',')
    image_key = file_name.split('.')[0]

    mapping.setdefault(image_key, []).append(" ".join(caption_parts))

In [None]:
# Number of distinct image ids that have at least one caption.
len(mapping)

In [None]:
def clean(mapping):
    """Normalise every caption in ``mapping`` in place.

    Each caption is lower-cased, stripped of every character that is not an
    ASCII letter or whitespace, reduced to words longer than one character,
    and wrapped in the ``startseq`` / ``endseq`` markers.

    Args:
        mapping: Dict of image id -> list of caption strings. The lists are
            modified in place.

    Returns:
        None.

    Note:
        Not idempotent: calling it twice on the same mapping wraps the
        captions in a second pair of markers.
    """
    import re  # local import keeps this cell self-contained

    for captions in mapping.values():
        for i, caption in enumerate(captions):
            caption = caption.lower()

            # str.replace() treats its first argument as literal text, so a
            # regex pattern passed to it never matches; re.sub is required.
            # Whitespace is deliberately kept so the words stay separated.
            caption = re.sub(r'[^a-z\s]', '', caption)

            # split() also collapses any run of whitespace.
            words = [word for word in caption.split() if len(word) > 1]

            captions[i] = " ".join(['startseq'] + words + ['endseq'])

In [None]:
# Show the captions of one image before cleaning.
mapping['1000268201_693b08cb0e']

In [None]:
# Clean all captions in place. Run this cell only once per freshly built
# `mapping`: a second run would add another startseq/endseq pair.
clean(mapping)

In [None]:
# Show the same image's captions after cleaning, for comparison.
mapping['1000268201_693b08cb0e']

In [None]:
# Flatten the per-image caption lists into one list, in mapping order.
all_captions = [
    caption
    for image_captions in mapping.values()
    for caption in image_captions
]

In [None]:
# Total number of captions across all images.
len(all_captions)

In [None]:
# Peek at the first ten captions.
all_captions[:10]

In [None]:
# Fit a word-level tokenizer on every caption.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
# One more than the number of distinct words: presumably leaves room for
# index 0, which the caption model's Embedding treats as padding
# (mask_zero=True) - confirm against the Tokenizer docs.
vocab_size = len(tokenizer.word_index) + 1

In [None]:
# Display the vocabulary size computed above.
vocab_size

In [None]:
# Longest caption, counted in whitespace-separated words (including the
# startseq/endseq markers added by clean()); used as the padded input length.
max_length = max(len(caption.split()) for caption in all_captions)
max_length

In [None]:
# Split the image ids 90% / 5% / 5% into train / validation / test.
# The ids are taken in mapping order; nothing is shuffled.
image_ids = list(mapping.keys())
split = int(len(image_ids) * 0.90)

train, holdout = image_ids[:split], image_ids[split:]

# The first half of the held-out ids validates, the second half tests.
midpoint = len(holdout) // 2
validation = holdout[:midpoint]
test = holdout[midpoint:]

In [None]:
def data_generator(data_keys, mapping, features, tokenizer, max_length, vocab_size, batch_size):
    """Yield training batches for the captioning model, forever.

    Every caption is expanded into one sample per next-word prediction step:
    the first ``i`` tokens (left-padded to ``max_length``) are the text input
    and token ``i`` (one-hot over ``vocab_size``) is the target.

    Args:
        data_keys: Sequence of image ids to cycle over.
        mapping: Dict of image id -> list of caption strings.
        features: Dict of image id -> feature array; row 0 is used.
        tokenizer: Fitted tokenizer providing ``texts_to_sequences``.
        max_length: Length every text input is padded to.
        vocab_size: Number of classes of the one-hot targets.
        batch_size: Number of IMAGES (not samples) per yielded batch.

    Yields:
        ``((X1, X2), y)`` where ``X1`` holds the image features, ``X2`` the
        padded token prefixes and ``y`` the one-hot next words. Images left
        over at the end of a pass are carried into the next pass.

    Raises:
        ValueError: If ``data_keys`` is empty (on the first ``next()``).
    """
    # With no keys the endless loop below would spin without ever yielding.
    if len(data_keys) == 0:
        raise ValueError("data_keys must contain at least one image id")

    X1, X2, y = list(), list(), list()
    n = 0
    while True:
        for key in data_keys:
            n += 1
            captions = mapping[key]
            image_feature = features[key][0]

            for caption in captions:
                seq = tokenizer.texts_to_sequences([caption])[0]

                # One training sample per prefix / next-token pair.
                for i in range(1, len(seq)):
                    in_seq, out_seq = seq[:i], seq[i]

                    in_seq = pad_sequences([in_seq], maxlen=max_length)[0]
                    out_seq = to_categorical([out_seq], num_classes=vocab_size)[0]

                    X1.append(image_feature)
                    X2.append(in_seq)
                    y.append(out_seq)

            if n == batch_size:
                # The two inputs are yielded as a tuple, not a list: tf.data
                # (used by Keras to wrap generators) treats a list as a single
                # leaf value, which makes fit() fail on recent Keras versions.
                # The batch still unpacks the same way as before.
                yield (np.array(X1), np.array(X2)), np.array(y)
                X1, X2, y = list(), list(), list()
                n = 0

In [None]:
import tensorflow as tf
from keras.losses import categorical_crossentropy
from keras.optimizers import Adam

def custom_loss(y_true, y_pred, seq_len=35, num_classes=8485):
    """Categorical cross-entropy after reshaping ``y_true`` to 3-D.

    ``y_true`` is reshaped to ``(batch, seq_len, num_classes)``, keeping its
    own leading (batch) dimension, before the loss is computed.

    Args:
        y_true: Target tensor whose first axis is the batch axis and whose
            remaining elements number ``seq_len * num_classes`` per example.
        y_pred: Predictions, passed to the loss unchanged.
        seq_len: Size of the second axis after the reshape. The default is the
            value that used to be hard-coded (presumably the caption length of
            one particular run - confirm).
        num_classes: Size of the last axis after the reshape. The default is
            the value that used to be hard-coded (presumably that run's
            vocabulary size - confirm).

    Returns:
        The tensor returned by ``categorical_crossentropy(y_true, y_pred)``.

    NOTE(review): no cell visible in this notebook uses this loss.
    """
    batch_dim = tf.shape(y_true)[:1]
    target_shape = tf.concat([batch_dim, [seq_len, num_classes]], axis=0)
    y_true = tf.reshape(y_true, target_shape)

    return categorical_crossentropy(y_true, y_pred)

In [None]:
# --- Image branch: 4096-d feature vector -> dropout -> 256-d projection.
image_input = Input(shape=(4096,))
image_dropped = Dropout(0.4)(image_input)
image_projection = Dense(256, activation='relu')(image_dropped)

# --- Text branch: padded token ids -> embedding (index 0 masked) -> dropout
# --- -> LSTM producing a single 256-d vector.
caption_input = Input(shape=(max_length,))
caption_embedding = Embedding(vocab_size, 256, mask_zero=True)(caption_input)
caption_dropped = Dropout(0.4)(caption_embedding)
caption_state = LSTM(256)(caption_dropped)

# --- Decoder: element-wise sum of both branches -> dense -> softmax over the
# --- vocabulary (the next-word distribution).
merged = add([image_projection, caption_state])
hidden = Dense(256, activation='relu')(merged)
outputs = Dense(vocab_size, activation='softmax')(hidden)

model1 = Model(inputs=[image_input, caption_input], outputs=outputs)
model1.compile(loss='categorical_crossentropy', optimizer='adam')

plot_model(model1, show_shapes=True)

In [None]:
def transformer_model(max_length, vocab_size):
    """Build a text-only, single-block self-attention next-word classifier.

    Pipeline: token ids -> 256-d embedding -> PositionalEncoding ->
    8-head self-attention with a residual connection and layer
    normalisation -> average over the sequence axis -> softmax.

    Args:
        max_length: Length of the token-id input sequence.
        vocab_size: Embedding vocabulary size and number of output classes.

    Returns:
        An uncompiled Keras ``Model``.
    """
    token_ids = Input(shape=(max_length,))
    embedded = Embedding(vocab_size, 256)(token_ids)
    encoded = PositionalEncoding(max_length, 256)(embedded)

    # Self-attention: the same tensor is passed as both query and value.
    attended = MultiHeadAttention(num_heads=8, key_dim=256)(encoded, encoded)
    normalised = LayerNormalization(epsilon=1e-6)(attended + encoded)

    pooled = GlobalAveragePooling1D()(normalised)
    word_probabilities = Dense(vocab_size, activation='softmax')(pooled)

    return Model(inputs=token_ids, outputs=word_probabilities)

# Build and compile the attention-based text model.
# NOTE(review): no cell visible in this notebook trains or evaluates model2.
model2 = transformer_model(max_length, vocab_size)
model2.compile(loss='categorical_crossentropy', optimizer='adam')

# Render the architecture diagram as the cell output.
plot_model(model2, show_shapes=True)


In [None]:
import tensorflow as tf
# Stop training once val_loss has not improved for 5 consecutive epochs.
# restore_best_weights=True rolls the model back to the epoch with the lowest
# val_loss; without it the model keeps the weights of the last epoch run,
# i.e. `patience` epochs past the best one.
escall = tf.keras.callbacks.EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)

In [None]:
# Training hyper-parameters.
epochs = 50
batch_size = 32  # images per generator batch
steps = len(train) // batch_size  # generator batches per epoch

# The two generators differ only in the list of image ids they cycle over.
shared_generator_args = (mapping, features, tokenizer, max_length, vocab_size, batch_size)
generator = data_generator(train, *shared_generator_args)
validator = data_generator(validation, *shared_generator_args)


In [None]:
# Each validation step consumes one generator batch of `batch_size` images,
# so one pass over the validation split takes len(validation) // batch_size
# steps. (The step count used to be derived from 15% of the *test* split's
# size, which is unrelated to the validation data and to the batch size.)
validation_steps = max(1, len(validation) // batch_size)

history = model1.fit(
    generator,
    epochs=epochs,
    steps_per_epoch=steps,
    validation_data=validator,
    validation_steps=validation_steps,
    callbacks=[escall],
    verbose=1,
)


In [None]:
# Write the trained captioning model to disk.
model1.save('best_model.h5')

In [None]:
from tensorflow.keras.models import load_model

# Reload the captioning model saved above.
# NOTE: this rebinds `model`, which until now held the VGG16 feature
# extractor; re-running the feature-extraction cell after this one would
# therefore call the captioning model instead.
model = load_model('best_model.h5')


In [None]:
def idx_to_word(integer, tokenizer):
    """Look up the word whose index in ``tokenizer.word_index`` equals ``integer``.

    Args:
        integer: Token index to look up.
        tokenizer: Object exposing a ``word_index`` dict of word -> index.

    Returns:
        The first matching word, or ``None`` when no word has that index.
    """
    matches = (
        word
        for word, index in tokenizer.word_index.items()
        if index == integer
    )
    return next(matches, None)

In [None]:
def predict_caption(model, image, tokenizer, max_length):
    """Greedily decode a caption for one image.

    Starting from ``'startseq'``, the model is asked for the next word, the
    highest-scoring word is appended, and the process repeats until
    ``'endseq'`` is produced, an index with no word is predicted, or
    ``max_length`` words have been added.

    Args:
        model: Captioning model whose ``predict`` takes
            ``[image, padded_sequence]``.
        image: Feature array for one image, passed to the model as-is.
        tokenizer: Fitted tokenizer used to encode the text and decode indices.
        max_length: Padded input length and maximum number of decoding steps.

    Returns:
        The generated text, including the leading ``startseq`` and, when it
        was reached, the trailing ``endseq``.
    """
    in_text = 'startseq'

    for _ in range(max_length):
        # Encode the text generated so far and pad it to the model's input length.
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        sequence = pad_sequences([sequence], maxlen=max_length)

        # Use the model that was passed in. The global `model1` used to be
        # called here, so the `model` argument was silently ignored.
        yhat = model.predict([image, sequence], verbose=0)

        # Greedy choice: index of the most probable next word.
        yhat = np.argmax(yhat)

        word = idx_to_word(yhat, tokenizer)

        # An index without a word (e.g. the padding index) ends decoding.
        if word is None:
            break

        in_text += " " + word

        if word == 'endseq':
            break
    return in_text

In [None]:
from nltk.translate.bleu_score import corpus_bleu
from nltk.translate import meteor_score

# Collect, for every test image, the tokenised reference captions and the
# tokenised caption predicted by the trained model.
actual, predicted = list(), list()

for key in tqdm(test):
    captions = mapping[key]

    y_pred = predict_caption(model1, features[key], tokenizer, max_length)

    actual.append([caption.split() for caption in captions])
    predicted.append(y_pred.split())

print("BLEU-1: %f" % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
print("BLEU-2: %f" % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))

# meteor_score scores ONE hypothesis against its references; it has no
# corpus-level form, so passing the whole corpus at once is invalid. Score
# each image separately and report the mean.
# Recent NLTK releases expect pre-tokenised input (as built above) and need
# the WordNet data to be installed.
meteor_per_image = [
    meteor_score.meteor_score(references, hypothesis)
    for references, hypothesis in zip(actual, predicted)
]
mean_meteor = sum(meteor_per_image) / len(meteor_per_image) if meteor_per_image else 0.0
print("Meteor %f" % mean_meteor)


In [None]:
from PIL import Image
import matplotlib.pyplot as plt
def generate_caption(image_name):
    """Print a generated caption for one dataset image and display the image.

    Args:
        image_name: File name inside ``<BASE_DIR>/Images``. The part before
            the first dot is used as the key into the global ``features``.

    Returns:
        None. The caption is printed and the image drawn with ``plt.imshow``.
    """
    feature_key = image_name.split('.')[0]
    picture = Image.open(os.path.join(BASE_DIR, "Images", image_name))

    # The global `model`, `features`, `tokenizer` and `max_length` are used.
    caption_text = predict_caption(model, features[feature_key], tokenizer, max_length)
    print(caption_text)
    plt.imshow(picture)

In [None]:
folder_path = "flickr8k/Images"

# Snapshot of the file names currently in the image folder.
files = os.listdir(folder_path)
all_images = list(files)


def x(choice):
    """Return the file name stored at position ``choice`` of ``all_images``."""
    return all_images[choice]


In [None]:
# Demo: caption one specific image from the dataset.
generate_caption("1001773457_577c3a7d70.jpg")