# COMP5623 Coursework on Image Caption Generation


### Import the necessary libraries

In [0]:
import pandas as pd
import urllib.request
from PIL import Image
import cv2
from nltk.probability import FreqDist
from nltk.translate.bleu_score import sentence_bleu
from nltk import tokenize
import torch
import torch.nn as nn
import torchvision.models as models
from torch.nn.utils.rnn import pack_padded_sequence
from torch.utils.data import Dataset
import matplotlib.pyplot as plt
%matplotlib inline

### Download dataset

In [0]:
# Check what the current directory is
pwd

In [0]:
# List the contents of the current directory
ls

In [0]:
# Download the two zip files into current directory, '/content'
url = "https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_Dataset.zip"
urllib.request.urlretrieve(url, "Flickr8k_Dataset.zip")

url = "https://github.com/jbrownlee/Datasets/releases/download/Flickr8k/Flickr8k_text.zip"
urllib.request.urlretrieve(url, "Flickr8k_text.zip")

### Create folders to store dataset and captions

In [0]:
# Create a folder named 'data' in '/content'
!mkdir data

In [0]:
ls

In [0]:
# Enter the folder 'data' 
cd data

In [0]:
# Inside the folder 'data', create two more folders, one named 'images' and another named 'captions'
!mkdir images
!mkdir captions

In [0]:
# Go back to the parent directory, '/content'
cd /content

### Unzip datasets into specified folders

In [0]:
# Unzip the zip file, 'Flickr8k_Dataset.zip' into '/content/data/images'
!unzip Flickr8k_Dataset.zip -d /content/data/images

In [0]:
# Unzip the zip file, 'Flickr8k_text.zip' into '/content/data/captions'
!unzip Flickr8k_text.zip -d /content/data/captions

In [0]:
# Enter the folder containing the images
cd /content/data/images/Flicker8k_Dataset/

### Define directory names

In [0]:
root = "/content/data/"
caption_dir = root + "captions/"                       
image_dir = root + "images/Flicker8k_Dataset/"                           
token_file = "Flickr8k.token.txt"

### Define a function to read in our ground truth text file, line by line. 

In [0]:
def read_lines(filepath):

    """ Read a text file into memory, one list entry per line.

    Args:
        filepath: path of the text file to read.

    Returns:
        list of str: every line of the file, in file order, with leading and
        trailing whitespace removed (blank lines become empty strings).
    """

    # The context manager closes the handle even if reading raises part-way.
    with open(filepath, 'r') as file:
        return [line.strip() for line in file]

Read ground truth captions (5 per image), into memory: 

In [0]:
lines = read_lines(caption_dir + token_file)

Read the first five lines:

In [0]:
lines[:5]

Delete the captions corresponding to the image ID "2258277193_586949ec62.jpg", because that image does not exist in the dataset.

In [0]:
lines = [line for line in lines if "2258277193_586949ec62.jpg" not in line]

In [0]:
print("Total number of captions:", len(lines))

### Define a class called `Vocabulary()` 

Our vocabulary should consist of all the possible words which can be used, both as input into the model and as an output prediction. The model will not predict words which are not in our vocabulary. Every word in the vocabulary will have a unique integer starting from 0.

The function add_word() checks if a particular word already exists in the vocabulary. If it does not exist, create an entry in the dictionary. If it already exists, do nothing.

In [0]:
class Vocabulary(object):

    """Two-way lookup between words and integer IDs.

    IDs are handed out in insertion order, starting at 0. Calling the
    instance with a word returns its ID; a word that was never added maps to
    the ID of '<unk>', so '<unk>' must be added before unknown words are
    looked up.
    """

    def __init__(self):
        # word -> ID and ID -> word tables, plus the next free ID.
        self.word2idx = dict()
        self.idx2word = dict()
        self.idx = 0

    def add_word(self, word):
        """Register `word` under the next free ID; known words are left untouched."""
        if word in self.word2idx:
            return
        next_id = self.idx
        self.word2idx[word] = next_id
        self.idx2word[next_id] = word
        # Advance the counter for the next new word
        self.idx = next_id + 1

    def __call__(self, word):
        """Return the ID of `word`, or the ID of '<unk>' when the word is unknown."""
        key = word if word in self.word2idx else '<unk>'
        return self.word2idx[key]

    def __len__(self):
        """Number of distinct words stored."""
        return len(self.word2idx)

### Clean the captions, extract all words and store them in a list called `words`

Use...

*   `split()` function to split a string of text by a specific separator. 
*   `rstrip()` function to remove trailing whitespaces.
*   `lower()` function to convert all words into lowercase.





Extract all the words from ```lines```, and create a list of them in a variable ```words```, for example:

```words = ["a", "an", "the", "cat"... ]```

No need to worry about duplicates.


### Concatenate the cleaned captions into a single list called ```cleaned_captions```.

Keeping the same order, concatenate all the cleaned words from each caption into a string again, and add them all to a list of strings ```cleaned_captions```.

In [0]:
# Punctuation characters removed from every caption. Written as a raw string
# so that the backslash is a literal character rather than the start of an
# (invalid) escape sequence.
punctuations = r'''!()-[]{};:'"\,<>./?@#$%^&*_~'''
strip_punctuation = str.maketrans('', '', punctuations)

# Every word occurrence from every caption, duplicates included
words = []

# One cleaned caption string per entry of `lines`, in the same order
cleaned_captions = []

for line in lines:

  # The image ID and the caption text are separated by a tab
  caption_text = line.split("\t")[1]

  # Drop punctuation, convert to lower case, remove trailing white space and split into words
  tokens = caption_text.translate(strip_punctuation).lower().rstrip().split()

  # extend() grows the list in place; `words = words + tokens` would copy the
  # whole list again for every caption
  words.extend(tokens)

  # Re-join the cleaned words into a single caption string
  cleaned_captions.append(" ".join(tokens))

# Sanity checks... 
print("Total number of words from all captions: ", len(words))
print("Total number of unique words from all captions: ", len(set(words)))
print("Total number of captions: ", len(cleaned_captions))

In [0]:
# Sanity checks
cleaned_captions[:5]

In [0]:
# Sanity checks
words[:5]

### Filter out words which appear 3 times or less  

In [0]:
# Count how often each word occurs across all captions
freq_words = FreqDist(words)

# Keep only the words seen at least 4 times, i.e. drop words which appear 3 times or less
filtered_words = {word: count for word, count in freq_words.items() if count >= 4}

# Size of the vocabulary once the infrequently used words are gone
print("Total number of words from all captions, after filtering: ", len(filtered_words))

### Add words to our vocabulary

In [0]:
# Create a vocab instance
vocab = Vocabulary()

# Add the token words first
vocab.add_word('<pad>')
vocab.add_word('<start>') 
vocab.add_word('<end>') 
vocab.add_word('<unk>')   

Add the rest of the words from the parsed captions:

In [0]:
# Add every unique word in filtered_words into our vocabulary
for word in filtered_words.keys():
  vocab.add_word(word) 

In [0]:
# Check position of a random word in our vocabulary
vocab('air')

In [0]:
# Check the size of our vocabulary
print("Total number of words in our vocabulary:", len(vocab))

### Store all image IDs in a list

In [0]:
# For every caption line: take the text before the tab, then keep only the
# part before the first "." (this drops the file extension and the caption index)
image_ids = [line.split("\t")[0].split(".")[0] for line in lines]

print("Total number of Image IDs:", len(image_ids))  

### Create a dataframe for the image paths and captions.

In [0]:
# Create data for data frame
data = {
    'image_id': image_ids,
    'path': [image_dir + image_id + ".jpg" for image_id in image_ids],
    'caption': cleaned_captions
}

# Create a data frame
data_df = pd.DataFrame(data, columns=['image_id', 'path', 'caption'])

In [0]:
# Check first five rows of data frame
data_df.head(n=5)

### Define a class called `Flickr8k` for the dataset.

In [0]:
class Flickr8k(Dataset):

    """ Flickr8k custom dataset compatible with torch.utils.data.DataLoader.

    Each item is one (image, caption) pair taken from one row of the
    dataframe.
    """
    
    def __init__(self, df, vocab, transform=None):

        """ Store the dataframe, the vocabulary wrapper and the image transform.
        Args:
            df: dataframe with 'path' and 'caption' columns; rows are looked
                up by index label, so the index is expected to run 0..len-1.
            vocab: vocabulary wrapper; calling it with a word returns its ID.
            transform: optional callable applied to the loaded PIL image.
        """
        self.df = df
        self.vocab = vocab
        self.transform = transform

    def __getitem__(self, index):
      
        """ Returns one data pair (image and caption).

        Returns:
            image: the RGB image, after `transform` when one was given.
            target: 1D tensor of word IDs, '<start>' + caption words + '<end>'.
        """

        vocab = self.vocab

        caption = self.df['caption'][index]
        path = self.df['path'][index]

        # Open through a context manager so the file handle is always closed.
        # convert() returns a fully loaded copy that stays valid afterwards,
        # and forces 3 channels so non-RGB files match the 3-channel transform.
        with Image.open(path) as raw_image:
            image = raw_image.convert('RGB')

        if self.transform is not None:
            image = self.transform(image)

        # Convert caption (string) to word ids, wrapped in the start/end tokens.
        tokens = caption.split()
        word_ids = [vocab('<start>')]
        word_ids.extend(vocab(token) for token in tokens)
        word_ids.append(vocab('<end>'))

        # torch.Tensor(...) builds a float tensor; caption_collate_fn copies
        # it into a long tensor when the batch is assembled.
        target = torch.Tensor(word_ids)
        return image, target

    def __len__(self):
        """ Number of (image, caption) rows in the dataframe. """
        return len(self.df)

### Define a function called `caption_collate_fn()`

We need to overwrite the default PyTorch ```collate_fn()``` because our ground truth captions are sequential data of varying lengths. The default ```collate_fn()``` does not support merging the captions with padding.

You can read more about it here: https://pytorch.org/docs/stable/data.html#dataloader-collate-fn. 

In [0]:
def caption_collate_fn(data):

    """ Build one mini-batch from a list of (image, caption) tuples.

    The list is sorted in place by caption length, longest first, and the
    captions are right-padded with zeros to the longest length in the batch.

    Args:
        data: list of (image, caption) tuples.
            - image: torch tensor; all images must share one shape so they
              can be stacked.
            - caption: 1D torch tensor of word IDs; variable length.
    Returns:
        images: torch tensor of shape (batch_size, *image_shape).
        targets: long tensor of shape (batch_size, padded_length).
        lengths: list; valid length for each padded caption.
    """

    # Longest caption first: the packed-sequence helper used downstream
    # expects lengths in decreasing order.
    data.sort(key=lambda pair: len(pair[1]), reverse=True)
    images, captions = zip(*data)

    # Tuple of per-sample tensors -> one batched tensor
    images = torch.stack(images, 0)

    # Zero-filled (batch_size, longest) grid, then copy each caption into its row
    lengths = [len(caption) for caption in captions]
    targets = torch.zeros(len(captions), max(lengths)).long()
    for row, (caption, length) in enumerate(zip(captions, lengths)):
        targets[row, :length] = caption[:length]
    return images, targets, lengths

### Transform the data

In [0]:
from torchvision import transforms

# Crop size matches the input dimensions expected by the pre-trained ResNet
data_transform = transforms.Compose([ 
    transforms.Resize(224),
    transforms.CenterCrop(224),  
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406),   # Using ImageNet norms
                         (0.229, 0.224, 0.225))])

Initialising the datasets. The only twist is that every image has 5 ground truth captions, so each image appears five times in the dataframe. We don't want an image to appear in more than one set.

In [0]:
# Number of dataframe rows (captions) that belong to one image
unit_size = 5

train_split = 0.95 # Defines the ratio of train/test data.

# The dataframe has not been shuffled, so the rows of one image are adjacent.
# Rounding the cut to a multiple of unit_size keeps them on the same side.
train_size = unit_size * round(len(data_df)*train_split / unit_size)

train_df = data_df[:train_size].reset_index(drop=True)

# Kept as its own dataframe: it is reused later when generating captions
test_df = data_df[train_size:].reset_index(drop=True)

dataset_train = Flickr8k(df=train_df, vocab=vocab, transform=data_transform)
dataset_test = Flickr8k(df=test_df, vocab=vocab, transform=data_transform)

### Define train and test data loaders

Write the dataloaders ```train_loader``` and ```test_loader``` - explicitly replacing the collate_fn:

```train_loader = torch.utils.data.DataLoader(
  ...,
  collate_fn=caption_collate_fn
)```

Set train batch size to 128 and be sure to set ```shuffle=True```

In [0]:
train_loader = torch.utils.data.DataLoader(
    dataset_train,
    batch_size=128, 
    shuffle=True,
    num_workers=2,
    collate_fn=caption_collate_fn
)

test_loader = torch.utils.data.DataLoader(
    dataset_test,
    batch_size=128, 
    shuffle=False,
    num_workers=2,
    collate_fn=caption_collate_fn
)

In [0]:
print("Number of batches in train loader:", len(train_loader))
print("Number of batches in test loader:", len(test_loader))

## Specify encoder and decoder models

In [0]:
class EncoderCNN(nn.Module):

    """Image encoder: a frozen pretrained ResNet-152 followed by a trainable
    linear projection and batch normalisation."""

    def __init__(self, embed_size):

        """Load the pretrained ResNet-152 and replace top fc layer.

        Args:
            embed_size: size of the feature vector produced for each image.
        """

        super().__init__()

        # Pre-trained on ImageNet (shows a download bar on first use)
        backbone = models.resnet152(pretrained=True)

        # Rebuild the network from all of its child layers except the last
        # one (the classification fc layer); * unpacks the list into
        # separate arguments for Sequential.
        self.resnet = nn.Sequential(*list(backbone.children())[:-1])

        # Project the features that used to feed the removed fc layer
        # (fc.in_features values per image) down to the embedding size.
        self.linear = nn.Linear(backbone.fc.in_features, embed_size)

        # Batch normalisation helps to speed up training
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):

        """Extract feature vectors from input images.

        Args:
            images: batch of image tensors.
        Returns:
            tensor of shape (batch_size, embed_size).
        """

        # No gradients flow into the ResNet, so its weights are never updated.
        # NOTE(review): no_grad() does not put the ResNet's own BatchNorm
        # layers in eval mode; call encoder.eval() before inference.
        with torch.no_grad():
            pooled = self.resnet(images)

        # Flatten to (batch_size, features)
        flat = pooled.reshape(pooled.size(0), -1)

        # Linear projection followed by batch normalisation
        return self.bn(self.linear(flat))

class DecoderRNN(nn.Module):

    """Caption decoder: word embedding -> recurrent layer -> linear layer
    producing one score per vocabulary word."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers, max_seq_length=20):

        """Set the hyper-parameters and build the layers.

        Args:
            embed_size: size of the word embeddings (and of the image features).
            hidden_size: size of the recurrent hidden state.
            vocab_size: number of words in the vocabulary.
            num_layers: number of stacked recurrent layers.
            max_seq_length: number of words produced by sample().
        """

        super().__init__()

        self.embed = nn.Embedding(vocab_size, embed_size)

        # Settings shared by both recurrent layer types
        recurrent_kwargs = dict(
            input_size=embed_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            bias=True,
            batch_first=True,
            dropout=0,
            bidirectional=False,
        )

        # Choose RNN or LSTM:
        self.rnn = nn.RNN(nonlinearity='tanh', **recurrent_kwargs)
        # self.lstm = nn.LSTM(**recurrent_kwargs)

        # Maps each hidden state to one score per vocabulary word
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.max_seq_length = max_seq_length

    def forward(self, features, captions, lengths):

        """Decode image feature vectors and generates captions.

        Args:
            features: image features, (batch_size, embed_size).
            captions: padded word IDs, (batch_size, padded_length).
            lengths: valid caption length per sample, in decreasing order.
        Returns:
            word scores for every packed time step, (sum(lengths), vocab_size).
        """

        # Embed the caption words and put the image features in front as the
        # first time step of every sequence.
        word_vectors = self.embed(captions)
        sequence = torch.cat((features.unsqueeze(1), word_vectors), 1)

        # Packing drops the padded positions, so the recurrent layer only
        # processes the first lengths[b] steps of each sequence.
        packed = pack_padded_sequence(sequence, lengths, batch_first=True)

        # Choose RNN or LSTM:
        hiddens, _ = self.rnn(packed)
        # hiddens, _ = self.lstm(packed)

        # hiddens[0] is the flat data tensor of the packed output
        return self.linear(hiddens[0])

    # Runs the RNN/LSTM step by step and returns word IDs (integers); use vocab to convert the IDs to words.
    # Note: run encoder.eval() before passing test images through the encoder.
    def sample(self, features, states=None):

        """Generate captions for given image features using greedy search.

        Args:
            features: image features, (batch_size, embed_size).
            states: optional initial recurrent state.
        Returns:
            word IDs, (batch_size, max_seq_length).
        """

        step_input = features.unsqueeze(1)                       # (batch_size, 1, embed_size)
        collected = []

        for _ in range(self.max_seq_length):

            # Choose RNN or LSTM:
            hiddens, states = self.rnn(step_input, states)       # hiddens: (batch_size, 1, hidden_size)
            # hiddens, states = self.lstm(step_input, states)

            scores = self.linear(hiddens.squeeze(1))             # (batch_size, vocab_size)
            predicted = scores.max(1)[1]                         # highest-scoring word per sample: (batch_size)
            collected.append(predicted)

            # The predicted word becomes the input of the next step
            step_input = self.embed(predicted).unsqueeze(1)      # (batch_size, 1, embed_size)

        return torch.stack(collected, 1)                         # (batch_size, max_seq_length)

In [0]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

## Set training parameters.

In [0]:
embed_size = 256
hidden_size = 512
num_layers = 1
learning_rate = 0.001
num_epochs = 5
log_step = 10
save_step = 1

Initialize the models and set the learning parameters.

In [0]:
import numpy as np

# Build the models
encoder = EncoderCNN(embed_size).to(device)
decoder = DecoderRNN(embed_size, hidden_size, len(vocab), num_layers).to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()

# Optimisation covers the whole decoder plus the layers added on top of the
# encoder's ResNet; the ResNet parameters themselves are excluded.
params = [
    *decoder.parameters(),
    *encoder.linear.parameters(),
    *encoder.bn.parameters(),
]

optimizer = torch.optim.Adam(params, lr=learning_rate)

## Training the model


The loop to train the model. Feel free to put this in a function if you prefer.

In [0]:
# Train the models
# Number of mini-batches per epoch (only used in the log message)
total_step = len(train_loader)

# Loss values recorded on logged batches only (every `log_step`-th batch),
# so the plot below has one point per logged batch, not one per mini-batch.
training_loss = []

for epoch in range(num_epochs):

    # Before Training: save the untrained weights as checkpoint 0
    if epoch == 0:
      print("=== START OF TRAINING ===")
      print("==> Saving 'encoder-at-checkpoint-%d.ckpt' and 'decoder-at-checkpoint-%d.ckpt'..." % (epoch, epoch))
      print("\n")
      torch.save(encoder.state_dict(), 'encoder-at-checkpoint-{}.ckpt'.format(epoch))
      torch.save(decoder.state_dict(), 'decoder-at-checkpoint-{}.ckpt'.format(epoch))

    for i, (images, captions, lengths) in enumerate(train_loader):

        # Set mini-batch dataset
        images = images.to(device)
        captions = captions.to(device)

        # Pack the ground-truth captions the same way the decoder packs its
        # inputs, so targets and decoder outputs line up row by row
        targets = pack_padded_sequence(captions, lengths, batch_first=True)[0]

        # Forward, backward and optimize
        features = encoder(images)
        outputs = decoder(features, captions, lengths)

        loss = criterion(outputs, targets)
        
        # Zero gradients for both networks
        decoder.zero_grad()
        encoder.zero_grad()

        loss.backward() # For both networks
        optimizer.step()

        # Print log info (and record the loss) every `log_step` batches
        if i % log_step == 0:
            print('Epoch [{}/{}], Batch [{}/{}], Loss: {:.4f}'
                  .format(epoch+1, num_epochs, i, total_step, loss.item()))
            training_loss.append(loss.item()) 
            
    # End of epoch: save checkpoint number epoch+1 (checkpoint 0 is the untrained model)
    print("==> Saving 'encoder-at-checkpoint-%d.ckpt' and 'decoder-at-checkpoint-%d.ckpt'..." % (epoch+1, epoch+1))
    print("\n")
    torch.save(encoder.state_dict(), 'encoder-at-checkpoint-{}.ckpt'.format(epoch+1))
    torch.save(decoder.state_dict(), 'decoder-at-checkpoint-{}.ckpt'.format(epoch+1))

print("=== END OF TRAINING ===")

# Plot training loss (one point per logged batch)
plt.plot(training_loss, label = "Training Loss")   
plt.xlabel("Number of Mini-Batches")
plt.ylabel("Training Loss")
plt.show()  

## Define a function to load image

In [0]:
def load_image(image_path, transform=None):

  """
  Load an image file as RGB and resize it to 224 x 224.

  Without a transform the resized PIL image is returned. With a transform,
  the transformed image is returned with an extra leading dimension added
  by unsqueeze(0).
  """

  resized = Image.open(image_path).convert('RGB').resize([224, 224], Image.LANCZOS)
  if transform is None:
    return resized
  return transform(resized).unsqueeze(0)

## Define a function to extract reference captions for an image

In [0]:
def extract_reference_captions(image_id):

  """
  Collect the cleaned reference captions of one image.

  Every entry of `lines` that contains `image_id` as a substring selects the
  cleaned caption stored at the same position in `cleaned_captions`.

  Returns a pair: the captions as lists of words, and the captions as strings.
  """

  reference_captions = [
      cleaned_captions[position]
      for position, line in enumerate(lines)
      if image_id in line
  ]
  reference_captions_split = [caption.split() for caption in reference_captions]

  return reference_captions_split, reference_captions

## Define a function to generate captions by inputing a filepath

In [0]:
def generate_caption(filepath_image):

  """
  This function generates a caption for an image given its filepath.

  It uses the module-level `encoder`, `decoder`, `vocab`, `device` and
  `data_transform`.

  Returns the caption as a list of words, cut after the first '<end>' token
  (which is included); without an '<end>' token all sampled words are returned.
  """

  image_tensor = load_image(filepath_image, transform=data_transform).to(device)

  # Inference only: skip building the autograd graph for the forward passes
  with torch.no_grad():
    feature_image = encoder(image_tensor)
    sampled_ids_image = decoder.sample(feature_image)

  # One image in the batch: take its row as plain Python integers
  sampled_ids_image = sampled_ids_image[0].cpu().tolist()

  # Convert word_ids to words
  sampled_caption = []
  for word_id in sampled_ids_image:
    word = vocab.idx2word[word_id]
    sampled_caption.append(word)
    if word == '<end>':
      break
  
  return sampled_caption

## Define function to remove tokens from generated captions

In [0]:
def clean_generated_captions(caption):
  
  """
  This function removes <pad>, <start>, <end>, <unk> from the generated captions for BLEU score computation.

  Args:
    caption: list of word strings; it is modified in place.

  Returns:
    The same list object, with every special token removed.
  """

  special_tokens = ("<start>", "<end>", "<pad>", "<unk>")

  # Build the filtered list first and assign it back in one step. Calling
  # remove() while looping over the same list skips the element that follows
  # each removal, so repeated special tokens would be left behind.
  caption[:] = [token for token in caption if token not in special_tokens]

  return caption

## Create a dataframe to store lengths of captions of each image


In [0]:
# Number of words in each test caption, in row order
caption_length_list = [len(caption.split()) for caption in test_df['caption']]

# Single-column dataframe holding those lengths
caption_length_df = pd.DataFrame({'caption_length': caption_length_list})

# Put the lengths next to the captions they were computed from
test_df2 = pd.concat([test_df, caption_length_df], axis=1)
test_df2.head()

## Create a data frame to store average length of reference captions per image

In [0]:
# Average only the numeric caption_length column. The text columns (path,
# caption) cannot be averaged: older pandas versions silently dropped them,
# recent versions raise an error instead.
test_df3 = test_df2.groupby('image_id')['caption_length'].mean().reset_index(drop=False)
test_df3.head()

## Create an empty dataframe to store generated captions and BLEU scores

In [0]:
# One (caption, BLEU) column pair per checkpoint: checkpoint 0 is the model
# saved before training, followed by one checkpoint per epoch.
result_columns = []
for checkpoint in range(num_epochs + 1):
  result_columns += ['caption_{}'.format(checkpoint), 'BLEU_{}'.format(checkpoint)]

# One row per test image (test_df3 holds one row per image_id)
test_df4 = pd.DataFrame(columns=result_columns, index=range(len(test_df3)))

## Define the paths of every image in test set

In [0]:
root_image = "/content/data/images/Flicker8k_Dataset/"
name_image = [image_id + ".jpg" for image_id in list(test_df3['image_id'])]

## Store generated captions and BLEU scores in dataframe

In [0]:
base_encoder = "encoder-at-checkpoint-{}.ckpt"
base_decoder = "decoder-at-checkpoint-{}.ckpt"

print("=== START OF CAPTION GENERATION ===")

# Tokenised reference captions, looked up once per test image
references = [extract_reference_captions(image)[0] for image in name_image]

# rows[idx] collects [caption_0, BLEU_0, caption_1, BLEU_1, ...] for test image idx
rows = [[] for _ in name_image]

# Outer loop over checkpoints so that each model pair is built and loaded once,
# instead of once per image and checkpoint
for i in range(num_epochs+1):

  print("Checkpoint:", i)

  # Build the models. generate_caption() reads the module-level names
  # `encoder` and `decoder`, so they have to be rebound here.
  encoder = EncoderCNN(embed_size).to(device)
  decoder = DecoderRNN(embed_size, hidden_size, len(vocab), num_layers).to(device)

  # map_location lets checkpoints saved on a GPU load on a CPU-only machine
  encoder.load_state_dict(torch.load(base_encoder.format(i), map_location=device))
  decoder.load_state_dict(torch.load(base_decoder.format(i), map_location=device))

  # Evaluation mode for inference
  encoder.eval()
  decoder.eval()

  for idx, image in enumerate(name_image):

    # Generated caption at this checkpoint given an image file path
    generated_caption_split = generate_caption(filepath_image=root_image+image)
    rows[idx].append(' '.join(generated_caption_split))

    # Get rid of special tokens in generated captions for BLEU score computation
    candidate_caption = clean_generated_captions(generated_caption_split)

    # Compute BLEU score against the reference captions of this image
    score = sentence_bleu(references[idx], candidate_caption, weights=(0.15, 0.15, 0.30, 0.40))
    rows[idx].append(score)

# Fill the empty dataframe, test_df4 with generated captions and BLEU scores of each image
for idx, row in enumerate(rows):
  test_df4.iloc[idx] = row

print("=== END OF CAPTION GENERATION ===")

In [0]:
test_df4.head()

## Concatenate dataframes and save resulting dataframe as .csv file

In [0]:
# Concatenate test_df3 with test_df4
test_results_df = pd.concat([test_df3, test_df4], axis=1)
test_results_df.head()

In [0]:
# Specify file path to save csv file
cd /content/data/images/

In [0]:
# Save as .csv file
test_results_df.to_csv("test_results_rnn.csv")
# test_results_df.to_csv("test_results_lstm.csv")

# Comparing RNN vs LSTM

In [0]:
# Load .csv file
test_results_df = pd.read_csv("test_results_rnn.csv")
# test_results_df = pd.read_csv("test_results_lstm.csv")

In [0]:
# Subset the dataframe with only BLEU scores
test_results_bleu = test_results_df[['image_id', 'caption_length', 'BLEU_0', 'BLEU_1', 'BLEU_2', 'BLEU_3', 'BLEU_4', 'BLEU_5']]
#test_results_bleu = test_results_bleu[['BLEU_0', 'BLEU_1', 'BLEU_2', 'BLEU_3', 'BLEU_4', 'BLEU_5']].astype(float)
test_results_bleu2 = test_results_bleu.iloc[:,1:].astype(float)

## Summary Statistics on BLEU Scores

In [0]:
test_results_bleu2.describe()

In [0]:
# Average BLEU score at each checkpoint
test_results_bleu2.describe().loc['mean'].iloc[1:]

In [0]:
# Average BLEU score across all 6 checkpoints
test_results_bleu2.describe().loc['mean'].iloc[1:].mean()

In [0]:
# Average BLEU score across all checkpoints, excluding Checkpoint 0
test_results_bleu2.describe().loc['mean'].iloc[2:].mean()

## Define function that divides test set into intervals and computes BLEU scores for each interval

In [0]:
def average_bleu_by_caption_length(data, bin_size):
    
    """ 
    This function divides the data into bins according to caption lengths,
    and calculates the average BLEU scores for each interval.

    Args:
        data: dataframe with a numeric 'caption_length' column plus the
            numeric score columns to average.
        bin_size: number of equal-width intervals between the shortest and
            the longest caption length.

    Returns:
        Dataframe with one row per interval: the interval itself in
        'caption_length', the mean caption length in 'average_caption_length',
        and the mean of every other column, rounded to 3 decimals.
    """
    
    # Length of shortest and longest caption
    caption_length_min = data['caption_length'].min()
    caption_length_max = data['caption_length'].max()

    # bin_size + 1 evenly spaced breaks; linspace hits both end points exactly,
    # which a float step passed to arange does not guarantee
    breaks = np.linspace(caption_length_min, caption_length_max, bin_size + 1)

    # include_lowest=True keeps the rows with the shortest caption length;
    # by default the first interval is open on the left and would drop them
    bins = pd.cut(data["caption_length"], breaks, include_lowest=True)

    # Average every column per interval; observed=False keeps empty intervals
    # so the result always has bin_size rows
    bleu_by_caption_length = data.groupby(bins, observed=False).mean().round(3)

    # The averaged length gets its own name so it does not clash with the
    # interval column restored by reset_index
    bleu_by_caption_length = bleu_by_caption_length.rename(
        columns={'caption_length': 'average_caption_length'})
    
    return bleu_by_caption_length.reset_index(drop=False)

## Define function to plot BLEU scores

In [0]:
def plot_bleu_scores(data, bin_size):
    
    """ 
    This function plots BLEU scores across the checkpoints.

    Args:
        data: dataframe with one row per caption-length interval; column
            'caption_length' holds the interval and the columns from
            position 2 onwards hold one BLEU score per checkpoint.
        bin_size: number of rows (intervals) to plot.
    """
    legend_names = list(data['caption_length'])
    palette = plt.get_cmap('Set2')
    plt.figure(figsize=(8,6))
    for i in range(bin_size):
        # Positions 0 and 1 are the interval and the average caption length
        scores = data.iloc[i][2:].astype(float)
        # One x position per checkpoint column actually present
        checkpoints = np.arange(len(scores))
        plt.plot(checkpoints, scores, 
                 color=palette(i), 
                 label=str(legend_names[i]))
    plt.xlabel('Checkpoint Number')
    plt.ylabel('BLEU Score')
    plt.title('Variation of BLEU Scores Across Checkpoints (RNN)')
    # plt.title('Variation of BLEU Scores Across Checkpoints (LSTM)') # For LSTM
    plt.legend(title="Interval Ranges for BLEU Scores")
    plt.show()

## Plot BLEU scores at each checkpoint

In [0]:
# Set bin size
bin_size = 2

In [0]:
test_results_bleu_interval = average_bleu_by_caption_length(data=test_results_bleu2, bin_size=bin_size)
test_results_bleu_interval

In [0]:
plot_bleu_scores(data=test_results_bleu_interval, bin_size=bin_size)

## Determine checkpoints where BLEU scores are minimum and maximum

In [0]:
# BLEU scores only: columns 0 and 1 of test_results_bleu are image_id and caption_length
bleu_scores = test_results_bleu.iloc[:, 2:].astype(float).values

# For every image, the checkpoint number with the lowest and with the highest
# BLEU score. argmin/argmax return the first position on ties. Building the
# dataframe in one step avoids chained assignment (`iloc[idx][0] = ...`),
# which may write to a temporary copy and leave the dataframe unchanged.
test_df5 = pd.DataFrame({
    'min_BLEU_index': bleu_scores.argmin(axis=1),
    'max_BLEU_index': bleu_scores.argmax(axis=1),
})

test_df5

In [0]:
test_df5['max_BLEU_index'].value_counts(sort=False)

In [0]:
test_df5['min_BLEU_index'].value_counts(sort=False)

In [0]:
# Counts ordered by checkpoint number; the order returned by sort=False is not
# guaranteed to follow the checkpoint numbers, yet the list is indexed by position later
max_BLEU = list(test_df5['max_BLEU_index'].value_counts().sort_index())

In [0]:
# Counts ordered by checkpoint number; the order returned by sort=False is not
# guaranteed to follow the checkpoint numbers, yet the list is indexed by position later
min_BLEU = list(test_df5['min_BLEU_index'].value_counts().sort_index())

## Define function that plots bar graphs of number of checkpoints corresponding to minimum or maximum BLEU scores

In [0]:
def plot_bar_graph(bleu_list, min_or_max):
    
    """
    Plot, as a bar graph, how many test images reach their minimum or maximum
    BLEU score at each checkpoint.

    Args:
        bleu_list: image counts in checkpoint order. For "Minimum" the first
            entry belongs to checkpoint 0; for any other value of
            `min_or_max` the first entry belongs to checkpoint 1.
        min_or_max: "Minimum" or "Maximum"; also inserted into the plot title.
    """

    # Checkpoint number of the first bar, which is also the offset between a
    # checkpoint number and its position in bleu_list
    first_checkpoint = 0 if min_or_max == "Minimum" else 1
    bar_colour = (0.2, 0.4, 0.6, 0.6)
    
    plt.figure(figsize=(8,6))
    
    for checkpoint in range(first_checkpoint, 6):
        count = bleu_list[checkpoint - first_checkpoint]
        plt.bar(checkpoint, height=count, color=bar_colour)
        plt.xlabel('Checkpoint Number')
        plt.ylabel('Number of Test Images')
        # Write the count just above its bar
        plt.annotate(count, (checkpoint-0.1, count+2))
        plot_title = "Checkpoints Corresponding to " + min_or_max + " BLEU Score Across Test Set"
        plt.title(plot_title)

## Plot bar graphs

In [0]:
plot_bar_graph(bleu_list=min_BLEU, min_or_max="Minimum")

In [0]:
plot_bar_graph(bleu_list=max_BLEU, min_or_max="Maximum")