## Imports

In [1]:
import os
from PIL import Image
from torch.utils.data import Dataset
import torchvision.transforms as transforms
from torch.utils.data import Subset
import random
import numpy as np
from collections import Counter
import torch
import torch.nn as nn
import torchvision.models as models
from torch.nn.utils.rnn import pad_sequence
import nltk
nltk.download('punkt')
import matplotlib.pyplot as plt
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from tqdm import tqdm



from google.colab import drive
drive.mount('/content/drive')

## Hyperparameters

In [None]:
# Width of the word embeddings; also passed to VGGEncoder as the size of the
# image feature vector and to load_glove_embeddings as the GloVe dimension.
EMBEDDING_DIM = 200
# Hidden state size handed to the decoder (RNN/LSTM).
HIDDEN_SIZE = 50
# Batch size of the training DataLoader.
BATCH_SIZE = 32
# Number of passes over the training loader made by Trainer.train.
NUM_EPOCHS = 15
# Learning rate given to the Adam optimizer.
LEARNING_RATE = 1e-3
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Location of the GloVe text file read by load_glove_embeddings.
glove_path = "/content/glove.6B.200d.txt"


# Dataset Helper functions


In [None]:

def split_dataset(dataset, split_ratio=0.8, seed=42):
    """
    Splits a dataset into train and test sets.

    The shuffle is driven by a private ``random.Random(seed)`` instance, so
    the split is reproducible without reseeding (and thereby disturbing) the
    module-level ``random`` generator used elsewhere in the program.

    Args:
        dataset (Dataset): The full dataset to split (anything with ``len``).
        split_ratio (float): Ratio for train set. (0.8 means 80% train, 20% test)
        seed (int): Random seed for reproducibility.

    Returns:
        train_dataset (Subset), test_dataset (Subset)

    Raises:
        ValueError: If ``split_ratio`` is outside the range [0, 1].
    """
    if not 0.0 <= split_ratio <= 1.0:
        raise ValueError(f"split_ratio must be in [0, 1], got {split_ratio}")

    dataset_size = len(dataset)
    indices = list(range(dataset_size))
    # A local generator yields the same permutation as seeding the global
    # one, but leaves the global RNG state untouched.
    random.Random(seed).shuffle(indices)

    split = int(split_ratio * dataset_size)
    train_dataset = Subset(dataset, indices[:split])
    test_dataset = Subset(dataset, indices[split:])

    return train_dataset, test_dataset









class ImageCaptionDataset(Dataset):
    """Dataset yielding ``(image, caption_tensor)`` pairs.

    Image file names are read from ``image_names_file`` (one per line) and
    captions from ``captions_file``, whose lines have the form
    ``<image name>#<index><TAB><caption>``. Only images that have at least
    one caption and exist inside ``image_folder`` are kept.
    """

    def __init__(self, image_folder, image_names_file, captions_file, vocab, transform=None, max_caption_length=30):
        self.image_folder = image_folder
        self.vocab = vocab
        self.max_caption_length = max_caption_length
        # Without a caller-supplied transform, resize to 224x224 and apply
        # the normalization constants below.
        self.transform = transform or transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])

        with open(image_names_file, 'r') as names_fh:
            listed_names = [raw.strip() for raw in names_fh]

        # Group captions per image; the "#index" suffix of the name is dropped.
        self.captions_dict = {}
        with open(captions_file, 'r') as captions_fh:
            for raw in captions_fh:
                fields = raw.strip().split('\t')
                if len(fields) == 2:
                    tagged_name, text = fields
                    key = tagged_name.split('#')[0]
                    self.captions_dict.setdefault(key, []).append(text)

        # Keep only names that are captioned and present on disk.
        self.valid_image_names = []
        for name in listed_names:
            if name not in self.captions_dict:
                continue
            if os.path.exists(os.path.join(image_folder, name)):
                self.valid_image_names.append(name)

    def __len__(self):
        """Number of usable (captioned and existing) images."""
        return len(self.valid_image_names)

    def __getitem__(self, idx):
        """Return ``(transformed image, caption tensor)`` for sample ``idx``.

        Returns ``None`` (after printing a warning) when the image file has
        disappeared since the dataset was built.
        """
        name = self.valid_image_names[idx]
        path = os.path.join(self.image_folder, name)

        try:
            picture = Image.open(path).convert('RGB')
        except FileNotFoundError:
            print(f"[WARNING] Skipping missing image: {path}")
            return None

        # Only the first caption of each image is used.
        first_caption = self.captions_dict[name][0]
        return (
            self.transform(picture),
            self.vocab.caption_to_tensor(first_caption, self.max_caption_length),
        )


## GloVe Setup

In [None]:
def load_glove_embeddings(glove_path, vocab, embedding_dim=200):
    """
    Loads GloVe embeddings and returns an embedding matrix for the given vocab.

    Only vectors for words present in ``vocab.stoi`` are parsed and kept, so
    memory use scales with the vocabulary rather than with the whole GloVe
    file. Blank lines in the file are ignored.

    Args:
        glove_path (str): Path to glove.6B.200d.txt
        vocab (Vocabulary): Vocabulary object (needs ``stoi`` and ``len``)
        embedding_dim (int): Dimension of GloVe vectors (default 200)

    Returns:
        embedding_matrix (np.ndarray): Shape (vocab_size, embedding_dim)

    Raises:
        ValueError: If a needed GloVe vector does not have ``embedding_dim``
            components (e.g. the wrong GloVe file was supplied).
    """
    wanted = vocab.stoi
    embeddings_index = {}

    # Load only the GloVe vectors the vocabulary actually needs.
    with open(glove_path, 'r', encoding='utf-8') as f:
        for line_no, line in enumerate(f, start=1):
            parts = line.split(None, 1)
            if not parts:
                continue  # blank line
            word = parts[0]
            if word not in wanted:
                continue  # skip the float parsing for unused words
            values = parts[1].split() if len(parts) > 1 else []
            if len(values) != embedding_dim:
                raise ValueError(
                    f"{glove_path}:{line_no}: expected {embedding_dim} values "
                    f"for '{word}', found {len(values)}"
                )
            embeddings_index[word] = np.asarray(values, dtype='float32')

    vocab_size = len(vocab)
    embedding_matrix = np.zeros((vocab_size, embedding_dim))

    for word, idx in wanted.items():
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is not None:
            embedding_matrix[idx] = embedding_vector
        else:
            # Random initialization for words not in GloVe
            embedding_matrix[idx] = np.random.normal(scale=0.6, size=(embedding_dim,))

    return embedding_matrix


## Vocabulary Setup

In [None]:

class Vocabulary:
    """Bidirectional word <-> index mapping with four reserved tokens.

    Indices 0-3 are reserved for ``<PAD>``, ``<SOS>``, ``<EOS>`` and
    ``<UNK>``. Text is lower-cased and split on whitespace.
    """

    def __init__(self, freq_threshold=1):
        # Words seen fewer than freq_threshold times are left out.
        self.freq_threshold = freq_threshold
        self.itos = {0: "<PAD>", 1: "<SOS>", 2: "<EOS>", 3: "<UNK>"}
        self.stoi = {v: k for k, v in self.itos.items()}

    def __len__(self):
        """Number of entries, including the reserved tokens."""
        return len(self.itos)

    def build_vocabulary(self, sentence_list):
        """Add every word occurring at least ``freq_threshold`` times.

        Frequencies are counted over ``sentence_list`` only. Words that are
        already in the vocabulary keep their existing index, so calling this
        method again never creates duplicate or stale entries.
        """
        frequencies = Counter()
        for sentence in sentence_list:
            frequencies.update(sentence.lower().split())

        idx = len(self.itos)
        for word, freq in frequencies.items():
            if freq < self.freq_threshold or word in self.stoi:
                continue
            self.stoi[word] = idx
            self.itos[idx] = word
            idx += 1

    def numericalize(self, text):
        """Map ``text`` to a list of indices; unknown words become ``<UNK>``."""
        unk = self.stoi["<UNK>"]
        return [self.stoi.get(word, unk) for word in text.lower().split()]

    def caption_to_tensor(self, caption, max_length=None):
        """Encode ``caption`` as ``<SOS> ... <EOS>`` in a 1-D long tensor.

        When ``max_length`` is given (and non-zero) the result has exactly
        that length: shorter sequences are right-padded with ``<PAD>`` and
        longer ones are cut so that the final token is still ``<EOS>``.
        """
        numericalized = [self.stoi["<SOS>"]] + self.numericalize(caption) + [self.stoi["<EOS>"]]

        if max_length:
            if len(numericalized) < max_length:
                numericalized += [self.stoi["<PAD>"]] * (max_length - len(numericalized))
            else:
                numericalized = numericalized[:max_length-1] + [self.stoi["<EOS>"]]

        return torch.tensor(numericalized, dtype=torch.long)




def collate_fn(batch):
    """
    Collate function that pads captions and stacks images.

    Samples that are ``None`` (the dataset returns ``None`` when an image
    file is missing) are dropped before collating.

    Args:
        batch: Iterable of ``(image, caption)`` tensor pairs or ``None``.

    Returns:
        ``(images, captions_padded)`` where images are stacked along a new
        first dimension and captions are right-padded with 0, or
        ``(None, None)`` when every sample in the batch was ``None``.
    """
    samples = [sample for sample in batch if sample is not None]
    if not samples:
        return None, None

    images, captions = zip(*samples)
    images = torch.stack(images, dim=0)
    captions_padded = pad_sequence(captions, batch_first=True, padding_value=0)
    return images, captions_padded


## Model Definition

In [None]:
class VGGEncoder(nn.Module):
    """Image encoder: frozen VGG16 convolutional features + linear projection.

    Args:
        embed_size (int): Size of the feature vector produced per image.
    """

    def __init__(self, embed_size):
        super().__init__()
        vgg = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)

        # Use features only (no classifier)
        self.vgg_features = vgg.features

        # Freeze VGG parameters (optional, for faster training)
        for param in self.vgg_features.parameters():
            param.requires_grad = False

        # Adaptive pooling fixes the spatial size at 7x7 so the linear layer
        # below receives 512*7*7 values whatever the input resolution; it
        # leaves feature maps that are already 7x7 unchanged.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))
        self.fc = nn.Linear(512 * 7 * 7, embed_size)
        self.bn = nn.BatchNorm1d(embed_size)

    def forward(self, images):
        """Encode a batch of images into vectors of shape (batch_size, embed_size)."""
        features = self.vgg_features(images)
        features = self.avgpool(features)
        features = features.flatten(1)  # (batch_size, 512*7*7)
        features = self.fc(features)
        features = self.bn(features)
        return features  # Shape: (batch_size, embed_size)







class LSTMDecoder(nn.Module):
    """LSTM caption decoder initialised from a pretrained embedding matrix.

    Args:
        embedding_matrix: 2-D array or tensor of shape (vocab_size, embed_size);
            it is converted to a float32 tensor, so NumPy arrays and tensors
            of other floating dtypes are accepted.
        hidden_size (int): LSTM hidden state size.
        vocab_size (int): Number of output classes of the final linear layer.
        num_layers (int): Number of stacked LSTM layers.
        freeze_embeddings (bool): If True the embedding weights are not trained.
    """

    def __init__(self, embedding_matrix, hidden_size, vocab_size, num_layers=1, freeze_embeddings=False):
        super().__init__()

        weights = torch.as_tensor(embedding_matrix, dtype=torch.float32)
        self.embed = nn.Embedding.from_pretrained(weights, freeze=freeze_embeddings)

        embed_size = weights.shape[1]
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, captions, features):
        """Score the vocabulary at every step of ``[image feature, captions]``.

        Args:
            captions: Long tensor of token indices, shape (B, T).
            features: Image features of shape (B, E); E must equal the
                embedding size because both are concatenated along time.

        Returns:
            Tensor of shape (B, T+1, vocab_size).
        """
        embeddings = self.embed(captions)  # (B, T, E)

        # The image feature acts as the first input step of the sequence.
        features = features.unsqueeze(1)  # (B, 1, E)
        inputs = torch.cat((features, embeddings), dim=1)  # (B, T+1, E)

        outputs, _ = self.lstm(inputs)
        outputs = self.linear(outputs)  # (B, T+1, vocab_size)

        return outputs








class RNNDecoder(nn.Module):
    """Vanilla-RNN caption decoder initialised from a pretrained embedding matrix.

    Args:
        embedding_matrix: 2-D array or tensor of shape (vocab_size, embed_size);
            it is converted to a float32 tensor, so the NumPy matrix returned
            by ``load_glove_embeddings`` can be passed directly, exactly as
            with ``LSTMDecoder``.
        hidden_size (int): RNN hidden state size.
        vocab_size (int): Number of output classes of the final linear layer.
        num_layers (int): Number of stacked RNN layers.
        freeze_embeddings (bool): If True the embedding weights are not trained.
    """

    def __init__(self, embedding_matrix, hidden_size, vocab_size, num_layers=1, freeze_embeddings=False):
        super().__init__()

        weights = torch.as_tensor(embedding_matrix, dtype=torch.float32)
        self.embed = nn.Embedding.from_pretrained(weights, freeze=freeze_embeddings)

        embed_size = weights.shape[1]
        self.rnn = nn.RNN(embed_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, vocab_size)

    def forward(self, captions, features):
        """Score the vocabulary at every step of ``[image feature, captions]``.

        Args:
            captions: Long tensor of token indices, shape (B, T).
            features: Image features of shape (B, E); E must equal the
                embedding size because both are concatenated along time.

        Returns:
            Tensor of shape (B, T+1, vocab_size).
        """
        embeddings = self.embed(captions)  # (B, T, E)

        # The image feature acts as the first input step of the sequence.
        features = features.unsqueeze(1)  # (B, 1, E)
        inputs = torch.cat((features, embeddings), dim=1)  # (B, T+1, E)

        outputs, _ = self.rnn(inputs)
        outputs = self.linear(outputs)  # (B, T+1, vocab_size)

        return outputs










class ImageCaptionModel(nn.Module):
    """Encoder-decoder captioning model.

    Args:
        encoder: Callable mapping an image batch to feature vectors (B, E).
        decoder: Callable ``decoder(captions, features)`` returning scores of
            shape (B, T+1, V) for captions of shape (B, T).
        vocab: Vocabulary providing ``stoi`` and ``itos``.
        device: Device on which ``generate_caption`` runs its tensors.
    """

    def __init__(self, encoder, decoder, vocab, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.vocab = vocab
        self.device = device

    def forward(self, images, captions):
        """Teacher-forced forward pass used for training.

        The last caption token is dropped before decoding, and the decoder is
        given the image feature together with the remaining tokens.
        """
        features = self.encoder(images)  # (B, E)
        outputs = self.decoder(captions[:, :-1], features)  # (B, T, V)
        return outputs

    def generate_caption(self, image, max_length=15):
        """Greedily decode a caption for a single image.

        Args:
            image: Image tensor without batch dimension.
            max_length (int): Maximum number of words to generate.

        Returns:
            The generated words joined by spaces (without ``<SOS>``/``<EOS>``).

        Note:
            Switches the model to eval mode and leaves it there.
        """
        self.eval()
        sos_idx = self.vocab.stoi["<SOS>"]
        eos_idx = self.vocab.stoi["<EOS>"]

        with torch.no_grad():
            # Encode the image
            feature = self.encoder(image.unsqueeze(0).to(self.device))  # (1, E)

            caption = [sos_idx]
            for _ in range(max_length):
                # Feed the whole prefix INCLUDING <SOS>, so the decoder sees
                # the same [feature, <SOS>, w1, ...] layout as in forward().
                input_caption = torch.tensor(caption, dtype=torch.long, device=self.device)
                output = self.decoder(input_caption.unsqueeze(0), feature)  # (1, T+1, V)

                # Greedy choice from the last timestep.
                predicted_idx = output[0, -1, :].argmax(dim=-1).item()
                if predicted_idx == eos_idx:
                    break
                caption.append(predicted_idx)

        # Convert to words, skipping the leading <SOS>.
        return ' '.join(self.vocab.itos[idx] for idx in caption[1:])


## Train helper

In [None]:
class Trainer:
    """Runs teacher-forced training and records per-epoch loss and accuracy.

    Args:
        model: Captioning model called as ``model(images, captions)``.
        train_loader: Iterable of ``(images, captions)`` batches; a batch may
            be ``(None, None)`` and is then skipped.
        criterion: Loss taking ``(N, V)`` scores and ``(N,)`` targets.
        optimizer: Optimizer over the model's parameters.
        vocab: Vocabulary providing ``stoi["<PAD>"]``.
        device: Device the model and batches are moved to.
    """

    def __init__(self, model, train_loader, criterion, optimizer, vocab, device):
        self.model = model.to(device)
        self.train_loader = train_loader
        self.criterion = criterion
        self.optimizer = optimizer
        self.vocab = vocab
        self.device = device

        # Track metrics for plotting
        self.train_losses = []
        self.train_accuracies = []

    def _calculate_accuracy(self, outputs, targets):
        """Fraction of non-<PAD> target tokens predicted correctly.

        Returns 0.0 when ``targets`` contains only padding.
        """
        predicted = outputs.argmax(dim=2)  # (B, T)
        mask = targets != self.vocab.stoi["<PAD>"]
        counted = mask.sum().item()
        if counted == 0:
            return 0.0
        correct = ((predicted == targets) & mask).sum().item()
        return correct / counted

    def _train_step(self, images, captions):
        """Run one optimisation step and return ``(loss, accuracy)`` as floats."""
        images = images.to(self.device)
        captions = captions.to(self.device)

        # ImageCaptionModel.forward already drops the last token and the
        # decoder prepends the image feature, so the model must receive the
        # FULL captions: output position j has then seen the image plus
        # tokens 0..j-1. Discarding position 0 (image only) aligns the
        # remaining outputs with the captions shifted by one, i.e. each
        # output predicts the token that immediately follows its input.
        targets = captions[:, 1:]     # Exclude <start>

        self.optimizer.zero_grad()
        outputs = self.model(images, captions)[:, 1:, :]  # (B, T-1, V)

        # Flatten outputs and targets for loss computation
        loss = self.criterion(
            outputs.reshape(-1, outputs.size(2)),  # (B*(T-1), V)
            targets.reshape(-1)                    # (B*(T-1))
        )
        loss.backward()
        self.optimizer.step()

        return loss.item(), self._calculate_accuracy(outputs, targets)

    def train(self, num_epochs):
        """Train for ``num_epochs`` epochs, appending epoch averages to
        ``train_losses`` and ``train_accuracies``.

        Raises:
            RuntimeError: If an epoch yields no usable batch.
        """
        for epoch in range(num_epochs):
            self.model.train()
            total_loss = 0
            total_acc = 0
            steps = 0

            for images, captions in tqdm(self.train_loader, desc=f"Epoch {epoch+1}"):
                # Check before touching the batch: None has no .to().
                if images is None or captions is None:
                    continue

                loss, acc = self._train_step(images, captions)
                total_loss += loss
                total_acc += acc
                steps += 1

            if steps == 0:
                raise RuntimeError("train_loader produced no usable batches")

            avg_loss = total_loss / steps
            avg_acc = total_acc / steps
            self.train_losses.append(avg_loss)
            self.train_accuracies.append(avg_acc)

            print(f"Epoch {epoch+1}: Loss = {avg_loss:.4f}, Accuracy = {avg_acc:.4f}")


## Loading Data

In [None]:
import zipfile

# Unpack the image archive from the mounted Drive into /content.
# Point zip_file_path at your own archive if it lives elsewhere.
zip_file_path = '/content/drive/MyDrive/CS6910/Images.zip'
extract_to_path = '/content'

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extract_to_path)

# One image file name per line.
with open('/content/image_names.txt', 'r') as f:
    image_names = [line.strip() for line in f]

# Each caption line is "<image name>#<index>\t<caption>"; captions are
# grouped under the image name with the "#<index>" part removed.
captions_dict = {}
with open('/content/captions.txt', 'r') as f:
    for line in f:
        parts = line.strip().split('\t')
        if len(parts) == 2:
            img_full_name, caption = parts
            img_name = img_full_name.split('#')[0]
            captions_dict.setdefault(img_name, []).append(caption)

# Build the vocabulary from every caption; words seen fewer than 5 times
# are left out.
vocab = Vocabulary(freq_threshold=5)
all_captions = []
for caps in captions_dict.values():
    all_captions += caps

vocab.build_vocabulary(all_captions)







In [None]:
# DataLoader is not imported anywhere else in this notebook.
from torch.utils.data import DataLoader

# GloVe-initialised embedding matrix as a float32 tensor.
embedding_matrix = load_glove_embeddings(glove_path, vocab, EMBEDDING_DIM)
embedding_matrix = torch.tensor(embedding_matrix, dtype=torch.float32)

dataset = ImageCaptionDataset('/content/Images/', '/content/image_names.txt', '/content/captions.txt', vocab)

# 80/20 train/test split; the test loader yields one sample at a time.
train_dataset, test_dataset = split_dataset(dataset, split_ratio=0.8)
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)  # For BLEU


In [None]:
# Assemble the captioning model: VGG encoder + RNN decoder.
encoder = VGGEncoder(embed_size=EMBEDDING_DIM).to(DEVICE)
decoder = RNNDecoder(
    embedding_matrix=embedding_matrix,
    hidden_size=HIDDEN_SIZE,
    vocab_size=len(vocab),
).to(DEVICE)
model = ImageCaptionModel(encoder=encoder, decoder=decoder, vocab=vocab, device=DEVICE).to(DEVICE)

# Padding positions do not contribute to the loss.
criterion = nn.CrossEntropyLoss(ignore_index=vocab.stoi["<PAD>"])
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

trainer = Trainer(
    model=model,
    train_loader=train_loader,
    criterion=criterion,
    optimizer=optimizer,
    vocab=vocab,
    device=DEVICE,
)
trainer.train(NUM_EPOCHS)

# Per-epoch training curves.
plt.plot(trainer.train_losses, label='Loss')
plt.plot(trainer.train_accuracies, label='Accuracy')
plt.title("Training Loss & Accuracy")
plt.xlabel("Epoch")
plt.legend()
plt.grid()
plt.show()


# Model BLEU Evaluation

In [None]:
def evaluate_bleu(model, dataloader, vocab, device, max_samples=1000):
    """Average sentence-level BLEU-1..4 of greedy captions over a dataloader.

    Each generated caption is compared with the single reference caption
    contained in the batch (special tokens removed). At most ``max_samples``
    captions are scored, and the averages are taken over the number of
    captions actually scored.

    Args:
        model: Model exposing ``eval()`` and ``generate_caption(image)``.
        dataloader: Iterable of ``(images, captions)`` batches; batches that
            are ``(None, None)`` are skipped.
        vocab: Vocabulary providing ``stoi`` and ``itos``.
        device: Device each image is moved to before generation.
        max_samples (int): Upper bound on the number of scored captions.

    Returns:
        Tuple ``(bleu1, bleu2, bleu3, bleu4)``; all zeros if nothing was scored.
    """
    model.eval()
    smooth_fn = SmoothingFunction().method1

    special = {vocab.stoi["<PAD>"], vocab.stoi["<SOS>"], vocab.stoi["<EOS>"]}
    # Uniform n-gram weights for BLEU-1 .. BLEU-4.
    ngram_weights = (
        (1, 0, 0, 0),
        (0.5, 0.5, 0, 0),
        (1 / 3, 1 / 3, 1 / 3, 0),
        (0.25, 0.25, 0.25, 0.25),
    )
    totals = [0.0] * len(ngram_weights)
    scored = 0

    with torch.no_grad():
        for images, captions in tqdm(dataloader, desc="Evaluating"):
            if scored >= max_samples:
                break
            if images is None or captions is None:
                continue

            for img, true_caption in zip(images, captions):
                if scored >= max_samples:
                    break

                # Generate prediction
                pred_tokens = model.generate_caption(img.to(device)).split()

                # Reference caption as tokens
                ref_tokens = [
                    vocab.itos[idx.item()]
                    for idx in true_caption
                    if idx.item() not in special
                ]
                reference = [ref_tokens]

                for k, weights in enumerate(ngram_weights):
                    totals[k] += sentence_bleu(
                        reference, pred_tokens,
                        weights=weights, smoothing_function=smooth_fn,
                    )
                scored += 1

    if scored == 0:
        return 0.0, 0.0, 0.0, 0.0

    bleu1, bleu2, bleu3, bleu4 = (total / scored for total in totals)
    return bleu1, bleu2, bleu3, bleu4




# Score the trained model on both splits.
train_bleus = evaluate_bleu(model=model, dataloader=train_loader, vocab=vocab, device=DEVICE)
test_bleus = evaluate_bleu(model=model, dataloader=test_loader, vocab=vocab, device=DEVICE)

print("Train BLEU@1-4:", train_bleus)
print("Test  BLEU@1-4:", test_bleus)

# Grouped bar chart: one train/test pair of bars per BLEU order.
bleu_labels = ['BLEU-1', 'BLEU-2', 'BLEU-3', 'BLEU-4']
x = np.arange(len(bleu_labels))

plt.figure(figsize=(8, 5))
plt.bar(x - 0.15, train_bleus, width=0.3, label='Train', color='skyblue')
plt.bar(x + 0.15, test_bleus, width=0.3, label='Test', color='salmon')
plt.title("BLEU@1-4 Scores")
plt.ylabel("BLEU Score")
plt.xticks(x, bleu_labels)
plt.ylim(0, 1)
plt.grid(True)
plt.legend()
plt.show()


## Helper functions for Inference

In [None]:
def unnormalize(tensor, mean, std):
    """
    Reverses normalization on a tensor image.

    The input tensor is left unmodified; a new tensor is returned.

    Args:
        tensor: torch.Tensor of shape [3, H, W] (channels first)
        mean: list of mean values used in normalization (e.g., [0.485, 0.456, 0.406]),
            one per channel
        std: list of std values used in normalization (e.g., [0.229, 0.224, 0.225]),
            one per channel
    Returns:
        Unnormalized tensor, clipped to [0, 1]
    """
    # Shape the statistics as [C, 1, ..., 1] so they broadcast per channel.
    stat_shape = (-1,) + (1,) * (tensor.dim() - 1)
    mean_t = torch.as_tensor(mean, dtype=tensor.dtype, device=tensor.device).reshape(stat_shape)
    std_t = torch.as_tensor(std, dtype=tensor.dtype, device=tensor.device).reshape(stat_shape)

    restored = tensor * std_t + mean_t  # out-of-place: t * std + mean
    return torch.clamp(restored, 0, 1)  # clip to valid image range

def generate_caption_from_dataset_sample(dataset, model, vocab, device, index=None, max_len=20):
    """Caption one dataset sample, display the image, and return both captions.

    Args:
        dataset: Indexable dataset returning ``(image_tensor, caption_tensor)``.
        model: Model exposing ``eval()`` and ``generate_caption(image, max_length)``.
        vocab: Vocabulary providing ``stoi`` and ``itos``.
        device: Unused here; the model moves the image to its own device.
        index (int, optional): Sample index; a random one is drawn if omitted.
        max_len (int): Maximum number of words to generate.

    Returns:
        ``(generated_caption, original_caption_str)`` as plain strings.
    """
    if index is None:
        index = torch.randint(0, len(dataset), (1,)).item()

    # Get sample from dataset
    image_tensor, original_caption = dataset[index]
    print("Image tensor shape before processing:", image_tensor.shape)

    # Generate caption
    model.eval()
    generated_caption = model.generate_caption(image_tensor, max_length=max_len)

    # Reference caption without the special tokens.
    special = {vocab.stoi["<SOS>"], vocab.stoi["<EOS>"], vocab.stoi["<PAD>"]}
    original_caption_words = [
        vocab.itos[idx.item()] for idx in original_caption if idx.item() not in special
    ]
    original_caption_str = ' '.join(original_caption_words)

    # Display image: undo the normalization and move channels last,
    # which is the layout imshow expects for RGB data.
    unnorm = unnormalize(image_tensor.squeeze(0).cpu(), mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    plt.imshow(unnorm.detach().permute(1, 2, 0).numpy())

    plt.axis("off")
    plt.title(generated_caption, fontsize=14)
    plt.show()

    return generated_caption, original_caption_str






# Generate caption for a random sample from training dataset
# Caption a randomly chosen training sample (test_dataset works as well).
caption, original = generate_caption_from_dataset_sample(train_dataset, model, vocab, DEVICE)

print("Orginal Caption:", original)
print("Generated Caption:", caption)

# Two fixed samples for a repeatable visual check.
caption2, _ = generate_caption_from_dataset_sample(
    dataset=train_dataset, model=model, vocab=vocab, device=DEVICE, index=10
)
caption2, _ = generate_caption_from_dataset_sample(
    dataset=train_dataset, model=model, vocab=vocab, device=DEVICE, index=87
)


### To use an LSTM decoder instead, replace `RNNDecoder` with `LSTMDecoder` where the decoder is constructed in the model definition
