<a href="https://colab.research.google.com/github/TanyaChutani/ImageCaptionGenerator/blob/master/Image_captioning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
# Mount Google Drive inside the Colab VM; the cells below copy the dataset
# archives from '/content/drive/My Drive'.
from google.colab import drive
drive.mount('/content/drive')

In [0]:
# Copy the image, caption and fastText archives from Drive to local disk, then
# extract them. `-o` overwrites without prompting so the cell can be re-run,
# and `-q` suppresses the per-file listing (the image archive has thousands
# of entries).
!cp '/content/drive/My Drive/Flickr8k_Dataset.zip' '/content/Flickr8k.zip'
!cp '/content/drive/My Drive/fasttext.zip' '/content/fasttext.zip'
!cp '/content/drive/My Drive/Flickr8k_text.zip' '/content/Flickr8k_text.zip'
!unzip -o -q '/content/Flickr8k_text.zip'
!unzip -o -q '/content/Flickr8k.zip' -d '/content/Flickr8k'
!unzip -o -q '/content/fasttext.zip'

## Importing Libraries

In [0]:
import numpy as np
import pandas as pd
import nltk
import io
import cv2
import os
import tensorflow
import string
import pickle
import tensorflow as tf
from glob import glob
from collections import Counter
import matplotlib.pyplot as plt
from nltk.stem import WordNetLemmatizer
from wordcloud import WordCloud, STOPWORDS 
import pickle
import itertools
import pathlib
from tensorflow.python.keras.models import *
from tensorflow.python.keras.layers import *
from tensorflow.keras.utils import to_categorical
from tensorflow.python.keras.callbacks import ModelCheckpoint
from tensorflow.python.keras.preprocessing.text import Tokenizer
from tensorflow.python.keras.preprocessing.sequence import pad_sequences
from tensorflow.python.keras.applications.inception_v3 import InceptionV3,\
preprocess_input

# Fetch the WordNet corpus used by the WordNetLemmatizer in the text
# preprocessing step.
nltk.download('wordnet')

### Prepare the Photo Feature Extractor

In [0]:
def pre_process(img_path):
  """Load an image from disk and turn it into a 1-image batch for InceptionV3.

  Args:
    img_path: path of the image file to read.

  Returns:
    Array of shape (1, 224, 224, 3) after InceptionV3 `preprocess_input`.

  Raises:
    IOError: if OpenCV cannot read/decode the file.
  """
  img=cv2.imread(img_path)
  # cv2.imread signals failure by returning None instead of raising.
  if img is None:
    raise IOError("Could not read image: {}".format(img_path))
  # OpenCV decodes to BGR; the ImageNet-pretrained network expects RGB.
  img=cv2.cvtColor(img,cv2.COLOR_BGR2RGB)
  dsize=(224,224)
  img=cv2.resize(img,dsize,interpolation=cv2.INTER_NEAREST)
  # Add the leading batch axis expected by model.predict.
  img=np.expand_dims(img,axis=0)
  img=preprocess_input(img)
  return img

def model():
  """Return an ImageNet-pretrained InceptionV3 without its classifier head.

  Every layer is marked non-trainable so the network acts purely as a fixed
  feature extractor.
  """
  backbone=InceptionV3(include_top=False,weights='imagenet')
  for frozen_layer in backbone.layers:
    frozen_layer.trainable=False
  return backbone

# Maps image id (file name up to its first '.') -> raw model.predict output.
features_df=dict()
def feature_extractor(model,directory):
  """Run `model` over every file in `directory` and collect the features.

  Args:
    model: object exposing `predict(batch)`; called once per image.
    directory: folder containing the image files.

  Returns:
    The module-level `features_df` dict, updated in place with one entry
    per file found in `directory`.
  """
  # Iterate over the directory that was passed in, not a module-level global.
  for img in os.listdir(directory):
    img_path=os.path.join(directory,img)
    preproceesed_img=pre_process(img_path)
    feature=model.predict(preproceesed_img)
    # Derive the id from the file name itself so it does not depend on how
    # many components the directory path has.
    image_id=img.split('.')[0]
    features_df[image_id]=feature
  return features_df

# Extract features for every dataset image and cache them on disk so the
# (slow) extraction does not have to be repeated.
img_root = '/content/Flickr8k/Flicker8k_Dataset'
inception_model=model()
features_df=feature_extractor(inception_model,img_root)
# The context manager closes the file even if pickling fails.
with open('myfile.pkl', 'wb') as output:
  pickle.dump(features_df, output)

In [0]:
# Reload the cached image features.
# NOTE: pickle.load can execute arbitrary code; only load files produced by
# this notebook.
with open('/content/myfile.pkl','rb') as infile:
  features_df = pickle.load(infile)

# Sanity check: show the stored feature for one image.
features_df['2098418613_85a0c9afea']

## Prepare the text data
### Building a dictionary from the captions file




In [0]:
# Maps image id -> list of raw caption strings. Later cells read this
# module-level dict directly, so it is filled in place.
text_df={}
directory='/content/Flickr8k.token.txt'

def make_dict(doc):
  """Group captions by image id.

  Args:
    doc: iterable of lines shaped like '<image file>#<n> <caption words...>'.

  Returns:
    The module-level `text_df` dict, with each caption appended to the list
    stored under its image id (first whitespace token up to its first '.').
  """
  for line in doc:
    token=line.split()
    # Skip blank lines (e.g. a trailing newline at the end of the file).
    if not token:
      continue
    img_id=token[0].split('.')[0]
    description=' '.join(token[1:])
    text_df.setdefault(img_id,list()).append(description)
  return(text_df)

# Close the captions file as soon as it has been consumed.
with open(directory,'r') as doc:
  text_dict=make_dict(doc)


In [0]:
# Drop this id from the captions (presumably it has no usable image in the
# dataset -- TODO confirm). The default makes the cell safe to re-run.
text_dict.pop('2258277193_586949ec62', None)

In [0]:
# Show the captions collected for one image.
text_dict['2098418613_85a0c9afea']

### Preprocessing of text
### Adding start and end tokens

In [0]:
def preprocess_text(text_df):
  """Normalise every caption in `text_df` in place and return the same dict.

  Each word is lower-cased and stripped of punctuation; words that are not
  purely alphabetic are dropped; the rest are lemmatized, and results of
  length 1 are dropped. Each cleaned caption is wrapped as
  'start <words> end'.
  """
  lemmatizer = WordNetLemmatizer()
  strip_punct = str.maketrans('', '', string.punctuation)

  def clean(description):
    # Run one caption through the whole word-level pipeline.
    kept=[]
    for raw_word in description.split():
      word=raw_word.lower().translate(strip_punct)
      if not word.isalpha():
        continue
      word=lemmatizer.lemmatize(word)
      if len(word)>1:
        kept.append(word)
    return 'start '+' '.join(kept)+' end'

  for img_id,descriptions in text_df.items():
    # Re-assigning an existing key does not resize the dict, so this is safe
    # while iterating over items().
    text_df[img_id]=[clean(description) for description in descriptions]
  return text_df
text_dict=preprocess_text(text_dict)


In [0]:
# Show the same image's captions after cleaning and start/end wrapping.
text_dict['2098418613_85a0c9afea']

### Creating vocab of words
### Checking the most common and least common words


In [0]:
def create_vocab(text_df):
  """Return the distinct whitespace-separated words across all captions.

  Args:
    text_df: dict mapping an image id to a list of caption strings.

  Returns:
    A list holding each distinct word once, in no particular order.
  """
  unique_words=set()
  for captions in text_df.values():
    for caption in captions:
      unique_words.update(caption.split())
  return list(unique_words)

vocab=create_vocab(text_dict)
vocab_size=len(vocab)

#Checking for the 10 most frequent words in the image description
# Count every word occurrence across all captions. `vocab` holds each word
# exactly once, so counting it would give every word a frequency of 1.
counts = Counter(word for captions in text_dict.values()
                 for caption in captions for word in caption.split())
print("Top 10 most frequent words",counts.most_common(10))
n=10
print("Top 10 least frequent words", counts.most_common()\
      [:-n-1:-1])


### Making a word cloud of common words


In [0]:
#Generating wordcloud with the most frequent words
#!pip install wordcloud

def most_common(words):
  """Draw a word cloud for `words`.

  Args:
    words: iterable of word strings; repeated words carry more weight because
      the cloud is generated from the space-joined text. The caption markers
      'start' and 'end' are excluded along with the default stop words.
  """
  stopwords = set(STOPWORDS)
  stopwords.update(["end", "start"])
  wc = WordCloud(max_words=1000, margin=10, background_color='white',
  scale=3, relative_scaling = 0.5, width=500, height=400,\
  stopwords=stopwords,random_state=1).generate(' '.join(words))
  plt.figure(figsize=(15,8))
  plt.imshow(wc)
  plt.axis("off")
  plt.show()

# Feed every caption word (with repeats) so word size reflects frequency;
# the de-duplicated `vocab` would give all words equal weight.
caption_words=[word for captions in text_dict.values()
               for caption in captions for word in caption.split()]
most_common(caption_words)

### Tokenizing words using keras tokenizer

In [0]:
#creating word to index using keras tokenizer
def word_to_index(text_df):
  """Fit a Keras Tokenizer on all captions and return its word -> index map.

  Args:
    text_df: dict mapping an image id to a list of caption strings.
  """
  all_captions = []
  for captions in text_df.values():
    all_captions.extend(captions)
  caption_tokenizer = Tokenizer()
  caption_tokenizer.fit_on_texts(all_captions)
  return caption_tokenizer.word_index
word2index=word_to_index(text_dict)
# Reverse lookup used when decoding predicted indices back into words.
idx2word = {index: word for word, index in word2index.items()}


### Maximum caption length

In [0]:
#Max length
def max_length(descriptions):
  """Return the word count of the longest caption in `descriptions`.

  Args:
    descriptions: dict mapping an image id to a list of caption strings.

  Returns:
    The largest number of whitespace-separated words in any caption, or -1
    when there are no captions at all.
  """
  # Read from the argument itself rather than a module-level dict, so the
  # function works for whatever mapping the caller passes.
  return max((len(caption.split())
              for captions in descriptions.values()
              for caption in captions), default=-1)

# Longest caption length in words; later used as the padded sequence length
# and as the text input size of the model.
max_len=max_length(text_dict)

In [0]:
# Display the longest caption length.
max_len

### Implementing fastText embedding on text data


In [0]:
def load_vectors(fname):
    """Read a fastText-style text vector file into a dict.

    Args:
        fname: path of a text file with one '<word> <v1> <v2> ...' entry per
            line, optionally preceded by a '<count> <dim>' header line.

    Returns:
        dict mapping each word to a float32 numpy array of its values.
    """
    embedding = dict()
    # The context manager closes the file once it has been read.
    with io.open(fname, 'r', encoding='utf-8',\
                 newline='\n', errors='ignore') as fin:
        for line_no, line in enumerate(fin):
            tokens = line.rstrip().split(' ')
            # A first line made of exactly two integers is the
            # '<count> <dim>' header, not a word vector.
            if line_no == 0 and len(tokens) == 2 and \
               all(token.isdigit() for token in tokens):
                continue
            # Ignore blank lines / lines without any vector values.
            if len(tokens) < 2:
                continue
            embedding[tokens[0]] = np.array(tokens[1:],\
                                            dtype='float32')
    return embedding
embedding=load_vectors('/content/wiki.simple.vec')

def fastText(embedding):
  """Build the embedding weight matrix for the caption vocabulary.

  Args:
    embedding: dict mapping a word to its pretrained vector.

  Returns:
    (matrix, dim): `matrix` has vocab_size+1 rows and `dim` (300) columns.
    Row i holds the vector of the word whose `word2index` value is i; rows
    for words missing from `embedding` stay all-zero.
  """
  embedding_dim = 300
  weights = np.zeros((vocab_size+1, embedding_dim))
  for token, row in word2index.items():
    vector = embedding.get(token)
    if vector is None:
      continue
    weights[row] = vector
  return weights,embedding_dim

embedding_matrix,embedding_dim=fastText(embedding)


In [0]:
# Parallel arrays built from the same dict: text_values[i] holds the captions
# of the image whose id is text_idx[i].
# NOTE(review): text_values is only a regular 2-D array if every image has the
# same number of captions -- confirm.
text_values=np.array(tuple(text_dict.values()))
text_idx=np.array(tuple(text_dict.keys()))

### Creating Training Data


In [0]:
def create_train_data(text_values,text_idx,max_length,vocab_size):
  """Expand every caption into (image id, partial caption, next word) samples.

  For a caption encoded as indices [w0, w1, ..., wn], one sample is produced
  per prefix: input [w0..w(k-1)] (post-padded to `max_length`) with target wk.

  Args:
    text_values: sequence where text_values[i] is the list of captions of
      image text_idx[i].
    text_idx: sequence of image ids.
    max_length: length the input sequences are padded to.
    vocab_size: vocabulary size; targets are one-hot vectors of length
      vocab_size+1.

  Returns:
    (X1, X2, y): lists of equal length holding the image id, the padded input
    sequence and the one-hot target of each sample.
  """
  X1,X2,y=list(),list(),list()
  # One shared identity matrix: row k is the one-hot vector for word index k.
  # Targets are row *views* into it, so memory does not grow with
  # samples x vocab_size. It is read-only because the rows are shared.
  one_hot=np.eye(vocab_size+1,dtype='float32')
  one_hot.flags.writeable=False
  for i in range(0,len(text_idx)):
    for line in text_values[i]:
      numeric_seq = [word2index[word] for word in line.split()\
                     if word in word2index]
      # The prefix expansion must run for every caption; running it after
      # the caption loop would only use the image's last caption.
      for ii in range(1,len(numeric_seq)):
        in_seq,out_seq=numeric_seq[:ii],numeric_seq[ii]
        in_seq=pad_sequences([in_seq],maxlen=max_length,\
                             padding='post')[0]
        X2.append(in_seq)
        y.append(one_hot[out_seq])
        X1.append(text_idx[i])

  return (X1),(X2),(y)

In [0]:
# Build the training samples: image ids (X1), padded input sequences (X2) and
# one-hot next-word targets (y).
X1,X2,y=create_train_data(text_values,text_idx,max_len,vocab_size)

## Data Generators

In [0]:
batch_size=8
step_per_epoch=len(X1)//batch_size
epoch=30

def data_generator(X1,X2,y,batch_size,epoch,\
                   step_size,max_len,vocab_size):
  """Yield ([image_features, input_sequences], targets) batches.

  The full sample list is traversed in order, in consecutive slices of
  `batch_size` (the final slice may be shorter), once for every
  (epoch, step) pair. `max_len` and `vocab_size` are accepted but not used.

  Image features are looked up in the module-level `features_df`; only the
  element at [0][0][0] of each stored feature is used
  (presumably one spatial position of the feature map -- TODO confirm).
  """
  sample_count=len(X1)
  for _epoch in range(0,epoch):
    for _step in range(0,step_size):
      for start in range(0,sample_count,batch_size):
        stop=start+batch_size
        image_batch=[features_df[image_id][0][0][0]
                     for image_id in X1[start:stop]]
        sequence_batch=X2[start:stop]
        target_batch=y[start:stop]
        yield [np.array(image_batch),np.array(sequence_batch)],\
        np.array(target_batch)
train_gen=data_generator(X1,X2,y,batch_size,epoch,\
                         step_per_epoch,max_len,vocab_size)

## Model

In [0]:
# Model definition, compilation and training.
# NOTE(review): assigning to `model` below replaces the `model()` helper
# defined in the feature-extraction cell; re-running that cell afterwards
# needs the helper to be re-defined first.

# Image branch: a 2048-value feature vector per image -> dropout -> 256 units.
input_img = Input(shape=(2048,))
feature_img1 = Dropout(0.5)(input_img)
feature_img2 = Dense(256, activation='relu')(feature_img1)

# Text branch: padded word-index sequence -> embedding -> dropout -> GRU(256).
inputs_text = Input(shape=(max_len,))
feature_text1 = Embedding(vocab_size+1,\
                          embedding_dim,name='embedding_layer')\
                          (inputs_text)
feature_text2 = Dropout(0.5)(feature_text1)
feature_text3 = GRU(256)(feature_text2)

# Merge: the two 256-unit branches are summed element-wise (not
# concatenated), normalised, and mapped to a softmax over vocab_size+1 words.
decoder1 = add([feature_img2, feature_text3])
batch_norm=BatchNormalization(momentum=0.99, epsilon=0.001)(decoder1)
decoder2 = Dense(256, activation='relu')(batch_norm)
outputs = Dense(vocab_size+1, activation='softmax')(decoder2)

model = Model(inputs=[input_img, inputs_text], outputs=outputs)
# Load the pretrained embedding matrix and freeze it before compiling.
model.get_layer('embedding_layer').set_weights([embedding_matrix])
model.get_layer('embedding_layer').trainable = False

model.compile(optimizer='adam',loss='categorical_crossentropy',\
              metrics=['accuracy'])
model.summary()
# Keep only the checkpoint with the lowest training loss seen so far.
filepath = 'ImageCaptioningModel.h5'
checkpoint = ModelCheckpoint(filepath, monitor='loss', \
                             save_best_only=True,mode='min')

model.fit_generator(train_gen, epochs=epoch, steps_per_epoch=step_per_epoch, \
                    callbacks=[checkpoint],verbose=1)
# Also save the final-epoch weights separately from the best checkpoint.
model.save_weights('/content/ImageCaptioningWeights.hdf5')


## Greedy Search

In [0]:
def greedy_search(model,photo,vocab,max_len):
  """Generate a caption by repeatedly picking the most probable next word.

  Args:
    model: captioning model; `predict([photo_batch, sequence_batch])` must
      return one probability vector over word indices per sample.
    photo: feature vector of the image (without batch axis).
    vocab: unused; kept so existing callers keep working.
    max_len: padded sequence length, which is also the maximum number of
      words generated.

  Returns:
    A one-element list holding the caption, without the 'start'/'end'
    markers.
  """
  words=['start']
  # The image is the same at every step, so batch it once.
  photo_batch=np.expand_dims(photo,axis=0)
  while len(words)<=max_len:
    seq=[word2index[word] for word in words]
    seq=pad_sequences([seq],maxlen=max_len,padding='post')
    y_predict=model.predict([photo_batch,seq])
    # Index 0 is the padding value and has no word; .get() turns that case
    # into a clean stop instead of a KeyError.
    next_word=idx2word.get(int(np.argmax(y_predict[0])))
    if next_word is None or next_word=='end':
      break
    words.append(next_word)
  # Only 'start' is stripped here: 'end' is never appended, so a caption cut
  # off by the length limit keeps its last real word.
  final_text=[' '.join(words[1:])]
  return final_text


## Result

In [0]:
def test_feature_extractor(model,img_path):
  """Show an image and return its InceptionV3 feature vector.

  Args:
    model: unused. A fresh InceptionV3 is always built here; the parameter
      is kept so existing callers (which pass the captioning model) keep
      working.
    img_path: path of the image to caption.

  Returns:
    Array taken from position [0][0][0] of the network output, the same
    element the training generator reads from the cached features.
  """
  preproceesed_img=pre_process(img_path)
  # Display the image as stored on disk: the preprocessed tensor has been
  # rescaled by preprocess_input and is not in a displayable range. OpenCV
  # decodes to BGR, matplotlib expects RGB.
  plt.imshow(cv2.cvtColor(cv2.imread(img_path),cv2.COLOR_BGR2RGB))
  plt.show()
  # Use a dedicated local name so the `model` argument is not overwritten.
  extractor=InceptionV3(include_top=False,weights='imagenet')
  feature=extractor.predict(preproceesed_img)
  return np.array(feature[0][0][0])

# Caption a single test image with the checkpointed captioning model.
img_path='/content/image_caption.jpg'
model=load_model('/content/ImageCaptioningModel.h5')

# NOTE(review): the captioning model is passed here, but
# test_feature_extractor builds its own InceptionV3 for the features.
test_feature=test_feature_extractor(model,img_path)

# greedy_search does not use its third argument; vocab_size is passed only
# to fill the slot.
sent=greedy_search(model,test_feature,vocab_size,max_len)
print("Caption for image",sent)