In [None]:
import kagglehub

# Download latest version of the Flickr30k dataset through kagglehub.
# The value returned by dataset_download is printed below as the dataset path.
# NOTE(review): `path` is not read again in this notebook; Part 1 searches
# '/kaggle/input' for the image folder instead.
path = kagglehub.dataset_download("adityajn105/flickr30k")

print("Path to dataset files:", path)

# Part 1: Feature Extraction Pipeline

In [None]:
import os, pickle, torch, torch.nn as nn
from torchvision import models, transforms
from torch.utils.data import DataLoader, Dataset
from PIL import Image
from tqdm import tqdm

def find_image_dir(base_input='/kaggle/input', min_images=1000):
    """Find the first directory under ``base_input`` that holds many JPEGs.

    Args:
        base_input: Root directory to search recursively. Defaults to the
            common Kaggle input root.
        min_images: A directory qualifies only when it directly contains
            strictly more than this many files ending in ``.jpg``.

    Returns:
        The path of the first qualifying directory in ``os.walk`` order, or
        ``None`` when no directory qualifies (including when ``base_input``
        does not exist, in which case ``os.walk`` yields nothing).
    """
    for root, _dirs, files in os.walk(base_input):
        # Count only files sitting directly in this folder, not in sub-folders.
        jpg_count = sum(1 for f in files if f.endswith('.jpg'))
        if jpg_count > min_images:
            return root
    return None

# Resolve the image folder once; OUTPUT_FILE is the pickle the extracted
# features are written to at the end of this cell.
IMAGE_DIR = find_image_dir()
OUTPUT_FILE = 'flickr30k_features.pkl'

# Fail fast: find_image_dir() returns None when no directory qualifies.
if IMAGE_DIR:
    print(f" Found images at: {IMAGE_DIR}") 
else:
    raise FileNotFoundError("Could not find the Flickr30k image directory. Please ensure the dataset is added to the notebook.")

# DATASET CLASS
class FlickrDataset(Dataset):
    """Dataset over the ``.jpg``/``.jpeg`` files directly inside ``img_dir``.

    File names are sorted so that the iteration order (and therefore the key
    order of any dictionary built from it) is reproducible; ``os.listdir``
    returns entries in arbitrary order.

    Args:
        img_dir: Directory containing the image files.
        transform: Callable applied to each RGB PIL image in ``__getitem__``.
    """

    def __init__(self, img_dir, transform):
        self.img_names = sorted(
            f for f in os.listdir(img_dir) if f.endswith(('.jpg', '.jpeg'))
        )
        self.transform = transform
        self.img_dir = img_dir

    def __len__(self):
        """Return the number of image files found."""
        return len(self.img_names)

    def __getitem__(self, idx):
        """Return ``(transform(image), file_name)`` for the image at ``idx``."""
        name = self.img_names[idx]
        img_path = os.path.join(self.img_dir, name)
        # Close the underlying file as soon as the RGB copy has been made.
        with Image.open(img_path) as img:
            rgb_img = img.convert('RGB')
        return self.transform(rgb_img), name

# FEATURE EXTRACTION PIPELINE
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load pre-trained ResNet50 and strip the final classification layer
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
model = nn.Sequential(*list(model.children())[:-1])  # Keep everything except the last child module
model = nn.DataParallel(model).to(device)
model.eval()

# Image preprocessing: fixed 224x224 resize, tensor conversion, per-channel normalisation
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))
])

dataset = FlickrDataset(IMAGE_DIR, transform)
loader = DataLoader(dataset, batch_size=128, num_workers=4)
features_dict = {}

# Run the extraction without tracking gradients
with torch.no_grad():
    for imgs, names in tqdm(loader, desc="Extracting Features"):
        # Flatten the output to (batch_size, feature_dim) and copy the whole
        # batch to host memory once, instead of one transfer per image.
        feats = model(imgs.to(device)).view(imgs.size(0), -1).cpu().numpy()
        # Iterating the 2-D array yields one row (feature vector) per image name.
        features_dict.update(zip(names, feats))

# Save the {file_name: feature_vector} dictionary to a pickle file
with open(OUTPUT_FILE, 'wb') as f:
    pickle.dump(features_dict, f)

print(f"Success! {len(features_dict)} images processed and saved to {OUTPUT_FILE}")

# Part 2: Vocabulary & Text Pre-Processing

In [None]:
import pandas as pd
import re
from collections import Counter

class Vocabulary:
    """Word <-> integer-id mapping with four reserved special tokens.

    Ids 0-3 are reserved for ``<pad>``, ``<start>``, ``<end>`` and ``<unk>``.
    Other words receive ids in the order in which their running count first
    reaches ``freq_threshold`` while scanning the sentences.

    Args:
        freq_threshold: Minimum number of occurrences a word needs before it
            is added to the vocabulary.
    """

    def __init__(self, freq_threshold):
        # 0: padding, 1: start of sentence, 2: end of sentence, 3: unknown words
        self.itos = {0: "<pad>", 1: "<start>", 2: "<end>", 3: "<unk>"}
        self.stoi = {"<pad>": 0, "<start>": 1, "<end>": 2, "<unk>": 3}
        self.freq_threshold = freq_threshold

    def __len__(self):
        """Return the number of entries, special tokens included."""
        return len(self.itos)

    @staticmethod
    def tokenizer(text):
        """Lowercase ``text``, drop every char that is not a-z or whitespace, and split."""
        text = str(text).lower()
        text = re.sub(r'[^a-z\s]', '', text)
        return text.split()

    def build_vocabulary(self, sentence_list):
        """Add every word occurring at least ``freq_threshold`` times.

        Safe to call more than once: new words are appended after the
        existing entries and words already present keep their ids. Counts are
        taken per call, not accumulated across calls.
        """
        frequencies = Counter()
        # Continue after the last assigned id rather than restarting at 4,
        # which would overwrite entries from an earlier call.
        idx = len(self.itos)

        for sentence in sentence_list:
            for word in self.tokenizer(sentence):
                frequencies[word] += 1

                # `>=` plus the membership guard adds a word exactly once, at
                # the moment it reaches the threshold, and also copes with
                # thresholds <= 0 (every seen word qualifies).
                if frequencies[word] >= self.freq_threshold and word not in self.stoi:
                    self.stoi[word] = idx
                    self.itos[idx] = word
                    idx += 1

    def numericalize(self, text):
        """Convert ``text`` to a list of ids, using ``<unk>`` for unknown words."""
        unk_idx = self.stoi["<unk>"]
        return [self.stoi.get(token, unk_idx) for token in self.tokenizer(text)]

# --- LOADING CAPTIONS ---

# NOTE(review): some Flickr30k distributions use '|' as the delimiter, but this
# read relies on pandas' default separator (comma) — confirm against the file.
CAPTIONS_PATH = '/kaggle/input/flickr30k/captions.txt'
df = pd.read_csv(CAPTIONS_PATH)

# Column detection: use 'comment' when present, otherwise fall back to the last column
caption_col = 'comment' if 'comment' in df.columns else df.columns[-1]
print(f"Using column '{caption_col}' for captions.")

# --- BUILDING VOCABULARY ---
# We use a threshold of 5 to ignore rare words/typos and keep the model efficient
vocab = Vocabulary(freq_threshold=5)
vocab.build_vocabulary(df[caption_col].tolist())

print(f"Vocabulary Size: {len(vocab)}")

# TEST: numericalize the first caption, wrapped in <start>/<end> ids
sample_text = df.iloc[0][caption_col]
numerical_seq = [vocab.stoi["<start>"]] + vocab.numericalize(sample_text) + [vocab.stoi["<end>"]]

print(f"\nExample Pre-processing:")
print(f"Original: {sample_text}")
print(f"Tokenized: {vocab.tokenizer(sample_text)}")
print(f"Numericalized: {numerical_seq}")

# Part 3: The Seq2Seq Architecture

In [None]:
import torch
import torch.nn as nn

class Encoder(nn.Module):
    """Project 2048-dim image features to ``embed_size`` dimensions.

    The projection is a linear layer followed by ReLU and dropout (p=0.5).
    """

    def __init__(self, embed_size):
        super().__init__()
        # Linear map from the 2048-dim input vector to the embedding size
        self.fc = nn.Linear(2048, embed_size)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(0.5)

    def forward(self, features):
        """Map ``(batch_size, 2048)`` features to ``(batch_size, embed_size)``."""
        projected = self.fc(features)
        activated = self.relu(projected)
        return self.dropout(activated)

class Decoder(nn.Module):
    """LSTM caption decoder that receives the image feature as its first input step.

    Args:
        embed_size: Size of the word embeddings (and of the image feature step).
        hidden_size: LSTM hidden state size.
        vocab_size: Number of tokens; size of the output layer.
        num_layers: Number of stacked LSTM layers.
    """

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super().__init__()
        # Token ids -> embedding vectors
        self.embed = nn.Embedding(vocab_size, embed_size)

        # Sequence model over [image feature, token embeddings...]
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)

        # Hidden state -> one score per vocabulary entry
        self.linear = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(0.5)

    def forward(self, features, captions):
        """Return per-step vocabulary scores for a batch of captions.

        ``features`` is the encoder output ``(batch_size, embed_size)`` and
        ``captions`` holds token ids ``(batch_size, seq_len)``. The last
        caption column is dropped and the image feature is prepended, so the
        output keeps ``seq_len`` steps: ``(batch_size, seq_len, vocab_size)``.
        No softmax is applied here.
        """
        # Every token except the final one is fed to the LSTM.
        input_tokens = captions[:, :-1]
        token_embeddings = self.dropout(self.embed(input_tokens))

        # (batch_size, embed_size) -> (batch_size, 1, embed_size), used as step 0.
        image_step = features.unsqueeze(1)
        lstm_inputs = torch.cat((image_step, token_embeddings), dim=1)

        # lstm_outputs: (batch_size, seq_len, hidden_size)
        lstm_outputs, _ = self.lstm(lstm_inputs)

        return self.linear(lstm_outputs)

class NeuralStoryteller(nn.Module):
    """Encoder + Decoder wrapper: image features in, per-step vocabulary scores out."""

    def __init__(self, embed_size, hidden_size, vocab_size, num_layers=1):
        super().__init__()
        self.encoder = Encoder(embed_size)
        self.decoder = Decoder(embed_size, hidden_size, vocab_size, num_layers)

    def forward(self, images, captions):
        """Encode ``images`` (pre-extracted 2048-dim vectors) and decode against ``captions``."""
        encoded = self.encoder(images)
        return self.decoder(encoded, captions)

# --- INITIALIZING THE MODEL ---
# Hyperparameters as suggested 
EMBED_SIZE = 512
HIDDEN_SIZE = 512
# Taken from the vocabulary built in Part 2, so that cell must run first.
VOCAB_SIZE = len(vocab)

# NOTE: this rebinds `model`, replacing the ResNet50 feature extractor from
# Part 1; `device` is also defined in that cell.
model = NeuralStoryteller(EMBED_SIZE, HIDDEN_SIZE, VOCAB_SIZE).to(device)

print(f"Model Architecture Initialized.")
print(f"Embed Size: {EMBED_SIZE}, Hidden Size: {HIDDEN_SIZE}, Vocab Size: {VOCAB_SIZE}")

# Part 4: Training & Inference

In [None]:
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

# COLLATE FUNCTION: Handles variable length captions in a batch
class MyCollate:
    """Collate callable that stacks features and pads captions to equal length.

    Args:
        pad_idx: Value used to right-pad the shorter caption sequences.
    """

    def __init__(self, pad_idx):
        self.pad_idx = pad_idx

    def __call__(self, batch):
        """Turn a list of ``(feature, numericalized_caption)`` samples into two tensors.

        Returns ``(imgs, targets)``: the stacked features and the captions
        padded to the longest one in the batch, batch dimension first.
        """
        feature_list = []
        caption_tensors = []
        for sample in batch:
            feature_list.append(sample[0])
            caption_tensors.append(torch.tensor(sample[1]))

        stacked_features = torch.stack(feature_list)
        padded_captions = pad_sequence(
            caption_tensors,
            batch_first=True,
            padding_value=self.pad_idx,
        )
        return stacked_features, padded_captions

# DATASET WRAPPER: Pairs your cached .pkl features with the captions
class CachedDataset(Dataset):
    """Pairs cached image features with numericalized captions, one item per DataFrame row.

    Args:
        df: DataFrame with one caption per row.
        features_dict: Mapping from image id to its cached feature vector.
        vocab: Object exposing ``stoi`` and ``numericalize(text)``.
        caption_col: Name of the column holding the caption text.
        image_col: Name of the column holding the image id used as the key
            into ``features_dict``. Defaults to ``'image'``.

    Raises:
        KeyError: From ``__getitem__`` when a row's image id is missing from
            ``features_dict`` or a column name is not in ``df``.
    """

    def __init__(self, df, features_dict, vocab, caption_col, image_col='image'):
        self.df = df
        self.features_dict = features_dict
        self.vocab = vocab
        self.caption_col = caption_col
        self.image_col = image_col

    def __len__(self):
        """Return the number of caption rows."""
        return len(self.df)

    def __getitem__(self, idx):
        """Return ``(feature_tensor, token_ids)`` for row ``idx``."""
        # Materialise the row once; each `iloc[idx]` builds a new Series.
        row = self.df.iloc[idx]
        caption = row[self.caption_col]
        img_id = row[self.image_col]

        feature = torch.tensor(self.features_dict[img_id])

        # Format: <start> caption <end>
        numericalized = [self.vocab.stoi["<start>"]]
        numericalized += self.vocab.numericalize(str(caption))
        numericalized += [self.vocab.stoi["<end>"]]

        return feature, numericalized

# --- INITIALIZING LOADERS ---
# One sample per caption row in `df`; features come from the Part 1 cache.
train_dataset = CachedDataset(df, features_dict, vocab, caption_col)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=64,
    shuffle=True,
    num_workers=2,
    # Captions in a batch are padded with the <pad> id to a common length.
    collate_fn=MyCollate(pad_idx=vocab.stoi["<pad>"])
)

# 3. LOSS & OPTIMIZER 
# ignore_index uses the same <pad> id as the collate function, so padded
# positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<pad>"])
optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)

# 4. INFERENCE: GREEDY SEARCH 
def greedy_search(model, image_feature, vocab, max_len=20):
    """Generate a caption by always picking the highest-scoring next token.

    Args:
        model: Module exposing ``encoder`` and ``decoder`` (with ``embed``,
            ``lstm`` and ``linear`` sub-modules).
        image_feature: Feature tensor for a single image; it is reshaped to
            ``(1, -1)`` before encoding.
        vocab: Object exposing ``stoi`` and ``itos`` mappings.
        max_len: Maximum number of decoding steps.

    Returns:
        The generated words joined by spaces. Decoding stops early when
        ``<end>`` is predicted; ``<end>`` itself is not included.

    Note:
        The model is switched to eval mode and left in that mode.
    """
    model.eval()
    # Use the device the model actually lives on rather than a module-level
    # global, so the function works for any model placement.
    model_device = next(model.parameters()).device
    result_caption = []

    with torch.no_grad():
        # Encoded image feature, shaped (1, 1, embed_size) to act as a sequence step.
        features = model.encoder(image_feature.to(model_device).view(1, -1)).unsqueeze(1)
        states = None

        # Decoding starts from the <start> token, shaped (1, 1).
        input_word = torch.tensor([[vocab.stoi["<start>"]]], device=model_device)

        for step in range(max_len):
            embeddings = model.decoder.embed(input_word)

            # Only the first step is prefixed with the image context, mirroring
            # the training input layout [image, <start>, w1, ...].
            if step == 0:
                embeddings = torch.cat((features, embeddings), dim=1)

            hiddens, states = model.decoder.lstm(embeddings, states)
            # Score the vocabulary from the last time step only.
            outputs = model.decoder.linear(hiddens[:, -1, :])

            predicted = outputs.argmax(dim=1)

            word = vocab.itos[predicted.item()]
            if word == "<end>":
                break

            result_caption.append(word)
            input_word = predicted.unsqueeze(0)

    return " ".join(result_caption)

print("Training components and Greedy Inference initialized.")

**Training Loop**

In [None]:
from tqdm import tqdm

def train_model(epochs=5):
    """Train the module-level ``model`` and return the mean loss of each epoch.

    Uses the module-level ``train_loader``, ``criterion``, ``optimizer`` and
    ``device``.

    Args:
        epochs: Number of passes over ``train_loader``.

    Returns:
        A list with one entry per epoch: the sum of batch losses divided by
        the number of batches.
    """
    train_losses = []
    num_batches = len(train_loader)

    for epoch in range(epochs):
        # Re-enter training mode every epoch: the inference helpers in this
        # notebook call model.eval() and do not switch back.
        model.train()
        running_loss = 0.0
        loop = tqdm(train_loader, total=num_batches)

        for features, captions in loop:
            features, captions = features.to(device), captions.to(device)

            # Forward
            outputs = model(features, captions)

            # Loss calculation. No target shift is needed: the decoder drops
            # the last caption token and prepends the image step, so output
            # step t already lines up with caption token t. The class
            # dimension is read from the output instead of a global constant.
            loss = criterion(
                outputs.reshape(-1, outputs.size(-1)),
                captions.reshape(-1),
            )

            # Backward
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            loop.set_description(f"Epoch [{epoch+1}/{epochs}]")
            loop.set_postfix(loss=loss.item())

        train_losses.append(running_loss / num_batches)

    return train_losses

# Start Training: `losses` holds one average loss per epoch and is plotted in Deliverable 2.
losses = train_model(epochs=10) 

**Beam Search Implementation**

In [None]:
import torch.nn.functional as F

def beam_search(model, image_feature, vocab, beam_width=5, max_len=20):
    """Generate a caption with beam search over summed log-probabilities.

    Args:
        model: Module exposing ``encoder`` and ``decoder`` (with ``embed``,
            ``lstm`` and ``linear`` sub-modules).
        image_feature: Feature tensor for a single image; it is reshaped to
            ``(1, -1)`` before encoding.
        vocab: Object exposing ``stoi`` and ``itos`` mappings.
        beam_width: Number of candidate sequences kept after each step.
        max_len: Maximum number of decoding steps.

    Returns:
        The words of the highest-scoring sequence joined by spaces, with
        ``<start>`` and ``<end>`` removed.

    Raises:
        ValueError: If ``beam_width`` is smaller than 1.

    Note:
        The model is switched to eval mode and left in that mode. Scores are
        not length-normalised.
    """
    if beam_width < 1:
        raise ValueError("beam_width must be at least 1")

    model.eval()
    # Use the device the model actually lives on rather than a module-level global.
    model_device = next(model.parameters()).device
    start_idx = vocab.stoi["<start>"]
    end_idx = vocab.stoi["<end>"]

    with torch.no_grad():
        feature_context = model.encoder(image_feature.to(model_device).view(1, -1)).unsqueeze(1)

        # Candidates: (score, sequence, hidden_states)
        candidates = [(0.0, [start_idx], None)]

        for step in range(max_len):
            all_candidates = []

            for score, seq, states in candidates:
                # Finished sequences are carried over unchanged.
                if seq[-1] == end_idx:
                    all_candidates.append((score, seq, states))
                    continue

                input_word = torch.tensor([[seq[-1]]], device=model_device)
                embeddings = model.decoder.embed(input_word)

                # Only the first step is prefixed with the image context.
                if step == 0:
                    embeddings = torch.cat((feature_context, embeddings), dim=1)

                hiddens, next_states = model.decoder.lstm(embeddings, states)

                # Only the last output vector is scored.
                outputs = model.decoder.linear(hiddens[:, -1, :])
                log_probs = F.log_softmax(outputs, dim=1)

                # topk raises when k exceeds the vocabulary size, so clamp it.
                k = min(beam_width, log_probs.size(1))
                top_log_probs, top_indices = log_probs.topk(k)

                for log_prob, token in zip(top_log_probs[0].tolist(), top_indices[0].tolist()):
                    all_candidates.append((score + log_prob, seq + [token], next_states))

            # Sort and prune to beam_width
            candidates = sorted(all_candidates, key=lambda c: c[0], reverse=True)[:beam_width]

            if all(c[1][-1] == end_idx for c in candidates):
                break

        best_seq = candidates[0][1]
        final_caption = [vocab.itos[idx] for idx in best_seq if idx not in (start_idx, end_idx)]

        return " ".join(final_caption)
             
        

# --- QUICK TEST---
# Decode the first cached image feature with both strategies for a side-by-side look.
sample_feat = torch.tensor(features_dict[list(features_dict.keys())[0]])
print("Greedy:", greedy_search(model, sample_feat, vocab))
print("Beam (k=5):", beam_search(model, sample_feat, vocab, beam_width=5))

# Deliverable 1: Caption Examples

In [None]:
import matplotlib.pyplot as plt
from PIL import Image
import random
import os

def display_caption_examples(model, features_dict, df, vocab, img_dir, num_examples=5, caption_col=None):
    """Plot random images with a reference caption and the greedy model caption.

    Args:
        model: Captioning model passed to ``greedy_search``.
        features_dict: Mapping from image file name to its cached feature vector.
        df: Captions DataFrame with an ``'image'`` column.
        vocab: Vocabulary passed to ``greedy_search``.
        img_dir: Directory containing the image files.
        num_examples: Number of images to show; capped at the number of
            cached images.
        caption_col: Column of ``df`` holding the caption text. When ``None``
            it is detected from ``df``: ``'comment'`` if present, otherwise
            the last column.
    """
    # Resolve the caption column from the DataFrame that was passed in rather
    # than depending on a module-level variable.
    if caption_col is None:
        caption_col = 'comment' if 'comment' in df.columns else df.columns[-1]

    # Selecting random image names from our feature cache.
    # random.sample raises if asked for more items than exist, so cap the count.
    all_image_names = list(features_dict.keys())
    num_examples = min(num_examples, len(all_image_names))
    random_images = random.sample(all_image_names, num_examples)

    # Setup the plot
    plt.figure(figsize=(20, 20))

    for i, img_name in enumerate(random_images):
        # 1. Loading the actual image file
        img_path = os.path.join(img_dir, img_name)
        image = Image.open(img_path).convert("RGB")

        # 2. Getting the ground truth (first caption row found for this image)
        matching_captions = df.loc[df['image'] == img_name, caption_col]
        if len(matching_captions) > 0:
            ground_truth = matching_captions.iloc[0]
        else:
            # An image without caption rows should not abort the whole figure.
            ground_truth = "(no reference caption found)"

        # 3. Generating the model caption from the cached feature
        feature_tensor = torch.tensor(features_dict[img_name])
        model_caption = greedy_search(model, feature_tensor, vocab)

        # 4. Plotting: one row per example, title colour alternates for readability
        plt.subplot(num_examples, 1, i + 1)
        plt.imshow(image)
        plt.title(f"Ground Truth: {ground_truth}\nModel Generated: {model_caption}",
                  fontsize=12, pad=10, loc='left', color='blue' if i % 2 == 0 else 'green')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Running the visual evaluation
# IMAGE_DIR is the image folder located in Part 1
display_caption_examples(model, features_dict, df, vocab, IMAGE_DIR)

# Deliverable 2: Loss Curve

In [None]:
import matplotlib.pyplot as plt

def plot_training_results(train_losses):
    """Draw the training-loss curve, one point per entry of ``train_losses``."""
    fig, ax = plt.subplots(figsize=(10, 6))
    ax.plot(train_losses, label='Training Loss', color='#1f77b4', linewidth=2)
    ax.set_title('Deliverable 2: Model Training Progress', fontsize=14)
    ax.set_xlabel('Epochs', fontsize=12)
    ax.set_ylabel('Loss (CrossEntropy)', fontsize=12)
    ax.legend()
    ax.grid(True, linestyle='--', alpha=0.6)
    plt.show()

# `losses` is the list returned by train_model(), so the training cell must run first.
plot_training_results(losses)

# Deliverable 3: Quantitative Evaluation

In [None]:
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction

def calculate_metrics(model, features_dict, df, vocab, num_test=100, caption_col=None):
    """Print BLEU-4 and token-level precision/recall/F1 for greedy captions.

    Args:
        model: Captioning model passed to ``greedy_search``.
        features_dict: Mapping from image id to its cached feature vector; the
            first ``num_test`` keys are evaluated.
        df: Captions DataFrame with an ``'image'`` column.
        vocab: Vocabulary; its ``tokenizer`` is applied to the references so
            they are normalised the same way as the model's output words.
        num_test: Maximum number of images to evaluate.
        caption_col: Column of ``df`` holding the caption text. When ``None``,
            ``'caption'`` is used if present, otherwise ``'comment'`` if
            present, otherwise the last column.

    Averages are taken over the images that were actually scored; images with
    no usable reference caption are skipped.
    """
    model.eval()
    smooth = SmoothingFunction().method1

    if caption_col is None:
        if 'caption' in df.columns:
            caption_col = 'caption'
        else:
            caption_col = 'comment' if 'comment' in df.columns else df.columns[-1]

    bleu_scores = []
    all_precision = []
    all_recall = []

    # Evaluating on the first `num_test` cached images
    test_ids = list(features_dict.keys())[:num_test]

    for img_id in test_ids:
        # 1. Getting references (ground truths), tokenized like the vocabulary
        #    so punctuation and casing do not count as mismatches.
        references = df.loc[df['image'] == img_id, caption_col].astype(str).tolist()
        ref_tokens = [vocab.tokenizer(r) for r in references]
        ref_tokens = [tokens for tokens in ref_tokens if tokens]
        if not ref_tokens:
            continue

        # 2. Getting model prediction
        feat = torch.tensor(features_dict[img_id])
        prediction = greedy_search(model, feat, vocab).split()

        # 3. Calculating BLEU-4
        bleu_scores.append(
            sentence_bleu(ref_tokens, prediction, weights=(0.25, 0.25, 0.25, 0.25), smoothing_function=smooth)
        )

        # 4. Calculating Precision/Recall (token-level, against the first reference)
        gt_set = set(ref_tokens[0])
        pred_set = set(prediction)
        common = gt_set & pred_set

        all_precision.append(len(common) / len(pred_set) if pred_set else 0)
        all_recall.append(len(common) / len(gt_set) if gt_set else 0)

    num_scored = len(bleu_scores)
    if num_scored == 0:
        print("No images with reference captions were available for evaluation.")
        return

    # Divide by the number of images actually scored, not the requested count.
    avg_bleu = sum(bleu_scores) / num_scored
    avg_p = sum(all_precision) / num_scored
    avg_r = sum(all_recall) / num_scored
    f1 = 2 * (avg_p * avg_r) / (avg_p + avg_r) if (avg_p + avg_r) > 0 else 0

    print(f"--- Deliverable 3: Quantitative Metrics (n={num_scored}) ---")
    print(f"Average BLEU-4: {avg_bleu:.4f}")
    print(f"Average Precision: {avg_p:.4f}")
    print(f"Average Recall: {avg_r:.4f}")
    print(f"Average F1-Score: {f1:.4f}")

# Evaluate with the default num_test=100; the function prints its results.
calculate_metrics(model, features_dict, df, vocab)

In [None]:
# Persist only the model's state_dict (not the full module object) to 'model.pth'.
torch.save(model.state_dict(), 'model.pth')