In [1]:
import numpy as np
import math
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import matplotlib.patches as patches
from IPython.display import clear_output
import imageio
import matplotlib.backends.backend_agg as plt_backend_agg
import os
import subprocess
import shutil
import time
import pickle

### Car Class

In [2]:
class Car:
    def __init__(self, position, acceleration, angle, width, length, dist_sec_lat, dist_sec_front, angle_sensor, nn):
        self.position = np.array(position)
        self.velocity = 0.5
        self.acceleration = acceleration
        self.angle = angle
        self.width = width
        self.length = length
        self.dist_sec_lat = dist_sec_lat
        self.dist_sec_front = dist_sec_front
        self.angle_rad = math.radians(self.angle)
        self.angle_sensor = angle_sensor
        self.nn = nn
        self.fitness = 0
        
    def set_angle(self, new_angle):
        self.angle = new_angle
        self.angle_rad = math.radians(self.angle)
    
    def set_acceleration(self, new_acceleration):
        self.acceleration = new_acceleration

    def update_position(self, delta_t):
        
        #v0_mod = np.sqrt(self.velocity[0]**2 + self.velocity[1]**2)
        v0_mod = self.velocity
        v0 = v0_mod * np.array([math.cos(self.angle_rad), math.sin(self.angle_rad)])
        a0 = self.acceleration * np.array([math.cos(self.angle_rad), math.sin(self.angle_rad)])
        
        prev_position = self.position 
        
        self.velocity = v0_mod + self.acceleration * delta_t
        self.position = self.position + v0 * delta_t + 0.5 * a0 * delta_t ** 2
        
        if self.velocity > 0 :
            car_dist_vec = self.position - prev_position
            self.fitness = self.fitness + np.sqrt(car_dist_vec[0] ** 2 + car_dist_vec[1] ** 2)
    
    
    def intersect(self, p1, p2, p3, p4):
        # intersection between line(p1, p2) and line(p3, p4)
        x1,y1 = p1
        x2,y2 = p2
        x3,y3 = p3
        x4,y4 = p4

        denom = (y4-y3)*(x2-x1) - (x4-x3)*(y2-y1)

        if denom == 0: # parallel
            return None
        ua = ((x4-x3)*(y1-y3) - (y4-y3)*(x1-x3)) / denom
        if ua < 0 or ua > 1: # out of range
            return None
        ub = ((x2-x1)*(y1-y3) - (y2-y1)*(x1-x3)) / denom
        if ub < 0 or ub > 1: # out of range
            return None
        
        x = x1 + ua * (x2-x1)
        y = y1 + ua * (y2-y1)
        return (x,y)
    
    def intersection_wall(self, road):     
        angle_sensor_rad = math.radians(self.angle_sensor)
        
        right_inters_vec = []
        left_inters_vec = []
        front_inters_vec = []
        
        for i in range(0,len(road)):

            left_inters = self.intersect( 
                                (self.position[0], self.position[1] ),
                                (self.position[0] + math.cos(self.angle_rad + angle_sensor_rad) * self.dist_sec_lat, self.position[1] + math.sin(self.angle_rad + angle_sensor_rad) * self.dist_sec_lat ),
                                (road[i][0], road[i][2]),
                                (road[i][1], road[i][3]))
            right_inters = self.intersect( 
                                (self.position[0], self.position[1] ),
                                (self.position[0] + math.cos(self.angle_rad - angle_sensor_rad) * self.dist_sec_lat, self.position[1] + math.sin(self.angle_rad - angle_sensor_rad) * self.dist_sec_lat ),
                                (road[i][0], road[i][2]),
                                (road[i][1], road[i][3]))
            front_inters = self.intersect( 
                                (self.position[0], self.position[1] ),
                                (self.position[0] + math.cos(self.angle_rad) * self.dist_sec_front, self.position[1] + math.sin(self.angle_rad) * self.dist_sec_front ),
                                (road[i][0], road[i][2]),
                                (road[i][1], road[i][3]))

            if left_inters is not None:
                    left_inters_vec.append(left_inters)
            if right_inters is not None:
                    right_inters_vec.append(right_inters)
            if front_inters is not None:
                    front_inters_vec.append(front_inters)
            
        if len(left_inters_vec) == 0:
            left_inters_vec.append((self.position[0] + math.cos(self.angle_rad + angle_sensor_rad) * self.dist_sec_lat, self.position[1] + math.sin(self.angle_rad + angle_sensor_rad) * self.dist_sec_lat))
        if len(right_inters_vec) == 0:
            right_inters_vec.append((self.position[0] + math.cos(self.angle_rad - angle_sensor_rad) * self.dist_sec_lat, self.position[1] + math.sin(self.angle_rad - angle_sensor_rad) * self.dist_sec_lat))
        if len(front_inters_vec) == 0:
            front_inters_vec.append((self.position[0] + math.cos(self.angle_rad) * self.dist_sec_front, self.position[1] + math.sin(self.angle_rad) * self.dist_sec_front))

        return left_inters_vec, right_inters_vec,front_inters_vec

    def get_sensor_data(self, road):
        left_inters, right_inters, front_inters = self.intersection_wall(road)
        
        left_distance = np.min([np.linalg.norm(np.array(point) - self.position) for point in left_inters])
        right_distance = np.min([np.linalg.norm(np.array(point) - self.position) for point in right_inters])
        front_distance = np.min([np.linalg.norm(np.array(point) - self.position) for point in front_inters])
        
        return left_distance, right_distance, front_distance

    def check_crash(self, road):
        crash_vector = self.get_sensor_data(road)
        
        if min(crash_vector) <= self.width:
            return True
        else:
            return False
        
    def draw(self, road, t, fig, ax, generation, sensor_draw = False):
        #clear_output(wait=True)
        #fig, ax = plt.subplots(1,figsize = (20,6))
        #ax.clear()
        #ax.set_xlim([-3, 13])
        #ax.set_ylim([-2, 14])
        ax.set_ylim([-3, 14])
        

        #for i in range(0,len(road)):
        #    ax.plot(road[i][0:2], road[i][2:4], 'k')
        
        left_inters, right_inters, front_inters = self.intersection_wall(road)
        
        
        angle_sensor_rad = math.radians(self.angle_sensor)
        
        if sensor_draw:
            for i in range(0, len(left_inters)):
                ax.plot(*left_inters[i], 'ok', markersize=3)
            for i in range(0, len(right_inters)):
                ax.plot(*right_inters[i], 'ok', markersize=3)
            for i in range(0, len(front_inters)):
                ax.plot(*front_inters[i], 'ok', markersize=3)
            
            ax.plot([self.position[0], self.position[0] + math.cos(self.angle_rad) * self.dist_sec_front],
                     [self.position[1], self.position[1] + math.sin(self.angle_rad) * self.dist_sec_front],
                     [self.position[0], self.position[0] + math.cos(self.angle_rad + angle_sensor_rad) * self.dist_sec_lat],
                     [self.position[1], self.position[1] + math.sin(self.angle_rad + angle_sensor_rad) * self.dist_sec_lat],
                     [self.position[0], self.position[0] + math.cos(self.angle_rad - angle_sensor_rad) * self.dist_sec_lat],
                     [self.position[1], self.position[1] + math.sin(self.angle_rad - angle_sensor_rad) * self.dist_sec_lat],)
        
        
        square = patches.Rectangle((
                                self.position[0] - math.cos(self.angle_rad) * self.length / 2    + math.sin(self.angle_rad) * self.width / 2,
                                self.position[1] - math.cos(self.angle_rad) * self.width / 2    - math.sin(self.angle_rad) * self.length / 2),
                                self.length, self.width, edgecolor='black', facecolor='grey')

        ax.plot(self.position[0],self.position[1],'ko',markersize=1)

        square.set_angle(self.angle)
        ax.add_patch(square)
        text = "t = %1.2f s" % t
        ax.text(0,1.5,text)
        ax.text(0,2,"Generation: %2d" % int(generation+1))
        #ax.grid()
        #ax.axis('equal')
        
        return fig,ax


### Neural Network Class

In [3]:
class NNetwork:
    def __init__(self, layer_sizes):
        
        self.layer_sizes = layer_sizes
        self.num_layers = len(layer_sizes)
        self.weights = []
        self.biases = []
        #self.a = []
        #self.z = []
        
        for i in range(self.num_layers - 1):
            #weight = np.random.randn(layer_sizes[i], layer_sizes[i+1])
            weight = np.random.randn(layer_sizes[i], layer_sizes[i+1]) * np.sqrt(2 / layer_sizes[i])
            bias = np.random.randn(1, layer_sizes[i+1]) * np.sqrt(2 / layer_sizes[i])
            self.weights.append(weight)
            self.biases.append(bias)
        
    def sigmoid(self, x):
        return 1 / ( 1 + np.exp(-x) )
    
    def relu(self, x):
        return np.maximum(0, x)
    
    def forward_propagation(self, inputs):
        self.a = []
        self.z = []
        activation = inputs
        
        for i in range(self.num_layers - 2):
            z_layer = np.sum(activation * np.transpose(self.weights[i]),axis=1) + self.biases[i]
            a_layer = self.relu(z_layer)
            self.z.append(z_layer)
            self.a.append(a_layer)
            activation = a_layer
         
        z_layer = np.sum(activation * np.transpose(self.weights[-1]),axis=1) + self.biases[-1]
        a_layer = self.sigmoid(z_layer)
        self.z.append(z_layer)
        self.a.append(a_layer)
        
    def output(self):
        return self.a[-1][0]
        
    def min_max_scaling(self, data, min_val, max_val):
        return (data - min_val) / (max_val - min_val)
        
    def update_weights(self, best_weights, best_biases, learning_rate):
        
        for i in range(self.num_layers - 1):
            #self.weights[i] = best_weights[i] * (1 + learning_rate * np.random.uniform(-1,1))
            #self.biases[i] = best_biases[i] * (1 + learning_rate * np.random.uniform(-1,1))
            self.weights[i] = best_weights[i] * (1 + learning_rate * np.random.uniform(-1, 1, size=best_weights[i].shape))
            self.biases[i] = best_biases[i] * (1 + learning_rate * np.random.uniform(-1, 1, size=best_biases[i].shape))

## Neural Network

In [4]:
def simulate_car(population, road, t_final, delta_t, max_turn, max_acceleration, draw_option, generation, video_count,video_option,frame_skip):
    
    t = 0
    step_count = 0
    output_directory = ""
    
    if video_option:
        output_directory = f"Step_video_files/Gen{generation:03d}"

        if not os.path.exists(output_directory):
            os.makedirs(output_directory)
    
    while t < t_final and not all(car.check_crash(road) for car in population):# and not car.check_crash(road):
        if (draw_option or video_option) :
            if step_count % frame_skip == 0:
                fig, ax = plt.subplots(1, figsize=(20, 10))
                ax.set_ylim([-3, 14])
                ax.grid()
                ax.axis('equal')
                for i in range(0,len(road)):
                    ax.plot(road[i][0:2], road[i][2:4], 'k')
        
        for car in population:
            if not car.check_crash(road):
                left_distance, right_distance, front_distance = car.get_sensor_data(road)
                inputs = [
                    car.nn.min_max_scaling(left_distance, 0, car.dist_sec_lat),
                    car.nn.min_max_scaling(right_distance, 0, car.dist_sec_lat),
                    car.nn.min_max_scaling(front_distance, 0, car.dist_sec_front),
                    car.nn.min_max_scaling(car.velocity , 0, 10),
                    car.nn.min_max_scaling(car.angle, 0, 180)
                ]

                car.nn.forward_propagation(inputs)
                output = car.nn.output()

                new_angle = car.angle + (output[0] - output[1]) * max_turn * delta_t
                new_acceleration = (output[2] - output[3]) * max_acceleration

                car.update_position(delta_t)
                car.set_angle(new_angle)
                car.set_acceleration(new_acceleration)
            
            best_fit = []
            
            for car_fit in population:
                best_fit.append(car_fit.fitness)
            car_best_fit = population[best_fit.index(max(best_fit))]
            
            
            if (draw_option or video_option):
                if step_count % frame_skip == 0:
                    if car == car_best_fit:
                        fig,ax = car.draw(road, t, fig, ax, generation,True)
                    else:
                        fig,ax = car.draw(road, t, fig, ax, generation,False)


        if draw_option and step_count % frame_skip == 0:
            plt.show()
            video_count += 1
            plt.pause(0.01)
            clear_output(wait=True)
        
        if video_option and step_count % frame_skip == 0:
            filename = f"frame_step_{video_count:06d}.png"
            filepath = os.path.join(output_directory, filename)
            plt.savefig(filepath)
            plt.close(fig)
            video_count += 1
        
        t = t + delta_t
        step_count += 1
        
    return video_count,output_directory


        
def simulate_generation(population, road, t_final, delta_t, max_turn, max_acceleration):
    for car in population:
        simulate_car(car, road, t_final, delta_t, max_turn, max_acceleration, False)

def select_best_car(population):
    return max(population, key=lambda car: car.fitness)

def generate_next_population(best_car, population_size, learning_rate):
    new_population = []
    best_weights = best_car.nn.weights
    best_biases = best_car.nn.biases
    for _ in range(population_size):
        nn = NNetwork(best_car.nn.layer_sizes)
        nn.update_weights(best_weights, best_biases, learning_rate)
        new_car = Car(position, acceleration, angle, width, length, dist_sec_lat, dist_sec_front, angle_sensor, nn)        
        new_population.append(new_car)
    return new_population

def draw_best_car(best_car, road, t_final, delta_t, max_turn, max_acceleration):
    population = []
    best_weights = best_car.nn.weights
    best_biases = best_car.nn.biases

    nn = NNetwork(best_car.nn.layer_sizes)

    nn.update_weights(best_weights, best_biases, 0)
    new_car = Car(position, acceleration, angle, width, length, dist_sec_lat, dist_sec_front, angle_sensor, nn)
    population.append(new_car)
    simulate_car(population,new_car, road, t_final, delta_t, max_turn, max_acceleration, True, 0)
    
    
def create_video_and_cleanup(output_directory, generation,fps):
    # Use ffmpeg to create a video
    video_filename = os.path.join("Step_video_files", f"Gen{generation:03d}_output.mp4")
    
    if os.path.exists(video_filename):
        os.remove(video_filename)
        print(f"Deleted existing video file: {video_filename}")
    
    ffmpeg_command = [
        'ffmpeg', '-f', 'image2', '-framerate', str(fps), '-i',
        os.path.join(output_directory, 'frame_step_%06d.png'),
        '-c:v', 'libx264', '-pix_fmt', 'yuv420p', video_filename
    ]

    try:
        subprocess.run(ffmpeg_command, check=True)
        print(f"Video created: {video_filename}")
    except subprocess.CalledProcessError as e:
        print(f"Error occurred while creating video: {e}")

    # Delete the figures to save memory
    shutil.rmtree(output_directory)
    print(f"Deleted directory: {output_directory}")
    
    
def reset_generation(population, population_size):
    new_population = []
    
    for ii in range(population_size):
        best_weights = population[ii].nn.weights
        best_biases = population[ii].nn.biases
        nn = NNetwork(population[ii].nn.layer_sizes)
        nn.update_weights(best_weights, best_biases, 0)
        new_car = Car(position, acceleration, angle, width, length, dist_sec_lat, dist_sec_front, angle_sensor, nn)        
        new_population.append(new_car)
    return new_population    

In [5]:

lin0 = [0,0,-0.3,0.3]

lin1 = [0,3,0.3,0.3]
lin2 = [3,4,0.3,1]
lin3 = [4,4,1,2]
lin4 = [4,3,2,2.7]
lin5 = [3,0,2.7,2.7]
lin6 = [0,-3,2.7,2.7]
lin7 = [-3,-4,2.7,2]
lin8 = [-4,-4,2,1]
lin9 = [-4,-3,1,0.3]
lin10 = [-3,0,0.3,0.3]

lin11 = [0,3.5,-0.3,-0.3]
lin12 = [3.5,5,-0.3,0.8]
lin13 = [5,5,0.8,2.2]
lin14 = [5,3.5,2.2,3.5]
lin15 = [3.5,0,3.5,3.5]
lin16 = [0,-3.5,3.5,3.5]
lin17 = [-3.5,-5,3.5,2.2]
lin18 = [-5,-5,2.2,0.8]
lin19 = [-5,-3.5,0.8,-0.3]
lin20 = [-3.5,0,-0.3,-0.3]



road_prod2 = [
    lin0, lin1, lin2, lin3, lin4, lin5, lin6, lin7, lin8, lin9,
    lin10, lin11, lin12, lin13, lin14, lin15, lin16, lin17, lin18, lin19, lin20
]
road_prod3 = [
    lin1, lin2, lin3, lin4, lin5, lin6, lin7, lin8, lin9,
    lin10, lin11, lin12, lin13, lin14, lin15, lin16, lin17, lin18, lin19, lin20
]


line0 = [-1,-1,-0.6,0.4]

line1 = [-1,2,-0.6,-0.6]
line2 = [2,4.2,-0.6,0.8]
line3 = [4.2,4.2,0.8,3]
line4 = [4.2,2.2,3,4.5]
line5 = [2.2,2.2,4.5,6.2]
line6 = [2.2,4.2,6.2,7.5]
line7 = [4.2,6.5,7.5,7.5]
line8 = [6.5,7.4,7.5,6]
line9 = [7.4,7.4,6,4]
line10 = [7.4,9.4,4,3]
line11 = [9.4,11.4,3,4]
line12 = [11.4,11.4,4,9]
line13 = [11.4,9.6,9,10.5]
line14 = [9.6,6.7,10.5,11.5]
line15 = [6.7,-2,11.5,11.5]

line16 = [-1,1.6,0.4,0.4]
line17 = [1.6,3.1,0.4,1.3]
line18 = [3.1,3.1,1.3,2.4]
line19 = [3.1,0.8,2.4,4.1]
line20 = [0.8,0.8,4.1,6.6]
line21 = [0.8,3.8,6.6,8.8]
line22 = [3.8,7.3,8.8,8.7]
line23 = [7.3,8.6,8.7,6.7]
line24 = [8.6,8.6,6.7,4.7]
line25 = [8.6,9.5,4.7,4.1]
line26 = [9.5,10.3,4.1,4.6]
line27 = [10.3,10.3,4.6,8.4]
line28 = [10.3,9,8.4,9.8]
line29 = [9,6.5,9.8,10.6]
line30 = [6.5,3.8,10.6,11]
line31 = [3.8,-2,11,11.1]

road_prod_final = [line0, line1, line2, line3, line4, line5, line6, line7, line8, line9, line10, line11, line12, line13, line14, line15, line16, line17,
                   line18, line19, line20, line21, line22, line23, line24, line25, line26, line27, line28, line29, line30, line31]


position=[-0.8, -0.1]

acceleration = 0
angle=0
width=0.3
length=0.5
dist_sec_lat = 2
dist_sec_front = 4
angle_sensor = 30

max_turn = 90
max_acceleration = 0.1

road = road_prod_final

neuron_layers = [5,32,16,4]
# outputs
# [0]: turn left
# [1]: turn right
# [2]: speed up
# [3]: speed down


population_size = 40
num_generations = 20
t_final = 60
fps = 10
delta_t = 1/fps
learning_rate = 0.2

population_list = []
video_option = False
draw_option = False

frame_skip = 2



In [None]:
# Initialize a population of cars
population = []
for car_num in range(population_size):
    nn = NNetwork(neuron_layers)  # Example layer sizes
    car = Car(position, acceleration, angle, width, length, dist_sec_lat, dist_sec_front, angle_sensor, nn)
    population.append(car)
    
for generation in range(num_generations):
    population_list.append(population)
    time_0 = time.time()
    video_count = 1
    video_count, output_directory = simulate_car(population, road, t_final, delta_t, max_turn, max_acceleration, draw_option, generation,video_count,video_option,frame_skip)
    
    if video_option:
        create_video_and_cleanup(output_directory, generation,fps/frame_skip)
        
    best_car = select_best_car(population)
    print(f'Generation {generation + 1: 03d}: Best Fitness = {best_car.fitness: 0.4f}. Epoch time: {time.time()-time_0: 0.3f} s')
    
    population = generate_next_population(best_car, population_size, learning_rate)

In [6]:
def save_population_list(population_list, folder="Simulation_Data"):
    # Create the directory if it doesn't exist
    if not os.path.exists(folder):
        os.makedirs(folder)
    
    # Define the file path
    filepath = os.path.join(folder, f"population_list.pkl")
    
    # Save the population_list to the file
    with open(filepath, 'wb') as file:
        pickle.dump(population_list, file)
    
    print(f"Saved population_list to {filepath}")
    
def load_population_list(folder="Simulation_Data"):
    # Define the file path
    filepath = os.path.join(folder, f"population_list.pkl")
    
    # Load the population_list from the file
    with open(filepath, 'rb') as file:
        population_list = pickle.load(file)
    
    print(f"Loaded population_list from {filepath}")
    return population_list

In [35]:
save_population_list(population_list)

Saved population_list to Simulation_Data\population_list.pkl


In [7]:
population_list = load_population_list(folder="Simulation_Data")

Loaded population_list from Simulation_Data\population_list.pkl


In [10]:
video_option = True
draw_option = False
gen_min = 18
gen_max = 20

frame_skip = 1
for generation in range(gen_min,gen_max):
    population = population_list[generation]
    population = reset_generation(population, population_size)
    time_0 = time.time()
    video_count = 1
    video_count, output_directory = simulate_car(population, road, t_final, delta_t, max_turn, max_acceleration, draw_option, generation,video_count,video_option,frame_skip)
    
    if video_option:
        create_video_and_cleanup(output_directory, generation,fps/frame_skip)
        
    best_car = select_best_car(population)
    print(f'Generation {generation + 1: 03d}: Best Fitness = {best_car.fitness: 0.4f}. Epoch time: {time.time()-time_0: 0.3f} s')

Video created: Step_video_files\Gen018_output.mp4
Deleted directory: Step_video_files/Gen018
Generation  19: Best Fitness =  44.3473. Epoch time:  530.364 s
Video created: Step_video_files\Gen019_output.mp4
Deleted directory: Step_video_files/Gen019
Generation  20: Best Fitness =  47.0537. Epoch time:  512.882 s


In [33]:
video_option = False
draw_option = True

generation = 19

frame_skip = 2
population = population_list[generation]

population = reset_generation(population, population_size)
time_0 = time.time()
video_count = 1
video_count, output_directory = simulate_car(population, road, t_final, delta_t, max_turn, max_acceleration, draw_option, generation,video_count,video_option,frame_skip)

if video_option:
    create_video_and_cleanup(output_directory, generation,fps)

best_car = select_best_car(population)
print(f'Generation {generation + 1: 03d}: Best Fitness = {best_car.fitness: 0.4f}. Epoch time: {time.time()-time_0: 0.3f} s')
generation += 1
    

Generation  20: Best Fitness =  47.0537. Epoch time:  56.899 s
