In [1]:
from typing import List

from pyengine.inference.unified_structs.inference_results import Skeleton, Rect


def iou(a: Rect, b: Rect) -> float:
    ax1, ay1, ax2, ay2 = a.x1, a.y1, a.x2, a.y2
    bx1, by1, bx2, by2 = b.x1, b.y1, b.x2, b.y2
    iw = max(0.0, min(ax2, bx2) - max(ax1, bx1))
    ih = max(0.0, min(ay2, by2) - max(ay1, by1))
    inter = iw * ih
    if inter <= 0.0:
        return 0.0
    area_a = max(0.0, (ax2 - ax1)) * max(0.0, (ay2 - ay1))
    area_b = max(0.0, (bx2 - bx1)) * max(0.0, (by2 - by1))
    denom = area_a + area_b - inter
    if denom <= 0.0:
        return 0.0
    return inter / denom


def center_distance_normalized(a: Rect, b: Rect) -> float:
    """
    计算两个矩形中心点的归一化距离。
    归一化方式：除以两个矩形平均尺寸的对角线长度。
    返回值越小表示中心越接近。
    """
    # 计算中心点
    ax_center = (a.x1 + a.x2) / 2.0
    ay_center = (a.y1 + a.y2) / 2.0
    bx_center = (b.x1 + b.x2) / 2.0
    by_center = (b.y1 + b.y2) / 2.0

    # 欧几里得距离
    dist = ((ax_center - bx_center) ** 2 + (ay_center - by_center) ** 2) ** 0.5

    # 归一化：使用两个框的平均对角线长度
    a_diag = ((a.x2 - a.x1) ** 2 + (a.y2 - a.y1) ** 2) ** 0.5
    b_diag = ((b.x2 - b.x1) ** 2 + (b.y2 - b.y1) ** 2) ** 0.5
    avg_diag = (a_diag + b_diag) / 2.0

    if avg_diag <= 0.0:
        return float('inf')

    return dist / avg_diag


def nms_skeletons(
    sks: List[Skeleton],
    iou_threshold: float = 0.5,
    class_aware: bool = True,
    center_dist_threshold: float = 0.5,
    debug: bool = False
) -> List[Skeleton]:
    """
    Hard-NMS：同类间按置信度降序抑制。

    增强版 NMS，除了 IoU，还考虑中心距离。这对于处理 tile 边界的检测框特别有用。

    Args:
        sks: 骨架列表
        iou_threshold: IoU 阈值，超过此值认为重叠
        class_aware: 是否只在同类间做 NMS
        center_dist_threshold: 归一化中心距离阈值。当 IoU=0 但中心距离小于此值时，
                              也认为是同一个物体（用于 tile 边界情况）
        debug: 是否输出调试信息
    """
    if not sks:
        return []
    order = sorted(range(len(sks)), key=lambda i: sks[i].confidence, reverse=True)
    picked: List[int] = []

    def same_class(i: int, j: int) -> bool:
        return (sks[i].classification == sks[j].classification) if class_aware else True

    if debug:
        from pyengine.utils.logger import logger
        logger.debug("NMS", f"Processing {len(sks)} detections")
        for idx, sk in enumerate(sks):
            logger.debug("NMS", f"  [{idx}] bbox=({sk.rect.x1:.0f}, {sk.rect.y1:.0f}, {sk.rect.x2:.0f}, {sk.rect.y2:.0f}), conf={sk.confidence:.3f}, cls={sk.classification}")

    for i in order:
        keep = True
        for j in picked:
            if not same_class(i, j):
                continue

            # 计算 IoU
            overlap = iou(sks[i].rect, sks[j].rect)

            # 如果 IoU 超过阈值，直接抑制
            if overlap > iou_threshold:
                if debug:
                    from pyengine.utils.logger import logger
                    logger.debug("NMS", f"  Suppress [{i}] by [{j}]: IoU={overlap:.4f} > {iou_threshold}")
                keep = False
                break

            # 检查中心距离（特别适用于 tile 边界情况，即使有一些小的 IoU）
            # 当 IoU 未达到抑制阈值时，检查中心距离作为补充判定
            if overlap <= iou_threshold:
                center_dist = center_distance_normalized(sks[i].rect, sks[j].rect)
                if debug:
                    from pyengine.utils.logger import logger
                    logger.debug("NMS", f"  Compare [{i}] vs [{j}]: IoU={overlap:.4f}, CenterDist={center_dist:.4f}")
                if center_dist < center_dist_threshold:
                    if debug:
                        from pyengine.utils.logger import logger
                        logger.debug("NMS", f"  Suppress [{i}] by [{j}]: CenterDist={center_dist:.4f} < {center_dist_threshold}")
                    keep = False
                    break

        if keep:
            picked.append(i)

    if debug:
        from pyengine.utils.logger import logger
        logger.debug("NMS", f"Kept {len(picked)} detections: {picked}")

    return [sks[i] for i in picked]


In [2]:
import sys
from pyengine.inference.unified_structs.inference_results import Point


def create_skeleton(x1, y1, x2, y2, confidence=0.9, classification=0):
    """辅助函数：创建一个测试用的 Skeleton 对象"""
    return Skeleton(
        rect=Rect(x1, y1, x2, y2),
        classification=classification,
        confidence=confidence,
        track_id=0,
        features=[],
        points=[Point((x1+x2)//2, (y1+y2)//2, 0.9)]
    )


def test_iou_calculation():
    """测试 IoU 计算的正确性"""
    print("=" * 80)
    print("测试 1: IoU 计算")
    print("=" * 80)

    # 完全重叠
    rect1 = Rect(100, 100, 200, 200)
    rect2 = Rect(100, 100, 200, 200)
    result = iou(rect1, rect2)
    print(f"完全重叠: IoU = {result:.4f} (期望: 1.0000)")
    assert abs(result - 1.0) < 0.001, f"Expected 1.0, got {result}"

    # 不重叠
    rect1 = Rect(100, 100, 200, 200)
    rect2 = Rect(300, 100, 400, 200)
    result = iou(rect1, rect2)
    print(f"不重叠: IoU = {result:.4f} (期望: 0.0000)")
    assert result == 0.0, f"Expected 0.0, got {result}"

    # 50% 重叠
    rect1 = Rect(100, 100, 200, 200)
    rect2 = Rect(150, 100, 250, 200)
    result = iou(rect1, rect2)
    print(f"50% 重叠: IoU = {result:.4f} (期望: 0.3333)")
    assert abs(result - 0.3333) < 0.01, f"Expected ~0.3333, got {result}"

    print("✓ IoU 计算测试通过\n")


def test_center_distance():
    """测试中心距离计算"""
    print("=" * 80)
    print("测试 2: 中心距离计算")
    print("=" * 80)

    # 相同中心
    rect1 = Rect(100, 100, 200, 200)
    rect2 = Rect(100, 100, 200, 200)
    result = center_distance_normalized(rect1, rect2)
    print(f"相同中心: 归一化距离 = {result:.4f} (期望: 0.0000)")
    assert result == 0.0, f"Expected 0.0, got {result}"

    # 紧邻的框（tile 边界场景）
    rect1 = Rect(100, 100, 200, 200)
    rect2 = Rect(200, 100, 300, 200)  # 紧邻右侧
    result = center_distance_normalized(rect1, rect2)
    print(f"紧邻框（tile 边界）: 归一化距离 = {result:.4f} (期望: ~0.71)")
    # 中心距离 = 100, 对角线 = 141.4, 归一化 = 100/141.4 ≈ 0.707
    assert 0.6 < result < 0.8, f"Expected ~0.71, got {result}"

    # 远距离的框
    rect1 = Rect(100, 100, 200, 200)
    rect2 = Rect(500, 100, 600, 200)
    result = center_distance_normalized(rect1, rect2)
    print(f"远距离框: 归一化距离 = {result:.4f} (期望: > 2.0)")
    assert result > 2.0, f"Expected > 2.0, got {result}"

    print("✓ 中心距离计算测试通过\n")


def test_nms_standard_iou():
    """测试标准 IoU 重叠的 NMS"""
    print("=" * 80)
    print("测试 3: 标准 IoU 重叠合并")
    print("=" * 80)

    # 两个高度重叠的框，应该合并
    sk1 = create_skeleton(100, 100, 200, 200, confidence=0.9)
    sk2 = create_skeleton(120, 100, 220, 200, confidence=0.8)

    skeletons = [sk1, sk2]
    result = nms_skeletons(skeletons, iou_threshold=0.5, center_dist_threshold=0.5)

    print(f"输入: 2 个高度重叠的检测框")
    print(f"输出: {len(result)} 个检测框 (期望: 1)")
    assert len(result) == 1, f"Expected 1 detection, got {len(result)}"
    assert result[0].confidence == 0.9, "Should keep the higher confidence detection"

    print("✓ 标准 IoU 重叠合并测试通过\n")


def test_nms_tile_boundary():
    """测试 tile 边界处的检测框合并（核心测试）"""
    print("=" * 80)
    print("测试 4: Tile 边界检测框合并（无重叠但中心接近）")
    print("=" * 80)

    # 模拟两个 tile 边界的检测：紧邻但不重叠
    sk1 = create_skeleton(1800, 300, 1920, 800, confidence=0.92)  # Tile 1 右边缘
    sk2 = create_skeleton(1920, 300, 2040, 800, confidence=0.88)  # Tile 2 左边缘

    # 计算指标
    overlap = iou(sk1.rect, sk2.rect)
    center_dist = center_distance_normalized(sk1.rect, sk2.rect)

    print(f"Tile 1 检测框: (1800, 300, 1920, 800), conf=0.92")
    print(f"Tile 2 检测框: (1920, 300, 2040, 800), conf=0.88")
    print(f"IoU: {overlap:.4f}")
    print(f"归一化中心距离: {center_dist:.4f}")

    skeletons = [sk1, sk2]
    result = nms_skeletons(skeletons, iou_threshold=0.5, center_dist_threshold=0.5)

    print(f"输出: {len(result)} 个检测框 (期望: 1)")
    assert len(result) == 1, f"Expected 1 detection (merged), got {len(result)}"
    assert result[0].confidence == 0.92, "Should keep the higher confidence detection"

    print("✓ Tile 边界检测框合并测试通过\n")


def test_nms_class_aware():
    """测试类别感知的 NMS"""
    print("=" * 80)
    print("测试 5: 类别感知 NMS（不同类别不应合并）")
    print("=" * 80)

    # 两个完全重叠但类别不同的框
    sk1 = create_skeleton(100, 100, 200, 200, confidence=0.9, classification=0)
    sk2 = create_skeleton(100, 100, 200, 200, confidence=0.8, classification=1)

    skeletons = [sk1, sk2]
    result = nms_skeletons(skeletons, iou_threshold=0.5, class_aware=True)

    print(f"输入: 2 个完全重叠但类别不同的检测框")
    print(f"输出: {len(result)} 个检测框 (期望: 2)")
    assert len(result) == 2, f"Expected 2 detections (different classes), got {len(result)}"

    print("✓ 类别感知 NMS 测试通过\n")


def test_nms_far_distance():
    """测试远距离检测框不应合并"""
    print("=" * 80)
    print("测试 6: 远距离检测框不合并")
    print("=" * 80)

    # 两个距离很远的框
    sk1 = create_skeleton(100, 100, 200, 200, confidence=0.9)
    sk2 = create_skeleton(1000, 100, 1100, 200, confidence=0.8)

    overlap = iou(sk1.rect, sk2.rect)
    center_dist = center_distance_normalized(sk1.rect, sk2.rect)

    print(f"检测框 1: (100, 100, 200, 200)")
    print(f"检测框 2: (1000, 100, 1100, 200)")
    print(f"IoU: {overlap:.4f}")
    print(f"归一化中心距离: {center_dist:.4f}")

    skeletons = [sk1, sk2]
    result = nms_skeletons(skeletons, iou_threshold=0.5, center_dist_threshold=0.5)

    print(f"输出: {len(result)} 个检测框 (期望: 2)")
    assert len(result) == 2, f"Expected 2 detections (far apart), got {len(result)}"

    print("✓ 远距离检测框不合并测试通过\n")


def test_nms_small_overlap_with_close_center():
    """测试有小重叠但中心接近的情况（修复后的关键场景）"""
    print("=" * 80)
    print("测试 7: 小重叠 + 中心接近的检测框合并（0.1 < IoU < 0.5）")
    print("=" * 80)

    # 模拟实际场景：两个框有小重叠（IoU ~ 0.2）且中心接近
    # 这是修复前的 bug 场景
    sk1 = create_skeleton(1053 + 410, 155 + 180, 1053 + 478, 155 + 400, confidence=0.92)
    sk2 = create_skeleton(1486 + 0, 153 + 180, 1486 + 90, 153 + 400, confidence=0.88)

    overlap = iou(sk1.rect, sk2.rect)
    center_dist = center_distance_normalized(sk1.rect, sk2.rect)

    print(f"检测框 1: ({sk1.rect.x1:.0f}, {sk1.rect.y1:.0f}, {sk1.rect.x2:.0f}, {sk1.rect.y2:.0f}), conf={sk1.confidence}")
    print(f"检测框 2: ({sk2.rect.x1:.0f}, {sk2.rect.y1:.0f}, {sk2.rect.x2:.0f}, {sk2.rect.y2:.0f}), conf={sk2.confidence}")
    print(f"IoU: {overlap:.4f}")
    print(f"归一化中心距离: {center_dist:.4f}")

    skeletons = [sk1, sk2]
    result = nms_skeletons(skeletons, iou_threshold=0.5, center_dist_threshold=0.5)

    print(f"输出: {len(result)} 个检测框 (期望: 1)")

    if len(result) == 1:
        print("✓ 小重叠 + 中心接近的检测框合并测试通过\n")
    else:
        print(f"✗ 测试失败！应该合并为 1 个检测框，实际得到 {len(result)} 个")
        print(f"  这表明 NMS 逻辑仍有问题，需要调整 center_dist_threshold 或检查逻辑")
        print()


def test_nms_complex_scenario():
    """测试复杂场景：多个 tile，多个检测"""
    print("=" * 80)
    print("测试 8: 复杂场景（3个 tile, 5个检测, 应合并为 3个人）")
    print("=" * 80)

    # 人物 A：在 Tile 1 和 Tile 2 边界
    sk_a1 = create_skeleton(1770, 337, 1920, 843, confidence=0.92)
    sk_a2 = create_skeleton(1920, 337, 2070, 843, confidence=0.88)

    # 人物 B：在 Tile 2 和 Tile 3 边界
    sk_b1 = create_skeleton(3660, 506, 3840, 1012, confidence=0.90)
    sk_b2 = create_skeleton(3840, 506, 4020, 1012, confidence=0.85)

    # 人物 C：完全在 Tile 2 内
    sk_c = create_skeleton(2820, 421, 2940, 928, confidence=0.95)

    skeletons = [sk_a1, sk_a2, sk_b1, sk_b2, sk_c]

    print(f"输入: {len(skeletons)} 个检测（5个检测来自3个实际人物）")

    result = nms_skeletons(skeletons, iou_threshold=0.5, center_dist_threshold=0.5)

    print(f"输出: {len(result)} 个检测 (期望: 3)")

    if len(result) == 3:
        print("✓ 复杂场景测试通过")
        print(f"  保留的检测置信度: {[f'{sk.confidence:.2f}' for sk in result]}")
    else:
        print(f"✗ 测试失败！期望 3 个检测，实际得到 {len(result)} 个")

    print()


def run_all_tests():
    """运行所有测试"""
    print("\n")
    print("╔" + "=" * 78 + "╗")
    print("║" + " " * 25 + "NMS 算法测试套件" + " " * 37 + "║")
    print("╚" + "=" * 78 + "╝")
    print()

    tests = [
        test_iou_calculation,
        test_center_distance,
        test_nms_standard_iou,
        test_nms_tile_boundary,
        test_nms_class_aware,
        test_nms_far_distance,
        test_nms_small_overlap_with_close_center,
        test_nms_complex_scenario,
    ]

    passed = 0
    failed = 0

    for test in tests:
        try:
            test()
            passed += 1
        except AssertionError as e:
            print(f"✗ 测试失败: {e}\n")
            failed += 1
        except Exception as e:
            print(f"✗ 测试出错: {e}\n")
            failed += 1

    print("=" * 80)
    print(f"测试结果: {passed} 通过, {failed} 失败")
    print("=" * 80)

    return failed == 0


if __name__ == "__main__":
    success = run_all_tests()
    print(f"All tests are {'success' if success else 'failed'}")



║                         NMS 算法测试套件                                     ║

测试 1: IoU 计算
完全重叠: IoU = 1.0000 (期望: 1.0000)
不重叠: IoU = 0.0000 (期望: 0.0000)
50% 重叠: IoU = 0.3333 (期望: 0.3333)
✓ IoU 计算测试通过

测试 2: 中心距离计算
相同中心: 归一化距离 = 0.0000 (期望: 0.0000)
紧邻框（tile 边界）: 归一化距离 = 0.7071 (期望: ~0.71)
远距离框: 归一化距离 = 2.8284 (期望: > 2.0)
✓ 中心距离计算测试通过

测试 3: 标准 IoU 重叠合并
输入: 2 个高度重叠的检测框
输出: 1 个检测框 (期望: 1)
✓ 标准 IoU 重叠合并测试通过

测试 4: Tile 边界检测框合并（无重叠但中心接近）
Tile 1 检测框: (1800, 300, 1920, 800), conf=0.92
Tile 2 检测框: (1920, 300, 2040, 800), conf=0.88
IoU: 0.0000
归一化中心距离: 0.2334
输出: 1 个检测框 (期望: 1)
✓ Tile 边界检测框合并测试通过

测试 5: 类别感知 NMS（不同类别不应合并）
输入: 2 个完全重叠但类别不同的检测框
输出: 2 个检测框 (期望: 2)
✓ 类别感知 NMS 测试通过

测试 6: 远距离检测框不合并
检测框 1: (100, 100, 200, 200)
检测框 2: (1000, 100, 1100, 200)
IoU: 0.0000
归一化中心距离: 6.3640
输出: 2 个检测框 (期望: 2)
✓ 远距离检测框不合并测试通过

测试 7: 小重叠 + 中心接近的检测框合并（0.1 < IoU < 0.5）
检测框 1: (1463, 335, 1531, 555), conf=0.92
检测框 2: (1486, 333, 1576, 553), conf=0.88
IoU: 0.3932
归一化中心距离: 0.1456
输出: 1 个检测框 (期望: 1)
✓ 小重叠 + 中心接近的检