In [None]:
import os
import pickle
import zipfile
from contextlib import redirect_stdout
from shutil import copyfile
from textwrap import wrap

import matplotlib.pyplot as plt
import numpy as np
from IPython.display import clear_output
from PIL import Image
from nltk.translate.bleu_score import sentence_bleu

from google.colab import drive

import tensorflow as tf
from keras import regularizers
from keras.layers import TextVectorization, Input, Dropout, Dense, Embedding, LSTM, add
from keras.utils import to_categorical, pad_sequences, Sequence, plot_model, load_img, img_to_array
from keras.models import Model, load_model
from keras.callbacks import Callback
from keras.losses import CategoricalCrossentropy
from keras.optimizers import Adam
from keras.initializers import Constant
from keras.preprocessing.image import ImageDataGenerator

In [None]:
# Mount Google Drive at /content/gdrive so the working folder under MyDrive
# can be accessed through ordinary file paths.
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [None]:
# Root folder on the mounted Drive. The pickles read below and the model
# plots, summaries and saved models written below all live under this path.
GDRIVE_WORKING_PATH = "/content/gdrive/MyDrive/image_cap"

# Data Preparation


## Read pre-saved files

In [None]:
# Load the pre-saved id collections for the train / test / validation splits.
# NOTE(review): presumably lists of image ids stored as strings — confirm.
# NOTE(review): pickle.load can execute arbitrary code; only load trusted files.
with open(f"{GDRIVE_WORKING_PATH}/list_id_train.pkl", "rb") as f:
  train_ids = pickle.load(f)
with open(f"{GDRIVE_WORKING_PATH}/list_id_test.pkl", "rb") as f:
  test_ids = pickle.load(f)
with open(f"{GDRIVE_WORKING_PATH}/list_id_val.pkl", "rb") as f:
  val_ids = pickle.load(f)

In [None]:
# Load the pre-saved id -> caption mapping.
# NOTE(review): the exact structure of the mapping is not visible here — confirm.
with open(f"{GDRIVE_WORKING_PATH}/mapping_id_caption.pkl", "rb") as f:
  mapping_id_caption = pickle.load(f)

### Read Vectorizers

In [None]:
def get_vectorizer():
  """Rebuild the caption ``TextVectorization`` layer saved in ``vectorizer.pkl``.

  The pickle is read as a mapping with a ``"config"`` entry, passed to
  ``TextVectorization.from_config``, and a ``"weights"`` entry, passed to
  ``set_weights`` on the rebuilt layer.

  Returns:
    The restored ``TextVectorization`` layer.
  """
  # NOTE(review): pickle.load can execute arbitrary code; only load files
  # from a trusted Drive folder.
  # A context manager closes the file handle deterministically instead of
  # leaving it to the garbage collector.
  with open(f"{GDRIVE_WORKING_PATH}/vectorizer.pkl", "rb") as f:
    from_disk = pickle.load(f)

  new_v = TextVectorization.from_config(from_disk.get("config"))
  new_v.set_weights(from_disk.get("weights"))
  return new_v

# Restore the vectorizer once and derive the vocabulary size from it; the
# size is used as the width of the softmax output and of the one-hot targets.
vectorizer = get_vectorizer()
VOCAB_SIZE = len(vectorizer.get_vocabulary())
VOCAB_SIZE

4159

In [None]:
def get_vectorizer_aug():
  """Rebuild the ``TextVectorization`` layer saved in ``vectorizer_augmented.pkl``.

  The pickle is read as a mapping with a ``"config"`` entry, passed to
  ``TextVectorization.from_config``, and a ``"weights"`` entry, passed to
  ``set_weights`` on the rebuilt layer.

  Returns:
    The restored ``TextVectorization`` layer for the augmented captions.
  """
  # NOTE(review): pickle.load can execute arbitrary code; only load files
  # from a trusted Drive folder.
  # A context manager closes the file handle deterministically instead of
  # leaving it to the garbage collector.
  with open(f"{GDRIVE_WORKING_PATH}/vectorizer_augmented.pkl", "rb") as f:
    from_disk = pickle.load(f)

  new_v = TextVectorization.from_config(from_disk.get("config"))
  new_v.set_weights(from_disk.get("weights"))
  return new_v

# Restore the augmented-caption vectorizer and derive its vocabulary size,
# used by the model trained on the augmented captions.
vectorizer_aug = get_vectorizer_aug()
VOCAB_SIZE_AUG = len(vectorizer_aug.get_vocabulary())
VOCAB_SIZE_AUG

7557

### Read Embedding Layers

In [None]:
def get_embedding_layer():
  """Load the pickled embedding layer stored in ``embedding_layer.pkl``.

  Only the layer object is read; ``embedding_matrix.pkl`` is not needed to
  return it, so no time is spent loading a matrix that would be discarded.

  Returns:
    The unpickled embedding layer object, exactly as stored.
  """
  # NOTE(review): pickle.load can execute arbitrary code; only load files
  # from a trusted Drive folder.
  with open(f"{GDRIVE_WORKING_PATH}/embedding_layer.pkl", "rb") as f:
    embedding_layer = pickle.load(f)

  return embedding_layer

In [None]:
def get_embedding_layer_aug():
  """Load the pickled embedding layer stored in ``embedding_layer_augmented.pkl``.

  Only the layer object is read; ``embedding_matrix_augmented.pkl`` is not
  needed to return it, so no time is spent loading a matrix that would be
  discarded.

  Returns:
    The unpickled embedding layer object, exactly as stored.
  """
  # NOTE(review): pickle.load can execute arbitrary code; only load files
  # from a trusted Drive folder.
  with open(f"{GDRIVE_WORKING_PATH}/embedding_layer_augmented.pkl", "rb") as f:
    embedding_layer = pickle.load(f)

  return embedding_layer

In [None]:
def get_vectorized_caption():
  """Load the pre-vectorized captions of the three data splits.

  Returns:
    A 3-tuple ``(train, test, val)`` of the unpickled caption encodings.
    Note the order: test comes before validation here, unlike the image
    feature loaders in this notebook, which return ``(train, val, test)``.
  """
  split_files = (
    f"{GDRIVE_WORKING_PATH}/vectorized_captions_train.pkl",
    f"{GDRIVE_WORKING_PATH}/vectorized_captions_test.pkl",
    f"{GDRIVE_WORKING_PATH}/vectorized_captions_val.pkl",
  )

  loaded = []
  for pkl_path in split_files:
    with open(pkl_path, "rb") as handle:
      loaded.append(pickle.load(handle))

  captions_train, captions_test, captions_val = loaded
  return captions_train, captions_test, captions_val

In [None]:
def get_vectorized_caption_train_aug():
  """Load the vectorized augmented captions for the train and validation splits.

  Each pickle is converted with ``np.array`` before being returned.

  Returns:
    A 2-tuple ``(train, val)`` of NumPy arrays.
  """
  split_files = (
    f"{GDRIVE_WORKING_PATH}/vectorized_aug_captions_train.pkl",
    f"{GDRIVE_WORKING_PATH}/vectorized_aug_captions_val.pkl",
  )

  arrays = []
  for pkl_path in split_files:
    with open(pkl_path, "rb") as handle:
      arrays.append(np.array(pickle.load(handle)))

  captions_train, captions_val = arrays
  return captions_train, captions_val

### Read IMG Features

In [None]:
def get_features_vgg():
  """Load the pre-computed VGG16 image features of all three splits.

  Each pickle holds a dict; its values are stacked into one array (in dict
  order) and singleton axes are removed with ``np.squeeze``.

  Returns:
    A 3-tuple ``(train, val, test)`` of NumPy arrays. The files are read in
    train, test, val order, but validation is returned before test.
  """
  def _stack_values(pkl_path):
    # Read one feature dict and turn its values into a squeezed array.
    with open(pkl_path, "rb") as handle:
      features_by_id = pickle.load(handle)
    return np.squeeze(np.array(list(features_by_id.values())))

  train_feats = _stack_values(f"{GDRIVE_WORKING_PATH}/dict_features_train_vgg16.pkl")
  test_feats = _stack_values(f"{GDRIVE_WORKING_PATH}/dict_features_test_vgg16.pkl")
  val_feats = _stack_values(f"{GDRIVE_WORKING_PATH}/dict_features_val_vgg16.pkl")

  return train_feats, val_feats, test_feats

In [None]:
def get_features_vgg_places():
  """Load the pre-computed ``vgg16_places`` image features of all three splits.

  Each pickle holds a dict; its values are stacked into one array (in dict
  order) and singleton axes are removed with ``np.squeeze``.

  Returns:
    A 3-tuple ``(train, val, test)`` of NumPy arrays. The files are read in
    train, test, val order, but validation is returned before test.
  """
  def _stack_values(pkl_path):
    # Read one feature dict and turn its values into a squeezed array.
    with open(pkl_path, "rb") as handle:
      features_by_id = pickle.load(handle)
    return np.squeeze(np.array(list(features_by_id.values())))

  train_feats = _stack_values(f"{GDRIVE_WORKING_PATH}/dict_features_train_vgg16_places.pkl")
  test_feats = _stack_values(f"{GDRIVE_WORKING_PATH}/dict_features_test_vgg16_places.pkl")
  val_feats = _stack_values(f"{GDRIVE_WORKING_PATH}/dict_features_val_vgg16_places.pkl")

  return train_feats, val_feats, test_feats

## Data Generator

### For one single caption per image

In [None]:
class DataGen(Sequence):
  """Batch generator expanding (image feature, caption) pairs into next-token samples.

  For a caption ``seq`` every prefix ``seq[:i]`` with ``1 <= i < len(seq)``
  becomes one sample: the image feature and the prefix padded to
  ``max_length`` are the inputs, the one-hot encoding of ``seq[i]`` is the
  target.

  Args:
    img_features: indexable collection of image feature vectors.
    cap_encoded: indexable collection of encoded captions, paired with
      ``img_features`` by position.
    batch_size: number of images (not expanded samples) per batch.
    vocab_size: number of classes of the one-hot targets.
    max_length: length every input prefix is padded to.
  """

  def __init__(self, img_features, cap_encoded, batch_size, vocab_size, max_length):
    # Initialise the Keras base class; recent Keras releases warn when a
    # Sequence subclass skips this call.
    super().__init__()

    self.img = img_features
    self.cap = cap_encoded
    self.batch_size = batch_size
    self.vocab_size = vocab_size
    self.max_length = max_length
    self.n = len(self.img)

  def __len__(self):
    """Return the number of batches per epoch, including a trailing partial one."""
    # Ceiling division: __getitem__ already clamps its upper bound to self.n,
    # so the last (possibly smaller) batch must be counted here as well.
    # Flooring would silently drop the remainder and yield zero batches
    # whenever there are fewer images than batch_size.
    return (self.n + self.batch_size - 1) // self.batch_size

  def __getitem__(self, idx):
    """Return batch ``idx`` as ``([image_features, padded_prefixes], one_hot_targets)``."""
    low = idx * self.batch_size
    high = min(low + self.batch_size, self.n)
    batch_x_img = self.img[low:high]
    batch_x_cap = self.cap[low:high]

    X1, X2, y = self.__get_data(batch_x_img, batch_x_cap)
    return [X1, X2], y

  def __get_data(self, batch_img, batch_cap):
    """Expand every (image, caption) pair of the batch into prefix/next-token samples."""
    X1, X2, y = list(), list(), list()

    for feature, seq in zip(batch_img, batch_cap):
      # NOTE(review): every position of seq is used as a target; if captions
      # are stored already padded, padding tokens become targets too — confirm
      # this is intended.
      for i in range(1, len(seq)):
        in_seq, out_seq = seq[:i], seq[i]
        in_seq = pad_sequences([in_seq], maxlen=self.max_length, padding='post')[0]
        out_seq = to_categorical([out_seq], num_classes=self.vocab_size)[0]
        X1.append(feature)
        X2.append(in_seq)
        y.append(out_seq)

    X1, X2, y = np.array(X1), np.array(X2), np.array(y)

    return X1, X2, y

### For list of captions per image

In [None]:
class DataGenAugmented(Sequence):
  """Batch generator for images that each come with several captions.

  ``cap_encoded[k]`` is iterated as the captions of image ``k``. For each
  caption ``seq`` every prefix ``seq[:i]`` with ``1 <= i < len(seq)`` becomes
  one sample: the image feature and the prefix padded to ``max_length`` are
  the inputs, the one-hot encoding of ``seq[i]`` is the target.

  Args:
    img_features: indexable collection of image feature vectors.
    cap_encoded: indexable collection whose items are iterables of encoded
      captions, paired with ``img_features`` by position.
    batch_size: number of images (not expanded samples) per batch.
    vocab_size: number of classes of the one-hot targets.
    max_length: length every input prefix is padded to.
  """

  def __init__(self, img_features, cap_encoded, batch_size, vocab_size, max_length):
    # Initialise the Keras base class; recent Keras releases warn when a
    # Sequence subclass skips this call.
    super().__init__()

    self.img = img_features
    self.cap = cap_encoded
    self.batch_size = batch_size
    self.vocab_size = vocab_size
    self.max_length = max_length
    self.n = len(self.img)

  def __len__(self):
    """Return the number of batches per epoch, including a trailing partial one."""
    # Ceiling division: __getitem__ already clamps its upper bound to self.n,
    # so the last (possibly smaller) batch must be counted here as well.
    # Flooring would silently drop the remainder and yield zero batches
    # whenever there are fewer images than batch_size.
    return (self.n + self.batch_size - 1) // self.batch_size

  def __getitem__(self, idx):
    """Return batch ``idx`` as ``([image_features, padded_prefixes], one_hot_targets)``."""
    low = idx * self.batch_size
    high = min(low + self.batch_size, self.n)
    batch_x_img = self.img[low:high]
    batch_x_cap = self.cap[low:high]

    X1, X2, y = self.__get_data(batch_x_img, batch_x_cap)
    return [X1, X2], y

  def __get_data(self, batch_img, batch_cap):
    """Expand every caption of every image in the batch into prefix/next-token samples."""
    X1, X2, y = list(), list(), list()

    for feature, list_cap in zip(batch_img, batch_cap):
      # The same image feature is repeated for each of its captions.
      for seq in list_cap:
        # NOTE(review): every position of seq is used as a target; if captions
        # are stored already padded, padding tokens become targets too —
        # confirm this is intended.
        for i in range(1, len(seq)):
          in_seq, out_seq = seq[:i], seq[i]
          in_seq = pad_sequences([in_seq], maxlen=self.max_length, padding='post')[0]
          out_seq = to_categorical([out_seq], num_classes=self.vocab_size)[0]
          X1.append(feature)
          X2.append(in_seq)
          y.append(out_seq)

    X1, X2, y = np.array(X1), np.array(X2), np.array(y)

    return X1, X2, y

# Model Definition

## Model Basic: VGG16 + LSTM

In [None]:
def get_model_basic(voc_size, cap_length, emb_layer, folder_name):
  """Build and compile the basic captioning model and write its plot and summary.

  The model has two inputs: a 4096-dimensional image feature vector, projected
  to 256 units, and a token sequence of length ``cap_length``, embedded by
  ``emb_layer`` and encoded by an LSTM with 256 units. Both branches are
  summed, passed through a 256-unit dense layer and a softmax over
  ``voc_size`` classes.

  Args:
    voc_size: number of output classes of the softmax layer.
    cap_length: length of the token-sequence input.
    emb_layer: layer applied to the token-sequence input.
    folder_name: sub-folder of ``{GDRIVE_WORKING_PATH}/models`` that receives
      ``plot.png`` and ``summary.txt``; created if it does not exist.

  Returns:
    The compiled Keras ``Model``.
  """
  # image feature layers
  inputs1 = Input(shape=(4096,))
  d2 = Dense(256, activation='relu')(inputs1)

  # sequence feature layers
  inputs2 = Input(shape=(cap_length,))
  emb = emb_layer(inputs2)
  lstm = LSTM(256)(emb)

  # merge
  merge = add([d2, lstm])
  d3 = Dense(256, activation="relu")(merge)

  # softmax layer
  outputs = Dense(voc_size, activation="softmax")(d3)

  m = Model(inputs=[inputs1, inputs2], outputs=outputs)

  # loss function; metrics is passed as a list, the form documented by Keras
  m.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

  # Make sure the output folder exists before anything is written into it.
  out_dir = f"{GDRIVE_WORKING_PATH}/models/{folder_name}"
  os.makedirs(out_dir, exist_ok=True)

  plot_model(m, to_file=f"{out_dir}/plot.png", show_shapes=True)

  # summary() prints to stdout, so stdout is redirected into the file.
  with open(f"{out_dir}/summary.txt", "w") as f:
    with redirect_stdout(f):
      m.summary()

  return m

## Model Basic + Dropout

In [None]:
def get_model_with_dropout(voc_size, cap_length, emb_layer, folder_name):
  """Build and compile the captioning model with dropout and write its plot and summary.

  Same two-branch layout as the basic model, with ``Dropout(0.5)`` applied to
  the 4096-dimensional image input and to the merged (summed) representation
  before the 256-unit dense layer.

  Args:
    voc_size: number of output classes of the softmax layer.
    cap_length: length of the token-sequence input.
    emb_layer: layer applied to the token-sequence input.
    folder_name: sub-folder of ``{GDRIVE_WORKING_PATH}/models`` that receives
      ``plot.png`` and ``summary.txt``; created if it does not exist.

  Returns:
    The compiled Keras ``Model``.
  """
  # image feature layers
  inputs1 = Input(shape=(4096,))
  drop1 = Dropout(0.5)(inputs1)
  d2 = Dense(256, activation='relu')(drop1)

  # sequence feature layers
  inputs2 = Input(shape=(cap_length,))
  emb = emb_layer(inputs2)
  lstm = LSTM(256)(emb)

  # merge
  merge = add([d2, lstm])
  drop3 = Dropout(0.5)(merge)
  d3 = Dense(256, activation="relu")(drop3)

  # softmax layer
  outputs = Dense(voc_size, activation="softmax")(d3)

  m = Model(inputs=[inputs1, inputs2], outputs=outputs)

  # loss function; metrics is passed as a list, the form documented by Keras
  m.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

  # Make sure the output folder exists before anything is written into it.
  out_dir = f"{GDRIVE_WORKING_PATH}/models/{folder_name}"
  os.makedirs(out_dir, exist_ok=True)

  plot_model(m, to_file=f"{out_dir}/plot.png")

  # summary() prints to stdout, so stdout is redirected into the file.
  with open(f"{out_dir}/summary.txt", "w") as f:
    with redirect_stdout(f):
      m.summary()

  return m

## Model Basic + L1 Regularization


In [None]:
def get_model_with_reg(voc_size, cap_length, emb_layer, folder_name):
  """Build and compile the L1-regularized captioning model and write its plot and summary.

  Same two-branch layout as the basic model, with ``kernel_regularizer='l1'``
  on both 256-unit dense layers. A ``Dropout(0.5)`` is additionally applied
  right before the softmax layer.

  Args:
    voc_size: number of output classes of the softmax layer.
    cap_length: length of the token-sequence input.
    emb_layer: layer applied to the token-sequence input.
    folder_name: sub-folder of ``{GDRIVE_WORKING_PATH}/models`` that receives
      ``plot.png`` and ``summary.txt``; created if it does not exist.

  Returns:
    The compiled Keras ``Model``.
  """
  # image feature layers
  inputs1 = Input(shape=(4096,))
  d2 = Dense(256, activation='relu', kernel_regularizer='l1')(inputs1)

  # sequence feature layers
  inputs2 = Input(shape=(cap_length,))
  emb = emb_layer(inputs2)
  lstm = LSTM(256)(emb)

  # merge
  merge = add([d2, lstm])
  d3 = Dense(256, activation="relu", kernel_regularizer='l1')(merge)

  # softmax layer
  drop3 = Dropout(0.5)(d3)
  outputs = Dense(voc_size, activation="softmax")(drop3)

  m = Model(inputs=[inputs1, inputs2], outputs=outputs)

  # loss function; metrics is passed as a list, the form documented by Keras
  m.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

  # Make sure the output folder exists before anything is written into it.
  out_dir = f"{GDRIVE_WORKING_PATH}/models/{folder_name}"
  os.makedirs(out_dir, exist_ok=True)

  plot_model(m, to_file=f"{out_dir}/plot.png")

  # summary() prints to stdout, so stdout is redirected into the file.
  with open(f"{out_dir}/summary.txt", "w") as f:
    with redirect_stdout(f):
      m.summary()

  return m

## Model Basic + Dropout + Data Augmentation

In [None]:
def get_model_with_dropout_aug(voc_size, cap_length, emb_layer, folder_name):
  """Build and compile the dropout captioning model used with augmented captions.

  Two-branch layout: a 4096-dimensional image feature vector (dropout 0.5,
  then a 256-unit dense layer) and a token sequence of length ``cap_length``
  (embedded by ``emb_layer``, then an LSTM with 256 units). The branches are
  summed, passed through dropout 0.5, a 256-unit dense layer and a softmax
  over ``voc_size`` classes.

  Args:
    voc_size: number of output classes of the softmax layer.
    cap_length: length of the token-sequence input.
    emb_layer: layer applied to the token-sequence input.
    folder_name: sub-folder of ``{GDRIVE_WORKING_PATH}/models`` that receives
      ``plot.png`` and ``summary.txt``; created if it does not exist.

  Returns:
    The compiled Keras ``Model``.
  """
  # image feature layers
  inputs1 = Input(shape=(4096,))
  drop1 = Dropout(0.5)(inputs1)
  d2 = Dense(256, activation='relu')(drop1)

  # sequence feature layers
  inputs2 = Input(shape=(cap_length,))
  emb = emb_layer(inputs2)
  lstm = LSTM(256)(emb)

  # merge
  merge = add([d2, lstm])
  drop3 = Dropout(0.5)(merge)
  d3 = Dense(256, activation="relu")(drop3)

  # softmax layer
  outputs = Dense(voc_size, activation="softmax")(d3)

  m = Model(inputs=[inputs1, inputs2], outputs=outputs)

  # loss function; metrics is passed as a list, the form documented by Keras
  m.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

  # Make sure the output folder exists before anything is written into it.
  out_dir = f"{GDRIVE_WORKING_PATH}/models/{folder_name}"
  os.makedirs(out_dir, exist_ok=True)

  plot_model(m, to_file=f"{out_dir}/plot.png")

  # summary() prints to stdout, so stdout is redirected into the file.
  with open(f"{out_dir}/summary.txt", "w") as f:
    with redirect_stdout(f):
      m.summary()

  return m

## Model Places + Dropout

In [None]:
def get_model_places(voc_size, cap_length, emb_layer, folder_name):
  """Build and compile the dropout captioning model used with the Places features.

  Two-branch layout: a 4096-dimensional image feature vector (dropout 0.5,
  then a 256-unit dense layer) and a token sequence of length ``cap_length``
  (embedded by ``emb_layer``, then an LSTM with 256 units). The branches are
  summed, passed through dropout 0.5, a 256-unit dense layer and a softmax
  over ``voc_size`` classes.

  Args:
    voc_size: number of output classes of the softmax layer.
    cap_length: length of the token-sequence input.
    emb_layer: layer applied to the token-sequence input.
    folder_name: sub-folder of ``{GDRIVE_WORKING_PATH}/models`` that receives
      ``plot.png`` and ``summary.txt``; created if it does not exist.

  Returns:
    The compiled Keras ``Model``.
  """
  # image feature layers
  inputs1 = Input(shape=(4096,))
  drop1 = Dropout(0.5)(inputs1)
  d2 = Dense(256, activation='relu')(drop1)

  # sequence feature layers
  inputs2 = Input(shape=(cap_length,))
  emb = emb_layer(inputs2)
  lstm = LSTM(256)(emb)

  # merge
  merge = add([d2, lstm])
  drop3 = Dropout(0.5)(merge)
  d3 = Dense(256, activation="relu")(drop3)

  # softmax layer
  outputs = Dense(voc_size, activation="softmax")(d3)

  m = Model(inputs=[inputs1, inputs2], outputs=outputs)

  # loss function; metrics is passed as a list, the form documented by Keras
  m.compile(loss="categorical_crossentropy", optimizer="adam", metrics=["accuracy"])

  # Make sure the output folder exists before anything is written into it.
  out_dir = f"{GDRIVE_WORKING_PATH}/models/{folder_name}"
  os.makedirs(out_dir, exist_ok=True)

  plot_model(m, to_file=f"{out_dir}/plot.png")

  # summary() prints to stdout, so stdout is redirected into the file.
  with open(f"{out_dir}/summary.txt", "w") as f:
    with redirect_stdout(f):
      m.summary()

  return m

# Training

In [None]:
def save_history_plot(h, path):
  """Plot training vs. validation loss and accuracy and save the figure.

  A fresh figure is created on every call, so curves from an earlier call
  are never drawn underneath the new ones.

  Args:
    h: object with a ``history`` mapping containing the keys ``'loss'``,
      ``'val_loss'``, ``'accuracy'`` and ``'val_accuracy'`` (as returned by
      ``model.fit`` in this notebook).
    path: existing folder; the figure is written to
      ``{path}/train_val_loss.png``.
  """
  fig, (ax_loss, ax_acc) = plt.subplots(1, 2)

  # Left panel: loss. The y-range is fixed, so values above 3.0 are clipped.
  ax_loss.plot(h.history['loss'])
  ax_loss.plot(h.history['val_loss'])
  ax_loss.set_ylim([0.0, 3.0])
  ax_loss.set_xlabel('Epoch')
  ax_loss.set_ylabel('Loss')
  ax_loss.legend(['train', 'valid'])

  # Right panel: accuracy. The y-range is fixed, so values below 0.5 are clipped.
  ax_acc.plot(h.history['accuracy'])
  ax_acc.plot(h.history['val_accuracy'])
  ax_acc.set_ylim([0.5, 1.0])
  ax_acc.set_xlabel('Epoch')
  ax_acc.set_ylabel('Accuracy')
  ax_acc.legend(['train', 'valid'])

  fig.savefig(f"{path}/train_val_loss.png")

In [None]:
# Early-stopping callback shared by the training cells below: monitors
# val_loss with a patience of 10 epochs and restores the best weights.
early_stop = tf.keras.callbacks.EarlyStopping(
  monitor="val_loss",
  patience=10,
  restore_best_weights=True
)

In [None]:
# Checkpoint callback that saves the model at the end of every epoch.
# The target path is fixed to the "dropout_aug" model folder, so it is only
# meaningful for that training run.
save_on_epoch_end = tf.keras.callbacks.ModelCheckpoint(
  f"{GDRIVE_WORKING_PATH}/models/dropout_aug/saved_model",
  verbose = 0,
  save_freq="epoch"
)

## Model Basic: VGG16 + LSTM

Training stopped early after 32 epochs; the weights were restored to those of the best epoch.

In [None]:
# Training run for the basic model. Disabled by default (`if False`) so that
# running the whole notebook does not retrain; flip the guard to retrain.
if False:
  CAPTION_LENGTH = 73
  embedding_layer = get_embedding_layer()
  img_train, img_val, _ = get_features_vgg()
  # get_vectorized_caption returns (train, test, val); the test split is not needed here.
  caption_encoded_train, _, caption_encoded_val = get_vectorized_caption()
  model = get_model_basic(VOCAB_SIZE, CAPTION_LENGTH, embedding_layer, folder_name="basic")
  g_train = DataGen(img_train, caption_encoded_train, 32, VOCAB_SIZE, CAPTION_LENGTH)
  g_val = DataGen(img_val, caption_encoded_val, 32, VOCAB_SIZE, CAPTION_LENGTH)
  # callbacks is passed as a list, the form documented by Keras.
  history = model.fit(g_train, validation_data=g_val, epochs=50, verbose=1, callbacks=[early_stop])
  save_history_plot(history, f"{GDRIVE_WORKING_PATH}/models/basic")
  model.save(f"{GDRIVE_WORKING_PATH}/models/basic/saved_model")

## Model Basic + Dropout

In [None]:
# Training run for the dropout model. Disabled by default (`if False`) so that
# running the whole notebook does not retrain; flip the guard to retrain.
if False:
  CAPTION_LENGTH = 73
  embedding_layer = get_embedding_layer()
  img_train, img_val, _ = get_features_vgg()
  # get_vectorized_caption returns (train, test, val); the test split is not needed here.
  caption_encoded_train, _, caption_encoded_val = get_vectorized_caption()
  model = get_model_with_dropout(VOCAB_SIZE, CAPTION_LENGTH, embedding_layer, folder_name="plus_dropout")
  g_train = DataGen(img_train, caption_encoded_train, 32, VOCAB_SIZE, CAPTION_LENGTH)
  g_val = DataGen(img_val, caption_encoded_val, 32, VOCAB_SIZE, CAPTION_LENGTH)
  # callbacks is passed as a list, the form documented by Keras.
  history = model.fit(g_train, validation_data=g_val, epochs=50, verbose=1, callbacks=[early_stop])
  save_history_plot(history, f"{GDRIVE_WORKING_PATH}/models/plus_dropout")
  model.save(f"{GDRIVE_WORKING_PATH}/models/plus_dropout/saved_model")

## Model Basic + L1 Regularization

In [None]:
# Training run for the L1-regularized model. Disabled by default (`if False`)
# so that running the whole notebook does not retrain; flip the guard to retrain.
if False:
  CAPTION_LENGTH = 73
  embedding_layer = get_embedding_layer()
  img_train, img_val, _ = get_features_vgg()
  # get_vectorized_caption returns (train, test, val); the test split is not needed here.
  caption_encoded_train, _, caption_encoded_val = get_vectorized_caption()
  model = get_model_with_reg(VOCAB_SIZE, CAPTION_LENGTH, embedding_layer, folder_name="plus_reg")
  g_train = DataGen(img_train, caption_encoded_train, 32, VOCAB_SIZE, CAPTION_LENGTH)
  g_val = DataGen(img_val, caption_encoded_val, 32, VOCAB_SIZE, CAPTION_LENGTH)
  # callbacks is passed as a list, the form documented by Keras.
  history = model.fit(g_train, validation_data=g_val, epochs=50, verbose=1, callbacks=[early_stop])
  save_history_plot(history, f"{GDRIVE_WORKING_PATH}/models/plus_reg")
  model.save(f"{GDRIVE_WORKING_PATH}/models/plus_reg/saved_model")

## Model Basic + Dropout + Data Augmentation

In [None]:
# Training run for the dropout model on the augmented captions. Disabled by
# default (`if False`) so that running the whole notebook does not retrain.
if False:
  CAPTION_LENGTH = 76
  embedding_layer = get_embedding_layer_aug()
  img_train, img_val, _ = get_features_vgg()
  caption_encoded_train, caption_encoded_val = get_vectorized_caption_train_aug()
  model = get_model_with_dropout_aug(VOCAB_SIZE_AUG, CAPTION_LENGTH, embedding_layer, folder_name="dropout_aug")
  # Batch size 4 counts images; each image expands into samples for all of its captions.
  g_train = DataGenAugmented(img_train, caption_encoded_train, 4, VOCAB_SIZE_AUG, CAPTION_LENGTH)
  g_val = DataGenAugmented(img_val, caption_encoded_val, 4, VOCAB_SIZE_AUG, CAPTION_LENGTH)
  # The model is checkpointed after every epoch in addition to early stopping.
  history = model.fit(g_train, validation_data=g_val, epochs=50, verbose=1, callbacks=[save_on_epoch_end, early_stop])
  save_history_plot(history, f"{GDRIVE_WORKING_PATH}/models/dropout_aug")
  # Written to the same path the checkpoint callback uses.
  model.save(f"{GDRIVE_WORKING_PATH}/models/dropout_aug/saved_model")

## Model Basic + Dropout with Places Weights

In [None]:
# Training run for the dropout model on the Places features. Disabled by
# default (`if False`) so that running the whole notebook does not retrain.
if False:
  CAPTION_LENGTH = 73
  embedding_layer = get_embedding_layer()
  img_train, img_val, _ = get_features_vgg_places()
  # get_vectorized_caption returns (train, test, val); the test split is not needed here.
  caption_encoded_train, _, caption_encoded_val = get_vectorized_caption()

  # unable to convert to blob (missing img features)
  # Three training images have no Places features, so their captions are
  # removed to keep captions and features paired by position.
  # NOTE(review): presumably these are the positions of ids '19575', '19579'
  # and '17975' in train_ids — TODO confirm. A lookup via train_ids.index(...)
  # used to precede this line but its result was overwritten by these values.
  indexes = [8733, 10056, 13201]
  caption_encoded_train = list(caption_encoded_train)
  # Delete from the highest index down so earlier deletions do not shift later ones.
  for index in sorted(indexes, reverse=True):
    del caption_encoded_train[index]
  caption_encoded_train = np.array(caption_encoded_train)

  model = get_model_places(VOCAB_SIZE, CAPTION_LENGTH, embedding_layer, folder_name="plus_dropout_places")
  g_train = DataGen(img_train, caption_encoded_train, 32, VOCAB_SIZE, CAPTION_LENGTH)
  g_val = DataGen(img_val, caption_encoded_val, 32, VOCAB_SIZE, CAPTION_LENGTH)
  # callbacks is passed as a list, the form documented by Keras.
  history = model.fit(g_train, validation_data=g_val, epochs=50, verbose=1, callbacks=[early_stop])
  save_history_plot(history, f"{GDRIVE_WORKING_PATH}/models/plus_dropout_places")
  model.save(f"{GDRIVE_WORKING_PATH}/models/plus_dropout_places/saved_model")