Let's implement an LSTM for image captioning

How do recurrent models generally work? They operate with a sequence of hidden states (and cell states, in LSTM's case) in order to put into practice the notion of recurrency (which allows for long-term memory).

Let's go ahead and define our LSTM

In [1]:
import torch
import torch.nn as nn
import numpy as np
import torch
!pip install datasets
!pip install pillow



Collecting datasets
  Downloading datasets-2.18.0-py3-none-any.whl (510 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m510.5/510.5 kB[0m [31m5.4 MB/s[0m eta [36m0:00:00[0m
Collecting dill<0.3.9,>=0.3.0 (from datasets)
  Downloading dill-0.3.8-py3-none-any.whl (116 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m116.3/116.3 kB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
Collecting xxhash (from datasets)
  Downloading xxhash-3.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (194 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m194.1/194.1 kB[0m [31m6.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting multiprocess (from datasets)
  Downloading multiprocess-0.70.16-py310-none-any.whl (134 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m134.8/134.8 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: xxhash, dill, multiprocess, datasets
Successfully installed datasets-2

In [2]:

if torch.cuda.is_available():
  device = torch.device("cuda")
else:
  device = torch.device("device")

In [3]:
from datasets import load_dataset
from PIL import Image
import requests
from io import BytesIO
from transformers import AutoTokenizer
from datasets import load_dataset
import torch
import torchvision.models as models
import torchvision.transforms as transforms
from PIL import Image


vgg16 = models.vgg16()
vgg16_features = vgg16.features
vgg16_features.eval()

transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.Grayscale(num_output_channels=3),  # Convert to three channels if not already
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])
# img = Image.open("image.png")

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

def tokenization(example):
    return tokenizer(example["text"])

def preprocess(data):
    # Pre-process the image
    img = data['image']
    if isinstance(img, str): # only fetch the image if 'image' is a URL (i.e., a string)
        response = requests.get(img)
        img = Image.open(BytesIO(response.content))
    img = transform(img)

    # Pre-process the captions
    caption = data['text']

    # Tokenize the caption
    tokens = tokenizer.encode_plus(
        caption,
        truncation=True,
        max_length=512,
        padding='max_length',
        add_special_tokens=True,
        return_tensors='pt'
    )

    return {'image': img, 'caption': tokens['input_ids'].squeeze(), 'attention_mask': tokens['attention_mask'].squeeze()}


# Load the dataset
dataset = load_dataset('jpawan33/kag100-image-captioning-dataset')

# Pre-process the dataset
dataset = dataset.map(preprocess)

class LSTM_Captioner(nn.Module):
  def __init__(self, input_size, hidden_size, num_layers, output_dim, cnn_output_size):
    super(LSTM_Captioner, self).__init__()
    self.input_size = input_size
    self.hidden_size = hidden_size
    self.num_layers = num_layers
    self.output_dim = output_dim
    self.model = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
    self.fc1 = nn.Linear(hidden_size, output_dim)
    self.fc = nn.Linear(cnn_output_size, hidden_size)


  # def forward(self, image_features, caption_tokens):
  #     # Process the image features
  #     image_features = self.fc(image_features)  # Shape: [batch_size, hidden_size]
  #     image_features = image_features.unsqueeze(0)  # Add a sequence length dimension

  #     # Initialize the hidden state (optionally with image features)
  #     h0 = torch.zeros(self.num_layers, image_features.size(1), self.hidden_size).to(image_features.device)
  #     c0 = torch.zeros(self.num_layers, image_features.size(1), self.hidden_size).to(image_features.device)

  #     # Optionally, you can initialize h0 with image_features if it makes sense for your model
  #     # h0 = image_features

  #     # Process the caption tokens through the LSTM
  #     lstm_out, _ = self.model(caption_tokens, (h0, c0))

  #     # Pass the output of the LSTM to the fully connected layer
  #     output = self.fc1(lstm_out)

    #     return output

    # TODO let's include an additional weight matrix in LSTM for images themselves- so we can recurently feed back in
  def forward(self, image_features, caption_tokens):
      # Process the image features
      image_features = self.fc(image_features)  # Shape: [batch_size, hidden_size]
      image_features = image_features.unsqueeze(0)  # Add a sequence length dimension

      # Initialize the hidden state (optionally with image features)
      h0 = torch.zeros(self.num_layers, image_features.size(1), self.hidden_size).to(image_features.device)
      c0 = torch.zeros(self.num_layers, image_features.size(1), self.hidden_size).to(image_features.device)

      # Optionally, you can initialize h0 with image_features if it makes sense for your model
      # h0 = image_features

      # Process the caption tokens through the LSTM
      lstm_out, _ = self.model(caption_tokens, (h0, c0))

      # Pass the output of the LSTM to the fully connected layer
      output = self.fc1(lstm_out)

      return output


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/502 [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/462M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/1000 [00:00<?, ? examples/s]

Map:   0%|          | 0/1000 [00:00<?, ? examples/s]

In [9]:
data= dataset['train']

In [4]:


hidden_size = 512 # the size to which you want to map your CNN features
vocabulary_size = tokenizer.vocab_size # 30522 for BERT
embedding_dim = tokenizer.model_max_length # 768 for BERT
num_layers = 2 # the number of LSTM layers
cnn_output_size = 49

embed = nn.Embedding(vocabulary_size, embedding_dim)

# Define your LSTM captioner
model = LSTM_Captioner(input_size=embedding_dim, hidden_size=hidden_size, num_layers=num_layers, output_dim=vocabulary_size, cnn_output_size=cnn_output_size)





Now let's bring in a pre-trained CNN for feature extraction for our LSTM

In [5]:
loss_fn = torch.nn.CrossEntropyLoss()

Our training data consists of
- Captions
- Images

To train our LSTM
- Extract feature maps from images using pre-trained CNN
- Feed in feature map as iniital hidden state- prompt with <START> token
- Use b


In [10]:


# Now
from tqdm import tqdm



epochs = 4
for i in range(epochs):
  for batch in tqdm(data):
    img = torch.tensor(batch['image'])
    features = vgg16_features(torch.tensor(img))

    caption = torch.tensor(batch['caption'])

    print("HI")

    print(caption.shape)
    # loss = 0.0
    # for each timestep in the captions

    for t in range(512-1):  # minus 1 because we don't have the next word for the last word as input
        # Forward pass
        # if ()
        input_caption = torch.tensor(caption[t])
        if (input_caption.item()==0):
          break

        print(input_caption)
        image_features = features.view(512, -1)
        # input_caption

        input_caption = torch.tensor([[input_caption.item()]])

        output = model(image_features, torch.tensor(input_caption))
        print(f"output {output}")


        target_caption = torch.tensor(caption[t+1])

        # # compute the loss
        target_caption = caption[t+1]  # predicting the next word in the caption
        loss_t = loss_fn(output.squeeze(1), target_caption)
        # loss += loss_t

    # # Backward pass
    # loss.backward()

    # # Update weights
    # optimizer.step()





dataset['train'][0]['text']

  features = vgg16_features(torch.tensor(img))
  input_caption = torch.tensor(caption[t])
  output = model(image_features, torch.tensor(input_caption))
  0%|          | 0/1000 [00:00<?, ?it/s]

HI
torch.Size([512])
tensor(101)





RuntimeError: For unbatched 2-D input, hx and cx should also be 2-D but got (3-D, 3-D) tensors