In [1]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from tqdm import tqdm
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader
from nltk.translate.bleu_score import corpus_bleu
from sklearn.model_selection import train_test_split


In [2]:
BASE_DIR = os.getcwd()
IMAGES_DIR = os.path.join(BASE_DIR, "images")
CAPTIONS_FILE = os.path.join(BASE_DIR, "captions.txt")

In [3]:
df = pd.read_csv(CAPTIONS_FILE)
df.columns = ['image', 'caption']

In [4]:
df.shape

(40455, 2)

In [5]:
mapping = {}
for _, row in df.iterrows():
    image_id = row['image'].split('.')[0]
    caption = row['caption']
    if image_id not in mapping:
        mapping[image_id] = []
    mapping[image_id].append(caption)

In [6]:
import re

def preprocess_captions(mapping):
    for key, captions in mapping.items():
        for i in range(len(captions)):
            caption = captions[i].lower()
            caption = re.sub(r'[^a-zA-Z ]', '', caption)
            caption = re.sub(r'\s+', ' ', caption).strip()
            caption = 'startseq ' + caption + ' endseq'
            captions[i] = caption

preprocess_captions(mapping)

In [7]:
from collections import Counter

all_captions = []
for captions in mapping.values():
    all_captions.extend(captions)

word_counts = Counter()
for caption in all_captions:
    word_counts.update(caption.split())

In [8]:
words = [word for word, count in word_counts.items() if count >= 1]
word2idx = {word: idx+1 for idx, word in enumerate(words)}
word2idx['<pad>'] = 0
idx2word = {idx: word for word, idx in word2idx.items()}

vocab_size = len(word2idx)
max_length = max(len(caption.split()) for caption in all_captions)

In [9]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device

device(type='cpu')

In [10]:
vgg = models.vgg16(pretrained=True)
vgg.classifier = nn.Sequential(*list(vgg.classifier.children())[:-1])
vgg = vgg.to(device)
vgg.eval()



Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to C:\Users\blogg/.cache\torch\hub\checkpoints\vgg16-397923af.pth


100%|██████████| 528M/528M [00:41<00:00, 13.4MB/s] 


VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [11]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

features = {}
for img_name in tqdm(os.listdir(IMAGES_DIR)):
    if img_name.endswith(('.jpg', '.jpeg', '.png')):
        img_path = os.path.join(IMAGES_DIR, img_name)
        image = Image.open(img_path).convert('RGB')
        image = transform(image).unsqueeze(0).to(device)
        with torch.no_grad():
            feature = vgg(image)
        image_id = img_name.split('.')[0]
        features[image_id] = feature.squeeze(0)

100%|██████████| 8091/8091 [21:16<00:00,  6.34it/s]


In [18]:
class ImageCaptionDataset(Dataset):
    def __init__(self, keys, mapping, features, word2idx, max_length):
        self.keys = keys
        self.mapping = mapping
        self.features = features
        self.word2idx = word2idx
        self.max_length = max_length

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, idx):
        key = self.keys[idx]
        feature = self.features[key]  # e.g., tensor of size [4096]
        captions = self.mapping[key]
        caption = captions[0]
        tokens = caption.split()
        seq = [self.word2idx.get(word, self.word2idx['<pad>']) for word in tokens]
        seq = torch.tensor(seq, dtype=torch.long)
        return feature, seq


In [19]:
class CaptionGenerator(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, feature_size=4096):
        super(CaptionGenerator, self).__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.feature_embed = nn.Linear(feature_size, embed_size)  # Project image features
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.init_weights()

    def init_weights(self):
        self.embed.weight.data.uniform_(-0.1, 0.1)
        self.linear.weight.data.uniform_(-0.1, 0.1)
        self.linear.bias.data.fill_(0)

    def forward(self, features, captions):
        # features: (batch_size, feature_size)
        # captions: (batch_size, seq_len)
        features = self.feature_embed(features)  # (batch_size, embed_size)
        embeddings = self.embed(captions)        # (batch_size, seq_len, embed_size)
        # Concatenate image features as first token embedding
        embeddings = torch.cat((features.unsqueeze(1), embeddings), dim=1)  # (batch_size, seq_len+1, embed_size)
        hiddens, _ = self.lstm(embeddings)
        outputs = self.linear(hiddens)  # (batch_size, seq_len+1, vocab_size)
        return outputs

In [20]:
image_ids = list(mapping.keys())
train_ids, test_ids = train_test_split(image_ids, test_size=0.1, random_state=42)

In [21]:
train_dataset = ImageCaptionDataset(train_ids, mapping, features, word2idx, max_length)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=lambda x: x)


In [23]:
embed_size = 256
hidden_size = 512
vocab_size = len(word2idx)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = CaptionGenerator(embed_size, hidden_size, vocab_size).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [25]:
for epoch in range(10):
    model.train()
    for batch in train_loader:
        features_batch, captions_batch = zip(*batch)
        features_batch = torch.stack(features_batch).to(device)
        captions_batch = torch.nn.utils.rnn.pad_sequence(
            captions_batch, batch_first=True, padding_value=word2idx['<pad>']
        ).to(device)

        outputs = model(features_batch, captions_batch[:, :-1])  # (batch, seq_len, vocab_size)

        # Skip the first token in outputs because it's the image feature embedding step
        loss = criterion(
            outputs[:, 1:].reshape(-1, vocab_size),
            captions_batch[:, 1:].reshape(-1)
        )

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(f"Epoch [{epoch+1}/10], Loss: {loss.item():.4f}")

Epoch [1/10], Loss: 1.9429
Epoch [2/10], Loss: 2.6099
Epoch [3/10], Loss: 1.9806
Epoch [4/10], Loss: 1.7816
Epoch [5/10], Loss: 1.6788
Epoch [6/10], Loss: 1.5825
Epoch [7/10], Loss: 1.4055
Epoch [8/10], Loss: 1.2688
Epoch [9/10], Loss: 1.2789
Epoch [10/10], Loss: 1.3090


In [26]:
def generate_caption(model, feature, word2idx, idx2word, max_length):
    model.eval()
    caption = ['startseq']
    for _ in range(max_length):
        seq = [word2idx.get(word, word2idx['<pad>']) for word in caption]
        seq = torch.tensor(seq, dtype=torch.long).unsqueeze(0).to(device)
        with torch.no_grad():
            output = model(feature.unsqueeze(0), seq)
        _, predicted = output[0, -1, :].max(0)
        word = idx2word[predicted.item()]
        caption.append(word)
        if word == 'endseq':
            break
    return ' '.join(caption[1:-1])

In [27]:
actual, predicted = [], []

for key in test_ids:
    feature = features[key].to(device)
    caption = generate_caption(model, feature, word2idx, idx2word, max_length)
    references = [cap.split() for cap in mapping[key]]
    actual.append(references)
    predicted.append(caption.split())

In [28]:
print(f"BLEU-1: {corpus_bleu(actual, predicted, weights=(1.0, 0, 0, 0))}")
print(f"BLEU-2: {corpus_bleu(actual, predicted, weights=(0.5, 0.5, 0, 0))}")

BLEU-1: 0.5074283322870894
BLEU-2: 0.32114328636957684
