# Libraries

In [None]:
# Mount Google Drive into the Colab filesystem so the project folder is reachable.
from google.colab import drive
drive.mount('/content/drive')

# Switch the working directory to the project folder; the relative paths used
# further down (dataset/..., models/...) are resolved against it.
%cd /content/drive/MyDrive/ImageCaptioning

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/.shortcut-targets-by-id/13dGpwyY-c5FPJTEacGkw8XNTkbGVWT2D/ImageCaptioning


In [None]:
import re
import os
import cv2
import math
import glob
import spacy
import random
import numpy as np
import pandas as pd
from time import time
from PIL import Image
from tqdm import tqdm
import tensorflow as tf
from collections import Counter
import matplotlib.pyplot as plt
from nltk.translate.bleu_score import corpus_bleu, sentence_bleu, SmoothingFunction


import torch
import torch.nn.functional as F
from torchvision import transforms
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision.models import resnet50, ResNet50_Weights
from torch.nn import TransformerDecoder, TransformerDecoderLayer, TransformerEncoder, TransformerEncoderLayer



# Run on the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'

# English spaCy pipeline; its tokenizer is what Vocabulary.tokenize relies on.
spacy_eng = spacy.load("en_core_web_sm")

# Utils

In [None]:
def map_target(in_caption, vocab=None):
    """Convert padded reference-caption indices back into word tokens.

    Args:
        in_caption: Nested list indexed as [image][caption][position] whose
            innermost values are vocabulary indices.
        vocab: Object exposing an ``index2word`` mapping. When omitted, the
            notebook-level ``dataset.vocab`` is used (the original behaviour).

    Returns:
        A nested list with the same two outer levels, where every caption is a
        list of word strings cut off at the first padding index (0).
    """
    index2word = (dataset.vocab if vocab is None else vocab).index2word

    out_caption = []
    for image_captions in in_caption:
        decoded_captions = []
        for caption in image_captions:
            words = []
            for idx in caption:
                # Index 0 is padding: everything from here on is filler.
                if idx == 0:
                    break
                words.append(index2word[idx])
            decoded_captions.append(words)
        out_caption.append(decoded_captions)
    return out_caption


def map_predict(in_caption, vocab=None):
    """Convert one predicted index sequence into word tokens.

    Args:
        in_caption: Flat sequence of vocabulary indices for a single caption.
            Float-valued indices (e.g. ``4.0``) work as dictionary keys too.
        vocab: Object exposing an ``index2word`` mapping. When omitted, the
            notebook-level ``dataset.vocab`` is used (the original behaviour).

    Returns:
        List of word strings up to, but not including, the first index 2,
        which ``Vocabulary`` reserves for ``<EOS>``.
    """
    index2word = (dataset.vocab if vocab is None else vocab).index2word

    out_caption = []
    for idx in in_caption:
        # Index 2 is <EOS>: the caption ends here.
        if idx == 2:
            break
        out_caption.append(index2word[idx])
    return out_caption

# Dataset

In [None]:
class Vocabulary:
    """Bidirectional word <-> index mapping built from caption text.

    Indices 0-3 are reserved for the special tokens ``<PAD>``, ``<SOS>``,
    ``<EOS>`` and ``<UNK>``. Regular words receive consecutive indices in the
    order in which they reach ``freq_threshold`` occurrences.
    """

    def __init__(self, freq_threshold):
        """Create a vocabulary holding only the four special tokens.

        Args:
            freq_threshold: Number of occurrences a word needs within one
                ``build_vocab`` call before it is added.
        """
        self.index2word = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.word2index = {word: index for index, word in self.index2word.items()}

        self.freq_threshold = freq_threshold

    def __len__(self):
        """Return the number of entries, special tokens included."""
        return len(self.index2word)

    @staticmethod
    def tokenize(text):
        """Split ``text`` with the spaCy tokenizer and lower-case every token."""
        return [token.text.lower() for token in spacy_eng.tokenizer(text)]

    def build_vocab(self, sentence_list):
        """Add every word that occurs ``freq_threshold`` times in ``sentence_list``.

        Safe to call more than once: numbering continues after the entries
        that already exist and known words are never re-assigned, so earlier
        index <-> word pairs stay valid.

        Args:
            sentence_list: Iterable of raw sentence strings.
        """
        frequencies = Counter()
        # Indices are handed out consecutively, so the current size is the
        # next free index (4 on a fresh vocabulary).
        idx = len(self.index2word)

        for sentence in sentence_list:
            for word in self.tokenize(sentence):
                frequencies[word] += 1

                # Add the word the moment it reaches the minimum frequency
                # threshold, unless a previous call already registered it.
                if frequencies[word] == self.freq_threshold and word not in self.word2index:
                    self.word2index[word] = idx
                    self.index2word[idx] = word
                    idx += 1

    def numericalize(self, text):
        """Return the vocabulary index of every token in ``text``.

        Tokens missing from the vocabulary map to the ``<UNK>`` index.
        """
        unk_index = self.word2index["<UNK>"]
        return [self.word2index.get(token, unk_index) for token in self.tokenize(text)]

In [None]:
class ImageCaptioningDataset(Dataset):
    """Image captioning dataset pairing each image with a block of captions.

    Item ``idx`` combines the ``idx``-th file of the sorted image directory
    with rows ``captions_per_image * idx`` onwards of the caption column.
    This assumes the CSV rows are grouped per image in the same order as the
    sorted file listing -- verify against the data.
    """

    def __init__(self, csv_file, transform, freq_threshold=5,
                 image_dir="dataset/Images", captions_per_image=5,
                 max_caption_len=50):
        """Read the caption table, list the images and build the vocabulary.

        Args:
            csv_file: Path of a CSV file that has a ``caption`` column.
            transform: Callable applied to the RGB image array, or a falsy
                value to return the raw array.
            freq_threshold: Minimum word frequency passed to ``Vocabulary``.
            image_dir: Directory that holds the image files.
            captions_per_image: Number of consecutive caption rows per image.
            max_caption_len: Minimum width of the returned target matrix.
        """
        self.dataframe = pd.read_csv(csv_file)
        self.transform = transform
        self.image_dir = image_dir
        self.captions_per_image = captions_per_image
        self.max_caption_len = max_caption_len

        self.images = sorted(os.listdir(image_dir))
        self.captions = self.dataframe['caption']

        self.vocab = Vocabulary(freq_threshold)
        self.vocab.build_vocab(self.captions.tolist())

    def __len__(self):
        """Return the number of image files."""
        return len(self.images)

    def __getitem__(self, idx):
        """Return ``(image, targets)`` for the ``idx``-th image.

        Returns:
            image: The (optionally transformed) RGB image.
            targets: Integer tensor with ``captions_per_image + 1`` rows.
                Row 0 is an all-zero row of length ``max_caption_len`` that
                only exists to force a minimum padded width; the remaining
                rows are the zero-padded caption indices.

        Raises:
            FileNotFoundError: If OpenCV cannot read the image file.
        """
        start = self.captions_per_image * idx
        captions = self.captions[start:start + self.captions_per_image].tolist()
        image_path = os.path.join(self.image_dir, self.images[idx])

        image = cv2.imread(image_path)
        # cv2.imread signals failure by returning None instead of raising.
        if image is None:
            raise FileNotFoundError(f"Could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        if self.transform:
            image = self.transform(image)

        # The leading zero row guarantees every item is padded to at least
        # max_caption_len columns.
        caption_vec = [torch.zeros(self.max_caption_len, dtype=torch.long)]
        for cap in captions:
            caption_vec.append(torch.tensor(self.vocab.numericalize(cap), dtype=torch.long))

        targets = pad_sequence(caption_vec, batch_first=True, padding_value=0)

        return image, targets

In [None]:
# Preprocessing pipeline: convert to a tensor, resize to 232, centre-crop to
# 224 and normalise each channel with the constants below.
eval_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Resize(232, antialias=True),
    transforms.CenterCrop(224),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

dataset = ImageCaptioningDataset(csv_file="dataset/captions.txt", transform=eval_transform)

# Batches of 16 in dataset order (no shuffling), loaded by two workers.
loader = DataLoader(dataset, batch_size=16, num_workers=2)

# Par-Inject

## Model

### Encoder

In [None]:
class Encoder(torch.nn.Module):
    """Image encoder: a frozen ResNet-50 trunk followed by a linear projection."""

    def __init__(self, encoder_dim):
        """Build the encoder.

        Args:
            encoder_dim: Size of the feature vector produced per image.
        """
        super().__init__()
        self.encoder_dim = encoder_dim

        # Pretrained ResNet-50 with its last child module (the fc layer) dropped.
        backbone = resnet50(weights=ResNet50_Weights.IMAGENET1K_V1)
        trunk_layers = list(backbone.children())[:-1]
        self.model = torch.nn.Sequential(*trunk_layers).to(device)

        # Freeze the trunk so its weights receive no gradients.
        self.model.requires_grad_(False)

        # Projection from the 2048 trunk features down to encoder_dim.
        self.linear = torch.nn.Linear(2048, self.encoder_dim).to(device)
        self.drop = torch.nn.Dropout(0.3)

    def forward(self, images):
        """Encode a batch of images into (batch_size, encoder_dim) features."""
        images = images.to(device)
        batch_size = images.shape[0]

        pooled = self.model(images)                   # (batch_size, 2048, 1, 1)
        pooled = pooled.view(batch_size, 1, -1)       # (batch_size, 1, 2048)
        projected = self.linear(self.drop(pooled))    # (batch_size, 1, encoder_dim)
        return projected.squeeze(1)                   # (batch_size, encoder_dim)

### Decoder

In [None]:
class Decoder(torch.nn.Module):
    """LSTM caption decoder that sees the image feature at every time step.

    The image feature vector is repeated along the sequence axis and
    concatenated with each word embedding before entering the LSTM.
    """

    def __init__(self, vocab_size, embed_dim, encoder_dim, decoder_dim, num_layers):
        """Build the embedding, LSTM and output layers.

        Args:
            vocab_size: Number of entries in the vocabulary (output size).
            embed_dim: Size of the word embeddings.
            encoder_dim: Size of the image feature vector.
            decoder_dim: Hidden size of the LSTM.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.vocab_size = vocab_size
        self.encoder_dim = encoder_dim
        self.decoder_dim = decoder_dim
        self.num_layers = num_layers

        # Embedding layer
        self.embedding = torch.nn.Embedding(vocab_size, embed_dim).to(device)

        # LSTM layer; its input is a word embedding concatenated with the image feature
        self.lstm = torch.nn.LSTM(input_size=embed_dim + encoder_dim,
                                  hidden_size=decoder_dim,
                                  bias=True,
                                  batch_first=True,
                                  num_layers=self.num_layers,
                                  bidirectional=False).to(device)

        # Linear layers mapping LSTM outputs to vocabulary scores
        self.linear1 = torch.nn.Linear(decoder_dim, decoder_dim).to(device)
        self.linear2 = torch.nn.Linear(decoder_dim, vocab_size).to(device)
        self.drop = torch.nn.Dropout(0.3)

    def init_hidden_state(self, features):
        """Return zero-filled ``(hidden, cell)`` states sized for ``features``' batch."""
        state_shape = (self.num_layers, features.size(0), self.decoder_dim)
        hidden = torch.zeros(state_shape, device=device)
        cell = torch.zeros(state_shape, device=device)
        return hidden, cell

    def forward_step(self, features, embed_words):
        """Run the LSTM over a whole embedded sequence, starting from zero state.

        Args:
            features: Image features, (batch_size, encoder_dim).
            embed_words: Word embeddings, (batch_size, sequence_length, embed_dim).

        Returns:
            Vocabulary scores, (batch_size, sequence_length, vocab_size).
        """
        # Init hidden state
        hidden_state, cell_state = self.init_hidden_state(features)

        # Concat embedding and context vector
        features = features.unsqueeze(1)                             # (batch_size, 1, feature_dim)
        features = features.repeat(1, embed_words.shape[1], 1)       # (batch_size, sequence_length, feature_dim)
        lstm_input = torch.cat((embed_words, features), dim=2)       # (batch_size, sequence_length, embed_dim + feature_dim)

        # Forward pass
        output, _ = self.lstm(lstm_input, (hidden_state, cell_state))

        output = self.linear1(output)
        output = self.drop(output)
        output = self.linear2(output)

        return output

    def forward(self, features, sequences):
        """Score the next word at every position of ``sequences``.

        Args:
            features: Image features, (batch_size, encoder_dim).
            sequences: Token indices, (batch_size, sequence_length). The last
                token is dropped before embedding.

        Returns:
            Vocabulary scores, (batch_size, sequence_length - 1, vocab_size).
        """
        sequences = sequences[:, :-1].to(device)
        embed_words = self.embedding(sequences)
        embed_words = embed_words.to(torch.float32)

        return self.forward_step(features, embed_words)

    def predict(self, features, max_length, vocab):
        """Greedily generate ``max_length`` tokens per image, starting from ``<SOS>``.

        Generation does not stop at ``<EOS>``; callers are expected to cut the
        sequence themselves.

        Args:
            features: Image features, (batch_size, encoder_dim).
            max_length: Number of tokens to generate.
            vocab: Object whose ``word2index`` maps ``'<SOS>'`` to its index.

        Returns:
            Float tensor (batch_size, max_length) holding the predicted
            token indices.
        """
        batch_size = features.shape[0]

        # Every caption starts with the <SOS> token
        words = torch.full((batch_size, 1), vocab.word2index['<SOS>']).to(device)
        embed_words = self.embedding(words)
        features = features.to(device)

        predicted_captions = torch.zeros(batch_size, max_length)

        for idx in range(max_length):
            # Re-run the decoder on the whole prefix and keep the scores of its last position
            output = self.forward_step(features, embed_words)[:, -1]
            predicted_word_idx = output.argmax(dim=1)
            predicted_captions[:, idx] = predicted_word_idx

            # Proceed with the next predicted word appended to the prefix
            next_embed_word = self.embedding(predicted_word_idx).unsqueeze(1)   # (batch_size, 1, embed_dim)
            embed_words = torch.cat((embed_words, next_embed_word), dim=1)

        return predicted_captions

### Captioner

In [None]:
class Captioner(torch.nn.Module):
    """Full captioning model: an ``Encoder`` feeding a ``Decoder``."""

    def __init__(self, vocab_size,  vocab, embed_dim, encoder_dim, decoder_dim, num_layers):
        """Create the encoder/decoder pair and remember the vocabulary.

        Args:
            vocab_size: Number of vocabulary entries (decoder output size).
            vocab: Vocabulary object handed to the decoder during generation.
            embed_dim: Word embedding size.
            encoder_dim: Image feature size.
            decoder_dim: LSTM hidden size.
            num_layers: Number of LSTM layers.
        """
        super().__init__()
        self.encoder = Encoder(encoder_dim)
        self.decoder = Decoder(
            vocab_size=vocab_size,
            embed_dim=embed_dim,
            encoder_dim=encoder_dim,
            decoder_dim=decoder_dim,
            num_layers=num_layers,
        )
        self.vocab = vocab

    def forward(self, images, captions):
        """Encode ``images`` and return the decoder's scores for ``captions``."""
        return self.decoder(self.encoder(images), captions)

    def generate_caption(self, image, max_length=20):
        """Encode ``image`` and let the decoder predict ``max_length`` token indices."""
        return self.decoder.predict(self.encoder(image), max_length, self.vocab)

## Test

In [None]:
def load_model(path):
    """Rebuild a ``Captioner`` from a checkpoint file.

    Args:
        path: Checkpoint path. The file must hold the constructor arguments
            (``vocab_size``, ``vocab``, ``embed_dim``, ``encoder_dim``,
            ``decoder_dim``, ``num_layers``) and ``model_state_dict``.

    Returns:
        The ``Captioner`` with the stored weights loaded.
    """
    # map_location lets a checkpoint saved on a GPU load on a CPU-only runtime.
    # NOTE(review): torch.load unpickles the file, so only load checkpoints
    # from a trusted source. On PyTorch versions where weights_only defaults
    # to True, the stored vocab object may require weights_only=False --
    # confirm against the installed version.
    checkpoint = torch.load(path, map_location=device)

    constructor_keys = ('vocab_size', 'vocab', 'embed_dim',
                        'encoder_dim', 'decoder_dim', 'num_layers')
    model = Captioner(**{key: checkpoint[key] for key in constructor_keys})
    model.load_state_dict(checkpoint['model_state_dict'])
    return model

In [None]:
# Restore the trained captioner and switch it to evaluation mode
# (eval() returns the module itself, so the call can be chained).
checkpoint_path = "models/new-par_inject/model_best.pth"
model = load_model(checkpoint_path).eval()
print("Load model successfully")

Load model successfully


In [None]:
# Evaluate the model with BLEU.
#   bleu_score         -> one score per batch (averaged in the next cell)
#   corpus_bleu_score  -> a single corpus-level score over every image
# Scoring the ever-growing cumulative corpus after each batch and averaging
# those running values is quadratic in the number of batches and weights the
# early batches far more heavily than the late ones, so it is avoided here.
with torch.no_grad():
    list_of_references = []
    hypotheses = []
    bleu_score = []
    for images, targets in tqdm(loader):
        # Row 0 of each target block is the all-zero row the dataset adds to
        # force the padded width; only the rows after it are real captions.
        references = map_target(targets[:, 1:, :].tolist())

        # The encoder moves the images to the right device itself.
        predicted_captions = model.generate_caption(images).tolist()
        predictions = [map_predict(caption) for caption in predicted_captions]

        list_of_references.extend(references)
        hypotheses.extend(predictions)

        # Score this batch on its own.
        bleu_score.append(corpus_bleu(references, predictions))

    corpus_bleu_score = corpus_bleu(list_of_references, hypotheses)
    print(f"Corpus BLEU: {corpus_bleu_score:.4f}")

506it [10:42,  1.27s/it]


In [None]:
# Average of the scores collected in `bleu_score` by the evaluation loop.
score_count = len(bleu_score)
sum(bleu_score) / score_count

0.39452743009536984