<a href="https://colab.research.google.com/github/vivi-alencar/bachelor_thesis/blob/main/CustomClip_Inference.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from pycocotools.coco import COCO

In [None]:
from PIL import Image
import os
import albumentations as A
import numpy as np
import pandas as pd
import itertools
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
from torch import nn
import torch.nn.functional as F
import timm
from transformers import DistilBertModel, DistilBertConfig, DistilBertTokenizer

In [None]:
import json

In [None]:
from collections import Counter

In [None]:
from collections import defaultdict

In [None]:
image_path = "path to coco images"

In [None]:
model_path = "path to trained model.pt"

In [None]:
model_name = "name of saved model"

In [None]:
### CLASS: Store configurations
class CFG:
    """Central store of hyperparameters and paths, read as class attributes.

    Nothing is instantiated; other code reads the values directly
    (e.g. ``CFG.device``, ``CFG.size``). Several entries (learning rates,
    epochs, early stopping, LR scheduler) describe a training run and are not
    referenced by the inference code visible in this notebook section.
    """

    debug = False
    #debug = True
    image_path = image_path   # Reads the module-level `image_path` defined in an earlier cell
    batch_size = 32           # Number of samples per DataLoader batch
    num_workers = 2           # Number of subprocesses used for data loading
    head_lr = 5e-4            # Learning rate for the projection heads (which map image and text embeddings to a common space)
    image_encoder_lr = 5e-5   # Learning rate for the image encoder
    text_encoder_lr = 1e-5    # Learning rate for the text encoder
    weight_decay = 1e-3       # Regularization: add a penalty for large weights in the model

    epochs = 30              # Number of epochs to train the model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU when available, otherwise CPU

    model_name = 'vit_base_patch32_224' # Image encoder architecture name passed to timm
    image_embedding = 768     # Input width of the image projection head; must match the image encoder's output size
    text_encoder_model = "distilbert-base-uncased" # Text encoder checkpoint name
    text_embedding = 768      # Input width of the text projection head; must match the text encoder's hidden size
    text_tokenizer = "distilbert-base-uncased"     # Tokenizer checkpoint name
    max_length = 200          # Maximum token length used when tokenizing class strings

    pretrained = True        # Default for both image encoder and text encoder
    trainable = True         # Default for both image encoder and text encoder
    temperature = 1.0        # Divisor applied to the similarity logits before the softmax in the contrastive loss

    size = 224               # Images are resized to size x size

    # For projection head:
    num_projection_layers = 1 # NOTE(review): not referenced by the code visible here
    projection_dim = 256      # Size of the output embedding produced by the projection head
    dropout = 0.1             # Dropout probability used inside the projection head

    # Early stopping patience for validation loss improvement
    early_stopping_patience = 7 # After N epochs without improvement, stop training

    # ReduceLROnPlateau scheduler settings
    lr_scheduler_patience = 3  # Number of epochs to wait before reducing the LR
    lr_scheduler_factor = 0.8  # Factor to reduce LR

In [None]:
### CLASS: Track and compute the running average of a metric

class AvgMeter:
    """Track the running, count-weighted average of a scalar metric.

    Attributes:
        name: Label used when the meter is printed.
        avg: Current average (``sum / count``); 0 until something is recorded.
        sum: Weighted sum of all values seen since the last reset.
        count: Total weight (number of samples) seen since the last reset.
    """

    def __init__(self, name="Metric"):
        self.name = name
        self.reset()

    def reset(self):
        """Zero all counters so the meter can be reused for a fresh run."""
        self.avg, self.sum, self.count = [0] * 3

    def update(self, val, count=1):
        """Record ``val`` observed ``count`` times and refresh the average.

        Args:
            val: Metric value for the new samples (e.g. a batch-mean loss).
            count: Number of samples that ``val`` represents.
        """
        self.count += count
        self.sum += val * count
        # An empty batch (count=0) on a fresh meter would otherwise divide by
        # zero; keep the previous average until at least one sample is seen.
        if self.count != 0:
            self.avg = self.sum / self.count

    def __repr__(self):
        """Return ``"<name>: <avg>"`` with the average shown to 4 decimals."""
        return f"{self.name}: {self.avg:.4f}"

In [None]:
### FUNCTION: Retrieve the current learning rate from the optimizer

def get_lr(optimizer):
    """Return the learning rate of the optimizer's first parameter group.

    Only the first entry of ``optimizer.param_groups`` is consulted; later
    groups are ignored. Returns ``None`` when there are no parameter groups.
    """
    groups = iter(optimizer.param_groups)
    try:
        first_group = next(groups)
    except StopIteration:
        return None
    return first_group["lr"]

In [None]:
### CLASS: Dataset class to load image-class pairs

class CLIPDataset(torch.utils.data.Dataset):
    """Dataset yielding an image tensor, its class string and (optionally) its tokens.

    Args:
        image_filenames: Sequence of image file names, resolved relative to
            ``CFG.image_path`` when a sample is loaded.
        classes: Iterable of class strings, aligned with ``image_filenames``.
        tokenizer: Optional callable used to tokenize all class strings up
            front. When ``None`` (or when tokenization fails) samples carry no
            token tensors.
        transforms: Optional callable invoked as ``transforms(image=array)``
            whose result exposes the transformed array under ``'image'``.
    """

    def __init__(self, image_filenames, classes, tokenizer=None, transforms=None):
        self.image_filenames = image_filenames

        # Materialize once so a one-shot iterable is not exhausted before tokenization.
        self.classes = list(classes)

        # DEBUG
        print(f"Initializing CLIPDataset with {len(self.image_filenames)} images.")

        if tokenizer is not None:
            # Best effort: image-only use (e.g. computing image embeddings) must
            # still work when tokenization fails, so the error is reported and
            # the dataset continues without token tensors.
            try:
                self.encoded_classes = tokenizer(
                    self.classes, padding=True, truncation=True, max_length=CFG.max_length, return_tensors='pt'
                )
            except Exception as e:
                print("Error during tokenization:", e)
                self.encoded_classes = None
            else:
                print("Classes tokenized successfully.")
        else:
            self.encoded_classes = None
            print("No tokenizer provided; skipping class encoding.")

        # Image transformation applied to each image when it is loaded
        self.transforms = transforms

    def _encoded_item(self, idx):
        """Return the token tensors of sample ``idx``, or ``{}`` if none were computed."""
        if self.encoded_classes is None:
            return {}
        # The stored values are already tensors (return_tensors='pt'); calling
        # torch.tensor() on a tensor emits a copy-construct UserWarning for
        # every sample, so copy explicitly instead. as_tensor() also accepts
        # plain lists from tokenizers that do not return tensors.
        return {
            key: torch.as_tensor(values[idx]).detach().clone()
            for key, values in self.encoded_classes.items()
        }

    def __getitem__(self, idx):
        """Load sample ``idx``.

        Returns:
            dict with the tokenizer outputs (when available), ``'image'`` as a
            float tensor permuted from (H, W, C) to (C, H, W), and ``'class'``
            as the raw class string.
        """
        item = self._encoded_item(idx)

        img_path = f"{CFG.image_path}/{self.image_filenames[idx]}"
        # The context manager closes the file handle as soon as the pixels are read.
        with Image.open(img_path) as pil_image:
            image = np.array(pil_image.convert("RGB"))

        if self.transforms:
            image = self.transforms(image=image)['image']

        item['image'] = torch.tensor(image).permute(2, 0, 1).float()
        item['class'] = self.classes[idx]
        return item

    def __len__(self):
        """Return the number of samples (one per class string)."""
        return len(self.classes)

In [None]:
### CLASS: Image encoder

class ImageEncoder(nn.Module):
    """Image backbone built with timm that turns images into feature vectors.

    Args:
        model_name: timm architecture name.
        pretrained: Forwarded to ``timm.create_model`` to request pretrained weights.
        trainable: Value assigned to ``requires_grad`` of every backbone
            parameter (``False`` freezes the backbone).
    """

    def __init__(
        self, model_name=CFG.model_name, pretrained=CFG.pretrained, trainable=CFG.trainable
    ):
        super().__init__()

        # num_classes=0 asks timm for a headless model, so the forward pass
        # yields pooled features rather than class scores.
        self.model = timm.create_model(
            model_name,
            pretrained=pretrained,
            num_classes=0,
            global_pool="avg",
        )

        # Freeze or unfreeze the whole backbone in one pass.
        for param in self.model.parameters():
            param.requires_grad = trainable

    def forward(self, x):
        """Run the image batch ``x`` through the backbone and return its features."""
        features = self.model(x)
        return features

In [None]:
### FUNCTION: Return a set of image transformations

def get_transforms(mode="train"):
    """Return the Albumentations pipeline applied to every image.

    Args:
        mode: Accepted for API symmetry ("train" or anything else). The same
            pipeline is returned for every mode: no train-only augmentation
            is applied.

    Returns:
        An ``A.Compose`` that resizes to ``CFG.size`` x ``CFG.size`` and then
        normalizes the pixel values.
    """
    steps = [
        # Resize to a square CFG.size x CFG.size input.
        A.Resize(CFG.size, CFG.size, always_apply=True),
        # No mean/std is passed, so the library defaults apply.
        # NOTE(review): per the Albumentations docs these are the ImageNet
        # statistics, i.e. this is not a plain rescale to [0, 1] - confirm
        # against the installed version.
        A.Normalize(max_pixel_value=255.0, always_apply=True),
    ]
    return A.Compose(steps)

In [None]:
### CLASS: Text encoder

class TextEncoder(nn.Module):
    """DistilBERT wrapper that returns one embedding per input sequence.

    Args:
        model_name: Checkpoint name passed to ``DistilBertModel.from_pretrained``
            (only used when ``pretrained`` is true).
        pretrained: Load pretrained weights when true; otherwise build the model
            from a default ``DistilBertConfig`` with freshly initialized weights.
        trainable: Value assigned to ``requires_grad`` of every model parameter.
    """

    def __init__(self, model_name=CFG.text_encoder_model, pretrained=CFG.pretrained, trainable=CFG.trainable):
        super().__init__()

        self.model = (
            DistilBertModel.from_pretrained(model_name)
            if pretrained
            else DistilBertModel(config=DistilBertConfig())
        )

        # Freeze or unfreeze the whole transformer in one pass.
        for weight in self.model.parameters():
            weight.requires_grad = trainable

        # Position whose hidden state represents the whole sequence:
        # index 0, where the tokenizer places the [CLS] token.
        self.target_token_idx = 0

    def forward(self, input_ids, attention_mask):
        """Encode a batch of token sequences.

        Args:
            input_ids: Token ids produced by the tokenizer.
            attention_mask: Mask marking real tokens versus padding.

        Returns:
            The last-layer hidden state at ``target_token_idx`` for every
            sequence in the batch.
        """
        encoder_out = self.model(input_ids=input_ids, attention_mask=attention_mask)
        token_states = encoder_out.last_hidden_state
        return token_states[:, self.target_token_idx, :]

In [None]:
### CLASS: Projection head

# This class defines a projection head, which is responsible for mapping high-dimensional input embeddings into a lower-dimensional space
# (often used before applying a loss function, like contrastive loss).
class ProjectionHead(nn.Module):
    """Project encoder features into the shared image/text embedding space.

    Structure: linear -> GELU -> linear -> dropout, added back onto the first
    linear output (residual), followed by layer normalization.

    Args:
        embedding_dim: Width of the incoming encoder features.
        projection_dim: Width of the produced embeddings.
        dropout: Dropout probability applied before the residual addition.
    """

    def __init__(
        self,
        embedding_dim,
        projection_dim=CFG.projection_dim,
        dropout=CFG.dropout
    ):
        super().__init__()
        # Attribute names double as state-dict keys, so they must stay stable
        # for saved checkpoints to load.
        self.projection = nn.Linear(embedding_dim, projection_dim)
        self.gelu = nn.GELU()
        self.fc = nn.Linear(projection_dim, projection_dim)
        self.dropout = nn.Dropout(dropout)
        self.layer_norm = nn.LayerNorm(projection_dim)

    def forward(self, x):
        """Return the normalized projection of the feature batch ``x``."""
        shortcut = self.projection(x)
        refined = self.dropout(self.fc(self.gelu(shortcut)))
        # Residual connection around the GELU/linear/dropout branch.
        return self.layer_norm(refined + shortcut)

In [None]:
### CLASS: Clip Model

# Encode both images and text into a shared embedding space, then calculate the contrastive loss between the two.
class CLIPModel(nn.Module):
    """Two-tower image/text model trained with a symmetric contrastive loss.

    Args:
        temperature: Divisor applied to the similarity logits; smaller values
            sharpen the softmax distribution.
        image_embedding: Feature width produced by the image encoder.
        text_embedding: Feature width produced by the text encoder.
    """

    def __init__(
        self,
        temperature=CFG.temperature,
        image_embedding=CFG.image_embedding,
        text_embedding=CFG.text_embedding,
    ):
        super().__init__()
        # Encoders first, then one projection head per modality.
        self.image_encoder = ImageEncoder()
        self.text_encoder = TextEncoder()
        self.image_projection = ProjectionHead(embedding_dim=image_embedding)
        self.text_projection = ProjectionHead(embedding_dim=text_embedding)
        self.temperature = temperature

    def forward(self, batch):
        """Compute the mean contrastive loss for one batch.

        Args:
            batch: Mapping with the keys ``"image"``, ``"input_ids"`` and
                ``"attention_mask"``.

        Returns:
            Scalar tensor: the batch mean of the averaged text->image and
            image->text cross-entropy losses.
        """
        # Encode both modalities, then project them to the shared space.
        image_features = self.image_encoder(batch["image"])
        text_features = self.text_encoder(
            input_ids=batch["input_ids"],
            attention_mask=batch["attention_mask"],
        )
        image_embeddings = self.image_projection(image_features)
        text_embeddings = self.text_projection(text_features)

        # Similarity matrix: rows are texts, columns are images.
        logits = (text_embeddings @ image_embeddings.T) / self.temperature

        # Matching pairs sit on the diagonal, so the target is an identity matrix.
        num_pairs = logits.shape[0]
        targets = torch.eye(num_pairs, device=logits.device)

        # Cross-entropy in both directions, averaged for symmetry.
        per_text_loss = cross_entropy(logits, targets, reduction='none')
        per_image_loss = cross_entropy(logits.T, targets.T, reduction='none')
        per_pair_loss = (per_image_loss + per_text_loss) / 2.0

        return per_pair_loss.mean()

In [None]:
### FUNCTION: Custom cross entropy loss

def cross_entropy(preds, targets, reduction='none'):
    """Cross-entropy between logits and soft (per-class weight) targets.

    Args:
        preds: 2-D tensor of unnormalized scores, one row per sample.
        targets: Tensor of the same shape holding the target weight of every
            class (an identity matrix when each row has exactly one match).
        reduction: ``"none"`` returns one loss per row; ``"mean"`` returns the
            average over rows.

    Returns:
        Per-row losses (``"none"``) or their scalar mean (``"mean"``).

    Raises:
        ValueError: If ``reduction`` is neither ``"none"`` nor ``"mean"``.
    """
    # Reject unknown modes up front instead of silently returning None.
    if reduction not in ("none", "mean"):
        raise ValueError(
            f"Unsupported reduction {reduction!r}; expected 'none' or 'mean'."
        )

    # LogSoftmax is numerically more stable than softmax followed by log.
    log_softmax = nn.LogSoftmax(dim=-1)

    # Negative log-likelihood weighted by the targets, summed over classes.
    loss = (-targets * log_softmax(preds)).sum(1)

    if reduction == "mean":
        return loss.mean()
    return loss

In [None]:
### FUNCTION: Create data loaders

def build_loaders(dataframe, tokenizer, mode):
    """Build a DataLoader over the images and class strings of ``dataframe``.

    Args:
        dataframe: DataFrame with an ``'image'`` column (file names) and a
            ``'classes'`` column (class strings).
        tokenizer: Tokenizer handed to ``CLIPDataset`` to encode the classes.
        mode: ``"train"`` enables shuffling; any other value keeps the
            DataFrame order. Also forwarded to ``get_transforms``.

    Returns:
        A ``torch.utils.data.DataLoader`` using ``CFG.batch_size`` and
        ``CFG.num_workers``.
    """
    image_column = dataframe["image"]
    class_column = dataframe["classes"]

    image_transforms = get_transforms(mode=mode)

    # Debug output: size and a preview of the incoming data.
    print(f"Building loaders with {len(dataframe)} samples.")
    print(f"Sample images: {image_column.head()}")
    print(f"Sample classes: {class_column.head()}")

    dataset = CLIPDataset(
        image_column.values,
        class_column.values,
        tokenizer=tokenizer,
        transforms=image_transforms,
    )
    print(f"Dataset created with {len(dataset)} samples.")

    # Shuffle only while training so evaluation keeps a stable sample order.
    is_training = mode == "train"
    dataloader = torch.utils.data.DataLoader(
        dataset,
        batch_size=CFG.batch_size,
        num_workers=CFG.num_workers,
        shuffle=is_training,
    )

    print("DataLoader created.")
    return dataloader

In [None]:
tokenizer = DistilBertTokenizer.from_pretrained(CFG.text_tokenizer)

In [None]:
text_encoder = TextEncoder(pretrained=True).to(CFG.device)  # Ensure it's on the correct device

In [None]:
# Load the model

In [None]:
model = CLIPModel().to(CFG.device)

In [None]:
# map_location remaps the stored tensors onto CFG.device, so a checkpoint saved
# on a GPU still loads on a CPU-only machine (same call as in get_image_embeddings).
model.load_state_dict(torch.load(model_path, map_location=CFG.device))
model.eval()  # Set the model to evaluation mode

## From COCOStats

In [None]:
tokenizer = DistilBertTokenizer.from_pretrained(CFG.text_tokenizer)

In [None]:
# A freshly built module starts in training mode; switch to eval mode so
# dropout is disabled and the embeddings are deterministic during inference.
text_encoder = TextEncoder(pretrained=True).to(CFG.device).eval()  # Ensure it's on the correct device

## COCO specific preprocessing

In [None]:
# Load COCO instance annotations using the COCO class of pycocotools library
coco = COCO('path to file instances_val2017.json')

# DEBUGGING category_id_to_name

In [None]:
# Create a dictionary to map category IDs to names
category_id_to_name = {category['id']: category['name'] for category in coco.loadCats(coco.getCatIds())}

In [None]:
# # Print all category IDs and names
# for category_id, category_name in category_id_to_name.items():
#     print(f"ID: {category_id}, Name: {category_name}")

In [None]:
# Count the number of unique categories
num_categories = len(category_id_to_name)
print(f"Total number of categories: {num_categories}")


In [None]:
static_text = "a photo of a "

In [None]:
# Create a list of class queries based on the category names
class_queries = [f"{static_text}{name}" for name in category_id_to_name.values()]

In [None]:
print(class_queries)

In [None]:
# Embed every class prompt with the text tower of the loaded CLIP checkpoint.
# The text projection head was trained on top of `model.text_encoder`, so the
# prompts must go through that encoder (already in eval mode via model.eval())
# rather than through a separately downloaded, non-fine-tuned DistilBERT.
class_embeddings = []
with torch.no_grad():
    for query in class_queries:
        encoded_query = tokenizer([query], padding=True, truncation=True, return_tensors='pt').to(CFG.device)

        text_embedding = model.text_encoder(
            input_ids=encoded_query["input_ids"],
            attention_mask=encoded_query["attention_mask"],
        )
        # Drop the batch dimension of size 1 so each entry is a single vector.
        projected_embedding = model.text_projection(text_embedding).squeeze(0)

        class_embeddings.append(projected_embedding)

# Stack the embeddings into a single tensor: (number of classes, projection_dim)
class_embeddings_tensor = torch.stack(class_embeddings)
# L2-normalize so dot products with normalized image embeddings are cosine similarities
class_embeddings_n = F.normalize(class_embeddings_tensor, p=2, dim=-1)

In [None]:
print(projected_embedding.shape)

In [None]:
print(class_embeddings_tensor.shape)

In [None]:
print(class_embeddings_n.shape)

In [None]:
# Map every annotated image file name to the set of class names found on it
image_data = {}

# Walk over all annotations of the loaded COCO split
for annotation in coco.loadAnns(coco.getAnnIds()):
    category_id = annotation['category_id']
    image_id = annotation['image_id']

    # Normalize the label (trim whitespace, lower-case) before storing it
    label_name = category_id_to_name[category_id].strip().lower()
    image_file_name = coco.loadImgs(image_id)[0]['file_name']

    # setdefault creates the set on first sight of the image; the set removes
    # duplicate labels coming from several instances of the same class
    image_data.setdefault(image_file_name, set()).add(label_name)

# One row per image; classes are sorted and comma-joined into a single string
coco_df = pd.DataFrame({
    'image': list(image_data),
    'classes': [','.join(sorted(labels)) for labels in image_data.values()]
})

In [None]:
# Check the number of unique COCO classes
num_classes = len(category_id_to_name)
print(f"Number of COCO classes: {num_classes}")

In [None]:
print(f"Number of images in coco_df: {len(coco_df)}")

In [None]:
# Filter coco_df to only include images with classes
coco_df = coco_df[coco_df['classes'].str.strip().astype(bool)]
print(f"Number of images with ground truth: {len(coco_df)}")

In [None]:
# Resolve the numeric COCO image id from the file name. getImgIds() filters by
# numeric image id and does not translate file names, so passing the file name
# there produced an "id" that matches no annotation (annotations was always empty).
target_file_name = "000000389933.jpg"
image_id = next(
    (img_id for img_id, info in coco.imgs.items() if info["file_name"] == target_file_name),
    None,
)
if image_id is None:
    # Keep this debugging cell non-fatal when the file is not part of the loaded split.
    print(f"{target_file_name} not found in the loaded COCO annotations.")
    annotations = []
else:
    annotations = coco.loadAnns(coco.getAnnIds(imgIds=image_id))
ground_truth_labels = {category_id_to_name[annotation['category_id']] for annotation in annotations}



In [None]:
print(annotations)  # This should match your expected ground truth

## DEBUG DATAFRAME

In [None]:
# View first 5 rows
print(coco_df.head())

In [None]:
# Get column names
print(coco_df.columns)

In [None]:
# Access specific column
images = coco_df['image']
print(images)

In [None]:
# Filter rows
dog_rows = coco_df[coco_df['classes'].str.contains('dog')]
print(dog_rows)

In [None]:
# Access classes of a certain image by position
some_image_classes = coco_df.iloc[1]['classes']
print(some_image_classes)

In [None]:
# Same by name
image_name = "000000289343.jpg"
image_classes = coco_df[coco_df['image'] == image_name]['classes'].values[0]
print(image_classes)

In [None]:
# Get info
print(coco_df.info())

In [None]:
# Check for duplicate image names in the DataFrame
duplicates = coco_df[coco_df.duplicated(subset='image', keep=False)]

if not duplicates.empty:
    print("Duplicate images found:")
    print(duplicates)
else:
    print("No duplicate images found.")

In [None]:
print(coco_df[coco_df['image'] == image_name])

## Displaying an image

In [None]:
# # Function to display an image with its classes
# def display_image_with_classes(image_file_name, coco_df):
#     # Load the image
#     image_path = f"path file {image_file_name}"
#     image = Image.open(image_path)

#     # Get the classes for the image
#     classes = coco_df[coco_df['image'] == image_file_name]['classes'].values[0]

#     print(f"Image name: {image_file_name}")

#     # Display the image and its classes along with the image name
#     plt.figure(figsize=(10, 10))
#     plt.imshow(image)
#     plt.title(f"Image: {image_file_name}\nClasses: {classes}")  # Include image name in the title
#     plt.axis('off')  # Hide axes
#     plt.show()

In [None]:
# # Example usage for the 4952 images in the DataFrame (0 to 4951)
# if not coco_df.empty:
#     test_image = coco_df.iloc[3]['image']
#     display_image_with_classes(test_image, coco_df)

## Get image embeddings

In [None]:
def get_image_embeddings(test_df, model_path):
    """Embed every image referenced by ``test_df`` with a saved CLIP model.

    A data loader is built from ``test_df`` (``mode="valid"``, using the
    notebook-level ``tokenizer``), a fresh ``CLIPModel`` is created on
    ``CFG.device``, its weights are restored from ``model_path`` and the model
    is switched to eval mode. Each batch is then passed through the image
    encoder and the image projection head with gradients disabled.

    Args:
        test_df: DataFrame handed to ``build_loaders``.
        model_path: Path of the saved state dict to load.

    Returns:
        A tuple ``(model, embeddings, loader)``: the loaded model, the
        per-batch projected embeddings concatenated along the first dimension,
        and the data loader that was iterated.
    """
    print("Starting get_image_embeddings...")
    print(f"Input DataFrame shape: {test_df.shape}")

    loader = build_loaders(test_df, tokenizer, mode="valid")
    print("DataLoader created.")

    clip_model = CLIPModel().to(CFG.device)
    state_dict = torch.load(model_path, map_location=CFG.device)
    clip_model.load_state_dict(state_dict)
    clip_model.eval()

    per_batch_embeddings = []
    with torch.no_grad():
        for batch in tqdm(loader):
            pixels = batch["image"].to(CFG.device)
            encoded = clip_model.image_encoder(pixels)
            per_batch_embeddings.append(clip_model.image_projection(encoded))

    print("Image embeddings calculated.")
    embeddings_tensor = torch.cat(per_batch_embeddings)
    print(f"Final embeddings shape: {embeddings_tensor.shape}")

    # The model and the loader are handed back together with the embeddings
    return clip_model, embeddings_tensor, loader

In [None]:
# coco_df is the DataFrame containing the COCO validation images and their paths.
# Keeps the loaded model, the image embeddings and the data loader for the cells below.
model, image_embeddings, test_loader = get_image_embeddings(
    test_df=coco_df,
    model_path=model_path,
)

## START of top-1 accuracy test

In [None]:
# Evaluation state:
#   matches_found         - running number of correct top-1 predictions
#   correct_predictions   - one record per correctly classified image
#   incorrect_predictions - one record per misclassified image
matches_found, correct_predictions, incorrect_predictions = 0, [], []

In [None]:
# Top-1 evaluation: for every image, pick the class query whose embedding is
# most similar to the image embedding and check it against the image's classes.

# Start from clean counters so that re-running this cell does not add to the
# results of an earlier run (which would inflate the saved totals).
matches_found = 0
correct_predictions = []
incorrect_predictions = []

# Walk the two columns directly instead of building a full row with
# coco_df.iloc[i] several times per image.
for i, (image_name, actual_classes) in enumerate(
    tqdm(
        zip(coco_df['image'], coco_df['classes']),
        total=len(coco_df),
        desc="Processing Images with In-Memory Embeddings",
    )
):
    # Row i of the precomputed embeddings is taken to belong to row i of coco_df
    image_embedding = image_embeddings[i].to(CFG.device)
    image_embedding_n = F.normalize(image_embedding, p=2, dim=-1)  # L2-normalise

    # Scaled similarity to every class embedding; softmax keeps the ranking,
    # so the argmax is the best-matching class query
    similarities = (100.0 * image_embedding_n @ class_embeddings_n.T).softmax(dim=-1)
    index_of_match = torch.argmax(similarities, dim=-1).item()

    # Drop the leading prompt text (len(static_text) characters) to get the bare category name
    category_name_found = class_queries[index_of_match][len(static_text):]

    # NOTE(review): if 'classes' is stored as one plain string, `in` is a
    # substring test (e.g. 'dog' would also match inside 'hot dog') -- confirm
    # how the column is stored.
    if category_name_found in actual_classes:
        matches_found += 1
        correct_predictions.append({
            "image": image_name,
            "prediction": category_name_found
        })
    else:
        # Keep the actual classes next to the wrong prediction for later inspection
        incorrect_predictions.append({
            "image": image_name,
            "predicted": category_name_found,
            "actual": actual_classes
        })

In [None]:
# Number of images whose top-1 prediction was counted as correct
print("Matches found:", matches_found)

## END of top-1 accuracy test

## Count correct predictions

In [None]:
# The 80 category names used for the per-class tallies, in their original order
coco_classes = [
    "person", "bicycle", "car", "motorcycle", "airplane", "bus", "train", "truck",
    "boat", "traffic light", "fire hydrant", "stop sign", "parking meter", "bench", "bird", "cat",
    "dog", "horse", "sheep", "cow", "elephant", "bear", "zebra", "giraffe",
    "backpack", "umbrella", "handbag", "tie", "suitcase", "frisbee", "skis", "snowboard",
    "sports ball", "kite", "baseball bat", "baseball glove", "skateboard", "surfboard", "tennis racket", "bottle",
    "wine glass", "cup", "fork", "knife", "spoon", "bowl", "banana", "apple",
    "sandwich", "orange", "broccoli", "carrot", "hot dog", "pizza", "donut", "cake",
    "chair", "couch", "potted plant", "bed", "dining table", "toilet", "tv", "laptop",
    "mouse", "remote", "keyboard", "cell phone", "microwave", "oven", "toaster", "sink",
    "refrigerator", "book", "clock", "vase", "scissors", "teddy bear", "hair drier", "toothbrush",
]

In [None]:
# Tally the correctly predicted labels in a single pass
correct_tally = Counter(entry["prediction"] for entry in correct_predictions)

# One entry per class in coco_classes (0 when it was never predicted correctly);
# labels that are not in coco_classes are left out
class_counts = {class_name: correct_tally[class_name] for class_name in coco_classes}

# Report the totals in alphabetical order of the class name
for class_name, count in sorted(class_counts.items()):
    print(f"{class_name}: {count}")

## Count incorrect predictions

In [None]:
# Tally the wrongly predicted labels in a single pass
incorrect_tally = Counter(entry["predicted"] for entry in incorrect_predictions)

# One entry per class in coco_classes (0 when it was never predicted wrongly);
# labels that are not in coco_classes are left out
incorrect_class_counts = {class_name: incorrect_tally[class_name] for class_name in coco_classes}

# Report the totals in alphabetical order of the class name
for class_name, count in sorted(incorrect_class_counts.items()):
    print(f"{class_name}: {count}")

## Save information

In [None]:
# Directory that receives the evaluation results; created on demand
output_folder = "path to output folder"
os.makedirs(name=output_folder, exist_ok=True)

# Results file inside that directory, named after the evaluated model
output_path = os.path.join(
    output_folder,
    f"{model_name}_results.json",
)

In [None]:
# Everything that gets written to disk: the model name, the two totals and the
# full per-image records for correct and incorrect predictions
results = dict(
    model=model_name,
    total_correct=len(correct_predictions),
    total_incorrect=len(incorrect_predictions),
    correct_predictions=correct_predictions,
    incorrect_predictions=incorrect_predictions,
)

In [None]:
# Serialise first: if an entry is not JSON-serialisable the error is raised
# before the target file is opened, so mode "w" cannot leave behind an empty
# or half-written results file.
results_json = json.dumps(results, indent=4)

# Save the results as a JSON file; the explicit encoding makes the written
# file independent of the platform's default encoding.
with open(output_path, "w", encoding="utf-8") as outfile:
    outfile.write(results_json)

print(f"Results saved to {output_path}")

## Check information

In [None]:
# Results file to inspect
file_path = " path to results.json"

# Read the file back and parse its JSON content
with open(file_path, "r") as infile:
    loaded_results = json.loads(infile.read())

# Echo the headline fields as a sanity check
print("Model Name:", loaded_results["model"])
print("Total Correct Predictions:", loaded_results["total_correct"])
print("Total Incorrect Predictions:", loaded_results["total_incorrect"])

In [None]:
# # Count occurrences of a category in correct predictions
# target_category = "bird"

# category_correct_count = sum(1 for entry in loaded_results["correct_predictions"] if entry["prediction"] == target_category)
# category_incorrect_count = sum(1 for entry in loaded_results["incorrect_predictions"] if entry["predicted"] == target_category)

# print(f"The category '{target_category}' was correctly predicted {category_correct_count} times.")
# print(f"The category '{target_category}' was incorrectly predicted {category_incorrect_count} times.")

## Double check

In [None]:
# Re-derive the per-class number of correct predictions from the reloaded file.
# The tally is wrapped back into a defaultdict(int), so unseen classes read as 0.
correct_class_counts = defaultdict(
    int,
    Counter(entry["prediction"] for entry in loaded_results["correct_predictions"]),
)

# Print one "<class>: <count>" line per class, alphabetically
for class_name, count in sorted(correct_class_counts.items()):
    print(f"{class_name}: {count}")

In [None]:
# Re-derive the per-class number of incorrect predictions from the reloaded file,
# keyed by the label that was (wrongly) predicted.
# The tally is wrapped back into a defaultdict(int), so unseen classes read as 0.
incorrect_class_counts = defaultdict(
    int,
    Counter(entry["predicted"] for entry in loaded_results["incorrect_predictions"]),
)

# Print one "<class>: <count>" line per class, alphabetically
for class_name, count in sorted(incorrect_class_counts.items()):
    print(f"{class_name}: {count}")

## Generate images and save

In [None]:
# One two-column table per tally: category name plus its count
correct_df = pd.DataFrame(list(correct_class_counts.items()), columns=['Category', 'Correct'])
incorrect_df = pd.DataFrame(list(incorrect_class_counts.items()), columns=['Category', 'Incorrect'])

# Outer-join on the category so classes present in only one table are kept,
# fill the resulting gaps with 0, then order by correct (and incorrect) count
category_counts_df = correct_df.merge(incorrect_df, on='Category', how='outer').fillna(0)
category_counts_df = category_counts_df.sort_values(by=['Correct', 'Incorrect'], ascending=False)

# Wide figure so that all category labels fit on the x axis
plt.figure(figsize=(20, 8))

# Stacked bars: one bar per category, correct and incorrect counts on top of each other
category_counts_df.plot.bar(x='Category', stacked=True, ax=plt.gca())
plt.gca().set(
    title="Correct vs. Incorrect Predictions per Category",
    xlabel="Category",
    ylabel="Prediction Count",
)
plt.xticks(rotation=45, ha="right")  # Slanted labels stay readable

# Write the figure to a PNG whose name includes the model name, then display it
output_path = f"path to save {model_name}_correct_vs_incorrect_predictions.png"
plt.savefig(output_path, format='png', bbox_inches='tight')

plt.show()
print(f"Plot saved to {output_path}")

## Top 10 plots

In [None]:
# The ten categories with the most correct predictions
top_correct_df = category_counts_df.sort_values(by='Correct', ascending=False).iloc[:10]

# Bar chart of their 'Correct' counts
top_correct_df.plot.bar(x='Category', y='Correct', color='green')
plt.gca().set(
    title="Top 10 Categories with Highest Number of Correct Predictions",
    xlabel="Category",
    ylabel="Correct Predictions",
)

# Write the figure to a PNG whose name includes the model name, then display it
output_path = f"path to save {model_name}_top_10_correct.png"
plt.savefig(output_path, format='png', bbox_inches='tight')

plt.show()
print(f"Plot saved to {output_path}")

In [None]:
# The ten categories with the highest 'Incorrect' count.
# NOTE(review): the 'Incorrect' tallies appear to be keyed by the predicted
# label (see the cell that builds incorrect_class_counts), i.e. how often a
# class was wrongly predicted -- confirm the title wording matches that.
top_misclassified_df = category_counts_df.sort_values(by='Incorrect', ascending=False).iloc[:10]

# Bar chart of their 'Incorrect' counts
top_misclassified_df.plot.bar(x='Category', y='Incorrect', color='red')
plt.gca().set(
    title="Top 10 Most Misclassified Categories",
    xlabel="Category",
    ylabel="Incorrect Predictions",
)

# Write the figure to a PNG whose name includes the model name, then display it
output_path = f" path to save {model_name}_top_10_incorrect.png"
plt.savefig(output_path, format='png', bbox_inches='tight')

plt.show()
print(f"Plot saved to {output_path}")