In [3]:
import os
import csv
import json
import torch
from torch import nn, optim
from torch.cuda.amp import autocast, GradScaler
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms, models
from transformers import AutoTokenizer
from nltk.translate.bleu_score import corpus_bleu
from PIL import Image
from tqdm import tqdm
from pycocoevalcap.cider.cider import Cider
from pycocoevalcap.meteor.meteor import Meteor
from pycocoevalcap.rouge.rouge import Rouge
from bert_score import score as bert_score

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
# Hyperparameters
feature_dim = 256  # Increased dimension for richer features
num_layers = 6     # More layers for deeper processing
nhead = 8          # More attention heads for multi-faceted attention
dim_feedforward = 1024  # Larger feedforward network
batch_size = 8
learning_rate = 1e-4
num_epochs = 50
max_length = 50

In [5]:
# Device setup
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [6]:
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
tokenizer.bos_token = tokenizer.cls_token  # Use [CLS] as <BOS>
tokenizer.eos_token = tokenizer.sep_token  # Use [SEP] as <EOS>
if tokenizer.pad_token is None:
    tokenizer.pad_token = tokenizer.eos_token
vocab_size = tokenizer.vocab_size
print(f"BERT Vocabulary Size: {vocab_size}")

BERT Vocabulary Size: 30522


In [7]:
# --- Dataset Definition ---
class ImageCaptionDataset(Dataset):
    def __init__(self, image_dir, captions_file, tokenizer, max_length=max_length):
        self.image_dir = image_dir
        with open(captions_file, "r") as f:
            self.data = json.load(f)
        self.image_filenames = list(self.data.keys())
        self.captions = list(self.data.values())
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.transform = transforms.Compose([
            transforms.Resize((224, 224)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])

    def __len__(self):
        return len(self.image_filenames)

    def __getitem__(self, idx):
        image_path = os.path.join(self.image_dir, self.image_filenames[idx])
        image = Image.open(image_path).convert("RGB")
        image = self.transform(image)
        caption = self.captions[idx]
        tokenized = self.tokenizer(
            caption,
            add_special_tokens=True,  # Adds [CLS] and [SEP]
            padding="max_length",
            truncation=True,
            max_length=self.max_length,
            return_tensors="pt"
        )
        input_ids = tokenized["input_ids"].squeeze(0)  # Remove batch dimension
        attention_mask = tokenized["attention_mask"].squeeze(0)
        return image, input_ids, attention_mask, caption

In [8]:
# --- Enhanced CNN Encoder ---
class CNNEncoder(nn.Module):
    def __init__(self, feature_dim=feature_dim):
        super(CNNEncoder, self).__init__()
        resnet = models.resnet50(pretrained=True)
        self.features = nn.Sequential(*list(resnet.children())[:-2])
        self.projection = nn.Sequential(
            nn.Conv2d(2048, 1024, kernel_size=1, stride=1),
            nn.ReLU(),
            nn.Conv2d(1024, feature_dim, kernel_size=1, stride=1),
            nn.ReLU()
        )
        self.attn_pool = nn.MultiheadAttention(embed_dim=feature_dim, num_heads=4, batch_first=True)
        self.feature_dim = feature_dim

    def forward(self, images):
        features = self.features(images)
        features = self.projection(features)
        batch_size = features.size(0)
        features = features.permute(0, 2, 3, 1).reshape(batch_size, 49, self.feature_dim)
        attn_output, _ = self.attn_pool(features, features, features)
        return attn_output

In [9]:
# --- Enhanced Transformer Decoder ---
class TransformerDecoder(nn.Module):
    def __init__(self, feature_dim, vocab_size, num_layers=num_layers, nhead=nhead, dim_feedforward=dim_feedforward):
        super(TransformerDecoder, self).__init__()
        self.feature_dim = feature_dim
        self.embedding = nn.Embedding(vocab_size, feature_dim)
        self.pos_encoder = nn.Parameter(torch.zeros(1, max_length, feature_dim))
        self.decoder_layer = nn.TransformerDecoderLayer(
            d_model=feature_dim, nhead=nhead, dim_feedforward=dim_feedforward, dropout=0.1, batch_first=False
        )
        self.decoder = nn.TransformerDecoder(self.decoder_layer, num_layers=num_layers)
        self.fc = nn.Sequential(
            nn.Linear(feature_dim, feature_dim * 2),
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(feature_dim * 2, vocab_size)
        )
        self.norm = nn.LayerNorm(feature_dim)

    def forward(self, encoder_features, captions, tgt_key_padding_mask=None):
        batch_size = captions.size(0)
        embeddings = self.embedding(captions) + self.pos_encoder[:, :captions.size(1), :]
        embeddings = self.norm(embeddings).permute(1, 0, 2)
        memory = encoder_features.permute(1, 0, 2)
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(captions.size(1)).to(captions.device)
        output = self.decoder(embeddings, memory, tgt_mask=tgt_mask, tgt_key_padding_mask=tgt_key_padding_mask)
        output = self.norm(output)
        logits = self.fc(output.permute(1, 0, 2))
        return logits

In [10]:
# --- Combined Model ---
class ImageCaptioningModel(nn.Module):
    def __init__(self, encoder, decoder):
        super(ImageCaptioningModel, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, images, captions, tgt_key_padding_mask=None):
        features = self.encoder(images)
        outputs = self.decoder(features, captions, tgt_key_padding_mask=tgt_key_padding_mask)
        return outputs

In [11]:
# --- Validation Function ---
def validate_model(model, dataloader, criterion, device):
    model.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for images, input_ids, attention_mask, _ in tqdm(dataloader, desc="Validation", leave=False):
            images, input_ids, attention_mask = images.to(device), input_ids.to(device), attention_mask.to(device)
            with autocast():
                outputs = model(images, input_ids[:, :-1], tgt_key_padding_mask=(attention_mask[:, :-1] == 0))
                loss = criterion(outputs.reshape(-1, outputs.size(-1)), input_ids[:, 1:].reshape(-1))
            total_val_loss += loss.item()
    return total_val_loss / len(dataloader)

In [12]:
# --- Training Function ---
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs, device, model_save_path):
    scaler = GradScaler()
    best_loss = float("inf")
    epochs_no_improve = 0
    patience = 3

    with open("training_results.csv", "w", newline="") as csv_file:
        csv_writer = csv.writer(csv_file)
        csv_writer.writerow(["epoch", "train_loss", "val_loss", "best_val_loss", "lr"])
        csv_file.flush()

        for epoch in range(num_epochs):
            epoch_pbar = tqdm(total=len(train_loader), desc=f"Epoch {epoch+1}/{num_epochs}", leave=True)
            model.train()
            total_loss = 0.0
            
            for images, input_ids, attention_mask, _ in train_loader:
                images, input_ids, attention_mask = images.to(device), input_ids.to(device), attention_mask.to(device)
                optimizer.zero_grad()
                with autocast():
                    outputs = model(images, input_ids[:, :-1], tgt_key_padding_mask=(attention_mask[:, :-1] == 0))
                    loss = criterion(outputs.reshape(-1, outputs.size(-1)), input_ids[:, 1:].reshape(-1))
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
                total_loss += loss.item()
                epoch_pbar.set_postfix({"Batch Loss": f"{loss.item():.4f}"})
                epoch_pbar.update(1)

            avg_train_loss = total_loss / len(train_loader)
            avg_val_loss = validate_model(model, val_loader, criterion, device)
            scheduler.step(avg_val_loss)
            current_lr = optimizer.param_groups[0]["lr"]

            epoch_pbar.set_postfix({
                "Train Loss": f"{avg_train_loss:.4f}",
                "Val Loss": f"{avg_val_loss:.4f}",
                "Best Val Loss": f"{best_loss:.4f}",
                "LR": f"{current_lr:.6f}"
            })
            epoch_pbar.close()

            if best_loss - avg_val_loss > 0.001:
                best_loss = avg_val_loss
                epochs_no_improve = 0
                torch.save(model.state_dict(), model_save_path)
                print(f"Best model saved with val loss {best_loss:.4f}")
            else:
                epochs_no_improve += 1

            csv_writer.writerow([epoch + 1, avg_train_loss, avg_val_loss, best_loss, current_lr])
            csv_file.flush()

            if epochs_no_improve >= patience:
                print("Early stopping triggered!")
                break

In [13]:
# --- Caption Generation ---
def generate_caption(model, image, tokenizer, max_length=max_length, device="cuda"):
    model.eval()
    with torch.no_grad():
        image = image.unsqueeze(0).to(device)
        features = model.encoder(image)
        generated = torch.tensor([tokenizer.bos_token_id], device=device).unsqueeze(0)
        caption_ids = [tokenizer.bos_token_id]
        for _ in range(max_length - 1):
            logits = model.decoder(features, generated)
            next_token = logits[:, -1, :].argmax(dim=-1, keepdim=True)
            generated = torch.cat([generated, next_token], dim=1)
            token_id = next_token.item()
            caption_ids.append(token_id)
            if token_id == tokenizer.eos_token_id:
                break
        caption = tokenizer.decode(caption_ids, skip_special_tokens=True)
        return caption

In [14]:
# --- Evaluation Function ---
def evaluate_model(model, test_loader, tokenizer, device):
    model.eval()
    refs, hyps = {}, {}
    with torch.no_grad():
        for i, (images, _, _, captions) in enumerate(test_loader):
            generated = [generate_caption(model, img, tokenizer, device=device) for img in images]
            for j, (ref, hyp) in enumerate(zip(captions, generated)):
                idx = i * test_loader.batch_size + j
                refs[idx] = [ref]
                hyps[idx] = [hyp]

    cider_scorer = Cider()
    cider_score, _ = cider_scorer.compute_score(refs, hyps)
    meteor_scorer = Meteor()
    meteor_score, _ = meteor_scorer.compute_score(refs, hyps)
    rouge_scorer = Rouge()
    rouge_score, _ = rouge_scorer.compute_score(refs, hyps)
    ref_list = [r[0] for r in refs.values()]
    hyp_list = [h[0] for h in hyps.values()]
    P, R, F1 = bert_score(hyp_list, ref_list, lang="en", verbose=True)
    bert_f1 = F1.mean().item()
    bleu_score = corpus_bleu([[r.split()] for r in ref_list], [h.split() for h in hyp_list])

    print(
        f"BLEU: {bleu_score:.4f}, CIDEr: {cider_score:.4f}, METEOR: {meteor_score:.4f}, "
        f"ROUGE-L: {rouge_score:.4f}, BERT Score: {bert_f1:.4f}"
    )
    return bleu_score, cider_score, meteor_score, rouge_score, bert_f1

In [15]:
# --- Test Function for Specific Image ---
def test_image_caption(image_path, model, tokenizer, device):
    transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    image = Image.open(image_path).convert("RGB")
    image = transform(image)
    caption = generate_caption(model, image, tokenizer, max_length=max_length, device=device)
    print(f"Generated Caption for {image_path}: {caption}")

In [16]:
IMAGE_DIR = r"/media/vaibhav/Programming/Project/train2017"
CAPTIONS_FILE = "merged_captions.json"
MODEL_SAVE_PATH = "complex_model_bert.pth"

# Dataset and DataLoaders
dataset = ImageCaptionDataset(IMAGE_DIR, CAPTIONS_FILE, tokenizer, max_length=max_length)
train_size = int(0.7 * len(dataset))
val_size = int(0.15 * len(dataset))
test_size = len(dataset) - train_size - val_size
train_dataset, val_dataset, test_dataset = random_split(dataset, [train_size, val_size, test_size])

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

In [17]:
# Model Initialization
encoder = CNNEncoder(feature_dim=feature_dim)
decoder = TransformerDecoder(
    feature_dim=feature_dim, vocab_size=vocab_size, num_layers=num_layers,
    nhead=nhead, dim_feedforward=dim_feedforward
)
model = ImageCaptioningModel(encoder, decoder).to(device)

# Training Setup
criterion = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=1e-5)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", factor=0.1, patience=2)



In [18]:
# Train the Model
train_model(
    model, train_loader, val_loader, criterion, optimizer, scheduler,
    num_epochs, device, MODEL_SAVE_PATH
)

  scaler = GradScaler()
  with autocast():
  with autocast():
Epoch 1/50: 100%|██████████| 1750/1750 [02:56<00:00,  9.89it/s, Train Loss=4.3830, Val Loss=3.6167, Best Val Loss=inf, LR=0.000100]


Best model saved with val loss 3.6167


Epoch 2/50:   1%|▏         | 22/1750 [00:02<02:50, 10.14it/s, Batch Loss=3.4599]

KeyboardInterrupt: 

In [19]:
# Load Best Model and Evaluate
model.load_state_dict(torch.load(MODEL_SAVE_PATH))
evaluate_model(model, test_loader, tokenizer, device)

2025-03-25 14:14:57.033123: I tensorflow/core/util/port.cc:153] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-03-25 14:14:57.154894: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1742892297.199837    3418 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1742892297.213063    3418 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1742892297.307862    3418 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linking 

calculating scores...
computing bert embedding.


100%|██████████| 52/52 [00:18<00:00,  2.88it/s]


computing greedy matching.


100%|██████████| 47/47 [00:00<00:00, 114.57it/s]


done in 18.50 seconds, 162.20 sentences/sec
BLEU: 0.0218, CIDEr: 0.0745, METEOR: 0.0938, ROUGE-L: 0.1968, BERT Score: 0.8721


(0.021847830750659554,
 np.float64(0.07453058917477384),
 0.09382001402752198,
 np.float64(0.19684828735403861),
 0.8721246719360352)

In [21]:
# Test on a specific image
test_image_path = r"/media/vaibhav/Programming/Project/train2017/000000391895.jpg"
test_image_caption(test_image_path, model, tokenizer, device)

Generated Caption for /media/vaibhav/Programming/Project/train2017/000000391895.jpg: a group of people are seen walking down a street, with a large city street.
