In [None]:
import os
from collections import defaultdict
import numpy as np
import PIL
from matplotlib import pyplot as plt
import random
from queue import PriorityQueue
%matplotlib inline

from tensorflow.keras import Sequential, Model
from tensorflow.keras.layers import Embedding, LSTM, Dense, Input, Bidirectional, RepeatVector, Concatenate, Activation
from tensorflow.keras.activations import softmax
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.preprocessing.sequence import pad_sequences

from tensorflow.keras.applications.inception_v3 import InceptionV3

from tensorflow.keras.optimizers import Adam

In [None]:
def load_image_list(filename):
    """Read an image-list file and return its entries, one per line.

    Every line is stripped of surrounding whitespace. Blank lines (for example a
    trailing empty line at the end of the file) are skipped so callers never
    receive an empty image name.

    Args:
        filename: path of the text file to read.

    Returns:
        A list of the non-empty, stripped lines in file order.
    """
    with open(filename, 'r') as image_list_f:
        stripped_lines = (line.strip() for line in image_list_f)
        return [name for name in stripped_lines if name]

In [None]:
# FLICKR_PATH (the folder holding the Flickr8k list files) does not appear to be assigned
# earlier in this notebook, so on a fresh kernel the lines below would raise NameError.
# Keep a value that is already set; otherwise read the FLICKR_PATH environment variable
# and fall back to the current directory.
FLICKR_PATH = globals().get("FLICKR_PATH") or os.environ.get("FLICKR_PATH", ".")

# Image names of the train / dev / test splits, one list per split file.
train_list = load_image_list(os.path.join(FLICKR_PATH, 'Flickr_8k.trainImages.txt'))
dev_list = load_image_list(os.path.join(FLICKR_PATH,'Flickr_8k.devImages.txt'))
test_list = load_image_list(os.path.join(FLICKR_PATH,'Flickr_8k.testImages.txt'))

In [None]:
# Number of entries in each split list (train, dev, test).
len(train_list), len(dev_list), len(test_list)

In [None]:
# Peek at one entry of the dev split; entries are the stripped lines of the split file.
dev_list[20]

In [None]:
# Directory the image files are opened from (see get_image).
IMG_PATH = os.path.join(FLICKR_PATH, "Flickr8k_Dataset")

In [None]:
import numpy as np
import PIL
from matplotlib import pyplot as plt

In [None]:
# Open one dev image from IMG_PATH; the bare `image` expression is the cell's output.
image = PIL.Image.open(os.path.join(IMG_PATH, dev_list[20]))
image

If you can't see the image, try:

In [None]:
# Render the opened image through matplotlib instead of relying on the rich repr.
plt.imshow(image)

The images are resized below to 299x299 pixels with 3 colour channels. First, check the shape of the raw image.

In [None]:
# Shape of the raw image once converted to a NumPy array.
np.asarray(image).shape

In [None]:
# Raw pixel values of the image as a NumPy array.
# NOTE(review): this displays the whole array; a small slice would keep the output short.
np.asarray(image)

In [None]:
# Resize to 299x299 and divide the pixel values by 255.0, then plot the result.
new_image = np.asarray(image.resize((299,299))) / 255.0
plt.imshow(new_image)

In [None]:
# Shape of the resized, rescaled image array.
new_image.shape

In [None]:
def get_image(image_name):
    """Load one image from IMG_PATH, resized to 299x299 and scaled by 1/255.

    The image is converted to RGB first so that greyscale or RGBA files also yield
    three channels, and the file is closed once the pixels have been read.

    Args:
        image_name: file name of the image, relative to IMG_PATH.

    Returns:
        A float NumPy array of shape (299, 299, 3).
    """
    # Import the submodule explicitly: a bare `import PIL` does not guarantee that
    # `PIL.Image` has been loaded.
    from PIL import Image

    with Image.open(os.path.join(IMG_PATH, image_name)) as image:
        resized = image.convert("RGB").resize((299, 299))
        return np.asarray(resized) / 255.0

In [None]:
# Check get_image on another dev image by plotting its output.
plt.imshow(get_image(dev_list[25]))

In [None]:
# Build InceptionV3 with its ImageNet weights; img_encoder is derived from this model.
from tensorflow.keras.applications.inception_v3 import InceptionV3
img_model = InceptionV3(weights='imagenet') # This will download the weight files for you and might take a while.

In [None]:
# Print the layer-by-layer summary of the image model.
img_model.summary() # this is quite a complex model. 

This is a prediction model, so the output is typically a softmax-activated vector representing 1000 possible object types. Because we are interested in an encoded representation of the image, we are just going to use the second-to-last layer as a source of image encodings. Each image will be encoded as a vector of size 2048.

Please use the following hack: hook up the input into a new Keras model and use the penultimate layer of the existing model as output.

In [None]:
# Reuse the classifier's input, but take the output of its second-to-last layer.
new_input = img_model.input
new_output = img_model.layers[-2].output
img_encoder = Model(new_input, new_output) # This is the final Keras image encoder model we will use.

At this point, you may want to add a GPU to the VM you are using (if not using already).

In [None]:
# Encode the single resized image; it is wrapped in a one-element batch for predict.
encoded_image = img_encoder.predict(np.array([new_image]))

In [None]:
# Show the encoding produced for the sample image.
encoded_image

In [None]:
# img_model is already built in an earlier cell and img_encoder is wired to that
# instance, so a second InceptionV3 load here would only cost time and memory.
def img_generator(img_list):
    """Yield every image in `img_list` as a batch containing that single image.

    Args:
        img_list: iterable of image file names understood by get_image.

    Yields:
        np.ndarray holding one loaded image, with a leading batch axis of size 1.
    """
    for image_name in img_list:
        yield np.array([get_image(image_name)])

In [None]:
# Encode every training image, one image per step. Model.predict accepts generators
# directly; predict_generator is deprecated.
enc_train = img_encoder.predict(img_generator(train_list), steps=len(train_list), verbose=1)

In [None]:
# Inspect the encoding computed for one training image.
enc_train[11]

In [None]:
# Encode every dev image, one image per step. Model.predict accepts generators
# directly; predict_generator is deprecated.
enc_dev = img_encoder.predict(img_generator(dev_list), steps=len(dev_list), verbose=1)

In [None]:
# Encode every test image, one image per step. Model.predict accepts generators
# directly; predict_generator is deprecated.
enc_test = img_encoder.predict(img_generator(test_list), steps=len(test_list), verbose=1)

In [None]:
# Choose a suitable location here
OUTPUT_PATH = "suitablelocation"
# makedirs with exist_ok=True also creates missing parent folders and does not fail
# when the folder already exists.
os.makedirs(OUTPUT_PATH, exist_ok=True)

In [None]:
# Persist the encodings of each split as encoded_images_<split>.npy under OUTPUT_PATH.
for split_name, split_encodings in (("train", enc_train), ("dev", enc_dev), ("test", enc_test)):
    target_file = os.path.join(OUTPUT_PATH, f"encoded_images_{split_name}.npy")
    np.save(target_file, split_encodings)

In [None]:
def read_image_descriptions(filename):
    """Parse a caption file into a mapping from image name to tokenized captions.

    Each useful line has the form ``<image name>#<n>\\t<caption>``. The caption is
    lower-cased, split on whitespace and wrapped in "<START>" / "<END>" markers.
    Lines without a tab (for example blank lines) are skipped instead of raising
    IndexError, and everything after the first tab is treated as caption text.

    Args:
        filename: path of the caption file.

    Returns:
        defaultdict(list) mapping image name -> list of token lists.
    """
    image_descriptions = defaultdict(list)
    with open(filename, 'r') as file:
        for line in file:
            image_ref, tab, caption = line.partition('\t')
            if not tab:
                # No tab separator: nothing to parse on this line.
                continue
            name = image_ref.split('#')[0]
            description = ["<START>"] + caption.lower().split() + ["<END>"]
            image_descriptions[name].append(description)
    return image_descriptions

In [None]:
# Tokenized captions for every image listed in the token file, keyed by image name.
descriptions = read_image_descriptions(f"{FLICKR_PATH}/Flickr8k.token.txt")

In [None]:
# Show the tokenized captions stored for the first dev image.
print(descriptions[dev_list[0]])

In [None]:
def _sorted_vocabulary(descriptions):
    """Return every distinct token found in `descriptions`, in sorted order."""
    return sorted({
        word
        for doc in descriptions.values()
        for description in doc
        for word in description
    })


def get_id_to_word(descriptions):
    """Map integer ids to tokens.

    Ids are the positions of the tokens in the sorted vocabulary, so this mapping
    is the exact inverse of get_word_to_id for the same `descriptions`.

    Args:
        descriptions: mapping of image name -> list of token lists.

    Returns:
        dict {id: token}.
    """
    return dict(enumerate(_sorted_vocabulary(descriptions)))


def get_word_to_id(descriptions):
    """Map tokens to integer ids (positions in the sorted vocabulary).

    Args:
        descriptions: mapping of image name -> list of token lists.

    Returns:
        dict {token: id}.
    """
    return {word: ix for ix, word in enumerate(_sorted_vocabulary(descriptions))}

# id -> token lookup built from all parsed captions.
id_to_word = get_id_to_word(descriptions)

In [None]:
# token -> id lookup built from all parsed captions.
word_to_id = get_word_to_id(descriptions)

In [None]:
# Longest training caption, counted in tokens (the <START>/<END> markers are included).
max(len(description) for image_id in train_list for description in descriptions[image_id])

In [None]:
# Text-only language model: given a prefix padded to MAX_LEN, predict the next word.
MAX_LEN = 40
EMBEDDING_DIM=300
vocab_size = len(word_to_id)

# Text input
text_input = Input(shape=(MAX_LEN,))
# NOTE(review): pad_sequences is called without a `value` argument elsewhere in this
# notebook (presumably padding with 0), while word_to_id also assigns id 0 to a real
# token -- confirm whether padding should be distinguishable from that token.
embedding = Embedding(vocab_size, EMBEDDING_DIM, input_length=MAX_LEN)(text_input)
# return_sequences=False: only the final output of the bidirectional LSTM is passed on.
x = Bidirectional(LSTM(512, return_sequences=False))(embedding)
# One softmax score per vocabulary entry.
pred = Dense(vocab_size, activation='softmax')(x)
model = Model(inputs=[text_input],outputs=pred)
model.compile(loss='categorical_crossentropy', optimizer='RMSprop', metrics=['accuracy'])

model.summary()

In [None]:
def text_training_generator(batch_size=128):
    """Endlessly yield (prefix, next-word) training batches for the text-only model.

    For every batch, `batch_size` image ids are drawn from `train_list` with
    np.random.choice and one caption per image is picked with random.choice.
    Each caption contributes one sample per prefix, so the number of rows in a
    yielded batch depends on the caption lengths rather than being `batch_size`.

    Yields:
        (inputs, targets): `inputs` holds the prefixes padded on the left to
        MAX_LEN; `targets` holds the one-hot encoded next word of each prefix
        (vocab_size columns).
    """
    while True:
        sampled_image_ids = np.random.choice(train_list, batch_size)

        prefix_blocks = []
        target_blocks = []

        for sampled_id in sampled_image_ids:
            caption = random.choice(descriptions[sampled_id])
            # Tokens without a vocabulary entry are dropped.
            token_ids = [word_to_id[token] for token in caption if token in word_to_id]

            # The prefix of the first `cut` tokens is paired with the token that follows it.
            prefixes = [token_ids[:cut] for cut in range(1, len(token_ids))]
            targets = token_ids[1:]

            prefix_blocks.append(pad_sequences(prefixes, maxlen=MAX_LEN, padding='pre'))
            target_blocks.append(to_categorical(targets, num_classes=vocab_size))

        yield (np.vstack(prefix_blocks), np.vstack(target_blocks))

Finally, train the model.

In [None]:
# Generator settings for training the text-only model.
batch_size = 128
generator = text_training_generator(batch_size)
# Steps per epoch: len(train_list) * MAX_LEN // batch_size.
# NOTE(review): each yielded batch already holds one row per caption prefix of
# `batch_size` images -- confirm this step count is the intended epoch length.
steps = len(train_list) * MAX_LEN // batch_size 

In [None]:
# Model.fit accepts generators directly; fit_generator is deprecated.
model.fit(generator, steps_per_epoch=steps, verbose=True, epochs=10)

Greedy decoder

In [None]:
def decoder():
    """Greedily decode a sentence with the text-only model.

    Starting from "<START>", the most probable next word is appended repeatedly
    until "<END>" is predicted or the sequence holds MAX_LEN tokens.

    Returns:
        The decoded words as a list, beginning with "<START>"; "<END>" itself is
        not included.
    """
    input_seq = [word_to_id['<START>']]
    end_id = word_to_id['<END>']

    # Loop until the <END> token is predicted or the sequence reaches MAX_LEN words
    while len(input_seq) < MAX_LEN:
        padded_input_seq = pad_sequences([input_seq], maxlen=MAX_LEN, padding='pre')
        # verbose=0 keeps the per-call progress bar out of the output (as in image_decoder).
        predicted_word_probs = model.predict(padded_input_seq, verbose=0)
        most_likely_word_id = int(np.argmax(predicted_word_probs[0]))

        if most_likely_word_id == end_id:
            break

        input_seq.append(most_likely_word_id)

    return [id_to_word[word_id] for word_id in input_seq]

In [None]:
# Print the greedy decoding produced by the text-only model.
print(decoder())

In [None]:
def sample_decoder():
    """Decode a sentence by sampling each next word from the model's distribution.

    Starting from "<START>", a word is drawn from the predicted distribution at
    every step until "<END>" is drawn or the sequence holds MAX_LEN tokens.

    Returns:
        The sampled words as a list, beginning with "<START>"; "<END>" itself is
        not included.
    """
    input_seq = [word_to_id['<START>']]
    end_id = word_to_id['<END>']

    while len(input_seq) < MAX_LEN:
        padded_input_seq = pad_sequences([input_seq], maxlen=MAX_LEN, padding='pre')
        predicted_word_probs = model.predict(padded_input_seq, verbose=0)

        # Renormalise in float64 so the vector sums to 1. Dividing by sum(p + eps)
        # leaves a deficit, and np.random.multinomial assigns any missing mass to the
        # last entry, which biases sampling towards the last vocabulary word.
        word_probs = predicted_word_probs[0].astype('float64')
        word_probs /= word_probs.sum()

        sampled_word_id = int(np.random.multinomial(1, word_probs).argmax())
        if sampled_word_id == end_id:
            break

        input_seq.append(sampled_word_id)

    return [id_to_word[word_id] for word_id in input_seq]

In [None]:
# Draw ten sampled sentences from the text-only model.
for i in range(10): 
    print(sample_decoder())

In [None]:
# Image-conditioned language model: given an image encoding and a caption prefix
# padded to MAX_LEN, predict the next word. This rebinds the global `model`.
MAX_LEN = 40
EMBEDDING_DIM=300
IMAGE_ENC_DIM=300

# Image input
img_input = Input(shape=(2048,))
# Project the image encoding to IMAGE_ENC_DIM (previously a hard-coded 300 that left
# the constant unused).
img_enc = Dense(IMAGE_ENC_DIM, activation="relu") (img_input)
# Repeat the projected encoding once per time step so it can be joined with the text.
images = RepeatVector(MAX_LEN)(img_enc)

# Text input
text_input = Input(shape=(MAX_LEN,))
embedding = Embedding(vocab_size, EMBEDDING_DIM, input_length=MAX_LEN)(text_input)
x = Concatenate()([images,embedding])
y = Bidirectional(LSTM(256, return_sequences=False))(x) 
pred = Dense(vocab_size, activation='softmax')(y)
model = Model(inputs=[img_input,text_input],outputs=pred)
model.compile(loss='categorical_crossentropy', optimizer="RMSProp", metrics=['accuracy'])

model.summary()

In [None]:
# Reload the saved train and dev encodings from OUTPUT_PATH.
enc_train = np.load(f"{OUTPUT_PATH}/encoded_images_train.npy")
enc_dev = np.load(f"{OUTPUT_PATH}/encoded_images_dev.npy")

In [None]:
def training_generator(batch_size=128):
    """Endlessly yield training batches for the image-conditioned model.

    For every batch, `batch_size` positions in `train_list` are drawn with
    np.random.choice and one caption per image is picked with random.choice.
    Each caption contributes one sample per prefix, so the number of rows in a
    yielded batch depends on the caption lengths rather than being `batch_size`.

    Yields:
        ([image_inputs, text_inputs], next_words): the image encoding repeated for
        every prefix of its caption, the prefixes padded on the left to MAX_LEN,
        and the one-hot encoded next word of each prefix.
    """
    while True:
        # Sample positions rather than names: the same position indexes both
        # train_list and enc_train, which avoids a linear train_list.index()
        # search for every single prefix.
        batch_indices = np.random.choice(len(train_list), batch_size)

        image_inputs = []
        text_inputs = []
        next_words = []

        for image_index in batch_indices:
            image_id = train_list[image_index]
            image_encoding = enc_train[image_index]

            desc = random.choice(descriptions[image_id])
            # Tokens without a vocabulary entry are dropped.
            tokenized_desc = [word_to_id[word] for word in desc if word in word_to_id]

            for i in range(1, len(tokenized_desc)):
                image_inputs.append(image_encoding)
                text_inputs.append(tokenized_desc[:i])
                next_words.append(tokenized_desc[i])

        text_inputs = pad_sequences(text_inputs, maxlen=MAX_LEN, padding='pre')
        next_words = to_categorical(next_words, num_classes=vocab_size)

        yield ([np.array(image_inputs), np.array(text_inputs)], np.array(next_words))

In [None]:
# Generator settings for training the image-conditioned model.
batch_size = 128
generator = training_generator(batch_size)
# Steps per epoch: len(train_list) * MAX_LEN // batch_size.
# NOTE(review): each yielded batch already holds one row per caption prefix of
# `batch_size` images -- confirm this step count is the intended epoch length.
steps = len(train_list) * MAX_LEN // batch_size 

In [None]:
# Model.fit accepts generators directly; fit_generator is deprecated.
model.fit(generator, steps_per_epoch=steps, verbose=True, epochs=20)

In [None]:
# Store the trained weights under OUTPUT_PATH.
model.save_weights(f"{OUTPUT_PATH}/model.h5")

In [None]:
def image_decoder(enc_image):
    """Greedily decode a caption for one encoded image.

    Starting from "<START>", the highest-scoring next word is appended until
    "<END>" is predicted or the sequence holds MAX_LEN tokens.

    Args:
        enc_image: encoding of a single image; it is reshaped to one row before
            being passed to the model.

    Returns:
        The decoded words as a list, beginning with "<START>"; "<END>" itself is
        not included.
    """
    token_ids = [word_to_id['<START>']]
    image_batch = enc_image.reshape(1, -1)

    # One token is appended per iteration, so MAX_LEN - 1 rounds cap the length at MAX_LEN.
    for _ in range(MAX_LEN - 1):
        padded_prefix = pad_sequences([token_ids], maxlen=MAX_LEN, padding='pre')
        next_word_scores = model.predict([image_batch, padded_prefix], verbose=0)[0]
        best_id = int(np.argmax(next_word_scores))

        if id_to_word[best_id] == '<END>':
            break
        token_ids.append(best_id)

    return [id_to_word[token_id] for token_id in token_ids]


sanity check

In [None]:
# Show the first training image and the caption decoded from its encoding.
plt.imshow(get_image(train_list[0]))
image_decoder(enc_train[0])

In [None]:
# Show a dev image and the caption decoded from the encoding of that same image
# (the plot previously used dev_list[1] while the caption came from enc_dev[0]).
plt.imshow(get_image(dev_list[0]))
image_decoder(enc_dev[0])

In [None]:
def img_beam_decoder(n, image_enc):
    """Decode a caption for one encoded image with beam search of width `n`.

    Hypotheses are scored by the sum of log-probabilities of their words, which
    avoids the underflow of multiplying many small probabilities. A hypothesis
    whose next word is "<END>" is stored as finished instead of being thrown
    away. No hypothesis grows beyond MAX_LEN tokens.

    Args:
        n: beam width (number of hypotheses kept per step); must be >= 1.
        image_enc: encoding of a single image; it is reshaped to one row before
            being passed to the model.

    Returns:
        The words of the best hypothesis as a list, beginning with "<START>";
        "<END>" itself is not included (matching image_decoder).

    Raises:
        ValueError: if `n` is smaller than 1.
    """
    if n < 1:
        raise ValueError("beam width n must be at least 1")

    start_token = word_to_id['<START>']
    end_token = word_to_id['<END>']
    image_batch = image_enc.reshape(1, -1)

    # Live hypotheses and finished ones, both as (log-probability, token ids).
    beams = [(0.0, [start_token])]
    finished = []

    # One token is appended per round, so MAX_LEN - 1 rounds cap sequences at MAX_LEN.
    for _ in range(MAX_LEN - 1):
        candidates = []

        for log_prob, seq in beams:
            padded_input_seq = pad_sequences([seq], maxlen=MAX_LEN, padding='pre')
            predicted_word_probs = model.predict([image_batch, padded_input_seq], verbose=0).flatten()

            width = min(n, predicted_word_probs.size)
            top_word_indices = np.argpartition(predicted_word_probs, -width)[-width:]

            for word_index in top_word_indices:
                word_index = int(word_index)
                # Small constant guards against log(0).
                new_log_prob = log_prob + float(np.log(predicted_word_probs[word_index] + 1e-12))
                if word_index == end_token:
                    finished.append((new_log_prob, seq))
                else:
                    candidates.append((new_log_prob, seq + [word_index]))

        if not candidates:
            # Every expansion ended the sentence; nothing left to extend.
            break

        candidates.sort(key=lambda candidate: candidate[0], reverse=True)
        beams = candidates[:n]

        # Scores can only decrease as a hypothesis grows, so once a finished one is
        # at least as good as the best live beam no live beam can overtake it.
        if finished and max(score for score, _ in finished) >= beams[0][0]:
            break

    # Prefer a hypothesis that actually ended; otherwise take the best live beam.
    pool = finished if finished else beams
    best_sequence = max(pool, key=lambda candidate: candidate[0])[1]

    return [id_to_word[word_id] for word_id in best_sequence]

# Try the beam decoder with width 3 on the first dev encoding.
img_beam_decoder(3, enc_dev[0])

In [None]:
# Compare greedy decoding with beam widths 3 and 5 on five randomly chosen dev encodings.
# np.random.choice is called without replace=False, so an index may repeat.
indices = np.random.choice(len(enc_dev), 5)
images_to_show = [enc_dev[i] for i in indices]
# NOTE: the loop variable `image` holds an encoding here and rebinds the global
# `image` assigned from PIL.Image.open earlier in the notebook.
for image in images_to_show:
    print(image_decoder(image))
    print(img_beam_decoder(3,image))
    print(img_beam_decoder(5, image))
    