Downloading Dataset from Kaggle

In [3]:
!pip install kaggle




In [4]:
!pip install tensorflow keras




In [5]:
from google.colab import files

# Bind the return value instead of leaving it as the cell's last expression:
# files.upload() returns a dict of {filename: file bytes}, and echoing it would
# write the contents of kaggle.json (an API credential) into the saved notebook.
_ = files.upload()  # This will prompt you to upload the kaggle.json file


Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"<REDACTED>","key":"<REDACTED>"}'}

In [6]:
import os
import shutil

# Location where the uploaded API token is installed.
kaggle_token_path = '/root/.kaggle/kaggle.json'
os.makedirs('/root/.kaggle', exist_ok=True)

# Only move the token when a freshly uploaded copy is present, so re-running this
# cell after the file has already been moved does not raise FileNotFoundError.
if os.path.exists('kaggle.json'):
    shutil.move('kaggle.json', kaggle_token_path)

# The token is a credential: make it readable/writable by the owner only.
if os.path.exists(kaggle_token_path):
    os.chmod(kaggle_token_path, 0o600)


In [7]:
# Download the Flickr8k image archive and the caption/text archive (-q: quiet).
!wget -q https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip
!wget -q https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_text.zip
# Extract both archives into the current working directory (-qq: very quiet).
!unzip -qq Flickr8k_Dataset.zip
!unzip -qq Flickr8k_text.zip
# Remove the archives once their contents have been extracted.
!rm Flickr8k_Dataset.zip Flickr8k_text.zip


Setup

In [10]:
import os
# Set the Keras backend to TensorFlow
# This ensures that Keras will use TensorFlow as the backend for computations
os.environ["KERAS_BACKEND"] = "tensorflow"

# Import necessary libraries
import re  # Import regular expressions library for text processing
import numpy as np  # Import numpy for numerical operations and working with arrays
import matplotlib.pyplot as plt  # Import matplotlib for plotting graphs and visualizations

# Import TensorFlow and Keras libraries
import tensorflow as tf  # Import TensorFlow, the framework for deep learning
import keras  # Import Keras, a high-level neural networks API that runs on top of TensorFlow
from keras import layers  # Import Keras layers module, which provides different types of layers for building models
from keras.applications import efficientnet  # Import EfficientNet pre-trained models for transfer learning
from keras.layers import TextVectorization  # Import the TextVectorization layer for text preprocessing

# Set a random seed for reproducibility of results
# This ensures that any random operations (like weight initialization) are the same each time you run the code
keras.utils.set_random_seed(111)  # Set the random seed to 111 (or any integer value)


In [11]:
# Directory that holds the extracted images; joined onto every image file name
# when the caption file is parsed.
IMAGES_PATH = "Flicker8k_Dataset"

# Size every image is resized to, also used as the CNN input size
IMAGE_SIZE = (299, 299)

# Maximum number of tokens kept in the text vectorizer's vocabulary; also the
# width of the decoder's output layer
VOCAB_SIZE = 10000

# Fixed length of every vectorized caption; also the upper bound on the number
# of words a raw caption may have before its image is discarded
SEQ_LENGTH = 25

# Dimension for the image embeddings and token embeddings
EMBED_DIM = 512

# Per-layer units in the feed-forward network
FF_DIM = 512

# Other training parameters
BATCH_SIZE = 64
EPOCHS = 30
AUTOTUNE = tf.data.AUTOTUNE  # let tf.data choose parallelism / prefetch sizes

Preparing the Dataset


In [12]:
ls


CrowdFlowerAnnotations.txt  Flickr_8k.devImages.txt   Flickr8k.token.txt         readme.txt
ExpertAnnotations.txt       Flickr8k.lemma.token.txt  Flickr_8k.trainImages.txt  sample_data/
Flicker8k_Dataset/          Flickr_8k.testImages.txt  __MACOSX/


In [13]:
from itertools import islice

In [14]:
def load_captions_data(filename, images_path=None, seq_length=None):
    """Loads captions (text) data and maps them to corresponding images.

    Every usable line of the file has the form
    ``<image name>#<caption number>``, a tab character, then the caption.
    Blank lines and lines without a tab are ignored instead of aborting the load.

    An image is discarded entirely as soon as one of its captions has fewer than
    5 or more than ``seq_length`` whitespace-separated words.

    Args:
        filename: Path to the text file containing caption data.
        images_path: Directory joined onto every image name. Defaults to the
            module-level ``IMAGES_PATH``.
        seq_length: Maximum number of words allowed in a caption. Defaults to
            the module-level ``SEQ_LENGTH``.

    Returns:
        caption_mapping: Dictionary mapping each retained image path to the list
            of its captions, each wrapped as ``"<start> ... <end>"``.
        text_data: List of every caption that was accepted while reading the
            file, in file order. Captions accepted for an image *before* that
            image was marked for skipping remain in this list.
    """
    # Resolve the defaults at call time so later changes to the notebook-level
    # constants are picked up.
    if images_path is None:
        images_path = IMAGES_PATH
    if seq_length is None:
        seq_length = SEQ_LENGTH

    caption_mapping = {}  # image path -> list of wrapped captions
    text_data = []  # every accepted caption
    images_to_skip = set()  # images with at least one out-of-range caption

    # Explicit encoding so parsing does not depend on the platform default.
    with open(filename, encoding="utf-8") as caption_file:
        for line in caption_file:
            line = line.rstrip("\n")

            # Split on the FIRST tab only; a caption that itself contains a tab
            # must not break the unpacking.
            img_name, tab, caption = line.partition("\t")
            if not tab:
                # Blank or malformed line: nothing to parse.
                continue

            # Remove the caption number suffix (e.g., #1, #2, etc.)
            img_name = img_name.split("#")[0]
            img_name = os.path.join(images_path, img_name.strip())

            # Skip captions that are too short or too long, and remember the
            # image so that all of its captions are dropped.
            tokens = caption.strip().split()
            if len(tokens) < 5 or len(tokens) > seq_length:
                images_to_skip.add(img_name)
                continue

            if img_name.endswith("jpg") and img_name not in images_to_skip:
                # Add <start> and <end> tokens around the caption
                caption = "<start> " + caption.strip() + " <end>"
                text_data.append(caption)
                caption_mapping.setdefault(img_name, []).append(caption)

    # An image may have been added before one of its later captions
    # disqualified it; remove those entries now.
    for img_name in images_to_skip:
        caption_mapping.pop(img_name, None)

    return caption_mapping, text_data


def train_val_split(caption_data, train_size=0.8, shuffle=True):
    """Partition the caption mapping into a training part and a validation part.

    Args:
        caption_data (dict): Mapping of image name -> captions.
        train_size (float): Fraction of the images assigned to the training part.
        shuffle (bool): If True, the image order is randomized (via NumPy's
            global random state) before the split point is applied.

    Returns:
        Tuple ``(training_data, validation_data)`` of two dicts that together
        contain every entry of ``caption_data`` exactly once.
    """
    image_names = list(caption_data)
    if shuffle:
        np.random.shuffle(image_names)

    # Index of the first image that belongs to the validation part.
    split_at = int(len(caption_data) * train_size)
    train_names, valid_names = image_names[:split_at], image_names[split_at:]

    training_data = {name: caption_data[name] for name in train_names}
    validation_data = {name: caption_data[name] for name in valid_names}
    return training_data, validation_data


# Parse the caption file: captions_mapping maps image path -> list of captions,
# text_data is the flat list of captions later used to build the vocabulary
captions_mapping, text_data = load_captions_data("Flickr8k.token.txt")

# Split the images (with their captions) into training and validation sets
# using the function's default train fraction and shuffling
train_data, valid_data = train_val_split(captions_mapping)

# Report how many images ended up in each split
print("Number of training samples: ", len(train_data))
print("Number of validation samples: ", len(valid_data))

Number of training samples:  6114
Number of validation samples:  1529


Vectorizing the text data

In [15]:
def custom_standardization(input_string):
    """Lowercase the text and delete every character listed in ``strip_chars``.

    Args:
        input_string: String tensor to standardize.

    Returns:
        The lowercased string tensor with all ``strip_chars`` characters removed.
    """
    # Character class matching any single character of strip_chars; re.escape
    # keeps regex metacharacters in that set literal.
    unwanted = "[%s]" % re.escape(strip_chars)
    lowered = tf.strings.lower(input_string)
    return tf.strings.regex_replace(lowered, unwanted, "")


# Characters to strip from the text during preprocessing.
# The backslash is written as "\\" on purpose: a bare "\]" is an invalid escape
# sequence in a normal string literal and triggers a warning on current Python
# versions, while producing the same two characters.
strip_chars = "!\"#$%&'()*+,-./:;<=>?@[\\]^_`{|}~"

# Ensure the < and > characters are preserved (they delimit the <start> and
# <end> tokens added around each caption)
strip_chars = strip_chars.replace("<", "").replace(">", "")

# Text vectorizer that turns caption strings into fixed-length integer sequences
vectorization = TextVectorization(
    max_tokens=VOCAB_SIZE,  # Upper bound on the vocabulary size
    output_mode="int",  # Emit integer token ids
    output_sequence_length=SEQ_LENGTH,  # Fixed length of every output sequence
    standardize=custom_standardization,  # Lowercase + strip punctuation, keeping < and >
)

# Build the vocabulary from the captions collected while parsing the caption file
vectorization.adapt(text_data)

# Image augmentation pipeline; it is handed to the captioning model, which
# applies it to image batches in its training step only
image_augmentation = keras.Sequential(
    [
        # Randomly flip images horizontally
        layers.RandomFlip("horizontal"),
        # Random rotation with factor 0.2 (a fraction of a full turn)
        layers.RandomRotation(0.2),
        # Random contrast adjustment with factor 0.3
        layers.RandomContrast(0.3),
    ]
)


Building a tf.data.Dataset pipeline for training

In [16]:
def decode_and_resize(img_path):
    """Read a JPEG from disk, decode it as RGB and resize it to ``IMAGE_SIZE``.

    Note that pixel values are NOT rescaled to [0, 1]. ``tf.image.resize``
    already produces a float32 tensor, and ``tf.image.convert_image_dtype`` only
    rescales when converting between integer and floating types, so the final
    conversion leaves the values on the decoded 0-255 scale. The prediction code
    in this notebook relies on that when it clips to 0-255 for display.

    Args:
        img_path (str): File path of the image.

    Returns:
        tf.Tensor: float32 image tensor resized to ``IMAGE_SIZE`` with 3 channels.
    """
    raw_bytes = tf.io.read_file(img_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, IMAGE_SIZE)
    # No-op for an already-float32 input; kept so the output dtype is explicit.
    return tf.image.convert_image_dtype(resized, tf.float32)


def process_input(img_path, captions):
    """Turn one dataset element into model-ready tensors.

    Args:
        img_path (str): File path of the image.
        captions: The captions belonging to that image. In this notebook each
            dataset element carries the whole list of captions for an image,
            not a single string.

    Returns:
        tuple: ``(image, token_ids)`` - the decoded/resized image tensor and the
        vectorized captions.
    """
    image = decode_and_resize(img_path)
    token_ids = vectorization(captions)
    return image, token_ids


def make_dataset(images, captions):
    """Build the batched ``tf.data`` input pipeline for a set of images.

    Args:
        images (list): Image file paths.
        captions (list): For each image, its captions (same order as ``images``).

    Returns:
        tf.data.Dataset: Shuffled, preprocessed dataset yielding batches of
        ``BATCH_SIZE`` (image, vectorized captions) pairs, with prefetching.
    """
    return (
        tf.data.Dataset.from_tensor_slices((images, captions))
        # Shuffle paths/captions before decoding so the buffer holds cheap elements.
        .shuffle(BATCH_SIZE * 8)
        .map(process_input, num_parallel_calls=AUTOTUNE)
        .batch(BATCH_SIZE)
        .prefetch(AUTOTUNE)
    )


# Build the input pipelines: dict keys are the image paths, dict values are the
# per-image caption lists (both lists are in the same dict order)
train_dataset = make_dataset(list(train_data.keys()), list(train_data.values()))  # Training pipeline
valid_dataset = make_dataset(list(valid_data.keys()), list(valid_data.values()))  # Validation pipeline


Building the model

Our image captioning architecture consists of three models:

- A CNN: used to extract the image features.
- A TransformerEncoder: the extracted image features are passed to a Transformer-based encoder that generates a new representation of the inputs.
- A TransformerDecoder: this model takes the encoder output and the text data (sequences) as inputs and tries to learn to generate the caption.

In [17]:
def get_cnn_model():
    """Build the frozen EfficientNetB0 feature extractor.

    Returns:
        A Keras model mapping an image batch of size ``IMAGE_SIZE`` x 3 to a
        sequence of feature vectors: the backbone's output is reshaped to
        ``(-1, channels)`` per example.
    """
    backbone = efficientnet.EfficientNetB0(
        input_shape=(*IMAGE_SIZE, 3),
        include_top=False,  # drop the classification head
        weights="imagenet",  # start from ImageNet weights
    )
    backbone.trainable = False  # the backbone is not fine-tuned

    feature_map = backbone.output
    num_channels = feature_map.shape[-1]
    # Collapse all leading non-batch axes into one "sequence" axis.
    feature_sequence = layers.Reshape((-1, num_channels))(feature_map)
    return keras.models.Model(backbone.input, feature_sequence)


class TransformerEncoderBlock(layers.Layer):
    """Encoder block: layer norm -> dense projection -> self-attention -> residual + layer norm.

    Args:
        embed_dim: Output width of the dense projection and ``key_dim`` of the
            attention layer.
        dense_dim: Stored on the instance but not otherwise used in this class.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, dense_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.dense_dim = dense_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.0
        )
        self.layernorm_1 = layers.LayerNormalization()  # Applied to the raw inputs
        self.layernorm_2 = layers.LayerNormalization()  # Applied after the residual sum
        self.dense_1 = layers.Dense(embed_dim, activation="relu")  # Projects inputs to embed_dim

    def call(self, inputs, training, mask=None):
        """Encode ``inputs``; the ``mask`` argument is accepted but not used."""
        inputs = self.layernorm_1(inputs)  # Normalize the incoming features
        inputs = self.dense_1(inputs)  # Project to embed_dim
        # Self-attention: query, key and value are all the projected inputs
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=None,  # No attention mask applied here
            training=training,
        )
        # Residual connection around the attention layer, then normalize
        out_1 = self.layernorm_2(inputs + attention_output_1)
        return out_1


class PositionalEmbedding(layers.Layer):
    """Sum of a scaled token embedding and a learned position embedding.

    Args:
        sequence_length: Number of positions in the position-embedding table.
        vocab_size: Number of rows in the token-embedding table.
        embed_dim: Width of both embeddings.
    """

    def __init__(self, sequence_length, vocab_size, embed_dim, **kwargs):
        super().__init__(**kwargs)
        self.token_embeddings = layers.Embedding(
            input_dim=vocab_size, output_dim=embed_dim
        )
        self.position_embeddings = layers.Embedding(
            input_dim=sequence_length, output_dim=embed_dim
        )
        self.sequence_length = sequence_length
        self.vocab_size = vocab_size
        self.embed_dim = embed_dim
        self.embed_scale = tf.math.sqrt(tf.cast(embed_dim, tf.float32))  # sqrt(embed_dim) multiplier for token embeddings

    def call(self, inputs):
        """Embed token ids; positions are 0..length-1 along the last axis of ``inputs``."""
        length = tf.shape(inputs)[-1]
        positions = tf.range(start=0, limit=length, delta=1)  # Position indices 0..length-1
        embedded_tokens = self.token_embeddings(inputs) * self.embed_scale  # Scale token embeddings
        embedded_positions = self.position_embeddings(positions)  # Look up position embeddings
        return embedded_tokens + embedded_positions

    def compute_mask(self, inputs, mask=None):
        """Return a boolean mask that is False wherever the token id is 0."""
        return tf.math.not_equal(inputs, 0)


class TransformerDecoderBlock(layers.Layer):
    """Transformer decoder block that also owns the token embedding and output head.

    The block embeds the input token ids, applies causal self-attention,
    cross-attention over the encoder outputs and a feed-forward network, and
    finally projects to a softmax distribution over ``VOCAB_SIZE`` tokens.

    Args:
        embed_dim: Embedding width; also ``key_dim`` of both attention layers
            and the output width of the feed-forward network.
        ff_dim: Hidden width of the feed-forward network.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, ff_dim, num_heads, **kwargs):
        super().__init__(**kwargs)
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.num_heads = num_heads
        self.attention_1 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.attention_2 = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        self.ffn_layer_1 = layers.Dense(ff_dim, activation="relu")  # Feedforward network
        self.ffn_layer_2 = layers.Dense(embed_dim)

        self.layernorm_1 = layers.LayerNormalization()  # After self-attention
        self.layernorm_2 = layers.LayerNormalization()  # After cross-attention
        self.layernorm_3 = layers.LayerNormalization()  # After the feed-forward network

        # Use the constructor argument rather than the global EMBED_DIM so the
        # embedding width always matches the rest of the block.
        self.embedding = PositionalEmbedding(
            embed_dim=embed_dim,
            sequence_length=SEQ_LENGTH,
            vocab_size=VOCAB_SIZE,
        )
        self.out = layers.Dense(VOCAB_SIZE, activation="softmax")  # Final output layer

        self.dropout_1 = layers.Dropout(0.3)  # Inside the feed-forward network
        self.dropout_2 = layers.Dropout(0.5)  # Before the output projection
        self.supports_masking = True

    def call(self, inputs, encoder_outputs, training, mask=None):
        """Predict next-token distributions.

        Args:
            inputs: Integer token ids of the (shifted) caption.
            encoder_outputs: Output of the image encoder.
            training: Forwarded to the attention, dropout and normalization layers.
            mask: Optional padding mask over the token positions. When omitted,
                only the causal mask is applied.

        Returns:
            Softmax probabilities over the vocabulary for every position.
        """
        inputs = self.embedding(inputs)  # Token + position embeddings
        causal_mask = self.get_causal_attention_mask(inputs)

        # Defaults for the unmasked case. Previously these names were only bound
        # inside the `if`, so calling the layer without a mask raised an error.
        combined_mask = causal_mask
        padding_mask = None
        if mask is not None:
            padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)  # Padding mask
            combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
            combined_mask = tf.minimum(combined_mask, causal_mask)  # Padding AND causal

        # Masked self-attention over the caption tokens
        attention_output_1 = self.attention_1(
            query=inputs,
            value=inputs,
            key=inputs,
            attention_mask=combined_mask,
            training=training,
        )
        out_1 = self.layernorm_1(inputs + attention_output_1)  # Residual + norm

        # Cross-attention: caption positions attend to the encoder outputs
        attention_output_2 = self.attention_2(
            query=out_1,
            value=encoder_outputs,
            key=encoder_outputs,
            attention_mask=padding_mask,
            training=training,
        )
        out_2 = self.layernorm_2(out_1 + attention_output_2)  # Residual + norm

        ffn_out = self.ffn_layer_1(out_2)  # Feed-forward network
        ffn_out = self.dropout_1(ffn_out, training=training)
        ffn_out = self.ffn_layer_2(ffn_out)

        ffn_out = self.layernorm_3(ffn_out + out_2, training=training)  # Residual + norm
        ffn_out = self.dropout_2(ffn_out, training=training)
        preds = self.out(ffn_out)  # Per-position distribution over the vocabulary
        return preds

    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask of shape (batch, seq, seq).

        Entry ``[b, i, j]`` is 1 when ``i >= j``, i.e. position ``i`` may attend
        to position ``j`` only if ``j`` is not in the future.
        """
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")  # Lower triangular mask
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile multiples: repeat batch_size times along axis 0, once elsewhere
        mult = tf.concat(
            [
                tf.expand_dims(batch_size, -1),
                tf.constant([1, 1], dtype=tf.int32),
            ],
            axis=0,
        )
        return tf.tile(mask, mult)  # Tile mask across batch dimension


class ImageCaptioningModel(keras.Model):
    """End-to-end captioning model: CNN features -> encoder -> decoder.

    Defines custom train/test steps that iterate over the captions of each
    image and track a running mean of the loss and the masked token accuracy.

    Args:
        cnn_model: Model that turns an image batch into image embeddings.
        encoder: Layer applied to the image embeddings.
        decoder: Layer that predicts token distributions from the shifted
            caption and the encoder output.
        num_captions_per_image: Number of captions iterated over per image.
        image_aug: Optional augmentation applied to images in ``train_step`` only.
    """

    def __init__(
        self,
        cnn_model,
        encoder,
        decoder,
        num_captions_per_image=5,
        image_aug=None,
    ):
        super().__init__()
        self.cnn_model = cnn_model  # CNN for extracting image features
        self.encoder = encoder  # Transformer encoder
        self.decoder = decoder  # Transformer decoder
        self.loss_tracker = keras.metrics.Mean(name="loss")
        # Metric is named "accuracy", but the step functions report it under "acc"
        self.acc_tracker = keras.metrics.Mean(name="accuracy")
        self.num_captions_per_image = num_captions_per_image
        self.image_aug = image_aug

    def calculate_loss(self, y_true, y_pred, mask):
        """Return the loss averaged over the positions where ``mask`` is set."""
        loss = self.loss(y_true, y_pred)
        mask = tf.cast(mask, dtype=loss.dtype)
        loss *= mask  # Zero out the loss at masked-out positions
        return tf.reduce_sum(loss) / tf.reduce_sum(mask)

    def calculate_accuracy(self, y_true, y_pred, mask):
        """Return the fraction of unmasked positions whose argmax prediction equals the target."""
        accuracy = tf.equal(y_true, tf.argmax(y_pred, axis=2))
        accuracy = tf.math.logical_and(mask, accuracy)  # Ignore masked-out positions
        accuracy = tf.cast(accuracy, dtype=tf.float32)
        mask = tf.cast(mask, dtype=tf.float32)
        return tf.reduce_sum(accuracy) / tf.reduce_sum(mask)

    def _compute_caption_loss_and_acc(self, img_embed, batch_seq, training=True):
        """Compute masked loss and accuracy for one caption per image in the batch."""
        encoder_out = self.encoder(img_embed, training=training)
        # Teacher forcing: the decoder input drops the last token and the
        # target drops the first one, so position t predicts token t+1
        batch_seq_inp = batch_seq[:, :-1]
        batch_seq_true = batch_seq[:, 1:]
        mask = tf.math.not_equal(batch_seq_true, 0)  # True where the target token id is non-zero
        batch_seq_pred = self.decoder(
            batch_seq_inp, encoder_out, training=training, mask=mask
        )
        loss = self.calculate_loss(batch_seq_true, batch_seq_pred, mask)
        acc = self.calculate_accuracy(batch_seq_true, batch_seq_pred, mask)
        return loss, acc

    def train_step(self, batch_data):
        """Run one optimization step per caption index and update the metrics."""
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        if self.image_aug:
            batch_img = self.image_aug(batch_img)  # Apply data augmentation to images

        # Image embeddings are computed once and reused for every caption
        img_embed = self.cnn_model(batch_img)

        for i in range(self.num_captions_per_image):
            with tf.GradientTape() as tape:
                loss, acc = self._compute_caption_loss_and_acc(
                    img_embed, batch_seq[:, i, :], training=True
                )

                batch_loss += loss
                batch_acc += acc

            # Only the encoder and decoder variables are updated; the CNN's are not
            train_vars = (
                self.encoder.trainable_variables + self.decoder.trainable_variables
            )
            grads = tape.gradient(loss, train_vars)  # Gradients of this caption's loss
            self.optimizer.apply_gradients(zip(grads, train_vars))  # Update weights

        # NOTE(review): accuracy is averaged over the captions, but batch_loss is
        # left as the SUM of the per-caption losses, so the reported loss is on a
        # different scale than a per-caption mean would be.
        batch_acc /= float(self.num_captions_per_image)
        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)

        return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

    def test_step(self, batch_data):
        """Evaluate one batch (no augmentation, no weight updates) and update the metrics."""
        batch_img, batch_seq = batch_data
        batch_loss = 0
        batch_acc = 0

        img_embed = self.cnn_model(batch_img)

        for i in range(self.num_captions_per_image):
            loss, acc = self._compute_caption_loss_and_acc(
                img_embed, batch_seq[:, i, :], training=False
            )

            batch_loss += loss
            batch_acc += acc

        # Same convention as train_step: accuracy averaged, loss summed
        batch_acc /= float(self.num_captions_per_image)
        self.loss_tracker.update_state(batch_loss)
        self.acc_tracker.update_state(batch_acc)

        return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

    @property
    def metrics(self):
        """The metric objects tracked by this model."""
        return [self.loss_tracker, self.acc_tracker]


cnn_model = get_cnn_model()  # Frozen EfficientNetB0 feature extractor
encoder = TransformerEncoderBlock(embed_dim=EMBED_DIM, dense_dim=FF_DIM, num_heads=1)  # Single-head encoder block
decoder = TransformerDecoderBlock(embed_dim=EMBED_DIM, ff_dim=FF_DIM, num_heads=2)  # Two-head decoder block
# Assemble the full captioning model; num_captions_per_image keeps its default
caption_model = ImageCaptioningModel(
    cnn_model=cnn_model,
    encoder=encoder,
    decoder=decoder,
    image_aug=image_augmentation,  # Applied to image batches during training only
)


Downloading data from https://storage.googleapis.com/keras-applications/efficientnetb0_notop.h5
16705208/16705208 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step


Model training

In [None]:
# Loss: per-token sparse categorical cross-entropy
cross_entropy = keras.losses.SparseCategoricalCrossentropy(
    from_logits=False,  # The decoder's output layer already applies softmax
    reduction=None,  # Keep per-token losses; the model masks and averages them itself
)

# EarlyStopping criteria
early_stopping = keras.callbacks.EarlyStopping(
    patience=3,  # Stop after 3 epochs without improvement of the monitored metric
    restore_best_weights=True,  # Restore the weights of the best epoch
)

# Learning Rate Scheduler for the optimizer
class LRSchedule(keras.optimizers.schedules.LearningRateSchedule):
    """Linear warmup from 0 to a constant learning rate.

    For ``step < warmup_steps`` the learning rate is
    ``post_warmup_learning_rate * step / warmup_steps``; afterwards it stays at
    ``post_warmup_learning_rate``.

    Args:
        post_warmup_learning_rate: Learning rate used once warmup is finished.
        warmup_steps: Number of optimizer steps the warmup lasts.
    """

    def __init__(self, post_warmup_learning_rate, warmup_steps):
        super().__init__()
        self.post_warmup_learning_rate = post_warmup_learning_rate  # Final learning rate
        self.warmup_steps = warmup_steps  # Number of warmup steps

    def __call__(self, step):
        """Return the learning rate for optimizer step ``step``."""
        global_step = tf.cast(step, tf.float32)  # Float copies for the division below
        warmup_steps = tf.cast(self.warmup_steps, tf.float32)
        warmup_progress = global_step / warmup_steps  # Fraction of warmup completed
        warmup_learning_rate = self.post_warmup_learning_rate * warmup_progress  # Linear warmup
        return tf.cond(
            global_step < warmup_steps,  # If within warmup period
            lambda: warmup_learning_rate,  # Use warmup learning rate
            lambda: self.post_warmup_learning_rate,  # Otherwise, use final learning rate
        )

    def get_config(self):
        """Return the constructor arguments so the schedule can be serialized.

        Without this method, serializing an optimizer that uses the schedule
        fails because the base class does not provide a configuration.
        """
        return {
            "post_warmup_learning_rate": self.post_warmup_learning_rate,
            "warmup_steps": self.warmup_steps,
        }

# Create a learning rate schedule
num_train_steps = len(train_dataset) * EPOCHS  # Batches per epoch times number of epochs
num_warmup_steps = num_train_steps // 15  # Warm up over 1/15th of that count
lr_schedule = LRSchedule(
    post_warmup_learning_rate=1e-4,  # Final learning rate after warmup
    warmup_steps=num_warmup_steps,  # Number of warmup steps
)

# Compile the model
caption_model.compile(
    optimizer=keras.optimizers.Adam(lr_schedule),  # Adam optimizer with learning rate schedule
    loss=cross_entropy,  # Used by the model's own loss computation in its train/test steps
)

# Fit the model
caption_model.fit(
    train_dataset,  # Training dataset
    epochs=EPOCHS,  # Upper bound on epochs; early stopping may end training sooner
    validation_data=valid_dataset,  # Validation dataset for monitoring performance
    callbacks=[early_stopping],  # Use early stopping to prevent overfitting
)

Epoch 1/30
96/96 ━━━━━━━━━━━━━━━━━━━━ 149s 1s/step - acc: 0.1343 - loss: 35.2315 - val_acc: 0.3081 - val_loss: 20.4750
Epoch 2/30
96/96 ━━━━━━━━━━━━━━━━━━━━ 126s 973ms/step - acc: 0.3182 - loss: 20.0208 - val_acc: 0.3495 - val_loss: 18.0328
Epoch 3/30
96/96 ━━━━━━━━━━━━━━━━━━━━ 151s 1s/step - acc: 0.3513 - loss: 17.7896 - val_acc: 0.3670 - val_loss: 16.9502
Epoch 4/30
96/96 ━━━━━━━━━━━━━━━━━━━━ 142s 1s/step - acc: 0.3702 - loss: 16.6421 - val_acc: 0.3781 - val_loss: 16.3327
Epoch 5/30
96/96 ━━━━━━━━━━━━━━━━━━━━ 133s 969ms/step - acc: 0.3862 - loss: 15.7831 - val_acc: 0.3887 - val_loss: 15.8738
Epoch 6/30
96/96 ━━━━━━━━━━━━━━━━━━━━ 151s 1s/step - acc: 0.3986 - loss: 15.1034 - val_acc: 0.3947 - val_loss: 15.5955
Epoch 7/30
96/96 ━━━━━━━━━━━━━━━━━━━━ 142s 1s/step - 

Check sample predictions

In [None]:
# NOTE: this cell used to begin with a verbatim copy of the "Model training"
# cell (loss, early stopping, LRSchedule, compile() and fit()). Running it
# re-compiled the model with a fresh optimizer and trained it again before any
# prediction could be made. The copy has been removed; this cell relies on the
# `caption_model` trained by the "Model training" cell.

# Vocabulary and sentence decoding setup
vocab = vectorization.get_vocabulary()  # Retrieve vocabulary from the vectorizer
index_lookup = dict(enumerate(vocab))  # Map token index -> token string
max_decoded_sentence_length = SEQ_LENGTH - 1  # Maximum number of decoding steps
valid_images = list(valid_data.keys())  # List of validation image paths

# Function to generate captions for random validation images
def generate_caption():
    """Display a random validation image and print a greedily decoded caption.

    Decoding starts from ``"<start> "`` and, at each step, appends the token the
    decoder rates most probable for the current position, stopping at
    ``"<end>"`` or after ``max_decoded_sentence_length`` steps.
    """
    # Pick and load a random validation image
    image_path = np.random.choice(valid_images)
    image = decode_and_resize(image_path)

    # Show it (clipped and converted to uint8 for display)
    displayable = image.numpy().clip(0, 255).astype(np.uint8)
    plt.imshow(displayable)
    plt.show()

    # Image -> CNN features (batch of one) -> encoder representation
    features = caption_model.cnn_model(tf.expand_dims(image, 0))
    encoded_img = caption_model.encoder(features, training=False)

    # Greedy decoding, one token per step
    decoded_caption = "<start> "
    for step in range(max_decoded_sentence_length):
        token_ids = vectorization([decoded_caption])[:, :-1]
        pad_mask = tf.math.not_equal(token_ids, 0)
        probabilities = caption_model.decoder(
            token_ids, encoded_img, training=False, mask=pad_mask
        )
        next_token = index_lookup[np.argmax(probabilities[0, step, :])]
        if next_token == "<end>":
            break
        decoded_caption += " " + next_token

    # Strip the start/end markers and surrounding whitespace before printing
    decoded_caption = decoded_caption.replace("<start> ", "")
    decoded_caption = decoded_caption.replace(" <end>", "").strip()
    print("Predicted Caption: ", decoded_caption)

# Check predictions for a few samples: each call picks its own random
# validation image, shows it and prints the predicted caption
generate_caption()
generate_caption()
generate_caption()
