In [2]:
# Mount Google Drive at /content/drive (Colab only). The data, feature and
# model paths used by the cells below are all located under this mount point.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [12]:
from numpy import array
from pickle import load
import keras
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.utils import plot_model
from keras.models import Model
from keras.layers import Input
from keras.layers import Dense
from keras.layers import LSTM
from keras.layers import Embedding
from keras.layers import Dropout
from keras.layers.merge import add
from keras.callbacks import ModelCheckpoint

## File Loading Helper Functions
Helps load previously processed image features and cleaned image descriptions.

In [4]:
# load doc into memory
def load_doc(filename):
  """Read a whole text file into memory.

  Args:
    filename: path of the text file to read.

  Returns:
    The complete contents of the file as one string.
  """
  # the context manager closes the file even when read() raises
  with open(filename, 'r') as file:
    return file.read()

# load list of unique photo ids, derived from image file names
def load_set(filename):
  """Return the set of unique photo identifiers listed in `filename`.

  The file is read with `load_doc` and processed line by line. Empty lines
  are ignored; for every other line the identifier is the text that comes
  before the first '.' (e.g. an image file name without its extension).
  """
  entries = load_doc(filename).split('\n')
  # a set comprehension both de-duplicates and drops the empty lines
  return {entry.split('.')[0] for entry in entries if len(entry) >= 1}

# load clean descriptions into memory
def load_clean_descriptions(filename, dataset):
    """Load the cleaned captions that belong to the given photo ids.

    Every non-blank line of the file is split on whitespace: the first token
    is the image id, the remaining tokens form the caption.

    Args:
        filename: path of the descriptions file (read with `load_doc`).
        dataset: collection of image ids to keep; ids not in it are skipped.

    Returns:
        dict mapping image id -> list of captions, each caption wrapped as
        'startseq <caption> endseq'.
    """
    doc = load_doc(filename)
    descriptions = dict()
    for line in doc.split('\n'):
        # split line by white space
        tokens = line.split()
        # blank lines (e.g. the one produced by a trailing newline) have no
        # tokens; indexing tokens[0] on them would raise IndexError
        if not tokens:
            continue
        # split id from description
        image_id, image_desc = tokens[0], tokens[1:]
        # skip images not in the set
        if image_id not in dataset:
            continue
        # wrap description in start/end tokens
        desc = 'startseq ' + ' '.join(image_desc) + ' endseq'
        # create the per-image list on first use, then store
        descriptions.setdefault(image_id, list()).append(desc)
    return descriptions

# load photo features
def load_photo_features(filename, dataset):
  """Load pickled photo features and keep only the requested ids.

  Args:
    filename: path of a pickle file holding a mapping id -> features.
    dataset: iterable of photo ids to keep.

  Returns:
    dict with one entry per id in `dataset`.

  Raises:
    KeyError: if an id in `dataset` is missing from the pickled mapping.
  """
  # NOTE(review): unpickling can execute arbitrary code; only load files
  # that come from a trusted source.
  # The context manager makes sure the file handle is closed after loading.
  with open(filename, 'rb') as handle:
    all_features = load(handle)
  # filter features
  return {k: all_features[k] for k in dataset}

## Tokenizer and Tokenizer helper functions
The tokenizer maps each word of the English captions to an integer index;
the encoded captions are later padded to a uniform length when the training sequences are created. The tokenizer is fitted on the text of the training descriptions.

In [5]:
# Build tokenizer
# Note: consider limiting the vocabulary (e.g. from ~9500 words down to 5000)?

# convert a dictionary of clean descriptions (image_id: list of descriptions) to a general list of all descriptions
def to_lines(descriptions):
  """Flatten a dictionary of descriptions into one list.

  Args:
    descriptions: dict mapping image id -> list of description strings.

  Returns:
    A single list with every description, in dictionary iteration order.
  """
  all_desc = list()
  # extend() replaces the former list comprehension that was evaluated only
  # for its side effect and built a throw-away list of None values
  for desc_list in descriptions.values():
    all_desc.extend(desc_list)
  return all_desc
 
#vocab_limit = 5000
# fit a tokenizer given caption descriptions
def create_tokenizer(descriptions):
  """Fit a Keras `Tokenizer` on every caption found in `descriptions`.

  `descriptions` maps image id -> list of captions; the captions are
  flattened with `to_lines` before fitting. No `num_words` limit is passed
  to the tokenizer.
  """
  caption_tokenizer = Tokenizer()
  caption_tokenizer.fit_on_texts(to_lines(descriptions))
  return caption_tokenizer

# Helper function; calculate the length of the description with the most words
def calc_max_length(description):
  """Return the word count of the longest caption in `description`.

  `description` maps image id -> list of captions; words are counted by
  splitting each caption on whitespace.
  """
  word_counts = [len(caption.split()) for caption in to_lines(description)]
  return max(word_counts)

## Creating sequences

Takes the tokenizer, the maximum description length, the dictionary of all descriptions, the dictionary of photo features, and the vocabulary size, and transforms the data into input/output pairs for training the model.

In [6]:
# create sequences of images, input sequences, and output words for an image
def create_sequences(tokenizer, max_length, descriptions, photos_features, vocab_size):
  """Expand every caption into (photo, partial caption) -> next word samples.

  For a caption encoded as w0 w1 ... wn, one sample is produced per split
  position i in 1..n: the input is the prefix w0..w(i-1) padded to
  `max_length` with `pad_sequences`, the target is wi one-hot encoded over
  `vocab_size` classes.

  Args:
    tokenizer: fitted tokenizer used to integer-encode the captions.
    max_length: length every input prefix is padded to.
    descriptions: dict mapping image id -> list of captions.
    photos_features: dict mapping image id -> feature array; element [0] of
      that array is used as the photo input.
    vocab_size: number of classes of the one-hot target.

  Returns:
    Three numpy arrays: photo inputs, padded text inputs, one-hot targets.
  """
  image_inputs, text_inputs, targets = [], [], []
  for image_id, captions in descriptions.items():
    for caption in captions:
      # integer-encode the caption
      encoded = tokenizer.texts_to_sequences([caption])[0]
      # every position after the first becomes one training sample
      for split_at in range(1, len(encoded)):
        prefix = encoded[:split_at]
        next_word = encoded[split_at]
        padded_prefix = pad_sequences([prefix], maxlen=max_length)[0]
        one_hot_next = to_categorical([next_word], num_classes=vocab_size)[0]
        image_inputs.append(photos_features[image_id][0])
        text_inputs.append(padded_prefix)
        targets.append(one_hot_next)
  return array(image_inputs), array(text_inputs), array(targets)

## Transformer
Implements attention for the text descriptions.

In [7]:
# https://keras.io/examples/nlp/text_classification_with_transformer/

import tensorflow as tf
from keras import layers

# Implement multi head self attention as a Keras layer
class MultiHeadSelfAttention(layers.Layer):
    """Multi-head scaled dot-product self-attention over a sequence.

    Queries, keys and values are all computed from the same input by three
    Dense projections, split into `num_heads` heads, attended per head,
    merged again and passed through a final Dense projection.

    Args:
        embed_dim: size of the last axis of the input and of the output.
        num_heads: number of attention heads; must divide `embed_dim`.
        **kwargs: standard Keras layer arguments (e.g. `name`, `dtype`),
            forwarded to `layers.Layer` so that the layer can be rebuilt
            from the dictionary returned by `get_config()`.

    Raises:
        ValueError: if `embed_dim` is not divisible by `num_heads`.
    """

    def __init__(self, embed_dim, num_heads=8, **kwargs):
        super(MultiHeadSelfAttention, self).__init__(**kwargs)
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embedding dimension = {embed_dim} should be divisible by number of heads = {num_heads}"
            )
        self.projection_dim = embed_dim // num_heads
        self.query_dense = layers.Dense(embed_dim)
        self.key_dense = layers.Dense(embed_dim)
        self.value_dense = layers.Dense(embed_dim)
        self.combine_heads = layers.Dense(embed_dim)

    def attention(self, query, key, value):
        """Scaled dot-product attention; returns (attended values, weights)."""
        score = tf.matmul(query, key, transpose_b=True)
        # scale by sqrt of the per-head key dimension before the softmax
        dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
        scaled_score = score / tf.math.sqrt(dim_key)
        weights = tf.nn.softmax(scaled_score, axis=-1)
        output = tf.matmul(weights, value)
        return output, weights

    def separate_heads(self, x, batch_size):
        """Reshape (batch, seq, embed) to (batch, heads, seq, projection)."""
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, inputs):
        """Apply self-attention to `inputs` and return a same-shaped tensor."""
        # inputs.shape = [batch_size, seq_len, embedding_dim]
        batch_size = tf.shape(inputs)[0]
        query = self.query_dense(inputs)  # (batch_size, seq_len, embed_dim)
        key = self.key_dense(inputs)  # (batch_size, seq_len, embed_dim)
        value = self.value_dense(inputs)  # (batch_size, seq_len, embed_dim)
        query = self.separate_heads(
            query, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        key = self.separate_heads(
            key, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        value = self.separate_heads(
            value, batch_size
        )  # (batch_size, num_heads, seq_len, projection_dim)
        attention, weights = self.attention(query, key, value)
        attention = tf.transpose(
            attention, perm=[0, 2, 1, 3]
        )  # (batch_size, seq_len, num_heads, projection_dim)
        concat_attention = tf.reshape(
            attention, (batch_size, -1, self.embed_dim)
        )  # (batch_size, seq_len, embed_dim)
        output = self.combine_heads(
            concat_attention
        )  # (batch_size, seq_len, embed_dim)
        return output

    def get_config(self):
        """Return the constructor arguments needed to re-create the layer.

        Only plain constructor arguments are stored. The Dense sub-layers and
        `projection_dim` are rebuilt by `__init__`; putting them in the config
        makes it non-serializable and makes `cls(**config)` fail on
        unexpected keyword arguments.
        """
        config = super(MultiHeadSelfAttention, self).get_config()
        config.update({"embed_dim": self.embed_dim,
                       "num_heads": self.num_heads})
        return config


# Implement a Transformer block as a layer
class TransformerBlock(layers.Layer):
    """Self-attention followed by a feed-forward network.

    Both sub-blocks are wrapped the same way: dropout on the sub-block
    output, residual addition with the sub-block input, then layer
    normalization.

    Args:
        embed_dim: size of the last axis of the input and of the output.
        num_heads: number of attention heads (must divide `embed_dim`).
        ff_dim: width of the hidden Dense layer of the feed-forward network.
        rate: dropout rate used after attention and after the feed-forward
            network.
        **kwargs: standard Keras layer arguments (e.g. `name`, `dtype`),
            forwarded to `layers.Layer` so that the layer can be rebuilt
            from the dictionary returned by `get_config()`.
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
        super(TransformerBlock, self).__init__(**kwargs)
        # keep the constructor arguments so get_config() can report them
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.ff_dim = ff_dim
        self.rate = rate
        self.att = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ffn = keras.Sequential(
            [layers.Dense(ff_dim, activation="relu"), layers.Dense(embed_dim),]
        )
        self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = layers.Dropout(rate)
        self.dropout2 = layers.Dropout(rate)

    def call(self, inputs, training=None):
        """Run the block on `inputs`.

        `training` is forwarded to both Dropout layers; it defaults to None
        so the layer can also be called without an explicit flag.
        """
        attn_output = self.att(inputs)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output)

    def get_config(self):
        """Return the constructor arguments needed to re-create the layer.

        Sub-layers are rebuilt by `__init__` and therefore are not part of
        the config; storing layer objects here makes the config
        non-serializable and breaks `cls(**config)`.
        """
        config = super(TransformerBlock, self).get_config()
        config.update({"embed_dim": self.embed_dim,
                       "num_heads": self.num_heads,
                       "ff_dim": self.ff_dim,
                       "rate": self.rate})
        return config

# Implement embedding layer (Do we need this? Are we to keep current embedding layer for model? - I think DELETE)
# Two separate embedding layers: one for tokens, one for token index (position)
class TokenAndPositionEmbedding(layers.Layer):
    """Sum of a token embedding and a learned position embedding.

    Args:
        maxlen: number of positions the position embedding can represent.
        vocab_size: number of distinct token ids (input_dim of the token
            embedding).
        embed_dim: size of both embeddings.
        **kwargs: standard Keras layer arguments (e.g. `name`, `dtype`),
            forwarded to `layers.Layer` so that the layer can be rebuilt
            from the dictionary returned by `get_config()`.
    """

    def __init__(self, maxlen, vocab_size, embed_dim, **kwargs):
        super(TokenAndPositionEmbedding, self).__init__(**kwargs)
        # keep the constructor arguments so get_config() can report them
        self.maxlen = maxlen
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.token_emb = layers.Embedding(input_dim=vocab_size, output_dim=embed_dim)
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed token ids `x` and add the embedding of each position."""
        # the sequence length is taken from the input itself (its last axis)
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

    # https://stackoverflow.com/questions/58678836/notimplementederror-layers-with-arguments-in-init-must-override-get-conf
    def get_config(self):
        """Return the constructor arguments needed to re-create the layer.

        The Embedding sub-layers are rebuilt by `__init__`; storing layer
        objects here makes the config non-serializable and breaks
        `cls(**config)`.
        """
        config = super(TokenAndPositionEmbedding, self).get_config()
        config.update({"maxlen": self.maxlen,
                       "vocab_size": self.vocab_size,
                       "embed_dim": self.embed_dim})
        return config



## Building model with transformer for text

In [24]:
# Defining the captioning model
# Two inputs: photo (Photo Feature Extractor), and word sequences (Sequence Processor)
# Regularization: dropout (50% and 25%) to avoid overfitting
# Output of model: next word in sequence
# NOTE: InceptionV3 -> 2048 feature vector

from keras.layers import GlobalAveragePooling1D

def define_model(vocab_size, max_length, learning_rate=0.01,
                 plot_file='/content/drive/MyDrive/ImageCaptioningProject/NewModels/transformer_model.png'):
  """Build and compile the captioning model.

  The model has two inputs, a 2048-element photo feature vector and a
  sequence of `max_length` token ids, and predicts a softmax distribution
  over the next word.

  Args:
    vocab_size: number of token ids; size of the embedding input and of the
      softmax output.
    max_length: length of the (padded) input token sequence.
    learning_rate: learning rate of the Adam optimizer.
    plot_file: where the diagram of the model is written by `plot_model`.

  Returns:
    The compiled Keras model.
  """
  # feature extractor model. Input: photo features vector of 2048 elements
  inputs1 = Input(shape=(2048,))
  fe1 = Dropout(0.5)(inputs1)
  # fully connected layer reduces the feature vector from 2048 to 256
  fe2 = Dense(256, activation='relu')(fe1)
  # sequence model
  inputs2 = Input(shape=(max_length,))
  # NOTE(review): no mask is requested from the embedding, so padded
  # positions also take part in attention and in the average pooling below
  se1 = TokenAndPositionEmbedding(max_length, vocab_size, 256)(inputs2)
  se2 = TransformerBlock(256, 2, 256)(se1) # where TransformerBlock(embed_dim, num_heads, ff_dim)
  se3 = GlobalAveragePooling1D()(se2)
  se4 = Dropout(0.5)(se3)
  se5 = Dense(256, activation='relu')(se4)
  se6 = Dropout(0.25)(se5)
  # decoder model: element-wise sum of the photo and text representations
  decoder1 = add([fe2, se6])
  decoder2 = Dense(256, activation='relu')(decoder1)
  outputs = Dense(vocab_size, activation='softmax')(decoder2)
  # tie it together [image, seq] -> [word]
  model = Model(inputs=[inputs1, inputs2], outputs = outputs)
  # `learning_rate` is the supported argument name; `lr` is a deprecated alias
  optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
  model.compile(loss='categorical_crossentropy', optimizer=optimizer)
  # summary() prints by itself and returns None, so it must not be wrapped in print()
  model.summary()
  # save model layout
  plot_model(model, to_file=plot_file, show_shapes=True)
  return model

## Load training data 
Creates input/output pairs for the training data <br>
input: image features, text descriptions <br>
output: next word

In [9]:
# load training dataset: the set of photo ids listed in the train split file
filename = '/content/drive/My Drive/ImageCaptioningProject/Flickr8k_text/Flickr_8k.trainImages.txt'
train = load_set(filename)
# descriptions of the training photos (dict: image id -> list of captions)
train_descriptions = load_clean_descriptions('/content/drive/My Drive/ImageCaptioningProject/descriptions.txt', train)
# photo features of the training photos (dict: image id -> features)
train_features = load_photo_features('/content/drive/My Drive/ImageCaptioningProject/features.pkl', train)
# prepare tokenizer; it is fitted on the training captions only and is
# reused unchanged for the validation data in a later cell
tokenizer = create_tokenizer(train_descriptions)

# Embedding layer expects input_dim to be vocab size + 1
# (presumably because index 0 is reserved for padding -- TODO confirm against the Keras Tokenizer docs)
vocab_size = len(tokenizer.word_index) + 1
# determine the maximum sequence length (in words) over the training captions
max_length = calc_max_length(train_descriptions)
# prepare sequences: photo inputs, padded text inputs, one-hot next-word targets
X1train, X2train, ytrain = create_sequences(tokenizer, max_length, train_descriptions, train_features, vocab_size)


## Load validation data
Creates input/output pairs for the validation data

In [10]:
# load validation set: the set of photo ids listed in the dev split file
filename = '/content/drive/My Drive/ImageCaptioningProject/Flickr8k_text/Flickr_8k.devImages.txt'
validation = load_set(filename)
# descriptions of the validation photos (dict: image id -> list of captions)
validation_descriptions = load_clean_descriptions('/content/drive/My Drive/ImageCaptioningProject/descriptions.txt', validation)
# photo features of the validation photos (dict: image id -> features)
validation_features = load_photo_features('/content/drive/My Drive/ImageCaptioningProject/features.pkl', validation)

# prepare sequences; `tokenizer`, `max_length` and `vocab_size` are the
# globals computed from the training data in the previous cell
X1val, X2val, yval = create_sequences(tokenizer, max_length, validation_descriptions, validation_features, vocab_size)

## Train model
Models with improved loss are saved each epoch

In [None]:
# Fit model

# define the model (uses `vocab_size` and `max_length` from the training-data cell)
model = define_model(vocab_size, max_length)

# Adding checkpoint - save the model when it improves, 
# and then use the model with the best skill as the final model.
# https://www.tensorflow.org/tutorials/keras/save_and_load
# SavedModel_format rather than .h5, since saving a custom model

# Define checkpoint callback
# The path has no file extension; epoch number, training loss and validation
# loss are substituted into the name each time a checkpoint is written.
filepath = '/content/drive/MyDrive/ImageCaptioningProject/NewModels/TransformerModelsWithLR/model-ep{epoch:03d}-loss{loss:.3f}-val_loss{val_loss:.3f}'
# monitor='val_loss' with mode='min' and save_best_only=True: a checkpoint is
# requested only when the validation loss is lower than the best seen so far
checkpoint = ModelCheckpoint(filepath, monitor='val_loss', verbose=1, save_best_only=True, mode='min')

# fit model for 10 epochs, validating on the dev split after every epoch
model.fit([X1train, X2train], ytrain, epochs=10, verbose=1, callbacks=[checkpoint], validation_data=([X1val, X2val], yval))

Model: "functional_11"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_11 (InputLayer)           [(None, 34)]         0                                            
__________________________________________________________________________________________________
token_and_position_embedding_3  (None, 34, 256)      1948928     input_11[0][0]                   
__________________________________________________________________________________________________
transformer_block_3 (Transforme (None, 34, 256)      395776      token_and_position_embedding_3[0]
__________________________________________________________________________________________________
global_average_pooling1d_2 (Glo (None, 256)          0           transformer_block_3[0][0]        
______________________________________________________________________________________

## Evaluate model

In [16]:
from nltk.translate.bleu_score import corpus_bleu

# map an integer to word
def word_for_id(integer, tokenizer):
  """Map an integer index back to its word.

  Args:
    integer: index to look up.
    tokenizer: object whose `word_index` attribute maps word -> index.

  Returns:
    The word whose index equals `integer`, or None when there is none.
  """
  for word, index in tokenizer.word_index.items():
    if index == integer:
      return word
  # must be the `None` singleton: the lowercase name `none` is undefined and
  # raised NameError instead of signalling "not found" to the caller
  return None

# generate a description for an image
def generate_desc(model, tokenizer, photo, max_length):
  """Generate a caption for one photo, one word at a time.

  Generation starts from 'startseq'. At every step the text produced so far
  is encoded and padded, the model predicts the next word, and the most
  probable word is appended. Generation stops after `max_length` steps, when
  the predicted index has no word, or once 'endseq' has been appended.

  Args:
    model: captioning model taking [photo, padded sequence] as input.
    tokenizer: tokenizer used to encode text and to look words up.
    photo: photo features passed to the model as its first input.
    max_length: padded sequence length and maximum number of steps.

  Returns:
    The generated text, including the leading 'startseq' and, when it was
    predicted, the trailing 'endseq'.
  """
  # imported locally: elsewhere in the notebook `argmax` is only imported in a
  # later cell, so the function would otherwise depend on that cell having run
  from numpy import argmax
  # seed generation process with start flag
  in_text = 'startseq'
  # iterate over the whole length of the sequence
  for _ in range(max_length):
    # integer encode input sequence
    sequence = tokenizer.texts_to_sequences([in_text])[0]
    # pad input
    sequence = pad_sequences([sequence], maxlen=max_length)
    # predict next word
    yhat = model.predict([photo, sequence], verbose=0)
    # convert probability to an integer
    yhat = argmax(yhat)
    # map integer to word
    word = word_for_id(yhat, tokenizer)
    # stop if we cannot map the word
    if word is None:
      break
    # append as input for generating the next word
    in_text += ' ' + word
    # stop if we predict the end of the sequence
    if word == 'endseq':
      break
  return in_text

# evaluate the skill of the model
def evaluate_model(model, descriptions, photos, tokenizer, max_length):
  """Caption every photo and print corpus BLEU-1 to BLEU-4.

  Args:
    model: captioning model passed on to `generate_desc`.
    descriptions: dict mapping image id -> list of reference captions.
    photos: dict mapping image id -> photo features.
    tokenizer: tokenizer passed on to `generate_desc`.
    max_length: maximum caption length passed on to `generate_desc`.
  """
  actual, predicted = list(), list()
  # step over the whole set
  for key, desc_list in descriptions.items():
    # generate description
    yhat = generate_desc(model, tokenizer, photos[key], max_length)
    # store actual and predicted, both as lists of words
    references = [d.split() for d in desc_list]
    actual.append(references)
    predicted.append(yhat.split())
  # n-gram weights of each score. They are uniform over the n-gram orders
  # used and sum to 1; the former BLEU-3 weights (0.3, 0.3, 0.3) summed to 0.9.
  bleu_weights = (
      ('BLEU-1', (1.0, 0, 0, 0)),
      ('BLEU-2', (0.5, 0.5, 0, 0)),
      ('BLEU-3', (1.0 / 3, 1.0 / 3, 1.0 / 3, 0)),
      ('BLEU-4', (0.25, 0.25, 0.25, 0.25)),
  )
  # calculate BLEU scores
  for label, weights in bleu_weights:
    print('%s: %f' % (label, corpus_bleu(actual, predicted, weights=weights)))

In [None]:
from keras.models import load_model
from numpy import argmax

# load tokenizer
# NOTE(review): unpickling can execute arbitrary code; only load trusted files.
# The context manager closes the file once the tokenizer has been read.
with open('/content/drive/My Drive/ImageCaptioningProject/develop_tokenizer.pkl', 'rb') as tokenizer_file:
  tokenizer = load(tokenizer_file)
# previously defined
max_length = 34

# load test set, unseen by the model
filename = '/content/drive/My Drive/ImageCaptioningProject/Flickr8k_text/Flickr_8k.testImages.txt'
test = load_set(filename)
# descriptions
test_descriptions = load_clean_descriptions('/content/drive/My Drive/ImageCaptioningProject/descriptions.txt', test)
# photo features
test_features = load_photo_features('/content/drive/My Drive/ImageCaptioningProject/features.pkl', test)

# load the model
filename = '/content/drive/MyDrive/ImageCaptioningProject/NewModels/TransformerModels/latestModel/'
saved_model = load_model(filename)

# evaluate model: prints BLEU-1 to BLEU-4 for the test set
evaluate_model(saved_model, test_descriptions, test_features, tokenizer, max_length)

In [19]:
from tensorflow.keras.applications import InceptionV3
from keras.preprocessing.image import load_img
from keras.preprocessing.image import img_to_array
from keras.applications.inception_v3 import preprocess_input

# extract features of new uncaptioned photos
def extract_features(filename):
  """Compute the InceptionV3 feature vector of one image file.

  A fresh InceptionV3 network is created on every call and cut at its
  second-to-last layer, so the output is the activation that feeds the
  final classification layer.

  Args:
    filename: path of the image to load.

  Returns:
    The prediction of the truncated network for a batch holding this one image.
  """
  # build the network and drop the unnecessary classification layer
  base_network = InceptionV3()
  features_model = Model(inputs=base_network.inputs, outputs=base_network.layers[-2].output)
  # load the photo at 299x299 and convert its pixels to a numpy array
  pixels = img_to_array(load_img(filename, target_size=(299, 299)))
  # add a leading batch axis of size 1
  batch = pixels.reshape((1, pixels.shape[0], pixels.shape[1], pixels.shape[2]))
  # apply the InceptionV3 input preprocessing
  batch = preprocess_input(batch)
  # get image features
  return features_model.predict(batch, verbose=0)

In [None]:
from keras.models import load_model
from numpy import argmax
# load one of the checkpoints written during training
saved_model = load_model('/content/drive/MyDrive/ImageCaptioningProject/NewModels/TransformerModelsWithLR/model-ep002-loss5.782-val_loss5.686')

# extract the features of a new, uncaptioned photo
photo = extract_features('/content/drive/MyDrive/ImageCaptioningProject/uncaptioned_images/example3.jpg')
# generate and show its caption; `tokenizer` and `max_length` are the globals
# set by an earlier cell, so one of those cells must have been run first
description = generate_desc(saved_model, tokenizer, photo, max_length)
print(description)