# Model - Image Captioning

This notebook outlines the steps to develop an image captioning model.
This will be done in a few steps.  

1. Extract features of images from NASNet   
2. Train LSTM model using image + caption data 
3. Test on new image features

In [1]:
import numpy as np

import matplotlib.pyplot as plt
import matplotlib.image as mpimg

plt.style.use('fivethirtyeight')
%matplotlib inline

from tensorflow.keras.applications import NASNetLarge 
from tensorflow.keras.applications.nasnet import preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.models import Model, model_from_json, load_model
from tensorflow.keras.layers import Input, Dropout, Dense, Embedding, LSTM, add
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from sklearn.model_selection import train_test_split

import pickle
import os
from timeit import default_timer as timer

import string
import re

In [2]:
import gc

def clean_up(vars_, namespace = None):
    '''
    Remove variables that are no longer needed and run the garbage collector.

    vars_: a variable name or an iterable of variable NAMES (strings),
        e.g. ['features', 'text']. Non-string items are skipped, because
        an object passed by value cannot be unbound from the caller's
        namespace from inside a function.
    namespace: dictionary the names are removed from. Defaults to the
        globals of the module that defines this function (the notebook
        namespace when the function is defined in a notebook cell).

    Prints the value returned by gc.collect().
    '''
    if namespace is None:
        namespace = globals()
    if isinstance(vars_, str):
        # a bare string is one name, not a sequence of one-letter names
        vars_ = [vars_]
    for var in vars_:
        # the previous `del var` only unbound the loop variable itself,
        # so the caller's variables were never released
        if isinstance(var, str):
            namespace.pop(var, None)
    print(gc.collect())

In [3]:
# directory holding the .jpg images (passed to feature_extractor below)
flicker_img_dir = 'IMAGES/Flicker/Flicker8k_Dataset'
# directory holding the caption file Flickr8k.token.txt
flicker_text_dir = 'IMAGES/Flicker/labels'

## Feature Extraction
---

First, we will extract features of all available images using NASNet.

In [4]:
def feature_extractor(dir_, network, target_size = (331, 331)):
    ''' 
    iterate through the .jpg files in dir_ 
    and get features running on network

    dir_: directory containing the images
    network: callable that builds the model (called with no arguments);
        the model is cut at its second-to-last layer
    target_size: size every image is resized to when loaded
        (default (331, 331), the value previously hard-coded)

    return a dictionary with image id (file name without extension) as a key
    and the output of model.predict for that single image as a value
    '''
    model = network()
    # drop the last layer: the output of layers[-2] is used as the feature
    model = Model(inputs = model.inputs, outputs = model.layers[-2].output)
    # sorted so the processing order does not depend on os.listdir ordering
    fnames = sorted(x for x in os.listdir(dir_) if x.endswith('.jpg'))
    result = {}
    n = len(fnames)
    
    for i, fn in enumerate(fnames, start = 1):
        img = load_img(os.path.join(dir_, fn), target_size = target_size)
        # add a leading batch axis so the model receives a batch of one image
        img = np.expand_dims(img_to_array(img), 0)
        # NOTE: preprocess_input is the NASNet one imported at the top of the notebook
        img = preprocess_input(img)
        feature = model.predict(img)
        # splitext removes only the extension, even if the name contains other dots
        ind = os.path.splitext(fn)[0]
        result[ind] = feature
        print(f'{i}/{n} feature extraction completed')
    return result

In [None]:
# run
# NOTE(review): a later cell rebinds `features` from PKL/features.pkl; presumably this
# cell is only run to (re)build that pickle - confirm before running the full extraction
features = feature_extractor(flicker_img_dir, NASNetLarge)

In [6]:
# Saving
#with open('PKL/features.pkl', 'wb') as fp:
#    pickle.dump(features, fp, pickle.HIGHEST_PROTOCOL)

In [5]:
# loading
# NOTE: pickle.load can execute arbitrary code - only load a features.pkl you created yourself
with open('PKL/features.pkl', 'rb') as fp:
    features = pickle.load(fp)

## Dictionary of Descriptions
---
Next, we will create a reference dictionary that maps each image_id to its descriptions, after the descriptions have been preprocessed for tokenization.

In [13]:
# read the description file
# `text` becomes a list with one raw line per element (trailing newline kept by readlines)
with open(f'{flicker_text_dir}/Flickr8k.token.txt', 'r') as fn:
    text = fn.readlines()

In [14]:
# extract only image id and description
# raw string: the regex escapes (\. \t \n) reach the regex engine as written,
# without relying on Python passing unknown string escapes through unchanged
# group 1 = characters right before ".jpg", group 2 = text after the last tab;
# the newline is optional so a final line without one is still parsed
pattern = r'([0-9a-z_]*)\.jpg.*\t(.*)\n?'
p = re.compile(pattern)
# one (image id, description) tuple per line; lines that do not match
# (e.g. blank lines) are skipped instead of raising IndexError
descriptions_pairs = [m.groups() for m in map(p.search, text) if m]

In [5]:
def description_preprocessing(list_of_tuples, n = None, seed = None):
    '''
    given description pairs, return a dictionary of descriptions

    list_of_tuples: iterable of (image id, raw description) pairs
    n: if specified (and non-zero), keep at most n randomly chosen
        descriptions per image
    seed: optional seed for the random choice made when n is given;
        when None the global numpy random state is used, as before

    Each description is lower-cased, stripped of punctuation and digits,
    stripped of one-character tokens, and wrapped in the 'seqini' / 'seqfin'
    start and end tokens.
    return {image id: [processed description, ...]} with plain str items
    '''
    descriptions = {}

    # translation table that deletes every punctuation mark and digit
    table_ = str.maketrans('', '', string.punctuation + string.digits)

    for ind, text in list_of_tuples:
        words = text.lower().translate(table_).split()
        # drop every one-character token (e.g. "a", or an "s" left behind by "dog's")
        words = [x for x in words if len(x) > 1]
        # add initial and ending tokens
        caption = ' '.join(['seqini'] + words + ['seqfin'])
        descriptions.setdefault(ind, []).append(caption)

    if n:
        # if n is assigned cap the number of description for each image to be n
        rng = np.random if seed is None else np.random.RandomState(seed)
        for k, v in descriptions.items():
            if len(v) > n:
                # sample positions rather than the strings themselves, so the
                # kept items stay plain str instead of becoming numpy.str_
                keep = rng.choice(len(v), n, replace = False)
                descriptions[k] = [v[i] for i in sorted(keep)]
    return descriptions

In [17]:
# n is left at its default (None), so every description of every image is kept
descriptions = description_preprocessing(descriptions_pairs)

In [None]:
# saving
#with open('PKL/descriptions.pkl', 'wb') as fp:
#    pickle.dump(descriptions, fp)

In [7]:
# loading
#with open('PKL/descriptions.pkl', 'rb') as fp:
#    descriptions = pickle.load(fp)

## Cross-Validation
---
Now I'll split the list of images into train/val/test sets (70:15:15)

In [18]:
# first split: 70% of the image ids for training, 30% held out
# second split: the held-out 30% is halved into validation and test (test_list is rebound)
# random_state is fixed so the split is the same on every run
train_list, test_list = train_test_split(list(descriptions.keys()), test_size = 0.3, random_state = 22)
val_list, test_list = train_test_split(test_list, test_size = 0.5, random_state = 22)

## Generating Inputs
---
Now we need to create the input and output series for the LSTM network.  
We need two separate inputs: the image features and the descriptions as sequences. 
The output is the next word in the sequence.  

In [6]:
def get_keys(dict_):
    ''' 
    Collect the keys of dict_ into a new list,
    in the dictionary's own iteration order
    '''
    return [key for key in dict_.keys()]

def get_vals(dict_):
    ''' 
    Collect the values of dict_ into a new list,
    in the dictionary's own iteration order
    '''
    return [value for value in dict_.values()]
    
def get_features(features_dict, img_ind):
    ''' 
    Look up feature values in features_dict.

    img_ind as a single string id -> first element of that id's entry
    img_ind as a list of ids      -> list with the first element of each entry
    any other type                -> prints a message and returns None
    '''
    if isinstance(img_ind, str):
        return features_dict[img_ind][0]
    if isinstance(img_ind, list):
        return [features_dict[key][0] for key in img_ind]
    print('img_ind must be a list or string type')
    return None

Now I will create a sequence generator. This class keeps the feature dictionary, the description dictionary, and the tokenizer, and generates the train and validation/test inputs and outputs for a selected list of image ids.

In [7]:
class sequence_generator:
    '''
    Turns a description dictionary and a feature dictionary into the
    (image input, text input, output) arrays used to train the network.

    train_generator must be called before validation_generator: it fits the
    tokenizer and sets num_vocab and max_length, which validation reuses.
    '''
    def __init__(self, dictionary, features):
        ''' INPUT: a dictionary of descriptions and features '''
        self.dictionary = dictionary
        self.features = features
        self.img_index = list(self.dictionary.keys())
        self.texts = list(self.dictionary.values())

    def update_selection(self, list_):
        ''' 
        INPUT: select list of image indices
        Create selector, and subsets (select_dictionary, select_img_inds, select_texts)
        Only ids that are present in both the description dictionary
        and the feature dictionary are kept.
        '''
        self.selector = list_
        # set lookup instead of scanning the whole list once per dictionary key
        wanted = set(list_)
        self.select_dictionary = {k: v for k, v in self.dictionary.items()
                                  if k in wanted and k in self.features}
        self.select_img_inds = list(self.select_dictionary.keys())
        self.select_texts = list(self.select_dictionary.values())
    
    def sequence_process(self, dict_):
        ''' 
        Helper to process breakdown on all select dictionary 
        INPUT: {image id: list of description strings}
        RETURN three parallel lists: image id, token-id prefix, next token id
        Requires self.tokenizer (set by train_generator).
        '''
        X1, X2, Y = [], [], []

        for ind, texts in dict_.items():
            for seq in self.tokenizer.texts_to_sequences(texts):
                # every prefix seq[:i] is one input and seq[i] is the word to predict;
                # sequences shorter than two tokens therefore contribute nothing
                for i in range(1, len(seq)):
                    X1.append(ind)
                    X2.append(seq[:i])
                    Y.append(seq[i])

        return X1, X2, Y

    def _build_arrays(self, maxlen = None):
        ''' Helper: turn the current selection into padded / one-hot numpy arrays '''
        X1, X2, Y = self.sequence_process(self.select_dictionary)

        # maxlen = None lets pad_sequences pad to the longest sequence present
        X2 = pad_sequences(X2, maxlen = maxlen)
        Y = to_categorical(Y, num_classes = self.num_vocab)
        X1 = get_features(self.features, X1)
        # asarray: X2 and Y are already arrays, so the large one-hot matrix is not copied
        return np.asarray(X1), np.asarray(X2), np.asarray(Y)

    def train_generator(self, train_list):
        '''
        INPUT a list of training ids, 
        RETURN image inputs, text inputs, and outputs
        ASSIGN tokenizer, max_length and vocab size
        RAISE ValueError when no description is selected
        '''
        self.update_selection(train_list)

        # flatten the per-image lists into one flat list of description strings
        texts = [text for group in self.select_texts for text in group]
        if not texts:
            raise ValueError('no descriptions available for the given training ids')

        self.tokenizer = Tokenizer()
        self.tokenizer.fit_on_texts(texts)
        # +1 on top of the number of words in the tokenizer's word_index
        self.num_vocab = len(self.tokenizer.word_index)+1

        X1, X2, Y = self._build_arrays()
        self.max_length = X2.shape[1]
        return X1, X2, Y

    def validation_generator(self, val_list):
        '''
        INPUT a list of validation ids, 
        RETURN image inputs, text inputs and outputs
        RAISE AttributeError when train_generator has not been called yet
        '''
        if not hasattr(self, 'tokenizer'):
            raise AttributeError('train_generator must be called before validation_generator '
                                 '(tokenizer, num_vocab and max_length are not set yet)')
        self.update_selection(val_list)

        # pad to the training max_length so both sets share the same input width
        return self._build_arrays(maxlen = self.max_length)
    
    def get_num_vocab(self):
        ''' 
        return the vocabulary size computed by train_generator
        (number of words in the tokenizer's word_index plus one)
        '''
        return self.num_vocab
    def get_max_length(self):
        '''
        return the padded sequence length 
        of the training set
        '''
        return self.max_length
    def get_tokenizer(self):
        '''
        return the Keras Tokenizer object fitted on 
        training set
        '''
        return self.tokenizer

## Network Architecture
---
Now we will develop the LSTM architecture.

In [29]:
# initialize processor with descriptions and features references
# (the constructor stores both dictionaries as given and lists their ids / texts)
processor = sequence_generator(descriptions, features)

In [30]:
# get inputs and output
# train_generator has to run first: it fits the tokenizer and sets the vocabulary size
# and maximum length that validation_generator reuses
train_X1, train_X2, train_Y = processor.train_generator(train_list)
val_X1, val_X2, val_Y = processor.validation_generator(val_list)

In [31]:
# get max length and number of vocabularies
# both values were set on the processor by train_generator
max_length = processor.get_max_length()
num_vocab = processor.get_num_vocab()

In [None]:
#first path (Features)
# the feature width is read from the training array (4032 was hard-coded before),
# so the cell keeps working if the features come from another extractor
in1 = Input(shape = (train_X1.shape[1],))
img_layer1 = Dropout(0.5)(in1)
img_layer2 = Dense(64, activation = 'relu')(img_layer1)

# second path (LSTM)
in2 = Input(shape=(max_length,))
# mask_zero = True: index 0 is the padding value produced by pad_sequences
text_layer1 = Embedding(num_vocab, 64, mask_zero = True)(in2)
text_layer2 = Dropout(0.5)(text_layer1)
text_layer3 = LSTM(64)(text_layer2)

# output
# both paths end in 64 units so they can be summed element-wise
output_layer1 = add([img_layer2, text_layer3])
output_layer2 = Dense(64, activation = 'relu')(output_layer1)
# one softmax probability per vocabulary entry (the next word)
output = Dense(num_vocab, activation = 'softmax')(output_layer2)

# compile model
model = Model(inputs = [in1, in2], outputs = output)
model.compile(loss = 'categorical_crossentropy', 
             optimizer = 'adam')

# Overfitting control
# stop once the monitored quantity has not improved for 3 epochs,
# then restore the weights of the best epoch
cp = EarlyStopping(patience = 3, restore_best_weights= True)

# training
start = timer()

# `workers` is not passed: it only applies to generator / Sequence inputs,
# not to in-memory arrays, and newer Keras versions no longer accept it
history = model.fit([train_X1, train_X2], train_Y, 
                    epochs=10, 
                    validation_data = ([val_X1, val_X2], val_Y),
                    callbacks = [cp], 
                    verbose = 1
                   )
end = timer()
elapsed = end - start
# round once before splitting, so 119.6 s prints as 2 minutes 0 seconds
# rather than 1 minutes 60 seconds
minutes, seconds = divmod(round(elapsed), 60)
print('Total Time Elapsed: ', minutes, ' minutes ', seconds, ' seconds')

In [None]:
# reviewing structure
# writes the architecture diagram to PNG/basemodel_arch.png
# NOTE(review): plot_model presumably needs pydot + graphviz installed and an existing PNG/ folder - confirm
plot_model(model, to_file='PNG/basemodel_arch.png')

In [None]:
# saving model
# model.save('MODEL/base_model')

In [None]:
# loading model
# model = load_model('MODEL/base_model')

In [None]:
def plot_performance(hist):
    ''' 
    function to plot training and validation loss 

    hist: object exposing .history (dict with a 'loss' list and, optionally,
        a 'val_loss' list) and .epoch (x values), as returned by model.fit
    The validation curve is drawn only when 'val_loss' was recorded.
    '''
    hist_ = hist.history
    epochs = hist.epoch
    
    fig, ax = plt.subplots()
    ax.plot(epochs, hist_['loss'], label='Training loss')
    # fit() called without validation data records no 'val_loss'
    if 'val_loss' in hist_:
        ax.plot(epochs, hist_['val_loss'], label='Validation loss')
    ax.set_title('Training and validation loss')
    ax.set_xlabel('Epoch')
    ax.set_ylabel('Loss')
    ax.legend()
    
    plt.show()

In [None]:
# plot the loss curves recorded in `history` by model.fit
plot_performance(history)

In [1]:
#!export PATH=/Library/TeX/texbin:$PATH