In [None]:
import gymnasium as gym
from matplotlib import pyplot as plt
import re
import numpy as np
from tqdm.notebook import trange, tqdm
from math import floor

In [None]:

def row_col_to_seq(row_col, num_cols):
    '''
    Flattens (row, col) grid coordinates into row-major sequence numbers.

    Args:
        row_col (np.ndarray): 2-D array whose column 0 holds rows and
            column 1 holds columns, e.g. shape (1,2) for a single location
        num_cols (int): Total number of columns in the grid
    Return:
        (np.ndarray): 1-D array with one entry per input location,
            computed as row * num_cols + col
    '''
    rows = row_col[:, 0]
    cols = row_col[:, 1]
    return rows * num_cols + cols

def seq_to_col_row(seq, num_cols):
    '''
    Converts a row-major state number back to row_col format.

    Despite the function name, the result is ordered [row, col].

    Args:
        seq (int): Number format of state
        num_cols (int): Number of columns
    Return:
        (np.ndarray): Array of shape (1,2) as [[row, col]]
    '''
    # divmod keeps the arithmetic in integers; floor(seq / num_cols) goes
    # through a float and silently loses precision for large values of seq.
    r, c = divmod(seq, num_cols)
    return np.array([[r, c]])

In [None]:
def policy_ep(q_values, state, ep):
    '''
    Epsilon-greedy action selection over a Q-table.

    Args:
        q_values (np.ndarray): 2-D table indexed as [state, action]
        state: Row index (or index array) into q_values
        ep (float): Probability of picking a uniformly random action
    Return:
        Index of the selected action
    '''
    num_actions = q_values.shape[1]
    explore = np.random.random() < ep
    if not explore:
        # Greedy branch: best-valued action for this state
        return np.argmax(q_values[state, :])
    # Exploration branch: uniform draw over all action indices
    return np.random.choice(np.arange(num_actions))


In [None]:
# Build the Taxi-v3 environment; with render_mode='ansi', env.render() yields a text frame that can be printed.
env = gym.make('Taxi-v3', render_mode = 'ansi')
# Start an episode so there is something to render.
# NOTE(review): Gymnasium's reset() returns an (observation, info) tuple, so `s` is that tuple,
# not the bare state id (the training loop takes env.reset()[0] for that reason).
s = env.reset()
print(env.render())


+---------+
|[35mR[0m: | : :G|
| : | : : |
| : : : : |
|[43m [0m| : | : |
|Y| : |[34;1mB[0m: |
+---------+




In [None]:

# def run_option(state, option, ep, q_vals):
#     opt_terminal_states = {0:[0,0], 1:[0,4], 2:[4,0],3:[4,3]}
#     # Decoding state for row, col, pass_loc, dest_loc
#     taxi_row, taxi_col, pass_loc, dest_loc = env.decode(state)
#     # Getting row and col for option terminal loc
#     taxi_target_row, taxi_target_col = opt_terminal_states[option]

#     # Checking if target loc is reached or not
#     if taxi_row == taxi_target_row and taxi_col == taxi_target_col:
#         print(f'Taxi checks: state = {taxi_row}, {taxi_col}, target = {taxi_target_row},{taxi_target_col}')

#         # Goal point reached
#         optdone = True
#         # check passenger loc
#         if option == pass_loc:
#             action = 4
#         elif option == dest_loc:
#             action = 5
#         else:
#             action = 1 if (option in [0,1]) else 0
#     else:
#         optdone = False
#         # Choosing action
#         action = policy_ep(q_vals[:,:,option], state, ep)

#     return action,optdone

In [None]:

def run_option(state, option, ep, q_vals):
    '''
    Picks the next primitive action while executing a "go to landmark" option.

    Args:
        state (int): Encoded environment state, decoded with env.decode
        option (int): Option id 0..3, keying the target cell table below
        ep (float): Epsilon for the epsilon-greedy intra-option policy
        q_vals (np.ndarray): Intra-option Q-values indexed as
            [taxi cell, primitive action, option]
    Return:
        (action, optdone): the primitive action to execute and whether the
            taxi is already on the option's target cell (option finished)
    '''
    # Target (row, col) of each option
    goal_cells = {0:[0,0], 1:[0,4], 2:[4,0],3:[4,3]}
    taxi_row, taxi_col, pass_loc, dest_loc = env.decode(state)
    goal_row, goal_col = goal_cells[option]

    # Still travelling: act epsilon-greedily on this option's Q-table,
    # indexed by the flattened taxi cell of the 5-column grid.
    if (taxi_row, taxi_col) != (goal_row, goal_col):
        cell_idx = row_col_to_seq(np.array([[taxi_row, taxi_col]]), 5)
        return policy_ep(q_vals[:, :, option], cell_idx, ep), False

    # On the target cell: the option ends with one forced primitive action.
    # Action ids follow the Gymnasium Taxi-v3 encoding
    # (0 south, 1 north, 4 pickup, 5 dropoff).
    if option == pass_loc:
        # Passenger is waiting at this landmark
        action = 4
    elif option == dest_loc:
        # This landmark is the destination
        action = 5
    elif option in [0,1]:
        # Targets 0 and 1 are in row 0
        action = 1
    else:
        # Targets 2 and 3 are in row 4
        action = 0

    return action, True

In [None]:
# Grid dimensions of the taxi world
row, col = 5, 5
# Passenger-location and destination-index counts used to size the option-level Q-table
num_pass_loc = 5
num_dest_loc = 4
# One option per landmark: Goto R, G, Y, B
num_options = 4
# Size of the primitive-action axis of the intra-option Q-table
num_primitive_act = 6
# Every (taxi cell, passenger location, destination) combination
num_states = row * col * num_pass_loc * num_dest_loc
# Combined count of primitive actions and options
tot_actions = num_options + num_primitive_act

In [None]:
# SMDP Q-learning over options, with one-step Q-learning for each option's
# intra-option policy.
gamma = 0.99  # discount factor
alpha = 0.5   # learning rate used by both Q-tables
ep = 0.01     # epsilon for both epsilon-greedy policies
# ep_max = 0.5
# ep_min = 0.01
# decay = 0.99
#ep_opt = np.ones(4)*ep_max

num_episodes = 1000
opt_terminal_states = {0:[0,0], 1:[0,4], 2:[4,0],3:[4,3]}
# Option-level Q-table, indexed by flattened (passenger loc, destination) and option
q_val_opt = np.zeros((num_pass_loc*num_dest_loc, num_options)) # shape= (20,4)
# Intra-option Q-tables, indexed by flattened taxi cell, primitive action and option
q_val_actions = np.zeros((row*col, num_primitive_act, num_options)) # shape (25,6,4)


for i in tqdm(range(num_episodes)):
    curr_state = env.reset()[0]
    done = False
    steps = 0
    tot_reward = 0

    while not done:
        # Option-level state at the moment the option is chosen
        _, _, pass_loc, dest_idx = env.decode(curr_state)
        start_opt_seq = int(row_col_to_seq(np.array([[pass_loc, dest_idx]]), num_dest_loc)[0])

        # choose option
        option = policy_ep(q_val_opt, start_opt_seq, ep)

        optdone = False
        reward_bar = 0  # discounted reward accumulated while the option runs
        k = 0           # primitive steps taken by this option so far

        # Stop as soon as the option finishes OR the episode ends: stepping an
        # environment after terminated/truncated is outside the Gymnasium API contract.
        while not (optdone or done):
            opt_action, optdone = run_option(state = curr_state,
                                             option = option,
                                             ep = ep,
                                             q_vals = q_val_actions)
            next_state, reward, is_terminal, truncated, _ = env.step(opt_action)

            # Discounted return from the option's start: r_0 + gamma*r_1 + gamma^2*r_2 + ...
            reward_bar = reward_bar + (gamma**k) * reward
            tot_reward = tot_reward + reward

            # Intra-option Q-tables only see the taxi cell
            taxi_curr_row, taxi_curr_col, _, _ = env.decode(curr_state)
            taxi_next_row, taxi_next_col, _, _ = env.decode(next_state)
            curr_act_seq = int(row_col_to_seq(np.array([[taxi_curr_row, taxi_curr_col]]), col)[0])
            next_act_seq = int(row_col_to_seq(np.array([[taxi_next_row, taxi_next_col]]), col)[0])

            # One-step Q-learning; nothing to bootstrap from once the episode has terminated
            if is_terminal:
                future = 0.0
            else:
                future = gamma * np.max(q_val_actions[next_act_seq, :, option])
            td_error = reward + future - q_val_actions[curr_act_seq, opt_action, option]
            q_val_actions[curr_act_seq, opt_action, option] += alpha * td_error

            done = is_terminal or truncated
            k += 1
            steps += 1
            curr_state = next_state
            #ep_opt[option] = ep_min #max(ep_min,decay*ep_opt[option])

        # SMDP Q-learning update for the option that just ran for k steps;
        # curr_state is now the state in which the option stopped.
        _, _, pass_end_loc, dest_end_idx = env.decode(curr_state)
        end_opt_seq = int(row_col_to_seq(np.array([[pass_end_loc, dest_end_idx]]), num_dest_loc)[0])
        if is_terminal:
            future_opt = 0.0
        else:
            future_opt = (gamma**k) * np.max(q_val_opt[end_opt_seq])
        q_val_opt[start_opt_seq, option] += alpha * (reward_bar + future_opt - q_val_opt[start_opt_seq, option])

    print(f'ep = {i}, steps = {steps}, total reward = {tot_reward},')



  0%|          | 0/1000 [00:00<?, ?it/s]

ep = 0, steps = 1998, total reward = -3777,
ep = 1, steps = 333, total reward = -321,
ep = 2, steps = 80, total reward = -77,
ep = 3, steps = 363, total reward = -360,
ep = 4, steps = 205, total reward = -202,
ep = 5, steps = 190, total reward = -196,
ep = 6, steps = 364, total reward = -406,
ep = 7, steps = 36, total reward = -15,
ep = 8, steps = 19, total reward = 2,
ep = 9, steps = 93, total reward = -99,
ep = 10, steps = 283, total reward = -442,
ep = 11, steps = 46, total reward = -34,
ep = 12, steps = 75, total reward = -63,
ep = 13, steps = 16, total reward = 5,
ep = 14, steps = 324, total reward = -393,
ep = 15, steps = 49, total reward = -46,
ep = 16, steps = 83, total reward = -107,
ep = 17, steps = 38, total reward = -26,
ep = 18, steps = 20, total reward = 1,
ep = 19, steps = 44, total reward = -32,
ep = 20, steps = 23, total reward = -2,
ep = 21, steps = 15, total reward = 6,
ep = 22, steps = 7, total reward = 14,
ep = 23, steps = 10, total reward = 11,
ep = 24, steps = 7,

In [None]:

# Scratch check of seq_to_col_row. Note that 25 is one past the last cell of a
# 5x5 grid (valid indices 0..24), so the result is row 5, col 0.
xy = seq_to_col_row(25,5)
xy

array([[5, 0]])

In [None]:
# Scratch check: the top-left cell (row 0, col 0) of a 5-column grid flattens to index 0.
row_col_to_seq(np.array([[0,0]]),5)

array([0])

In [None]:
# Q-values for each option: one zero table per option id 0..3,
# each of shape (500//20, 6-2) = (25, 4).
Qopt = dict((opt_id, np.zeros((500//20, 6-2))) for opt_id in range(4))

In [None]:
# Inspect which option ids have a Q-table.
Qopt.keys()

dict_keys([0, 1, 2, 3])

In [None]:
# Inspect the shape of the first option's Q-table.
Qopt[0].shape

(25, 4)