In [1]:
import pandas as pd
import os
import random
import torch
import pickle
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from collections import defaultdict, Counter
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision.transforms import transforms
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
from pycocoevalcap.bleu.bleu import Bleu
from pycocoevalcap.meteor.meteor import Meteor
from pycocoevalcap.cider.cider import Cider
from pycocoevalcap.spice.spice import Spice

In [2]:
#open csv file
df = pd.read_csv(r"..\dataset\flickr30k_images\results.csv",delimiter="|")

#convert to lowercase and remove whitespace str.lower() is used instead of lower() as we have a 
#pandas series here and we want to apply the operation to every string. It is an vectorized 
# operation
df[" comment"] = df[" comment"].str.lower().str.strip()
df = df[df[" comment"].notna()]                    # Remove NaN
df = df[df[" comment"] != ""]  
#remove duplicate same caption for same image
df = df.drop_duplicates(subset=["image_name"," comment"])

df

Unnamed: 0,image_name,comment_number,comment
0,1000092795.jpg,0,two young guys with shaggy hair look at their ...
1,1000092795.jpg,1,"two young , white males are outside near many ..."
2,1000092795.jpg,2,two men in green shirts are standing in a yard .
3,1000092795.jpg,3,a man in a blue shirt standing in a garden .
4,1000092795.jpg,4,two friends enjoy time spent together .
...,...,...,...
158910,998845445.jpg,0,a man in shorts and a hawaiian shirt leans ove...
158911,998845445.jpg,1,"a young man hanging over the side of a boat , ..."
158912,998845445.jpg,2,a man is leaning off of the side of a blue and...
158913,998845445.jpg,3,"a man riding a small boat in a harbor , with f..."


In [3]:
image_name = df.iloc[0]["image_name"]
caption = df.iloc[0][" comment"]
image_name,caption

('1000092795.jpg',
 'two young guys with shaggy hair look at their hands while hanging out in the yard .')

In [4]:
caption_dict = defaultdict(list)
#accessing element not in default dict will not return error,
#  it creates default value([] here)

for _,row in df.iterrows():
    caption_dict[row["image_name"]].append(row[" comment"])

In [5]:
class Vocabulary:
    def __init__(self):
        self.word2idx = {}
        self.idx2word = {}
        self.freqs = Counter()
        self.special_tokens = ["<start>","<end>","<unk>","<pad>"]

    def build_vocab(self, caption_dict, threshold=5):
        for captions in caption_dict.values():
            for caption in captions:
                if not isinstance(caption, str):
                    continue
                words = caption.lower().strip().split()
                self.freqs.update(words)

        for idx, token in enumerate(self.special_tokens):
            self.word2idx[token] = idx
            self.idx2word[idx] = token
            idx += 1

        idx = len(self.special_tokens)

        for word, freq in self.freqs.items():
            if freq >= threshold:
                self.word2idx[word] = idx
                self.idx2word[idx] = word
                idx += 1


    def __getitem__(self, word):
        return self.word2idx.get(word, self.word2idx["<unk>"])

    def __len__(self):
        return len(self.word2idx)
    
    def idx_to_word(self, index):
        return self.idx2word.get(index, "<unk>")
    
    def decode_caption(self, token_ids):
        words = []

        for token_id in token_ids:
            #token_id = token_id.item()
            word = self.idx_to_word(token_id)

            if word == "<end>":
                break

            if word not in ["<start>", "<pad>"]:
                words.append(word)

        caption = " ".join(words)
        return caption
    
    def decode_captions_debugger(self, token_ids):
        words = []

        for token_id in token_ids:
            #token_id = token_id.item()
            word = self.idx_to_word(token_id)
            words.append(word)
        return words

In [6]:
class Flickr30kDataset(Dataset):

    def __init__(self, image_dir, caption_dict, vocab, transform=None):
        self.image_dir = image_dir
        self.caption_dict = list(caption_dict.items()) # [(image_name, [captions])]
        self.vocab = vocab #dictionary of all words used with index allocated for each word
        self.transform = transform #transformations to be applied for images

    def __len__(self):#returns number of images present
        return len(self.caption_dict)

    def __getitem__(self, idx): #What Happens on dataset[i]
        image_name, captions = self.caption_dict[idx]
        image_path = os.path.join(self.image_dir, image_name)
        image = Image.open(image_path).convert("RGB")#to ensure image has 3 channels

        #one caption is chosen at random, but over many epoch all captions will be choosed
        caption = random.choice(captions) 

        #tokenization words into numerical tensors
        tokens = [self.vocab["<start>"]]
        tokens += [self.vocab[word] for word in caption.split()]
        tokens += [self.vocab["<end>"]]

        #apply image transformations
        if self.transform:
            image = self.transform(image)

        return image, torch.tensor(tokens)


In [7]:
class Collator:
    def __init__(self, pad_idx):
        self.pad_idx = pad_idx

    def __call__(self, batch):
        images, captions = zip(*batch)

        #pad sequences 
        lengths = [len(caption) for caption in captions]
        captions_padded = pad_sequence(captions, batch_first = True, padding_value=self.pad_idx)

        return torch.stack(images), captions_padded, torch.tensor(lengths)

In [8]:
items = sorted(caption_dict.items(), key=lambda x:x[0])
seed = 42

train_set, temp_set = train_test_split(items, test_size=0.25,random_state=seed)
val_set, test_set = train_test_split(temp_set, test_size=0.5,random_state=seed)

train_dict = dict(train_set)
val_dict = dict(val_set)
test_dict = dict(test_set)

In [9]:
#1. Image Transformations
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],  # ImageNet stats
                         std=[0.229, 0.224, 0.225]),
])
#2. Vocabulary building

#add pickle file
vocab_path = r"..\extras\vocabulary.pkl"

if os.path.exists(vocab_path):
    print("Loading Existing Vocabulary....")
    with open(vocab_path, "rb") as f:
        vocab = pickle.load(f)

else:
    print("Creating new Vocabulary....")
    vocab = Vocabulary()
    vocab.build_vocab(caption_dict=caption_dict,
                  threshold=4)
    with open(vocab_path, "wb") as f:
        pickle.dump(vocab, f)


#3. Collate function
collator = Collator(pad_idx=vocab["<pad>"])

#4. Creating dataset
train_dataset = Flickr30kDataset(image_dir=r"..\dataset\flickr30k_images\Images",
                           caption_dict=train_dict,
                           vocab=vocab,
                           transform=transform)

val_dataset = Flickr30kDataset(image_dir=r"..\dataset\flickr30k_images\Images",
                           caption_dict=val_dict,
                           vocab=vocab,
                           transform=transform)

test_dataset = Flickr30kDataset(image_dir=r"..\dataset\flickr30k_images\Images",
                           caption_dict=test_dict,
                           vocab=vocab,
                           transform=transform)
#5. Dataloader
train_loader = DataLoader(dataset=train_dataset,
                        batch_size=32,
                        shuffle=True,
                        collate_fn=collator)

val_loader = DataLoader(dataset=val_dataset,
                        batch_size=32,
                        shuffle=False,
                        collate_fn=collator)

test_loader = DataLoader(dataset=test_dataset,
                        batch_size=32,
                        shuffle=False,
                        collate_fn=collator)

Loading Existing Vocabulary....


In [10]:
for images, captions, lengths in train_loader:
    print("Image batch shape:", images.shape)
    print("Captions shape:", captions.shape)    
    print("Lengths:", lengths)
    print("Image:",images)
    print("Caption: ", captions)
    print("Sum Lengths: ", torch.sum(lengths))
    break

Image batch shape: torch.Size([32, 3, 224, 224])
Captions shape: torch.Size([32, 37])
Lengths: tensor([14, 14, 37,  9, 32, 15,  9, 11, 23, 11,  9, 24, 23, 19, 12, 17, 24, 16,
        19,  8, 20, 14, 13, 14, 10, 19,  9, 16, 19, 19, 31, 23])
Image: tensor([[[[ 1.8208,  2.0948,  2.1975,  ...,  0.1254,  0.1083,  0.3823],
          [ 1.3413,  1.7009,  1.6838,  ...,  0.2796,  0.3309,  0.3823],
          [ 1.9235,  1.9749,  1.5297,  ...,  0.3138,  0.3994,  0.3823],
          ...,
          [-1.4672, -0.9363, -0.6623,  ...,  0.2453,  0.2967,  0.3823],
          [-1.0219, -0.6452, -0.4054,  ...,  0.1426,  0.1768,  0.2796],
          [-0.5938, -0.3712, -0.2856,  ..., -0.1999, -0.1486,  0.0912]],

         [[ 1.8508,  2.2010,  2.3235,  ..., -0.0574, -0.0574,  0.1877],
          [ 1.4307,  1.7808,  1.7283,  ...,  0.1176,  0.1527,  0.2052],
          [ 2.1310,  2.1660,  1.6408,  ...,  0.1702,  0.2577,  0.2577],
          ...,
          [-1.3880, -0.8803, -0.6527,  ...,  0.1877,  0.2402,  0.3102],
 

In [11]:
class EncoderCNN(nn.Module):

    def __init__(self, embed_size):
        super().__init__()
        resnet = models.resnet50(weights="IMAGENET1K_V1")
        for param in resnet.parameters():
            param.requires_grad = False#freezing gradients, transfer learning   

        modules = list(resnet.children())[:-1] #removing last layer
        self.resnet = nn.Sequential(*modules) #combining remaining layers
        self.linear = nn.Linear(resnet.fc.in_features, embed_size)#resizing to embedding size
        self.bn = nn.BatchNorm1d(embed_size)

    def forward(self, images):
        features = self.resnet(images)  # (B, 2048, 1, 1)
        features = features.view(features.size(0), -1)  # (B, 2048)
        features = self.bn(self.linear(features))  # (B, embed_size)
        print(f"Image features shape: {features.shape}")
        return features

In [12]:
class DecoderRNN(nn.Module):
    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.init_h = nn.Linear(embed_size, hidden_size)
        self.init_c = nn.Linear(embed_size, hidden_size)

    def forward(self, features, captions, lengths):
        embeddings = self.embed(captions)  # (B, T, embed_size)

        # Use image features to initialize LSTM states
        h0 = self.init_h(features).unsqueeze(0)  # (1, B, hidden_size)
        c0 = self.init_c(features).unsqueeze(0)  # (1, B, hidden_size)

        # Pack sequence
        packed = nn.utils.rnn.pack_padded_sequence(embeddings, lengths, batch_first=True, enforce_sorted=False)
        hiddens, _ = self.lstm(packed, (h0, c0))
        outputs = self.linear(hiddens.data)

        return outputs

    
    def sample(self, features, max_length = 35):
        output_ids = []
            # Initialize hidden and cell state using image features
        h0 = self.init_h(features).unsqueeze(0)  # (1, B, hidden_size)
        c0 = self.init_c(features).unsqueeze(0)  # (1, B, hidden_size)
        states = (h0, c0)

        # Start with <start> token
        inputs = self.embed(torch.tensor([vocab["<start>"]]*features.size(0)))  # (B, embed_size)
        inputs = inputs.unsqueeze(1)  # (B, 1, embed_size)

        for _ in range(max_length):
            hidden, states = self.lstm(inputs, states)
            outputs = self.linear(hidden.squeeze(1)) #(B,1,D) -> (B,vocab_size)
            predicted = outputs.argmax(1) #(B,)
            output_ids.append(predicted)

            inputs = self.embed(predicted) #(B,embed_size)
            inputs = inputs.unsqueeze(1) #(B,1,embed_size)

        output_ids = torch.stack(output_ids, 1)
        return output_ids


In [13]:
class ImageCaptioningModel(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, images, captions, lengths):
        features = self.encoder(images)  # (B, embed_size)
        outputs = self.decoder(features, captions, lengths)
        print(f"Model output shape: {outputs.shape}")
        return outputs


In [14]:
# gts: ground truths, res: model results
# Each dict should map image_id -> list of 1+ captions

def evaluate_metrics(gts, res):
    scorers = [
        (Bleu(4), ["BLEU-1", "BLEU-2", "BLEU-3", "BLEU-4"]),
        (Meteor(), "METEOR"),
        (Cider(), "CIDEr"),
        (Spice(), "SPICE")
    ]
#for bleu we get list of scores. So , we zip the scores along with names of scores.
    results = {}

    for scorer, method in scorers:
        score, scores_per_instance = scorer.compute_score(gts, res)
        if isinstance(method, list):
            for m, s in zip(method, score):
                results[m] = s #each bleu score is stored seperately in the dictionary
        else:
            results[method] = score #other methods and scores are stored as key-value pairs

    return results


In [15]:
def generate_caption(model, image, vocab, device, max_len=20,transform = None):
    """
    Generates a caption string from a single image tensor using the model.
    """
    model.eval()
    if transform:
        image = transform(image)
    image = image.unsqueeze(0).to(device)  # (1, C, H, W)

    with torch.no_grad():
        # Encode image features
        features = model.encoder(image)  # (1, embed_size)

        # Generate token IDs using decoder
        sampled_ids = model.decoder.sample(features, max_len=max_len)  # (1, max_len)
        sampled_ids = sampled_ids[0].tolist()  # Remove batch dim → list of ints

        # Convert IDs to words
        caption = vocab.decode_caption(sampled_ids)
    
    return caption

In [16]:
#1. Hyperparameters
num_epochs = 20
learning_rate = 1e-3
pad_idx = vocab["<pad>"]
embed_size = 256
hidden_size = 512

#2. Initialize device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


encoder = EncoderCNN(embed_size=embed_size)
decoder = DecoderRNN(embed_size=embed_size, hidden_size=hidden_size, vocab_size=len(vocab))
model = ImageCaptioningModel(encoder=encoder, decoder=decoder)


#3. Loss and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=pad_idx)
optimizer = optim.Adam(params=model.parameters(), lr=learning_rate)


In [17]:
def train_model(model, train_loader, criterion, optimizer, vocab, num_epochs, device, clip_value=10):
    model.to(device=device)

    for epoch in range(1, num_epochs+1):

        # ---------------------- TRAINING ----------------------
        model.train() #to make model run in train mode
        train_loss = 0 #used to track average loss per epoch        

        for batch_idx, (image_batch, captions, lengths) in enumerate(train_loader):
            image_batch = image_batch.to(device)
            captions = captions.to(device)

            adjusted_lengths = [l-1 for l in lengths]

            #Targets: all words except the first one <start>
            targets = nn.utils.rnn.pack_padded_sequence(input=captions[:,1:],
                                                        lengths=adjusted_lengths,
                                                        batch_first=True,
                                                        enforce_sorted=False,
                                                        )[0]
            #Outputs: generated from models
            outputs = model(image_batch, captions[:, :-1], torch.tensor(adjusted_lengths))

            print(f"Captions shape: {captions.shape}")
            print(f"Lengths shape: {lengths.shape}")
            print(f"Outputs shape: {outputs.shape}")
            print(f"Targets shape: {targets.shape}")

            #compute loss
            loss = criterion(outputs, targets)

            #reset gradient to zero
            optimizer.zero_grad()

            #compute gradients
            loss.backward()

            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(model.parameters(), clip_value)

            #Update weights based on gradients
            optimizer.step()

            train_loss += loss.item() #accumulate epoch loss

            #batch loss
            if (batch_idx ) % 100 == 0:
                print(f"Epoch [{epoch}/{num_epochs}], Step [{batch_idx+1}/{len(train_loader)}], Loss: {loss.item():.4f}")

        #average loss in an epoch     
        avg_train_loss = train_loss / len(train_loader)
            
        # ---------------------- SUMMARY ----------------------
        print(f"\nEpoch [{epoch}/{num_epochs}] Summary:")
        print(f"  Train Loss: {avg_train_loss:.4f}")


In [18]:
train_model(model=model,
            train_loader=train_loader,
            criterion=criterion,
            optimizer=optimizer,
            vocab=vocab,
            num_epochs=num_epochs,
            device=device,
            clip_value=10)


Image features shape: torch.Size([32, 256])
Model output shape: torch.Size([439, 8640])
Captions shape: torch.Size([32, 29])
Lengths shape: torch.Size([32])
Outputs shape: torch.Size([439, 8640])
Targets shape: torch.Size([439])
Epoch [1/20], Step [1/745], Loss: 9.0536
Image features shape: torch.Size([32, 256])
Model output shape: torch.Size([458, 8640])
Captions shape: torch.Size([32, 33])
Lengths shape: torch.Size([32])
Outputs shape: torch.Size([458, 8640])
Targets shape: torch.Size([458])
Image features shape: torch.Size([32, 256])
Model output shape: torch.Size([446, 8640])
Captions shape: torch.Size([32, 26])
Lengths shape: torch.Size([32])
Outputs shape: torch.Size([446, 8640])
Targets shape: torch.Size([446])
Image features shape: torch.Size([32, 256])
Model output shape: torch.Size([502, 8640])
Captions shape: torch.Size([32, 46])
Lengths shape: torch.Size([32])
Outputs shape: torch.Size([502, 8640])
Targets shape: torch.Size([502])
Image features shape: torch.Size([32, 256])

KeyboardInterrupt: 