In [1]:
# IMPORT MODULES

import os
import pickle
import numpy as np
from tqdm.notebook import tqdm

from tensorflow.keras.applications.vgg16 import VGG16 , preprocess_input
from tensorflow.keras.preprocessing.image import load_img , img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model
from tensorflow.keras.utils import to_categorical , plot_model
from tensorflow.keras.layers import Input , Dense , LSTM , Embedding , Dropout , add



Load the Dataset

In [2]:
import kagglehub

# Download the latest version of the Flickr8k dataset through kagglehub.
# `path` is the local dataset root; later cells read `Images/` and
# `captions.txt` from underneath it.
path = kagglehub.dataset_download("adityajn105/flickr8k")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'flickr8k' dataset.
Path to dataset files: /kaggle/input/flickr8k


In [3]:
# Load VGG16 (the cell output shows its pretrained weights being downloaded).
model = VGG16()
# Re-wire the network so that it ends at the second-to-last layer, i.e. the
# final layer is dropped and the layer before it becomes the output.
model = Model(inputs=model.inputs , outputs=model.layers[-2].output)
# summary() prints the table itself and returns None, so wrapping it in
# print() only adds a stray "None" line to the output.
model.summary()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5
[1m553467096/553467096[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m14s[0m 0us/step


None


# Extract features from the image

In [None]:
# Build a lookup from image id (file name up to its first '.') to the array
# that the truncated VGG16 network predicts for that image.
features = {}
directory = os.path.join(path , 'Images')

for img_name in tqdm(os.listdir(directory)):
    # read the file at 224x224 and turn it into a pixel array
    pixels = img_to_array(load_img(directory + '/' + img_name , target_size=(224,224)))
    # prepend a batch axis of length 1 so the network receives a one-image batch
    batch = pixels.reshape((1 ,) + tuple(pixels.shape[:3]))
    # VGG-specific preprocessing followed by a forward pass
    prediction = model.predict(preprocess_input(batch) , verbose=0)
    # key the result by the file name without its extension
    features[img_name.split('.')[0]] = prediction



  0%|          | 0/8091 [00:00<?, ?it/s]

In [None]:
import os
import pickle


# Make sure the output directory exists (no error if it is already there).
os.makedirs('/kaggle/working', exist_ok=True)

# Destination of the cached image features.
output_path = '/kaggle/working/features.pkl'

# Serialize the whole `features` dict so the extraction loop does not have to
# be repeated in a later session.
with open(output_path, 'wb') as f:
    pickle.dump(features, f)

print(f"✅ Features saved successfully at: {output_path}")


In [None]:
import pickle


# Location the features were written to by the save cell.
pickle_path = '/kaggle/working/features.pkl'

# Restore the `features` dict from disk.
# NOTE(review): pickle.load can execute arbitrary code; only load files that
# this notebook produced itself.
with open(pickle_path, 'rb') as f:
    features = pickle.load(f)

print("✅ Features loaded successfully")
print("Number of images/features:", len(features))


# Load the caption data

In [None]:
# Read the captions file into one string.
with open(os.path.join(path , 'captions.txt') , 'r') as f:
    # discard the first line (presumably the CSV header row — confirm against the file)
    next(f)
    captions_doc = f.read()

In [None]:
# Group the captions by image id: {image id without extension: [caption, ...]}
mapping = {}
for line in tqdm(captions_doc.split('\n')):
    # ignore empty / one-character lines
    if len(line) < 2:
        continue
    # first comma-separated field is the image file name, the rest is the caption
    image_id , *caption_parts = line.split(',')
    # drop the file extension from the image name
    image_id = image_id.split('.')[0]
    # re-join the caption pieces with spaces and file them under the image id
    mapping.setdefault(image_id , []).append(" ".join(caption_parts))

In [None]:
# Number of distinct image ids that have at least one caption.
len(mapping)

In [None]:
def clean(mapping):
    """Normalize every caption stored in ``mapping``, in place.

    Each caption is lower-cased, stripped of every character that is not an
    ASCII letter or whitespace (digits, punctuation, ...), reduced to its words
    longer than one character, and wrapped in the ``startseq`` / ``endseq``
    markers.

    Args:
        mapping: dict of image id -> list of caption strings. The lists are
            modified in place.

    Returns:
        None.
    """
    # Local import so the function is self-contained.
    import re

    for captions in mapping.values():
        for i, caption in enumerate(captions):
            # convert to lowercase
            caption = caption.lower()
            # Delete digits, special chars, etc. This has to be a regex
            # substitution: str.replace() would look for the literal text
            # '[^A-Za-z]' and change nothing. Whitespace is kept so the words
            # stay separated.
            caption = re.sub(r'[^a-z\s]', '', caption)
            # split() + join() also collapses any run of whitespace to one space
            words = [word for word in caption.split() if len(word) > 1]
            # add start and end tags to the caption
            captions[i] = 'startseq ' + " ".join(words) + ' endseq'

In [None]:
# Captions of one sample image before text preprocessing.
mapping['1000268201_693b08cb0e']

In [None]:
# Preprocess the text. clean() rewrites the caption lists inside `mapping`
# in place, so re-running this cell would wrap the captions in the
# start/end tags a second time.
clean(mapping)

In [None]:
# Captions of the same sample image after text preprocessing.
mapping['1000268201_693b08cb0e']

In [None]:
# Flatten the per-image caption lists into a single list of caption strings.
all_captions = [text for image_key in mapping for text in mapping[image_key]]

In [None]:
# Total number of captions across all images.
len(all_captions)

In [None]:
# Preview the first ten captions.
all_captions[:10]

In [None]:
# Fit a word-level tokenizer on every caption.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
# One more than the number of distinct words; presumably the extra slot is
# index 0, which the Embedding layer later treats as padding (mask_zero=True).
vocab_size =  len(tokenizer.word_index) +  1

In [None]:
# Display the vocabulary size computed above.
vocab_size

In [None]:
# Length, in whitespace-separated words, of the longest caption. It is used
# later as the padded sequence length and as the cap on generated words.
max_length = max(len(caption.split()) for caption in all_captions)
max_length

# Train/test split

In [None]:
# Image ids in the order they were inserted into `mapping` (no shuffling).
image_ids = list(mapping.keys())
# Index of the 90% mark; ids before it are used for training, the rest for testing.
split = int(len(image_ids)*0.90)
split

In [None]:
# First 90% of the image ids for training, remaining 10% for testing.
train = image_ids[:split]
test = image_ids[split:]

# Create a data generator that feeds the data in batches (this avoids a session crash)

In [None]:


def data_generator(data_keys,mapping ,features, tokenizer, max_length, vocab_size, batch_size):
    """Endlessly yield training batches built from ``batch_size`` images each.

    For every caption of every image, one sample is produced per prefix of the
    caption: the padded prefix is the text input and the one-hot encoding of
    the following word is the target. A batch therefore holds many more rows
    than ``batch_size``; ``batch_size`` counts images, not rows.

    Args:
        data_keys: image ids to iterate over (repeated forever).
        mapping: dict of image id -> list of caption strings.
        features: dict of image id -> feature array; row 0 is used.
        tokenizer: fitted tokenizer providing ``texts_to_sequences``.
        max_length: length every input sequence is padded to.
        vocab_size: number of classes for the one-hot targets.
        batch_size: number of images per yielded batch.

    Yields:
        ``((X1, X2), y)`` where ``X1`` holds image features, ``X2`` the padded
        input sequences and ``y`` the one-hot next-word targets.
    """
    # Nothing to iterate over: end the generator instead of spinning forever.
    if len(data_keys) == 0:
        return

    X1 , X2 , y = list() , list() , list()
    n = 0
    while 1:
        for key in data_keys:
            n += 1
            for caption in mapping[key]:
                seq = tokenizer.texts_to_sequences([caption])[0]
                # split the sequence into x , y pairs
                for i in range(1,len(seq)):
                    in_seq , out_seq = seq[:i] , seq[i]
                    # pad the sequence
                    in_seq = pad_sequences([in_seq],maxlen=max_length)[0]
                    # encode the output sequence
                    out_seq = to_categorical([out_seq],num_classes=vocab_size)[0]
                    # store the sequence
                    X1.append(features[key][0])
                    X2.append(in_seq)
                    y.append(out_seq)
            # This check must run after every image. Placed after the whole
            # pass over data_keys, n would equal len(data_keys) and a batch
            # would (almost) never be emitted.
            if n == batch_size:
                # The two inputs are yielded as a tuple, not a list: Keras 3
                # rejects a list inside generator output when it builds the
                # tf.data signature, while a tuple works on older versions too.
                yield (np.array(X1) , np.array(X2)) , np.array(y)
                X1 , X2 , y = list() , list() , list()
                n = 0



# Model creation

In [None]:
# image feature branch: a 4096-wide feature vector -> dropout -> 256-unit dense layer
inputs1 =   Input(shape=(4096,))
fe1 = Dropout(0.4)(inputs1)
fe2 = Dense(256, activation='relu')(fe1)


# sequence feature branch: padded word-index sequence -> embedding -> dropout -> LSTM
inputs2  = Input(shape=(max_length,))
# mask_zero=True makes index 0 (the padding value) be ignored downstream
se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
se2 = Dropout(0.4)(se1)
se3 = LSTM(256)(se2)

# decoder: element-wise sum of the two 256-wide branches, then a softmax over the vocabulary

decoder1 = add([fe2, se3])
decoder2 = Dense(256, activation='relu')(decoder1)
outputs = Dense(vocab_size, activation='softmax')(decoder2)

# NOTE: this rebinds `model`, which until now referred to the VGG16 feature extractor.
model = Model(inputs=[inputs1, inputs2], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')

# plot the model
plot_model(model, show_shapes=True)

In [None]:
# train the model
epochs = 15
batch_size = 32
# whole batches per epoch; the floor division drops any remainder of len(train) / batch_size
steps = len(train) //batch_size

for i in range(epochs):
  # build a fresh data generator for this epoch
  generator = data_generator(train,mapping , features, tokenizer, max_length, vocab_size, batch_size)
  # fit for exactly one epoch of `steps` batches drawn from that generator
  model.fit(generator, epochs=1, steps_per_epoch=steps, verbose=1)

In [None]:
# Save the trained caption model. An explicit `.keras` extension is used
# because Keras 3 refuses to save to a path without a `.keras` or `.h5` extension.
model.save('best_model.keras')

# Generate captions for an image

In [None]:
def idx_to_word(integer,tokenizer):
  """Return the word whose index in ``tokenizer.word_index`` equals ``integer``.

  The first matching word (in the dict's iteration order) is returned; ``None``
  is returned when no word carries that index.
  """
  candidates = (word for word, index in tokenizer.word_index.items() if index == integer)
  return next(candidates, None)

In [None]:
# generate caption function
def  predict_caption(model , image ,tokenizer ,max_length):
    """Greedily generate a caption for one image.

    Starting from ``'startseq'``, the text so far is encoded and padded, the
    model predicts the next word, and the most probable word is appended. This
    repeats until ``'endseq'`` is produced, the predicted index has no word,
    or ``max_length`` words have been added.

    Args:
        model: trained caption model taking ``[image, sequence]`` as input.
        image: image feature array for a single image, passed to the model as is.
        tokenizer: fitted tokenizer used for encoding and for index -> word lookup.
        max_length: padded sequence length and maximum number of generated words.

    Returns:
        The generated caption string, including the ``startseq`` tag and, when
        it was reached, the ``endseq`` tag.
    """
    in_text = 'startseq'
    # iterate over the max length of sequence
    for _ in range(max_length):
        # encode the text generated so far
        sequence = tokenizer.texts_to_sequences([in_text])[0]
        # pad the sequence (the result keeps its leading batch axis of 1)
        sequence = pad_sequences([sequence], maxlen=max_length)
        # predict next word
        yhat = model.predict([image, sequence], verbose=0)
        # get index with high probability
        yhat = np.argmax(yhat)
        # convert index to word
        word = idx_to_word(yhat, tokenizer)
        # stop if word not found
        if word is None:
            break
        # append word as input for generating next word
        in_text += " " + word
        # stop if we reach end tag
        if word == 'endseq':
            break
    return in_text

In [None]:
from nltk.translate.bleu_score import corpus_bleu
# validate with test data
# `actual` collects, per image, the list of tokenized reference captions;
# `predicted` collects the tokenized generated caption for the same image.
actual , predicted = list() , list()

for key in tqdm(test):
    captions = mapping[key]
    # generate a caption from the stored image features
    y_pred = predict_caption(model,features[key],tokenizer,max_length)
    # tokenize references and prediction by whitespace
    actual_captions = [caption.split() for caption in captions]
    y_pred = y_pred.split()
    actual.append(actual_captions)
    predicted.append(y_pred)

# calculate the BLEU scores
# The two prints used to compute the same number: corpus_bleu's default
# weights are already (0.25, 0.25, 0.25, 0.25). Report unigram BLEU-1 and
# cumulative 4-gram BLEU-4 instead, with labels that say which is which.
print("BLEU-1 score:-",corpus_bleu(actual,predicted,weights=(1.0,0,0,0)))
print("BLEU-4 score:-",corpus_bleu(actual,predicted,weights=(0.25,0.25,0.25,0.25)))

# Visualize the result

In [None]:
from PIL import Image
import matplotlib.pyplot as plt

# Example image to caption. This id is the one inspected in the text
# preprocessing cells; the ".jpg" extension is assumed — change this to any
# file name found under the dataset's Images/ folder.
image_name = "1000268201_693b08cb0e.jpg"


def generate_caption(image_name):
    """Print the reference and predicted captions of an image and display it.

    Args:
        image_name: file name (with extension) of an image inside the
            dataset's ``Images`` folder. Its name without the extension must
            be a key of both ``mapping`` and ``features``.

    Returns:
        None. The captions are printed and the image is drawn with matplotlib.
    """
    image_id = image_name.split('.')[0]
    img_path = os.path.join(path , 'Images' , image_name)
    image = Image.open(img_path)
    captions = mapping[image_id]
    print('---------------------Actual---------------------')
    for caption in captions:
        print(caption)
    y_pred = predict_caption(model, features[image_id], tokenizer, max_length)
    print('--------------------Predicted--------------------')
    print(y_pred)
    plt.imshow(image)

In [None]:
# NOTE(review): this call requires both `generate_caption` and `image_name`
# to have been defined by an earlier cell.
generate_caption(image_name)