In [None]:
import os
import sys
from pathlib import Path
import numpy as np
import torch
import matplotlib.pyplot as plt
from typing import Dict, Any

# 添加项目根目录到Python路径
project_root = Path.cwd().parent
sys.path.append(str(project_root))

# 导入必要的模块
import hydra
from hydra.utils import instantiate
from omegaconf import DictConfig

from navsim.common.dataloader import SceneLoader
from navsim.common.dataclasses import SceneFilter, SensorConfig, Scene
from navsim.agents.abstract_agent import AbstractAgent
from navsim.visualization.plots import plot_bev_with_agent, plot_bev_frame

# 设置环境变量（请根据您的环境修改）
if not os.getenv("OPENSCENE_DATA_ROOT"):
    os.environ["OPENSCENE_DATA_ROOT"] = "/path/to/your/data"  # 请修改为您的数据路径

print("环境设置完成！")


In [None]:
# 配置数据加载
SPLIT = "mini"  # 使用mini数据集进行测试
FILTER = "all_scenes"

# 初始化场景过滤器
hydra.initialize(config_path="../navsim/planning/script/config/common/scene_filter", version_base=None)
cfg = hydra.compose(config_name=FILTER)
scene_filter: SceneFilter = instantiate(cfg)

# 设置数据路径
openscene_data_root = Path(os.getenv("OPENSCENE_DATA_ROOT"))

# 创建场景加载器
scene_loader = SceneLoader(
    openscene_data_root / f"navsim_logs/{SPLIT}",
    openscene_data_root / f"sensor_blobs/{SPLIT}",
    scene_filter,
    sensor_config=SensorConfig.build_all_sensors(),
)

print(f"数据加载器设置完成，可用场景数量: {len(scene_loader.tokens)}")
hydra.core.global_hydra.GlobalHydra.instance().clear()


In [None]:
class ModelInferenceAgent(AbstractAgent):
    """
    模型推理代理包装器
    将训练好的模型包装成符合AbstractAgent接口的类
    """
    
    def __init__(self, model_type="diffusiondrive", checkpoint_path=None):
        super().__init__()
        self.model_type = model_type
        self.checkpoint_path = checkpoint_path
        self.model = None
        self.config = None
        
        if checkpoint_path:
            self._load_model()
    
    def _load_model(self):
        """加载预训练模型"""
        try:
            if self.model_type == "diffusiondrive":
                from navsim.agents.diffusiondrive.transfuser_agent import TransfuserAgent
                from navsim.agents.diffusiondrive.transfuser_config import TransfuserConfig
                
                # 加载配置
                self.config = TransfuserConfig()
                
                # 创建agent并加载检查点
                agent = TransfuserAgent(self.config)
                checkpoint = torch.load(self.checkpoint_path, map_location='cpu')
                
                # 加载模型权重
                if 'state_dict' in checkpoint:
                    agent.load_state_dict(checkpoint['state_dict'])
                else:
                    agent.load_state_dict(checkpoint)
                
                agent.eval()
                self.model = agent
                
            elif self.model_type == "transfuser":
                from navsim.agents.transfuser.transfuser_agent import TransfuserAgent
                from navsim.agents.transfuser.transfuser_config import TransfuserConfig
                
                self.config = TransfuserConfig()
                agent = TransfuserAgent(self.config)
                checkpoint = torch.load(self.checkpoint_path, map_location='cpu')
                
                if 'state_dict' in checkpoint:
                    agent.load_state_dict(checkpoint['state_dict'])
                else:
                    agent.load_state_dict(checkpoint)
                    
                agent.eval()
                self.model = agent
            
            print(f"成功加载 {self.model_type} 模型！")
            
        except Exception as e:
            print(f"模型加载失败: {e}")
            print("将使用简单的常速度模型进行演示")
            self._use_fallback_model()
    
    def _use_fallback_model(self):
        """如果模型加载失败，使用简单的基线模型"""
        from navsim.agents.constant_velocity_agent import ConstantVelocityAgent
        self.model = ConstantVelocityAgent()
        self.model_type = "constant_velocity"
        print("使用常速度模型作为演示")
    
    def compute_trajectory(self, agent_input, scene=None):
        """计算轨迹预测"""
        if self.model is None:
            self._use_fallback_model()
        
        # 根据模型类型调用相应的推理方法
        if hasattr(self.model, 'requires_scene') and self.model.requires_scene and scene is not None:
            return self.model.compute_trajectory(agent_input, scene)
        else:
            return self.model.compute_trajectory(agent_input)

# 创建推理代理实例
# 请将checkpoint_path修改为您的模型路径
checkpoint_path = None  # 例如: "/path/to/your/model.pth"
inference_agent = ModelInferenceAgent(
    model_type="diffusiondrive", 
    checkpoint_path=checkpoint_path
)

print(f"推理代理创建完成，模型类型: {inference_agent.model_type}")


In [None]:
# 随机选择一个场景
token = np.random.choice(scene_loader.tokens)
print(f"选择的场景token: {token}")

# 加载场景
scene = scene_loader.get_scene_from_token(token)
agent_input = scene_loader.get_agent_input_from_token(token)

print(f"场景信息:")
print(f"  - 历史帧数: {scene.scene_metadata.num_history_frames}")
print(f"  - 未来帧数: {scene.scene_metadata.num_future_frames}")
print(f"  - 总帧数: {len(scene.frames)}")
print(f"  - 场景类型: {scene.scene_metadata.scenario_type}")
print(f"  - 日志名称: {scene.scene_metadata.log_name}")


In [None]:
# 创建BEV轨迹对比图
fig, ax = plot_bev_with_agent(scene, inference_agent)

# 添加标题和说明
plt.title(f"轨迹对比 - {inference_agent.model_type.upper()}模型\n" + 
          f"场景: {scene.scene_metadata.scenario_type}\n" +
          f"绿色: 人类驾驶轨迹, 红色: AI预测轨迹")

plt.tight_layout()
plt.show()

print("BEV轨迹对比图已生成！")
print("图例说明:")
print("  - 绿色轨迹: 人类驾驶员的真实轨迹")
print("  - 红色轨迹: AI模型的预测轨迹")
print("  - 灰色区域: 道路和可行驶区域")
print("  - 彩色方框: 其他车辆和障碍物")


In [None]:
# 获取轨迹数据
human_trajectory = scene.get_future_trajectory()
agent_trajectory = inference_agent.compute_trajectory(agent_input, scene)

print("轨迹分析:")
print(f"  - 人类轨迹点数: {len(human_trajectory.poses)}")
print(f"  - AI预测轨迹点数: {len(agent_trajectory.poses)}")
print(f"  - 采样间隔: {human_trajectory.trajectory_sampling.interval_length:.2f}秒")
print(f"  - 预测时长: {human_trajectory.trajectory_sampling.time_horizon:.1f}秒")

# 计算轨迹统计信息
human_poses = np.array(human_trajectory.poses)
agent_poses = np.array(agent_trajectory.poses)

# 计算位置差异（欧几里得距离）
min_length = min(len(human_poses), len(agent_poses))
position_diff = np.sqrt(np.sum((human_poses[:min_length, :2] - agent_poses[:min_length, :2])**2, axis=1))

print(f"\n轨迹差异分析:")
print(f"  - 平均位置误差: {np.mean(position_diff):.2f}米")
print(f"  - 最大位置误差: {np.max(position_diff):.2f}米")
print(f"  - 最终位置误差: {position_diff[-1]:.2f}米")

# 绘制位置误差随时间变化
plt.figure(figsize=(12, 4))
time_points = np.arange(min_length) * human_trajectory.trajectory_sampling.interval_length

plt.subplot(1, 2, 1)
plt.plot(time_points, position_diff, 'b-', linewidth=2)
plt.xlabel('时间 (秒)')
plt.ylabel('位置误差 (米)')
plt.title('位置误差随时间变化')
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.plot(human_poses[:, 0], human_poses[:, 1], 'g-', linewidth=3, label='人类轨迹', alpha=0.7)
plt.plot(agent_poses[:, 0], agent_poses[:, 1], 'r--', linewidth=2, label='AI预测')
plt.scatter(human_poses[0, 0], human_poses[0, 1], c='green', s=100, marker='o', label='起点')
plt.scatter(human_poses[-1, 0], human_poses[-1, 1], c='green', s=100, marker='s', label='终点')
plt.xlabel('X坐标 (米)')
plt.ylabel('Y坐标 (米)')
plt.title('轨迹对比（俯视图）')
plt.legend()
plt.grid(True, alpha=0.3)
plt.axis('equal')

plt.tight_layout()
plt.show()


In [None]:
def create_comprehensive_visualization(scene, agent):
    """
    创建综合可视化，展示多个角度的分析结果
    """
    fig = plt.figure(figsize=(16, 10))
    
    # 1. BEV轨迹对比（主要视图）
    ax1 = plt.subplot(2, 3, (1, 2))
    
    # 复用现有的BEV可视化组件
    from navsim.visualization.bev import add_configured_bev_on_ax, add_trajectory_to_bev_ax
    from navsim.visualization.config import TRAJECTORY_CONFIG
    from navsim.visualization.plots import configure_bev_ax
    
    frame_idx = scene.scene_metadata.num_history_frames - 1
    add_configured_bev_on_ax(ax1, scene.map_api, scene.frames[frame_idx])
    
    human_trajectory = scene.get_future_trajectory()
    agent_trajectory = agent.compute_trajectory(scene.get_agent_input(), scene)
    
    add_trajectory_to_bev_ax(ax1, human_trajectory, TRAJECTORY_CONFIG["human"])
    add_trajectory_to_bev_ax(ax1, agent_trajectory, TRAJECTORY_CONFIG["agent"])
    configure_bev_ax(ax1)
    ax1.set_title(f"BEV轨迹对比 - {scene.scene_metadata.scenario_type}")
    
    # 2. 误差分析
    ax2 = plt.subplot(2, 3, 3)
    human_poses = np.array(human_trajectory.poses)
    agent_poses = np.array(agent_trajectory.poses)
    min_length = min(len(human_poses), len(agent_poses))
    
    position_diff = np.sqrt(np.sum((human_poses[:min_length, :2] - agent_poses[:min_length, :2])**2, axis=1))
    time_points = np.arange(min_length) * human_trajectory.trajectory_sampling.interval_length
    
    ax2.plot(time_points, position_diff, 'b-', linewidth=2)
    ax2.set_xlabel('时间 (秒)')
    ax2.set_ylabel('位置误差 (米)')
    ax2.set_title('误差随时间变化')
    ax2.grid(True, alpha=0.3)
    
    # 3. 轨迹俯视图
    ax3 = plt.subplot(2, 3, 4)
    ax3.plot(human_poses[:, 0], human_poses[:, 1], 'g-', linewidth=3, label='人类轨迹', alpha=0.7)
    ax3.plot(agent_poses[:, 0], agent_poses[:, 1], 'r--', linewidth=2, label='AI预测')
    ax3.scatter(human_poses[0, 0], human_poses[0, 1], c='green', s=100, marker='o')
    ax3.set_xlabel('X坐标 (米)')
    ax3.set_ylabel('Y坐标 (米)')
    ax3.set_title('轨迹俯视图')
    ax3.legend()
    ax3.grid(True, alpha=0.3)
    ax3.axis('equal')
    
    # 4. 速度分析
    ax4 = plt.subplot(2, 3, 5)
    if len(human_poses) > 1:
        dt = human_trajectory.trajectory_sampling.interval_length
        human_speeds = np.linalg.norm(np.diff(human_poses[:, :2], axis=0), axis=1) / dt
        agent_speeds = np.linalg.norm(np.diff(agent_poses[:min_length, :2], axis=0), axis=1) / dt
        
        time_speed = time_points[1:min_length]
        ax4.plot(time_speed, human_speeds[:len(time_speed)], 'g-', label='人类速度', linewidth=2)
        ax4.plot(time_speed, agent_speeds[:len(time_speed)], 'r--', label='AI速度', linewidth=2)
        ax4.set_xlabel('时间 (秒)')
        ax4.set_ylabel('速度 (m/s)')
        ax4.set_title('速度对比')
        ax4.legend()
        ax4.grid(True, alpha=0.3)
    
    # 5. 统计信息
    ax5 = plt.subplot(2, 3, 6)
    ax5.axis('off')
    
    stats_text = f"""
场景信息:
• Token: {scene.scene_metadata.token[:12]}...
• 类型: {scene.scene_metadata.scenario_type}
• 日志: {scene.scene_metadata.log_name}

轨迹统计:
• 预测时长: {human_trajectory.trajectory_sampling.time_horizon:.1f}秒
• 采样间隔: {human_trajectory.trajectory_sampling.interval_length:.2f}秒
• 轨迹点数: {min_length}

性能指标:
• 平均误差: {np.mean(position_diff):.2f}m
• 最大误差: {np.max(position_diff):.2f}m
• 最终误差: {position_diff[-1]:.2f}m
• 模型类型: {agent.model_type.upper()}
    """
    
    ax5.text(0.05, 0.95, stats_text, transform=ax5.transAxes, fontsize=9,
             verticalalignment='top', fontfamily='monospace',
             bbox=dict(boxstyle="round,pad=0.5", facecolor="lightgray", alpha=0.8))
    
    plt.tight_layout()
    plt.show()
    
    return fig

# 创建综合可视化
print("创建综合可视化界面...")
comprehensive_fig = create_comprehensive_visualization(scene, inference_agent)
print("综合可视化完成！")


In [None]:
print("🎉 模型推理与可视化教程完成！")
print(f"\n当前使用的模型类型: {inference_agent.model_type}")
print(f"测试的场景: {scene.scene_metadata.scenario_type}")
print(f"场景token: {token}")

print("\n💡 接下来您可以：")
print("1. 修改checkpoint_path加载您自己的模型")
print("2. 尝试不同的场景（重新运行场景选择cell）")
print("3. 扩展可视化功能")
print("4. 集成PDM Score评估")
print("5. 创建批量分析脚本")

print("\n📚 相关文件和函数：")
print("- navsim/visualization/plots.py - 基础可视化函数")
print("- navsim/agents/diffusiondrive/transfuser_callback.py - 训练时可视化")
print("- navsim/evaluate/pdm_score.py - 评估指标计算")
print("- tutorial/tutorial_visualization.ipynb - 基础可视化教程")
