In [None]:
import carla
import time
import cv2
import numpy as np
import math
import sys
import random
import os

sys.path.append('D:\\CARLA\\WindowsNoEditor\\PythonAPI\\carla')
from agents.navigation.global_route_planner import GlobalRoutePlanner

In [None]:
# Define global parameters, including camera parameters

CAMERA_POS_Z = 1.3
CAMERA_POS_X = 1.4

CAM_HEIGHT = 480
CAM_WIDTH = 640
FOV = 90

YAW_ADJ_DEGREES = 15

In [None]:
def cleanup(world): 
    for actor in world.get_actors().filter('*vehicle*'):
        actor.destroy()
    for actor in world.get_actors().filter('*sensor*'):
        actor.destroy()

# Get sematic camera data
def sem_callback(image, data_dict): 
    image.convert(carla.ColorConverter.CityScapesPalette)
    data_dict['sem_image'] = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))[:, :, :3]


# Get angle between vehicle and target
def get_angle(car, wp): 
    vehicle_pos = car.get_transform()
    car_x, car_y = vehicle_pos.location.x, vehicle_pos.location.y
    wp_x, wp_y = wp.transform.location.x, wp.transform.location.y
    dx = wp_x - car_x
    dy = wp_y - car_y
    dist = (dx**2 + dy**2)**0.5
    x = dx / dist if dist != 0 else 0
    y = dy / dist if dist != 0 else 0
    car_vector = vehicle_pos.get_forward_vector()
    degrees = math.degrees(np.arctan2(y, x) - np.arctan2(car_vector.y, car_vector.x))
    if degrees < -180: degrees += 360
    elif degrees > 180: degrees -= 360
    return degrees

# Calculate latest direction
def get_distant_angle(car, wp_idx, route, delta):  
    i = min(wp_idx + delta, len(route)-1)
    for x in range(i - wp_idx):
        if route[wp_idx + x][0].is_junction:
            junction_wps = route[wp_idx + x][0].get_junction().get_waypoints(carla.LaneType.Driving)
            angles_planned = []
            all_angles = []
            for wp in junction_wps:
                angle = int(get_angle(car, wp[1]))
                if wp[1].transform.location.distance(route[wp_idx + x][0].transform.location) > 20:
                    for j in range(wp_idx + x, len(route)):
                        if wp[1].transform.location.distance(route[j][0].transform.location) < 10:
                            angles_planned.append(angle)
                        else:
                            all_angles.append(angle)
            angles_planned = list(set(angles_planned))
            all_angles = list(set(all_angles))
            alternative_angles = [a for a in all_angles if a not in angles_planned]
            if not angles_planned or not alternative_angles:
                return 0
            if min(angles_planned) < -25 and (not alternative_angles or min(alternative_angles) > min(angles_planned)):
                return -1
            elif max(angles_planned) > 25 and (not alternative_angles or max(alternative_angles) < max(angles_planned)):
                return 1
            else:
                return 0
    return 0

def draw_route(wp_idx, route, world, seconds=5.0):  
    draw_colour = carla.Color(r=255, g=0, b=0) if len(route) - wp_idx < 25 else carla.Color(r=0, g=0, b=255)
    for i in range(50):
        if wp_idx + i < len(route) - 2:
            world.debug.draw_string(route[wp_idx + i][0].transform.location, '^', draw_shadow=False,
                                     color=draw_colour, life_time=seconds, persistent_lines=True)

def select_random_route(position, locs, world):  
    grp = GlobalRoutePlanner(world.get_map(), 1)
    point_a = position.location
    route_list = []
    for loc in locs:
        route = grp.trace_route(point_a, loc.location)
        if len(route) > 100:
            route_list.append(route)
    return random.choice(route_list) if route_list else None


In [None]:
CAMERA_POS_X
def main():
    client = carla.Client('localhost', 2000)
    time.sleep(5)
    client.set_timeout(25)
    world = client.get_world()

    settings = world.get_settings()
    settings.synchronous_mode = False
    settings.no_rendering_mode = True
    world.apply_settings(settings)

    spawn_points = world.get_map().get_spawn_points()

    cleanup(world)

    start_point = random.choice(spawn_points)
    vehicle_bp = world.get_blueprint_library().filter('*model3*')[0]
    vehicle = world.try_spawn_actor(vehicle_bp, start_point)
    time.sleep(2)

    camera_data = {'sem_image': np.zeros((CAM_HEIGHT, CAM_WIDTH, 3))}

    # Semantgic camera
    sem_camera_bp = world.get_blueprint_library().find('sensor.camera.semantic_segmentation')
    sem_camera_bp.set_attribute('image_size_x', str(CAM_WIDTH))
    sem_camera_bp.set_attribute('image_size_y', str(CAM_HEIGHT))
    sem_camera_bp.set_attribute('fov', str(FOV))
    camera_init_trans = carla.Transform(carla.Location(z=CAMERA_POS_Z,x=CAMERA_POS_X))
    sem_camera = world.spawn_actor(sem_camera_bp, camera_init_trans, attach_to=vehicle)
    sem_camera.listen(lambda image: sem_callback(image, camera_data))

    # Make a route
    route = select_random_route(start_point, spawn_points, world)
    gen_dir_angle = 0

    for idx, waypoint in enumerate(route):
        vehicle.set_transform(waypoint[0].transform)
        vehicle.apply_control(carla.VehicleControl(throttle=0, steer=0, brake=1))
        time.sleep(2)

        if not waypoint[0].is_intersection and not waypoint[0].is_junction:
            gen_dir_angle = get_distant_angle(vehicle, idx, route, 30)

        draw_route(idx, route, world)

        if idx < len(route) - 2 and route[idx][0].lane_id != route[idx+1][0].lane_id:
            gen_dir_angle = -1 if get_angle(vehicle, route[idx+1][0]) < 0 else 1

        for i in range(5):
            yaw_adj = random.randrange(-YAW_ADJ_DEGREES, YAW_ADJ_DEGREES, 1)
            trans = waypoint[0].transform
            trans.rotation.yaw += yaw_adj
            vehicle.set_transform(trans)
            vehicle.apply_control(carla.VehicleControl(throttle=0, steer=0, brake=1))
            time.sleep(1)

            if idx + 5 < len(route) - 1:
                predicted_angle = get_angle(vehicle, route[idx+5][0])
                img = camera_data['sem_image']
                if np.sum(img) > 0:
                    os.makedirs("_img", exist_ok=True)
                    time_grab = time.time_ns()
                    cv2.imwrite(f"_img/{time_grab:06}_{gen_dir_angle}_{round(predicted_angle)}.png", img)

    cleanup(world)
    cv2.destroyAllWindows()


In [None]:
main()