In [15]:
# --- Necessary Imports ---
import os
import numpy as np
import pandas as pd
from PIL import Image
import random
import torch
from torch.utils.data import Dataset, DataLoader
# torchvision is needed for the custom transform you provided earlier
from torchvision import transforms
from tqdm import tqdm
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from nltk.tokenize import word_tokenize
from rouge_score import rouge_scorer # Make sure this library is installed (pip install rouge-score)

# Import models and processors/tokenizers - Base components needed
from transformers import (
    AutoProcessor,
    AutoModelForVision2Seq,
    AutoTokenizer,
    ViTModel,          # Base ViT model for encoder
    GPT2LMHeadModel,   # Base GPT-2 model for decoder
    ViTImageProcessor  # Needed if using HF processor transform, but you used torchvision
)
import torch.nn as nn # Needed for custom model definition
import torch.nn.functional as F # Needed for custom model definition

# --- Constants ---
# Paths based on your Kaggle input structure
BASE_DIR = "/kaggle/input/image-captioning-dataset/custom_captions_dataset"
TEST_CSV_PATH = os.path.join(BASE_DIR, "test.csv")
TEST_IMAGE_DIR = os.path.join(BASE_DIR, "test")
# Ensure these column names match your test.csv file
FILENAME_COL = 'filename'
CAPTION_COL = 'caption'

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 8 # Adjust based on your GPU memory (Kaggle T4/P100 usually handle 8)
OCCLUSION_LEVELS = [10, 50, 80] # Percentages as required
PATCH_GRID_SIZE = 16 # Grid size for occlusion (16x16 patches)

# Model specifics
SMOLVLM_MODEL_NAME = "HuggingFaceTB/SmolVLM-Instruct" # Will be downloaded from Hub

# --- Custom Model Configuration ---
# *** IMPORTANT: You MUST set the path to your saved model weights file below ***
CUSTOM_MODEL_PATH = "/kaggle/input/image-captioning-vit-gpt/transformers/default/1/model.pth" # <--- SET THIS PATH (e.g., "/kaggle/working/custom_caption_model.pth" or "/kaggle/input/my-trained-model/custom_caption_model.pth")

# Names for downloading/loading the base encoder/decoder from Hugging Face Hub
# (These will be downloaded if not already cached by Kaggle/HF)
CUSTOM_ENCODER_NAME = "WinKawaks/vit-small-patch16-224"
CUSTOM_DECODER_NAME = "gpt2"
# Use the decoder name to load the corresponding tokenizer, assuming they match (common for GPT-2)
CUSTOM_TOKENIZER_PATH = CUSTOM_DECODER_NAME # Correctly uses the variable

# --- NLTK Downloads (ensure they run at least once) ---
print("Downloading NLTK resources (if needed)...")
try:
    # Use nltk.data.find to check before downloading if possible, might save time on reruns
    nltk.data.find('corpora/wordnet.zip')
except nltk.downloader.DownloadError:
    nltk.download('wordnet', quiet=True)
try:
    nltk.data.find('tokenizers/punkt')
except nltk.downloader.DownloadError:
    nltk.download('punkt', quiet=True)
try:
    nltk.data.find('corpora/omw-1.4.zip')
except nltk.downloader.DownloadError:
    nltk.download('omw-1.4', quiet=True) # Needed for METEOR's wordnet lookup
print("NLTK resources checked/downloaded.")

# --- Initialize Metric Calculators ---
SMOOTHIE = SmoothingFunction().method4 # For BLEU calculation stability
ROUGE = rouge_scorer.RougeScorer(['rougeL'], use_stemmer=True) # For ROUGE-L F1

print("-" * 50)
print(f"Configuration:")
print(f"  Device: {DEVICE}")
print(f"  Test CSV: {TEST_CSV_PATH}")
print(f"  Test Images: {TEST_IMAGE_DIR}")
print(f"  Occlusion Levels: {OCCLUSION_LEVELS}")
print(f"  SmolVLM Name: {SMOLVLM_MODEL_NAME}")
print(f"  Custom Encoder: {CUSTOM_ENCODER_NAME}")
print(f"  Custom Decoder: {CUSTOM_DECODER_NAME}")
print(f"  Custom Tokenizer: {CUSTOM_TOKENIZER_PATH}")
if not CUSTOM_MODEL_PATH:
    print("  WARNING: CUSTOM_MODEL_PATH is not set. Custom model evaluation will fail.")
else:
    print(f"  Custom Weights: {CUSTOM_MODEL_PATH}")
print("-" * 50)

# --- Define the torchvision transform from your Part A ---
# Moved here to ensure it's defined early
print("Defining custom model image transform (using torchvision)...")
custom_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    # Using 0.5, 0.5 for mean, std assumes images were normalized this way during ViT pre-training
    # or that you fine-tuned ViT with this normalization. Double-check if needed.
    transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
])
print("Custom transform defined.")

Downloading NLTK resources (if needed)...
NLTK resources checked/downloaded.
--------------------------------------------------
Configuration:
  Device: cuda
  Test CSV: /kaggle/input/image-captioning-dataset/custom_captions_dataset/test.csv
  Test Images: /kaggle/input/image-captioning-dataset/custom_captions_dataset/test
  Occlusion Levels: [10, 50, 80]
  SmolVLM Name: HuggingFaceTB/SmolVLM-Instruct
  Custom Encoder: WinKawaks/vit-small-patch16-224
  Custom Decoder: gpt2
  Custom Tokenizer: gpt2
  Custom Weights: /kaggle/input/image-captioning-vit-gpt/transformers/default/1/model.pth
--------------------------------------------------
Defining custom model image transform (using torchvision)...
Custom transform defined.


In [16]:
def occlude_image(image: np.array, mask_percentage: int) -> np.array:
    """
    Applies patch-wise occlusion to an image by setting pixel values to black.

    Args:
        image (np.array): Input image as a NumPy array (H, W, C).
                          Assumes RGB channel order if C=3.
        mask_percentage (int): Percentage of patches to mask (0-100).

    Returns:
        np.array: Occluded image as a NumPy array with the same dimensions.
                  Returns a copy of the original if mask_percentage is 0.
    """
    if not isinstance(image, np.ndarray):
        raise TypeError("Input image must be a NumPy array.")
    if not (0 <= mask_percentage <= 100):
        raise ValueError("mask_percentage must be between 0 and 100")

    # Return a copy if no occlusion is needed
    if mask_percentage == 0:
        return image.copy()

    img_h, img_w = image.shape[:2]

    # Calculate patch dimensions based on the 16x16 grid requirement
    patch_h = img_h // PATCH_GRID_SIZE
    patch_w = img_w // PATCH_GRID_SIZE

    # Handle cases where the image is too small for the grid
    if patch_h == 0 or patch_w == 0:
         print(f"Warning: Image dimensions ({img_h}x{img_w}) are too small "
               f"to create meaningful patches for a {PATCH_GRID_SIZE}x{PATCH_GRID_SIZE} grid. "
               f"Skipping occlusion for this image.")
         return image.copy()

    num_patches_h = PATCH_GRID_SIZE
    num_patches_w = PATCH_GRID_SIZE
    total_patches = num_patches_h * num_patches_w

    # Calculate the number of patches to mask, ensuring at least one if percentage > 0
    num_patches_to_mask = int(round(total_patches * (mask_percentage / 100.0)))
    if num_patches_to_mask == 0 and mask_percentage > 0:
        num_patches_to_mask = 1 # Guarantee at least one patch is masked

    # Ensure we don't try to mask more patches than exist
    num_patches_to_mask = min(num_patches_to_mask, total_patches)

    # Generate all possible patch indices (row, column) from 0 to 15
    all_patch_indices = [(r, c) for r in range(num_patches_h) for c in range(num_patches_w)]

    # Randomly select the indices of the patches to mask without replacement
    indices_to_mask = random.sample(all_patch_indices, num_patches_to_mask)

    # Create a copy of the image to modify
    occluded_img = image.copy()

    # Iterate through the selected patch indices and apply the mask
    for r_idx, c_idx in indices_to_mask:
        # Calculate pixel coordinates for the top-left corner of the patch
        start_row = r_idx * patch_h
        start_col = c_idx * patch_w

        # Calculate pixel coordinates for the bottom-right corner (exclusive)
        # Use original image dimensions to handle potential non-perfect divisions correctly
        end_row = start_row + patch_h
        end_col = start_col + patch_w

        # Ensure end points do not exceed image boundaries
        end_row = min(end_row, img_h)
        end_col = min(end_col, img_w)

        # Set the selected patch region to black (0)
        # Works for grayscale (H, W) or color (H, W, C) images
        occluded_img[start_row:end_row, start_col:end_col] = 0

    return occluded_img

# --- Optional: Example Usage & Visualization ---
# Uncomment below to test the function with a sample image if needed
# import matplotlib.pyplot as plt
# try:
#     # Load a sample image (make sure TEST_CSV_PATH and TEST_IMAGE_DIR are set)
#     sample_df = pd.read_csv(TEST_CSV_PATH)
#     if not sample_df.empty:
#         sample_img_path = os.path.join(TEST_IMAGE_DIR, sample_df.iloc[0][FILENAME_COL])
#         if os.path.exists(sample_img_path):
#             sample_pil_image = Image.open(sample_img_path).convert("RGB")
#             sample_np_image = np.array(sample_pil_image)

#             # Apply occlusion (e.g., 50%)
#             occluded_50 = occlude_image(sample_np_image, 50)
#             occluded_10 = occlude_image(sample_np_image, 10)
#             occluded_80 = occlude_image(sample_np_image, 80)

#             # Display using matplotlib
#             fig, ax = plt.subplots(1, 4, figsize=(20, 5))
#             ax[0].imshow(sample_np_image)
#             ax[0].set_title("Original")
#             ax[0].axis('off')
#             ax[1].imshow(occluded_10)
#             ax[1].set_title("10% Occluded")
#             ax[1].axis('off')
#             ax[2].imshow(occluded_50)
#             ax[2].set_title("50% Occluded")
#             ax[2].axis('off')
#             ax[3].imshow(occluded_80)
#             ax[3].set_title("80% Occluded")
#             ax[3].axis('off')
#             plt.tight_layout()
#             plt.show()

#             # You can convert back to PIL if needed for model input:
#             # occluded_pil = Image.fromarray(occluded_50)
#         else:
#             print(f"Sample image not found: {sample_img_path}")
#     else:
#         print("Test CSV is empty or not found.")

# except Exception as e:
#      print(f"Error during example usage: {e}")

In [17]:
import pandas as pd
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import os
import torch
# Ensure other necessary imports like ViTImageProcessor, transforms are present from previous cells
from transformers import ViTImageProcessor


# --- Dataset Definition ---
# (Using the version you provided in the last message)
class ImageCaptionDataset(Dataset):
    """
    Loads image paths and captions, providing PIL images and optionally transformed tensors.
    """
    def __init__(self, csv_path, image_dir, filename_col, caption_col, transform=None, dataframe=None):
        """
        Args:
            csv_path (string, optional): Path to the csv file. Used if dataframe is None.
            image_dir (string): Directory with all the images.
            filename_col (string): Name of the column with image filenames.
            caption_col (string): Name of the column with ground truth captions.
            transform (callable, optional): Optional transform for the custom model's input.
            dataframe (pd.DataFrame, optional): Use this pre-loaded DataFrame instead of reading csv_path.
        """
        if dataframe is not None:
            # Use the provided dataframe directly (make a copy)
            self.data_frame = dataframe.copy()
            print(f"Dataset initialized using provided DataFrame.")
        elif csv_path is not None:
            # Load from CSV path if dataframe not provided
            print(f"Loading DataFrame from CSV: {csv_path}")
            try:
                self.data_frame = pd.read_csv(csv_path)
            except FileNotFoundError:
                print(f"Error: CSV file not found at {csv_path}")
                self.data_frame = pd.DataFrame(columns=[filename_col, caption_col]) # Empty df
        else:
            raise ValueError("Must provide either csv_path or dataframe to ImageCaptionDataset")


        self.image_dir = image_dir
        self.transform = transform
        self.filename_col = filename_col
        self.caption_col = caption_col

        # --- Data Cleaning and Validation ---
        initial_count = len(self.data_frame)
        if initial_count > 0:
            print(f"Initial entries before validation: {initial_count}")
            self.data_frame.dropna(subset=[self.filename_col, self.caption_col], inplace=True)
            self.data_frame = self.data_frame[
                self.data_frame[self.caption_col].apply(lambda x: isinstance(x, str) and len(x.strip()) > 0)
            ]
            self.data_frame['full_path'] = self.data_frame[self.filename_col].apply(
                lambda x: os.path.join(self.image_dir, str(x))
            )
            # Check file existence - can be slow, consider sampling check if needed
            print("Validating image file paths...")
            self.data_frame['exists'] = self.data_frame['full_path'].apply(os.path.exists)
            self.data_frame = self.data_frame[self.data_frame['exists']]
            self.data_frame.drop(columns=['exists'], inplace=True)

            final_count = len(self.data_frame)
            print(f"Entries after validation (paths exist, captions valid): {final_count}")
            if final_count == 0:
                 print(f"Warning: Dataset has 0 valid entries after validation.")
        else:
             print(f"Warning: Initial DataFrame has 0 entries.")


    def __len__(self):
        return len(self.data_frame)

    def __getitem__(self, idx):
        # (Your __getitem__ logic remains the same)
        if torch.is_tensor(idx): idx = idx.tolist()
        row = self.data_frame.iloc[idx]
        img_full_path = row['full_path']
        caption = row[self.caption_col]
        filename = row[self.filename_col]
        try:
            pil_image = Image.open(img_full_path).convert('RGB')
        except Exception as e:
            return None, None, None, None # Handled by collate_fn
        transformed_tensor = None
        if self.transform:
            try:
                transformed_tensor = self.transform(pil_image)
            except Exception as e:
                 # print(f"Warning: Transform failed for {filename}: {e}") # Optional debug
                 pass # Keep tensor as None
        return pil_image, caption, transformed_tensor, filename


# --- Custom Collate Function ---
# (Using the version you provided - it handles None correctly)
def custom_collate_fn(batch):
    original_batch_size = len(batch)
    batch = [item for item in batch if item[0] is not None]
    filtered_batch_size = len(batch)
    if filtered_batch_size == 0: return None, None, None, None
    pil_images, captions, transformed_tensors_list, filenames = zip(*batch)
    valid_tensors = [t for t in transformed_tensors_list if t is not None]
    transformed_batch = None
    if valid_tensors:
        try:
            # Only stack if the number of valid tensors matches the filtered batch size
            if len(valid_tensors) == filtered_batch_size:
                 transformed_batch = torch.stack(valid_tensors)
            # else: # Optional: print warning if some transforms failed in the batch
            #      print(f"Warning: Collate received {filtered_batch_size} valid images but only {len(valid_tensors)} valid tensors.")
        except Exception as e:
            print(f"Error stacking transformed tensors in collate_fn: {e}. Batch tensor will be None.")
    return list(pil_images), list(captions), transformed_batch, list(filenames)


# --- Define Transforms for Custom Model ---
# (Using ViTImageProcessor as in your provided code)
# Ensure necessary constants (CUSTOM_ENCODER_NAME) are defined from the setup cell
custom_transform = None # Initialize
try:
    if 'CUSTOM_ENCODER_NAME' not in globals(): raise NameError("CUSTOM_ENCODER_NAME not defined.")
    print(f"Loading ViTImageProcessor for: {CUSTOM_ENCODER_NAME}")
    custom_image_processor = ViTImageProcessor.from_pretrained(CUSTOM_ENCODER_NAME)
    custom_transform = lambda pil_img: custom_image_processor(
        images=pil_img, return_tensors="pt"
    ).pixel_values.squeeze(0) # Remove batch dim
    print("Custom model transform defined successfully using ViTImageProcessor.")
except Exception as e:
    print(f"ERROR: Failed to load ViTImageProcessor from '{CUSTOM_ENCODER_NAME}'. Error: {e}")
    print("Custom transform will be None. Custom model evaluation likely to fail.")


# --- Create LIMITED Dataset and DataLoader ---
NUM_IMAGES_TO_PROCESS = 200
print(f"\n--- Creating LIMITED Dataset/DataLoader for first {NUM_IMAGES_TO_PROCESS} images ---")

# Ensure necessary constants are defined
required_vars = ['TEST_CSV_PATH', 'TEST_IMAGE_DIR', 'FILENAME_COL', 'CAPTION_COL', 'BATCH_SIZE', 'DEVICE']
for var in required_vars:
    if var not in globals(): raise NameError(f"Constant '{var}' is not defined.")

test_dataloader = None
test_dataset = None
subset_df = None # Initialize

try:
    # 1. Load the full DataFrame from CSV
    print(f"Loading full DataFrame from {TEST_CSV_PATH}...")
    full_df = pd.read_csv(TEST_CSV_PATH)
    print(f"Full DataFrame loaded with {len(full_df)} rows.")

    # 2. Select the first N rows that potentially exist
    if len(full_df) == 0:
         print("Warning: Full DataFrame is empty. No data to process.")
    else:
        if len(full_df) >= NUM_IMAGES_TO_PROCESS:
            subset_df = full_df.head(NUM_IMAGES_TO_PROCESS).copy()
            print(f"Selected first {len(subset_df)} rows for potential evaluation.")
        else:
            subset_df = full_df.copy()
            print(f"Selected all {len(subset_df)} available rows (less than {NUM_IMAGES_TO_PROCESS}).")

    # 3. Create the dataset using the SUBSET DataFrame (if subset_df is not None)
    if subset_df is not None and not subset_df.empty:
        # Pass the subset DataFrame directly to the modified Dataset constructor
        test_dataset = ImageCaptionDataset(
            image_dir=TEST_IMAGE_DIR,
            filename_col=FILENAME_COL,
            caption_col=CAPTION_COL,
            transform=custom_transform, # Pass the transform defined above
            dataframe=subset_df,        # Pass the filtered DataFrame here
            csv_path=None               # Explicitly set csv_path to None
        )

        # 4. Create the DataLoader ONLY if the dataset is valid and non-empty
        if len(test_dataset) > 0:
            test_dataloader = DataLoader(
                test_dataset,
                batch_size=BATCH_SIZE,
                shuffle=False,
                num_workers=2,
                collate_fn=custom_collate_fn,
                pin_memory=True if DEVICE == "cuda" else False
            )
            print(f"LIMITED Test DataLoader created. Samples after validation: {len(test_dataset)}, Batches: {len(test_dataloader)}")
        else:
            print("ERROR: The limited dataset has 0 valid entries after validation (check image paths/captions in the first 200 rows). Cannot create DataLoader.")
    else:
         print("Skipping Dataset/DataLoader creation as the subset DataFrame is empty or None.")


except FileNotFoundError:
     print(f"ERROR: Test CSV not found at {TEST_CSV_PATH}. Cannot create DataLoader.")
except Exception as e:
     print(f"ERROR creating limited dataset/dataloader: {e}")

# Check if dataloader was created successfully
print(f"\nDataLoader 'test_dataloader' ready for use: {'Yes' if test_dataloader is not None else 'No'}")

Loading ViTImageProcessor for: WinKawaks/vit-small-patch16-224


preprocessor_config.json:   0%|          | 0.00/160 [00:00<?, ?B/s]

Custom model transform defined successfully using ViTImageProcessor.

--- Creating LIMITED Dataset/DataLoader for first 200 images ---
Loading full DataFrame from /kaggle/input/image-captioning-dataset/custom_captions_dataset/test.csv...
Full DataFrame loaded with 928 rows.
Selected first 200 rows for potential evaluation.
Dataset initialized using provided DataFrame.
Initial entries before validation: 200
Validating image file paths...
Entries after validation (paths exist, captions valid): 200
LIMITED Test DataLoader created. Samples after validation: 200, Batches: 25

DataLoader 'test_dataloader' ready for use: Yes


In [18]:
# Ensure necessary base imports are present
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformers import ViTModel, GPT2LMHeadModel, AutoTokenizer, AutoProcessor, AutoModelForVision2Seq
from PIL import Image
from torchvision import transforms # Needed for the custom transform
import os # Needed for file path checks

# --- User-Provided Custom Model Definition ---
# (Pasted directly from your input)
class ImageCaptionModel(nn.Module):
    def __init__(self, encoder, decoder, processor, tokenizer, embed_dim=768):
        super(ImageCaptionModel, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.processor = processor # Store if needed, otherwise can be None
        self.tokenizer = tokenizer # Store tokenizer
        # Linear layer to map encoder output dim to decoder input dim
        self.encoder_to_decoder = nn.Linear(encoder.config.hidden_size, embed_dim)
        print(f"Initialized ImageCaptionModel: Encoder hidden size {encoder.config.hidden_size}, Decoder embed dim {embed_dim}")

    def forward(self, images, input_ids, attention_mask=None):
        # Forward pass used for training (calculates loss)
        with torch.no_grad(): # Usually encoder fine-tuning is off during captioning training
            encoder_outputs = self.encoder(pixel_values=images).last_hidden_state
        image_features = encoder_outputs.mean(dim=1) # Average pooling as per your code
        image_embeds = self.encoder_to_decoder(image_features).unsqueeze(1)
        decoder_input_embeds = self.decoder.transformer.wte(input_ids)
        decoder_inputs_embeds = torch.cat([image_embeds, decoder_input_embeds], dim=1)
        final_attention_mask = None
        if attention_mask is not None:
            bos_mask = torch.ones((attention_mask.size(0), 1), dtype=attention_mask.dtype, device=attention_mask.device)
            final_attention_mask = torch.cat([bos_mask, attention_mask], dim=1)
        decoder_outputs = self.decoder(inputs_embeds=decoder_inputs_embeds, attention_mask=final_attention_mask)
        logits = decoder_outputs.logits[:, 1:] # Slice off prediction based on image embed
        labels = input_ids.clone()
        labels[labels == self.tokenizer.pad_token_id] = -100
        logits_flat = logits.contiguous().view(-1, logits.size(-1))
        labels_flat = labels.contiguous().view(-1)
        loss = F.cross_entropy(logits_flat, labels_flat)
        return loss

    def generate(self, image_tensor, tokenizer, max_length=30):
        # Generation method used for inference
        self.eval()
        if image_tensor.ndim == 3:
             image_tensor = image_tensor.unsqueeze(0)
        image_tensor = image_tensor.to(next(self.parameters()).device)
        with torch.no_grad():
            encoder_outputs = self.encoder(pixel_values=image_tensor).last_hidden_state
            image_features = encoder_outputs.mean(dim=1)
            encoder_embeds = self.encoder_to_decoder(image_features).unsqueeze(1)
            # Use tokenizer's BOS or EOS as starting token
            start_token = tokenizer.bos_token_id if tokenizer.bos_token_id is not None else tokenizer.eos_token_id
            if start_token is None:
                print("ERROR in generate: Cannot find BOS or EOS token ID.")
                return ""
            generated_ids = [start_token]
            for _ in range(max_length - 1):
                input_ids_tensor = torch.tensor([generated_ids], device=image_tensor.device)
                decoder_input_embeds = self.decoder.transformer.wte(input_ids_tensor)
                decoder_inputs_embeds = torch.cat([encoder_embeds, decoder_input_embeds], dim=1)
                attn_mask = torch.ones(decoder_inputs_embeds.shape[:2], device=image_tensor.device)
                outputs = self.decoder(inputs_embeds=decoder_inputs_embeds, attention_mask=attn_mask)
                next_token_logits = outputs.logits[:, -1, :]
                next_token_id = torch.argmax(next_token_logits, dim=-1).item()
                generated_ids.append(next_token_id)
                if next_token_id == tokenizer.eos_token_id:
                    break
            caption = tokenizer.decode(generated_ids, skip_special_tokens=True)
            return caption.strip()
# === End of Pasted Class Definition ===


# --- Load Pre-trained Models ---

# 1. Load SmolVLM (Downloads from Hub)
print("Loading SmolVLM model and processor...")
# Check if constants are defined from previous cell
if 'SMOLVLM_MODEL_NAME' not in globals(): raise NameError("SMOLVLM_MODEL_NAME not defined.")
if 'DEVICE' not in globals(): raise NameError("DEVICE not defined.")

smol_model = None
smol_processor = None
try:
    smol_processor = AutoProcessor.from_pretrained(SMOLVLM_MODEL_NAME)
    smol_model = AutoModelForVision2Seq.from_pretrained(
        SMOLVLM_MODEL_NAME,
        torch_dtype=torch.bfloat16 if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else torch.float16,
        low_cpu_mem_usage=True, # Helps on systems with less CPU RAM
        _attn_implementation="eager" # Use eager if flash attention isn't available/working
    ).to(DEVICE)
    smol_model.eval() # Set to evaluation mode
    print("SmolVLM loaded successfully.")
except Exception as e:
    print(f"ERROR: Failed to load SmolVLM model '{SMOLVLM_MODEL_NAME}': {e}")
    # Keep smol_model and smol_processor as None

# 2. Load Custom Model Components (Downloads base models from Hub)
print("\nLoading Custom Model components...")
custom_model = None
custom_tokenizer = None
# Check if constants are defined
if 'CUSTOM_TOKENIZER_PATH' not in globals(): raise NameError("CUSTOM_TOKENIZER_PATH not defined.")
if 'CUSTOM_ENCODER_NAME' not in globals(): raise NameError("CUSTOM_ENCODER_NAME not defined.")
if 'CUSTOM_DECODER_NAME' not in globals(): raise NameError("CUSTOM_DECODER_NAME not defined.")
if 'CUSTOM_MODEL_PATH' not in globals(): raise NameError("CUSTOM_MODEL_PATH not defined.")

try:
    # Load Tokenizer (Downloads from Hub using the specified name)
    print(f"Loading custom tokenizer from Hub: {CUSTOM_TOKENIZER_PATH}")
    custom_tokenizer = AutoTokenizer.from_pretrained(CUSTOM_TOKENIZER_PATH)
    # Set PAD token if missing (GPT-2 often needs this)
    if custom_tokenizer.pad_token is None:
        if custom_tokenizer.eos_token is not None:
            custom_tokenizer.pad_token = custom_tokenizer.eos_token
            print(f"Set custom tokenizer PAD token to EOS token: {custom_tokenizer.eos_token} ({custom_tokenizer.eos_token_id})")
        else:
            # Add a pad token if EOS is also missing (less common)
             custom_tokenizer.add_special_tokens({'pad_token': '[PAD]'})
             print("Added '[PAD]' as PAD token.")
             # Resize decoder embeddings if a new token was added
             # resize_needed = True # Flag this for later
    if custom_tokenizer.pad_token is None:
         raise ValueError("Custom tokenizer needs a PAD token (set to EOS or added manually).")


    # Load the pre-trained base encoder and decoder models from Hub
    print(f"Loading pre-trained encoder from Hub: {CUSTOM_ENCODER_NAME}")
    encoder = ViTModel.from_pretrained(CUSTOM_ENCODER_NAME).to(DEVICE)

    print(f"Loading pre-trained decoder from Hub: {CUSTOM_DECODER_NAME}")
    decoder = GPT2LMHeadModel.from_pretrained(CUSTOM_DECODER_NAME).to(DEVICE)

    # Instantiate YOUR custom model class using the pre-trained parts
    print("Instantiating custom ImageCaptionModel architecture...")
    custom_model = ImageCaptionModel(
        encoder=encoder,
        decoder=decoder,
        processor=None, # Pass None as processor is handled externally by transform
        tokenizer=custom_tokenizer
    ).to(DEVICE)
    print("Custom model architecture instantiated.")

    # --- Load your fine-tuned weights ---
    if not CUSTOM_MODEL_PATH:
        print("\nWARNING: CUSTOM_MODEL_PATH is empty. Skipping loading of fine-tuned weights.")
        print("         Custom model will use pre-trained weights only (likely poor performance).")
    elif not os.path.isfile(CUSTOM_MODEL_PATH):
         print(f"\nERROR: Custom model weights file not found at '{CUSTOM_MODEL_PATH}'.")
         print("         Custom model will use pre-trained weights only.")
         custom_model = None # Set to None to prevent errors later if weights are essential
    else:
        try:
            print(f"Loading fine-tuned weights from: {CUSTOM_MODEL_PATH}")
            custom_model.load_state_dict(torch.load(CUSTOM_MODEL_PATH, map_location=DEVICE))
            custom_model.eval() # Set to evaluation mode after loading weights
            print("Custom Model fine-tuned weights applied successfully.")
        except RuntimeError as e:
             print(f"\nERROR: Runtime error loading custom model weights from '{CUSTOM_MODEL_PATH}'.")
             print(f"       Check if the saved weights match the current model architecture (encoder/decoder names). Error: {e}")
             print("       Custom model evaluation might fail or use only pre-trained weights.")
             # Decide whether to proceed with base weights or stop:
             # custom_model = None # Option: Stop if weights don't load
        except Exception as e:
             print(f"\nERROR: An unexpected error occurred loading custom model weights: {e}")
             custom_model = None # Stop if weights don't load


except Exception as e:
    print(f"\nERROR: An critical error occurred during custom model component loading: {e}")
    custom_model = None # Ensure model is None if setup failed


# --- Inference Helper Functions ---

# 1. SmolVLM Caption Generation
def generate_caption_smolvlm(pil_image, model, processor, device):
    """Generates a caption for a PIL image using the SmolVLM model."""
    # Check if model and processor were loaded successfully
    if model is None or processor is None or pil_image is None:
        # print("Skipping SmolVLM generation: model/processor missing.") # Optional debug
        return ""
    try:
        messages = [{"role": "user", "content": [{"type": "image"}, {"type": "text", "text": "Describe this image briefly."}]}]
        prompt = processor.apply_chat_template(messages, add_generation_prompt=True)
        inputs = processor(text=prompt, images=[pil_image], return_tensors="pt").to(device, model.dtype) # Use model's dtype for inputs
        with torch.no_grad():
            output_ids = model.generate(
                **inputs,
                max_new_tokens=128, # Max caption length
                do_sample=False,    # Greedy decoding
                pad_token_id=processor.tokenizer.pad_token_id
            )
        raw_output = processor.batch_decode(output_ids, skip_special_tokens=True)[0].strip()
        marker = "Assistant:"
        caption = raw_output.split(marker, 1)[-1].strip() if marker in raw_output else raw_output
        return caption
    except Exception as e:
        # print(f"SmolVLM inference error: {e}") # Uncomment for debugging
        return ""

# 2. Custom Model Caption Generation
def generate_caption_custom(image_tensor, model, tokenizer, device, max_length=30):
    """
    Generates a caption for a single transformed image tensor using the custom model's
    own .generate() method. image_tensor should be the output of custom_transform.
    """
    # Check if model and tokenizer were loaded successfully
    if model is None or tokenizer is None or image_tensor is None:
        # print("Skipping custom generation: model/tokenizer missing.") # Optional debug
        return ""
    try:
        # Ensure input tensor is on the correct device
        image_tensor = image_tensor.to(device)
        # Call the model's implemented generate method
        caption = model.generate(image_tensor, tokenizer, max_length=max_length)
        return caption
    except Exception as e:
        # print(f"Custom model inference error: {e}") # Uncomment for debugging
        return ""

print("\nInference helper functions defined.")
# Final check if models are ready for evaluation
print(f"SmolVLM model ready: {'Yes' if smol_model and smol_processor else 'No'}")
print(f"Custom model ready: {'Yes' if custom_model and custom_tokenizer else 'No'}")

Loading SmolVLM model and processor...
SmolVLM loaded successfully.

Loading Custom Model components...
Loading custom tokenizer from Hub: gpt2
Set custom tokenizer PAD token to EOS token: <|endoftext|> (50256)
Loading pre-trained encoder from Hub: WinKawaks/vit-small-patch16-224


config.json:   0%|          | 0.00/69.7k [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/88.2M [00:00<?, ?B/s]

Some weights of ViTModel were not initialized from the model checkpoint at WinKawaks/vit-small-patch16-224 and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Loading pre-trained decoder from Hub: gpt2


Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

Instantiating custom ImageCaptionModel architecture...
Initialized ImageCaptionModel: Encoder hidden size 384, Decoder embed dim 768
Custom model architecture instantiated.
Loading fine-tuned weights from: /kaggle/input/image-captioning-vit-gpt/transformers/default/1/model.pth

ERROR: Runtime error loading custom model weights from '/kaggle/input/image-captioning-vit-gpt/transformers/default/1/model.pth'.
       Check if the saved weights match the current model architecture (encoder/decoder names). Error: Error(s) in loading state_dict for ImageCaptionModel:
	Missing key(s) in state_dict: "encoder.embeddings.cls_token", "encoder.embeddings.position_embeddings", "encoder.embeddings.patch_embeddings.projection.weight", "encoder.embeddings.patch_embeddings.projection.bias", "encoder.encoder.layer.0.attention.attention.query.weight", "encoder.encoder.layer.0.attention.attention.query.bias", "encoder.encoder.layer.0.attention.attention.key.weight", "encoder.encoder.layer.0.attention.attent

In [19]:
def evaluate_on_occluded_images(model,
                                dataloader,
                                device,
                                occlusion_levels,
                                is_smolvlm=True,          # Flag to determine model type
                                smol_processor=None,      # Required if is_smolvlm=True
                                custom_tokenizer=None,    # Required if is_smolvlm=False
                                custom_img_transform=None # Required if is_smolvlm=False
                               ):
    """
    Evaluates a model's captioning performance on images with varying occlusion levels.

    Args:
        model (torch.nn.Module): The image captioning model (SmolVLM or Custom).
        dataloader (DataLoader): DataLoader providing batches of
                                 (pil_images, captions, transformed_tensors, filenames).
        device (str): Device ('cuda' or 'cpu').
        occlusion_levels (list): List of percentages for occlusion (e.g., [10, 50, 80]).
        is_smolvlm (bool): True if evaluating SmolVLM, False for the custom model.
        smol_processor: The processor for SmolVLM (needed if is_smolvlm=True).
        custom_tokenizer: The tokenizer for the custom model (needed if is_smolvlm=False).
        custom_img_transform: The image transformation function for the custom model
                             (needed if is_smolvlm=False to process occluded images).

    Returns:
        tuple: (
            dict: Dictionary mapping occlusion levels (including 0) to average scores.
                  Example: {0: {'BLEU': 0.X, ...}, 10: {'BLEU': 0.Y, ...}, ...}
            list: List of dictionaries containing data for Part C analysis.
                  Each dict: {'original_caption': str, 'generated_caption': str,
                              'perturbation_percentage': int, 'filename': str,
                              'model_type': str} # model_type added outside this function
        )
    """
    if is_smolvlm and smol_processor is None:
        raise ValueError("SmolVLM processor is required when is_smolvlm is True.")
    if not is_smolvlm and (custom_tokenizer is None or custom_img_transform is None):
        raise ValueError("Custom tokenizer and image transform are required when is_smolvlm is False.")

    model.eval() # Ensure model is in evaluation mode

    # Store results per level {level: {metric: [list_of_scores]}}
    results_list = {level: {'BLEU': [], 'METEOR': [], 'ROUGE-L': []} for level in [0] + occlusion_levels}
    part_c_data_list = [] # Store data for the classifier task

    # --- Iterate through each required occlusion level (plus 0% baseline) ---
    for level in [0] + occlusion_levels:
        print(f"\n--- Evaluating Occlusion Level: {level}% ---")

        # --- Iterate through batches from the dataloader ---
        for batch_data in tqdm(dataloader, desc=f"Level {level}% Batches"):
            if batch_data is None or batch_data[0] is None: # Check if batch is valid (collate_fn returns None on empty)
                # print("Skipping empty or invalid batch.")
                continue

            pil_images, gt_captions, transformed_batch, filenames = batch_data

            # --- Process each item within the batch ---
            for i in range(len(pil_images)):
                original_pil = pil_images[i]
                gt_caption = gt_captions[i]
                filename = filenames[i]

                # --- Prepare Image Input based on Occlusion Level and Model Type ---
                try:
                    if level == 0:
                        # No occlusion: Use original PIL for SmolVLM, pre-transformed tensor for Custom
                        if is_smolvlm:
                            input_image_for_model = original_pil
                        else:
                            if transformed_batch is None or i >= len(transformed_batch):
                                 # print(f"Warning: Missing transformed tensor for {filename} at level 0. Skipping.")
                                 continue # Skip if transform failed earlier for this item
                            input_image_for_model = transformed_batch[i] # Already a tensor
                    else:
                        # Apply occlusion
                        np_image = np.array(original_pil)
                        occluded_np_image = occlude_image(np_image, level)
                        occluded_pil_image = Image.fromarray(occluded_np_image) # Convert back to PIL

                        if is_smolvlm:
                            input_image_for_model = occluded_pil_image # SmolVLM takes PIL
                        else:
                            # Custom model needs the occluded PIL image transformed
                            input_image_for_model = custom_img_transform(occluded_pil_image) # Apply transform

                except Exception as e:
                    print(f"Error processing/occluding image {filename} at level {level}%: {e}. Skipping sample.")
                    continue # Skip this sample if occlusion/processing fails

                # --- Generate Caption ---
                pred_caption = "" # Initialize in case generation fails
                try:
                    if is_smolvlm:
                        if smol_model: # Check if model loaded successfully
                             pred_caption = generate_caption_smolvlm(input_image_for_model, smol_model, smol_processor, device)
                    else:
                        if custom_model: # Check if model loaded successfully
                             # Pass the transformed tensor (either original or occluded+transformed)
                             pred_caption = generate_caption_custom(input_image_for_model, custom_model, custom_tokenizer, device)
                except Exception as e:
                     print(f"Error during caption generation for {filename} at level {level}%: {e}")
                     # pred_caption remains ""


                # --- Calculate Metrics ---
                bleu_score, meteor_score_val, rouge_l_score = 0.0, 0.0, 0.0 # Default scores
                if gt_caption and pred_caption: # Only calculate if both are non-empty
                    try:
                        # Tokenize for BLEU and METEOR
                        ref_tokens = word_tokenize(gt_caption.lower())
                        pred_tokens = word_tokenize(pred_caption.lower())

                        # Avoid errors with empty lists after tokenization
                        if ref_tokens and pred_tokens:
                            # BLEU Score
                            bleu_score = sentence_bleu([ref_tokens], pred_tokens, smoothing_function=SMOOTHIE)

                            # METEOR Score (requires string inputs)
                            try:
                                meteor_score_val = meteor_score([' '.join(ref_tokens)], ' '.join(pred_tokens))
                            except Exception as meteor_err:
                                # print(f"Meteor calc error for {filename}: {meteor_err}")
                                meteor_score_val = 0.0 # Assign 0 if calculation fails

                            # ROUGE-L Score (requires string inputs)
                            try:
                                rouge_scores = ROUGE.score(gt_caption, pred_caption)
                                rouge_l_score = rouge_scores['rougeL'].fmeasure
                            except Exception as rouge_err:
                                # print(f"Rouge calc error for {filename}: {rouge_err}")
                                rouge_l_score = 0.0 # Assign 0 if calculation fails
                        # else: print(f"Warning: Empty tokens for {filename}") # Optional debug

                    except Exception as metric_err:
                        # Catch any other unexpected metric errors
                        # print(f"General metric calculation error for {filename}: {metric_err}")
                        bleu_score, meteor_score_val, rouge_l_score = 0.0, 0.0, 0.0

                # Append scores for the current level
                results_list[level]['BLEU'].append(bleu_score)
                results_list[level]['METEOR'].append(meteor_score_val)
                results_list[level]['ROUGE-L'].append(rouge_l_score)

                # --- Store Data for Part C (only for specified occlusion levels > 0) ---
                if level in occlusion_levels: # Store only for 10, 50, 80% etc.
                    part_c_data_list.append({
                        'original_caption': gt_caption,
                        'generated_caption': pred_caption if pred_caption else "", # Ensure empty string if generation failed
                        'perturbation_percentage': level,
                        'filename': filename
                        # 'model_type' will be added after calling this function
                    })
        # --- End of Batch Loop ---
    # --- End of Occlusion Level Loop ---

    # --- Calculate Average Scores ---
    avg_results = {level: {} for level in [0] + occlusion_levels}
    for level in results_list:
        for metric in results_list[level]:
            scores = results_list[level][metric]
            avg_results[level][metric] = np.mean(scores) if scores else 0.0
        print(f"Level {level}% Avg Scores: "
              f"BLEU={avg_results[level]['BLEU']:.4f}, "
              f"METEOR={avg_results[level]['METEOR']:.4f}, "
              f"ROUGE-L={avg_results[level]['ROUGE-L']:.4f}")


    return avg_results, part_c_data_list

In [7]:
# PART 1: SmolVLM Evaluation Execution

print("\nExecuting Part B - Step 1: SmolVLM Evaluation")

# --- Helper Function to Calculate Performance Changes ---
# (Include this helper in both parts or define it globally earlier)
def calculate_changes(results_raw):
    changes = {level: {} for level in OCCLUSION_LEVELS}
    baseline = results_raw.get(0, {'BLEU': 0.0, 'METEOR': 0.0, 'ROUGE-L': 0.0})
    for level in OCCLUSION_LEVELS:
        current = results_raw.get(level, {'BLEU': 0.0, 'METEOR': 0.0, 'ROUGE-L': 0.0})
        changes[level]['BLEU_change'] = current.get('BLEU', 0.0) - baseline.get('BLEU', 0.0)
        changes[level]['METEOR_change'] = current.get('METEOR', 0.0) - baseline.get('METEOR', 0.0)
        changes[level]['ROUGE-L_change'] = current.get('ROUGE-L', 0.0) - baseline.get('ROUGE-L', 0.0)
    return changes, baseline

# --- Run Evaluation for SmolVLM ---
print("\n===== Evaluating SmolVLM Robustness =====")
smolvlm_results_raw, smolvlm_part_c_data = ({}, []) # Initialize defaults
if 'smol_model' in globals() and smol_model and 'smol_processor' in globals() and smol_processor:
    smolvlm_results_raw, smolvlm_part_c_data = evaluate_on_occluded_images(
        model=smol_model,
        dataloader=test_dataloader, # Assumes test_dataloader is defined
        device=DEVICE,             # Assumes DEVICE is defined
        occlusion_levels=OCCLUSION_LEVELS, # Assumes OCCLUSION_LEVELS is defined
        is_smolvlm=True,
        smol_processor=smol_processor,
        custom_tokenizer=None,      # Not needed
        custom_img_transform=None   # Not needed
    )
    # Add model type label for Part C
    for item in smolvlm_part_c_data:
        item['model_type'] = 'SmolVLM'
    print("SmolVLM evaluation complete.")
else:
    print("Skipping SmolVLM evaluation: Model or Processor not loaded.")

# --- Calculate and Report SmolVLM Changes ---
print("\n--- Calculating SmolVLM Performance Changes ---")
smolvlm_changes, smolvlm_baseline = calculate_changes(smolvlm_results_raw)

print("\n--- SmolVLM Baseline Performance (0% Occlusion) ---")
print(f"BLEU={smolvlm_baseline.get('BLEU', 0.0):.4f}, METEOR={smolvlm_baseline.get('METEOR', 0.0):.4f}, ROUGE-L={smolvlm_baseline.get('ROUGE-L', 0.0):.4f}")

print("\n--- SmolVLM Performance Change (Score_at_Occlusion - Score_Baseline) ---")
print("-" * 68)
print(f"{'Model':<11} | {'Occlusion':^9} | {'BLEU Change':^11} | {'METEOR Change':^13} | {'ROUGE-L Change':^14}")
print("-" * 68)
for level in OCCLUSION_LEVELS:
    s_chg = smolvlm_changes.get(level, {})
    print(f"{'SmolVLM':<11} | {level:^9}% | {s_chg.get('BLEU_change', 0.0):^11.4f} | {s_chg.get('METEOR_change', 0.0):^13.4f} | {s_chg.get('ROUGE-L_change', 0.0):^14.4f}")
print("-" * 68)

# --- Save SmolVLM Data for Part C ---
if smolvlm_part_c_data:
    smolvlm_part_c_df = pd.DataFrame(smolvlm_part_c_data)
    part_c_output_dir = "/content/drive/MyDrive/assignment 2 deep learning/results" # Define output directory
    os.makedirs(part_c_output_dir, exist_ok=True)
    smolvlm_output_path = os.path.join(part_c_output_dir, "part_b_smolvlm_results_for_part_c.csv")
    try:
        smolvlm_part_c_df.to_csv(smolvlm_output_path, index=False)
        print(f"\nSuccessfully saved {len(smolvlm_part_c_df)} SmolVLM results for Part C to:\n{smolvlm_output_path}")
    except Exception as e:
        print(f"\nERROR: Failed to save SmolVLM Part C data to {smolvlm_output_path}: {e}")
else:
    print("\nWarning: No SmolVLM data collected for Part C.")

print("\nPart B - Step 1 (SmolVLM Evaluation) Finished.")


Executing Part B - Step 1: SmolVLM Evaluation

===== Evaluating SmolVLM Robustness =====

--- Evaluating Occlusion Level: 0% ---


Level 0% Batches: 100%|██████████| 25/25 [17:39<00:00, 42.40s/it]



--- Evaluating Occlusion Level: 10% ---


Level 10% Batches:   0%|          | 0/25 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Level 10% Batches: 100%|██████████| 25/25 [18:02<00:00, 43.31s/it]



--- Evaluating Occlusion Level: 50% ---


Level 50% Batches:   0%|          | 0/25 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Level 50% Batches: 100%|██████████| 25/25 [17:28<00:00, 41.95s/it]



--- Evaluating Occlusion Level: 80% ---


Level 80% Batches:   0%|          | 0/25 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Level 80% Batches: 100%|██████████| 25/25 [16:22<00:00, 39.31s/it]

Level 0% Avg Scores: BLEU=0.0576, METEOR=0.0000, ROUGE-L=0.2545
Level 10% Avg Scores: BLEU=0.0471, METEOR=0.0000, ROUGE-L=0.2396
Level 50% Avg Scores: BLEU=0.0369, METEOR=0.0000, ROUGE-L=0.2228
Level 80% Avg Scores: BLEU=0.0204, METEOR=0.0000, ROUGE-L=0.1875
SmolVLM evaluation complete.

--- Calculating SmolVLM Performance Changes ---

--- SmolVLM Baseline Performance (0% Occlusion) ---
BLEU=0.0576, METEOR=0.0000, ROUGE-L=0.2545

--- SmolVLM Performance Change (Score_at_Occlusion - Score_Baseline) ---
--------------------------------------------------------------------
Model       | Occlusion | BLEU Change | METEOR Change | ROUGE-L Change
--------------------------------------------------------------------
SmolVLM     |    10    % |   -0.0105   |    0.0000     |    -0.0148    
SmolVLM     |    50    % |   -0.0206   |    0.0000     |    -0.0317    
SmolVLM     |    80    % |   -0.0372   |    0.0000     |    -0.0669    
--------------------------------------------------------------------




In [20]:
# PART 2: Custom Model Evaluation Execution

print("\nExecuting Part B - Step 2: Custom Model Evaluation")

# --- Helper Function to Calculate Performance Changes ---
# (Ensure this function is defined in your notebook, either here or globally before)
def calculate_changes(results_raw):
    """Calculates the difference between baseline (0%) and other occlusion levels."""
    # Default baseline if level 0 is missing for some reason
    baseline = results_raw.get(0, {'BLEU': 0.0, 'METEOR': 0.0, 'ROUGE-L': 0.0})
    # Ensure OCCLUSION_LEVELS is defined (e.g., [10, 50, 80])
    if 'OCCLUSION_LEVELS' not in globals(): raise NameError("OCCLUSION_LEVELS not defined.")
    changes = {level: {} for level in OCCLUSION_LEVELS}

    for level in OCCLUSION_LEVELS:
        # Default scores for the level if it's missing
        current = results_raw.get(level, {'BLEU': 0.0, 'METEOR': 0.0, 'ROUGE-L': 0.0})
        changes[level]['BLEU_change'] = current.get('BLEU', 0.0) - baseline.get('BLEU', 0.0)
        changes[level]['METEOR_change'] = current.get('METEOR', 0.0) - baseline.get('METEOR', 0.0)
        changes[level]['ROUGE-L_change'] = current.get('ROUGE-L', 0.0) - baseline.get('ROUGE-L', 0.0)
    return changes, baseline

# --- Check Prerequisites ---
# Verify that the necessary components from previous steps exist
# These checks run *before* attempting the potentially long evaluation loop
model_ready = 'custom_model' in globals() and custom_model is not None
tokenizer_ready = 'custom_tokenizer' in globals() and custom_tokenizer is not None
transform_ready = 'custom_transform' in globals() and custom_transform is not None
dataloader_ready = 'test_dataloader' in globals() and test_dataloader is not None
constants_ready = ('DEVICE' in globals() and 'OCCLUSION_LEVELS' in globals())

prerequisites_met = model_ready and tokenizer_ready and transform_ready and dataloader_ready and constants_ready

if not prerequisites_met:
    print("\nERROR: Prerequisites for Custom Model evaluation not met.")
    print(f"  - Custom Model Loaded: {model_ready}")
    print(f"  - Custom Tokenizer Loaded: {tokenizer_ready}")
    print(f"  - Custom Transform Defined: {transform_ready}")
    print(f"  - Test DataLoader Ready: {dataloader_ready}")
    print(f"  - DEVICE/OCCLUSION_LEVELS Defined: {constants_ready}")
    print("Skipping Custom Model evaluation.")
else:
    print("\nPrerequisites met. Starting Custom Model evaluation...")
    # --- Run Evaluation for Custom Model ---
    print("===== Evaluating Custom Model Robustness =====")
    custom_model_results_raw, custom_model_part_c_data = ({}, []) # Initialize defaults

    try:
        custom_model_results_raw, custom_model_part_c_data = evaluate_on_occluded_images(
            model=custom_model,
            dataloader=test_dataloader,
            device=DEVICE,
            occlusion_levels=OCCLUSION_LEVELS,
            is_smolvlm=False,                    # Flag for custom model logic
            custom_tokenizer=custom_tokenizer,
            custom_img_transform=custom_transform,
            smol_processor=None                  # Not needed for custom model
        )
        # Add model type label for Part C data
        for item in custom_model_part_c_data:
            item['model_type'] = 'Custom'
        print("Custom model evaluation run finished.")

    except Exception as eval_error:
        print(f"\nERROR during custom model evaluation run: {eval_error}")
        print("Evaluation may be incomplete.")
        # Results might be partially filled or empty


    # --- Calculate and Report Custom Model Changes ---
    custom_model_changes, custom_model_baseline = (None, None) # Initialize
    if custom_model_results_raw: # Check if the results dict has data
        print("\n--- Calculating Custom Model Performance Changes ---")
        try:
             custom_model_changes, custom_model_baseline = calculate_changes(custom_model_results_raw)

             print("\n--- Custom Model Baseline Performance (0% Occlusion) ---")
             print(f"BLEU={custom_model_baseline.get('BLEU', 0.0):.4f}, METEOR={custom_model_baseline.get('METEOR', 0.0):.4f}, ROUGE-L={custom_model_baseline.get('ROUGE-L', 0.0):.4f}")

             print("\n--- Custom Model Performance Change (Score_at_Occlusion - Score_Baseline) ---")
             print("-" * 68)
             print(f"{'Model':<11} | {'Occlusion':^9} | {'BLEU Change':^11} | {'METEOR Change':^13} | {'ROUGE-L Change':^14}")
             print("-" * 68)
             if custom_model_changes:
                 for level in OCCLUSION_LEVELS:
                     c_chg = custom_model_changes.get(level, {})
                     print(f"{'Custom':<11} | {level:^9}% | {c_chg.get('BLEU_change', 0.0):^11.4f} | {c_chg.get('METEOR_change', 0.0):^13.4f} | {c_chg.get('ROUGE-L_change', 0.0):^14.4f}")
             else:
                 # Should not happen if results_raw is not empty, but included for safety
                 print(f"{'Custom':<11} | {'---':^9} | {'---':^11} | {'---':^13} | {'---':^14} (Calculation Failed)")
             print("-" * 68)
        except Exception as report_err:
             print(f"\nERROR during results calculation/reporting: {report_err}")

    else:
        print("\nSkipping Custom Model results reporting as evaluation results are empty or evaluation failed.")


    # --- Save Custom Model Data for Part C ---
    if custom_model_part_c_data: # Check if any data was collected
        custom_part_c_df = pd.DataFrame(custom_model_part_c_data)
        # Ensure output directory exists
        part_c_output_dir = "/kaggle/working/results" # Saving to working directory is often easier in Kaggle
        # Or use: part_c_output_dir = "/content/drive/MyDrive/assignment 2 deep learning/results" # If using Drive
        try:
            os.makedirs(part_c_output_dir, exist_ok=True)
            custom_output_path = os.path.join(part_c_output_dir, "part_b_custom_results_for_part_c.csv")
            custom_part_c_df.to_csv(custom_output_path, index=False)
            print(f"\nSuccessfully saved {len(custom_part_c_df)} Custom Model results for Part C to:\n{custom_output_path}")
        except Exception as e:
            print(f"\nERROR: Failed to save Custom Model Part C data to {custom_output_path}: {e}")
    else:
        print("\nWarning: No Custom Model data collected/saved for Part C (check evaluation logs for errors).")

print("\nPart B - Step 2 (Custom Model Evaluation Script) Finished.")

# Reminder about combining CSVs if needed later
# print("\nNote: If SmolVLM evaluation also ran successfully, remember to combine ")
# print(" 'part_b_smolvlm_results_for_part_c.csv' and 'part_b_custom_results_for_part_c.csv' for Part C input.")


Executing Part B - Step 2: Custom Model Evaluation

Prerequisites met. Starting Custom Model evaluation...
===== Evaluating Custom Model Robustness =====

--- Evaluating Occlusion Level: 0% ---


Level 0% Batches:   0%|          | 0/25 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Level 0% Batches: 100%|██████████| 25/25 [00:55<00:00,  2.23s/it]



--- Evaluating Occlusion Level: 10% ---


Level 10% Batches:   0%|          | 0/25 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Level 10% Batches: 100%|██████████| 25/25 [00:56<00:00,  2.25s/it]



--- Evaluating Occlusion Level: 50% ---


Level 50% Batches:   0%|          | 0/25 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Level 50% Batches: 100%|██████████| 25/25 [00:56<00:00,  2.26s/it]



--- Evaluating Occlusion Level: 80% ---


Level 80% Batches:   0%|          | 0/25 [00:00<?, ?it/s]huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)
Level 80% Batches: 100%|██████████| 25/25 [00:56<00:00,  2.27s/it]

Level 0% Avg Scores: BLEU=0.0081, METEOR=0.0000, ROUGE-L=0.1343
Level 10% Avg Scores: BLEU=0.0083, METEOR=0.0000, ROUGE-L=0.1317
Level 50% Avg Scores: BLEU=0.0127, METEOR=0.0000, ROUGE-L=0.1525
Level 80% Avg Scores: BLEU=0.0128, METEOR=0.0000, ROUGE-L=0.1482
Custom model evaluation run finished.

--- Calculating Custom Model Performance Changes ---

--- Custom Model Baseline Performance (0% Occlusion) ---
BLEU=0.0081, METEOR=0.0000, ROUGE-L=0.1343

--- Custom Model Performance Change (Score_at_Occlusion - Score_Baseline) ---
--------------------------------------------------------------------
Model       | Occlusion | BLEU Change | METEOR Change | ROUGE-L Change
--------------------------------------------------------------------
Custom      |    10    % |   0.0002    |    0.0000     |    -0.0026    
Custom      |    50    % |   0.0045    |    0.0000     |     0.0182    
Custom      |    80    % |   0.0047    |    0.0000     |     0.0139    
--------------------------------------------


