In [2]:
import carla
import numpy as np
import cv2
import time
IM_WIDTH = 640
IM_HEIGHT = 480
# Connect to CARLA server
client = carla.Client('localhost', 2000)
client.set_timeout(5.0)
# Get the world and blueprint library
world = client.get_world()
blueprint_library = world.get_blueprint_library()

In [1]:
import carla
import numpy as np
import cv2
import time
IM_WIDTH = 640
IM_HEIGHT = 480
# Connect to CARLA server
client = carla.Client('localhost', 2000)
client.set_timeout(5.0)
# Get the world and blueprint library
world = client.get_world()
blueprint_library = world.get_blueprint_library()


# Spawn a vehicle
vehicle_bp = blueprint_library.filter('model3')[0]
spawn_point = carla.Transform(carla.Location(x=0, y=137, z=0.1), carla.Rotation(pitch=0.0, yaw=180.0, roll=0.0))
vehicle = world.spawn_actor(vehicle_bp, spawn_point)

# Attach a camera sensor to the vehicle
camera_bp = blueprint_library.find('sensor.camera.rgb')
camera_bp.set_attribute("image_size_x", f"{IM_WIDTH}")
camera_bp.set_attribute("image_size_y", f"{IM_HEIGHT}")
camera_transform = carla.Transform(carla.Location(x=2.0, z=1.5), carla.Rotation(pitch=-15.0))
camera = world.spawn_actor(camera_bp, camera_transform, attach_to=vehicle)

seg_camera_bp = blueprint_library.find('sensor.camera.semantic_segmentation')
seg_camera_bp.set_attribute("image_size_x", f"{IM_WIDTH}")
seg_camera_bp.set_attribute("image_size_y", f"{IM_HEIGHT}")
seg_camera_transform = carla.Transform(carla.Location(x=2.0, z=1.5), carla.Rotation(pitch=-15.0))

# Spawn the camera and attach to the vehicle
seg_camera = world.spawn_actor(seg_camera_bp, seg_camera_transform, attach_to=vehicle)

def process_image(image):
    try:
        i = np.array(image.raw_data)
        #print(i.shape)
        i2 = i.reshape((IM_HEIGHT, IM_WIDTH, 4))
        i3 = i2[:, :, :3]
        gray = cv2.cvtColor(i3, cv2.COLOR_BGR2GRAY)
        return gray
    except Exception as e:
        print("Error in process_image:", str(e))
        return None
    
def detect_curved_lanes(binary_warped):
    histogram = np.sum(binary_warped[binary_warped.shape[0] // 2:, :], axis=0)
    
    midpoint = int(histogram.shape[0] // 2)

    leftx_base = np.argmax(histogram[:midpoint])
    rightx_base = np.argmax(histogram[midpoint:]) + midpoint

    nwindows = 9
    window_height = int(binary_warped.shape[0] // nwindows)
    
    nonzero = binary_warped.nonzero()
    nonzeroy = np.array(nonzero[0])
    nonzerox = np.array(nonzero[1])

    leftx_current = leftx_base
    rightx_current = rightx_base

    margin = 100
    minpix = 50
    
    left_lane_inds = []
    right_lane_inds = []

    for window in range(nwindows):
        win_y_low = binary_warped.shape[0] - (window + 1) * window_height
        win_y_high = binary_warped.shape[0] - window * window_height

        win_xleft_low = leftx_current - margin
        win_xleft_high = leftx_current + margin
        win_xright_low = rightx_current - margin
        win_xright_high = rightx_current + margin

        good_left_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
                          (nonzerox >= win_xleft_low) & (nonzerox < win_xleft_high)).nonzero()[0]
        good_right_inds = ((nonzeroy >= win_y_low) & (nonzeroy < win_y_high) & 
                           (nonzerox >= win_xright_low) & (nonzerox < win_xright_high)).nonzero()[0]

        left_lane_inds.append(good_left_inds)
        right_lane_inds.append(good_right_inds)

        if len(good_left_inds) > minpix:
            leftx_current = int(np.mean(nonzerox[good_left_inds]))
        if len(good_right_inds) > minpix:        
            rightx_current = int(np.mean(nonzerox[good_right_inds]))

    left_lane_inds = np.concatenate(left_lane_inds)
    right_lane_inds = np.concatenate(right_lane_inds)

    leftx = nonzerox[left_lane_inds]
    lefty = nonzeroy[left_lane_inds] 
    rightx = nonzerox[right_lane_inds]
    righty = nonzeroy[right_lane_inds] 

    left_fit = np.polyfit(lefty, leftx, 2)
    right_fit = np.polyfit(righty, rightx, 2)
    
    return left_fit, right_fit

def detect_lanes2(gray):
    try:
        display_image = gray.copy()

        # Edge Detection
        edges = cv2.Canny(gray, 70, 70)
        height, width = edges.shape
        roi_vertices = [
            (0, height),
            (0, height * 0.5),
            (width, height * 0.5),
            (width, height)
        ]

        mask = np.zeros_like(edges)
        cv2.fillPoly(mask, [np.array(roi_vertices, dtype=np.int32)], 255)
        roi = cv2.bitwise_and(edges, mask)
        
        left_fit, right_fit = detect_curved_lanes(roi)  # Detect curved lanes

        # Generate y values for plotting
        ploty = np.linspace(0, height-1, height)

        # Calculate x values using the polynomial coefficients
        left_fitx = left_fit[0] * ploty**2 + left_fit[1] * ploty + left_fit[2]
        right_fitx = right_fit[0] * ploty**2 + right_fit[1] * ploty + right_fit[2]

        # Draw the lanes on the display_image
        for y, x_left, x_right in zip(ploty, left_fitx, right_fitx):
            y = int(y)
            x_left = int(x_left)
            x_right = int(x_right)
            cv2.circle(display_image, (x_left, y), 3, (255, 255, 255), -1)
            cv2.circle(display_image, (x_right, y), 3, (255, 255, 255), -1)

        cv2.imshow("Original", display_image)
        cv2.waitKey(1)

        # Compute lane center based on polynomial fits
        lane_center_x = (left_fitx[-1] + right_fitx[-1]) / 2  # x position at the bottom of the image
        lane_center = (int(lane_center_x), height-1)
        
        return lane_center

    except Exception as e:
        print(f"Error: {e}")
        return (width // 2, height)
    
def post_process_lines(lines, height):
    if lines is None:
        return None

    left_lines = []
    right_lines = []
    
    for line in lines:
        for x1, y1, x2, y2 in line:
            if x2 == x1:
                continue  # Avoiding division by zero
            slope = (y2 - y1) / (x2 - x1)
            intercept = y1 - slope * x1
            if 0.3 < abs(slope) < 4:  # Filtering out near-horizontal lines
                if slope < 0:  # Negative slope represents left lane
                    left_lines.append((slope, intercept))
                else:
                    right_lines.append((slope, intercept))

    # Average the position of lines on the left and right
    left_avg = np.average(left_lines, axis=0)
    right_avg = np.average(right_lines, axis=0)

    # Calculate endpoints for averaged lines
    if len(left_lines) > 0:
        left_line = extrapolate_line(left_avg, height)
    else:
        left_line = None

    if len(right_lines) > 0:
        right_line = extrapolate_line(right_avg, height)
    else:
        right_line = None

    return left_line, right_line

def extrapolate_line(line, height, ymin=0):
    slope, intercept = line
    y1 = height
    y2 = ymin
    x1 = int((y1 - intercept) / slope)
    x2 = int((y2 - intercept) / slope)
    return ((x1, y1), (x2, y2))

def detect_lanes(gray):
    try:
        display_image = gray.copy()
        # mask_white = cv2.inRange(gray, 175, 255)
        # blurred = cv2.GaussianBlur(gray, (9, 9), 0)
        # Edge Detection
        edges = cv2.Canny(gray, 70, 70)
        height, width = edges.shape
        roi_height = height * 0.45
        roi_vertices = [
            (0, height),
            (width * 1/5, roi_height), # left (x, y)
            (width * 4/5, roi_height), # right (x, y)
            (width, height)
        ]

        mask = np.zeros_like(gray)
        cv2.fillPoly(mask, [np.array(roi_vertices, dtype=np.int32)], 255)
        roi = cv2.bitwise_and(edges, mask)

        lines = cv2.HoughLinesP(roi, 1, np.pi/180, threshold=100, minLineLength=10, maxLineGap=350)
        if lines is None:
            # print("No lines detected!")
            return (400, 300)
        right_lines = [] 
        for line in lines:
            for x1, y1, x2, y2 in line:
                # Calculate slope of line
                if x2 - x1 != 0:  # Avoiding division by zero error
                    slope = (y2 - y1) / (x2 - x1)
                    # Filter lines based on slope, consider a small range around 0 to filter out horizontal lines
                    if 0.5 < slope < 2:  # Right lane
                        right_lines.append(line)

        # Draw the right lane
        for line in right_lines:
            for x1, y1, x2, y2 in line:
                cv2.line(display_image, (x1, y1), (x2, y2), (255, 255, 255), 2)

        # # Replace the drawing lines part in your main function with:
        # processed_lines = post_process_lines(lines, height)
        # if processed_lines[0] is not None:
        #     cv2.line(display_image, processed_lines[0][0], processed_lines[0][1], (255, 255, 255), 2)
        # if processed_lines[1] is not None:
        #     cv2.line(display_image, processed_lines[1][0], processed_lines[1][1], (255, 255, 255), 2)

        # black_image = np.zeros((height, width, 3), np.uint8)
        # if lines is not None:
        #     for line in lines:
        #         for x1, y1, x2, y2 in line:
        #             cv2.line(black_image, (x1, y1), (x2, y2), (255, 255, 255), 2)
        #             cv2.line(display_image, (x1, y1), (x2, y2), (255, 255, 255), 2)

        # Original Image Display
        # cv2.imshow('Edges', edges)
        # cv2.imshow('ROI', roi)
        # cv2.imshow('blurred', blurred)
        # cv2.imshow("Mask", mask)
        # cv2.imshow("mask white", mask_white)
        # cv2.imshow("black", black_image)
        cv2.imshow("Original", display_image)
        cv2.waitKey(1)

        # Compute lane center based on lines
        if lines is not None and len(lines) > 1:
            left_lane, right_lane = lines[0][0], lines[1][0]
            lane_center = ((left_lane[0] + right_lane[0]) / 2, (left_lane[1] + right_lane[1]) / 2)
            # print(f"left_lane: {left_lane}, right_lane: {right_lane}, lane_center: {lane_center}")
        else:
            lane_center = (width / 2, height)
        
        return lane_center
    except Exception as e:
        # print("Error in detect_lanes:", str(e))
        pass
    return (400, 300)

def process_semantic_image(image):
    # Convert the raw data to a numpy array
    semantic_array = np.array(image.raw_data).reshape((IM_HEIGHT, IM_WIDTH, 4))
    
    # Extract only the R channel (which contains the class labels)
    labels = semantic_array[:, :, 2]

    # Define a colormap to represent different classes (e.g., red for vehicles, green for pedestrians, etc.)
    # Adjust this colormap as per your needs.
    colormap = {
        # 0: [0, 0, 0],        # None
        1: [70, 70, 70],    # road
        # 2: [190, 153, 153], # sidewalks
        # 3: [250, 170, 160], # building
        # 4: [220, 20, 60],   # not sure
        # 5: [153, 153, 153], # not sure
        # 6: [157, 234, 50],  # light poles
        # 7: [128, 64, 128],  # traffic lights
        # 8: [244, 35, 232],  # TrafficSign
        # 10: [0, 0, 142],    # not sure
        # 11: [244, 35, 232],    # sky
        # 14: [244, 35, 232],    # cars
        # 20: [244, 35, 232],    # Dynamic
        # 22: [244, 35, 232],    # Dynamic
        # Add other classes as needed
    }

    # Convert labels to RGB based on the colormap
    semantic_rgb = np.zeros((IM_HEIGHT, IM_WIDTH, 3))
    for key, value in colormap.items():
        semantic_rgb[np.where(labels == key)] = value

    return semantic_rgb.astype(np.uint8)
# Define a function for steering control
def control_vehicle(lane_center):
    try:
        # _, width, _ = world.get_settings().screen_resolution
        car_center = IM_WIDTH / 2.0

        deviation = lane_center[0] - car_center
        kp = 0.0007
        steering = -kp * deviation
        # print(steering)
        vehicle.apply_control(carla.VehicleControl(throttle=0.5, steer=steering))
    except Exception as e:
        print("Error in control_vehicle:", str(e))

def callback(image):
    semantic_processed = process_semantic_image(image)
    # cv2.imshow("Semantic Processed", semantic_processed)

    # processed_image = process_image(image)
    processed_image = cv2.cvtColor(semantic_processed, cv2.COLOR_BGR2GRAY)
    # cv2.imshow("Grayscale", processed_image)

    lane_center = detect_lanes(processed_image)
    # print("Callback called. Lane center:", lane_center)
    control_vehicle(lane_center)

def semantic_callback(image):
    # processed_image = process_image(image)
    processed_image = process_semantic_image(image)
    cv2.imshow("Semantic Segmentation", processed_image)
    cv2.waitKey(1)

# Listen to camera images, detect lanes, and control the vehicle
# camera.listen(callback)
seg_camera.listen(callback)


In [4]:
# Start from a known location on the track
start_location = carla.Location(x=0, y=137, z=0.1)  # Change to your desired starting location
waypoint_api = map.get_waypoint(start_location)

waypoint_list = []
next_waypoint = waypoint_api
distance = 20.0  # distance between consecutive waypoints
num_waypoints = 120  # Number of waypoints to generate, adjust as needed

for _ in range(num_waypoints):
    # Append to the list
    waypoint_list.append(next_waypoint)

    # Print x and y coordinates
    print("X:", round(next_waypoint.transform.location.x), " Y:", round(next_waypoint.transform.location.y))
    # world.debug.draw_point(next_waypoint.transform.location, size=0.1, color=carla.Color(255,0,0), life_time=5)
    world.debug.draw_line(vehicle.get_transform().location, next_waypoint.transform.location, thickness=0.05, color=carla.Color(0,0,255), life_time=5)

    # Get the next waypoint
    next_waypoint = next_waypoint.next(distance)[0]

AttributeError: type object 'map' has no attribute 'get_waypoint'

In [11]:
# For each waypoint you want to visualize:
world.debug.draw_point(carla.Location(x=0, y=137, z=0.1), size=0.1, color=carla.Color(255,0,0), life_time=10)

In [2]:
actors = client.get_world().get_actors()

# Filter the list to include only vehicles and sensors
vehicles = [actor for actor in actors if 'vehicle' in actor.type_id]
sensors = [actor for actor in actors if 'sensor' in actor.type_id]

# Destroy all vehicles and sensors
for vehicle in vehicles:
    vehicle.destroy()
for sensor in sensors:
    sensor.destroy()


In [1]:
import carla
import numpy as np
import cv2
import time
IM_WIDTH = 640
IM_HEIGHT = 480
# Connect to CARLA server
client = carla.Client('localhost', 2000)
client.set_timeout(5.0)
# Get the world and blueprint library
world = client.get_world()
blueprint_library = world.get_blueprint_library()