In [1]:
import numpy as np
import random

In [2]:
def map_idx_to_state(idx, discrete_d, discrete_v, discrete_phi_t):
    '''
    map the index(row number) of the Q-table to a discretized state

    The rows are laid out with the distance as the slowest-varying component, then the
    velocity, then the (phi, t_phi) pair, i.e.
        idx = d_idx*len(discrete_v)*len(discrete_phi_t) + v_idx*len(discrete_phi_t) + phi_t_idx

    idx: row number, given either as a plain integer or as a one-element array
    discrete_d: the discretized distances
    discrete_v: the discretized velocities
    discrete_phi_t: the discretized (phi, t_phi) pairs, one pair per row

    returns the tuple (distance, velocity, phi, t_phi). phi and t_phi are taken as they are
    stored in discrete_phi_t, so for a string array t_phi comes back as a string.
    '''
    #collapse the index to a plain int once; .item() accepts a scalar or a one-element array
    #and raises ValueError for anything larger, instead of relying on int(array)
    idx = int(np.asarray(idx).item())

    n_v = len(discrete_v) #the length of the list containing discretized velocities
    n_phi_t = len(discrete_phi_t) #the length of the list containing discretized pair of (phi, t_phi)

    d_idx, remainder = divmod(idx, n_v*n_phi_t) #index of the distance, and the offset inside that distance slab
    v_idx, phi_t_idx = divmod(remainder, n_phi_t) #index of the velocity and of the (phi, t_phi) pair

    distance = discrete_d[d_idx] #the value of distance in the mapped state
    velocity = discrete_v[v_idx] #mapped velocity
    phi_t = np.asarray(discrete_phi_t[phi_t_idx]).flatten() #the mapped (phi, t_phi) pair

    return (distance, velocity, phi_t[0], phi_t[1])

In [3]:
def map_state_to_idx(state, discrete_d, discrete_v, discrete_phi_t, T_Y, T_R):
    '''
    map the state to discretized state and return the index(row number) of that discretized state in Q-table

    state: the tuple (distance, velocity, phi, t_phi)
    discrete_d: the discretized distances
    discrete_v: the discretized velocities
    discrete_phi_t: the discretized (phi, t_phi) pairs, one pair per row
    T_Y, T_R: kept only so that existing calls keep working; the row of the (phi, t_phi)
              pair is looked up in discrete_phi_t itself, so these are not used

    returns idx = d_idx*len(discrete_v)*len(discrete_phi_t) + v_idx*len(discrete_phi_t) + phi_t_idx
    raises ValueError if phi does not occur in discrete_phi_t
    '''
    distance, velocity, phi = state[0], state[1], state[2]
    t_phi = float(state[3]) #accept the string form of t_phi that a string-typed discrete_phi_t yields

    #find the closest discrete distance to the distance of the given state
    distance_idx = np.argmin((distance - np.asarray(discrete_d))**2)

    #find the closest discrete velocity to the velocity of the given state
    velocity_idx = np.argmin((velocity - np.asarray(discrete_v))**2)

    #find the closest discrete (phi, t_phi) to the (phi, t_phi) of the given state:
    #collect the rows of this phase, then pick the one whose timing is nearest.
    #looking the rows up directly avoids assuming a fixed Y, R, G ordering of the table.
    phase_rows = [row_idx for row_idx, pair in enumerate(discrete_phi_t) if pair[0] == phi]
    if not phase_rows:
        raise ValueError(f'phase {phi!r} does not occur in discrete_phi_t')
    phase_times = np.asarray([int(discrete_phi_t[row_idx][1]) for row_idx in phase_rows])
    phi_t_idx = phase_rows[int(np.argmin((t_phi - phase_times)**2))]

    #now we can compute the index(row number) of the discretized version of the given state in the Q-table
    idx = distance_idx*len(discrete_v)*len(discrete_phi_t) + velocity_idx*len(discrete_phi_t) + phi_t_idx

    return idx
    

In [4]:
def action_selection(Q_row, actions, epsilon):
    '''
    returns the action in a state, based on the epsilon-greedy method.

    Q_row: the row of the Q-table corresponding to a state, containing the Q-values of the
           action space for that state (any shape, it is flattened)
    actions: the action space, indexed the same way as the flattened Q_row
    epsilon: a uniform draw from [0, 1) greater than epsilon gives the greedy choice,
             otherwise a uniformly random action is taken

    returns (action, action_idx), where action == actions[action_idx]
    '''
    rand = np.random.uniform(0, 1) #a random number to be compared with epsilon
    if rand>epsilon:#greedy scenario
        q_values = np.asarray(Q_row).flatten()
        best_q = np.max(q_values) #computed once instead of once per element
        candidates = np.flatnonzero(q_values == best_q).tolist() #select the actions with the greatest Q-values
        action_idx = random.choice(candidates) #break ties between equally good actions at random

    else:#random scenario
        action_idx = np.random.randint(0,len(actions)) #randomly select an action

    action = actions[action_idx]

    return action, action_idx

In [5]:
def update_state(state, action, delta_t, phi_to_T_dict, phi_to_next_phi_dict, std_d, std_v, std_t_phi):
    '''
    advance the state by one step of length delta_t under the given action (acceleration)

    state: the tuple (distance, velocity, phi, t_phi); t_phi is passed through int()
    phi_to_T_dict: maps each phase to its timing
    phi_to_next_phi_dict: maps each phase to the phase that follows it
    std_d, std_v, std_t_phi: standard deviations of the zero-mean Gaussian process noise
                             added to the distance, the velocity and the phase timing

    returns the new (distance, velocity, phi, t_phi)
    '''
    d_now, v_now, phase_now, t_raw = state[0], state[1], state[2], state[3]

    #the noise terms are drawn in the order distance, velocity, timing;
    #the distance update uses the velocity from before this step
    d_next = d_now - v_now*delta_t - 0.5*action*(delta_t**2) + np.random.normal(0, std_d, 1)
    v_next = v_now + action*delta_t + np.random.normal(0, std_v, 1)

    #the phase is kept while the advanced timer does not exceed the (noisy) phase timing
    t_advanced = int(t_raw) + delta_t
    noisy_phase_end = phi_to_T_dict[phase_now] + np.random.normal(0, std_t_phi, 1)
    if t_advanced <= noisy_phase_end:
        return (d_next[0], v_next[0], phase_now, t_advanced)

    #otherwise switch to the following phase and restart its timer
    return (d_next[0], v_next[0], phi_to_next_phi_dict[phase_now], 0)

In [6]:
def reward_function(distance, velocity, phi, t_phi, v_max, T_Y):
    '''
    reward function for the Q-learning

    Every step costs 1. A velocity below zero or above v_max costs a further 100.
    Once distance <= 0, a 'G' or 'Y' phase with positive velocity earns 100,
    while any other phase costs 100.
    t_phi and T_Y are accepted but do not influence the result.
    '''
    total = -1 #cost of each taken action

    out_of_speed_range = (velocity < 0) or (velocity > v_max)
    if out_of_speed_range: #velocity below zero or above the maximum allowed velocity
        total = total - 100

    reached_intersection = distance <= 0
    if reached_intersection:
        permitted_phase = (phi == 'G') or (phi == 'Y')
        if not permitted_phase: #the vehicle violates the red traffic light
            total = total - 100
        elif velocity > 0: #the vehicle passes the intersection safely
            total = total + 100

    return total

In [7]:
def TD_function(reward, discount_factor, Q_row, q):
    '''
    temporal-difference term of the Q-learning update:
    reward + discount_factor * (greatest value in Q_row) - q
    Q_row may have any shape, it is flattened before taking the maximum
    '''
    best_next_q = Q_row.flatten().max() #greatest Q-value in the given row
    td_target = reward + discount_factor*best_next_q
    return td_target - q

Variable Initialization

In [8]:
T_Y = 5 #total timing of yellow phase
T_G = 10 #total timing of green phase
T_R = 10 #total timing of red phase
total_T = T_Y + T_G + T_R

delta_t = 1 #length of one time step

phi_to_T_dict = {'Y': T_Y, 'G': T_G, 'R': T_R} #the dictionary to map each phase to its timing
phi_to_next_phi_dict = {'G':'Y', 'Y':'R', 'R':'G'} #the dictionary to map each phase to its next occurring phase
std_d=2 #standard deviation of state process, for distance
std_v=2  #standard deviation of state process, for velocity
std_t_phi=2 #standard deviation of state process, for timing

trials = 100000 #number of loops for training of the Q-learning
episodes = 100 #the maximum length of each trajectory

learning_rate = 0.1 #learning rate of the training phase in Q-learning
discount_factor = 0.9 #discount factor in Q-learning
epsilon = 0.1 #epsilon in training phase of Q-learning

actions = np.asarray([-3, -2, -1, 0, 1, 2, 3]) #action space
discrete_d = np.arange(-8, 121, 8) #the list of distances to discretize the distances
v_max=15 #maximum allowed velocity
discrete_v = np.arange(0, v_max+1, 1) #the values to discretize velocity

#discretizing values for the pair of (phi, t_phi): for each phase, in the order Y, R, G,
#one row per timing 0..T. The rows are generated from phi_to_T_dict so that they cannot
#drift away from the phase timings defined above. Because strings and integers are mixed,
#numpy stores the whole array as strings.
discrete_phi_t = np.asarray([(phi, t) for phi in ('Y', 'R', 'G') for t in range(phi_to_T_dict[phi] + 1)])
Q_table = np.zeros((len(discrete_d)*len(discrete_v)*len(discrete_phi_t), len(actions))) #initialize the Q-table



Q-Learning

In [None]:

for trial in range(trials):
    #randomly select an index(row number) of Q-table. A plain scalar is drawn (not a
    #one-element array) so that the Q-table lookups below yield scalars and 1-D rows.
    idx_i = np.random.randint(0, Q_table.shape[0])
    mapped_state = map_idx_to_state(idx_i, discrete_d, discrete_v, discrete_phi_t) #map the index to a discretized state
    state = mapped_state #the trajectory starts exactly on a grid point, so both coincide
    distance = mapped_state[0]

    for episode in range(episodes):

        if distance<=0: # if the vehicle passed the intersection break the loop and go to the next trial
            break

        action, action_idx = action_selection(Q_table[idx_i], actions, epsilon) #select the action based on the Q-table and with epsilon-greedy approach
        q = Q_table[idx_i, action_idx] #the q-value related to the (state, action)

        state_new = update_state(mapped_state, action, delta_t, phi_to_T_dict, phi_to_next_phi_dict, std_d, std_v, std_t_phi) #the state goes through the state process with action
        idx_i_new = map_state_to_idx(state_new, discrete_d, discrete_v, discrete_phi_t, T_Y, T_R) #index of the updated state in the Q-table
        mapped_state_new = map_idx_to_state(idx_i_new, discrete_d, discrete_v, discrete_phi_t) #discretized state corresponding to the index
        distance = mapped_state_new[0]
        phi = mapped_state_new[2]
        t_phi = mapped_state_new[3]
        #the reward uses the velocity actually reached, not the discretized one: the
        #discretized velocity is always one of discrete_v (0..v_max), so with it the
        #penalty for leaving that range could never be triggered
        velocity = state_new[1]

        reward = reward_function(distance, velocity, phi, t_phi, v_max, T_Y) #reward for getting into the new state

        td = TD_function(reward, discount_factor, Q_table[idx_i_new], q)

        q = q + learning_rate*td #updating q-value
        Q_table[idx_i,action_idx] = q #updating the q-table with the q-value

        idx_i = idx_i_new #updating the index
        state = state_new #updating the state
        mapped_state=mapped_state_new


with open('Q_table.npy','wb') as f:
    np.save(f, Q_table) #save the Q-table

Q-Table Loading

In [11]:
#read the trained Q-table back from disk
with open('Q_table.npy','rb') as f:
    Q_table = np.load(f)
#share of the Q-table entries that are non-zero; the value is a fraction between 0 and 1
nonzero_share = np.count_nonzero(Q_table)/Q_table.size
print(f'Percentage of non-zero elements in Qtable:{nonzero_share}')

Percentage of non-zero elements in Qtable:0.8177333433373349


Test Scenario

In [13]:
#roll out the greedy policy (epsilon = 0) from a fixed start state until the discretized
#distance is no longer positive, logging every step to the screen and to a text file
state = (120, 15, 'G', 9)
idx_i = map_state_to_idx(state, discrete_d, discrete_v, discrete_phi_t, T_Y, T_R)
mapped_state = map_idx_to_state(idx_i, discrete_d, discrete_v, discrete_phi_t)
distance = mapped_state[0]

#the with-block closes (and flushes) the results file even if a step raises
with open('./q_learning_results.txt', 'w') as f:
    while (distance>0):
        f.write(f'actual state: {[str(i) for i in state]}\n')
        print(f'actual state: {state}')
        f.write(f'mapped state: {[str(i) for i in mapped_state]}\n')
        print(f'mapped state: {mapped_state}')

        action, action_idx = action_selection(Q_table[idx_i], actions, 0)

        f.write(f'Q values for actions of this state:{[str(i) for i in Q_table[idx_i]]}\n')
        print(Q_table[idx_i])
        f.write(f'action: {action}\n\n')
        print(f'action: {action}\n')

        #as in training, the next state is generated from the discretized state
        state_new = update_state(mapped_state, action, delta_t, phi_to_T_dict, phi_to_next_phi_dict, std_d, std_v, std_t_phi)
        idx_i_new = map_state_to_idx(state_new, discrete_d, discrete_v, discrete_phi_t, T_Y, T_R)
        mapped_state_new = map_idx_to_state(idx_i_new, discrete_d, discrete_v, discrete_phi_t)
        distance = mapped_state_new[0]

        state=state_new
        idx_i=idx_i_new
        mapped_state=mapped_state_new

print(mapped_state)
print(Q_table[idx_i])

actual state: (120, 15, 'G', 9)
mapped state: (120, 15, 'G', '9')
[-0.1         1.92034057  0.         -0.1        -0.11951307 -0.10981
 -0.1194409 ]
action: -2

actual state: (106.0, 13.0, 'G', 10)
mapped state: (104, 13, 'G', '10')
[ 0.46038945  0.44817432 -0.10981    -0.1        -0.20791    -0.109
  6.95878412]
action: 3

actual state: (89.5, 16.0, 'Y', 0)
mapped state: (88, 15, 'Y', '0')
[10.07807667  0.09290867 -0.10985301 -0.3997261  -0.14808479 -0.26922289
 -0.28733912]
action: -3

actual state: (74.5, 12.0, 'Y', 1)
mapped state: (72, 12, 'Y', '1')
[12.440342    0.71772892  0.6393232   0.85084875  0.06995606  0.52045491
  0.79337693]
action: -3

actual state: (61.5, 9.0, 'Y', 2)
mapped state: (64, 9, 'Y', '2')
[ 1.69839126  4.71842747  1.83791077 14.93566279  0.81033627  0.45652254
 -0.2881    ]
action: 0

actual state: (55.0, 9.0, 'Y', 3)
mapped state: (56, 9, 'Y', '3')
[ 1.30192779 17.70633736  4.64967925 -0.297739    0.57174352  2.51583259
  2.99491857]
action: -2

actual sta