In [17]:
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from matplotlib import patches

import sys
sys.path.append('../src/utils')

from ac_object_detection_functions import *

In [None]:
#Load in the original model
import torch
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection

# Pick the compute device. Apple's Metal backend ("mps") stays the first choice,
# as in the original notebook, but fall back to CUDA and then CPU so the
# notebook does not crash on machines without MPS support.
_mps_backend = getattr(torch.backends, "mps", None)
if _mps_backend is not None and _mps_backend.is_available():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Zero-shot detector checkpoint plus its matching pre/post-processor.
model_id = "IDEA-Research/grounding-dino-tiny"
processor = AutoProcessor.from_pretrained(model_id)
model = AutoModelForZeroShotObjectDetection.from_pretrained(model_id, ignore_mismatched_sizes=True).to(device)


In [None]:
import json

# Load the OpenAI credentials (organization ID, project ID and API key) from a
# local JSON configuration file so they are not hard-coded in the notebook.
with open('../config.json', 'r') as config_file:
    config = json.load(config_file)


import os
from openai import OpenAI

# Client used by the prompt-search loop to request new candidate prompts.
client = OpenAI(
    # All three values are read from config.json; a missing key raises KeyError.
    organization= config['OPENAI_ORG_ID'],
    project=config['OPENAI_PROJECT_ID'],
    api_key=config['OPENAI_API_KEY']
)


In [None]:
# Dataset image indices used to restrict the partial-run sample when
# params['category_idx'] == 0 ('AC Unit').
# NOTE(review): presumably the images that contain at least one HVAC unit - confirm how this list was produced.
with_hvac = [0, 1, 3, 6, 8, 9, 10, 12, 13, 14, 16, 17, 18, 19, 20, 21, 22, 24, 25, 27, 28, 29, 30, 31, 33, 34, 35, 36, 37, 38, 39, 40, 42, 43, 45, 46, 47, 50, 51, 53, 56, 57, 59, 60, 61, 65, 66, 68, 70, 71, 72, 75, 76, 77, 79, 81, 83, 85, 86, 87, 88, 91, 93, 94, 95, 96, 97, 98, 99, 102, 103, 104, 105, 106, 107, 109, 110, 111, 112, 114, 115, 118, 119, 121, 122, 123, 125, 126, 128, 129, 130, 132, 133, 134, 135, 137, 138, 140, 142, 143, 144, 145, 147, 149]

In [None]:
# Dataset image indices used to restrict the partial-run sample when
# params['category_idx'] == 1 ('AC leaking').
# NOTE(review): presumably the images that contain a leaking HVAC unit - confirm how this list was produced.
with_leak = [0, 6, 8, 10, 12, 13, 18, 25, 29, 39, 42, 43, 45, 47, 51, 56, 61, 65, 66, 68, 70, 71, 72, 75, 76, 77, 79, 83, 87, 93, 98, 110, 111, 114, 121, 125, 128, 130, 143, 147, 149]


In [None]:
# The two annotation categories; params['category_idx'] picks the one to optimise.
category_options = ['AC Unit', 'AC leaking']

# Hand-written prompt, only used when params['manual_prompt'] is True.
input_prompt = '"Tight box around HVAC on roof."'

# Run configuration, written as one literal (keys in the same order as before).
params = {
    'manual_prompt': False,
    'rerun_zero_shot': True,
    'dataset_version': 'v4',
    'seed': 42,
    'category_idx': 1,
    'display_image': False,
    'max_boxes': 900,  #has to be 900 for the model to work
    'area_threshold': 0.20,
    'score_threshold': 0.1,
    'overlap_threshold': 0.5,
    'skinny_threshold': 0.15,
    'text_version': 'v4',
    'model_version': 'v3',
    'partial_run': True,
    'partial_run_subset_count': 10,
    'iou_threshold': 0.33,
    'LLM_try_total': 3,
}
model.config.num_queries = params['max_boxes']

ds = datasets.load_from_disk("../data/models/ac_model/ac_object_" + params['dataset_version'] + ".hf")

# Seed the stdlib RNG so the image-level split and the partial-run sample repeat.
random.seed(params['seed'])

# 80/20 split over image indices; everything not drawn for training is test.
image_count = len(ds)
train_idx_image = random.sample(range(0, image_count), int(image_count*0.8))
train_lookup = set(train_idx_image)
test_idx_image = [idx for idx in range(0, image_count) if idx not in train_lookup]

category_idx = params['category_idx']
params['category'] = category_options[category_idx]

# Choose which images the zero-shot evaluation runs on.
if params['partial_run'] == True:
    # Partial run: a small random subset of training images listed for the category.
    listed_by_category = {0: with_hvac, 1: with_leak}
    if category_idx in listed_by_category:
        eligible = set(listed_by_category[category_idx])
        zero_shot_set = random.sample(
            [idx for idx in train_idx_image if idx in eligible],
            params['partial_run_subset_count'],
        )
else:
    zero_shot_set = range(0, image_count)

print(model_id)


#initialization
# starter_tries = {'a tiny HVAC (air conditioner) machine that MUST be ON a roof. Make a tight bounding box.': '0.83', 'HVAC on roof.': '0.73', 'A small, off-white or grey HVAC unit on a rooftop, featuring a rectangular shape and a circular fan on top. Use a precise bounding box.' : '0.69','A small rectangular HVAC unit with a circular fan on top, located on a rooftop. Ensure a tight and accurate bounding box.': '0.77'}

# Prompt -> score history handed to the LLM; prompt_results is the same dict object.
starter_tries = {'HVAC on a roof including a smear or leak': '?'}
prompt_results = starter_tries

# Prompts already in the history count as used tries.
try_count = len(prompt_results)

# Manual mode: leave room for exactly one more try.
if params['manual_prompt'] == True:
    try_count = params["LLM_try_total"]-1

# Prompt-search loop. Each pass obtains one candidate text prompt (from the LLM,
# or the hand-written input_prompt in manual mode), runs GroundingDINO with it on
# every image in zero_shot_set, matches predicted boxes to human boxes by IoU and
# records the resulting F1 score in prompt_results.
while try_count < params["LLM_try_total"]:

    # Task description for the selected category, embedded in the LLM instructions.
    if category_idx == 0:
        params["prompt_seed"] = ("You are trying to get GroundingDino to find and put bounding boxes on all HVAC systems (typically air conditioners) on a roof based on aerial imagery. GroundingDinop expects a prompt to be a concise description of the object, but it wasn't trained much on aerial imagery so it may need help in describing the object. I have no idea if grammar matters. The image is of a building's rooftop. The HVAC units usually take up less than 5% of the image and look like small, rectangular boxes with one or more circular fan(s) on top. The unit is typically white or grey. Sometimes the units will have pipes or vents visibly connected to them, but not always. GroundingDino tends to have looser bounding boxes, but we want tight bounding boxes. You could think of other synonyms to help.")
    elif category_idx == 1:
        params["prompt_seed"] = ("You are trying to get GroundingDino to find and put bounding boxes on all leaking HVAC on roof systems as seen on a roof based on aerial imagery. You need it to draw around both the system itself and the discoloration caused by the leak. The leaks could be described as looking like smears, stains, traces or sediment flowing down the roof from the HVAC system.")

    params["LLM instructions"] = ("You are helping with an experiment to find the best prompt for a generalized multimodal modal to make it specialized at finding an object. Your job is to generate prompts you believe will be successful based on the prompt seed information and a history of how well each previous prompt performed. The goal is to test lots of different options to find the best one, learning from the previous prompts that were most successful. You have been given a total of " + str(params["LLM_try_total"]) + " tries to generate prompts. You have already used " + str(try_count) + " tries. The results of your previous tries are contained in a dictionary with the text prompt as a key and the corresponding f1 score. The higher the score, the better the prompt, with a maximum score of 1.00. The prompt seed information is: " + params["prompt_seed"] + "The previous trial results are: " + str(prompt_results) + ". Please generate a new prompt based on this information. Your output should be a single string with no commentary. Try to make the shortest possible prompt while maximizing the f1 score. Use fewer than 10 words, ideally 2 to 5. NEVER do the exact same prompt twice. Review the previous prompts to make sure this doesn't happen.")

    if params['manual_prompt'] == True:
        # Manual mode evaluates the hand-written prompt; skip the (billed) API call.
        response = input_prompt
    else:
        chat_completion = client.chat.completions.create(
            messages=[
                {
                    "role": "user",
                    "content": params["LLM instructions"]
                }
            ],
            model="gpt-4o-mini",
        )
        response = chat_completion.choices[0].message.content
    print('Try ' + str(try_count) + ' with prompt: ' + response)

    # Register the prompt; its position in the history names the cached result file.
    prompt_results[response] = ''
    params['text_idx'] = len(prompt_results.keys()) - 1
    params['text'] = response
    text = params['text']
    if params['rerun_zero_shot'] == True:
        # Collect one frame per image and concatenate once after the loop.
        per_image_frames = []
        for image_dict_idx in zero_shot_set:

            human_labels = ds[image_dict_idx]['objects']
            image = ds[image_dict_idx]['image']
            with torch.no_grad():
                inputs = processor(images=image, text=text, return_tensors="pt").to(device)

                outputs = model(**inputs,output_hidden_states=True)
                # Thresholds of 0 keep every query; filtering happens below.
                results = processor.post_process_grounded_object_detection(
                    outputs,
                    inputs.input_ids,
                    box_threshold=0.00,
                    text_threshold=0.00,
                    target_sizes=[image.size[::-1]],

                )

            # Indices of the boxes that survive the project filter; fall back to
            # box 0 so every image contributes at least one row.
            qualified_box_idx = filter_model_bbox_outputs(results, outputs, image, params)
            if len(qualified_box_idx) == 0:
                qualified_box_idx = [0]
            filt_outputs = FilteredOutputs()

            for output_items in ['pred_boxes', 'last_hidden_state']:
                # filter the tensor's 2nd dimension by qualified_box_idx
                setattr(filt_outputs, output_items, outputs[output_items][:, qualified_box_idx, :])

            boxes = results[0]['boxes'][qualified_box_idx]
            if len(boxes) == 0:
                scores = [0]
            else:
                scores = results[0]['scores'].cpu().numpy()[qualified_box_idx]

            # Model boxes for this image, one row per box.
            mdf = pd.DataFrame(columns = ['model_boxes'])
            mdf['model_boxes'] = [[int(y) for y in x] for x in boxes.tolist()]
            mdf['scores'] = scores
            mdf['model_idx'] = mdf.index
            mdf['image_idx'] = image_dict_idx
            mdf['tmp_key'] = 1

            # Cross join human x model boxes via the constant key, then score each pair.
            hdf = human_labels_to_df(human_labels, category_idx)
            hdf['human_idx'] = hdf.index
            hdf['tmp_key'] = 1
            annot_df = pd.merge(hdf,mdf, on='tmp_key')
            annot_df['iou'] = annot_df.apply(lambda row: calculate_iou_not_polygon(row['human_boxes'], row['model_boxes']), axis=1)
            # Keep the best-IoU human box for every model box. De-duplicate on
            # model_idx rather than model_boxes: the box column holds lists
            # (unhashable), and two boxes with identical integer coordinates must
            # both be kept so rows stay aligned with last_hidden_state below.
            best_match_df = annot_df.sort_values('iou', ascending=False).drop_duplicates(['model_idx']).sort_values('model_idx')

            best_match_df['last_hidden_state'] = filt_outputs.last_hidden_state.cpu().tolist()[0]
            best_match_df['image_idx'] = image_dict_idx
            # logits = filt_outputs.logits.cpu().numpy()[0]
            # logits = list(logits[:, (logits != float('-inf')).any(axis=0)])
            # best_match_df['logits'] = logits

            per_image_frames.append(best_match_df)
            print('Image index: ' +str(image_dict_idx) + ' ' + 'for category_idx ' + str(category_idx) + ' processed with text index: ' + str(params['text_idx']) + ' and model version: ' + params['model_version'])
        # pd.concat raises on an empty list, so keep the old "empty frame" result.
        if per_image_frames:
            zero_shot_df = pd.concat(per_image_frames, ignore_index=True)
        else:
            zero_shot_df = pd.DataFrame()
        if params['partial_run'] == True:
            filename = "../data/models/ac_model/partial/zero_shot_df_" + params["dataset_version"] + '_t' + str(params["text_idx"]) + "_" + params["model_version"] + "_partial.pkl"
        else:
            filename = "../data/models/ac_model/zero_shot_df_" + params["dataset_version"] + '_t' + str(params["text_idx"]) + "_" + params["model_version"] + ".pkl"
        # Manual prompts are exploratory and are not cached to disk.
        if params['manual_prompt'] == False:
            zero_shot_df.to_pickle(filename)
    else:
        #load zero shot df from pkl
        # NOTE(review): this always reads the full-run file, even when
        # params['partial_run'] is True - confirm that is intended.
        filename = "../data/models/ac_model/zero_shot_df_" + params["dataset_version"] + '_t' + str(params["text_idx"]) + "_" + params["model_version"] + ".pkl"
        zero_shot_df = pd.read_pickle(filename)


    # Score the prompt: a box is a true match when its best IoU exceeds the threshold.
    df = zero_shot_df.copy(deep=True)
    df['matched'] = (df['iou'] > params['iou_threshold']).astype(int)
    y_actual = df['matched']
    y_score = df['scores']
    zero_shot_score = f1_scoring(y_actual, y_score)
    print(zero_shot_score)
    prompt_results[text] = str(zero_shot_score)
    # Count this pass as one used try. Deriving the counter from
    # len(prompt_results) stalls whenever a prompt repeats (always the case in
    # manual mode), which made the loop spin forever.
    try_count += 1

In [None]:
#save the dictionary prompt_results to disk
import pickle
# The file name encodes the dataset and model versions of this run.
filename = "../data/models/ac_model/prompt_results_" + params["dataset_version"] + "_" + params["model_version"] + ".pkl"
# Binary write; the context manager closes the file even if dump fails.
with open(filename , 'wb') as f:
    pickle.dump(prompt_results, f)


In [None]:
# Display the prompt -> score history collected by the prompt-search loop.
prompt_results

In [None]:
# roc_curve is imported here explicitly: it is used below, but the only other
# import of it sits in a later cell, which breaks a top-to-bottom run.
from sklearn.metrics import f1_score, roc_curve

# y_actual / y_score are the match labels and model scores of the last prompt
# evaluated by the prompt-search loop.
fpr, tpr, thresholds = roc_curve(y_actual, y_score)
J = tpr - fpr
# Find the index of the threshold with the greatest Youden's J statistic
ix = np.argmax(J)
# Find the optimal threshold
optimal_threshold = thresholds[ix]
# NOTE(review): roc_curve evaluates each threshold as score >= threshold; the
# strict '>' here excludes boxes scoring exactly at the threshold - confirm.
y_pred_binary = (y_score > optimal_threshold).astype(int)
# print(accuracy_score(y_actual, y_pred_binary))
# print(classification_report(y_actual, y_pred_binary))
# print(confusion_matrix(y_actual, y_pred_binary))
# print(auc(fpr, tpr))

# F1 at the Youden-optimal threshold.
print(f1_score(y_actual, y_pred_binary))
# Baseline for comparison: predict "match" for every box (the expression is
# always True because y_pred_binary only holds 0/1).
y_pred_baseline = y_pred_binary+1 >= 1
print(f1_score(y_actual, y_pred_baseline))

In [None]:
from sklearn.model_selection import ShuffleSplit

# Row-level random 80/20 split of the feature matrix.
# NOTE: X and y_iou are built in the regression cell below; run that cell first.
# The original called an undefined `test_train_split` and unpacked the split()
# generator directly, both of which raise. ShuffleSplit with a single split is
# used instead, since only one train/test partition is kept.
ss = ShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
for train_index, test_index in ss.split(X, y_iou):
    print("TRAIN:", train_index, "TEST:", test_index)
    X_train, X_test = X[train_index], X[test_index]
    y_train, y_test = y_iou[train_index], y_iou[test_index]

In [None]:
# Fit a Lasso regression that predicts each candidate box's IoU with its best
# human box from the box's last hidden state, using an image-level train/test split.
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import root_mean_squared_error
from sklearn.metrics import fbeta_score
import random

df = zero_shot_df.copy(deep=True)

# Features: one hidden-state vector per box.
#X = np.array(train_df['top_states'].tolist())
#X = np.array(df['scores'].tolist())
X = np.array(df['last_hidden_state'].tolist())
#add an intercept
X = np.concatenate([X, np.ones((X.shape[0], 1))], axis=1)

y_iou = np.array(df['iou'])

# Rebuild the image-level split with the same seed as the configuration cell so
# it matches train_idx_image / test_idx_image (previously a hard-coded 42).
random.seed(params['seed'])
train_image_idx = random.sample(range(0,len(ds)), int(len(ds)*0.8))
train_image_lookup = set(train_image_idx)
test_image_idx = [x for x in range(0,len(ds)) if x not in train_image_lookup]

# Row labels of boxes from train / test images; these are used as positions
# into X, which relies on df keeping its default 0..n-1 index.
train_idx = np.array(df[df['image_idx'].isin(train_image_idx)].index)
test_idx = np.array(df[df['image_idx'].isin(test_image_idx)].index)

X_train = X[train_idx]
X_test = X[test_idx]
y_iou_train = y_iou[train_idx]
y_iou_test = y_iou[test_idx]

# Fit the scaler on training rows only, then apply it to the test rows.
scaler = StandardScaler()
X_train= scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

# Binary match label; uses the configured IoU threshold (previously a
# hard-coded 0.33) so it cannot drift from the prompt-search cell.
df['matched'] = (df['iou'] > params['iou_threshold']).astype(int)
y_matched = np.array(df['matched'])
y_matched_train = y_matched[train_idx]
y_matched_test = y_matched[test_idx]

# Original detector confidence, kept as the zero-shot baseline.
y_orig_score = np.array(df['scores'])
y_orig_score_train = y_orig_score[train_idx]
y_orig_score_test = y_orig_score[test_idx]

#get coefficients for the lasso model to iou
clf = Lasso(alpha=0.001)
clf.fit(X_train, y_iou_train)
y_pred_iou = clf.predict(X_test)
#find and report coefficients
coefficients = clf.coef_
iou_pred_error = root_mean_squared_error(y_iou_test, y_pred_iou)
print("iou prediction error: " + str(iou_pred_error))
print("Number of non-zero coefficients: ", len(coefficients[coefficients != 0]))
plt.plot(coefficients)


from sklearn.metrics import roc_curve, auc
from sklearn.metrics import root_mean_squared_error

# ROC curves on the test boxes: predicted IoU (tuned) vs. original detector score.
fpr, tpr, thresholds = roc_curve(y_matched_test, y_pred_iou)
fpr_o, tpr_o, thresholds_o = roc_curve(y_matched_test, y_orig_score_test)
# beta = 1
# def find_optimal_threshold(y_true, y_scores,beta):
#     # Calculate the ROC curve points
#     fpr, tpr, thresholds = roc_curve(y_true, y_scores)
# 
#     # Calculate the F1 score for each threshold
#     f1_scores = [fbeta_score(y_true, y_scores > t, beta = beta) for t in thresholds]
# 
#     # Get the optimal threshold
#     optimal_idx = np.argmax(f1_scores)
#     optimal_threshold = thresholds[optimal_idx]
# 
#     return optimal_threshold
# optimal_threshold = find_optimal_threshold(y_matched_test, y_pred_iou,beta)
# optimal_threshold_o = find_optimal_threshold(y_matched_test, y_orig_score_test,beta)

roc_auc = auc(fpr, tpr)
roc_auc_o = auc(fpr_o, tpr_o)
plt.figure()
lw = 2

plt.plot(fpr_o, tpr_o, color='darkblue', lw=lw, label='Before Tuning (area = %0.2f)' % roc_auc_o)
plt.plot(fpr, tpr, color='darkorange', lw=lw, label='After Tuning with Grande data (area = %0.2f)' % roc_auc)
# Diagonal = chance-level reference.
plt.plot([0, 1], [0, 1], color='navy', lw=lw, linestyle='--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('HVAC Detection Performance with GroundingDino')
plt.legend(loc="lower right")
plt.show()
#Compute the Youden's J statistic
J = tpr - fpr
J_o = tpr_o - fpr_o

# Find the index of the threshold with the greatest Youden's J statistic
ix = np.argmax(J)
ix_o = np.argmax(J_o)

# Find the optimal threshold for each score. The zero-shot threshold must come
# from its own ROC curve (thresholds_o / ix_o); it previously reused the tuned
# model's threshold array and index.
optimal_threshold = thresholds[ix]
optimal_threshold_o = thresholds_o[ix_o]
print('Grande Optimal Threshold: ', optimal_threshold)
print('Zero Shot Optimal Threshold: ', optimal_threshold_o)


from sklearn.metrics import accuracy_score
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix


def _print_binary_report(title, y_true, scores, threshold):
    """Print the metrics block for one scoring model and return its 0/1 predictions.

    Prints the title, then accuracy, the classification report and the confusion
    matrix for `scores > threshold`, and finally f1_scoring on the raw scores.
    """
    print(title)
    predicted = (scores > threshold).astype(int)
    print(accuracy_score(y_true, predicted))
    print(classification_report(y_true, predicted))
    print(confusion_matrix(y_true, predicted))
    print(f1_scoring(y_true, scores))
    return predicted


# Tuned (Lasso-predicted IoU) scores first, then the original detector scores.
y_pred_binary = _print_binary_report("Grande Model", y_matched_test, y_pred_iou, optimal_threshold)

y_orig_score_binary = _print_binary_report("Zero Shot Model", y_matched_test, y_orig_score_test, optimal_threshold_o)

In [None]:

def spot_check_annotations(full_train, image_idx = 320, min_agreement = 80, zoom = False):
    """Plot the annotations of one image for a visual sanity check.

    The image is chosen by position: the `image_id` found at row `image_idx` of
    `full_train`. Only rows of that image whose `percent_agreement` is at least
    `min_agreement` are passed on to `display_image_with_mask`.
    """
    target_image_id = full_train['image_id'].iloc[image_idx]
    same_image = full_train['image_id'] == target_image_id
    enough_agreement = full_train['percent_agreement'] >= min_agreement
    selected = full_train[same_image & enough_agreement]

    # Mask geometry and image path are read from the first selected row.
    display_image_with_mask(
        selected['image_path'].iloc[0],
        selected['mask_geometry'].iloc[0],
        selected['averaged_polygons'],
        np.array(selected['name']),
        zoom = zoom,
    )

def convert_polygon_to_bbox(polygon):
    """Return the polygon's bounding box as an (x, y, width, height) tuple.

    `polygon` only needs a `bounds` attribute that unpacks to
    (minx, miny, maxx, maxy); x and y are the minimum corner.
    """
    x_lo, y_lo, x_hi, y_hi = polygon.bounds
    return x_lo, y_lo, x_hi - x_lo, y_hi - y_lo


def display_image_with_mask(image_path, mask_geometry, category_polygons, category, zoom=False):
    """Show an image with its mask outline and per-category annotation boxes.

    Parameters:
        image_path: path of the image file to read.
        mask_geometry: geometry with a `bounds` attribute; its bounding box is
            drawn in green when `zoom` is False and used as the plot extent
            when `zoom` is True.
        category_polygons: a single Polygon or an iterable of polygons, each
            drawn as its bounding box.
        category: sequence of category names, indexed in step with
            `category_polygons` ('AC Unit' -> green, 'AC leaking' -> red).
        zoom: when True, crop the view to the mask bounds instead of drawing it.

    Raises:
        FileNotFoundError: if the image cannot be read from `image_path`.
        ValueError: if a category name is not one of the two known ones.

    NOTE(review): `cv2` and `Polygon` are not imported in the visible notebook
    cells; presumably they come from the star import - confirm.
    """
    # Load the image. The original never read the file, so `image` was used
    # before assignment. cv2.imread returns None instead of raising on failure.
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError('Could not read image: ' + str(image_path))
    # OpenCV loads BGR; matplotlib expects RGB.
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Create a figure and axis
    fig, ax = plt.subplots(figsize=(10, 10))

    # Display the image
    ax.imshow(image)

    # Accept a single polygon as well as a collection.
    if isinstance(category_polygons, Polygon):
        category_polygons= [category_polygons]

    if not zoom: # Display the mask
        x, y, width, height = convert_polygon_to_bbox(mask_geometry)
        patch = patches.Rectangle((x,y),width,height, edgecolor='green', facecolor='None',linewidth=2)
        ax.add_patch(patch)

    # Display the averaged polygons
    for cnt,polygon in enumerate(category_polygons):
        x, y, width, height = convert_polygon_to_bbox(polygon)
        if category[cnt] == 'AC Unit':
            color = 'green'
        elif category[cnt] == 'AC leaking':
            color = 'red'
        else:
            # Previously the exception was constructed but never raised, so an
            # unknown category silently reused a stale (or undefined) colour.
            raise ValueError('Category not recognized')

        patch = patches.Rectangle((x,y),width,height, edgecolor=color, facecolor='None',linewidth=2)
        ax.add_patch(patch)

    if zoom:
        # Set the limits of the plot to the extent of the mask
        # NOTE(review): set_ylim(miny, maxy) may flip the image vertically
        # relative to imshow's default orientation - confirm this is intended.
        minx, miny, maxx, maxy = mask_geometry.bounds
        ax.set_xlim(minx, maxx)
        ax.set_ylim(miny, maxy)
        ax.grid(False)
    # Show the plot
    plt.show()


# Test-set rows with both score variants attached. Take an explicit copy of the
# slice so the column assignments below write to an independent frame (assigning
# onto an iloc slice triggers pandas' SettingWithCopyWarning); this also makes
# the former deep copy of the whole frame unnecessary.
df = zero_shot_df.iloc[test_idx,:].copy()
df['new_scores'] = y_pred_iou
df['new_matched'] = (df['new_scores'] > optimal_threshold).astype(int)
df['orig_scores'] = y_orig_score_test
df['orig_matched'] = (df['orig_scores'] > optimal_threshold_o).astype(int)


# Images to visualise: the first test image.
image_idxs = [test_idx_image[0]]


def _show_boxes(image, boxes):
    """Open a new 10x10 figure, outline each (x1, y1, x2, y2) box in yellow, then draw the image."""
    fig, ax = plt.subplots(figsize=(10, 10))
    for x1, y1, x2, y2 in boxes:
        outline = patches.Rectangle((x1,y1),x2-x1,y2-y1, edgecolor='yellow', facecolor='None',linewidth=4)
        ax.add_patch(outline)
    ax.imshow(image)


# Three figures per image, in this order: boxes accepted by the tuned score,
# the human boxes, and boxes accepted by the original score.
for image_idx in image_idxs:
    patch_df = df[(df["image_idx"] == image_idx)].reset_index(drop=True)
    image = ds[image_idx]['image']

    accepted_new = patch_df[patch_df['new_matched'] == 1]
    _show_boxes(image, accepted_new['model_boxes'])

    _show_boxes(image, patch_df['human_boxes'])

    accepted_orig = patch_df[patch_df['orig_matched'] == 1]
    _show_boxes(image, accepted_orig['model_boxes'])