<a href="https://colab.research.google.com/github/inuwamobarak/image-capturing-pre-trained/blob/main/Image_Caption_Generation_Using_Generative_Artificial_Intelligence.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Installing requirements

In [None]:
# Install required packages for the code to run
!pip install transformers rouge_score evaluate datasets

## Libraries and Packages

In [None]:
# Importing necessary libraries and packages

import requests  # Library for making HTTP requests
import torch  # PyTorch for deep learning
from PIL import Image  # Library for image processing
from transformers import *  # Transformers library for NLP tasks
from tqdm import tqdm  # Library for displaying progress bars
import numpy as np # Library for numerical manipulation

device = "cuda" if torch.cuda.is_available() else "cpu"  # Checking for GPU availability and setting the device accordingly

## Loading Encoders and Decoders

In [None]:
# The model used for encoding the image and extracting image features
# Available encoder models:
# encoder_model = "WinKawaks/vit-small-patch16-224"
# encoder_model = "google/vit-base-patch16-224"
# encoder_model = "google/vit-base-patch16-224-in21k"
encoder_model = "microsoft/swin-base-patch4-window7-224-in22k"

# The model used for decoding the image features and generating captions
# Available decoder models:
# decoder_model = "bert-base-uncased"
# decoder_model = "prajjwal1/bert-tiny"
decoder_model = "gpt2"

## Load the pre-trained Encoder and Decoder models
model = VisionEncoderDecoderModel.from_encoder_decoder_pretrained(
    encoder_model, decoder_model
).to(device)

## Loading Tokenizers and Image Processors

In [None]:
## Initialize the Tokenizer

# The tokenizer is used to preprocess the text and convert it into numerical inputs for the model
# Available tokenizers:
# tokenizer = AutoTokenizer.from_pretrained(decoder_model)
tokenizer = GPT2TokenizerFast.from_pretrained(decoder_model)
# tokenizer = BertTokenizerFast.from_pretrained(decoder_model)

## Initialize the Image Processor

# The image processor is used to preprocess the images and extract visual features
# Available image processors:
# - ViTImageProcessor (for "google/vit-base-patch16-224" and "microsoft/swin-base-patch4-window7-224-in22k" encoder models)
image_processor = ViTImageProcessor.from_pretrained(encoder_model)

## Configuring the Model and Tokenizer for the Decoder

In [None]:
# Align the special-token ids in the model config with the tokenizer.
# The "gpt2" branch gives the tokenizer a pad token by reusing its EOS token;
# the other branch covers the remaining decoders (the commented-out options
# above are BERT variants) and starts decoding from the tokenizer's CLS token.
if "gpt2" in decoder_model:
    # Adjust the tokenizer and model configurations for "gpt2"
    tokenizer.pad_token = tokenizer.eos_token  # Reuse the EOS token as the padding token
    model.config.eos_token_id = tokenizer.eos_token_id
    model.config.pad_token_id = tokenizer.pad_token_id  # Must be read after pad_token is assigned above
    model.config.decoder_start_token_id = tokenizer.bos_token_id  # Decoding starts from the BOS token
else:
    # For other decoder models
    model.config.decoder_start_token_id = tokenizer.cls_token_id  # Decoding starts from the CLS token
    model.config.pad_token_id = tokenizer.pad_token_id  # Use the tokenizer's own pad token

# Importing and Loading the Dataset

In [None]:
from datasets import load_dataset

max_length = 32  # Caption length in tokens; captions are padded/truncated to this size in preprocessing
coco_dataset_ratio = 50  # Percentage of the train and validation splits to load

# Load the first `coco_dataset_ratio` percent of the train and validation splits;
# the test split is loaded in full
train_ds = load_dataset("HuggingFaceM4/COCO", split=f"train[:{coco_dataset_ratio}%]")
valid_ds = load_dataset("HuggingFaceM4/COCO", split=f"validation[:{coco_dataset_ratio}%]")
test_ds = load_dataset("HuggingFaceM4/COCO", split="test")

# Get the number of examples in each split
train_len = len(train_ds)
valid_len = len(valid_ds)
test_len = len(test_ds)

train_len, valid_len, test_len  # Last expression of the cell, so the notebook displays the three sizes

# Removing Images with Less than 3 Dimensions

In [None]:
# Keep only examples whose image converts to a 3-D or 4-D array; 2-D arrays
# (possibly grayscale images) are dropped from every split.
def _has_channel_axis(item):
    """Return True when the example's image array has 3 or 4 dimensions."""
    return np.array(item["image"]).ndim in (3, 4)


train_ds = train_ds.filter(_has_channel_axis, num_proc=2)
valid_ds = valid_ds.filter(_has_channel_axis, num_proc=2)
test_ds = test_ds.filter(_has_channel_axis, num_proc=2)

# Dataset Preprocessing

In [None]:
def preprocess(items):
    """Turn raw images and captions into model-ready tensors.

    Reads ``items["image"]`` and ``items["sentences"]`` and returns a dict with
    ``pixel_values`` (the image processor output) and ``labels`` (caption token
    ids padded/truncated to ``max_length``), both moved to ``device``.

    NOTE(review): each element of ``items["sentences"]`` is indexed with
    ``"raw"``, which implies ``items`` is a batch holding a list of caption
    records - confirm against how this function is applied to the dataset.
    """
    # Images first: run the image processor and keep only the pixel tensor.
    image_batch = image_processor(items["image"], return_tensors="pt")
    pixel_values = image_batch.pixel_values.to(device)

    # Captions: tokenize the raw text to a fixed length of `max_length` tokens.
    raw_captions = [record["raw"] for record in items["sentences"]]
    caption_batch = tokenizer(
        raw_captions,
        max_length=max_length,
        padding="max_length",
        truncation=True,
        return_tensors="pt",
    ).to(device)

    return {"pixel_values": pixel_values, "labels": caption_batch["input_ids"]}

# Attach `preprocess` as an on-the-fly transform. `with_transform` calls it with
# a batch (a dict of lists) whenever rows are read, which is the layout
# `preprocess` iterates over, and it returns the produced tensors unchanged.
# `map` would instead call it once per single example and store the outputs as
# plain lists, which neither `preprocess` nor the tensor-stacking `collate_fn`
# can work with.
train_dataset = train_ds.with_transform(preprocess)
valid_dataset = valid_ds.with_transform(preprocess)
test_dataset = test_ds.with_transform(preprocess)

# Batch Collation Function

In [None]:
def collate_fn(batch):
    """Merge per-example dicts into one batch dict of stacked tensors.

    Each element of ``batch`` must provide ``pixel_values`` and ``labels``
    tensors; ``torch.stack`` joins them along a new leading dimension.
    """
    stacked = {}
    for key in ('pixel_values', 'labels'):
        stacked[key] = torch.stack([example[key] for example in batch])
    return stacked

# This function takes a batch of preprocessed examples and stacks the pixel values and labels into tensors. It will be used by the data loader to collate the samples into batches.

# Metrics Computation

In [None]:
import evaluate

# Load the Rouge and Bleu metrics
rouge = evaluate.load("rouge")
bleu = evaluate.load("bleu")

def compute_metrics(eval_pred):
    """Compute ROUGE, BLEU and the average generation length for captions.

    Args:
        eval_pred: Object exposing ``predictions`` (generated token ids) and
            ``label_ids`` (reference token ids), e.g. an ``EvalPrediction``.

    Returns:
        dict: The ROUGE scores and ``bleu``, each scaled to 0-100 and rounded
        to 4 decimals, plus ``gen_len`` (BLEU's ``translation_length`` divided
        by the number of predictions).
    """
    # `predictions` holds the generated ids and `label_ids` the references.
    # Keeping them the right way round matters: BLEU and its
    # `translation_length` are computed from the predictions side.
    preds = eval_pred.predictions
    labels = eval_pred.label_ids

    # Decode the predictions and labels
    pred_str = tokenizer.batch_decode(preds, skip_special_tokens=True)
    labels_str = tokenizer.batch_decode(labels, skip_special_tokens=True)

    # Compute the Rouge score and report it as a percentage
    rouge_result = rouge.compute(predictions=pred_str, references=labels_str)
    rouge_result = {k: round(v * 100, 4) for k, v in rouge_result.items()}

    # Compute the Bleu score
    bleu_result = bleu.compute(predictions=pred_str, references=labels_str)

    return {
        **rouge_result,
        "bleu": round(bleu_result["bleu"] * 100, 4),
        "gen_len": bleu_result["translation_length"] / len(preds),
    }

# This function takes the evaluation predictions (including label ids and predicted ids) and computes the Rouge and Bleu scores for the generated captions. It also calculates the average generation length.

# Training Parameters

In [None]:
num_epochs = 2  # Number of epochs
batch_size = 16  # Batch size

# Set the number of training epochs and the batch size. Adjust these values according to your specific requirements.

# Dataset Example Shapes

In [None]:
# Iterate over the training dataset and print the shapes of labels and pixel values for an example.
for item in train_dataset:
    print(item["labels"].shape)
    print(item["pixel_values"].shape)
    break

# Training Arguments

In [None]:
from transformers import Seq2SeqTrainingArguments

# Define the training arguments
training_args = Seq2SeqTrainingArguments(
    predict_with_generate=True,             # Run generate() during evaluation so text metrics are computed on generated captions
    num_train_epochs=num_epochs,            # Number of training epochs
    evaluation_strategy="steps",            # Evaluate every eval_steps (NOTE(review): newer transformers releases rename this to eval_strategy - confirm against the installed version)
    eval_steps=2000,                        # Evaluate after each 2000 steps
    logging_steps=2000,                     # Log after each 2000 steps
    save_steps=2000,                        # Save after each 2000 steps
    per_device_train_batch_size=batch_size, # Batch size for training
    per_device_eval_batch_size=batch_size,  # Batch size for evaluation
    output_dir="vit-swin-base-224-gpt2-image-captioning",  # Output directory for saving checkpoints and logs
    # push_to_hub=True # Whether you want to push the model to the hub
    # Check this guide for more details: https://huggingface.co/transformers/model_sharing.html
)

# DataLoader Functions

In [None]:
from torch.utils.data import DataLoader
from transformers import Seq2SeqTrainer

# The trainer has to exist before its dataloader hooks can be replaced below.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    data_collator=collate_fn,
)


def get_eval_loader(eval_dataset=None):
    """Return the evaluation DataLoader.

    Uses ``eval_dataset`` when one is passed (as in
    ``trainer.evaluate(test_dataset)``) and falls back to ``valid_dataset``
    otherwise, so an explicitly requested dataset is never silently ignored.
    """
    dataset = valid_dataset if eval_dataset is None else eval_dataset
    return DataLoader(dataset, collate_fn=collate_fn, batch_size=batch_size)


def get_test_loader(eval_dataset=None):
    """Return the test DataLoader.

    Uses the dataset passed in when there is one and falls back to
    ``test_dataset`` otherwise.
    """
    dataset = test_dataset if eval_dataset is None else eval_dataset
    return DataLoader(dataset, collate_fn=collate_fn, batch_size=batch_size)


# Override the `get_train_dataloader`, `get_eval_dataloader`, and `get_test_dataloader` methods of the trainer
# so that batches are built with `collate_fn` from the preprocessed datasets
trainer.get_train_dataloader = lambda: DataLoader(train_dataset, collate_fn=collate_fn, batch_size=batch_size)
trainer.get_eval_dataloader = get_eval_loader
trainer.get_test_dataloader = get_test_loader

# These functions define the data loaders for training, evaluation, and testing. We override the default methods in the trainer to use our custom data loaders that properly collate the batches using the `collate_fn` function.

# Train the model

In [None]:
trainer.train()

# Evaluate the model on the test dataset
trainer.evaluate(test_dataset)

# Upload only when push_to_hub was enabled in the training arguments; an
# unconditional push either fails without Hub credentials or publishes the
# model when that was not requested.
if training_args.push_to_hub:
    trainer.push_to_hub()

# Managing Memory by Freeing Space

In [None]:
# Import necessary libraries
import gc

# Free up GPU memory
torch.cuda.empty_cache()

# Perform garbage collection to release unused memory
gc.collect()


# Using PyTorch DataLoader

In [None]:
# Import necessary libraries
from torch.utils.data import DataLoader

# Define data loaders for the manual training loop. All three batch examples with
# `collate_fn` and shuffle their data; the training loader uses `batch_size`,
# while the validation and test loaders use a fixed batch size of 8.
train_dataset_loader = DataLoader(train_dataset, collate_fn=collate_fn, batch_size=batch_size, shuffle=True)
valid_dataset_loader = DataLoader(valid_dataset, collate_fn=collate_fn, batch_size=8, shuffle=True)
test_dataset_loader = DataLoader(test_dataset, collate_fn=collate_fn, batch_size=8, shuffle=True)

In [None]:
# Import necessary libraries
from torch.optim import AdamW

# Define the optimizer
optimizer = AdamW(model.parameters(), lr=1e-5)

# The optimizer is AdamW, initialized with the model's parameters and a learning rate of 1e-5.

# Handling Log Files with TensorBoard

In [None]:
# Start tensorboard
%load_ext tensorboard
%tensorboard --logdir ./image-captioning/tensorboard

# The cell above starts TensorBoard inside the notebook. It first loads the TensorBoard extension using `%load_ext tensorboard`, and then starts TensorBoard with the specified log directory using `%tensorboard --logdir ./image-captioning/tensorboard`.

# Please note that the `./image-captioning/tensorboard` directory should contain the log files generated during the training process.

In [None]:
from torch.utils.tensorboard import SummaryWriter

# Create a SummaryWriter for TensorBoard logging
summary_writer = SummaryWriter(log_dir="./image-captioning/tensorboard")

# Print some statistics before training
# Calculate the number of training steps
n_train_steps = num_epochs * len(train_dataset_loader)
# Calculate the number of validation steps
n_valid_steps = len(valid_dataset_loader)
# Set the current training step to 0
current_step = 0
# Define the step frequency for logging, evaluation, and saving
save_steps = 1000


# The `SummaryWriter` logs training information to TensorBoard. Before training, the cell above computes the number of training and validation steps, initializes the current training step to 0, and defines the step frequency for logging, evaluation, and saving.

# Training Loop

In [None]:
# Manual training loop: every `save_steps` steps the model is evaluated on the
# validation set, the metrics are logged to TensorBoard and a checkpoint is
# saved; every step performs one optimizer update on the current training batch.
for epoch in range(num_epochs):
    # Set the model to training mode
    model.train()

    # Initialize the training loss
    train_loss = 0

    for batch in tqdm(train_dataset_loader, "Training", total=len(train_dataset_loader), leave=False):
        if current_step % save_steps == 0:
            # Evaluate on the validation set if the current step is a multiple of the save steps
            print()
            print(f"Validation at step {current_step}...")
            print()

            # Set the model to evaluation mode
            model.eval()

            # Initialize lists to store predictions and references for validation
            predictions, references = [], []

            # Initialize the validation loss
            valid_loss = 0

            # The validation loop uses its own variable (`valid_batch`) so that the
            # training `batch` of the enclosing loop is not overwritten; otherwise
            # the optimizer step below would run on the last validation batch.
            # Gradients are not needed here, so autograd tracking is switched off.
            with torch.no_grad():
                for valid_batch in valid_dataset_loader:
                    # Get the batch data
                    pixel_values = valid_batch["pixel_values"]
                    label_ids = valid_batch["labels"]

                    # Perform forward pass
                    outputs = model(pixel_values=pixel_values, labels=label_ids)

                    # Accumulate the loss
                    valid_loss += outputs.loss.item()

                    # Move the logits off the GPU before collecting results
                    logits = outputs.logits.detach().cpu()

                    # Add the predicted token ids (argmax over the vocabulary) to the list
                    predictions.extend(logits.argmax(dim=-1).tolist())

                    # Add the reference token ids to the list
                    references.extend(label_ids.tolist())

            # Create EvalPrediction object for compute_metrics function
            eval_prediction = EvalPrediction(predictions=predictions, label_ids=references)

            # Compute the metrics
            metrics = compute_metrics(eval_prediction)

            # Print the statistics
            print()
            print(f"Epoch: {epoch}, Step: {current_step}, Train Loss: {train_loss / save_steps:.4f}, " +
                  f"Valid Loss: {valid_loss / n_valid_steps:.4f}, BLEU: {metrics['bleu']:.4f}, " +
                  f"ROUGE-1: {metrics['rouge1']:.4f}, ROUGE-2: {metrics['rouge2']:.4f}, ROUGE-L: {metrics['rougeL']:.4f}")
            print()

            # Log the metrics to TensorBoard
            summary_writer.add_scalar("valid_loss", valid_loss / n_valid_steps, global_step=current_step)
            summary_writer.add_scalar("bleu", metrics["bleu"], global_step=current_step)
            summary_writer.add_scalar("rouge1", metrics["rouge1"], global_step=current_step)
            summary_writer.add_scalar("rouge2", metrics["rouge2"], global_step=current_step)
            summary_writer.add_scalar("rougeL", metrics["rougeL"], global_step=current_step)

            # Save the model together with its tokenizer and image processor
            model.save_pretrained(f"./image-captioning/checkpoint-{current_step}")
            tokenizer.save_pretrained(f"./image-captioning/checkpoint-{current_step}")
            image_processor.save_pretrained(f"./image-captioning/checkpoint-{current_step}")

            # Set the model back to train mode
            model.train()

            # Reset the train and valid loss
            train_loss, valid_loss = 0, 0

        # Get the training batch data
        pixel_values = batch["pixel_values"]
        labels = batch["labels"]

        # Perform forward pass
        outputs = model(pixel_values=pixel_values, labels=labels)

        # Calculate the loss
        loss = outputs.loss

        # Backward pass
        loss.backward()

        # Update the weights
        optimizer.step()

        # Zero the gradients
        optimizer.zero_grad()

        # Accumulate the loss for the running average
        loss_value = loss.item()
        train_loss += loss_value

        # Increment the step
        current_step += 1

        # Log the training loss to TensorBoard
        summary_writer.add_scalar("train_loss", loss_value, global_step=current_step)

# The training loop trains the model, evaluates on the validation set every `save_steps` steps, logs metrics to TensorBoard, saves the model, and accumulates the training loss.

# Loading Best Model

In [None]:
# Load the best model by changing the checkpoint number to the best checkpoint
best_checkpoint = 3000
best_model = VisionEncoderDecoderModel.from_pretrained(f"./image-captioning/checkpoint-{best_checkpoint}").to(device)

# This cell loads the best model from a chosen checkpoint. The checkpoint number is set in the `best_checkpoint` variable, and the model is loaded using the `from_pretrained` method of the `VisionEncoderDecoderModel` class. The loaded model is assigned to the `best_model` variable and moved to `device` (the GPU when one is available, otherwise the CPU).

# Model Evaluation

In [None]:
# `get_evaluation_metrics` takes a `model` and a `dataset` as inputs and returns evaluation metrics.

def get_evaluation_metrics(model, dataset):
    """Evaluate ``model`` on ``dataset`` and return its metrics.

    Args:
        model: Model called as ``model(pixel_values=..., labels=...)`` whose
            output exposes ``loss`` and ``logits``.
        dataset: Dataset whose examples ``collate_fn`` can batch.

    Returns:
        dict: The result of ``compute_metrics`` plus ``test_loss``, the mean
        loss over all batches.
    """
    # Set the model to evaluation mode
    model.eval()

    # Define the dataloader
    dataloader = DataLoader(dataset, collate_fn=collate_fn, batch_size=batch_size)

    # Calculate the number of testing steps
    n_test_steps = len(dataloader)

    # Initialize lists to store predictions and labels
    predictions, labels = [], []

    # Initialize the test loss
    test_loss = 0.0

    # No gradients are needed for evaluation; disabling autograd avoids building
    # a computation graph for every forward pass.
    with torch.no_grad():
        for batch in tqdm(dataloader, "Evaluating"):
            # Get the batch data
            pixel_values = batch["pixel_values"]
            label_ids = batch["labels"]

            # Perform forward pass
            outputs = model(pixel_values=pixel_values, labels=label_ids)

            # Accumulate the loss
            test_loss += outputs.loss.item()

            # Move the logits off the GPU before collecting results
            logits = outputs.logits.detach().cpu()

            # Add the predicted token ids (argmax over the vocabulary) to the list
            predictions.extend(logits.argmax(dim=-1).tolist())

            # Add the labels to the list
            labels.extend(label_ids.tolist())

    # Create EvalPrediction object that the compute_metrics function expects
    eval_prediction = EvalPrediction(predictions=predictions, label_ids=labels)

    # Compute the evaluation metrics
    metrics = compute_metrics(eval_prediction)

    # Add the average test loss to the metrics
    metrics["test_loss"] = test_loss / n_test_steps

    return metrics

#The revised comments provide clearer explanations of each step in the function.

In [None]:
#The code `metrics = get_evaluation_metrics(best_model, test_dataset)` calls the `get_evaluation_metrics` function with the `best_model` and `test_dataset` as inputs, and assigns the returned metrics to the `metrics` variable. Finally, `metrics` is printed to display the evaluation metrics.

metrics = get_evaluation_metrics(best_model, test_dataset)
metrics

In [None]:
#The code `finetuned_metrics = get_evaluation_metrics(finetuned_model, test_dataset)` calls the `get_evaluation_metrics` function with the `finetuned_model` and `test_dataset` as inputs, and assigns the returned metrics to the `finetuned_metrics` variable. Finally, `finetuned_metrics` is printed to display the evaluation metrics.

# `finetuned_model` and its tokenizer / image processor are used here and in the
# captioning comparison further down, but no visible cell creates them. Load the
# public checkpoint that the comparison output attributes these captions to.
finetuned_checkpoint = "nlpconnect/vit-gpt2-image-captioning"
finetuned_model = VisionEncoderDecoderModel.from_pretrained(finetuned_checkpoint).to(device)
finetuned_tokenizer = GPT2TokenizerFast.from_pretrained(finetuned_checkpoint)
finetuned_image_processor = ViTImageProcessor.from_pretrained(finetuned_checkpoint)

finetuned_metrics = get_evaluation_metrics(finetuned_model, test_dataset)
finetuned_metrics

# Image Captioning Pipeline

In [None]:
# This cell sets up an image captioning pipeline using the Hugging Face Transformers library. The pipeline is initialized with the `"image-to-text"` task and the `"Abdou/vit-swin-base-224-gpt2-image-captioning"` model.

# Import the pipeline factory
from transformers import pipeline

# Create the image captioning pipeline from a published captioning checkpoint
image_captioner = pipeline("image-to-text", model="Abdou/vit-swin-base-224-gpt2-image-captioning")

# Move the pipeline's model to `device` (the GPU when available, otherwise the CPU).
# NOTE(review): only the wrapped model is moved; the pipeline itself was created
# without a device argument - confirm this is fine if the pipeline is ever
# called directly rather than through `image_captioner.model`.
image_captioner.model = image_captioner.model.to(device)

#The code comment explains that the purpose is to set up an image captioning pipeline using the specified model and move the model to the specified device for inference.

In [None]:
get_evaluation_metrics(image_captioner.model, test_dataset)

In [None]:
# `show_image_and_captions` takes a URL as input. It displays the image, generates captions using different models, and prints the captions.

def show_image_and_captions(url):
    """Display the image at ``url`` and print one caption per model.

    Captions come from ``best_model``, ``finetuned_model`` and
    ``image_captioner.model``. The function relies on ``load_image``,
    ``get_caption`` and the ``finetuned_*`` objects being defined elsewhere;
    none of them is created here.
    """
    # Show the picture first so the captions appear underneath it
    display(load_image(url))

    # One caption per model, generated in a fixed order
    caption_ours = get_caption(best_model, image_processor, tokenizer, url)
    caption_finetuned = get_caption(finetuned_model, finetuned_image_processor, finetuned_tokenizer, url)
    caption_pipeline = get_caption(image_captioner.model, image_processor, tokenizer, url)

    # Print each caption, labelled with the model it came from
    report = (
        f"Our caption: {caption_ours}",
        f"nlpconnect/vit-gpt2-image-captioning caption: {caption_finetuned}",
        f"Abdou/vit-swin-base-224-gpt2-image-captioning caption: {caption_pipeline}",
    )
    for line in report:
        print(line)

#The updated comments provide clearer explanations of each step in the function.

In [None]:
#The `show_image_and_captions()` function displays the image from the provided URL and generates captions using different models. Here's the updated code:

show_image_and_captions("http://images.cocodataset.org/test-stuff2017/000000000001.jpg")

#Running this code will display the image and print the captions generated by the `best_model`, `finetuned_model`, and `image_captioner.model` for the given image URL.

In [None]:
show_image_and_captions("http://images.cocodataset.org/test-stuff2017/000000000019.jpg")