### Imports

In [84]:
import json, os, random
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, BertModel, get_linear_schedule_with_warmup
from sklearn.metrics import precision_recall_fscore_support
from tqdm import tqdm
import numpy as np

### Import Captions from Part-B

In [85]:
# Captions generated in Part B. Both files are indexed below as
# captions[<occlusion percentage>][<image file name>].
# JSON text is UTF-8, so the encoding is given explicitly instead of relying on
# the platform's default locale encoding.
with open("/kaggle/input/captions-from-part-b/masked_image_captions_custom_model.json", "r", encoding="utf-8") as f:
    mymodel_captions = json.load(f)

with open("/kaggle/input/captions-from-part-b/smolvlm_masked_image_captions.json", "r", encoding="utf-8") as f:
    smolvlm_captions_original = json.load(f)


In [86]:

smolvlm_captions = {}

def clean_caption(caption):
    """Strip the chat-template prefix from a raw SmolVLM caption.

    Raw outputs look like ``"User:<image>What's in this image?\\nAssistant: <reply>"``.
    Everything up to and including the first ``"Assistant:"`` marker is removed
    and the remainder is whitespace-stripped.

    Args:
        caption: Raw caption string.

    Returns:
        The reply text after the first ``"Assistant:"`` marker, or ``caption``
        unchanged when the marker is absent.
    """
    # partition() splits on the FIRST marker only, so a reply that itself
    # contains the text "Assistant:" is kept whole instead of being cut short.
    _, marker, reply = caption.partition("Assistant:")
    if marker:
        return reply.strip()
    return caption


# Build the cleaned SmolVLM captions with the same
# occlusion-level -> image-name nesting as the raw dictionary.
for pct, raw_by_image in smolvlm_captions_original.items():
    smolvlm_captions[pct] = {
        img_name: clean_caption(raw_caption)
        for img_name, raw_caption in raw_by_image.items()
    }


# Spot-check one image at the 10% occlusion level: custom-model caption first,
# then the SmolVLM caption before and after cleaning.
print(mymodel_captions["10"]["test_1.jpg"])
print("\nBefore cleaning (smolvlm):")
print(smolvlm_captions_original["10"]["test_1.jpg"])
print("\nAfter cleaning (smolvlm):")
print(smolvlm_captions["10"]["test_1.jpg"])

A man is standing on a sidewalk. He is wearing a black jacket and a black helmet. He is holding a white and red cell phone. There is a man standing on the sidewalk next to him. The man is wearing a black shirt and a

Before cleaning (smolvlm):
User:<image>What's in this image?
Assistant: A busy city street with a modern building in the background.

After cleaning (smolvlm):
A busy city street with a modern building in the background.


### Load the original captions from the dataset (provided with the assignment)

In [87]:
def load_original_captions(base_path="/kaggle/input/dataset/custom_captions_dataset"):
    """Collect the reference captions from the train/val/test CSV files.

    Args:
        base_path: Directory that contains ``train.csv``, ``val.csv`` and
            ``test.csv``. Each file is read for its ``filename`` and
            ``caption`` columns.

    Returns:
        dict mapping each ``filename`` value to its ``caption`` value. Files are
        read in train, val, test order, so a filename that occurs more than
        once keeps the caption that was read last.
    """
    captions_by_file = {}
    for split_name in ("train", "val", "test"):
        split_csv = os.path.join(base_path, f"{split_name}.csv")
        frame = pd.read_csv(split_csv)
        # Pair the two columns row by row; later rows overwrite earlier ones.
        captions_by_file.update(zip(frame['filename'], frame['caption']))
    return captions_by_file

# filename -> reference caption, merged across the train/val/test CSVs.
original_captions = load_original_captions()

### Create dataset with balanced classes

#### label : 1 for custom model 
#### label : 0 for smolvlm model

In [88]:
def create_classifier_dataset(smol_data, custom_data, original_caps):
    """Build the labelled examples for the caption-source classifier.

    For every (occlusion level, image) pair that has a SmolVLM caption, a
    custom-model caption and an original caption, two examples are emitted:
    the SmolVLM caption with label 0, then the custom-model caption with
    label 1. Pairs missing any of the three captions are skipped, so the two
    classes are always balanced.

    Args:
        smol_data: ``{occlusion: {img_name: caption}}`` for SmolVLM.
        custom_data: ``{occlusion: {img_name: caption}}`` for the custom model.
        original_caps: ``{img_name: caption}`` with the reference captions.

    Returns:
        list of dicts with keys ``original``, ``generated``, ``occlusion``
        (always a string), ``label`` and ``img_name``.
    """
    dataset = []
    # Iterate with .items() so the inner dict is obtained from the key that is
    # actually present, rather than re-indexing with str(pct).
    for pct, smol_by_image in smol_data.items():
        pct_str = str(pct)
        # An occlusion level that the custom model has no captions for is
        # skipped (like a missing image) instead of raising KeyError.
        custom_by_image = custom_data.get(pct_str, custom_data.get(pct, {}))
        for img_name, smol_caption in smol_by_image.items():
            if img_name not in custom_by_image or img_name not in original_caps:
                continue
            orig = original_caps[img_name]
            dataset.append({
                "original": orig,
                "generated": smol_caption,
                "occlusion": pct_str,
                "label": 0,  # SmolVLM (Model A)
                "img_name": img_name
            })
            dataset.append({
                "original": orig,
                "generated": custom_by_image[img_name],
                "occlusion": pct_str,
                "label": 1,  # Custom model (Model B)
                "img_name": img_name
            })
    return dataset

# Two examples (label 0 = SmolVLM, label 1 = custom model) per image and occlusion level.
full_data = create_classifier_dataset(smolvlm_captions, mymodel_captions, original_captions)
print(f"dataset length - {len(full_data)} examples")

dataset length - 5568 examples


### Split the data by image in a 70:10:20 ratio (train, val, test)

In [89]:
def split_by_image(data, train_ratio=0.7, val_ratio=0.1, seed=42):
    """Split examples into train/val/test so that no image spans two splits.

    Examples are grouped by ``img_name``, the image ids are shuffled with a
    fixed seed, and the shuffled ids are cut into three consecutive slices.
    All examples of an image land in the same split.

    Args:
        data: Iterable of dicts, each with an ``img_name`` key.
        train_ratio: Fraction of images assigned to the training split.
        val_ratio: Fraction of images assigned to the validation split; the
            remaining images form the test split.
        seed: Seed passed to ``random.seed`` before shuffling. Note that this
            reseeds Python's global ``random`` generator.

    Returns:
        ``(train_set, val_set, test_set)`` lists of examples. Within each list
        the images appear in shuffled order and the examples of one image keep
        their order from ``data``.
    """
    random.seed(seed)

    records_by_image = {}
    for record in data:
        records_by_image.setdefault(record["img_name"], []).append(record)

    image_ids = list(records_by_image)
    random.shuffle(image_ids)

    n = len(image_ids)
    # Round before truncating: 0.7 + 0.1 evaluates to 0.7999999999999999, so a
    # bare int() would drop one image whenever the boundary is a whole number
    # (e.g. n=10 would give an empty validation split).
    train_end = int(round(train_ratio * n, 9))
    val_end = int(round((train_ratio + val_ratio) * n, 9))

    def _gather(ids):
        # Walk the shuffled id list, not a set, so the example order is
        # identical on every run regardless of string hash randomisation.
        return [rec for img_id in ids for rec in records_by_image[img_id]]

    train_set = _gather(image_ids[:train_end])
    val_set = _gather(image_ids[train_end:val_end])
    test_set = _gather(image_ids[val_end:])

    return train_set, val_set, test_set

# Image-level split with the default ratios (0.7 train, 0.1 val, remainder test).
train_data, val_data, test_data = split_by_image(full_data)
print(f"Length of train_data : {len(train_data)}")
print(f"Length of val_data   : {len(val_data)}")
print(f"Length of test_data  : {len(test_data)}")

Length of train_data : 3894
Length of val_data   : 558
Length of test_data  : 1116


In [90]:
class CaptionComparisonDataset(Dataset):
    """Torch dataset that turns caption-pair records into tokenized inputs.

    Each record is a dict with ``original``, ``generated``, ``occlusion`` and
    ``label`` entries. The three text fields are joined into one string,
    tokenized to a fixed length of ``max_len`` and returned together with the
    label as a ``torch.long`` tensor.
    """

    def __init__(self, data, tokenizer, max_len=128):
        self.data, self.tokenizer, self.max_len = data, tokenizer, max_len

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        record = self.data[idx]
        original = record['original']
        generated = record['generated']
        occlusion = record['occlusion']

        # Input layout:
        # <original_caption> [SEP] <generated_caption> [SEP] <perturbation_percentage>
        text = f"{original} [SEP] {generated} [SEP] {occlusion}"

        # Pad/truncate to max_len and request the attention mask as well.
        encoded = self.tokenizer(
            text,
            return_tensors='pt',
            return_attention_mask=True,
            truncation=True,
            padding='max_length',
            max_length=self.max_len,
            add_special_tokens=True,
        )

        # The tokenizer output is flattened to 1-D so the DataLoader can stack
        # the items into a batch.
        sample = {
            'input_ids': encoded['input_ids'].reshape(-1),
            'attention_mask': encoded['attention_mask'].reshape(-1),
        }
        sample['label'] = torch.tensor(record['label'], dtype=torch.long)
        return sample

### Model Definition (BERT-base with custom linear layers)
#### In CaptionClassifier class :
##### The BERT embeddings are frozen to prevent overfitting, and the first 8 encoder layers are frozen to reduce the number of trainable parameters

#### Pooled Output : 
##### The special [CLS] token's final representation after being passed through a linear layer and tanh activation. 
##### It's a single vector that captures the aggregate meaning of the entire input sequence, designed specifically for classification tasks

In [91]:
class CaptionClassifier(nn.Module):
    """``bert-base-uncased`` encoder followed by an MLP head with two output logits.

    The BERT embeddings and the first 8 encoder layers have ``requires_grad``
    switched off; the remaining encoder layers and the head stay trainable.
    The head is Linear(hidden, 512) -> BatchNorm -> ReLU -> Dropout ->
    Linear(512, 128) -> BatchNorm -> ReLU -> Dropout -> Linear(128, 2).

    Args:
        dropout_rate: Dropout probability used in both dropout layers of the head.
    """

    def __init__(self, dropout_rate=0.3):
        super().__init__()
        self.bert = BertModel.from_pretrained('bert-base-uncased')

        # Freeze the embeddings together with encoder layers 0-7.
        frozen_modules = [self.bert.embeddings, *self.bert.encoder.layer[:8]]
        for frozen in frozen_modules:
            for param in frozen.parameters():
                param.requires_grad = False

        hidden_size = self.bert.config.hidden_size

        # Two hidden stages of Linear -> BatchNorm -> ReLU -> Dropout, then the
        # final projection to the two class logits.
        head_layers = []
        in_features = hidden_size
        for out_features in (512, 128):
            head_layers.extend([
                nn.Linear(in_features, out_features),
                nn.BatchNorm1d(out_features),
                nn.ReLU(),
                nn.Dropout(dropout_rate),
            ])
            in_features = out_features
        head_layers.append(nn.Linear(in_features, 2))

        self.classifier = nn.Sequential(*head_layers)
        self._init_weights()

    def _init_weights(self):
        """Xavier-normal weights and zero biases for every Linear layer of the head."""
        linear_layers = [layer for layer in self.classifier if isinstance(layer, nn.Linear)]
        for linear in linear_layers:
            nn.init.xavier_normal_(linear.weight)
            if linear.bias is not None:
                nn.init.zeros_(linear.bias)

    def forward(self, input_ids, attention_mask):
        """Return the class logits computed from BERT's pooled output."""
        encoded = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask
        )
        return self.classifier(encoded.pooler_output)

## Declaring the tokenizer and the datasets using the CaptionComparisonDataset class defined earlier

In [92]:
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")

# One dataset per split, all sharing the tokenizer and the default max_len.
train_dataset, val_dataset, test_dataset = [
    CaptionComparisonDataset(split_records, tokenizer)
    for split_records in (train_data, val_data, test_data)
]

batch_size = 16
# Only the training loader shuffles; val/test keep the dataset order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)
val_loader, test_loader = [
    DataLoader(eval_dataset, batch_size=batch_size)
    for eval_dataset in (val_dataset, test_dataset)
]


### Training Function ( TQDM for progress tracking )

In [93]:
def _validate_epoch(model, val_loader, criterion, device, desc):
    """Run one gradient-free pass over ``val_loader``; return ``(avg_loss, accuracy)``."""
    model.eval()
    loss_sum = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for batch in tqdm(val_loader, desc=desc):
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            outputs = model(input_ids, attention_mask)
            loss_sum += criterion(outputs, labels).item()
            correct += (outputs.argmax(dim=1) == labels).sum().item()
            total += labels.size(0)

    # An empty loader yields NaN metrics instead of a ZeroDivisionError.
    num_batches = len(val_loader)
    avg_loss = loss_sum / num_batches if num_batches else float('nan')
    accuracy = correct / total if total else float('nan')
    return avg_loss, accuracy


def train_classifier(model, train_loader, val_loader, optimizer, scheduler, criterion, device, epochs=3):
    """Train ``model`` and evaluate it on ``val_loader`` after every epoch.

    Each training step zeroes the gradients, runs the forward/backward pass,
    clips the gradient norm to 1.0, then steps the optimizer and the scheduler
    (the scheduler is stepped once per batch, not once per epoch). Per-epoch
    metrics are printed and collected.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns class logits.
        train_loader: Iterable of batches with ``input_ids``,
            ``attention_mask`` and ``label`` tensors.
        val_loader: Same batch format, used for validation.
        optimizer: Optimizer over the trainable parameters.
        scheduler: Learning-rate scheduler stepped after every batch.
        criterion: Loss called as ``criterion(logits, labels)``.
        device: Device the model and batches are moved to.
        epochs: Number of passes over ``train_loader``.

    Returns:
        dict with the per-epoch lists ``train_losses``, ``val_losses``,
        ``train_accs`` and ``val_accs``. Losses are the mean of the per-batch
        losses. A metric is NaN when the corresponding loader is empty.
    """
    model.to(device)

    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []

    for epoch in range(epochs):
        model.train()
        train_loss = 0.0
        correct = 0
        total = 0

        progress_bar = tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs} [Train]")
        for batch in progress_bar:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            optimizer.zero_grad()
            outputs = model(input_ids, attention_mask)
            loss = criterion(outputs, labels)
            loss.backward()
            # Gradient clipping to prevent exploding gradients
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)

            optimizer.step()
            scheduler.step()

            train_loss += loss.item()
            preds = outputs.argmax(dim=1)
            correct += (preds == labels).sum().item()
            total += labels.size(0)

            # Running accuracy over the batches seen so far in this epoch.
            progress_bar.set_postfix({
                'loss': f"{loss.item():.4f}",
                'acc': f"{correct/total:.4f}"
            })

        num_train_batches = len(train_loader)
        avg_train_loss = train_loss / num_train_batches if num_train_batches else float('nan')
        train_acc = correct / total if total else float('nan')

        train_losses.append(avg_train_loss)
        train_accs.append(train_acc)

        # Leaves the model in eval mode until the next epoch calls model.train().
        avg_val_loss, val_acc = _validate_epoch(
            model, val_loader, criterion, device, desc=f"Epoch {epoch+1}/{epochs} [Val]"
        )

        val_losses.append(avg_val_loss)
        val_accs.append(val_acc)

        print(f"\nEpoch {epoch+1}/{epochs}")
        print(f"  Train Loss: {avg_train_loss:.4f} | Train Acc: {train_acc:.4f}")
        print(f"  Val Loss: {avg_val_loss:.4f} | Val Acc: {val_acc:.4f}")

    return {
        'train_losses': train_losses,
        'val_losses': val_losses,
        'train_accs': train_accs,
        'val_accs': val_accs
    }


### Evaluation Function (for macro precision, recall, and F1 scores)

In [94]:
def evaluate_classifier(model, dataloader, device):
    """Score ``model`` on ``dataloader`` with loss, accuracy and macro P/R/F1.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns class logits; it is put in eval mode and moved to ``device``.
        dataloader: Iterable of batches with ``input_ids``, ``attention_mask``
            and ``label`` tensors.
        device: Device used for the forward passes.

    Returns:
        dict with keys ``Loss`` (mean of the per-batch cross-entropy losses),
        ``Accuracy``, ``Precision``, ``Recall`` and ``F1`` (the last three are
        macro-averaged).
    """
    model.eval()
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    running_loss = 0
    targets, predictions = [], []

    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            labels = batch['label'].to(device)
            logits = model(
                batch['input_ids'].to(device),
                batch['attention_mask'].to(device),
            )
            running_loss += criterion(logits, labels).item()

            targets.extend(labels.cpu().numpy())
            predictions.extend(torch.argmax(logits, dim=1).cpu().numpy())

    num_correct = 0
    for target, predicted in zip(targets, predictions):
        if target == predicted:
            num_correct += 1

    macro_scores = precision_recall_fscore_support(targets, predictions, average='macro')

    results = {"Loss": running_loss / len(dataloader)}
    results["Accuracy"] = num_correct / len(targets)
    results["Precision"] = macro_scores[0]
    results["Recall"] = macro_scores[1]
    results["F1"] = macro_scores[2]
    return results


### Analysis by Occlusion Level

In [95]:
def analyze_by_occlusion(model, test_data, tokenizer, device, batch_size=16):
    """Report classification accuracy separately for each occlusion level.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` that
            returns class logits; it is put in eval mode and moved to ``device``.
        test_data: Sequence of example dicts with an ``occlusion`` entry (plus
            the fields ``CaptionComparisonDataset`` reads).
        tokenizer: Tokenizer handed to ``CaptionComparisonDataset``.
        device: Device used for the forward passes.
        batch_size: Batch size of the internal DataLoader.

    Returns:
        dict mapping each occlusion level to its accuracy. The levels "10",
        "50" and "80" are always present (accuracy 0 when they have no
        examples); any other level found in ``test_data`` is added as well.
    """
    model.eval()
    # Same convention as evaluate_classifier: make sure the model lives on the
    # device the batches are sent to.
    model.to(device)

    results_by_occlusion = {
        level: {"correct": 0, "total": 0} for level in ("10", "50", "80")
    }

    dataset = CaptionComparisonDataset(test_data, tokenizer)
    # No shuffling, so the k-th prediction belongs to test_data[k].
    loader = DataLoader(dataset, batch_size=batch_size)

    offset = 0  # index in test_data of the first example of the current batch
    with torch.no_grad():
        for batch in loader:
            input_ids = batch['input_ids'].to(device)
            attention_mask = batch['attention_mask'].to(device)
            labels = batch['label'].to(device)

            outputs = model(input_ids, attention_mask)
            preds = torch.argmax(outputs, dim=1)
            hits = (preds == labels).cpu().tolist()

            # Match predictions with original data to get occlusion level
            for j, hit in enumerate(hits):
                occlusion = test_data[offset + j]["occlusion"]
                # Levels other than the pre-seeded ones get a counter on first use
                # instead of raising KeyError.
                counts = results_by_occlusion.setdefault(occlusion, {"correct": 0, "total": 0})
                counts["total"] += 1
                if hit:
                    counts["correct"] += 1

            offset += len(hits)

    occlusion_results = {}
    for occlusion in results_by_occlusion:
        correct = results_by_occlusion[occlusion]["correct"]
        total = results_by_occlusion[occlusion]["total"]
        accuracy = correct / total if total > 0 else 0
        occlusion_results[occlusion] = accuracy
        print(f"Occlusion {occlusion}%: Accuracy = {accuracy:.4f} ({correct}/{total})")

    return occlusion_results


### Define the training parameters, train the classifier, and evaluate it

In [96]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed PyTorch before the model is built so the classifier-head initialisation,
# the dropout masks and the shuffled batch order repeat on a fresh
# Restart & Run All (GPU kernels may still add small nondeterminism).
torch.manual_seed(42)

model = CaptionClassifier(dropout_rate=0.3)
# Only parameters that are still trainable (requires_grad=True) are optimised.
optimizer = torch.optim.AdamW(
    [p for p in model.parameters() if p.requires_grad],
    lr=2e-5,
    weight_decay=0.01
)

criterion = nn.CrossEntropyLoss()
num_epochs = 3
# The scheduler is stepped once per batch, so the schedule length is
# batches-per-epoch * epochs, with the first 10% of the steps used for warmup.
total_steps = len(train_loader) * num_epochs
scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=int(0.1 * total_steps),
    num_training_steps=total_steps
)

# Keep the per-epoch loss/accuracy curves instead of discarding them.
history = train_classifier(
    model, 
    train_loader, 
    val_loader, 
    optimizer, 
    scheduler, 
    criterion, 
    device, 
    epochs=num_epochs
)

print("\n")
metrics = evaluate_classifier(model, test_loader, device)

print(f"Macro Precision: {metrics['Precision']:.4f}")
print(f"Macro Recall: {metrics['Recall']:.4f}")
print(f"Macro F1: {metrics['F1']:.4f}")

Epoch 1/3 [Train]: 100%|██████████| 244/244 [00:31<00:00,  7.84it/s, loss=0.0074, acc=0.8546]
Epoch 1/3 [Val]: 100%|██████████| 35/35 [00:02<00:00, 12.98it/s]



Epoch 1/3
  Train Loss: 0.3307 | Train Acc: 0.8546
  Val Loss: 0.1095 | Val Acc: 0.9588


Epoch 2/3 [Train]: 100%|██████████| 244/244 [00:31<00:00,  7.87it/s, loss=0.2664, acc=0.9620]
Epoch 2/3 [Val]: 100%|██████████| 35/35 [00:02<00:00, 12.99it/s]



Epoch 2/3
  Train Loss: 0.1149 | Train Acc: 0.9620
  Val Loss: 0.0597 | Val Acc: 0.9767


Epoch 3/3 [Train]: 100%|██████████| 244/244 [00:31<00:00,  7.86it/s, loss=0.0286, acc=0.9725]
Epoch 3/3 [Val]: 100%|██████████| 35/35 [00:02<00:00, 12.70it/s]



Epoch 3/3
  Train Loss: 0.0856 | Train Acc: 0.9725
  Val Loss: 0.0496 | Val Acc: 0.9803




Evaluating: 100%|██████████| 70/70 [00:05<00:00, 13.00it/s]

Macro Precision: 0.9840
Macro Recall: 0.9839
Macro F1: 0.9839



