In [2]:
import os
import json

In [180]:
dataset_name = "mvtec_ad_transistor"

# Root of the project checkout; every path below is derived from it so the
# location only has to be changed in one place.
project_root = "/home/nisyad/projects/industrial-defect-detection"

# COCO-style annotation file of the test split (read for its category list).
dataset_coco = f"{project_root}/datasets/processed/{dataset_name}/test.json"

# Zero-shot responses
zero_shot_responses = f"{project_root}/{dataset_name}_responses.json"

# 1-shot In-context responses
in_context_responses = f"{project_root}/{dataset_name}_responses_in_context.json"

# Responses file evaluated by the rest of the notebook. Previously both paths
# were assigned to `responses` in turn, so the zero-shot path was silently
# overwritten; switch runs by pointing this at `zero_shot_responses` instead.
responses = in_context_responses

In [181]:
# Parse the annotation file and pull out its list of category records.
with open(dataset_coco, "r") as coco_file:
    coco = json.load(coco_file)

categories = coco["categories"]
categories


[{'id': 1, 'name': 'good'},
 {'id': 2, 'name': 'broken_or_misplaced lead'},
 {'id': 3, 'name': 'misplaced transistor'},
 {'id': 4, 'name': 'damaged casing'}]

In [182]:
# Map each category name to its id shifted down by one, so that the ids
# (which start at 1 in this annotation file) become 0-based class ids.
category_name_to_class_id = dict(
    (entry["name"], entry["id"] - 1) for entry in categories
)
category_name_to_class_id

{'good': 0,
 'broken_or_misplaced lead': 1,
 'misplaced transistor': 2,
 'damaged casing': 3}

In [183]:
# Load the saved model responses and preview the first three records.
with open(responses, "r") as responses_file:
    results = json.load(responses_file)

results[:3]

[{'sample_idx': 0,
  'response': '```json\n{\n  "is_defective": "yes",\n  "reason": "The left lead of the transistor is deformed and not properly aligned compared to the non-defective transistor.",\n  "defect_type": "broken_or_misplaced lead",\n  "bounding_box": "[0.2, 0.7, 0.35, 0.95]"\n}\n```',
  'target': [[0.0, -1.0, -1.0, -1.0, -1.0]],
  'parsed_response': {'is_defective': 'yes',
   'reason': 'The left lead of the transistor is deformed and not properly aligned compared to the non-defective transistor.',
   'defect_type': 'broken_or_misplaced lead',
   'bounding_box': '[0.2, 0.7, 0.35, 0.95]'}},
 {'sample_idx': 1,
  'response': '{\n"is_defective": "yes",\n"reason": "The left lead of the transistor appears to be broken or misplaced compared to the non-defective transistor.",\n"defect_type": "broken_or_misplaced_lead",\n"bounding_box": "[0.25, 0.7, 0.35, 1.0]"\n}',
  'target': [[0.0, -1.0, -1.0, -1.0, -1.0]],
  'parsed_response': {'is_defective': 'yes',
   'reason': 'The left lead o

In [184]:
# Tally the "is_defective" answers: "yes"/"no" get their own counters and
# anything else is pooled under "other". Every distinct answer is also
# recorded in `is_defective_types`.
is_defective_count = {"yes": 0, "no": 0, "other": 0}
is_defective_types = set()

for result in results:
    answer = result["parsed_response"]["is_defective"]
    bucket = answer if answer in is_defective_count else "other"
    is_defective_count[bucket] += 1
    is_defective_types.add(answer)

# Not the last expression of the cell, so this line displays nothing.
is_defective_types
print(is_defective_count)

{'yes': 94, 'no': 6, 'other': 0}


In [185]:
# Distinct "defect_type" strings that occur across all parsed responses.
defect_types = {
    result["parsed_response"]["defect_type"] for result in results
}

defect_types

{'',
 'broken_or_misplaced lead',
 'broken_or_misplaced_lead',
 'damaged casing',
 'misplaced transistor'}

In [186]:
# Materialise the collection of defect types as a list.
defect_types = [*defect_types]
defect_types

['',
 'damaged casing',
 'broken_or_misplaced lead',
 'misplaced transistor',
 'broken_or_misplaced_lead']

In [187]:
# Break comma-separated answers apart: each entry becomes the list of
# individual defect names it contains (split on ", ").
# Note: this rebinds `defect_types` from strings to lists, so running the
# cell a second time without re-running the previous one will fail.
defect_types = list(map(lambda name: name.split(", "), defect_types))
defect_types

[[''],
 ['damaged casing'],
 ['broken_or_misplaced lead'],
 ['misplaced transistor'],
 ['broken_or_misplaced_lead']]

In [188]:
def calculate_iou(box1, box2):
    """
    Calculate the Intersection over Union (IoU) of two bounding boxes.

    Parameters:
    - box1, box2: Sequences of 4 coordinates [x_top, y_top, x_bottom, y_bottom],
      i.e. the top-left corner followed by the bottom-right corner.

    Returns:
    - IoU as a float in [0, 1]. Returns 0.0 when the union has no area
      (for example when both boxes are degenerate placeholders such as
      [0, 0, 0, 0]) instead of dividing by zero.

    Raises:
    - ValueError / TypeError if a box cannot be unpacked into 4 coordinates.
    """
    # Unpack the coordinates
    x1_top, y1_top, x1_bottom, y1_bottom = box1
    x2_top, y2_top, x2_bottom, y2_bottom = box2

    # Width/height of the intersection rectangle; clamped at 0 so that
    # non-overlapping boxes contribute no area.
    inter_width = max(0, min(x1_bottom, x2_bottom) - max(x1_top, x2_top))
    inter_height = max(0, min(y1_bottom, y2_bottom) - max(y1_top, y2_top))
    inter_area = inter_width * inter_height

    # Area of each box. Each side is clamped at 0 so that an inverted box
    # (bottom-right corner above/left of the top-left corner) counts as empty
    # rather than yielding a positive area from two negative sides.
    box1_area = max(0, x1_bottom - x1_top) * max(0, y1_bottom - y1_top)
    box2_area = max(0, x2_bottom - x2_top) * max(0, y2_bottom - y2_top)

    # Union = sum of both areas minus the part counted twice.
    union_area = box1_area + box2_area - inter_area
    if union_area <= 0:
        # Both boxes are empty: there is nothing to overlap with.
        return 0.0

    return inter_area / float(union_area)

In [189]:
# Calculate the mean IoU
#
# Per-sample score:
#   * predicted defective and target class != 0 -> IoU of the predicted box
#     against the first target box (only result["target"][0] is used);
#   * predicted "no" and target class == 0      -> 1;
#   * every other combination                   -> 0.
# The scores are averaged over all samples.

total_iou = 0

for result in results:
    is_defective = result["parsed_response"]["is_defective"]
    target = result["target"][0][0]

    if is_defective == "yes" and target != 0:
        tgt_bbox = result["target"][0][1:]
        pred_bbox = result["parsed_response"]["bounding_box"]

        # The box may arrive as a JSON string such as "[0.2, 0.7, 0.35, 0.95]".
        if not isinstance(pred_bbox, list):
            try:
                pred_bbox = json.loads(pred_bbox)
            except (TypeError, ValueError):
                # TypeError: value is not str/bytes; ValueError covers
                # json.JSONDecodeError for malformed text.
                pred_bbox = [0, 0, 0, 0]

        # Anything that is not exactly four numbers cannot be scored; fall
        # back to the empty box so the sample contributes an IoU of 0.
        is_valid_bbox = (
            isinstance(pred_bbox, list)
            and len(pred_bbox) == 4
            and all(isinstance(coord, (int, float)) for coord in pred_bbox)
        )
        if not is_valid_bbox:
            pred_bbox = [0, 0, 0, 0]

        iou = calculate_iou(tgt_bbox, pred_bbox)
        total_iou += iou

    elif is_defective == "no" and target == 0:
        total_iou += 1

# Guard against an empty results file instead of dividing by zero.
mean_iou = total_iou / len(results) if results else 0.0
print(mean_iou)

0.1418978594440779


In [190]:
# transistor

def get_binarized_scores(responses, num_classes, name_to_class_id=None, aliases=None):
    """
    Convert parsed model responses into parallel lists of class ids.

    Parameters:
    - responses: iterable of result dicts. Each one must provide
      "parsed_response" (with "is_defective" and, for "yes" answers,
      "defect_type") and "target", whose first row starts with the
      ground-truth class id.
    - num_classes: not used by the computation; kept so existing calls
      keep working.
    - name_to_class_id: optional mapping from defect name to class id.
      Defaults to the notebook-level `category_name_to_class_id`.
    - aliases: optional mapping from a misspelled defect name to the
      canonical category name. Defaults to the single alias needed for the
      transistor dataset ("broken_or_misplaced_lead").

    Returns:
    - (preds, targets): two lists of ints with one entry per response, in
      the same order, so they can be passed directly to a metric function.

    Raises:
    - ValueError: if "is_defective" is neither "yes" nor "no". Previously
      such a sample added a target but no prediction, silently misaligning
      the two lists.
    - KeyError: if a "yes" answer names a defect type that is missing from
      the mapping (after alias resolution).
    """
    if name_to_class_id is None:
        name_to_class_id = category_name_to_class_id
    if aliases is None:
        aliases = {"broken_or_misplaced_lead": "broken_or_misplaced lead"}

    targets = []
    preds = []
    for position, item in enumerate(responses):
        response = item["parsed_response"]
        verdict = response["is_defective"]

        if verdict == "no":
            # A "no" answer is scored as class 0 (assumed to be the
            # non-defective class, as in the transistor mapping).
            pred_class_id = 0
        elif verdict == "yes":
            pred_class = response["defect_type"]
            pred_class_id = name_to_class_id[aliases.get(pred_class, pred_class)]
        else:
            raise ValueError(
                f"Unexpected is_defective value {verdict!r} in response {position}; "
                "expected 'yes' or 'no'."
            )

        # Append both values together so the lists can never get out of step.
        targets.append(int(item["target"][0][0]))
        preds.append(pred_class_id)

    return preds, targets
            
    

# Build the prediction/target id lists, then sanity-check them: the first
# two printed lines are the list lengths, the next two the (max, min) id in
# each list.
preds, targets = get_binarized_scores(results, num_classes=4)

for class_ids in (preds, targets):
    print(len(class_ids))

for class_ids in (preds, targets):
    print(max(class_ids), min(class_ids))

100
100
3 0
3 0


In [191]:
# # screw

# def get_binarized_scores(responses, num_classes):
#     targets = []
#     preds = []
#     for item in responses:
#         response = item["parsed_response"]
#         target_class_id = int(item["target"][0][0])
#         targets.append((target_class_id))
        
#         if response["is_defective"] == "no":
#             preds.append(0)
#             continue
#         elif response["is_defective"] == "yes":
#             pred_class = response["defect_type"]
            
#             if pred_class == "damaged-thread":
#                 pred_class = "damaged thread"
            
#             if ", " in pred_class:
#                 pred_classes = pred_class.split(", ")
#                 for _class in pred_classes:
#                     pred_class_id = category_name_to_class_id[_class]
#             else:
#                 pred_class_id = category_name_to_class_id[pred_class]        
            
            
#             preds.append(pred_class_id)
        
#     return preds, targets
            
    

# preds, targets = get_binarized_scores(results, num_classes=5)
# print(len(preds))
# print(len(targets))
    
# print(max(preds), min(preds))
# print(max(targets), min(targets))

In [192]:
# # transistor

# def get_binarized_scores(responses, num_classes):
#     targets = []
#     preds = []
#     for item in responses:
#         response = item["parsed_response"]
#         target_class_id = int(item["target"][0][0])
#         targets.append((target_class_id))
        
#         if response["is_defective"] == "no":
#             preds.append(0)
#             continue
#         elif response["is_defective"] == "yes":
#             pred_class = response["defect_type"]
           
            
#             pred_class_id = category_name_to_class_id[pred_class]
#             preds.append(pred_class_id)
        
#     return preds, targets
            
    

# preds, targets = get_binarized_scores(results, num_classes=3)
# print(len(preds))
# print(len(targets))
    
# print(max(preds), min(preds))
# print(max(targets), min(targets))

In [193]:
# # metal plate

# def get_binarized_scores(responses, num_classes):
#     targets = []
#     preds = []
#     for item in responses:
#         response = item["parsed_response"]
#         target_class_id = int(item["target"][0][0])
#         targets.append((target_class_id))
        
#         if response["is_defective"] == "no":
#             preds.append(0)
#             continue
#         elif response["is_defective"] == "yes":
#             pred_class = response["defect_type"]
#             pred_class = pred_class.split(", ")[0]
           
            
#             pred_class_id = category_name_to_class_id[pred_class]
#             preds.append(pred_class_id)
        
#     return preds, targets
            
    

# preds, targets = get_binarized_scores(results, num_classes=3)
# print(len(preds))
# print(len(targets))
    
# print(max(preds), min(preds))
# print(max(targets), min(targets))

In [194]:
from sklearn.metrics import f1_score

In [195]:
# Macro-averaged F1 of predicted vs. ground-truth class ids; with
# average="macro" the per-class F1 scores are averaged without weighting by
# class frequency (per the scikit-learn documentation).
f1_score(targets, preds, average="macro")

0.447011322011322