In [1]:
import carla
import math
import random
import time
import cv2
import numpy as np

In [None]:
SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10

In [None]:
class CarEnv:
    front_camera = None
    
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    
    im_width = IMG_WIDTH
    im_height = IMG_HEIGHT
    
    actor_list= []
    collision_hist = []

    def __init__(self):
        self.client = carla.Client('localhost', 2000)
        self.client.set_timeout(2.0) #time for connection

        # retrieve the world that is currently running
        self.world = self.client.get_world()

        # the world contains the list blueprints >> use it for adding actors 
        self.blueprint_library = self.world.get_blueprint_library()

        # choose the vehicle type by filter method
        self.model_3 = self.blueprint_library.filter('model3')[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []
        self.spawn_pont_vehicle = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.try_spawn_actor(self.model_3, self.spawn_pont_vehicle)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')

        self.rgb_cam.set_attribute('image_size_x', f'{self.im_width}')
        self.rgb_cam.set_attribute('image_size_y', f'{self.im_height}')
        self.rgb_cam.set_attribute('fov', '110')

        self.transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.try_spawn_actor(self.rgb_cam, self.transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0)) 
        time.sleep(4) # sleep to get things started and to not detect a collision when the car spawns/falls from sky.

        self.colsensor = self.world.get_blueprint_library().find('sensor.other.collision')
        self.colsensor = self.world.spawn_actor(self.colsensor, self.transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)
        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0)) 

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("",i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        #steer center, left, right >> 0, 1, 2
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        if action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        if action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))
            
        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if SECONDS_PER_EPISODE < time.time() - self.episode_start:
            done = True

        return self.front_camera, reward, done, None
        
