### Import Packages

In [None]:
%pip install -U nltk rouge-score

In [None]:
import os
import random
from itertools import product

import nltk
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from nltk.translate.bleu_score import corpus_bleu
from nltk.translate.meteor_score import meteor_score
from PIL import Image
from rouge_score import rouge_scorer
from torch.optim.lr_scheduler import ReduceLROnPlateau
from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from transformers import (
    AutoImageProcessor,
    AutoModel,
    AutoModelForVision2Seq,
    AutoProcessor,
    AutoTokenizer,
    GPT2LMHeadModel,
)
from transformers.image_utils import load_image

In [None]:
# Quietly fetch the NLTK data packages this notebook requests.
for resource_name in ("punkt", "wordnet"):
    nltk.download(resource_name, quiet=True)

In [None]:
def set_seed(seed=42):
    """Seed Python, NumPy and PyTorch RNGs and force deterministic cuDNN.

    Args:
        seed: Integer seed handed to every random number generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,
    )
    for apply_seed in seeders:
        apply_seed(seed)

    # Trade cuDNN autotuning for run-to-run reproducibility.
    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False


set_seed()
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

### Image Occlusion for Robustness Evaluation


In [None]:
def occulude_image(image: np.ndarray, mask_percentage: float, block_size: int = 16):
    """Return a copy of ``image`` with a random subset of square blocks zeroed.

    The image is tiled into ``block_size`` x ``block_size`` blocks, counted with
    integer division, so partial blocks along the bottom/right edges are never
    candidates for masking. ``int(num_blocks * mask_percentage)`` of those
    blocks are drawn with ``random.sample`` and set to 0 across all channels.

    Args:
        image: Array whose first two axes are height and width; both
            ``(H, W, C)`` and ``(H, W)`` arrays are accepted.
        mask_percentage: Fraction of blocks to zero, in ``[0, 1]``.
        block_size: Side length of each square block in pixels.

    Returns:
        A new array of the same shape and dtype; the input is not modified.

    Raises:
        ValueError: If ``mask_percentage`` is outside ``[0, 1]`` or
            ``block_size`` is not positive.
    """
    if not 0.0 <= mask_percentage <= 1.0:
        raise ValueError(
            f"mask_percentage must be within [0, 1], got {mask_percentage!r}"
        )
    if block_size <= 0:
        raise ValueError(f"block_size must be positive, got {block_size!r}")

    height, width = image.shape[:2]
    # Row-major enumeration keeps the sampling population in the same order as
    # a nested row/column loop, so seeded runs stay reproducible.
    blocks = list(product(range(height // block_size), range(width // block_size)))
    num_blocks_to_mask = int(len(blocks) * mask_percentage)

    masked_image = image.copy()
    for row, col in random.sample(blocks, num_blocks_to_mask):
        masked_image[
            row * block_size : (row + 1) * block_size,
            col * block_size : (col + 1) * block_size,
        ] = 0

    return masked_image


def load_image_from_path(image_path: str) -> np.ndarray:
    """Read the image at ``image_path`` and return it as an RGB NumPy array.

    Args:
        image_path: Filesystem path of the image to open.

    Returns:
        The result of ``np.array`` applied to the RGB-converted image.
    """
    # The context manager releases the file handle deterministically instead
    # of leaving it to garbage collection.
    with Image.open(image_path) as source_image:
        return np.array(source_image.convert("RGB"))


def convert_image_to_occuluded(image_path: str, mask_percentage: float):
    """Load the image at ``image_path`` and return its block-occluded copy.

    Args:
        image_path: Filesystem path of the image to load.
        mask_percentage: Fraction of blocks to zero, forwarded to
            ``occulude_image``.

    Returns:
        The occluded image array produced by ``occulude_image``.
    """
    return occulude_image(load_image_from_path(image_path), mask_percentage)

### Zero-Shot Captioning with Occlusion


In [None]:
def calculate_metrics(predictions, ground_truths):
    """Score generated captions against references with BLEU, ROUGE-L and METEOR.

    Captions are tokenized by whitespace for BLEU and METEOR. BLEU is computed
    over the whole corpus; ROUGE-L (F-measure, with stemming) and METEOR are
    computed per pair and averaged, with an average of 0 when there are no
    pairs.

    Args:
        predictions: Generated caption strings.
        ground_truths: Reference caption strings, aligned with ``predictions``.

    Returns:
        Dict with the keys ``"BLEU"``, ``"ROUGE-L"`` and ``"METEOR"``.
    """

    def _mean(values):
        return sum(values) / len(values) if values else 0

    reference_tokens = [[reference.split()] for reference in ground_truths]
    hypothesis_tokens = [caption.split() for caption in predictions]
    bleu_score = corpus_bleu(reference_tokens, hypothesis_tokens)

    rouge = rouge_scorer.RougeScorer(["rougeL"], use_stemmer=True)
    caption_pairs = list(zip(predictions, ground_truths))
    rouge_l_values = [
        rouge.score(reference, caption)["rougeL"].fmeasure
        for caption, reference in caption_pairs
    ]
    meteor_values = [
        meteor_score([reference.split()], caption.split())
        for caption, reference in caption_pairs
    ]

    return {
        "BLEU": bleu_score,
        "ROUGE-L": _mean(rouge_l_values),
        "METEOR": _mean(meteor_values),
    }


def zero_shot_captioning(
    image_path,
    model,
    processor,
    model_name="HuggingFaceTB/SmolVLM-Instruct",
    occulude_percentage=0.5,
):
    """Caption an occluded version of one image with a vision-language model.

    The image is loaded and occluded via ``convert_image_to_occuluded``, wrapped
    in a single-turn chat prompt, and passed to ``model.generate``. Any failure
    is reported on stdout and turned into an empty caption so that a batch
    evaluation can continue.

    Args:
        image_path: Path of the image to caption.
        model: Generation model exposing ``generate``.
        processor: Matching processor providing ``apply_chat_template``,
            ``__call__`` and ``batch_decode``.
        model_name: Unused here; kept so existing callers keep working.
        occulude_percentage: Fraction of image blocks to zero before captioning.

    Returns:
        The generated caption, or ``""`` if anything raised.
    """
    try:
        image = convert_image_to_occuluded(
            image_path, mask_percentage=occulude_percentage
        )
        message = [
            {
                "role": "user",
                "content": [
                    {"type": "image", "image": image},
                    {"type": "text", "text": "Please Describe the Image"},
                ],
            }
        ]
        prompt = processor.apply_chat_template(message, add_generation_prompt=True)

        # Place the inputs on the device the model actually lives on; the
        # notebook-global ``device`` is only a fallback for objects that do not
        # expose a ``device`` attribute.
        target_device = getattr(model, "device", device)
        inputs = processor(
            images=image, text=prompt, return_tensors="pt", padding=True
        ).to(target_device)

        with torch.no_grad():
            generated_ids = model.generate(**inputs, max_new_tokens=128)

        # Drop the leading prompt-length positions so only newly generated
        # tokens are decoded.
        prompt_length = inputs["input_ids"].size(1)
        generated_ids = generated_ids[:, prompt_length:]
        text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
        print(f"Generated Caption: {text}")
        return text
    except Exception as e:
        # Deliberate best-effort: report the failure and let the caller skip
        # this image.
        print(f"Error processing {image_path}: {e}")
        return ""


### Evaluate on Occluded Images: Model Performance with Perturbations


In [None]:
def _baseline_metric(baseline_results, metric):
    """Return the first baseline value stored for ``metric``, or 0.0 if absent."""
    column = baseline_results.get(metric)
    if column is None or len(column) == 0:
        return 0.0
    return float(column.values[0])


def _caption_with_custom_model(
    image_path, occlusion_level, custom_model, tokenizer, image_processor, device
):
    """Occlude one image and caption it via ``custom_model.generate_caption``."""
    with Image.open(image_path) as source_image:
        image = np.array(source_image.convert("RGB"))
    occluded_image = Image.fromarray(occulude_image(image, occlusion_level))

    pixel_values = image_processor(
        images=occluded_image, return_tensors="pt"
    ).pixel_values.to(device)

    with torch.no_grad():
        return custom_model.generate_caption(
            pixel_values, tokenizer, max_length=128, temperature=0.7
        )


def _record_caption(
    classifier_data, image_data, generated_text, occlusion_level, model_label
):
    """Append one generated caption and its metadata to ``classifier_data``."""
    classifier_data["image_id"].append(image_data["filename"])
    classifier_data["original_caption"].append(image_data["caption"])
    classifier_data["generated_caption"].append(generated_text)
    # round() rather than int(): 0.29 * 100 is 28.999..., which truncates to 28.
    classifier_data["perturbation_percentage"].append(
        int(round(occlusion_level * 100))
    )
    classifier_data["model_type"].append(model_label)


def evaluate_on_occluded_images(
    model_name="HuggingFaceTB/SmolVLM-Instruct",
    custom_model=None,
    test_csv_path="./custom_captions_dataset/test.csv",
    image_dir="./custom_captions_dataset/test/",
    occlusion_levels=(0.1, 0.5, 0.8),
    tokenizer=None,
    image_processor=None,
    device="cuda",
    model_type="smolvlm",
    classifier_data=None,
):
    """Caption every test image at several occlusion levels and score the output.

    For each occlusion level, every row of the test CSV (columns ``filename``
    and ``caption``) is captioned from an occluded copy of its image. Non-empty
    captions are scored with ``calculate_metrics`` and compared against the
    first row of the baseline results CSV. Each kept caption is also appended
    to ``classifier_data``.

    Args:
        model_name: Hugging Face model id loaded when ``model_type`` is
            ``"smolvlm"``.
        custom_model: Model exposing ``generate_caption``; required when
            ``model_type`` is ``"custom"``.
        test_csv_path: CSV listing test image filenames and reference captions.
        image_dir: Directory containing the test images.
        occlusion_levels: Iterable of mask fractions to evaluate.
        tokenizer: Tokenizer passed to ``custom_model.generate_caption``.
        image_processor: Processor producing ``pixel_values`` for the custom
            model.
        device: Device the model and inputs are moved to.
        model_type: Either ``"smolvlm"`` or ``"custom"``.
        classifier_data: Optional dict of column lists to extend in place; a
            fresh one is created when omitted.

    Returns:
        Tuple ``(results, classifier_data)`` where ``results`` is
        ``{"occlusion_results": [per-level dict, ...]}``.

    Raises:
        ValueError: If ``model_type`` is unknown, or ``"custom"`` is requested
            without ``custom_model``, ``tokenizer`` and ``image_processor``.
    """
    if model_type not in ("smolvlm", "custom"):
        raise ValueError(
            f"model_type must be 'smolvlm' or 'custom', got {model_type!r}"
        )
    if model_type == "custom" and (
        custom_model is None or tokenizer is None or image_processor is None
    ):
        raise ValueError(
            "model_type='custom' requires custom_model, tokenizer and image_processor"
        )

    test_data = pd.read_csv(test_csv_path)

    # NOTE(review): both model types are compared against the same SmolVLM
    # baseline file, as before -- confirm that is intended for the custom model.
    baseline_results = pd.read_csv(
        "/kaggle/input/dl-assignment-2/custom_captions_dataset/smol_results.csv"
    )

    if model_type == "smolvlm":
        processor = AutoProcessor.from_pretrained(model_name)
        model = AutoModelForVision2Seq.from_pretrained(
            model_name,
            torch_dtype=torch.bfloat16,
            _attn_implementation="eager",
        ).to(device)
        model_label = "Model A (SmolVLM)"
    else:
        model_label = "Model B (Custom)"

    if classifier_data is None:
        classifier_data = {
            "image_id": [],
            "original_caption": [],
            "generated_caption": [],
            "perturbation_percentage": [],
            "model_type": [],
        }

    # Also matches "cuda:0" and torch.device("cuda"), not just the bare string.
    uses_cuda = str(device).startswith("cuda")

    occlusion_results = []
    for occlusion_level in occlusion_levels:
        # ":g" avoids float noise such as "10.000000000000002" in the messages.
        percent_text = f"{occlusion_level * 100:g}"
        print(f"\nEvaluating {model_type} model at {percent_text}% occlusion...")
        occlusion_predictions = []
        occlusion_ground_truths = []

        for _, image_data in tqdm(
            test_data.iterrows(),
            total=len(test_data),
            desc=f"Processing {percent_text}% Occluded Images",
        ):
            image_path = os.path.join(image_dir, image_data["filename"])

            if model_type == "smolvlm":
                generated_text = zero_shot_captioning(
                    image_path,
                    model,
                    processor,
                    model_name,
                    occulude_percentage=occlusion_level,
                )
            else:
                generated_text = _caption_with_custom_model(
                    image_path,
                    occlusion_level,
                    custom_model,
                    tokenizer,
                    image_processor,
                    device,
                )

            # Empty captions (e.g. a failed generation) are left out of scoring.
            if generated_text:
                occlusion_predictions.append(generated_text)
                occlusion_ground_truths.append(image_data["caption"])
                _record_caption(
                    classifier_data,
                    image_data,
                    generated_text,
                    occlusion_level,
                    model_label,
                )

            if uses_cuda:
                torch.cuda.empty_cache()

        occlusion_metrics = calculate_metrics(
            occlusion_predictions, occlusion_ground_truths
        )

        # Plain floats keep the result JSON-serializable.
        changes = {
            f"{metric}_change": float(
                value - _baseline_metric(baseline_results, metric)
            )
            for metric, value in occlusion_metrics.items()
        }

        occlusion_results.append(
            {
                "occlusion_level": occlusion_level,
                **occlusion_metrics,
                **changes,
            }
        )

        print(f"Results at {percent_text}% occlusion:")
        for metric, value in occlusion_metrics.items():
            print(f"{metric} Score: {value:.4f}")
        print("Changes from baseline:")
        for metric, value in changes.items():
            print(f"{metric}: {value:.4f}")

    return {"occlusion_results": occlusion_results}, classifier_data

### Zero-Shot Captioning Evaluation on Occluded Images Using SmolVLM Model


In [None]:
# Accumulator shared by both model evaluations; each key is one output column.
classifier_data = {
    "image_id": [],
    "original_caption": [],
    "generated_caption": [],
    "perturbation_percentage": [],
    "model_type": [],
}

test_csv_path = "/kaggle/input/dl-assignment-2/custom_captions_dataset/test.csv"
image_dir = "/kaggle/input/dl-assignment-2/custom_captions_dataset/test/"
occlusion_levels = [0.1, 0.5, 0.8]

print("=== Evaluating SmolVLM Zero-Shot Model ===")
smolvlm_results, classifier_data = evaluate_on_occluded_images(
    model_name="HuggingFaceTB/SmolVLM-Instruct",
    test_csv_path=test_csv_path,
    image_dir=image_dir,
    occlusion_levels=occlusion_levels,
    device=device,
    model_type="smolvlm",
    classifier_data=classifier_data,
)

# Snapshot the SmolVLM-only rows. dict.copy() is shallow and would share the
# column lists with ``classifier_data``, which later evaluations keep
# appending to, so each list is copied as well.
classifier_data_copy = {
    column: list(values) for column, values in classifier_data.items()
}

###  ImageCaptionModel: Vision Transformer + GPT-2 for Image Captioning


In [None]:
class ImageCaptionModel(nn.Module):
    """Image captioner that prefixes a GPT-2 decoder with one image embedding.

    A pretrained vision encoder produces hidden states; the state at sequence
    position 0 (presumably the ViT [CLS] token -- confirm for other encoders)
    is projected to the decoder's embedding width and prepended to the token
    embeddings as a single extra position.
    """

    def __init__(
        self,
        vit_model_name="WinKawaks/vit-small-patch16-224",
        gpt2_model_name="gpt2",
        dropout_rate=0.5,
    ):
        """Load the encoder and decoder and build the projection head.

        Args:
            vit_model_name: Hugging Face id of the vision encoder.
            gpt2_model_name: Hugging Face id of the GPT-2 decoder.
            dropout_rate: Dropout probability inside the projection head.
        """
        super().__init__()

        self.encoder = AutoModel.from_pretrained(vit_model_name)
        self.encoder_dim = self.encoder.config.hidden_size

        self.decoder = GPT2LMHeadModel.from_pretrained(gpt2_model_name)
        self.decoder_dim = self.decoder.config.n_embd

        # Layer order is kept as-is: state_dict keys are positional
        # (image_proj.0, image_proj.1, ...), so reordering would break
        # previously saved checkpoints.
        self.image_proj = nn.Sequential(
            nn.Linear(self.encoder_dim, self.decoder_dim),
            nn.LayerNorm(self.decoder_dim),
            nn.Dropout(dropout_rate),
            nn.ReLU(),
            nn.Linear(self.decoder_dim, self.decoder_dim),
        )
        # One extra embedding row for a single added special token
        # (the notebook adds "<|startoftext|>" to the tokenizer).
        self.decoder.resize_token_embeddings(self.decoder.config.vocab_size + 1)

    def _encode_image(self, pixel_values):
        """Return the projected image embedding with shape (batch, 1, decoder_dim)."""
        encoder_outputs = self.encoder(pixel_values=pixel_values)
        image_embedding = encoder_outputs.last_hidden_state[:, 0]
        return self.image_proj(image_embedding).unsqueeze(1)

    def forward(self, pixel_values, input_ids, attention_mask):
        """Compute decoder logits for captions conditioned on their images.

        Args:
            pixel_values: Image batch passed to the encoder.
            input_ids: Caption token ids, shape (batch, seq).
            attention_mask: Mask for ``input_ids``, shape (batch, seq).

        Returns:
            Logits of shape (batch, seq + 1, vocab); position 0 corresponds to
            the prepended image embedding.
        """
        image_embedding = self._encode_image(pixel_values)

        inputs_embeds = self.decoder.transformer.wte(input_ids)
        inputs_embeds = torch.cat([image_embedding, inputs_embeds], dim=1)

        # The image position is always attended to. Match the caller's mask
        # dtype so the concatenation does not silently promote an integer mask
        # to float.
        image_mask = torch.ones(
            (attention_mask.size(0), 1),
            dtype=attention_mask.dtype,
            device=attention_mask.device,
        )
        extended_attention_mask = torch.cat([image_mask, attention_mask], dim=1)

        outputs = self.decoder(
            inputs_embeds=inputs_embeds,
            attention_mask=extended_attention_mask,
            return_dict=True,
        )

        return outputs.logits

    def generate_caption(
        self, pixel_values, tokenizer, max_length=128, temperature=0.7, top_p=0.9
    ):
        """Sample a caption for a single image with nucleus (top-p) sampling.

        Switches the module to eval mode (and leaves it there), then samples
        one token at a time until ``tokenizer.eos_token_id`` is drawn or
        ``max_length`` tokens have been generated. The full sequence is re-run
        through the decoder at every step.

        Args:
            pixel_values: Image tensor with batch size 1.
            tokenizer: Tokenizer providing ``convert_tokens_to_ids``,
                ``eos_token_id`` and ``decode``; it must know the
                ``<|startoftext|>`` token.
            max_length: Maximum number of tokens to sample.
            temperature: Softmax temperature; must be positive.
            top_p: Cumulative probability kept by the nucleus filter, in
                ``(0, 1]``.

        Returns:
            The decoded caption with special tokens removed and whitespace
            stripped.

        Raises:
            ValueError: If ``temperature`` is not positive, ``top_p`` is
                outside ``(0, 1]``, or the batch size is not 1.
        """
        if temperature <= 0:
            raise ValueError(f"temperature must be positive, got {temperature!r}")
        if not 0.0 < top_p <= 1.0:
            raise ValueError(f"top_p must be within (0, 1], got {top_p!r}")
        if pixel_values.size(0) != 1:
            raise ValueError(
                "generate_caption supports a batch size of 1, "
                f"got {pixel_values.size(0)}"
            )

        self.eval()
        target_device = pixel_values.device
        with torch.no_grad():
            image_embedding = self._encode_image(pixel_values)

            generated_ids = torch.tensor(
                [[tokenizer.convert_tokens_to_ids("<|startoftext|>")]],
                device=target_device,
            )
            inputs_embeds = self.decoder.transformer.wte(generated_ids)
            inputs_embeds = torch.cat([image_embedding, inputs_embeds], dim=1)

            attention_mask = torch.ones(
                (inputs_embeds.size(0), inputs_embeds.size(1)),
                dtype=torch.long,
                device=target_device,
            )
            step_mask = torch.ones((1, 1), dtype=torch.long, device=target_device)

            for _ in range(max_length):
                outputs = self.decoder(
                    inputs_embeds=inputs_embeds,
                    attention_mask=attention_mask,
                    return_dict=True,
                )

                next_token_logits = outputs.logits[:, -1, :] / temperature

                probs = F.softmax(next_token_logits, dim=-1)
                sorted_probs, sorted_indices = torch.sort(probs, descending=True)
                cumulative_probs = torch.cumsum(sorted_probs, dim=-1)

                # Shift the removal mask right by one so the token that crosses
                # the top_p threshold is kept, and never drop the most likely
                # token.
                sorted_indices_to_remove = cumulative_probs > top_p
                sorted_indices_to_remove[..., 1:] = sorted_indices_to_remove[
                    ..., :-1
                ].clone()
                sorted_indices_to_remove[..., 0] = False

                # Map the mask from sorted order back to vocabulary order.
                indices_to_remove = sorted_indices_to_remove.scatter(
                    1, sorted_indices, sorted_indices_to_remove
                )
                next_token_logits[indices_to_remove] = -float("Inf")

                next_token = torch.multinomial(
                    F.softmax(next_token_logits, dim=-1), num_samples=1
                )

                generated_ids = torch.cat([generated_ids, next_token], dim=1)

                if next_token.item() == tokenizer.eos_token_id:
                    break

                next_token_embeds = self.decoder.transformer.wte(next_token)
                inputs_embeds = torch.cat([inputs_embeds, next_token_embeds], dim=1)
                attention_mask = torch.cat([attention_mask, step_mask], dim=1)

            caption = tokenizer.decode(
                generated_ids[0], skip_special_tokens=True
            ).strip()
            return caption


###  Dataset & Dataloaders for Image Captioning


In [None]:
class ImageCaptioningDataset(Dataset):
    """Dataset pairing images on disk with tokenized captions read from a CSV.

    Rows are read positionally: column index 1 is used as the image file name
    and column index 2 as the caption.
    """

    def __init__(self, csv_file, img_dir, tokenizer, image_processor, max_length=128):
        """Load the caption table and store the preprocessing components.

        Args:
            csv_file: Path of the CSV file listing images and captions.
            img_dir: Directory that contains the image files.
            tokenizer: Tokenizer used to encode the captions.
            image_processor: Processor turning an image into ``pixel_values``.
            max_length: Length every caption is padded or truncated to.
        """
        self.img_captions = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.tokenizer = tokenizer
        self.image_processor = image_processor
        self.max_length = max_length

    def __len__(self):
        """Return the number of rows in the caption table."""
        return len(self.img_captions)

    def __getitem__(self, idx):
        """Return the preprocessed image and encoded caption for row ``idx``.

        Returns:
            Dict with ``pixel_values``, ``input_ids``, ``attention_mask``,
            ``target_ids`` (a clone of ``input_ids``, padding positions
            included) and the raw ``caption``.
        """
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.img_dir, str(self.img_captions.iloc[idx, 1]))
        caption = self.img_captions.iloc[idx, 2]

        # Close the file as soon as the pixels are decoded; with several
        # DataLoader workers, handles left to garbage collection can pile up.
        with Image.open(img_name) as source_image:
            image = source_image.convert("RGB")
        pixel_values = self.image_processor(
            images=image, return_tensors="pt"
        ).pixel_values.squeeze(0)

        # The start marker mirrors the token generation is seeded with.
        caption_with_start = f"<|startoftext|> {caption}"
        caption_encoding = self.tokenizer(
            caption_with_start,
            padding="max_length",
            max_length=self.max_length,
            truncation=True,
            return_tensors="pt",
        )

        input_ids = caption_encoding.input_ids.squeeze(0)
        attention_mask = caption_encoding.attention_mask.squeeze(0)
        target_ids = input_ids.clone()

        return {
            "pixel_values": pixel_values,
            "input_ids": input_ids,
            "attention_mask": attention_mask,
            "target_ids": target_ids,
            "caption": caption,
        }


def get_dataloaders(data_dir, batch_size=16):
    """Build train/val/test DataLoaders plus the tokenizer and image processor.

    ``data_dir`` is expected to hold ``train.csv``, ``val.csv`` and ``test.csv``
    next to ``train``, ``val`` and ``test`` image directories. Only the
    training loader shuffles.

    Args:
        data_dir: Root directory of the captions dataset.
        batch_size: Batch size used by all three loaders.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader, tokenizer,
        image_processor)``.
    """
    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    # GPT-2 has no dedicated padding token, so EOS is reused for padding.
    tokenizer.pad_token = tokenizer.eos_token
    tokenizer.add_special_tokens({"additional_special_tokens": ["<|startoftext|>"]})

    image_processor = AutoImageProcessor.from_pretrained(
        "WinKawaks/vit-small-patch16-224"
    )

    split_names = ("train", "val", "test")
    datasets = {
        split: ImageCaptioningDataset(
            csv_file=os.path.join(data_dir, f"{split}.csv"),
            img_dir=os.path.join(data_dir, split),
            tokenizer=tokenizer,
            image_processor=image_processor,
        )
        for split in split_names
    }

    train_loader, val_loader, test_loader = (
        DataLoader(
            datasets[split],
            batch_size=batch_size,
            shuffle=(split == "train"),
            num_workers=4,
            pin_memory=True,
        )
        for split in split_names
    )

    return train_loader, val_loader, test_loader, tokenizer, image_processor

###  Evaluate Custom Image Captioning Model on Occluded Images


In [None]:
# Recreate the tokenizer setup used by get_dataloaders: EOS doubles as the
# padding token and "<|startoftext|>" is registered as a special token.
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token
special_tokens = {"additional_special_tokens": ["<|startoftext|>"]}
tokenizer.add_special_tokens(special_tokens)
image_processor = AutoImageProcessor.from_pretrained("WinKawaks/vit-small-patch16-224")

custom_model = ImageCaptionModel()
custom_model = custom_model.to(device)

# NOTE(review): torch.load unpickles the file; only load checkpoints from a
# trusted source.
checkpoint_path = "/kaggle/input/dl-assignment-2/custom_captions_dataset/best_image_caption_model.pth"
state_dict = torch.load(checkpoint_path, map_location=device)
custom_model.load_state_dict(state_dict)

print("\n=== Evaluating Custom Model ===")
# Reuses the paths, occlusion levels and accumulator from the SmolVLM run so
# both models' captions end up in the same ``classifier_data``.
custom_results, classifier_data = evaluate_on_occluded_images(
    model_type="custom",
    custom_model=custom_model,
    tokenizer=tokenizer,
    image_processor=image_processor,
    test_csv_path=test_csv_path,
    image_dir=image_dir,
    occlusion_levels=occlusion_levels,
    device=device,
    classifier_data=classifier_data,
)


###  Save Classifier Data: Export to CSV


In [None]:
# Turn the accumulated column lists into a table and write it without the
# index column.
classifier_data = pd.DataFrame(data=classifier_data)
classifier_data.to_csv(
    path_or_buf="/kaggle/working/classifier_data.csv",
    index=False,
)

### Printing Results

In [None]:
# Show both result dictionaries, each under its own banner line.
print(
    "\n=== Custom Model Results ===",
    custom_results,
    "\n=== SmolVLM Model Results ===",
    smolvlm_results,
    sep="\n",
)

###  Save Results: Export Custom and SmolVLM Results to JSON


In [None]:
import json

# Write each model's evaluation summary to its own pretty-printed JSON file.
for output_path, payload in (
    ("/kaggle/working/custom_results.json", custom_results),
    ("/kaggle/working/smolvlm_results.json", smolvlm_results),
):
    with open(output_path, "w") as f:
        json.dump(payload, f, indent=4)