In [1]:
import numpy as np
import random
import variables as var
import functions as func
import matplotlib.pyplot as plt
from tqdm import tqdm
from scipy.stats import norm

Variable Initialization

In [2]:
# ---- Signal phase timings (read from the project's `variables` module) ----
T_Y, T_G, T_R = var.T_Y, var.T_G, var.T_R  # yellow / green / red phase timings
total_T = T_Y + T_G + T_R  # sum of the three phase timings

# ---- Bounds of the initial distance and velocity distributions ----
d_lower, d_upper = var.d_lower, var.d_upper
v_lower, v_upper = var.v_lower, var.v_upper

# Time step handed to func.update_state when a trajectory is advanced.
delta_t = var.delta_t

# ---- Phase look-up tables ----
phi_to_T_dict = {  # phase -> its timing
    'Y': T_Y,
    'G': T_G,
    'R': T_R,
}
phi_to_next_phi_dict = {  # phase -> the phase that follows it
    'G': 'Y',
    'Y': 'R',
    'R': 'G',
}
phi_prev_phi_dict = var.phi_prev_phi_dict  # phase -> the phase that precedes it

# ---- State-process noise (standard deviations) ----
# These three are set to literals here instead of being read from
# var.std_d / var.std_v / var.std_t_phi.
std_d, std_v, std_t_phi = 0.5, 0.25, 0  # distance, velocity, phase-timing noise

# ---- Q-learning hyper-parameters ----
trials, episodes = var.trials, var.episodes  # number of training runs, max trajectory length
learning_rate = 0.001  # literal value, not read from `var`
discount_factor = var.discount_factor
epsilon = var.epsilon

# ---- Discretised state and action spaces ----
actions = var.actions  # action space
discrete_d = var.discrete_d  # grid of distances
v_max = var.v_max  # maximum allowed velocity
discrete_v = var.discrete_v  # grid of velocities
discrete_phi_t = var.discrete_phi_t  # grid of (phi, t_phi) pairs


In [3]:
def possible_states(state, action, std_d, std_v, std_t_phi, phi_to_T_dict, phi_to_next_phi_dict, n_samples=10, delta_t=1):
    """Sample candidate successor states for taking `action` in `state`.

    Calls ``func.update_state`` `n_samples` times with identical arguments and
    collects the results. The spread between samples comes entirely from
    whatever randomness ``func.update_state`` applies (presumably the
    std_d / std_v / std_t_phi noise terms -- confirm in functions.py).

    Parameters
    ----------
    state : current state, passed through to ``func.update_state`` unchanged.
    action : action to apply.
    std_d, std_v, std_t_phi : noise standard deviations forwarded to
        ``func.update_state``.
    phi_to_T_dict, phi_to_next_phi_dict : phase look-up tables forwarded to
        ``func.update_state``.
    n_samples : int, optional
        Number of successor states to draw. Defaults to 10, the value that
        used to be hard-coded.
    delta_t : optional
        Time step forwarded to ``func.update_state``. Defaults to 1, the value
        that used to be hard-coded.

    Returns
    -------
    list
        `n_samples` states, in the order they were drawn.
    """
    # The trailing 1 is forwarded exactly as before; its meaning is defined by
    # func.update_state (NOTE(review): looks like a "noise on" flag -- confirm).
    return [
        func.update_state(state, action, delta_t, phi_to_T_dict, phi_to_next_phi_dict, std_d, std_v, std_t_phi, 1)
        for _ in range(n_samples)
    ]

def min_max_q(idx, Q_table):
    """Return the worst-case best Q-value over a set of states.

    For every row index in `idx` the largest Q-value of that row is taken;
    the smallest of those per-row maxima is returned.

    Parameters
    ----------
    idx : iterable of row indices into `Q_table`.
    Q_table : numpy array of Q-values, one row per state.

    Raises
    ------
    ValueError
        If `idx` is empty (there is nothing to take the minimum of).
    """
    # np.max with no axis reduces over every element of the selected row(s).
    best_per_state = (np.max(Q_table[row]) for row in idx)
    return min(best_per_state)

Q-Learning

In [None]:
# Min-max Q-learning training loop.
# The table has one row per combination of discretised distance, velocity and
# (phi, t_phi) pair, and one column per action; every entry starts at zero.
Q_table = np.zeros((len(discrete_d)*len(discrete_v)*len(discrete_phi_t), len(actions))) #initialize Q-table
# The three noise levels below repeat the literals already assigned in the
# variable-initialisation cell; they also match the numbers embedded in the
# output file name at the bottom of this cell.
std_d= 0.5#literal value used instead of var.std_d
std_v= 0.25#literal value used instead of var.std_v
std_t_phi= 0#literal value used instead of var.std_t_phi
# Called without an argument, so NumPy reseeds from fresh OS entropy:
# two runs of this cell will not produce the same Q-table.
np.random.seed()

for trial in tqdm(range(trials)):
    # Each trial starts from a uniformly random row of the Q-table, converted
    # back to a state by func.map_idx_to_state.
    idx_i = np.random.randint(0, Q_table.shape[0], 1)[0] #random row number of the Q-table
    state = func.map_idx_to_state(idx_i, discrete_d, discrete_v, discrete_phi_t) #state corresponding to that row
    distance = state[0]
    velocity = state[1]
    for episode in range(episodes):
        
        # A negative distance ends the trajectory (the vehicle has passed the intersection).
        if distance<0: #vehicle passed the intersection
            break
            
        # minmax_action_selection returns the chosen action, its column index
        # and the indices of the actions it reports as not feasible.
        action, action_idx, not_feasible_index = func.minmax_action_selection(Q_table[idx_i], velocity, actions, epsilon, v_max,1) #action selection based on minmax
        # Infeasible actions are pinned to a large negative Q-value in the table.
        Q_table[idx_i, not_feasible_index]=-1000 #large negative q-value for not feasible actions
        q = Q_table[idx_i, action_idx]

        # Look-ahead: possible_states draws 10 candidate successors of (state, action).
        probable_states = possible_states(state, action, std_d, std_v, std_t_phi, phi_to_T_dict, phi_to_next_phi_dict)#10 possible states the vehicle may move to because of the noise
            
        # Row index of each candidate successor, then the minimum over the
        # candidates of each candidate's maximum Q-value (the bootstrap target).
        idx_i_new_list = [func.map_state_to_idx(state_new, discrete_d, discrete_v, discrete_phi_t, T_Y, T_R, phi_prev_phi_dict) for state_new in probable_states]#indices of the 10 possible states in the Q-table
        min_q = min_max_q(idx_i_new_list, Q_table)#minimum of the max q-values among the 10 possible states
        
        # The transition actually followed is drawn separately from the
        # look-ahead samples above, using delta_t as the time step.
        state_new = func.update_state(state, action, delta_t, phi_to_T_dict, phi_to_next_phi_dict, std_d, std_v, std_t_phi,1)#update the state based on the selected action
        idx_i_new = func.map_state_to_idx(state_new, discrete_d, discrete_v, discrete_phi_t, T_Y, T_R, phi_prev_phi_dict)#index of the new state in the Q-table

        distance = state_new[0]
        velocity = state_new[1]
        phi = state_new[2]
        t_phi = state_new[3]
        
        # The reward is evaluated on the components of the new state.
        reward = func.reward_function(distance, velocity, phi, t_phi, v_max, T_Y)#reward of going to the new state

        # q <- q + learning_rate * (reward + discount_factor * min_q - q)
        q = q + learning_rate*(reward + discount_factor*min_q - q)#updating q-value
        Q_table[idx_i,action_idx] = q #write the new q-value back for the state the action was taken in
        
        # Advance to the new state for the next step of this trajectory.
        idx_i = idx_i_new
        state = state_new

# The table is written to disk once, only after every trial has finished;
# nothing is saved if the loop above is interrupted.
with open('MINMAX_Q_table_noise_0.5_0.25_0.npy','wb') as f:#save Q-table
    np.save(f, Q_table)

 24%|██▍       | 951642/4000000 [2:38:36<8:07:52, 104.14it/s] 

Q-Table Loading

In [None]:
# Reload the trained Q-table from disk and report how much of it holds a
# non-zero value (entries start at zero, so this shows how much was written to).
with open('MINMAX_Q_table_noise_0.5_0.25_0.npy','rb') as f:
    Q_table = np.load(f)
# Share of entries that differ from zero. The message promises a percentage,
# so the 0-1 fraction is formatted with '%' instead of being printed raw.
nonzero_fraction = np.count_nonzero(Q_table)/Q_table.size
print(f'Percentage of non-zero elements in Qtable:{nonzero_fraction:.2%}')

Test Scenario

In [40]:

# Roll out the learned policy from one initial state and print every step.
#
# NOTE(review): `trajectory` is not defined in any cell shown above, so this
# cell fails on a fresh kernel unless another cell provides it. The literal
# below is kept as a ready-made alternative initial state.
# state = (90.55001067682015, 12.032548969195476, 'G', 1)
state=trajectory[0][0]
print('Initial State')
print(state)
print()
idx_i = func.map_state_to_idx(state, discrete_d, discrete_v, discrete_phi_t, T_Y, T_R, phi_prev_phi_dict)
mapped_state = func.map_idx_to_state(idx_i, discrete_d, discrete_v, discrete_phi_t)
distance = mapped_state[0]
# Take the velocity from the state being simulated. Previously this cell never
# assigned `velocity`, so action selection silently used whatever value the
# training cell had left behind.
velocity = mapped_state[1]

# Bound the roll-out by `episodes` (the maximum trajectory length used in
# training) so that a policy which never reaches the intersection cannot loop forever.
step = 0
while distance>0 and step<episodes:
    print(f'state: {state}')
    # epsilon is passed as 0 here (presumably a purely greedy choice -- confirm
    # in functions.action_selection).
    action, action_idx = func.action_selection(Q_table[idx_i], velocity, actions, 0, v_max, 0)

    print(f'Q_table for this state:{Q_table[idx_i]}')
    print(f'action: {action}\n')

    # Advance from the discretised state, then snap the result back onto the grid.
    state_new = func.update_state(mapped_state, action, delta_t, phi_to_T_dict, phi_to_next_phi_dict, std_d, std_v, std_t_phi,0)
    idx_i_new = func.map_state_to_idx(state_new, discrete_d, discrete_v, discrete_phi_t, T_Y, T_R, phi_prev_phi_dict)
    mapped_state = func.map_idx_to_state(idx_i_new, discrete_d, discrete_v, discrete_phi_t)
    distance = state_new[0]
    velocity = mapped_state[1]  # keep the feasibility check in sync with the current state

    state=mapped_state
    idx_i=idx_i_new
    step += 1

if distance>0:
    print(f'Stopped after {episodes} steps without reaching the intersection')

print('Final State')
# Colour the final state by its signal phase: red for 'R', green for 'G',
# yellow for anything else.
ansi_colour = {'R': 31, 'G': 32}.get(state[2], 33)
print(f"\x1b[{ansi_colour}m{state}\x1b[0m")
print('******************************************************')


Initial State
(115.72247669903288, 10.508714284043458, 'Y', 0.2220329434276186)

state: (115.72247669903288, 10.508714284043458, 'Y', 0.2220329434276186)
Q_table for this state:[-0.18077978 -0.18086615 -0.18090316 -0.18100722 -0.1790151  -0.18113422
 -0.18106569]
action: 1

state: (105.02402555508777, 11.52238730966237, 'Y', array([1.24919965]))
Q_table for this state:[-0.30225388 -0.30234221 -0.30253315 -0.30256126 -0.30227456 -0.30197912
 -0.3036287 ]
action: 2

state: (92.58625788046594, 13.154460412335006, 'Y', array([2.38322198]))
Q_table for this state:[-0.22669408 -0.22737049 -0.22722273 -0.22711912 -0.22690539 -0.22696165
 -0.22684709]
action: -3

state: (81.71315572259539, 10.103223911452165, 'Y', array([3.24379654]))
Q_table for this state:[-0.15307847 -0.15186025 -0.15290288 -0.15296268 -0.15213125 -0.1511139
 -0.15215312]
action: 2

state: (70.33769628860323, 12.248643718854316, 'Y', array([4.29961125]))
Q_table for this state:[-0.17689283 -0.17712422 -0.17469318 -0.1762128

KeyboardInterrupt: 