In [None]:
import numpy as np
import matplotlib.pyplot as plt
from keras.models import Model, load_model
from keras.preprocessing import image
import time
import pickle
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.layers import *

In [None]:
# Mount Google Drive at /content/drive; the dataset paths used below live under it.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd
# Tab-separated caption file with no header row; later cells read column 0 as the
# image reference and column 1 as the caption text.
dataset = pd.read_csv('/content/drive/MyDrive/drive-download-20210515T180709Z-001/Flickr8k_text/Flickr8k.lemma.token.txt',delimiter='\t',header=None) 


In [None]:
print(dataset.shape)

In [None]:
dataset.head()

In [None]:
print(dataset[0][3])
print(dataset[1][3])

# Data Cleaning

In [None]:
import re
import nltk

# Normalise every raw caption (column 1) into '<startseq> ... <endseq>' form:
# non-letters become spaces, text is lower-cased, one-letter tokens are dropped.
corpus = []
for raw_caption in dataset[1]:
    letters_only = re.sub('[^a-zA-Z]', ' ', raw_caption)
    tokens = [tok for tok in letters_only.lower().split() if len(tok) > 1]
    corpus.append('<startseq> ' + ' '.join(tokens) + ' <endseq>')

In [None]:
# Preview the first 50 cleaned captions (each printed as a one-element list), then the total count.
for offset in range(50):
    print(corpus[offset:offset + 1])
print(len(corpus))

In [None]:
# Reduce column 0 to the bare image id by keeping only the text before the first '.'.
# The column is assigned as a whole: the element-wise `dataset[0][i] = ...` form is
# chained assignment, which pandas warns about and which does not write back to the
# frame under copy-on-write.
dataset[0] = dataset[0].str.split('.').str[0]

In [None]:
# Show the first rows after the image-id clean-up. `head` has to be called;
# printing the attribute only shows the bound-method repr.
dataset.head()

In [None]:
type(dataset)

In [None]:
from collections import defaultdict

# Group the cleaned captions by image id: image id -> list of its captions.
new_dict = defaultdict(lambda : [])
for image_id, caption in zip(dataset[0], corpus):
    new_dict[image_id].append(caption)

In [None]:
print(len(new_dict))

In [None]:
print(new_dict['2513260012_03d33305cf'])

In [None]:
# Flat list of every token across all captions; duplicates are kept so that
# frequencies can be counted afterwards. Built directly instead of appending
# from inside a throw-away list comprehension.
all_vocab = [
    word
    for captions in new_dict.values()
    for caption in captions
    for word in caption.split()
]

In [None]:
print("total words appearing : " , len(all_vocab))

In [None]:
from collections import Counter

# Occurrence count of each token in all_vocab.
counter = Counter(all_vocab)

In [None]:
dic_ = dict(counter)

In [None]:
# List of (word, count) pairs ordered by count, highest first.
sorted_dic = sorted(dic_.items(), key = lambda x: x[1], reverse=True)   #List with words and their corresponding frequency

In [None]:
print(len(sorted_dic))

In [None]:
for i in range(100):
  print(sorted_dic[i])

In [None]:
# Keep only the (word, count) pairs whose count is strictly above the threshold.
threshold_value = 10
d = [pair for pair in sorted_dic if pair[1] > threshold_value]

In [None]:
len(d)

In [None]:
# Replace all_vocab with just the words that survived the frequency threshold.
all_vocab = [pair[0] for pair in d]

In [None]:
len(all_vocab)

In [None]:
# Dump the image-id -> captions mapping as text. The context manager closes the
# file even if the write raises.
with open('new_dict.txt', 'w') as f:
    f.write(str(new_dict))

In [None]:
# Read the raw training image list as one string; it is split into ids in the next cell.
with open('/content/drive/MyDrive/drive-download-20210515T180709Z-001/Flickr8k_text/Flickr_8k.trainImages.txt') as f:
  train=f.read()

In [None]:
# Train image ids: one file name per line, with the extension removed.
# splitlines() plus a blank-line filter keeps the final entry when the file has no
# trailing newline and copes with CRLF endings; rsplit removes the extension
# whatever its length.
train = [line.strip().rsplit('.', 1)[0] for line in train.splitlines() if line.strip()]

In [None]:
# Peek at the first 11 training image ids.
for image_id in train[:11]:
    print(image_id)

In [None]:
# Read the raw test image list as one string; it is split into ids in the next cell.
with open('/content/drive/MyDrive/drive-download-20210515T180709Z-001/Flickr8k_text/Flickr_8k.testImages.txt')as f:
  test=f.read()

In [None]:
# Test image ids: one file name per line, with the extension removed.
# splitlines() plus a blank-line filter keeps the final entry when the file has no
# trailing newline and copes with CRLF endings; rsplit removes the extension
# whatever its length.
test = [line.strip().rsplit('.', 1)[0] for line in test.splitlines() if line.strip()]

In [None]:
# Peek at the first 11 test image ids.
for image_id in test[:11]:
    print(image_id)

In [None]:
# Training subset of the caption mapping: image id -> fresh list of its captions.
train_descriptions = {image_id: list(new_dict[image_id]) for image_id in train}

In [None]:
print(train_descriptions['2903617548_d3e38d7f88'][0])

In [None]:
# Directory prefix (with trailing slash) that image file names are appended to.
images='/content/drive/MyDrive/drive-download-20210515T180709Z-001/Flicker8k_Images/'

In [None]:
# Display one training image followed by its captions.
img_id = '2903617548_d3e38d7f88.jpg'
path = images + img_id
img = plt.imread(path)
plt.imshow(img)
plt.show()
# The caption key is derived from img_id (file name without extension) so the
# captions always belong to the displayed image, and every caption the image has
# is printed instead of assuming there are exactly five.
for caption in train_descriptions[img_id.rsplit('.', 1)[0]]:
    print(caption)

In [None]:
# Print the first 21 training image ids together with their caption lists.
for shown, (image_id, captions) in enumerate(train_descriptions.items(), start=1):
    print(image_id)
    print(captions)
    if shown > 20:
        break

# Data Preprocessing - Images

In [None]:
from keras.applications.resnet50 import ResNet50, preprocess_input

In [None]:
# ResNet50 with ImageNet weights, taking 224x224 3-channel input.
# NOTE(review): the name `model` is reassigned to the captioning model further down.
model = ResNet50(weights = 'imagenet', input_shape = (224,224,3))

In [None]:
model.summary()

In [None]:
# Feature extractor: same input as ResNet50, output taken from its second-to-last layer.
model_new = Model(inputs = model.input, outputs =  model.layers[-2].output)

In [None]:
def preprocess_image(img):
    """Load the image at path `img` and return it as a one-image batch.

    The file is loaded resized to 224x224, converted to an array, passed through
    `preprocess_input`, and given a leading batch axis.
    """
    loaded = image.load_img(img, target_size=(224, 224))
    pixels = preprocess_input(image.img_to_array(loaded))
    return np.expand_dims(pixels, axis=0)


In [None]:
def encode_image(img):
    """Return the `model_new` prediction for the image at path `img` as a 1-D array."""
    batch = preprocess_image(img)
    features = model_new.predict(batch)
    # Drop the batch axis: (1, n) -> (n,).
    return features.reshape(features.shape[1], )

In [None]:
start = time.time()

# Image file name ('<id>.jpg') -> feature vector returned by encode_image.
encoding_train = {}

for ix, image_id in enumerate(train):
    file_name = image_id + ".jpg"
    encoding_train[file_name] = encode_image(images + file_name)

    # Progress marker every 100 images.
    if ix % 100 == 0:
        print("Encoding image :" + str(ix))

# Report the elapsed time the timer above was started for.
print("Time taken in seconds :", time.time() - start)


In [None]:
# NOTE(review): bare `cd` relies on IPython automagic and, with no argument, presumably
# switches to the home directory. Later relative paths (e.g. model.save("final_model.h5"))
# resolve against the new working directory; confirm this is intended.
cd

In [None]:
start = time.time()

# Image file name ('<id>.jpg') -> feature vector returned by encode_image.
encoding_test = {}

for ix, image_id in enumerate(test):
    file_name = image_id + ".jpg"
    encoding_test[file_name] = encode_image(images + file_name)

    # Progress marker every 100 images.
    if ix % 100 == 0:
        print("Encoding image :" + str(ix))

# Report the elapsed time the timer above was started for.
print("Time taken in seconds :", time.time() - start)


In [None]:
# Cache the training-image encodings on Drive so they can be reloaded without re-encoding.
with open("/content/drive/MyDrive/drive-download-20210515T180709Z-001/encoded_train_images.pkl", 'wb') as f:
    pickle.dump(encoding_train, f )

In [None]:
# Cache the test-image encodings on Drive so they can be reloaded without re-encoding.
with open("/content/drive/MyDrive/drive-download-20210515T180709Z-001/encoded_test_images.pkl", 'wb') as f:
    pickle.dump(encoding_test, f )

In [None]:
# Reload the cached training encodings.
# NOTE: pickle.load can execute arbitrary code; only load files written by this notebook.
with open("/content/drive/MyDrive/drive-download-20210515T180709Z-001/encoded_train_images.pkl", 'rb') as f:
    encoding_train = pickle.load(f)

In [None]:
# Reload the cached test encodings.
# NOTE: pickle.load can execute arbitrary code; only load files written by this notebook.
with open("/content/drive/MyDrive/drive-download-20210515T180709Z-001/encoded_test_images.pkl", 'rb') as f:
    encoding_test = pickle.load(f)

In [None]:
# Two-way vocabulary lookup. Indices start at 1, leaving 0 unused by any word
# (0 is the padding value passed to pad_sequences in data_generator).
word_to_idx = {}
idx_to_word = {}

for position, token in enumerate(all_vocab, start=1):
    word_to_idx[token] = position
    idx_to_word[position] = token

In [None]:
# Print the first 21 word -> index entries.
for shown, (token, index) in enumerate(word_to_idx.items(), start=1):
    print(token, index)
    if shown > 20:
        break

In [None]:
# Print the first 21 index -> word entries.
for shown, (index, token) in enumerate(idx_to_word.items(), start=1):
    print(index, token)
    if shown > 20:
        break

In [None]:
for i in range(21):print(all_vocab[i])

In [None]:
len(all_vocab)

In [None]:
# +1 because word indices start at 1; index 0 is not assigned to any word.
vocab_size = len(idx_to_word) + 1
print(vocab_size)

In [None]:
# Token count of every training caption (start/end markers included).
all_caption_len = [
    len(caption.split())
    for captions in train_descriptions.values()
    for caption in captions
]

In [None]:
print(len(all_caption_len))
print(all_caption_len[:50])

In [None]:
# Longest training caption in tokens; used as the padded sequence length.
max_len = max(all_caption_len)
print(max_len) 

In [None]:
def data_generator(train_descriptions, encoding_train, word_to_idx, max_len,  num_photos_per_batch, num_classes=None):
    """Endlessly yield training batches for the captioning model.

    Each caption is expanded into (prefix -> next word) pairs: for a caption with
    indices [w0, w1, ..., wk] the pairs are ([w0], w1), ([w0, w1], w2), ...

    Parameters
    ----------
    train_descriptions : dict mapping image id -> list of caption strings.
    encoding_train : dict mapping '<image id>.jpg' -> image feature vector.
    word_to_idx : dict mapping word -> integer index; words missing from it are skipped.
    max_len : length every input prefix is post-padded (with 0) to.
    num_photos_per_batch : number of images whose pairs are grouped into one batch.
    num_classes : width of the one-hot target. Defaults to len(word_to_idx) + 1
        (word indices plus the unused index 0), so the generator no longer depends
        on a notebook-level `vocab_size` global.

    Yields
    ------
    ([image_features, padded_prefixes], one_hot_next_words) as numpy arrays.

    Raises
    ------
    ValueError if train_descriptions is empty (raised on the first `next()`).
    """
    if num_classes is None:
        num_classes = len(word_to_idx) + 1

    all_items = list(train_descriptions.keys())
    if not all_items:
        raise ValueError("train_descriptions is empty; nothing to generate")

    X1, X2, y = [], [], []
    n = 0      # photos accumulated in the current batch
    cnt = 0    # position in all_items; wraps around so the generator never ends

    while True:
        key = all_items[cnt]
        cnt = (cnt + 1) % len(all_items)
        n += 1

        photo = encoding_train[key+'.jpg']          # feature vector of this image

        for desc in train_descriptions[key]:        # every caption of the image
            seq = [word_to_idx[word] for word in desc.split() if word in word_to_idx]

            for i in range(1, len(seq)):
                in_seq = pad_sequences([seq[0:i]], maxlen=max_len, value=0, padding='post')[0]
                out_seq = to_categorical([seq[i]], num_classes=num_classes)[0]

                X1.append(photo)
                X2.append(in_seq)
                y.append(out_seq)

        if n == num_photos_per_batch:
            yield [np.array(X1), np.array(X2)] , np.array(y)
            X1, X2, y = [], [], []
            n = 0

In [None]:
# Smoke-test the generator: pull a single 3-photo batch and print the array shapes.
sample_inputs, sample_targets = next(
    data_generator(train_descriptions, encoding_train, word_to_idx, max_len, 3)
)
print(sample_inputs[0].shape)
print(sample_inputs[1].shape)
print(sample_targets.shape)

In [None]:
# word -> float32 vector, parsed from the GloVe text file (first field is the word,
# the remaining fields are its coefficients).
embeddings = {}

with open('/content/drive/MyDrive/glove.6B.50d.txt', 'r', encoding='utf-8') as f:
    for line in f:
        fields = line.split()
        embeddings[fields[0]] = np.array(fields[1:], dtype="float32")


In [None]:
# Print the first 11 word/vector entries.
for shown, (token, vector) in enumerate(embeddings.items(), start=1):
    print(token, vector)
    if shown > 10:
        break

In [None]:
print(len(embeddings))

In [None]:
def getOutputEmbeddings(word_index=None, embedding_lookup=None, num_rows=None, emb_dim=50):
    """Build the embedding matrix whose row `idx` holds the vector of the word with that index.

    Called with no arguments it behaves as before, reading the notebook globals
    `word_to_idx`, `embeddings` and `vocab_size` with a 50-dimensional embedding.

    Parameters
    ----------
    word_index : dict word -> row index. Defaults to the global `word_to_idx`.
    embedding_lookup : dict word -> vector of length `emb_dim`. Defaults to the
        global `embeddings`.
    num_rows : number of rows in the matrix. Defaults to the global `vocab_size`.
    emb_dim : length of each embedding vector.

    Returns
    -------
    numpy array of shape (num_rows, emb_dim); rows for words without a vector
    (and any index never assigned to a word) stay all-zero.
    """
    if word_index is None:
        word_index = word_to_idx
    if embedding_lookup is None:
        embedding_lookup = embeddings
    if num_rows is None:
        num_rows = vocab_size

    embedding_matrix_output = np.zeros((num_rows, emb_dim))

    for word, idx in word_index.items():
        emb_vec = embedding_lookup.get(word)
        # Words absent from the lookup keep their zero row.
        if emb_vec is not None:
            embedding_matrix_output[idx] = emb_vec

    return embedding_matrix_output

In [None]:
embedding_output = getOutputEmbeddings()

In [None]:
embedding_output.shape

In [None]:
print(embedding_output[3:5])

In [None]:
# Image branch: 2048-long feature vector -> dropout -> 256-unit ReLU projection.
input_img_fea = Input(shape=(2048,))
inp_img1 = Dropout(0.5)(input_img_fea)
inp_img2 = Dense(256, activation='relu')(inp_img1)

In [None]:
print(inp_img2.shape)

In [None]:
# Caption branch: padded index sequence -> 50-d embedding (index 0 masked) -> dropout -> LSTM(256).
input_cap = Input(shape=(max_len,))
inp_cap1 = Embedding(input_dim= vocab_size, output_dim=50, mask_zero=True)(input_cap)
#print(inp_cap1)
inp_cap2 = Dropout(0.5)(inp_cap1)
inp_cap3 = LSTM(256)(inp_cap2)

In [None]:
print(inp_cap3.shape)

In [None]:
# Decoder: element-wise sum of the two 256-d branches -> Dense(256, relu) -> softmax over the vocabulary.
decoder1 = add([inp_img2, inp_cap3])
print(decoder1.shape)
decoder2 = Dense(256, activation='relu')(decoder1)
output = Dense(vocab_size, activation='softmax')(decoder2)


# Two-input captioning model; this rebinds `model`, which previously held ResNet50.
model = Model(inputs = [input_img_fea, input_cap]  , outputs =  output )

In [None]:
model.summary()

In [None]:
# Load the pre-built embedding matrix into the Embedding layer and freeze it.
# The layer is located by type rather than by position: `model.layers[2]` only
# happens to be the embedding for the current layer ordering and would silently
# target another layer if the graph changed.
embedding_layer = next(layer for layer in model.layers if isinstance(layer, Embedding))
embedding_layer.set_weights([embedding_output])
embedding_layer.trainable = False

In [None]:
# Categorical cross-entropy matches the one-hot next-word targets produced by data_generator.
model.compile(loss="categorical_crossentropy", optimizer='adam') 

In [None]:
# Training configuration. Each generator batch covers `number_photos_per_batch` images,
# so `steps` batches per epoch pass over (almost) every training image once.
epochs = 30
number_photos_per_batch = 50
steps = len(train_descriptions)//number_photos_per_batch

mytraingen = data_generator(train_descriptions, encoding_train, word_to_idx, max_len, number_photos_per_batch)

model.fit(mytraingen,steps_per_epoch=steps,epochs = epochs,verbose=1)


Epoch 13/30
Epoch 14/30
Epoch 15/30
Epoch 16/30
Epoch 17/30
Epoch 18/30
Epoch 19/30
Epoch 20/30
Epoch 21/30
 20/120 [====>.........................] - ETA: 8:32 - loss: 2.7587

In [None]:
# Save the full model; the relative path resolves against the current working directory.
model.save("final_model.h5") 

In [None]:
# Also keep a weights-only copy on Drive.
model.save_weights("/content/drive/MyDrive/drive-download-20210515T180709Z-001/weights.h5") 

In [None]:
# Reload the model written by model.save("final_model.h5"), using the same relative
# path. The working directory is changed by the earlier bare `cd` cell, so the
# hard-coded '/content/final_model.h5' is not necessarily the file just saved.
model = load_model("final_model.h5")

# Predictions

In [None]:
def predict(photo_enc,model):
    """Greedily decode a caption for one encoded image.

    Starting from '<startseq>', the most probable next word is appended at each
    step until '<endseq>' is predicted or `max_len` steps have run.

    Parameters
    ----------
    photo_enc : image feature batch passed to `model.predict` as the first input.
    model : object whose `predict([photo_enc, sequence])` returns next-word scores.

    Returns
    -------
    The generated words joined by single spaces, without the start/end markers.

    Relies on the notebook globals `max_len`, `word_to_idx` and `idx_to_word`.
    """
    in_text = "<startseq>"
    generated = []

    for _ in range(max_len):
        sequence = [word_to_idx[word] for word in in_text.split() if word in word_to_idx]
        sequence = pad_sequences([sequence], maxlen=max_len, padding='post')

        y_pred = int(np.argmax(model.predict([photo_enc, sequence])))
        # Index 0 is the padding slot and has no word; treat it as end of caption
        # instead of raising KeyError.
        word = idx_to_word.get(y_pred)
        if word is None or word == '<endseq>':
            break

        generated.append(word)
        in_text += " " + word

    # Only real words were collected, so nothing is sliced off the end: the last
    # word is kept when decoding stops at max_len without an '<endseq>'.
    return " ".join(generated)

In [None]:
# Sample 1: encode the image as a (1, 2048) batch, display it, then print the generated caption.
# NOTE(review): this cell is repeated for samples 2-5; a helper taking the path would remove the duplication.
one=encode_image('/content/sample1.jpg').reshape((1,2048))
print(one)
img = plt.imread('/content/sample1.jpg')
plt.imshow(img)
plt.show()
predict1=predict(one,model)
print(predict1)

In [None]:
# Sample 2: encode the image as a (1, 2048) batch, display it, then print the generated caption.
two=encode_image('/content/sample2.jpg').reshape((1,2048))
print(two)
img = plt.imread('/content/sample2.jpg')
plt.imshow(img)
plt.show()
predict2=predict(two,model)
print(predict2)

In [None]:
# Sample 3: encode the image as a (1, 2048) batch, display it, then print the generated caption.
three=encode_image('/content/sample3.jpg').reshape((1,2048))
print(three)
img = plt.imread('/content/sample3.jpg')
plt.imshow(img)
plt.show()
predict3=predict(three,model)
print(predict3)

In [None]:
# Sample 4: encode the image as a (1, 2048) batch, display it, then print the generated caption.
four=encode_image('/content/sample4.jpg').reshape((1,2048))
print(four)
img = plt.imread('/content/sample4.jpg')
plt.imshow(img)
plt.show()
predict4=predict(four,model)
print(predict4)

In [None]:
# Sample 5: encode the image as a (1, 2048) batch, display it, then print the generated caption.
five=encode_image('/content/sample5.jpg').reshape((1,2048))
print(five)
img = plt.imread('/content/sample5.jpg')
plt.imshow(img)
plt.show()
predict5=predict(five,model)
print(predict5)