In [13]:
'''
gridworld_terminal_TD.py
'''

# libraries
import numpy as np
import sys
from tqdm import tqdm

# state indexing: (x,y) -- for 4x4 grid
# [
#     (0,0), (0,1), (0,2), (0,3)
#     (1,0), (1,1), (1,2), (1,3)
#     (2,0), (2,1), (2,2), (2,3)
#     (3,0), (3,1), (3,2), (3,3)
# ]

# left (0), up (1), right (2), down (3)
v_actions = [np.array([0, -1]),
            np.array([-1, 0]),
            np.array([0, 1]),
            np.array([1, 0])]

i_nActions = len(v_actions)

GAMMA = 0.9
ALPHA = 0.25 #0.1 

def is_terminal(state, ginDim):
    x, y = state
    return (x == 0 and y == 0) or (x == ginDim - 1 and y == ginDim - 1)

def get_initial_state(ginDim):
    # print("ginDim:%s" %ginDim)
    while True:
        state_i = np.random.choice(ginDim)
        state_j = np.random.choice(ginDim)
        if (state_i == 0 and state_j == 0) or (state_i == ginDim-1 and state_j == ginDim-1):
            continue
        else:
            break
    return state_i, state_j


def target_policy_agent(state, gvPolicy, action=None):
    # print(state, action)
    if action is None:
        # t_iActionID = np.random.choice(i_nActions)
        t_iActionID = np.random.choice(list(range(i_nActions)), p=gvPolicy)
        # return the selected action and the prob of selection
        return t_iActionID, v_actions[t_iActionID], gvPolicy[t_iActionID]
    else:
        # return the given action and the prob of selecting that action at given state
        return action, v_actions[action], gvPolicy[action]


def step(giSt, giAc, ginDim):
    giSt = np.array(giSt)
    next_state = (giSt + giAc).tolist()
    x, y = next_state
    reward = -1

    # if you exit the grid, return the original state
    if x < 0 or x >= ginDim or y < 0 or y >= ginDim:
        next_state = giSt.tolist()

    return next_state, reward

def TD0_travel(ginDim, policy_agent, gvPolicy, initial_state=None, initial_action=None):
    
    if initial_state is None:
        # generate a random initial state
        state_i, state_j = get_initial_state(ginDim)
    else:
        state_i, state_j = initial_state
    
    # initial state
    state = (state_i, state_j)
 
    states_and_rewards = [(state, 0)]
    while is_terminal(state, ginDim) == False:
         # get new action
         i_action, action, prob = policy_agent(state, gvPolicy)
         state, reward = step(state, action, ginDim)
         states_and_rewards.append((state,reward))

    return states_and_rewards




def tabular_TD0(gvPolicy, episodes=100, ginDim=4):
    '''
    implement Tabular TD(0) algorithm for estimating v_{\pi} --- i.e. policy prediction/evaluation algorithm
    '''
    
    state_values = np.zeros((ginDim, ginDim))

    V = {}
    for i in range(ginDim):
      for j in range(ginDim):
            V[(i,j)] = 0

    initial_state = get_initial_state(ginDim)

    # travel for several episodes
    for episode in tqdm(range(0, episodes)):
        
        #generate episode
        states_and_rewards = TD0_travel(ginDim, target_policy_agent, gvPolicy, initial_state)
        for t in range(len(states_and_rewards) - 1):
            s, _ = states_and_rewards[t]
            s2, r = states_and_rewards[t+1]
            #update V[s] as we experience it
            x,y = s
            x2,y2 = s2
            V[(x,y)] = V[(x,y)] + ALPHA * (r + GAMMA*V[(x2,y2)] - V[(x,y)])

    for i in range(ginDim):
      for j in range(ginDim):
        state_values[(i,j)] = V[(i,j)]

    return episodes, np.around(state_values,1)



def max_dict(d):
    '''
    :param d: dictionary
    :return: returns argmax (key) and max (value)
    '''
    max_key = None
    max_val = float('-inf')
    for k,v in d.items():
        if v > max_val:
            max_val = v
            max_key = k
    return max_key, max_val


def q_learning_algo(episodes=100, ginDim=4):
    '''
    implement Q-Learning algorithm
    '''
    state_values = np.zeros((ginDim, ginDim))
    best_actions = np.zeros((ginDim, ginDim))

    gvPolicy = [0.25 ,0.25 ,0.25 ,0.25]

    #initialize Q
    Q = {}
    for i in range(ginDim):
        for j in range(ginDim):
            Q[(i,j)] = {}
            for i_action, action in enumerate(v_actions): 
                Q[(i,j)][i_action] = 0
    
    initial_state = get_initial_state(ginDim)

    # travel for several episodes
    for episode in tqdm(range(0, episodes)):

        #generate episode
        s = initial_state
        i_a, a, prob = target_policy_agent(s, gvPolicy)

        while is_terminal(s, ginDim) == False:
            # get next step
            s2, r = step(s, a, ginDim)

            #we will update Q(s,a) AS we experience the episode
            x,y = s
            old_qsa = Q[(x,y)][i_a]

            #Q learning will use max[a'] Q(s',a') in our update
            x2,y2 = s2
            i_a2, max_q_s2a2 = max_dict(Q[(x2,y2)])
            Q[(x,y)][i_a] = Q[(x,y)][i_a] + ALPHA*(r + GAMMA*max_q_s2a2 - Q[(x,y)][i_a])

            #next state becomes current state
            s = s2
            i_a = i_a2
            a = v_actions[i_a2]
    
    # determine policy from Q* and find V* from Q*
    policy = {}
    V = {}
    for i in range(ginDim):
      for j in range(ginDim):    
        s = (i,j)
        i_a, max_q = max_dict(Q[s])
        # way 1
        policy[s] = i_a
        V[s] = max_q
        #way 2
        best_actions[s] = i_a
        state_values[s] = max_q

    return episodes, np.around(state_values,1), best_actions


In [2]:
import math
import matplotlib.pyplot as plt
from matplotlib.table import Table

def draw_policy(gdicPolicy, gsFigName):
    '''
    gdicPolicy: dictionary of values with cell coordinates as keys, and taken actions at the cells as values 
    (0, 0) [0 1 2 3]
    (0, 1) [0 1 2 3]
    ...

    gsFigName: output file name for visualization (e.g. gridworld_opt_policy_VI.png)
    '''

    # left, up, right, down
    ACTIONS = [np.array([0, -1]),
            np.array([-1, 0]),
            np.array([0, 1]),
            np.array([1, 0])]

    ACTIONS_FIGS=[ '←', '↑', '→', '↓']

    fig, ax = plt.subplots()
    ax.set_axis_off()
    tb = Table(ax, bbox=[0, 0, 1, 1])


    dic_policy = dict(np.ndenumerate(gdicPolicy))

    nrows, ncols = int(math.sqrt(len(dic_policy))), int(math.sqrt(len(dic_policy)))
    width, height = 1.0 / ncols, 1.0 / nrows

    # Add cells
    # for (i, j), val in np.ndenumerate(gvOptValues):
    for cell, pol in dic_policy.items():
        
        #val=''
        #for ba in pol:
        #    val+=ACTIONS_FIGS[int(ba)]
        
        val = ACTIONS_FIGS[int(pol)]

        i = cell[0]
        j = cell[1]
        
      
        tb.add_cell(i, j, width, height, text=val,
                loc='center', facecolor='white')

    # Row and column labels...
    for i in range(int(math.sqrt(len(dic_policy)))):
        tb.add_cell(i, -1, width, height, text=i+1, loc='right',
                    edgecolor='none', facecolor='none')
        tb.add_cell(-1, i, width, height/2, text=i+1, loc='center',
                   edgecolor='none', facecolor='none')

    ax.add_table(tb)

    plt.savefig(gsFigName)
    plt.close()

In [17]:
iterCt, policy_values = tabular_TD0([0.25 ,0.25 ,0.25 ,0.25], 
                                    episodes=10000, ginDim=4)

100%|██████████| 10000/10000 [00:06<00:00, 1564.29it/s]


In [18]:
iterCt

10000

In [19]:
policy_values #[10000]

array([[ 0. , -4.6, -7.5, -8.4],
       [-4.4, -6. , -7.2, -7.3],
       [-7. , -7.2, -7. , -5.2],
       [-7.6, -6.9, -5. ,  0. ]])

In [16]:
policy_values #[1000]

array([[ 0. , -4.6, -6.5, -8. ],
       [-4.8, -6.5, -7. , -7.3],
       [-7. , -6.9, -6.7, -5.8],
       [-7.5, -7.1, -5.8,  0. ]])

In [6]:
iterCt, policy_values, policy = q_learning_algo(episodes=1000, ginDim=4)

100%|██████████| 1000/1000 [00:00<00:00, 17439.86it/s]


In [7]:
iterCt

1000

In [8]:
policy_values

array([[ 0. , -1. , -1.9, -2. ],
       [-1. , -1.9, -2.7, -1.9],
       [-1.9, -2.7, -1.9, -1. ],
       [-2. , -1.9, -1. ,  0. ]])

In [None]:
policy

array([[0., 0., 3., 0.],
       [1., 1., 2., 1.],
       [1., 3., 0., 3.],
       [3., 0., 2., 0.]])

In [None]:
s_fig_name = "policy_TD_q_learning_algo_v1.png"
draw_policy(policy, s_fig_name)