## Image Caption Generator

In [None]:
!pip install torch==2.0.0
!pip install torchvision==0.15.1
!pip install torchtext==0.15.1
!pip install transformers
!pip install datasets
!pip install opencv-python-headless
!pip install fsspec==2024.9.0

!pip3 install "git+https://github.com/philferriere/cocoapi.git#egg=pycocotools&subdirectory=PythonAPI"
!git clone "https://github.com/salaniz/pycocoevalcap.git"
!pip install ultralytics



In [None]:

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from torchvision.datasets import CocoDetection
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader, Subset
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
import os
from PIL import Image
import numpy as np
import requests
from io import BytesIO
from datasets import load_dataset
from tqdm import tqdm
import cv2
import pycocoevalcap
from ultralytics import YOLO
from pycocotools.coco import COCO
from pycocoevalcap.eval import COCOEvalCap
from collections import Counter
# for metrics, COCO API metrics is used
import json
from pycocoevalcap import bleu, meteor, rouge, spice
#from pycocoevalcap.evalcap import COCOEvalCap

# Load YOLOv5 for feature extraction
import torch.hub

In [None]:
import torch
import random
import numpy as np

# One seed shared by every random number generator this notebook relies on.
seed = 42  # any integer works; keep it fixed to make runs repeatable

# Apply the seed, in this order, to: PyTorch, the CUDA generator(s) (single
# GPU, then all GPUs), Python's `random` module, and NumPy.
for _seed_rng in (
    torch.manual_seed,
    torch.cuda.manual_seed,
    torch.cuda.manual_seed_all,
    random.seed,
    np.random.seed,
):
    _seed_rng(seed)


In [None]:
# Select the compute device: CUDA when it is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
device  # notebook echo: displays the chosen device as the cell output

In [None]:
# Image preprocessing pipeline, applied in this order:
#   1. resize to a fixed 224x224 (the encoder below hard-codes flattened
#      feature sizes, so the input size must stay fixed),
#   2. convert the PIL image to a tensor,
#   3. normalise each channel (values look like the usual ImageNet
#      statistics -- TODO confirm against the pretrained weights in use).
_preprocessing_steps = [transforms.Resize((224, 224)), transforms.ToTensor()]
_preprocessing_steps.append(
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
)
transform = transforms.Compose(_preprocessing_steps)

In [None]:
# Module-level vocabulary placeholder. It starts as an empty list and is
# rebound (through `global vocabulary`) by CocoDataset.build_vocab to the
# vocab object that method builds; the caption-generation cell reads it later.
vocabulary = []

In [None]:
# "annotations": [{"image_id": 179765,"id": 38,"caption": "A black Honda motorcycle parked in front of a garage."},...}
class CocoDataset(Dataset): # <start> cat sat on the mat <end> -> {0 32 24 34 3 3 1 }
    """COCO captions dataset that yields one sample per caption annotation.

    Every item is ``(image, {'image_id': ..., 'captions': LongTensor})`` where
    the tensor holds the ``<bos>`` index, the caption's token indices and the
    ``<eos>`` index.

    Args:
        root_dir: Directory that contains the image files.
        annotation_file: Path to a COCO captions JSON file; its
            ``'annotations'`` and ``'images'`` lists are read.
        transform: Optional callable applied to the RGB PIL image.
        cache_images: When True, each decoded PIL image is also stored in
            ``self.image_id_to_img``. Disabled by default because keeping
            every decoded image alive makes memory grow with the number of
            distinct images visited.
    """

    def __init__(self, root_dir, annotation_file, transform=None, cache_images=False):
        self.root_dir = root_dir
        self.transform = transform
        self.cache_images = cache_images
        with open(annotation_file, 'r') as f:
            self.coco_data = json.load(f)
        self.tokenizer = get_tokenizer("basic_english")  # Tokenizer from torchtext
        self.annotations = self.coco_data['annotations']
        # Build vocabulary
        self.vocab = self.build_vocab()
        self.image_id_to_file = {img['id']: img['file_name'] for img in self.coco_data['images']}
        # Only filled when cache_images=True (see __getitem__).
        self.image_id_to_img = {}

    def build_vocab(self):
        """Build the vocabulary from all captions and publish it globally.

        Side effect: rebinds the module-level ``vocabulary`` to the new vocab.

        Returns:
            The vocab object, with ``<unk>`` as the default index.
        """
        counter = Counter()
        for annotation in tqdm(self.annotations):
            caption = annotation['caption']
            tokens = self.tokenizer(caption.lower())
            counter.update(tokens)
        # The Counter is handed over as a single "sentence": iterating it
        # yields each distinct token once, so the counts themselves are not
        # forwarded. Kept as-is so token indices stay stable for saved models.
        vocab = build_vocab_from_iterator([counter], specials=["<pad>", "<unk>", "<bos>", "<eos>"])
        vocab.set_default_index(vocab["<unk>"])  # Out-of-vocabulary words will be mapped to <unk>
        global vocabulary
        vocabulary = vocab
        return vocab

    def __len__(self):
        """Return the number of caption annotations (not the number of images)."""
        return len(self.annotations)

    def __getitem__(self, idx):
        """Load the image and encoded caption for annotation ``idx``.

        Raises:
            KeyError: If the annotation's image id has no entry in the
                ``'images'`` list of the annotation file.
            Exception: Whatever opening/decoding the image raises, re-raised
                after printing the offending file name.
        """
        annotation = self.annotations[idx]
        image_id = annotation['image_id']

        file_name = self.image_id_to_file.get(image_id)
        if file_name is None:
            # Fail with the id in the message instead of a TypeError from os.path.join.
            raise KeyError(f"No image file registered for image_id {image_id} (annotation index {idx})")
        img_path = os.path.join(self.root_dir, file_name)

        try:
            # The context manager closes the file handle right after decoding.
            with Image.open(img_path) as raw_image:
                image = raw_image.convert('RGB')
        except Exception:
            print(f"Error opening image: {file_name}")
            raise

        if self.cache_images:
            self.image_id_to_img[image_id] = image

        tokens = self.tokenizer(annotation['caption'].lower())

        # Convert caption tokens to indices
        caption_indices = [self.vocab['<bos>']] + [self.vocab[token] for token in tokens] + [self.vocab['<eos>']]

        if self.transform:
            image = self.transform(image)

        return image, {'image_id': image_id,'captions': torch.tensor(caption_indices, dtype=torch.long)}


In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

# Re-selects the compute device (CUDA when available, else CPU) so this cell
# can run without the earlier device cell having been executed.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class EncoderCNN(nn.Module):
    """Image encoder: a pretrained backbone followed by a linear projection.

    For the CNN backbones the feature extractor is frozen (run under
    ``torch.no_grad`` and kept in eval mode); only the projection ``fc`` is
    trained. The ViT backbone is run with gradients enabled.
    """

    # Backbones that are used as frozen feature extractors.
    _FROZEN_CNN_TYPES = ('resnet50', 'vgg16', 'efficientnet', 'densenet')

    def __init__(self, embed_size, model_type = 'resnet50'):
        """
        EncoderCNN initializes with ResNet50, VGG16, EfficientNet-B0, DenseNet121, or ViT.
        Args:
            embed_size (int): The dimension of the embedding space.
            model_type (str): The type of model
                ('resnet50', 'vgg16', 'efficientnet', 'densenet', 'vit').
        Raises:
            ValueError: If ``model_type`` is not one of the supported names.
        """
        super(EncoderCNN, self).__init__()
        self.model_type = model_type

        # NOTE: the hard-coded `in_features` values below are the flattened
        # sizes of the backbone outputs; they presumably assume 224x224 inputs
        # (as produced by the notebook's transform) -- TODO confirm before
        # changing the input resolution.
        if model_type == 'resnet50':
            self.cnn = models.resnet50(pretrained=True)
            self.backbone = nn.Sequential(*list(self.cnn.children())[:-1])  # Remove the last FC layer
            in_features = self.cnn.fc.in_features

        elif model_type == 'vgg16':
            self.cnn = models.vgg16(pretrained=True)
            self.backbone = self.cnn.features  # Feature extractor in VGG16
            in_features = 25088  # Output features of VGG16

        elif model_type == 'efficientnet':
            self.cnn = models.efficientnet_b0(pretrained=True)  # EfficientNet-B0
            self.backbone = self.cnn.features  # Feature extractor in EfficientNet
            in_features = 62720  #self.cnn.classifier[1].in_features  # Feature dimension

        elif model_type == 'densenet':
            self.cnn = models.densenet121(pretrained=True)
            self.backbone = self.cnn.features  # DenseNet feature extractor
            in_features = 50176  # Output features of DenseNet121

        elif model_type == 'vit':
            # Imported here because the notebook never imports these names at
            # the top level; the CNN backbones do not need `transformers`.
            from transformers import ViTConfig, ViTModel

            # Initialize a Vision Transformer from Hugging Face
            vit_config = ViTConfig.from_pretrained("google/vit-base-patch16-224")
            self.vit = ViTModel.from_pretrained("google/vit-base-patch16-224", config=vit_config)
            in_features = vit_config.hidden_size  # Output size of ViT embeddings

        else:
            raise ValueError(
                f"Unsupported model_type: {model_type}. "
                "Choose 'resnet50', 'vgg16', 'efficientnet', 'densenet', or 'vit'."
            )

        if model_type in self._FROZEN_CNN_TYPES:
            # Frozen feature extractor: keep normalisation layers on their
            # pretrained running statistics from the start.
            self.backbone.eval()

        # Linear layer to project extracted features to embed_size
        self.fc = nn.Linear(in_features, embed_size)

    def train(self, mode=True):
        """Switch train/eval mode while keeping a frozen CNN backbone in eval.

        The CNN backbones receive no gradients (see ``forward``); leaving them
        in train mode would still update BatchNorm running statistics and make
        training-time features differ from evaluation-time features.
        """
        super().train(mode)
        if self.model_type in self._FROZEN_CNN_TYPES:
            self.backbone.eval()
        return self

    def forward(self, images):
        """
        Forward pass of the encoder.
        Args:
            images (Tensor): Input images (batch_size, 3, H, W).
        Returns:
            Tensor: Embedding features of shape (batch_size, embed_size).
        """
        if self.model_type in self._FROZEN_CNN_TYPES:
            with torch.no_grad():
                features = self.backbone(images)  # Extract features
            features = features.view(features.size(0), -1)  # Flatten feature maps

        elif self.model_type == 'vit':
            # ViT expects images normalized and reshaped as batches
            vit_outputs = self.vit(pixel_values=images)
            features = vit_outputs.last_hidden_state[:, 0, :]  # CLS token features

        features = self.fc(features)  # Project to embed_size
        return features



In [None]:
class DecoderRNN(nn.Module):
    """LSTM caption decoder conditioned on an image embedding.

    The image feature vector is fed to the LSTM as the first time step and
    the embedded caption tokens follow it, so the returned logits contain one
    more time step than ``captions``.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_size)
        self.lstm = nn.LSTM(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(in_features=hidden_size, out_features=vocab_size)

    def forward(self, features, captions):
        """Return vocabulary logits for the image step plus every caption token."""
        image_step = features.unsqueeze(1)          # insert a length-1 time axis
        token_steps = self.embedding(captions)
        sequence = torch.cat([image_step, token_steps], dim=1)
        hidden_states = self.lstm(sequence)[0]      # drop the final (h, c) state
        return self.fc(hidden_states)

In [None]:
class TransformerDecoder(nn.Module):
    """Transformer caption decoder with the same call contract as DecoderRNN.

    The image feature vector is prepended to the embedded caption tokens as
    the first sequence element, so for captions of length T the output has
    T + 1 time steps. The same feature vector is also used as the (length-1)
    memory the decoder layers cross-attend to. A causal mask prevents any
    position from attending to later positions.

    Args:
        embed_size: Model width; image features must have this size.
        hidden_size: Accepted for signature compatibility; not used.
        vocab_size: Number of tokens in the vocabulary.
        num_heads: Attention heads per decoder layer.
        num_layers: Number of stacked decoder layers.
        ff_hidden_dim: Width of each layer's feed-forward block.
        dropout: Dropout probability inside the decoder layers.
        max_seq_length: Longest supported sequence, counting the image step.
    """

    def __init__(self,
                 embed_size,
                 hidden_size,
                 vocab_size,
                 num_heads,
                 num_layers,
                 ff_hidden_dim,
                 dropout = 0.1,
                 max_seq_length = 50):
        super(TransformerDecoder, self).__init__()

        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.positional_encoding = self._get_positional_encoding(max_seq_length, embed_size)

        # batch_first=True so tensors are (batch, seq, embed) like DecoderRNN's.
        decoder_layer = nn.TransformerDecoderLayer(d_model = embed_size,
                                                   nhead = num_heads,
                                                   dim_feedforward = ff_hidden_dim,
                                                   dropout = dropout,
                                                   batch_first = True)
        self.transformer_decoder = nn.TransformerDecoder(decoder_layer, num_layers = num_layers)
        self.fc_out = nn.Linear(embed_size, vocab_size)
        self.embed_size = embed_size

    def _get_positional_encoding(self, max_seq_length, embed_size):
        """Build the fixed sinusoidal table of shape (1, max_seq_length, embed_size)."""
        positional_encoding = torch.zeros(max_seq_length, embed_size)

        position = torch.arange(0, max_seq_length).unsqueeze(1).float()
        # exp(-2i * ln(10000) / d) == 1 / 10000^(2i/d): the minus sign turns the
        # exponent into the reciprocal frequency used by the sinusoids.
        div_term = torch.exp(torch.arange(0, embed_size, 2).float() * -(torch.log(torch.tensor(10000.0)) / embed_size))

        # (max_seq_length, 1) * (n_freq,) broadcasts to (max_seq_length, n_freq),
        # so no explicit unsqueeze of div_term or matrix product is needed.
        angles = position * div_term
        positional_encoding[:, 0::2] = torch.sin(angles)
        # For an odd embed_size there is one cosine column fewer than sine columns.
        positional_encoding[:, 1::2] = torch.cos(angles[:, : embed_size // 2])
        positional_encoding = positional_encoding.unsqueeze(0)  # leading batch axis

        # Stored as a non-trainable parameter so it follows .to(device) calls.
        return nn.Parameter(positional_encoding, requires_grad = False)

    def forward(self, features, captions):
        """Return vocabulary logits of shape (batch, T + 1, vocab_size).

        Args:
            features: Image embeddings, (batch, embed_size).
            captions: Token indices, (batch, T).

        Raises:
            ValueError: If T + 1 exceeds the positional table length.
        """
        image_step = features.unsqueeze(1)
        sequence = torch.cat((image_step, self.embedding(captions)), dim=1)

        seq_len = sequence.size(1)
        max_len = self.positional_encoding.size(1)
        if seq_len > max_len:
            raise ValueError(
                f"Sequence length {seq_len} (image step + caption tokens) exceeds "
                f"max_seq_length {max_len}"
            )
        sequence = sequence + self.positional_encoding[:, :seq_len, :]

        # -inf above the diagonal: position i may only attend to positions <= i.
        causal_mask = torch.triu(
            torch.full((seq_len, seq_len), float('-inf'), device=sequence.device, dtype=sequence.dtype),
            diagonal=1,
        )

        decoded = self.transformer_decoder(tgt=sequence, memory=image_step, tgt_mask=causal_mask)
        return self.fc_out(decoded)

Complete Model

In [None]:
class ImageCaptioningModel(nn.Module):
    """Encoder-decoder wrapper: encode the images, then decode captions."""

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder

    def forward(self, images, captions):
        """Run ``decoder(encoder(images), captions)`` and return its output."""
        return self.decoder(self.encoder(images), captions)

In [None]:
# verify coco dataset
coco_dataset_path = '/kaggle/input/coco-2017-dataset/coco2017'
annotation_path = os.path.join(coco_dataset_path, 'annotations')

# Image folders for the three splits, all directly under the dataset root.
train_images_path, val_images_path, test_images_path = (
    os.path.join(coco_dataset_path, _split_dir)
    for _split_dir in ('train2017', 'val2017', 'test2017')
)

# Caption annotation files; only the train and val files are used here.
train_annotations_path, val_annotations_path = (
    os.path.join(annotation_path, _captions_file)
    for _captions_file in ('captions_train2017.json', 'captions_val2017.json')
)
#test_annotations_path = os.path.join(annotation_path, 'captions_test2017.json')

# Report, for each expected location, whether it exists on disk.
paths = [
    coco_dataset_path,
    train_images_path,
    val_images_path,
    test_images_path,
    train_annotations_path,
    val_annotations_path,
]
for path in paths:
    print(f"{path}: {os.path.exists(path)}")

# Number of directory entries in each image folder.
train_images_count = len(os.listdir(train_images_path))
val_images_count = len(os.listdir(val_images_path))
test_images_count = len(os.listdir(test_images_path))

print(f"Number of training images: {train_images_count}")
print(f"Number of validation images: {val_images_count}")
print(f"Number of test images: {test_images_count}")

#TODO annotations

In [None]:
from torch.nn.utils.rnn import pad_sequence
def collate_fn(batch):
    """Merge dataset samples into one batch, padding captions to equal length.

    Args:
        batch: Sequence of ``(image, target)`` pairs where ``target`` is a
            dict with ``"image_id"`` and ``"captions"`` entries.

    Returns:
        ``(images, image_ids, padded_captions)``: the images stacked along a
        new leading dimension, the image ids as a list, and the captions
        right-padded with 0 into a single batch-first tensor.
    """
    images, targets = zip(*batch)

    image_ids, captions = [], []
    for target in targets:
        image_ids.append(target["image_id"])
        captions.append(target["captions"])

    # 0 is the <pad> index; pad_sequence already returns one stacked tensor,
    # so the captions need no further torch.stack.
    padded_captions = pad_sequence(captions, batch_first=True, padding_value=0)

    # Stacking along dim 0 adds the batch dimension in front of each image.
    return torch.stack(images, dim=0), image_ids, padded_captions

In [None]:
def train(model, dataloader, optimizer, criterion, device):
    """Run one training epoch with teacher forcing.

    The model receives each caption without its last token and is expected to
    return logits with one more time step than it was given (the decoder
    prepends an image step), so the logits line up with the full caption.

    Args:
        model: Callable ``model(images, captions)`` returning logits of shape
            (batch, seq_len, vocab_size).
        dataloader: Iterable of ``(images, image_ids, captions)`` batches.
        optimizer: Optimizer updating the model parameters.
        criterion: Loss taking (N, vocab_size) logits and (N,) targets.
        device: Device the batch tensors are moved to.

    Returns:
        float: Mean batch loss for the epoch, or NaN when the dataloader
        yielded no batches.
    """
    model.train()
    running_loss = 0.0
    num_batches = 0
    for images, _image_ids, captions in dataloader:
        images = images.to(device)
        captions = captions.to(device)

        optimizer.zero_grad()
        outputs = model(images, captions[:, :-1])  # Exclude last token in captions

        batch_size, seq_len, vocab_size = outputs.size()
        # reshape (not view) so non-contiguous decoder outputs are accepted too.
        loss = criterion(outputs.reshape(-1, vocab_size), captions.reshape(-1))
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        num_batches += 1

    # Average over the batches actually seen; avoids dividing by zero on an
    # empty loader and does not require the loader to define __len__.
    average_loss = running_loss / num_batches if num_batches else float("nan")
    print(f"Train Loss for current epoch is : {average_loss}")
    return average_loss

def evaluate(model, dataloader, criterion, device):
    """Compute the mean teacher-forced loss over a dataloader without updating weights.

    The model receives each caption without its last token and is expected to
    return logits with one more time step than it was given (the decoder
    prepends an image step), so the logits line up with the full caption.

    Args:
        model: Callable ``model(images, captions)`` returning logits of shape
            (batch, seq_len, vocab_size).
        dataloader: Iterable of ``(images, image_ids, captions)`` batches.
        criterion: Loss taking (N, vocab_size) logits and (N,) targets.
        device: Device the batch tensors are moved to.

    Returns:
        float: Mean batch loss, or NaN when the dataloader yielded no batches.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for images, _image_ids, captions in dataloader:
            images = images.to(device)
            captions = captions.to(device)

            outputs = model(images, captions[:, :-1])  # Exclude last token in captions
            batch_size, seq_len, vocab_size = outputs.size()
            # reshape (not view) so non-contiguous decoder outputs are accepted too.
            loss = criterion(outputs.reshape(-1, vocab_size), captions.reshape(-1))  # Teacher forcing
            total_loss += loss.item()
            num_batches += 1

    # Average over the batches actually seen; an empty loader yields NaN
    # instead of raising ZeroDivisionError.
    return total_loss / num_batches if num_batches else float("nan")


In [None]:
# Directory for intermediate checkpoints, so the training state can be
# stored part-way through a run.
checkpoint_dir = 'checkpoints'
os.makedirs(checkpoint_dir, exist_ok=True)

def save_checkpoint(model, optimizer, epoch, loss, checkpoint_path):
    """Serialise the training state to ``checkpoint_path`` with ``torch.save``.

    The saved dict has the keys ``'epoch'``, ``'model_state_dict'``,
    ``'optimizer_state_dict'`` and ``'loss'``. A confirmation line is printed
    after saving.
    """
    state = dict(
        epoch=epoch,
        model_state_dict=model.state_dict(),
        optimizer_state_dict=optimizer.state_dict(),
        loss=loss,
    )
    torch.save(state, checkpoint_path)
    print(f"Checkpoint saved at {checkpoint_path}")


In [None]:
import random 

# Subset sizes. Despite the names, these count caption annotations (dataset
# items), because CocoDataset yields one item per caption.
train_image_number = 20000 
val_image_number = 4000
test_image_number = 4000

# Load COCO Dataset (captions and images) using
train_dataset = CocoDataset(train_images_path, train_annotations_path, transform=transform)

# Example: Fetch an image and its caption
image, caption = train_dataset[0]

val_dataset = CocoDataset(val_images_path, val_annotations_path, transform=transform)

# Constructing val_dataset built a second vocabulary from the validation
# captions and rebound the global `vocabulary` to it. Token indices must match
# the ones the model is trained on, so both are pointed back at the training
# vocabulary.
val_dataset.vocab = train_dataset.vocab
vocabulary = train_dataset.vocab

# Never ask random.sample for more items than exist (it would raise ValueError).
train_subset_indices = random.sample(range(len(train_dataset)), min(train_image_number, len(train_dataset)))
# The test subset is drawn from training annotations that were not picked for
# training. Sorted so the sampling below does not depend on set iteration order.
# NOTE(review): the split is per caption, so one image can presumably end up
# in both the train and the test subset through different captions -- confirm
# whether an image-level split is wanted.
remaining_indices = sorted(set(range(len(train_dataset))) - set(train_subset_indices))
test_subset_indices = random.sample(remaining_indices, min(test_image_number, len(remaining_indices)))

val_subset_indices = random.sample(range(len(val_dataset)), min(val_image_number, len(val_dataset)))

train_subset = Subset(train_dataset, train_subset_indices)
val_subset = Subset(val_dataset, val_subset_indices)
test_subset = Subset(train_dataset, test_subset_indices)

# important
You can change the model type in the cell below; it is set to `resnet50` here.

In [None]:
# Hyperparameters
embed_size = 512
hidden_size = 1024
num_epochs = 20
learning_rate = 0.0001
batch_size = 32
num_layers = 2

# Data Loaders
train_loader = DataLoader(train_subset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
val_loader = DataLoader(val_subset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)
test_loader = DataLoader(test_subset, batch_size = batch_size, shuffle = False, collate_fn = collate_fn)

# Vocab
# Rebuilding also rebinds the global `vocabulary` to the training vocabulary
# (build_vocab assigns it as a side effect).
train_dataset.vocab = train_dataset.build_vocab()
# Instantiate Encoder, Decoder, and Model
encoder = EncoderCNN(embed_size, model_type = 'resnet50').to(device)
decoder = DecoderRNN(embed_size, hidden_size, len(train_dataset.vocab), num_layers=num_layers).to(device) # TODO
model = ImageCaptioningModel(encoder, decoder).to(device)

# Optimizer and Loss Function
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# collate_fn right-pads captions with the <pad> index; those positions carry
# no information, so they are excluded from the loss instead of training the
# model to predict padding.
pad_index = train_dataset.vocab["<pad>"]
criterion = nn.CrossEntropyLoss(ignore_index=pad_index)



In [None]:
# Training loop: each epoch runs one pass over the training subset followed
# by one validation pass.
for epoch in tqdm(range(num_epochs)):
    train(model=model, dataloader=train_loader, optimizer=optimizer, criterion=criterion, device=device)
    print(f"Training for epoch {epoch+1}/{num_epochs} completed.")

    # Validation runs after every epoch; skipping it would make epochs quicker.
    val_loss = evaluate(model=model, dataloader=val_loader, criterion=criterion, device=device)
    print(f"Epoch {epoch+1}/{num_epochs}, Validation Loss: {val_loss}")

# Persist only the learned weights (state dict) once all epochs are done.
_final_weights = model.state_dict()
torch.save(_final_weights, 'image_captioning_model.pth')

In [None]:
# map_location remaps the stored tensors onto the current device, so weights
# saved on a GPU can also be loaded on a CPU-only machine.
model.load_state_dict(torch.load('image_captioning_model.pth', map_location=device))


In [None]:
def generate_caption(image, model, vocab, device):
    """Greedily decode a caption for a single image.

    Starting from ``<bos>``, the decoder is re-run on the tokens generated so
    far and the highest-scoring next token is appended, until ``<eos>`` is
    produced or 50 tokens have been generated.

    Args:
        image: Image tensor without a batch dimension.
        model: Object exposing ``encoder`` and ``decoder`` callables and ``eval()``.
        vocab: Vocabulary supporting ``vocab[token]`` and ``get_itos()``.
        device: Device the tensors are moved to.

    Returns:
        str: The generated words joined by spaces, without ``<bos>``/``<eos>``.
    """
    max_caption_length = 50

    model.eval()
    index_to_token = vocab.get_itos()  # built once instead of on every step
    bos_index = vocab['<bos>']
    eos_index = vocab['<eos>']

    # Inference only: no autograd graph is needed, also when called standalone.
    with torch.no_grad():
        # unsqueezing for encapsulating the image inside a batch of one
        features = model.encoder(image.unsqueeze(0).to(device))

        token_indices = [bos_index]
        for _ in range(max_caption_length):
            input_caption = torch.tensor(token_indices).unsqueeze(0).to(device)
            outputs = model.decoder(features, input_caption)  # (batch, seq_len, vocab_size)
            _, predicted = outputs.max(2)  # best-scoring token at every position
            next_index = predicted[0, -1].item()  # only batch entry, last position = next word
            token_indices.append(next_index)
            if next_index == eos_index:
                break

    words = [index_to_token[index] for index in token_indices[1:]]  # skip <bos>
    # Strip <eos> only when it was actually produced; if the length limit was
    # reached first, the last element is a real word and must be kept.
    if words and words[-1] == '<eos>':
        words.pop()
    return ' '.join(words)

# Load the trained weights before generating captions. map_location remaps the
# stored tensors onto the current device, so a checkpoint saved on a GPU also
# loads on a CPU-only machine.
model.load_state_dict(torch.load('image_captioning_model.pth', map_location=device))

In [None]:
def generate_captions_for_coco(val_loader,model,vocab, max_length= 50, num_samples=5):
    """Generate captions for the images of the first ``num_samples`` batches.

    Args:
        val_loader: Iterable of ``(images, image_ids, captions)`` batches.
        model: Captioning model passed on to ``generate_caption``.
        vocab: Vocabulary passed on to ``generate_caption``.
        max_length: Accepted for signature compatibility; currently unused
            (``generate_caption`` takes no length argument).
        num_samples: Number of batches (not individual images) to process.

    Returns:
        dict: ``image_id -> generated caption``. If an image id occurs more
        than once, the caption generated last is kept.
    """
    from itertools import islice

    all_captions = {}
    model.eval()
    with torch.no_grad():
        # islice stops after exactly num_samples batches and does not fetch
        # a further batch from the loader just to discard it.
        for images, image_ids, _captions in islice(val_loader, max(num_samples, 0)):
            images = images.to(device)
            for image_id, image in zip(image_ids, images):
                generated_caption = generate_caption(image, model, vocab, device)
                all_captions[image_id] = generated_caption
                print(f"Generated Caption for Image {image_id}: {generated_caption}")
    print(all_captions)
    return all_captions

In [None]:

def evaluate_captioning_model(generated_captions, coco_annotation_file=train_annotations_path,coco_image_dir=train_images_path):
    """
    Score generated captions against COCO reference captions with COCOEvalCap.

    The captions are written to ``generated_captions.json`` in the working
    directory (overwriting any previous file) and loaded back through the COCO
    results API. Evaluation is restricted to the image ids present in
    ``generated_captions``.

    Parameters:
        generated_captions (dict): Generated captions with image_ids as keys.
        coco_annotation_file (str): Path to the COCO annotations file holding
            the reference captions for those image ids.
        coco_image_dir (str): Accepted for signature compatibility; unused.

    Returns:
        dict: The ``eval`` dict filled in by ``COCOEvalCap.evaluate()``
        (metric name -> score).

    Raises:
        ValueError: If ``generated_captions`` is empty.
    """
    if not generated_captions:
        # Fail early with a clear message instead of an IndexError inside loadRes.
        raise ValueError("generated_captions is empty; there is nothing to evaluate.")

    # Load the COCO dataset annotations (reference captions)
    coco = COCO(coco_annotation_file)

    # One result record per image, in the format the COCO results API expects.
    coco_results = [{'image_id': image_id, 'caption': caption} for image_id, caption in generated_captions.items()]

    # Save the generated captions to a results file
    with open('generated_captions.json', 'w') as f:
        json.dump(coco_results, f)

    # Load the results into COCO's evaluation API
    coco_results = coco.loadRes('generated_captions.json')

    # Only a subset of the annotated images was captioned, so evaluate on
    # exactly those image ids rather than on the whole annotation file.
    filtered_image_ids = list(generated_captions)

    coco_eval = COCOEvalCap(coco, coco_results)
    coco_eval.params['image_id'] = filtered_image_ids
    coco_eval.evaluate()

    # Metric name -> score, as computed by COCOEvalCap
    return coco_eval.eval

# Example of the expected input format (image_id -> caption string). This
# dict is illustrative only; it is not passed to the evaluation below.
dummy_generated_captions = {
    12345: "A man in a black shirt is riding a bike.",
    67890: "A dog running through the grass.",
    11223: "A woman holding a book in her hand."
}

# Caption the first batches of test_loader with the trained model, using the
# module-level `vocabulary`.
generated_captions = generate_captions_for_coco(test_loader,model,vocabulary)
# Evaluate the generated captions (defaults to the training annotation file;
# test_subset is built from train_dataset).
metrics = evaluate_captioning_model(generated_captions)


print("Evaluation Metrics:", metrics)

In [None]:
'''
import matplotlib.pyplot as plt
from PIL import Image
from pycocotools.coco import COCO

def show_images_with_captions(generated_captions, coco_annotation_file, coco_image_dir):
    """
    Display images with their generated captions.

    Parameters:
        generated_captions (dict): Dictionary of generated captions with image_ids as keys.
        coco_annotation_file (str): Path to COCO annotations file.
        coco_image_dir (str): Path to the directory containing COCO images.
    """
    # Load the COCO dataset annotations
    coco = COCO(coco_annotation_file)

    for image_id, caption in generated_captions.items():
        # Get image information from COCO
        try:
            image_info = coco.loadImgs(image_id)[0]
            image_path = f"{coco_image_dir}/{image_info['file_name']}"
            print(f"Image path: {image_path}")  # Debug: Print the image path
        except KeyError:
            print(f"Image ID {image_id} not found in the dataset.")
            continue

        # Load and display the image using PIL and matplotlib
        try:
            image = Image.open(image_path)
        except FileNotFoundError:
            print(f"Image file not found at {image_path}")
            continue

        # Print the caption
        print(f"Generated Caption: {caption}")

        # Display the image
        plt.figure(figsize=(8, 8))
        plt.imshow(image)
        plt.axis('off')
        plt.title(f"Generated Caption: {caption}", fontsize=14, wrap=True)
        plt.show()

# Generate captions for COCO validation images
generated_captions = generate_captions_for_coco(val_loader, model, vocabulary)

# Evaluate the generated captions
metrics = evaluate_captioning_model(generated_captions)

# Print evaluation metrics
print("Evaluation Metrics:", metrics)

# Display images and their captions
show_images_with_captions(generated_captions, val_annotations_path, val_images_path)
'''