In [None]:
import os
# Point KAGGLE_CONFIG_DIR at /content/ (presumably where kaggle.json was
# uploaded -- confirm for your environment).
os.environ.update(KAGGLE_CONFIG_DIR="/content/")


In [None]:
!kaggle datasets download -d shweta2407/flickr8k-imageswithcaptions

In [None]:
!unzip flickr8k-imageswithcaptions.zip

In [None]:
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.applications.vgg16 import preprocess_input
from keras.models import Sequential, Model
from tensorflow import keras
import matplotlib.pyplot as plt
import string
from keras.applications.resnet50 import ResNet50
from pickle import dump
from pickle import load
from IPython.display import Image
from keras.layers import Dense, Flatten,Input, Convolution2D, Dropout, LSTM, TimeDistributed, Embedding, Bidirectional, Activation, RepeatVector,Concatenate
import numpy as np
from keras.models import load_model
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical, plot_model
from keras.layers.merge import add, concatenate
from keras.callbacks import ModelCheckpoint, EarlyStopping, ReduceLROnPlateau

In [None]:
# Dataset locations, relative to the notebook's working directory
# (presumably created by the unzip step above -- confirm the folder names).
# Folder that holds the .jpg images.
image_path = 'Flickr8k_Dataset/Flicker8k_Dataset'
# Text file with the captions; parsed by load_captions().
caption_path = 'Flickr8k_text/Flickr8k.token.txt'

In [None]:
def load_captions(path):
    """Read a caption file and keep the first caption found for each image.

    Every non-blank line is split on whitespace. The first token (presumably
    of the form ``<image>.jpg#<n>``) is cut at its first ``.`` to obtain the
    image id; the remaining tokens are re-joined with single spaces to form
    the caption text.

    Args:
        path: Path of the caption file to read.

    Returns:
        dict mapping image id -> caption text. Only the first caption seen for
        an id is stored; later captions for the same id are ignored.
    """
    captions_dict = {}
    # The context manager guarantees the file handle is closed.
    with open(path) as caption_file:
        for line in caption_file:
            tokens = line.split()
            if not tokens:
                # Blank or whitespace-only line: there is no id to parse.
                continue
            caption_id = tokens[0].split('.')[0]
            if caption_id not in captions_dict:
                captions_dict[caption_id] = ' '.join(tokens[1:])

    return captions_dict

# Raw captions keyed by image id; load_captions keeps one caption per image.
captions_dict = load_captions(caption_path)

In [None]:
# Show the characters the caption-cleaning step strips from every token.
print(string.punctuation)

In [None]:
# Translation table that deletes every punctuation character.
table = str.maketrans('', '', string.punctuation)
new_captions_dict = {}

for caption_id, raw_caption in captions_dict.items():
    # Lower-case each word, then strip its punctuation.
    normalised = (word.lower().translate(table) for word in raw_caption.split())
    # Drop tokens that end up empty or one character long.
    kept_words = [word for word in normalised if len(word) > 1]
    # Store the cleaned caption wrapped in the start/end markers.
    new_captions_dict[caption_id] = 'startseq ' + ' '.join(kept_words) + ' endseq'

In [None]:
# Ids (file name up to the first '.') of the images that have a cleaned caption.
image_index = list(new_captions_dict.keys())
# Set lookup keeps the filter linear; testing membership in the list rescans it
# for every file in the folder.
_captioned_ids = set(image_index)
# os.listdir() returns entries in arbitrary order. Sorting makes the positional
# train/validate/test slices taken from this list reproducible between runs.
caption_images_list = [
    image_id
    for image_id in (file_name.split('.')[0] for file_name in sorted(os.listdir(image_path)))
    if image_id in _captioned_ids
]

In [None]:
# Positional split boundaries inside caption_images_list.
TRAIN_VALIDATE_END = 8081
TEST_END = 8091

# First 8081 ids feed training/validation; the next 10 are held out for testing.
train_validate_images = caption_images_list[:TRAIN_VALIDATE_END]
test_images = caption_images_list[TRAIN_VALIDATE_END:TEST_END]
test_images

In [None]:
def extract_features(directory, image_keys):
    """Run every listed image through ResNet50 and collect the pooled features.

    Args:
        directory: Folder that holds the ``<name>.jpg`` image files.
        image_keys: Iterable of image names without the ``.jpg`` extension.

    Returns:
        dict mapping image id (``name`` up to its first ``.``) to the array
        ``model.predict`` returns for that single image; the leading batch
        axis of size 1 is kept.
    """
    model = ResNet50(include_top=False,weights='imagenet',input_shape=(224,224,3),pooling='avg')
    # summary() prints the table itself and returns None; wrapping it in
    # print() only added a stray "None" line to the cell output.
    model.summary()
    features = dict()

    for name in image_keys:
        filename = os.path.join(directory, name + '.jpg')
        image = load_img(filename, target_size=(224, 224))
        image = img_to_array(image)
        # Prepend a batch axis of size 1 before handing the image to the model.
        image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
        # NOTE(review): preprocess_input is imported from keras.applications.vgg16
        # at the top of the notebook while the model here is ResNet50 -- confirm
        # the two preprocessors are interchangeable.
        image = preprocess_input(image)
        feature = model.predict(image, verbose=0)
        image_id = name.split('.')[0]
        # store feature
        features[image_id] = feature

    return features

In [None]:
# ResNet50 features for every training/validation image. extract_features runs
# one predict call per image.
train_validate_features1 = extract_features(image_path, train_validate_images)

In [None]:
# Peek at one entry: the first image id and its extracted feature array.
first_image_id = next(iter(train_validate_features1))
print("{} : {}".format(first_image_id, train_validate_features1[first_image_id]))

In [None]:
# Number of images for which features were extracted.
len(train_validate_features1)

In [None]:
# Persist the extracted features so they can be reloaded (see the commented
# line below) instead of recomputed. The context manager flushes and closes
# the file once the pickle is written.
with open('./train_validate_features1.pkl', 'wb') as features_file:
    dump(train_validate_features1, features_file)
# train_validate_features1 = load(open('./train_validate_features1.pkl', 'rb'))

In [None]:
# make a dictionary of image with caption for train_validate_images
train_validate_image_caption = {}

# Hash-based lookups: a set for the id list, and the features dict itself.
# Testing against the list and a freshly built list of keys rescanned both
# for every caption.
_train_validate_ids = set(train_validate_images)

for image, caption in new_captions_dict.items():
    if image in _train_validate_ids and image in train_validate_features1:
        train_validate_image_caption[image] = caption

len(train_validate_image_caption)

In [None]:
# Show the third (image id, caption) pair: print the caption, render the image.
sample_image_id, sample_caption = list(train_validate_image_caption.items())[2]
print(sample_caption)
Image(image_path+'/'+sample_image_id+'.jpg')

In [None]:
# Fit the vocabulary on the training/validation captions only.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(list(train_validate_image_caption.values()))

# One more than the number of known words (presumably because index 0 is
# reserved for padding -- confirm against the Tokenizer documentation).
vocab_len = 1 + len(tokenizer.word_index)
# Longest caption, counted in whitespace-separated tokens.
max_len = max(len(caption.split()) for caption in train_validate_image_caption.values())

print("vocab_len ", vocab_len)
print("max_len ", max_len)

def prepare_data(image_keys):
    """Expand each caption into next-word training samples.

    For a caption encoded as ``seq``, every position ``i >= 1`` yields one
    sample: the image feature, the prefix ``seq[:i]`` padded to ``max_len``,
    and the one-hot encoding of ``seq[i]`` over ``vocab_len`` classes.

    Reads the notebook globals ``train_validate_image_caption``, ``tokenizer``,
    ``train_validate_features1``, ``max_len`` and ``vocab_len``.

    Args:
        image_keys: Image ids to build samples for.

    Returns:
        Tuple ``(x1, x2, y)`` of numpy arrays: image features, padded prefixes
        and one-hot next-word targets, aligned row by row.
    """
    feature_rows, prefix_rows, target_rows = [], [], []

    for image_id in image_keys:
        words = train_validate_image_caption[image_id].split()
        token_ids = tokenizer.texts_to_sequences([words])[0]

        for cut in range(1, len(token_ids)):
            # Everything before `cut` is the input; the token at `cut` is the target.
            padded_prefix = pad_sequences([token_ids[:cut]], maxlen = max_len)[0]
            one_hot_next = to_categorical([token_ids[cut]], num_classes = vocab_len)[0]

            # Index 0 drops the batch axis of the stored feature.
            feature_rows.append(train_validate_features1[image_id][0])
            prefix_rows.append(padded_prefix)
            target_rows.append(one_hot_next)

    return np.array(feature_rows), np.array(prefix_rows), np.array(target_rows)

In [None]:
# First 7081 ids train the model; the following 1000 validate it.
train_keys = train_validate_images[:7081]
validate_keys = train_validate_images[7081:8081]

train_x1, train_x2, train_y = prepare_data(train_keys)
validate_x1, validate_x2, validate_y = prepare_data(validate_keys)

In [None]:
embedding_size = 128

# Image branch: project the 2048-d feature to the embedding size, then repeat
# it max_len times so it can be joined with the caption branch step by step.
image_model = Sequential([
    Dense(embedding_size, input_shape=(2048,), activation='relu'),
    Dropout(0.5),
    RepeatVector(max_len),
])

image_model.summary()

In [None]:
# Caption branch: embed the padded word ids, run an LSTM that emits one output
# per time step, and project each step to the embedding size.
language_model = Sequential([
    Embedding(input_dim=vocab_len, output_dim=embedding_size, input_length=max_len),
    LSTM(256,return_sequences=True),
    Dropout(0.5),
    TimeDistributed(Dense(embedding_size)),
])

language_model.summary()

In [None]:
# Join the two branches, then decode with two stacked LSTMs.
merged = Concatenate()([image_model.output, language_model.output])
decoded = LSTM(128, dropout=0.5, recurrent_dropout=0.5,return_sequences=True)(merged)
decoded = LSTM(512, dropout=0.5, recurrent_dropout=0.5,return_sequences=False)(decoded)

# One score per vocabulary entry, turned into probabilities by the softmax.
scores = Dense(vocab_len)(decoded)
out = Activation('softmax')(scores)

model = Model(inputs=[image_model.input, language_model.input], outputs = out)

optimizer = keras.optimizers.Adam(learning_rate=0.01)
model.compile(loss='categorical_crossentropy', optimizer=optimizer)
model.summary()

In [None]:
import tensorflow
# Checkpoint file written by ModelCheckpoint; the testing section loads a model
# from this same path.
filepath = './image_captioning.h5'
def scheduler(epoch, lr):
    """Step-decay schedule for ``LearningRateScheduler``.

    Keras calls this with the *current* learning rate and keeps using whatever
    is returned, so a division applied on every epoch would compound. The rate
    is therefore divided exactly once at each boundary, which gives:

    * epochs 0-14:  the initial rate
    * epochs 15-29: initial rate / 100
    * epochs 30+:   initial rate / 10000

    Args:
        epoch: Zero-based epoch index.
        lr: Learning rate currently set on the optimizer.

    Returns:
        The learning rate to use for ``epoch``.
    """
    # Epoch 15 is a boundary too; the former `epoch > 15` test skipped it.
    if epoch in (15, 30):
        return lr / 100
    return lr
# Keep only the weights with the lowest validation loss seen so far.
checkpoint_callback = ModelCheckpoint(
    filepath=filepath,
    verbose=2,
    save_best_only=True,
    monitor='val_loss',
    mode='min',
)
# Ask scheduler() for the learning rate at the start of each epoch.
lr_callback = tensorflow.keras.callbacks.LearningRateScheduler(scheduler)
callbacks = [checkpoint_callback, lr_callback]

In [None]:
# Sanity-check the array shapes before training.
for label, array in (
    ("shape of train_x1 ", train_x1),
    ("shape of train_x2 ", train_x2),
    ("shape of train_y ", train_y),
):
    print(label, array.shape)
print()
for label, array in (
    ("shape of validate_x1 ", validate_x1),
    ("shape of validate_x2 ", validate_x2),
    ("shape of validate_y ", validate_y),
):
    print(label, array.shape)

In [None]:
# Render the layer graph of the combined captioning model.
plot_model(model)

In [None]:
BATCH_SIZE = 512
EPOCHS = 50

# Train on the image-feature and caption-prefix inputs; the callbacks
# checkpoint the best model and adjust the learning rate.
history = model.fit(
    x=[train_x1, train_x2],
    y=train_y,
    batch_size=BATCH_SIZE,
    epochs=EPOCHS,
    verbose=1,
    callbacks=callbacks,
    validation_data=([validate_x1, validate_x2], validate_y),
)

In [None]:
# Training and validation loss per epoch, drawn in legend order.
for series in ('loss', 'val_loss'):
    plt.plot(history.history[series])
plt.title('Model loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.legend(['train_loss','val_loss'], loc = 'upper right')
plt.show()

In [None]:
def extract_feat_single(filename, model=None):
    """Extract the ResNet50 feature of a single image file.

    Args:
        filename: Path of the image to load; it is resized to 224x224.
        model: Optional feature extractor to reuse across calls. When omitted,
            a new ResNet50 (ImageNet weights, average pooling, no top) is built
            for this call, so pass a model when processing several images.

    Returns:
        The array returned by ``model.predict`` for the one-image batch.
    """
    if model is None:
        model = ResNet50(include_top=False,weights='imagenet',input_shape=(224,224,3),pooling='avg')
    image = load_img(filename, target_size=(224, 224))
    image = img_to_array(image)
    # Prepend a batch axis of size 1 before handing the image to the model.
    image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2]))
    image = preprocess_input(image)
    feature = model.predict(image, verbose=0)
    return feature

def word_for_id(integer, tokenizr):
    """Map a vocabulary index back to its word.

    Args:
        integer: Word index to look up.
        tokenizr: Object exposing ``word_index`` (word -> index) and, where
            available, the reverse mapping ``index_word`` (index -> word).

    Returns:
        The matching word, or ``None`` when the index is unknown.
    """
    # Prefer the reverse map: one hash lookup instead of scanning the whole
    # vocabulary for every generated token.
    index_word = getattr(tokenizr, 'index_word', None)
    if index_word:
        return index_word.get(integer)
    # Fallback for objects that only provide word_index.
    for word, index in tokenizr.word_index.items():
        if index == integer:
            return word
    return None

In [None]:
def generate_desc(model, tokenizer, photo, max_length):
    """Greedily generate a caption for one image.

    Starting from ``'startseq'``, the text so far is encoded and padded to
    ``max_length``, the model predicts the next word, and the highest-scoring
    word is appended. Generation stops after ``max_length`` words, when the
    predicted index has no word, or once ``'endseq'`` has been appended.

    Args:
        model: Model whose ``predict`` takes ``[photo, sequence]``.
        tokenizer: Tokenizer used to encode the text and decode indices.
        photo: Image feature passed as the first model input.
        max_length: Padding length, and the maximum number of words generated.

    Returns:
        The generated text, including the leading ``'startseq'`` and, if it
        was produced, the trailing ``'endseq'``.
    """
    caption = 'startseq'
    for _ in range(max_length):
        encoded = tokenizer.texts_to_sequences([caption])[0]
        padded = pad_sequences([encoded], maxlen=max_length)
        prediction = model.predict([photo, padded], verbose=0)
        next_word = word_for_id(np.argmax(prediction), tokenizer)
        if next_word is None:
            # Predicted index has no vocabulary entry: stop here.
            return caption
        caption = caption + ' ' + next_word
        if next_word == 'endseq':
            return caption
    return caption

## Testing

In [None]:
# Reload the checkpointed model and rebuild the tokenizer from the
# training/validation captions.
model = load_model('./image_captioning.h5')
tokenizr = Tokenizer()
# Set membership avoids rescanning the train_validate_images list for every caption.
_train_validate_id_set = set(train_validate_images)
tokenizr.fit_on_texts([caption for image, caption in new_captions_dict.items() if image in _train_validate_id_set])
max_length = max_len

In [None]:
# Image to caption; named once so the prediction and the display cannot drift apart.
test_image_file = 'Flickr8k_Dataset/Flicker8k_Dataset/554526471_a31f8b74ef.jpg'
photo = extract_feat_single(test_image_file)

# Reuse generate_desc rather than a second copy of its decoding loop, and
# decode with `tokenizr`, the tokenizer this testing section builds for itself.
in_text = generate_desc(model, tokenizr, photo, max_length)
# Remove the sequence markers before showing the caption.
in_text = in_text.replace('startseq','')
in_text = in_text.replace('endseq','')
print("Predicted caption -> ", in_text)
Image(test_image_file)