1. Encoder: Design a CNN based encoder that handles the variable sized
images.
2. Decoder: Design a RNN / LSTM based decoder which generates the
captions given the encoded image input. Note that you can either design
a word-level or a character-level LSTM for generating the caption.
3. Training strategy: Use cross-entropy as the loss function and [teacher forcing](https://machinelearningmastery.com/teacher-forcing-for-recurrent-neural-networks/) for training the decoder. Donâ€™t forget to use START and END
tokens to allow variable length caption outputs in the decoder.


In [1]:
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [2]:
import os
os.chdir('/content/drive/MyDrive/')
os.getcwd()

'/content/drive/MyDrive'

In [None]:
!pip install torch
!pip install torchvision
!pip install scikit-image

In [52]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils

from skimage import io, transform

import matplotlib.pyplot as plt # for plotting
import numpy as np
import re
import os

In [53]:
torch.cuda.is_available()

False

## Image Tranformations

In [54]:
class Rescale(object):
    """Rescale the image in a sample to a given size.

    Args:
        output_size (tuple or int): Desired output size. If tuple, output is
            matched to output_size. If int, smaller of image edges is matched
            to output_size keeping aspect ratio the same.
    """

    def __init__(self, output_size):
        assert isinstance(output_size, (int, tuple))
        self.output_size = output_size

    def __call__(self, image):
        h, w = image.shape[:2]
        if isinstance(self.output_size, int):
            if h > w:
                new_h, new_w = self.output_size * h / w, self.output_size
            else:
                new_h, new_w = self.output_size, self.output_size * w / h
        else:
            new_h, new_w = self.output_size

        new_h, new_w = int(new_h), int(new_w)
        img = transform.resize(image, (new_h, new_w))
        return img


class ToTensor(object):
    """Convert ndarrays in sample to Tensors."""

    def __call__(self, image):
        # swap color axis because
        # numpy image: H x W x C
        # torch image: C X H X W
        image = image.transpose((2, 0, 1)) # Order of destination axis
        # image = image.type(torch.FloatTensor)
        # image = image.float

        return image


IMAGE_RESIZE = (256, 256)

# Sequentially compose the transforms
img_transform = transforms.Compose([Rescale(IMAGE_RESIZE), ToTensor()]) # Applied sequentially


In [None]:
import nltk

nltk.download('stopwords')
nltk.download('punkt')

from nltk.corpus import stopwords
import string
from nltk.tokenize import word_tokenize

## Caption Processing

In [102]:
class CaptionsPreprocessing:
    """Preprocess the captions, generate vocabulary and convert words to tensor tokens

    Args:
        captions_file_path (string): captions tsv file path
    """
    def __init__(self, captions_file_path):
        self.captions_file_path = captions_file_path

        # Read raw captions
        self.raw_captions_dict = self.read_raw_captions()

        # Preprocess captions
        self.captions_dict = self.process_captions()

        # Create vocabulary
        self.vocab, self.inv_vocab = self.generate_vocabulary()
        
    def read_raw_captions(self):
        """
        Returns:
            Dictionary with raw captions list keyed by image ids (integers)
        """

        captions_dict = {}
        with open(self.captions_file_path, 'r', encoding='utf-8') as f:
            for img_caption_line in f.readlines():
                img_captions = img_caption_line.strip().split('\t')
                image_path = "/content/drive/MyDrive/data/train_data_main/" + img_captions[0]
                
                captions_dict[img_captions[0]] = img_captions[1]
                # if os.path.exists(image_path):
                #     captions_dict[img_captions[0]] = img_captions[1]

        return captions_dict

    def process_captions(self):
        """
        Use this function to generate dictionary and other preprocessing on captions
        """

        raw_captions_dict = self.raw_captions_dict

        # Do the preprocessing here
        # Can remove the stopwords and gibberish in the caption
        stop_words = stopwords.words('english')
        punctuation = list(string.punctuation)

        for key, value in raw_captions_dict.items():
            cleaned_caption = re.sub('[^A-Za-z0-9]+', ' ', value) #Extra space removal
            tokens = word_tokenize(cleaned_caption)
            cleaned_tokens = [token for token in tokens if token not in stop_words and token not in punctuation] # Remove stopwords and punctuation
            cleaned_caption = "[START] " + " ".join(cleaned_tokens) + " [END]"
            raw_captions_dict[key] = cleaned_caption        

        captions_dict = raw_captions_dict

        return captions_dict

    def generate_vocabulary(self):
        """
        Use this function to generate dictionary and other preprocessing on captions
        """

        captions_dict = self.captions_dict
        
        vocabulary = {}
        inv_vocabulary = {}
        
        max_caption = 0
        idx = 1

        for key, value in captions_dict.items():
            val = value.split()
            max_caption = max(max_caption, len(val))

            for i in val:
                if i not in vocabulary.keys():
                    vocabulary[i] = idx
                    inv_vocabulary[idx] = i
                    idx+=1
        self.max_caption = max_caption

        # Generate the vocabulary        

        return vocabulary, inv_vocabulary
    
    def get_caption(self, tensor_tokens):
        caption = []
        for i in tensor_tokens:
            if int(i) in self.inv_vocab:
                caption.append(self.inv_vocab[int(i)])
            else:
                caption.append('TOKEN')

        return " ".join(caption)

    def captions_transform(self, img_caption):
        """
        Use this function to generate tensor tokens for the text captions
        Args:
            img_caption_list: List of captions for a particular image
        """
        vocab = self.vocab

        caption = img_caption.split(" ")
        caption_mapped = np.zeros(self.max_caption)
        
        # print(img_caption, caption)

        for i in range(len(caption)):
            try:
                caption_mapped[i] = self.vocab[caption[i]]
            except:
                print(img_caption, caption, i)
        
        caption = torch.from_numpy(caption_mapped)

        # Generate tensors

        # return torch.zeros(len(img_caption_list), 10)
        # Input: 5
        # Vocab: 100
        # Output: 5?

        # Convert array to tensor
        # Use collate function (padding)
        # DataLoader

        return caption

# Set the captions tsv file path
BASE_DIR = '/home/prakank/IIT Delhi/3rd_year/Sem5/COL774_Machine_Learning/COL774-Machine-Learning-Assignments/Assignment-4/'
CAPTIONS_FILE_PATH = os.path.join(BASE_DIR, 'data', 'train_text.tsv')
captions_preprocessing_obj = CaptionsPreprocessing(CAPTIONS_FILE_PATH)

In [103]:
print(len(captions_preprocessing_obj.vocab))
print(len(captions_preprocessing_obj.captions_dict))
print(captions_preprocessing_obj.max_caption)

7356
50000
11


## DataSet Class

In [104]:
class ImageCaptionsDataset(Dataset):

    def __init__(self, img_dir, captions_dict, img_transform=None, captions_transform=None):
        """
        Args:
            img_dir (string): Directory with all the images.
            captions_dict: Dictionary with captions list keyed by image paths (strings)
            img_transform (callable, optional): Optional transform to be applied
                on the image sample.

            captions_transform: (callable, optional): Optional transform to be applied
                on the caption sample (list).
        """
        self.img_dir = img_dir
        self.captions_dict = captions_dict
        self.img_transform = img_transform
        self.captions_transform = captions_transform

        self.image_ids = list(captions_dict.keys())

    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        img_name = self.image_ids[idx]
        image = io.imread(img_name)
        captions = self.captions_dict[img_name]

        if self.img_transform:
            image = self.img_transform(image)

        if self.captions_transform:
            captions = self.captions_transform(captions)

        sample = {'image': image, 'captions': captions}

        return sample

In [105]:
class TestDatasetLoader(Dataset):
    
    def __init__(self, img_dir, img_transform):
        """
        Args:
            img_dir (string): Directory with all the test images.            
            img_transform (callable, optional): Optional transform to be applied
                on the image sample.
        """
        self.img_dir = img_dir
        self.img_transform = img_transform
        
        self.image_ids = ['test_data/test' + str(i) + '.jpg' for i in range(1, 5001)]
        
    def __len__(self):
        return len(self.image_ids)

    def __getitem__(self, idx):
        img_name = self.image_ids[idx]
        image = io.imread(img_name)
        
        if self.img_transform:
            image = self.img_transform(image)
        
        sample = {'image': image}
        
        return sample

## Model Architecture

In [106]:
def collate_fn(data):
    """
       data: is a list of tuples with (example, label, length)
             where 'example' is a tensor of arbitrary shape
             and label/length are scalars
    """
    for i in zip(*data):
        print(i)

    labels, lengths = zip(*data)
    max_len = max(lengths)
    n_ftrs = data[0][0].size(1)
    features = torch.zeros((len(data), max_len, n_ftrs))
    labels = torch.tensor(labels)
    lengths = torch.tensor(lengths)

    for i in range(len(data)):
        j, k = data[i][0].size(0), data[i][0].size(1)
        features[i] = torch.cat([data[i][0], torch.zeros((max_len - j, k))])

    return features.float(), labels.long(), lengths.long()

In [107]:
class Encoder(nn.Module):
    def __init__(self, encoded_space_dim, fully_connected_dim):
        super().__init__()
        
        # Given groups=1, weight of size [8, 1, 3, 3], expected input[32, 3, 256, 256] to have 1 channels, but got 3 channels instead

        self.encoder_cnn = nn.Sequential(
            nn.Conv2d(3, 8, kernel_size=3, stride=2, padding=1),
            nn.ReLU(True),
            nn.Conv2d(8, 16, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=0),
            nn.ReLU(True)
        )        

        # print("Part 1")

        self.flatten = nn.Flatten(start_dim=1)

        # print("Part 2")

        self.encoder_lin = nn.Sequential(
            nn.Linear(30752, 128),
            nn.ReLU(True),
            nn.Linear(128, encoded_space_dim)
        )

        # print("Part 3")
        
    def forward(self, x):
        x = self.encoder_cnn(x)
        x = self.flatten(x)
        x = self.encoder_lin(x)
        return x

class Decoder(nn.Module):

    def __init__(self, encoded_space_dim,fully_connected_dim):
        super().__init__()

        self.decoder_lin = nn.Sequential(
            nn.Linear(encoded_space_dim, 128),
            nn.ReLU(True),
            nn.Linear(128, 288),
            nn.ReLU(True)
        )

        # print("Decoder begin ... ")
        # print('Part 1')

        self.unflatten = nn.Unflatten(dim=1, 
        unflattened_size=(32, 3, 3))

        # print('Part 2')

        self.decoder_conv = nn.Sequential(
            nn.ConvTranspose2d(32, 16, 3, 
            stride=2, output_padding=0),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 8, 3, stride=2, 
            padding=1, output_padding=1),
            nn.BatchNorm2d(8),
            nn.ReLU(True),
            nn.ConvTranspose2d(8, 1, 3, stride=2, 
            padding=1, output_padding=1)
        )

# torch.Size([32, 1, 28, 11])

        # print('Part 3')

        self.decoder_flatten_fin = nn.Flatten(start_dim=1)

        self.decoder_linear_fin = nn.Sequential(
            nn.Linear(784, 128),
            nn.ReLU(True),
            nn.Linear(128, 32),
            nn.ReLU(True),
            nn.Linear(32, 11)
        )

        # print('Part 4 ...')
        
    def forward(self, x):
        x = self.decoder_lin(x)
        x = self.unflatten(x)
        x = self.decoder_conv(x)

        # print("Inside Block ...")
        # print(x.shape)

        x = self.decoder_flatten_fin(x)
        x = self.decoder_linear_fin(x)
        x = torch.sigmoid(x)
        return x


In [163]:
class ImageCaptionsNet(nn.Module):
    def __init__(self, encoder, decoder, captions_preprocessing_obj, loss_function, optimizer, epsilon):
        super(ImageCaptionsNet, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.loss_function = loss_function
        self.optimizer = optimizer
        self.epsilon = epsilon
        self.captions_preprocessing_obj = captions_preprocessing_obj


    def train(self, device, train_loader):
        self.encoder.train()
        self.decoder.train()

        train_loss = []

        for batch_idx, sample in enumerate(train_loader):
            net.zero_grad()
            image_batch, captions_batch = sample['image'], sample['captions']

            # print(type(image_batch), image_batch.dtype)
            # print(type(captions_batch), captions_batch.dtype)
            # If GPU training required
            if torch.cuda.is_available():
                image_batch, captions_batch = image_batch.cuda(), captions_batch.cuda()

            image_batch = image_batch.float()
            captions_batch = captions_batch.float()

            encoded_vector = self.encoder(image_batch)
            # print("Encoded:",batch_idx)
            
            output_caption = self.decoder(encoded_vector).float()
            # print("Decoded:",batch_idx)
            
            # print(output_caption.shape)
            # print(captions_batch.shape)

            print((captions_batch.view(-1).shape))
        
            loss = self.loss_function(captions_batch.view(-1), output_caption.view(-1)) # 32*11
            
            # loss = loss_function(output_captions.view(-1,vocab_size), captions.view(-1))
            
            print("Batch:{}, partial train loss (single batch):{}\n".format(batch_idx, loss.data), end="")
            
            # for i in range(captions_batch.shape[0]):
                # print(captions_preprocessing_obj.get_caption(captions_batch[i]))
                # print(captions_preprocessing_obj.get_caption(output_caption[i]))
                # print((captions_batch[i]))
                # print((output_caption[i]))
                # print()
            
            self.optimizer.zero_grad()

            # print("Optimizer Zero grad successful")

            loss.backward(loss)

            # print("Loss backward successful")

            self.optimizer.step()

            # print("Optimizer step successful")
                        
            train_loss = np.append(train_loss, (loss.detach().cpu().numpy()))            

            if len(train_loss) > 1:
                if abs(train_loss[-1] - train_loss[-2]) < EPSILON:
                    return np.mean(train_loss)
        
        return np.mean(train_loss)

    def predict(self, device, test_loader):
        self.encoder.eval()
        self.decoder.eval()

        with torch.no_grad():
            conc_out = []
            captions = []
            # conc_label = []

            for batch_idx, sample in enumerate(test_loader):
                # Move tensor to the proper device
                image_batch = sample['image'].float()
                
                image_batch = image_batch.to(device)
                # Encode data
                encoded_data = self.encoder(image_batch)
                # Decode data
                decoded_data = self.decoder(encoded_data)  # 32*11

                conc_out = decoded_data.cpu().numpy()
                                
                for i in conc_out:
                    captions.append(self.captions_preprocessing_obj.get_caption(i))

                print("Batch:{}".format(batch_idx))
                
                if batch_idx > 3:
                    return captions

        return captions
            # conc_out = torch.cat(conc_out)
            # conc_label = torch.cat(conc_label)
            # Evaluate global loss
            # val_loss = loss_fn(conc_out, conc_label)
        # return val_loss.data


## Training Loop

In [164]:
BASE_DIR = '/home/prakank/IIT Delhi/3rd_year/Sem5/COL774_Machine_Learning/COL774-Machine-Learning-Assignments/Assignment-4/'

# os.chdir('/content/drive/MyDrive/')
IMAGE_DIR = os.path.join(BASE_DIR, 'data')

if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Creating the Dataset
train_dataset = ImageCaptionsDataset(
    IMAGE_DIR, captions_preprocessing_obj.captions_dict, img_transform=img_transform,
    captions_transform=captions_preprocessing_obj.captions_transform
)

# Define your hyperparameters
NUMBER_OF_EPOCHS = 3
LEARNING_RATE = 1
EPSILON = 1e-5
BATCH_SIZE = 32
NUM_WORKERS = 1 # Parallel threads for dataloading

# Creating the DataLoader for batching purposes
os.chdir(os.path.join(BASE_DIR, 'data'))

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

loss_function = nn.CrossEntropyLoss()
# loss_function = torch.nn.MSELoss()

encoded_space_dim = 10
fully_connected_dim = 128

encoder = Encoder(encoded_space_dim, fully_connected_dim)
decoder = Decoder(encoded_space_dim, fully_connected_dim)

params_to_optimize = [{'params':encoder.parameters()},{'params': decoder.parameters()}]

optimizer = optim.SGD(params_to_optimize, lr=LEARNING_RATE)

encoder.to(device)
decoder.to(device)

net = ImageCaptionsNet(encoder, decoder, captions_preprocessing_obj, loss_function, optimizer, EPSILON)
# net = net.cuda()

for epoch in range(NUMBER_OF_EPOCHS):
    loss = net.train(device, train_loader)
    print("Iteration:{} , Loss:{}".format(str(epoch + 1), loss))

# train_loss =train_epoch(encoder,decoder,device,train_loader,loss_fn,optim)

torch.Size([352])


RuntimeError: shape '[-1, 7356]' is invalid for input of size 352

In [113]:
# Predcition

TEST_IMAGE_DIR = os.path.join(BASE_DIR, 'data')

test_img_transform = transforms.Compose([Rescale(IMAGE_RESIZE), ToTensor()]) # Applied sequentially

# Creating the Dataset
test_dataset = TestDatasetLoader(TEST_IMAGE_DIR, img_transform=test_img_transform)

test_loader  = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

output_caption = net.predict(device, test_loader, captions_preprocessing_obj)

Batch:0
Batch:1
Batch:2
Batch:3
Batch:4


In [115]:
output_caption = np.asarray(output_caption)
print(output_caption.shape)

(160,)


In [118]:
print(output_caption)

['TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN TOKEN'
 'TOKEN TOKEN TOKEN TOKEN TOKEN TO

In [None]:
# os.chdir('/content/drive/MyDrive/')
# IMAGE_DIR = '/content/drive/MyDrive/data/train_data_main/'

# # Creating the Dataset
# train_dataset = ImageCaptionsDataset(
#     IMAGE_DIR, captions_preprocessing_obj.captions_dict, img_transform=img_transform,
#     captions_transform=captions_preprocessing_obj.captions_transform
# )

# # Define your hyperparameters
# NUMBER_OF_EPOCHS = 3
# LEARNING_RATE = 1e-1
# BATCH_SIZE = 32
# NUM_WORKERS = 1 # Parallel threads for dataloading
# loss_function = nn.CrossEntropyLoss()

# optimizer = optim.SGD(net.parameters(), lr=LEARNING_RATE)

# # print(train_dataset.image_ids, len(train_dataset.image_ids))

# # Creating the DataLoader for batching purposes
# os.chdir('/content/drive/MyDrive/data/train_data_main/')
# train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)

# # print(train_loader.dataset.captions_dict)

# import os
# for epoch in range(NUMBER_OF_EPOCHS):
#     for batch_idx, sample in enumerate(train_loader):
#         net.zero_grad()

#         image_batch, captions_batch = sample['image'], sample['captions']

#         # If GPU training required
#         print(image_batch.shape)
#         print(captions_batch.shape)
#         image_batch, captions_batch = image_batch.cuda(), captions_batch.cuda()

#         output_captions = net((image_batch, captions_batch))
        
#         # output_captions <- output of decoder
#         loss = loss_function(output_captions, captions_batch) # Error computation
#         loss.backward()
#         optimizer.step()
#     print("Iteration: " + str(epoch + 1))