In [None]:
import base64
import requests
import os
import time

# OpenAI API Key
api_key = "**************"

# Function to encode the image
def encode_image(image_path):
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')

# Path to your image
input_dir = "PPE-detection-no-ppe-v2/test/images"
output_dir = "output/gpt4o/working/test_set"
os.makedirs(output_dir, exist_ok=True)
i = 0
for file_name in os.listdir(input_dir):
    i+=1
    filepath = os.path.join(input_dir, file_name)
    print(f'elaboro {file_name} elemento numero {i}')
    base64_image = encode_image(filepath)
    headers = {
      "Content-Type": "application/json",
      "Authorization": f"Bearer {api_key}"
    }
    payload = {
      "model": "gpt-4o",
      "messages": [
        {
          "role": "user",
          "content": [
            {
              "type": "text",
              "text": ("Are there PPEs in the scene? "
                    "Are there blue, orange or yellow high visibility vests in the scene? "
                    "Are there gloves? "
                    "Are there security boots? "
                    "Are there hard hats? "
                    "Count people who doesn't wear PPE in the picture, I'll make you an example:\n"
                    "    - People: {number of people}\n"
                    "    - no-Hard_hats: {number of Hard_hats}\n"
                    "    - no-Gloves: {number of Gloves}\n"
                    "    - no-Reflective_vests: {number of Reflective_vests}\n"
                    "    - no-Security_boots: {number of Security_boots}\n")
            },
            {
              "type": "image_url",
              "image_url": {
                "url": f"data:image/jpeg;base64,{base64_image}"
              }
            }
          ]
        }
      ],
      "max_tokens": 500
    }
    start = time.time()
    response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
    elapsed_time = time.time() - start
    # Extracting and printing only the response content
    response_json = response.json()
    message_content = response_json['choices'][0]['message']['content']
    #print(f'file: {filepath}, message_content:\n {message_content}\n')

    output_file_path = os.path.join(output_dir, f"{os.path.splitext(file_name)[0]}.txt")
    with open(output_file_path, "w") as output_file:
        output_file.write(f"Image filename: {file_name}\n{message_content}\n")
        output_file.write(f"Processing time: {elapsed_time:.4f} seconds\n")

elaboro 000003_jpg.rf.d40443db19285b9a9946c11744788b4e.jpg elemento numero 1


elaboro 000007_jpg.rf.b0dc72be378912149acb5cdff9467226.jpg elemento numero 2
elaboro 000011_jpg.rf.49004c4398611bd1bb489d5e00bef241.jpg elemento numero 3
elaboro 000038_jpg.rf.1130ff13d1eae8edfe3ac8062e85fcbc.jpg elemento numero 4
elaboro 000047_jpg.rf.d84b4eef009d0b8940346cbfcb27aa37.jpg elemento numero 5
elaboro 000059_jpg.rf.78dbc7b805060ce84bb5abe7390ea2ea.jpg elemento numero 6
elaboro 000074_jpg.rf.a08d3e43f1e2ecf69a91575b31ef6350.jpg elemento numero 7
elaboro 000080_jpg.rf.f71b3ca0d53611a5cafdadf815bf08f7.jpg elemento numero 8
elaboro 000087_jpg.rf.ec9cc9dbbca8ace5ba6780a12d22842c.jpg elemento numero 9
elaboro 000099_jpg.rf.5cfab9af85fab2f10af5bc6dba43a1a7.jpg elemento numero 10
elaboro 000112_jpg.rf.8720f21b094b332ca0d232845c78bebb.jpg elemento numero 11
elaboro 000139_jpg.rf.3adf28e0ab1eb59d719f487b7455fcbe.jpg elemento numero 12
elaboro 000159_jpg.rf.14e4f61408d57cf1796e52b134937587.jpg elemento numero 13
elaboro 000163_jpg.rf.93fc2da24e45edda4a1c82ae462b9ac5.jpg elemento nume

In [None]:
mapping = [
    ("Worker", "People:"),
    ("no-vest", "no-Reflective_vests:"),
    ("no-shoes", "no-Security_boots:"),
    ("no-gloves", "no-Gloves:"),
    ("no-helmet", "no-Hard_hats:"),
]

def calculate_metrics(ground_truth, predictions, mapping):
    metrics = {"TP": 0, "FP": 0, "FN": 0}
    for class_name, model_key in mapping:
        if model_key is None:
            pred_count = 0  # No prediction for this class
        else:
            pred_count = predictions.get(model_key, 0)
        true_count = ground_truth.get(class_name, 0)

        if pred_count == true_count:
            metrics["TP"] += true_count
        elif true_count > pred_count:
            metrics["TP"] += pred_count
            metrics["FN"] += (true_count - pred_count)
        else:
            metrics["TP"] += true_count
            metrics["FP"] += (pred_count - true_count)

    return metrics

def calculate_metrics_per_DPI(ground_truth, predictions, mapping, dpi):
    metrics = {"TP": 0, "FP": 0, "FN": 0}
    for class_name, model_key in mapping:
        if model_key is None or dpi != model_key.strip(":"):
            continue  # Skip if the current class does not match the DPI we're focusing on

        pred_count = predictions.get(model_key, 0)
        true_count = ground_truth.get(class_name, 0)

        if pred_count == true_count:
            metrics["TP"] += true_count
        elif true_count > pred_count:
            metrics["TP"] += pred_count
            metrics["FN"] += (true_count - pred_count)
        else:
            metrics["TP"] += true_count
            metrics["FP"] += (pred_count - true_count)

    return metrics



def read_ground_truth(file_path):
    ground_truth = {}
    with open(file_path, 'r') as f:
        for line in f:
            elements = line.split()
            class_id = int(elements[0])
            if class_id == 2:
                ground_truth["no-gloves"] = ground_truth.get("no-gloves", 0) + 1
            elif class_id == 3:
                ground_truth["no-helmet"] = ground_truth.get("no-helmet", 0) + 1
            elif class_id == 6:
                ground_truth["Worker"] = ground_truth.get("Worker", 0) + 1
            elif class_id == 4:
                ground_truth["no-shoes"] = ground_truth.get("no-shoes", 0) + 1
            elif class_id == 5:
                ground_truth["no-vest"] = ground_truth.get("no-vest", 0) + 1
    return ground_truth


def extract_json_from_file(file_path):
    result = {}
    found = False
    with open(file_path, 'r') as file:
        lines = file.readlines()
        for line in lines:
            line = line.lstrip()
            if line.startswith('-'):
                DPI = ''
                found = True
                words = line.split()
                flag = True
                for word in words:
                    if not flag:
                        break
                    if word == 'People:' or word == 'no-Gloves:' or word == 'no-Security_boots:' or word == 'no-Reflective_vests:' or word == 'no-Hard_hats:':
                        DPI = word
                        found = True
                    if word.isdigit():
                        result[DPI] = result.get(DPI, 0) + int(word)
                        flag = False
                    if word == 'Yes':
                        result[DPI] = result.get(DPI, 0) + 1
                        flag = False
                    if word == 'No' or word.lower() == 'not':
                        result[DPI] = result.get(DPI, 0)
                        flag = False
        if not found:
            result['People:'] = result.get('People:', 0)
            result['no-Gloves:'] = result.get('no-Gloves:', 0)
            result['no-Security_boots:'] = result.get('no-Security_boots:', 0)
            result['no-Reflective_vests:'] = result.get('no-Reflective_vests:', 0)
            result['no-Hard_hats:'] = result.get('no-Hard_hats:', 0)
    #print(f'result: {result}')
    return result



def filter_greater_than_zero(data):
    return {key: value for key, value in data.items() if value > 0}

def calculate_classification_metrics(metrics):
    TP = metrics["TP"]
    FP = metrics["FP"]
    FN = metrics["FN"]

    accuracy = TP / (TP + FP + FN) if (TP + FP + FN) > 0 else 0
    precision = TP / (TP + FP) if (TP + FP) > 0 else 0
    recall = TP / (TP + FN) if (TP + FN) > 0 else 0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1_score": f1_score
    }

def save_metrics_to_file(metrics, output_folder, file_name):
    output_path = os.path.join(output_folder, f"{file_name}_metrics.json")
    with open(output_path, 'w') as f:
        json.dump(metrics, f, indent=4)
    #print(f"Metrics saved to {output_path}")

def calculate_final_metrics(all_metrics):
    total_accuracy = 0
    total_precision = 0
    total_recall = 0
    total_f1_score = 0
    n = len(all_metrics)
    # Somma i valori delle metriche per ciascun file
    for metric in all_metrics:
        total_accuracy += metric['accuracy']
        total_precision += metric['precision']
        total_recall += metric['recall']
        total_f1_score += metric['f1_score']


    # Calcola la media per ciascuna metrica
    final_metrics = {
        "final_accuracy": total_accuracy / n,
        "final_precision": total_precision / n,
        "final_recall": total_recall / n,
        "final_f1_score": total_f1_score / n
    }
    return final_metrics

In [None]:
import json
import os

ground_truth_folder = "PPE-detection-no-ppe-v2/test/labels"
predictions_folder = "output/gpt4o/working/test_set"
output_folder = "output/gpt4o/working/metrics_output"
output_folder_final = "output/gpt4o/working/metrics_output_final"
os.makedirs(output_folder, exist_ok=True)
os.makedirs(output_folder_final, exist_ok=True)
DPI = ['People', 'no-Hard_hats', 'no-Gloves', 'no-Reflective_vests', 'no-Security_boots']

gt_files = sorted(os.listdir(ground_truth_folder))
pred_files = sorted(os.listdir(predictions_folder))
all_metrics = []
for gt_file, pred_file in zip(gt_files, pred_files):
    #print(gt_file, pred_file)
    base_filename = os.path.splitext(pred_file)[0]
    #print(base_filename)
    gt_results = read_ground_truth(os.path.join(ground_truth_folder,gt_file))
    #print(f'gt: {gt_results}')
    data = extract_json_from_file(os.path.join(predictions_folder, pred_file))
    prediction_results = filter_greater_than_zero(data)
    #print(f'prediction: {prediction_results}\n')
    metrics = calculate_metrics(gt_results, prediction_results, mapping)
    final_metrics_per_image = calculate_classification_metrics(metrics)
    all_metrics.append(final_metrics_per_image)
final = calculate_final_metrics(all_metrics)
print(f'final metric: {final}')
save_metrics_to_file(final, output_folder_final, "final")

#########################################################################################################
final_ppes_metrics = {
    'People': {
        'Accuracy': 0,
        'Precision': 0,
        'Recall': 0,
        'F1-Score': 0
    },
    'no-Hard_hats': {
        'Accuracy': 0,
        'Precision': 0,
        'Recall': 0,
        'F1-Score': 0
    },
    'no-Gloves': {
        'Accuracy': 0,
        'Precision': 0,
        'Recall': 0,
        'F1-Score': 0
    },
    'no-Reflective_vests': {
        'Accuracy': 0,
        'Precision': 0,
        'Recall': 0,
        'F1-Score': 0
    },
    'no-Security_boots': {
        'Accuracy': 0,
        'Precision': 0,
        'Recall': 0,
        'F1-Score': 0
    },
}
# Second part of the code to calculate per DPI metrics
gt_files = sorted(os.listdir(ground_truth_folder))
pred_files = sorted(os.listdir(predictions_folder))

# Iterate over each DPI type
for dpi in DPI:
    all_metrics_per_dpi = []  # Initialize a list to collect metrics for each DPI
    for gt_file, pred_file in zip(gt_files, pred_files):
        base_filename = os.path.splitext(pred_file)[0]

        # Read ground truth and predictions for the current image
        gt_results = read_ground_truth(os.path.join(ground_truth_folder, gt_file))
        data = extract_json_from_file(os.path.join(predictions_folder, pred_file))

        # Filter predictions with counts greater than zero
        prediction_results = filter_greater_than_zero(data)

        # Calculate per-image metrics for the current DPI
        metrics = calculate_metrics_per_DPI(gt_results, prediction_results, mapping, dpi)
        final_metrics_per_image = calculate_classification_metrics(metrics)
        all_metrics_per_dpi.append(final_metrics_per_image)

    # Calculate final averaged metrics for the current DPI
    final_dpi_metrics = calculate_final_metrics(all_metrics_per_dpi)
    print(f'final metric {dpi}: {final_dpi_metrics}')

    # Update the final PPE metrics dictionary with calculated values
    final_ppes_metrics[dpi]['Accuracy'] = final_dpi_metrics['final_accuracy']
    final_ppes_metrics[dpi]['Precision'] = final_dpi_metrics['final_precision']
    final_ppes_metrics[dpi]['Recall'] = final_dpi_metrics['final_recall']
    final_ppes_metrics[dpi]['F1-Score'] = final_dpi_metrics['final_f1_score']

# Save final PPEs metrics to file
output_file_path = os.path.join(output_folder_final, "final_ppes_metrics.json")
with open(output_file_path, 'w') as f:
    json.dump(final_ppes_metrics, f, indent=4)

final metric: {'final_accuracy': 0.6551302059058779, 'final_precision': 0.747347585968294, 'final_recall': 0.8260635905401282, 'final_f1_score': 0.7651634608511553}
final metric People: {'final_accuracy': 0.8994594539977463, 'final_precision': 0.9274818780023135, 'final_recall': 0.9167827708006279, 'final_f1_score': 0.9181150809173393}
final metric no-Hard_hats: {'final_accuracy': 0.2632640177283035, 'final_precision': 0.27847351061636777, 'final_recall': 0.2769982993197279, 'final_f1_score': 0.2734185010259554}
final metric no-Gloves: {'final_accuracy': 0.46962631895309515, 'final_precision': 0.5906710119945414, 'final_recall': 0.5770072550105018, 'final_f1_score': 0.5459126223570253}
final metric no-Reflective_vests: {'final_accuracy': 0.6199471283003558, 'final_precision': 0.6378461439672287, 'final_recall': 0.6476853999175428, 'final_f1_score': 0.6378215641240917}
final metric no-Security_boots: {'final_accuracy': 0.17826505337248263, 'final_precision': 0.2473249423736436, 'final_r