# Prompt Engineering

In [1]:
import json

# Load the sampled training questions and keep them as (question_id, question) pairs.
# JSON text is UTF-8, so decode it explicitly instead of relying on the platform's
# default locale encoding.
with open("../data/questions/train_sampled_questions_50000.json", encoding="utf-8") as f:
    questions = list(json.load(f).items())

In [330]:
import random 

def compute_object_size(scene_graph, object):
    """Return the fraction of the image area covered by an object's bounding box.

    Args:
        scene_graph: mapping providing the image "width" and "height".
        object: mapping providing the bounding-box "w" and "h".

    Returns:
        Bounding-box area divided by the image area.
    """
    total_area = scene_graph["width"] * scene_graph["height"]
    bbox_area = object["w"] * object["h"]
    return bbox_area / total_area

# Seed the global RNG so the randomly drawn samples are reproducible after a
# kernel restart (every sample below is picked with random.choice / random.shuffle).
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Accumulators for the positive (label True) samples of each kind.
class_samples_positive = []
attr_samples_positive = []
rel_samples_positive = []

def object_within_image_bounds(scene_graph, object):
    """Tell whether an object's bounding box is non-empty and lies fully inside the image.

    Args:
        scene_graph: mapping providing the image "width" and "height".
        object: mapping providing the bounding box as "x", "y", "w", "h".

    Returns:
        True when the box origin is non-negative, both extents are positive and the
        box does not reach past the right or bottom image edge; False otherwise.
    """
    origin_ok = object["x"] >= 0 and object["y"] >= 0
    if not origin_ok:
        return False

    extent_ok = object["h"] > 0 and object["w"] > 0
    if not extent_ok:
        return False

    return (
        object["x"] + object["w"] <= scene_graph["width"]
        and object["y"] + object["h"] <= scene_graph["height"]
    )

# Walk every operation of every question's semantic program and harvest positive
# (label True) samples into class_samples_positive, attr_samples_positive and
# rel_samples_positive.
# NOTE: the local name `object` used throughout shadows the Python builtin.
for qid, question in questions:
    for op in question["semantic"]:
        operation = op["operation"]
        argument = op["argument"].strip()
        objects = question["sceneGraph"]["objects"]

        if operation == "select" and argument != "scene" and not argument.endswith("(-)"):
            # The argument is parsed as "<class> (<id>,<id>,...)": ids are the comma-separated
            # text between "(" and the final character. Only objects whose box lies inside
            # the image are kept as candidates.
            matching_objects = [(oid, objects[oid]) for oid in argument.split("(")[1][:-1].split(",") if object_within_image_bounds(question["sceneGraph"], objects[oid])] 
            if len(matching_objects) > 0:
                oid, object = random.choice(matching_objects)
                # `object` is the scene-graph dict itself, so this tags it in place.
                object["object_id"] = oid
                class_samples_positive.append({
                        "question_id": qid,
                        "image_id": question["imageId"],
                        "class": argument.split("(")[0].strip(),
                        "object": object,
                        "object_size": compute_object_size(question["sceneGraph"], object),
                        "y": True
                    })  

        elif operation.startswith("filter"):
            # Attribute type is whatever follows "filter " in the operation name ("any" if
            # nothing does); a "not(<value>)" argument is unwrapped to "<value>".
            attr = ' '.join(operation.split(' ')[1:]) if operation != "filter" else "any"
            attr_value = argument[4:-1] if argument.startswith('not(') else argument
            # Candidates are ALL objects in the scene carrying the attribute, not only the
            # object the question refers to.
            # NOTE(review): unlike the "select" branch, no image-bounds check is applied here.
            objects_with_attr = [(oid, o) for oid, o in objects.items() if attr_value in o["attributes"]]
            # Horizontal / vertical position attributes are skipped.
            if len(objects_with_attr) > 0 and attr not in ["hposition", "vposition"]:
                oid, object = random.choice(objects_with_attr)
                object["object_id"] = oid
                attr_samples_positive.append({
                        "question_id": qid,
                        "image_id": question["imageId"],
                        "attr_value": attr_value,
                        "object": object,
                        "object_size": compute_object_size(question["sceneGraph"], object),
                        "y": True
                    })

        elif operation.startswith("verify"):
            # Same sampling as the "filter" branch, but the argument is used verbatim as the
            # attribute value (no "not(...)" unwrapping).
            attr = ' '.join(operation.split(' ')[1:]) if operation != "verify" else "any"
            attr_value = argument
            objects_with_attr = [(oid, o) for oid, o in objects.items() if attr_value in o["attributes"]]
            if len(objects_with_attr) > 0 and attr not in ["hposition", "vposition"]:
                oid, object = random.choice(objects_with_attr)
                object["object_id"] = oid
                attr_samples_positive.append({
                        "question_id": qid,
                        "image_id": question["imageId"],
                        "attr_value": attr_value,
                        "object": object,
                        "object_size": compute_object_size(question["sceneGraph"], object),
                        "y": True
                    })
                
        elif operation.startswith("choose ") and argument != "":
            # The argument is treated as "<option>|<option>"; one of the first two options is
            # picked at random and a sample is emitted only if some object has that attribute.
            attr = " ".join(operation.split(" ")[1:])
            attr_value = random.choice([argument.split("|")[0], argument.split("|")[1]])
            objects_with_attr = [(oid, o) for oid, o in objects.items() if attr_value in o["attributes"]]
            if len(objects_with_attr) > 0 and attr not in ["hposition", "vposition"]:
                oid, object = random.choice(objects_with_attr)
                object["object_id"] = oid
                attr_samples_positive.append({
                        "question_id": qid,
                        "image_id": question["imageId"],
                        "attr_value": attr_value,
                        "object": object,
                        "object_size": compute_object_size(question["sceneGraph"], object),
                        "y": True
                    })
                
        elif operation == "relate":
            # The argument is split on "," : field 1 is the relation name, field 2 starts with
            # "s" for 'subject' (anything else means 'object'); the referenced object id is the
            # text between "(" and the final character.
            relation_type = argument.split(',')[1]
            position = 'subject' if argument.split(',')[2].startswith('s') else 'object'
            target_object = argument.split('(')[1][:-1]

            # "-" marks an unresolved reference; nothing can be sampled for it.
            if target_object != "-":
                if position == 'object':
                    # Find objects that have a `relation_type` relation pointing AT the target:
                    # the found object becomes object0 and the target becomes object1.
                    matching_objects = [(oid, o) for oid, o in objects.items() if any(r["object"] == target_object and r["name"] == relation_type for r in o["relations"])]
                    if len(matching_objects) > 0:
                        oid0, object0 = random.choice(matching_objects)
                        object0["object_id"] = oid0
                        object1 = objects[target_object]
                        object1["object_id"] = target_object

                        rel_samples_positive.append({
                                "question_id": qid,
                                "image_id": question["imageId"],
                                "rel": relation_type,
                                "object0": object0,
                                "object1": object1,
                                "object0_size": compute_object_size(question["sceneGraph"], object0),
                                "object1_size": compute_object_size(question["sceneGraph"], object1),
                                "y": True
                            })
                        
                else:
                    # Follow the target's OWN `relation_type` relations: the target becomes
                    # object0 and a randomly chosen related object becomes object1.
                    matching_oids = [r["object"] for r in objects[target_object]["relations"] if r["name"] == relation_type]
                    matching_objects = [(oid, objects[oid]) for oid in matching_oids]
                    if len(matching_objects) > 0:
                        object0 = objects[target_object]
                        object0["object_id"] = target_object
                        oid1, object1 = random.choice(matching_objects)
                        object1["object_id"] = oid1

                        rel_samples_positive.append({
                            "question_id": qid,
                            "image_id": question["imageId"],
                            "rel": relation_type,
                            "object0": object0,
                            "object1": object1,
                            "object0_size": compute_object_size(question["sceneGraph"], object0),
                            "object1_size": compute_object_size(question["sceneGraph"], object1),
                            "y": True
                        })

In [332]:
# Load the category -> [class, ...] metadata and invert it into
# class -> [category, ...] so a class can be matched against its categories.
# JSON text is UTF-8, so decode it explicitly instead of relying on the locale default.
with open('../data/metadata/gqa_all_class.json', encoding="utf-8") as f:
    categories = json.load(f)
class_to_category = {}
for category, classes in categories.items():
    for c in classes:
        class_to_category.setdefault(c, []).append(category)

# For every positive class sample, draw another positive sample whose object is neither
# named like the wanted class nor listed under it as a category, and relabel it with the
# wanted class as a negative (y=False) example.
# NOTE(review): this is unbounded rejection sampling; it never terminates if every
# positive sample is compatible with sample["class"].
class_samples_negative = []
for sample in class_samples_positive:
    candidate = random.choice(class_samples_positive)
    while sample["class"] in [*class_to_category.get(candidate["object"]["name"], []), candidate["object"]["name"]]:
        candidate = random.choice(class_samples_positive)
    class_samples_negative.append({
        **candidate,
        "class": sample["class"],
        "y": False
    })

# Balanced, shuffled mix of positives and negatives.
class_samples = [*class_samples_positive, *class_samples_negative]
random.shuffle(class_samples)

# For every positive attribute sample, draw another positive sample whose object does NOT
# carry the wanted attribute and relabel it as a negative (y=False) example.
# NOTE(review): this is unbounded rejection sampling; it never terminates if every
# positive sample's object carries sample["attr_value"].
attr_samples_negative = []
for sample in attr_samples_positive:
    candidate = random.choice(attr_samples_positive)
    while sample["attr_value"] in candidate["object"]["attributes"]:
        candidate = random.choice(attr_samples_positive)

    attr_samples_negative.append({
        **candidate,
        # Overwrite the candidate's own (true) attribute with the mismatching one. The key
        # must be "attr_value", the same key positives use; otherwise the negative keeps an
        # attribute its object really has while being labelled False.
        "attr_value": sample["attr_value"],
        "y": False
    })

# Balanced, shuffled mix of positives and negatives sharing one schema.
attr_samples = [*attr_samples_positive, *attr_samples_negative]
random.shuffle(attr_samples)

# Build one negative relation sample per positive one: keep drawing a positive pair until
# its object0 has no relation named sample["rel"] that points at its object1, then relabel
# that pair with the wanted relation and y=False.
rel_samples_negative = []
for sample in rel_samples_positive:
    while True:
        candidate = random.choice(rel_samples_positive)
        relation_holds = any(
            r["name"] == sample["rel"] and r["object"] == candidate["object1"]["object_id"]
            for r in candidate["object0"]["relations"]
        )
        if not relation_holds:
            break

    negative = {**candidate, "rel": sample["rel"], "y": False}
    rel_samples_negative.append(negative)

# Positives followed by negatives, then shuffled in place.
rel_samples = rel_samples_positive + rel_samples_negative
random.shuffle(rel_samples)

In [316]:
from transformers import CLIPProcessor, CLIPImageProcessor, CLIPTokenizer, CLIPModel

# Imported here because this cell comes before the cell that imports torch.
import torch

# Pick the accelerator at runtime instead of hard-coding "mps": Apple Metal is still
# preferred when present, with CUDA and CPU as fallbacks so the notebook also runs on
# other machines.
if torch.backends.mps.is_available():
    device = "mps"
elif torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"

# Single source of truth for the checkpoint shared by the model and its preprocessors.
CLIP_CHECKPOINT = "openai/clip-vit-base-patch32"

model = CLIPModel.from_pretrained(CLIP_CHECKPOINT).to(device)
processor = CLIPProcessor.from_pretrained(CLIP_CHECKPOINT)
image_processor = CLIPImageProcessor.from_pretrained(CLIP_CHECKPOINT)
tokenizer = CLIPTokenizer.from_pretrained(CLIP_CHECKPOINT)

In [333]:
import tensorflow as tf
import pandas as pd

# Flatten each class sample into one record: the nested bounding box of the sampled
# object is lifted into top-level object_* fields so the records form a flat table.
class_samples_flat = [
    {
        "question_id": sample["question_id"],
        "image_id": sample["image_id"],
        "class": sample["class"],
        "object_x": bbox["x"],
        "object_y": bbox["y"],
        "object_w": bbox["w"],
        "object_h": bbox["h"],
        "object_size": sample["object_size"],
        "y": sample["y"],
    }
    for sample in class_samples
    for bbox in (sample["object"],)  # bind the nested object dict once per sample
]
class_samples_df = pd.DataFrame.from_dict(class_samples_flat)

In [334]:
import torch 
from torchvision.io import read_image, ImageReadMode
from torchvision.transforms.functional import crop, resize, pad
from math import tanh

def scaling(x, ceiling=3):
    """Return a factor that starts at ``ceiling`` for ``x == 0`` and shrinks as ``x`` grows.

    Evaluates ``(1 - tanh(2 * x)) * ceiling``.

    Args:
        x: input value (callers in this file pass an object-extent / image-extent ratio).
        ceiling: value returned for ``x == 0``.
    """
    damping = 1 - tanh(x * 2)
    return damping * ceiling

def get_scaled_bbox(entry, img_height, img_width, padding_scale_ceiling=1):
    """Grow an object's bounding box by a size-dependent margin, clamped to the image.

    The margin on each axis is ``scaling(extent / image_extent, padding_scale_ceiling)``
    times the object's extent on that axis.

    Args:
        entry: record providing "object_x", "object_y", "object_w" and "object_h".
        img_height: image height in pixels.
        img_width: image width in pixels.
        padding_scale_ceiling: ``ceiling`` forwarded to ``scaling``.

    Returns:
        ``(top, left, height, width)`` of the padded box, truncated to ints. The origin is
        clamped at 0 and the extents are limited so the box stays inside the image.
    """
    box_x, box_y = entry["object_x"], entry["object_y"]
    box_w, box_h = entry["object_w"], entry["object_h"]

    padding_w = scaling(box_w / img_width, padding_scale_ceiling) * box_w
    padding_h = scaling(box_h / img_height, padding_scale_ceiling) * box_h

    # Clamp the padded origin to the image's top-left corner.
    top = max(box_y - padding_h, 0)
    left = max(box_x - padding_w, 0)

    # The padded extent may not reach past the bottom / right image edge.
    height = min(box_h + 2 * padding_h, img_height - top)
    width = min(box_w + 2 * padding_w, img_width - left)

    return int(top), int(left), int(height), int(width)

class ClassDataset(torch.utils.data.Dataset):
    """Object crops paired with a class prompt and a boolean label.

    Each row of ``df`` must provide ``image_id``, ``class``, ``y`` and the bounding box
    columns ``object_x``, ``object_y``, ``object_w`` and ``object_h``.

    Args:
        df: one row per sample.
        image_dir: directory holding ``<image_id>.jpg`` files.
    """

    def __init__(self, df: pd.DataFrame, image_dir: str = "../data/images"):
        self.df = df
        self.image_dir = image_dir

    def __len__(self):
        """Number of samples (rows of the frame)."""
        return self.df.shape[0]

    @staticmethod
    def _resize_dimensions(object_h, object_w, target=224):
        """Return the ``(height, width)`` that fits the object's aspect ratio in a square.

        The longer side becomes ``target``; the shorter side is rounded to an even number
        so it can be padded symmetrically up to ``target`` (``target`` itself must be
        even). The shorter side is clamped to at least 2 pixels: for extreme aspect ratios
        the rounding would otherwise yield 0 and ``resize`` would fail.
        """
        if object_h > object_w:
            short_side = 2 * int(round((target * object_w / object_h) / 2))
            return (target, max(2, short_side))
        short_side = 2 * int(round((target * object_h / object_w) / 2))
        return (max(2, short_side), target)

    def __getitem__(self, idx):
        """Return ``((orig_image, image, prompts), label)`` for row ``idx``.

        ``orig_image`` is the tight object crop stretched to 224x224; ``image`` is the
        padded crop resized and zero-padded to 224x224; ``prompts`` is a one-element
        list with the class prompt; ``label`` is the row's ``y`` value.
        """
        entry = self.df.iloc[idx]
        image = read_image(f"{self.image_dir}/{entry['image_id']}.jpg", ImageReadMode.RGB)
        full_shape = image.shape  # kept for diagnostics; `image` is reassigned below
        y, x, h, w = get_scaled_bbox(entry, image.shape[1], image.shape[2])
        orig_image = crop(image, entry["object_y"], entry["object_x"], entry["object_h"], entry["object_w"])
        try:
            image = crop(image, y, x, h, w)
            # NOTE(review): the target size follows the OBJECT's aspect ratio, not the padded
            # crop's, so a crop whose padding changed its aspect ratio is stretched — confirm
            # this is intended.
            resize_dimensions = self._resize_dimensions(entry["object_h"], entry["object_w"])
            image = resize(image, resize_dimensions)
            image = pad(image, ((224 - resize_dimensions[1])//2, (224 - resize_dimensions[0])//2))
            orig_image = resize(orig_image, (224, 224))
        except Exception:
            # Show which sample broke the pipeline, then let the original error propagate.
            print(entry)
            print((y, x, h, w))
            print(full_shape)
            raise
        prompts = [f"a photo of a {entry['class']}"]

        return (orig_image, image, prompts), entry['y']


In [335]:
from torch.utils.data import DataLoader

# Wrap the flattened class samples in a dataset and serve them in shuffled mini-batches.
batch_size = 64
class_dataset = ClassDataset(class_samples_df)
class_dataloader = DataLoader(
    dataset=class_dataset,
    batch_size=batch_size,
    shuffle=True,
)

In [336]:
# Zero-shot check: for each crop, compare CLIP's logit for its class prompt against the
# logit for the generic prompt "a photo of an object"; predict True when the class prompt
# scores higher, and count agreement with the label.
device = model.device  # keep inputs on whatever device the model was loaded onto
correct = 0
with torch.no_grad():  # inference only: do not build an autograd graph for every batch
    for (_, images, prompts), labels in class_dataloader:
        image_inputs = image_processor(images, return_tensors="pt", do_resize=False, do_center_crop=False).to(device)
        # prompts[0] holds one class prompt per sample; the generic prompt goes last.
        text_inputs = tokenizer([*prompts[0], "a photo of an object"], return_tensors="pt", padding=True).to(device)

        result = model(**image_inputs, **text_inputs)
        n_samples = labels.shape[0]
        scores = torch.stack([
            # Diagonal of the image-by-class-prompt block: each image against its own prompt.
            torch.diagonal(result["logits_per_image"][:, :-1]),
            # Row n_samples of the text-by-image matrix: the generic prompt against each image.
            result["logits_per_text"][n_samples, :],
        ])
        probs = torch.nn.functional.softmax(scores, dim=0)
        predictions = probs[0, :] > probs[1, :]
        # Accumulate as a Python int so the final ratio is not limited to float32 precision.
        correct += (predictions == labels.to(device)).sum().item()

print(f"Accuracy: {correct / len(class_dataset) * 100}%")



Accuracy: 82.72315216064453%
