# Import libraries and initialize setup

In [None]:
import os 
import pandas as pd  
import spacy  
import random
import statistics
from PIL import Image  
from tqdm import tqdm
import copy

import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.nn.utils.rnn import pad_sequence  
from torch.utils.data import DataLoader, Dataset
import torchvision.models as models

import matplotlib.pyplot as plt
from nltk.translate.bleu_score import sentence_bleu

# Load spaCy's English pipeline; only its tokenizer is used (by Vocabulary.tokenizer_eng).
# NOTE(review): the "en" shortcut name presumably depends on the installed spaCy
# version/model link — confirm it resolves in the target environment.
spacy_eng = spacy.load("en")

# Colab-specific setup: mount Google Drive and move into the project folder.
from google.colab import drive
drive.mount('/content/drive')
%cd "/content/drive/My Drive/Master's DS/Computer Vision CSCI-2271/Final Project/"
# Allow cuDNN to benchmark and pick convolution algorithms.
torch.backends.cudnn.benchmark = True
# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/.shortcut-targets-by-id/1slfDnD2htWQW0hyg7pOTJg8hMvdyXWyC/Final Project


# Define functions for loading and preparing the data

In [None]:
class Vocabulary:
    """Two-way mapping between caption tokens and integer ids.

    Ids 0-3 are reserved for the special tokens ``<PAD>``, ``<SOS>``,
    ``<EOS>`` and ``<UNK>``. Any other word receives the next free id at the
    moment its running count reaches ``freq_threshold`` while the vocabulary
    is being built.
    """

    def __init__(self, freq_threshold):
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        # Inverse of ``itos`` for the four special tokens.
        self.stoi = {token: index for index, token in self.itos.items()}
        self.freq_threshold = freq_threshold

    def __len__(self):
        """Return the number of known tokens, special tokens included."""
        return len(self.itos)

    @staticmethod
    def tokenizer_eng(text):
        """Split ``text`` with the spaCy tokenizer and lower-case each token."""
        tokens = spacy_eng.tokenizer(text)
        return [token.text.lower() for token in tokens]

    def build_vocabulary(self, sentence_list):
        """Register every word that occurs ``freq_threshold`` times.

        Ids are handed out starting at 4, in the order in which words reach
        the threshold while scanning ``sentence_list`` front to back.
        """
        counts = {}
        next_id = 4

        for sentence in sentence_list:
            for token in self.tokenizer_eng(sentence):
                seen = counts.get(token, 0) + 1
                counts[token] = seen

                # Equality (not >=) so that each word is registered only once.
                if seen == self.freq_threshold:
                    self.stoi[token] = next_id
                    self.itos[next_id] = token
                    next_id += 1

    def numericalize(self, text):
        """Convert ``text`` to a list of token ids, using ``<UNK>`` for unknown words."""
        unknown_id = self.stoi["<UNK>"]
        return [self.stoi.get(token, unknown_id) for token in self.tokenizer_eng(text)]


class FlickrDataset(Dataset):
    """Image/caption dataset backed by a captions CSV file.

    The CSV is read with pandas and must provide an ``image`` column (file
    name relative to ``root_dir``) and a ``caption`` column.

    Args:
        root_dir: Directory that contains the image files.
        captions_file: Path of the CSV file with the captions.
        indices: Optional row labels of the CSV to keep. When given, the
            dataset exposes only those rows, re-indexed from 0 in the given
            order. ``None`` keeps every row.
        transform: Optional callable applied to each loaded PIL image.
        freq_threshold: Occurrence count at which a word enters the vocabulary.
    """

    def __init__(self, root_dir, captions_file, indices=None, transform=None, freq_threshold=5):
        self.root_dir = root_dir
        self.df = pd.read_csv(captions_file)
        self.transform = transform

        # Build the vocabulary from ALL captions, before any subsetting, so
        # token ids are identical whichever subset of rows is selected.
        self.vocab = Vocabulary(freq_threshold)
        self.vocab.build_vocabulary(self.df["caption"].tolist())

        # Keep subset of data corresponding to the passed indices.
        # ``is not None`` (rather than truthiness) so an explicitly empty
        # selection yields an empty dataset instead of the full one.
        if indices is not None:
            self.df = self.df.loc[indices].reset_index(drop=True)

        # Take the columns AFTER subsetting; otherwise __getitem__ would read
        # rows of the full file while __len__ reports the subset size.
        self.imgs = self.df["image"]
        self.captions = self.df["caption"]

    def __len__(self):
        """Return the number of (image, caption) rows exposed by the dataset."""
        return len(self.df)

    def __getitem__(self, index):
        """Return ``(image, caption_ids)`` for row ``index``.

        The image is loaded as RGB and passed through ``transform`` when one
        was given. The caption is returned as a 1-D tensor of token ids
        wrapped in ``<SOS>`` ... ``<EOS>``.
        """
        caption = self.captions[index]
        img_id = self.imgs[index]
        img = Image.open(os.path.join(self.root_dir, img_id)).convert("RGB")

        if self.transform is not None:
            img = self.transform(img)

        numericalized_caption = [self.vocab.stoi["<SOS>"]]
        numericalized_caption += self.vocab.numericalize(caption)
        numericalized_caption.append(self.vocab.stoi["<EOS>"])

        return img, torch.tensor(numericalized_caption)

class Padder:
    """Collate function that batches images and pads captions to equal length."""

    def __init__(self, pad_idx):
        # Token id used to fill captions shorter than the longest in the batch.
        self.pad_idx = pad_idx

    def __call__(self, batch):
        """Collate a list of ``(image, caption_ids)`` pairs.

        Returns:
            ``(imgs, targets)`` where ``imgs`` stacks the images along a new
            leading dimension and ``targets`` holds the padded captions with
            the sequence dimension first (``batch_first=False``).
        """
        images, captions = [], []
        for image, caption in batch:
            images.append(image)
            captions.append(caption)

        stacked_images = torch.stack(images, dim=0)
        padded_captions = pad_sequence(
            captions, batch_first=False, padding_value=self.pad_idx
        )
        return stacked_images, padded_captions

def split_indicies(indices_list, split_percent):
    """Shuffle ``indices_list`` in place and split it in two.

    Returns:
        ``(head, tail)`` where ``head`` holds the first
        ``int(len(indices_list) * split_percent)`` shuffled items and ``tail``
        the remaining ones. Note that the caller's list is reordered.
    """
    random.shuffle(indices_list)
    boundary = int(split_percent * len(indices_list))
    head = indices_list[:boundary]
    tail = indices_list[boundary:]
    return head, tail


def expand_indices(indices, group_size=5):
    """Expand each start index into a run of consecutive indices.

    Args:
        indices: Iterable of group start indices.
        group_size: Number of consecutive indices produced per start index.
            Defaults to 5, the value that was previously hard-coded.

    Returns:
        A flat list ``[i, i + 1, ..., i + group_size - 1]`` for every ``i``
        in ``indices``, in the original order.
    """
    return [start + offset for start in indices for offset in range(group_size)]


def get_loader_and_indices(root_folder, annotation_file, transform,
                           batch_size=32, num_workers=8, split_percent=0.9):
    """Build the training loader plus validation/test indices.

    The data is split at group level: one start index is taken for every
    5 consecutive rows, the start indices are shuffled and split, and each
    part is expanded back to its 5 rows.
    NOTE(review): this assumes every image owns exactly 5 consecutive caption
    rows in the annotation file — confirm against the data.

    Args:
        root_folder: Directory containing the images.
        annotation_file: Path of the captions CSV file.
        transform: Transform applied to every loaded image.
        batch_size: Batch size of the training loader (default 32).
        num_workers: Worker processes of the training loader (default 8).
        split_percent: Fraction kept on the "left" side of each split. It is
            applied twice: first train+val vs. test, then train vs. val
            (default 0.9).

    Returns:
        ``(train_loader, expanded_val_indices, expanded_test_indices, dataset)``
        where ``dataset`` is the full, unsplit dataset that the returned
        validation/test indices refer to.
    """
    dataset = FlickrDataset(root_folder, annotation_file, transform=transform)

    # One start index per group of 5 rows, so rows of one group never end up
    # in different splits.
    group_starts = list(range(0, len(dataset), 5))
    train_val_indices, test_indices = split_indicies(group_starts, split_percent)
    train_indices, validation_indices = split_indicies(train_val_indices, split_percent)

    expanded_train_indices = expand_indices(train_indices)
    expanded_val_indices = expand_indices(validation_indices)
    expanded_test_indices = expand_indices(test_indices)

    train_dataset = FlickrDataset(
        root_folder,
        annotation_file,
        indices=expanded_train_indices,
        transform=transform,
    )

    pad_idx = dataset.vocab.stoi["<PAD>"]

    train_loader = DataLoader(
        dataset=train_dataset,
        batch_size=batch_size,
        num_workers=num_workers,
        shuffle=True,
        pin_memory=True,
        collate_fn=Padder(pad_idx=pad_idx),
    )

    return train_loader, expanded_val_indices, expanded_test_indices, dataset

# Define Models - Encoder & Decoder

In [None]:
class Encoder_CNN(nn.Module):
    """Image encoder: pretrained Inception v3 followed by a small trainable head.

    The Inception classifier layer is replaced by a linear layer producing
    ``embed_dim`` features; ReLU, dropout and one more linear layer follow.
    Only the replaced Inception ``fc`` layer and the extra head are trainable.
    """

    def __init__(self, embed_dim, dropout_rate=0.5):
        super().__init__()
        self.dropout_rate = dropout_rate

        self.cnn = models.inception_v3(pretrained=True, aux_logits=False)
        inception_features = self.cnn.fc.in_features
        self.cnn.fc = nn.Linear(inception_features, embed_dim)

        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(embed_dim, embed_dim)

        # We do not want to mess with pretrained CNN, so just fine tune last layers:
        # a backbone parameter is trainable only if it belongs to the new fc layer.
        for param_name, param in self.cnn.named_parameters():
            param.requires_grad = "fc.weight" in param_name or "fc.bias" in param_name

    def forward(self, imgs):
        """Encode a batch of images into ``embed_dim``-sized feature vectors."""
        features = self.cnn(imgs)
        activated = self.dropout(self.relu(features))
        return self.fc(activated)


class Decoder_LSTM(nn.Module):
    """Caption decoder: word embedding, single-layer LSTM and two linear layers."""

    def __init__(self, embed_dim, hidden_dim, vocab_dim, dropout_rate=0.5):
        super().__init__()
        self.dropout_rate = dropout_rate
        self.dropout = nn.Dropout(dropout_rate)
        self.embedding = nn.Embedding(vocab_dim, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, 1)
        self.fc1 = nn.Linear(hidden_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, vocab_dim)

    def forward(self, input, caps):
        """Score the vocabulary at every step of the sequence.

        ``input`` (the image features) is prepended as the first step of the
        sequence, ahead of the embedded caption tokens, so the output has one
        more step than ``caps``.

        Args:
            input: Image features; a leading sequence dimension is added here.
            caps: Caption token ids with the sequence dimension first.

        Returns:
            Vocabulary scores for every step, sequence dimension first.
        """
        embedded_caps = self.dropout(self.embedding(caps))
        image_step = input.unsqueeze(0)
        sequence = torch.cat((image_step, embedded_caps), dim=0)
        # Only the per-step hidden outputs are needed; the final state is dropped.
        hidden_outputs, _ = self.lstm(sequence)
        return self.fc2(self.fc1(hidden_outputs))

class EncoderToDecoder(nn.Module):
    """Full captioning model: CNN image encoder feeding an LSTM caption decoder."""

    def __init__(self, embed_dim, hidden_dim, vocab_dim, dropout_rate=0.5):
        super().__init__()
        self.encoder = Encoder_CNN(embed_dim, dropout_rate)
        self.decoder = Decoder_LSTM(embed_dim, hidden_dim, vocab_dim, dropout_rate)

    def forward(self, imgs, caps):
        """Encode ``imgs`` and decode them together with the captions ``caps``."""
        image_features = self.encoder(imgs)
        return self.decoder(image_features, caps)


# Load data and setup objects for training

In [None]:
# Image preprocessing: resize to 356x356, take a random 299x299 crop,
# convert to a tensor and normalize each channel with mean 0.5 / std 0.5.
# NOTE(review): the encoder uses torchvision's pretrained Inception v3, which
# may expect ImageNet mean/std normalization — confirm 0.5/0.5 is intended.
transform = transforms.Compose(
    [
        transforms.Resize((356, 356)),
        transforms.RandomCrop((299, 299)),
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)),
    ]
)

# Build the training loader, the validation/test indices and the full dataset
# those indices refer to.
train_loader, validation_indices, test_indices, dataset = get_loader_and_indices(
    root_folder="/content/drive/My Drive/Master's DS/Computer Vision CSCI-2271/Final Project/data/flickr8k/Images",
    annotation_file="/content/drive/My Drive/Master's DS/Computer Vision CSCI-2271/Final Project/data/flickr8k/captions.txt",
    transform=transform
)


# Hyperparameters. vocab_dim is the vocabulary size of the full dataset.
embed_dim = 512
dropout_rate = 0.5
hidden_dim = 512
vocab_dim = len(dataset.vocab)
lr = 4e-4

# Model, loss (padding positions are ignored) and optimizer.
model = EncoderToDecoder(embed_dim, hidden_dim, vocab_dim, dropout_rate).to(device)
criterion = nn.CrossEntropyLoss(ignore_index=dataset.vocab.stoi["<PAD>"])
optimizer = optim.Adam(model.parameters(), lr=lr)

# Define training/validation and helper functions

In [None]:
class BLEU_SCORES:
    """Container for cumulative and individual 1- to 4-gram BLEU scores."""

    def __init__(self, bleu1, bleu2, bleu3, bleu4, ind_bleu1, ind_bleu2, ind_bleu3, ind_bleu4):
        # Cumulative n-gram scores.
        self.bleu1, self.bleu2, self.bleu3, self.bleu4 = bleu1, bleu2, bleu3, bleu4
        # Individual n-gram scores.
        self.ind_bleu1, self.ind_bleu2 = ind_bleu1, ind_bleu2
        self.ind_bleu3, self.ind_bleu4 = ind_bleu3, ind_bleu4

    def __str__(self):
        """Return both score groups as one space-separated summary line."""
        cumulative = " ".join(
            map(str, (self.bleu1, self.bleu2, self.bleu3, self.bleu4))
        )
        individual = " ".join(
            map(str, (self.ind_bleu1, self.ind_bleu2, self.ind_bleu3, self.ind_bleu4))
        )
        return "Cumulative Scores: " + cumulative + ". " + "Individual Scores: " + individual


def _bleu_scores(reference, candidate):
    """Return ``(cumulative, individual)`` lists of 1- to 4-gram BLEU scores."""
    cumulative_weights = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        (0.33, 0.33, 0.33, 0),
        (0.25, 0.25, 0.25, 0.25),
    )
    individual_weights = ((0, 1, 0, 0), (0, 0, 1, 0), (0, 0, 0, 1))

    cumulative = [sentence_bleu(reference, candidate, weights=w) for w in cumulative_weights]
    # Individual and cumulative 1-gram BLEU use the same weights, so reuse it.
    individual = [cumulative[0]]
    individual += [sentence_bleu(reference, candidate, weights=w) for w in individual_weights]
    return cumulative, individual


def validate(model, dataset, indicies, print_sample=False):
    """Return the average BLEU scores of ``model`` over the given examples.

    ``indicies`` is read in steps of 5: every 5th entry is taken as the first
    row of an example, the caption is generated from that row's image and the
    rows ``idx .. idx + 4`` of ``dataset`` provide the 5 reference captions.
    NOTE(review): the references are built from the dataset's token ids, which
    appear to include ``<SOS>``/``<EOS>``, and the generated caption keeps its
    final ``<EOS>`` — confirm that scoring special tokens is intended.

    Args:
        model: Captioning model; put in eval mode for the duration of the call
            and back in train mode afterwards.
        dataset: Dataset indexed by the entries of ``indicies``; must expose
            ``vocab.itos``.
        indicies: Non-empty sequence of dataset indices, grouped by 5.
        print_sample: When True, one randomly chosen example is shown together
            with its references, candidate and scores.

    Returns:
        A ``BLEU_SCORES`` holding the averaged cumulative and individual
        1- to 4-gram scores.

    Raises:
        ValueError: If ``indicies`` is empty.
    """
    num_indices = len(indicies)
    if num_indices == 0:
        raise ValueError("validate() requires at least one index")

    # Start position (within ``indicies``) of the example that gets printed.
    sample_start = random.randint(0, num_indices - 1) // 5 * 5

    cumulative_totals = [0.0, 0.0, 0.0, 0.0]
    individual_totals = [0.0, 0.0, 0.0, 0.0]
    num_examples = 0

    model.eval()
    try:
        for i in range(0, num_indices, 5):
            is_sample = print_sample and sample_start == i
            idx = indicies[i]
            img, _ = dataset[idx]
            candidate = generate(model, img.unsqueeze(0).to(device), dataset.vocab, 50)

            reference = []
            for offset in range(5):
                img2, caps = dataset[idx + offset]
                reference.append([dataset.vocab.itos[j] for j in caps.squeeze().tolist()])

                if is_sample:
                    print("idx", idx + offset)
                    show_image(img2, "idx" + str(idx + offset))

            cumulative, individual = _bleu_scores(reference, candidate)

            if is_sample:
                print("Random example during validation")
                print("reference", reference)
                print("candidate", candidate)
                print('Individual 1-gram: %f' % individual[0])
                print('Individual 2-gram: %f' % individual[1])
                print('Individual 3-gram: %f' % individual[2])
                print('Individual 4-gram: %f' % individual[3])
                print('Cumulative 1-gram: %f' % cumulative[0])
                print('Cumulative 2-gram: %f' % cumulative[1])
                print('Cumulative 3-gram: %f' % cumulative[2])
                print('Cumulative 4-gram: %f' % cumulative[3])
                print("")

            for n in range(4):
                cumulative_totals[n] += cumulative[n]
                individual_totals[n] += individual[n]
            num_examples += 1
    finally:
        # Restore training mode even if scoring an example fails.
        model.train()

    # Average over the examples actually scored, so the divisor always matches
    # the number of loop iterations (and can never be zero).
    averages = [total / num_examples for total in cumulative_totals + individual_totals]
    return BLEU_SCORES(*averages)

def show_image(inp, title=None):
    """Display an image tensor with matplotlib, optionally with a title.

    The tensor is converted to NumPy and its first axis is moved last before
    plotting (presumably channels-first to channels-last — confirm with caller).
    """
    pixels = inp.numpy().transpose((1, 2, 0))
    plt.imshow(pixels)
    if title is not None:
        plt.title(title)
    # Short pause so the figure gets drawn.
    plt.pause(0.001)

def print_random_example(model, dataset, indicies):
    """Show one random example: its image, 5 target captions and the model output.

    The randomly chosen index is rounded down to a multiple of 5 and the rows
    ``start .. start + 4`` of ``dataset`` are printed as targets. The model is
    switched to eval mode for generation and back to train mode at the end.
    """
    start = random.choice(indicies) // 5 * 5
    model.eval()

    img, _ = dataset[start]
    show_image(img, "Image")

    for offset in range(5):
        token_ids = dataset[start + offset][1].squeeze().tolist()
        actual = " ".join(dataset.vocab.itos[token_id] for token_id in token_ids)
        print("Example idx:", start, "TARGET :", actual)

    generated = generate(model, img.unsqueeze(0).to(device), dataset.vocab, 50)
    print("Example OUTPUT:", " ".join(generated))
    model.train()


def generate(model, image, vocab, max_len):
    """Greedily decode a caption for a single image.

    The encoded image is the first LSTM input; afterwards the embedding of the
    highest-scoring token is fed back in. Decoding stops after ``max_len``
    tokens or right after ``<EOS>`` is produced (``<EOS>`` is kept in the
    result). Gradients are disabled for the whole procedure.

    Returns:
        The generated tokens as a list of strings.
    """
    token_ids = []

    with torch.no_grad():
        decoder = model.decoder
        step_input = model.encoder(image).unsqueeze(0)
        lstm_state = None

        for _ in range(max_len):
            hidden, lstm_state = decoder.lstm(step_input, lstm_state)
            scores = decoder.fc2(decoder.fc1(hidden.squeeze(0)))
            best = scores.argmax(1)
            # The chosen token becomes the next input of the LSTM.
            step_input = decoder.embedding(best).unsqueeze(0)

            token_id = best.item()
            token_ids.append(token_id)
            if vocab.itos[token_id] == "<EOS>":
                break

    return [vocab.itos[token_id] for token_id in token_ids]


def train(model, criterion, opt, epochs, train_loader, validation_indices, test_indices, dataset, validate_interval):
    """Train ``model`` and track validation BLEU and training loss.

    Every epoch starts by printing three random validation examples. Every
    ``validate_interval`` batches (including batch 0 of each epoch) the model
    is validated, the average training loss since the previous validation is
    recorded, and a deep copy of the model is kept whenever its BLEU-4 is the
    best so far.

    Args:
        model: Captioning model called as ``model(imgs, caps[:-1])``.
        criterion: Loss applied to the flattened scores and caption ids.
        opt: Optimizer updating the model parameters.
        epochs: Number of passes over ``train_loader``.
        train_loader: Iterable of ``(imgs, caps)`` batches.
        validation_indices: Indices of ``dataset`` used for validation.
        test_indices: Unused here; kept for interface compatibility.
        dataset: Dataset the validation indices refer to.
        validate_interval: Number of batches between two validations.

    Returns:
        ``(BELUs_list, avg_losses, best_model)`` where ``BELUs_list`` holds
        one ``(BLEU-1, BLEU-4)`` pair per validation, ``avg_losses`` the
        matching average losses and ``best_model`` the copy with the highest
        BLEU-4 (``None`` only if no validation ever ran).
    """
    BELUs_list = []
    avg_losses = []
    best_model = None
    best_bleu4 = 0.0
    # Pre-bind so the per-epoch summary works even with an empty loader.
    avg_loss = None
    BLEUs = None

    for epoch in range(epochs):
        for _ in range(3):
            print_random_example(model, dataset, validation_indices)
        model.train()

        running_loss = 0.0
        batches_in_window = 0
        for i, (imgs, caps) in tqdm(enumerate(train_loader), total=len(train_loader), leave=False):
            caps = caps.to(device)
            imgs = imgs.to(device)
            output = model(imgs, caps[:-1])

            loss = criterion(output.reshape(-1, output.shape[2]), caps.reshape(-1))
            running_loss += loss.item()
            batches_in_window += 1

            if i % validate_interval == 0:
                BLEUs = validate(model, dataset, validation_indices)
                # Average over the batches seen since the last reset, not over
                # all batches of the epoch.
                avg_loss = running_loss / batches_in_window
                running_loss = 0.0
                batches_in_window = 0

                BELUs_list.append((BLEUs.bleu1, BLEUs.bleu4))
                avg_losses.append(avg_loss)
                # Always keep the first validated model so a run whose BLEU-4
                # stays at 0 still returns a usable model.
                if best_model is None or best_bleu4 < BLEUs.bleu4:
                    best_bleu4 = BLEUs.bleu4
                    best_model = copy.deepcopy(model)

                print("BLEU 1", BLEUs.bleu1, "BLEU 4", BLEUs.bleu4, "LOSS", loss.item())

            opt.zero_grad()
            # Plain backward(): passing the loss as the ``gradient`` argument
            # would scale every gradient by the loss value.
            loss.backward()
            opt.step()

        print("EPOCH:", epoch, "LOSS:", avg_loss, "BLEU 1-4", BLEUs)

    return BELUs_list, avg_losses, best_model


# Run training

In [None]:
# Train for 15 epochs, validating every 100 batches, then print the recorded
# (BLEU-1, BLEU-4) pairs and average losses.
BELUs_list, avg_losses, best_model = train(model, criterion, optimizer, 15, train_loader, validation_indices, test_indices, dataset, 100)
print(BELUs_list)
print(avg_losses)

Output hidden; open in https://colab.research.google.com to view.

# Get BLEU scores for Test data

In [None]:
# Score the best model kept during training on the held-out test indices.
BLEUs = validate(best_model, dataset, test_indices)
print("BLEU 1", BLEUs.bleu1, "BLEU 4", BLEUs.bleu4)

  self.dropout, self.training, self.bidirectional, self.batch_first)
Corpus/Sentence contains 0 counts of 3-gram overlaps.
BLEU scores might be undesirable; use SmoothingFunction().
Corpus/Sentence contains 0 counts of 4-gram overlaps.
BLEU scores might be undesirable; use SmoothingFunction().
