### Image Captioning using CNN and RNN


In [None]:
!pip install tensorflow==2.13.1

Collecting tensorflow==2.13.1
  Downloading tensorflow-2.13.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.4 kB)
Collecting gast<=0.4.0,>=0.2.1 (from tensorflow==2.13.1)
  Downloading gast-0.4.0-py3-none-any.whl.metadata (1.1 kB)
Collecting keras<2.14,>=2.13.1 (from tensorflow==2.13.1)
  Downloading keras-2.13.1-py3-none-any.whl.metadata (2.4 kB)
Collecting numpy<=1.24.3,>=1.22 (from tensorflow==2.13.1)
  Downloading numpy-1.24.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (5.6 kB)
Collecting tensorboard<2.14,>=2.13 (from tensorflow==2.13.1)
  Downloading tensorboard-2.13.0-py3-none-any.whl.metadata (1.8 kB)
Collecting tensorflow-estimator<2.14,>=2.13.0 (from tensorflow==2.13.1)
  Downloading tensorflow_estimator-2.13.0-py2.py3-none-any.whl.metadata (1.3 kB)
Collecting typing-extensions<4.6.0,>=3.6.6 (from tensorflow==2.13.1)
  Downloading typing_extensions-4.5.0-py3-none-any.whl.metadata (8.5 kB)
Collecting google-auth-oauthlib<1.1,>=0

In [None]:
import tensorflow as tf
print(tf.__version__)

In [None]:
import keras
print(keras.__version__)


In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import keras
import re
import nltk
from nltk.corpus import stopwords
import string
import json
from time import time
import pickle
from keras.applications.vgg16 import VGG16
from keras.applications.inception_v3 import InceptionV3, preprocess_input, decode_predictions
from keras.applications.resnet50 import ResNet50, preprocess_input, decode_predictions
from keras.preprocessing import image
from keras.models import Model, load_model
from keras.preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
from keras.layers import Input, Dense, Dropout, Embedding, LSTM
from keras.layers import add

In [None]:
# read text captions

def readTextFile(path):
    """Read a whole text file and return its contents as a single string.

    Args:
        path: Location of the file to read.

    Returns:
        The complete file contents as ``str``.
    """
    # Decode explicitly as UTF-8 so the result does not depend on the
    # platform's default locale encoding.
    with open(path, encoding="utf-8") as f:
        return f.read()

In [None]:
captions = readTextFile('/kaggle/input/flickr8k/captions.txt')

In [None]:
captions = captions.split('\n')[:-1]

In [None]:
captions[161]

In [None]:
# Map each image filename to the list of its captions.
# NOTE(review): the CSV header row is presumably stored here too, under the
# key 'image'; that id is removed from the splits further down.
desc = {}

for line in captions:
    # Split on the first comma only: everything after it is caption text.
    # A plain split(',') followed by [1] would cut the caption off at its
    # first comma and silently drop the rest of the sentence.
    img_name, img_caption = line.split(',', 1)
    desc.setdefault(img_name, []).append(img_caption)


In [None]:
desc['1000268201_693b08cb0e.jpg']

In [None]:
import cv2

In [None]:
path = '/kaggle/input/flickr8k/Images/1003163366_44323f5815.jpg'
# cv2.imread returns pixels in BGR order while matplotlib expects RGB, so the
# channels are converted before display to avoid swapped red/blue colours.
plt.imshow(cv2.cvtColor(cv2.imread(path), cv2.COLOR_BGR2RGB))
plt.show()
desc['1003163366_44323f5815.jpg']

### Data Cleaning
**HERE**
- Do not remove stopwords
- Do not apply stemming
- Remove numbers and punctuation

$X \rightarrow MODEL \rightarrow \textbf{Dense layer with softmax}\rightarrow vector[\textbf{probability distribution over the vocabulary}]$

In [None]:
import re

In [None]:
def clean_text(sentence):
    """Normalise one caption for vocabulary building.

    The text is lower-cased, every run of characters outside ``a-z`` is
    replaced by a single space, and tokens of length one are discarded.

    Args:
        sentence: Raw caption string.

    Returns:
        The surviving tokens joined by single spaces.
    """
    letters_only = re.sub('[^a-z]+', ' ', sentence.lower())
    kept_tokens = [token for token in letters_only.split() if len(token) > 1]
    return ' '.join(kept_tokens)

In [None]:
# clean all captions

In [None]:
# Clean every caption in place: the lists stored in `desc` are overwritten
# with their cleaned versions, so `desc` itself keeps the same list objects.
for caption_list in desc.values():
    caption_list[:] = [clean_text(raw_caption) for raw_caption in caption_list]

In [None]:
desc['1042020065_fb3d3ba5ba.jpg']

In [None]:
import os
print(os.listdir('/kaggle/working'))


In [None]:
# Write the cleaned captions to disk as real JSON.
# str(desc) would emit a Python repr (single-quoted strings), which is not
# valid JSON and can only be re-read through fragile quote substitution.
with open('/kaggle/working/descriptions.txt', 'w') as f:
    json.dump(desc, f)


### Vocabulary


In [None]:
import json

In [None]:
# Reload the caption dictionary that was written to descriptions.txt.
descriptions = None
with open('/kaggle/working/descriptions.txt') as f:
    descriptions = f.read()
# Swap single quotes for double quotes so that a Python-repr style dump becomes
# parseable JSON. This only works while no key or caption contains a quote.
json_acceptable_string = descriptions.replace("'","\"")    
# After this line `descriptions` is the parsed object, no longer the raw text.
descriptions = json.loads(json_acceptable_string)

In [None]:
print(type(descriptions))

In [None]:
descriptions.get('1042020065_fb3d3ba5ba.jpg')

In [None]:
# Vocab: the set of distinct words that occur in any caption.

vocab = set()
for caption_list in descriptions.values():
    for sentence in caption_list:
        vocab.update(sentence.split())

print("Vocab Size(unique words In Vocab) : %d"% len(vocab))

In [None]:
descriptions.get('1042020065_fb3d3ba5ba.jpg')

In [None]:
# Flat list of every word occurrence across all captions (duplicates kept),
# in caption order; this is what the frequency count is built from.
total_words = [
    word
    for caption_list in descriptions.values()
    for sentence in caption_list
    for word in sentence.split()
]

print("Total Words %d"%len(total_words))


In [None]:
len(total_words), type(total_words)

In [None]:
# filter words from vocab according to certain threshold frequency

In [None]:
import collections
# Count how many times each word occurs in the flat word list.
counter = collections.Counter(total_words)
# Plain dict copy of the counts: word -> number of occurrences.
frq_cnt = dict(counter)
# Number of distinct words.
print(len(frq_cnt.keys()))
# print(frq_cnt)

In [None]:
# Rank the (word, count) pairs from most to least frequent.

sorted_freq_cnt = sorted(frq_cnt.items(), key=lambda pair: pair[1], reverse=True)

# Keep only words seen strictly more than `threshold` times.
# From here on `total_words` holds the filtered, frequency-ordered vocabulary
# (one entry per word) rather than every word occurrence.
threshold = 10
sorted_freq_cnt = [(word, count) for word, count in sorted_freq_cnt if count > threshold]
total_words = [word for word, _count in sorted_freq_cnt]

In [None]:
print(len(total_words)) # 1800 unique words filter, this is going to be new vocab size

In [None]:
descriptions.get('1042020065_fb3d3ba5ba.jpg')

### Train/Val/Test Data

In [None]:
len(descriptions.keys()), len(descriptions.values())

In [None]:
import random

image_filenames = list(descriptions.keys())

# Shuffle with a dedicated, seeded generator so the train/val/test split is
# the same on every run and the global `random` state is left untouched.
random.Random(42).shuffle(image_filenames)

# First 80% -> train + val, remaining 20% -> test.
split_index_test = int(0.8 * len(image_filenames))
train_val_filenames = image_filenames[:split_index_test]
test_filenames = image_filenames[split_index_test:]

# Split train + val into train (70%) and val (30%).
split_index_val = int(0.7 * len(train_val_filenames))
train_filenames = train_val_filenames[:split_index_val]
val_filenames = train_val_filenames[split_index_val:]

# Save one filename per line. The files are written to the same absolute
# locations they are read back from, instead of relying on the current
# working directory happening to be /kaggle/working.
for split_path, split_filenames in (
    ('/kaggle/working/train.txt', train_filenames),
    ('/kaggle/working/val.txt', val_filenames),
    ('/kaggle/working/test.txt', test_filenames),
):
    with open(split_path, 'w') as split_file:
        split_file.writelines(f"{filename}\n" for filename in split_filenames)

In [None]:
train_file_data = readTextFile('/kaggle/working/train.txt')
val_file_data = readTextFile('/kaggle/working/val.txt')
test_file_data = readTextFile('/kaggle/working/test.txt')

In [None]:
# Turn each split file into a list of image ids: one id per line, taken as the
# text before the first '.' of the filename. The final element of the split is
# dropped because it is the empty string after the trailing newline.
train, val, test = (
    [row.partition('.')[0] for row in file_data.split('\n')[:-1]]
    for file_data in (train_file_data, val_file_data, test_file_data)
)

In [None]:
len(train), len(val), len(test)

In [None]:
print(descriptions.get('1000268201_693b08cb0e.jpg'))

In [None]:
# Drop the id 'image' from whichever split it landed in.
# NOTE(review): presumably this is the CSV header row — confirm.
for split_ids in (train, test, val):
    if 'image' in split_ids:
        split_ids.remove('image')
    

In [None]:
# Named `sample_id` so the built-in id() is not shadowed for the rest of the
# kernel session.
sample_id = '1000268201_693b08cb0e'
descriptions[sample_id+'.jpg']

In [None]:
# Captions for the training split, each wrapped in the "startseq"/"endseq"
# markers that tell the decoder where a sentence begins and ends.
train_descriptions = {
    img_id: ["startseq " + cap + " endseq" for cap in descriptions[img_id + '.jpg']]
    for img_id in train
}



In [None]:
# Captions for the validation split, each wrapped in the "startseq"/"endseq"
# sentence markers.
val_descriptions = {
    img_id: ["startseq " + cap + " endseq" for cap in descriptions[img_id + '.jpg']]
    for img_id in val
}


In [None]:
# Prepare the descriptions for the test split.
# Each caption is wrapped in the "startseq"/"endseq" sentence markers.
test_descriptions = {
    img_id: ["startseq " + cap + " endseq" for cap in descriptions[img_id + '.jpg']]
    for img_id in test
}



## Transfer Learning
<br>

### Step 1: Image Feature Extraction

In [None]:
# Load ResNet50 with ImageNet weights for 224x224 RGB inputs; it is used below
# as the image feature extractor. Note that the name `model` is reused later
# for the captioning network.
model = ResNet50(weights = 'imagenet', input_shape = (224,224,3))
model.summary()

In [None]:
# Feature extractor: same input as ResNet50, but the output is taken from the
# second-to-last layer instead of the final layer. encode_image reshapes that
# output to a vector of length 2048.
model_new = Model(model.input,model.layers[-2].output)

In [None]:
def preprocess_img(img):
    """Load an image file and turn it into a single-image batch for the encoder.

    Args:
        img: Path of the image file.

    Returns:
        The array produced by ``preprocess_input`` for the image resized to
        224x224, with a leading batch axis of size 1.
    """
    pixels = image.img_to_array(image.load_img(img, target_size=(224, 224)))
    # The network consumes batches, so add a leading axis of length one.
    batch = np.expand_dims(pixels, axis=0)
    # Apply the normalisation the pretrained network was trained with.
    return preprocess_input(batch)

In [None]:
# Sanity check: run one image through the preprocessing pipeline and show it.
# NOTE(review): the array has already been through preprocess_input, so the
# colours will presumably look distorted compared with the raw photo — confirm
# this is the intended illustration.
img = preprocess_img('/kaggle/input/flickr8k/Images/'+"1000268201_693b08cb0e.jpg")
plt.imshow(img[0])
plt.axis("off")
plt.show()

In [None]:
def encode_image(img):
    """Encode one image file into its 2048-length feature vector.

    Args:
        img: Path of the image file.

    Returns:
        The output of ``model_new`` for the image, reshaped to ``(2048,)``.
    """
    batch = preprocess_img(img)
    # verbose=0: this function is called once per image inside long loops, and
    # the default per-call progress bar would flood the notebook output.
    feature_vector = model_new.predict(batch, verbose=0)
    return feature_vector.reshape((2048,))

In [None]:
encode_image('/kaggle/input/flickr8k/Images/'+"1000268201_693b08cb0e.jpg")

In [None]:
type(train)

In [None]:
start = time()
# image_id --> feature vector extracted from the ResNet encoder
encoding_train = {}
for ix, img_id in enumerate(train):
    # Exactly one separator between the image directory and the file name.
    img_path = '/kaggle/input/flickr8k/Images/' + img_id + ".jpg"
    encoding_train[img_id] = encode_image(img_path)
    # Report progress every 100 images.
    if ix % 100 == 0:
        print("Encoding in progress time step %d "%ix)
end_t = time()
print("Total Time Taken :",end_t-start)

In [None]:
start = time()
# image_id --> feature vector extracted from the ResNet encoder
encoding_test = {}
for ix, img_id in enumerate(test):
    img_path = f"/kaggle/input/flickr8k/Images/{img_id}.jpg"
    encoding_test[img_id] = encode_image(img_path)
    # Report progress every 100 images.
    if not ix % 100:
        print("Encoding in progress time step %d "%ix)
end_t = time()
print("Total Time Taken :",end_t-start)

In [None]:
start = time()
# image_id --> feature vector extracted from the ResNet encoder
encoding_val = {}
for ix, img_id in enumerate(val):
    # Exactly one separator between the image directory and the file name.
    img_path = '/kaggle/input/flickr8k/Images/' + img_id + ".jpg"
    encoding_val[img_id] = encode_image(img_path)
    # Report progress every 100 images.
    if ix % 100 == 0:
        print("Encoding in progress time step %d "%ix)
end_t = time()
print("Total Time Taken :",end_t-start)

In [None]:
# Store the extracted feature dictionaries on disk, one pickle file per split.
import pickle

for pkl_name, encodings in (
    ("encoded_train_features.pkl", encoding_train),
    ("encoded_val_features.pkl", encoding_val),
    ("encoded_test_features.pkl", encoding_test),
):
    with open(pkl_name, "wb") as f:
        pickle.dump(encodings, f)

In [None]:
len(total_words)

In [None]:
# Two-way lookup between vocabulary words and integer ids.
# Ids start at 1, so 0 is never assigned to a word.
word_to_idx = {word: idx for idx, word in enumerate(total_words, start=1)}
idx_to_word = dict(enumerate(total_words, start=1))

In [None]:
word_to_idx['dog']
idx_to_word[1]

In [None]:
len(idx_to_word)

In [None]:
# Register the sequence markers right after the last regular word id.
# The ids are derived from the current size of the mapping instead of being
# hard-coded, so they can never overwrite an existing word when the filtered
# vocabulary is not exactly 1800 words long. Skipping tokens that are already
# present keeps the cell safe to re-run.
for token in ('startseq', 'endseq'):
    if token not in word_to_idx:
        next_idx = len(word_to_idx) + 1
        word_to_idx[token] = next_idx
        idx_to_word[next_idx] = token

In [None]:
# One more than the number of mapped words: word ids start at 1, and index 0
# is the value used for padding in data_generator.
vocab_size =len(word_to_idx)+1
print(vocab_size)

In [None]:
# Length, in tokens, of the longest training caption (markers included);
# stays 0 when there are no training captions.
max_len = max(
    (len(cap.split()) for caps in train_descriptions.values() for cap in caps),
    default=0,
)

print(max_len)

### Data Loader

In [None]:
from keras.utils import to_categorical

In [None]:
def data_generator(train_descriptions,encoding_train,word_to_idx,max_len,batch_size):
    """Endlessly yield batches of (image features, partial caption) -> next word.

    Every caption is expanded into one sample per prefix: the ids of the first
    ``i`` tokens (post-padded with 0 to ``max_len``) are the input and the
    one-hot encoding of token ``i`` is the target. Words missing from
    ``word_to_idx`` are skipped.

    Args:
        train_descriptions: Mapping image id -> list of caption strings.
        encoding_train: Mapping image id -> image feature vector.
        word_to_idx: Mapping word -> integer id.
        max_len: Length every partial caption is padded to.
        batch_size: Number of images (not samples) gathered per batch.

    Yields:
        ``([image_features, padded_sequences], one_hot_targets)`` as numpy
        arrays. The one-hot width is the module-level ``vocab_size``.

    Raises:
        ValueError: If ``train_descriptions`` is empty, because the generator
            could otherwise loop forever without yielding anything.
    """
    if not train_descriptions:
        raise ValueError("train_descriptions is empty; nothing to generate")

    X1, X2, y = [], [], []
    n = 0  # images accumulated in the current batch

    while True:
        for key, desc_list in train_descriptions.items():
            n += 1
            photo = encoding_train[key]

            for desc in desc_list:
                seq = [word_to_idx[word] for word in desc.split() if word in word_to_idx]
                for i in range(1, len(seq)):
                    # 0 denotes the padding word.
                    xi = pad_sequences([seq[:i]], maxlen=max_len, value=0, padding='post')[0]
                    yi = tf.keras.utils.to_categorical([seq[i]], num_classes=vocab_size)[0]

                    X1.append(photo)
                    X2.append(xi)
                    y.append(yi)

            # Checked once per image, after all of its captions were added.
            # Doing this inside the caption loop would split an image across
            # two batches and would skip the check entirely for an image
            # without captions, letting `n` run past `batch_size` for good.
            if n == batch_size:
                # A tuple is the (inputs, targets) form Keras expects.
                yield ([np.array(X1), np.array(X2)], np.array(y))
                X1, X2, y = [], [], []
                n = 0

In [None]:
f = open("/kaggle/input/glove/keras/default/1/glove.6B.200d.txt",encoding='utf8')

In [None]:
# word -> GloVe vector, read from the file handle `f` opened in the cell above.
embedding_index = {}

try:
    for line in f:
        values = line.split()

        # First field is the word, the remaining fields are its vector.
        word = values[0]
        word_embedding = np.array(values[1:],dtype='float')
        embedding_index[word] = word_embedding
finally:
    # Release the handle once the file has been consumed; it was opened
    # without a `with` block, so nothing else would close it.
    f.close()
    

In [None]:
embedding_index['apple'] # embedding for apple

In [None]:
def get_embedding_matrix():
    """Build the embedding weight matrix for the vocabulary.

    Uses the module-level ``vocab_size``, ``word_to_idx`` and
    ``embedding_index``.

    Returns:
        A ``(vocab_size, 200)`` array whose row ``idx`` is the pre-trained
        vector of the word with that id. Rows for words without a pre-trained
        vector, and row 0, stay all zeros.
    """
    emb_dim = 200
    matrix = np.zeros((vocab_size, emb_dim))
    for word, idx in word_to_idx.items():
        vector = embedding_index.get(word)
        if vector is None:
            # No pre-trained vector for this word: leave its row at zero.
            continue
        matrix[idx] = vector
    return matrix

In [None]:
embedding_matrix = get_embedding_matrix()
embedding_matrix.shape

# Model

In [None]:
# Image branch: takes the 2048-length feature vector produced by encode_image,
# applies dropout, and projects it to 256 units so it can be merged with the
# caption branch.
input_img_features = Input(shape=(2048,))
inp_img1 = Dropout(0.5)(input_img_features)
inp_img2 = Dense(256,activation='relu')(inp_img1)

In [None]:
# Caption branch: a padded sequence of word ids of length max_len.
from keras.layers import Input, Embedding, Dropout, SimpleRNN
input_captions = Input(shape=(max_len,))
# 200-dimensional embeddings (same width as get_embedding_matrix builds);
# mask_zero=True makes downstream layers ignore the 0 padding id.
inp_cap1 = Embedding(input_dim=vocab_size,output_dim=200,mask_zero=True)(input_captions)
inp_cap2 = Dropout(0.5)(inp_cap1)
# The recurrent layer summarises the partial caption into 256 units.
inp_cap3 = SimpleRNN(256)(inp_cap2)

In [None]:
# Decoder: element-wise sum of the two 256-unit branches, one hidden layer,
# then a softmax over the vocabulary giving the next-word distribution.
decoder1 = add([inp_img2,inp_cap3])
decoder2 = Dense(256,activation='relu')(decoder1)
outputs = Dense(vocab_size,activation='softmax')(decoder2)

# Combined model: inputs are [image features, partial caption].
# Note: this rebinds `model`, which previously referred to ResNet50.
model = Model(inputs=[input_img_features,input_captions],outputs=outputs)

In [None]:
model.summary()

In [None]:
# Load the pre-trained vectors into the Embedding layer and freeze them.
# The layer is looked up by type rather than by position: the order of
# model.layers is decided by Keras, so a fixed index could silently point at
# a different layer if the architecture changes.
embedding_layer = next(layer for layer in model.layers if isinstance(layer, Embedding))
embedding_layer.set_weights([embedding_matrix])
embedding_layer.trainable = False

- `categorical_crossentropy` is used because the targets are one-hot vectors over the vocabulary (multi-class classification with one class per word)

In [None]:
model.compile(
    loss='categorical_crossentropy',
    optimizer="adam",
    metrics=['accuracy'] )


In [None]:
# model training 
epochs = 30
batch_size = 32
steps = len(train_descriptions)//batch_size + 1


In [None]:
import pydot
import graphviz
import tensorflow as tf
tf.keras.utils.plot_model(model, show_shapes=True)

In [None]:
encoding_train['1032122270_ea6f0beedb']

In [None]:
from keras.metrics import CategoricalAccuracy
from nltk.translate.bleu_score import sentence_bleu

# Track metrics
train_loss = []
val_loss = []
train_accuracy = CategoricalAccuracy()
val_accuracy = CategoricalAccuracy()

# BLEU Score Calculation
def calculate_bleu(reference, prediction):
    """Compute BLEU-1 to BLEU-4 for one predicted caption.

    Args:
        reference: Iterable of reference caption strings.
        prediction: Predicted caption string.

    Returns:
        Tuple ``(bleu1, bleu2, bleu3, bleu4)``.
    """
    references = [ref.split() for ref in reference]
    hypothesis = prediction.split()
    weight_sets = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        # Exact thirds so the n-gram weights sum to 1 (0.33 * 3 is only 0.99).
        (1 / 3, 1 / 3, 1 / 3, 0),
        (0.25, 0.25, 0.25, 0.25),
    )
    return tuple(
        sentence_bleu(references, hypothesis, weights=weights)
        for weights in weight_sets
    )


In [None]:
import os

import wandb
from wandb.integration.keras import WandbCallback

# Credentials must never be committed in a notebook. The key is read from the
# WANDB_API_KEY environment variable; when it is unset, key=None lets
# wandb.login() fall back to its own credential lookup.
# NOTE(review): a literal API key was previously embedded in this cell; treat
# it as leaked and revoke/rotate it.
wandb.login(key=os.environ.get("WANDB_API_KEY"))
wandb.init(
    project="image-captioning",  # your project name
    # name="captioning-model-run",  # run name
    config={
        "epochs": epochs,
        "batch_size": batch_size,
        "optimizer": "adam",
        "loss": "categorical_crossentropy"
    }
)


In [None]:
from keras.preprocessing.sequence import pad_sequences
import numpy as np

def generate_caption(model, photo, word_to_idx, idx_to_word, max_len):
    """Greedily decode a caption for one image.

    Starting from "startseq", the model is asked for the most probable next
    word at most ``max_len`` times; decoding stops early once "endseq" is
    produced or the predicted id has no entry in ``idx_to_word``.

    :param model: Trained captioning model
    :param photo: Feature vector of the image (reshaped to ``(1, 2048)``)
    :param word_to_idx: Mapping of words to their indices
    :param idx_to_word: Mapping of indices to their corresponding words
    :param max_len: Padded caption length and maximum number of decoding steps
    :return: The generated text, including "startseq" and, if it was
        predicted, "endseq"
    """
    caption = "startseq"
    for _step in range(max_len):
        # Encode the caption so far; words without an id are ignored.
        token_ids = [word_to_idx[tok] for tok in caption.split() if tok in word_to_idx]
        padded = pad_sequences([token_ids], maxlen=max_len, padding='post')

        # Most probable next word id.
        probabilities = model.predict([photo.reshape(1, 2048), padded], verbose=0)
        next_word = idx_to_word.get(np.argmax(probabilities))
        if next_word is None:
            break

        caption = caption + " " + next_word
        if next_word == "endseq":
            break
    return caption


In [None]:
from nltk.translate.bleu_score import sentence_bleu

def calculate_bleu_scores(descriptions, encoding):
    """Average BLEU-1 to BLEU-4 of generated captions over a set of images.

    Uses the module-level ``model``, ``word_to_idx``, ``idx_to_word`` and
    ``max_len`` to generate one caption per image.

    Args:
        descriptions: Mapping image id -> list of reference caption strings.
        encoding: Mapping image id -> image feature vector.

    Returns:
        Tuple ``(bleu1, bleu2, bleu3, bleu4)`` of mean sentence-level scores;
        all zeros when ``descriptions`` is empty.
    """
    total_samples = len(descriptions)
    if total_samples == 0:
        # Nothing to score; avoid dividing by zero below.
        return 0.0, 0.0, 0.0, 0.0

    # The sequence markers are bookkeeping, not caption content. Leaving them
    # in both references and hypotheses guarantees matching n-grams and
    # inflates every score.
    markers = {"startseq", "endseq"}
    weight_sets = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        # Exact thirds so the n-gram weights sum to 1.
        (1 / 3, 1 / 3, 1 / 3, 0),
        (0.25, 0.25, 0.25, 0.25),
    )

    totals = [0.0] * len(weight_sets)
    for img_id, refs in descriptions.items():
        references = [
            [word for word in ref.split() if word not in markers] for ref in refs
        ]
        generated = generate_caption(model, encoding[img_id], word_to_idx, idx_to_word, max_len)
        hypothesis = [word for word in generated.split() if word not in markers]
        for k, weights in enumerate(weight_sets):
            totals[k] += sentence_bleu(references, hypothesis, weights=weights)

    return tuple(total / total_samples for total in totals)


In [None]:
def train():
    """Train the captioning model epoch by epoch, validating and saving each time.

    Relies on module-level state: ``model``, ``epochs``, ``batch_size``,
    ``max_len``, ``word_to_idx``, the train/val description and encoding
    dictionaries, and an initialised wandb run.

    For every epoch: fit for one pass over the training generator, evaluate on
    the validation generator, log the metrics to wandb, print them, and save
    the model under ``./model_weights/``.

    NOTE: defining this function rebinds the name ``train``, which earlier
    refers to the list of training image ids.
    """
    import os  # function-scope import so the cell is self-contained

    # Make sure the checkpoint directory exists before the first save;
    # saving into a missing directory would fail at the end of epoch 1.
    os.makedirs('./model_weights', exist_ok=True)

    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}/{epochs}")

        # Training: a fresh generator per epoch, run for one Keras epoch.
        train_generator = data_generator(train_descriptions, encoding_train, word_to_idx, max_len, batch_size)
        train_steps = len(train_descriptions) // batch_size
        history = model.fit(
            train_generator,
            epochs=1,
            steps_per_epoch=train_steps,
            verbose=1
        )
        train_loss = history.history['loss'][-1]
        train_acc = history.history.get('accuracy', [0])[-1]

        # Validation
        val_generator = data_generator(val_descriptions, encoding_val, word_to_idx, max_len, batch_size)
        val_steps = len(val_descriptions) // batch_size
        val_loss, val_acc = model.evaluate(
            val_generator,
            steps=val_steps,
            verbose=1
        )

        # Log metrics to WandB
        wandb.log({
            "epoch": epoch + 1,
            "train_loss": train_loss,
            "train_accuracy": train_acc,
            "val_loss": val_loss,
            "val_accuracy": val_acc,
        })

        # Log metrics
        print(f"Epoch {epoch + 1}: Train Loss: {train_loss}, Train Accuracy: {train_acc}")
        print(f"Validation Loss: {val_loss}, Validation Accuracy: {val_acc}")

        # Save the model after every epoch.
        model.save(f'./model_weights/model_epoch_{epoch+1}.h5')

In [None]:
import keras
import keras.utils
from keras import utils as np_utils
from keras.utils import to_categorical

In [None]:
# Run the training loop (comment this call out to skip training)
train()

In [1]:
# Evaluate the trained model on the held-out test split.
# This cell needs data_generator, model and the test dictionaries from the
# cells above to exist in the kernel.
test_generator = data_generator(test_descriptions, encoding_test, word_to_idx, max_len, batch_size)
# Number of full batches of images in the test split.
test_steps = len(test_descriptions) // batch_size
test_loss, test_acc = model.evaluate(
    test_generator,
    steps=test_steps,
    verbose=1
)
# Caption-level quality: generates one caption per test image and scores it.
test_bleu1, test_bleu2, test_bleu3, test_bleu4 = calculate_bleu_scores(test_descriptions, encoding_test)

# Log test metrics to WandB
wandb.log({
    "test_loss": test_loss,
    "test_accuracy": test_acc,
    "test_bleu1": test_bleu1,
    "test_bleu2": test_bleu2,
    "test_bleu3": test_bleu3,
    "test_bleu4": test_bleu4,
})

print(f"Test Loss: {test_loss}, Test Accuracy: {test_acc}")
print(f"Test BLEU: BLEU-1: {test_bleu1}, BLEU-2: {test_bleu2}, BLEU-3: {test_bleu3}, BLEU-4: {test_bleu4}")

NameError: name 'data_generator' is not defined

## FINAL PREDICTIONS

In [None]:
def predict_caption(photo):
    """Greedily decode a caption for one image and return it without markers.

    Uses the module-level ``model``, ``word_to_idx``, ``idx_to_word`` and
    ``max_len``.

    Args:
        photo: Image feature array, passed to the model as given (the caller
            supplies it already shaped as a batch of one).

    Returns:
        The predicted words joined by spaces, without "startseq"/"endseq".
    """
    # Initialize the caption with the starting token.
    in_text = "startseq"
    for _ in range(max_len):
        # Encode the caption so far and pad it to the model's input length.
        sequence = [word_to_idx[w] for w in in_text.split() if w in word_to_idx]
        sequence = pad_sequences([sequence], maxlen=max_len, padding='post')

        # Id of the most probable next word. verbose=0 avoids printing one
        # progress bar per decoding step.
        ypred = model.predict([photo, sequence], verbose=0).argmax()

        # Id 0 is the padding value and has no word; stop decoding instead of
        # raising KeyError. Also stop when the end token is produced.
        word = idx_to_word.get(ypred)
        if word is None or word == "endseq":
            break

        in_text += ' ' + word

    # Drop the leading start token; the end token is never appended.
    return ' '.join(in_text.split()[1:])


In [None]:
print("Max length used in training:", max_len)


In [None]:
desc['1003163366_44323f5815.jpg']

In [None]:
# Pick Some Random Images and See Results
# The "seaborn" style sheets are named "seaborn-v0_8" in newer matplotlib
# releases, so use whichever name this installation provides.
plt.style.use("seaborn-v0_8" if "seaborn-v0_8" in plt.style.available else "seaborn")

all_img_names = list(encoding_test.keys())
for _ in range(10):
    # Sample over the actual number of test images rather than a fixed 1000.
    idx = np.random.randint(0, len(all_img_names))
    img_name = all_img_names[idx]
    photo_2048 = encoding_test[img_name].reshape((1,2048))

    # Read the picture once. The name deliberately avoids `image`, which is
    # the keras.preprocessing module that preprocess_img depends on.
    img_pixels = plt.imread("/kaggle/input/flickr8k/Images/"+img_name+".jpg")

    caption = predict_caption(photo_2048)

    # Log to wandb
    wandb.log({
        "Generated Captions": wandb.Image(img_pixels, caption=caption)}
    )

    plt.title(caption)
    plt.imshow(img_pixels)
    plt.axis("off")
    plt.show()
    

In [None]:
len(encoding_test)