In [131]:
import os
import pandas as pd
import numpy as np
from pathlib import Path
from tqdm import tqdm
import cv2

In [132]:
def read_labels(label_path):
    columns = ['class', 'x_center', 'y_center', 'width', 'height', 'confidence']
    labels_df = pd.read_csv(label_path, sep=' ', names=columns)
    return labels_df

In [133]:
def iou(box1, box2):
    x1_max, y1_max = max(box1[0], box2[0]), max(box1[1], box2[1])
    x2_min, y2_min = min(box1[2], box2[2]), min(box1[3], box2[3])

    inter_area = max(0, x2_min - x1_max) * max(0, y2_min - y1_max)
    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - inter_area

    if union_area == 0:
        return 0.0
    
    return inter_area / union_area

In [134]:
# Function to normalize bounding boxes
def normalize_boxes(boxes, img_width, img_height):
    normalized_boxes = []
    for box in boxes:
        xmin, ymin, xmax, ymax = box
        x_center = (xmin + xmax) / 2 / img_width
        y_center = (ymin + ymax) / 2 / img_height
        width = (xmax - xmin) / img_width
        height = (ymax - ymin) / img_height
        normalized_boxes.append([x_center, y_center, width, height])
    return normalized_boxes

In [None]:
# NMS (keeping the higher-confidence box)
def nms(visible_results, infrared_results, iou_threshold=0.5):
    # Filter out empty dataframes
    if visible_results.empty and infrared_results.empty:
        return [], [], []
    elif visible_results.empty:
        combined_results = infrared_results
    elif infrared_results.empty:
        combined_results = visible_results
    else:
        combined_results = pd.concat([visible_results, infrared_results])
    
    # Sort by confidence in descending order
    combined_results = combined_results.sort_values(by='confidence', ascending=False).reset_index(drop=True)
    
    final_boxes = []
    final_scores = []
    final_classes = []

    for _, row in combined_results.iterrows():
        box = [row['xmin'], row['ymin'], row['xmax'], row['ymax']]
        confidence = row['confidence']
        cls = row['class']

        if len(final_boxes) == 0:
            final_boxes.append(box)
            final_scores.append(confidence)
            final_classes.append(cls)
            continue

        # Compute IoUs with existing boxes
        ious = np.array([iou(box, b) for b in final_boxes])
        
        # Apply NMS
        if np.max(ious) <= iou_threshold:
            final_boxes.append(box)
            final_scores.append(confidence)
            final_classes.append(cls)
        else:
            # Retain the box with the higher confidence
            max_iou_index = np.argmax(ious)
            if confidence > final_scores[max_iou_index]:
                final_boxes[max_iou_index] = box
                final_scores[max_iou_index] = confidence
                final_classes[max_iou_index] = cls

    return final_boxes, final_scores, final_classes

In [136]:
# Paths to the label folders
visible_label_folder = 'C:/Users/letri/Documents/project/single_modality/yolov5/runs/val/iou45conf25/vis_best/labels/'
infrared_label_folder = 'C:/Users/letri/Documents/project/single_modality/yolov5/runs/val/iou45conf25/ir_best/labels/'

#visible_label_folder = '/Users/letriluan/Desktop/project/yolov5/runs/val/iou45conf25/vis_best/labels/'
#infrared_label_folder = '/Users/letriluan/Desktop/project/yolov5/runs/val/iou45conf25/ir_best/labels/'

labels_output_folder = 'C:/Users/letri/Documents/project/late_fusion_new/nms/epochbest/labels'
images_output_folder = 'C:/Users/letri/Documents/project/late_fusion_new/nms/epochbest/images'

In [137]:
# Create output folders if they don't exist
Path(labels_output_folder).mkdir(parents=True, exist_ok=True)
Path(images_output_folder).mkdir(parents=True, exist_ok=True)

In [138]:
# Process each pair of labels in the folders
for visible_label_name in tqdm(os.listdir(visible_label_folder), desc="Processing labels"):
    visible_label_path = os.path.join(visible_label_folder, visible_label_name)
    infrared_label_path = os.path.join(infrared_label_folder, visible_label_name)

    if not os.path.exists(infrared_label_path):
        print(f"Infrared label for {visible_label_name} not found, skipping...")
        continue
    
    visible_results_df = read_labels(visible_label_path)
    infrared_results_df = read_labels(infrared_label_path)
    visible_image_path = os.path.join('C:/Users/letri/Documents/project/single_modality/data/Vis/images/test', visible_label_name.replace('.txt', '.png'))
    image = cv2.imread(visible_image_path)
    
    if image is None:
        continue

    img_height, img_width = image.shape[0], image.shape[1]  

    visible_results_df['xmin'] = (visible_results_df['x_center'] - visible_results_df['width'] / 2) * img_width
    visible_results_df['ymin'] = (visible_results_df['y_center'] - visible_results_df['height'] / 2) * img_height
    visible_results_df['xmax'] = (visible_results_df['x_center'] + visible_results_df['width'] / 2) * img_width
    visible_results_df['ymax'] = (visible_results_df['y_center'] + visible_results_df['height'] / 2) * img_height

    infrared_results_df['xmin'] = (infrared_results_df['x_center'] - infrared_results_df['width'] / 2) * img_width
    infrared_results_df['ymin'] = (infrared_results_df['y_center'] - infrared_results_df['height'] / 2) * img_height
    infrared_results_df['xmax'] = (infrared_results_df['x_center'] + infrared_results_df['width'] / 2) * img_width
    infrared_results_df['ymax'] = (infrared_results_df['y_center'] + infrared_results_df['height'] / 2) * img_height

    # Fuse the results using ProEnD
    fused_boxes, fused_scores, fused_classes = nms(visible_results_df, infrared_results_df)
    normalized_boxes = normalize_boxes(fused_boxes, img_width, img_height)
    combined_results = sorted(zip(normalized_boxes, fused_scores, fused_classes), key=lambda x: x[0][0])

    prediction_file_path = os.path.join(labels_output_folder, visible_label_name)
    with open(prediction_file_path, 'w') as f:
        for box, score, cls in combined_results:
            f.write(f'{cls} {box[0]} {box[1]} {box[2]} {box[3]} {score}\n')

    
    for box, confidence, cls in zip(fused_boxes, fused_scores, fused_classes):
        xmin, ymin, xmax, ymax = map(int, box)
        cv2.rectangle(image, (xmin, ymin), (xmax, ymax), (0, 255, 0), 2)
        label_text = f'{cls} {confidence:.2f}'
        cv2.putText(image, label_text, (xmin, ymin - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)

    # Save the result image
    output_image_path = os.path.join(images_output_folder, visible_label_name.replace('.txt', '.png'))
    cv2.imwrite(output_image_path, image)

print("Processing complete.")


Processing labels:  79%|███████▉  | 245/310 [00:13<00:02, 23.80it/s]

Infrared label for 04107.txt not found, skipping...
Infrared label for 04110.txt not found, skipping...
Infrared label for 04111.txt not found, skipping...


Processing labels: 100%|██████████| 310/310 [00:17<00:00, 17.96it/s]

Processing complete.





In [139]:
import numpy as np
import os
import re
def load_data(file_path):
    data = []
    with open(file_path, 'r') as file:
        for line in file:
            split_values = re.split(r'\s+|,', line.strip())
            try:
                data.append([float(x) for x in split_values])
            except ValueError:
                print(f"Warning: Could not convert line to float values - '{line.strip()}'")
    return np.array(data)

def compute_iou(box1, box2):
    x1, y1, w1, h1 = box1
    x2, y2, w2, h2 = box2

    xi1 = max(x1 - w1 / 2, x2 - w2 / 2)
    yi1 = max(y1 - h1 / 2, y2 - h2 / 2)
    xi2 = min(x1 + w1 / 2, x2 + w2 / 2)
    yi2 = min(y1 + h1 / 2, y2 + h2 / 2)
    
    inter_area = max(0, xi2 - xi1) * max(0, yi2 - yi1)
    box1_area = w1 * h1
    box2_area = w2 * h2
    union_area = box1_area + box2_area - inter_area
    
    return inter_area / (union_area + 1e-16)

def compute_ap(recall, precision):
    mrec = np.concatenate(([0.0], recall, [1.0]))
    mpre = np.concatenate(([1.0], precision, [0.0]))

    mpre = np.flip(np.maximum.accumulate(np.flip(mpre)))

    x = np.linspace(0, 1, 101)
    ap = np.trapz(np.interp(x, mrec, mpre), x)
    
    return ap, mpre, mrec

def smooth(y, f=0.05):
    nf = round(len(y) * f * 2) // 2 + 1
    p = np.ones(nf // 2)
    yp = np.concatenate((p * y[0], y, p * y[-1]), 0)
    return np.convolve(yp, np.ones(nf) / nf, mode="valid")

def ap_per_class(tp, conf, pred_cls, target_cls, plot=False, save_dir=".", names=(), eps=1e-16, prefix=""):
    i = np.argsort(-conf)
    tp, conf, pred_cls = tp[i], conf[i], pred_cls[i]
    
    unique_classes, nt = np.unique(target_cls, return_counts=True)
    nc = unique_classes.shape[0]
    px, py = np.linspace(0, 1, 1000), []
    ap, p, r = np.zeros((nc, tp.shape[1])), np.zeros((nc, 1000)), np.zeros((nc, 1000))
    
    for ci, c in enumerate(unique_classes):
        i = pred_cls == c
        n_l = nt[ci]
        n_p = i.sum()
        if n_p == 0 or n_l == 0:
            continue
        fpc = (1 - tp[i]).cumsum(0)
        tpc = tp[i].cumsum(0)
        recall = tpc / (n_l + eps)
        r[ci] = np.interp(-px, -conf[i], recall[:, 0], left=0)
        precision = tpc / (tpc + fpc)
        p[ci] = np.interp(-px, -conf[i], precision[:, 0], left=1)
        for j in range(tp.shape[1]):
            ap[ci, j], mpre, mrec = compute_ap(recall[:, j], precision[:, j])
            if plot and j == 0:
                py.append(np.interp(px, mrec, mpre))

    f1 = 2 * p * r / (p + r + eps)

    i = smooth(f1.mean(0), 0.1).argmax()
    p, r, f1 = p[:, i], r[:, i], f1[:, i]
    tp = (r * nt).round()
    fp = (tp / (p + eps) - tp).round()
    return tp, fp, p, r, f1, ap, unique_classes.astype(int)

In [140]:
true_dir = 'C:/Users/letri/Documents/project/single_modality/data/Vis/labels/test'
predict_dir = 'C:/Users/letri/Documents/project/late_fusion_new/nms/epochbest/labels'

# List files in directories
true_files = set(os.listdir(true_dir))
predict_files = set(os.listdir(predict_dir))

# Find common files in both directories
common_files = true_files.intersection(predict_files)

iou_threshold = 0.5
tp_list = []
conf_list = []
pred_cls_list = []
target_cls_list = []

for file_name in true_files:
    ground_truths = load_data(os.path.join(true_dir, file_name))
    predictions = load_data(os.path.join(predict_dir, file_name)) if file_name in predict_files else []

    tp = []
    conf = []
    pred_cls = []
    target_cls = []
    matched_gt = set()

    for pred in predictions:
        pred_class = pred[0]
        pred_box = pred[1:5]
        pred_conf = pred[5]

        best_iou = 0
        best_gt_index = -1

        for gt_index, gt in enumerate(ground_truths):
            gt_class = gt[0]
            gt_box = gt[1:5]

            if pred_class == gt_class:
                iou = compute_iou(pred_box, gt_box)
                if iou > best_iou:
                    best_iou = iou
                    best_gt_index = gt_index

        if best_iou > iou_threshold and best_gt_index not in matched_gt:
            tp.append(1)
            matched_gt.add(best_gt_index)
        else:
            tp.append(0)

        conf.append(pred_conf)
        pred_cls.append(pred_class)

    # Append unmatched ground truths as false negatives
    for gt in ground_truths:
        target_cls.append(gt[0])

    tp_list.extend(tp)
    conf_list.extend(conf)
    pred_cls_list.extend(pred_cls)
    target_cls_list.extend(target_cls)

tp = np.array(tp_list).reshape(-1, 1)
conf = np.array(conf_list)
pred_cls = np.array(pred_cls_list)
target_cls = np.array(target_cls_list)

tp, fp, p, r, f1, ap, unique_classes = ap_per_class(tp, conf, pred_cls, target_cls)
print(f"True Positives: {tp}")
print(f"False Positives: {fp}")
print(f"Precision: {p}")
print(f"Recall: {r}")
print(f"F1 Score: {f1}")
print(f"Average Precision (AP): {ap}")
print(f"Overall mAP: {ap.mean()}")

True Positives: [571. 655.]
False Positives: [30. 93.]
Precision: [0.95049571 0.87515192]
Recall: [0.72461929 0.95620438]
F1 Score: [0.82232865 0.91388453]
Average Precision (AP): [[0.85727117]
 [0.97154696]]
Overall mAP: 0.91440906434008
