# 实验对比：自定义框架 vs. 默认 LC2013

[cite_start]本 Notebook 用于运行 A/B 对比测试，评估论文中提出的自定义换道框架 [cite: 257-376][cite_start]（实验 B）与 SUMO 的默认 LC2013 [cite: 161-173] 换道模型（实验 A）之间的性能差异。

**对比指标:**
1.  **吞吐量 (Throughput)**
2.  **平均速度 (Average Speed)**
3.  [cite_start]**2D-TTC 安全性 (2D-TTC)** [cite: 307-313]

In [1]:
import traci
import sumolib
import os
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# --- 关键路径设置 ---
# 1. 将项目根目录 (DCDL_GMARL) 添加到 sys.path
# 这允许我们从 'controller' 包中导入模块
CURRENT_DIR = os.path.abspath('.') # 应该是 DCDL_GMARL/analysis
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, '..'))
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)

print(f"项目根目录: {PROJECT_ROOT}")
print(f"Python 搜索路径: {sys.path}")

# --- 检查 SUMO_HOME ---
if "SUMO_HOME" not in os.environ:
    print("错误: SUMO_HOME 环境变量未设置。")
    # 这里可以设置一个默认值，或者直接退出
    # os.environ["SUMO_HOME"] = "D:/SUMO" # 例如
    sys.exit("请设置 SUMO_HOME 环境变量")
else:
    print(f"SUMO_HOME: {os.environ['SUMO_HOME']}")
    tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
    if tools not in sys.path:
        sys.path.append(tools)

项目根目录: f:\论文4\DCDL_GMARL
Python 搜索路径: ['f:\\论文4\\DCDL_GMARL\\analysis', 'D:\\SUMO\\tools', 'd:\\Anaconda\\python311.zip', 'd:\\Anaconda\\DLLs', 'd:\\Anaconda\\Lib', 'd:\\Anaconda', '', 'C:\\Users\\张凡\\AppData\\Roaming\\Python\\Python311\\site-packages', 'C:\\Users\\张凡\\AppData\\Roaming\\Python\\Python311\\site-packages\\win32', 'C:\\Users\\张凡\\AppData\\Roaming\\Python\\Python311\\site-packages\\win32\\lib', 'C:\\Users\\张凡\\AppData\\Roaming\\Python\\Python311\\site-packages\\Pythonwin', 'd:\\Anaconda\\Lib\\site-packages', 'd:\\Anaconda\\Lib\\site-packages\\win32', 'd:\\Anaconda\\Lib\\site-packages\\win32\\lib', 'd:\\Anaconda\\Lib\\site-packages\\Pythonwin', 'D:\\SUMO\\tools', 'f:\\论文4\\DCDL_GMARL']
SUMO_HOME: D:\SUMO\


In [2]:
import traci
import sumolib
import os
import sys
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# =================================================================
# 1. 路径注入
# =================================================================
CURRENT_DIR = os.path.abspath('.') 
PROJECT_ROOT = os.path.abspath(os.path.join(CURRENT_DIR, '..'))
if PROJECT_ROOT not in sys.path:
    sys.path.append(PROJECT_ROOT)
    print(f"项目根目录已添加到路径: {PROJECT_ROOT}")
else:
    print(f"项目根目录已在路径中: {PROJECT_ROOT}")

# 检查 SUMO_HOME
if "SUMO_HOME" not in os.environ:
    print("错误: SUMO_HOME 环境变量未设置。")
    sys.exit("请设置 SUMO_HOME 环境变量")
else:
    tools = os.path.join(os.environ['SUMO_HOME'], 'tools')
    if tools not in sys.path:
        sys.path.append(tools)
    print(f"SUMO_HOME tools 已在路径中: {tools}")

# =================================================================
# 2. 导入项目模块
# =================================================================
try:
    from controller.config import CONFIG
    from controller.controllers.lane_manager import LaneManager
    from controller.controllers.vehicle_controller import VehicleController
    # 导入您 .py 文件中的类
    from controller.environment.twod_ttc_calculator import TwoDTTC_Calculator
    print("成功导入所有控制器和 2D-TTC 模块。")
except ImportError as e:
    print(f"致命错误: 无法导入依赖模块。")
    print(f"  请确保您的 'controller' 文件夹结构正确，且所有 '__init__.py' 文件都存在。")
    print(f"  错误详情: {e}")
    sys.exit(1)
except Exception as e:
    print(f"发生意外导入错误: {e}")
    sys.exit(1)

# =================================================================
# 3. 全局配置
# =================================================================
SUMO_BINARY = os.environ.get("SUMO_BIN", "sumo") # 用 "sumo" (无界面) 会快得多
SCENARIO_PATH = os.path.join(PROJECT_ROOT, "scenario")
SUMO_CONFIG_FILE = os.path.join(SCENARIO_PATH, "test.sumocfg")

CONTROL_EDGES = ["2", "3", "4", "5", "6", "7", "8", "9", "10", "11"]
CONTROLLED_LANE_INDEX = 0

FIXED_POLICY_M = 5 # CDL 长度 (SCLs)
FIXED_POLICY_N = 5 # HML 长度 (SCLs)

DOWNSTREAM_DETECTOR_IDS = ["D_15_0", "D_15_1", "D_15_2"]
CRITICAL_TTC_THRESHOLD = 3.0 # 危险TTC阈值

# =================================================================
# 4. 仿真函数定义 (参考 main.py 修正)
# =================================================================
def run_simulation(run_mode: str, label: str):
    """
    运行一次完整的 SUMO 仿真并收集数据。
    """
    
    # 1. 启动 SUMO (已移除 --remote-port)
    sumo_cmd = [
        os.path.join(os.environ["SUMO_HOME"], "bin", SUMO_BINARY),
        "-c", SUMO_CONFIG_FILE,
        "--step-length", str(CONFIG.scenario.SIM_STEP_LENGTH_S),
        "--quit-on-end", "true",
        "--no-warnings", "true",
    ]
    
    print(f"  [启动 {label}]: {' '.join(sumo_cmd)}")
    try:
        traci.start(sumo_cmd, port=8813, label=label)
    except Exception as e:
        print(f"  [!! 错误 !!]: TraCI 无法连接到 SUMO。 端口 8813 是否被占用？")
        print(f"  错误详情: {e}")
        return pd.DataFrame() 
    
    # 2. 实例化控制器
    lane_manager = LaneManager(CONTROL_EDGES, CONTROLLED_LANE_INDEX)
    veh_controller = VehicleController(CONTROL_EDGES)
    
    ttc_calculator = TwoDTTC_Calculator(
        time_step=CONFIG.scenario.SIM_STEP_LENGTH_S,
        ttc_threshold=CRITICAL_TTC_THRESHOLD
    )
    
    results_data = []
    
    # 3. 应用策略
    lane_manager.initialize_permissions()
    lane_manager.apply_lane_strategy(FIXED_POLICY_M, FIXED_POLICY_N)

    # 4. 运行主循环
    # ==================== [ 最终修正 ] ====================
    # 严格参考 main.py 中的 try...except...finally 逻辑
    # 来防止 AttributeError
    # ======================================================
    step = 0
    try:
        while traci.simulation.getMinExpectedNumber() > 0:
            traci.simulationStep()
            lane_manager.step()
            
            if run_mode == 'CUSTOM_FRAMEWORK':
                active_hml_lanes = lane_manager.get_active_hml_lanes() 
                active_cdl_lanes = lane_manager.get_active_cdl_lanes()
                cdl_start = lane_manager.get_cdl_start_edge()
                
                veh_controller.update_vehicle_states(
                    active_hml_lanes, 
                    active_cdl_lanes, 
                    cdl_start
                )
            
            # --- 数据收集 ---
            step_throughput = 0
            step_speeds = []
            
            for det_id in DOWNSTREAM_DETECTOR_IDS:
                step_throughput += traci.inductionloop.getLastStepVehicleNumber(det_id)
                if traci.inductionloop.getLastStepVehicleNumber(det_id) > 0:
                    speed = traci.inductionloop.getLastStepMeanSpeed(det_id)
                    if speed >= 0:
                        step_speeds.append(speed)
            
            mean_speed_mps = np.mean(step_speeds) if step_speeds else np.nan
            
            all_veh_ids = traci.vehicle.getIDList()
            step_ttc_count = 0
            
            if all_veh_ids:
                all_states = ttc_calculator.get_all_vehicle_states(all_veh_ids)
                for veh_id in all_veh_ids:
                    ttc_val = ttc_calculator.calculate_vehicle_2d_ttc(veh_id, all_states)
                    if ttc_val is not None and 0 < ttc_val < CRITICAL_TTC_THRESHOLD:
                        step_ttc_count += 1
            
            results_data.append({
                "step": step,
                "time_s": step * CONFIG.scenario.SIM_STEP_LENGTH_S,
                "throughput_veh": step_throughput,
                "mean_speed_mps": mean_speed_mps,
                "critical_ttc_events": step_ttc_count
            })

            step += 1
            if step % 1000 == 0:
                print(f"  [{label}] 仿真步: {step}")
    
    # (逻辑参考 main.py)
    except traci.TraCIException as e:
        if "connection closed by SUMO" in str(e):
            print(f"  [{label}] 仿真正常结束。")
        else:
            print(f"  [{label}] 仿真在中途出错: {e}")
    
    # (逻辑参考 main.py)
    finally:
        # 这个嵌套的 try/except 是防止 'AttributeError' 的关键
        try:
            traci.close()
            print(f"  [{label}] TraCI 连接已关闭。")
        except Exception:
            print(f"  [{label}] TraCI 连接已自动关闭。")

    return pd.DataFrame(results_data)

# =================================================================
# 5. 执行实验
# =================================================================
print("\n--- 开始对比实验 ---")

# 1. 运行实验 A (基线 - LC2013)
df_baseline = run_simulation(
    run_mode='DEFAULT_LC2013', 
    label="LC2013_Baseline"
)
df_baseline.to_csv(os.path.join(CURRENT_DIR, "analysis_results_baseline.csv"), index=False)
print("基线 (LC2013) 运行完成并保存。")

print("\n" + "="*30 + "\n")

# 2. 运行实验 B (您的框架)
df_custom = run_simulation(
    run_mode='CUSTOM_FRAMEWORK', 
    label="Custom_Framework"
)
df_custom.to_csv(os.path.join(CURRENT_DIR, "analysis_results_custom.csv"), index=False)
print("自定义框架运行完成并保存。")

print("\n--- 所有实验已完成 ---")

项目根目录已在路径中: f:\论文4\DCDL_GMARL
SUMO_HOME tools 已在路径中: D:\SUMO\tools
成功导入所有控制器和 2D-TTC 模块。

--- 开始对比实验 ---
LaneManager 初始化完毕，管理 10 个 SCL 路段。
2D-TTC 计算器已初始化 (L=5.0m, W=1.8m)
  [LC2013_Baseline] 仿真在中途出错: Induction loop 'D_15_0' is not known
  [LC2013_Baseline] TraCI 连接已关闭。
基线 (LC2013) 运行完成并保存。


LaneManager 初始化完毕，管理 10 个 SCL 路段。
2D-TTC 计算器已初始化 (L=5.0m, W=1.8m)
  [Custom_Framework] 仿真在中途出错: Induction loop 'D_15_0' is not known
  [Custom_Framework] TraCI 连接已关闭。
自定义框架运行完成并保存。

--- 所有实验已完成 ---


# 结果分析与可视化

In [3]:
def summarize_results(df: pd.DataFrame, name: str):
    if df.empty:
        print(f"--- {name} 结果 (无数据) ---")
        return
        
    print(f"\n--- {name} 结果汇总 ---")
    
    # 1. 吞吐量
    total_throughput = df['throughput_veh'].sum()
    print(f"  总吞吐量 (辆): {total_throughput}")
    
    # 2. 平均速度 (m/s -> km/h)
    avg_speed_kph = (df['mean_speed_mps'].mean() * 3.6) # .mean() 自动忽略 nans
    print(f"  平均速度 (km/h): {avg_speed_kph:.2f}")
    
    # 3. 2D-TTC
    total_critical_events = df['critical_ttc_events'].sum()
    print(f"  危险 2D-TTC 事件总数 (TTC < {CRITICAL_TTC_THRESHOLD}s): {total_critical_events}")

# 打印两个实验的汇总
summarize_results(df_baseline, "A: 默认 LC2013 ")
summarize_results(df_custom, "B: 自定义框架 ")

--- A: 默认 LC2013  结果 (无数据) ---
--- B: 自定义框架  结果 (无数据) ---


In [4]:
if not df_baseline.empty and not df_custom.empty:
    plt.figure(figsize=(12, 6))
    
    # 计算累计吞吐量
    df_baseline['cumulative_throughput'] = df_baseline['throughput_veh'].cumsum()
    df_custom['cumulative_throughput'] = df_custom['throughput_veh'].cumsum()
    
    plt.plot(df_baseline['time_s'], df_baseline['cumulative_throughput'], label='默认 LC2013 ')
    plt.plot(df_custom['time_s'], df_custom['cumulative_throughput'], label='自定义框架 ', linestyle='--')
    
    plt.title('对比 1: 累计吞吐量', fontsize=16)
    plt.xlabel('仿真时间 (秒)')
    plt.ylabel('累计吞吐量 (辆)')
    plt.legend()
    plt.grid(True)
    plt.show()
else:
    print("数据不完整，跳过绘制吞吐量图表。")

数据不完整，跳过绘制吞吐量图表。


In [5]:
if not df_baseline.empty and not df_custom.empty:
    plt.figure(figsize=(12, 6))
    
    # 使用滚动平均 (rolling mean) 使曲线平滑
    ROLLING_WINDOW = 300 # 30秒 (300 步 * 0.1s/步)
    
    baseline_speed_kph = (df_baseline['mean_speed_mps'] * 3.6).rolling(window=ROLLING_WINDOW).mean()
    custom_speed_kph = (df_custom['mean_speed_mps'] * 3.6).rolling(window=ROLLING_WINDOW).mean()
    
    plt.plot(df_baseline['time_s'], baseline_speed_kph, label=f'默认 LC2013  ({ROLLING_WINDOW}步滚动平均)')
    plt.plot(df_custom['time_s'], custom_speed_kph, label=f'自定义框架  ({ROLLING_WINDOW}步滚动平均)', linestyle='--')
    
    plt.title('对比 2: 平均速度', fontsize=16)
    plt.xlabel('仿真时间 (秒)')
    plt.ylabel('平均速度 (km/h)')
    plt.legend()
    plt.grid(True)
    plt.ylim(bottom=0) # 速度不能为负
    plt.show()
else:
    print("数据不完整，跳过绘制速度图表。")

数据不完整，跳过绘制速度图表。


In [6]:
if not df_baseline.empty and not df_custom.empty:
    plt.figure(figsize=(10, 6))
    
    # 比较两个实验中 *发生* 危险TTC事件的 *总数*
    # (这比平均TTC值更能反映安全性)
    events_baseline = df_baseline['critical_ttc_events'].sum()
    events_custom = df_custom['critical_ttc_events'].sum()
    
    labels = ['默认 LC2013 ', '自定义框架 ']
    values = [events_baseline, events_custom]
    
    plt.bar(labels, values, color=['blue', 'orange'])
    
    plt.title(f'对比 3: 危险 2D-TTC 事件总数 (TTC < {CRITICAL_TTC_THRESHOLD}s)', fontsize=16)
    plt.ylabel('事件总数 (越低越安全)')
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    
    for i, v in enumerate(values):
        plt.text(i, v + (max(values) * 0.01), str(v), ha='center', fontweight='bold')
        
    plt.show()
else:
    print("数据不完整，跳过绘制 2D-TTC 图表。")

数据不完整，跳过绘制 2D-TTC 图表。
