In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import torchvision.transforms as T
import torch.nn.functional as F
from torchvision.io import read_image, ImageReadMode
from PIL import Image
import os
import json
import pickle
import numpy as np

class RefCocoG_Dataset(Dataset):
    """
    RefCOCOg referring-expression dataset restricted to a single split.

    Expected layout under ``root_dir``:

    * ``annotations/<annotations_f>`` -- pickle file with the referring expressions
    * ``annotations/<instances_f>``   -- JSON file with ``images`` and ``annotations`` lists
    * ``images/<file_name>``          -- the image files

    Each sample is the list ``[image, bounding_box, prompts]`` where ``image`` is an
    RGB PIL image (or the output of ``transform``), ``bounding_box`` is
    ``[x1, y1, x2, y2]`` (or the output of ``target_transform``) and ``prompts`` is
    the list of sentences attached to the annotation.
    """

    # Merged annotations for every split, shared by all instances so that the
    # annotation files are parsed only once per process.
    full_annotations = None

    def __init__(self, root_dir, annotations_f, instances_f, split='train', transform=None, target_transform=None) -> None:
        """
        Args:
            root_dir: dataset root directory.
            annotations_f: file name of the pickled referring expressions.
            instances_f: file name of the JSON instances file.
            split: value of the ``split`` field to keep (e.g. 'train', 'val', 'test').
            transform: optional callable applied to the loaded PIL image.
            target_transform: optional callable applied to the corner-format bounding box.
        """
        super().__init__()

        self.root_dir = root_dir
        self.annotations_f = annotations_f
        self.instances_f = instances_f

        self.split = split

        self.transform = transform
        self.target_transform = target_transform

        self.get_annotations()

        # Freeze the sample order once; __getitem__ indexes these two parallel
        # lists instead of rebuilding the key list on every access.
        self.annotation_ids = list(self.annotations)
        self.image_names = [
            self.annotations[annotation_id]['image']['actual_file_name']
            for annotation_id in self.annotation_ids
        ]

    def _select_split(self, annotations):
        """Return the entries of ``annotations`` whose image belongs to ``self.split``."""
        return {
            annotation_id: entry
            for annotation_id, entry in annotations.items()
            if entry['image']['split'] == self.split
        }

    def get_annotations(self):
        """
        Populate ``self.annotations`` (keyed by annotation id) for ``self.split``.

        The merged annotations of all splits are cached on the class; when the cache
        is already filled the files are not read again.
        """
        if RefCocoG_Dataset.full_annotations:
            self.annotations = self._select_split(RefCocoG_Dataset.full_annotations)
            return

        # Load pickle data
        # NOTE: pickle.load can execute arbitrary code; only use trusted annotation files.
        with open(os.path.join(self.root_dir, 'annotations', self.annotations_f), 'rb') as file:
            self.data = pickle.load(file)

        # Load instances
        with open(os.path.join(self.root_dir, 'annotations', self.instances_f), 'rb') as file:
            self.instances = json.load(file)

        # Map image id -> file name on disk
        images_actual_file_names = {
            image['id']: image['file_name'] for image in self.instances['images']
        }

        # Match data between the two files and build the actual dataset
        self.annotations = {}

        for image in self.data:
            entry = self.annotations.setdefault(image['ann_id'], {})
            entry['image'] = image
            entry['image']['actual_file_name'] = images_actual_file_names[image['image_id']]

        # Attach the instance annotation (bbox etc.) to every referred annotation id
        for annotation in self.instances['annotations']:
            if annotation['id'] in self.annotations:
                self.annotations[annotation['id']]['annotation'] = annotation

        # Cache everything, then keep only samples from the given split
        RefCocoG_Dataset.full_annotations = self.annotations
        self.annotations = self._select_split(self.annotations)

    def __len__(self):
        """Return the number of samples in the selected split."""
        return len(self.image_names)

    def corner_size_to_corners(self, bounding_box):
        """
        Transform (top_left_x, top_left_y, width, height) bounding box representation
        into (top_left_x, top_left_y, bottom_right_x, bottom_right_y)
        """

        return [
            bounding_box[0],
            bounding_box[1],
            bounding_box[0] + bounding_box[2],
            bounding_box[1] + bounding_box[3]
        ]

    def __getitem__(self, idx):
        """Return ``[image, bounding_box, prompts]`` for the sample at position ``idx``."""
        image_name = self.image_names[idx]
        image_id = self.annotation_ids[idx]

        # Load the image as RGB and release the file handle right away;
        # convert() returns an image that no longer depends on the open file.
        with Image.open(os.path.join(self.root_dir, 'images', image_name)) as image_file:
            image = image_file.convert('RGB')

        # Get the captions for the image
        prompts = [
            prompt['sent'] for prompt in self.annotations[image_id]['image']['sentences']
        ]

        # Get the bounding box referred to by the prompts, in corner format
        bounding_box = self.corner_size_to_corners(self.annotations[image_id]['annotation']['bbox'])

        # Apply the transforms if given
        if self.transform:
            image = self.transform(image)

        if self.target_transform:
            bounding_box = self.target_transform(bounding_box)

        # Return the sample as a list
        return [
            image,
            bounding_box,
            prompts,
        ]

In [None]:
# Build one dataset per split; the first instance parses the annotation files,
# the following ones reuse the annotations cached on the class.
dataset_train, dataset_val, dataset_test = (
    RefCocoG_Dataset('refcocog', 'refs(umd).p', 'instances.json', split=split_name)
    for split_name in ('train', 'val', 'test')
)

# Order matters: get_data() expects [train, val, test]
dataset_splits = [dataset_train, dataset_val, dataset_test]

In [None]:
# Total number of annotations followed by the size of the train / val / test splits
(
    len(RefCocoG_Dataset.full_annotations),
    *(len(split_dataset.annotations) for split_dataset in (dataset_train, dataset_val, dataset_test)),
)

In [None]:
def collate_differently_sized_prompts(batch):
    """
    Collate samples without stacking them.

    Every sample is an indexable ``(image, bounding_box, prompts)``; the number of
    prompts differs between samples, so the batch is returned as three parallel
    lists instead of tensors.

    Returns:
        Tuple ``(images, bboxes, prompts)`` of lists, each as long as ``batch``.
    """
    images, bboxes, prompts = [], [], []

    for sample in batch:
        images.append(sample[0])
        bboxes.append(sample[1])
        prompts.append(sample[2])

    return images, bboxes, prompts

def get_data(dataset_splits, batch_size=64, test_batch_size=256, num_workers=0):
    """
    Wrap the train / validation / test datasets into data loaders.

    Args:
        dataset_splits: indexable holding the train, validation and test datasets
            at positions 0, 1 and 2.
        batch_size: batch size of the training loader.
        test_batch_size: batch size of the validation and test loaders.
        num_workers: number of worker processes used by every loader.

    Returns:
        Tuple ``(train_loader, val_loader, test_loader)``.
    """
    training_data, validation_data, test_data = (
        dataset_splits[0],
        dataset_splits[1],
        dataset_splits[2],
    )

    # Options shared by all three loaders
    shared_options = dict(
        collate_fn=collate_differently_sized_prompts,
        num_workers=num_workers,
    )

    # Training batches are shuffled and the last incomplete batch is dropped
    train_loader = torch.utils.data.DataLoader(
        training_data, batch_size=batch_size, shuffle=True, drop_last=True, **shared_options
    )

    # Evaluation loaders keep the dataset order and every sample
    val_loader, test_loader = (
        torch.utils.data.DataLoader(
            evaluation_data, batch_size=test_batch_size, shuffle=False, **shared_options
        )
        for evaluation_data in (validation_data, test_data)
    )

    return train_loader, val_loader, test_loader

In [None]:
# Data loaders used by the rest of the notebook
train_loader, val_loader, test_loader = get_data(
    dataset_splits,
    batch_size=128,
    test_batch_size=64,
    num_workers=0,
)

In [None]:
# Default device: the first GPU when CUDA is available, the CPU otherwise.
# Always a torch.device (never a bare string) so that downstream code can rely
# on attributes such as `.type` and `.index` on both code paths.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# One YOLOv5 copy per visible GPU, or a single copy on the fallback device
yolo_models = [
    torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True).to(yolo_target)
    for yolo_target in (
        [f'cuda:{i}' for i in range(torch.cuda.device_count())]
        if torch.cuda.is_available()
        else [device]
    )
]

In [None]:
import clip

models, preprocesses = [], []

# Load one CLIP copy (and its preprocessing pipeline) per visible GPU,
# or a single copy on the fallback device when CUDA is not available.
for clip_target in (
    [f'cuda:{i}' for i in range(torch.cuda.device_count())]
    if torch.cuda.is_available()
    else [device]
):
    model, preprocess = clip.load("RN50x16", device=clip_target)

    models.append(model)
    preprocesses.append(preprocess)

In [None]:
# Sanity check: display the device holding the parameters of the first CLIP model
next(models[0].parameters()).device

In [None]:
import torch

def cosine_similarity(a: torch.Tensor, b: torch.Tensor):
    """
    Pairwise cosine similarity between the vectors of ``b`` and the vectors of ``a``.

    Both inputs are scaled to unit L2 norm along their last dimension and the
    product ``b_unit @ a_unit.T`` is returned, so entry ``[i, j]`` compares
    ``b[i]`` with ``a[j]``. The result is moved to the CPU.
    """
    unit_a, unit_b = (
        vectors / vectors.norm(dim=-1, keepdim=True)
        for vectors in (a, b)
    )

    return torch.matmul(unit_b, unit_a.T).cpu()

In [None]:
%matplotlib inline

import matplotlib.pyplot as plt
import matplotlib.patches as patches

def visualise_scores(scores: torch.Tensor, images, texts: list[str]):
    """
    Draw one figure per (text, image) pair, titled with the pair's score.

    ``scores`` is indexed as ``scores[text_index, image_index]``.
    """
    for text_position, text in enumerate(texts):
        for image_position, picture in enumerate(images):
            _, axis = plt.subplots()
            axis.imshow(picture)
            axis.set_title(f'Score: {scores[text_position, image_position]} / Prompt: {text}')

In [None]:
import torch
import torch.nn as nn
import clip
import numpy as np

class BaselineModel(nn.Module):
    """
    Training-free grounding baseline: YOLO proposes boxes, CLIP ranks them.

    For every sample the YOLO detections are cropped out of the image, each crop
    and each prompt is embedded with CLIP, and for every prompt the box whose
    crop is most similar to the prompt is returned.

    One CLIP model / preprocess / YOLO model is expected per device; the entry
    used by ``forward`` is selected from the device of the ``indices`` tensor.
    """

    def __init__(self, device=None, models=None, preprocesses=None, yolo_models=None) -> None:
        """
        Args:
            device: initial device; auto-detected (cuda, mps, cpu) when omitted.
            models: list of CLIP models, one per device.
            preprocesses: list of CLIP preprocessing callables, parallel to ``models``.
            yolo_models: list of YOLO models, one per device.

        Raises:
            ValueError: if any of the model lists is missing or empty.
        """
        super().__init__()

        if device:
            self.device = device
        else:
            self.device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"

        if not models or not preprocesses:
            raise ValueError('Models and preprocesses for CLIP model should be provided')

        self.models = models
        self.preprocesses = preprocesses

        if not yolo_models:
            raise ValueError('Models for YOLO should be provided')
        self.yolo_models = yolo_models

        self.transform_to_tensor = T.Compose([
            transforms.ToTensor()
        ])

    @staticmethod
    def _device_index(device):
        """Return the CUDA ordinal of ``device`` (any number of digits), or 0 for non-CUDA devices."""
        device = torch.device(device)
        if device.type == 'cuda' and device.index is not None:
            return device.index
        return 0

    @staticmethod
    def _select_boxes(detections, best_indices, device):
        """
        Pick, for every entry of ``best_indices``, the matching box of ``detections``.

        Returns a list of 4-element tensors on ``device``. When there are no
        detections, an all-zero box is returned for each index instead.
        """
        if len(detections) == 0:
            # No proposal for this image: fall back to an empty box per prompt,
            # with the same dtype as real boxes so the results can be stacked.
            return [
                torch.zeros(4, dtype=detections.dtype, device=device)
                for _ in best_indices
            ]

        return [
            detections[int(best_index), 0:4].detach().clone().to(device)
            for best_index in best_indices
        ]

    def forward(self, indices, images, prompts_list):
        """
        Predict one bounding box per prompt.

        Args:
            indices: tensor of positions into ``images`` / ``prompts_list``; its
                device selects which model replica is used.
            images: list of PIL images.
            prompts_list: list (one entry per image) of lists of prompt strings.

        Returns:
            Tensor with one ``(x1, y1, x2, y2)`` row per prompt, flattened in
            sample order (all prompts of the first selected image, then the
            second, ...).
        """
        self.device = indices.device
        self.device_index = self._device_index(self.device)

        # -- Getting the right data and moving it to the correct device --

        # Images remain on the CPU because they are PIL Images, not Tensors
        # Converting to Tensors leads to errors with YOLO
        images = [images[i] for i in indices]

        prompts_list = [prompts_list[i] for i in indices]
        prompts_tensor = [clip.tokenize(prompt_list).to(self.device) for prompt_list in prompts_list]

        # -- Actual processing --

        bounding_boxes = self.get_bounding_boxes(images)

        clip_model = self.models[self.device_index]
        clip_preprocess = self.preprocesses[self.device_index]

        # One predicted box per prompt, appended sample after sample
        overall_outputs = []

        with torch.no_grad():
            for idx, prompts_tensor_for_sample in enumerate(prompts_tensor):
                detections = bounding_boxes.xyxy[idx]

                # Image crops (the whole image when nothing was detected)
                image_crops = self.get_cropped_bounding_boxes(images[idx], detections)

                preprocessed_image_crops = torch.stack([clip_preprocess(image).to(self.device) for image in image_crops])

                crop_features = clip_model.encode_image(preprocessed_image_crops)
                crop_features /= crop_features.norm(dim=-1, keepdim=True)

                # Text features are not normalised here: cosine_similarity does it.
                text_features = clip_model.encode_text(prompts_tensor_for_sample)

                # Rows are prompts, columns are crops
                similarity = cosine_similarity(crop_features, text_features).float()
                texts_p = (100 * similarity).softmax(dim=-1)

                # To return the cosine similarity between the best crops and the prompts
                # max_cos_sim_values, _ = similarity.max(dim=-1)
                # for max_value in max_cos_sim_values:
                #     overall_outputs.append(max_value.to(self.device))
                # continue

                _, max_indices = texts_p.max(dim=1)
                overall_outputs.extend(
                    self._select_boxes(detections, max_indices, self.device)
                )

        return torch.stack(overall_outputs)

    def get_prompts(self, sample):
        """Return the sentences stored under ``sample['image']['sentences']``."""
        return [prompt['sent'] for prompt in sample['image']['sentences']]

    def get_bounding_boxes(self, pil_images):
        """Run the YOLO model of the current device on ``pil_images`` and return its result object."""
        bounding_boxes = self.yolo_models[self.device_index](pil_images)
        return bounding_boxes

    def get_cropped_bounding_boxes(self, image, bounding_boxes):
        """
        Crop ``image`` once per row of ``bounding_boxes`` (first four values are
        the crop box). Returns ``[image]`` when there are no boxes.
        """
        cropped_bounding_boxes = []

        for bounding_box in bounding_boxes:
            cropped_img = image.crop((bounding_box[0].item(), bounding_box[1].item(), bounding_box[2].item(), bounding_box[3].item()))
            cropped_bounding_boxes.append(cropped_img)

        if len(cropped_bounding_boxes) == 0:
            cropped_bounding_boxes.append(image)

        return cropped_bounding_boxes

baseline_model = BaselineModel(
    models=models,
    preprocesses=preprocesses,
    yolo_models=yolo_models,
)

# Spread the batches over all GPUs when more than one is visible
if torch.cuda.device_count() > 1:
    baseline_model = nn.DataParallel(baseline_model)

In [None]:
from torchvision.ops import box_iou

def iou_metric(bounding_boxes, ground_truth_bounding_boxes):
    """
    Localization Accuracy Metric

    Intersection over Union (IoU) is a common metric measure for localization accuracy.

    Args:
        bounding_boxes: tensor of predicted boxes, one ``(x1, y1, x2, y2)`` row each.
        ground_truth_bounding_boxes: a single ground-truth box ``(x1, y1, x2, y2)``.

    Returns:
        The ``box_iou`` result between every predicted box and the ground-truth
        box (one row per predicted box, one column for the ground truth).
    """

    # Put the ground truth on the device of the predictions instead of relying
    # on a notebook-level `device` variable; add the leading "boxes" dimension.
    ground_truth_bounding_boxes = torch.as_tensor(
        ground_truth_bounding_boxes, device=bounding_boxes.device
    ).unsqueeze(0)

    return box_iou(bounding_boxes, ground_truth_bounding_boxes)

def cosine_similarity_metric(bounding_boxes, ground_truth_bounding_boxes):
    """
    Cosine Similarity Metric

    Cosine similarity is a common metric measure for semantic similarity.
    Here it is computed between the coordinate vector of every predicted box
    and the coordinate vector of the ground-truth box.

    Args:
        bounding_boxes: tensor of predicted boxes, one ``(x1, y1, x2, y2)`` row each.
        ground_truth_bounding_boxes: a single ground-truth box ``(x1, y1, x2, y2)``.

    Returns:
        The result of ``cosine_similarity(bounding_boxes, ground_truth)``.
    """

    # Put the ground truth on the device of the predictions instead of relying
    # on a notebook-level `device` variable.
    ground_truth_bounding_boxes = torch.as_tensor(
        ground_truth_bounding_boxes, device=bounding_boxes.device
    )

    return cosine_similarity(bounding_boxes, ground_truth_bounding_boxes)


### To compute average cosine similarity between embeddings

In [None]:
# Raw model outputs, one entry per test batch
overall_outputs = []

for batch_idx, (images, gt_bounding_boxes, prompts) in enumerate(test_loader):
    print(f'-- Batch index: {batch_idx} --')

    # The model tokenizes the prompts itself, so the raw strings are passed as-is.
    # `indices` selects the samples of the batch and its device tells the model
    # where to run.
    indices = torch.arange(len(images), device=device)
    outputs = baseline_model(indices, images, prompts)

    overall_outputs.append(outputs)

In [None]:
# Flatten the per-batch outputs into one NumPy array of scalars and average
# them, ignoring NaNs.
# NOTE(review): `.item()` needs single-value entries, so this presumably expects
# the model to return one similarity score per prompt rather than 4-value
# boxes -- confirm which mode the model was in when `overall_outputs` was built.
cos_sim_cpu = np.array([
    cos_sim_val.item()
    for out in overall_outputs
    for cos_sim_val in out
])
np.nanmean(cos_sim_cpu)

### To compute standard metrics

In [None]:
from torchvision.ops import boxes as box_ops

IoUs = []
cosine_similarities = []

for batch_idx, (images, gt_bounding_boxes, prompts) in enumerate(test_loader):
    print(f'-- Batch index: {batch_idx} --')

    # The model tokenizes the prompts itself, so the raw strings are passed as-is.
    indices = torch.arange(len(images), device=device)
    outputs = baseline_model(indices, images, prompts)

    # The model returns one box per prompt, flattened over the whole batch.
    # Slice that flat tensor back into one chunk per sample, using the number
    # of prompts of each sample.
    outputs_grouped_by_sample = []
    outputs_idx = 0
    for sample_prompts in prompts:
        outputs_grouped_by_sample.append(
            outputs[outputs_idx : outputs_idx + len(sample_prompts)]
        )
        outputs_idx += len(sample_prompts)

    for output_bboxes, gt_bboxes in zip(outputs_grouped_by_sample, gt_bounding_boxes):
        # There is one output bounding box for each prompt given in input.
        # Each sample carries a list of prompts of arbitrary length, hence
        # several predicted boxes are compared against the same ground truth.
        result_ious = iou_metric(output_bboxes, gt_bboxes)
        result_cosine_similarity = cosine_similarity_metric(output_bboxes, gt_bboxes)

        # Iterating a tensor yields its rows, i.e. one entry per prompt
        IoUs.extend(result_ious)
        cosine_similarities.extend(result_cosine_similarity)

In [None]:
# Turn the collected per-prompt tensors into plain NumPy arrays
# (anything that is not a tensor counts as 0).
IoUs_to_cpu = np.array([
    entry.item() if torch.is_tensor(entry) else 0
    for entry in IoUs
])
cosine_similarities_to_cpu = np.array([
    entry.item() if torch.is_tensor(entry) else 0
    for entry in cosine_similarities
])

# NaN entries are left out of both averages
mIoU = np.nanmean(IoUs_to_cpu)
m_cos_sim = np.nanmean(cosine_similarities_to_cpu)

print('--- Metrics ---')
print(f'Mean Intersection over Union (mIoU): {mIoU}')
print(f'Mean Cosine Similarity: {m_cos_sim}')

In [None]:
# np.savetxt does not create missing directories: make sure the target exists
os.makedirs('outcomes', exist_ok=True)

# Persist the per-prompt metrics, one value per line
np.savetxt('outcomes/iou_baseline_with_RN50x16.csv', IoUs_to_cpu, delimiter=',')
np.savetxt('outcomes/cossim_baseline_with_RN50x16.csv', cosine_similarities_to_cpu, delimiter=',')

In [None]:
# Number of predictions with no overlap at all with the ground truth
counter = sum(bool(iou == 0) for iou in IoUs)

# Number of predictions reaching the 0.5 IoU threshold
counter_threshold = sum(bool(iou >= 0.5) for iou in IoUs)

# (zero-IoU count, count at IoU >= 0.5, total, fraction at IoU >= 0.5)
counter, counter_threshold, len(IoUs), counter_threshold / len(IoUs)

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches

# Visual check of one prediction. This cell reuses `images`, `prompts` and
# `outputs_grouped_by_sample` as left by the LAST iteration of the metrics
# loop, so it shows a sample of the last processed batch.
output_idx = 0

# Loading the image
img = images[output_idx]

# Preparing the output
fig, ax = plt.subplots()

# Display the image
ax.imshow(img)

colors = ['r', 'b', 'g']

# Create a Rectangle patch
# NOTE(review): the slice [1:2] keeps only the box predicted for the second
# prompt of the sample (matching the title below); it is empty, and the title
# lookup raises IndexError, when the sample has a single prompt.
for bbox, color in zip(outputs_grouped_by_sample[output_idx][1:2], colors):
    bounding_box_coordinates = bbox.cpu()
    # Convert corner format (x1, y1, x2, y2) into origin + size for matplotlib
    top_left_x, top_left_y = bounding_box_coordinates[0], bounding_box_coordinates[1]
    width, height = bounding_box_coordinates[2]- top_left_x, bounding_box_coordinates[3] - top_left_y

    # Parameters: (x, y), width, height
    rect = patches.Rectangle((top_left_x, top_left_y), width, height, linewidth=1, edgecolor=color, facecolor='none')

    # Add the patch to the Axes
    ax.add_patch(rect)

# Title: the second prompt of the displayed sample
ax.set_title(prompts[output_idx][1])

In [None]:
# One torch.cuda.device object per visible GPU (empty list without CUDA)
available_gpus = list(map(torch.cuda.device, range(torch.cuda.device_count())))
available_gpus