In [None]:
import carla
import numpy as np
import cv2
import time
import random
import math
import ffmpeg
import pygame
import subprocess
import datetime

In [None]:
client = carla.Client('localhost', 2000)
client.set_timeout(10.0)
world = client.get_world()

In [None]:
blueprint_library = world.get_blueprint_library()
vehicle_bp = blueprint_library.find('vehicle.ford.mustang')
spawn_points = world.get_map().get_spawn_points()
vehicle = world.spawn_actor(vehicle_bp, random.choice(spawn_points))

In [None]:
spectator = world.get_spectator()
vehicle_transform = vehicle.get_transform()
offset = carla.Location(x=-4,z=2.5)
spectator_transform = carla.Transform(vehicle_transform.location + offset, vehicle_transform.rotation)
spectator.set_transform(spectator_transform)

In [None]:
scalar = 3
disp_size =  [x * scalar for x in [256, 256]]
print(disp_size)
camera_bp = blueprint_library.find('sensor.camera.rgb')
camera_init_trans = carla.Transform(carla.Location(z=1.5,x=0.4))
camera_bp.set_attribute('image_size_x', str(disp_size[0]))
camera_bp.set_attribute('image_size_y', str(disp_size[1]))
camera = world.spawn_actor(camera_bp, camera_init_trans, attach_to=vehicle)

In [None]:
ffmpeg_process = None
recording = False
pygame.init()
screen=pygame.display.set_mode((disp_size[0], disp_size[1]))

In [None]:
def start_recording():
    global ffmpeg_process, recording
    if not recording:
        timestamp = datetime.datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
        filename = f'{timestamp}.mp4'

        ffmpeg_cmd = [
            "ffmpeg",
            "-y",  # Nadpisywanie plików
            "-f", "rawvideo",
            "-pix_fmt", "bgr24",
            "-s", "256x256",
            "-r", "30",  # FPS
            "-i", "-",  # Input z `stdin`
            "-c:v", "libx264",
            "-preset", "ultrafast",
            "-crf", "23",
            filename
        ]

        ffmpeg_process = subprocess.Popen(ffmpeg_cmd, stdin=subprocess.PIPE)
        recording = True
        print(f"🔴 Nagrywanie rozpoczęte: {filename}")

In [None]:
def stop_recording():
    global ffmpeg_process, recording
    if recording and ffmpeg_process:
        ffmpeg_process.stdin.close()
        ffmpeg_process.wait()
        ffmpeg_process = None
        recording = False
        print("🛑 Nagrywanie zakończone.")

In [None]:
def process_image(image):
    """ Przetwarza obraz z kamery i wysyła do ffmpeg. """
    if recording and ffmpeg_process:
        array = np.frombuffer(image.raw_data, dtype=np.uint8)
        array = array.reshape((image.height, image.width, 4))  # RGBA
        frame = array[:, :, :3]  # Konwersja do BGR (OpenCV)
        ffmpeg_process.stdin.write(frame.tobytes())

In [None]:
camera.listen(lambda image: process_image(image)) #image.save_to_disk('out/%06d.png' % image.frame))

In [None]:
running = True
while running:
    for event in pygame.event.get():
        if event.type == pygame.QUIT:
            running = False

        if event.type == pygame.KEYDOWN:
            if event.key == pygame.K_r:  # Naciśnięcie "R"
                start_recording()
            if event.key == pygame.K_k:  # Naciśnięcie "K"
                stop_recording()

In [None]:
camera.stop()
camera.destroy()
stop_recording()
pygame.quit()

In [None]:
def detect_lanes(image):
    """Wykrywanie pasów ruchu za pomocą przetwarzania obrazu"""
    gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)  # Skala szarości
    blur = cv2.GaussianBlur(gray, (5, 5), 0)  # Rozmycie
    edges = cv2.Canny(blur, 50, 150)  # Detekcja krawędzi
    cv2.imshow(image)
    # Tworzenie maski dla regionu zainteresowania (ROI)
    height, width = edges.shape
    mask = np.zeros_like(edges)
    roi_vertices = np.array([[(50, height), (width//2-50, height//2), (width//2+50, height//2), (width-50, height)]], dtype=np.int32)
    cv2.fillPoly(mask, roi_vertices, 255)
    masked_edges = cv2.bitwise_and(edges, mask)

    # Wykrywanie linii metodą Hougha
    lines = cv2.HoughLinesP(masked_edges, 1, np.pi/180, 50, minLineLength=50, maxLineGap=200)
    return lines

In [None]:
def get_lane_center(lines, image_width):
    """Obliczanie środka pasa na podstawie wykrytych linii"""
    if lines is None:
        return image_width // 2  # Brak wykrytych linii – domyślnie środek ekranu

    left_lines, right_lines = [], []

    for line in lines:
        x1, y1, x2, y2 = line[0]
        slope = (y2 - y1) / (x2 - x1 + 1e-6)  # Unikanie dzielenia przez zero
        if slope < 0:
            left_lines.append((x1, x2))
        else:
            right_lines.append((x1, x2))

    if left_lines and right_lines:
        left_x = np.mean([x for x1, x2 in left_lines for x in (x1, x2)])
        right_x = np.mean([x for x1, x2 in right_lines for x in (x1, x2)])
        return (left_x + right_x) / 2

    return image_width // 2  # Domyślna wartość środka pasa


def get_vehicle_position(image_width):
    """Pozycja pojazdu w obrazie (zakładamy, że to środek dolnej krawędzi)"""
    return image_width // 2


In [None]:
class PIDController:
    """Prosty kontroler PID do sterowania pojazdem"""
    def __init__(self, Kp, Ki, Kd):
        self.Kp = Kp
        self.Ki = Ki
        self.Kd = Kd
        self.integral = 0
        self.prev_error = 0

    def control(self, error, dt):
        """Obliczanie korekcji sterowania"""
        self.integral += error * dt
        derivative = (error - self.prev_error) / dt
        output = self.Kp * error + self.Ki * self.integral + self.Kd * derivative
        self.prev_error = error
        return output

In [None]:
pid = PIDController(Kp=0.2, Ki=0.05, Kd=0.1)

def process_image(image):
    """Obsługa obrazu z kamery i sterowanie pojazdem"""
    array = np.frombuffer(image.raw_data, dtype=np.uint8).reshape((image.height, image.width, 4))[:, :, :3]

    # Wykrywanie linii pasa ruchu
    lines = detect_lanes(array)
    print("lol")
    
    # Obliczenie błędu względem środka pasa
    desired_position = get_lane_center(lines, array.shape[1])
    current_position = get_vehicle_position(array.shape[1])
    error = desired_position - current_position

    # Obliczenie korekcji PID
    correction = pid.control(error, 0.05)  # dt = 0.05s

    # Sterowanie pojazdem
    control = carla.VehicleControl()
    control.steer = np.clip(correction * 0.01, -1.0, 1.0)  # Korekcja skrętu
    control.throttle = 0.3  # Stała prędkość

    vehicle.apply_control(control)

    # Podgląd obrazu z wykrytymi liniami
    if lines is not None:
        for line in lines:
            x1, y1, x2, y2 = line[0]
            cv2.line(array, (x1, y1), (x2, y2), (0, 255, 0), 5)
    
    cv2.imshow("Lane Detection", array)
    cv2.waitKey(1)

# Start nasłuchiwania kamery
camera.listen(lambda image: process_image(image))

print("Autopilot uruchomiony!")

In [None]:
def stop_simulation():
    """Zatrzymuje symulację i niszczy aktorów"""
    camera.stop()
    vehicle.destroy()
    print("Symulacja zatrzymana.")

stop_simulation()  # Wykonaj tę funkcję, jeśli chcesz zatrzymać wszystko