In [1]:
import numpy as np
from __future__ import annotations
import os, json
from pathlib import Path
from datetime import datetime
from collections import defaultdict
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
from typing import List, Dict, Any

from __future__ import annotations


In [2]:
def _xywh_to_xyxy(b):
    x, y, w, h = b
    return (x, y, x + w, y + h)

def _iou(b1, b2):
    x1,y1,x2,y2 = _xywh_to_xyxy(b1)
    x1g, y1g, x2g, y2g = _xywh_to_xyxy(b2)
    ix1, iy1 = max(x1, x1g), max(y1, y1g)
    ix2, iy2 = min(x2, x2g), min(y2, y2g)
    iw, ih = max(0.0, ix2 - ix1), max(0.0, iy2 - iy1)
    inter = iw * ih
    if inter <= 0:
        return 0.0
    area1 = (x2 - x1) * (y2 - y1)
    area2 = (x2g - x1g) * (y2g - y1g)
    union = area1 + area2 - inter
    return inter / union if union > 0 else 0.0

def compute_pr_manual(
    gt_annotations,         # list of dicts: {"image_id", "category_id", "bbox"}
    pred_annotations,       # list of dicts: {"image_id", "category_id", "bbox", "score"}
    iou_thr=0.50,
    image_ids_eval=None     # optional: restrict to these GT image_ids
):
    """
    Returns: dict with TP, FP, FN, precision, recall
    Notes:
      - Greedy one-to-one matching per (image_id, category_id)
      - All predictions considered (no maxDets)
      - All gt considered (no area/crowd filtering)
    """

    # group GT and predictions by (image_id, category_id)
    gts = defaultdict(list)
    dts = defaultdict(list)

    if image_ids_eval is not None:
        image_ids_eval = set(image_ids_eval)

    for g in gt_annotations:
        if image_ids_eval is not None and g["image_id"] not in image_ids_eval:
            continue
        gts[(g["image_id"], g["category_id"])].append({"bbox": g["bbox"], "matched": False})

    for d in pred_annotations:
        if image_ids_eval is not None and d["image_id"] not in image_ids_eval:
            continue
        dts[(d["image_id"], d["category_id"])].append({"bbox": d["bbox"], "score": float(d.get("score", 1.0))})

    tp = 0
    fp = 0
    fn = 0

    # iterate over all keys present in either GT or preds
    keys = set(gts.keys()) | set(dts.keys())
    for key in keys:
        gt_list = gts.get(key, [])
        dt_list = dts.get(key, [])

        # sort detections by score desc
        dt_list.sort(key=lambda x: x["score"], reverse=True)

        gt_matched = [False] * len(gt_list)

        # greedy matching
        for det in dt_list:
            best_iou = 0.0
            best_j = -1
            for j, gt in enumerate(gt_list):
                if gt_matched[j]:
                    continue
                iou = _iou(det["bbox"], gt["bbox"])
                if iou >= iou_thr and iou > best_iou:
                    best_iou = iou
                    best_j = j
            if best_j >= 0:
                gt_matched[best_j] = True
                tp += 1
            else:
                fp += 1

        # any unmatched GT are FN
        fn += sum(1 for m in gt_matched if not m)

    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall    = tp / (tp + fn) if (tp + fn) else 0.0

    return {
        "tp": tp,
        "fp": fp,
        "fn": fn,
        "precision": precision,
        "recall": recall,
    }


In [3]:
def evaluate_predictions(predictions_dir: str | Path, iou_thr = 0.5, iou_type: str = "bbox") -> List[Dict[str, Any]]:
    """
    Expects prediction files named: {dataset}_{subset}_{model}_predictions.json
    Writes evaluations to: ../Evaluations/{PREDICTIONS_DIR_NAME}/{dataset}_{subset}_{model}_evaluation.json
    Requires:
      - Ground truth at ../Data/{dataset}/annotations/instances_{subset}.json
      - Optional: preds["images"], preds["categories"], preds["annotations"]
      - Optional: preds["total_inference_time_s"]
    Also relies on a user-defined compute_pr_manual(...) helper.
    """
    predictions_dir = Path(predictions_dir)
    out_root = Path("..") / "Evaluations" / predictions_dir.name
    out_root.mkdir(parents=True, exist_ok=True)

    json_files = sorted(predictions_dir.glob("*_predictions.json"))
    if not json_files:
        print(f"No prediction files found in {predictions_dir}")
        return []

    results: List[Dict[str, Any]] = []

    for pred_path in json_files:
        # ---- infer dataset, subset, model from filename ----
        stem = pred_path.stem  # e.g., dataset_subset_model_predictions
        parts = stem.split("_")

        if len(parts) < 3 or parts[-1] != "predictions":
            # Fallback: try to find 'predictions' token and parse before it
            try:
                pred_idx = parts.index("predictions")
                core = parts[:pred_idx]
            except ValueError:
                core = parts
            if len(core) < 3:
                print(f"Skip unrecognized filename pattern: {pred_path.name}")
                continue
            dataset_name, subset_name = core[0], core[1]
            model_name = "_".join(core[2:])
        else:
            dataset_name = parts[0]
            subset_name = parts[1]
            model_name = "_".join(parts[2:-1])  # anything between subset and 'predictions'

        gt_path = Path("..") / "Data" / dataset_name / "annotations" / f"instances_{subset_name}.json"
        if not gt_path.exists():
            print(f"Missing GT: {gt_path}")
            continue

        cocoGt = COCO(str(gt_path))

        with open(pred_path, "r", encoding="utf-8") as f:
            preds = json.load(f)

        # --- map GT and prediction images by filename ---
        gt_name_to_id = {os.path.basename(im["file_name"]): im["id"] for im in cocoGt.dataset["images"]}
        pred_images = preds.get("images", [])
        pred_id_to_name = {im["id"]: os.path.basename(im["file_name"]) for im in pred_images}

        # image set = ALL images listed in preds["images"] that exist in GT
        img_ids_eval = sorted({
            gt_name_to_id[os.path.basename(im["file_name"])]
            for im in pred_images
            if os.path.basename(im["file_name"]) in gt_name_to_id
        })

        # If no overlap, output zeros and continue
        if not img_ids_eval:
            empty = {k: 0.0 for k in [
                "AP@[.50:.95]", "AP@0.50", "AP@0.75", "AP_small", "AP_medium", "AP_large",
                "AR@1", "AR@10", "AR@100", "AR_small", "AR_medium", "AR_large"
            ]}
            eval_summary = {
                "info": {
                    "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                    "dataset_name": dataset_name,
                    "subset_name": subset_name,
                    "model_name": model_name,
                    "iou_type": iou_type,
                    "num_predicted_bbox": 0,
                    "num_gt_bbox": 0,
                    "num_eval_images": 0,
                    "num_eval_categories": 0,
                },
                "metrics": empty,
            }
            eval_json = out_root / f"{dataset_name}_{subset_name}_{model_name}_evaluation.json"
            with open(eval_json, "w", encoding="utf-8") as f:
                json.dump(eval_summary, f, indent=2)
            results.append(eval_summary)
            continue

        # --- category remap ---
        gt_cat_by_name = {c["name"]: c["id"] for c in cocoGt.dataset["categories"]}
        pred_cat_map: Dict[int, int] = {}
        if "categories" in preds:
            pred_cat_by_name = {c["name"]: c["id"] for c in preds["categories"]}
            # map prediction cat IDs -> GT cat IDs when names match but IDs differ
            pred_cat_map = {
                pred_cat_by_name[n]: gt_cat_by_name[n]
                for n in (pred_cat_by_name.keys() & gt_cat_by_name.keys())
                if pred_cat_by_name[n] != gt_cat_by_name[n]
            }
            cat_ids_eval = sorted({gt_cat_by_name[n] for n in (pred_cat_by_name.keys() & gt_cat_by_name.keys())})
        else:
            cat_ids_eval = sorted(gt_cat_by_name.values())

        # --- remap detections (may be empty for some or all images) ---
        remapped = []
        for a in preds.get("annotations", []):
            pred_name = pred_id_to_name.get(a["image_id"])
            if not pred_name:
                continue
            gt_img_id = gt_name_to_id.get(pred_name)
            if gt_img_id is None:
                continue
            cat_id = pred_cat_map.get(a["category_id"], a["category_id"])
            x, y, w, h = a["bbox"]
            remapped.append({
                "image_id": gt_img_id,
                "category_id": cat_id,
                "bbox": [float(x), float(y), float(w), float(h)],
                "score": float(a.get("score", 1.0)),
            })

        # --- run COCOeval over the chosen image set, even if remapped is empty ---
        cocoDt = cocoGt.loadRes(remapped)  # [] allowed
        cocoEval = COCOeval(cocoGt, cocoDt, iouType=iou_type)
        cocoEval.params.imgIds = img_ids_eval
        cocoEval.params.catIds = cat_ids_eval

        cocoEval.evaluate()
        cocoEval.accumulate()
        cocoEval.summarize()

        metrics = {
            "AP@[.50:.95]": cocoEval.stats[0],
            "AP@0.50":      cocoEval.stats[1],
            "AP@0.75":      cocoEval.stats[2],
            "AP_small":     cocoEval.stats[3],
            "AP_medium":    cocoEval.stats[4],
            "AP_large":     cocoEval.stats[5],
            "AR@1":         cocoEval.stats[6],
            "AR@10":        cocoEval.stats[7],
            "AR@100":       cocoEval.stats[8],
            "AR_small":     cocoEval.stats[9],
            "AR_medium":    cocoEval.stats[10],
            "AR_large":     cocoEval.stats[11],
        }

        # --- additional metrics ---
        num_gt_bbox = sum(1 for a in cocoGt.dataset["annotations"] if a["image_id"] in img_ids_eval)
        num_predicted_bbox = len(remapped)

        # User-provided helper must exist
        res = compute_pr_manual(
            cocoGt.dataset["annotations"],
            remapped,
            iou_thr=iou_thr,
            image_ids_eval=img_ids_eval
        )

        bbox_additions = res["fn"]
        bbox_removals  = res["fp"]
        precision_iou   = res["precision"]
        recall_iou      = res["recall"]

        metrics.update({
            "bbox_additions": bbox_additions,
            "bbox_removals": bbox_removals,
            "precision@%.2f" % iou_thr: precision_iou,
            "recall@%.2f" % iou_thr: recall_iou,
        })

        # time calculations
        total_inference_time = preds.get("info", {}).get("total_inference_time_s", 0.0)        
        total_annotation_time = total_inference_time + bbox_additions * 10.15 + bbox_removals * 5.20
        annotation_time_per_bbox = total_annotation_time / num_gt_bbox if num_gt_bbox > 0 else 0.0

        metrics["total_annotation_time_s"] = total_annotation_time
        metrics["annotation_time_per_bbox_s"] = annotation_time_per_bbox

        # cost calculations
        cost_per_bbox_eur = preds.get("info", {}).get("cost_per_bbox_eur", 0.0)
        metrics["cost_per_bbox_eur"] = cost_per_bbox_eur

        eval_summary = {
            "info": {
                "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
                "dataset_name": dataset_name,
                "subset_name": subset_name,
                "model_name": model_name,
                "iou_type": iou_type,
                "num_predicted_bbox": num_predicted_bbox,
                "num_gt_bbox": num_gt_bbox,
                "num_eval_images": len(img_ids_eval),
                "num_eval_categories": len(cat_ids_eval),
            },
            "metrics": metrics,
        }

        # write JSON summary next to predictions
        eval_json = out_root / f"{dataset_name}_{subset_name}_{model_name}_evaluation.json"
        with open(eval_json, "w", encoding="utf-8") as f:
            json.dump(eval_summary, f, indent=2)

        results.append(eval_summary)

    return results


In [4]:
evaluate_predictions(predictions_dir="../Results/Experiment_1/", iou_thr=0.5, iou_type="bbox")

loading annotations into memory...
Done (t=0.18s)
creating index...
index created!
Loading and preparing results...
DONE (t=0.00s)
creating index...
index created!
Running per image evaluation...
Evaluate annotation type *bbox*
DONE (t=0.03s).
Accumulating evaluation results...
DONE (t=0.01s).
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets=100 ] = 0.249
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets=100 ] = 0.299
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets=100 ] = 0.253
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= small | maxDets=100 ] = 0.079
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets=100 ] = 0.396
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets=100 ] = -1.000
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets=  1 ] = 0.081
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 10 ] = 0.262
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDet

[{'info': {'timestamp': '2025-11-23 13:25:43',
   'dataset_name': 'tomatoes',
   'subset_name': 'val',
   'model_name': 'gd_t',
   'iou_type': 'bbox',
   'num_predicted_bbox': 10,
   'num_gt_bbox': 26,
   'num_eval_images': 4,
   'num_eval_categories': 1},
  'metrics': {'AP@[.50:.95]': np.float64(0.24947666195190948),
   'AP@0.50': np.float64(0.299009900990099),
   'AP@0.75': np.float64(0.2527581329561528),
   'AP_small': np.float64(0.07920792079207918),
   'AP_medium': np.float64(0.3962871287128713),
   'AP_large': np.float64(-1.0),
   'AR@1': np.float64(0.08076923076923076),
   'AR@10': np.float64(0.26153846153846155),
   'AR@100': np.float64(0.26153846153846155),
   'AR_small': np.float64(0.07272727272727272),
   'AR_medium': np.float64(0.4),
   'AR_large': np.float64(-1.0),
   'bbox_additions': 18,
   'bbox_removals': 2,
   'precision@0.50': 0.8,
   'recall@0.50': 0.3076923076923077,
   'total_annotation_time_s': 283.3155641439995,
   'annotation_time_per_bbox_s': 10.89675246707690