In [1]:
import random
import pandas as pd
import numpy as np
import itertools
import time
from IPython.display import display, clear_output
import copy
import pickle
import timeit

In [2]:
#class used to execute any time trial. allows for a user to use value iteration, q learning, or sarsa
class time_trial_2 ():
    
    #to call the time trial, the track and crash type must be specified. this main method sets up the track and 
    #some other basic metrics that we need to do any type of learning approach
    def __init__ (self, track_type, crash):
        self.track_type = track_type
        self.valid_move = False
        self.fix_found = False
        self.cross_finish = False
        self.crash = crash
        track_read = open(track_type + "-track.txt","r+")
        track_raw = [line.replace('\n', '') for line in track_read.readlines()]
        self.shape = (int(track_raw[0].split(',')[0]), int(track_raw[0].split(',')[1]))
        self.spaces = list(itertools.product(range(self.shape[0]), range(self.shape[1])))
        self.acceleration_pairs = list(itertools.product([-1,0,1], [-1,0,1]))
        self.velocity_pairs = list(itertools.product(range(-5,6), range(-5,6)))
        self.track = []
        track_list = track_raw[1:]
        for row in track_list:
            self.track.append([val for val in row])
    
    #this method picks the starting position and used in several approaches at different times
    #for q learning and sarsa we allow the starting position to be some random position on the track
    #otherwise we find the start positions on the track and pick one of those and initialize velocity to (0,0)
    def starting_position(self, rand):
        if rand:
            self.start_position = random.choice(self.spaces)
            self.current_p = self.start_position
            self.current_v = random.choice(self.velocity_pairs)
        else:
            starting_points = []
            for space in self.spaces:
                if self.track[space[0]][space[1]] =='S':
                    starting_points.append((space[0], space[1]))
            self.start_position = random.choice(starting_points)
            self.current_p = self.start_position
            self.current_v = (0,0)

    #recursive method that evaluates whether a move is legal or not. an illegal move is when a car fully jumps over a wall and makes
    #it in bounds on the other side. To do so, this method utilizes recursion to see if there is any possible way given the velocity
    #for the car to make it to the current state from the previous state. if not, the car must return to its original location.
    def evaluate_move(self, p_s, v, count_walls, tolerance):
        v_y = v[0]
        v_x = v[1]
        if v_y == 0 and v_x == 0:
            if count_walls <tolerance:
                self.valid_move = True
        if self.track[p_s[0]][p_s[1]] == '#':
            count_walls+=1
        if self.track[p_s[0]][p_s[1]] == 'F' and count_walls == 0:
            self.cross_finish = True
        
        if v_y<0:
            v_y+=1
            next_ps = (p_s[0]-1, p_s[1])
            self.evaluate_move(next_ps, (v_y, v_x), count_walls,tolerance)
        elif v_y>0:
            v_y-=1
            next_ps = (p_s[0]+1, p_s[1])
            self.evaluate_move(next_ps, (v_y, v_x), count_walls,tolerance)
        if v_x<0:
            v_x+=1
            next_ps = (p_s[0], p_s[1]-1)
            self.evaluate_move(next_ps, (v_y, v_x), count_walls,tolerance)
        elif v_x>0:
            v_x-=1
            next_ps = (p_s[0], p_s[1]+1)
            self.evaluate_move(next_ps, (v_y, v_x), count_walls,tolerance)
    
    
    #the below method handles changes in position and allows for non-determinism to be shut off during certain
    #training times. it also makes sure that the absolute value of each velocity component is never above 5.
    #it evaluates the move to see if it is what I define as illegal. Finally it sets the new current position and 
    #velocity and stores the previous ones.
    def change_position (self, accel, train_time):
        if train_time == False:
            if random.uniform(0, 1) >=0.8:
                accel = (0,0)
        yv_change = self.current_v[0] + accel[0]
        xv_change = self.current_v[1] + accel[1]
        if xv_change >5:
            xv_change = 5
        elif xv_change < -5:
            xv_change = -5
        
        if yv_change >5:
            yv_change = 5
        elif yv_change < -5:
            yv_change = -5
        try:
            if self.track[self.current_p[0]+yv_change][self.current_p[1]+xv_change]!='#':
                self.evaluate_move(self.current_p, (yv_change, xv_change), 0, 1)
            else:
                self.valid_move = True   
        except Exception:
            self.valid_move = True
        
        self.last_v = copy.deepcopy(self.current_v)
        self.current_v = (yv_change, xv_change)
        self.last_p = copy.deepcopy(self.current_p)
        self.current_p = (self.current_p[0] + self.current_v[0], self.current_p[1] + self.current_v[1])
    
    #the move function changes the position and if the move ends up out of bounds, resolves this by either fixing the crash
    #or restarting the car to the beginning.
    def move(self,accel, train_time):
        self.change_position(accel, train_time)
        if self.current_p[0]<self.shape[0] and self.current_p[0]>=0 and self.current_p[1]<self.shape[1] and self.current_p[1]>=0:
            if self.track_type == '/content/O' and self.current_p[0]>11 and self.current_p[1]<11 and self.last_p[0]<11 and self.last_p[1]<11:
                self.starting_position(False)
            elif self.track[self.current_p[0]][self.current_p[1]] == '#' or not self.valid_move:
                if self.crash == 'fix' and self.valid_move:
                    p_old_0 = self.current_p[0]
                    p_old_1 = self.current_p[1]
                    self.fix_crash((self.current_p[0],self.current_p[1]), (self.current_v[0], self.current_v[1]), train_time)
                    self.fix_found = False
                else:
                    self.starting_position(False)
        else:
            self.current_p = self.last_p
            self.current_v = self.last_v
        self.valid_move = False

    #this method creates the final q table from the qtable created during each approach
    def q_table(self):
            self.q_final = {}
            for pos in itertools.product(range(self.shape[0]), range(self.shape[1]), range(-5,6), range(-5,6)):
                self.q_final[pos] = list(self.acceleration_pairs)[np.argmax(self.q[pos[0]][pos[1]][pos[2]+5][pos[3]+5])]        
    
    #this method just prints out the car moving through the course
    def print_move(self, course):
        clear_output(wait=True)
        print(course)
        time.sleep(0.01)

    
    #this is a recursive method that fixes a crash by identifying the location on the track nearest the crash site
    #it also makes note of the current direction of the car and moves backwards in that direction so that it does not 
    #jump over a wall when it is revived
    def fix_crash(self, p_f, v, train_time, count = 0): 
        if count<6 and not self.fix_found and p_f[0]<self.shape[0] and p_f[0]>=0 and p_f[1]<self.shape[1] and p_f[1]>=0:
            if self.track[p_f[0]][p_f[1]] != '#':
                self.current_p = p_f
                self.current_v = (0,0)
                self.fix_found = True
                return
            else:
                if v[0]>0:
                    rec_y = [0, -1]
                elif v[0]==0:
                    rec_y = [0]
                elif v[0]<0:
                    rec_y = [0, 1]
                if v[1]>0:
                    rec_x = [0, -1]
                elif v[1]==0:
                    rec_x = [0]
                elif v[1]<0:
                    rec_x = [0, 1]

                for vals in itertools.product(rec_y, rec_x):
                    if not self.fix_found:
                        self.fix_crash((p_f[0]+vals[0], p_f[1]+vals[1]), v, train_time, count+1)
        else:
            return
    

        
        
### VALUE ITERATION
    #this is the first part of the value iteration process. it instatiates the q tables and rewards table
    #i decided to split this out because the value iteration process has a lot of code
    def value_iteration(self):
        self.starting_position(False)
        self.rewards = np.random.rand(self.shape[0],self.shape[1],12,12)
        
        self.q = np.random.rand(self.shape[0],self.shape[1],12,12, len(self.acceleration_pairs))
        for space in self.spaces:
            if self.track[space[0]][space[1]] == 'F':
                for v in (itertools.product(range(12), range(12))):
                    self.rewards[space[0]][space[1]][v[0]][v[1]] = 0
                    for a_index, accel in enumerate(self.acceleration_pairs):
                        self.q[space[0]][space[1]][v[0]][v[1]][a_index] = 0
    
    #this is the value iteration training method, it creates the rewards table and q table for the track and does this by
    #iterating through every possible state for the specified number of iterations. it terminates if the max of the changes is less
    #than a specified amount
    def train(self, iterations):
        """Run value-iteration sweeps over every state and build ``self.q_final``.

        Requires ``self.rewards`` and ``self.q`` to exist already (``value_iteration``
        creates them). Each sweep visits every ``(row, col, vy, vx)`` state with both
        velocity components in -5..5 and prints the sweep number and the stopping
        measure.

        Parameters
        ----------
        iterations : int
            Maximum number of sweeps.

        Notes
        -----
        * ``self.rewards`` is updated in place during a sweep, so states visited
          later in the same sweep read values already updated in that sweep.
        * No discount factor is applied: Q = step reward + expected next value.
        * NOTE(review): the stopping measure is ``|1 - max |change||``, which is small
          when the largest change in a sweep is close to 1, not close to 0. This
          differs from a plain "largest change below a threshold" rule - confirm
          that this is intended.
        """
        
        for n in range(iterations):
            # Snapshot used only to measure how much this sweep changed the values.
            old_rewards = self.rewards.copy()
            #clear_output(wait=True)
            print(n)
            for pos in itertools.product(range(self.shape[0]), range(self.shape[1]), range(-5,6), range(-5,6)):
                # Put the car in the state being evaluated.
                self.current_p = (pos[0], pos[1])
                self.current_v = (pos[2], pos[3])
                if self.track[pos[0]][pos[1]] == '#':
                    # Wall states are pinned to 0.
                    self.rewards[pos[0]][pos[1]][pos[2]+5][pos[3]+5] = 0
                else:
                    for a_index, accel in enumerate(self.acceleration_pairs):
                        # Step reward: 0 on a finish cell, -1 everywhere else.
                        if self.track[pos[0]][pos[1]] == 'F':
                            reward = 0
                        else:
                            reward = -1
                        # Outcome 1: the acceleration is applied (train_time=True disables the random failure).
                        self.move(accel, True)
                        val1= self.rewards[self.current_p[0]][self.current_p[1]][self.current_v[0]+5][self.current_v[1]+5]
                        # move() changed the car's state, so restore it before simulating the other outcome.
                        self.current_p = (pos[0], pos[1])
                        self.current_v = (pos[2], pos[3])
                        # Outcome 2: the acceleration fails and the velocity is unchanged.
                        self.move((0,0), True)
                        val2 = self.rewards[self.current_p[0]][self.current_p[1]][self.current_v[0]+5][self.current_v[1]+5]
                        self.current_p = (pos[0], pos[1])
                        self.current_v = (pos[2], pos[3])
                        # Expected next value with weights 0.8 (applied) and 0.2 (failed).
                        ev = (0.8 * val1) + (0.2 * val2)
                        self.q[pos[0]][pos[1]][pos[2]+5][pos[3]+5][a_index] = reward + ev
                    # The state's value is the best Q-value over all accelerations.
                    max_accel = np.argmax(self.q[pos[0]][pos[1]][pos[2]+5][pos[3]+5])
                    self.rewards[pos[0]][pos[1]][pos[2]+5][pos[3]+5] = self.q[pos[0]][pos[1]][pos[2]+5][pos[3]+5][max_accel]
            # Finish cells are forced back to 0 after every sweep.
            for space in self.spaces:
                if self.track[space[0]][space[1]] == 'F':
                    for v in list(itertools.product(range(12), range(12))):
                        self.rewards[space[0]][space[1]][v[0]][v[1]] = 0            
            # Stopping measure; see the NOTE(review) in the docstring.
            delta = np.abs(1-max((np.abs(self.rewards - old_rewards)).flatten()))
            print(delta)
            if delta < 0.002:
                self.q_table()
                return
        # Sweep budget exhausted: build the policy from whatever was learned.
        self.q_table()
    
### Q LEARNING
    #q learning method which utilizes an annealing epsilon-greedy approach. chooses a random starting position for a specified number of
    #episodes and computes the q table through exploration and exploitation. it is off-policy because the update uses the maximum
    #q value of the next state rather than the value of the action the car actually takes next
    def q_learner(self, episodes, iterations, lr):
        self.q = np.random.rand(self.shape[0],self.shape[1],12,12, len(self.acceleration_pairs))
        for space in self.spaces:
            if self.track[space[0]][space[1]] == 'F':
                for v in (itertools.product(range(12), range(12))):
                    for a_index, accel in enumerate(self.acceleration_pairs):
                        self.q[space[0]][space[1]][v[0]][v[1]][a_index] = 0
        for ep in range(episodes):
            if ep % 100 == 0:
                clear_output(wait=True)
                print(ep)
            current_e = 1/(np.log(ep+0.0000001))
            self.current_p = random.choice(self.spaces)
            self.current_v = random.choice(self.velocity_pairs)
            for x in range(iterations):
                if self.track[self.current_p[0]][self.current_p[1]] in ['F', '#']:
                    break
                if random.uniform(0,1) >= current_e:
                    accel_index = np.argmax(self.q[self.current_p[0]][self.current_p[1]][self.current_v[0]+5][self.current_v[1]+5])
                else:
                    accel_index = np.random.choice(9)
                accel = list(self.acceleration_pairs)[accel_index]
                p = (self.current_p[0], self.current_p[1])
                v = (self.current_v[0], self.current_v[1])
                
                self.move(accel, False)
                #if self.valid_move:
                self.q[p[0]][p[1]][v[0]+5][v[1]+5][accel_index] = (1-lr)*self.q[p[0]][p[1]][v[0]+5][v[1]+5][accel_index]+(lr*(-1+0.9*max(self.q[self.current_p[0]][self.current_p[1]][self.current_v[0]+5][self.current_v[1]+5])))
                    #self.valid_move = False
#                 else:
#                     self.q[p[0]][p[1]][v[0]+5][v[1]+5][accel_index] = -np.inf
#                     self.current_p = self.last_p
#                     self.current_v = (0,0)
        self.q_table()

### SARSA
    #sarsa is very similar to the q learning approach aside from the update rule in the q table. it is on-policy, and thus 
    #uses the existing values of the q table to update the value in question
    def sarsa(self, episodes, iterations, lr):
        self.q = np.random.rand(self.shape[0],self.shape[1],12,12, len(self.acceleration_pairs))
        for space in self.spaces:
            if self.track[space[0]][space[1]] == 'F':
                for v in (itertools.product(range(12), range(12))):
                    for a_index, accel in enumerate(self.acceleration_pairs):
                        self.q[space[0]][space[1]][v[0]][v[1]][a_index] = 0
        for ep in range(episodes):
            if ep % 100 == 0:
                clear_output(wait=True)
                print(ep)
            current_e = 1/(np.log(ep+0.0000001))
            self.current_p = random.choice(self.spaces)
            self.current_v = random.choice(self.velocity_pairs)
            
            if random.uniform(0,1) >= current_e:
                    accel_index = np.argmax(self.q[self.current_p[0]][self.current_p[1]][self.current_v[0]+5][self.current_v[1]+5])
            else:
                accel_index = np.random.choice(9)
            accel = list(self.acceleration_pairs)[accel_index]
            
            
            for x in range(iterations):
                if self.track[self.current_p[0]][self.current_p[1]] in ['F', '#']:
                    break
                p = (self.current_p[0], self.current_p[1])
                v = (self.current_v[0], self.current_v[1])
                self.move(accel, False)
                if random.uniform(0,1) >= current_e:
                    accel_index_PRIME = np.argmax(self.q[self.current_p[0]][self.current_p[1]][self.current_v[0]+5][self.current_v[1]+5])
                else:
                    accel_index_PRIME = np.random.choice(9)
                accel_PRIME = list(self.acceleration_pairs)[accel_index_PRIME]
                self.q[p[0]][p[1]][v[0]+5][v[1]+5][accel_index] = (1-lr)*self.q[p[0]][p[1]][v[0]+5][v[1]+5][accel_index]+(lr*(-1+0.9*(self.q[self.current_p[0]][self.current_p[1]][self.current_v[0]+5][self.current_v[1]+5][accel_index_PRIME])))
                accel = accel_PRIME
                accel_index = accel_index_PRIME
        self.q_table()
    
    #this method simulates the time trial given the q table for the specific approach it outputs some of the metrics we need for evaluation 
    def simulate(self, runs):
        start = timeit.default_timer()
        self.starting_position(False)
        current_runs =0
        turns = 0
        turns_list = []
        stall = 0
        while current_runs<runs:
            turns+=1
            track_new = copy.deepcopy(self.track)
            track_new[self.current_p[0]][self.current_p[1]] = 'X'
            course = ''
            for y in track_new:
                for z in y:
                    course+=z
                course+='\n'
            self.print_move(course)
            a = self.q_final[(self.current_p[0], self.current_p[1], self.current_v[0], self.current_v[1])]
            if self.track[self.current_p[0]][self.current_p[1]] == 'F': 
                turns_list.append(turns)
                turns = 0
                current_runs+=1
                self.starting_position(False)
                #return(x)
                #return 'DONE'
            last_pos = copy.deepcopy(self.current_p)
            self.move(a, False)
            if self.current_position == last_pos:
                stall+=1
            if stall>10:
                self.starting_position(False)
                turns = 0
        stop = timeit.default_timer()
        print('Time: ', stop - start) 
        print(np.mean(turns_list))
    #same as simulate, but replays a previously saved policy passed in as q_final instead of self.q_final. it also counts how many times the car had to be restarted after stalling
    def simulate_saved(self, q_final, runs):
        start = timeit.default_timer()
        self.starting_position(False)
        current_runs =0
        turns = 0
        turns_list = []
        stall = 0
        stall_count = 0
        while current_runs<runs:
            turns+=1
            track_new = copy.deepcopy(self.track)
            track_new[self.current_p[0]][self.current_p[1]] = 'X'
            course = ''
            for y in track_new:
                for z in y:
                    course+=z
                course+='\n'
            self.print_move(course)
            a = q_final[(self.current_p[0], self.current_p[1], self.current_v[0], self.current_v[1])]
            if self.track[self.current_p[0]][self.current_p[1]] == 'F': 
                turns_list.append(turns)
                turns = 0
                current_runs+=1
                self.starting_position(False)
                #return 'DONE'
            last_pos = copy.deepcopy(self.current_p)
            self.move(a, False)
            if self.current_p == last_pos or self.track[self.current_p[0]][self.current_p[1]]=='S':
                stall+=1
            if stall>10:
                self.starting_position(False)
                turns = 0
                stall = 0
                stall_count+=1
        stop = timeit.default_timer()
        print(f'track: R crash_type: {self.crash} episodes: 750k')
        print('Time: ', stop - start) 
        print(f'turn count: {np.mean(turns_list)}')
        print(f'stall count: {np.mean(stall_count)}')


In [3]:
# Load a previously saved policy from disk; it is passed to simulate_saved() as q_final below.
# NOTE(review): the file name suggests a SARSA policy for the O track with crash mode 'fix' - confirm.
# pickle.load can execute arbitrary code, so only open pickle files from a trusted source.
with open('sarsa_fix_o.pickle', 'rb') as handle:
    sarsa_fix_o = pickle.load(handle)

In [4]:
# Build a trial on track 'O' with crash mode 'fix' and replay the loaded policy
# until the car has reached the finish 20 times.
test = time_trial_2('O', 'fix')
test.simulate_saved(sarsa_fix_o, 20)

#########################
####.................####
###...................###
###....###########....###
##....#############....##
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#SSSS###############....#
####################....#
#XFFF###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
##....#############....##
###....###########....###
###...................###
####.................####
#########################

track: R crash_type: fix episodes: 750k
Time:  16.90793686899997
turn count: 35.75
stall count: 9.0


In [5]:
# Load a previously saved policy from disk; it is passed to simulate_saved() as q_final below.
# NOTE(review): the file name suggests a Q-learning policy for the O track with crash mode 'fix' - confirm.
# pickle.load can execute arbitrary code, so only open pickle files from a trusted source.
with open('q_fix_o.pickle', 'rb') as handle:
    q_fix_o = pickle.load(handle)

In [6]:
# Build a fresh trial on track 'O' with crash mode 'fix' and replay the loaded policy
# until the car has reached the finish 20 times.
test = time_trial_2('O', 'fix')
test.simulate_saved(q_fix_o, 20)

#########################
####.................####
###...................###
###....###########....###
##....#############....##
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#SSSS###############....#
####################....#
#XFFF###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
##....#############....##
###....###########....###
###...................###
####.................####
#########################

track: R crash_type: fix episodes: 750k
Time:  12.177015394000023
turn count: 32.35
stall count: 5.0


In [7]:
# Load a previously saved policy from disk; it is passed to simulate_saved() as q_final below.
# NOTE(review): the file name suggests a value-iteration policy for the O track with crash mode 'fix' - confirm.
# pickle.load can execute arbitrary code, so only open pickle files from a trusted source.
with open('vi_fix_o.pickle', 'rb') as handle:
    vi_fix_o = pickle.load(handle)

In [8]:
# Build a fresh trial on track 'O' with crash mode 'fix' and replay the loaded policy
# until the car has reached the finish 20 times.
test = time_trial_2('O', 'fix')
test.simulate_saved(vi_fix_o, 20)

#########################
####.................####
###...................###
###....###########....###
##....#############....##
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#SSSS###############....#
####################....#
#XFFF###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
#....###############....#
##....#############....##
###....###########....###
###...................###
####.................####
#########################

track: R crash_type: fix episodes: 750k
Time:  9.245151054999951
turn count: 25.15
stall count: 1.0
