In [1]:
import numpy as np

### Load Dataset

In [2]:
all_data = np.load('train_dev_test.npz')

In [3]:
train_encoder_output = all_data['train_encoder_output']
train_decoder_input = all_data['train_decoder_input']
train_decoder_target = all_data['train_decoder_target'][:,1:,:]
validation_encoder_output = all_data['validation_encoder_output']
validation_decoder_input = all_data['validation_decoder_input']
validation_decoder_target = all_data['validation_decoder_target'][:,1:,:]
test_encoder_output = all_data['test_encoder_output']
test_decoder_input = all_data['test_decoder_input']
test_decoder_target = all_data['test_decoder_target'][:,1:,:]

In [4]:
print("Train Encoder Output", train_encoder_output.shape)
print("Train Decoder Input", train_decoder_input.shape)
print("Train Decoder Target", train_decoder_target.shape)

Train Encoder Output (30000, 512)
Train Decoder Input (30000, 38)
Train Decoder Target (30000, 38, 2531)


In [5]:
from caption_utils import *
train_fns_list, dev_fns_list, test_fns_list = load_split_lists()

train_captions_raw, dev_captions_raw, test_captions_raw = get_caption_split()
vocab = create_vocab(train_captions_raw)
token2idx, idx2token = vocab_to_index(vocab)    
captions_data = (train_captions_raw.copy(), dev_captions_raw.copy(), test_captions_raw.copy())
train_captions, dev_captions, test_captions = process_captions(captions_data, token2idx)

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


## Model Architecture

In [6]:
from keras.models import Model, Sequential
from keras.layers import Input, Dense, BatchNormalization, RepeatVector, Concatenate, Merge, Masking
from keras.layers import LSTM, GRU, Embedding, TimeDistributed, Bidirectional
from keras import backend as K
from keras import optimizers
from keras.utils import plot_model

#### Parameters

In [7]:
emb_size = 300
lstm_size = 300
vocab_size = len(vocab)
max_length = train_decoder_target.shape[1]
lstm_layers = 2

lr = 0.001
dropout_rate = 0.2
batch_size = 32
n_epochs = 30

### Model

In [8]:
K.clear_session()

In [9]:
# Image -> Image embedding
image_input = Input(shape=(train_encoder_output.shape[1], ), name='image_input')
print("Image Input shape", image_input.shape)
img_emb = Dense(emb_size, activation='relu', name='img_embedding')(image_input)
print(img_emb.shape)

Image Input shape (?, 512)
(?, 300)


In [10]:
# Sentence to Word embedding
caption_inputs = Input(shape=(None, ), name='caption_input')
print("Caption Input Shape", caption_inputs.shape)
emb_layer = Embedding(input_dim=vocab_size, output_dim=emb_size, name='Embedding')
word_emb = emb_layer(caption_inputs)
print(word_emb.shape)

Caption Input Shape (?, ?)
(?, ?, 300)


In [11]:
decoder_cell = Bidirectional(LSTM(lstm_size, return_sequences=True, return_state=True, name='decoder', 
                                  dropout=dropout_rate, recurrent_dropout=dropout_rate), 
                             merge_mode='ave')
encoder_states = [img_emb, img_emb]
decoder_out = decoder_cell(word_emb, initial_state = encoder_states + encoder_states)[0]

In [12]:
output_layer = TimeDistributed(Dense(vocab_size, activation='softmax'))
decoder_out = output_layer(decoder_out)
print(decoder_out.shape)

(?, ?, 2531)


In [13]:
rmsprop = optimizers.RMSprop(lr=0.001, rho=0.9, epsilon=None, decay=1e-6)

In [14]:
model = Model(inputs=[image_input,caption_inputs], outputs=[decoder_out])

model.compile(optimizer=rmsprop,
               loss='categorical_crossentropy',
               metrics=['accuracy'])

In [15]:
model.summary()
#plot_model(model, to_file='model2.png', show_shapes=True)

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
caption_input (InputLayer)      (None, None)         0                                            
__________________________________________________________________________________________________
image_input (InputLayer)        (None, 512)          0                                            
__________________________________________________________________________________________________
Embedding (Embedding)           (None, None, 300)    759300      caption_input[0][0]              
__________________________________________________________________________________________________
img_embedding (Dense)           (None, 300)          153900      image_input[0][0]                
__________________________________________________________________________________________________
bidirectio

In [16]:
from keras.callbacks import ModelCheckpoint

checkpointer = ModelCheckpoint(filepath='saved_models/weights.best.VGG16.bi_final.hdf5', 
                               verbose=1, save_best_only=True)

In [None]:
model.fit([train_encoder_output, train_decoder_input], [train_decoder_target], 
           validation_data=([validation_encoder_output, validation_decoder_input], [validation_decoder_target]),
           epochs=n_epochs, batch_size=batch_size, callbacks=[checkpointer], verbose=2)

Train on 30000 samples, validate on 5000 samples
Epoch 1/30
 - 149s - loss: 0.7298 - acc: 0.1883 - val_loss: 0.3640 - val_acc: 0.2567

Epoch 00001: val_loss improved from inf to 0.36403, saving model to saved_models/weights.best.VGG16.bi_final.hdf5


  str(node.arguments) + '. They will not be included '


Epoch 2/30
 - 145s - loss: 0.2876 - acc: 0.2678 - val_loss: 0.2161 - val_acc: 0.2824

Epoch 00002: val_loss improved from 0.36403 to 0.21609, saving model to saved_models/weights.best.VGG16.bi_final.hdf5
Epoch 3/30
 - 144s - loss: 0.1923 - acc: 0.2844 - val_loss: 0.1562 - val_acc: 0.2928

Epoch 00003: val_loss improved from 0.21609 to 0.15617, saving model to saved_models/weights.best.VGG16.bi_final.hdf5
Epoch 4/30


## Inference

In [None]:
encoder_model = Model(image_input, encoder_states)

In [None]:
encoder_model.summary()

In [None]:
decoder_state_input_h = Input(shape=(lstm_size, ))
decoder_state_input_c = Input(shape=(lstm_size, ))

In [None]:
decoder_states_inputs = [decoder_state_input_h, decoder_state_input_c]
decoder_outputs, state_h, state_c = decoder_cell(word_emb, initial_state=decoder_states_inputs)

In [None]:
decoder_states = [state_h, state_c]
decoder_outputs = output_layer(decoder_outputs)
decoder_model = Model([caption_inputs] + decoder_states_inputs, [decoder_outputs] + decoder_states)

In [None]:
decoder_model.summary()

In [None]:
def decode_sequence(input_seq):
    # Encode the input as state vectors.
    states_value = encoder_model.predict(input_seq)

    # Generate empty target sequence of length 1.
    target_seq = np.zeros((1, 1, num_decoder_tokens))
    # Populate the first character of target sequence with the start character.
    target_seq[0, 0, target_token_index['\t']] = 1.

    # Sampling loop for a batch of sequences
    # (to simplify, here we assume a batch of size 1).
    stop_condition = False
    decoded_sentence = ''
    while not stop_condition:
        output_tokens, h, c = decoder_model.predict(
            [target_seq] + states_value)

        # Sample a token
        sampled_token_index = np.argmax(output_tokens[0, -1, :])
        sampled_char = reverse_target_char_index[sampled_token_index]
        decoded_sentence += sampled_char

        # Exit condition: either hit max length
        # or find stop character.
        if (sampled_char == '\n' or
           len(decoded_sentence) > max_decoder_seq_length):
            stop_condition = True

        # Update the target sequence (of length 1).
        target_seq = np.zeros((1, 1, num_decoder_tokens))
        target_seq[0, 0, sampled_token_index] = 1.

        # Update states
        states_value = [h, c]

    return decoded_sentence

## Load the Model with the Best Validation Loss

In [None]:
model.load_weights('saved_models/weights.best.VGG16.bi_final.hdf5')

In [None]:
def generate_seq(img_input):
    if img_input.shape != (1, 512):
        img_input = img_input.reshape(1, 512)
    
    assert(img_input.shape == (1, 512))
    stop_condition = False
    decoded_sentence = []
    target_seq = np.array([token2idx['<bos>']]).reshape(1, 1)
    states_value = encoder_model.predict(img_input)

    while not stop_condition:

        output_tokens, h, c = decoder_model.predict([target_seq] + states_value)

        sampled_token_index = int(np.argmax(output_tokens[0, -1, :]))

        sampled_char = idx2token[sampled_token_index]

        decoded_sentence += [sampled_char]

        if (sampled_char == '<eos>' or len(decoded_sentence) > 30):
            stop_condition = True

        target_seq = np.array([sampled_token_index]).reshape(1, 1)

        states_value = [h, c]

    return ' '.join(decoded_sentence)

In [None]:
for i in range(20):
    print(generate_seq(test_encoder_output[i*5, :]))
    print('*'*30)
    for j in range(5):
        print(intseq_to_caption(idx2token, test_captions[test_fns_list[i]][j]))
    print()

    

In [None]:
for i in range(20):
    for j in range(5):
        print(intseq_to_caption(idx2token, train_captions[train_fns_list[i]][j]))

In [None]:
from keras.models import load_model
model.save("bi_test.h5")

del model  # deletes the existing model
model = load_model('bi_test.h5')

In [None]:
from keras.applications.vgg16 import VGG16
from keras.preprocessing import image
from keras.applications.vgg16 import preprocess_input
import numpy as np

VGG16_model = VGG16(weights='imagenet', include_top=False, pooling='avg')

img_path = 'data/Arnav_Hankyu_Pulkit2.jpg'
img = image.load_img(img_path, target_size=(224, 224))
x = image.img_to_array(img)
x = np.expand_dims(x, axis=0)
x = preprocess_input(x)

features = VGG16_model.predict(x)

In [None]:
print(generate_seq(features))