In [1]:
import carla #the sim library itself
import random #to pick random spawn point
import cv2 #to work with images from cameras
import numpy as np #in this example to change image representation - re-shaping
import time
import os
import logging

In [2]:
# # connect to the sim 
# client = carla.Client('localhost', 2000)

try:
    # Connect to the server on localhost at port 2000
    client = carla.Client('localhost', 2000)
    client.set_timeout(10.0)  # seconds

    # Retrieve the worl
    print("Connected to CARLA world:")

except Exception as e:
    print("Failed to connect to CARLA:", e)

Connected to CARLA world:


In [3]:
# optional to load different towns
#client.set_timeout(15)
client.load_world('Town01')

<carla.libcarla.World at 0x161d8b5fc00>

In [4]:
world = client.get_world()
spawn_points = world.get_map().get_spawn_points()

In [5]:
# Cell 1: Setup Traffic Manager
tm_port = 8000
traffic_manager = client.get_trafficmanager(tm_port)
traffic_manager.set_global_distance_to_leading_vehicle(2.5)

# Optional: enable hybrid mode for large maps
traffic_manager.set_hybrid_physics_mode(True)
traffic_manager.set_hybrid_physics_radius(70.0)
print("Traffic Manager configured.")

Traffic Manager configured.


In [6]:
# Cell 2: Configure synchronous mode
settings = world.get_settings()
synchronous_master = False

if not settings.synchronous_mode:
    settings.synchronous_mode = True
    settings.fixed_delta_seconds = 0.05
    synchronous_master = True
    world.apply_settings(settings)
print(f"Synchronous mode enabled: {settings.synchronous_mode}, fixed delta seconds: {settings.fixed_delta_seconds}")

Synchronous mode enabled: True, fixed delta seconds: 0.05


In [7]:
# Cell 3: Prepare spawn points and vehicle blueprints
import random

vehicle_blueprints = world.get_blueprint_library().filter('vehicle.*')

random.shuffle(spawn_points)

number_of_vehicles = min(30, len(spawn_points))
print(f"Number of vehicles to spawn: {number_of_vehicles}")


Number of vehicles to spawn: 30


In [8]:
# Cell 4: Spawn vehicles with autopilot
vehicles_list = []
batch = []

for n in range(number_of_vehicles):
    bp = random.choice(vehicle_blueprints)
    if bp.has_attribute('color'):
        color = random.choice(bp.get_attribute('color').recommended_values)
        bp.set_attribute('color', color)
    if bp.has_attribute('driver_id'):
        driver_id = random.choice(bp.get_attribute('driver_id').recommended_values)
        bp.set_attribute('driver_id', driver_id)
    bp.set_attribute('role_name', 'autopilot')
    transform = spawn_points[n]
    if n == 0:
        # For the first vehicle: only spawn (no autopilot yet)
        batch.append(carla.command.SpawnActor(bp, transform))
    else:
        # For others: spawn and set autopilot immediately
        batch.append(carla.command.SpawnActor(bp, transform).then(
            carla.command.SetAutopilot(carla.command.FutureActor, True, tm_port)
        ))
        
responses = client.apply_batch_sync(batch, synchronous_master)
for response in responses:
    if response.error:
        import logging
        logging.error(response.error)
    else:
        vehicles_list.append(response.actor_id)

print(f"Spawned {len(vehicles_list)} vehicles.")


Spawned 30 vehicles.


In [None]:
# Cell 5: Prepare to spawn walkers
walker_blueprints = world.get_blueprint_library().filter('walker.pedestrian.*')
number_of_walkers = 10
walkers_list = []
spawn_points = []

for _ in range(number_of_walkers):
    loc = world.get_random_location_from_navigation()
    if loc is not None:
        spawn_points.append(carla.Transform(loc))

print(f"Walker spawn points generated: {len(spawn_points)}")


Walker spawn points generated: 5


In [10]:
# Cell 6: Spawn walkers (pedestrians)
walker_speed = []
batch = []

for spawn_point in spawn_points:
    walker_bp = random.choice(walker_blueprints)
    if walker_bp.has_attribute('is_invincible'):
        walker_bp.set_attribute('is_invincible', 'false')
    speed = walker_bp.get_attribute('speed').recommended_values[1]
    walker_speed.append(float(speed))
    batch.append(carla.command.SpawnActor(walker_bp, spawn_point))

results = client.apply_batch_sync(batch, True)
for i, result in enumerate(results):
    if result.error:
        logging.error(result.error)
    else:
        walkers_list.append({"id": result.actor_id})
print(f"Spawned {len(walkers_list)} walkers.")


Spawned 5 walkers.


In [11]:
# Cell 7: Spawn controllers for walkers
controller_bp = world.get_blueprint_library().find('controller.ai.walker')
batch = []

for walker in walkers_list:
    batch.append(carla.command.SpawnActor(controller_bp, carla.Transform(), walker["id"]))

results = client.apply_batch_sync(batch, True)
for i, result in enumerate(results):
    if result.error:
        logging.error(result.error)
    else:
        walkers_list[i]["con"] = result.actor_id

print("Spawned controllers for walkers.")


Spawned controllers for walkers.


In [12]:
# Cell 8: Start walkers and assign random destinations
all_id = []
for w in walkers_list:
    all_id.extend([w["con"], w["id"]])
all_actors = world.get_actors(all_id)

world.tick()

for i in range(0, len(all_id), 2):
    controller = all_actors[i]
    actor = all_actors[i + 1]
    controller.start()
    controller.go_to_location(world.get_random_location_from_navigation())
    controller.set_max_speed(walker_speed[i // 2])

print(f"Started {len(walkers_list)} walkers with destinations.")


Started 5 walkers with destinations.


In [17]:
# Cell 2.1: Keep ticking the world in synchronous mode
import time

print("Starting ticking loop. Press Interrupt (Ctrl+C) to stop.")
try:
    while True:
        world.tick()
        time.sleep(0.05)  # match fixed_delta_seconds to avoid busy wait
except KeyboardInterrupt:
    print("Ticking loop stopped by user.")

Starting ticking loop. Press Interrupt (Ctrl+C) to stop.
Ticking loop stopped by user.


In [14]:
# move simulator view to the car
spawn_points = world.get_map().get_spawn_points()
start_point = spawn_points[2]
spectator = world.get_spectator()
start_point.location.z = start_point.location.z+1 #start_point was used to spawn the car but we move 1m up to avoid being on the floor
spectator.set_transform(start_point)

In [15]:
#look for a blueprint of Mini car
vehicle_bp = world.get_blueprint_library().filter('*Audi*')

In [16]:
#spawn a car in a random location (first spawn point in the list)
vehicle = world.try_spawn_actor(vehicle_bp[0], start_point)


In [26]:
# Setup bird eye view

CAMERA_POS_Z = 15       # 15 meters above ground
CAMERA_ROT_X = -90.0    # Camera looks straight down

# === Get blueprint library ===
blueprint_library = world.get_blueprint_library()

# === RGB Camera blueprint ===
camera_rgb_bp = blueprint_library.find('sensor.camera.rgb')
camera_rgb_bp.set_attribute('image_size_x', '640')
camera_rgb_bp.set_attribute('image_size_y', '360')

# === Camera Transform ===
camera_init_trans = carla.Transform(
    carla.Location(z=CAMERA_POS_Z),
    carla.Rotation(pitch=CAMERA_ROT_X)
)

# === Spawn RGB Camera actor ===
camera_rgb = world.spawn_actor(camera_rgb_bp, camera_init_trans, attach_to=vehicle)

# === Semantic Segmentation Camera blueprint ===
camera_seg_bp = blueprint_library.find('sensor.camera.semantic_segmentation')
camera_seg_bp.set_attribute('image_size_x', '640')
camera_seg_bp.set_attribute('image_size_y', '360')

# === Spawn Semantic Segmentation Camera actor ===
camera_seg = world.spawn_actor(camera_seg_bp, camera_init_trans, attach_to=vehicle)

# === Lidar Camera blueprint ===
camera_lidar_bp = blueprint_library.find('sensor.lidar.ray_cast')
camera_lidar_bp.set_attribute('channels', '32')
camera_lidar_bp.set_attribute('range', '50')
camera_lidar_bp.set_attribute('rotation_frequency', '10')

# === Spawn Lidar Camera actor ===
camera_lidar = world.spawn_actor(camera_lidar_bp, camera_init_trans, attach_to=vehicle)

# === Depth Camera blueprint ===
camera_depth_bp = blueprint_library.find('sensor.camera.depth')
camera_depth_bp.set_attribute('image_size_x', '640')
camera_depth_bp.set_attribute('image_size_y', '360')

# === Spawn Depth Camera actor ===
camera_depth = world.spawn_actor(camera_depth_bp, camera_init_trans, attach_to=vehicle)


In [31]:
# Setup window car view

CAMERA_POS_Z = 1       # 15 meters above ground

# === Camera Transform ===
camera_init_trans_window = carla.Transform(
    carla.Location(z=CAMERA_POS_Z)
)

# === Spawn RGB Camera actor ===
camera_rgb_window = world.spawn_actor(camera_rgb_bp, camera_init_trans_window, attach_to=vehicle)

# === Spawn Semantic Segmentation Camera actor ===
camera_seg_window = world.spawn_actor(camera_seg_bp, camera_init_trans_window, attach_to=vehicle)

# === Spawn Lidar Camera actor ===
camera_lidar_window = world.spawn_actor(camera_lidar_bp, camera_init_trans_window, attach_to=vehicle)

# === Spawn Depth Camera actor ===
camera_depth_window = world.spawn_actor(camera_depth_bp, camera_init_trans_window, attach_to=vehicle)


In [34]:
# bird eye view
os.makedirs('D:/KP_Alexis_Doci/rgb', exist_ok=True)
os.makedirs('D:/KP_Alexis_Doci/seg', exist_ok=True)
os.makedirs('D:/KP_Alexis_Doci/rgb', exist_ok=True)
os.makedirs('D:/KP_Alexis_Doci/seg', exist_ok=True)
os.makedirs('D:/KP_Alexis_Doci/lidar', exist_ok=True)
os.makedirs('D:/KP_Alexis_Doci/depth', exist_ok=True)

# window car view
os.makedirs('D:/KP_Alexis_Doci/rgb_window', exist_ok=True)
os.makedirs('D:/KP_Alexis_Doci/seg_window', exist_ok=True)
os.makedirs('D:/KP_Alexis_Doci/lidar_window', exist_ok=True)
os.makedirs('D:/KP_Alexis_Doci/depth_window', exist_ok=True)

rgb_frame_counter = 1
seg_frame_counter = 1
depth_frame_counter = 1
lidar_frame_counter = 1

rgb_frame_counter_window = 1
seg_frame_counter_window = 1
depth_frame_counter_window = 1
lidar_frame_counter_window = 1

# === Bird Eye view callbacks ===
def process_rgb(image):
    global rgb_frame_counter
    image.save_to_disk(f'D:/KP_Alexis_Doci/rgb/frame_{rgb_frame_counter:05d}.png')
    rgb_frame_counter += 1

def process_seg(image):
    global seg_frame_counter
    image.convert(carla.ColorConverter.CityScapesPalette)
    image.save_to_disk(f'D:/KP_Alexis_Doci/seg/frame_{seg_frame_counter:05d}.png')
    seg_frame_counter += 1

def process_depth(image):
    global depth_frame_counter
    image.convert(carla.ColorConverter.LogarithmicDepth)
    image.save_to_disk(f'D:/KP_Alexis_Doci/depth/frame_{depth_frame_counter:05d}.png')
    depth_frame_counter += 1


# === Window Car view callbacks ===
def process_rgb_window(image):
    global rgb_frame_counter_window
    image.save_to_disk(f'D:/KP_Alexis_Doci/rgb_window/frame_{rgb_frame_counter_window:05d}.png')
    rgb_frame_counter_window += 1

def process_seg_window(image):
    global seg_frame_counter_window
    image.convert(carla.ColorConverter.CityScapesPalette)
    image.save_to_disk(f'D:/KP_Alexis_Doci/seg_window/frame_{seg_frame_counter_window:05d}.png')
    seg_frame_counter_window += 1

def process_depth_window(image):
    global depth_frame_counter_window
    image.convert(carla.ColorConverter.LogarithmicDepth)
    image.save_to_disk(f'D:/KP_Alexis_Doci/depth_window/frame_{depth_frame_counter_window:05d}.png')
    depth_frame_counter_window += 1

In [None]:
# pemrosesan khusus lidar
import math
import open3d as o3d
from matplotlib import cm

# ==== Color Map dan Fungsi Axis Open3D ====
# Colormap untuk Lidar
VIRIDIS = np.array(cm.get_cmap('plasma').colors)
VID_RANGE = np.linspace(0.0, 1.0, VIRIDIS.shape[0])

# Colormap untuk Radar
COOL_RANGE = np.linspace(0.0, 1.0, VIRIDIS.shape[0])
COOL = np.array(cm.get_cmap('winter')(COOL_RANGE))[:, :3]

# Fungsi untuk menambahkan axis di Open3D
def add_open3d_axis(vis):
    axis = o3d.geometry.LineSet()
    axis.points = o3d.utility.Vector3dVector(np.array([
        [0, 0, 0], [1, 0, 0],
        [0, 1, 0], [0, 0, 1]
    ]))
    axis.lines = o3d.utility.Vector2iVector([[0,1],[0,2],[0,3]])
    axis.colors = o3d.utility.Vector3dVector([[1,0,0], [0,1,0], [0,0,1]])
    vis.add_geometry(axis)

# === Callback untuk Sensor ===
# Lidar
def lidar_callback(point_cloud, point_list):
    data = np.copy(np.frombuffer(point_cloud.raw_data, dtype=np.float32)).reshape(-1, 4)
    intensity = data[:, -1]
    int_color = np.c_[
        np.interp(intensity, VID_RANGE, VIRIDIS[:, 0]),
        np.interp(intensity, VID_RANGE, VIRIDIS[:, 1]),
        np.interp(intensity, VID_RANGE, VIRIDIS[:, 2])
    ]
    points = data[:, :-1]
    points[:, :1] = -points[:, :1]  # flip x
    point_list.points = o3d.utility.Vector3dVector(points)
    point_list.colors = o3d.utility.Vector3dVector(int_color)

# Radar
def radar_callback(data, radar_list):
    radar_data = np.zeros((len(data), 4))
    for i, d in enumerate(data):
        x = d.depth * math.cos(d.altitude) * math.cos(d.azimuth)
        y = d.depth * math.cos(d.altitude) * math.sin(d.azimuth)
        z = d.depth * math.sin(d.altitude)
        radar_data[i, :] = [x, y, z, d.velocity]
    intensity = np.abs(radar_data[:, -1])
    int_color = np.c_[
        np.interp(intensity, COOL_RANGE, COOL[:, 0]),
        np.interp(intensity, COOL_RANGE, COOL[:, 1]),
        np.interp(intensity, COOL_RANGE, COOL[:, 2])
    ]
    points = radar_data[:, :-1]
    points[:, :1] = -points[:, :1]
    radar_list.points = o3d.utility.Vector3dVector(points)
    radar_list.colors = o3d.utility.Vector3dVector(int_color)

# Kamera RGB
def camera_callback(image, data_dict):
    data_dict['image'] = np.reshape(
        np.copy(image.raw_data), (image.height, image.width, 4))



In [None]:
#send the car off on autopilot - this will leave the spectator
vehicle.set_autopilot(True)

In [None]:
# Prepare a CSV file for logging
log_filename = f'car_location_log_{time.strftime("%Y%m%d-%H%M%S")}.csv'
log_file = open(log_filename, 'w', newline='')
log_writer = csv.writer(log_file)
log_writer.writerow(['frame', 'timestamp', 'x', 'y', 'z'])

# --- Your sensor listeners are already set up correctly ---
# bird eye view
camera_rgb.listen(process_rgb)
camera_seg.listen(process_seg)
camera_depth.listen(process_depth)
# camera_lidar.listen(process_lidar)

# window car view
camera_rgb_window.listen(process_rgb_window)
camera_seg_window.listen(process_seg_window)
camera_depth_window.listen(process_depth_window)
# camera_lidar_window.listen(process_lidar_window)

# --- This is the key part to integrate the logging logic ---
def log_and_tick():
    """A single function to handle both ticking and logging."""
    
    # Tick the world to get the latest state
    world_snapshot = world.tick()

    # Get the vehicle's transform from the snapshot for better synchronization
    # and to ensure we get the data from the current frame.
    vehicle_snapshot = world_snapshot.find(vehicle.id)
    
    # Check if the vehicle still exists
    if vehicle_snapshot and vehicle.is_alive:
        loc = vehicle_snapshot.get_transform().location
        timestamp = world_snapshot.timestamp
        
        # Write the data to the CSV file
        log_writer.writerow([
            world_snapshot.frame,
            timestamp.elapsed_seconds,
            loc.x,
            loc.y,
            loc.z
        ])

        # Optional: Print to console for real-time feedback
        # sys.stdout.write(f"\rFrame: {world_snapshot.frame:06d} | Time: {timestamp.elapsed_seconds:.2f}s | Location: (x={loc.x:.2f}, y={loc.y:.2f}, z={loc.z:.2f})")
        # sys.stdout.flush()

    # Sleep to match the fixed delta seconds
    time.sleep(0.05)


print("Starting ticking loop. Press Interrupt (Ctrl+C) to stop.")
try:
    while True:
        log_and_tick()
except KeyboardInterrupt:
    print("Ticking loop stopped by user.")
finally:
    # Stop all sensors and close the log file
    camera_rgb.stop()
    camera_seg.stop()
    camera_depth.stop()
    # camera_lidar.stop()
    camera_rgb_window.stop()
    camera_seg_window.stop()
    camera_depth_window.stop()
    # camera_lidar_window.stop()
    
    # Close the log file
    log_file.close()

    print("All sensors stopped and log file closed.")

Starting ticking loop. Press Interrupt (Ctrl+C) to stop.
Ticking loop stopped by user.
All sensors stopped.
