Preparing Colab to import the Kaggle dataset

In [None]:
!pip install -q kaggle

from google.colab import files
_ = files.upload()

!mkdir ~/.kaggle
!cp kaggle.json ~/.kaggle/

!chmod 600 ~/.kaggle/kaggle.json


Download and unzip the dataset


In [None]:
!kaggle datasets download -d adityajn105/flickr8k
!unzip flickr8k.zip

Importing used libraries

In [None]:
import string
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils as utils
import nltk
from PIL import Image, ImageDraw, ImageFont
import os
from torchsummary import summary
import torchvision.models as models # import resnet18, ResNet18_Weights
from torch.nn.utils.rnn import pad_sequence
import numpy as np
from torch.utils.data import DataLoader,Dataset
import torchvision.transforms as T
import time

Defining the special tokens (start, end, padding, unknown) used by the vocabulary

In [None]:
# Special tokens shared by the vocabulary, the dataset and the collate function.
START_WORD = '<SOS>'    # prepended to every caption
END_WORD = '<EOS>'      # appended to every caption
PADDING_WORD = '<PAD>'  # fills shorter captions inside a batch
UNKNOWN_WORD = '<UNK>'  # stands in for words missing from the vocabulary

# All special tokens, in the same order as their vocabulary indices (0..3).
GUID_TOKENS = [
    START_WORD,
    END_WORD,
    PADDING_WORD,
    UNKNOWN_WORD,
]

In [None]:
# Locations of the extracted dataset, relative to the working directory:
# the image folder and the CSV file with one caption per row.
IMAGES_PATH, ANNOTATION_FILE = './Images/', './captions.txt'

Vocabulary maps words to indices and indices back to words

In [None]:

class Vocabulary():
  """Two-way mapping between caption words and integer indices.

  Indices 0-3 are reserved for the special tokens (start, end, padding,
  unknown). `max_cap_len` tracks the largest number of tokens seen in any
  text passed through `tokenize` (special tokens are not counted).
  """

  def __init__(self):
    specials = (START_WORD, END_WORD, PADDING_WORD, UNKNOWN_WORD)
    self.idx2token = dict(enumerate(specials))
    self.token2idx = {token: index for index, token in self.idx2token.items()}
    self.max_cap_len = 0

  def __len__(self):
    """Number of entries in the vocabulary, special tokens included."""
    return len(self.idx2token)

  def tokenize(self, text):
    """Split `text` into lowercase words with punctuation and newlines removed.

    Side effect: raises `max_cap_len` if this text has more tokens than any
    text seen so far.
    """
    strip_table = str.maketrans('', '', string.punctuation + '\n')
    cleaned = text.translate(strip_table).lower()
    words = [word for word in cleaned.split() if len(word)]
    self.max_cap_len = max(self.max_cap_len, len(words))
    return words

  def fill_vocab(self, sentence_list):
    """Assign the next free index to every unseen word in `sentence_list`."""
    next_idx = max(self.idx2token) + 1
    for sentence in sentence_list:
      for word in self.tokenize(sentence):
        if word in self.token2idx:
          continue
        self.token2idx[word] = next_idx
        self.idx2token[next_idx] = word
        next_idx += 1

  def numericalize(self, text):
    """Return the list of indices for `text`; unknown words map to the unknown token."""
    indices = []
    for token in self.tokenize(text):
      if token in self.token2idx:
        indices.append(self.token2idx[token])
      else:
        indices.append(self.token2idx[UNKNOWN_WORD])
    return indices

  def stringify(self, idxVec):
    """Return a lazy iterator over the words for `idxVec` (an object with `.tolist()`).

    Start, end and padding tokens are dropped; indices that are not in the
    vocabulary are rendered as the unknown token.
    """
    hidden_tokens = [START_WORD, END_WORD, PADDING_WORD]
    words = [self.idx2token.get(index, UNKNOWN_WORD) for index in idxVec.tolist()]
    return filter(lambda word: word not in hidden_tokens, words)

The dataset loads each image file and tokenizes its caption

In [None]:
class Flickr8KDataset(utils.data.Dataset):
  """Flickr8k image/caption pairs with a positional train / held-out split.

  The captions file is read with pandas and must provide an "image" column
  (file name inside `images_dir`) and a "caption" column. The vocabulary is
  built from every caption in the file.

  With N rows, the first int(N * (1 - train_test_split)) rows form the
  'train' split. The int(N * train_test_split) rows that follow form the
  held-out split, which is served for any other value of `mode`. `mode` is a
  plain attribute and may be switched after construction.

  Args:
    images_dir: directory containing the image files.
    captions_file: path of the CSV file with the captions.
    transform: optional callable applied to the loaded RGB PIL image.
    mode: 'train' for the training split, anything else for the held-out split.
    train_test_split: fraction of the rows reserved for the held-out split.
  """

  def __init__(self, images_dir, captions_file, transform=None, mode='train', train_test_split=0.1):
    self.images_dir = images_dir
    self.df = pd.read_csv(captions_file)
    self.transform = transform
    self.images = self.df["image"]
    self.captions = self.df["caption"]
    self.vocab = Vocabulary()
    self.vocab.fill_vocab(self.captions.tolist())
    self.mode = mode
    self.train_test_split = train_test_split

  def _split_sizes(self):
    """Return (number of train rows, number of held-out rows)."""
    total = len(self.df)
    return int(total * (1 - self.train_test_split)), int(total * self.train_test_split)

  def __len__(self):
    """Size of the split selected by the current `mode`."""
    n_train, n_heldout = self._split_sizes()
    return n_train if self.mode == 'train' else n_heldout

  def _row_index(self, idx):
    """Translate a split-relative index into a row of the captions table.

    Raises:
      IndexError: if `idx` is outside the current split.
    """
    if not 0 <= idx < len(self):
      raise IndexError('index %d is out of range for the %r split' % (idx, self.mode))
    if self.mode == 'train':
      return idx
    # Held-out rows start right after the training rows. The previous
    # mapping (len(self) - idx) pointed back into the training rows, so the
    # "test" split was a subset of the training data.
    return self._split_sizes()[0] + idx

  def __getitem__(self, idx):
    """Return (image, caption index tensor) for item `idx` of the current split.

    The caption tensor is the start token, the numericalized caption, then
    the end token.
    """
    row = self._row_index(idx)
    caption = self.captions[row]
    img_name = self.images[row]
    img_location = os.path.join(self.images_dir, img_name)
    img = Image.open(img_location).convert("RGB")

    if self.transform is not None:
      img = self.transform(img)

    caption_vec = []
    caption_vec += [self.vocab.token2idx[START_WORD]]
    caption_vec += self.vocab.numericalize(caption)
    caption_vec += [self.vocab.token2idx[END_WORD]]

    return img, torch.tensor(caption_vec)

Collate is used for padding in each batch

In [None]:
class CapsCollate():
  """Collate function that batches images and pads captions to a common length.

  Args:
    pad_idx: value used to fill the shorter captions.
    batch_first: forwarded to `pad_sequence`; when True the padded captions
      have the batch as their first dimension.
  """

  def __init__(self, pad_idx, batch_first=False):
    self.pad_idx, self.batch_first = pad_idx, batch_first

  def __call__(self, batch):
    """Return (images concatenated along a new first dimension, padded captions).

    Each element of `batch` is indexed as (image tensor, caption tensor).
    """
    image_slices = []
    caption_list = []
    for sample in batch:
      # Give every image a leading batch dimension so they can be concatenated.
      image_slices.append(sample[0].unsqueeze(0))
      caption_list.append(sample[1])
    image_batch = torch.cat(image_slices, dim=0)
    caption_batch = pad_sequence(caption_list, batch_first=self.batch_first, padding_value=self.pad_idx)
    return image_batch, caption_batch

Defining constant values

In [None]:
# Mini-batch size of the training loaders and the number of worker
# processes every DataLoader in this notebook uses.
BATCH_SIZE, NUM_WORKER = 256, 1

Common transforms

In [None]:
# Image pipeline: resize, take a random 224-pixel crop, convert to a tensor.
# No normalization is applied; show_image plots the resulting tensor directly.
transforms = T.Compose([T.Resize(226), T.RandomCrop(224), T.ToTensor()])

Create a dataset

In [None]:
# One dataset object backs every loader below; it starts in 'train' mode
# (the constructor default) and builds the vocabulary from all captions.
dataset = Flickr8KDataset(IMAGES_PATH, ANNOTATION_FILE, transform=transforms)

Defining device type and vocab size

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
# Vocabulary size (special tokens included) and the longest caption, in
# tokens, seen while the vocabulary was built.
vocab_size = len(dataset.vocab)
max_cap_len = dataset.vocab.max_cap_len

Create a dataloader

In [None]:
# Shuffled training loader; captions are padded per batch with the <PAD> index.
data_loader = utils.data.DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKER,
    collate_fn=CapsCollate(dataset.vocab.token2idx[PADDING_WORD], batch_first=True),
)

A function to show an image with its caption

In [None]:
def show_image(inp, title=None):
  """Plot an image tensor, optionally titled with a decoded caption.

  Args:
    inp: CPU image tensor; its axes are reordered with transpose((1, 2, 0))
      before plotting, i.e. the first axis is moved last.
    title: optional tensor of vocabulary indices; it is decoded with the
      global dataset's vocabulary and used as the plot title.
  """
  pixels = inp.numpy().transpose((1, 2, 0))
  plt.imshow(pixels)
  if title is not None:
    plt.title(' '.join(dataset.vocab.stringify(title)))
  plt.pause(0.001)  # pause a bit so that plots are updated

# Preview the first sample of the dataset together with its caption.
img, caps = dataset[0]
show_image(img, caps)

Preview of resnet18 to understand architecture

In [None]:
# Instantiate a pretrained ResNet18 only to print its layer structure;
# CNN_Model below builds its own copy.
checking_resnet = models.resnet18(pretrained=True)
print(checking_resnet)

CNN part of model
- uses resnet18 except last layer (fc)
- uses a linear layer to embed features to input size of LSTM


In [None]:
import torch.nn.functional as F

class CNN_Model(nn.Module):
    """Image encoder: pretrained ResNet18 without its last child, plus a linear projection.

    Args:
        embed_size: size of the feature vector produced for each image.
        freeze: when True, gradients are disabled for every ResNet parameter,
            so only the layers added here are trainable.
    """

    def __init__(self, embed_size, freeze=True):
        super().__init__()
        backbone = models.resnet18(pretrained=True)
        if freeze:
            for weight in backbone.parameters():
                weight.requires_grad_(False)
        # Keep every child module except the last one (the fc layer).
        self.resnet = nn.Sequential(*list(backbone.children())[:-1])

        # Projection from the width of the removed fc layer to embed_size.
        self.embed = nn.Linear(backbone.fc.in_features, embed_size)
        self.embed.weight.data.normal_(0., 0.02)
        self.embed.bias.data.fill_(0)
        self.dropout = nn.Dropout(0.5)
        self.batch = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        """Return one embed_size-dimensional feature vector per input image."""
        pooled = self.resnet(images)
        # Collapse everything after the batch dimension into one axis.
        flat = pooled.view(pooled.size(0), -1)
        projected = self.embed(flat)
        activated = F.relu(self.batch(projected))
        return self.dropout(activated)


RNN part
- uses a trainable embedding layer
- uses an LSTM as the RNN that generates captions
- uses a linear layer to map hidden states to vocabulary indices

In [None]:
class LSTM_Model(nn.Module):
    """Caption decoder: word embedding -> dropout -> LSTM -> linear layer over the vocabulary.

    Args:
        vocab_size: number of entries in the vocabulary.
        embed_size: size of the word embeddings (and of the image features fed in).
        hidden_size: size of the LSTM hidden state.
        num_layers: number of stacked LSTM layers.
        dropout: dropout probability applied to the word embeddings.
    """

    def __init__(self, vocab_size, embed_size=300, hidden_size=256, num_layers=1, dropout=0.5):
        super().__init__()
        self.num_layers = num_layers
        self.hidden_size = hidden_size
        self.embed_size = embed_size
        self.vocabulary_size = vocab_size

        # Trainable word embeddings, re-initialised uniformly in [-0.1, 0.1).
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.embed.weight.data.uniform_(-0.1, 0.1)

        self.dropout = nn.Dropout(dropout)

        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)

        # Maps every hidden state to one score per vocabulary entry.
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.linear.weight.data.uniform_(-0.1, 0.1)
        self.linear.bias.data.fill_(0)

    def forward(self, features, captions):
        """Return vocabulary scores for every position of `captions`.

        The LSTM input is the image feature vector followed by the embedded
        captions without their last token, so the output sequence is as long
        as `captions`.
        """
        word_vectors = self.dropout(self.embed(captions))
        image_step = features.unsqueeze(1)
        sequence = torch.cat((image_step, word_vectors[:, :-1, :]), dim=1)
        hiddens, _ = self.lstm(sequence)
        return self.linear(hiddens)


Merging two models

In [None]:
class ImgCap_Model(nn.Module):
  """Full captioning model: CNN_Model image encoder feeding the LSTM_Model decoder.

  Args:
    vocab_size: number of entries in the vocabulary.
    embed_size: size shared by the image features and the word embeddings.
    hidden_size: size of the LSTM hidden state.
    freeze: forwarded to CNN_Model; when True the ResNet parameters get no gradients.
  """

  def __init__(self, vocab_size, embed_size = 300, hidden_size = 256, freeze = True):
    super().__init__()
    self.cnn = CNN_Model(embed_size, freeze)
    self.lstm = LSTM_Model(vocab_size, embed_size, hidden_size)

  def forward(self, images, captions):
    """Encode `images` and return the decoder's vocabulary scores for `captions`."""
    return self.lstm(self.cnn(images), captions)

Sample model 1
- all ResNet18 layers are frozen; only the layers added on top of it (linear embedding, batch norm) and the RNN part are trained

In [None]:
# freeze defaults to True, so the ResNet parameters receive no gradients.
model_freezed = ImgCap_Model(vocab_size)

In [None]:
# Training hyper-parameters, shared by both models in this notebook.
EPOCHS = 20
learning_rate = 0.0003

# Positions holding the <PAD> index do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=dataset.vocab.token2idx[PADDING_WORD])
optimizer = optim.Adam(params=model_freezed.parameters(), lr=learning_rate)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Defining the validation data loader and the evaluation and training functions

In [None]:
# Loader that yields one sample per batch from the same dataset object;
# which split it serves depends on dataset.mode at iteration time.
data_loader_test = utils.data.DataLoader(
    dataset,
    batch_size=1,
    shuffle=True,
    num_workers=NUM_WORKER,
    collate_fn=CapsCollate(dataset.vocab.token2idx[PADDING_WORD], batch_first=True),
)

In [None]:
def eval_model(model, data_loader, criterion, device):
  """Return the mean per-batch loss of `model` over `data_loader`.

  The loss is computed exactly as in training (the model receives the
  ground-truth caption and is scored against it), so the result is directly
  comparable with the training loss. Gradients are disabled.

  Side effects: puts `model` in eval mode and sets
  `data_loader.dataset.mode = 'test'`. The mode is deliberately left at
  'test' afterwards; callers that need the training split must reset it.

  Args:
    model: callable as model(images, captions) returning scores whose last
      dimension is the vocabulary size.
    data_loader: iterable of (images, captions) batches with a `.dataset` attribute.
    criterion: loss taking (scores of shape (N, vocab), targets of shape (N,)).
    device: device the batches are moved to.

  Returns:
    Sum of the batch losses divided by len(data_loader).
  """
  model.eval()
  data_loader.dataset.mode = 'test'
  total_loss = 0.0
  # No graph is needed for evaluation; this avoids holding activations in memory.
  with torch.no_grad():
    for image, caption in data_loader:
      image = image.to(device)
      caption = caption.to(device)
      scores = model(image, caption)
      # Flatten batch and time so each position is one classification target;
      # the vocabulary size is read from the scores rather than a global.
      loss_val = criterion(scores.contiguous().view(-1, scores.size(-1)), caption.reshape(-1))
      total_loss += loss_val.item()
  return total_loss / len(data_loader)

In [None]:
def train_model(model, epochs, optimizer, criterion, data_loader, val_data_loader, device):
  """Train `model` for `epochs` epochs and validate after each one.

  Every epoch sets `data_loader.dataset.mode = 'train'` first, because
  eval_model leaves the dataset in 'test' mode and the loaders may share one
  dataset object.

  Args:
    model: callable as model(images, captions) returning scores whose last
      dimension is the vocabulary size.
    epochs: number of passes over `data_loader`.
    optimizer: optimizer stepping the model parameters.
    criterion: loss taking (scores of shape (N, vocab), targets of shape (N,)).
    data_loader: training batches of (images, captions).
    val_data_loader: loader passed to eval_model after each epoch.
    device: device the batches are moved to.

  Returns:
    (history_loss, history_loss_val): two lists with one float per epoch,
    the mean training loss over the epoch's batches and the value returned
    by eval_model.
  """
  history_loss = []
  history_loss_val = []
  for epoch in range(1, epochs + 1):
    epoch_start = time.time()
    data_loader.dataset.mode = 'train'
    model.train()
    # Number of batches in this epoch, read from the training loader itself
    # after the mode switch (the loader length follows the dataset length).
    steps = len(data_loader)
    running_loss = 0.0
    for idx, (image, captions) in enumerate(data_loader, start=1):
      image = image.to(device)
      captions = captions.to(device)

      model.zero_grad()

      outputs = model(image, captions)

      # Flatten batch and time so each position is one classification target.
      loss = criterion(outputs.contiguous().view(-1, outputs.size(-1)), captions.reshape(-1))

      loss.backward()

      optimizer.step()

      running_loss += loss.item()
      stats = 'Epoch [%d/%d], Step [%d/%d], loss: %.4f' % (epoch, epochs, idx, steps, loss.item())
      print('\r' + stats, end="")
    train_time = time.time()
    val_loss = eval_model(model, val_data_loader, criterion, device)
    stats = ', val_loss: %.4f, %.3f s' % (val_loss, train_time - epoch_start)
    print(stats)
    # Record the epoch mean rather than the last mini-batch only, so the
    # curve is comparable with the averaged validation loss; max() guards
    # against an empty loader.
    history_loss.append(running_loss / max(steps, 1))
    history_loss_val.append(val_loss)
  return history_loss, history_loss_val

In [None]:
# Move model 1 to the selected device, then train it; train_model returns
# one training-loss and one validation-loss value per epoch.
model_freezed.to(device)
loss_history, history_loss_val = train_model(model_freezed, EPOCHS, optimizer, criterion, data_loader, data_loader_test, device)

Loss plot of model 1 training

In [None]:
# Per-epoch training and validation loss of model 1 (frozen ResNet18).
fig = plt.figure()
# The history lists are plotted directly; copying them first is unnecessary.
plt.plot(loss_history, color='crimson', linewidth=2, label='train loss')
plt.plot(history_loss_val, color='blue', linewidth=2, label='validation loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('Model 1 (frozen ResNet18)')
plt.legend()
plt.grid()
plt.show()

In [None]:
# Caption preview for model 1: decode a few images and plot them with the
# generated caption.
model_freezed.eval()
# Select the held-out split explicitly instead of relying on the mode that
# the last eval_model call happened to leave behind.
dataset.mode = 'test'
# Inference only: no autograd graph is needed.
with torch.no_grad():
  for idx, (images, captions) in enumerate(data_loader_test):
    # Only every fifth batch (idx 2, 7, 12, ...) is shown.
    if idx % 5 != 2:
      continue
    images = images.to(device)
    captions = captions.to(device)
    # Start from an all-<PAD> caption of max_cap_len tokens.
    raw_cap = torch.tensor([[dataset.vocab.token2idx[PADDING_WORD]] * max_cap_len]).to(device)
    # Each pass feeds the previous prediction back in; the output at position t
    # depends only on the image and the inputs before t, so pass k fixes the
    # first k tokens.
    for _ in range(max_cap_len):
      raw_cap = model_freezed(images, raw_cap).argmax(dim=-1)
    show_image(images.cpu().data[0], raw_cap[0])
    if idx >= 20:
      break

Sample model 2
- all layers are trainable

In [None]:
# freeze=False leaves requires_grad untouched on the ResNet parameters, so
# the backbone is trained together with the rest of the model.
model_unfreezed = ImgCap_Model(vocab_size, freeze = False)

Create another dataloader with same dataset

In [None]:
# eval_model leaves the shared dataset in 'test' mode; switch back to the
# training split before building the loaders for model 2.
dataset.mode = 'train'

In [None]:
# Shuffled training loader for model 2, over the same dataset object.
data_loader_un = utils.data.DataLoader(
    dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKER,
    collate_fn=CapsCollate(dataset.vocab.token2idx[PADDING_WORD], batch_first=True),
)

In [None]:
# One-sample-per-batch loader for model 2; the split it serves depends on
# dataset.mode at iteration time.
data_loader_test_un = utils.data.DataLoader(
    dataset,
    batch_size=1,
    shuffle=True,
    num_workers=NUM_WORKER,
    collate_fn=CapsCollate(dataset.vocab.token2idx[PADDING_WORD], batch_first=True),
)

In [None]:
# Separate loss and optimizer for model 2; positions holding the <PAD>
# index do not contribute to the loss.
criterion_un = nn.CrossEntropyLoss(
    ignore_index=dataset.vocab.token2idx[PADDING_WORD],
)
optimizer_un = optim.Adam(params=model_unfreezed.parameters(), lr=learning_rate)

Training model 2

In [None]:
# Move model 2 to the selected device, then train it with its own loaders,
# loss and optimizer.
model_unfreezed.to(device)
loss_history_un, val_loss_history_un = train_model(model_unfreezed, EPOCHS, optimizer_un, criterion_un, data_loader_un, data_loader_test_un, device)

In [None]:
# Per-epoch training and validation loss of model 2 (all layers trainable).
fig = plt.figure()
# The history lists are plotted directly; copying them first is unnecessary.
plt.plot(loss_history_un, color='crimson', linewidth=2, label='train loss')
plt.plot(val_loss_history_un, color='blue', linewidth=2, label='validation loss')
plt.xlabel('epoch')
plt.ylabel('loss')
plt.title('Model 2 (all layers trainable)')
plt.legend()
plt.grid()
plt.show()

In [1]:
# Caption preview for model 2: decode a few images and plot them with the
# generated caption. Requires the cells above (model definition and
# training) to have been run in this kernel.
model_unfreezed.eval()
# Select the held-out split explicitly instead of relying on the mode that
# the last eval_model call happened to leave behind.
dataset.mode = 'test'
# Inference only: no autograd graph is needed.
with torch.no_grad():
  for idx, (images, captions) in enumerate(data_loader_test_un):
    # Only every fifth batch (idx 2, 7, 12, ...) is shown.
    if idx % 5 != 2:
      continue
    images = images.to(device)
    captions = captions.to(device)
    # Start from an all-<PAD> caption of max_cap_len tokens.
    raw_cap = torch.tensor([[dataset.vocab.token2idx[PADDING_WORD]] * max_cap_len]).to(device)
    # Each pass feeds the previous prediction back in; the output at position t
    # depends only on the image and the inputs before t, so pass k fixes the
    # first k tokens.
    for _ in range(max_cap_len):
      raw_cap = model_unfreezed(images, raw_cap).argmax(dim=-1)
    show_image(images.cpu().data[0], raw_cap[0])
    if idx >= 20:
      break

NameError: name 'model_unfreezed' is not defined