In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/muid-iitr-train-val-test-split/labels.csv
/kaggle/input/muid-iitr-train-val-test-split/split_data/data.yaml
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/hp_32.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/en_43.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/hp_68.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/hp_171.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/lp_52.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/mn_5.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/hp_203.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/hp_41.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/hp_143.txt
/kaggle/input/muid-iitr-train-val-test-split/split_data/validation/labels/ln_30.txt
/kaggle/input/muid-iitr-train-val-te

In [2]:
!pip install torch torchvision opencv-python-headless pandas numpy scipy

Collecting nvidia-cudnn-cu12==9.1.0.70 (from torch)
  Downloading nvidia_cudnn_cu12-9.1.0.70-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cublas-cu12==12.4.5.8 (from torch)
  Downloading nvidia_cublas_cu12-12.4.5.8-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cufft-cu12==11.2.1.3 (from torch)
  Downloading nvidia_cufft_cu12-11.2.1.3-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-curand-cu12==10.3.5.147 (from torch)
  Downloading nvidia_curand_cu12-10.3.5.147-py3-none-manylinux2014_x86_64.whl.metadata (1.5 kB)
Collecting nvidia-cusolver-cu12==11.6.1.9 (from torch)
  Downloading nvidia_cusolver_cu12-11.6.1.9-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-cusparse-cu12==12.3.1.170 (from torch)
  Downloading nvidia_cusparse_cu12-12.3.1.170-py3-none-manylinux2014_x86_64.whl.metadata (1.6 kB)
Collecting nvidia-nvjitlink-cu12==12.4.127 (from torch)
  Downloading nvidia_nvjitlink_cu12-1

In [3]:
#!/usr/bin/env python3
import os
import glob
import shutil
import cv2
import pandas as pd
import numpy as np
import torch
import torchvision
from torchvision.transforms import functional as F
from scipy.optimize import linear_sum_assignment  

if os.path.exists('/kaggle/working'):
    shutil.rmtree('/kaggle/working', ignore_errors=True)

def rename_test_images_and_labels_in_new_folder(src_images_dir, src_labels_dir, dst_images_dir, dst_labels_dir):
    os.makedirs(dst_images_dir, exist_ok=True)
    os.makedirs(dst_labels_dir, exist_ok=True)
    
    image_files = sorted(glob.glob(os.path.join(src_images_dir, '*.jpg')))
    
    for i, image_path in enumerate(image_files, start=1):
        new_image_filename = f"{i}.jpg"
        new_label_filename = f"{i}.txt"
        new_image_path = os.path.join(dst_images_dir, new_image_filename)
        new_label_path = os.path.join(dst_labels_dir, new_label_filename)
        
        shutil.copy(image_path, new_image_path)
        old_base = os.path.splitext(os.path.basename(image_path))[0]
        old_label_path = os.path.join(src_labels_dir, f"{old_base}.txt")
        
        if os.path.exists(old_label_path):
            shutil.copy(old_label_path, new_label_path)
    
    print(f"Copied and renamed {len(image_files)} images")

def read_ground_truth_labels(label_file):
    if not os.path.exists(label_file):
        return []
    
    boxes = []
    with open(label_file, "r") as f:
        for line in f:
            parts = list(map(float, line.split()))
            if len(parts) != 5:
                continue
            _, cx, cy, w, h = parts
            x_min = cx - (w / 2)
            y_min = cy - (h / 2)
            x_max = cx + (w / 2)
            y_max = cy + (h / 2)
            boxes.append([x_min, y_min, x_max, y_max])
    return boxes

def read_prediction_labels(label_file):
    if not os.path.exists(label_file):
        return []
    
    boxes = []
    with open(label_file, "r") as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 6:
                continue
            cls_id = float(parts[0])
            cx = float(parts[1])
            cy = float(parts[2])
            w = float(parts[3])
            h = float(parts[4])
            conf = float(parts[5])
            x_min = cx - (w / 2)
            y_min = cy - (h / 2)
            x_max = cx + (w / 2)
            y_max = cy + (h / 2)
            boxes.append([x_min, y_min, x_max, y_max, conf])
    return boxes

def compute_iou(box1, box2):
    xi1 = max(box1[0], box2[0])
    yi1 = max(box1[1], box2[1])
    xi2 = min(box1[2], box2[2])
    yi2 = min(box1[3], box2[3])
    inter_width = max(0, xi2 - xi1)
    inter_height = max(0, yi2 - yi1)
    inter_area = inter_width * inter_height

    box1_area = (box1[2] - box1[0]) * (box1[3] - box1[1])
    box2_area = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union_area = box1_area + box2_area - inter_area
    return inter_area / union_area if union_area > 0 else 0

def process_failure_cases(test_images_dir, ground_truth_dir, predictions_labels_dir, annotated_base_dir, iou_threshold):
    iou_failure_subfolder = os.path.join(annotated_base_dir, "iou_threshold")
    mismatch_failure_subfolder = os.path.join(annotated_base_dir, "label_mismatch")
    os.makedirs(iou_failure_subfolder, exist_ok=True)
    os.makedirs(mismatch_failure_subfolder, exist_ok=True)
    
    failure_data = []
    image_files = sorted(glob.glob(os.path.join(test_images_dir, '*.jpg')))
    
    # Initialize metrics
    total_TP = 0
    total_FP = 0
    total_FN = 0
    
    print(f"Processing {len(image_files)} images")
    
    for image_file in image_files:
        base = os.path.splitext(os.path.basename(image_file))[0]
        gt_file = os.path.join(ground_truth_dir, f"{base}.txt")
        pred_file = os.path.join(predictions_labels_dir, f"{base}.txt")
        
        gt_boxes = read_ground_truth_labels(gt_file)
        pred_boxes = read_prediction_labels(pred_file)
        
        # Calculate metrics
        num_gt = len(gt_boxes)
        num_pred = len(pred_boxes)
        label_mismatch = num_gt != num_pred
        
        if num_gt == 0 and num_pred == 0:
            pass
        elif num_gt == 0:
            total_FP += num_pred
        elif num_pred == 0:
            total_FN += num_gt
        else:
            cost_matrix = np.zeros((num_gt, num_pred))
            for i, gt in enumerate(gt_boxes):
                for j, pred in enumerate(pred_boxes):
                    cost_matrix[i, j] = -compute_iou(gt, pred[:4])
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            
            # Process matches
            for i, j in zip(row_ind, col_ind):
                iou_val = compute_iou(gt_boxes[i], pred_boxes[j][:4])
                if iou_val >= iou_threshold:
                    total_TP += 1
                else:
                    total_FP += 1
                    total_FN += 1
            
            # Process unmatched
            total_FN += num_gt - len(row_ind)
            total_FP += num_pred - len(col_ind)
        
        # Failure case handling
        low_iou = False
        iou_values = []
        if gt_boxes and pred_boxes:
            cost_matrix = np.zeros((len(gt_boxes), len(pred_boxes)))
            for i, gt in enumerate(gt_boxes):
                for j, pred in enumerate(pred_boxes):
                    cost_matrix[i, j] = -compute_iou(gt, pred[:4])
            row_ind, col_ind = linear_sum_assignment(cost_matrix)
            for i, j in zip(row_ind, col_ind):
                iou_val = compute_iou(gt_boxes[i], pred_boxes[j][:4])
                iou_values.append(iou_val)
                if iou_val < iou_threshold:
                    low_iou = True
        
        if label_mismatch or low_iou:
            img = cv2.imread(image_file)
            if img is not None:
                height, width = img.shape[:2]
                # Draw ground truth
                for box in gt_boxes:
                    x1 = int(box[0] * width)
                    y1 = int(box[1] * height)
                    x2 = int(box[2] * width)
                    y2 = int(box[3] * height)
                    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 255, 0), 2)
                # Draw predictions
                for box in pred_boxes:
                    x1 = int(box[0] * width)
                    y1 = int(box[1] * height)
                    x2 = int(box[2] * width)
                    y2 = int(box[3] * height)
                    cv2.rectangle(img, (x1, y1), (x2, y2), (0, 0, 255), 2)
                    cv2.putText(img, f"{box[4]:.2f}", (x1, max(y1-5, 0)),
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
                
                failure_reasons = []
                if label_mismatch:
                    failure_reasons.append("label_mismatch")
                    cv2.imwrite(os.path.join(mismatch_failure_subfolder, os.path.basename(image_file)), img)
                if low_iou:
                    failure_reasons.append("iou_threshold")
                    cv2.imwrite(os.path.join(iou_failure_subfolder, os.path.basename(image_file)), img)
                
                failure_data.append({
                    'image': os.path.basename(image_file),
                    'iou_values': iou_values,
                    'ground_truth_count': num_gt,
                    'predicted_count': num_pred,
                    'iou_threshold': iou_threshold,
                    'failure_folders': ",".join(failure_reasons)
                })

    # Calculate final metrics
    precision = total_TP / (total_TP + total_FP) if (total_TP + total_FP) > 0 else 0
    recall = total_TP / (total_TP + total_FN) if (total_TP + total_FN) > 0 else 0
    f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
    
    # Save metrics to log
    log_path = os.path.join(annotated_base_dir, "log.txt")
    with open(log_path, "w") as f:
        f.write(f"Precision: {precision:.4f}\n")
        f.write(f"Recall: {recall:.4f}\n")
        f.write(f"F1 Score: {f1:.4f}\n")
        f.write(f"True Positives (TP): {total_TP}\n")
        f.write(f"False Positives (FP): {total_FP}\n")
        f.write(f"False Negatives (FN): {total_FN}\n")
    
    # Save failure cases
    df = pd.DataFrame(failure_data)
    df.to_csv(os.path.join(annotated_base_dir, "failure_cases.csv"), index=False)
    print(f"Processing complete. Metrics saved to {log_path}")

def predict_test_images(model_path, source_dir, predictions_output_dir, conf_threshold=0.5, num_classes=2):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=False, num_classes=num_classes)
    checkpoint = torch.load(model_path, map_location=device)
    model.load_state_dict(checkpoint)
    model.to(device)
    model.eval()
    
    os.makedirs(os.path.join(predictions_output_dir, "images"), exist_ok=True)
    os.makedirs(os.path.join(predictions_output_dir, "labels"), exist_ok=True)
    
    image_paths = sorted(glob.glob(os.path.join(source_dir, '*.jpg')))
    
    with torch.no_grad():
        for image_path in image_paths:
            orig_img = cv2.imread(image_path)
            if orig_img is None:
                continue
            img_rgb = cv2.cvtColor(orig_img, cv2.COLOR_BGR2RGB)
            img_tensor = F.to_tensor(img_rgb).to(device)
            
            outputs = model([img_tensor])[0]
            height, width = orig_img.shape[:2]
            predictions = []
            
            for box, score, label in zip(outputs['boxes'], outputs['scores'], outputs['labels']):
                if score < conf_threshold:
                    continue
                x1, y1, x2, y2 = box.cpu().numpy()
                cx = (x1 + x2) / 2 / width
                cy = (y1 + y2) / 2 / height
                w = (x2 - x1) / width
                h = (y2 - y1) / height
                predictions.append([label.item(), cx, cy, w, h, score.item()])
                
                cv2.rectangle(orig_img, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 2)
                cv2.putText(orig_img, f"{score:.2f}", (int(x1), max(int(y1)-5, 0)),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
            
            base_name = os.path.basename(image_path)
            cv2.imwrite(os.path.join(predictions_output_dir, "images", base_name), orig_img)
            with open(os.path.join(predictions_output_dir, "labels", base_name.replace(".jpg", ".txt")), "w") as f:
                for pred in predictions:
                    f.write(" ".join(map(str, pred)) + "\n")
    
    print("Prediction complete")
    return {
        "images": os.path.join(predictions_output_dir, "images"),
        "labels": os.path.join(predictions_output_dir, "labels")
    }

if __name__ == "__main__":
    # Configuration
    test_images_dir = '/kaggle/input/muid-iitr-train-val-test-split/split_data/test/images'
    ground_truth_dir = '/kaggle/input/muid-iitr-train-val-test-split/split_data/test/labels'
    predictions_base_dir = '/kaggle/working/predicted'
    annotated_failures_dir = '/kaggle/working/annotated_failure'
    test_new_images_dir = '/kaggle/working/test_new/images'
    test_new_labels_dir = '/kaggle/working/test_new/labels'
    iou_threshold = 0.5
    model_path = '/kaggle/input/rcnn-muid-iitr-pt-model/best-rcnn-iitr.pt'

    rename_test_images_and_labels_in_new_folder(
        test_images_dir,
        ground_truth_dir,
        test_new_images_dir,
        test_new_labels_dir
    )
    
    pred_dict = predict_test_images(
        model_path,
        test_new_images_dir,
        predictions_base_dir,
        conf_threshold=0.5,
        num_classes=2
    )
    
    process_failure_cases(
        test_new_images_dir,
        test_new_labels_dir,
        pred_dict["labels"],
        annotated_failures_dir,
        iou_threshold
    )

Copied and renamed 91 images


Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 219MB/s]
  checkpoint = torch.load(model_path, map_location=device)


Prediction complete
Processing 91 images
Processing complete. Metrics saved to /kaggle/working/annotated_failure/log.txt
