# **IMPORT LIBRERIE**

In [1]:
import carla, time, pygame, math, random, paho.mqtt.client as mqtt
from enum import Enum

pygame 2.6.1 (SDL 2.28.4, Python 3.7.12)
Hello from the pygame community. https://www.pygame.org/contribute.html


# **CONNESSIONE A CARLA**

In [2]:
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)

# **CONNESSIONE A MQTT**

In [3]:
# Definizione variabili
broker = 'broker.emqx.io'
port = 1883
topic = "carla/fcw_state"
# Connessione
mqttc = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
mqttc.connect(broker, port)

<MQTTErrorCode.MQTT_ERR_SUCCESS: 0>

# **VARIABILI GLOBALI**

In [4]:
# Instanza mondo
world = client.get_world()

In [5]:
# Enum state del sistema adas
class Fcw_state(Enum):
    IDLE = 4
    WARNING = 3
    ACTION = 2
    ESCAPE = 1

In [19]:
# Variabili di configurazione
min_fcw_state = Fcw_state.IDLE
vehicle_vertical_dimension = 0.75
vehicle_lateral_dimension = 0.9
min_ttc = 0.5
average_reaction_time = 2.5
throttle = 0.7
velocity_threshold = 2.77
max_steer_angle = 40
angle_steer_tollerance = 0.05
radar_range = 285

# **METODI PER LA CREAZIONE DI ATTORI**

In [7]:
def spawn_vehicle(vehicle_index=0, spawn_index=0, pattern='vehicle.*'):
    blueprint_library = world.get_blueprint_library()
    vehicle_bp = blueprint_library.filter(pattern)[vehicle_index]
    spawn_point = world.get_map().get_spawn_points()[spawn_index]
    vehicle = world.spawn_actor(vehicle_bp, spawn_point)
    return vehicle

In [8]:
def spawn_radar(vehicle):
    rad_bp = world.get_blueprint_library().find('sensor.other.radar')
    rad_bp.set_attribute('horizontal_fov', str(125))
    rad_bp.set_attribute('vertical_fov', str(115))
    rad_bp.set_attribute('range', str(radar_range))
    rad_bp.set_attribute('points_per_second', str(2000))
    rad_location = carla.Location(x=2.25, z=0.9)
    rad_rotation = carla.Rotation()
    rad_transform = carla.Transform(rad_location,rad_rotation)
    return world.spawn_actor(rad_bp,rad_transform,attach_to=vehicle, attachment_type=carla.AttachmentType.Rigid)


In [9]:
def spawn_walker():
    walker_bp = random.choice(world.get_blueprint_library().filter("walker.pedestrian.*"))

    start_location = carla.Location(x=70, y=25, z=1)

    trans = carla.Transform()
    trans.location = start_location

    return world.spawn_actor(walker_bp, trans)

# **FUNZIONI DI SUPPORTO GENERICHE**

In [10]:
# Controllo dei segno
def equals_sign(a, b):
    return (a > 0 and b > 0) or (a < 0 and b < 0)

In [11]:
# Convertitori
def check_lateral_collision(azimuth, depth, steering_angle):
    global vehicle_lateral_dimension
    if equals_sign(azimuth, steering_angle):
        return True
    return abs(depth * math.sin(azimuth)) < vehicle_lateral_dimension

def check_vertical_collision(altitude, depth):
    global vehicle_vertical_dimension
    return abs(depth * math.sin(altitude)) < vehicle_vertical_dimension 

def get_projected_velocity(azimuth, altitude, velocity, steering_angle):
    return abs(math.cos(azimuth - steering_angle) * math.cos(altitude) * velocity)

def get_projected_depth(azimuth, altitude, depth, steering_angle):
    return abs(math.cos(azimuth - steering_angle) * math.cos(altitude) * depth)

def get_breaking_distance(projected_velocity, asphalt_friction_deceleration):
    return (1/2) * (projected_velocity**2) / asphalt_friction_deceleration

def get_vehicle_velocity(vehicle):
    velocity_vector = vehicle.get_velocity()
    velocity = (velocity_vector.x**2 + velocity_vector.y**2 + velocity_vector.z**2)**0.5
    if vehicle.get_control().reverse:
        velocity = -velocity
    return velocity

def get_max_depth(azimuth, steering_angle):
    global vehicle_lateral_dimension
    if steering_angle == 0:
        return radar_range
    if equals_sign(azimuth, steering_angle):
        stering_range = abs(vehicle_lateral_dimension / math.tan(steering_angle))
    else:
        stering_range = abs((vehicle_lateral_dimension / 2) / math.tan(steering_angle))
    return min(stering_range, radar_range)
    

In [12]:
# Comandi guida veicolo
def move_forward(vehicle):
    global throttle
    control = carla.VehicleControl()
    control.throttle = throttle
    vehicle.apply_control(control)

def stop_vehicle(vehicle):
    control = carla.VehicleControl()
    control.throttle = 0.0
    control.brake = 1
    vehicle.apply_control(control)

In [13]:
# Simulazione sensore umidità dell'asflato
def get_asphalt_friction_coefficient():
    world = client.get_world()
    weather = world.get_weather()
    weather.precipitation_deposits
    weather.wetness
    return 0.8

In [14]:
# Invio messaggi Mqtt
def publish_message(msg):
    global mqttc
    status = mqttc.publish(topic, msg)
    if status[0] == 0:
        print(f"Send `{msg}` to topic `{topic}`")
    else:
        print(f"Failed to send message to topic {topic}")

# **LOGICA ADAS**

In [15]:
# Debug visivo
def radar_vidual_debug(detection, radar_data, red, green, blue):
    current_rot = radar_data.transform.rotation
    azi = math.degrees(detection.azimuth)
    alt = math.degrees(detection.altitude)
    
    fw_vec = carla.Vector3D(x=detection.depth - 0.25)
    carla.Transform(
        carla.Location(),
        carla.Rotation(
            pitch=current_rot.pitch + alt,
            yaw=current_rot.yaw + azi,
            roll=current_rot.roll)).transform(fw_vec)
    
    world.debug.draw_point(
        radar_data.transform.location + fw_vec,
        size=0.075,
        life_time=0.25,
        persistent_lines=False,
        color=carla.Color(red, green, blue))

In [21]:
# Algoritmo
def forward_collision_callback(radar_data, vehicle):
  global min_fcw_state, min_ttc, velocity_threshold, average_reaction_time, max_steer_angle, angle_steer_tollerance
  asphalt_friction_coefficient = get_asphalt_friction_coefficient()
  asphalt_friction_deceleration = 9.81 * asphalt_friction_coefficient
  control = vehicle.get_control()
  if -angle_steer_tollerance < control.steer < angle_steer_tollerance:
    steering_angle = control.steer * max_steer_angle
  else:
    steering_angle = 0
  for detection in radar_data:
    vehicle_velocity = get_vehicle_velocity(vehicle)
    if vehicle_velocity > velocity_threshold:
      if detection.velocity < 0 and check_lateral_collision(detection.azimuth, detection.depth, steering_angle) and check_vertical_collision(detection.altitude, detection.depth):
        projected_depth = get_projected_depth(detection.azimuth, detection.altitude, detection.depth, steering_angle)
        max_depth = get_max_depth(detection.azimuth, steering_angle)
        if projected_depth <= max_depth:
          projected_velocity = get_projected_velocity(detection.azimuth, detection.altitude, detection.velocity, steering_angle) 
          breaking_distance = get_breaking_distance(projected_velocity, asphalt_friction_deceleration)
          reacting_distance = max(0, projected_depth - breaking_distance)
          ttc = reacting_distance / projected_velocity
          fcw_state = Fcw_state.IDLE
          if ttc < min_ttc:  
            if vehicle_velocity < projected_velocity - velocity_threshold:
              fcw_state = Fcw_state.ESCAPE
              radar_vidual_debug(detection, radar_data, 0,0,1)
            else:
              fcw_state = Fcw_state.ACTION
              radar_vidual_debug(detection, radar_data, 1,0,0)
          elif ttc < min_ttc + average_reaction_time:
            fcw_state = Fcw_state.WARNING
            radar_vidual_debug(detection, radar_data, 1,1,0)
          else:
            radar_vidual_debug(detection, radar_data, 0,1,0)
          if fcw_state.value < min_fcw_state.value:
            min_fcw_state = fcw_state    
            publish_message(fcw_state.name)
    else:
      fcw_state = Fcw_state.IDLE
        

# **SIMULAZIONE**

In [17]:
# Start mqtt
mqttc.loop_start()

<MQTTErrorCode.MQTT_ERR_SUCCESS: 0>

In [22]:
try:
    vehicle = spawn_vehicle()
    radar = spawn_radar(vehicle)
    #walker = spawn_walker()

    radar.listen(lambda radar_data: forward_collision_callback(radar_data, vehicle))

    min_fcw_state = Fcw_state.IDLE
    while True:
        if min_fcw_state == Fcw_state.ACTION:
            stop_vehicle(vehicle)
        else:
            move_forward(vehicle)
        time.sleep(0.05)

finally:
    vehicle.destroy()
    radar.destroy()
    #walker.destroy()
    

Send `ACTION` to topic `carla/fcw_state`


KeyboardInterrupt: 

In [None]:
# Stop mqtt
mqttc.loop_stop()
mqttc.disconnect()