**Task 3:** Image captioning using a CNN with a NetVLAD layer as the encoder and a
single-hidden-layer RNN-based decoder.


In [None]:
# **Task 3:** Image captioning using a CNN with a NetVLAD layer as the encoder and a
# single-hidden-layer RNN-based decoder.


In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
from PIL import Image
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader
from nltk.translate.bleu_score import sentence_bleu
from nltk.translate.bleu_score import SmoothingFunction


In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
class CaptioningDataset(Dataset):
    """Dataset that pairs each listed image with its reference captions.

    Args:
        root_dir: Directory that contains the image files.
        image_names_file: Text file with one image file name per line.
        captions_file: Text file whose lines look like
            ``<image name>#<n>\\t<caption>``.
        transform: Optional callable applied to the loaded RGB image.
        train: When true, ``__getitem__`` returns every caption of the image;
            otherwise only the first one.

    Raises:
        ValueError: If a non-blank caption line contains no tab separator.
        KeyError: If a listed image has no caption in ``captions_file``.
    """

    def __init__(self, root_dir, image_names_file, captions_file, transform=None, train=True):
        self.root_dir = root_dir
        self.transform = transform
        self.train = train
        self.image_captions = self.load_image_captions(image_names_file, captions_file)

    def load_image_captions(self, image_names_file, captions_file):
        """Return ``[(image_name, [caption, ...]), ...]`` in image-list order."""
        with open(image_names_file, "r") as f_images:
            # Blank lines (e.g. a trailing empty line) are not image names.
            image_names = [name for name in f_images.read().splitlines() if name.strip()]

        captions_by_image = {}
        with open(captions_file, "r") as f_captions:
            for line_number, line in enumerate(f_captions.read().splitlines(), start=1):
                if not line.strip():
                    continue  # tolerate blank lines instead of failing to unpack
                img_id, separator, img_caption = line.partition("\t")
                if not separator:
                    raise ValueError(
                        f"{captions_file}:{line_number}: expected '<image>#<n>\\t<caption>'"
                    )
                img_id = img_id.split("#")[0]  # drop the '#<n>' caption counter
                captions_by_image.setdefault(img_id, []).append(img_caption)

        # Fail early, and say which images are affected, rather than raising a
        # bare KeyError from inside the list comprehension below.
        missing = [name for name in image_names if name not in captions_by_image]
        if missing:
            raise KeyError(f"no captions found for image(s): {missing[:5]}")

        return [(image_name, captions_by_image[image_name]) for image_name in image_names]

    def __len__(self):
        return len(self.image_captions)

    def __getitem__(self, idx):
        """Return ``(image, captions)`` when training, else ``(image, first_caption)``."""
        image_name, captions = self.image_captions[idx]
        image_path = os.path.join(self.root_dir, image_name)

        # convert() returns a fully loaded copy, so the file handle can be
        # released as soon as the conversion is done.
        with Image.open(image_path) as raw_image:
            image = raw_image.convert("RGB")

        if self.transform:
            image = self.transform(image)

        if self.train:
            return image, captions
        return image, captions[0]  # Return only the first caption for testing

In [None]:
class Vocabulary:
    """Bidirectional word/index mapping built from a captions file.

    Indices are deterministic: the special tokens come first, in the order of
    ``special_tokens``, followed by every other word in sorted order. Stable
    indices are required for a saved model to remain usable after the
    vocabulary is regenerated.
    """

    def __init__(self):
        self.word_to_index = {}
        self.index_to_word = {}
        self.special_tokens = ["<start>", "<end>", "<unk>"]

    def generate_vocabulary(self, captions_file, vocabulary_file):
        """Build the vocabulary from ``captions_file`` and write it to ``vocabulary_file``.

        Each non-blank line of ``captions_file`` is expected to hold
        tab-separated fields with the caption in the second field; lines
        without a second field are skipped. ``vocabulary_file`` receives one
        word per line, in index order.
        """
        words = set()
        with open(captions_file, "r") as f:
            for line in f:
                fields = line.strip().split("\t")
                if len(fields) < 2:
                    continue  # blank or malformed line: no caption to read
                words.update(fields[1].split())

        # Iterating a set of strings yields a different order on every run
        # (hash randomization), so fix the order explicitly.
        ordered_words = list(self.special_tokens)
        ordered_words.extend(sorted(words.difference(self.special_tokens)))

        with open(vocabulary_file, "w") as f:
            f.writelines(f"{word}\n" for word in ordered_words)

        # Refill the existing dictionaries in place so references taken by
        # callers stay valid and no stale entries survive a regeneration.
        self.word_to_index.clear()
        self.index_to_word.clear()
        for index, word in enumerate(ordered_words):
            self.word_to_index[word] = index
            self.index_to_word[index] = word

    def get_word_index(self, word):
        """Return the index of ``word``, or the index of ``<unk>`` if it is unknown."""
        return self.word_to_index.get(word, self.word_to_index.get("<unk>"))

    def get_index_word(self, index):
        """Return the word stored at ``index``, or ``None`` if there is none."""
        return self.index_to_word.get(index)


In [None]:
# NetVLAD pooling layer
class NetVLAD(nn.Module):
    """Aggregate local descriptors into a fixed-size VLAD vector.

    Every spatial location of the input feature map is soft-assigned to
    ``num_clusters`` learnable centroids; the assignment-weighted residuals to
    each centroid are summed over all locations, L2-normalized per cluster,
    flattened and L2-normalized again.

    Args:
        num_clusters: Number of centroids.
        dim: Channel count of the input feature map.
        alpha: Scale used to initialise the 1x1 assignment convolution.
        normalize_input: L2-normalize the input along the channel axis first.
    """

    def __init__(self, num_clusters=2, dim=512, alpha=100.0, normalize_input=True):
        super().__init__()
        self.num_clusters = num_clusters
        self.dim = dim
        self.alpha = alpha
        self.normalize_input = normalize_input
        self.conv = nn.Conv2d(dim, num_clusters, kernel_size=(1, 1), bias=True)
        self.centroids = nn.Parameter(torch.rand(num_clusters, dim))
        self._init_params()

    def _init_params(self):
        """Derive the assignment convolution's weight and bias from the centroids."""
        scaled_centroids = 2.0 * self.alpha * self.centroids
        centroid_norms = self.centroids.norm(dim=1)
        self.conv.weight = nn.Parameter(scaled_centroids.unsqueeze(-1).unsqueeze(-1))
        self.conv.bias = nn.Parameter(-self.alpha * centroid_norms)

    def forward(self, x):
        """Return an ``(N, num_clusters * C)`` descriptor for an ``(N, C, H, W)`` map."""
        batch_size, channels = x.shape[:2]

        if self.normalize_input:
            x = F.normalize(x, p=2, dim=1)

        # Per-location soft assignment over the clusters: (N, K, S).
        assignment_logits = self.conv(x).view(batch_size, self.num_clusters, -1)
        soft_assign = F.softmax(assignment_logits, dim=1)

        descriptors = x.view(batch_size, channels, -1)
        num_locations = descriptors.size(-1)

        # Bring descriptors and centroids to a common (N, K, C, S) layout.
        tiled_descriptors = descriptors.expand(self.num_clusters, -1, -1, -1).permute(1, 0, 2, 3)
        tiled_centroids = self.centroids.expand(num_locations, -1, -1).permute(1, 2, 0).unsqueeze(0)

        residual = tiled_descriptors - tiled_centroids
        residual.mul_(soft_assign.unsqueeze(2))
        vlad = residual.sum(dim=-1)

        vlad = F.normalize(vlad, p=2, dim=2)  # intra-normalization per cluster
        vlad = vlad.view(x.size(0), -1)
        return F.normalize(vlad, p=2, dim=1)


In [None]:
# Image encoder: ResNet-18 feature extractor followed by NetVLAD pooling
class Encoder(nn.Module):
    """Encode an image batch into one NetVLAD descriptor per image.

    Args:
        num_clusters: Number of NetVLAD centroids.
        dim: Channel count NetVLAD expects from the backbone's feature map.
    """

    def __init__(self, num_clusters=2, dim=512): # k = 2
        super().__init__()
        backbone = models.resnet18(pretrained=True)
        # Keep everything except the backbone's last two child modules so the
        # output is still a spatial feature map.
        feature_layers = list(backbone.children())[:-2]
        self.base_model = nn.Sequential(*feature_layers)
        self.net_vlad = NetVLAD(num_clusters=num_clusters, dim=dim)

    def forward(self, x):
        """Run the backbone, then pool its feature map with NetVLAD."""
        feature_map = self.base_model(x)
        return self.net_vlad(feature_map)


In [None]:
# Define the Decoder
class Decoder(nn.Module):
    """Single-layer RNN decoder that predicts one token per call.

    Args:
        input_size: Width of the token embedding fed to the RNN.
        hidden_size: Width of the RNN hidden state.
        vocab_size: Number of tokens in the vocabulary.
    """

    def __init__(self, input_size, hidden_size, vocab_size):
        super().__init__()
        self.hidden_size = hidden_size
        self.embedding = nn.Embedding(vocab_size, input_size)
        self.rnn = nn.RNN(input_size, hidden_size)
        self.out = nn.Linear(hidden_size, vocab_size)

    def forward(self, x, hidden):
        """Advance the decoder by one step.

        Args:
            x: Index tensor holding a single token.
            hidden: Hidden state of shape ``(1, 1, hidden_size)``.

        Returns:
            ``(log_probs, hidden)`` where ``log_probs`` has shape
            ``(1, vocab_size)`` and ``hidden`` is the updated state.
        """
        x = self.embedding(x).view(1, 1, -1)  # (seq_len=1, batch=1, input_size)
        x, hidden = self.rnn(x, hidden)
        x = F.log_softmax(self.out(x[0]), dim=1)
        return x, hidden

    def initHidden(self, device=None):
        """Return an all-zero hidden state of shape ``(1, 1, hidden_size)``.

        Args:
            device: Device for the new tensor. Defaults to the device of the
                decoder's own parameters, so the state always matches a
                decoder that has been moved to the GPU.
        """
        if device is None:
            device = self.out.weight.device
        return torch.zeros(1, 1, self.hidden_size, device=device)


In [None]:
# Define the Image Captioning Model
class ImageCaptioningModel(nn.Module):
    """Wrap an image encoder and an RNN decoder and compute the caption loss.

    The decoder is expected to expose ``embedding``, ``rnn``, ``out`` and
    ``hidden_size`` attributes.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, x, target=None, max_length=50):
        """Return the teacher-forced next-token loss for ``target``.

        Args:
            x: Image batch passed to the encoder.
            target: Token indices of the captions, either one caption of
                shape ``(L,)`` (tensor or list) or a batch ``(M, L)``.
                ``None`` yields a loss of ``0``.
            max_length: Unused; kept for interface compatibility.

        Returns:
            A scalar tensor: the negative log-likelihood of each token given
            the preceding ones, averaged over the captions and summed over
            the time steps. ``0`` when ``target`` is ``None``.

        Raises:
            ValueError: If ``target`` has more than two dimensions.
        """
        # NOTE(review): the encoder output is computed but does not influence
        # the decoder, so the loss is that of a plain language model. Wiring
        # it in requires a matching change wherever captions are generated.
        encoder_output = self.encoder(x)

        if target is None:
            return 0

        # as_tensor accepts lists as well as tensors and avoids re-wrapping a
        # tensor; it also puts the indices on the model's device.
        tokens = torch.as_tensor(target, dtype=torch.long, device=encoder_output.device)
        if tokens.dim() == 1:
            tokens = tokens.unsqueeze(0)  # single caption -> batch of one
        if tokens.dim() != 2:
            raise ValueError(
                f"target must have shape (L,) or (M, L), got {tuple(tokens.shape)}"
            )

        num_captions, length = tokens.shape
        if length < 2:
            # Nothing to predict from a caption with fewer than two tokens.
            return encoder_output.new_zeros(())

        # Teacher forcing: token t is the input, token t + 1 is the label.
        # The RNN is time-major, hence the transposes to (T, M).
        inputs = tokens[:, :-1].t()
        labels = tokens[:, 1:].t()

        hidden = encoder_output.new_zeros(1, num_captions, self.decoder.hidden_size)
        embedded = self.decoder.embedding(inputs)
        outputs, _ = self.decoder.rnn(embedded, hidden)
        log_probs = F.log_softmax(self.decoder.out(outputs), dim=-1)

        summed = F.nll_loss(
            log_probs.reshape(-1, log_probs.size(-1)),
            labels.reshape(-1),
            reduction="sum",
        )
        return summed / num_captions


In [None]:
class Trainer:
    """Train a captioning model on batches of images and caption strings.

    Args:
        model: Model called as ``model(images, caption_indices)`` that
            returns a scalar loss tensor.
        train_loader: Iterable of ``(images, captions)`` batches, where
            ``captions`` is a sequence of sequences of caption strings.
        optimizer: Optimizer over the model's parameters.
        word_to_index: Mapping from word to vocabulary index; must contain
            ``'<unk>'``.
    """

    def __init__(self, model, train_loader, optimizer, word_to_index):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.optimizer = optimizer
        self.word_to_index = word_to_index

    def pad_caption(self, caption, max_length):
        """Convert ``caption`` to indices and right-pad it to ``max_length``.

        Unknown words and the padding positions both use the index of
        ``'<unk>'``. Returns a 1-D index tensor on the training device.
        """
        unk_index = self.word_to_index['<unk>']
        caption_indices = [self.word_to_index.get(word, unk_index) for word in caption.split()]
        padded_indices = caption_indices + [unk_index] * (max_length - len(caption_indices))
        return torch.tensor(padded_indices, device=device)

    def train(self, epochs):
        """Run ``epochs`` passes over ``train_loader`` and print the losses."""
        self.model.train()
        num_batches = len(self.train_loader)
        for epoch in range(epochs):
            total_loss = 0
            for i, (images, captions) in enumerate(self.train_loader):
                images = images.to(device)

                # Wrap every caption in the sentence markers: generation
                # starts from '<start>' and stops at '<end>', so the decoder
                # has to see both tokens during training.
                wrapped_captions = [
                    f"<start> {caption} <end>"
                    for caption_tuple in captions
                    for caption in caption_tuple
                ]

                # Pad to the longest caption of the batch so they can be stacked.
                max_caption_length = max(len(caption.split()) for caption in wrapped_captions)
                captions_tensor = torch.stack(
                    [self.pad_caption(caption, max_caption_length) for caption in wrapped_captions]
                )

                self.optimizer.zero_grad()
                loss = self.model(images, captions_tensor)
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()

                if (i + 1) % 10 == 0:
                    print(f"Epoch [{epoch+1}/{epochs}], Step [{i+1}/{len(self.train_loader)}], Loss: {loss.item():.4f}")
            # max(..., 1) keeps an empty loader from dividing by zero.
            print(f"Epoch [{epoch+1}/{epochs}], Average Loss: {total_loss/max(num_batches, 1):.4f}")


In [None]:
class Evaluator:
    """Report the average loss and BLEU@1-4 of a captioning model.

    Args:
        model: Model with ``encoder``/``decoder`` attributes that is called
            as ``model(image, target_indices)`` and returns a scalar loss.
        word_to_index: Mapping from word to vocabulary index.
        index_to_word: Mapping from vocabulary index to word.
    """

    MAX_GENERATED_TOKENS = 50

    def __init__(self, model, word_to_index, index_to_word):
        self.model = model.to(device)
        self.word_to_index = word_to_index
        self.index_to_word = index_to_word

    def evaluate_bleu(self, reference_caption, generated_caption):
        """Return ``(BLEU@1, BLEU@2, BLEU@3, BLEU@4)`` for one caption pair.

        Both arguments may be a whitespace-separated string or an already
        tokenized list of words.
        """
        if isinstance(reference_caption, str):
            reference_caption = reference_caption.split()
        if isinstance(generated_caption, str):
            generated_caption = generated_caption.split()
        references = [list(reference_caption)]
        hypothesis = list(generated_caption)

        smoothie = SmoothingFunction().method4
        # Uniform weights over the first n n-gram orders.
        weight_sets = [
            (1, 0, 0, 0),
            (0.5, 0.5, 0, 0),
            (1 / 3, 1 / 3, 1 / 3, 0),
            (0.25, 0.25, 0.25, 0.25),
        ]
        return tuple(
            sentence_bleu(references, hypothesis, weights=weights, smoothing_function=smoothie)
            for weights in weight_sets
        )

    def _encode(self, words):
        """Map words to indices, using '<unk>' for out-of-vocabulary words."""
        unk_index = self.word_to_index.get("<unk>")
        indices = [self.word_to_index.get(word, unk_index) for word in words]
        if None in indices:
            raise KeyError("caption has unknown words and the vocabulary has no '<unk>' entry")
        return indices

    def _generate(self):
        """Greedily decode from '<start>' until '<end>' or the length limit."""
        decoder = self.model.decoder
        # Build the state here, on the evaluation device, rather than relying
        # on the decoder to place it.
        hidden = torch.zeros(1, 1, decoder.hidden_size, device=device)
        word = torch.tensor([self.word_to_index["<start>"]], device=device)
        words = []

        for _ in range(self.MAX_GENERATED_TOKENS):
            output, hidden = decoder(word, hidden)
            word_index = output.argmax().item()
            token = self.index_to_word[word_index]
            if token == "<end>":
                break  # the end marker is not part of the caption
            words.append(token)
            word = torch.tensor([word_index], device=device)

        return words

    def evaluate_model(self, dataset):
        """Print the average loss and BLEU scores over ``dataset``.

        Each item must be ``(image, caption)``; when ``caption`` is a
        sequence of captions instead of a string, the first one is used.
        """
        self.model.eval()
        num_samples = len(dataset)
        if num_samples == 0:
            print("Dataset is empty; nothing to evaluate.")
            return

        total_loss = 0
        bleu_totals = [0.0, 0.0, 0.0, 0.0]

        with torch.no_grad():
            for sample_index in range(num_samples):
                image, caption = dataset[sample_index]
                if not isinstance(caption, str):
                    caption = caption[0]
                image = image.unsqueeze(0).to(device)
                reference_words = caption.split()
                target = torch.tensor(self._encode(reference_words), device=device)

                loss = self.model(image, target)
                total_loss += loss.item()

                generated_words = self._generate()
                scores = self.evaluate_bleu(reference_words, generated_words)
                bleu_totals = [total + score for total, score in zip(bleu_totals, scores)]

        avg_loss = total_loss / num_samples
        avg_bleu_1, avg_bleu_2, avg_bleu_3, avg_bleu_4 = (
            total / num_samples for total in bleu_totals
        )

        print(f"Average Loss: {avg_loss:.4f}")
        print(f"Average BLEU@1: {avg_bleu_1:.4f}")
        print(f"Average BLEU@2: {avg_bleu_2:.4f}")
        print(f"Average BLEU@3: {avg_bleu_3:.4f}")
        print(f"Average BLEU@4: {avg_bleu_4:.4f}")


In [None]:
# Channel statistics used to normalize the images (and to undo it for display)
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Define transformations
data_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])


def show_image(image_tensor):
    """Display a normalized ``(3, H, W)`` image tensor in its original colours.

    The normalization is reverted first: plotting the normalized values
    directly makes matplotlib clip everything outside ``[0, 1]``.
    """
    mean = torch.tensor(IMAGENET_MEAN).view(3, 1, 1)
    std = torch.tensor(IMAGENET_STD).view(3, 1, 1)
    pixels = (image_tensor * std + mean).clamp(0, 1)
    plt.imshow(pixels.permute(1, 2, 0))
    plt.axis('off')
    plt.show()


# Define dataset and dataloader for training
train_dataset = CaptioningDataset(
    root_dir="./dataset/captioning/Images",
    image_names_file="./dataset/captioning/image_names.txt",
    captions_file="./dataset/captioning/captions.txt",
    transform=data_transform,
    train=True,
)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)

# Define dataset and dataloader for testing
# NOTE(review): this reads the same image list as the training set, so the
# "test" images are not held out — confirm whether a separate split exists.
test_dataset = CaptioningDataset(
    root_dir="./dataset/captioning/Images",
    image_names_file="./dataset/captioning/image_names.txt",
    captions_file="./dataset/captioning/captions.txt",
    transform=data_transform,
    train=False,
)

test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

# Show an image with its captions from the training dataset
image, captions = train_dataset[100]
show_image(image)
print("Captions:", captions)

# Show an image with its caption from the testing dataset
image, caption = test_dataset[100]
show_image(image)
print("Caption:", caption)


In [None]:
# Build the vocabulary from the captions file and expose its lookup tables.
vocabulary = Vocabulary()
vocabulary.generate_vocabulary(
    "./dataset/captioning/captions.txt",
    "./dataset/captioning/vocabulary.txt",
)

word_to_index, index_to_word = vocabulary.word_to_index, vocabulary.index_to_word
vocab_size = len(word_to_index)
print("Vocabulary size: {}".format(vocab_size))

In [None]:
# Sanity check: the index of "<start>" must map back to "<start>". Looking the
# index up avoids a hard-coded position that may not exist in the vocabulary.
start_index = word_to_index["<start>"]
index_to_word[start_index]

In [None]:
# Check the encoder's output size on a single-image batch
encoder = Encoder(num_clusters=2, dim=512)
single_image_batch = image.unsqueeze(0)
encoder_output = encoder(single_image_batch)
print(encoder_output.shape)

In [None]:
# def pad_caption(caption, max_length, word_to_index, device):
#     caption_indices = [word_to_index.get(word, word_to_index['<unk>']) for word in caption.split()]
#     padded_indices = caption_indices + [word_to_index['<unk>']] * (max_length - len(caption_indices))
#     return torch.tensor(padded_indices).to(device)

# def train(train_loader, word_to_index, device):
#     for i, (images, captions) in enumerate(train_loader):
#         images = images.to(device)
#         # Calculate the maximum length of captions in the batch
#         max_caption_length = max(len(caption.split()) for caption_tuple in captions for caption in caption_tuple)
#         captions_tensor = []

#         for caption_tuple in captions:
#             for caption in caption_tuple:
#                 caption_tensor = pad_caption(caption, max_caption_length, word_to_index, device)
#                 captions_tensor.append(caption_tensor)

#         captions_tensor = torch.stack(captions_tensor)  # Stack all caption tensors
#         captions_tensor.to(device)


In [None]:
# for i, (images, captions) in enumerate(train_loader):
#     print(images.size())
#     images.to(device)
#     # captions is a list of 5 tuples
#     # each tuple is a tuple of 32 captions
#     # caption[j][k] is a string

#     # I want to put them to device
#     # it has to be a pytorch tensor
#     captions_tensor = []
#     for j in range(5):
#         tuple_captions = captions[j]  # Get tuple of 32 captions
#         tuple_caption_tensors = []
#         for caption in tuple_captions:
#             caption_indices = [word_to_index.get(word, word_to_index['<unk>']) for word in caption.split()]
#             caption_tensor = torch.tensor(caption_indices).unsqueeze(0).to(device)
#             tuple_caption_tensors.append(caption_tensor)
#         captions_tensor.append(tuple_caption_tensors)
#     print(len(captions_tensor))

#     # print(type(captions))
#     # print(len(captions[0]))
#     break

In [None]:
# Build the model; the decoder's embedding width is set to the size of the
# flattened NetVLAD descriptor (num_clusters * dim).
encoder = Encoder(num_clusters=2, dim=512)
descriptor_size = encoder.net_vlad.dim * encoder.net_vlad.num_clusters
decoder = Decoder(input_size=descriptor_size, hidden_size=256, vocab_size=vocab_size)
model = ImageCaptioningModel(encoder, decoder)

# Adam over every parameter of the encoder and the decoder
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Train for ten epochs
trainer = Trainer(
    model=model,
    train_loader=train_loader,
    optimizer=optimizer,
    word_to_index=word_to_index,
)
trainer.train(10)

# Save the model
# torch.save(model.state_dict(), "image_captioning_model.pth")


In [None]:
# Load the model
# NOTE: "image_captioning_model.pth" must already exist; the torch.save call
# in the training cell is commented out.
model = ImageCaptioningModel(encoder, decoder)
# map_location lets a checkpoint saved on a GPU load on a CPU-only machine.
state_dict = torch.load("image_captioning_model.pth", map_location=device)
model.load_state_dict(state_dict)

# Create the evaluator instance
evaluator = Evaluator(model, word_to_index, index_to_word)

# Evaluate the model on the dataset that yields one caption string per image,
# which is what the evaluator tokenizes.
evaluator.evaluate_model(test_dataset)
