In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import os
import pickle
import numpy as np
import pandas as pd
import re
from tqdm.notebook import tqdm
import csv
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add

from nltk.translate.bleu_score import corpus_bleu, SmoothingFunction
!pip install pycocoevalcap
!pip install rouge
from rouge import Rouge
from tqdm import tqdm
from pycocoevalcap.cider.cider import Cider
from pycocoevalcap.spice.spice import Spice


!pip install git+https://github.com/salaniz/pycocoevalcap

Collecting pycocoevalcap
  Downloading pycocoevalcap-1.2-py3-none-any.whl (104.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.3/104.3 MB[0m [31m6.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: pycocoevalcap
Successfully installed pycocoevalcap-1.2
Collecting rouge
  Downloading rouge-1.0.1-py3-none-any.whl (13 kB)
Installing collected packages: rouge
Successfully installed rouge-1.0.1
Collecting git+https://github.com/salaniz/pycocoevalcap
  Cloning https://github.com/salaniz/pycocoevalcap to /tmp/pip-req-build-6ibj30a5
  Running command git clone --filter=blob:none --quiet https://github.com/salaniz/pycocoevalcap /tmp/pip-req-build-6ibj30a5
  Resolved https://github.com/salaniz/pycocoevalcap to commit a24f74c408c918f1f4ec34e9514bc8a76ce41ffd
  Preparing metadata (setup.py) ... [?25l[?25hdone


**Defining the directories**

In [3]:
# Define paths to CSV files and image folders
folder_path = '/content/drive/MyDrive/custom_captions_dataset/'
train_csv_file = folder_path + 'train.csv'
train_img_folder = folder_path + 'train'
test_csv_file = folder_path + 'test.csv'
test_img_folder = folder_path + 'test'
val_csv_file = folder_path + 'val.csv'
val_img_folder = folder_path + 'val'

In [None]:
def get_preprocessed_caption(caption): # preprocess the captions
    caption = caption.replace('\s+', ' ')
    caption = caption.replace('[^A-Za-z]', '')
    caption = caption.strip().lower()
    caption = "startseq " + caption + " endseq"
    return caption

**Load Caption**

In [None]:
def load_caption(csv_file, img_folder):

    data = pd.read_csv(csv_file) # Load CSV file
    captions = []
    img_dict = {}

    for index, row in data.iterrows():

        image_path = os.path.join(img_folder, row['filename'])
        image_name = image_path.split('/')[-1].split('.')[0]
        caption = get_preprocessed_caption(row['caption'])
        captions.append(caption)
        img_dict[image_name] = caption

    return img_dict

**Load IMAGES**

In [None]:
def load_images(directory):
  features = {}
  for img_name in tqdm(os.listdir(directory)):

      img_path = directory + '/' + img_name # load the image from file
      image = load_img(img_path, target_size=(224, 224))
      image = img_to_array(image)  # convert image pixels to numpy array
      image = image.reshape((1, image.shape[0], image.shape[1], image.shape[2])) # reshape data for model
      image = preprocess_input(image) # preprocess image for vgg
      image_id = img_name.split('.')[0] # get image ID
      features[image_id] = image  # feature

  return features

**Instantiating VGG16 as encoder CNN model**

In [None]:
# load vgg16 model
model_vgg16 = VGG16()
model_vgg16 = Model(inputs=model_vgg16.inputs, outputs=model_vgg16.layers[-2].output) # restructure the model

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5


In [None]:
train_img = load_images(train_img_folder)
test_img = load_images(test_img_folder)
val_img = load_images(val_img_folder)

100%|██████████| 5715/5715 [02:26<00:00, 38.93it/s] 
100%|██████████| 1009/1009 [00:32<00:00, 31.46it/s]
100%|██████████| 946/946 [00:31<00:00, 29.76it/s] 


In [None]:
# Dumping the images into pickle files
with open(folder_path + 'train_img_2.pickle', 'wb') as handle:
  pickle.dump(train_img, handle, protocol=pickle.HIGHEST_PROTOCOL)

with open(folder_path + 'test_img_2.pickle', 'wb') as handle:
  pickle.dump(test_img, handle, protocol=pickle.HIGHEST_PROTOCOL)

with open(folder_path + 'val_img_2.pickle', 'wb') as handle:
  pickle.dump(val_img, handle, protocol=pickle.HIGHEST_PROTOCOL)


In [None]:
with open(folder_path + 'train_img_2.pickle', 'rb') as handle:
  train_img = pickle.load(handle)

with open(folder_path + 'test_img_2.pickle', 'rb') as handle:
  test_img = pickle.load(handle)

with open(folder_path + 'val_img_2.pickle', 'rb') as handle:
  val_img = pickle.load(handle)

In [None]:
# Load data
train_mapping = load_caption(train_csv_file, train_img_folder)
test_mapping = load_caption(test_csv_file, test_img_folder)
val_mapping = load_caption(val_csv_file, val_img_folder)
print("Training data: {} ".format(len(train_mapping)))
print("Testing data: {} ".format(len(test_mapping)))
print("Validation data: {} ".format(len(val_mapping)))


Training data: 5715 
Testing data: 928 
Validation data: 946 


In [None]:
all_captions = list(train_mapping.values())
# tokenize the text
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
vocab_size = len(tokenizer.word_index)

# get maximum length of the caption available
max_length = max(len(caption.split()) for caption in all_captions)
max_length

In [None]:
train_img_id = list(train_mapping.keys())
test_img_id = list(test_mapping.keys())
val_img_id = list(val_mapping.keys())

**GENERATOR FUNCTION**

In [None]:
# create data generator to get data in batch (avoids session crash)
def data_generator(data_keys, mapping, images, tokenizer, max_length, vocab_size, batch_size, model_vgg16):

    X1, X2, y = list(), list(), list()
    n = 0
    while 1:
        for key in data_keys:
            n += 1
            caption = mapping[key]
            seq = tokenizer.texts_to_sequences([caption])[0]
            feature = model_vgg16.predict(images[key], verbose=0)
            # split the sequence into X, y pairs
            for i in range(1, len(seq)):

                in_seq, out_seq = seq[:i], seq[i] # split into input and output pairs
                in_seq = pad_sequences([in_seq], maxlen=max_length)[0] # pad input sequence
                out_seq = to_categorical([out_seq], num_classes=vocab_size)[0] # encode output sequence
                # store the sequences
                X1.append(feature[0])
                X2.append(in_seq)
                y.append(out_seq)

            if n == batch_size:
                if not X1 or not X2 or not y: # Skip empty batches
                    X1, X2, y = list(), list(), list()
                    n = 0
                    continue
                X1, X2, y = np.array(X1), np.array(X2), np.array(y)
                yield [X1, X2], y
                X1, X2, y = list(), list(), list()
                n = 0


**MODEL**

In [None]:
# encoder model
# image feature layers
inputs1 = Input(shape=(4096,))
fe1 = Dropout(0.4)(inputs1)
fe2 = Dense(256, activation='relu')(fe1)

# sequence feature layers
inputs2 = Input(shape=(max_length,))
se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)

# decoder model
decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)
outputs = Dense(vocab_size, activation='softmax')(decoder2)

model = Model(inputs=[inputs1, inputs2], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')



In [None]:
# plot the model
plot_model(model, show_shapes=True)

In [None]:
def idx_to_word(integer, tokenizer):
    for word, index in tokenizer.word_index.items():
        if index == integer:
            return word
    return None

**FUNCTION to predict captions**

In [None]:
# generate caption for an image
def predict_caption(model, image, tokenizer, max_length):
    in_text = 'startseq'
    for i in range(max_length):
        sequence = tokenizer.texts_to_sequences([in_text])[0] # encode input sequence
        sequence = pad_sequences([sequence], max_length) # pad the sequence
        feature = model_vgg16.predict(image, verbose=0)
        yhat = model.predict([feature, sequence], verbose=0) # predict next word
        yhat = np.argmax(yhat) # get index with high probability
        word = idx_to_word(yhat, tokenizer) # convert index to word

        if word is None: # stop if word not found
            break
        in_text += " " + word

        if word == 'endseq': # stop if we reach end tag
            break

    return in_text

**SCORE METRICS**

In [None]:
def compute_cider_score(actual, predicted):
    cider_scorer = Cider()
    score , cider_scores = cider_scorer.compute_score(actual, predicted)
    # print(score, cider_scores)
    # return sum(cider_scores) / len(cider_scores)
    return score # 1.2

def compute_spice_score(actual, predicted):
    spice_scorer = Spice()
    spice_score, _ = spice_scorer.compute_score(actual, predicted)
    return spice_score  # 0.05

def compute_rouge_l_score(actual, predicted):
    rouge = Rouge()
    true_cap = [caption[0] for caption in actual]
    pred_cap = [caption[0] for caption in predicted]
    scores = rouge.get_scores(pred_cap, true_cap, avg=True)
    return scores['rouge-l']['f']  # 0.4 - 0.5

import time

def compute_spice_score_with_progress(actual_dict, predicted_dict):
    spice_scorer = Spice()
    num_images = len(actual_dict)
    progress_interval = num_images // 100  # Update progress every 10% of images processed
    spice_scores = []

    print("Computing SPICE score...")
    start_time = time.time()
    for i, (img_id, actual_captions) in enumerate(actual_dict.items(), 1):
        spice_score, _ = spice_scorer.compute_score({img_id: actual_captions}, {img_id: predicted_dict[img_id]})
        spice_scores.append(spice_score)

        if i % progress_interval == 0 or i == num_images:
            elapsed_time = time.time() - start_time
            print(f"Progress: {i}/{num_images} images processed. Elapsed time: {elapsed_time:.2f} seconds")

    average_spice_score = sum(spice_scores) / num_images
    print("SPICE score calculation completed.")
    return average_spice_score

**EVALUATE**

In [None]:
import random
def evaluate_model(model, val_img_id, val_img, val_mapping):
    actual, predicted = {}, {}

    for key in tqdm(random.sample(val_img_id, 5)):  # Iterate through the first item for valing
        actual[key] = [val_mapping[key]]  # Wrap ground truth captions in a list
        predicted[key] = [predict_caption(model, val_img[key], tokenizer, max_length)]  # Wrap the generated caption in a list
        print(actual[key])
        print(predicted[key])

    return compute_rouge_l_score(list(actual.values()), list(predicted.values()))

In [None]:
model.summary()

**TRAINING THE MODEL**

In [None]:
# train the model
epochs = 8
batch_size = 32
steps = len(train_img_id) // batch_size

best_val_score = float('-inf')
best_model_path = 'best_model_t1.pickle'

for i in range(epochs):
    generator = data_generator(train_img_id, train_mapping, train_img, tokenizer, max_length, vocab_size, batch_size, model_vgg16) # create data generator
    model.fit(generator, epochs=1, steps_per_epoch=steps, verbose=1) # fit for one epoch
    val_score = evaluate_model(model, val_img_id, val_img, val_mapping) # Evaluate model on validation data

    # Save model if it has the best validation score so far
    if val_score > best_val_score:
        best_val_score = val_score
        with open(folder_path + best_model_path, 'wb') as handle:
            pickle.dump(model, handle, protocol=pickle.HIGHEST_PROTOCOL)
        print("Saved model with best validation score:", best_val_score)

In [None]:
best_model_path='best_model_t1.pickle'

Loading the best model stored above

In [None]:
with open(folder_path + best_model_path, 'rb') as handle:
    best_model = pickle.load(handle)

In [None]:
from PIL import Image
import matplotlib.pyplot as plt
# test_mapping['test_2']
img_path=f'{folder_path}test/test_1.jpg'
image = Image.open(img_path)
plt.imshow(image)

test_mapping['test_1']


In [None]:
# create the CSV file (Structure only)


# with open(folder_path + 't1_predictions_v3.csv', mode='w', newline='') as file:
#     writer = csv.writer(file)
#     writer.writerow(['serial','img_id', 'actual_caption', 'predicted_caption'])



**Predicted captions on TEST DATA**

In [None]:
from tqdm import tqdm
import csv
actual, predicted = {}, {}

# Save the data into a CSV file

count=0
for key in test_img_id:  # Iterate through the first item for testing
    count+=1
    actual[key] = [test_mapping[key]]  # Wrap ground truth captions in a list
    predicted[key] = [predict_caption(best_model, test_img[key], tokenizer, max_length)]  # Wrap the generated caption in a list
    with open(folder_path + 't1_predictions_v3.csv', mode='a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([count,key, actual[key][0], predicted[key][0]])
    print(f'{count} done')





Loading the predicted captions

In [4]:
def remove_start_end(sentence):
    words=sentence.split(' ')
    return " ".join(words[1:-1])


df_pred=pd.read_csv(folder_path + 't1_predictions_v3.csv')
image_id=df_pred['img_id']
actual_caption=df_pred['actual_caption']
pred_caption=df_pred['predicted_caption']


actual_caption=actual_caption.apply(remove_start_end)
pred_caption=pred_caption.apply(remove_start_end)

In [5]:

image_actual_dict = {}
for img_id, actual_cap in zip(image_id, actual_caption):
    if img_id in image_actual_dict:
        image_actual_dict[img_id].append(actual_cap)
    else:
        image_actual_dict[img_id] = [actual_cap]


image_pred_dict = {}
for img_id, pred_cap in zip(image_id, pred_caption):
    if img_id in image_pred_dict:
        image_pred_dict[img_id].append(pred_cap)
    else:
        image_pred_dict[img_id] = [pred_cap]

**ROGUE-L SCORE**

In [None]:
rouge_l_score = compute_rouge_l_score(list(image_actual_dict.values()), list(image_pred_dict.values()))
print(f"\nROUGE-L: {rouge_l_score}")



ROUGE-L: 0.35167331036641325


**CIDEr SCORE**

In [None]:
cider_score = compute_cider_score(image_actual_dict,image_pred_dict)
print(f"CIDEr: {cider_score}")


CIDEr: 0.05834273241798831


**SCPICE SCORE**

In [None]:
# spice_score = compute_spice_score(image_actual_dict, image_pred_dict)
# print(f"SPICE: {spice_score}")

Downloading stanford-corenlp-3.6.0 for SPICE ...
Progress: 384.5M / 384.5M (100.0%)
Extracting stanford-corenlp-3.6.0 ...
Done.


In [24]:
spice_score = compute_spice_score_with_progress(image_actual_dict, image_pred_dict)
print("SPICE score:", spice_score)

Computing SPICE score...
Progress: 9/928 images processed. Elapsed time: 29.74 seconds
Progress: 18/928 images processed. Elapsed time: 163.91 seconds
Progress: 27/928 images processed. Elapsed time: 412.93 seconds
Progress: 36/928 images processed. Elapsed time: 769.35 seconds
Progress: 45/928 images processed. Elapsed time: 951.24 seconds
Progress: 54/928 images processed. Elapsed time: 1128.94 seconds
Progress: 63/928 images processed. Elapsed time: 1294.75 seconds
Progress: 72/928 images processed. Elapsed time: 1618.40 seconds
Progress: 81/928 images processed. Elapsed time: 2033.41 seconds
Progress: 90/928 images processed. Elapsed time: 2189.24 seconds
Progress: 99/928 images processed. Elapsed time: 2522.99 seconds
Progress: 108/928 images processed. Elapsed time: 2759.10 seconds
Progress: 117/928 images processed. Elapsed time: 3010.54 seconds
Progress: 126/928 images processed. Elapsed time: 3185.84 seconds
Progress: 135/928 images processed. Elapsed time: 3616.18 seconds
Pro

In [26]:
print("SPICE score:", spice_score)

SPICE score: 0.137427499450287


**DISPLAY THE OUTPUTs**

In [28]:
for key in image_actual_dict.keys():
    print("\npic_id - ", key)
    print("original caption - ", remove_start_end(image_actual_dict[key][0]))
    print("predicted caption - ", remove_start_end(image_pred_dict[key][0]))


pic_id -  test_1
original caption -  a large building with bars on the windows in front of it. there is people walking in front of the building. there is a street in front of the building with many cars on it.
predicted caption -  this is a picture of a city street there are two people walking on the sidewalk the street is white the street is red and white the street is red and white there are two story buildings on the side of the street there are many buildings on the sidewalk the buildings are green and white the buildings are green and white the buildings are parked on the sidewalk

pic_id -  test_2
original caption -  a person is skiing through the snow. there is loose snow all around them from him jumping. the person is wearing a yellow snow suit. the person is holding two ski poles in their hands.
predicted caption -  this is an image of a man the man is wearing a black helmet the man is wearing a black helmet the man is wearing a black helmet the man is wearing a black helmet 