# Imports

In [None]:
import numpy as np
import pandas as pd
import cv2
import os
from glob import glob
from keras.utils import to_categorical
from keras_preprocessing.sequence import pad_sequences
from keras.applications import ResNet50
from nltk.translate.bleu_score import sentence_bleu
import copy
import matplotlib.pyplot as plt
import joblib
import copy


# Image Preprocessing

### Load Images

In [None]:
# Collect the paths of every .jpg directly inside the image folder.
images_path = '../Images/'
images = glob('{}*.jpg'.format(images_path))

# Quick sanity check: how many files were found and what the paths look like.
print("No. of Images: ", len(images))
print("List View: ", images[:5])


### View Sample Images

In [None]:
# Display the first image found on disk.
plt.figure()
# OpenCV loads pixels in BGR order; convert to RGB before handing to matplotlib.
img = cv2.cvtColor(cv2.imread(images[0]), cv2.COLOR_BGR2RGB)
plt.imshow(img)


# CNN Model for Feature Extraction

### Load Model

In [None]:
# Instantiate ResNet50 with its top (classification) layers included; the
# truncated feature extractor is derived from this model in a later cell.
base_model = ResNet50(include_top=True)
# Print the layer listing so the layer to cut at can be identified.
base_model.summary()


### Configure Model

In [None]:
from keras.models import Model
# Use the output of the second-to-last layer of the base network as the image
# descriptor (the feature-extraction code reshapes each prediction to 2048 values).
last = base_model.layers[-2].output
# Same input as the base network, but stopping at that layer.
main_model = Model(inputs=base_model.input, outputs=last)
main_model.summary()


## Feature Extraction

In [None]:
# Build a {file name: feature vector} mapping for every image, caching the
# result on disk so the CNN only has to run once per model type.
images_features = {}
model_type = 'resnet50'
SAVE = 'features_'+model_type+'.joblib'

if os.path.exists(SAVE):
    # Reuse previously extracted features instead of re-running the network.
    images_features = joblib.load(SAVE)
else:
    for image_path in images:
        img = cv2.imread(image_path)
        if img is None:
            # cv2.imread reports an unreadable/corrupt file by returning None;
            # skip it rather than failing inside cvtColor.
            print("Skipping unreadable image: ", image_path)
            continue
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (224, 224))
        img = img.reshape(1, 224, 224, 3)
        # NOTE(review): raw 0-255 RGB pixels are fed to the network and no
        # keras preprocess_input step is applied -- confirm this is intended.
        # Changing it requires deleting the cache and updating getImage too.
        pred = main_model.predict(img).reshape(2048,)
        # Key by bare file name so the keys match the first column of the
        # captions file. os.path.basename works with the platform's own
        # separator(s); splitting on '\\' only worked on Windows.
        img_name = os.path.basename(image_path)
        images_features[img_name] = pred

    joblib.dump(images_features, SAVE)

print("Number of Features: ", len(images_features))


# Text Preprocessing

### Load Captions

In [None]:
# Read the whole captions file and split it into raw lines.
caption_path = '../captions.txt'
# Use a context manager so the file handle is closed as soon as it is read.
with open(caption_path, 'rb') as caption_file:
    captions = caption_file.read().decode('utf-8').split('\n')
print("No. of Captions: ", len(captions))
# Peek at the caption column of the second line (index 1).
print(captions[1].split(',')[1])


### Map Images to Captions

In [None]:
# Group captions by image file name, keeping only images that have features.
captions_dict = {}
for line in captions:
    # Split on the FIRST comma only: the text before it is the image name and
    # everything after it is the caption, which may itself contain commas.
    img_name, separator, caption = line.partition(',')
    if not separator:
        # Blank or malformed line without an "image,caption" pair.
        continue
    if img_name in images_features:
        captions_dict.setdefault(img_name, []).append(caption)

# Independent copy that keeps the raw captions; captions_dict itself is
# rewritten in place by the later preprocessing cells.
reference_dict = copy.deepcopy(captions_dict)
len(captions_dict)


In [None]:
# Replace each raw reference caption with its whitespace-split token list,
# updating every per-image list in place.
for reference_list in reference_dict.values():
    reference_list[:] = [sentence.split() for sentence in reference_list]


### Visualize Images with Captions

In [None]:
import matplotlib.pyplot as plt

# Show only the first image that has features, labelled with its last caption.
# The loop-and-break form means nothing is drawn when there are no features.
for k in images_features:
    img_name = '../Images/' + k
    plt.figure()
    img = cv2.imread(img_name)
    print(img_name)
    # OpenCV returns BGR; matplotlib expects RGB.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
    plt.xlabel(captions_dict[k][-1])
    plt.imshow(img)
    break


### Preprocess Captions

In [None]:
def preprocessed(txt):
    """Lower-case a caption and wrap it in the sequence boundary markers.

    Args:
        txt: Raw caption string.

    Returns:
        The lower-cased caption prefixed with ``'startofseq '`` and suffixed
        with ``' endofseq'``.
    """
    return 'startofseq {} endofseq'.format(txt.lower())


# Normalise every caption in place. Positions come from enumerate rather than
# list.index(), which rescans the list for each caption and would pick the
# wrong slot if a processed caption ever equalled a later raw one.
for caption_list in captions_dict.values():
    for position, raw_caption in enumerate(caption_list):
        caption_list[position] = preprocessed(raw_caption)


### Create Vocabulary

In [None]:
# Count how many times each whitespace-separated token occurs across all
# captions. A token's first occurrence counts as 1 (it used to be stored as 0,
# which made every frequency one too low).
count_words = {}
for caption_list in captions_dict.values():
    for caption_text in caption_list:
        for word in caption_text.split():
            count_words[word] = count_words.get(word, 0) + 1

print("Vocab Size: ", len(count_words))


### Converting Vocabulary to Integer Vocabulary

In [None]:
# Assign consecutive integer ids, starting at 1, to every word whose count is
# strictly greater than THRESH, in the iteration order of count_words.
THRESH = -1
new_dict = {}
count = 1
for word, frequency in count_words.items():
    if frequency <= THRESH:
        continue
    new_dict[word] = count
    count += 1

print("Vocab Size: ", len(new_dict))


In [None]:
# Replace every caption string with its list of integer word ids, in place.
sample_key = '1000268201_693b08cb0e.jpg'
# .get() keeps this debug print from raising when the sample image is absent.
print("Before: ", captions_dict.get(sample_key))

# NOTE(review): word ids are assigned starting at 1, so len(new_dict) equals the
# id of the last word added and '<OUT>' shares that id. Giving '<OUT>' its own
# id also requires enlarging VOCAB_SIZE / vocab_size, which are len(new_dict).
new_dict['<OUT>'] = len(new_dict)
out_id = new_dict['<OUT>']

for caption_list in captions_dict.values():
    # enumerate supplies the slot directly; list.index() on a list that is being
    # rewritten is quadratic and fragile.
    for position, caption_text in enumerate(caption_list):
        encoded = [new_dict.get(word, out_id) for word in caption_text.split()]
        caption_list[position] = encoded

print("After: ", captions_dict.get(sample_key))


# Generator Function

In [None]:
# Longest encoded caption (in tokens) over the whole dataset; 0 when there are
# no captions at all.
MAX_LEN = max(
    (len(sequence)
     for sequence_list in captions_dict.values()
     for sequence in sequence_list),
    default=0,
)

print("Max Length of a Word: ", MAX_LEN)

Batch_size = 5000
# Number of entries in the word-to-id mapping (including '<OUT>').
VOCAB_SIZE = len(new_dict)


def generator(image_ids, photo, cleaned_captions):
    """Build next-token training samples for the given images, all in memory.

    Despite its name this is not a Python generator: it returns three arrays.
    For every id in ``image_ids``, each sequence in ``cleaned_captions[id]`` is
    expanded into one sample per cut position ``1 .. len(sequence) - 1``: the
    prefix before the cut is the text input and the element at the cut is the
    target.

    Args:
        image_ids: Iterable of keys present in ``photo`` and ``cleaned_captions``.
        photo: Mapping from image id to its feature vector.
        cleaned_captions: Mapping from image id to a list of integer sequences.

    Returns:
        Tuple ``(X1, X2, y)``: the repeated image features, the prefixes
        post-padded/truncated to ``MAX_LEN`` (float64), and the targets one-hot
        encoded over ``VOCAB_SIZE`` classes (float64).
    """
    feature_rows, prefix_rows, target_rows = [], [], []

    for image_id in image_ids:
        for token_ids in cleaned_captions[image_id]:
            for cut in range(1, len(token_ids)):
                feature_rows.append(photo[image_id])

                # pad_sequences works on a batch, so wrap the single prefix in
                # a list and unwrap the single padded row afterwards.
                padded_prefix = pad_sequences(
                    [token_ids[:cut]], maxlen=MAX_LEN,
                    padding='post', truncating='post')[0]
                one_hot_next = to_categorical(
                    [token_ids[cut]], num_classes=VOCAB_SIZE)[0]

                prefix_rows.append(padded_prefix)
                target_rows.append(one_hot_next)

    return (
        np.array(feature_rows),
        np.array(prefix_rows, dtype='float64'),
        np.array(target_rows, dtype='float64'),
    )


In [None]:
def train_validation_test_split(descriptions, train_size=0.74, validation_size=0.13):
    """Split the keys of ``descriptions`` into three consecutive slices.

    The keys are taken in the mapping's iteration order (no shuffling).

    Args:
        descriptions: Mapping whose keys are the image ids to split.
        train_size: Fraction of ids assigned to the training slice.
        validation_size: Fraction of ids assigned to the validation slice.

    Returns:
        Tuple ``(train, validation, test)`` of lists; ``test`` receives whatever
        remains after the first two slices.
    """
    ordered_ids = list(descriptions.keys())
    total = len(ordered_ids)

    # Slice boundaries, truncated towards zero.
    train_end = int(total * train_size)
    validation_end = int(total * (train_size + validation_size))

    return (
        ordered_ids[:train_end],
        ordered_ids[train_end:validation_end],
        ordered_ids[validation_end:],
    )


In [None]:
# Split the image ids that have captions into train / validation / test lists,
# using the default fractions of train_validation_test_split.
image_id_train, image_id_validation, image_id_test = train_validation_test_split(
    captions_dict)


In [None]:
# Display how many image ids landed in each split.
len(image_id_train), len(image_id_validation), len(image_id_test)


In [None]:
# Materialise the training arrays (image features, padded prefixes, one-hot
# targets) for the training ids ...
X1_train, X2_train, y_train = generator(
    image_id_train, images_features, captions_dict)
# ... and the same three arrays for the validation ids.
X1_val, X2_val, y_val = generator(
    image_id_validation, images_features, captions_dict)


In [None]:
# Display the array shapes as a sanity check before training.
X1_train.shape, X2_train.shape, y_train.shape, X1_val.shape


# RNN Model for Training and Prediction

### Imports

In [None]:
from keras_preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.models import Model, Sequential
from keras.layers import Input
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Embedding
from keras.layers import TimeDistributed, Activation, RepeatVector, Concatenate


### Configure Model

In [None]:
embedding_size = 128
max_len = MAX_LEN
vocab_size = len(new_dict)

# Image branch: project the 2048-value feature vector to the embedding size
# and repeat it once per time step so it can be concatenated with the text.
image_model = Sequential([
    Dense(embedding_size, input_shape=(2048,), activation='relu'),
    RepeatVector(max_len),
])
image_model.summary()

# Text branch: embed the padded word ids, run an LSTM over them and project
# every time step back to the embedding size.
language_model = Sequential([
    Embedding(input_dim=vocab_size,
              output_dim=embedding_size, input_length=max_len),
    LSTM(256, return_sequences=True),
    TimeDistributed(Dense(embedding_size)),
])
language_model.summary()

# Decoder: concatenate both branches, stack two LSTMs and predict a
# distribution over the vocabulary for the next word.
merged = Concatenate()([image_model.output, language_model.output])
decoded = LSTM(128, return_sequences=True)(merged)
decoded = LSTM(512, return_sequences=False)(decoded)
logits = Dense(vocab_size)(decoded)
out = Activation('softmax')(logits)
model = Model(inputs=[image_model.input, language_model.input], outputs=out)

# model.load_weights("mine_model_weights.h5")
model.compile(
    loss='categorical_crossentropy',
    optimizer='RMSprop',
    metrics=['accuracy'],
)
model.summary()


### Plot Model

In [None]:
# Draw the model graph with tensor shapes shown on each layer.
plot_model(model, show_shapes=True)


### Fit Model

In [None]:
# Training hyper-parameters.
EPOCH = 2
BATCH_SIZE = 512

# Train on the two-input samples and evaluate on the validation arrays after
# each epoch; the returned history holds the per-epoch metrics.
history = model.fit(
    x=[X1_train, X2_train],
    y=y_train,
    batch_size=BATCH_SIZE,
    epochs=EPOCH,
    validation_data=([X1_val, X2_val], y_val),
)


### Save Model

In [None]:
# Reverse mapping (id -> word) used when decoding predictions.
inv_dict = {index: word for word, index in new_dict.items()}  # For Prediction

# Persist the full model, its weights and the vocabulary, tagging the model
# files with the feature-extractor type and the number of epochs trained.
run_tag = '{}_{}E'.format(model_type, EPOCH)
model.save('model_' + run_tag + '.h5')
model.save_weights('mine_model_weights_' + run_tag + '.h5')
np.save('vocab' + model_type + '.npy', new_dict)  # type: ignore


In [None]:
# Get training and test loss histories
training_loss = history.history['loss']
test_loss = history.history['val_loss']

# Create count of the number of epochs
epoch_count = range(1, len(training_loss) + 1)

# Visualize loss history
plt.plot(epoch_count, training_loss, 'r')
plt.plot(epoch_count, test_loss, 'b')
plt.legend(['Training Loss', 'Test Loss'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()

In [None]:
# Persist the per-epoch metrics next to the model files.
joblib.dump(history.history, 'history_{}{}.joblib'.format(model_type, EPOCH))

# Predictions

In [None]:
def getImage(x):
    """Load image number ``x`` and shape it as a single-image batch.

    Args:
        x: Index into the module-level ``images`` list of file paths.

    Returns:
        The image converted from BGR to RGB, resized to 224x224 and reshaped
        to ``(1, 224, 224, 3)``.

    Raises:
        FileNotFoundError: If OpenCV cannot read the file at ``images[x]``.
    """
    test_img_path = images[x]

    test_img = cv2.imread(test_img_path)
    if test_img is None:
        # cv2.imread returns None instead of raising when a file is missing or
        # corrupt; fail here with the offending path rather than in cvtColor.
        raise FileNotFoundError('Could not read image: ' + str(test_img_path))

    test_img = cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB)
    test_img = cv2.resize(test_img, (224, 224))

    return np.reshape(test_img, (1, 224, 224, 3))


In [None]:
# Caption five randomly chosen images with greedy decoding and score each
# caption against its reference captions with BLEU-1 .. BLEU-4.
metrics = {'BLEU-1': [], 'BLEU-2': [], 'BLEU-3': [], 'BLEU-4': []}
# n-gram weights passed to sentence_bleu for each reported metric.
bleu_weights = {
    'BLEU-1': (1, 0, 0, 0),
    'BLEU-2': (0.5, 0.5, 0, 0),
    'BLEU-3': (0.33, 0.33, 0.33, 0),
    'BLEU-4': (0.25, 0.25, 0.25, 0.25),
}

for _ in range(5):
    # NOTE(review): the index range is hard-coded and is not tied to
    # image_id_test or len(images) -- confirm it only selects held-out images.
    no = np.random.randint(1500, 6000, (1, 1))[0, 0]
    test_feature = main_model.predict(getImage(no)).reshape(1, 2048)

    # Re-read the image at full resolution for display.
    test_img_path = images[no]
    test_img = cv2.imread(test_img_path)
    test_img = cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB)

    text_inp = ['startofseq']

    count = 0
    caption = ''
    while count < 40:  # Assuming number of words in a caption is not more than 40
        count += 1

        # Encode the words generated so far (a distinct name is used so the
        # outer loop variable is not shadowed) and pad to the training length.
        encoded = [[new_dict[word] for word in text_inp]]
        encoded = pad_sequences(encoded, padding='post',
                                truncating='post', maxlen=MAX_LEN)

        # Greedy decoding: take the most probable next word.
        prediction = np.argmax(model.predict([test_feature, encoded]))

        # Class 0 is the padding id and has no entry in inv_dict; treat an
        # unmapped id like the end marker instead of raising KeyError.
        sampled_word = inv_dict.get(prediction)
        if sampled_word is None or sampled_word == 'endofseq':
            break

        caption = caption + ' ' + sampled_word
        text_inp.append(sampled_word)

    # Reference captions are keyed by bare file name; os.path.basename works
    # with the platform's own separator(s), unlike splitting on '\\'.
    ref = reference_dict[os.path.basename(test_img_path)]
    hypothesis = caption.split()
    for metric_name, weights in bleu_weights.items():
        metrics[metric_name].append(
            sentence_bleu(ref, hypothesis, weights=weights))

    plt.figure()
    plt.imshow(test_img)
    plt.xlabel(caption)


In [None]:
# Display the BLEU scores collected for the sampled images.
metrics
