# CARLA Autonomous Vehicle with LiDAR-Camera Fusion

**AI-controlled vehicle using LiDAR-to-camera projection for obstacle detection and navigation**

## Pipeline
1. Connect to CARLA in synchronous mode
2. Clear environment (keep only roads)
3. Spawn vehicle with cameras + LiDAR
4. Setup sensor fusion with LiDAR projection
5. Run AI-controlled navigation
6. Save synchronized sensor data
7. Cleanup and restore

See `README.md` for detailed documentation.

In [None]:
# ============================================================================
# IMPORTS
# ============================================================================
import carla
import random
import time
import numpy as np
import cv2
import os
import queue
from matplotlib import cm

# Viridis colormap for LiDAR intensity visualization
VIRIDIS = np.array(cm.get_cmap('viridis').colors)
VID_RANGE = np.linspace(0.0, 1.0, VIRIDIS.shape[0])

print("‚úì Dependencies loaded")

‚úì Dependencies loaded


  VIRIDIS = np.array(cm.get_cmap('viridis').colors)


## 1. CARLA Connection & Synchronous Mode

In [None]:
# Connect to CARLA server
client = carla.Client('localhost', 2000)
client.set_timeout(20.0)
print("‚úì Connected to CARLA")

In [None]:
# Get world and store original settings
world = client.get_world()
carla_map = world.get_map()
original_settings = world.get_settings()

# Enable synchronous mode for deterministic sensor capture
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05  # 20 FPS
world.apply_settings(settings)

# Synchronize traffic manager
traffic_manager = client.get_trafficmanager(8000)
traffic_manager.set_synchronous_mode(True)

print(f"‚úì Map: {carla_map.name}")
print(f"‚úì Synchronous mode: {1/settings.fixed_delta_seconds:.0f} FPS")

## 2. Clear Environment (Keep Only Roads)

In [None]:
# Remove all static map objects
for layer in [carla.MapLayer.Buildings, carla.MapLayer.Decals, carla.MapLayer.Foliage,
              carla.MapLayer.ParkedVehicles, carla.MapLayer.Particles, 
              carla.MapLayer.Props, carla.MapLayer.Walls]:
    world.unload_map_layer(layer)
world.tick()

print("‚úì Environment cleared")

In [None]:
# Reload all static map objects
for layer in [carla.MapLayer.Buildings, carla.MapLayer.Decals, carla.MapLayer.Foliage,
              carla.MapLayer.ParkedVehicles, carla.MapLayer.Particles, 
              carla.MapLayer.Props, carla.MapLayer.Walls]:
    world.load_map_layer(layer)
world.tick()

print("‚úì Environment reloaded")

## 3. Spawn Vehicle & Sensors

In [None]:
# Spawn vehicle at predetermined location for reproducibility
blueprint_library = world.get_blueprint_library()
vehicle_bp = blueprint_library.filter('vehicle.tesla.model3')[0]

# Fixed spawn coordinates (x, y, z) and rotation (pitch, yaw, roll)
# Using spawn point 0 from map
SPAWN_X = -64.6
SPAWN_Y = 24.5
SPAWN_Z = 0.6
SPAWN_YAW = 0.2  # Vehicle facing direction (degrees)

spawn_point = carla.Transform(
    carla.Location(x=SPAWN_X, y=SPAWN_Y, z=SPAWN_Z),
    carla.Rotation(pitch=0, yaw=SPAWN_YAW, roll=0)
)
vehicle = world.spawn_actor(vehicle_bp, spawn_point)
world.tick()

print(f"‚úì Vehicle spawned at fixed location ({SPAWN_X}, {SPAWN_Y}, {SPAWN_Z})")
print(f"  Facing: {SPAWN_YAW}¬∞ yaw")

In [None]:
# Configure RGB cameras (1920x1080, 90¬∞ FOV)
camera_bp = blueprint_library.find('sensor.camera.rgb')
camera_bp.set_attribute('image_size_x', '1920')
camera_bp.set_attribute('image_size_y', '1080')
camera_bp.set_attribute('fov', '90')

# Camera positions: front, left, right
camera_configs = {
    'front': carla.Transform(carla.Location(x=2.5, z=1.0), carla.Rotation(pitch=0, yaw=0)),
    'left': carla.Transform(carla.Location(x=0.0, y=-1.0, z=1.0), carla.Rotation(pitch=0, yaw=-90)),
    'right': carla.Transform(carla.Location(x=0.0, y=1.0, z=1.0), carla.Rotation(pitch=0, yaw=90))
}

# Spawn and attach cameras
cameras = {}
for cam_name, transform in camera_configs.items():
    cameras[cam_name] = world.spawn_actor(camera_bp, transform, attach_to=vehicle)
world.tick()

print(f"‚úì {len(cameras)} cameras attached (1920x1080, 90¬∞ FOV)")

In [None]:
# Compute camera intrinsic matrix K for LiDAR projection
image_w = int(camera_bp.get_attribute("image_size_x").as_int())
image_h = int(camera_bp.get_attribute("image_size_y").as_int())
fov = float(camera_bp.get_attribute("fov").as_float())
focal = image_w / (2.0 * np.tan(fov * np.pi / 360.0))

K = np.identity(3)
K[0, 0] = K[1, 1] = focal
K[0, 2] = image_w / 2.0
K[1, 2] = image_h / 2.0

print(f"‚úì Camera intrinsics computed (focal={focal:.1f}px)")

In [None]:
# Configure and attach LiDAR (64 channels, 100m range, 20Hz)
lidar_bp = blueprint_library.find('sensor.lidar.ray_cast')
lidar_bp.set_attribute('channels', '64')
lidar_bp.set_attribute('points_per_second', '1200000')
lidar_bp.set_attribute('rotation_frequency', '20')
lidar_bp.set_attribute('range', '100')
lidar_bp.set_attribute('upper_fov', '10')
lidar_bp.set_attribute('lower_fov', '-30')

lidar_transform = carla.Transform(carla.Location(x=0.0, z=2.5), carla.Rotation(pitch=0, yaw=0))
lidar = world.spawn_actor(lidar_bp, lidar_transform, attach_to=vehicle)
world.tick()

print("‚úì LiDAR attached (64ch, 100m, 20Hz)")

## 4. Spectator View Setup

In [None]:
# Setup third-person spectator view
spectator = world.get_spectator()

def update_spectator_view():
    """Position spectator behind and above vehicle"""
    transform = vehicle.get_transform()
    spectator.set_transform(carla.Transform(
        transform.location + carla.Location(x=0, z=4),
        carla.Rotation(pitch=-15, yaw=transform.rotation.yaw)
    ))

update_spectator_view()
print("‚úì Spectator view configured")
world.tick()


## 5. Sensor Fusion: LiDAR-to-Camera Projection

Projects 3D LiDAR points onto 2D camera images using camera intrinsics. Synchronized queues ensure temporal alignment across all sensors.

In [None]:
# ============================================================================
# SETUP OUTPUT DIRECTORIES & SENSOR QUEUES
# ============================================================================
output_base_dir = "carla_output"
camera_dirs = {name: os.path.join(output_base_dir, f"camera_{name}") for name in cameras.keys()}
lidar_dir = os.path.join(output_base_dir, "lidar")
projection_dir = os.path.join(output_base_dir, "lidar_projection")
fusion_dir = os.path.join(output_base_dir, "fusion")

for directory in list(camera_dirs.values()) + [lidar_dir, projection_dir, fusion_dir]:
    os.makedirs(directory, exist_ok=True)

# Synchronous sensor queues
camera_queues = {cam_name: queue.Queue() for cam_name in cameras.keys()}
lidar_queue = queue.Queue()
frame_counter = 0

# Capture configuration
fixed_delta = world.get_settings().fixed_delta_seconds
capture_interval_seconds = 0.5
capture_interval_ticks = int(capture_interval_seconds / fixed_delta)
DOT_EXTENT = 2  # LiDAR projection dot size

# Distance weighting parameters for steering
CLOSE_DISTANCE_THRESHOLD = 25.0  # Points closer than this get 4x weight (meters)
CLOSE_WEIGHT_MULTIPLIER = 4.0    # Weight multiplier for close objects

print(f"‚úì Output directories created: {output_base_dir}/")
print(f"‚úì Capture interval: {capture_interval_seconds}s ({capture_interval_ticks} ticks)")
print(f"‚úì Close object emphasis: {CLOSE_WEIGHT_MULTIPLIER}x weight for objects < {CLOSE_DISTANCE_THRESHOLD}m")

# ============================================================================
# SENSOR CALLBACKS
# ============================================================================
def create_image_callback(cam_name):
    def process_image(image):
        camera_queues[cam_name].put(image)
    return process_image

def lidar_callback(point_cloud):
    lidar_queue.put(point_cloud)

# ============================================================================
# LIDAR-TO-CAMERA PROJECTION
# ============================================================================
def project_lidar_to_camera(lidar_data, camera, im_array):
    """Project 3D LiDAR points onto 2D camera image using intrinsics.
    
    Returns:
        im_array: Image with projected LiDAR points
        num_points: Total number of projected points
        weighted_density: Distance-weighted density (close objects weighted 4x more)
    """
    # Parse point cloud (x, y, z, intensity)
    p_cloud = np.frombuffer(lidar_data.raw_data, dtype=np.dtype('f4')).reshape((-1, 4))
    intensity = p_cloud[:, 3]
    
    # Transform: LiDAR -> World -> Camera
    local_points = np.r_[p_cloud[:, :3].T, [np.ones(len(p_cloud))]]
    world_points = np.dot(lidar.get_transform().get_matrix(), local_points)
    sensor_points = np.dot(np.array(camera.get_transform().get_inverse_matrix()), world_points)
    
    # Convert to camera coordinates
    camera_coords = np.array([sensor_points[1], sensor_points[2] * -1, sensor_points[0]])
    
    # Project to 2D
    points_2d = np.dot(K, camera_coords)
    points_2d = np.array([points_2d[0] / points_2d[2], points_2d[1] / points_2d[2], points_2d[2]]).T
    
    # Filter points in image bounds
    mask = (points_2d[:, 0] > 0) & (points_2d[:, 0] < image_w) & \
           (points_2d[:, 1] > 0) & (points_2d[:, 1] < image_h) & \
           (points_2d[:, 2] > 0)
    points_2d, intensity = points_2d[mask], intensity[mask]
    
    # Calculate distance-weighted density
    # Close objects (< CLOSE_DISTANCE_THRESHOLD) get CLOSE_WEIGHT_MULTIPLIER weight
    depths = points_2d[:, 2]  # Depth in meters
    weights = np.where(depths < CLOSE_DISTANCE_THRESHOLD, CLOSE_WEIGHT_MULTIPLIER, 1.0)
    weighted_density = np.sum(weights)
    
    # Colorize by intensity (Viridis colormap)
    intensity = np.clip(1.0 - (4 * intensity - 3), 0.0, 1.0)
    colors = np.array([
        np.interp(intensity, VID_RANGE, VIRIDIS[:, 2]) * 255.0,
        np.interp(intensity, VID_RANGE, VIRIDIS[:, 1]) * 255.0,
        np.interp(intensity, VID_RANGE, VIRIDIS[:, 0]) * 255.0
    ]).astype(np.uint8).T
    
    # Draw points on image
    u, v = points_2d[:, 0].astype(int), points_2d[:, 1].astype(int)
    for i in range(len(points_2d)):
        v_min, v_max = max(0, v[i] - DOT_EXTENT), min(image_h, v[i] + DOT_EXTENT + 1)
        u_min, u_max = max(0, u[i] - DOT_EXTENT), min(image_w, u[i] + DOT_EXTENT + 1)
        im_array[v_min:v_max, u_min:u_max] = colors[i]
    
    return im_array, len(points_2d), weighted_density

# ============================================================================
# DATA SAVING FUNCTIONS
# ============================================================================
def save_camera_image(cam_name, image, frame_num):
    array = np.frombuffer(image.raw_data, dtype=np.uint8).reshape((image.height, image.width, 4))[:, :, :3]
    filename = os.path.join(camera_dirs[cam_name], f"frame_{frame_num:06d}.jpg")
    cv2.imwrite(filename, array)
    return filename, array

def save_projected_image(cam_name, im_array, frame_num):
    proj_dir = os.path.join(projection_dir, f"camera_{cam_name}")
    os.makedirs(proj_dir, exist_ok=True)
    filename = os.path.join(proj_dir, f"frame_{frame_num:06d}.jpg")
    cv2.imwrite(filename, im_array)
    return filename

def save_lidar_data(point_cloud, frame_num):
    points = np.frombuffer(point_cloud.raw_data, dtype=np.float32).reshape((-1, 4))
    filename = os.path.join(lidar_dir, f"lidar_{frame_num:06d}.npy")
    np.save(filename, points)
    return filename, len(points)

def save_fusion_metadata(frame_num, camera_files, projection_files, lidar_file, 
                         lidar_points, projection_counts, timestamp):
    import json
    metadata = {
        "frame_id": frame_num, "timestamp": timestamp,
        "cameras": camera_files, "lidar_projections": projection_files,
        "projection_point_counts": projection_counts,
        "lidar": {"file": lidar_file, "num_points": lidar_points}
    }
    with open(os.path.join(fusion_dir, f"frame_{frame_num:06d}.json"), 'w') as f:
        json.dump(metadata, f, indent=2)
    return metadata

# ============================================================================
# SENSOR DATA PROCESSOR
# ============================================================================
def process_sensor_queues_with_projections(should_save=False, sim_time=0.0):
    """Process queues and return projection images and weighted counts for AI"""
    global frame_counter
    
    # Clear queues if not saving
    if not should_save:
        for q in list(camera_queues.values()) + [lidar_queue]:
            while not q.empty(): q.get()
        return None, {}, {}
    
    # Collect synchronized sensor data
    camera_images = {}
    for cam_name in cameras.keys():
        if not camera_queues[cam_name].empty():
            camera_images[cam_name] = camera_queues[cam_name].get()
            while not camera_queues[cam_name].empty():
                camera_queues[cam_name].get()
    
    lidar_data = lidar_queue.get() if not lidar_queue.empty() else None
    while not lidar_queue.empty(): lidar_queue.get()
    
    # Process and save if all sensors ready
    if len(camera_images) == len(cameras) and lidar_data is not None:
        current_frame = frame_counter
        frame_counter += 1
        
        # Save camera images
        camera_files, camera_arrays = {}, {}
        for cam_name, image in camera_images.items():
            camera_files[cam_name], camera_arrays[cam_name] = save_camera_image(cam_name, image, current_frame)
        
        # Save LiDAR
        lidar_file, num_points = save_lidar_data(lidar_data, current_frame)
        
        # Project and save - now returns weighted density for steering
        projection_files, projection_counts, projection_images = {}, {}, {}
        for cam_name in cameras.keys():
            proj_array = camera_arrays[cam_name].copy()
            proj_array, num_proj, weighted_density = project_lidar_to_camera(lidar_data, cameras[cam_name], proj_array)
            projection_files[cam_name] = save_projected_image(cam_name, proj_array, current_frame)
            projection_counts[cam_name] = weighted_density  # Use weighted density for steering
            projection_images[cam_name] = proj_array
        
        # Save metadata
        save_fusion_metadata(current_frame, camera_files, projection_files, 
                           lidar_file, num_points, projection_counts, sim_time)
        
        if current_frame % 10 == 0:
            print(f"Frame {current_frame}: {num_points} LiDAR pts, weighted density: L={projection_counts['left']:.0f} F={projection_counts['front']:.0f} R={projection_counts['right']:.0f}")
        
        return None, projection_images, projection_counts
    
    return None, {}, {}

# Attach sensor listeners
for cam_name, cam in cameras.items():
    cam.listen(create_image_callback(cam_name))
lidar.listen(lidar_callback)

print(f"‚úì All sensors streaming to queues")

## 6. LiDAR Density-Based Steering Algorithm

Steers the vehicle based on LiDAR projection point density. The algorithm compares obstacle density between left and right cameras and steers proportionally toward the direction with fewer obstacles (lower density).

In [None]:
# ============================================================================
# LIDAR DENSITY-BASED STEERING ALGORITHM
# ============================================================================
# Steers the vehicle based on LiDAR projection point density:
# - Goes STRAIGHT when front has lowest density (clearest path ahead)
# - Steers toward the direction with LOWER density (fewer obstacles)
# - Proportional steering based on density difference ratio
#
# Input:  projection_counts (dict) - Number of LiDAR points projected per camera
#         - projection_counts['front'] : int - Points visible in front camera
#         - projection_counts['left']  : int - Points visible in left camera
#         - projection_counts['right'] : int - Points visible in right camera
#
# Output: carla.VehicleControl object with throttle and proportional steering
# ============================================================================

# Tunable parameters
THROTTLE = 0.3          # Constant forward throttle (0.0 to 1.0)
STEERING_GAIN = 0.8     # Steering sensitivity multiplier (higher = more aggressive)

def lidar_density_steering(projection_counts):
    """
    LiDAR density-based steering algorithm.
    
    If front has the lowest density (clearest path), go straight.
    Otherwise, steer toward the direction with fewer LiDAR points.
    
    Args:
        projection_counts: dict with 'front', 'left', 'right' LiDAR point counts
        
    Returns:
        carla.VehicleControl: Control commands with proportional steering
    """
    # Get point counts for each camera view
    front_count = projection_counts.get('front', 0)
    left_count = projection_counts.get('left', 0)
    right_count = projection_counts.get('right', 0)
    
    # If front has the lowest density (fewest obstacles), go straight
    if front_count < left_count and front_count < right_count:
        steer = 0.0
    else:
        # Calculate proportional steering based on density difference
        # Positive steer = turn right, Negative steer = turn left
        # We steer TOWARD lower density (away from obstacles)
        total_side = left_count + right_count
        
        if total_side > 0:
            # Normalized difference: positive when left has more points (steer right)
            # negative when right has more points (steer left)
            density_diff = (left_count - right_count) / total_side
            steer = density_diff * STEERING_GAIN
        else:
            # No side obstacles detected, go straight
            steer = 0.0
    
    # Clamp steering to valid range
    steer = max(-1.0, min(1.0, steer))
    
    # Create control command
    control = carla.VehicleControl()
    control.throttle = THROTTLE
    control.steer = steer
    control.brake = 0.0
    
    return control

print(f"‚úì LiDAR density steering algorithm ready")
print(f"  Throttle: {THROTTLE}, Steering gain: {STEERING_GAIN}")
print(f"  Rule: Go STRAIGHT when front has lowest density")

## 7. Run Simulation (Density-Based Steering)

Vehicle drives using the LiDAR density-based steering algorithm. Each tick, LiDAR projection counts are used to calculate proportional steering toward lower obstacle density.

In [None]:
# Disable autopilot - using custom density-based steering
vehicle.set_autopilot(False)
simulation_duration = 30
total_ticks = int(simulation_duration / fixed_delta)

print(f"‚úì Density-based steering enabled for {simulation_duration}s ({total_ticks} ticks)")
print("Watch CARLA window for autonomous driving\n")

tick_count = 0
last_print_time = 0
current_projection_counts = {}

try:
    while tick_count < total_ticks:
        world.tick()
        tick_count += 1
        
        # Wait for synchronized sensor data (timeout 2s)
        start_time = time.time()
        while time.time() - start_time < 2.0:
            if all(camera_queues[c].qsize() > 0 for c in cameras.keys()) and lidar_queue.qsize() > 0:
                break
            time.sleep(0.001)
        
        # Process sensor data
        should_save = (tick_count % capture_interval_ticks == 0)
        _, projection_images, projection_counts = process_sensor_queues_with_projections(should_save, tick_count * fixed_delta)
        
        # Update current projection counts when new data available
        if projection_counts:
            current_projection_counts = projection_counts
        
        # Apply density-based steering control
        if current_projection_counts:
            control = lidar_density_steering(current_projection_counts)
            vehicle.apply_control(control)
        
        # Update spectator view
        update_spectator_view()
        
        # Print status every 5s
        current_sim_time = tick_count * fixed_delta
        if int(current_sim_time) % 5 == 0 and int(current_sim_time) != last_print_time:
            last_print_time = int(current_sim_time)
            loc = vehicle.get_location()
            vel = vehicle.get_velocity()
            speed = 3.6 * np.sqrt(vel.x**2 + vel.y**2 + vel.z**2)
            
            # Show density info with updated steering logic
            l = current_projection_counts.get('left', 0)
            f = current_projection_counts.get('front', 0)
            r = current_projection_counts.get('right', 0)
            
            # Determine steering direction (matches algorithm logic)
            if f < l and f < r:
                steer_dir = "STRAIGHT (front clear)"
            elif l > r:
                steer_dir = "RIGHT"
            elif r > l:
                steer_dir = "LEFT"
            else:
                steer_dir = "STRAIGHT"
            
            print(f"[{int(current_sim_time)}s] Speed: {speed:.1f} km/h | Density L:{l:.0f} F:{f:.0f} R:{r:.0f} ‚Üí {steer_dir}")
        
except KeyboardInterrupt:
    print("\n‚úó Simulation interrupted by user")

print("\n‚úì Simulation completed!")

## 8. Cleanup

Stop sensors, destroy actors, and restore asynchronous mode.

In [None]:
# Restore async mode
world.apply_settings(original_settings)
traffic_manager.set_synchronous_mode(False)

# Stop sensor listeners
for cam in cameras.values():
    cam.stop()
lidar.stop()
time.sleep(0.5)

# Destroy actors
for cam in cameras.values():
    cam.destroy()
lidar.destroy()
vehicle.destroy()
world.tick()

print("‚úì Cleanup completed")
print(f"\nüìÅ Data saved in: {output_base_dir}/")
print(f"   ‚îú‚îÄ‚îÄ camera_front/left/right/  (raw images)")
print(f"   ‚îú‚îÄ‚îÄ lidar/                     (point clouds)")
print(f"   ‚îú‚îÄ‚îÄ lidar_projection/          (projected images)")
print(f"   ‚îî‚îÄ‚îÄ fusion/                    (JSON metadata)")