In [1]:
import carla
import math
import time
from collections import deque

In [2]:
# CCRs场景参数配置类
class CCRsScenarioConfig:
    def __init__(self):     
        # 车辆参数
        self.lead_vehicle_model = 'vehicle.audi.a2'  # 前车模型
        self.test_vehicle_model = 'vehicle.tesla.model3'  # 测试车辆模型
        
        # 场景参数
        self.initial_distance = 100  # 初始距离(米)
        self.road_type = 'straight'   # 道路类型
        self.weather_conditions = [
            'ClearNoon',    # 晴天
            'WetNoon',      # 雨天
            'HardRainNoon'  # 大雨
        ]
        
        # AEB算法参数
        self.tth = 1.2  # Time to Headway (秒)
        self.min_gap = 2.0  # 最小安全距离(米)

In [3]:
class CCRsAEBTest:
    """
    CNCAP 2024 CCRs场景AEB测试主类
    实现前方静止车辆追尾测试场景
    """
    
    def __init__(self, host='localhost', port=2000):
        # 连接CARLA服务器
        self.client = carla.Client(host, port)
        self.client.set_timeout(10.0)
        
        # 获取世界和地图
        self.world = self.client.get_world()
        self.client.load_world('Town05')
        self.map = self.world.get_map()
        
        # 场景配置
        self.config = CCRsScenarioConfig()
        
        # 测试车辆和前车引用
        self.test_vehicle = None  # 初始时没有测试车辆
        self.lead_vehicle = None  # 初始时没有前车
        
        # AEB算法实例
        self.aeb_algorithm = SimpleAEBAlgorithm()
        
        # 数据记录
        self.test_data = {
            'collision_occurred': False,
            'min_distance': float('inf'),
            'braking_distance': 0.0,
            'test_speed': 0.0
        }
    
    def setup_scenario(self, test_speed, weather_condition='ClearNoon'):
        """
        设置CCRs测试场景
        Args:
            test_speed: 测试速度(km/h)
            weather_condition: 天气条件
        """
        print(f"设置CCRs场景 - 速度: {test_speed}km/h, 天气: {weather_condition}")
        
        # 1. 设置天气
        self.set_weather(weather_condition)
        
        # 2. 生成车辆
        self.spawn_vehicles()
        
        # 3. 设置车辆初始状态
        self.set_initial_conditions(test_speed)
        
        # 4. 设置碰撞检测
        self.setup_collision_detection()
        
        # 5. 设置同步模式
        self.set_synchronous_mode(True)
    
    def set_weather(self, weather_preset):
        """设置天气条件"""
        weather_presets = {
            'ClearNoon': carla.WeatherParameters.ClearNoon,
            'WetNoon': carla.WeatherParameters.WetNoon,
            'HardRainNoon': carla.WeatherParameters.HardRainNoon,
            'ClearSunset': carla.WeatherParameters.ClearSunset
        }
        
        if weather_preset in weather_presets:
            self.world.set_weather(weather_presets[weather_preset])
            print(f"天气设置为: {weather_preset}")
    
    def spawn_vehicles(self):
        """生成测试车辆和前车"""
        try:
            # 获取车辆蓝图
            blueprint_library = self.world.get_blueprint_library()
            
            # 获取生成点
            spawn_points = self.map.get_spawn_points()
            
            # 生成前车（静止目标车辆）
            lead_bp = blueprint_library.find(self.config.lead_vehicle_model)
            self.lead_vehicle = self.world.spawn_actor(lead_bp, spawn_points[10])
            
            # 生成测试车辆
            test_bp = blueprint_library.find(self.config.test_vehicle_model)
            # 测试车辆生成在前车后方指定距离
            test_spawn_point = self.calculate_test_vehicle_spawn(spawn_points[10])
            self.test_vehicle = self.world.spawn_actor(test_bp, test_spawn_point)
            
            print("车辆生成完成")
            
        except Exception as e:
            print(f"车辆生成失败: {e}")
    
    def calculate_test_vehicle_spawn(self, lead_spawn_point):
        """
        计算测试车辆生成位置
        确保在前车后方指定距离
        """
        # 获取前车位置和朝向
        lead_transform = lead_spawn_point
        lead_location = lead_transform.location
        lead_rotation = lead_transform.rotation
        
        # 计算后方位置（相对坐标系）
        backward_vector = carla.Vector3D(
            x=-self.config.initial_distance * math.cos(math.radians(lead_rotation.yaw)),
            y=-self.config.initial_distance * math.sin(math.radians(lead_rotation.yaw)),
            z=0
        )
        
        # 测试车辆生成位置
        test_location = carla.Location(
            lead_location.x + backward_vector.x,
            lead_location.y + backward_vector.y,
            lead_location.z
        )
        
        # 创建新的变换
        #carla.Transform是CARLA中用于表示物体在3D空间中位置和朝向的核心类
        test_transform = carla.Transform(
            location=test_location,
            rotation=lead_rotation  # 同向
        )
        
        return test_transform

    def cleanup(self):
        """
        清理资源：销毁车辆和传感器
        在测试结束后调用，确保资源正确释放
        """
        print("清理测试资源...")
        
        try:
            # 销毁碰撞传感器（如果存在）
            if hasattr(self, 'collision_sensor') and self.collision_sensor is not None:
                self.collision_sensor.destroy()
                self.collision_sensor = None
                print("碰撞传感器已销毁")
            
            # 销毁测试车辆（如果存在）
            if self.test_vehicle is not None:
                self.test_vehicle.destroy()
                self.test_vehicle = None
                print("测试车辆已销毁")
            
            # 销毁前车（如果存在）
            if self.lead_vehicle is not None:
                self.lead_vehicle.destroy()
                self.lead_vehicle = None
                print("前车已销毁")
                
            # 禁用同步模式
            self.set_synchronous_mode(False)
            
        except Exception as e:
            print(f"资源清理过程中出现错误: {e}")
        
        print("资源清理完成")

In [4]:
class SimpleAEBAlgorithm:
    """
    基于TTC(Time to Collision)的简单AEB算法实现
    参考CNCAP标准的AEB测试要求
    """
    
    def __init__(self, tth=1.2, min_gap=2.0, max_deceleration=8.0):
        """
        初始化AEB参数
        Args:
            tth: Time to Headway (秒)，安全跟车时距
            min_gap: 最小安全距离 (米)
            max_deceleration: 最大减速度 (m/s²)
        """
        self.tth = tth
        self.min_gap = min_gap
        self.max_deceleration = max_deceleration
        self.warning_triggered = False
        self.braking_triggered = False
        
        # 状态记录
        self.relative_distance_history = deque(maxlen=10)
        self.relative_speed_history = deque(maxlen=10)
    
    def calculate_ttc(self, relative_distance, relative_speed):
        """
        计算碰撞时间(Time to Collision)
        Args:
            relative_distance: 相对距离 (米)
            relative_speed: 相对速度 (米/秒)，正数表示接近
        Returns:
            ttc: 碰撞时间，无穷大表示不会碰撞
        """
        if relative_speed <= 0:
            return float('inf')  # 相对速度<=0表示远离或静止，不会碰撞
        
        ttc = relative_distance / relative_speed
        return ttc if ttc > 0 else 0
    

    def calculate_safe_distance(self, ego_speed, lead_speed=0):
        """
        计算安全距离模型 - 优化版本
        基于CNCAP标准的距离模型，增加安全系数
        """
        # 优化安全距离计算：增加基础距离和速度系数
        speed_kmh = ego_speed * 3.6  # 转换为km/h
        
        # 修改：增加基础安全距离和速度系数
        base_distance = 3.0  # 从2.0增加到3.0米
        speed_coefficient = 0.15  # 从0.1增加到0.15
        
        safe_distance = base_distance + max(0, speed_kmh - 10) * speed_coefficient
        
        return safe_distance
    
    def aeb_decision(self, ego_vehicle, lead_vehicle, current_speed):
        """
        AEB决策函数 - 优化版本（提前触发）
        Returns:
            control: 车辆控制指令
            warning_level: 警告级别 0-无警告, 1-FCW, 2-AEB制动
        """
        # 计算相对位置和速度
        ego_location = ego_vehicle.get_location()
        lead_location = lead_vehicle.get_location()
        
        # 相对距离
        relative_distance = math.sqrt(
            (ego_location.x - lead_location.x)**2 + 
            (ego_location.y - lead_location.y)**2
        )
        
        # 相对速度（简化计算，使用速度矢量差）
        ego_velocity = ego_vehicle.get_velocity()
        lead_velocity = lead_vehicle.get_velocity()
        
        # 计算沿车辆前进方向的相对速度
        ego_transform = ego_vehicle.get_transform()
        forward_vector = ego_transform.get_forward_vector()
        
        ego_speed = math.sqrt(ego_velocity.x**2 + ego_velocity.y**2)
        relative_speed = ego_speed  # 前车静止，相对速度等于自车速度
        
        # 更新历史数据
        self.relative_distance_history.append(relative_distance)
        self.relative_speed_history.append(relative_speed)
        
        # 计算TTC
        ttc = self.calculate_ttc(relative_distance, relative_speed)
        
        # 计算安全距离 - 增加安全系数，提前触发
        safe_distance = self.calculate_safe_distance(ego_speed) * 1.5  # 增加50%安全距离
        
        # AEB决策逻辑 - 优化触发条件
        control = carla.VehicleControl() #创建车辆控制指令对象
        warning_level = 0
        
        # 修改1: 降低AEB触发阈值，从0.3倍安全距离改为0.5倍
        if relative_distance <= safe_distance * 0.5 or ttc < 1.0:  # 增加TTC条件
            # 紧急制动：AEB激活
            control.throttle = 0.0
            control.brake = min(1.0, self.max_deceleration / 8.0)  # 增加制动力
            control.steer = 0.0
            warning_level = 2
            self.braking_triggered = True
            print(f"AEB激活! 距离: {relative_distance:.2f}m, TTC: {ttc:.2f}s")
            
        # 修改2: 降低FCW触发阈值，从0.6倍安全距离改为0.8倍  
        elif relative_distance <= safe_distance * 0.8 or ttc < 2.0:  # 增加TTC条件
            # FCW警告：驾驶员应接管
            control.throttle = 0.0
            control.brake = 0.5  # 增加制动力
            control.steer = 0.0
            warning_level = 1
            self.warning_triggered = True
            if not self.braking_triggered:
                print(f"FCW警告! 距离: {relative_distance:.2f}m, TTC: {ttc:.2f}s")
                
        # 修改3: 提前预警阶段
        elif relative_distance <= safe_distance * 1.2 or ttc < 3.0:  # 增加预警范围
            # 预警阶段：轻微减速提示
            control.throttle = 0.0
            control.brake = 0.3  # 增加制动力
            control.steer = 0.0
            warning_level = 1
            
        else:
            # 安全距离内，正常巡航
            target_speed = current_speed / 3.6  # 转换为m/s
            current_speed_ms = ego_speed
            
            if current_speed_ms < target_speed * 0.9:
                control.throttle = 0.7
                control.brake = 0.0
            elif current_speed_ms > target_speed * 1.1:
                control.throttle = 0.0
                control.brake = 0.2
            else:
                control.throttle = 0.3
                control.brake = 0.0
                
            control.steer = 0.0
            warning_level = 0
        
        return control, warning_level, {
            'relative_distance': relative_distance,
            'relative_speed': relative_speed,
            'ttc': ttc,
            'safe_distance': safe_distance
        }

In [5]:
class CCRsAEBTest(CCRsAEBTest):
    """扩展CCRs测试类，添加测试执行逻辑"""
    
    def set_initial_conditions(self, test_speed):
        """设置初始测试条件"""
        # 设置前车为静止状态
        self.lead_vehicle.set_target_velocity(carla.Vector3D(0, 0, 0))
        
        # 设置测试车辆初始速度（通过控制指令）
        control = carla.VehicleControl() #创建车辆控制指令对象
        control.throttle = 0.0
        control.brake = 0.0
        control.steer = 0.0
        self.test_vehicle.apply_control(control)
        
        # 记录测试速度
        self.test_data['test_speed'] = test_speed
        print(f"测试速度设置为: {test_speed} km/h")
    
    def setup_collision_detection(self):
        """设置碰撞检测"""
        # 创建碰撞传感器蓝图
        blueprint_library = self.world.get_blueprint_library()
        collision_bp = blueprint_library.find('sensor.other.collision')
        
        # 将碰撞传感器附加到测试车辆
        self.collision_sensor = self.world.spawn_actor(
            collision_bp,
            carla.Transform(carla.Location(x=0.5, z=1.0)),  # 车辆前方
            attach_to=self.test_vehicle
        )
        
        # 注册碰撞事件回调
        self.collision_sensor.listen(lambda event: self.on_collision(event))
        
    def on_collision(self, event):
        """碰撞事件处理"""
        self.test_data['collision_occurred'] = True
        self.test_data['collision_time'] = time.time()
        print("!!! 碰撞发生 !!!")
    
    def set_synchronous_mode(self, enabled=True):
        """设置同步模式"""
        settings = self.world.get_settings()
        settings.synchronous_mode = enabled
        if enabled:
            settings.fixed_delta_seconds = 0.05  # 20Hz仿真频率
        self.world.apply_settings(settings)
        print(f"同步模式: {'启用' if enabled else '禁用'}")
    
    def run_single_test(self, test_speed, weather_condition='ClearNoon', use_advanced_aeb=False):
        """
        执行单次CCRs测试
        Args:
            test_speed: 测试速度 (km/h)
            weather_condition: 天气条件
            use_advanced_aeb: 是否使用高级AEB算法
        Returns:
            test_result: 测试结果字典
        """
        print(f"\n=== 开始CCRs测试 ===")
        print(f"速度: {test_speed}km/h, 天气: {weather_condition}")
        
        # 重置测试数据
        self.test_data = {
            'test_speed': test_speed,
            'weather_condition': weather_condition,
            'collision_occurred': False,
            'min_distance': float('inf'),
            'braking_start_distance': 0,
            'braking_distance': 0,
            'test_start_time': time.time(),
            'test_duration': 0,
            'aeb_activation_time': 0,
            'warning_activation_time': 0,
            'metrics_history': []
        }
        
        # 设置场景
        self.setup_scenario(test_speed, weather_condition)
        
        # 选择AEB算法
        if use_advanced_aeb:
            self.aeb_algorithm = AdvancedAEBAlgorithm()
            print("使用高级AEB算法（带传感器噪声）")
        else:
            self.aeb_algorithm = SimpleAEBAlgorithm()
            print("使用简单AEB算法")
        
        try:
            # 测试主循环
            test_start_time = time.time()
            frame_count = 0
            braking_started = False
            
            while True:
                current_time = time.time() - test_start_time
                frame_count += 1
                
                # 同步模式需要手动tick
                if self.world.get_settings().synchronous_mode:
                    self.world.tick()
                
                # 获取车辆状态
                test_location = self.test_vehicle.get_location()
                lead_location = self.lead_vehicle.get_location()
                
                # 计算相对距离,当前计算的是中心点距离，未考虑车辆尺寸
                relative_distance = math.sqrt(
                    (test_location.x - lead_location.x)**2 + 
                    (test_location.y - lead_location.y)**2
                )
                
                # 更新最小距离
                if relative_distance < self.test_data['min_distance']:
                    self.test_data['min_distance'] = relative_distance
                
                # AEB决策
                control, warning_level, metrics = self.aeb_algorithm.aeb_decision(
                    self.test_vehicle, self.lead_vehicle, test_speed
                )
                
                # 应用控制
                self.test_vehicle.apply_control(control)
                
                # 记录制动开始
                if warning_level >= 1 and not braking_started:
                    braking_started = True
                    self.test_data['braking_start_distance'] = relative_distance
                    self.test_data['warning_activation_time'] = current_time
                    if warning_level == 1:
                        print("FCW警告激活")
                
                if warning_level == 2 and self.test_data['aeb_activation_time'] == 0:
                    self.test_data['aeb_activation_time'] = current_time
                    print("AEB制动激活")
                
                # 记录度量数据
                frame_metrics = {
                    'timestamp': current_time,
                    'relative_distance': relative_distance,
                    'warning_level': warning_level,
                    'control_throttle': control.throttle,
                    'control_brake': control.brake
                }
                frame_metrics.update(metrics)
                self.test_data['metrics_history'].append(frame_metrics)
                
                # 检查测试结束条件
                test_velocity = self.test_vehicle.get_velocity()
                test_speed_ms = math.sqrt(test_velocity.x**2 + test_velocity.y**2)
                
                # 结束条件1: 发生碰撞
                if self.test_data['collision_occurred']:
                    print("测试结束：发生碰撞")
                    break
                
                # 结束条件2: 车辆完全停止
                if test_speed_ms < 0.1 and braking_started:
                    print("测试结束：车辆已停止")
                    break
                
                # 结束条件3: 超时（30秒）
                if current_time > 30.0:
                    print("测试结束：超时")
                    break
                
                # 结束条件4: 距离异常（测试车辆越过前车）
                if relative_distance > self.config.initial_distance + 10:
                    print("测试结束：距离异常")
                    break
                
                # 控制循环频率
                time.sleep(0.05)  # 约20Hz
                
        except Exception as e:
            print(f"测试执行错误: {e}")
        finally:
            # 计算最终结果
            self.test_data['test_duration'] = time.time() - test_start_time
            if braking_started:
                self.test_data['braking_distance'] = (
                    self.test_data['braking_start_distance'] - self.test_data['min_distance']
                )
            
            # 输出测试结果
            self.print_test_result()
            
            # 清理资源
            time.sleep(3) #延时3秒，可以在调试的时候看到车辆停下来的位置。实际应用不需要。
            self.cleanup()
        
        return self.test_data
    
    def print_test_result(self):
        """打印测试结果"""
        result = "通过" if not self.test_data['collision_occurred'] else "失败"
        
        print(f"\n=== 测试结果 ===")
        print(f"结果: {result}")
        print(f"碰撞发生: {'是' if self.test_data['collision_occurred'] else '否'}")
        print(f"最小距离: {self.test_data['min_distance']:.2f}米")
        print(f"制动距离: {self.test_data['braking_distance']:.2f}米")
        print(f"测试时长: {self.test_data['test_duration']:.2f}秒")
        
        if self.test_data['warning_activation_time'] > 0:
            print(f"FCW激活时间: {self.test_data['warning_activation_time']:.2f}秒")
        if self.test_data['aeb_activation_time'] > 0:
            print(f"AEB激活时间: {self.test_data['aeb_activation_time']:.2f}秒")

In [6]:
test = CCRsAEBTest()

In [7]:
result = test.run_single_test(
        test_speed=60, 
        weather_condition='WetNoon',
        use_advanced_aeb=False
    )


=== 开始CCRs测试 ===
速度: 60km/h, 天气: WetNoon
设置CCRs场景 - 速度: 60km/h, 天气: WetNoon
天气设置为: WetNoon
车辆生成完成
测试速度设置为: 60 km/h
同步模式: 启用
使用简单AEB算法
FCW警告激活
测试结束：车辆已停止

=== 测试结果 ===
结果: 通过
碰撞发生: 否
最小距离: 5.33米
制动距离: 39.19米
测试时长: 12.61秒
FCW激活时间: 6.12秒
清理测试资源...
碰撞传感器已销毁
测试车辆已销毁
前车已销毁
同步模式: 禁用
资源清理完成


In [8]:
result = test.run_single_test(
        test_speed=30, 
        weather_condition='WetNoon',
        use_advanced_aeb=False
    )


=== 开始CCRs测试 ===
速度: 30km/h, 天气: WetNoon
设置CCRs场景 - 速度: 30km/h, 天气: WetNoon
天气设置为: WetNoon
车辆生成完成
测试速度设置为: 30 km/h
同步模式: 启用
使用简单AEB算法
FCW警告激活
测试结束：车辆已停止

=== 测试结果 ===
结果: 通过
碰撞发生: 否
最小距离: 4.73米
制动距离: 18.29米
测试时长: 16.41秒
FCW激活时间: 11.63秒
清理测试资源...
碰撞传感器已销毁
测试车辆已销毁
前车已销毁
同步模式: 禁用
资源清理完成
