In [None]:
# This file uses the CNN + LSTM architecture to label images from the coco captions dataset.
# CITATION: The LSTM model and its embeddings are inspired by the following tutorial: https://www.analyticsvidhya.com/blog/2020/11/create-your-own-image-caption-generator-using-keras/

In [None]:
# Basic Libraries
import numpy as np
import matplotlib.pyplot as plt
import warnings
import pickle
import json
import random
import collections
import string
from tqdm import tqdm
import itertools
from collections import Counter
import json
import os

# Auxiliary
from google.colab import drive
from pycocotools.coco import COCO
from pycocoevalcap.eval import COCOEvalCap

# Tensorflow & Keras
from tensorflow.keras.preprocessing.image import ImageDataGenerator, load_img, img_to_array
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, GlobalAveragePooling2D, Activation, Dropout, Flatten, Dense, Input, Layer
from tensorflow.keras.layers import Embedding, LSTM, add, Concatenate, Reshape, concatenate, Bidirectional
from tensorflow.keras.applications.inception_v3 import InceptionV3
from keras.applications.inception_v3 import preprocess_input
from tensorflow.keras.applications.inception_v3 import preprocess_input
from keras import Input

# Commands
# NOTE(review): this silences every warning category for the rest of the session,
# including deprecation notices from TensorFlow/Keras.
warnings.filterwarnings('ignore')
# Mount Google Drive at /content/drive; a later cell pickles the image features under
# /content/drive/MyDrive.
drive.mount('/content/drive')

# Create the local directories
# One-time setup, left commented out. Later cells read from /content/glove6b and
# /content/coco2017, so these archives must have been extracted before running them.
#!unzip /content/drive/MyDrive/glove.6B.200d.txt -d /content/glove6b
#!unzip /content/drive/MyDrive/coco2017.zip -d /content/coco2017

In [None]:
# Initializing COCO API with the annotations file
coco = COCO('/content/coco2017/annotations/captions_train2017.json')

# Loading the raw annotation JSON so the captions can be grouped per image
with open('/content/coco2017/annotations/captions_train2017.json', 'r') as f:
    annotations = json.load(f)

# Mapping of image path -> list of captions for that image
image_caption_map = collections.defaultdict(list)

# COCO train2017 files are named after the image id, zero-padded to 12 digits
for val in annotations['annotations']:
    im_path = '/content/coco2017/train2017/' + '%012d.jpg' % (val['image_id'])
    image_caption_map[im_path].append(str(val['caption']))

# Reducing the dataset size by sampling 0.1% of the images.
# random.sample needs a real sequence: passing dict.items() directly raises
# TypeError on Python 3.11+, so the items are materialised into a list first.
# NOTE(review): no random seed is set, so the sampled subset differs between runs.
sample_size = int(len(image_caption_map) * 0.001)
image_caption_map = dict(random.sample(list(image_caption_map.items()), sample_size))

In [3]:
def get_image_id(im_path):
    """Return the file name of `im_path` without its directories or final extension.

    Only '/' is treated as a directory separator and only the last '.'-suffix
    is removed, e.g. '/a/b/000000000139.jpg' -> '000000000139'.
    """
    file_name = im_path.rpartition('/')[2]
    stem, dot, _ = file_name.rpartition('.')
    # Without any '.' in the name rpartition leaves `stem` empty, so keep the whole name.
    return stem if dot else file_name

def process_description(desc):
    """Strip ASCII punctuation from `desc` and lowercase what remains.

    Whitespace is left untouched. Only the characters in `string.punctuation`
    are removed.

    Args:
        desc: caption text.

    Returns:
        The cleaned, lowercased caption string.
    """
    # Build the lookup set once per call rather than once per character.
    punctuation = set(string.punctuation)
    return ''.join(ch for ch in desc if ch not in punctuation).lower()

def transform_to_id_caption_format(image_caption_map):
    """Re-key a path -> captions mapping by image ID, cleaning every caption.

    Keys are produced by `get_image_id`, captions are passed through
    `process_description`. If two paths yield the same ID, the later entry wins.
    """
    return {
        get_image_id(im_path): [process_description(caption) for caption in captions]
        for im_path, captions in image_caption_map.items()
    }

In [4]:
# Clean every caption (punctuation stripped, lowercased), still keyed by image path
preprocessed = {
    path: [process_description(cap) for cap in caps]
    for path, caps in image_caption_map.items()
}

# Re-key the cleaned captions by image ID (file name without extension)
new_image_map = transform_to_id_caption_format(preprocessed)

# One "<image id> <caption>" text line per caption
lines = []
for img_id, d_list in new_image_map.items():
    for description in d_list:
        lines.append(f"{img_id} {description}")

# All lines joined into one newline-separated string
new_descriptions = '\n'.join(lines)

# Paths of the sampled images, used later for feature extraction
train_img = [*image_caption_map]

In [None]:
# Captions wrapped in 'startseq' / 'endseq' tokens, grouped by image ID
descriptions_batch = {}
lines = new_descriptions.split('\n')

# Each non-empty line is "<image id> <caption words...>"
for line in tqdm(lines):
    if not line.strip():
        continue
    img_id, *img_desc = line.split()
    # Direct dict membership is O(1); building list(keys()) on every line made this
    # loop quadratic. Lines whose first token is not a known image ID are skipped.
    if img_id in new_image_map:
        desc = 'startseq {} endseq'.format(' '.join(img_desc))
        descriptions_batch.setdefault(img_id, []).append(desc)


In [6]:
# Flatten the per-image caption lists into one list of captions
captions_batch = [caption for captions in descriptions_batch.values() for caption in captions]

# Every token of every caption, in order of appearance
words = []
for sent in captions_batch:
    words.extend(sent.split())

# Vocabulary: tokens that occur at least 10 times
word_counts = Counter(words)
vocab = [word for word in word_counts if word_counts[word] >= 10]

# Indices start at 1; index 0 is left unassigned, hence the +1 in vocab_size
wordtoix = dict(zip(vocab, range(1, len(vocab) + 1)))
ixtoword = dict(enumerate(vocab, start=1))
vocab_size = len(ixtoword) + 1

# Longest caption, measured in whitespace-separated tokens
all_desc = list(itertools.chain.from_iterable(descriptions_batch.values()))
max_length = max(len(desc.split()) for desc in all_desc)

In [7]:
# Dimensionality of the GloVe vectors in the file read below
embedding_dim = 200

# Parse the GloVe text file: each line is a token followed by its coefficients
embedding_map = {}
with open('/content/glove6b/glove.6B.200d.txt', encoding="utf-8") as f:
    for line in f:
        values = line.strip().split()
        token, coefficients = values[0], values[1:]
        embedding_map[token] = np.asarray(coefficients, dtype='float32')

# Row i holds the vector of the word with index i; rows of words without a
# GloVe vector (and the unassigned row 0) stay all-zero
embedding_mat = np.zeros((vocab_size, embedding_dim))
embeddings_keep = {}
for word, index in wordtoix.items():
    if word in embedding_map:
        embeddings_keep[index] = embedding_map[word]
        embedding_mat[index] = embedding_map[word]

In [None]:
# Setting up the InceptionV3 model for image feature extraction
# Full InceptionV3 network initialised with ImageNet weights.
base_model = InceptionV3(weights='imagenet')
# Same input as base_model, but the output is taken from the second-to-last layer
# instead of the final layer. Presumably this is the 2048-d pooled feature vector
# (later cells reshape encodings to (1, 2048)) — TODO confirm.
model_new = Model(inputs=base_model.input, outputs=base_model.layers[-2].output)

def image_preprocessing(im_path):
    """Load the image at `im_path`, resize it to 299x299 and return it as a
    single-image batch passed through `preprocess_input`."""
    pil_image = load_img(im_path, target_size=(299, 299))
    # Prepend a batch axis so the array describes a batch containing one image
    batch = img_to_array(pil_image)[np.newaxis, ...]
    return preprocess_input(batch)

def fetch_encoding(im_path):
    """Return the feature vector that `model_new` produces for one image.

    Args:
        im_path: path of the image file to encode.

    Returns:
        The prediction of `model_new` for the preprocessed image with all
        length-1 axes (including the batch axis) squeezed out.
    """
    processed_image = image_preprocessing(im_path)
    # verbose=0 keeps Keras from printing a progress bar for every single image,
    # which would flood the output of the tqdm loops that call this function.
    feature_vector = model_new.predict(processed_image, verbose=0)
    return np.squeeze(feature_vector)

# Encode every sampled training image; keys are bare file names (e.g. '000000000139.jpg')
training_data = {}
for im_path in tqdm(train_img):
    training_data[im_path.rpartition('/')[2]] = fetch_encoding(im_path)

In [9]:
# Where the extracted image features are persisted on Drive
save_path = '/content/drive/MyDrive/training_data_1027231301.pkl'

# Serialise the feature dictionary with the newest pickle protocol available
with open(save_path, 'wb') as pkl_out:
    pickle.dump(training_data, pkl_out, protocol=pickle.HIGHEST_PROTOCOL)

# Read the file straight back and continue with the round-tripped copy
with open(save_path, 'rb') as pkl_in:
    training_data = training_data_loaded = pickle.load(pkl_in)

In [None]:
def feature_extractor(input_shape):
    """Build the image branch: Input -> Dropout(0.5) -> Dense(256, relu).

    Returns:
        (input tensor, output tensor) of the branch.
    """
    image_features = Input(shape=input_shape)
    regularised = Dropout(0.5)(image_features)
    projected = Dense(256, activation='relu')(regularised)
    return image_features, projected

def sequence_processor(input_shape, vocab_size, embedding_dim, embedding_mat):
    """Build the text branch: Input -> frozen Embedding -> Dropout(0.5) -> LSTM(256).

    The embedding layer is initialised from `embedding_mat`, is not trainable,
    and masks index 0 (`mask_zero=True`).

    Returns:
        (input tensor, output tensor) of the branch.
    """
    token_ids = Input(shape=input_shape)
    frozen_embedding = Embedding(
        vocab_size,
        embedding_dim,
        mask_zero=True,
        weights=[embedding_mat],
        trainable=False,
    )
    embedded = frozen_embedding(token_ids)
    regularised = Dropout(0.5)(embedded)
    encoded = LSTM(256)(regularised)
    return token_ids, encoded

def model_assembler(feature_output, sequence_output, vocab_size):
    """Merge the two branches and return a softmax over `vocab_size` classes.

    The branch outputs are summed element-wise, passed through a
    Dense(256, relu) layer and then through the softmax output layer.
    """
    merged = add([feature_output, sequence_output])
    hidden = Dense(256, activation='relu')(merged)
    word_probabilities = Dense(vocab_size, activation='softmax')(hidden)
    return word_probabilities

# Image branch: one 2048-dimensional feature vector per photo
feature_input, feature_output = feature_extractor(input_shape=(2048,))

# Text branch: caption prefixes padded to max_length token indices
sequence_input, sequence_output = sequence_processor(
    input_shape=(max_length,),
    vocab_size=vocab_size,
    embedding_dim=embedding_dim,
    embedding_mat=embedding_mat,
)

# Merge both branches into the next-word distribution
outputs = model_assembler(feature_output, sequence_output, vocab_size=vocab_size)

# Two-input model trained with categorical cross-entropy over one-hot targets
model = Model(inputs=[feature_input, sequence_input], outputs=outputs)
model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
model.summary()

In [11]:
def preprocess_description(desc, wordtoix, max_length):
    """Turn one caption into (padded prefix, one-hot next word) training pairs.

    Words missing from `wordtoix` are dropped. For a caption with n known
    words, n - 1 pairs are produced: the first i word indices padded to
    `max_length`, and the (i + 1)-th word one-hot encoded over the
    module-level `vocab_size`.

    Returns:
        (X2, y): two parallel lists of arrays; both empty when the caption
        has fewer than two known words.
    """
    token_ids = []
    for word in desc.split():
        idx = wordtoix.get(word)
        if idx is not None:
            token_ids.append(idx)

    X2, y = [], []
    for cut, target in enumerate(token_ids[1:], start=1):
        prefix = token_ids[:cut]
        X2.append(pad_sequences([prefix], maxlen=max_length)[0])
        y.append(to_categorical([target], num_classes=vocab_size)[0])
    return X2, y

def data_generator(descriptions, photos, wordtoix, max_length, num_photos_per_batch):
    """Endlessly yield training batches built from whole photos.

    Each batch contains every (photo features, padded prefix) -> next-word
    sample of `num_photos_per_batch` photos. Batches are counted in photos,
    which is what a `steps_per_epoch` of `len(descriptions) // batch_size`
    assumes. Counting samples instead would end a batch after only one or two
    photos and let an "epoch" cover a small fraction of the data.

    Args:
        descriptions: mapping image ID -> list of caption strings.
        photos: mapping '<image ID>.jpg' -> feature vector; IDs without an
            entry are skipped.
        wordtoix: mapping word -> integer index.
        max_length: length the caption prefixes are padded to.
        num_photos_per_batch: number of photos whose samples form one batch.

    Yields:
        ([photo_features, padded_prefixes], one_hot_targets) as numpy arrays.

    Raises:
        ValueError: if a complete pass over `descriptions` produces no sample
            at all (otherwise the generator would spin forever without yielding).
    """
    X1, X2, y = [], [], []
    photos_in_batch = 0
    while True:
        produced_samples = False
        for key, desc_list in descriptions.items():
            photo = photos.get(key + '.jpg')
            if photo is None:
                continue

            # Expand every caption of this photo into its training samples
            samples_before = len(X1)
            for desc in desc_list:
                in_seqs, out_seqs = preprocess_description(desc, wordtoix, max_length)
                X1.extend([photo] * len(in_seqs))
                X2.extend(in_seqs)
                y.extend(out_seqs)

            # A photo whose captions yielded no sample does not count towards the batch
            if len(X1) == samples_before:
                continue
            produced_samples = True
            photos_in_batch += 1

            # Yield once the requested number of photos has been collected
            if photos_in_batch >= num_photos_per_batch:
                yield ([np.array(X1), np.array(X2)], np.array(y))
                X1, X2, y = [], [], []
                photos_in_batch = 0

        if not produced_samples:
            raise ValueError(
                "data_generator: no training samples could be built; check that "
                "the photo keys match '<image id>.jpg' and that captions contain "
                "at least two known words"
            )


In [None]:
# Set training hyperparams
epochs = 10
batch_size = 64
# Floor division gives 0 when there are fewer images than one batch;
# always run at least one step per epoch in that case.
steps = max(1, len(descriptions_batch) // batch_size)
generator = data_generator(descriptions_batch, training_data, wordtoix, max_length, batch_size)

# Fit the model; the generator never ends, so steps_per_epoch defines the epoch length
model.fit(generator, epochs=epochs, steps_per_epoch=steps, verbose=1)

In [None]:
def generate_caption(photo, max_len=max_length):
    """Greedily decode a caption for one encoded image.

    Starting from 'startseq', the most probable next word is appended until
    the model predicts 'endseq', predicts an index with no word attached, or
    `max_len` words have been generated.

    Args:
        photo: image feature batch fed to the model's first input.
        max_len: padding length and maximum number of decoding steps.

    Returns:
        The generated words joined by spaces, without 'startseq' / 'endseq'.
    """
    caption = ['startseq']

    for _ in range(max_len):
        tokens = [wordtoix[word] for word in caption if word in wordtoix]
        tokens_padded = pad_sequences([tokens], maxlen=max_len)
        predicted_word_index = int(np.argmax(model.predict([photo, tokens_padded], verbose=0)))
        # Word indices start at 1, so output index 0 has no entry in ixtoword.
        # Stop instead of raising KeyError; the input would not change, so
        # continuing would only repeat the same prediction.
        predicted_word = ixtoword.get(predicted_word_index)
        if predicted_word is None or predicted_word == 'endseq':
            break
        caption.append(predicted_word)

    # 'endseq' is never appended, so only the leading 'startseq' has to go
    return ' '.join(caption[1:])

def get_caption(im_path):
    """Display the image at `im_path`, then print and return its generated caption.

    Args:
        im_path: path of the image file.

    Returns:
        The caption string produced by `generate_caption`.
    """
    image = fetch_encoding(im_path).reshape((1, 2048))

    # Show the picture the caption belongs to
    plt.imshow(plt.imread(im_path))
    plt.show()

    # Decode once and reuse the result; decoding is the expensive step
    caption = generate_caption(image)
    print(caption)
    return caption

# Get example image
# Shows one training image with its generated caption. get_caption prints the caption
# and also returns it, so as the last expression of the cell it is displayed as well.
get_caption('/content/coco2017/train2017/000000266677.jpg')

In [None]:
# Defining the directory path for validation images
folder_path = '/content/coco2017/val2017/'

# Names of the regular files in the validation folder (sub-directories are skipped).
# Full paths are built where they are needed, inside the captioning loop.
image_files = [f for f in os.listdir(folder_path) if os.path.isfile(os.path.join(folder_path, f))]

# One {"image_id": ..., "caption": ...} record per validation image
all_captions = []

# Processing each image to generate captions
for image_file in tqdm(image_files):
    image_path = os.path.join(folder_path, image_file)

    # COCO file names are the zero-padded numeric image ID plus an extension.
    # int() copes with the leading zeros (including an all-zero name); files
    # whose name is not a number cannot be matched to an annotation, so skip them.
    try:
        image_id = int(get_image_id(image_path))
    except ValueError:
        continue

    # Encode the image as a single-row feature batch and decode its caption
    image = fetch_encoding(image_path).reshape((1, 2048))
    image_caption = generate_caption(image)
    all_captions.append({"image_id": image_id, "caption": image_caption})

# Saving all generated captions to a JSON file.
# The evaluation below reads this file, so a failed write must not pass silently:
# it would either break later with a misleading error or score a stale results
# file left over from a previous run.
try:
    with open('/content/results.json', 'w') as f:
        json.dump(all_captions, f)
except (OSError, TypeError, ValueError) as err:
    raise RuntimeError(
        "Could not write the generated captions to /content/results.json"
    ) from err

# Defining paths for annotation and results files
annotation_file = '/content/coco2017/annotations/captions_val2017.json'
results_file = '/content/results.json'

# Evaluating generated captions against ground truth using COCO tools
coco = COCO(annotation_file)
coco_result = coco.loadRes(results_file)
coco_eval = COCOEvalCap(coco, coco_result)

# Score only the images that actually have a generated caption. When every
# validation image was captioned this is the full set; when only a subset was,
# evaluation still works instead of failing on missing results.
coco_eval.params['image_id'] = coco_result.getImgIds()
coco_eval.evaluate()

# Displaying evaluation scores for various metrics
for metric, score in coco_eval.eval.items():
    print(f'{metric}: {score:.3f}')
