In [None]:
# -*- coding: utf-8 -*-

import sys
import json
import time
import random
import math
from typing import List, Dict, Any

# 项目路径
SIMWORLD_DIR = r"D:\Projects\Food-Delivery-Bench\SimWorld"
LLM_DELIVERY_DIR = r"D:\Projects\Food-Delivery-Bench\LLM-Delivery"

# 数据文件
ROADS_JSON = r"D:\Projects\Food-Delivery-Bench\Test_Data\test\roads.json"
WORLD_JSON = r"D:\Projects\Food-Delivery-Bench\Test_Data\test\progen_world_enriched.json"
STORE_ITEMS_JSON  = r"D:\Projects\Food-Delivery-Bench\LLM-Delivery\input\store_items.json"
FOOD_JSON         = r"D:\Projects\Food-Delivery-Bench\LLM-Delivery\input\food.json"
CONFIG_JSON       = r"D:\Projects\Food-Delivery-Bench\LLM-Delivery\input\config.json"
SPECIAL_NOTES_JSON = r"D:\Projects\Food-Delivery-Bench\LLM-Delivery\input\special_notes.json"

# 项目路径注入
sys.path.insert(0, SIMWORLD_DIR)
sys.path.insert(0, LLM_DELIVERY_DIR)

from Base.Map import Map
from Base.Order import Order, OrderManager
from Base.Types import Vector
from Base.Timer import VirtualClock
from Communicator.Communicator import Communicator

communicator = Communicator(port=9000, ip='127.0.0.1', resolution=(640, 480))

with open(FOOD_JSON, "r", encoding="utf-8") as f:
    food_data = json.load(f) or {}
menu_items = food_data.get("items", [])

with open(SPECIAL_NOTES_JSON, "r", encoding="utf-8") as f:
    special_notes_data = json.load(f) or {}

clock = VirtualClock(time_scale=3.0)

def load_world_nodes(path: str) -> List[Dict[str, Any]]:
    """加载世界节点数据"""
    with open(path, "r", encoding="utf-8") as f:
        obj = json.load(f)
    return obj.get("nodes", [])

def test_handoff_position_sampling():
    """测试handoff位置采样功能"""
    print("=" * 80)
    print("测试Handoff位置采样功能")
    print("=" * 80)
    
    # 1. 构建地图
    print("1. 构建地图...")
    m = Map()
    m.import_roads(ROADS_JSON)
    m.import_pois(WORLD_JSON)
    
    # 2. 加载世界节点
    print("2. 加载世界节点...")
    world_nodes = load_world_nodes(WORLD_JSON)
    
    # 3. 生成订单池
    print("3. 生成订单池...")
    om = OrderManager(capacity=10, menu=menu_items, special_notes_map=special_notes_data, note_prob=0.8)
    om.fill_pool(m, world_nodes, communicator)
    
    # 4. 统计handoff订单
    handoff_orders = []
    for order in om.list_orders():
        if 'hand_to_customer' in order.allowed_delivery_methods:
            handoff_orders.append(order)
    
    print(f"4. 找到 {len(handoff_orders)} 个需要handoff的订单")
    
    if not handoff_orders:
        print("没有找到需要handoff的订单，重新生成...")
        # 强制设置handoff概率为1.0来测试

        # 重新生成订单
        om = OrderManager(capacity=10)
        om.fill_pool(m, world_nodes)
        
        handoff_orders = []
        for order in om.list_orders():
            if order.requires_handoff:
                handoff_orders.append(order)
        
        print(f"强制handoff后找到 {len(handoff_orders)} 个需要handoff的订单")
    
    # 5. 测试每个handoff订单
    print("\n5. 测试handoff位置采样...")
    print("-" * 80)
    
    for i, order in enumerate(handoff_orders):  # 只测试前5个
        print(f"\n订单 #{order.id}:")
        print(f"  Dropoff位置: ({order.dropoff_node.position.x:.1f}, {order.dropoff_node.position.y:.1f})")
        
        if order.dropoff_building_box:
            bbox = order.dropoff_building_box
            print(f"  建筑边界: 中心=({bbox['x']:.1f}, {bbox['y']:.1f}), 尺寸=({bbox['w']:.1f}, {bbox['h']:.1f}), 角度={bbox['yaw']:.1f}°")
        else:
            print("  建筑边界: 无")
        
        if order.handoff_address:
            print(f"  Handoff位置: ({order.handoff_address.x:.1f}, {order.handoff_address.y:.1f})")
            
            # 测试是否在建筑内部
            is_inside = order._is_point_inside_building(order.handoff_address)
            print(f"  是否在建筑内部: {is_inside}")
            
            if is_inside:
                print("  ❌ 错误：Handoff位置在建筑内部！")
            else:
                print("  ✅ 正确：Handoff位置在建筑外部")
                
            # 计算距离
            distance = order.dropoff_node.position.distance(order.handoff_address)
            print(f"  距离dropoff_node: {distance:.1f}cm ({distance/100:.1f}m)")
        else:
            print("  ❌ 错误：没有生成handoff位置")

def test_building_inside_detection():
    """测试建筑内部检测功能"""
    print("\n" + "=" * 80)
    print("测试建筑内部检测功能")
    print("=" * 80)
    
    # 创建一个测试订单
    m = Map()
    m.import_roads(ROADS_JSON)
    m.import_pois(WORLD_JSON)
    
    world_nodes = load_world_nodes(WORLD_JSON)
    
    # 找到第一个有建筑边界信息的订单
    test_order = None
    for _ in range(10):
        om = OrderManager(capacity=1)
        om.fill_pool(m, world_nodes)
        order = om.list_orders()[0]
        if order.dropoff_building_box:
            test_order = order
            break
    
    if not test_order:
        print("没有找到有建筑边界信息的订单")
        return
    
    print(f"使用订单 #{test_order.id} 进行测试")
    bbox = test_order.dropoff_building_box
    print(f"建筑边界: 中心=({bbox['x']:.1f}, {bbox['y']:.1f}), 尺寸=({bbox['w']:.1f}, {bbox['h']:.1f}), 角度={bbox['yaw']:.1f}°")
    
    # 测试建筑中心点
    center_point = Vector(bbox['x'], bbox['y'])
    is_center_inside = test_order._is_point_inside_building(center_point)
    print(f"建筑中心点 ({center_point.x:.1f}, {center_point.y:.1f}) 是否在建筑内部: {is_center_inside}")
    
    # 测试建筑边界上的点
    half_w = bbox['w'] / 2.0
    half_h = bbox['h'] / 2.0
    
    # 在建筑局部坐标系中的边界点
    local_boundary_points = [
        (half_w, 0),      # 右边界中心
        (-half_w, 0),     # 左边界中心
        (0, half_h),      # 上边界中心
        (0, -half_h),     # 下边界中心
        (half_w, half_h), # 右上角
        (-half_w, -half_h), # 左下角
    ]
    
    # 转换到世界坐标系
    yaw_rad = math.radians(bbox['yaw'])
    cos_yaw = math.cos(yaw_rad)
    sin_yaw = math.sin(yaw_rad)
    
    print("\n测试建筑边界上的点:")
    for local_x, local_y in local_boundary_points:
        # 旋转到世界坐标系
        world_x = bbox['x'] + local_x * cos_yaw - local_y * sin_yaw
        world_y = bbox['y'] + local_x * sin_yaw + local_y * cos_yaw
        
        world_point = Vector(world_x, world_y)
        is_inside = test_order._is_point_inside_building(world_point)
        print(f"  点 ({world_point.x:.1f}, {world_point.y:.1f}) 是否在建筑内部: {is_inside}")
    
    # 测试建筑外部的点
    print("\n测试建筑外部的点:")
    outside_points = [
        Vector(bbox['x'] + bbox['w'], bbox['y']),  # 右侧外部
        Vector(bbox['x'] - bbox['w'], bbox['y']),  # 左侧外部
        Vector(bbox['x'], bbox['y'] + bbox['h']),  # 上方外部
        Vector(bbox['x'], bbox['y'] - bbox['h']),  # 下方外部
    ]
    
    for point in outside_points:
        is_inside = test_order._is_point_inside_building(point)
        print(f"  点 ({point.x:.1f}, {point.y:.1f}) 是否在建筑内部: {is_inside}")

def main():
    """主测试函数"""
    print("开始测试Handoff位置采样功能...")
    
    try:
        # 测试1: 基本handoff位置采样
        test_handoff_position_sampling()
        
        # 测试2: 建筑内部检测
        test_building_inside_detection()
        
        print("\n" + "=" * 80)
        print("所有测试完成！")
        print("=" * 80)
        
    except Exception as e:
        print(f"测试过程中出现错误: {e}")
        import traceback
        traceback.print_exc()

if __name__ == "__main__":
    main()

In [None]:
communicator.destroy_customer(8)