## 1. SETUP AND IMPORTS

In [None]:
import os
import pickle
import numpy as np
import pandas as pd
from PIL import Image
import re
import pandas
import time
from IPython.display import display
import string
from tqdm import tqdm
import matplotlib.pyplot as plt
import matplotlib.image as mpimg

import tensorflow as tf
from sklearn.model_selection import train_test_split
from tensorflow.keras.applications.vgg16 import VGG16, preprocess_input
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.utils import to_categorical, plot_model
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, LSTM, Embedding, Dropout, add, LeakyReLU
from tensorflow.keras.models import load_model
from nltk.translate.bleu_score import corpus_bleu

# Report the installed TensorFlow version at the top of the notebook output.
print(f"TensorFlow Version: {tf.__version__}")

In [None]:
# Dataset input root and the writable output directory (Kaggle-style paths).
BASE_DIR = "/kaggle/input/flickr8k"
WORKING_DIR = "/kaggle/working/"
# Sentinel tokens wrapped around every cleaned caption.
# NOTE(review): the Keras Tokenizer's default filters presumably strip "<" and ">",
# which would explain why decoding also compares against the bare word "end" — confirm.
STARTSEQ = "<start>"
ENDSEQ = "<end>"
# Input data: the captions CSV and the directory of images.
CAPTIONS_FILE = os.path.join(BASE_DIR, 'captions.txt')
IMAGES_DIR = os.path.join(BASE_DIR, 'Images')
# Artifacts cached under WORKING_DIR.
FEATURES_FILE = os.path.join(WORKING_DIR, "features_vgg.pkl")
# NOTE(review): TOKENIZER_FILE is not referenced anywhere else in the visible code.
TOKENIZER_FILE =  os.path.join(WORKING_DIR, "tokenizer.pkl")
MODEL_FILE = os.path.join(WORKING_DIR, "best_model.keras")

## 2. EXTRACT FEATURES

In [None]:
# Build VGG16 with its default arguments and expose the output of its
# second-to-last layer as the image feature vector.
# NOTE(review): the captioning model declares a 4096-wide image input, so this
# layer is presumably VGG16's 4096-unit fully connected layer — confirm.
vgg = VGG16()
features_extractor = Model(inputs=vgg.inputs, outputs=vgg.layers[-2].output)
#plot_model(vgg, show_shapes=True)

In [None]:
# Image features are cached on disk: reuse the cache when it exists, otherwise
# run every file in IMAGES_DIR through the feature extractor and save the result.
if os.path.exists(FEATURES_FILE):
    print("Features already exists, loading features... ")
    # NOTE: pickle can execute arbitrary code on load; only read caches this notebook wrote.
    with open(FEATURES_FILE, 'rb') as file:
        features = pickle.load(file)
else:
    print("Extracting features...")
    features = {}
    for img_name in tqdm(os.listdir(IMAGES_DIR)):
        # Load at 224x224, convert to an array and add a leading batch axis.
        pixels = img_to_array(load_img(os.path.join(IMAGES_DIR, img_name), target_size=(224, 224)))
        batch = preprocess_input(pixels.reshape((1,) + pixels.shape))
        # Key each feature array by the filename up to its first dot.
        features[img_name.split('.')[0]] = features_extractor.predict(batch, verbose=0)

    print(f"Done. Extracted features from {len(features)} images ")
    with open(FEATURES_FILE, 'wb') as file:
        print(f"Saving features to features_vgg.pkl")
        pickle.dump(features, file)

## 3. LOAD AND PROCESS CAPTIONS

In [None]:
# Load the captions table; later cells read its "image" and "caption" columns.
df = pd.read_csv(CAPTIONS_FILE)
print(df.head())

In [None]:
import re

def clean_caption(caption: str) -> str:
    """Normalise a raw caption and wrap it in the start/end sentinel tokens.

    The text is lower-cased, every character other than ``a-z`` and
    whitespace is removed, and single-character words are dropped. The
    surviving words are joined with single spaces and placed between
    ``STARTSEQ`` and ``ENDSEQ``.
    """
    letters_only = re.sub(r"[^a-z\s]", "", caption.lower())
    # str.split() already collapses whitespace runs, so no separate squeeze pass is needed.
    kept_words = [token for token in letters_only.split() if len(token) > 1]
    body = " ".join(kept_words)
    return f"{STARTSEQ} {body} {ENDSEQ}"

# Group the cleaned captions by image id (the filename up to its first dot).
mapping = {}
for _, row in tqdm(df.iterrows(), 'Processing and mapping captions'):
    image_id = row["image"].split('.')[0]
    mapping.setdefault(image_id, []).append(clean_caption(row["caption"]))

print(f"Mapped captions for {len(mapping)} images ")

In [None]:
# Sanity check: print the cleaned captions of one known image.
# The id is defined once so the heading and the lookup cannot drift apart.
sample_image_id = '1000268201_693b08cb0e'
print(f"Captions of {sample_image_id}.jpg")
for c in mapping[sample_image_id]:
    print(f"- {c}")

In [None]:
# Flatten the per-image caption lists into one list of caption strings.
all_captions = [caption for captions in mapping.values() for caption in captions]
print(f"Loaded {len(all_captions)} captions")

## 4. TOKENIZATION

In [None]:
# Fit a Keras Tokenizer with default settings on every cleaned caption.
# NOTE(review): this includes captions of images that are later held out for
# evaluation — confirm that is intended.
tokenizer = Tokenizer()
tokenizer.fit_on_texts(all_captions)
# +1: presumably because word indices start at 1 and index 0 is the padding id
# (the Embedding layer is built with mask_zero=True) — confirm.
vocab_size = len(tokenizer.word_index) + 1
print(f"We have {vocab_size} vocabularies in captions")

In [None]:
# Longest caption measured in words (tokens), not characters. This value sizes
# the padded token sequences and the model's caption input, so counting
# characters would make every sequence several times longer than necessary.
# NOTE: a model saved with the previous (character-based) value has a different
# input width and must be retrained.
max_length = max(len(caption.split()) for caption in all_captions)
print(f"Max length in captions is {max_length}")

## 5. TRAIN TEST SPLIT

In [None]:
# Hold out the last 5% of image ids for evaluation. The split follows the
# insertion order of `mapping`; no shuffling is applied.
img_ids = list(mapping.keys())
split_size = int(len(img_ids) * 0.95)
train, test = img_ids[:split_size], img_ids[split_size:]

# Notebook cell output: sizes of the two splits.
len(train), len(test)

## 6. DATA GENERATOR

In [None]:
def _pack_batch(image_rows, token_rows, target_rows):
    """Convert the accumulated sample lists into one ``(inputs, targets)`` batch of tensors."""
    inputs = {
        "image_features": tf.constant(np.array(image_rows, dtype=np.float32)),
        "caption_input": tf.constant(np.array(token_rows, dtype=np.int32)),
    }
    return inputs, tf.constant(np.array(target_rows, dtype=np.float32))


def data_generator(data_keys, mapping, features, max_length, vocab_size, batch_size):
    """Endlessly yield training batches of (image features, caption prefix) -> next word.

    Every caption of every image is expanded into one sample per prefix: for
    the token sequence ``t0 t1 ... tk`` the samples are ``(t0 -> t1)``,
    ``(t0 t1 -> t2)`` and so on. Prefixes are post-padded to ``max_length``
    and targets are one-hot encoded over ``vocab_size`` classes.

    A batch is emitted after ``batch_size`` *images*, so the number of
    samples per batch varies with caption count and length. At the end of
    each pass over ``data_keys`` any remaining samples are emitted as a
    partial batch, then iteration starts over.

    Captions are tokenised with the module-level ``tokenizer``.

    Args:
        data_keys: Image ids to iterate over on every pass.
        mapping: Dict of image id -> list of caption strings.
        features: Dict of image id -> feature array; row 0 is used per sample.
        max_length: Length every input sequence is padded to.
        vocab_size: Number of classes for the one-hot targets.
        batch_size: Number of images accumulated per yielded batch.

    Yields:
        ``({"image_features": ..., "caption_input": ...}, targets)`` tensors.
    """
    image_rows, token_rows, target_rows = [], [], []
    images_in_batch = 0

    while True:
        for key in data_keys:
            images_in_batch += 1

            for caption in mapping[key]:
                token_ids = tokenizer.texts_to_sequences([caption])[0]

                # One training sample per proper prefix of the caption.
                for split_at in range(1, len(token_ids)):
                    prefix = pad_sequences(
                        [token_ids[:split_at]], maxlen=max_length, padding='post'
                    )[0]
                    target = to_categorical(
                        [token_ids[split_at]], num_classes=vocab_size
                    )[0]

                    image_rows.append(features[key][0])
                    token_rows.append(prefix)
                    target_rows.append(target)

            # Enough images collected: hand the batch over and start a new one.
            if images_in_batch == batch_size:
                yield _pack_batch(image_rows, token_rows, target_rows)
                image_rows, token_rows, target_rows = [], [], []
                images_in_batch = 0

        # End of a pass: flush the images that did not fill a whole batch.
        if images_in_batch > 0:
            yield _pack_batch(image_rows, token_rows, target_rows)
            image_rows, token_rows, target_rows = [], [], []
            images_in_batch = 0

## 7. MODEL ARCHITECTURE

In [None]:
# Image branch: 4096-wide feature vector -> dropout -> 256-unit dense projection.
inputs1 = Input(shape=(4096,), name="image_features")
fe1 = Dropout(0.3)(inputs1)
fe2 = Dense(256)(fe1)
fe2 = LeakyReLU(negative_slope=0.1)(fe2)

# Sequence branch: padded token ids -> embedding (id 0 masked) -> dropout -> LSTM.
inputs2 = Input(shape=(max_length,), name="caption_input")
se1 = Embedding(vocab_size, 256, mask_zero=True)(inputs2)
se2 = Dropout(0.3)(se1)
se3 = LSTM(256)(se2)

# Merge the two branches (image + text) by element-wise addition, then
# predict a distribution over the vocabulary for the next word.
decoder1 = add([fe2, se3])
decoder2 = Dense(256)(decoder1)
decoder2 = LeakyReLU(negative_slope=0.1)(decoder2)
outputs = Dense(vocab_size, activation='softmax')(decoder2)

# Compile the model and write a diagram of it to model.png.
model = Model(inputs=[inputs1, inputs2], outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer='adam')
plot_model(model,to_file='model.png',show_shapes=True,show_dtype=True,
           show_layer_activations=False,show_trainable=True)

## 8. TRAIN THE MODEL

In [None]:
# Train only when no saved model exists yet; otherwise the saved one is reused.
if not os.path.exists(MODEL_FILE):
    epochs = 64
    batch_size = 32
    # The generator emits one batch per `batch_size` images plus a trailing
    # partial batch, so a full pass over `train` takes ceil(len / batch_size)
    # steps. Floor division would let epochs drift relative to passes over the
    # data (and gives 0 steps when there are fewer images than batch_size).
    steps = (len(train) + batch_size - 1) // batch_size

    # Shapes/dtypes of what data_generator yields; the leading dimension is
    # None because the number of samples per batch varies.
    output_signature = (
        {
            "image_features": tf.TensorSpec(shape=(None, 4096), dtype=tf.float32),
            "caption_input": tf.TensorSpec(shape=(None, max_length), dtype=tf.int32)
        },
        tf.TensorSpec(shape=(None, vocab_size), dtype=tf.float32)
    )

    dataset = tf.data.Dataset.from_generator(
        lambda: data_generator(
            data_keys=train,
            mapping=mapping,
            features=features,
            max_length=max_length,
            vocab_size=vocab_size,
            batch_size=batch_size
        ),
        output_signature=output_signature
    )

    # Train the model using the dataset
    history = model.fit(
        dataset,
        epochs=epochs,
        steps_per_epoch=steps,
        verbose=1
    )

    model.save(MODEL_FILE)

    # Plot the training loss per epoch.
    plt.plot(history.history['loss'], label='Training loss')
    plt.title('Model Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    # Without legend() the label passed to plot() is never displayed.
    plt.legend()
    plt.show()

## 9. EVALUATE THE MODEL

In [None]:
# Reload the cached image features and the saved model from disk, so that
# evaluation works from the persisted artifacts.
# NOTE: pickle can execute arbitrary code on load; only read files this notebook wrote.
with open(FEATURES_FILE, 'rb') as file:
    features = pickle.load(file)
model = load_model(MODEL_FILE)

In [None]:
def idx_to_word(integer, tokenizer):
    """Return the word whose vocabulary index is ``integer``, or ``None`` if there is none.

    Uses the tokenizer's ``index_word`` reverse mapping when it is available
    and populated, which makes each lookup constant-time instead of a scan
    over the whole vocabulary. Tokenizer-like objects that only expose
    ``word_index`` fall back to the scan.

    Args:
        integer: Vocabulary index to look up.
        tokenizer: Object exposing ``word_index`` (word -> index) and,
            optionally, ``index_word`` (index -> word).

    Returns:
        The matching word, or ``None`` when no word has that index.
    """
    index_word = getattr(tokenizer, "index_word", None)
    if index_word:
        return index_word.get(integer)

    # Fallback: linear search through the forward mapping.
    return next(
        (word for word, index in tokenizer.word_index.items() if index == integer),
        None,
    )

def predict_caption(model, image, tokenizer, max_length):
    """Greedily decode a caption for one image.

    Starting from ``STARTSEQ``, the model is asked for the most probable next
    word given the image and the words generated so far. Decoding stops when
    the predicted index has no word, when the end token is produced, or after
    ``max_length`` steps.

    Args:
        model: Captioning model taking ``[image, padded_sequence]``.
        image: Feature array for the image, passed to the model unchanged
            (presumably shaped ``(1, 4096)`` — confirm against the extractor).
        tokenizer: Fitted tokenizer used for both encoding and decoding.
        max_length: Padded sequence length and maximum number of decoding steps.

    Returns:
        The generated words joined by single spaces, without start/end tokens
        (an empty string if nothing was generated).
    """
    words = []
    for _ in range(max_length):
        # Re-encode the start token plus everything generated so far.
        in_text = " ".join([STARTSEQ, *words])
        seq = tokenizer.texts_to_sequences([in_text])[0]
        seq = pad_sequences([seq], maxlen=max_length, padding='post')
        yhat = model.predict([image, seq], verbose=0)
        word = idx_to_word(np.argmax(yhat), tokenizer)
        # The end token is checked both as ENDSEQ and as the bare word "end",
        # the form in which it comes back from the tokenizer's vocabulary.
        if word is None or word in (ENDSEQ, "end"):
            break
        words.append(word)
    # Words are collected separately, so no fixed-width slice is needed to
    # strip the start token from the result.
    return " ".join(words)

In [None]:
# Corpus-level BLEU over the held-out images: `actual` holds, per image, the
# list of tokenised reference captions; `predicted` holds the tokenised
# generated caption.
actual, predicted = [], []

for key in tqdm(test):
    # clean_caption wraps every caption in STARTSEQ ... ENDSEQ, whereas
    # predict_caption returns text without them. Drop the first and last token
    # of each reference so both sides are compared on the same footing;
    # otherwise the sentinels lengthen the references and skew the score.
    references = [caption.split()[1:-1] for caption in mapping[key]]
    hypothesis = predict_caption(model, features[key], tokenizer, max_length).split()
    actual.append(references)
    predicted.append(hypothesis)

print(f"BLEU-1: {corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0))}")
print(f"BLEU-2: {corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0))}")

## 10. TEST AND VISUALIZE RESULTS

In [None]:
import random

# Pick a random held-out image and compare its reference captions with the
# caption the model generates for it.
image_id = random.choice(test)
image_name = image_id + ".jpg"
image_path = os.path.join(IMAGES_DIR, image_name)
image = Image.open(image_path)
captions = mapping[image_id]

print("================ Actual Captions ===================")
for caption in captions:
    # Drop the leading STARTSEQ and trailing ENDSEQ tokens by position rather
    # than by a fixed character count tied to their current spelling.
    print(" ".join(caption.split()[1:-1]))

y_pred = predict_caption(model, features[image_id], tokenizer, max_length)
print("================ Predicted Captions ===================")

print(y_pred)
plt.imshow(image)