<a href="https://colab.research.google.com/github/martinpius/DEEP-CNN-TRANSFER/blob/main/IMAGE_CAPTION_PROBLEM_IMPLEMENTATION_FROM_SCRATCH.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [13]:
# Environment setup: mount Google Drive when running on Colab and pick the
# compute device. torch is imported unconditionally because the device
# selection below needs it whether or not Colab is available.
import torch

try:
  # The Colab-only import lives inside the try so that running this notebook
  # anywhere else falls through to the handler instead of crashing the cell.
  from google.colab import drive
  drive.mount("/content/drive", force_remount = True)
  COLAB = True
  print(f"You are on CoLaB with torch version {torch.__version__}")
except Exception as e:
  print(f">>>> {type(e)}: {e}\n>>>> please correct {type(e)} and reload your drive")
  COLAB = False

# Use the GPU when one is visible, otherwise run on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
def time_fmt(t: float = 231.21)->str:
  '''
  Format a duration given in seconds as "h: H min: MM sec: SS.ss".

  t == elapsed time in seconds (expected to be non-negative)
  returns == the formatted string; seconds keep two decimal places
  '''
  # Round once up front so a value such as 59.999 rolls over into the next
  # minute instead of being printed as "60.00" seconds.
  total = round(float(t), 2)
  h, rem = divmod(total, 60 * 60)
  m, s = divmod(rem, 60)
  return f"h: {int(h)} min: {int(m):>02} sec: {s:>05.2f}"
print(f">>>> testing the time formating function...\n>>>> time elapsed\t{time_fmt()}")


Mounted at /content/drive
You are on CoLaB with torch version 1.9.0+cu102
>>>> testing the time formating function...
>>>> time elapsed	h: 0 min: 03 sec: 51.00


In [14]:
# In this notebook we implement a neural network for the image-captioning problem.
# The network takes an image and produces a text caption that describes what is in the picture.
# It combines two architectures: a CNN, which extracts representative features from the images,
# and an LSTM, which predicts the image description from the features it receives.
# The model is a typical encoder-decoder: the encoder is the CNN and the decoder is the LSTM.
# We also add an attention mechanism to learn important representations from the images and improve prediction.

In [15]:
import torch, spacy, os
import torchvision.transforms as transforms
import torchvision
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence
from PIL import Image
from tqdm import tqdm
from math import ceil
import random, time, datetime
import numpy as np
from tensorflow import summary
import pandas as pd
%load_ext tensorboard
# The 'en' shortcut link only exists on spaCy 2.x; newer releases require the
# packaged model name. Try the package name first and fall back to the legacy
# shortcut so the cell works on both.
try:
  spacy_eng = spacy.load('en_core_web_sm')
except OSError:
  spacy_eng = spacy.load('en')


The tensorboard extension is already loaded. To reload it, use:
  %reload_ext tensorboard


In [16]:
# Seed every random number generator in use and put cuDNN in deterministic
# mode so that repeated runs give the same results.
seed = 123
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)  # covers every visible GPU; ignored when CUDA is unavailable
# benchmark mode lets cuDNN auto-tune and pick possibly different convolution
# algorithms from run to run, which defeats reproducibility, so it stays off.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True

In [17]:
# We start with the encoder class. This is a CNN; we prefer a transfer-learning
# approach (loading a pre-trained model), and in our case we use the Inception-v3 network.
# We replace the network's final fully connected (classifier) layer with a linear layer
# whose output size is the embedding dim (the encoder's output must correspond to the embedding dim).

In [18]:

class CNNENC(nn.Module):
  '''
  Image encoder: a pre-trained Inception-v3 whose final fully connected layer
  is replaced by a linear layer that outputs `embedding_dim` features.

  embedding_dim == size of the feature vector produced for every image
  train == when True the whole Inception backbone is fine-tuned; when False
           (default) only the replaced `fc` layer receives gradients
  '''
  def __init__(self, embedding_dim, train = False):
    super(CNNENC, self).__init__()
    # The flag is deliberately NOT stored as `self.train`: that name is the
    # nn.Module.train() method and overwriting it with a bool makes any later
    # .train() / .eval() call on this module (or a parent) fail.
    self.fine_tune = bool(train)
    self.inception_v3 = torchvision.models.inception_v3(pretrained = True, aux_logits = False)
    self.inception_v3.fc = nn.Linear(self.inception_v3.fc.in_features, embedding_dim)
    self.drp = nn.Dropout(p = 0.5)
    self.relu = nn.ReLU()
    # Freeze / unfreeze once at construction time. Parameter names of the
    # replaced head are "fc.weight" and "fc.bias"; it is always trainable.
    for name, param in self.inception_v3.named_parameters():
      param.requires_grad = name.startswith("fc.") or self.fine_tune

  def forward(self, input_tensor):
    '''
    Run the images through Inception-v3 and return the embedding after
    ReLU and dropout.
    '''
    features = self.inception_v3(input_tensor)
    return self.drp(self.relu(features))


In [19]:
# The decoder class is the usual RNN with an LSTM architecture.
# The input to this network is the feature vectors from the encoder network.
# The dimension of the feature vectors must be equal to the embedding dim of the LSTM.
class LSTMDEC(nn.Module):
  '''
  Caption decoder: an embedding layer, a multi-layer LSTM and a linear layer
  that maps every LSTM output to scores over the vocabulary.
  '''
  def __init__(self, embedding_dim, hidden_dim, vocab_size,num_layers):
    super().__init__()
    self.embed = nn.Embedding(num_embeddings = vocab_size, embedding_dim = embedding_dim)
    self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, dropout = 0.5)
    self.drp = nn.Dropout(0.5)
    self.fc = nn.Linear(in_features = hidden_dim, out_features = vocab_size)

  def forward(self, features, captions):
    '''
    features == image feature vectors coming from the CNN encoder
    captions == token indices of the target captions; the sequence axis comes
                first, since the LSTM is created without batch_first

    Returns the vocabulary scores for every time step. The image features are
    used as the first time step, so the output has one step more than
    `captions`.
    '''
    # embed the caption tokens and regularise them with dropout
    token_vectors = self.drp(self.embed(captions))
    # give the image features a leading sequence axis so they can be the
    # first step of the sequence fed to the LSTM
    image_step = features.unsqueeze(0)
    lstm_input = torch.cat([image_step, token_vectors], dim = 0)
    lstm_out, _ = self.lstm(lstm_input)
    # project every hidden state onto the vocabulary
    return self.fc(lstm_out)

    

In [20]:
# We now build the model class which combines both the encoder and the decoder
class AUTOENC(nn.Module):
  '''
  Full captioning model: a CNN encoder (CNNENC) followed by an LSTM decoder
  (LSTMDEC).
  '''
  def __init__(self, embedding_dim, hidden_dim, vocab_size, num_layers):
    super(AUTOENC, self).__init__()
    self.encoder = CNNENC(embedding_dim = embedding_dim)
    self.decoder = LSTMDEC(embedding_dim, hidden_dim,vocab_size, num_layers)

  def forward(self, images, captions):
    '''
    Encode the images and decode them together with the (teacher-forced)
    captions; returns the decoder's vocabulary scores.
    '''
    features = self.encoder(images) # run the encoder (CNN)
    # the decoder consumes the encoded features, not the raw images
    outputs = self.decoder(features, captions) # run the decoder (the LSTM)
    return outputs

  def pred_captions(self, images, vocabulary, max_len = 50):
    '''
    Greedily generate a caption for a single image.

    images == image tensor holding exactly one image (batch of size 1), since
              each predicted token is read with .item()
    vocabulary == object whose `itos` mapping turns token indices into strings
    max_len == maximum number of tokens to generate (default 50)

    Returns the list of predicted tokens; generation stops early once "<EOS>"
    is produced (the "<EOS>" token is included in the result).

    NOTE(review): this method does not switch the model to evaluation mode;
    the caller presumably should do so before predicting - confirm.
    '''
    caption_res = [] # container for our caption (predicted)
    # no gradients are needed at inference time
    with torch.no_grad():
      # use the model's own trained encoder; the leading axis is the sequence step
      feature = self.encoder(images).unsqueeze(0)
      states = None # None lets the LSTM start from zero hidden / cell states
      for _ in range(max_len):
        hidden, states = self.decoder.lstm(feature, states)
        output = self.decoder.fc(hidden.squeeze(0))
        pred_word = output.argmax(1) # index of the highest scoring vocabulary entry
        caption_res.append(pred_word.item())
        # stop as soon as the end-of-sentence token is produced
        if vocabulary.itos[pred_word.item()] == "<EOS>":
          break
        # the embedded prediction is the LSTM input for the next step
        feature = self.decoder.embed(pred_word).unsqueeze(0)
    return [vocabulary.itos[idx] for idx in caption_res]


In [21]:
# A class to build vocabulary
class Vocabulary:
  '''
  Maps caption tokens to integer indices and back.

  Indices 0-3 are reserved for the special tokens <PAD>, <SOS>, <EOS> and
  <UNK>; every other token gets an index once it has been seen
  `freq_threshold` times while building the vocabulary.
  '''
  def __init__(self, freq_threshold):
    '''
    freq_threshold == number of occurrences a token needs before it receives
                      its own index; rarer tokens map to <UNK>
    '''
    self.freq_threshold = freq_threshold
    self.itos = {0:"<PAD>", 1:"<SOS>", 2:"<EOS>", 3:"<UNK>"}
    self.stoi = {"<PAD>":0, "<SOS>":1, "<EOS>":2, "<UNK>":3}

  def __len__(self):
    return len(self.itos)

  @staticmethod
  def eng_tokenizer(text):
    '''
    Tokenize the text with the spaCy tokenizer and lower-case every token.
    '''
    return [tok.text.lower() for tok in spacy_eng.tokenizer(text)]

  def build_vocabulary(self, caption_list):
    '''
    Count the tokens of every caption in `caption_list` and assign the next
    free index to each token at the moment its count reaches the frequency
    threshold.
    '''
    frequencies = {} # token -> number of occurrences seen so far
    idx = len(self.itos) # first free index after the special tokens
    for caption in caption_list:
      # Tokenize exactly as numericalize() does; iterating over the raw
      # string would count single characters instead of words.
      for word in self.eng_tokenizer(caption):
        frequencies[word] = frequencies.get(word, 0) + 1
        if frequencies[word] == self.freq_threshold and word not in self.stoi:
          self.stoi[word] = idx
          self.itos[idx] = word
          idx += 1

  def numericalize(self, text):
    '''
    Convert a text into a list of token indices; tokens missing from the
    vocabulary are mapped to the <UNK> index.
    '''
    unk_idx = self.stoi['<UNK>']
    return [self.stoi.get(token, unk_idx) for token in self.eng_tokenizer(text)]
      

# a class to Load the data from google drive
class Flickr30kData(Dataset):
  '''
  Dataset of (image, caption) pairs described by a csv file that has an
  'image' column (file name inside `root_dir`) and a 'caption' column.
  '''
  def __init__(self, root_dir, csv_file, transform = None, freq_threshold = 5):
    '''
    root_dir == directory of the images folder
    csv_file == path of the csv file with the image descriptions (image, caption)
    transform == optional callable applied to every loaded image
    freq_threshold == frequency threshold used when building the vocabulary
    '''
    self.root_dir = root_dir
    # Malformed rows are skipped with a warning. `error_bad_lines` was
    # replaced by `on_bad_lines` in newer pandas releases, so try the new
    # keyword first and fall back to the old one for older installs.
    try:
      self.dfm = pd.read_csv(csv_file, on_bad_lines = "warn")
    except TypeError:
      self.dfm = pd.read_csv(csv_file, error_bad_lines = False)
    self.transform = transform

    # Grab the image file names and captions from the pandas dataframe
    self.imgs = self.dfm['image']
    self.captions = self.dfm['caption']

    # initialize and build the vocabulary from all captions
    self.vocab = Vocabulary(freq_threshold)
    self.vocab.build_vocabulary(self.captions.tolist())

  def __len__(self):
    '''
    Number of (image, caption) rows in the dataframe.
    '''
    return len(self.dfm)

  def __getitem__(self, idx):
    '''
    Return the idx-th sample: the (optionally transformed) RGB image and its
    caption as a tensor of token indices wrapped in <SOS> ... <EOS>.
    '''
    # positional access, so the lookup does not depend on the dataframe's index labels
    caption = self.captions.iloc[idx]
    img_id = self.imgs.iloc[idx]
    img = Image.open(os.path.join(self.root_dir, img_id)).convert("RGB")
    if self.transform is not None:
      img = self.transform(img)

    # <SOS> + token indices of the caption + <EOS>
    numericalized_caption = [self.vocab.stoi['<SOS>']]
    numericalized_caption += self.vocab.numericalize(caption)
    numericalized_caption.append(self.vocab.stoi["<EOS>"])
    return img, torch.tensor(numericalized_caption)

# Since the captions have different lengths, we pad the generated sequences
# to the length of the longest caption in each batch.
class MyCollate:
  '''
  Collate function for the DataLoader: stacks the images of a batch and pads
  the captions of that batch to a common length.
  '''
  def __init__(self, pad_idx):
    # index used to fill captions shorter than the longest one in the batch
    self.pad_idx = pad_idx

  def __call__(self, batch):
    '''
    batch == list of samples where sample[0] is the image tensor and
             sample[1] the caption tensor

    Returns (images, targets): the images stacked along a new leading batch
    axis, and the captions padded with `pad_idx` with the sequence axis first.
    '''
    stacked_images = torch.stack([sample[0] for sample in batch], dim = 0)
    caption_tensors = [sample[1] for sample in batch]
    padded_captions = pad_sequence(caption_tensors,
                                   batch_first = False,
                                   padding_value = self.pad_idx)
    return stacked_images, padded_captions

# We finally define our iterator (dataloader method to stream the data during training)
def get_loader(images_dir,
               csv_dir,
               transform,
               batch_size = 64,
               shuffle = True,
               pin_memory = True):
  '''
  Build the Flickr dataset and a DataLoader that streams it in batches.

  images_dir == directory that holds the image files
  csv_dir == path of the csv file with the captions
  transform == transformation applied to every image
  batch_size, shuffle, pin_memory == forwarded to the DataLoader

  Returns (loader, dataset). Every batch is padded independently, up to the
  length of its own longest caption, by MyCollate.
  '''
  dataset = Flickr30kData(images_dir, csv_dir, transform)
  # the collate function pads with the index of the <PAD> token
  collate = MyCollate(pad_idx = dataset.vocab.stoi["<PAD>"])
  data_loader = DataLoader(dataset = dataset,
                           batch_size = batch_size,
                           shuffle = shuffle,
                           pin_memory = pin_memory,
                           collate_fn = collate)
  return data_loader, dataset

In [22]:

# We now implementing the function to train the above network
def train(images_dir = "/content/drive/MyDrive/flickr30k_images/flickr8k/images",
          csv_dir = "/content/drive/MyDrive/flickr30k_images/flickr8k/captions.txt",
          learning_rate = 1e-3,
          epochs = 10,
          num_layers = 2,
          embedding_size = 300,
          hidden_dim = 512):
  '''
  Train the captioning model and log the training loss to tensorboard.

  images_dir == directory that holds the image files
  csv_dir == path of the csv file with the captions
  learning_rate == learning rate of the RMSprop optimizer
  epochs == number of passes over the data
  num_layers == number of LSTM layers in the decoder
  embedding_size == size of the image / token embeddings
  hidden_dim == hidden size of the LSTM

  Returns (model, dataset) so the trained model and its vocabulary can be
  used afterwards, e.g. for predicting captions.
  '''
  transform = transforms.Compose([
                                  transforms.ToTensor(),
                                  transforms.Resize((356,356)),
                                  transforms.RandomCrop((299, 299)),
                                  transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))

  ])

  loader, dataset = get_loader(images_dir = images_dir,
                               csv_dir = csv_dir,
                               transform = transform)
  vocab_size = len(dataset.vocab)

  # tensorboard environment: one log directory per run, keyed by the start time
  curr_time = datetime.datetime.now().timestamp()
  my_dir = "logs/tensorboard/image_captions" + str(curr_time)
  writer = summary.create_file_writer(my_dir)
  step = 0

  # Model's initialization
  model = AUTOENC(embedding_dim = embedding_size,
                  hidden_dim = hidden_dim,
                  vocab_size = vocab_size,
                  num_layers = num_layers).to(device = device)
  optimizer = optim.RMSprop(params = model.parameters(), lr = learning_rate)
  # padded positions do not contribute to the loss
  loss_obj = nn.CrossEntropyLoss(ignore_index = dataset.vocab.stoi['<PAD>'])

  # the train loop
  for epoch in range(epochs):
    print(f"\n>>>> train starts for epoch {epoch + 1}\n>>>> please wait while the model is training................")
    for image, caption in tqdm(loader):
      image = image.to(device = device)
      caption = caption.to(device = device)
      # The last token is not fed in: the decoder prepends the image features
      # as the first step, so the prediction has as many steps as `caption`.
      pred = model(image, caption[:-1])
      loss = loss_obj(pred.reshape(-1, pred.shape[2]), caption.reshape(-1))
      with writer.as_default():
        summary.scalar("training_loss", loss.item(), step = step)
        step+=1
      optimizer.zero_grad()
      # plain backward(): passing the loss as the `gradient` argument would
      # scale every gradient by the loss value
      loss.backward()
      optimizer.step()

  return model, dataset
  

In [None]:
# Start the clock in this cell: a variable assigned inside train() is local to
# that function, so the elapsed-time print needs its own `tic` here.
tic = time.time()
train()
toc = time.time()
print(f"\n>>>> time elapsed: {time_fmt(toc - tic)}")


  0%|          | 0/633 [00:00<?, ?it/s][A


>>>> train starts for epoch 1
>>>> please wait while the model is training................
