<a href="https://colab.research.google.com/github/vjhawar12/Image-Captioning/blob/main/Image_Captioning.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports & installation

In [None]:
# Install the third-party packages this notebook imports (shell command run through IPython's `!`).
!pip install torchtext && pip install torch && pip install torchvision && pip install bert-score

In [None]:
from torchtext.vocab import vocab
import torch
import torchvision
from torchvision.transforms import v2
from torchvision.io import decode_image
import torch.nn as nn
import torch.optim as optim
from torchvision.datasets import CocoDetection
from torch.utils.data import DataLoader, Dataset
from pycocotools.coco import COCO
from pprint import pprint
import pandas as pd
from skimage import io
from os import path
from random import randint
from collections import Counter
from google.cloud import storage
from tqdm import tqdm
from bert_score import score
from torch.func import vmap
from torch.nn.utils.rnn import pad_sequence
from numpy import mean

# Hyperparameters

The following is a list of the major hyperparameters used in this project.

In [None]:
# Notebook-wide hyperparameters; later cells read these names directly.
BATCHSIZE = 32 # number of samples per DataLoader batch
EPOCHS = 20 # number of training epochs
MIN_FREQUENCY = 2 # minimum number of times a token must appear to be included in the vocabulary
FEATURE_MAP_SIZE = 1280 # length of the per-image feature vector fed to the decoder
EMBED_SIZE = 256 # dimension of the word embedding vectors
HIDDEN_SIZE = 512 # dimension of the GRU hidden state
NUM_LAYERS = 2 # number of stacked layers in the GRU
# NOTE(review): VOCAB_SIZE is hard-coded and not derived from the vocabulary the dataset builds;
# confirm it is at least len(vocab), otherwise token ids can fall outside the embedding table.
VOCAB_SIZE = 10000 # number of embedding rows / output logits in the decoder
MAX_LEN = 10 # maximum number of tokens generated per caption
DROPOUT_RATE = 0.1 # probability of zeroing an activation in the decoder's dropout layer

# CUDA Optimizations

In [None]:
# Pick the compute device first, then tune the CUDA backends only when a GPU is actually in use.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if device.type == "cuda":
  # TF32 trades a little precision for faster matmul / cuDNN kernels
  torch.backends.cudnn.allow_tf32 = True
  torch.backends.cuda.matmul.allow_tf32 = True

  # keep every scaled-dot-product-attention backend available (flash, memory-efficient, math)
  torch.backends.cuda.enable_math_sdp(True)
  torch.backends.cuda.enable_mem_efficient_sdp(True)
  torch.backends.cuda.enable_flash_sdp(True)

In [None]:
# Pretrained MobileNetV2 used purely as a fixed image feature extractor.
encoder = torch.hub.load('pytorch/vision:v0.10.0', 'mobilenet_v2', pretrained=True)

# Replace the classification head with a pass-through so the model returns its
# pooled features (FEATURE_MAP_SIZE values per image) instead of class scores.
encoder.classifier = nn.Identity()
encoder.to(device) # moving to CUDA if possible

for param in encoder.parameters(): # the encoder is never trained, so no gradients are needed
  param.requires_grad = False

# Freezing the weights is not enough: in train mode BatchNorm keeps updating its
# running statistics and Dropout stays active. eval() makes the features deterministic.
encoder.eval()

# Pre-processing transforms

In [None]:
# Pre-processing applied to image tensors before they are passed to the encoder.
transform_encoder = v2.Compose(
    [
        v2.Resize((224, 224)), # fixed spatial size expected by the encoder
        # Convert the decoded uint8 pixels to float32 in [0, 1]. Normalize only accepts
        # float tensors, and v2.ToTensor leaves tensors that are already torch.Tensor untouched.
        v2.ToDtype(torch.float32, scale=True),
        v2.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]), # per-channel mean / std
    ]
)
# v2.SanitizeBoundingBoxes was removed: the pipeline only ever receives bare image
# tensors, and that transform requires bounding boxes (and labels) in its input.

# GRU-based decoder

In [None]:
class GRU_Decoder(nn.Module):
  """GRU caption decoder conditioned on an image feature vector.

  The image features are projected to the hidden size and used as the initial
  hidden state of every GRU layer. `forward` is the teacher-forced training
  path; `generate` is the greedy autoregressive inference path.

  Args:
    feature_map_size: length of the image feature vector.
    embed_size: dimension of the word embeddings.
    hidden_size: dimension of the GRU hidden state.
    num_layers: number of stacked GRU layers.
    vocab_size: number of embedding rows and output logits.
    dropout_rate: dropout probability; when None the module-level
      DROPOUT_RATE is used (the original behaviour).
  """

  def __init__(self, feature_map_size=1280, embed_size=256, hidden_size=512, num_layers=2, vocab_size=10000, dropout_rate=None):
    super().__init__()

    if dropout_rate is None:
      dropout_rate = DROPOUT_RATE # fall back to the notebook-level hyperparameter

    self.feature_map_size = feature_map_size
    self.embed_size = embed_size
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.vocab_size = vocab_size

    self.embed = nn.Embedding(num_embeddings=self.vocab_size, embedding_dim=self.embed_size) # token id --> embedding vector
    self.proj = nn.Linear(in_features=self.feature_map_size, out_features=self.hidden_size) # image features --> hidden-state space
    self.gru = nn.GRU(input_size=self.embed_size, hidden_size=self.hidden_size, num_layers=self.num_layers, batch_first=True) # embed_size --> hidden_size
    self.fc = nn.Linear(in_features=self.hidden_size, out_features=self.vocab_size) # hidden state --> one logit per vocabulary entry
    self.dropout = nn.Dropout(p=dropout_rate)

  def _init_hidden(self, feature_map):
    """Project image features to the GRU's initial hidden state, shape [num_layers, B, hidden_size]."""
    h0 = self.proj(feature_map).unsqueeze(0) # [1, B, hidden_size]
    # nn.GRU needs one initial state per layer; every layer starts from the same projection.
    return h0.repeat(self.num_layers, 1, 1)

  def generate(self, feature_map, bos_token, eos_token, max_len=10):
    """Greedily decode captions without teacher forcing (val/test path).

    Args:
      feature_map: image features, one row per sample.
      bos_token: id of the begin-of-sentence token (int or 0-d tensor), or a
        tensor with one start id per sample.
      eos_token: id of the end-of-sentence token (int or 0-d tensor).
      max_len: maximum number of tokens to generate.

    Returns:
      Long tensor [B, T] with T <= max_len. Once a sequence emits `eos_token`,
      its remaining positions are filled with `eos_token`.
    """
    batch_size = feature_map.size(0)
    target_device = feature_map.device
    eos = int(eos_token)

    word = torch.as_tensor(bos_token, dtype=torch.long, device=target_device)
    if word.dim() == 0:
      word = word.expand(batch_size) # same start token for every sample

    h = self._init_hidden(feature_map)
    finished = torch.zeros(batch_size, dtype=torch.bool, device=target_device)
    caption = []

    # Dropout is deliberately not applied here: it is a training-time regulariser.
    for _ in range(max_len):
      step_input = self.embed(word).unsqueeze(1) # [B, 1, embed_size]
      output, h = self.gru(step_input, h)
      logits = self.fc(output.squeeze(1)) # [B, vocab_size]
      word = torch.argmax(logits, dim=-1) # most probable token per sample, [B]
      word = word.masked_fill(finished, eos) # sequences that already ended keep emitting <eos>
      caption.append(word)

      finished = finished | (word == eos)
      if torch.all(finished): # every sample has reached the end of its caption
        break

    if not caption: # max_len <= 0
      return torch.empty((batch_size, 0), dtype=torch.long, device=target_device)

    return torch.stack(caption, dim=1) # [B, T]

  def forward(self, feature_map, words):
    """Teacher-forced pass used during training.

    Args:
      feature_map: image features, one row per sample.
      words: ground-truth input token ids, [B, L].

    Returns:
      Logits of shape [B, L, vocab_size]; position t scores the token that
      follows words[:, t].
    """
    embedding = self.embed(words) # [B, L, embed_size]
    h0 = self._init_hidden(feature_map)
    output, _ = self.gru(embedding, h0) # teacher-forcing with the correct captions
    output = self.dropout(output) # regularise the hidden states while training
    logits = self.fc(output) # hidden-state space --> vocabulary logits

    return logits



In [None]:
# Build the caption decoder from the notebook hyperparameters.
decoder = GRU_Decoder(
    feature_map_size=FEATURE_MAP_SIZE,
    embed_size=EMBED_SIZE,
    hidden_size=HIDDEN_SIZE,
    num_layers=NUM_LAYERS,
    vocab_size=VOCAB_SIZE,
)
decoder.to(device) # place the decoder's parameters on the selected device

# MiniCOCO: Karpathy Split of COCO

This project uses the [Karpathy split](https://www.kaggle.com/datasets/shtvkumar/karpathy-splits/data) of the COCO dataset with the following customizations:

- ✅ **Single caption per image** during training for faster convergence.
- ✅ **Multiple captions per image** for validation and testing to enable robust evaluation.
- ✅ **Special tokens** (`<bos>`, `<eos>`, `<pad>`) added to sequences to guide generation and training.
- ✅ **Padding** applied to caption sequences to ensure uniform batch dimensions.

In [None]:
class MiniCoco(Dataset):
  """Image/caption dataset backed by a Karpathy-split annotation JSON.

  * "train" samples yield one randomly chosen encoded caption per image.
  * "val" / "test" samples yield every reference caption of the image.

  Every instance builds its vocabulary from the training-split captions, so
  token ids mean the same thing in all splits.

  Args:
    json_file: path to the annotation JSON (read with pandas.read_json).
    root_dir: directory that contains the image files named in the JSON.
    split: one of "train", "val", "test".
    transform: optional callable applied to each decoded image tensor.

  Raises:
    ValueError: if `split` is not one of the supported names.
  """

  # Order matters: with special_first=True these occupy ids 0..3 in every vocabulary.
  SPECIAL_TOKENS = ['<unk>', '<pad>', '<bos>', '<eos>'] # unknown, padding, beginning / end of sentence
  PAD_INDEX = 1 # position of '<pad>' in SPECIAL_TOKENS
  # dataset split name --> value of the "split" field selected from the JSON
  SPLIT_NAMES = {"train": "restval", "val": "val", "test": "test"}

  @staticmethod
  def pad(data):
    """Collate function: stack the images and pad the captions with PAD_INDEX.

    Args:
      data: list of (image, captions) pairs as returned by __getitem__.
        `captions` is either one id sequence or a list of id sequences.

    Returns:
      (images, captions). Captions are a long tensor [B, L] for single-caption
      samples, or [B, R, L] for multi-reference samples, where missing
      references are rows made entirely of PAD_INDEX.

    Note: torch.stack requires all images in the batch to have the same shape.
    """
    images, captions = zip(*data) # unzipping data
    images = torch.stack(images, dim=0) # combining along dim=0
    pad_value = MiniCoco.PAD_INDEX

    def to_ids(seq):
      return torch.as_tensor(seq, dtype=torch.long) # pad_sequence only accepts tensors

    first = captions[0]
    multi_reference = isinstance(first, (list, tuple)) and len(first) > 0 and not isinstance(first[0], int)

    if not multi_reference:
      padded = pad_sequence([to_ids(seq) for seq in captions], batch_first=True, padding_value=pad_value)
      return images, padded

    # Pad all references to a common length, then regroup them per sample.
    flat = [to_ids(ref) for refs in captions for ref in refs]
    padded = pad_sequence(flat, batch_first=True, padding_value=pad_value) # [total_refs, L]
    max_refs = max(len(refs) for refs in captions)
    grouped = padded.new_full((len(captions), max_refs, padded.size(1)), pad_value)

    cursor = 0
    for row, refs in enumerate(captions):
      grouped[row, :len(refs)] = padded[cursor:cursor + len(refs)]
      cursor += len(refs)

    return images, grouped

  def __init__(self, json_file, root_dir, split, transform=None):
    super().__init__()

    if split not in self.SPLIT_NAMES:
      raise ValueError("Invalid split")

    self.full_data = pd.read_json(json_file)
    all_images = self.full_data["images"]
    self.split = split
    self.root_dir = root_dir
    self.transform = transform

    wanted = self.SPLIT_NAMES[split]
    self.data = [obj for obj in all_images if obj["split"] == wanted]
    self.length = len(self.data)

    # The vocabulary always comes from the training captions. Building it only
    # when split == "train" would leave val/test with nothing but the special
    # tokens, so every reference word would be encoded as <unk>.
    if split == "train":
      train_entries = self.data
    else:
      train_entries = [obj for obj in all_images if obj["split"] == self.SPLIT_NAMES["train"]]

    self.counter = Counter() # token --> number of occurrences in the training captions
    for obj in train_entries:
      for sentence in obj["sentences"]:
        self.counter.update(sentence["tokens"])

    self.vocab = vocab(self.counter, specials=self.SPECIAL_TOKENS, special_first=True, min_freq=MIN_FREQUENCY) # defining vocab object for stoi and itos
    self.vocab.set_default_index(self.vocab["<unk>"]) # map to unk by default

    # Only the train split pre-encodes its captions; val/test encode on access.
    self.captions = [] # nested list: one list of encoded captions per train sample
    if split == "train":
      self.captions = [[self.encode(sentence["tokens"]) for sentence in obj["sentences"]] for obj in self.data]

  def encode(self, text): # caption tokens --> list of ids wrapped in <bos> ... <eos>
    # Index the Vocab object itself so unknown / rare tokens fall back to the
    # default (<unk>) index; the plain dict from get_stoi() raises KeyError instead.
    return [self.vocab["<bos>"]] + [self.vocab[s] for s in text] + [self.vocab["<eos>"]]

  def itos(self, tens):
    """Turn one sequence of ids into a space-separated caption.

    <bos> and <pad> are skipped and decoding stops at the first <eos>, so this
    works for ground-truth sequences as well as for generated ones that carry
    no <bos> and may be shorter or padded.
    """
    ids = tens.tolist() if torch.is_tensor(tens) else list(tens)
    bos, eos, pad = self.vocab["<bos>"], self.vocab["<eos>"], self.vocab["<pad>"]
    table = self.vocab.get_itos() # fetched once per call, not once per token

    words = []
    for i in ids:
      if i == eos:
        break
      if i == bos or i == pad:
        continue
      words.append(table[i])

    return ' '.join(words)

  def decode(self, ints):  # tensor of ids --> caption string (1-D) or nested lists of strings (n-D)
    if ints.dim() == 1:
      return self.itos(ints)

    return [self.decode(seq) for seq in ints]

  def __len__(self):
    return self.length

  def __getitem__(self, index):
    """Return (image, captions) for one sample.

    `captions` is a 1-D long tensor for the train split and a list of 1-D long
    tensors (one per reference caption) for val/test.
    """
    entry = self.data[index]

    # train images should only have 1 caption (leads to faster convergence when teacher-forcing during training)
    if self.split == "train":
      options = self.captions[index]
      captions = torch.tensor(options[randint(0, len(options) - 1)], dtype=torch.long)
    # during test/val all reference captions are kept for comparison with the generated caption
    else:
      captions = [torch.tensor(self.encode(sentence["tokens"]), dtype=torch.long) for sentence in entry["sentences"]]

    # storing the image into memory as a torch tensor
    image_name = path.join(self.root_dir, entry["filename"])
    image = decode_image(image_name, mode="RGB") # returns tensor

    if self.transform is not None: # the transform passed to the constructor was previously ignored
      image = self.transform(image)

    return image, captions

# Loading data from Google Cloud Platform

In [None]:
# Log in with gcloud application-default credentials before the Cloud Storage download cells run.
!gcloud auth application-default login

In [None]:
def download_blob(bucket_name, source_blob_name, destination_file_name, project="Image Captioning"):
  """Download one object from a Google Cloud Storage bucket to a local file.

  Args:
    bucket_name: name of the bucket that holds the object.
    source_blob_name: name (path) of the object inside the bucket.
    destination_file_name: local path the object is written to.
    project: project passed to `storage.Client`. Defaults to the value this
      notebook always used.
      NOTE(review): the default looks like a display name rather than a
      project ID — confirm which one the client expects.
  """
  client = storage.Client(project=project)
  bucket = client.bucket(bucket_name)
  blob = bucket.blob(source_blob_name)
  blob.download_to_filename(destination_file_name)

In [None]:
# Fetch the three image archives and the annotation archive from the bucket, in this order.
for blob_name, local_path in (
    ("images.cocodataset.org/zips/test2014.zip", "/content/test2014.zip"),
    ("images.cocodataset.org/zips/train2014.zip", "/content/train2014.zip"),
    ("images.cocodataset.org/zips/val2014.zip", "/content/val2014.zip"),
    ("archive.zip", "/content/archive.zip"),
):
  download_blob("img-captioning", blob_name, local_path)

In [None]:
!unzip /content/test2014.zip -d /content/test2014/ && unzip /content/train2014.zip -d /content/train2014/ && unzip /content/archive.zip -d /content/archive/ && !unzip /content/val2014.zip -d /content/val2014/

In [None]:
# Delete the zip archives once they have been extracted.
!rm /content/test2014.zip /content/train2014.zip /content/val2014.zip /content/archive.zip

In [None]:
# Remove the Flickr annotation files; later cells only read dataset_coco.json.
!rm /content/archive/dataset_flickr30k.json && rm /content/archive/dataset_flickr8k.json

In [None]:
# List the contents of the extracted annotation folder.
!cd /content/archive && ls

dataset_coco.json  dataset_flickr30k.json  dataset_flickr8k.json


# Train/val/test splits

In [None]:
# Annotation file shared by all three splits.
json_file = "/content/archive/dataset_coco.json"

# Image folders created by the unzip step.
root_train_dir = "/content/train2014/train2014/"
root_test_dir = "/content/test2014/test2014/"
root_val_dir = "/content/val2014/val2014/"

# NOTE(review): MiniCoco's "train" split selects the entries tagged "restval" in the JSON —
# confirm that each split's image files really live under the folder paired with it here.
train_data = MiniCoco(json_file=json_file, root_dir=root_train_dir, split="train")
test_data = MiniCoco(json_file=json_file, root_dir=root_test_dir, split="test")
val_data = MiniCoco(json_file=json_file, root_dir=root_val_dir, split="val")

In [None]:
# All loaders share the batch size and the padding collate function (captions in a batch
# must have the same length to form one tensor); only the training loader shuffles.
_loader_options = dict(batch_size=BATCHSIZE, collate_fn=MiniCoco.pad)

train_dataloader = DataLoader(train_data, shuffle=True, **_loader_options)
test_dataloader = DataLoader(test_data, shuffle=False, **_loader_options)
val_dataloader = DataLoader(val_data, shuffle=False, **_loader_options)

In [None]:
# Report the true dataset sizes; len(dataloader) * BATCHSIZE over-counts when the last batch is partial.
print(f"Number of samples \t Train: {len(train_dataloader.dataset)}, Val: {len(val_dataloader.dataset)}, Test: {len(test_dataloader.dataset)}")

# Train + val loop

In [None]:
# BERTScore (bert_score.score) is the evaluation metric because it compares the semantic
# content of the predicted and reference captions rather than exact word overlap.

# Cross-entropy over the vocabulary; <pad> targets are ignored so padding added by the
# collate function does not contribute to the loss or the gradients.
loss_fn = nn.CrossEntropyLoss(ignore_index=train_data.vocab["<pad>"])
# Adagrad adapts the step size per parameter, which suits embeddings of infrequent words.
optimizer = optim.Adagrad(decoder.parameters())

In [None]:
def train_one_epoch():
  """Run one teacher-forced training pass over `train_dataloader`.

  Returns:
    (avg_loss, f1_avg): mean batch loss, and the mean BERTScore F1 between the
    teacher-forced predictions and the ground-truth captions of the epoch.
  """
  pad_index = train_data.vocab["<pad>"]
  running_loss = 0.0
  predicted_texts = []
  reference_texts = []

  for images, captions in train_dataloader:
    images = images.to(device)  # images: [B, C, H, W] batch, channels, height, width
    captions = captions.to(device) # captions [B, L] batch, sequence length

    # Teacher forcing: the decoder reads tokens 0..L-2 and must predict tokens 1..L-1.
    decoder_inputs = captions[:, :-1]
    targets = captions[:, 1:]

    optimizer.zero_grad() # zeroing gradients because they accumulate

    with torch.no_grad(): # the encoder is frozen, so no graph is needed for it
      feature_map = encoder(transform_encoder(images))

    outputs = decoder(feature_map, decoder_inputs) # logits, [B, L-1, V]

    # Score only real tokens; selecting with the mask also flattens the logits
    # to [N, V] and the targets to [N], the layout CrossEntropyLoss expects.
    real_token = targets != pad_index
    loss = loss_fn(outputs[real_token], targets[real_token])

    loss.backward() # backpropagating
    # Clip after backward: before it, the gradients are still the zeroed ones.
    torch.nn.utils.clip_grad_norm_(decoder.parameters(), max_norm=1)
    optimizer.step()
    running_loss += loss.item() # summing loss

    # decode() works on token ids, not logits. Put <bos> back in front so the
    # predictions have the same layout as the ground truth, and blank out
    # whatever was predicted at padded positions.
    predicted_ids = outputs.argmax(dim=-1).masked_fill(~real_token, pad_index)
    predicted_ids = torch.cat([captions[:, :1], predicted_ids], dim=1)
    predicted_texts.extend(train_data.decode(predicted_ids))
    reference_texts.extend(train_data.decode(captions))

  num_batches = len(train_dataloader)
  avg_loss = running_loss / num_batches if num_batches else 0.0

  if not predicted_texts:
    return avg_loss, 0.0

  # score() returns (precision, recall, F1) tensors with one value per caption.
  # It is called once per epoch instead of once per batch.
  _, _, f1 = score(predicted_texts, reference_texts, lang='en', verbose=True)
  f1_avg = f1.mean().item()
  return avg_loss, f1_avg

In [None]:
def validate():
  """Generate captions for the validation set and score them with BERTScore.

  Returns:
    Mean BERTScore F1 (float) over all validation samples, 0.0 if there are none.
  """
  bos_index = train_data.vocab["<bos>"]
  eos_index = train_data.vocab["<eos>"]
  candidates = []
  references = []

  for vimages, vcaptions in val_dataloader:
    # moving to CUDA
    vimages = vimages.to(device)
    vcaptions = vcaptions.to(device)

    feature_map = encoder(transform_encoder(vimages)) # pre-process, then extract image features

    # generate() decodes autoregressively from <bos> instead of being fed the ground truth
    voutputs = decoder.generate(feature_map, bos_index, eos_index, max_len=MAX_LEN)

    # Put <bos> in front so generated captions have the same layout decode() sees for ground truth.
    bos_column = torch.full((voutputs.size(0), 1), bos_index, dtype=voutputs.dtype, device=voutputs.device)
    candidates.extend(train_data.decode(torch.cat([bos_column, voutputs], dim=1)))

    if vcaptions.dim() == 3: # [B, R, L]: several reference captions per image
      batch, num_refs, length = vcaptions.shape
      flat = train_data.decode(vcaptions.reshape(batch * num_refs, length))
      for i in range(batch):
        group = [text for text in flat[i * num_refs:(i + 1) * num_refs] if text] # drop padding-only references
        references.append(group or [""])
    else: # [B, L]: one reference per image
      references.extend(train_data.decode(vcaptions))

  if not candidates:
    return 0.0

  # score() returns (precision, recall, F1) tensors with one value per candidate.
  _, _, f1 = score(candidates, references, lang='en', verbose=True)
  return f1.mean().item()

In [None]:
# Train for EPOCHS epochs, checkpointing whenever the validation F1 improves.
best_acc = -1
loop = tqdm(range(EPOCHS)) # progress bar over epochs

for epoch in loop:
  decoder.train() # training mode for the teacher-forced pass
  avg_loss, f1_train = train_one_epoch()

  decoder.eval() # evaluation mode for validation
  with torch.no_grad(): # gradients are not needed while validating
    f1_val = validate()

  loop.set_description(f"Avg Loss: {avg_loss} \t Train F1 Score: {f1_train} \t Val F1 Score: {f1_val}")

  if not f1_val > best_acc: # no improvement this epoch, nothing to save
    continue

  # NOTE: the validation score, not the loss, decides when to checkpoint because it
  # reflects generalization more directly and is less noisy.
  checkpoint = {
      'decoder_state_dict': decoder.state_dict(),
      'optimizer_state_dict': optimizer.state_dict(),
      'vocab': train_data.vocab,
  }
  torch.save(checkpoint, "full_model_checkpoint.pt")

  best_acc = f1_val # remember the best validation score so far


# Test loop

In [None]:
def test():
  """Generate captions for the test set and score them with BERTScore.

  Returns:
    Mean BERTScore F1 (float) over all test samples, 0.0 if there are none.
  """
  bos_index = train_data.vocab["<bos>"]
  eos_index = train_data.vocab["<eos>"]
  candidates = []
  references = []

  for timages, tcaptions in test_dataloader:
    # Moving to CUDA
    timages = timages.to(device)
    tcaptions = tcaptions.to(device)

    feature_map = encoder(transform_encoder(timages)) # pre-process, then extract image features

    # generate() decodes autoregressively from <bos> instead of being fed the ground truth
    toutputs = decoder.generate(feature_map, bos_index, eos_index, max_len=MAX_LEN)

    # Put <bos> in front so generated captions have the same layout decode() sees for ground truth.
    bos_column = torch.full((toutputs.size(0), 1), bos_index, dtype=toutputs.dtype, device=toutputs.device)
    candidates.extend(train_data.decode(torch.cat([bos_column, toutputs], dim=1)))

    if tcaptions.dim() == 3: # [B, R, L]: several reference captions per image
      batch, num_refs, length = tcaptions.shape
      flat = train_data.decode(tcaptions.reshape(batch * num_refs, length))
      for i in range(batch):
        group = [text for text in flat[i * num_refs:(i + 1) * num_refs] if text] # drop padding-only references
        references.append(group or [""])
    else: # [B, L]: one reference per image
      references.extend(train_data.decode(tcaptions))

  if not candidates:
    return 0.0

  # score() returns (precision, recall, F1) tensors with one value per candidate;
  # all predictions are aggregated and scored in a single call.
  _, _, f1 = score(candidates, references, lang='en', verbose=True)
  f1_avg = f1.mean().item()
  return f1_avg

In [None]:
# Final evaluation on the held-out test split.
decoder.eval() # evaluation mode for the decoder
with torch.no_grad(): # inference only, so gradient tracking is disabled
  print(test())