# Imports

In [None]:
import cv2
import os
import copy
import joblib
import copy
import string
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.models import Model
from keras.models import Sequential
from keras.layers import Input
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Embedding
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.callbacks import ModelCheckpoint
from nltk.translate.bleu_score import sentence_bleu, corpus_bleu
from keras_preprocessing.sequence import pad_sequences
from keras.applications import Xception, InceptionV3, ResNet50, ResNet50V2, VGG16
from keras.layers import TimeDistributed, Activation, RepeatVector, Concatenate


# Configuration

In [None]:
# Paths, training settings and backbone selection read by the cells below.
config = dict(
    images_path='train_data/Flicker8k_Dataset/',
    train_data_path='train_data/Flickr_8k.trainImages.txt',
    val_data_path='train_data/Flickr_8k.devImages.txt',
    captions_path='train_data/Flickr8k.token.txt',
    features_path='model_data/features.joblib',
    model_data_path='model_data/',
    model_load_path='model_data/model_inceptionv3_epoch-20_train_loss-2.4050_val_loss-3.0527.hdf5',
    num_of_epochs=50,
    # max_length is set manually after training the model and is required for test.py
    max_length=40,
    batch_size=3,
    test_data_path='test_data/',
    model_type='xception',
    tokenizer_path='model_data/tokenizer.joblib',
    random_seed=5,
)

# Settings consumed by LSTM_Model (size of the shared embedding space).
lstmConfig = dict(embedding_size=128)


# Load Images

In [None]:
def load_images(path):
    """Return the names of all entries in the directory *path*.

    The names come back exactly as ``os.listdir`` reports them: no sorting
    and no filtering by file type is applied.
    """
    entries = os.listdir(path)
    return entries


# CNN Model for Feature Extraction

### Load Model

In [None]:
def CNNModel(model_type):
    """Build the CNN used as image feature extractor.

    The selected network is created with its classification head
    (``include_top=True``) and then truncated so that the output of the
    second-to-last layer becomes the model output.

    Args:
        model_type: One of 'inceptionv3', 'xception', 'vgg16', 'resnet50',
            'resnet50v2'.

    Returns:
        A ``(model, size)`` tuple where ``size`` is the side length (in
        pixels) of the square input image the model expects.

    Raises:
        KeyError: If ``model_type`` is not one of the supported names.
    """
    # Store the constructors, not instances: only the requested network is
    # built, instead of instantiating all five just to pick one.
    builders = {
        'inceptionv3': InceptionV3,
        'xception': Xception,
        'vgg16': VGG16,
        'resnet50': ResNet50,
        'resnet50v2': ResNet50V2
    }
    builder = builders[model_type]
    # With the classification head attached, Xception and InceptionV3 take
    # 299x299 inputs; the VGG/ResNet variants take 224x224.
    size = 299 if model_type in ('xception', 'inceptionv3') else 224
    base_model = builder(include_top=True)
    # Drop the final prediction layer and expose the layer before it
    last = base_model.layers[-2].output
    main_model = Model(inputs=base_model.input, outputs=last)
    return main_model, size


## Feature Extraction

In [None]:
def extract_features(path, model_type):
    """Compute a CNN feature vector for every image in a directory.

    Args:
        path: Directory containing the images.
        model_type: Backbone name understood by ``CNNModel``.

    Returns:
        Dict mapping image id (file name up to the first '.') to a 1-D
        feature vector.
    """
    features = {}
    model, size = CNNModel(model_type)
    for file_name in load_images(path):
        # Read from the directory that was actually passed in
        img = cv2.imread(os.path.join(path, file_name))
        if img is None:
            # cv2.imread returns None for files it cannot decode
            print('Skipping unreadable image: {}'.format(file_name))
            continue
        # OpenCV loads images as BGR; convert to RGB before resizing
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
        img = cv2.resize(img, (size, size))
        # Add the batch axis expected by model.predict
        img = img.reshape(1, size, size, 3)
        # NOTE(review): raw 0-255 pixel values are fed to the network; no
        # model-specific preprocess_input is applied - confirm this is intended.
        # Flatten to 1-D whatever the backbone's feature width is
        # (a hard-coded 2048 fails for VGG16, whose penultimate layer is wider).
        pred = model.predict(img).reshape(-1)
        image_id = file_name.split('.')[0]
        features[image_id] = pred
    return features


# Text Preprocessing

### Load Captions

In [None]:
def load_captions(filename):
    """Parse a captions file into a dict of image id -> list of captions.

    Every non-blank line is split on whitespace: the first token is the
    image reference (only the part before the first '.' is kept as the id)
    and the remaining tokens, re-joined with single spaces, form the caption.

    Args:
        filename: Path of the captions text file.

    Returns:
        Dict mapping image id to the list of its captions, in file order.
    """
    # The context manager closes the file even if reading fails
    with open(filename, 'r') as file:
        doc = file.read()
    captions = dict()
    _count = 0
    # Process line by line
    for line in doc.split('\n'):
        # Split line on white space
        tokens = line.split()
        # Skip lines that are too short or contain only whitespace
        # (the latter would otherwise fail on tokens[0])
        if len(line) < 2 or not tokens:
            continue
        # First token is the image reference; keep only the part before '.'
        image_id = tokens[0].split('.')[0]
        # Convert caption tokens back to caption string
        image_caption = ' '.join(tokens[1:])
        # Create the list if needed, then store the caption
        captions.setdefault(image_id, list()).append(image_caption)
        _count += 1
    print('Parsed captions: {}'.format(_count))
    return captions


### Clean Captions

In [None]:
def clean_captions(captions):
    """Normalise every caption in *captions* in place.

    Each caption is split on whitespace, lower-cased and stripped of
    punctuation; tokens of length one or less and tokens that are not purely
    alphabetic are dropped. The cleaned tokens are re-joined with single
    spaces and written back into the same list slot. Nothing is returned.
    """
    # Translation table that deletes every punctuation character
    strip_punct = str.maketrans('', '', string.punctuation)
    for caption_list in captions.values():
        for idx, raw in enumerate(caption_list):
            # Lower-case first, then remove punctuation, token by token
            words = (tok.lower().translate(strip_punct) for tok in raw.split())
            # Drop hanging 's'/'a' style tokens and tokens containing digits
            kept = [w for w in words if len(w) > 1 and w.isalpha()]
            caption_list[idx] = ' '.join(kept)


### Save Captions

In [None]:
def save_captions(captions, filename):
    """Write captions to *filename*, one ``<image_id> <caption>`` per line.

    Args:
        captions: Dict mapping image id to a list of caption strings.
        filename: Destination path; an existing file is overwritten.

    Lines are joined with '\\n' and no trailing newline is written.
    """
    lines = [
        key + ' ' + caption
        for key, captions_list in captions.items()
        for caption in captions_list
    ]
    # The context manager guarantees the handle is flushed and closed even
    # when the write fails part-way
    with open(filename, 'w') as file:
        file.write('\n'.join(lines))


### Preprocess Data

In [None]:
def preprocessData():
    """Create the cached image features and parsed captions if missing.

    Two artefacts are kept under ``config['model_data_path']``:
    ``features_<model_type>.joblib`` (output of ``extract_features``) and
    ``captions.txt`` (output of ``load_captions`` written by
    ``save_captions``). Each one is generated only when its file does not
    exist yet; otherwise a message is printed and the step is skipped.
    """
    model_type = config['model_type']
    features_file = config['model_data_path'] + 'features_' + model_type + '.joblib'
    captions_file = config['model_data_path'] + 'captions.txt'

    print('Using {} model'.format(model_type.title()))

    # Step 1: image features
    if not os.path.exists(features_file):
        print('Generating image features using ' + model_type + ' model...')
        images_features = extract_features(config['images_path'], model_type)
        joblib.dump(images_features, features_file)
        print('Completed & Saved features for {} images successfully'.format(
            len(images_features)))
    else:
        print('Image features already generated at ' + features_file)

    # Step 2: captions file
    if not os.path.exists(captions_file):
        print('Parsing captions file...')
        captions = load_captions(config['captions_path'])
        # clean_captions is intentionally not applied here: the Keras
        # Tokenizer handles the cleaning later on
        save_captions(captions, captions_file)
        print('Parsed & Saved successfully')
    else:
        print('Parsed caption file already generated at ' + captions_file)


# Load Data

In [None]:
def load_set(filename):
    """Read a list of image file names and return their ids as a set.

    Args:
        filename: Path of a text file with one image file name per line.

    Returns:
        Set of image ids, where an id is the part of the (stripped) line
        before the first '.'. Blank and whitespace-only lines are ignored.
    """
    # The context manager closes the file even if reading fails
    with open(filename, 'r') as file:
        doc = file.read()
    ids = set()
    for line in doc.split('\n'):
        name = line.strip()
        # Skip empty lines (including ones holding only whitespace, which
        # would otherwise turn into a bogus id)
        if not name:
            continue
        ids.add(name.split('.')[0])
    return ids


### Load Cleaned Captions

In [None]:
def load_cleaned_captions(filename, ids):
    """Load the captions of the requested images, wrapped in sequence markers.

    Args:
        filename: Path of a file with one ``<image_id> <caption>`` per line.
        ids: Collection of image ids to keep; other lines are ignored.

    Returns:
        ``(captions, count)`` where ``captions`` maps image id to a list of
        ``'startseq <caption> endseq'`` strings and ``count`` is the total
        number of captions kept.
    """
    # The context manager closes the file even if reading fails
    with open(filename, 'r') as file:
        doc = file.read()
    captions = dict()
    _count = 0
    # Process line by line
    for line in doc.split('\n'):
        # Split line on white space
        tokens = line.split()
        # Blank lines (e.g. a trailing newline) have no id to read
        if not tokens:
            continue
        # Split id from caption
        image_id, image_caption = tokens[0], tokens[1:]
        # Skip images not in the ids set
        if image_id not in ids:
            continue
        # Wrap caption in start & end tokens
        caption = 'startseq ' + ' '.join(image_caption) + ' endseq'
        # Create the list if needed, then store
        captions.setdefault(image_id, list()).append(caption)
        _count += 1
    return captions, _count


### Load Image Features

In [None]:
def load_image_features(filename, ids):
    """Load stored image features and keep only those listed in *ids*.

    Args:
        filename: Path of the joblib file holding a mapping of image id to
            feature vector.
        ids: Iterable of image ids to select.

    Returns:
        Dict with one entry per id in *ids*.

    Raises:
        KeyError: If an id has no entry in the stored features.
    """
    stored = joblib.load(filename)
    selected = {}
    for image_id in ids:
        selected[image_id] = stored[image_id]
    return selected


In [None]:
def to_lines(captions):
    """Flatten a dict of image id -> caption list into one list of captions.

    Captions keep the dict's iteration order and, within an image, the
    order of its list.
    """
    # Build the result directly rather than using a comprehension only for
    # its append side effect
    return [
        caption
        for caption_list in captions.values()
        for caption in caption_list
    ]


In [None]:
from keras.preprocessing.text import Tokenizer


def create_tokenizer(captions):
    """Return a Keras ``Tokenizer`` fitted on every caption in *captions*.

    *captions* maps image id to a list of caption strings; all captions are
    flattened with ``to_lines`` before fitting.
    """
    fitted = Tokenizer()
    fitted.fit_on_texts(to_lines(captions))
    return fitted


In [None]:
def calc_max_length(captions):
    """Return the word count of the longest caption in *captions*.

    Words are counted by splitting each caption on whitespace. Raises
    ``ValueError`` (from ``max``) when there are no captions at all.
    """
    word_counts = [len(text.split()) for text in to_lines(captions)]
    return max(word_counts)


In [None]:
def int_to_word(integer, tokenizer):
    """Map a vocabulary index back to its word.

    Scans ``tokenizer.word_index`` and returns the first word whose index
    equals *integer*, or ``None`` when no word has that index.
    """
    matches = (
        word
        for word, index in tokenizer.word_index.items()
        if index == integer
    )
    return next(matches, None)


### Create Sequences

In [None]:
def create_sequences(tokenizer, max_length, captions_list, image):
    """Expand the captions of one image into next-word training samples.

    Every caption is encoded with *tokenizer*; for each position ``i >= 1``
    of the encoded sequence one sample is produced whose text input is the
    first ``i`` tokens padded to *max_length* and whose target is token
    ``i`` one-hot encoded over the vocabulary.

    Returns:
        Three parallel lists ``(X1, X2, y)``: the image features (the same
        *image* object repeated), the padded input sequences and the
        one-hot encoded output words.
    """
    n_classes = len(tokenizer.word_index) + 1
    img_inputs, seq_inputs, targets = [], [], []
    for text in captions_list:
        # Integer-encode the caption
        encoded = tokenizer.texts_to_sequences([text])[0]
        # One (prefix -> next word) pair per split point
        for cut in range(1, len(encoded)):
            prefix = pad_sequences([encoded[:cut]], maxlen=max_length)[0]
            next_word = to_categorical([encoded[cut]], num_classes=n_classes)[0]
            img_inputs.append(image)
            seq_inputs.append(prefix)
            targets.append(next_word)
    return img_inputs, seq_inputs, targets


### Generator Function

In [None]:
def data_generator(images, captions, tokenizer, max_length, batch_size, random_seed):
    """Endlessly yield training batches built from ``batch_size`` images each.

    Args:
        images: Dict mapping image id to its feature vector.
        captions: Dict mapping image id to a list of caption strings.
        tokenizer: Fitted tokenizer handed to ``create_sequences``.
        max_length: Length the input sequences are padded to.
        batch_size: Number of images (not samples) per yielded batch.
        random_seed: Seed applied to the global ``random`` module.

    Yields:
        ``[[image_features, text_sequences], output_words]`` as numpy arrays.
        After the last image the generator wraps around to the first one.
    """
    # Setting random seed for reproducibility of results
    random.seed(random_seed)
    # Image ids
    image_ids = list(captions.keys())
    _count = 0
    assert batch_size <= len(
        image_ids), 'Batch size must be less than or equal to {}'.format(len(image_ids))
    while True:
        if _count >= len(image_ids):
            # Generator exceeded or reached the end so restart it
            _count = 0
        # Batch list to store data
        input_img_batch, input_sequence_batch, output_word_batch = list(), list(), list()
        # Slicing clamps at the end of the list, so the last batch may be smaller
        for image_id in image_ids[_count:_count + batch_size]:
            # Use the whole feature vector, flattened to 1-D. Indexing with
            # [0] would pick a single scalar when features are stored 1-D;
            # reshape(-1) also accepts vectors stored with a leading batch axis.
            image = np.asarray(images[image_id]).reshape(-1)
            # Shuffle a copy so the caller's caption lists are not reordered
            captions_list = list(captions[image_id])
            random.shuffle(captions_list)
            input_img, input_sequence, output_word = create_sequences(
                tokenizer, max_length, captions_list, image)
            # Add to batch
            input_img_batch.extend(input_img)
            input_sequence_batch.extend(input_sequence)
            output_word_batch.extend(output_word)
        _count = _count + batch_size
        yield [[np.array(input_img_batch), np.array(input_sequence_batch)], np.array(output_word_batch)]


### Load Training Data

In [None]:
def loadTrainData():
    """Load everything needed for training.

    Runs ``preprocessData`` first so the cached features and captions file
    exist, then loads the captions and image features of the training ids
    and makes sure a fitted tokenizer is stored at
    ``config['tokenizer_path']``.

    Returns:
        ``(train_image_features, train_captions, max_length)`` where the
        first two are dicts keyed by image id and ``max_length`` is the word
        count of the longest training caption.
    """
    train_image_ids = load_set(config['train_data_path'])
    # Check if we already have preprocessed data saved and if not, preprocess the data.
    # Creates 'captions.txt' and the features .joblib file when missing
    preprocessData()
    # Load captions
    train_captions, _count = load_cleaned_captions(
        config['model_data_path']+'captions.txt', train_image_ids)
    # Load image features, restricted to the training ids
    # (load_image_features requires the ids argument)
    train_image_features = load_image_features(
        config['model_data_path']+'features_'+config['model_type']+'.joblib', train_image_ids)
    # Format the counts instead of concatenating str + int
    print('Available images for training: {}'.format(len(train_image_features)))
    print('Available captions for training: {}'.format(_count))
    # Check the same path the tokenizer is written to and later loaded from,
    # so an existing tokenizer is actually reused. Delete that file to force
    # the vocabulary to be rebuilt.
    if not os.path.exists(config['tokenizer_path']):
        # Prepare tokenizer
        tokenizer = create_tokenizer(train_captions)
        # Save the tokenizer
        joblib.dump(tokenizer, config['tokenizer_path'])
    # Determine the maximum sequence length
    max_length = calc_max_length(train_captions)
    return train_image_features, train_captions, max_length


### Load Validation Data

In [None]:
def loadValData():
    """Load the captions and image features of the validation images.

    Reads the cached 'captions.txt' and features file under
    ``config['model_data_path']``; both must already exist.

    Returns:
        ``(val_features, val_captions)``, two dicts keyed by image id.
    """
    val_image_ids = load_set(config['val_data_path'])
    # Load captions
    val_captions, _count = load_cleaned_captions(
        config['model_data_path']+'captions.txt', val_image_ids)
    # Load image features
    val_features = load_image_features(
        config['model_data_path']+'features_'+config['model_type']+'.joblib', val_image_ids)
    # Format the counts instead of concatenating str + int
    print('Available images for validation: {}'.format(len(val_features)))
    print('Available captions for validation: {}'.format(_count))
    return val_features, val_captions


# LSTM Model for Training and Prediction

### Configure Model

In [None]:
def LSTM_Model(vocab_size, max_len, model_type):
    """Build and compile the two-branch captioning network.

    The image branch projects the CNN feature vector to the embedding size
    and repeats it ``max_len`` times; the language branch embeds the partial
    caption and runs it through an LSTM. Both branches are concatenated and
    decoded by two stacked LSTMs followed by a softmax over the vocabulary.

    Args:
        vocab_size: Number of output classes / embedding rows.
        max_len: Length of the padded input sequences.
        model_type: Backbone name; only 'vgg16' changes the expected feature
            width (4096 instead of 2048).

    Returns:
        The compiled Keras ``Model`` taking ``[image_features, sequence]``.
    """
    embedding_size = lstmConfig['embedding_size']
    # VGG16 yields a 4096-dimensional vector per image; every other backbone
    # is treated as 2048-dimensional
    dimension = 4096 if model_type == 'vgg16' else 2048

    # Image branch: feature vector -> embedding, repeated for each time step
    image_model = Sequential()
    image_model.add(
        Dense(embedding_size, input_shape=(dimension,), activation='relu'))
    image_model.add(RepeatVector(max_len))
    image_model.summary()

    # Language branch: word indices -> embeddings -> per-step representation
    language_model = Sequential()
    language_model.add(Embedding(input_dim=vocab_size,
                                 output_dim=embedding_size, input_length=max_len))
    language_model.add(LSTM(256, return_sequences=True))
    language_model.add(TimeDistributed(Dense(embedding_size)))
    language_model.summary()

    # Decoder: merge both branches and predict the next word
    merged = Concatenate()([image_model.output, language_model.output])
    decoded = LSTM(128, return_sequences=True)(merged)
    decoded = LSTM(512, return_sequences=False)(decoded)
    logits = Dense(vocab_size)(decoded)
    out = Activation('softmax')(logits)

    model = Model(inputs=[image_model.input, language_model.input],
                  outputs=out)
    model.compile(loss='categorical_crossentropy',
                  optimizer='RMSprop', metrics=['accuracy'])
    return model


In [None]:
def generate_caption(model, tokenizer, image, max_length):
    """Greedily generate a caption for one image.

    Args:
        model: Trained captioning model taking ``[image, sequence]``.
        tokenizer: Tokenizer used to encode text and decode predictions.
        image: Feature vector of the image, with or without a leading
            batch axis.
        max_length: Padding length and maximum number of generated words.

    Returns:
        The generated text, starting with 'startseq' and ending with
        'endseq' if the model predicted it within ``max_length`` steps.
    """
    # model.predict needs a batch axis on both inputs; a bare 1-D feature
    # vector would not line up with the (1, max_length) sequence batch
    image = np.asarray(image).reshape(1, -1)
    # Seed the generation process
    in_text = 'startseq'
    # Iterate over the whole length of the sequence
    for _ in range(max_length):
        # Integer encode input sequence
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        # Pad input
        sequence = pad_sequences([sequence], maxlen=max_length)
        # Predict next word: the output is a probability distribution over
        # all words in the vocabulary
        yhat = model.predict([image, sequence], verbose=0)
        # Take the class with maximum probability as the predicted word index
        yhat = np.argmax(yhat)
        # Map integer back to word
        word = int_to_word(yhat, tokenizer)
        # Stop if we cannot map the word
        if word is None:
            break
        # Append as input for generating the next word
        in_text += ' ' + word
        # Stop if we predict the end of the sequence
        if word == 'endseq':
            break
    return in_text


In [None]:
def evaluate_model(model, images, captions, tokenizer, max_length):
    """Print corpus-level BLEU-1..4 scores of the model on a set of images.

    Args:
        model: Trained captioning model.
        images: Dict mapping image id to its feature vector.
        captions: Dict mapping image id to its reference captions.
        tokenizer: Tokenizer used by ``generate_caption``.
        max_length: Maximum caption length used during generation.

    A caption is generated for every image in *captions*; references and
    predictions are compared as whitespace-split token lists.
    """
    actual, predicted = list(), list()
    # Plain iteration: this file never imports a progress-bar helper
    for image_id, caption_list in captions.items():
        yhat = generate_caption(model, tokenizer, images[image_id], max_length)
        ground_truth = [caption.split() for caption in caption_list]
        actual.append(ground_truth)
        predicted.append(yhat.split())
    print('BLEU Scores :')
    print('A perfect match results in a score of 1.0, whereas a perfect mismatch results in a score of 0.0.')
    print('BLEU-1: %f' % corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0)))
    print('BLEU-2: %f' % corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0)))
    # Uniform weights over 1- to 3-grams must sum to 1 (0.3 * 3 is only 0.9)
    print('BLEU-3: %f' %
          corpus_bleu(actual, predicted, weights=(1.0 / 3, 1.0 / 3, 1.0 / 3, 0)))
    print('BLEU-4: %f' %
          corpus_bleu(actual, predicted, weights=(0.25, 0.25, 0.25, 0.25)))

In [None]:
# Setting random seed for reproducibility of results
random.seed(config['random_seed'])

# Despite the X1/X2 names, these are dicts keyed by image id:
# X1* maps id -> image feature vector, X2* maps id -> list of captions.
X1train, X2train, max_length = loadTrainData()

X1val, X2val = loadValData()

tokenizer = joblib.load(config['tokenizer_path'])
vocab_size = len(tokenizer.word_index) + 1

model = LSTM_Model(vocab_size, max_length, config['model_type'])
print('LSTM_Model Summary : ')
# summary() prints the table itself and returns None, so it is not wrapped
# in print()
model.summary()

# Train the model. A checkpoint is written only for epochs whose validation
# loss improves on the best value so far (save_best_only=True).
num_of_epochs = config['num_of_epochs']
batch_size = config['batch_size']
# One step consumes batch_size images; round up so a final partial batch
# is included
steps_train = (len(X2train) + batch_size - 1) // batch_size
steps_val = (len(X2val) + batch_size - 1) // batch_size
model_save_path = config['model_data_path']+"model_"+str(
    config['model_type'])+"_epoch-{epoch:02d}_train_loss-{loss:.4f}_val_loss-{val_loss:.4f}.hdf5"
checkpoint = ModelCheckpoint(
    model_save_path, monitor='val_loss', verbose=1, save_best_only=True, mode='min')
callbacks = [checkpoint]

print('steps_train: {}, steps_val: {}'.format(steps_train, steps_val))
print('Batch Size: {}'.format(batch_size))
print('Total Number of Epochs = {}'.format(num_of_epochs))

# Shuffle train data: rebuild the captions dict in a random key order, which
# is the order the generator walks through the images
ids_train = list(X2train.keys())
random.shuffle(ids_train)
X2train_shuffled = {_id: X2train[_id] for _id in ids_train}
X2train = X2train_shuffled

# Create the train data generator
# returns [[img_features, text_features], out_word]
generator_train = data_generator(
    X1train, X2train, tokenizer, max_length, batch_size, config['random_seed'])
# Create the validation data generator
# returns [[img_features, text_features], out_word]
generator_val = data_generator(
    X1val, X2val, tokenizer, max_length, batch_size, config['random_seed'])

# Fit for num_of_epochs epochs, validating after each one
# NOTE(review): fit_generator is deprecated in newer Keras releases in favour
# of Model.fit - confirm against the installed Keras version.
history = model.fit_generator(generator_train,
                              epochs=num_of_epochs,
                              steps_per_epoch=steps_train,
                              validation_data=generator_val,
                              validation_steps=steps_val,
                              callbacks=callbacks,
                              verbose=1)

# Evaluate the model on validation data and output BLEU scores
print('Model trained successfully. Running model on validation set for calculating BLEU score ')
evaluate_model(model, X1val, X2val, tokenizer, max_length)


### Plot Model

In [None]:
# Draw the layer graph of the model built above, annotating each layer with
# its shapes (show_shapes=True)
plot_model(model, show_shapes=True)


### Plot Graph

In [None]:
# Per-epoch losses recorded in the History object returned by training
training_loss = history.history['loss']
test_loss = history.history['val_loss']

# Epochs are numbered from 1 on the x axis
epoch_count = range(1, len(training_loss) + 1)

# Draw both curves: training as a red dashed line, validation as a blue solid line
for loss_series, line_style in ((training_loss, 'r--'), (test_loss, 'b-')):
    plt.plot(epoch_count, loss_series, line_style)
plt.legend(['Training Loss', 'Test Loss'])
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.show()
