In [1]:
import json

import torch
from torch.utils.data import DataLoader
from torchmetrics.detection.mean_ap import MeanAveragePrecision

from rtdetr import RTDETR, RTDETRPostProcessor, box_iou
from rtdetr import COCODataset, collate_fn, get_valid_transforms

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Evaluation settings: target device, trained weights and the model config file.
DEVICE = "cuda:0"
CHECKPOINT_PATH = "./saved/rtdetr_r18_50.pth"
CONFIG_PATH = "./rtdetr_r18.json"

# Read the config inside a context manager so the file handle is closed
# as soon as parsing finishes instead of being left to the garbage collector.
with open(CONFIG_PATH, "r") as config_file:
    cfg = json.load(config_file)

# Forwarded to `evaluate` as `apply_nms`; also selects the results file name.
APPLY_NMS = True

In [3]:
# Build the network from the config: every constructor argument is read from
# the cfg entry that carries the same name.
model = RTDETR(**{
    key: cfg[key]
    for key in (
        'num_classes',
        'backbone_model',
        'hidden_dim',
        'nhead',
        'ffn_dim',
        'num_encoder_layers',
        'expansion_factor',
        'aux_loss',
        'num_queries',
        'num_decoder_points',
        'num_denoising',
        'num_decoder_layers',
        'dropout',
        'multi_scale',
        'num_bottleneck_blocks',
    )
})

# Deserialize the weights onto the CPU first, then move the whole model to the
# evaluation device and switch it to inference mode.
checkpoint = torch.load(CHECKPOINT_PATH, map_location="cpu", weights_only=True)
model.load_state_dict(checkpoint)
model.to(DEVICE)
model.eval()

# Post-processor later handed to `evaluate` to turn raw model outputs into
# per-image prediction dicts.
output_processor = RTDETRPostProcessor(
    num_classes=cfg['num_classes'],
    num_queries=cfg['num_queries'],
)

In [4]:
# Validation split, built with the validation-time transforms.
val_set = COCODataset(
    image_dir='./data/val2017/',
    annot_path='./data/annotations/instances_val2017.json',
    transforms=get_valid_transforms(),
)
print("Total Samples in Validation Set:", len(val_set))

# No shuffling: evaluation iterates the set in a fixed order.
val_loader = DataLoader(
    val_set,
    batch_size=cfg['batch_size'],
    shuffle=False,
    num_workers=3,
    collate_fn=collate_fn,
    prefetch_factor=10,
)

Total Samples in Validation Set: 5000


In [5]:
def cxcywh_norm_to_xyxy_abs(boxes, im_size=640):
    """
    Scales normalised (cx, cy, w, h) to absolute (x1, y1, x2, y2) in pixels.

    The input tensor is never modified; a new tensor is returned.

    Args:
        boxes: Tensor whose last dimension holds (cx, cy, w, h) in normalised
            coordinates.
        im_size: Scale factor applied to all four components (the same value
            is used for both image axes).
    Returns:
        New tensor of the same shape as `boxes`, with (x1, y1, x2, y2) along
        the last dimension.
    """
    cx, cy, w, h = boxes.unbind(-1)
    # unbind() returns views of `boxes`, so an in-place `*=` here would rescale
    # the caller's tensor as a side effect. Use out-of-place multiplication.
    cx = cx * im_size
    cy = cy * im_size
    w = w * im_size
    h = h * im_size
    x1 = cx - 0.5 * w
    y1 = cy - 0.5 * h
    x2 = cx + 0.5 * w
    y2 = cy + 0.5 * h
    return torch.stack([x1, y1, x2, y2], dim=-1)

In [6]:
# def compute_box_iou(boxes1, boxes2):
#     """
#     Compute IoU between two sets of boxes, boxes1 and boxes2.
#     boxes1: Tensor[N,4], boxes2: Tensor[M,4] (format: x1,y1,x2,y2)
#     Returns: Tensor[N,M] IoU matrix
#     """
#     area1 = (boxes1[:, 2] - boxes1[:, 0]).clamp(min=0) * (boxes1[:, 3] - boxes1[:, 1]).clamp(min=0)
#     area2 = (boxes2[:, 2] - boxes2[:, 0]).clamp(min=0) * (boxes2[:, 3] - boxes2[:, 1]).clamp(min=0)

#     lt = torch.max(boxes1[:, None, :2], boxes2[:, :2])  # [N,M,2]
#     rb = torch.min(boxes1[:, None, 2:], boxes2[:, 2:])  # [N,M,2]

#     wh = (rb - lt).clamp(min=0)  # [N,M,2]
#     inter = wh[:, :, 0] * wh[:, :, 1]  # [N,M]

#     union = area1[:, None] + area2 - inter
#     iou = inter / (union + 1e-6)  # add epsilon for stability

#     return iou

In [7]:
def nms_per_class(boxes, scores, iou_threshold=0.5):
    """
    Greedy non-maximum suppression over the boxes of one class.

    Boxes are visited from highest to lowest score; each surviving box
    suppresses every remaining lower-scored box whose IoU with it exceeds
    `iou_threshold`.

    Args:
        boxes: Tensor[N, 4], xyxy format absolute
        scores: Tensor[N]
        iou_threshold: float, overlap above which a lower-scored box is dropped
    Returns:
        Tensor of indices (into `boxes`) that survive NMS
    """
    if boxes.numel() == 0:
        return torch.empty((0,), dtype=torch.long, device=boxes.device)

    ranked = scores.argsort(descending=True)
    alive = torch.ones_like(scores, dtype=torch.bool)

    for rank, current in enumerate(ranked):
        # A box that was already suppressed cannot suppress others.
        if not alive[current]:
            continue

        # Only lower-ranked boxes that are still alive need to be checked.
        lower_ranked = ranked[rank + 1:]
        candidates = lower_ranked[alive[lower_ranked]]
        if candidates.numel() == 0:
            break

        # box_iou returns a pair; only the first element (the IoU matrix) is used.
        overlaps, _ = box_iou(boxes[current].unsqueeze(0), boxes[candidates])
        alive[candidates[overlaps[0] > iou_threshold]] = False

    return torch.where(alive)[0]

In [8]:
def batch_nms(predictions, iou_threshold=0.5, im_size=640):
    """
    Class-wise NMS for every image of a batch.

    Args:
        predictions: list of dicts (one per image), each holding
            "boxes": tensor [k,4] normalized (cx, cy, w, h)
            "scores": tensor [k]
            "labels": tensor [k]
        iou_threshold: IoU threshold for suppression
        im_size: scale forwarded to `cxcywh_norm_to_xyxy_abs` for the
            normalized -> pixel conversion used while computing overlaps
    Returns:
        list of dicts with the same keys, restricted to the predictions that
        survive NMS. Boxes stay in the normalized (cx, cy, w, h) format and
        the survivors are grouped class by class.
    """
    results = []
    for sample in predictions:
        norm_boxes, scores, labels = sample["boxes"], sample["scores"], sample["labels"]

        # Nothing predicted for this image: emit empty tensors of matching dtype/device.
        if norm_boxes.numel() == 0:
            results.append({
                "boxes": norm_boxes.new_empty((0, 4)),
                "scores": scores.new_empty((0,)),
                "labels": labels.new_empty((0,), dtype=labels.dtype),
            })
            continue

        # Convert a copy, so the normalized boxes returned below stay intact.
        abs_boxes = cxcywh_norm_to_xyxy_abs(norm_boxes.clone(), im_size)

        per_class_keep = []
        for cls_id in labels.unique():
            members = labels == cls_id
            member_positions = members.nonzero(as_tuple=True)[0]
            local_keep = nms_per_class(abs_boxes[members], scores[members], iou_threshold)
            # Map indices local to this class back to positions in the full prediction.
            per_class_keep.append(member_positions[local_keep])

        if per_class_keep:
            keep = torch.cat(per_class_keep)
        else:
            keep = torch.tensor([], dtype=torch.long, device=abs_boxes.device)

        results.append({
            "boxes": norm_boxes[keep],
            "scores": scores[keep],
            "labels": labels[keep],
        })

    return results

In [9]:
@torch.no_grad()
def evaluate(model, postprocessor, dataloader, device, score_threshold=0.05, img_size=640, apply_nms=False, nms_iou_threshold=0.5):
    """
    Evaluates the object detection model with torchmetrics' MeanAveragePrecision.

    Args:
        model (torch.nn.Module): The object detection model to evaluate.
        postprocessor: Callable invoked as
            `postprocessor(outputs, top_k=100, score_thresh=score_threshold)`;
            its result is indexed per image and read through the keys
            "boxes", "scores" and "labels".
        dataloader (torch.utils.data.DataLoader): Dataloader for the validation set,
            yielding `(images, targets)` where `targets[i]` has "boxes" and "labels".
        device (torch.device): The device to run evaluation on (e.g., 'cuda' or 'cpu').
        score_threshold (float): The confidence threshold for model predictions.
        img_size (int): Size of the image, used to scale boxes to pixels.
        apply_nms (bool): If True apply NMS on postprocessed output.
        nms_iou_threshold (float): IoU threshold passed to `batch_nms`;
            only used when `apply_nms` is True.
    Returns:
        The value of `MeanAveragePrecision.compute()`, i.e. the metric results
        keyed by name (e.g. "map", "map_50").
    """
    model.eval()
    model.to(device)

    metric = MeanAveragePrecision(
        box_format="xyxy",
        iou_thresholds=[0.50 + 0.05 * i for i in range(10)],
        max_detection_thresholds=[1, 10, 100],
        class_metrics=True,         # keep per-class AP/AR
        extended_summary=True       # keep per-IoU AP & AR
    )

    for batch, (images, targets) in enumerate(dataloader):
        images = images.to(device)
        outputs = model(images)
        # top_k equals the largest entry of max_detection_thresholds above.
        preds = postprocessor(outputs, top_k=100, score_thresh=score_threshold)
        if apply_nms:
            preds = batch_nms(preds, nms_iou_threshold, img_size)

        batch_tgts, batch_preds = [], []

        for i, pred in enumerate(preds):
            target = targets[i]

            # Boxes are cloned before conversion so that neither the targets
            # coming from the dataloader nor the predictions are rescaled in place.
            t_boxes = cxcywh_norm_to_xyxy_abs(target["boxes"].clone(), img_size)
            batch_tgts.append({
                "boxes":  t_boxes.to(device),
                "labels": target["labels"].to(device)
            })

            p_boxes = cxcywh_norm_to_xyxy_abs(pred["boxes"].clone(), img_size)
            batch_preds.append({
                "boxes":   p_boxes.to(device),
                "scores":  pred["scores"].to(device),
                "labels":  pred["labels"].to(device)
            })

        metric.update(batch_preds, batch_tgts)
        # Carriage return keeps the progress counter on a single output line.
        print(f"{batch+1} done.", end="\r")

    return metric.compute()

In [10]:
# APPLY_NMS is forwarded as-is; `evaluate` itself decides whether NMS runs,
# so no branching is needed at the call site.
evaluation_results = evaluate(
    model,
    output_processor,
    val_loader,
    DEVICE,
    score_threshold=0.05,
    img_size=640,
    apply_nms=APPLY_NMS,
)

125 done.

In [11]:
# Pull the headline mAP values out of the metric output as plain Python
# numbers so they can be serialized to JSON.
result = {
    key: evaluation_results[key].item()
    for key in ("map", "map_50", "map_75", "map_small", "map_medium", "map_large")
}

# The file name records whether NMS was applied during evaluation.
with open(
    "./saved/evaluation_rtdetr_r18_with_nms.json" if APPLY_NMS
    else "./saved/evaluation_rtdetr_r18_without_nms.json",
    'w',
) as f:
    json.dump(result, f, indent=4)