# CARLA Project - Car and distance detection

### Package installation

Install the required packages with the following command (run it where the `requirements.txt` file is located):

```bash
pip install -r requirements.txt
```

In [1]:
import carla, time, pygame, math, random, cv2
import numpy as np
import torch
from torchvision import transforms, models
from PIL import Image
from queue import Queue
from queue import Empty
from ultralytics import YOLO

pygame 2.6.1 (SDL 2.28.4, Python 3.7.16)
Hello from the pygame community. https://www.pygame.org/contribute.html


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
client = carla.Client('localhost', 2000)
client.set_timeout(20.0)

# client.load_world('Town01')
world = client.get_world()
spectator = world.get_spectator()
# world.set_weather(carla.WeatherParameters.ClearNoon)

In [3]:
def move_spectator_to(transform, distance=5.0, x=0, y=0, z=4, yaw=0, pitch=-10, roll=0):
    back_location = transform.location - transform.get_forward_vector() * distance
    
    back_location.x += x
    back_location.y += y
    back_location.z += z
    transform.rotation.yaw += yaw
    transform.rotation.pitch = pitch
    transform.rotation.roll = roll
    
    spectator_transform = carla.Transform(back_location, transform.rotation)
    
    spectator.set_transform(spectator_transform)

def spawn_vehicle(vehicle_index=0, spawn_index=0, pattern='vehicle.*'):
    blueprint_library = world.get_blueprint_library()
    vehicle_bp = blueprint_library.filter(pattern)[vehicle_index]
    spawn_point = world.get_map().get_spawn_points()[spawn_index]
    vehicle = world.spawn_actor(vehicle_bp, spawn_point)
    return vehicle

def draw_on_screen(transform, content='O', color=carla.Color(0, 255, 0), life_time=20):
    world.debug.draw_string(transform.location, content, color=color, life_time=life_time)

def spawn_camera(attach_to=None, transform=carla.Transform(carla.Location(x=1.5, z=1.5), carla.Rotation(pitch=-10)), width=800, height=600, frequency = 0):
    camera_bp = world.get_blueprint_library().find('sensor.camera.rgb')
    camera_bp.set_attribute('image_size_x', str(width))
    camera_bp.set_attribute('image_size_y', str(height))
    camera_bp.set_attribute('sensor_tick', str(frequency))
    camera = world.spawn_actor(camera_bp, transform, attach_to=attach_to)
    return camera

def spawn_radar(attach_to=None, transform=carla.Transform(carla.Location(x=1.5, z=1.5), carla.Rotation(pitch=-10)), horizontal_fov = 35, vertical_fov = 20):
    radar_bp = world.get_blueprint_library().find('sensor.other.radar')
    radar_bp.set_attribute('horizontal_fov', str(horizontal_fov))
    radar_bp.set_attribute('vertical_fov', str(vertical_fov))
    radar = world.spawn_actor(radar_bp, transform, attach_to=attach_to)
    return radar

In [4]:
for actor in world.get_actors().filter('*vehicle*'):
  actor.destroy()

for actor in world.get_actors().filter('*sensor*'):
  actor.destroy()

In [None]:
settings = world.get_settings()
settings.synchronous_mode = True
settings.fixed_delta_seconds = 0.05
world.apply_settings(settings)

sensor_transform = carla.Transform(carla.Location(x=-0.16, y=-0.9, z=2.4), carla.Rotation(yaw=-100))

vehicle = spawn_vehicle(spawn_index=20, pattern="vehicle.dodge.charger_2020")
camera = spawn_camera(attach_to=vehicle, transform=sensor_transform, width=320, height=320)
radar = spawn_radar(attach_to=vehicle, transform=sensor_transform, horizontal_fov=60, vertical_fov=40)
velocity_range = 7.5 # m/s

video_output = np.zeros((320, 320, 4), dtype=np.uint8)

image_queue = Queue()
radar_queue = Queue()

def sensor_callback(data, queue):
    queue.put(data)

camera.listen(lambda data: sensor_callback(data, image_queue))
radar.listen(lambda data: sensor_callback(data, radar_queue))

vehicle.set_autopilot(True)

cv2.namedWindow('Left Side Camera', cv2.WINDOW_AUTOSIZE)

model = YOLO("runs/detect/train2/weights/best.pt")

running = True

try:
    while running:
        if cv2.waitKey(1) == ord('q'):
            running = False
            break
        
        world.tick()

        try:
            # Get the data once it's received.
            image_data = image_queue.get(True, 1.0)
            radar_data = radar_queue.get(True, 1.0)
        except Empty:
            print("[Warning] Some sensor data has been missed")
            continue

        # sync the data
        assert image_data.frame == radar_data.frame

        # Retrive the image data
        video_output = np.reshape(np.copy(image_data.raw_data), (image_data.height, image_data.width, 4))

        # ----------- RADAR data -----------

        points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4'))
        points = np.reshape(points, (len(radar_data), 4))

        # [vel, azimuth, altitude, depth]
        velocities = points[:, 0]
        azimuth = points[:, 1]
        altitude = points[:, 2]
        depth = points[:, 3]     

        # Convert spherical coordinates to Cartesian (sensor coordinate system)
        X = depth * np.cos(altitude) * np.sin(azimuth)
        Y = depth * np.sin(altitude)
        Z = depth * np.cos(altitude) * np.cos(azimuth)

        # Stack into a single array (shape: Nx3)
        cartesian_points = np.stack((X, Y, Z), axis=1)

        # Build the K projection matrix:
        # K = [[Fx,  0, image_w/2],
        #      [ 0, Fy, image_h/2],
        #      [ 0,  0,         1]]
        image_w = 320 #image_size_x
        image_h = 320 #image_size_y

        #since we are using the standard camera fov
        camera_bp = world.get_blueprint_library().find('sensor.camera.rgb')
        fov = camera_bp.get_attribute("fov").as_float() 
        focal = image_w / (2.0 * np.tan(fov * np.pi / 360.0))

        # In this case Fx and Fy are the same since the pixel aspect ratio is 1
        K = np.identity(3)
        K[0, 0] = K[1, 1] = focal
        K[0, 2] = image_w / 2.0
        K[1, 2] = image_h / 2.0

        # cartesian_points.T is a 3xN matrix - can be multiplied directly by K
        points_2d = np.dot(K, cartesian_points.T)

        # Normalize the points
        points_2d = np.array([
            points_2d[0, :] / points_2d[2, :],
            points_2d[1, :] / points_2d[2, :],
            points_2d[2, :]])
        
        points_2d = points_2d.T

        frame = video_output.copy()  # Copy to avoid modifying the original

        # Loop through all projected 2D points
        for point in points_2d:
            x, y = int(point[0]), int(point[1])  # Convert to integer pixel coordinates

            # Draw the point only if it is inside the image frame
            if 0 <= x < frame.shape[1] and 0 <= y < frame.shape[0]:
                cv2.circle(frame, (x, y), radius=2, color=(0, 0, 255), thickness=-1)  # Red dot

        # ----------- YOLO evaluation -----------

        eval_img = video_output[:, :, :3]
        results = model(eval_img, imgsz=320)
        for result in results:
            # print(result.boxes.xyxy)
            for box in result.boxes.xyxy:
                cv2.rectangle(frame, (int(box[0]), int(box[1])), (int(box[2]), int(box[3])), (0, 255, 0), 1)

        # Show the image with radar points overlaid
        cv2.imshow('Left Side Camera', frame)
finally:
    cv2.destroyAllWindows()
    camera.destroy()
    vehicle.destroy()
    radar.destroy()
    settings = world.get_settings()
    settings.synchronous_mode = False
    settings.no_rendering_mode = False
    settings.fixed_delta_seconds = None
    world.apply_settings(settings)

Ultralytics YOLOv8.0.20  Python-3.7.16 torch-1.10.0+cu113 CUDA:0 (NVIDIA GeForce RTX 4060 Laptop GPU, 8188MiB)
Model summary (fused): 168 layers, 3005843 parameters, 0 gradients, 8.1 GFLOPs


<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'torch.Tensor'>
<class 'tor

In [None]:
    # current_rot = radar_data.transform.rotation
    # for detect in radar_data:
    #     azi = math.degrees(detect.azimuth)
    #     alt = math.degrees(detect.altitude)
    #     # The 0.25 adjusts a bit the distance so the dots can be properly seen
    #     fw_vec = carla.Vector3D(x=detect.depth)
    #     # fw_vec = carla.Vector3D(x=detect.depth - 0.25)
    #     carla.Transform(
    #         carla.Location(),
    #         carla.Rotation(
    #             pitch=current_rot.pitch + alt,
    #             yaw=current_rot.yaw + azi,
    #             roll=current_rot.roll)).transform(fw_vec)

    #     def clamp(min_v, max_v, value):
    #         return max(min_v, min(value, max_v))

    #     norm_velocity = detect.velocity / velocity_range # range [-1, 1]
    #     r = int(clamp(0.0, 1.0, 1.0 - norm_velocity) * 255.0)
    #     g = int(clamp(0.0, 1.0, 1.0 - abs(norm_velocity)) * 255.0)
    #     b = int(abs(clamp(- 1.0, 0.0, - 1.0 - norm_velocity)) * 255.0)
    #     world.debug.draw_point(
    #         radar_data.transform.location + fw_vec,
    #         size=0.075,
    #         life_time=0.06,
    #         persistent_lines=False,
    #         color=carla.Color(r, g, b))

In [None]:
# settings = world.get_settings()
# settings.synchronous_mode = True
# settings.fixed_delta_seconds = 0.05
# world.apply_settings(settings)

# vehicle = spawn_vehicle(spawn_index=0, pattern="vehicle.dodge.charger_2020")
# camera = spawn_camera(attach_to=vehicle, transform=carla.Transform(carla.Location(x=-0.16, y=-0.9, z=2.4), carla.Rotation(yaw=-100)), frequency=0.2, width=300, height=300)
# video_output = np.zeros((300, 300, 4), dtype=np.uint8)
# car_label = ""

# def camera_callback(image):
#     global video_output
#     global car_label
    
#     image_data = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))
    
#     # Convert the array to a PIL Image (RGB)
#     image_rgb = Image.fromarray(image_data[:, :, :3])  # Ignore the alpha channel
    
#     # Preprocess the image for the model
#     image_tensor = transform(image_rgb).unsqueeze(0)  # Add batch dimension
    
#     # Perform prediction
#     with torch.no_grad():
#         output = model(image_tensor)  # Get model output
#         _, predicted = torch.max(output, 1)  # Get the predicted class
    
#     # Set the car label based on the prediction
#     if predicted.item() == 0:
#         car_label = "Car detected"
#     else:
#         car_label = "No car detected"

#     # Update video output (for visualization)
#     video_output = image_data
#     cv2.putText(video_output, car_label, (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2)

# camera.listen(camera_callback)

# vehicle.set_autopilot(True, 8000)

# cv2.namedWindow('Left Side Camera', cv2.WINDOW_AUTOSIZE)

# running = True

# try:
#   while running:
#     if cv2.waitKey(1) == ord('q'):
#         running = False
#         break
#     cv2.imshow('Left Side Camera', video_output)
#     world.tick()
# finally:
#   cv2.destroyAllWindows()
#   camera.destroy()
#   vehicle.destroy()
#   settings = world.get_settings()
#   settings.synchronous_mode = False
#   settings.no_rendering_mode = False
#   settings.fixed_delta_seconds = None
#   world.apply_settings(settings)

In [None]:

# # Define the model architecture
# model = models.mobilenet_v2()

# # Modify the classifier layer to match the saved state dictionary
# model.classifier[1] = torch.nn.Linear(model.classifier[1].in_features, 2)

# # Load the state dictionary
# state_dict = torch.load("mobilenet_v2_finetuned.pth")

# # Load the state dictionary into the model
# model.load_state_dict(state_dict)

# # Set the model to evaluation mode
# model.eval()

# # Define the image transform (adjust according to your model's requirements)
# transform = transforms.Compose([
#     transforms.ToTensor(),
#     transforms.Resize((224, 224)),  # Adjust to match your model's expected input size
#     transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Adjust if needed
# ])

In [None]:
# settings = world.get_settings()
# settings.synchronous_mode = True
# settings.fixed_delta_seconds = 0.05
# world.apply_settings(settings)

# vehicle = spawn_vehicle(spawn_index=40, pattern="vehicle.dodge.charger_2020")
  
# transforms = [
#     carla.Transform(carla.Location(x=-0.16, y=-0.9, z=2.4), carla.Rotation(yaw=-100)),  # Left side camera
#     carla.Transform(carla.Location(x=-0.16, y=0.9, z=2.4), carla.Rotation(yaw=100)),  # Right side camera
#     carla.Transform(carla.Location(x=-1.5, z=2.4), carla.Rotation(yaw=180))  # Rear camera
# ]
# cameras = []

# video_outputs = [np.zeros((300, 300, 4), dtype=np.uint8) for _ in range(3)]
# def create_camera_callback(index):
#   def camera_callback(image):
#       global video_outputs
#       video_outputs[index] = np.reshape(np.copy(image.raw_data), (image.height, image.width, 4))
#   return camera_callback

# for i in range(3):
#   camera = spawn_camera(attach_to=vehicle, transform=transforms[i], frequency=0.5, width=300, height=300)
#   camera.listen(create_camera_callback(i))
#   cameras.append(camera)

# vehicle.set_autopilot(True, 8000)

# cv2.namedWindow('Left Side Camera', cv2.WINDOW_AUTOSIZE)
# cv2.namedWindow('Right Side Camera', cv2.WINDOW_AUTOSIZE)
# cv2.namedWindow('Rear Camera', cv2.WINDOW_AUTOSIZE)

# running = True

# try:
#   while running:
#     if cv2.waitKey(1) == ord('q'):
#         running = False
#         break
#     cv2.imshow('Left Side Camera', video_outputs[1])
#     cv2.imshow('Right Side Camera', video_outputs[2])
#     cv2.imshow('Rear Camera', video_outputs[3])
# finally:
#   cv2.destroyAllWindows()
#   for camera in cameras:
#       camera.destroy()
#   vehicle.destroy()
#   settings = world.get_settings()
#   settings.synchronous_mode = False
#   settings.no_rendering_mode = False
#   settings.fixed_delta_seconds = None
#   world.apply_settings(settings)