In [None]:
import os
from collections import defaultdict
import numpy as np
import PIL
from matplotlib import pyplot as plt
%matplotlib inline
import tensorflow as tf
from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Embedding, LSTM, Dense, Input, Bidirectional, RepeatVector, Concatenate, Activation
from tensorflow.keras.activations import softmax
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences

from tensorflow.keras.applications.inception_v3 import InceptionV3

from tensorflow.keras.optimizers import Adam

In [None]:
def load_image_list(filename):
    """Read a split file and return the image file names it lists.

    Args:
        filename: Path to a text file with one image file name per line.

    Returns:
        list[str]: The whitespace-stripped entries, in file order. Blank
        lines are skipped so they cannot turn into empty file names.
    """
    with open(filename, 'r') as image_list_f:
        stripped = (line.strip() for line in image_list_f)
        return [name for name in stripped if name]

In [None]:
# Load the train / dev / test image-name lists from the dataset folder.
folder = 'dataset'
train_list, dev_list, test_list = map(
    load_image_list,
    (
        os.path.join(folder, 'Flickr_8k.trainImages.txt'),
        os.path.join(folder, 'Flickr_8k.devImages.txt'),
        os.path.join(folder, 'Flickr_8k.testImages.txt'),
    ),
)

In [None]:
# Number of images in the train, dev and test splits, in that order.
tuple(map(len, (train_list, dev_list, test_list)))


In [None]:
# Directory holding the image files; names from the split lists are joined onto it.
IMG_PATH = os.path.join(folder, "Flickr8k_Dataset")
# Open one dev-set image (index 100, presumably an arbitrary sample) and display it.
image = PIL.Image.open(os.path.join(IMG_PATH, dev_list[100]))
image

In [52]:
# Directory used to cache the encoded image features.
OUTPUT_PATH = "encodings"
# exist_ok avoids the check-then-create race and makes the cell safe to re-run.
os.makedirs(OUTPUT_PATH, exist_ok=True)

In [None]:
def get_image(image_name):
    """Load an image from IMG_PATH as a 299x299 RGB array scaled to [0, 1].

    Args:
        image_name: File name of the image, relative to IMG_PATH.

    Returns:
        numpy.ndarray: Float array of shape (299, 299, 3).
    """
    # The context manager releases the file handle once the pixels are in memory.
    with PIL.Image.open(os.path.join(IMG_PATH, image_name)) as image:
        # Force three channels so grayscale / RGBA / palette files yield the
        # same shape as RGB ones (a no-op copy for images that are already RGB).
        resized = image.convert("RGB").resize((299, 299))
        return np.asarray(resized) / 255.0

In [None]:
# Visual sanity check: display dev image 25 after get_image's resize and scaling.
plt.imshow(get_image(dev_list[25]))

In [None]:
# Instantiate InceptionV3 with the pretrained 'imagenet' weights and print its layer summary.
img_model = InceptionV3(weights='imagenet')
img_model.summary()

In [None]:
# Build the image encoder: it shares InceptionV3's input, but its output is taken
# from the second-to-last layer instead of the final one.
# NOTE(review): presumably that layer emits the 2048-d features used later — confirm in the summary.
new_input = img_model.input
new_output = img_model.layers[-2].output
img_encoder = Model(new_input, new_output)

In [None]:
def encode(image):
    """Encode one image into a feature vector with the InceptionV3 encoder.

    Args:
        image: File name of the image, relative to IMG_PATH (as found in the
            split lists).

    Returns:
        numpy.ndarray: 1-D feature vector produced by ``img_encoder``.
    """
    # get_image returns a single image array; predict expects a leading batch axis.
    batch = np.expand_dims(get_image(image), axis=0)
    vec = img_encoder.predict(batch)
    # Drop the batch axis: (1, n_features) -> (n_features,).
    return np.reshape(vec, (vec.shape[1]))

In [None]:
import time

# Run the encode function on all train images, keyed by image file name.
# The split lists already hold bare file names, so no path prefix needs stripping.
start = time.time()
encoding_train = {img: encode(img) for img in train_list}
print("Time Taken is: " + str(time.time() - start))

# Encode all the test images the same way.
start = time.time()
encoding_test = {img: encode(img) for img in test_list}
print("Time taken is: " + str(time.time() - start))

In [None]:
# Encode a single dev image as a smoke test of the encoder.
new_image = get_image(dev_list[25])
# Add a leading batch axis so the encoder receives a batch of one image.
encoded_image = img_encoder.predict(new_image[np.newaxis, ...])
encoded_image

In [None]:
def img_generator(img_list):
    """Yield one preprocessed single-image batch per file name.

    Args:
        img_list: Iterable of image file names relative to IMG_PATH.

    Yields:
        numpy.ndarray: Array of shape (1, 299, 299, 3) with values scaled to
        [0, 1], i.e. the same preprocessing ``get_image`` applies, so batch
        encodings match single-image encodings.
    """
    for img_file in img_list:
        # Read and convert inside the context manager so the file is closed
        # before control leaves the generator at the yield.
        with PIL.Image.open(os.path.join(IMG_PATH, img_file)) as img:
            pixels = np.asarray(img.convert("RGB").resize((299, 299))) / 255.0
        # Add the batch axis expected by the encoder.
        yield np.expand_dims(pixels, axis=0)

In [None]:
# Encode every split, one image per step. Model.predict accepts generators
# directly; predict_generator is the deprecated spelling of the same call.
# Row i of each result corresponds to entry i of the matching image list.
enc_train = img_encoder.predict(img_generator(train_list), steps=len(train_list), verbose=1)
enc_dev = img_encoder.predict(img_generator(dev_list), steps=len(dev_list), verbose=1)
enc_test = img_encoder.predict(img_generator(test_list), steps=len(test_list), verbose=1)

In [None]:
# Cache the encodings on disk so the encoding step does not have to be repeated.
OUTPUT_PATH = "encodings"
# exist_ok avoids the check-then-create race and makes the cell safe to re-run.
os.makedirs(OUTPUT_PATH, exist_ok=True)

np.save(os.path.join(OUTPUT_PATH,"encoded_images_train.npy"), enc_train)
np.save(os.path.join(OUTPUT_PATH,"encoded_images_dev.npy"), enc_dev)
np.save(os.path.join(OUTPUT_PATH,"encoded_images_test.npy"), enc_test)

In [None]:
#load image captions

def read_image_descriptions(filename):
    """Parse a caption file into tokenized captions grouped by image.

    Each non-blank line must contain a name field and a caption separated by
    a tab. The last two characters of the name field are dropped to form the
    key (presumably a ``#<n>`` caption index; confirm against the file).

    Args:
        filename: Path to the tab-separated caption file.

    Returns:
        defaultdict[str, list[list[str]]]: For each image key, its captions as
        lower-cased token lists wrapped in ``<START>`` / ``<END>`` markers.

    Raises:
        ValueError: If a non-blank line contains no tab separator.
    """
    image_descriptions = defaultdict(list)

    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if not line:
                # Tolerate blank lines (e.g. a trailing newline at end of file).
                continue
            # Split on the first tab only, so a tab inside the caption is kept
            # as ordinary whitespace rather than breaking the unpacking.
            image_name, image_description = line.split('\t', 1)
            tokens = ["<START>", *image_description.lower().split(), "<END>"]
            image_descriptions[image_name[:-2]].append(tokens)

    return image_descriptions

# Parse all captions, then print the tokenized captions of the first dev image as a check.
descriptions = read_image_descriptions("dataset/Flickr8k.token.txt")
print(descriptions[dev_list[0]])


In [None]:
#create dictionary that maps tokens in training data to numbers 

from collections import defaultdict

# Every distinct token that occurs in a training caption, sorted so that the
# id assigned to each token is the same on every run.
train_set_tokens = sorted({
    token
    for image_file in train_list
    for caption in descriptions[image_file]
    for token in caption
})

# Two-way lookup tables; a token's id is its position in the sorted list.
id_to_word = defaultdict(str)
word_to_id = defaultdict(int)
for i, word in enumerate(train_set_tokens):
    word_to_id[word] = i
    id_to_word[i] = word



In [None]:
# Spot-check both lookup directions.
# NOTE(review): 1985 is a hard-coded id; which word it maps to depends on the
# vocabulary built from the training captions — presumably the id of "dog"; confirm.
print(word_to_id["dog"], id_to_word[1985])

In [None]:

def load_clean_descriptions(des, dataset):
    """Select the captions of the given images and join their tokens into strings.

    Args:
        des: Mapping from image key to a list of captions, each caption being
            a list of tokens.
        dataset: Collection of image keys to keep.

    Returns:
        dict: For every key of ``des`` that is in ``dataset`` (in ``des``
        order), the list of its captions as space-joined strings.
    """
    # One set lookup per key instead of a linear scan of the list for each key.
    wanted = set(dataset)
    return {
        key: [' '.join(line) for line in des_list]
        for key, des_list in des.items()
        if key in wanted
    }

# Captions of the training images only, as space-joined strings per image.
train_descriptions = load_clean_descriptions(des=descriptions, dataset=train_list)


In [66]:
#data generation

# Reload the cached encodings so this cell does not depend on re-running the encoder.
enc_train = np.load(os.path.join(OUTPUT_PATH,"encoded_images_train.npy"))
enc_dev = np.load(os.path.join(OUTPUT_PATH,"encoded_images_dev.npy"))
enc_test = np.load(os.path.join(OUTPUT_PATH,"encoded_images_test.npy"))

# Row i of enc_train is paired with entry i of train_list.
train_features = defaultdict(list)

for i, image_id in enumerate(train_list):
    train_features[image_id].append(enc_train[i])


MAX_LEN = 40
vocab_size = len(word_to_id)

# For every caption, emit one sample per prefix: (image features, first i
# tokens) -> token i. Raw prefixes and target ids are collected first and
# padded / one-hot encoded once at the end, instead of once per sample.
X1, prefix_seqs, next_ids = list(), list(), list()

for key, des_list in train_descriptions.items():
    pic = train_features[key][0]

    for cap in des_list:
        seq = [word_to_id[word] for word in cap.split(' ') if word in word_to_id]
        for i in range(1, len(seq)):
            X1.append(pic)
            prefix_seqs.append(seq[:i])
            next_ids.append(seq[i])

X1 = np.array(X1)
# Left-pads with zeros (and keeps the last MAX_LEN ids of longer prefixes).
X2 = pad_sequences(prefix_seqs, maxlen = MAX_LEN)
# Dense one-hot targets of shape (n_samples, vocab_size).
y = to_categorical(next_ids, num_classes = vocab_size)
print(X1.shape)

(383454, 2048)


In [None]:
# Parse the GloVe file: each line holds a word followed by its vector components,
# separated by single spaces.
embeddings_index = {}

# Stream the file line by line (and close it afterwards) rather than holding
# the whole text plus a split copy of it in memory.
with open('./glove.6B.200d.txt', 'r', encoding = 'utf-8') as glove_file:
    for line in glove_file:
        line = line.rstrip("\n")
        if not line:
            # Skip blank lines so no '' -> empty-vector entry is created.
            continue
        values = line.split(" ")
        word = values[0]
        indices = np.asarray(values[1: ], dtype = 'float32')
        embeddings_index[word] = indices
print('Total word vectors: ' + str(len(embeddings_index)))

# One row per vocabulary id; words without a GloVe vector keep an all-zero row.
emb_dim = 200
emb_matrix = np.zeros((vocab_size, emb_dim))
for word, i in word_to_id.items():
    emb_vec = embeddings_index.get(word)
    if emb_vec is not None:
        emb_matrix[i] = emb_vec
emb_matrix.shape


In [70]:
from keras.utils import pad_sequences
from keras.preprocessing.text import Tokenizer
from keras.models import Model
from keras.layers import Flatten, Dense, LSTM, Dropout, Embedding, Activation
from keras.layers import concatenate, BatchNormalization, Input
from keras.layers import add
from keras.utils import to_categorical
from keras.applications.inception_v3 import InceptionV3, preprocess_input
from keras.utils import plot_model

In [None]:
# define the model
# Image branch: 2048-d encoded image -> dropout -> 256-unit ReLU projection.
ip1 = Input(shape = (2048, ))
fe1 = Dropout(0.2)(ip1)
fe2 = Dense(256, activation = 'relu')(fe1)
# Text branch: padded id sequence of length MAX_LEN -> embedding -> dropout -> LSTM(256).
ip2 = Input(shape = (MAX_LEN, ))
# NOTE(review): mask_zero = True treats id 0 as padding, while word_to_id is filled
# from enumerate(...) starting at 0, so the first vocabulary token shares that id — confirm.
# NOTE(review): emb_matrix is not passed to this layer here; presumably the GloVe
# weights are assigned elsewhere — confirm.
se1 = Embedding(vocab_size, emb_dim, mask_zero = True)(ip2)
se2 = Dropout(0.2)(se1)
se3 = LSTM(256)(se2)
# Merge the two 256-wide branches by addition, then predict a softmax
# distribution over the vocabulary for the next token.
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation = 'relu')(decoder1)
outputs = Dense(vocab_size, activation = 'softmax')(decoder2)
model = Model(inputs = [ip1, ip2], outputs = outputs)
model.summary()

In [72]:
def greedy_search(pic):
    """Generate a caption by repeatedly taking the most probable next word.

    Args:
        pic: Encoded image features for a batch of one image, in the shape
            expected by the first input of ``model`` (i.e. (1, 2048)).

    Returns:
        str: The generated words joined by spaces, without the ``<START>``
        and ``<END>`` markers. Generation stops at ``<END>`` or after MAX_LEN
        predicted words, whichever comes first.
    """
    words = ['<START>']
    for _ in range(MAX_LEN):
        seq = [word_to_id[word] for word in words if word in word_to_id]
        seq = pad_sequences([seq], maxlen = MAX_LEN)
        yhat = model.predict([pic, seq])
        word = id_to_word[int(np.argmax(yhat))]
        if word == '<END>':
            break
        # id_to_word yields '' for an unknown id; ignore it rather than emit a gap.
        if word:
            words.append(word)
    # Only the leading <START> is stripped. <END> is never appended, so the
    # last real word survives even when the MAX_LEN limit ends the loop.
    return ' '.join(words[1:])

KeyboardInterrupt: 