# Deep Learning Project

## Build an Image Caption Generator System

In this project, I design and implement a deep learning model that learns to caption images. The model is trained on the Flickr8k dataset. Transfer learning is used to extract features from the images; the pre-trained network is ResNet-152 (`models.resnet152` in the code below).


PyTorch will be used to build the model. 

### Implementation
I started by importing the modules required for this project and loading in the data.

In [None]:
import os
import torch
import string
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image
from random import seed
from random import choice
from torch import nn, optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, models
from torch.utils.data.sampler import SubsetRandomSampler

seed(10)
%matplotlib inline

In [None]:
# Record once whether a CUDA device is usable; later cells read `train_on_gpu`.
train_on_gpu = torch.cuda.is_available()

if train_on_gpu:
    print('CUDA is available!  Training on GPU ...')
else:
    print('CUDA is not available.  Training on CPU ...')

In [None]:
#Here we load in the captions data and hold out the last rows as a test split
#We display an example image from our dataset

file_path = "Flickr8k_Dataset"
df = pd.read_csv("Flickr8k_text/captions.txt")

# The training rows and the held-out rows must not overlap. The original code
# kept all but the last 200 rows for training while taking the last 1000 rows
# as the test split, so 800 test rows were also trained on.
# NOTE(review): 1000 held-out rows is assumed to be the intended size
# (the original mixed 200 and 1000) - confirm.
test_size = 1000
train_size = len(df) - test_size
# reset_index() (without drop) keeps the old row labels in an "index" column,
# exactly as the original in-place call did, but on a fresh frame rather than
# on a slice of `df`.
df, test_df = df.head(train_size), df.tail(test_size).reset_index()

# Open the first training image; convert() returns a new image, so the file
# handle can be closed right away.
with Image.open(os.path.join(file_path, df["image"][0])) as raw_image:
    image = raw_image.convert("RGB")
display(image)

# One entry per distinct held-out image file name.
test_list = test_df["image"].unique().tolist()
len(test_list)

Here we define transforms for our data. Most of the pretrained models require the input to be 224x224 images. Also, we'll need to match the normalization used when the models were trained.

In [None]:
# Per-channel mean / std passed to Normalize in the train and test pipelines.
_norm_mean = [0.485, 0.456, 0.406]
_norm_std = [0.229, 0.224, 0.225]

# Plain pipeline: resize to 224x224 and convert to a tensor (no normalization).
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
])

# Training pipeline: random rotation / crop / flip, then tensor + normalization.
train_transforms = transforms.Compose([
    transforms.RandomRotation(40),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(_norm_mean, _norm_std),
])

# Evaluation pipeline: resize, center crop to 224, then tensor + normalization.
test_transforms = transforms.Compose([
    transforms.Resize(255),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(_norm_mean, _norm_std),
])

### Building Vocabulary and Dataset
Here I built a vocabulary from the words in the captions data, together with two dictionaries that map each word to a unique integer index and back. A word is only added once it has occurred a minimum number of times in the captions. I also built a dataset which transforms each image and converts its caption into a sequence of these integer indices.

In [None]:
#This function removes punctuation from the captions and returns a list of the lower-cased words
def clean_captions(captions):
    """
    Strip punctuation from one caption and split it into lower-cased words.

    :param captions: A single caption string.
    :return: List of lower-cased words with every character of
             ``string.punctuation`` removed. Words are split on runs of
             whitespace, so the list never contains empty strings
             (an empty caption yields an empty list).
    """
    # Delete every punctuation character in a single pass.
    without_punctuation = captions.translate(str.maketrans("", "", string.punctuation))
    # split() with no argument collapses repeated / leading / trailing
    # whitespace; split(" ") used to emit "" tokens for captions such as
    # "a dog ." once the final period had been removed.
    return without_punctuation.lower().split()

#We define our vocabulary class 

class Vocab:
    """Two-way mapping between caption words and integer ids."""

    def __init__(self, frequency):
        """
        :param frequency: Occurrence count at which a word is admitted to the vocabulary.
                          Rarer words are left out.
        """
        # Ids 0-3 are reserved for the special tokens, in this order.
        reserved = ("pad", "startseq", "endseq", "unknown")
        self.int_to_vocab = dict(enumerate(reserved))
        self.vocab_to_int = {token: idx for idx, token in enumerate(reserved)}
        self.frequency = frequency

    def __len__(self):
        """Return the number of ids currently in the vocabulary."""
        return len(self.int_to_vocab)

    def build_vocab(self, captions):
        """
        Count the words of every caption and register each word at the moment
        its count reaches ``self.frequency``.

        :param captions: Iterable of caption strings.
        """
        seen = {}
        next_id = 4  # first id after the four reserved tokens
        for sentence in captions:
            for token in clean_captions(sentence):
                seen[token] = seen.get(token, 0) + 1
                # Register a word exactly once: when it hits the threshold.
                if seen[token] != self.frequency:
                    continue
                self.vocab_to_int[token] = next_id
                self.int_to_vocab[next_id] = token
                next_id += 1

    def numericalize(self, caption):
        """
        Convert a caption into a list of ids.

        :param caption: A single caption string.
        :return: One id per cleaned word; words missing from the vocabulary get the "unknown" id.
        """
        unknown_id = self.vocab_to_int["unknown"]
        return [self.vocab_to_int.get(token, unknown_id) for token in clean_captions(caption)]

In [None]:
class MyDataset(Dataset):
    """Dataset yielding (transformed image, numericalized caption) pairs from a captions dataframe."""

    def __init__(self, file_path, df, transform, frequency=10):
        """
        :param file_path: File path of our image data
        :param df: Our captions dataframe (reads the "image" and "caption" columns)
        :param transform: For image transformations
        :param frequency: Limit on the minimum occurence of each word.
    
        """
        self.file_path = file_path
        self.df = df
        self.transform = transform
        self.images = self.df["image"]
        self.captions = self.df["caption"]
        # The vocabulary is built from every caption in the dataframe.
        self.vocab = Vocab(frequency)
        self.vocab.build_vocab(self.captions.tolist())

    def __len__(self):
        """Return the number of rows (image/caption pairs) in the dataframe."""
        return len(self.df)

    def __getitem__(self, key):
        
        """
        :param key: Row position (0 .. len(self) - 1) of the image/caption pair to retrieve
        :return: Tuple of the transformed image and a tensor holding the caption ids,
                 wrapped in the "startseq" and "endseq" ids
        """
        # Positional lookup: samplers hand out positions, and label-based
        # indexing would break for a dataframe whose index is not 0..n-1.
        caption = self.captions.iloc[key]
        image_id = self.images.iloc[key]
        # convert() returns a new image, so the file can be closed immediately.
        with Image.open(os.path.join(self.file_path, image_id)) as raw_image:
            image = raw_image.convert("RGB")

        image = self.transform(image)

        numericalized_caption = [self.vocab.vocab_to_int["startseq"]]
        numericalized_caption += self.vocab.numericalize(caption)
        numericalized_caption.append(self.vocab.vocab_to_int["endseq"])

        return image, torch.tensor(numericalized_caption)

In [None]:
class Collate:
    """Batch collation callable: stacks the images and pads the captions to a common length."""

    def __init__(self, pad):
        """
        param pad: Value used to fill the shorter caption tensors (int)
        """
        self.pad = pad

    def __call__(self, batch):
        """
        param batch: sequence of (image tensor, caption tensor) pairs
        return: the images stacked along a new leading dimension, and the
                captions padded with `self.pad` by `pad_sequence`
        """
        stacked_images = torch.stack([sample[0] for sample in batch], dim=0)
        caption_tensors = [sample[1] for sample in batch]
        padded_captions = nn.utils.rnn.pad_sequence(caption_tensors, padding_value=self.pad)
        return stacked_images, padded_captions

In [None]:
# Split the row positions of `df` 80/20 into training and validation subsets
# and wrap each subset in a SubsetRandomSampler.

num_train = len(df)
split = int(np.floor(0.8 * num_train))  # first 80% of the shuffled positions go to training

indices = list(range(num_train))
# The positions are shuffled in place twice before slicing.
np.random.shuffle(indices)
np.random.shuffle(indices)

train_idx = indices[:split]
val_idx = indices[split:]

train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(val_idx)

print(len(val_idx))
print(len(train_idx))

In [None]:
# Build a batched DataLoader over the captions dataset

def dataloader(file_path, df, transform, batch_size=64, pin_memory=True, data="train"):
    """
    Create a DataLoader restricted to the training or the validation sampler.

    param file_path: path to file
    param df: dataframe consisting of the captions dataset
    param transform: for transforming our image data
    param batch_size: number of samples to be yielded
    param pin_memory: forwarded to DataLoader
    param data: "train" selects the training sampler; any other value selects the validation sampler
    return: (loader, dataset) when data == "train", otherwise only the loader
    """
    dataset = MyDataset(file_path, df, transform=transform)
    collate_fn = Collate(pad=dataset.vocab.vocab_to_int["pad"])
    is_train = data == "train"

    loader = DataLoader(
        dataset=dataset,
        batch_size=batch_size,
        collate_fn=collate_fn,
        pin_memory=pin_memory,
        sampler=train_sampler if is_train else valid_sampler,
    )

    # Only the training call also hands back the dataset (callers read its vocab).
    if is_train:
        return loader, dataset
    return loader

def main():
    """Build both loaders and print the caption and image batch shapes of every validation batch."""
    # The training loader and its dataset are created here but not used further.
    _train_loader, _dataset = dataloader(file_path, df, transform=transform, data="train")
    validation_batches = dataloader(file_path, df, transform=transform, data="val")
    for batch_images, batch_captions in validation_batches:
        print(batch_captions.shape)
        print(batch_images.shape)


if __name__ == "__main__":
    main()

In [None]:
# Load the pre-trained ResNet-152 and freeze every parameter except the
# weight and bias of its `fc` layer.
res_model = models.resnet152(pretrained=True)
for param_name, parameter in res_model.named_parameters():
    parameter.requires_grad = "fc.weight" in param_name or "fc.bias" in param_name

if train_on_gpu:
    res_model.cuda()

### Model Building

In [None]:
class Encoder(nn.Module):
    """Image encoder: a pre-trained network without its last child, followed by a linear projection."""

    def __init__(self, embed_size, model):
        """
        :param embed_size: size of the feature vector produced for each image
        :param model: pre-trained model to be used for transfer learning; must expose
                      ``children()`` and ``fc.in_features``
        """
        super().__init__()
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(p=0.3)
        # Keep every child of the pre-trained model except the last one (its fc layer).
        backbone_layers = list(model.children())
        self.model = nn.Sequential(*backbone_layers[:-1])
        self.linear = nn.Linear(model.fc.in_features, embed_size)
        self.bn = nn.BatchNorm1d(embed_size, momentum=0.01)

    def forward(self, images):
        """
        :param images: batch of image tensors fed to the backbone
        :return: tensor with one row of ``embed_size`` features per image
        """
        backbone_out = self.model(images)
        # Collapse everything after the batch dimension into one feature axis.
        flat = backbone_out.reshape(backbone_out.shape[0], -1)
        projected = self.linear(flat)
        normalized = self.bn(projected)
        activated = self.relu(normalized)
        return self.dropout(activated)

class Decoder(nn.Module):
    """Caption decoder: word embedding, LSTM, and a linear layer scoring every vocabulary entry."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers):
        """
        :param embed_size: size of each word embedding (and of the image feature vector)
        :param hidden_size: number of features in the LSTM hidden state
        :param vocab_size: number of entries in the vocabulary
        :param num_layers: number of recurrent layers
        """
        super().__init__()

        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(p=0.5)

    def forward(self, features, captions):
        """
        :param features: image features, one row per batch element
        :param captions: caption ids laid out as (sequence, batch)
        :return: vocabulary scores for every step of the sequence, including
                 the extra first step fed with the image features
        """
        word_vectors = self.dropout(self.embed(captions))
        # The image features are prepended as the first step of the input sequence.
        image_step = features.unsqueeze(0)
        sequence = torch.cat((image_step, word_vectors), dim=0)
        hidden_states, _ = self.lstm(sequence)
        return self.linear(hidden_states)

class CaptionModel(nn.Module):
    """Encoder/decoder pair that scores captions for images and can generate a caption greedily."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers, model):
        """
        :param embed_size: size of the image feature vector and of each word embedding
        :param hidden_size: number of features in the decoder's hidden state
        :param vocab_size: number of entries in the vocabulary
        :param num_layers: number of recurrent layers in the decoder
        :param model: pre-trained model handed to the encoder
        """
        super().__init__()
        self.encoder = Encoder(embed_size, model)
        self.decoder = Decoder(embed_size, hidden_size, vocab_size, num_layers)

    def forward(self, images, captions):
        """
        :param images: batch of images for the encoder
        :param captions: caption ids for the decoder
        :return: the decoder's vocabulary scores
        """
        return self.decoder(self.encoder(images), captions)

    def captions(self, image, vocab, max_length=50):
        """
        Generate a caption by repeatedly feeding back the highest-scoring word.

        :param image: image to be captioned; ``predicted.item()`` requires a batch of exactly one image
        :param vocab: vocabulary object providing ``int_to_vocab``
        :param max_length: maximum number of words to generate
        :return: list of generated words, ending with "endseq" if it was produced in time
        """
        token_ids = []
        with torch.no_grad():
            step_input = self.encoder(image).unsqueeze(0)
            states = None
            for _ in range(max_length):
                hidden, states = self.decoder.lstm(step_input, states)
                scores = self.decoder.linear(hidden.squeeze(0))
                predicted = scores.argmax(1)
                token_id = predicted.item()

                token_ids.append(token_id)
                # The chosen word becomes the next input of the LSTM.
                step_input = self.decoder.embed(predicted).unsqueeze(0)
                if vocab.int_to_vocab[token_id] == "endseq":
                    break

            return [vocab.int_to_vocab[token_id] for token_id in token_ids]
        

In [None]:
#function which captions a selected image, un-normalizes it and displays it

def imshow(image_id):
    """
    Caption one image with the global ``model`` and plot it with the caption as title.

    Reads the module-level names ``file_path``, ``test_transforms``, ``model``
    and ``dataset``.

    :param image_id: file name of the image, relative to ``file_path``
    """
    # convert() returns a new image, so the file can be closed right away.
    with Image.open(os.path.join(file_path, image_id)) as raw_image:
        image = test_transforms(raw_image.convert("RGB"))

    # Add the batch dimension and move the input to wherever the model lives,
    # instead of calling .cuda() unconditionally (which fails without a GPU).
    device = next(model.parameters()).device
    batch = image.unsqueeze(0).to(device)
    title = " ".join(model.captions(batch, dataset.vocab))

    # Channels-last layout for matplotlib, then undo the normalization
    # applied by test_transforms.
    image = image.numpy().transpose((1, 2, 0))
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    image = std * image + mean
    image = np.clip(image, 0, 1)
    plt.imshow(image)
    plt.title(title)
    plt.pause(0.001)

In [None]:
# Build the loaders, the model and the optimizer, then train and validate.
train_loader, dataset = dataloader(file_path, df, transform=train_transforms)
valid_loader = dataloader(file_path, df, transform=test_transforms, data="val")


epochs = 100
val_loss_min = np.inf  # np.Inf was removed in NumPy 2.0
model = CaptionModel(embed_size=512, hidden_size=512, vocab_size=len(dataset.vocab), num_layers=2, model=res_model)
# Only move the model to the GPU when one is available, matching how the
# batches are handled below.
if train_on_gpu:
    model.cuda()
# Padding positions are excluded from the loss so the model is not trained
# (or scored) on predicting the "pad" token.
criterion = nn.CrossEntropyLoss(ignore_index=dataset.vocab.vocab_to_int["pad"])
optimizer = optim.Adam(model.parameters(), lr=4e-4)
for e in range(epochs):
    # ---- training pass ----
    running_loss = 0
    model.train()
    for index, (images, captions) in enumerate(train_loader):
        if train_on_gpu:
            images, captions = images.cuda(), captions.cuda()
        optimizer.zero_grad()
        # The model is fed the captions without their last step.
        output = model(images, captions[:-1])

        # Flatten the leading dimensions so each position is one classification example.
        loss = criterion(output.reshape(-1, output.shape[2]), captions.reshape(-1))

        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # ---- validation pass ----
    val_loss = 0
    with torch.no_grad():
        model.eval()
        for index, (images, captions) in enumerate(valid_loader):
            if train_on_gpu:
                images, captions = images.cuda(), captions.cuda()
            output = model(images, captions[:-1])
            loss = criterion(output.reshape(-1, output.shape[2]), captions.reshape(-1))
            val_loss += loss.item()
    # Average the summed batch losses over the number of batches.
    train_loss = running_loss/len(train_loader)
    valid_loss = val_loss/len(valid_loader)

    print(f"\nEpoch: {e+1}/{epochs}",
          f"Training Loss: {train_loss}",
          f"Validation Loss: {valid_loss}")

    # Checkpoint whenever the validation loss is at least as good as the best so far.
    if valid_loss <= val_loss_min:
        print(f"Validation loss decreased ({val_loss_min} --> {valid_loss}). Saving model ...")
        torch.save(model.state_dict(), 'model.pt')
        val_loss_min = valid_loss
    # Show the current caption for a randomly chosen held-out image.
    model.eval()
    selection = choice(test_list)
    imshow(selection)